Integrations Guide
This guide covers integrating slopit with external services including JATOS and Prolific.
JATOS Integration
JATOS (Just Another Tool for Online Studies) is a popular server for running online experiments. slopit provides two methods for working with JATOS data:
- File-based loading: Load exported result files
- API-based streaming: Connect directly to a JATOS server
Loading JATOS Export Files
JATOS exports study results as text files containing JSON data. These can be in two formats:
- JSON array format: [{trial1}, {trial2}, ...]
- Newline-delimited JSON: one JSON object per line
The JATOSLoader handles both formats automatically.
from pathlib import Path
from slopit.io import JATOSLoader
# One loader instance serves both single files and whole directories.
loader = JATOSLoader()

# A single exported result file yields one session.
result_file = Path("study_result_123.txt")
session = loader.load(result_file)
print(f"Session ID: {session.session_id}")
print(f"Trials: {len(session.trials)}")

# A directory of exports is iterated one session at a time.
results_dir = Path("jatos_results/")
for session in loader.load_many(results_dir):
    print(f"Loaded: {session.session_id}")
Format Detection
The loader checks for JATOS-specific markers in the file content:
from pathlib import Path
from slopit.io import JATOSLoader
# Only build a loader when the path is recognised as JATOS format.
data_path = Path("data/")
if JATOSLoader.can_load(data_path):
    loader = JATOSLoader()
    sessions = list(loader.load_many(data_path))
Converting Raw Trial Data
When receiving data from the JATOS API directly, use load_result() to convert raw trial arrays:
from slopit.io import JATOSLoader
loader = JATOSLoader()

# Behavioral capture recorded for the trial (contents elided here).
behavioral = {
    "keystrokes": [...],
    "focus": [...],
}

# Raw trial data from JATOS: one dict per trial.
trials = [
    {
        "trial_type": "survey-text",
        "rt": 15000,
        "response": "This is my answer...",
        "time_elapsed": 45000,
        "slopit": {"behavioral": behavioral},
    },
    # more trials...
]

# Convert the raw trial array into a session tagged with the JATOS result ID.
session = loader.load_result(trials, result_id="jatos-result-456")
JATOS API Client
For direct integration with a JATOS server, use the JATOSClient:
from slopit.dashboard.integrations import JATOSClient
async def sync_from_jatos():
    """Walk through the JATOSClient calls: list studies, read one study, pull results."""
    study_id = "study-123"
    async with JATOSClient("https://jatos.example.com", "api-token") as client:
        # every study this token can access
        studies = await client.list_studies()
        for study in studies:
            print(f"Study: {study['title']} (ID: {study['id']})")

        # metadata for one specific study
        study = await client.get_study(study_id)
        print(f"Description: {study.get('description')}")

        # raw results for that study
        results = await client.get_study_results(study_id)
        print(f"Total results: {len(results)}")

        # results streamed one at a time as SlopitSession objects
        async for session in client.stream_results(study_id):
            print(f"Session: {session.session_id}")
            print(f" Trials: {len(session.trials)}")

        # or every session collected in a single call
        sessions = await client.get_sessions(study_id)
        print(f"Loaded {len(sessions)} sessions")
JATOS API Authentication
The JATOS API uses token-based authentication. Generate a token in the JATOS admin interface:
- Log in to JATOS as admin
- Go to Admin > API Tokens
- Generate a new token with appropriate permissions
Store the token securely (e.g., environment variables):
import os
from slopit.dashboard.integrations import JATOSClient
jatos_url = os.environ["JATOS_URL"]
jatos_token = os.environ["JATOS_TOKEN"]
async with JATOSClient(jatos_url, jatos_token) as client:
sessions = await client.get_sessions("study-123")
Error Handling
The client raises httpx.HTTPStatusError for API errors:
import httpx
from slopit.dashboard.integrations import JATOSClient
async def safe_sync():
    """Fetch sessions for one study, printing a message on HTTP errors instead of raising.

    `url` and `token` are expected to be defined by the surrounding code.
    """
    # Messages for the status codes that get a dedicated explanation.
    explanations = {
        401: "Authentication failed; check your API token",
        404: "Study not found",
    }
    async with JATOSClient(url, token) as client:
        try:
            sessions = await client.get_sessions("study-123")
        except httpx.HTTPStatusError as e:
            code = e.response.status_code
            if code in explanations:
                print(explanations[code])
            else:
                print(f"API error: {e.response.status_code}")
