This guide shows you how to programmatically fetch, export, and analyze patient health data from JupyterHealth Exchange.
Introduction¶
Prerequisites¶
Authenticated access to JupyterHealth Exchange
Authorization for patient’s study and organization
Patient consent for data types you want to retrieve
HTTP client (curl, Postman, or programming language library)
Understanding Data Access¶
Authorization Model¶
You can only access patient data if:
You’re authorized for the patient’s organization
Patient is enrolled in a study you manage
Patient has consented to share the requested data types
Reference: jupyterhealth-exchange/core/models.py
Data Formats¶
JupyterHealth Exchange stores observations in:
Open mHealth (OMH) format: JSON data conforming to OMH schemas
FHIR wrapper: Observations wrapped in FHIR resources for interoperability
The OMH data is base64-encoded inside the FHIR valueAttachment.data field.
Reference: jupyterhealth-exchange/core/models.py
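The round trip between the two representations uses only standard base64 and JSON tooling. A minimal sketch with an illustrative payload (the values are examples, not real data):

```python
import base64
import json

# A minimal OMH-style document (illustrative values only)
omh_document = {
    "header": {"schema_id": {"namespace": "omh", "name": "blood-glucose", "version": "4.0"}},
    "body": {"blood_glucose": {"value": 105, "unit": "mg/dL"}},
}

# Encode as it would appear in a FHIR valueAttachment.data field
encoded = base64.b64encode(json.dumps(omh_document).encode("utf-8")).decode("ascii")

# Decode back to the original OMH JSON
decoded = json.loads(base64.b64decode(encoded).decode("utf-8"))
assert decoded == omh_document
```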
Viewing Data (Quick Reference)¶
For basic viewing of patient observations, see:
View Patient’s Data: View observations in Console or via REST API
Monitor Study Progress: View study-wide data collection
This guide focuses on programmatic data fetching, export, and analysis workflows.
Fetch via Admin REST API¶
The Admin REST API provides direct access to observation data in JSON format with decoded OMH payloads.
List Observations for a Patient¶
curl "https://your-jhe-instance.com/api/v1/observations?organization_id=1&study_id=10001&patient_id=10001" \
  -H "Authorization: Bearer $ACCESS_TOKEN"

Query Parameters:

organization_id (required): Your organization ID
study_id (required): Study ID
patient_id (required): Patient ID
limit (optional): Number of results per page (default: 20)
offset (optional): Pagination offset
Response:
{
  "count": 250,
  "next": "https://your-jhe-instance.com/api/v1/observations?limit=20&offset=20&...",
  "previous": null,
  "results": [
    {
      "id": 1,
      "subjectPatient": 10001,
      "patientNameDisplay": "Smith, John",
      "codeableConcept": 1,
      "codingSystem": "https://w3id.org/openmhealth",
      "codingCode": "omh:blood-glucose:4.0",
      "codingText": "Blood Glucose",
      "dataSource": 70001,
      "dataSourceName": "iHealth",
      "status": "final",
      "valueAttachmentData": {
        "header": {
          "uuid": "550e8400-e29b-41d4-a716-446655440000",
          "schema_id": {
            "namespace": "omh",
            "name": "blood-glucose",
            "version": "4.0"
          },
          "creation_date_time": "2024-10-25T21:13:31.438Z",
          "source_creation_date_time": "2024-05-02T08:30:00-07:00"
        },
        "body": {
          "effective_time_frame": {
            "date_time": "2024-05-02T08:30:00-07:00"
          },
          "blood_glucose": {
            "value": 105,
            "unit": "mg/dL"
          },
          "temporal_relationship_to_meal": "fasting"
        }
      },
      "created": "2024-05-02T15:45:00Z",
      "lastUpdated": "2024-05-02T15:45:00Z"
    }
  ]
}

Reference: jupyterhealth-exchange/core/views/observation.py
Fetch via FHIR API¶
The FHIR API provides standardized interoperability for health data exchange.
Search Observations¶
curl "https://your-jhe-instance.com/fhir/Observation?patient._has:Group:member:_id=10001&patient=10001&code=https://w3id.org/openmhealth|omh:blood-glucose:4.0" \
  -H "Authorization: Bearer $ACCESS_TOKEN"

Query Parameters:

patient._has:Group:member:_id (required): Study ID
patient: Patient ID (required if the study has multiple patients)
patient.identifier: Alternative to the patient ID (format: system|value)
code: Filter by data type (format: system|code)

At least one patient identifier (patient or patient.identifier) is required.
Reference: jupyterhealth-exchange/core/views/observation.py
Response (FHIR Bundle):
{
  "resourceType": "Bundle",
  "type": "searchset",
  "total": 250,
  "link": [{
    "relation": "next",
    "url": "https://your-jhe-instance.com/fhir/Observation?..."
  }],
  "entry": [
    {
      "fullUrl": "https://your-jhe-instance.com/fhir/Observation/1",
      "resource": {
        "resourceType": "Observation",
        "id": "1",
        "status": "final",
        "code": {
          "coding": [{
            "system": "https://w3id.org/openmhealth",
            "code": "omh:blood-glucose:4.0"
          }]
        },
        "subject": {
          "reference": "Patient/10001"
        },
        "device": {
          "reference": "Device/70001"
        },
        "identifier": [{
          "system": "https://commonhealth.org",
          "value": "obs-12345"
        }],
        "valueAttachment": {
          "contentType": "application/json",
          "data": "eyJoZWFkZXIiOnsiIHV1aWQiOiI1NTBlODQwMC1lMjliLTQxZDQtYTcxNi00NDY2NTU0NDAwMDAiLCJzY2hlbWFfaWQiOnsibmFtZXNwYWNlIjoib21oIiwibmFtZSI6ImJsb29kLWdsdWNvc2UiLCJ2ZXJzaW9uIjoiNC4wIn0sImNyZWF0aW9uX2RhdGVfdGltZSI6IjIwMjQtMTAtMjVUMjE6MTM6MzEuNDM4WiIsInNvdXJjZV9jcmVhdGlvbl9kYXRlX3RpbWUiOiIyMDI0LTA1LTAyVDA4OjMwOjAwLTA3OjAwIn0sImJvZHkiOnsiZWZmZWN0aXZlX3RpbWVfZnJhbWUiOnsiZGF0ZV90aW1lIjoiMjAyNC0wNS0wMlQwODozMDowMC0wNzowMCJ9LCJibG9vZF9nbHVjb3NlIjp7InZhbHVlIjoxMDUsInVuaXQiOiJtZy9kTCJ9LCJ0ZW1wb3JhbF9yZWxhdGlvbnNoaXBfdG9fbWVhbCI6ImZhc3RpbmcifX0="
        }
      }
    }
  ]
}

Decode Base64 OMH Data¶
The valueAttachment.data field contains base64-encoded OMH JSON:
import base64
import json

def decode_omh_data(fhir_observation):
    """Extract and decode OMH data from a FHIR observation"""
    encoded_data = fhir_observation["valueAttachment"]["data"]
    # Decode base64
    decoded_bytes = base64.b64decode(encoded_data)
    # Parse JSON
    omh_data = json.loads(decoded_bytes.decode("utf-8"))
    return omh_data

# Example: `observation` is a resource dict from the FHIR Bundle above
omh_data = decode_omh_data(observation)
print(f"UUID: {omh_data['header']['uuid']}")
print(f"Data type: {omh_data['header']['schema_id']['name']}")
print(f"Value: {omh_data['body']}")

The decoded omh_data structure:
{
    "header": {
        "uuid": "550e8400-e29b-41d4-a716-446655440000",
        "schema_id": {"namespace": "omh", "name": "blood-glucose", "version": "4.0"},
        "creation_date_time": "2024-10-25T21:13:31.438Z",
        "source_creation_date_time": "2024-05-02T08:30:00-07:00",
    },
    "body": {
        "effective_time_frame": {"date_time": "2024-05-02T08:30:00-07:00"},
        "blood_glucose": {"value": 105, "unit": "mg/dL"},
        "temporal_relationship_to_meal": "fasting",
    },
}

Reference: jupyterhealth-exchange/core/models.py
Filter by Data Type¶
# Blood glucose only
curl "https://your-jhe-instance.com/fhir/Observation?patient._has:Group:member:_id=10001&patient=10001&code=https://w3id.org/openmhealth|omh:blood-glucose:4.0" \
-H "Authorization: Bearer $ACCESS_TOKEN"
# Blood pressure only
curl "https://your-jhe-instance.com/fhir/Observation?patient._has:Group:member:_id=10001&patient=10001&code=https://w3id.org/openmhealth|omh:blood-pressure:4.0" \
-H "Authorization: Bearer $ACCESS_TOKEN"
# Heart rate only
curl "https://your-jhe-instance.com/fhir/Observation?patient._has:Group:member:_id=10001&patient=10001&code=https://w3id.org/openmhealth|omh:heart-rate:2.0" \
  -H "Authorization: Bearer $ACCESS_TOKEN"

Search by Patient Identifier¶
If you don’t know the patient’s ID but have another identifier:
curl "https://your-jhe-instance.com/fhir/Observation?patient._has:Group:member:_id=10001&patient.identifier=https://hospital.example.com|MRN-12345" \
  -H "Authorization: Bearer $ACCESS_TOKEN"

Reference: jupyterhealth-exchange/core/views/observation.py
Pagination Strategies¶
Both Admin REST API and FHIR API support pagination for large result sets.
Admin REST API Pagination¶
The Admin REST API uses next and previous links for pagination:
import requests

BASE_URL = "https://your-jhe-instance.com"
ACCESS_TOKEN = "your-access-token"

def fetch_all_observations(org_id, study_id, patient_id):
    """Fetch all observations using pagination"""
    all_observations = []
    url = f"{BASE_URL}/api/v1/observations"
    params = {
        "organization_id": org_id,
        "study_id": study_id,
        "patient_id": patient_id,
        "limit": 100,
    }
    while url:
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
            params=params if params else None,
        )
        response.raise_for_status()
        data = response.json()
        all_observations.extend(data["results"])
        # Get next page
        url = data.get("next")
        params = None  # Next URL has params embedded
        print(f"Fetched {len(all_observations)} / {data['count']} observations")
    return all_observations

# Usage
observations = fetch_all_observations(org_id=1, study_id=10001, patient_id=10001)
print(f"Total observations: {len(observations)}")

FHIR API Pagination¶
The FHIR API uses link elements with relation="next" for pagination:
def fetch_all_fhir_observations(study_id, patient_id):
    """Fetch all FHIR observations using pagination"""
    all_entries = []
    url = f"{BASE_URL}/fhir/Observation"
    params = {"patient._has:Group:member:_id": study_id, "patient": patient_id}
    while url:
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
            params=params if params else None,
        )
        response.raise_for_status()
        bundle = response.json()
        all_entries.extend(bundle.get("entry", []))
        # Find next link
        next_link = next(
            (link for link in bundle.get("link", []) if link.get("relation") == "next"),
            None,
        )
        url = next_link["url"] if next_link else None
        params = None  # Next URL has params embedded
        print(f"Fetched {len(all_entries)} / {bundle.get('total', 0)} entries")
    return all_entries

Note: To fetch the list of patients in a study, see Patient Management.
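With all entries collected, the base64 payloads can be decoded in one pass. A sketch against the entry shape shown in the Bundle example above (any field beyond that example is an assumption):

```python
import base64
import json

def entries_to_omh(entries):
    """Decode the OMH payload from each FHIR Observation entry."""
    records = []
    for entry in entries:
        resource = entry["resource"]
        encoded = resource["valueAttachment"]["data"]
        omh = json.loads(base64.b64decode(encoded).decode("utf-8"))
        records.append(
            {
                "observation_id": resource["id"],
                "patient": resource["subject"]["reference"],
                "omh": omh,
            }
        )
    return records

# Demo with a synthetic entry (illustrative payload)
sample_omh = {"body": {"blood_glucose": {"value": 105, "unit": "mg/dL"}}}
sample_entry = {
    "resource": {
        "id": "1",
        "subject": {"reference": "Patient/10001"},
        "valueAttachment": {
            "contentType": "application/json",
            "data": base64.b64encode(json.dumps(sample_omh).encode()).decode(),
        },
    }
}
records = entries_to_omh([sample_entry])
```

The decoded records can be fed directly into the export helpers in the next section.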
Export Data for Analysis¶
Export to CSV¶
import csv

import requests

BASE_URL = "https://your-jhe-instance.com"
ACCESS_TOKEN = "your-access-token"

def fetch_observations(org_id, study_id, patient_id):
    """Fetch all observations for a patient"""
    all_obs = []
    url = f"{BASE_URL}/api/v1/observations"
    params = {
        "organization_id": org_id,
        "study_id": study_id,
        "patient_id": patient_id,
        "limit": 100,
    }
    while url:
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
            params=params if params else None,
        )
        response.raise_for_status()
        data = response.json()
        all_obs.extend(data["results"])
        url = data.get("next")
        params = None
    return all_obs

def export_blood_glucose_to_csv(observations, output_file):
    """Export blood glucose observations to CSV"""
    with open(output_file, "w", newline="") as f:
        writer = csv.writer(f)
        # Header
        writer.writerow(
            [
                "Observation ID",
                "Patient Name",
                "Date/Time",
                "Glucose (mg/dL)",
                "Meal Context",
                "Data Source",
                "Created",
            ]
        )
        # Data
        for obs in observations:
            if obs["codingCode"] != "omh:blood-glucose:4.0":
                continue
            omh_data = obs["valueAttachmentData"]
            body = omh_data["body"]
            writer.writerow(
                [
                    obs["id"],
                    obs["patientNameDisplay"],
                    body["effective_time_frame"]["date_time"],
                    body["blood_glucose"]["value"],
                    body.get("temporal_relationship_to_meal", "N/A"),
                    obs["dataSourceName"],
                    obs["created"],
                ]
            )
    print(f"Exported to {output_file}")

# Usage
observations = fetch_observations(org_id=1, study_id=10001, patient_id=10001)
export_blood_glucose_to_csv(observations, "blood_glucose.csv")

Export to Pandas DataFrame¶
import pandas as pd

def fetch_to_dataframe(org_id, study_id, patient_id, data_type):
    """Fetch observations and convert to pandas DataFrame"""
    # Fetch data
    observations = fetch_observations(org_id, study_id, patient_id)
    # Filter by data type
    filtered = [obs for obs in observations if obs["codingCode"] == data_type]
    # Convert to DataFrame
    rows = []
    for obs in filtered:
        omh_data = obs["valueAttachmentData"]
        body = omh_data["body"]
        row = {
            "observation_id": obs["id"],
            "patient_id": obs["subjectPatient"],
            "patient_name": obs["patientNameDisplay"],
            "timestamp": body["effective_time_frame"]["date_time"],
            "data_source": obs["dataSourceName"],
            "created": obs["created"],
        }
        # Add data-type-specific fields
        if data_type == "omh:blood-glucose:4.0":
            row["glucose_mg_dl"] = body["blood_glucose"]["value"]
            row["meal_context"] = body.get("temporal_relationship_to_meal")
        elif data_type == "omh:blood-pressure:4.0":
            row["systolic_mmhg"] = body["systolic_blood_pressure"]["value"]
            row["diastolic_mmhg"] = body["diastolic_blood_pressure"]["value"]
        elif data_type == "omh:heart-rate:2.0":
            row["heart_rate_bpm"] = body["heart_rate"]["value"]
        rows.append(row)
    df = pd.DataFrame(rows)
    # Convert timestamps
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["created"] = pd.to_datetime(df["created"])
    return df

# Usage
df_glucose = fetch_to_dataframe(1, 10001, 10001, "omh:blood-glucose:4.0")
df_bp = fetch_to_dataframe(1, 10001, 10001, "omh:blood-pressure:4.0")

# Analyze
print(f"Mean glucose: {df_glucose['glucose_mg_dl'].mean():.1f} mg/dL")
print(f"Mean systolic BP: {df_bp['systolic_mmhg'].mean():.1f} mmHg")

# Save
df_glucose.to_csv("glucose_data.csv", index=False)
df_bp.to_csv("bp_data.csv", index=False)

Export All Patients in Study¶
import pandas as pd
import requests

def export_study_data(org_id, study_id, data_type):
    """Export data for all patients in a study"""
    # Get all patients
    response = requests.get(
        f"{BASE_URL}/api/v1/patients",
        headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
        params={"organization_id": org_id, "study_id": study_id},
    )
    response.raise_for_status()
    patients = response.json()["results"]
    # Fetch data for each patient
    all_data = []
    for patient in patients:
        print(f"Fetching data for {patient['nameGiven']} {patient['nameFamily']}...")
        df = fetch_to_dataframe(org_id, study_id, patient["id"], data_type)
        all_data.append(df)
    # Combine
    combined_df = pd.concat(all_data, ignore_index=True)
    return combined_df

# Usage
study_df = export_study_data(1, 10001, "omh:blood-glucose:4.0")
study_df.to_csv("study_blood_glucose.csv", index=False)
print(
    f"Exported {len(study_df)} observations from {study_df['patient_id'].nunique()} patients"
)

Real-Time Data Monitoring¶
Poll for New Data¶
import time
from datetime import datetime, timezone

def monitor_new_observations(org_id, study_id, patient_id, poll_interval=60):
    """Poll for new observations every poll_interval seconds"""
    # Use an aware datetime so it compares cleanly with the parsed timestamps
    last_check = datetime.now(timezone.utc)
    print(f"Monitoring started at {last_check}")
    while True:
        # Fetch observations created since last check
        observations = fetch_observations(org_id, study_id, patient_id)
        new_obs = [
            obs
            for obs in observations
            if datetime.fromisoformat(obs["created"].replace("Z", "+00:00"))
            > last_check
        ]
        if new_obs:
            print(f"\n{len(new_obs)} new observation(s) at {datetime.now(timezone.utc)}:")
            for obs in new_obs:
                print(f"  - {obs['codingText']}: {obs['id']}")
        last_check = datetime.now(timezone.utc)
        # Wait
        time.sleep(poll_interval)

# Usage
monitor_new_observations(
    org_id=1,
    study_id=10001,
    patient_id=10001,
    poll_interval=60,  # Check every 60 seconds
)

Webhook Alternative¶
For production systems, implement webhooks instead of polling:
Configure webhook endpoint on your server
Exchange POSTs to your endpoint when new data arrives
Process data immediately without polling overhead
Note: Webhook support depends on Exchange configuration.
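A minimal receiver can be sketched with the standard library; the endpoint path and the payload shape below are assumptions for illustration, not a documented Exchange format:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class WebhookHandler(BaseHTTPRequestHandler):
    """Accept POSTed new-data events (payload shape is an assumption)."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"{}")
        # Hand off to your own pipeline here, e.g. fetch and decode the
        # referenced observation via the APIs described above
        print(f"Received event: {payload}")
        self.send_response(204)  # acknowledge with no body
        self.end_headers()

def run(host="0.0.0.0", port=8000):
    """Start listening; the Exchange would POST events to this endpoint."""
    HTTPServer((host, port), WebhookHandler).serve_forever()

# run()
```

A production deployment would additionally need authentication on the endpoint (e.g. a shared-secret header) and TLS termination.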
Data Quality Checks¶
Identify Missing Data¶
def check_data_completeness(org_id, study_id, expected_data_types):
    """Check which patients have uploaded each data type"""
    # Get all patients
    response = requests.get(
        f"{BASE_URL}/api/v1/patients",
        headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
        params={"organization_id": org_id, "study_id": study_id},
    )
    response.raise_for_status()
    patients = response.json()["results"]
    results = []
    for patient in patients:
        row = {
            "patient_id": patient["id"],
            "patient_name": f"{patient['nameGiven']} {patient['nameFamily']}",
        }
        # Fetch once per patient, then count each expected data type
        observations = fetch_observations(org_id, study_id, patient["id"])
        for data_type in expected_data_types:
            row[data_type] = sum(
                1 for obs in observations if obs["codingCode"] == data_type
            )
        results.append(row)
    df = pd.DataFrame(results)
    return df

# Usage
completeness = check_data_completeness(
    org_id=1,
    study_id=10001,
    expected_data_types=[
        "omh:blood-glucose:4.0",
        "omh:blood-pressure:4.0",
        "omh:heart-rate:2.0",
    ],
)
print(completeness)

# Identify patients with missing data
missing = completeness[completeness["omh:blood-glucose:4.0"] == 0]
print(f"\n{len(missing)} patients have not uploaded blood glucose data:")
print(missing["patient_name"].tolist())

Detect Outliers¶
def detect_outliers(df, column, threshold_std=3):
    """Detect statistical outliers using standard deviation"""
    mean = df[column].mean()
    std = df[column].std()
    outliers = df[
        (df[column] < mean - threshold_std * std)
        | (df[column] > mean + threshold_std * std)
    ]
    return outliers

# Usage
df_glucose = fetch_to_dataframe(1, 10001, 10001, "omh:blood-glucose:4.0")
outliers = detect_outliers(df_glucose, "glucose_mg_dl", threshold_std=3)
if len(outliers) > 0:
    print(f"Found {len(outliers)} outlier readings:")
    print(outliers[["timestamp", "glucose_mg_dl"]])

Common Issues¶
Empty Results¶
Issue: Query returns 0 observations even though data exists.
Common Causes:
Authorization: You’re not authorized for the patient’s study
Solution: Verify you’re a member/manager of the organization
Consent: Patient hasn’t consented to share data
Solution: Check consent status via /api/v1/patients/{id}/consents
Wrong parameters: Incorrect organization_id or study_id
Solution: Verify IDs match the patient’s enrollment
Reference: Authorization checks at jupyterhealth-exchange/core/views/observation.py
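The consent check above can be scripted. A minimal sketch, with BASE_URL and ACCESS_TOKEN as in the earlier examples; the response shape is not shown in this guide, so inspect the returned JSON for your deployment:

```python
import requests

BASE_URL = "https://your-jhe-instance.com"  # assumption: your deployment URL
ACCESS_TOKEN = "your-access-token"

def consent_url(patient_id):
    """Endpoint for a patient's consent records (see Empty Results above)."""
    return f"{BASE_URL}/api/v1/patients/{patient_id}/consents"

def fetch_consents(patient_id):
    response = requests.get(
        consent_url(patient_id),
        headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
    )
    response.raise_for_status()
    return response.json()

# consents = fetch_consents(10001)
# print(consents)  # inspect which data types the patient has agreed to share
```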
Base64 Decoding Error¶
Error: “Invalid base64-encoded string”
Cause: The valueAttachment.data field is already decoded (Admin API) or incorrectly formatted.
Solution: Admin REST API returns decoded OMH data in valueAttachmentData. FHIR API returns base64-encoded data in valueAttachment.data.
# Admin API - already decoded
omh_data = observation["valueAttachmentData"]  # Already a dict

# FHIR API - needs decoding
encoded = observation["valueAttachment"]["data"]
decoded = base64.b64decode(encoded)
omh_data = json.loads(decoded.decode("utf-8"))

Permission Denied¶
Error: 403 Forbidden
Cause: User not authorized to view this patient’s data.
Solution: Verify authorization:
# Check if a practitioner is authorized for a patient
# (run inside a Django shell: python manage.py shell)
from core.models import Patient

patient_id = 10001
practitioner_user_id = 1
study_id = 10001

# Get the patients this practitioner may access
authorized = Patient.for_practitioner_organization_study(
    practitioner_user_id=practitioner_user_id,
    organization_id=1,
    study_id=study_id,
)

if authorized.filter(id=patient_id).exists():
    print("Authorized")
else:
    print("Not authorized")

Reference: jupyterhealth-exchange/core/models.py
Best Practices¶
Performance¶
Use pagination: Fetch data in chunks (100-1000 records)
Cache tokens: Reuse access tokens until expiry (10 hours default)
Filter server-side: Use query parameters instead of filtering in client
Batch requests: Fetch data for multiple patients in parallel
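The batch-request advice can be sketched with a thread pool, which suits these I/O-bound API calls. Here `fetch_fn` stands in for the `fetch_observations` helper defined in the export examples, and the worker count is a tuning assumption:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_many(org_id, study_id, patient_ids, fetch_fn, max_workers=4):
    """Fetch observations for several patients in parallel.

    fetch_fn should have the signature of fetch_observations from the
    export examples: fetch_fn(org_id, study_id, patient_id) -> list.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pid: pool.submit(fetch_fn, org_id, study_id, pid) for pid in patient_ids
        }
        # .result() re-raises any exception from the worker thread
        return {pid: future.result() for pid, future in futures.items()}

# Demo with a stub in place of the real API call
results = fetch_many(1, 10001, [10001, 10002], fetch_fn=lambda o, s, p: [{"patient": p}])
```

Keep `max_workers` modest so parallel requests do not overwhelm the Exchange or trip rate limits.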
Data Privacy¶
Minimum necessary: Only fetch data types needed for analysis
De-identify: Remove PII before storing locally
Secure storage: Encrypt exported data at rest
Audit access: Log all data retrieval operations
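The de-identification step can start with dropping direct identifiers and pseudonymizing patient IDs. A sketch over the column names used in this guide; note that real de-identification also has to consider dates, free text, and small cohorts:

```python
import pandas as pd

PII_COLUMNS = ["patient_name"]  # extend with any other direct identifiers

def deidentify(df):
    """Drop PII columns and replace patient IDs with stable pseudonyms."""
    out = df.drop(columns=[c for c in PII_COLUMNS if c in df.columns])
    # factorize assigns 0, 1, 2, ... in order of first appearance
    codes, _ = pd.factorize(out["patient_id"])
    out["patient_id"] = codes
    return out

# Demo with a small frame shaped like the export examples
df = pd.DataFrame(
    {
        "patient_id": [10001, 10002, 10001],
        "patient_name": ["A", "B", "A"],
        "glucose_mg_dl": [105, 98, 110],
    }
)
clean = deidentify(df)
```

If re-identification may later be needed, persist the pseudonym mapping separately under access control rather than discarding it.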
Data Quality¶
Validate timestamps: Check for future dates or invalid formats
Check units: Verify units match expected values (mmHg, mg/dL, etc.)
Detect duplicates: Use observation IDs to prevent duplicate processing
Handle missing data: Implement strategies for incomplete datasets
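The duplicate check can key on the observation ID returned by the Admin API, which is stable across fetches. A minimal pandas sketch over the DataFrames built earlier:

```python
import pandas as pd

def drop_duplicate_observations(df, id_column="observation_id"):
    """Keep the first row per observation ID and report what was removed."""
    before = len(df)
    deduped = df.drop_duplicates(subset=[id_column], keep="first")
    removed = before - len(deduped)
    if removed:
        print(f"Removed {removed} duplicate observation(s)")
    return deduped

# Demo: observation 2 was fetched twice
df = pd.DataFrame({"observation_id": [1, 2, 2, 3], "glucose_mg_dl": [105, 98, 98, 110]})
deduped = drop_duplicate_observations(df)
```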
Error Handling¶
import time

import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout

def fetch_with_retry(url, headers, params, max_retries=3):
    """Fetch data with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, params=params, timeout=30)
            response.raise_for_status()
            return response.json()
        except HTTPError as e:
            if e.response.status_code == 401:
                # Token expired - refresh and retry
                # (refresh_token() is your own helper; see token management)
                refresh_token()
                continue
            elif e.response.status_code == 403:
                # Permission denied - don't retry
                raise
            elif e.response.status_code >= 500:
                # Server error - retry with backoff
                wait = 2**attempt
                print(f"Server error, retrying in {wait}s...")
                time.sleep(wait)
                continue
            else:
                raise
        except (ConnectionError, Timeout):
            # Network error - retry with backoff
            wait = 2**attempt
            print(f"Network error, retrying in {wait}s...")
            time.sleep(wait)
            continue
    raise Exception(f"Failed after {max_retries} attempts")