Python Guide for HyperStudy API
This guide provides complete, working examples for accessing HyperStudy data using Python.
Prerequisites
Installation
Install required packages:
pip install requests pandas python-dotenv
API Key Setup
- Generate an API key (see API Key Management)
- Create a .env file in your project directory:
# .env
HYPERSTUDY_API_KEY=hst_live_your_api_key_here
HYPERSTUDY_BASE_URL=https://api.hyperstudy.io/api/v3
- Add .env to your .gitignore:
# .gitignore
.env
Quick Start
Minimal Example
import os
import requests
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

API_KEY = os.getenv('HYPERSTUDY_API_KEY')
BASE_URL = os.getenv('HYPERSTUDY_BASE_URL')

# Make your first request
response = requests.get(
    f'{BASE_URL}/data/events/room/your-room-id',
    headers={'X-API-Key': API_KEY},
    params={'limit': 10}
)

if response.ok:
    result = response.json()
    events = result['data']
    print(f"Retrieved {len(events)} events")

    for event in events:
        print(f"{event['onset']}ms: {event['componentType']}")
else:
    print(f"Error: {response.status_code} - {response.text}")
Complete Examples
Example 1: Download Events for One Participant
Goal: Fetch all events for a specific participant in a session.
import os
import requests
from dotenv import load_dotenv


def download_participant_events(api_key, participant_id, room_id):
    """
    Download all events for a participant in a specific session.

    Args:
        api_key: Your HyperStudy API key
        participant_id: Participant's ID
        room_id: Room/session ID

    Returns:
        list: List of event dictionaries
    """
    base_url = 'https://api.hyperstudy.io/api/v3'

    response = requests.get(
        f'{base_url}/data/events/participant/{participant_id}',
        headers={'X-API-Key': api_key},
        params={
            'roomId': room_id,
            'sort': 'onset',
            'order': 'asc'
        }
    )

    # Check for errors
    if not response.ok:
        print(f"Error {response.status_code}: {response.text}")
        return []

    result = response.json()

    # Check API status
    if result['status'] != 'success':
        print(f"API Error: {result.get('error', {}).get('message')}")
        return []

    events = result['data']
    metadata = result['metadata']

    print(f"Retrieved {len(events)} events")
    print(f"Experiment started at: {metadata['processing']['experimentStartedAt']}")
    print(f"Total events available: {metadata['pagination']['total']}")

    return events


# Usage
if __name__ == '__main__':
    load_dotenv()

    events = download_participant_events(
        api_key=os.getenv('HYPERSTUDY_API_KEY'),
        participant_id='your-participant-id',
        room_id='your-room-id'
    )

    # Analyze events
    if events:
        print("\nFirst 5 events:")
        for event in events[:5]:
            onset_sec = event['onset'] / 1000
            print(f"  {onset_sec:7.2f}s: {event['componentType']:20s} - {event['content']}")
Example 2: Download All Data Types for Analysis
Goal: Fetch events, recordings, chat, continuous ratings, and sync metrics for comprehensive analysis.
import os
import requests
from dotenv import load_dotenv


def download_all_participant_data(api_key, participant_id, room_id):
    """
    Download all data types for a participant.

    Args:
        api_key: Your HyperStudy API key
        participant_id: Participant's ID
        room_id: Room/session ID

    Returns:
        dict: Dictionary with keys: events, recordings, chat, continuous_ratings, sync_metrics
    """
    base_url = 'https://api.hyperstudy.io/api/v3'
    headers = {'X-API-Key': api_key}
    params = {'roomId': room_id}

    data_types = {
        'events': 'events',
        'recordings': 'recordings',
        'chat': 'chat',
        'continuous_ratings': 'ratings/continuous',
        'sync_metrics': 'sync'
    }

    all_data = {}

    print(f"Downloading data for participant {participant_id}...")

    for key, endpoint in data_types.items():
        url = f'{base_url}/data/{endpoint}/participant/{participant_id}'

        try:
            response = requests.get(url, headers=headers, params=params, timeout=30)

            if response.ok:
                result = response.json()
                all_data[key] = result['data']
                print(f"  ✓ {key}: {len(result['data'])} records")
            else:
                print(f"  ✗ {key}: Error {response.status_code}")
                all_data[key] = []
        except requests.exceptions.RequestException as e:
            print(f"  ✗ {key}: Request failed - {e}")
            all_data[key] = []

    print("\nDownload complete!")
    return all_data


# Usage
if __name__ == '__main__':
    load_dotenv()

    all_data = download_all_participant_data(
        api_key=os.getenv('HYPERSTUDY_API_KEY'),
        participant_id='your-participant-id',
        room_id='your-room-id'
    )

    # Summary
    print("\nData Summary:")
    for data_type, records in all_data.items():
        if records:
            print(f"  {data_type}: {len(records)} records")

            if data_type == 'events' and len(records) > 0:
                first = records[0]
                last = records[-1]
                duration = (last['onset'] - first['onset']) / 1000
                print(f"    Duration: {duration:.1f} seconds")
Example 3: Export to pandas DataFrame and CSV
Goal: Download event data and export to pandas DataFrame and CSV file.
import os
import requests
import pandas as pd
from dotenv import load_dotenv
from datetime import datetime


def export_events_to_csv(api_key, room_id, output_file):
    """
    Download events and export to CSV file with pandas.

    Args:
        api_key: Your HyperStudy API key
        room_id: Room/session ID
        output_file: Path to output CSV file
    """
    base_url = 'https://api.hyperstudy.io/api/v3'

    print(f"Fetching events for room {room_id}...")

    response = requests.get(
        f'{base_url}/data/events/room/{room_id}',
        headers={'X-API-Key': api_key},
        params={'sort': 'onset', 'order': 'asc'}
    )

    if not response.ok:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    result = response.json()
    events = result['data']
    print(f"Retrieved {len(events)} events")

    # Convert to DataFrame
    df = pd.DataFrame(events)

    # Add computed columns
    df['onset_sec'] = df['onset'] / 1000  # Convert to seconds

    # Parse timestamps
    df['timestamp'] = pd.to_datetime(df['timestamp'])

    # Extract response values if present
    if 'response' in df.columns:
        # Flatten response column if it exists
        df['response_value'] = df['response'].apply(
            lambda x: x.get('value') if isinstance(x, dict) else None
        )
        df['response_time'] = df['response'].apply(
            lambda x: x.get('responseTime') if isinstance(x, dict) else None
        )

    # Export to CSV
    df.to_csv(output_file, index=False)
    print(f"Exported to {output_file}")

    # Display summary statistics
    print("\nSummary Statistics:")
    print(f"  Total events: {len(df)}")
    print(f"  Unique participants: {df['participantId'].nunique()}")
    print(f"  Component types: {df['componentType'].nunique()}")
    print(f"  Time range: {df['onset_sec'].min():.1f}s - {df['onset_sec'].max():.1f}s")

    # Show component type distribution
    print("\nEvents by Component Type:")
    print(df['componentType'].value_counts().head(10))

    return df


# Usage
if __name__ == '__main__':
    load_dotenv()

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = f'events_{timestamp}.csv'

    df = export_events_to_csv(
        api_key=os.getenv('HYPERSTUDY_API_KEY'),
        room_id='your-room-id',
        output_file=output_file
    )

    # Further analysis with pandas
    if df is not None:
        # Filter to specific component types
        video_events = df[df['componentType'] == 'ShowVideo']
        print(f"\nVideo events: {len(video_events)}")

        # Group by participant
        by_participant = df.groupby('participantId').size()
        print("\nEvents per participant:")
        print(by_participant)
Example 4: Download Entire Experiment with Pagination
Goal: Download all events for an experiment, handling pagination for large datasets.
import os
import requests
from dotenv import load_dotenv


def download_experiment_all_pages(api_key, experiment_id):
    """
    Download all events for an experiment using pagination.

    Args:
        api_key: Your HyperStudy API key
        experiment_id: Experiment ID

    Returns:
        list: All events from all pages
    """
    base_url = 'https://api.hyperstudy.io/api/v3'
    headers = {'X-API-Key': api_key}

    all_events = []
    offset = 0
    limit = 1000
    has_more = True

    print(f"Downloading all events for experiment {experiment_id}...")

    while has_more:
        # Make request with pagination
        response = requests.get(
            f'{base_url}/data/events/experiment/{experiment_id}',
            headers=headers,
            params={
                'limit': limit,
                'offset': offset,
                'sort': 'onset',
                'order': 'asc'
            },
            timeout=60
        )

        if not response.ok:
            print(f"Error: {response.status_code} - {response.text}")
            break

        result = response.json()

        # Add events from this page
        page_events = result['data']
        all_events.extend(page_events)

        # Check pagination
        pagination = result['metadata']['pagination']
        total = pagination['total']
        has_more = pagination.get('hasMore', False)

        print(f"  Downloaded {len(all_events)}/{total} events ({len(all_events)/total*100:.1f}%)")

        # Update offset for next page
        if has_more:
            offset = pagination.get('nextOffset', offset + limit)
        else:
            print("  All pages downloaded!")

    print(f"\nTotal: {len(all_events)} events downloaded")
    return all_events


# Usage
if __name__ == '__main__':
    load_dotenv()

    experiment_id = 'your-experiment-id'
    all_events = download_experiment_all_pages(
        api_key=os.getenv('HYPERSTUDY_API_KEY'),
        experiment_id=experiment_id
    )

    # Analyze complete dataset
    if all_events:
        import pandas as pd

        df = pd.DataFrame(all_events)
        df['onset_sec'] = df['onset'] / 1000

        print("\nDataset Summary:")
        print(f"  Total events: {len(df)}")
        print(f"  Participants: {df['participantId'].nunique()}")
        print(f"  Sessions: {df['sessionId'].nunique()}")
        print(f"  Duration: {df['onset_sec'].max():.1f} seconds")

        # Save to CSV
        output_file = f"experiment_{experiment_id}_all_events.csv"
        df.to_csv(output_file, index=False)
        print(f"\nSaved to {output_file}")
Reusable Client Class
For production use, create a reusable client class:
# hyperstudy_client.py
import requests
import pandas as pd
from typing import Optional, List, Dict


class HyperStudyClient:
    """Python client for HyperStudy API v3"""

    def __init__(self, api_key: str, base_url: str = 'https://api.hyperstudy.io/api/v3'):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {'X-API-Key': api_key}

    def get_events(self, scope: str, scope_id: str, **kwargs) -> Optional[List[Dict]]:
        """Fetch events"""
        result = self._request(f'data/events/{scope}/{scope_id}', params=kwargs)
        return result['data'] if result else None

    def get_recordings(self, scope: str, scope_id: str, **kwargs) -> Optional[List[Dict]]:
        """Fetch recordings"""
        result = self._request(f'data/recordings/{scope}/{scope_id}', params=kwargs)
        return result['data'] if result else None

    def get_all_participant_data(self, participant_id: str, room_id: str) -> Dict:
        """Fetch all data types for a participant"""
        return {
            'events': self.get_events('participant', participant_id, roomId=room_id) or [],
            'recordings': self.get_recordings('participant', participant_id, roomId=room_id) or [],
        }

    def to_dataframe(self, data: List[Dict]) -> pd.DataFrame:
        """Convert data to pandas DataFrame"""
        df = pd.DataFrame(data)
        if 'onset' in df.columns:
            df['onset_sec'] = df['onset'] / 1000
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
        return df

    def _request(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
        """Make API request"""
        url = f"{self.base_url}/{endpoint}"
        response = requests.get(url, headers=self.headers, params=params, timeout=30)

        if not response.ok:
            print(f"Error: {response.status_code} - {response.text}")
            return None

        result = response.json()
        return result if result['status'] == 'success' else None


# Usage
from dotenv import load_dotenv
import os

load_dotenv()
client = HyperStudyClient(api_key=os.getenv('HYPERSTUDY_API_KEY'))

# Fetch and analyze
events = client.get_events('room', 'your-room-id')
if events:
    df = client.to_dataframe(events)
    print(df.describe())
Error Handling
Robust Error Handling Pattern
import requests
import time


def make_robust_request(url, headers, params=None, max_retries=3):
    """Make API request with retry logic and error handling"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, params=params, timeout=30)

            # Check HTTP status
            response.raise_for_status()

            # Parse JSON
            result = response.json()

            # Check API status
            if result['status'] == 'error':
                error = result['error']
                print(f"API Error [{error['code']}]: {error['message']}")
                if 'details' in error:
                    print(f"Details: {error['details']}")
                return None

            return result

        except requests.exceptions.Timeout:
            print(f"Timeout on attempt {attempt + 1}/{max_retries}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:  # Rate limit
                wait_time = int(e.response.headers.get('Retry-After', 60))
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"HTTP Error: {e}")
                return None

        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return None

    print("Max retries exceeded")
    return None
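For example, you can wrap the room-events call from the Quick Start with this helper (the room ID is a placeholder):

import os
from dotenv import load_dotenv

load_dotenv()

result = make_robust_request(
    'https://api.hyperstudy.io/api/v3/data/events/room/your-room-id',
    headers={'X-API-Key': os.getenv('HYPERSTUDY_API_KEY')},
    params={'limit': 100}
)

if result:
    print(f"Retrieved {len(result['data'])} events")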
Common Patterns
Download Multiple Rooms
import requests


def download_multiple_rooms(api_key, room_ids):
    """Download data for multiple rooms"""
    all_data = {}

    for room_id in room_ids:
        print(f"\nProcessing room: {room_id}")

        response = requests.get(
            f'https://api.hyperstudy.io/api/v3/data/events/room/{room_id}',
            headers={'X-API-Key': api_key}
        )

        if response.ok:
            result = response.json()
            all_data[room_id] = result['data']
            print(f"  Retrieved {len(result['data'])} events")

    return all_data
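For example, called with a couple of placeholder room IDs and the API key from your .env:

import os
from dotenv import load_dotenv

load_dotenv()

# Room IDs are placeholders; substitute your own
rooms = download_multiple_rooms(
    api_key=os.getenv('HYPERSTUDY_API_KEY'),
    room_ids=['room-id-1', 'room-id-2']
)

for room_id, events in rooms.items():
    print(f"{room_id}: {len(events)} events")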
Real-time Monitoring
import time

import requests


def monitor_experiment(api_key, experiment_id, interval=30):
    """Monitor experiment and report new events"""
    last_count = 0

    while True:
        response = requests.get(
            f'https://api.hyperstudy.io/api/v3/data/events/experiment/{experiment_id}',
            headers={'X-API-Key': api_key},
            params={'limit': 1}  # Just get metadata
        )

        if response.ok:
            result = response.json()
            current_count = result['metadata']['pagination']['total']

            if current_count > last_count:
                new_events = current_count - last_count
                print(f"[{time.strftime('%H:%M:%S')}] {new_events} new events (total: {current_count})")
                last_count = current_count

        time.sleep(interval)
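To run it, pass your key and an experiment ID (placeholder below); the loop polls until you stop it with Ctrl+C:

import os
from dotenv import load_dotenv

load_dotenv()

# Polls every 30 seconds; stop with Ctrl+C
monitor_experiment(
    api_key=os.getenv('HYPERSTUDY_API_KEY'),
    experiment_id='your-experiment-id',
    interval=30
)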
Troubleshooting
Common Issues
"Module not found" error:
pip install requests pandas python-dotenv
"HYPERSTUDY_API_KEY not found":
- Check that the .env file exists
- Verify that load_dotenv() is called
- Print os.getenv('HYPERSTUDY_API_KEY') to debug
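A minimal check you can run from your project directory (same .env layout as in the setup section):

import os
from dotenv import load_dotenv

load_dotenv()  # must run before os.getenv()
key = os.getenv('HYPERSTUDY_API_KEY')
print("Key loaded:", bool(key))  # False means .env wasn't found or the variable name is wrong
print("Key prefix:", key[:8] if key else None)  # avoid printing the full secret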
"Invalid API key":
- Verify you copied the full key (not masked version)
- Check key hasn't expired
- Ensure you're using the X-API-Key header
Slow downloads:
- Use pagination with a smaller limit
- Download data types separately
- Consider parallel requests, but respect rate limits (see the sketch below)
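As a rough sketch of the parallel approach, reusing the room-events endpoint from the examples above (room IDs and worker count are placeholders; keep the pool small so you stay within rate limits):

import os
from concurrent.futures import ThreadPoolExecutor

import requests
from dotenv import load_dotenv

load_dotenv()
API_KEY = os.getenv('HYPERSTUDY_API_KEY')
BASE_URL = 'https://api.hyperstudy.io/api/v3'


def fetch_room_events(room_id):
    """Fetch events for one room; returns (room_id, events)."""
    response = requests.get(
        f'{BASE_URL}/data/events/room/{room_id}',
        headers={'X-API-Key': API_KEY},
        timeout=30
    )
    if response.ok:
        return room_id, response.json()['data']
    return room_id, []


room_ids = ['room-id-1', 'room-id-2', 'room-id-3']  # placeholders

# A small worker pool keeps request volume modest relative to rate limits
with ThreadPoolExecutor(max_workers=3) as pool:
    results = dict(pool.map(fetch_room_events, room_ids))

for room_id, events in results.items():
    print(f"{room_id}: {len(events)} events")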
Next Steps
- Learn about other languages: JavaScript | R
- Understand all endpoints: Data Types & Endpoints
- Manage your keys: API Key Management