Analyzers

The slopit.behavioral module provides analyzers for detecting AI-assisted responses through behavioral patterns.

Analyzer Base Class

Analyzer

Abstract base class that all analyzers inherit from.

slopit.behavioral.base.Analyzer

Bases: ABC

Base class for all analyzers.

Analyzers process session data and produce analysis results containing metrics and flags.

Attributes:

Name  Type  Description
name  str   Unique identifier for this analyzer.

Source code in src/slopit/behavioral/base.py
class Analyzer(ABC):
    """Base class for all analyzers.

    Analyzers process session data and produce analysis results
    containing metrics and flags.

    Attributes
    ----------
    name
        Unique identifier for this analyzer.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique identifier for this analyzer."""
        ...

    @abstractmethod
    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze a single session.

        Parameters
        ----------
        session
            Session data to analyze.

        Returns
        -------
        AnalysisResult
            Analysis results including metrics and flags.
        """
        ...

    def analyze_sessions(self, sessions: list[SlopitSession]) -> list[AnalysisResult]:
        """Analyze multiple sessions.

        Override this method for cross-session analysis
        (e.g., homogeneity detection).

        Parameters
        ----------
        sessions
            List of sessions to analyze.

        Returns
        -------
        list[AnalysisResult]
            Analysis results for each session.
        """
        return [self.analyze_session(s) for s in sessions]

name abstractmethod property

name: str

Unique identifier for this analyzer.

analyze_session abstractmethod

analyze_session(session: SlopitSession) -> AnalysisResult

Analyze a single session.

Parameters:

Name     Type           Description               Default
session  SlopitSession  Session data to analyze.  required

Returns:

Type            Description
AnalysisResult  Analysis results including metrics and flags.

Source code in src/slopit/behavioral/base.py
@abstractmethod
def analyze_session(self, session: SlopitSession) -> AnalysisResult:
    """Analyze a single session.

    Parameters
    ----------
    session
        Session data to analyze.

    Returns
    -------
    AnalysisResult
        Analysis results including metrics and flags.
    """
    ...

analyze_sessions

analyze_sessions(sessions: list[SlopitSession]) -> list[AnalysisResult]

Analyze multiple sessions.

Override this method for cross-session analysis (e.g., homogeneity detection).

Parameters:

Name      Type                 Description                   Default
sessions  list[SlopitSession]  List of sessions to analyze.  required

Returns:

Type                  Description
list[AnalysisResult]  Analysis results for each session.

Source code in src/slopit/behavioral/base.py
def analyze_sessions(self, sessions: list[SlopitSession]) -> list[AnalysisResult]:
    """Analyze multiple sessions.

    Override this method for cross-session analysis
    (e.g., homogeneity detection).

    Parameters
    ----------
    sessions
        List of sessions to analyze.

    Returns
    -------
    list[AnalysisResult]
        Analysis results for each session.
    """
    return [self.analyze_session(s) for s in sessions]
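
The override hook described above can be illustrated with a minimal, self-contained sketch. It does not import slopit: `StubSession` and `StubResult` are hypothetical stand-ins for `SlopitSession` and `AnalysisResult`, the local `Analyzer` class only mirrors the shape of the documented base class, and `DuplicateResponseAnalyzer` is an invented example, not an analyzer shipped with the library.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class StubSession:
    """Hypothetical stand-in for SlopitSession (not the real model)."""
    session_id: str
    responses: list[str] = field(default_factory=list)


@dataclass
class StubResult:
    """Hypothetical stand-in for AnalysisResult (not the real model)."""
    analyzer: str
    session_id: str
    flags: list[str] = field(default_factory=list)


class Analyzer(ABC):
    """Local copy of the base-class shape documented on this page."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def analyze_session(self, session: StubSession) -> StubResult: ...

    def analyze_sessions(self, sessions: list[StubSession]) -> list[StubResult]:
        return [self.analyze_session(s) for s in sessions]


class DuplicateResponseAnalyzer(Analyzer):
    """Illustrative analyzer: flags sessions that share an identical response."""

    @property
    def name(self) -> str:
        return "duplicate_response"

    def analyze_session(self, session: StubSession) -> StubResult:
        # A single session carries no cross-session signal on its own.
        return StubResult(analyzer=self.name, session_id=session.session_id)

    def analyze_sessions(self, sessions: list[StubSession]) -> list[StubResult]:
        # Record which sessions contain each response text.
        owners: dict[str, set[str]] = {}
        for s in sessions:
            for text in s.responses:
                owners.setdefault(text, set()).add(s.session_id)

        results = []
        for s in sessions:
            result = self.analyze_session(s)
            # Flag the session if any of its responses appears elsewhere.
            if any(len(owners[text]) > 1 for text in s.responses):
                result.flags.append("shared_response")
            results.append(result)
        return results


sessions = [
    StubSession("a", ["same text"]),
    StubSession("b", ["same text"]),
    StubSession("c", ["unique text"]),
]
results = DuplicateResponseAnalyzer().analyze_sessions(sessions)
flagged = [r.session_id for r in results if r.flags]
print(flagged)
```

The per-session method stays usable on its own, while the batch method adds the signal that only exists when sessions are compared with each other.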

AnalyzerConfig

Base configuration class for analyzers.

slopit.behavioral.base.AnalyzerConfig dataclass

Base configuration for analyzers.

Source code in src/slopit/behavioral/base.py
@dataclass
class AnalyzerConfig:
    """Base configuration for analyzers."""

    pass

Keystroke Analyzer

Analyzes keystroke dynamics to detect transcription patterns. Transcription (typing from a pre-written source) produces different keystroke patterns than authentic composition.

KeystrokeAnalyzer

slopit.behavioral.keystroke.KeystrokeAnalyzer

Bases: Analyzer

Analyzer for keystroke dynamics.

Detects transcription patterns by analyzing inter-keystroke intervals, revision behavior, and typing burst characteristics.

Parameters:

Name    Type                            Description              Default
config  KeystrokeAnalyzerConfig | None  Analyzer configuration.  None

Examples:

>>> from slopit import load_session
>>> from slopit.behavioral import KeystrokeAnalyzer
>>>
>>> session = load_session("data/session.json")
>>> analyzer = KeystrokeAnalyzer()
>>> result = analyzer.analyze_session(session)
>>>
>>> for flag in result.flags:
...     print(f"{flag.type}: {flag.message}")
Source code in src/slopit/behavioral/keystroke.py
class KeystrokeAnalyzer(Analyzer):
    """Analyzer for keystroke dynamics.

    Detects transcription patterns by analyzing inter-keystroke intervals,
    revision behavior, and typing burst characteristics.

    Parameters
    ----------
    config
        Analyzer configuration.

    Examples
    --------
    >>> from slopit import load_session
    >>> from slopit.behavioral import KeystrokeAnalyzer
    >>>
    >>> session = load_session("data/session.json")
    >>> analyzer = KeystrokeAnalyzer()
    >>> result = analyzer.analyze_session(session)
    >>>
    >>> for flag in result.flags:
    ...     print(f"{flag.type}: {flag.message}")
    """

    def __init__(self, config: KeystrokeAnalyzerConfig | None = None) -> None:
        self.config = config or KeystrokeAnalyzerConfig()

    @property
    def name(self) -> str:
        return "keystroke"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze keystroke patterns for a session."""
        trial_results: list[dict[str, JsonValue]] = []
        trial_metrics: list[KeystrokeMetrics] = []
        all_flags: list[AnalysisFlag] = []

        for trial in session.trials:
            if not self._has_sufficient_data(trial):
                continue

            # Safe to assert: _has_sufficient_data validates these are not None
            assert trial.behavioral is not None
            assert trial.behavioral.keystrokes is not None
            keystrokes = trial.behavioral.keystrokes
            metrics = self._compute_metrics(keystrokes)
            flags = self._compute_flags(trial.trial_id, metrics)

            trial_results.append(
                {
                    "trial_id": trial.trial_id,
                    "metrics": metrics.model_dump(),
                    "flags": [f.model_dump() for f in flags],
                }
            )
            trial_metrics.append(metrics)
            all_flags.extend(flags)

        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=trial_results,
            flags=all_flags,
            session_summary=self._compute_session_summary(trial_metrics, trial_results),
        )

    def _has_sufficient_data(self, trial: SlopitTrial) -> bool:
        """Check if trial has sufficient keystroke data."""
        if trial.behavioral is None:
            return False

        keystrokes = trial.behavioral.keystrokes
        if keystrokes is None:
            return False
        return len(keystrokes) >= self.config.min_keystrokes

    def _compute_metrics(self, keystrokes: list[KeystrokeEvent]) -> KeystrokeMetrics:
        """Compute metrics from keystroke events."""
        keydowns = [k for k in keystrokes if k.event == "keydown"]
        ikis = self._compute_ikis(keydowns)

        printable = sum(1 for k in keydowns if len(k.key) == 1)
        deletions = sum(1 for k in keydowns if k.key in {"Backspace", "Delete"})

        pause_count = sum(1 for iki in ikis if iki > self.config.pause_threshold_ms)

        final_length = keydowns[-1].text_length if keydowns and keydowns[-1].text_length else 0
        total = len(keydowns)
        ppr = final_length / total if total > 0 else 0.0

        return KeystrokeMetrics(
            total_keystrokes=len(keydowns),
            printable_keystrokes=printable,
            deletions=deletions,
            mean_iki=float(np.mean(ikis)) if len(ikis) > 0 else 0.0,
            std_iki=float(np.std(ikis)) if len(ikis) > 0 else 0.0,
            median_iki=float(np.median(ikis)) if len(ikis) > 0 else 0.0,
            pause_count=pause_count,
            product_process_ratio=ppr,
        )

    def _compute_ikis(self, keydowns: list[KeystrokeEvent]) -> NDArray[np.float64]:
        """Compute inter-keystroke intervals."""
        if len(keydowns) < 2:
            return np.array([], dtype=np.float64)

        times = np.array([k.time for k in keydowns], dtype=np.float64)
        return np.diff(times)

    def _compute_flags(self, trial_id: str, metrics: KeystrokeMetrics) -> list[AnalysisFlag]:
        """Generate flags based on metrics."""
        flags: list[AnalysisFlag] = []

        if metrics.std_iki < self.config.min_iki_std_threshold:
            flags.append(
                AnalysisFlag(
                    type="low_iki_variance",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Keystroke timing unusually consistent (std={metrics.std_iki:.1f}ms)",
                    confidence=self._iki_confidence(metrics.std_iki),
                    evidence={"std_iki": metrics.std_iki},
                    trial_ids=[trial_id],
                )
            )

        if metrics.product_process_ratio > self.config.max_ppr_threshold:
            flags.append(
                AnalysisFlag(
                    type="minimal_revision",
                    analyzer=self.name,
                    severity="low",
                    message=f"Very few revisions during composition (PPR={metrics.product_process_ratio:.2f})",
                    confidence=0.6,
                    evidence={"product_process_ratio": metrics.product_process_ratio},
                    trial_ids=[trial_id],
                )
            )

        if metrics.deletions == 0 and metrics.total_keystrokes > 50:
            flags.append(
                AnalysisFlag(
                    type="no_deletions",
                    analyzer=self.name,
                    severity="low",
                    message="No deletion keystrokes in extended response",
                    confidence=0.5,
                    evidence={
                        "deletions": 0,
                        "total_keystrokes": metrics.total_keystrokes,
                    },
                    trial_ids=[trial_id],
                )
            )

        return flags

    def _iki_confidence(self, std_iki: float) -> float:
        """Compute confidence for IKI variance flag."""
        threshold = self.config.min_iki_std_threshold
        if std_iki >= threshold:
            return 0.0

        return min(1.0, (threshold - std_iki) / threshold)

    def _compute_session_summary(
        self, trial_metrics: list[KeystrokeMetrics], trial_results: list[dict[str, JsonValue]]
    ) -> dict[str, JsonValue]:
        """Compute session-level summary."""
        if not trial_metrics:
            return {"trials_analyzed": 0}

        all_mean_ikis = [m.mean_iki for m in trial_metrics]
        all_std_ikis = [m.std_iki for m in trial_metrics]
        total_flags = sum(
            len(r["flags"]) if isinstance(r["flags"], list) else 0 for r in trial_results
        )

        return {
            "trials_analyzed": len(trial_metrics),
            "mean_iki_across_trials": float(np.mean(all_mean_ikis)),
            # Note: this is the mean of the per-trial IKI standard deviations.
            "std_iki_across_trials": float(np.mean(all_std_ikis)),
            "total_flags": total_flags,
        }

analyze_session

analyze_session(session: SlopitSession) -> AnalysisResult

Analyze keystroke patterns for a session.

Source code in src/slopit/behavioral/keystroke.py
def analyze_session(self, session: SlopitSession) -> AnalysisResult:
    """Analyze keystroke patterns for a session."""
    trial_results: list[dict[str, JsonValue]] = []
    trial_metrics: list[KeystrokeMetrics] = []
    all_flags: list[AnalysisFlag] = []

    for trial in session.trials:
        if not self._has_sufficient_data(trial):
            continue

        # Safe to assert: _has_sufficient_data validates these are not None
        assert trial.behavioral is not None
        assert trial.behavioral.keystrokes is not None
        keystrokes = trial.behavioral.keystrokes
        metrics = self._compute_metrics(keystrokes)
        flags = self._compute_flags(trial.trial_id, metrics)

        trial_results.append(
            {
                "trial_id": trial.trial_id,
                "metrics": metrics.model_dump(),
                "flags": [f.model_dump() for f in flags],
            }
        )
        trial_metrics.append(metrics)
        all_flags.extend(flags)

    return AnalysisResult(
        analyzer=self.name,
        session_id=session.session_id,
        trials=trial_results,
        flags=all_flags,
        session_summary=self._compute_session_summary(trial_metrics, trial_results),
    )

KeystrokeAnalyzerConfig

slopit.behavioral.keystroke.KeystrokeAnalyzerConfig dataclass

Bases: AnalyzerConfig

Configuration for keystroke analysis.

Attributes:

Name                   Type   Description
pause_threshold_ms     float  Minimum IKI to count as a pause (milliseconds).
burst_threshold_ms     float  Maximum IKI within a typing burst (milliseconds).
min_keystrokes         int    Minimum keystrokes required for analysis.
min_iki_std_threshold  float  Minimum IKI standard deviation for authentic typing. Lower values suggest transcription.
max_ppr_threshold      float  Maximum product-process ratio for authentic typing. Higher values suggest minimal revision.

Source code in src/slopit/behavioral/keystroke.py
@dataclass
class KeystrokeAnalyzerConfig(AnalyzerConfig):
    """Configuration for keystroke analysis.

    Attributes
    ----------
    pause_threshold_ms
        Minimum IKI to count as a pause (milliseconds).
    burst_threshold_ms
        Maximum IKI within a typing burst (milliseconds).
    min_keystrokes
        Minimum keystrokes required for analysis.
    min_iki_std_threshold
        Minimum IKI standard deviation for authentic typing.
        Lower values suggest transcription.
    max_ppr_threshold
        Maximum product-process ratio for authentic typing.
        Higher values suggest minimal revision.
    """

    pause_threshold_ms: float = 2000.0
    burst_threshold_ms: float = 500.0
    min_keystrokes: int = 20
    min_iki_std_threshold: float = 100.0
    max_ppr_threshold: float = 0.95

Detection Signals

Flag Type         Severity  Description
low_iki_variance  medium    Keystroke timing unusually consistent
minimal_revision  low       Very few revisions during composition
no_deletions      low       No deletion keystrokes in extended response
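
As a rough illustration of how these three signals fire, the sketch below restates the threshold checks and the linear confidence formula from the source shown above as standalone functions. The function names `iki_confidence` and `keystroke_flag_types` are hypothetical helpers for this example. The default thresholds are the `KeystrokeAnalyzerConfig` defaults, and the 50-keystroke cutoff is the value hard-coded in `_compute_flags`.

```python
def iki_confidence(std_iki: float, threshold: float = 100.0) -> float:
    """Linear confidence: 0.0 at the threshold, rising to 1.0 as std_iki reaches 0."""
    if std_iki >= threshold:
        return 0.0
    return min(1.0, (threshold - std_iki) / threshold)


def keystroke_flag_types(
    std_iki: float,
    ppr: float,
    deletions: int,
    total_keystrokes: int,
    min_iki_std_threshold: float = 100.0,
    max_ppr_threshold: float = 0.95,
) -> list[str]:
    """Return the flag types that the documented rules would raise."""
    flags = []
    if std_iki < min_iki_std_threshold:
        flags.append("low_iki_variance")
    if ppr > max_ppr_threshold:
        flags.append("minimal_revision")
    # Only responses longer than 50 keydown events count as "extended".
    if deletions == 0 and total_keystrokes > 50:
        flags.append("no_deletions")
    return flags


flags = keystroke_flag_types(std_iki=25.0, ppr=0.98, deletions=0, total_keystrokes=120)
confidence = iki_confidence(25.0)
print(flags, confidence)
```

In the source above, only `low_iki_variance` has a confidence that scales with the metric. The other two flags use fixed confidences of 0.6 and 0.5.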

Example

from slopit import load_sessions
from slopit.behavioral import KeystrokeAnalyzer, KeystrokeAnalyzerConfig

# Custom configuration
config = KeystrokeAnalyzerConfig(
    pause_threshold_ms=2000.0,      # Pauses longer than 2s
    burst_threshold_ms=500.0,       # Typing bursts under 500ms IKI
    min_keystrokes=20,              # Minimum keystrokes for analysis
    min_iki_std_threshold=100.0,    # Flag if std IKI < 100ms
    max_ppr_threshold=0.95,         # Flag if product/process ratio > 0.95
)

analyzer = KeystrokeAnalyzer(config)

# Analyze sessions
sessions = load_sessions("data/")
for session in sessions:
    result = analyzer.analyze_session(session)

    for flag in result.flags:
        print(f"{flag.type}: {flag.message}")

Metrics Computed

  • mean_iki: Mean inter-keystroke interval (milliseconds)
  • std_iki: Standard deviation of IKI
  • median_iki: Median IKI
  • total_keystrokes: Total keydown events
  • printable_keystrokes: Printable character keystrokes
  • deletions: Backspace and Delete keystrokes
  • pause_count: Pauses exceeding threshold
  • product_process_ratio: Final length / total keystrokes
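
The metrics listed above can be reproduced on toy data with a short standard-library sketch. It assumes keydown events have already been reduced to parallel lists of timestamps and key names. `keystroke_metrics` is a hypothetical helper, not part of slopit, and it uses `statistics.pstdev` (population standard deviation) to match the default behavior of `np.std` in the source.

```python
from statistics import mean, median, pstdev


def keystroke_metrics(times_ms, keys, final_length, pause_threshold_ms=2000.0):
    """Compute the listed metrics from keydown timestamps and key names."""
    # Inter-keystroke intervals: differences between consecutive keydown times.
    ikis = [b - a for a, b in zip(times_ms, times_ms[1:])]
    total = len(keys)
    return {
        "total_keystrokes": total,
        # Printable keys have single-character names ("a", "!", " ").
        "printable_keystrokes": sum(1 for k in keys if len(k) == 1),
        "deletions": sum(1 for k in keys if k in {"Backspace", "Delete"}),
        "mean_iki": mean(ikis) if ikis else 0.0,
        "std_iki": pstdev(ikis) if ikis else 0.0,
        "median_iki": median(ikis) if ikis else 0.0,
        "pause_count": sum(1 for i in ikis if i > pause_threshold_ms),
        # Final text length divided by the number of keydown events.
        "product_process_ratio": final_length / total if total else 0.0,
    }


m = keystroke_metrics(
    times_ms=[0.0, 100.0, 300.0, 2800.0, 2900.0],
    keys=["h", "i", "x", "Backspace", "!"],
    final_length=3,
)
print(m)
```

Here one keystroke is typed and then deleted, so five keydown events produce three final characters and the product-process ratio drops to 0.6. A ratio close to 1.0 means almost every keystroke survived into the final text.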

Focus Analyzer

Analyzes focus and visibility events to detect patterns that suggest external assistance, such as switching tabs to an AI tool.

FocusAnalyzer

slopit.behavioral.focus.FocusAnalyzer

Bases: Analyzer

Analyzer for focus and visibility patterns.

Detects patterns that suggest external assistance such as excessive tab switching or extended hidden periods.

Parameters:

Name    Type                        Description              Default
config  FocusAnalyzerConfig | None  Analyzer configuration.  None

Examples:

>>> from slopit import load_session
>>> from slopit.behavioral import FocusAnalyzer
>>>
>>> session = load_session("data/session.json")
>>> analyzer = FocusAnalyzer()
>>> result = analyzer.analyze_session(session)
Source code in src/slopit/behavioral/focus.py
class FocusAnalyzer(Analyzer):
    """Analyzer for focus and visibility patterns.

    Detects patterns that suggest external assistance such as
    excessive tab switching or extended hidden periods.

    Parameters
    ----------
    config
        Analyzer configuration.

    Examples
    --------
    >>> from slopit import load_session
    >>> from slopit.behavioral import FocusAnalyzer
    >>>
    >>> session = load_session("data/session.json")
    >>> analyzer = FocusAnalyzer()
    >>> result = analyzer.analyze_session(session)
    """

    def __init__(self, config: FocusAnalyzerConfig | None = None) -> None:
        self.config = config or FocusAnalyzerConfig()

    @property
    def name(self) -> str:
        return "focus"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze focus patterns for a session."""
        trial_results: list[dict[str, JsonValue]] = []
        trial_metrics: list[FocusMetrics] = []
        all_flags: list[AnalysisFlag] = []

        for trial in session.trials:
            if not self._has_focus_data(trial):
                continue

            # Safe to assert: _has_focus_data validates these are not None
            assert trial.behavioral is not None
            assert trial.behavioral.focus is not None
            focus_events = trial.behavioral.focus
            metrics = self._compute_metrics(focus_events)
            flags = self._compute_flags(trial.trial_id, metrics, trial)

            trial_results.append(
                {
                    "trial_id": trial.trial_id,
                    "metrics": metrics.to_dict(),
                    "flags": [f.model_dump() for f in flags],
                }
            )
            trial_metrics.append(metrics)
            all_flags.extend(flags)

        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=trial_results,
            flags=all_flags,
            session_summary=self._compute_session_summary(trial_metrics, trial_results),
        )

    def _has_focus_data(self, trial: SlopitTrial) -> bool:
        """Check if trial has focus data."""
        return (
            trial.behavioral is not None
            and trial.behavioral.focus is not None
            and len(trial.behavioral.focus) > 0
        )

    def _compute_metrics(self, focus_events: list[FocusEvent]) -> FocusMetrics:
        """Compute metrics from focus events."""
        blur_count = sum(1 for e in focus_events if e.event == "blur")
        total_blur_duration = sum(e.blur_duration or 0 for e in focus_events if e.event == "blur")

        hidden_count = sum(
            1 for e in focus_events if e.event == "visibilitychange" and e.visibility == "hidden"
        )

        # Calculate hidden duration
        total_hidden_duration = 0.0
        last_hidden_time: float | None = None
        for event in focus_events:
            if event.event == "visibilitychange":
                if event.visibility == "hidden":
                    last_hidden_time = event.time
                elif event.visibility == "visible" and last_hidden_time is not None:
                    total_hidden_duration += event.time - last_hidden_time
                    last_hidden_time = None

        return FocusMetrics(
            blur_count=blur_count,
            total_blur_duration=total_blur_duration,
            hidden_count=hidden_count,
            total_hidden_duration=total_hidden_duration,
        )

    def _compute_flags(
        self, trial_id: str, metrics: FocusMetrics, trial: SlopitTrial
    ) -> list[AnalysisFlag]:
        """Generate flags based on focus metrics."""
        flags: list[AnalysisFlag] = []

        # Excessive blur events
        if metrics.blur_count > self.config.max_blur_count:
            flags.append(
                AnalysisFlag(
                    type="excessive_blur",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Excessive window switches detected ({metrics.blur_count} blur events)",
                    confidence=min(1.0, metrics.blur_count / 10),
                    evidence={"blur_count": metrics.blur_count},
                    trial_ids=[trial_id],
                )
            )

        # Long hidden duration
        if metrics.total_hidden_duration > self.config.max_hidden_duration_ms:
            flags.append(
                AnalysisFlag(
                    type="extended_hidden",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Extended tab switch detected ({metrics.total_hidden_duration / 1000:.1f}s hidden)",
                    confidence=min(1.0, metrics.total_hidden_duration / 60000),
                    evidence={"total_hidden_duration_ms": metrics.total_hidden_duration},
                    trial_ids=[trial_id],
                )
            )

        # Check for blur-paste pattern
        if self._detect_blur_paste_pattern(trial):
            flags.append(
                AnalysisFlag(
                    type="blur_paste_pattern",
                    analyzer=self.name,
                    severity="high",
                    message="Paste event detected shortly after tab switch",
                    confidence=0.8,
                    evidence={},
                    trial_ids=[trial_id],
                )
            )

        return flags

    def _detect_blur_paste_pattern(self, trial: SlopitTrial) -> bool:
        """Detect a paste that occurs within the configured window after a refocus."""
        if trial.behavioral is None:
            return False

        focus_events = trial.behavioral.focus or []
        paste_events = trial.behavioral.paste or []

        if not focus_events or not paste_events:
            return False

        for focus_event in focus_events:
            if focus_event.event == "focus":
                # Check for paste shortly after refocus
                for paste_event in paste_events:
                    time_diff = paste_event.time - focus_event.time
                    if 0 <= time_diff <= self.config.blur_paste_window_ms:
                        return True

        return False

    def _compute_session_summary(
        self, trial_metrics: list[FocusMetrics], trial_results: list[dict[str, JsonValue]]
    ) -> dict[str, JsonValue]:
        """Compute session-level summary."""
        if not trial_metrics:
            return {"trials_analyzed": 0}

        total_blur = sum(m.blur_count for m in trial_metrics)
        total_flags = sum(
            len(r["flags"]) if isinstance(r["flags"], list) else 0 for r in trial_results
        )

        return {
            "trials_analyzed": len(trial_metrics),
            "total_blur_events": total_blur,
            "total_flags": total_flags,
        }

analyze_session

analyze_session(session: SlopitSession) -> AnalysisResult

Analyze focus patterns for a session.

Source code in src/slopit/behavioral/focus.py
def analyze_session(self, session: SlopitSession) -> AnalysisResult:
    """Analyze focus patterns for a session."""
    trial_results: list[dict[str, JsonValue]] = []
    trial_metrics: list[FocusMetrics] = []
    all_flags: list[AnalysisFlag] = []

    for trial in session.trials:
        if not self._has_focus_data(trial):
            continue

        # Safe to assert: _has_focus_data validates these are not None
        assert trial.behavioral is not None
        assert trial.behavioral.focus is not None
        focus_events = trial.behavioral.focus
        metrics = self._compute_metrics(focus_events)
        flags = self._compute_flags(trial.trial_id, metrics, trial)

        trial_results.append(
            {
                "trial_id": trial.trial_id,
                "metrics": metrics.to_dict(),
                "flags": [f.model_dump() for f in flags],
            }
        )
        trial_metrics.append(metrics)
        all_flags.extend(flags)

    return AnalysisResult(
        analyzer=self.name,
        session_id=session.session_id,
        trials=trial_results,
        flags=all_flags,
        session_summary=self._compute_session_summary(trial_metrics, trial_results),
    )

FocusAnalyzerConfig

slopit.behavioral.focus.FocusAnalyzerConfig dataclass

Bases: AnalyzerConfig

Configuration for focus analysis.

Attributes:

Name                    Type   Description
max_blur_count          int    Maximum blur events before flagging.
max_hidden_duration_ms  float  Maximum hidden duration in milliseconds.
blur_paste_window_ms    float  Window for detecting blur-paste patterns.

Source code in src/slopit/behavioral/focus.py
@dataclass
class FocusAnalyzerConfig(AnalyzerConfig):
    """Configuration for focus analysis.

    Attributes
    ----------
    max_blur_count
        Maximum blur events before flagging.
    max_hidden_duration_ms
        Maximum hidden duration in milliseconds.
    blur_paste_window_ms
        Window for detecting blur-paste patterns.
    """

    max_blur_count: int = 5
    max_hidden_duration_ms: float = 30000.0
    blur_paste_window_ms: float = 5000.0

Detection Signals

Flag Type           Severity  Description
excessive_blur      medium    Many window blur events
extended_hidden     medium    Long periods with tab hidden
blur_paste_pattern  high      Paste shortly after tab switch
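
The sketch below restates, as standalone functions, how the source shown above scales confidence for the two medium-severity flags and how it checks for a paste shortly after a refocus. The function names are hypothetical, and events are reduced to plain timestamp lists for illustration. The divisors 10 and 60000 and the 5000 ms default window are taken from the source and the `FocusAnalyzerConfig` defaults.

```python
def blur_confidence(blur_count: int) -> float:
    """Confidence for excessive_blur: saturates at 10 blur events."""
    return min(1.0, blur_count / 10)


def hidden_confidence(total_hidden_duration_ms: float) -> float:
    """Confidence for extended_hidden: saturates at 60 seconds hidden."""
    return min(1.0, total_hidden_duration_ms / 60000)


def paste_after_refocus(focus_times_ms, paste_times_ms, window_ms=5000.0) -> bool:
    """True if any paste occurs 0..window_ms after any focus (refocus) event."""
    return any(
        0 <= paste - focus <= window_ms
        for focus in focus_times_ms
        for paste in paste_times_ms
    )


print(blur_confidence(6))                         # 6 blur events
print(hidden_confidence(45000.0))                 # 45 s hidden in total
print(paste_after_refocus([10000.0], [12000.0]))  # paste 2 s after refocus
```

Note that the pattern check keys on `focus` events rather than `blur` events: a paste counts only if it follows the moment the participant returned to the window.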

Example

from slopit import load_session
from slopit.behavioral import FocusAnalyzer, FocusAnalyzerConfig

config = FocusAnalyzerConfig(
    max_blur_count=5,               # Flag if more than 5 blur events
    max_hidden_duration_ms=30000,   # Flag if hidden > 30 seconds
    blur_paste_window_ms=5000,      # Detect paste within 5s of refocus
)

# Load the session to analyze (the path is illustrative)
session = load_session("data/session.json")

analyzer = FocusAnalyzer(config)
result = analyzer.analyze_session(session)

Metrics Computed

  • blur_count: Number of window blur events
  • total_blur_duration: Total time with window blurred
  • hidden_count: Number of visibility hidden events
  • total_hidden_duration: Total time with document hidden
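
To make `total_hidden_duration` concrete, here is a minimal sketch of the hidden/visible pairing logic from `_compute_metrics`, assuming the `visibilitychange` events have been reduced to `(time_ms, visibility)` tuples. That tuple shape and the name `total_hidden_duration` as a free function are illustrative choices, not the library's data model.

```python
def total_hidden_duration(events):
    """Sum the time between each 'hidden' event and the next 'visible' event."""
    total = 0.0
    last_hidden = None
    for time_ms, visibility in events:
        if visibility == "hidden":
            # Remember when the document most recently became hidden.
            last_hidden = time_ms
        elif visibility == "visible" and last_hidden is not None:
            total += time_ms - last_hidden
            last_hidden = None
    return total


events = [
    (1000.0, "hidden"),
    (4000.0, "visible"),   # 3000 ms hidden
    (9000.0, "hidden"),
    (9500.0, "visible"),   # 500 ms hidden
    (20000.0, "hidden"),   # never becomes visible again: not counted
]
hidden_ms = total_hidden_duration(events)
print(hidden_ms)
```

A trailing `hidden` event with no matching `visible` event contributes nothing under this logic, so a trial that ends while the tab is hidden will under-report its hidden time.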

Timing Analyzer

Analyzes response timing to detect suspiciously fast responses or unusual consistency.
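
As a small illustration of the per-trial quantities this analyzer works with, the sketch below mirrors the arithmetic in `_compute_trial_metrics` from the source shown on this page. `timing_metrics` is a hypothetical standalone helper, and the response time and character count are made-up inputs.

```python
def timing_metrics(rt_ms, char_count):
    """Derive per-character timing from a response time and a character count."""
    # Milliseconds spent per character; undefined without a positive count.
    ms_per_char = rt_ms / char_count if char_count and char_count > 0 else None
    # Characters per minute; rt_ms / 60000 converts milliseconds to minutes.
    chars_per_minute = (
        char_count / (rt_ms / 60000) if rt_ms > 0 and char_count else None
    )
    return {
        "rt": rt_ms,
        "character_count": char_count,
        "ms_per_char": ms_per_char,
        "chars_per_minute": chars_per_minute,
    }


m = timing_metrics(rt_ms=30000.0, char_count=150)
print(m["ms_per_char"], m["chars_per_minute"])
```

A 150-character response submitted after 30 seconds works out to 200 ms per character, or 300 characters per minute. When the character count is missing, both derived values are `None`.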

TimingAnalyzer

slopit.behavioral.timing.TimingAnalyzer

Bases: Analyzer

Analyzer for response timing patterns.

Detects suspiciously fast responses or unusually consistent timing across trials.

Parameters:

Name    Type                         Description              Default
config  TimingAnalyzerConfig | None  Analyzer configuration.  None

Examples:

>>> from slopit import load_session
>>> from slopit.behavioral import TimingAnalyzer
>>>
>>> session = load_session("data/session.json")
>>> analyzer = TimingAnalyzer()
>>> result = analyzer.analyze_session(session)
Source code in src/slopit/behavioral/timing.py
class TimingAnalyzer(Analyzer):
    """Analyzer for response timing patterns.

    Detects suspiciously fast responses or unusually consistent
    timing across trials.

    Parameters
    ----------
    config
        Analyzer configuration.

    Examples
    --------
    >>> from slopit import load_session
    >>> from slopit.behavioral import TimingAnalyzer
    >>>
    >>> session = load_session("data/session.json")
    >>> analyzer = TimingAnalyzer()
    >>> result = analyzer.analyze_session(session)
    """

    def __init__(self, config: TimingAnalyzerConfig | None = None) -> None:
        self.config = config or TimingAnalyzerConfig()

    @property
    def name(self) -> str:
        return "timing"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze timing patterns for a session."""
        trial_results: list[dict[str, JsonValue]] = []
        all_flags: list[AnalysisFlag] = []
        rts: list[float] = []

        for trial in session.trials:
            if trial.rt is None:
                continue

            metrics = self._compute_trial_metrics(trial)
            flags = self._compute_trial_flags(trial.trial_id, trial, metrics)

            trial_results.append(
                {
                    "trial_id": trial.trial_id,
                    "metrics": metrics.to_dict(),
                    "flags": [f.model_dump() for f in flags],
                }
            )
            all_flags.extend(flags)
            rts.append(trial.rt)

        # Check for consistent timing across trials
        session_flags = self._check_session_consistency(rts, session)
        all_flags.extend(session_flags)

        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=trial_results,
            flags=all_flags,
            session_summary=self._compute_session_summary(rts, trial_results),
        )

    def _compute_trial_metrics(self, trial: SlopitTrial) -> TimingMetrics:
        """Compute timing metrics for a trial."""
        rt = trial.rt or 0
        char_count = trial.response.character_count if trial.response else None

        ms_per_char = rt / char_count if char_count and char_count > 0 else None
        chars_per_minute = (char_count / (rt / 60000)) if rt > 0 and char_count else None

        return TimingMetrics(
            rt=rt,
            character_count=char_count,
            ms_per_char=ms_per_char,
            chars_per_minute=chars_per_minute,
        )

    def _compute_trial_flags(
        self, trial_id: str, trial: SlopitTrial, metrics: TimingMetrics
    ) -> list[AnalysisFlag]:
        """Generate flags for a single trial."""
        flags: list[AnalysisFlag] = []

        rt = trial.rt or 0
        char_count = metrics.character_count or 0

        # Instant response detection
        if (
            rt < self.config.instant_response_threshold_ms
            and char_count > self.config.instant_response_min_chars
        ):
            flags.append(
                AnalysisFlag(
                    type="instant_response",
                    analyzer=self.name,
                    severity="high",
                    message=f"Suspiciously fast response ({rt}ms for {char_count} chars)",
                    confidence=0.9,
                    evidence={
                        "rt": rt,
                        "character_count": char_count,
                    },
                    trial_ids=[trial_id],
                )
            )

        # Too fast per character
        ms_per_char = metrics.ms_per_char
        if ms_per_char is not None and ms_per_char < self.config.min_rt_per_char_ms:
            flags.append(
                AnalysisFlag(
                    type="fast_typing",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Typing speed exceeds human capability ({ms_per_char:.1f}ms/char)",
                    confidence=0.7,
                    evidence={"ms_per_char": ms_per_char},
                    trial_ids=[trial_id],
                )
            )

        return flags

    def _check_session_consistency(
        self,
        rts: list[float],
        session: SlopitSession,  # noqa: ARG002
    ) -> list[AnalysisFlag]:
        """Check for suspiciously consistent timing across trials."""
        flags: list[AnalysisFlag] = []

        if len(rts) < 3:
            return flags

        rt_array = np.array(rts)
        mean_rt = float(np.mean(rt_array))
        std_rt = float(np.std(rt_array))

        # Coefficient of variation
        cv = std_rt / mean_rt if mean_rt > 0 else float("inf")

        if cv < self.config.max_rt_cv_threshold:
            flags.append(
                AnalysisFlag(
                    type="consistent_timing",
                    analyzer=self.name,
                    severity="medium",
                    message=f"Unusually consistent response times across trials (CV={cv:.3f})",
                    confidence=0.6,
                    evidence={
                        "coefficient_of_variation": cv,
                        "mean_rt": mean_rt,
                        "std_rt": std_rt,
                    },
                    trial_ids=None,
                )
            )

        return flags

    def _compute_session_summary(
        self, rts: list[float], trial_results: list[dict[str, JsonValue]]
    ) -> dict[str, JsonValue]:
        """Compute session-level summary."""
        if not rts:
            return {"trials_analyzed": 0}

        rt_array = np.array(rts)
        total_flags = sum(
            len(r["flags"]) if isinstance(r["flags"], list) else 0 for r in trial_results
        )

        return {
            "trials_analyzed": len(trial_results),
            "mean_rt": float(np.mean(rt_array)),
            "std_rt": float(np.std(rt_array)),
            "min_rt": float(np.min(rt_array)),
            "max_rt": float(np.max(rt_array)),
            "total_flags": total_flags,
        }

analyze_session

analyze_session(session: SlopitSession) -> AnalysisResult

Analyze timing patterns for a session.

Source code in src/slopit/behavioral/timing.py
def analyze_session(self, session: SlopitSession) -> AnalysisResult:
    """Analyze timing patterns for a session."""
    trial_results: list[dict[str, JsonValue]] = []
    all_flags: list[AnalysisFlag] = []
    rts: list[float] = []

    for trial in session.trials:
        if trial.rt is None:
            continue

        metrics = self._compute_trial_metrics(trial)
        flags = self._compute_trial_flags(trial.trial_id, trial, metrics)

        trial_results.append(
            {
                "trial_id": trial.trial_id,
                "metrics": metrics.to_dict(),
                "flags": [f.model_dump() for f in flags],
            }
        )
        all_flags.extend(flags)
        rts.append(trial.rt)

    # Check for consistent timing across trials
    session_flags = self._check_session_consistency(rts, session)
    all_flags.extend(session_flags)

    return AnalysisResult(
        analyzer=self.name,
        session_id=session.session_id,
        trials=trial_results,
        flags=all_flags,
        session_summary=self._compute_session_summary(rts, trial_results),
    )

TimingAnalyzerConfig

slopit.behavioral.timing.TimingAnalyzerConfig dataclass

Bases: AnalyzerConfig

Configuration for timing analysis.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| min_rt_per_char_ms | float | Minimum expected milliseconds per character. |
| max_rt_cv_threshold | float | Maximum coefficient of variation for RT. |
| instant_response_threshold_ms | float | Threshold for instant response detection. |
| instant_response_min_chars | int | Minimum characters for instant response flag. |

Source code in src/slopit/behavioral/timing.py
@dataclass
class TimingAnalyzerConfig(AnalyzerConfig):
    """Configuration for timing analysis.

    Attributes
    ----------
    min_rt_per_char_ms
        Minimum expected milliseconds per character.
    max_rt_cv_threshold
        Maximum coefficient of variation for RT.
    instant_response_threshold_ms
        Threshold for instant response detection.
    instant_response_min_chars
        Minimum characters for instant response flag.
    """

    min_rt_per_char_ms: float = 20.0
    max_rt_cv_threshold: float = 0.1
    instant_response_threshold_ms: float = 2000.0
    instant_response_min_chars: int = 100

Detection Signals

| Flag Type | Severity | Description |
| --- | --- | --- |
| instant_response | high | Very fast response for character count |
| fast_typing | medium | Typing speed exceeds human capability |
| consistent_timing | medium | Unusually consistent RT across trials |
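
The three signals can be reproduced outside the library to see where the thresholds bite. The following is a minimal standalone sketch, not the library's API: the function names `trial_flag_types` and `has_consistent_timing` are hypothetical, the constants are the `TimingAnalyzerConfig` defaults, and `statistics.pstdev` stands in for `np.std` (both compute the population standard deviation by default).

```python
from statistics import mean, pstdev

# Defaults copied from TimingAnalyzerConfig.
MIN_RT_PER_CHAR_MS = 20.0
MAX_RT_CV_THRESHOLD = 0.1
INSTANT_RESPONSE_THRESHOLD_MS = 2000.0
INSTANT_RESPONSE_MIN_CHARS = 100


def trial_flag_types(rt_ms: float, char_count: int) -> list[str]:
    """Return the per-trial flag types one response would trigger (hypothetical helper)."""
    flags: list[str] = []
    # instant_response: short RT combined with a long response.
    if rt_ms < INSTANT_RESPONSE_THRESHOLD_MS and char_count > INSTANT_RESPONSE_MIN_CHARS:
        flags.append("instant_response")
    # fast_typing: fewer milliseconds per character than the configured minimum.
    if char_count > 0 and rt_ms / char_count < MIN_RT_PER_CHAR_MS:
        flags.append("fast_typing")
    return flags


def has_consistent_timing(rts_ms: list[float]) -> bool:
    """Return True when a session's RTs would trigger consistent_timing (hypothetical helper)."""
    if len(rts_ms) < 3:  # the analyzer skips sessions with fewer than three RTs
        return False
    mean_rt = mean(rts_ms)
    # Coefficient of variation: population standard deviation divided by the mean.
    cv = pstdev(rts_ms) / mean_rt if mean_rt > 0 else float("inf")
    return cv < MAX_RT_CV_THRESHOLD


print(trial_flag_types(rt_ms=1500, char_count=300))
print(has_consistent_timing([5000, 5100, 4900]))
```

A 300-character response submitted after 1.5 seconds trips both per-trial checks, while three response times within 100 ms of each other give a CV of roughly 0.016, well under the 0.1 default.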

Example

from slopit import load_session
from slopit.behavioral import TimingAnalyzer, TimingAnalyzerConfig

config = TimingAnalyzerConfig(
    min_rt_per_char_ms=20.0,               # Flag trials faster than 20 ms per character
    max_rt_cv_threshold=0.1,               # Flag sessions whose RT CV is below 0.1
    instant_response_threshold_ms=2000.0,  # Instant if RT < 2 s ...
    instant_response_min_chars=100,        # ... and the response has > 100 chars
)

session = load_session("data/session.json")
analyzer = TimingAnalyzer(config)
result = analyzer.analyze_session(session)

Metrics Computed

  • rt: Response time (milliseconds)
  • character_count: Characters in response
  • ms_per_char: Milliseconds per character
  • chars_per_minute: Characters per minute
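
As a worked example of the two derived metrics, the sketch below mirrors the arithmetic in `_compute_trial_metrics`. The function name `timing_metrics` is a hypothetical standalone helper, not part of the library.

```python
from typing import Optional


def timing_metrics(rt_ms: float, char_count: Optional[int]) -> dict[str, Optional[float]]:
    """Derive ms_per_char and chars_per_minute from a response time in milliseconds."""
    # Both metrics are undefined when there is no text to divide by.
    ms_per_char = rt_ms / char_count if char_count and char_count > 0 else None
    # 60000 ms per minute converts the RT into minutes before dividing.
    chars_per_minute = char_count / (rt_ms / 60000) if rt_ms > 0 and char_count else None
    return {"ms_per_char": ms_per_char, "chars_per_minute": chars_per_minute}


metrics = timing_metrics(rt_ms=30000, char_count=150)
print(metrics)  # {'ms_per_char': 200.0, 'chars_per_minute': 300.0}
```

A 150-character response typed in 30 seconds works out to 200 ms per character, or 300 characters per minute, which sits comfortably above the 20 ms default for `min_rt_per_char_ms`.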

Paste Analyzer

Analyzes paste events to detect copy/paste behavior.

PasteAnalyzer

slopit.behavioral.paste.PasteAnalyzer

Bases: Analyzer

Analyzer for paste events and clipboard usage.

Detects suspicious paste patterns such as large pastes without prior typing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | PasteAnalyzerConfig \| None | Analyzer configuration. | None |

Examples:

>>> from slopit import load_session
>>> from slopit.behavioral import PasteAnalyzer
>>>
>>> session = load_session("data/session.json")
>>> analyzer = PasteAnalyzer()
>>> result = analyzer.analyze_session(session)
Source code in src/slopit/behavioral/paste.py
class PasteAnalyzer(Analyzer):
    """Analyzer for paste events and clipboard usage.

    Detects suspicious paste patterns such as large pastes
    without prior typing.

    Parameters
    ----------
    config
        Analyzer configuration.

    Examples
    --------
    >>> from slopit import load_session
    >>> from slopit.behavioral import PasteAnalyzer
    >>>
    >>> session = load_session("data/session.json")
    >>> analyzer = PasteAnalyzer()
    >>> result = analyzer.analyze_session(session)
    """

    def __init__(self, config: PasteAnalyzerConfig | None = None) -> None:
        self.config = config or PasteAnalyzerConfig()

    @property
    def name(self) -> str:
        return "paste"

    def analyze_session(self, session: SlopitSession) -> AnalysisResult:
        """Analyze paste patterns for a session."""
        trial_results: list[dict[str, JsonValue]] = []
        trial_metrics: list[PasteMetrics] = []
        all_flags: list[AnalysisFlag] = []

        for trial in session.trials:
            if not self._has_paste_data(trial):
                continue

            # Safe to assert: _has_paste_data validates these are not None
            assert trial.behavioral is not None
            assert trial.behavioral.paste is not None
            paste_events = trial.behavioral.paste
            metrics = self._compute_metrics(paste_events)
            flags = self._compute_flags(trial.trial_id, paste_events)

            trial_results.append(
                {
                    "trial_id": trial.trial_id,
                    "metrics": metrics.to_dict(),
                    "flags": [f.model_dump() for f in flags],
                }
            )
            trial_metrics.append(metrics)
            all_flags.extend(flags)

        return AnalysisResult(
            analyzer=self.name,
            session_id=session.session_id,
            trials=trial_results,
            flags=all_flags,
            session_summary=self._compute_session_summary(trial_metrics, trial_results),
        )

    def _has_paste_data(self, trial: SlopitTrial) -> bool:
        """Check if trial has paste data."""
        return (
            trial.behavioral is not None
            and trial.behavioral.paste is not None
            and len(trial.behavioral.paste) > 0
        )

    def _compute_metrics(self, paste_events: list[PasteEvent]) -> PasteMetrics:
        """Compute metrics from paste events."""
        total_pasted = sum(e.text_length for e in paste_events)
        blocked_count = sum(1 for e in paste_events if e.blocked)
        large_pastes = sum(
            1 for e in paste_events if e.text_length >= self.config.large_paste_threshold
        )

        return PasteMetrics(
            paste_count=len(paste_events),
            total_pasted_chars=total_pasted,
            blocked_count=blocked_count,
            large_paste_count=large_pastes,
        )

    def _compute_flags(self, trial_id: str, paste_events: list[PasteEvent]) -> list[AnalysisFlag]:
        """Generate flags based on paste events."""
        flags: list[AnalysisFlag] = []

        for event in paste_events:
            # Large paste
            if event.text_length >= self.config.large_paste_threshold:
                flags.append(
                    AnalysisFlag(
                        type="large_paste",
                        analyzer=self.name,
                        severity="medium",
                        message=f"Large paste detected ({event.text_length} characters)",
                        confidence=0.7,
                        evidence={
                            "text_length": event.text_length,
                            "time": event.time,
                        },
                        trial_ids=[trial_id],
                    )
                )

            # Paste without prior typing
            if event.preceding_keystrokes <= self.config.suspicious_preceding_keystrokes:
                severity = "high" if event.text_length >= 100 else "medium"
                flags.append(
                    AnalysisFlag(
                        type="paste_without_typing",
                        analyzer=self.name,
                        severity=severity,
                        message=f"Paste with minimal prior typing ({event.preceding_keystrokes} keystrokes before)",
                        confidence=0.8,
                        evidence={
                            "preceding_keystrokes": event.preceding_keystrokes,
                            "text_length": event.text_length,
                        },
                        trial_ids=[trial_id],
                    )
                )

        return flags

    def _compute_session_summary(
        self, trial_metrics: list[PasteMetrics], trial_results: list[dict[str, JsonValue]]
    ) -> dict[str, JsonValue]:
        """Compute session-level summary."""
        if not trial_metrics:
            return {"trials_analyzed": 0}

        total_pastes = sum(m.paste_count for m in trial_metrics)
        total_chars = sum(m.total_pasted_chars for m in trial_metrics)
        total_flags = sum(
            len(r["flags"]) if isinstance(r["flags"], list) else 0 for r in trial_results
        )

        return {
            "trials_analyzed": len(trial_metrics),
            "total_paste_events": total_pastes,
            "total_pasted_chars": total_chars,
            "total_flags": total_flags,
        }

analyze_session

analyze_session(session: SlopitSession) -> AnalysisResult

Analyze paste patterns for a session.

Source code in src/slopit/behavioral/paste.py
def analyze_session(self, session: SlopitSession) -> AnalysisResult:
    """Analyze paste patterns for a session."""
    trial_results: list[dict[str, JsonValue]] = []
    trial_metrics: list[PasteMetrics] = []
    all_flags: list[AnalysisFlag] = []

    for trial in session.trials:
        if not self._has_paste_data(trial):
            continue

        # Safe to assert: _has_paste_data validates these are not None
        assert trial.behavioral is not None
        assert trial.behavioral.paste is not None
        paste_events = trial.behavioral.paste
        metrics = self._compute_metrics(paste_events)
        flags = self._compute_flags(trial.trial_id, paste_events)

        trial_results.append(
            {
                "trial_id": trial.trial_id,
                "metrics": metrics.to_dict(),
                "flags": [f.model_dump() for f in flags],
            }
        )
        trial_metrics.append(metrics)
        all_flags.extend(flags)

    return AnalysisResult(
        analyzer=self.name,
        session_id=session.session_id,
        trials=trial_results,
        flags=all_flags,
        session_summary=self._compute_session_summary(trial_metrics, trial_results),
    )

PasteAnalyzerConfig

slopit.behavioral.paste.PasteAnalyzerConfig dataclass

Bases: AnalyzerConfig

Configuration for paste analysis.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| large_paste_threshold | int | Minimum characters to flag as large paste. |
| suspicious_preceding_keystrokes | int | Maximum preceding keystrokes for suspicious paste. |

Source code in src/slopit/behavioral/paste.py
@dataclass
class PasteAnalyzerConfig(AnalyzerConfig):
    """Configuration for paste analysis.

    Attributes
    ----------
    large_paste_threshold
        Minimum characters to flag as large paste.
    suspicious_preceding_keystrokes
        Maximum preceding keystrokes for suspicious paste.
    """

    large_paste_threshold: int = 50
    suspicious_preceding_keystrokes: int = 5

Detection Signals

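| Flag Type | Severity | Description |
| --- | --- | --- |
| large_paste | medium | Pasted text at or above the large-paste threshold |
| paste_without_typing | medium/high | Paste with minimal prior keystrokes (high when the paste is 100 characters or longer, otherwise medium) |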
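
The severity rule is easiest to see with concrete events. This is a minimal standalone sketch of the flag logic in `_compute_flags`, under stated assumptions: `PasteEventSketch` is a hypothetical stand-in carrying only the two `PasteEvent` fields the checks read, `paste_flags` is a hypothetical helper, and the constants are the `PasteAnalyzerConfig` defaults.

```python
from dataclasses import dataclass

# Defaults copied from PasteAnalyzerConfig.
LARGE_PASTE_THRESHOLD = 50
SUSPICIOUS_PRECEDING_KEYSTROKES = 5


@dataclass
class PasteEventSketch:
    """Hypothetical stand-in for PasteEvent with only the fields used here."""

    text_length: int
    preceding_keystrokes: int


def paste_flags(event: PasteEventSketch) -> list[tuple[str, str]]:
    """Return (flag type, severity) pairs for a single paste event."""
    flags: list[tuple[str, str]] = []
    # large_paste uses >=, so a paste of exactly 50 characters is flagged.
    if event.text_length >= LARGE_PASTE_THRESHOLD:
        flags.append(("large_paste", "medium"))
    # paste_without_typing uses <=, so exactly 5 prior keystrokes is still suspicious.
    if event.preceding_keystrokes <= SUSPICIOUS_PRECEDING_KEYSTROKES:
        severity = "high" if event.text_length >= 100 else "medium"
        flags.append(("paste_without_typing", severity))
    return flags


print(paste_flags(PasteEventSketch(text_length=250, preceding_keystrokes=0)))
print(paste_flags(PasteEventSketch(text_length=20, preceding_keystrokes=3)))
```

Note that a single event can raise both flags: a 250-character paste with no prior typing is reported as a medium `large_paste` and a high `paste_without_typing`.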

Example

from slopit import load_session
from slopit.behavioral import PasteAnalyzer, PasteAnalyzerConfig

config = PasteAnalyzerConfig(
    large_paste_threshold=50,            # Flag pastes of 50 or more chars
    suspicious_preceding_keystrokes=5,   # Flag if 5 or fewer keystrokes before
)

session = load_session("data/session.json")
analyzer = PasteAnalyzer(config)
result = analyzer.analyze_session(session)

Metrics Computed

  • paste_count: Number of paste events
  • total_pasted_chars: Total characters pasted
  • blocked_count: Number of blocked paste events
  • large_paste_count: Number of large paste events
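
To illustrate how the four metrics relate, here is a small standalone sketch that mirrors the sums in `_compute_metrics`. `PasteRecord` and `paste_metrics` are hypothetical names introduced for illustration. The real analyzer reads `PasteEvent` objects from `trial.behavioral.paste`.

```python
from dataclasses import dataclass


@dataclass
class PasteRecord:
    """Hypothetical stand-in for PasteEvent with only the fields the metrics use."""

    text_length: int
    blocked: bool


def paste_metrics(events: list[PasteRecord], large_paste_threshold: int = 50) -> dict[str, int]:
    """Aggregate one trial's paste events into the four reported metrics."""
    return {
        "paste_count": len(events),
        "total_pasted_chars": sum(e.text_length for e in events),
        "blocked_count": sum(1 for e in events if e.blocked),
        # Same >= comparison as the large_paste flag.
        "large_paste_count": sum(1 for e in events if e.text_length >= large_paste_threshold),
    }


events = [PasteRecord(12, False), PasteRecord(50, False), PasteRecord(300, True)]
metrics = paste_metrics(events)
print(metrics)
```

In this sketch, as in the source above, blocked pastes still count toward `total_pasted_chars` and `large_paste_count`, so the totals describe attempted pastes rather than text that reached the response.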

Combining Analyzers

Use the pipeline to run multiple analyzers and combine their results:

from slopit.pipeline import AnalysisPipeline
from slopit.behavioral import (
    KeystrokeAnalyzer,
    FocusAnalyzer,
    TimingAnalyzer,
    PasteAnalyzer,
)

pipeline = AnalysisPipeline([
    KeystrokeAnalyzer(),
    FocusAnalyzer(),
    TimingAnalyzer(),
    PasteAnalyzer(),
])

# `sessions` holds the SlopitSession objects loaded beforehand
result = pipeline.analyze(sessions)

See Pipeline for aggregation configuration.

Writing Custom Analyzers

See Custom Analyzers Guide for instructions on creating your own analyzers.