Prolific Integration
Prolific is a participant recruitment platform. slopit provides a client for managing submissions based on analysis results.
Setting Up the Client
import os
from slopit.dashboard.integrations import ProlificClient
# use environment variable for token
token = os.environ["PROLIFIC_TOKEN"]
async with ProlificClient(token) as client:
studies = await client.list_studies()
Listing Studies and Submissions
from slopit.dashboard.integrations import ProlificClient, SubmissionStatus
async with ProlificClient(token) as client:
# list all studies
studies = await client.list_studies()
for study in studies:
print(f"{study['name']}: {study['status']}")
# get submissions for a study
submissions = await client.get_submissions("study-id-123")
for sub in submissions:
print(f" {sub['participant_id']}: {sub['status']}")
# filter by status
pending = await client.get_submissions(
"study-id-123",
status=SubmissionStatus.AWAITING_REVIEW,
)
print(f"Awaiting review: {len(pending)}")
Submission Statuses
from slopit.dashboard.integrations import SubmissionStatus
# available statuses, one per line with its meaning
SubmissionStatus.ACTIVE  # participant is working
SubmissionStatus.AWAITING_REVIEW  # complete, needs review
SubmissionStatus.APPROVED  # approved and paid
SubmissionStatus.REJECTED  # rejected
SubmissionStatus.RETURNED  # returned to pool
SubmissionStatus.TIMED_OUT  # participant timed out
Approving Submissions
from slopit.dashboard.integrations import ProlificClient
async with ProlificClient(token) as client:
# approve a single submission
result = await client.approve_submission("submission-id")
# approve with a thank you message
result = await client.approve_submission(
"submission-id",
message="Thank you for your participation!",
)
Rejecting Submissions
Rejections require a category and should be used sparingly:
from slopit.dashboard.integrations import ProlificClient
async with ProlificClient(token) as client:
# reject with required category
result = await client.reject_submission(
"submission-id",
rejection_category="NO_DATA",
message="We were unable to record your responses.",
)
Common rejection categories:
- NO_DATA: No data was submitted
- BAD_CODE: Invalid completion code
- FAILED_ATTENTION: Failed attention checks
- INCOMP_LONGITUDINAL: Incomplete longitudinal study
- TOO_QUICKLY: Completed too quickly
- TOO_SLOWLY: Completed too slowly
- OTHER: Other reason (requires message)
Returning Submissions
Returns release the slot without penalty to the participant:
from slopit.dashboard.integrations import ProlificClient
async with ProlificClient(token) as client:
# return a submission
result = await client.return_submission(
"submission-id",
message="Thank you, but we need to re-run this study.",
)
Batch Operations
For efficiency, use batch operations when processing multiple submissions:
from slopit.dashboard.integrations import ProlificClient
async with ProlificClient(token) as client:
# batch approve
result = await client.batch_approve(
"study-id",
["sub-1", "sub-2", "sub-3"],
)
print(f"Approved: {result}")
# batch reject
result = await client.batch_reject(
"study-id",
["sub-4", "sub-5"],
rejection_category="NO_DATA",
)
# batch return
result = await client.batch_return(
"study-id",
["sub-6", "sub-7"],
)
Automated Review Workflow
Combine slopit analysis with Prolific submission management:
from slopit import load_sessions
from slopit.pipeline import AnalysisPipeline
from slopit.behavioral import KeystrokeAnalyzer, FocusAnalyzer
from slopit.dashboard.integrations import ProlificClient, SubmissionStatus
async def review_submissions(study_id: str, data_dir: str) -> None:
    """Review Prolific submissions using slopit analysis.

    Loads sessions from ``data_dir``, runs the keystroke and focus analyzers,
    then approves every awaiting-review submission whose participant has a
    "clean" verdict. Submissions with a "suspicious" or "flagged" verdict are
    only printed for manual review; submissions without a verdict are skipped.

    Args:
        study_id: Prolific study whose submissions are reviewed.
        data_dir: Location of the session data passed to ``load_sessions``.

    Raises:
        KeyError: If the PROLIFIC_TOKEN environment variable is not set.
    """
    import os

    # The function previously relied on an undefined global `token`;
    # read it from the environment so the example is self-contained.
    token = os.environ["PROLIFIC_TOKEN"]

    # load and analyze sessions
    sessions = load_sessions(data_dir)
    pipeline = AnalysisPipeline([KeystrokeAnalyzer(), FocusAnalyzer()])
    result = pipeline.analyze(sessions)

    # map participant IDs to verdicts
    participant_verdicts: dict[str, str] = {}
    for session in sessions:
        if not session.participant_id:
            continue
        verdict = result.verdicts.get(session.session_id)
        if verdict:
            participant_verdicts[session.participant_id] = verdict.status

    # process Prolific submissions
    async with ProlificClient(token) as client:
        submissions = await client.get_submissions(
            study_id,
            status=SubmissionStatus.AWAITING_REVIEW,
        )

        approve_ids: list[str] = []
        review_ids: list[str] = []
        for sub in submissions:
            participant_id = sub["participant_id"]
            submission_id = sub["id"]
            verdict = participant_verdicts.get(participant_id)
            if verdict == "clean":
                approve_ids.append(submission_id)
            elif verdict in ("suspicious", "flagged"):
                review_ids.append(submission_id)
            # else: no data, skip

        # batch approve clean submissions
        if approve_ids:
            await client.batch_approve(study_id, approve_ids)
            print(f"Approved {len(approve_ids)} submissions")

        # flag submissions for manual review
        print(f"Flagged for review: {len(review_ids)} submissions")
        for sub_id in review_ids:
            print(f" - {sub_id}")
