# Analyzers

The `slopit.behavioral` module provides analyzers for detecting AI-assisted responses through behavioral patterns.
## Analyzer Base Class

### `slopit.behavioral.base.Analyzer`

Bases: `ABC`

Abstract base class that all analyzers inherit from. Analyzers process session data and produce analysis results containing metrics and flags.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `name` | `str` | Unique identifier for this analyzer. |
Source code in `src/slopit/behavioral/base.py`:

```python
class Analyzer(ABC):
    """Base class for all analyzers.

    Analyzers process session data and produce analysis results
    containing metrics and flags.

    Attributes
    ----------
    name
        Unique identifier for this analyzer.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique identifier for this analyzer."""
        ...

    @abstractmethod
    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze a single session.

        Parameters
        ----------
        session
            Session data to analyze.

        Returns
        -------
        AnalysisResult
            Analysis results including metrics and flags.
        """
        ...

    def analyze_sessions(self, sessions: list[SlopitSession]) -> list[AnalysisResult]:
        """Analyze multiple sessions.

        Override this method for cross-session analysis
        (e.g., homogeneity detection).

        Parameters
        ----------
        sessions
            List of sessions to analyze.

        Returns
        -------
        list[AnalysisResult]
            Analysis results for each session.
        """
        return [self.analyze_session(s) for s in sessions]
```
#### `name` (abstract property)

Unique identifier for this analyzer.

#### `analyze_session` (abstract method)

```python
analyze_session(session: SlopitSession) -> AnalysisResult
```

Analyze a single session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session` | `SlopitSession` | Session data to analyze. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `AnalysisResult` | Analysis results including metrics and flags. |

#### `analyze_sessions`

```python
analyze_sessions(sessions: list[SlopitSession]) -> list[AnalysisResult]
```

Analyze multiple sessions. Override this method for cross-session analysis (e.g., homogeneity detection).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sessions` | `list[SlopitSession]` | List of sessions to analyze. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list[AnalysisResult]` | Analysis results for each session. |
### `slopit.behavioral.base.AnalyzerConfig` (dataclass)

Base configuration class for analyzers.

Source code in `src/slopit/behavioral/base.py`:

```python
@dataclass
class AnalyzerConfig:
    """Base configuration for analyzers."""

    pass
```
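To implement a custom analyzer, subclass `Analyzer` and provide `name` and `analyze_session`. Below is a minimal sketch; the import location of `SlopitSession` and `AnalysisResult` is an assumption (adjust to your install), and the short-response heuristic is purely illustrative:

```python
from dataclasses import dataclass

from slopit.behavioral.base import Analyzer, AnalyzerConfig

# Assumed import path for the session/result types; adjust to your install.
from slopit.types import AnalysisResult, SlopitSession


@dataclass
class ShortResponseConfig(AnalyzerConfig):
    """Illustrative config: minimum characters expected per response."""

    min_chars: int = 10


class ShortResponseAnalyzer(Analyzer):
    """Toy analyzer that counts suspiciously short responses."""

    def __init__(self, config: ShortResponseConfig | None = None) -> None:
        self.config = config or ShortResponseConfig()

    @property
    def name(self) -> str:
        return "short_response"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        # Collect trial IDs whose responses fall below the configured minimum.
        short_trials = [
            t.trial_id
            for t in session.trials
            if t.response is not None
            and t.response.character_count < self.config.min_chars
        ]
        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=[],
            flags=[],
            session_summary={"short_trials": len(short_trials)},
        )
```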
## Keystroke Analyzer

Analyzes keystroke dynamics to detect transcription patterns. Transcription (typing from a pre-written source) produces different keystroke patterns than authentic composition.

### `slopit.behavioral.keystroke.KeystrokeAnalyzer`

Bases: `Analyzer`

Analyzer for keystroke dynamics. Detects transcription patterns by analyzing inter-keystroke intervals, revision behavior, and typing burst characteristics.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `KeystrokeAnalyzerConfig \| None` | Analyzer configuration. | `None` |

Examples:

```python
>>> from slopit import load_session
>>> from slopit.behavioral import KeystrokeAnalyzer
>>>
>>> session = load_session("data/session.json")
>>> analyzer = KeystrokeAnalyzer()
>>> result = analyzer.analyze_session(session)
>>>
>>> for flag in result.flags:
...     print(f"{flag.type}: {flag.message}")
```
Source code in `src/slopit/behavioral/keystroke.py`:

```python
class KeystrokeAnalyzer(Analyzer):
    """Analyzer for keystroke dynamics.

    Detects transcription patterns by analyzing inter-keystroke intervals,
    revision behavior, and typing burst characteristics.

    Parameters
    ----------
    config
        Analyzer configuration.

    Examples
    --------
    >>> from slopit import load_session
    >>> from slopit.behavioral import KeystrokeAnalyzer
    >>>
    >>> session = load_session("data/session.json")
    >>> analyzer = KeystrokeAnalyzer()
    >>> result = analyzer.analyze_session(session)
    >>>
    >>> for flag in result.flags:
    ...     print(f"{flag.type}: {flag.message}")
    """

    def __init__(self, config: KeystrokeAnalyzerConfig | None = None) -> None:
        self.config = config or KeystrokeAnalyzerConfig()

    @property
    def name(self) -> str:
        return "keystroke"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze keystroke patterns for a session."""
        trial_results: list[dict[str, JsonValue]] = []
        trial_metrics: list[KeystrokeMetrics] = []
        all_flags: list[AnalysisFlag] = []
        for trial in session.trials:
            if not self._has_sufficient_data(trial):
                continue
            # Safe to assert: _has_sufficient_data validates these are not None
            assert trial.behavioral is not None
            assert trial.behavioral.keystrokes is not None
            keystrokes = trial.behavioral.keystrokes
            metrics = self._compute_metrics(keystrokes)
            flags = self._compute_flags(trial.trial_id, metrics)
            trial_results.append(
                {
                    "trial_id": trial.trial_id,
                    "metrics": metrics.model_dump(),
                    "flags": [f.model_dump() for f in flags],
                }
            )
            trial_metrics.append(metrics)
            all_flags.extend(flags)
        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=trial_results,
            flags=all_flags,
            session_summary=self._compute_session_summary(trial_metrics, trial_results),
        )

    def _has_sufficient_data(self, trial: SlopitTrial) -> bool:
        """Check if trial has sufficient keystroke data."""
        if trial.behavioral is None:
            return False
        keystrokes = trial.behavioral.keystrokes
        if keystrokes is None:
            return False
        return len(keystrokes) >= self.config.min_keystrokes

    def _compute_metrics(self, keystrokes: list[KeystrokeEvent]) -> KeystrokeMetrics:
        """Compute metrics from keystroke events."""
        keydowns = [k for k in keystrokes if k.event == "keydown"]
        ikis = self._compute_ikis(keydowns)
        printable = sum(1 for k in keydowns if len(k.key) == 1)
        deletions = sum(1 for k in keydowns if k.key in {"Backspace", "Delete"})
        pause_count = sum(1 for iki in ikis if iki > self.config.pause_threshold_ms)
        final_length = keydowns[-1].text_length if keydowns and keydowns[-1].text_length else 0
        total = len(keydowns)
        ppr = final_length / total if total > 0 else 0.0
        return KeystrokeMetrics(
            total_keystrokes=len(keydowns),
            printable_keystrokes=printable,
            deletions=deletions,
            mean_iki=float(np.mean(ikis)) if len(ikis) > 0 else 0.0,
            std_iki=float(np.std(ikis)) if len(ikis) > 0 else 0.0,
            median_iki=float(np.median(ikis)) if len(ikis) > 0 else 0.0,
            pause_count=pause_count,
            product_process_ratio=ppr,
        )

    def _compute_ikis(self, keydowns: list[KeystrokeEvent]) -> NDArray[np.float64]:
        """Compute inter-keystroke intervals."""
        if len(keydowns) < 2:
            return np.array([], dtype=np.float64)
        times = np.array([k.time for k in keydowns], dtype=np.float64)
        return np.diff(times)

    def _compute_flags(self, trial_id: str, metrics: KeystrokeMetrics) -> list[AnalysisFlag]:
        """Generate flags based on metrics."""
        flags: list[AnalysisFlag] = []
        if metrics.std_iki < self.config.min_iki_std_threshold:
            flags.append(
                AnalysisFlag(
                    type="low_iki_variance",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Keystroke timing unusually consistent (std={metrics.std_iki:.1f}ms)",
                    confidence=self._iki_confidence(metrics.std_iki),
                    evidence={"std_iki": metrics.std_iki},
                    trial_ids=[trial_id],
                )
            )
        if metrics.product_process_ratio > self.config.max_ppr_threshold:
            flags.append(
                AnalysisFlag(
                    type="minimal_revision",
                    analyzer=self.name,
                    severity="low",
                    message=f"Very few revisions during composition (PPR={metrics.product_process_ratio:.2f})",
                    confidence=0.6,
                    evidence={"product_process_ratio": metrics.product_process_ratio},
                    trial_ids=[trial_id],
                )
            )
        if metrics.deletions == 0 and metrics.total_keystrokes > 50:
            flags.append(
                AnalysisFlag(
                    type="no_deletions",
                    analyzer=self.name,
                    severity="low",
                    message="No deletion keystrokes in extended response",
                    confidence=0.5,
                    evidence={
                        "deletions": 0,
                        "total_keystrokes": metrics.total_keystrokes,
                    },
                    trial_ids=[trial_id],
                )
            )
        return flags

    def _iki_confidence(self, std_iki: float) -> float:
        """Compute confidence for IKI variance flag."""
        threshold = self.config.min_iki_std_threshold
        if std_iki >= threshold:
            return 0.0
        return min(1.0, (threshold - std_iki) / threshold)

    def _compute_session_summary(
        self, trial_metrics: list[KeystrokeMetrics], trial_results: list[dict[str, JsonValue]]
    ) -> dict[str, JsonValue]:
        """Compute session-level summary."""
        if not trial_metrics:
            return {"trials_analyzed": 0}
        all_mean_ikis = [m.mean_iki for m in trial_metrics]
        all_std_ikis = [m.std_iki for m in trial_metrics]
        total_flags = sum(
            len(r["flags"]) if isinstance(r["flags"], list) else 0 for r in trial_results
        )
        return {
            "trials_analyzed": len(trial_metrics),
            "mean_iki_across_trials": float(np.mean(all_mean_ikis)),
            "std_iki_across_trials": float(np.mean(all_std_ikis)),
            "total_flags": total_flags,
        }
```
#### `analyze_session`

```python
analyze_session(session: SlopitSession) -> AnalysisResult
```

Analyze keystroke patterns for a session.
### `slopit.behavioral.keystroke.KeystrokeAnalyzerConfig` (dataclass)

Bases: `AnalyzerConfig`

Configuration for keystroke analysis.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `pause_threshold_ms` | `float` | Minimum IKI to count as a pause (milliseconds). |
| `burst_threshold_ms` | `float` | Maximum IKI within a typing burst (milliseconds). |
| `min_keystrokes` | `int` | Minimum keystrokes required for analysis. |
| `min_iki_std_threshold` | `float` | Minimum IKI standard deviation for authentic typing. Lower values suggest transcription. |
| `max_ppr_threshold` | `float` | Maximum product-process ratio for authentic typing. Higher values suggest minimal revision. |
Source code in `src/slopit/behavioral/keystroke.py`:

```python
@dataclass
class KeystrokeAnalyzerConfig(AnalyzerConfig):
    """Configuration for keystroke analysis.

    Attributes
    ----------
    pause_threshold_ms
        Minimum IKI to count as a pause (milliseconds).
    burst_threshold_ms
        Maximum IKI within a typing burst (milliseconds).
    min_keystrokes
        Minimum keystrokes required for analysis.
    min_iki_std_threshold
        Minimum IKI standard deviation for authentic typing.
        Lower values suggest transcription.
    max_ppr_threshold
        Maximum product-process ratio for authentic typing.
        Higher values suggest minimal revision.
    """

    pause_threshold_ms: float = 2000.0
    burst_threshold_ms: float = 500.0
    min_keystrokes: int = 20
    min_iki_std_threshold: float = 100.0
    max_ppr_threshold: float = 0.95
```
### Detection Signals

| Flag Type | Severity | Description |
| --- | --- | --- |
| `low_iki_variance` | medium | Keystroke timing unusually consistent |
| `minimal_revision` | low | Very few revisions during composition |
| `no_deletions` | low | No deletion keystrokes in extended response |
### Example

```python
from slopit import load_sessions
from slopit.behavioral import KeystrokeAnalyzer, KeystrokeAnalyzerConfig

# Custom configuration
config = KeystrokeAnalyzerConfig(
    pause_threshold_ms=2000.0,    # Pauses longer than 2s
    burst_threshold_ms=500.0,     # Typing bursts under 500ms IKI
    min_keystrokes=20,            # Minimum keystrokes for analysis
    min_iki_std_threshold=100.0,  # Flag if std IKI < 100ms
    max_ppr_threshold=0.95,       # Flag if product/process ratio > 0.95
)

analyzer = KeystrokeAnalyzer(config)

# Analyze sessions
sessions = load_sessions("data/")
for session in sessions:
    result = analyzer.analyze_session(session)
    for flag in result.flags:
        print(f"{flag.type}: {flag.message}")
```
### Metrics Computed

- `mean_iki`: Mean inter-keystroke interval (milliseconds)
- `std_iki`: Standard deviation of IKI
- `median_iki`: Median IKI
- `total_keystrokes`: Total keydown events
- `printable_keystrokes`: Printable character keystrokes
- `deletions`: Backspace and Delete keystrokes
- `pause_count`: Pauses exceeding the pause threshold
- `product_process_ratio`: Final text length divided by total keystrokes
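These metrics are attached per trial under `result.trials` as plain dictionaries (produced by `KeystrokeMetrics.model_dump()` in the source above); a short sketch of reading them back:

```python
result = KeystrokeAnalyzer().analyze_session(session)

for trial in result.trials:
    m = trial["metrics"]  # dict produced by KeystrokeMetrics.model_dump()
    print(
        f"{trial['trial_id']}: mean IKI {m['mean_iki']:.0f} ms, "
        f"std {m['std_iki']:.0f} ms, PPR {m['product_process_ratio']:.2f}"
    )
```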
## Focus Analyzer

Analyzes focus and visibility events to detect patterns suggesting external assistance (e.g., switching tabs to an AI tool).

### `slopit.behavioral.focus.FocusAnalyzer`

Bases: `Analyzer`

Analyzer for focus and visibility patterns. Detects patterns that suggest external assistance, such as excessive tab switching or extended hidden periods.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `FocusAnalyzerConfig \| None` | Analyzer configuration. | `None` |

Examples:

```python
>>> from slopit import load_session
>>> from slopit.behavioral import FocusAnalyzer
>>>
>>> session = load_session("data/session.json")
>>> analyzer = FocusAnalyzer()
>>> result = analyzer.analyze_session(session)
```
Source code in `src/slopit/behavioral/focus.py`:

```python
class FocusAnalyzer(Analyzer):
    """Analyzer for focus and visibility patterns.

    Detects patterns that suggest external assistance such as
    excessive tab switching or extended hidden periods.

    Parameters
    ----------
    config
        Analyzer configuration.

    Examples
    --------
    >>> from slopit import load_session
    >>> from slopit.behavioral import FocusAnalyzer
    >>>
    >>> session = load_session("data/session.json")
    >>> analyzer = FocusAnalyzer()
    >>> result = analyzer.analyze_session(session)
    """

    def __init__(self, config: FocusAnalyzerConfig | None = None) -> None:
        self.config = config or FocusAnalyzerConfig()

    @property
    def name(self) -> str:
        return "focus"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze focus patterns for a session."""
        trial_results: list[dict[str, JsonValue]] = []
        trial_metrics: list[FocusMetrics] = []
        all_flags: list[AnalysisFlag] = []
        for trial in session.trials:
            if not self._has_focus_data(trial):
                continue
            # Safe to assert: _has_focus_data validates these are not None
            assert trial.behavioral is not None
            assert trial.behavioral.focus is not None
            focus_events = trial.behavioral.focus
            metrics = self._compute_metrics(focus_events)
            flags = self._compute_flags(trial.trial_id, metrics, trial)
            trial_results.append(
                {
                    "trial_id": trial.trial_id,
                    "metrics": metrics.to_dict(),
                    "flags": [f.model_dump() for f in flags],
                }
            )
            trial_metrics.append(metrics)
            all_flags.extend(flags)
        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=trial_results,
            flags=all_flags,
            session_summary=self._compute_session_summary(trial_metrics, trial_results),
        )

    def _has_focus_data(self, trial: SlopitTrial) -> bool:
        """Check if trial has focus data."""
        return (
            trial.behavioral is not None
            and trial.behavioral.focus is not None
            and len(trial.behavioral.focus) > 0
        )

    def _compute_metrics(self, focus_events: list[FocusEvent]) -> FocusMetrics:
        """Compute metrics from focus events."""
        blur_count = sum(1 for e in focus_events if e.event == "blur")
        total_blur_duration = sum(e.blur_duration or 0 for e in focus_events if e.event == "blur")
        hidden_count = sum(
            1 for e in focus_events if e.event == "visibilitychange" and e.visibility == "hidden"
        )
        # Calculate hidden duration
        total_hidden_duration = 0.0
        last_hidden_time: float | None = None
        for event in focus_events:
            if event.event == "visibilitychange":
                if event.visibility == "hidden":
                    last_hidden_time = event.time
                elif event.visibility == "visible" and last_hidden_time is not None:
                    total_hidden_duration += event.time - last_hidden_time
                    last_hidden_time = None
        return FocusMetrics(
            blur_count=blur_count,
            total_blur_duration=total_blur_duration,
            hidden_count=hidden_count,
            total_hidden_duration=total_hidden_duration,
        )

    def _compute_flags(
        self, trial_id: str, metrics: FocusMetrics, trial: SlopitTrial
    ) -> list[AnalysisFlag]:
        """Generate flags based on focus metrics."""
        flags: list[AnalysisFlag] = []
        # Excessive blur events
        if metrics.blur_count > self.config.max_blur_count:
            flags.append(
                AnalysisFlag(
                    type="excessive_blur",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Excessive window switches detected ({metrics.blur_count} blur events)",
                    confidence=min(1.0, metrics.blur_count / 10),
                    evidence={"blur_count": metrics.blur_count},
                    trial_ids=[trial_id],
                )
            )
        # Long hidden duration
        if metrics.total_hidden_duration > self.config.max_hidden_duration_ms:
            flags.append(
                AnalysisFlag(
                    type="extended_hidden",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Extended tab switch detected ({metrics.total_hidden_duration / 1000:.1f}s hidden)",
                    confidence=min(1.0, metrics.total_hidden_duration / 60000),
                    evidence={"total_hidden_duration_ms": metrics.total_hidden_duration},
                    trial_ids=[trial_id],
                )
            )
        # Check for blur-paste pattern
        if self._detect_blur_paste_pattern(trial):
            flags.append(
                AnalysisFlag(
                    type="blur_paste_pattern",
                    analyzer=self.name,
                    severity="high",
                    message="Paste event detected shortly after tab switch",
                    confidence=0.8,
                    evidence={},
                    trial_ids=[trial_id],
                )
            )
        return flags

    def _detect_blur_paste_pattern(self, trial: SlopitTrial) -> bool:
        """Detect if there's a blur followed by paste within window."""
        if trial.behavioral is None:
            return False
        focus_events = trial.behavioral.focus or []
        paste_events = trial.behavioral.paste or []
        if not focus_events or not paste_events:
            return False
        for focus_event in focus_events:
            if focus_event.event == "focus":
                # Check for paste shortly after refocus
                for paste_event in paste_events:
                    time_diff = paste_event.time - focus_event.time
                    if 0 <= time_diff <= self.config.blur_paste_window_ms:
                        return True
        return False

    def _compute_session_summary(
        self, trial_metrics: list[FocusMetrics], trial_results: list[dict[str, JsonValue]]
    ) -> dict[str, JsonValue]:
        """Compute session-level summary."""
        if not trial_metrics:
            return {"trials_analyzed": 0}
        total_blur = sum(m.blur_count for m in trial_metrics)
        total_flags = sum(
            len(r["flags"]) if isinstance(r["flags"], list) else 0 for r in trial_results
        )
        return {
            "trials_analyzed": len(trial_metrics),
            "total_blur_events": total_blur,
            "total_flags": total_flags,
        }
```
#### `analyze_session`

```python
analyze_session(session: SlopitSession) -> AnalysisResult
```

Analyze focus patterns for a session.
### `slopit.behavioral.focus.FocusAnalyzerConfig` (dataclass)

Bases: `AnalyzerConfig`

Configuration for focus analysis.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `max_blur_count` | `int` | Maximum blur events before flagging. |
| `max_hidden_duration_ms` | `float` | Maximum hidden duration in milliseconds. |
| `blur_paste_window_ms` | `float` | Window for detecting blur-paste patterns. |
Source code in `src/slopit/behavioral/focus.py`:

```python
@dataclass
class FocusAnalyzerConfig(AnalyzerConfig):
    """Configuration for focus analysis.

    Attributes
    ----------
    max_blur_count
        Maximum blur events before flagging.
    max_hidden_duration_ms
        Maximum hidden duration in milliseconds.
    blur_paste_window_ms
        Window for detecting blur-paste patterns.
    """

    max_blur_count: int = 5
    max_hidden_duration_ms: float = 30000.0
    blur_paste_window_ms: float = 5000.0
```
### Detection Signals

| Flag Type | Severity | Description |
| --- | --- | --- |
| `excessive_blur` | medium | Many window blur events |
| `extended_hidden` | medium | Long periods with tab hidden |
| `blur_paste_pattern` | high | Paste shortly after tab switch |
### Example

```python
from slopit.behavioral import FocusAnalyzer, FocusAnalyzerConfig

config = FocusAnalyzerConfig(
    max_blur_count=5,              # Flag if more than 5 blur events
    max_hidden_duration_ms=30000,  # Flag if hidden > 30 seconds
    blur_paste_window_ms=5000,     # Detect paste within 5s of refocus
)

analyzer = FocusAnalyzer(config)
result = analyzer.analyze_session(session)
```
### Metrics Computed

- `blur_count`: Number of window blur events
- `total_blur_duration`: Total time with window blurred
- `hidden_count`: Number of visibility hidden events
- `total_hidden_duration`: Total time with document hidden
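Flag severities lend themselves to triage: `blur_paste_pattern` is the only high-severity focus flag, so it is worth surfacing on its own. A small sketch using the flag and summary fields shown in the source above:

```python
result = FocusAnalyzer().analyze_session(session)

# Surface the high-severity blur-paste signal separately
high_severity = [f for f in result.flags if f.severity == "high"]
for flag in high_severity:
    print(f"{flag.trial_ids}: {flag.message} (confidence {flag.confidence:.1f})")

# Summary keys beyond trials_analyzed exist only when trials were analyzed
print(f"Blur events across trials: {result.session_summary.get('total_blur_events', 0)}")
```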
## Timing Analyzer

Analyzes response timing to detect suspiciously fast responses or unusually consistent timing.

### `slopit.behavioral.timing.TimingAnalyzer`

Bases: `Analyzer`

Analyzer for response timing patterns. Detects suspiciously fast responses or unusually consistent timing across trials.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `TimingAnalyzerConfig \| None` | Analyzer configuration. | `None` |

Examples:

```python
>>> from slopit import load_session
>>> from slopit.behavioral import TimingAnalyzer
>>>
>>> session = load_session("data/session.json")
>>> analyzer = TimingAnalyzer()
>>> result = analyzer.analyze_session(session)
```
Source code in `src/slopit/behavioral/timing.py`:

```python
class TimingAnalyzer(Analyzer):
    """Analyzer for response timing patterns.

    Detects suspiciously fast responses or unusually consistent
    timing across trials.

    Parameters
    ----------
    config
        Analyzer configuration.

    Examples
    --------
    >>> from slopit import load_session
    >>> from slopit.behavioral import TimingAnalyzer
    >>>
    >>> session = load_session("data/session.json")
    >>> analyzer = TimingAnalyzer()
    >>> result = analyzer.analyze_session(session)
    """

    def __init__(self, config: TimingAnalyzerConfig | None = None) -> None:
        self.config = config or TimingAnalyzerConfig()

    @property
    def name(self) -> str:
        return "timing"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze timing patterns for a session."""
        trial_results: list[dict[str, JsonValue]] = []
        all_flags: list[AnalysisFlag] = []
        rts: list[float] = []
        for trial in session.trials:
            if trial.rt is None:
                continue
            metrics = self._compute_trial_metrics(trial)
            flags = self._compute_trial_flags(trial.trial_id, trial, metrics)
            trial_results.append(
                {
                    "trial_id": trial.trial_id,
                    "metrics": metrics.to_dict(),
                    "flags": [f.model_dump() for f in flags],
                }
            )
            all_flags.extend(flags)
            rts.append(trial.rt)
        # Check for consistent timing across trials
        session_flags = self._check_session_consistency(rts, session)
        all_flags.extend(session_flags)
        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=trial_results,
            flags=all_flags,
            session_summary=self._compute_session_summary(rts, trial_results),
        )

    def _compute_trial_metrics(self, trial: SlopitTrial) -> TimingMetrics:
        """Compute timing metrics for a trial."""
        rt = trial.rt or 0
        char_count = trial.response.character_count if trial.response else None
        ms_per_char = rt / char_count if char_count and char_count > 0 else None
        chars_per_minute = (char_count / (rt / 60000)) if rt > 0 and char_count else None
        return TimingMetrics(
            rt=rt,
            character_count=char_count,
            ms_per_char=ms_per_char,
            chars_per_minute=chars_per_minute,
        )

    def _compute_trial_flags(
        self, trial_id: str, trial: SlopitTrial, metrics: TimingMetrics
    ) -> list[AnalysisFlag]:
        """Generate flags for a single trial."""
        flags: list[AnalysisFlag] = []
        rt = trial.rt or 0
        char_count = metrics.character_count or 0
        # Instant response detection
        if (
            rt < self.config.instant_response_threshold_ms
            and char_count > self.config.instant_response_min_chars
        ):
            flags.append(
                AnalysisFlag(
                    type="instant_response",
                    analyzer=self.name,
                    severity="high",
                    message=f"Suspiciously fast response ({rt}ms for {char_count} chars)",
                    confidence=0.9,
                    evidence={
                        "rt": rt,
                        "character_count": char_count,
                    },
                    trial_ids=[trial_id],
                )
            )
        # Too fast per character
        ms_per_char = metrics.ms_per_char
        if ms_per_char is not None and ms_per_char < self.config.min_rt_per_char_ms:
            flags.append(
                AnalysisFlag(
                    type="fast_typing",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Typing speed exceeds human capability ({ms_per_char:.1f}ms/char)",
                    confidence=0.7,
                    evidence={"ms_per_char": ms_per_char},
                    trial_ids=[trial_id],
                )
            )
        return flags

    def _check_session_consistency(
        self,
        rts: list[float],
        session: SlopitSession,  # noqa: ARG002
    ) -> list[AnalysisFlag]:
        """Check for suspiciously consistent timing across trials."""
        flags: list[AnalysisFlag] = []
        if len(rts) < 3:
            return flags
        rt_array = np.array(rts)
        mean_rt = float(np.mean(rt_array))
        std_rt = float(np.std(rt_array))
        # Coefficient of variation
        cv = std_rt / mean_rt if mean_rt > 0 else float("inf")
        if cv < self.config.max_rt_cv_threshold:
            flags.append(
                AnalysisFlag(
                    type="consistent_timing",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Unusually consistent response times across trials (CV={cv:.3f})",
                    confidence=0.6,
                    evidence={
                        "coefficient_of_variation": cv,
                        "mean_rt": mean_rt,
                        "std_rt": std_rt,
                    },
                    trial_ids=None,
                )
            )
        return flags

    def _compute_session_summary(
        self, rts: list[float], trial_results: list[dict[str, JsonValue]]
    ) -> dict[str, JsonValue]:
        """Compute session-level summary."""
        if not rts:
            return {"trials_analyzed": 0}
        rt_array = np.array(rts)
        total_flags = sum(
            len(r["flags"]) if isinstance(r["flags"], list) else 0 for r in trial_results
        )
        return {
            "trials_analyzed": len(trial_results),
            "mean_rt": float(np.mean(rt_array)),
            "std_rt": float(np.std(rt_array)),
            "min_rt": float(np.min(rt_array)),
            "max_rt": float(np.max(rt_array)),
            "total_flags": total_flags,
        }
```
#### `analyze_session`

```python
analyze_session(session: SlopitSession) -> AnalysisResult
```

Analyze timing patterns for a session.
### `slopit.behavioral.timing.TimingAnalyzerConfig` (dataclass)

Bases: `AnalyzerConfig`

Configuration for timing analysis.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `min_rt_per_char_ms` | `float` | Minimum expected milliseconds per character. |
| `max_rt_cv_threshold` | `float` | Maximum coefficient of variation for RT. |
| `instant_response_threshold_ms` | `float` | Threshold for instant response detection. |
| `instant_response_min_chars` | `int` | Minimum characters for instant response flag. |
Source code in `src/slopit/behavioral/timing.py`:

```python
@dataclass
class TimingAnalyzerConfig(AnalyzerConfig):
    """Configuration for timing analysis.

    Attributes
    ----------
    min_rt_per_char_ms
        Minimum expected milliseconds per character.
    max_rt_cv_threshold
        Maximum coefficient of variation for RT.
    instant_response_threshold_ms
        Threshold for instant response detection.
    instant_response_min_chars
        Minimum characters for instant response flag.
    """

    min_rt_per_char_ms: float = 20.0
    max_rt_cv_threshold: float = 0.1
    instant_response_threshold_ms: float = 2000.0
    instant_response_min_chars: int = 100
```
### Detection Signals

| Flag Type | Severity | Description |
| --- | --- | --- |
| `instant_response` | high | Very fast response for character count |
| `fast_typing` | medium | Typing speed exceeds human capability |
| `consistent_timing` | medium | Unusually consistent RT across trials |
### Example

```python
from slopit.behavioral import TimingAnalyzer, TimingAnalyzerConfig

config = TimingAnalyzerConfig(
    min_rt_per_char_ms=20.0,             # Minimum 20ms per character
    max_rt_cv_threshold=0.1,             # Flag if CV < 0.1
    instant_response_threshold_ms=2000,  # Instant if < 2s
    instant_response_min_chars=100,      # For responses > 100 chars
)

analyzer = TimingAnalyzer(config)
result = analyzer.analyze_session(session)
```
### Metrics Computed

- `rt`: Response time (milliseconds)
- `character_count`: Characters in response
- `ms_per_char`: Milliseconds per character
- `chars_per_minute`: Characters per minute
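The `consistent_timing` flag compares the coefficient of variation of per-trial RTs (CV = std / mean) against `max_rt_cv_threshold`; a quick sketch of the same computation, with made-up numbers:

```python
import numpy as np

rts = np.array([41200.0, 40800.0, 41500.0, 41100.0])  # illustrative RTs in ms

cv = float(np.std(rts) / np.mean(rts))  # coefficient of variation
print(f"CV = {cv:.3f}")  # ~0.006, far below the 0.1 default, so this would be flagged
```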
## Paste Analyzer

Analyzes paste events to detect copy/paste behavior.

### `slopit.behavioral.paste.PasteAnalyzer`

Bases: `Analyzer`

Analyzer for paste events and clipboard usage. Detects suspicious paste patterns such as large pastes without prior typing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `PasteAnalyzerConfig \| None` | Analyzer configuration. | `None` |

Examples:

```python
>>> from slopit import load_session
>>> from slopit.behavioral import PasteAnalyzer
>>>
>>> session = load_session("data/session.json")
>>> analyzer = PasteAnalyzer()
>>> result = analyzer.analyze_session(session)
```
Source code in `src/slopit/behavioral/paste.py`:

```python
class PasteAnalyzer(Analyzer):
    """Analyzer for paste events and clipboard usage.

    Detects suspicious paste patterns such as large pastes
    without prior typing.

    Parameters
    ----------
    config
        Analyzer configuration.

    Examples
    --------
    >>> from slopit import load_session
    >>> from slopit.behavioral import PasteAnalyzer
    >>>
    >>> session = load_session("data/session.json")
    >>> analyzer = PasteAnalyzer()
    >>> result = analyzer.analyze_session(session)
    """

    def __init__(self, config: PasteAnalyzerConfig | None = None) -> None:
        self.config = config or PasteAnalyzerConfig()

    @property
    def name(self) -> str:
        return "paste"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze paste patterns for a session."""
        trial_results: list[dict[str, JsonValue]] = []
        trial_metrics: list[PasteMetrics] = []
        all_flags: list[AnalysisFlag] = []
        for trial in session.trials:
            if not self._has_paste_data(trial):
                continue
            # Safe to assert: _has_paste_data validates these are not None
            assert trial.behavioral is not None
            assert trial.behavioral.paste is not None
            paste_events = trial.behavioral.paste
            metrics = self._compute_metrics(paste_events)
            flags = self._compute_flags(trial.trial_id, paste_events)
            trial_results.append(
                {
                    "trial_id": trial.trial_id,
                    "metrics": metrics.to_dict(),
                    "flags": [f.model_dump() for f in flags],
                }
            )
            trial_metrics.append(metrics)
            all_flags.extend(flags)
        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=trial_results,
            flags=all_flags,
            session_summary=self._compute_session_summary(trial_metrics, trial_results),
        )

    def _has_paste_data(self, trial: SlopitTrial) -> bool:
        """Check if trial has paste data."""
        return (
            trial.behavioral is not None
            and trial.behavioral.paste is not None
            and len(trial.behavioral.paste) > 0
        )

    def _compute_metrics(self, paste_events: list[PasteEvent]) -> PasteMetrics:
        """Compute metrics from paste events."""
        total_pasted = sum(e.text_length for e in paste_events)
        blocked_count = sum(1 for e in paste_events if e.blocked)
        large_pastes = sum(
            1 for e in paste_events if e.text_length >= self.config.large_paste_threshold
        )
        return PasteMetrics(
            paste_count=len(paste_events),
            total_pasted_chars=total_pasted,
            blocked_count=blocked_count,
            large_paste_count=large_pastes,
        )

    def _compute_flags(self, trial_id: str, paste_events: list[PasteEvent]) -> list[AnalysisFlag]:
        """Generate flags based on paste events."""
        flags: list[AnalysisFlag] = []
        for event in paste_events:
            # Large paste
            if event.text_length >= self.config.large_paste_threshold:
                flags.append(
                    AnalysisFlag(
                        type="large_paste",
                        analyzer=self.name,
                        severity="medium",
                        message=f"Large paste detected ({event.text_length} characters)",
                        confidence=0.7,
                        evidence={
                            "text_length": event.text_length,
                            "time": event.time,
                        },
                        trial_ids=[trial_id],
                    )
                )
            # Paste without prior typing
            if event.preceding_keystrokes <= self.config.suspicious_preceding_keystrokes:
                severity = "high" if event.text_length >= 100 else "medium"
                flags.append(
                    AnalysisFlag(
                        type="paste_without_typing",
                        analyzer=self.name,
                        severity=severity,
                        message=f"Paste with minimal prior typing ({event.preceding_keystrokes} keystrokes before)",
                        confidence=0.8,
                        evidence={
                            "preceding_keystrokes": event.preceding_keystrokes,
                            "text_length": event.text_length,
                        },
                        trial_ids=[trial_id],
                    )
                )
        return flags

    def _compute_session_summary(
        self, trial_metrics: list[PasteMetrics], trial_results: list[dict[str, JsonValue]]
    ) -> dict[str, JsonValue]:
        """Compute session-level summary."""
        if not trial_metrics:
            return {"trials_analyzed": 0}
        total_pastes = sum(m.paste_count for m in trial_metrics)
        total_chars = sum(m.total_pasted_chars for m in trial_metrics)
        total_flags = sum(
            len(r["flags"]) if isinstance(r["flags"], list) else 0 for r in trial_results
        )
        return {
            "trials_analyzed": len(trial_metrics),
            "total_paste_events": total_pastes,
            "total_pasted_chars": total_chars,
            "total_flags": total_flags,
        }
```
#### `analyze_session`

```python
analyze_session(session: SlopitSession) -> AnalysisResult
```

Analyze paste patterns for a session.
### `slopit.behavioral.paste.PasteAnalyzerConfig` (dataclass)

Bases: `AnalyzerConfig`

Configuration for paste analysis.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `large_paste_threshold` | `int` | Minimum characters to flag as large paste. |
| `suspicious_preceding_keystrokes` | `int` | Maximum preceding keystrokes for suspicious paste. |
Source code in `src/slopit/behavioral/paste.py`:

```python
@dataclass
class PasteAnalyzerConfig(AnalyzerConfig):
    """Configuration for paste analysis.

    Attributes
    ----------
    large_paste_threshold
        Minimum characters to flag as large paste.
    suspicious_preceding_keystrokes
        Maximum preceding keystrokes for suspicious paste.
    """

    large_paste_threshold: int = 50
    suspicious_preceding_keystrokes: int = 5
```
### Detection Signals

| Flag Type | Severity | Description |
| --- | --- | --- |
| `large_paste` | medium | Pasted substantial text |
| `paste_without_typing` | medium/high | Paste with minimal prior keystrokes (high when the paste is 100+ characters) |
### Example

```python
from slopit.behavioral import PasteAnalyzer, PasteAnalyzerConfig

config = PasteAnalyzerConfig(
    large_paste_threshold=50,           # Flag pastes >= 50 chars
    suspicious_preceding_keystrokes=5,  # Flag if <= 5 keystrokes before
)

analyzer = PasteAnalyzer(config)
result = analyzer.analyze_session(session)
```
### Metrics Computed

- `paste_count`: Number of paste events
- `total_pasted_chars`: Total characters pasted
- `blocked_count`: Number of blocked paste events
- `large_paste_count`: Number of large paste events
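Because `paste_without_typing` escalates to high severity when the pasted text is 100 characters or more, a simple triage loop can separate it from routine clipboard use; a sketch:

```python
result = PasteAnalyzer().analyze_session(session)

for flag in result.flags:
    if flag.type == "paste_without_typing" and flag.severity == "high":
        # Large paste with almost no typing beforehand: the strongest paste signal
        print(f"{flag.trial_ids}: {flag.message}")
        print(f"  evidence: {flag.evidence}")
```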
## Combining Analyzers

Use the pipeline to run multiple analyzers and combine their results:

```python
from slopit.pipeline import AnalysisPipeline
from slopit.behavioral import (
    KeystrokeAnalyzer,
    FocusAnalyzer,
    TimingAnalyzer,
    PasteAnalyzer,
)

pipeline = AnalysisPipeline([
    KeystrokeAnalyzer(),
    FocusAnalyzer(),
    TimingAnalyzer(),
    PasteAnalyzer(),
])

result = pipeline.analyze(sessions)
```

See Pipeline for aggregation configuration.
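The analyzer interface alone is also enough to combine results by hand, for instance to tally flag severities per session without the pipeline's aggregation; a sketch, assuming `sessions` holds loaded `SlopitSession` objects:

```python
from collections import Counter

from slopit.behavioral import (
    FocusAnalyzer,
    KeystrokeAnalyzer,
    PasteAnalyzer,
    TimingAnalyzer,
)

analyzers = [KeystrokeAnalyzer(), FocusAnalyzer(), TimingAnalyzer(), PasteAnalyzer()]

for session in sessions:
    severity_counts: Counter[str] = Counter()
    for analyzer in analyzers:
        result = analyzer.analyze_session(session)
        severity_counts.update(flag.severity for flag in result.flags)
    print(f"{session.session_id}: {dict(severity_counts)}")
```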
## Writing Custom Analyzers

See the Custom Analyzers Guide for instructions on creating your own analyzers.