# Custom Analyzers
This guide covers writing custom analyzers for domain-specific detection.
## Analyzer Architecture
Analyzers in slopit follow a simple pattern:
- Receive a `SlopitSession`
- Process behavioral data
- Compute metrics
- Generate flags based on thresholds
- Return an `AnalysisResult`
### Basic Structure
```python
from dataclasses import dataclass

from slopit.behavioral.base import Analyzer, AnalyzerConfig
from slopit.schemas import SlopitSession
from slopit.schemas.analysis import AnalysisResult
from slopit.schemas.flags import AnalysisFlag
from slopit.schemas.types import JsonValue


@dataclass
class MyAnalyzerConfig(AnalyzerConfig):
    """Configuration for my analyzer."""

    threshold: float = 0.5


class MyAnalyzer(Analyzer):
    """My custom analyzer."""

    def __init__(self, config: MyAnalyzerConfig | None = None) -> None:
        self.config = config or MyAnalyzerConfig()

    @property
    def name(self) -> str:
        return "my_analyzer"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze a session."""
        trial_results: list[dict[str, JsonValue]] = []
        all_flags: list[AnalysisFlag] = []

        for trial in session.trials:
            # Skip trials without needed data
            if not self._has_required_data(trial):
                continue

            # Compute metrics
            metrics = self._compute_metrics(trial)

            # Generate flags
            flags = self._compute_flags(trial.trial_id, metrics)

            # Store results
            trial_results.append({
                "trial_id": trial.trial_id,
                "metrics": metrics,
                "flags": [f.model_dump() for f in flags],
            })
            all_flags.extend(flags)

        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=trial_results,
            flags=all_flags,
            session_summary=self._compute_summary(trial_results),
        )

    def _has_required_data(self, trial) -> bool:
        """Check if trial has required data."""
        return trial.behavioral is not None

    def _compute_metrics(self, trial) -> dict[str, JsonValue]:
        """Compute metrics from trial data."""
        return {"example_metric": 42.0}

    def _compute_flags(
        self, trial_id: str, metrics: dict[str, JsonValue]
    ) -> list[AnalysisFlag]:
        """Generate flags based on metrics."""
        flags: list[AnalysisFlag] = []
        value = metrics.get("example_metric", 0)
        if isinstance(value, (int, float)) and value > self.config.threshold:
            flags.append(
                AnalysisFlag(
                    type="example_flag",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Example metric exceeded threshold ({value:.1f})",
                    confidence=0.7,
                    evidence={"example_metric": value},
                    trial_ids=[trial_id],
                )
            )
        return flags

    def _compute_summary(
        self, trial_results: list[dict[str, JsonValue]]
    ) -> dict[str, JsonValue]:
        """Compute session-level summary."""
        return {
            "trials_analyzed": len(trial_results),
            "total_flags": sum(
                len(r["flags"]) if isinstance(r["flags"], list) else 0
                for r in trial_results
            ),
        }
```
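Once defined, the analyzer can be run directly against a single session. The following is a minimal sketch, assuming data has already been exported to a `data/` directory, an illustrative `threshold` value, and that `AnalysisResult` exposes its fields as attributes (as the constructions above suggest):

```python
from slopit import load_sessions

sessions = load_sessions("data/")  # example path

# Illustrative configuration: raise the threshold above the default 0.5
analyzer = MyAnalyzer(MyAnalyzerConfig(threshold=1.0))
result = analyzer.analyze_session(sessions[0])

print(result.session_summary)
for flag in result.flags:
    print(flag.type, flag.severity, flag.message)
```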
## Example: Response Length Analyzer
An analyzer that flags responses produced at an unrealistically high characters-per-minute rate:
```python
from dataclasses import dataclass

from slopit.behavioral.base import Analyzer, AnalyzerConfig
from slopit.schemas import SlopitSession, SlopitTrial
from slopit.schemas.analysis import AnalysisResult
from slopit.schemas.flags import AnalysisFlag
from slopit.schemas.types import JsonValue


@dataclass
class ResponseLengthConfig(AnalyzerConfig):
    """Configuration for response length analysis."""

    max_chars_per_minute: float = 500.0
    min_response_length: int = 100


class ResponseLengthAnalyzer(Analyzer):
    """Analyzes response length relative to time spent."""

    def __init__(self, config: ResponseLengthConfig | None = None) -> None:
        self.config = config or ResponseLengthConfig()

    @property
    def name(self) -> str:
        return "response_length"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        trial_results: list[dict[str, JsonValue]] = []
        all_flags: list[AnalysisFlag] = []

        for trial in session.trials:
            if not self._is_analyzable(trial):
                continue

            metrics = self._compute_metrics(trial)
            flags = self._compute_flags(trial.trial_id, metrics)

            trial_results.append({
                "trial_id": trial.trial_id,
                "metrics": metrics,
                "flags": [f.model_dump() for f in flags],
            })
            all_flags.extend(flags)

        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=trial_results,
            flags=all_flags,
            session_summary={"trials_analyzed": len(trial_results)},
        )

    def _is_analyzable(self, trial: SlopitTrial) -> bool:
        """Check if trial has a response with sufficient length and a valid RT."""
        if trial.response is None:
            return False
        if trial.response.character_count is None:
            return False
        if trial.response.character_count < self.config.min_response_length:
            return False
        if trial.rt is None or trial.rt <= 0:
            return False
        return True

    def _compute_metrics(self, trial: SlopitTrial) -> dict[str, JsonValue]:
        """Compute characters per minute."""
        char_count = trial.response.character_count  # type: ignore
        rt_minutes = trial.rt / 60000  # type: ignore
        chars_per_minute = char_count / rt_minutes if rt_minutes > 0 else 0
        return {
            "character_count": char_count,
            "rt_ms": trial.rt,
            "chars_per_minute": chars_per_minute,
        }

    def _compute_flags(
        self, trial_id: str, metrics: dict[str, JsonValue]
    ) -> list[AnalysisFlag]:
        """Flag if typing speed is unrealistic."""
        flags: list[AnalysisFlag] = []
        cpm = metrics.get("chars_per_minute", 0)
        if isinstance(cpm, (int, float)) and cpm > self.config.max_chars_per_minute:
            confidence = min(1.0, cpm / (self.config.max_chars_per_minute * 2))
            flags.append(
                AnalysisFlag(
                    type="unrealistic_typing_speed",
                    analyzer=self.name,
                    severity="high",
                    message=(
                        f"Response produced at {cpm:.0f} chars/min "
                        f"(threshold: {self.config.max_chars_per_minute})"
                    ),
                    confidence=confidence,
                    evidence={"chars_per_minute": cpm},
                    trial_ids=[trial_id],
                )
            )
        return flags
```
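Thresholds should match your task: paste-heavy or copy-editing tasks legitimately produce more characters per minute than free composition. A short sketch with illustrative (not recommended) values:

```python
# Illustrative values only: loosen the speed limit and analyze shorter responses
config = ResponseLengthConfig(
    max_chars_per_minute=800.0,
    min_response_length=50,
)
analyzer = ResponseLengthAnalyzer(config)
```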
## Example: Cross-Trial Similarity Analyzer
An analyzer that detects similar responses across trials (possible template usage):
```python
from dataclasses import dataclass

from slopit.behavioral.base import Analyzer, AnalyzerConfig
from slopit.schemas import SlopitSession
from slopit.schemas.analysis import AnalysisResult
from slopit.schemas.flags import AnalysisFlag
from slopit.schemas.types import JsonValue


def jaccard_similarity(a: set[str], b: set[str]) -> float:
    """Compute Jaccard similarity between two sets."""
    if not a and not b:
        return 1.0
    intersection = len(a & b)
    union = len(a | b)
    return intersection / union if union > 0 else 0.0


@dataclass
class SimilarityConfig(AnalyzerConfig):
    """Configuration for similarity analysis."""

    min_similarity_threshold: float = 0.8
    min_response_words: int = 20


class SimilarityAnalyzer(Analyzer):
    """Detects suspiciously similar responses across trials."""

    def __init__(self, config: SimilarityConfig | None = None) -> None:
        self.config = config or SimilarityConfig()

    @property
    def name(self) -> str:
        return "similarity"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        # Extract text responses
        responses: list[tuple[str, str]] = []  # (trial_id, text)
        for trial in session.trials:
            if trial.response is None:
                continue
            if trial.response.type != "text":
                continue
            if not isinstance(trial.response.value, str):
                continue
            text = trial.response.value
            words = text.lower().split()
            if len(words) >= self.config.min_response_words:
                responses.append((trial.trial_id, text))

        # Compute pairwise similarities
        flags: list[AnalysisFlag] = []
        similarities: list[dict[str, JsonValue]] = []
        for i, (trial_a, text_a) in enumerate(responses):
            words_a = set(text_a.lower().split())
            for trial_b, text_b in responses[i + 1:]:
                words_b = set(text_b.lower().split())
                sim = jaccard_similarity(words_a, words_b)
                if sim >= self.config.min_similarity_threshold:
                    similarities.append({
                        "trial_a": trial_a,
                        "trial_b": trial_b,
                        "similarity": sim,
                    })
                    flags.append(
                        AnalysisFlag(
                            type="high_response_similarity",
                            analyzer=self.name,
                            severity="medium",
                            message=f"Responses highly similar ({sim:.0%} word overlap)",
                            confidence=sim,
                            evidence={"similarity": sim},
                            trial_ids=[trial_a, trial_b],
                        )
                    )

        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=[],
            flags=flags,
            session_summary={
                "responses_compared": len(responses),
                "high_similarity_pairs": len(similarities),
            },
        )
```
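To make the similarity threshold concrete, here is a quick worked example with two toy responses; at the default `min_similarity_threshold` of 0.8 this pair would not be flagged:

```python
a = set("the cat sat on the mat".lower().split())  # {"the", "cat", "sat", "on", "mat"}
b = set("the cat sat on the rug".lower().split())  # {"the", "cat", "sat", "on", "rug"}

jaccard_similarity(a, b)  # 4 shared words / 6 unique words = 0.67, below the 0.8 default
```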
## Cross-Session Analysis

For analyzers that need to compare across sessions, override `analyze_sessions`:
```python
from slopit.behavioral.base import Analyzer
from slopit.schemas import SlopitSession
from slopit.schemas.analysis import AnalysisResult
from slopit.schemas.flags import AnalysisFlag


class HomogeneityAnalyzer(Analyzer):
    """Detects identical responses across different participants."""

    @property
    def name(self) -> str:
        return "homogeneity"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        # Single-session analysis (can be empty for cross-session analyzers)
        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=[],
            flags=[],
            session_summary={},
        )

    def analyze_sessions(self, sessions: list[SlopitSession]) -> list[AnalysisResult]:
        """Compare responses across all sessions."""
        # Collect all responses, mapping normalized text to the sessions it appears in
        response_to_sessions: dict[str, list[str]] = {}
        for session in sessions:
            for trial in session.trials:
                if trial.response and isinstance(trial.response.value, str):
                    text = trial.response.value.strip().lower()
                    if text:
                        response_to_sessions.setdefault(text, []).append(
                            session.session_id
                        )

        # Find duplicate responses
        duplicates = {
            text: session_ids
            for text, session_ids in response_to_sessions.items()
            if len(session_ids) > 1
        }

        # Generate results
        flagged_sessions: set[str] = set()
        for session_ids in duplicates.values():
            flagged_sessions.update(session_ids)

        results: list[AnalysisResult] = []
        for session in sessions:
            flags: list[AnalysisFlag] = []
            if session.session_id in flagged_sessions:
                flags.append(
                    AnalysisFlag(
                        type="duplicate_response",
                        analyzer=self.name,
                        severity="high",
                        message="Response identical to another participant",
                        confidence=0.95,
                        evidence={},
                        trial_ids=None,
                    )
                )
            results.append(
                AnalysisResult(
                    analyzer=self.name,
                    session_id=session.session_id,
                    trials=[],
                    flags=flags,
                    session_summary={
                        "is_duplicate": session.session_id in flagged_sessions,
                    },
                )
            )
        return results
```
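Cross-session analyzers can be exercised directly on a list of loaded sessions; `analyze_sessions` returns one `AnalysisResult` per input session. A minimal sketch, assuming exported data in `data/` and attribute access on the result models:

```python
from slopit import load_sessions

sessions = load_sessions("data/")  # example path
results = HomogeneityAnalyzer().analyze_sessions(sessions)

for result in results:
    if result.flags:
        print(result.session_id, [flag.type for flag in result.flags])
```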
## Using Custom Analyzers
```python
from slopit import load_sessions
from slopit.pipeline import AnalysisPipeline

# Import your custom analyzers
from my_analyzers import ResponseLengthAnalyzer, SimilarityAnalyzer

sessions = load_sessions("data/")

pipeline = AnalysisPipeline([
    ResponseLengthAnalyzer(),
    SimilarityAnalyzer(),
])
result = pipeline.analyze(sessions)
```
## Best Practices
- Single responsibility: Each analyzer should detect one type of pattern
- Clear flag types: Use descriptive, unique flag type names
- Configurable thresholds: Make detection parameters configurable
- Meaningful confidence: Compute confidence based on how far the metric exceeds the threshold (see the sketch after this list)
- Rich evidence: Include relevant metrics in the evidence field
- Trial IDs: Always include trial_ids when the flag is trial-specific
- Session summary: Provide useful aggregate statistics
- Type safety: Use proper type annotations throughout
- Documentation: Include NumPy-style docstrings
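As one way to apply the confidence guideline, a small helper can scale confidence with how far a metric exceeds its threshold, saturating at 1.0. This mirrors the pattern used in `ResponseLengthAnalyzer` above; the helper name and saturation factor are illustrative, not part of slopit:

```python
def graded_confidence(value: float, threshold: float, saturation: float = 2.0) -> float:
    """Scale confidence from ~0.5 at the threshold up to 1.0 at saturation x threshold."""
    if threshold <= 0:
        return 1.0
    return min(1.0, value / (threshold * saturation))


graded_confidence(600, 500)  # 600 chars/min vs. a 500 chars/min threshold -> 0.6
```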
## Testing Analyzers
```python
import pytest

from slopit.schemas import SlopitSession, SlopitTrial, ResponseInfo
from my_analyzers import ResponseLengthAnalyzer, ResponseLengthConfig


@pytest.fixture
def fast_response_session() -> SlopitSession:
    """Session with unrealistically fast response."""
    return SlopitSession(
        schema_version="1.0",
        session_id="test",
        platform=...,
        environment=...,
        timing=...,
        trials=[
            SlopitTrial(
                trial_id="trial-0",
                trial_index=0,
                start_time=0,
                end_time=1000,
                rt=1000,  # 1 second
                response=ResponseInfo(
                    type="text",
                    value="x" * 1000,  # 1000 characters
                    character_count=1000,
                ),
            )
        ],
        global_events=...,
    )


class TestResponseLengthAnalyzer:
    def test_flags_fast_response(self, fast_response_session):
        """Should flag unrealistically fast typing."""
        analyzer = ResponseLengthAnalyzer()
        result = analyzer.analyze_session(fast_response_session)

        assert len(result.flags) == 1
        assert result.flags[0].type == "unrealistic_typing_speed"

    def test_custom_threshold(self, fast_response_session):
        """Should respect custom threshold."""
        config = ResponseLengthConfig(max_chars_per_minute=100000)
        analyzer = ResponseLengthAnalyzer(config)
        result = analyzer.analyze_session(fast_response_session)

        assert len(result.flags) == 0  # High threshold, no flag
```
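A cross-session analyzer can be tested the same way by building two sessions that share an identical response. This is a sketch that reuses the fixture pattern above (the `...` placeholders again stand in for whatever your schema requires) and assumes `HomogeneityAnalyzer` lives in `my_analyzers` alongside the other examples:

```python
from my_analyzers import HomogeneityAnalyzer


def make_session(session_id: str, text: str) -> SlopitSession:
    """Build a minimal session containing a single text response."""
    return SlopitSession(
        schema_version="1.0",
        session_id=session_id,
        platform=...,
        environment=...,
        timing=...,
        trials=[
            SlopitTrial(
                trial_id="trial-0",
                trial_index=0,
                start_time=0,
                end_time=5000,
                rt=5000,
                response=ResponseInfo(
                    type="text",
                    value=text,
                    character_count=len(text),
                ),
            )
        ],
        global_events=...,
    )


def test_flags_identical_responses():
    """Both participants should be flagged when their responses match exactly."""
    sessions = [
        make_session("participant-a", "The same answer text"),
        make_session("participant-b", "The same answer text"),
    ]
    results = HomogeneityAnalyzer().analyze_sessions(sessions)
    assert all(result.flags for result in results)
```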