Dashboard WebSocket Events
The dashboard uses WebSockets for real-time updates. Clients can connect and receive events as sessions are processed.
Event Types
from slopit.dashboard.websocket import (
SessionNewEvent,
VerdictComputedEvent,
SyncProgressEvent,
)
# new session received
new_session_data = {
    "session_id": "abc123",
    "timestamp": 1706000000000,
    "trial_count": 5,
}
SessionNewEvent(type="session.new", data=new_session_data)

# analysis complete
verdict_data = {
    "session_id": "abc123",
    "status": "flagged",
    "confidence": 0.85,
    "flags": ["low_iki_variance", "blur_paste_pattern"],
}
VerdictComputedEvent(type="verdict.computed", data=verdict_data)

# sync progress (JATOS/Prolific)
sync_data = {
    "source": "jatos",
    "study_id": "study-123",
    "progress": 50,
    "total": 100,
    "status": "syncing",
}
SyncProgressEvent(type="sync.progress", data=sync_data)
JavaScript Client Example
const ws = new WebSocket('ws://localhost:8000/ws');

// Connection lifecycle logging.
ws.onopen = () => {
  console.log('Connected to slopit dashboard');
};

ws.onclose = () => {
  console.log('Disconnected from dashboard');
};

// Every message is JSON with a `type` and a `data` payload; unknown types are ignored.
ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  const kind = data.type;

  if (kind === 'session.new') {
    console.log(`New session: ${data.data.session_id}`);
    refreshSessionList();
  } else if (kind === 'verdict.computed') {
    console.log(`Verdict for ${data.data.session_id}: ${data.data.status}`);
    updateSessionStatus(data.data.session_id, data.data.status);
  } else if (kind === 'sync.progress') {
    console.log(`Sync: ${data.data.progress}/${data.data.total}`);
    updateProgressBar(data.data.progress, data.data.total);
  }
};
Python Client Example
import asyncio
import json
import websockets
async def listen_for_events():
    """Connect to the dashboard WebSocket and print each event received.

    Messages are JSON objects with a "type" and a "data" payload; event
    types other than the three handled below are ignored.
    """
    uri = "ws://localhost:8000/ws"
    async with websockets.connect(uri) as websocket:
        print("Connected to dashboard")
        async for message in websocket:
            event = json.loads(message)
            kind = event["type"]

            if kind == "session.new":
                session_id = event["data"]["session_id"]
                print(f"New session received: {session_id}")
                continue

            if kind == "verdict.computed":
                session_id = event["data"]["session_id"]
                status = event["data"]["status"]
                print(f"Verdict for {session_id}: {status}")
                continue

            if kind == "sync.progress":
                progress = event["data"]["progress"]
                total = event["data"]["total"]
                print(f"Sync progress: {progress}/{total}")


if __name__ == "__main__":
    asyncio.run(listen_for_events())
Broadcasting Events
The ConnectionManager handles event broadcasting:
from slopit.dashboard.websocket import (
ConnectionManager,
SessionNewEvent,
VerdictComputedEvent,
)
manager = ConnectionManager()
# broadcast to all connected clients
await manager.broadcast(
SessionNewEvent(data={"session_id": "abc123"})
)
# send to specific client
await manager.send_personal(
websocket,
VerdictComputedEvent(data={"session_id": "abc123", "status": "clean"})
)
Export Formats
CSV Export
Export analysis results to CSV for use in spreadsheets or statistical software:
from slopit.pipeline import CSVExporter
exporter = CSVExporter()

# session verdicts
# columns: session_id, status, confidence, flag_count, summary, flag_*
exporter.export(result, "verdicts.csv")

# individual flags
# columns: session_id, analyzer, type, severity, message, confidence, trial_ids
exporter.export_flags(result, "flags.csv")
verdicts.csv format:
| session_id | status | confidence | flag_count | summary | flag_low_iki_variance | flag_excessive_blur |
|---|---|---|---|---|---|---|
| abc123 | flagged | 0.85 | 2 | Detected: low_iki_variance, excessive_blur | True | True |
| def456 | clean | 1.0 | 0 | No flags detected |
flags.csv format:
| session_id | analyzer | type | severity | message | confidence | trial_ids |
|---|---|---|---|---|---|---|
| abc123 | keystroke | low_iki_variance | medium | Keystroke timing unusually consistent | 0.8 | trial-1 |
| abc123 | focus | excessive_blur | medium | Excessive window switches | 0.7 | trial-1,trial-2 |
JSON Export
Export full results to JSON:
import json
from pathlib import Path
from slopit.pipeline import AnalysisPipeline
result = pipeline.analyze(sessions)

# Serialise the result dict; default=str stringifies any value json cannot encode natively.
serialized = json.dumps(result.to_dict(), indent=2, default=str)

# write to file
Path("results.json").write_text(serialized)
Loading Previous Results
Load results from a previous analysis:
import json
from pathlib import Path
from slopit.schemas.analysis import PipelineResult
# Read the saved JSON and validate it back into a PipelineResult.
raw_text = Path("results.json").read_text()
result = PipelineResult.model_validate(json.loads(raw_text))

# Print one line per session verdict.
for session_id, verdict in result.verdicts.items():
    print(f"{session_id}: {verdict.status}")
Dashboard API
The dashboard exposes REST endpoints for programmatic access.
Sessions API
# list sessions (GET)
curl http://localhost:8000/api/v1/sessions
# list with pagination and filters (URL quoted so the shell does not interpret "&")
curl "http://localhost:8000/api/v1/sessions?page=1&page_size=20&has_verdict=true"
# get the single session with ID abc123
curl http://localhost:8000/api/v1/sessions/abc123
# upload a session: POST the contents of session.json as a JSON body
curl -X POST http://localhost:8000/api/v1/sessions \
-H "Content-Type: application/json" \
-d @session.json
Trials API
# list trials for the session with ID abc123
curl http://localhost:8000/api/v1/trials/abc123
Analysis API
# run analysis on the session IDs listed in the JSON body (POST)
curl -X POST http://localhost:8000/api/v1/analysis/run \
-H "Content-Type: application/json" \
-d '{"session_ids": ["abc123", "def456"]}'
# get the verdict for session abc123
curl http://localhost:8000/api/v1/analysis/verdict/abc123
Export API
# export to CSV, saving the response as results.csv
curl http://localhost:8000/api/v1/export/csv -o results.csv
# export with the status=flagged filter, saving the response as flagged.csv
curl "http://localhost:8000/api/v1/export/csv?status=flagged" -o flagged.csv
Complete Integration Example
"""Complete workflow: JATOS sync, analysis, Prolific management."""
import asyncio
import os
from pathlib import Path
from slopit.pipeline import AnalysisPipeline, CSVExporter
from slopit.behavioral import (
KeystrokeAnalyzer,
FocusAnalyzer,
PasteAnalyzer,
TimingAnalyzer,
)
from slopit.dashboard.integrations import (
JATOSClient,
ProlificClient,
SubmissionStatus,
)
from slopit.dashboard.services import StorageService
async def main() -> None:
    """Sync sessions from JATOS, analyze them, and act on Prolific submissions.

    Configuration is read from the JATOS_URL, JATOS_TOKEN, PROLIFIC_TOKEN,
    STUDY_ID and PROLIFIC_STUDY_ID environment variables.

    Steps:
        1. Stream results from JATOS and save each session under ./data.
        2. Run the four behavioral analyzers, save each verdict, and export
           verdicts.csv and flags.csv.
        3. Approve awaiting-review Prolific submissions whose participant has
           a "clean" verdict; report "suspicious"/"flagged" ones for review.

    Raises:
        KeyError: If any of the required environment variables is missing.
    """
    from collections import Counter

    # configuration from environment
    jatos_url = os.environ["JATOS_URL"]
    jatos_token = os.environ["JATOS_TOKEN"]
    prolific_token = os.environ["PROLIFIC_TOKEN"]
    study_id = os.environ["STUDY_ID"]
    prolific_study_id = os.environ["PROLIFIC_STUDY_ID"]

    # storage
    storage = StorageService(Path("./data"))

    # step 1: sync data from JATOS
    print("Syncing from JATOS...")
    async with JATOSClient(jatos_url, jatos_token) as jatos:
        async for session in jatos.stream_results(study_id):
            storage.save_session(session)
            print(f" Saved: {session.session_id}")

    # step 2: run analysis
    print("\nRunning analysis...")
    sessions = list(storage.iter_sessions())
    print(f"Analyzing {len(sessions)} sessions")
    pipeline = AnalysisPipeline([
        KeystrokeAnalyzer(),
        FocusAnalyzer(),
        PasteAnalyzer(),
        TimingAnalyzer(),
    ])
    result = pipeline.analyze(sessions)

    # save verdicts
    for session_id, verdict in result.verdicts.items():
        storage.save_verdict(session_id, {
            "status": verdict.status,
            "confidence": verdict.confidence,
            "flags": [f.type for f in verdict.flags],
            "summary": verdict.summary,
        })

    # export results
    exporter = CSVExporter()
    exporter.export(result, "verdicts.csv")
    exporter.export_flags(result, "flags.csv")

    # step 3: manage Prolific submissions
    print("\nProcessing Prolific submissions...")

    # map participant IDs to verdicts
    participant_verdicts: dict[str, str] = {}
    for session in sessions:
        if not session.participant_id:
            continue
        verdict = result.verdicts.get(session.session_id)
        if verdict:
            participant_verdicts[session.participant_id] = verdict.status

    async with ProlificClient(prolific_token) as prolific:
        submissions = await prolific.get_submissions(
            prolific_study_id,
            status=SubmissionStatus.AWAITING_REVIEW,
        )

        approve_ids: list[str] = []
        flag_ids: list[str] = []
        for sub in submissions:
            participant_id = sub["participant_id"]
            submission_id = sub["id"]
            verdict = participant_verdicts.get(participant_id)
            if verdict == "clean":
                approve_ids.append(submission_id)
            elif verdict in ("suspicious", "flagged"):
                flag_ids.append(submission_id)

        # auto-approve clean submissions
        if approve_ids:
            await prolific.batch_approve(prolific_study_id, approve_ids)
            print(f" Approved: {len(approve_ids)}")

        # report flagged for manual review
        if flag_ids:
            print(f" Flagged for review: {len(flag_ids)}")

    # summary
    # Counter returns 0 for absent statuses and does not raise KeyError when a
    # verdict carries a status other than the three reported here.
    print("\nSummary:")
    counts = Counter(verdict.status for verdict in result.verdicts.values())
    print(f" Clean: {counts['clean']}")
    print(f" Suspicious: {counts['suspicious']}")
    print(f" Flagged: {counts['flagged']}")


if __name__ == "__main__":
    asyncio.run(main())