Skip to content

Pipeline

The slopit.pipeline module provides orchestration for running multiple analyzers and aggregating their results.

AnalysisPipeline

The main class for orchestrating analysis.

slopit.pipeline.pipeline.AnalysisPipeline

Orchestrates multiple analyzers and aggregates results.

The pipeline runs each analyzer on the input sessions, combines their flags according to the configured strategy, and produces a unified result.

Parameters:

Name Type Description Default
analyzers list[Analyzer]

List of analyzers to run.

required
config PipelineConfig | None

Pipeline configuration.

None

Examples:

>>> from slopit import load_sessions
>>> from slopit.pipeline import AnalysisPipeline
>>> from slopit.behavioral import KeystrokeAnalyzer
>>>
>>> sessions = load_sessions("data/")
>>> pipeline = AnalysisPipeline([KeystrokeAnalyzer()])
>>> result = pipeline.analyze(sessions)
>>>
>>> for session_id, verdict in result.verdicts.items():
...     print(f"{session_id}: {verdict.status}")
Source code in src/slopit/pipeline/pipeline.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class AnalysisPipeline:
    """Orchestrates multiple analyzers and aggregates results.

    The pipeline runs each analyzer on the input sessions, combines
    their flags according to the configured strategy, and produces
    a unified result.

    Parameters
    ----------
    analyzers
        List of analyzers to run.
    config
        Pipeline configuration.

    Examples
    --------
    >>> from slopit import load_sessions
    >>> from slopit.pipeline import AnalysisPipeline
    >>> from slopit.behavioral import KeystrokeAnalyzer
    >>>
    >>> sessions = load_sessions("data/")
    >>> pipeline = AnalysisPipeline([KeystrokeAnalyzer()])
    >>> result = pipeline.analyze(sessions)
    >>>
    >>> for session_id, verdict in result.verdicts.items():
    ...     print(f"{session_id}: {verdict.status}")
    """

    # Numeric rank of each severity level, hoisted to a class attribute so
    # _filter_flags does O(1) lookups instead of a list.index() scan per flag.
    # An unknown severity value raises KeyError here (the previous list-based
    # lookup raised ValueError for the same malformed input).
    _SEVERITY_RANK = {"info": 0, "low": 1, "medium": 2, "high": 3}

    def __init__(
        self,
        analyzers: list[Analyzer],
        config: PipelineConfig | None = None,
    ) -> None:
        self.analyzers = analyzers
        # Fall back to default thresholds/strategy when no config is given.
        self.config = config or PipelineConfig()

    def analyze(self, sessions: list[SlopitSession]) -> PipelineResult:
        """Run all analyzers and aggregate results.

        Parameters
        ----------
        sessions
            Sessions to analyze.

        Returns
        -------
        PipelineResult
            Combined results from all analyzers.
        """
        # Run each analyzer over the full session list; results keyed by name.
        analyzer_results: dict[str, list[AnalysisResult]] = {}
        for analyzer in self.analyzers:
            analyzer_results[analyzer.name] = analyzer.analyze_sessions(sessions)

        # Collect every analyzer's flags per session, then apply the
        # configured severity/confidence filters.
        session_flags: dict[str, list[AnalysisFlag]] = {}
        for session in sessions:
            flags = self._collect_session_flags(session.session_id, analyzer_results)
            session_flags[session.session_id] = self._filter_flags(flags)

        # One verdict per session, computed from the filtered flags.
        verdicts = {
            session_id: self._compute_verdict(flags)
            for session_id, flags in session_flags.items()
        }

        return PipelineResult(
            sessions=[s.session_id for s in sessions],
            analyzer_results=analyzer_results,
            aggregated_flags=session_flags,
            verdicts=verdicts,
        )

    def _collect_session_flags(
        self,
        session_id: str,
        analyzer_results: dict[str, list[AnalysisResult]],
    ) -> list[AnalysisFlag]:
        """Collect all flags for a session from all analyzers."""
        flags: list[AnalysisFlag] = []

        for results in analyzer_results.values():
            for result in results:
                if result.session_id == session_id:
                    flags.extend(result.flags)

        return flags

    def _filter_flags(self, flags: list[AnalysisFlag]) -> list[AnalysisFlag]:
        """Drop flags below the configured severity or confidence thresholds.

        A flag survives when its severity rank is at or above the configured
        ``severity_threshold`` AND its confidence (when present) is at or
        above ``confidence_threshold``; flags with ``confidence is None`` are
        never dropped on confidence grounds.
        """
        threshold_rank = self._SEVERITY_RANK[self.config.severity_threshold]

        return [
            flag
            for flag in flags
            if self._SEVERITY_RANK[flag.severity] >= threshold_rank
            and (
                flag.confidence is None
                or flag.confidence >= self.config.confidence_threshold
            )
        ]

    def _compute_verdict(self, flags: list[AnalysisFlag]) -> SessionVerdict:
        """Compute the final verdict for one session from its filtered flags."""
        # No surviving flags means the session is trivially clean.
        if not flags:
            return SessionVerdict(
                status="clean",
                confidence=1.0,
                flags=[],
                summary="No flags detected",
            )

        status, confidence = aggregate_flags(
            flags,
            self.config.aggregation,
            len(self.analyzers),
        )

        return SessionVerdict(
            status=status,
            confidence=confidence,
            flags=flags,
            summary=self._generate_summary(flags),
        )

    def _generate_summary(self, flags: list[AnalysisFlag]) -> str:
        """Generate a human-readable, deduplicated summary of flag types."""
        if not flags:
            return "No issues detected"

        # Sorted, unique flag types keep the summary stable across runs.
        flag_types = sorted({f.type for f in flags})
        return f"Detected: {', '.join(flag_types)}"

analyze

analyze(sessions: list[SlopitSession]) -> PipelineResult

Run all analyzers and aggregate results.

Parameters:

Name Type Description Default
sessions list[SlopitSession]

Sessions to analyze.

required

Returns:

Type Description
PipelineResult

Combined results from all analyzers.

Source code in src/slopit/pipeline/pipeline.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def analyze(self, sessions: list[SlopitSession]) -> PipelineResult:
    """Run all analyzers and aggregate results.

    Parameters
    ----------
    sessions
        Sessions to analyze.

    Returns
    -------
    PipelineResult
        Combined results from all analyzers.
    """
    # Every analyzer sees the full session list; results are keyed by name.
    analyzer_results: dict[str, list[AnalysisResult]] = {
        analyzer.name: analyzer.analyze_sessions(sessions)
        for analyzer in self.analyzers
    }

    # Gather all analyzers' flags per session, then apply the configured
    # severity/confidence filters in one step.
    session_flags: dict[str, list[AnalysisFlag]] = {
        session.session_id: self._filter_flags(
            self._collect_session_flags(session.session_id, analyzer_results)
        )
        for session in sessions
    }

    # One verdict per session, derived from the filtered flags.
    verdicts: dict[str, SessionVerdict] = {}
    for session_id, flags in session_flags.items():
        verdicts[session_id] = self._compute_verdict(flags)

    return PipelineResult(
        sessions=[session.session_id for session in sessions],
        analyzer_results=analyzer_results,
        aggregated_flags=session_flags,
        verdicts=verdicts,
    )

Example

from slopit import load_sessions
from slopit.pipeline import AnalysisPipeline, PipelineConfig
from slopit.behavioral import KeystrokeAnalyzer, FocusAnalyzer

# Load data
sessions = load_sessions("data/")

# Configure pipeline
config = PipelineConfig(
    aggregation="weighted",
    severity_threshold="low",
    confidence_threshold=0.5,
)

# Create pipeline
pipeline = AnalysisPipeline(
    analyzers=[KeystrokeAnalyzer(), FocusAnalyzer()],
    config=config,
)

# Run analysis
result = pipeline.analyze(sessions)

PipelineConfig

Configuration for the analysis pipeline.

slopit.pipeline.pipeline.PipelineConfig dataclass

Configuration for the analysis pipeline.

Attributes:

Name Type Description
aggregation AggregationStrategy

Strategy for combining flags from multiple analyzers: "any" flags a session if any analyzer flags it; "majority" flags it if a majority of analyzers flag it; "weighted" uses confidence-weighted voting.

severity_threshold Literal['info', 'low', 'medium', 'high']

Minimum severity level to include in final verdict.

confidence_threshold float

Minimum confidence to include flag in aggregation.

Source code in src/slopit/pipeline/pipeline.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@dataclass
class PipelineConfig:
    """Configuration for the analysis pipeline.

    Attributes
    ----------
    aggregation
        Strategy used to combine flags from multiple analyzers:
        "any" flags a session when any analyzer produces a flag,
        "majority" requires more than half of the analyzers to flag,
        and "weighted" applies confidence-weighted voting.
    severity_threshold
        Minimum severity a flag must carry to count toward the
        final verdict.
    confidence_threshold
        Minimum confidence a flag must carry to be aggregated.
    """

    aggregation: AggregationStrategy = "weighted"
    severity_threshold: Literal["info", "low", "medium", "high"] = "low"
    confidence_threshold: float = 0.5

Configuration Options

Parameter Type Default Description
aggregation "any" | "majority" | "weighted" "weighted" Strategy for combining flags
severity_threshold "info" | "low" | "medium" | "high" "low" Minimum severity to include
confidence_threshold float 0.5 Minimum confidence to include

Example Configurations

High sensitivity (flag on any evidence):

config = PipelineConfig(
    aggregation="any",
    severity_threshold="info",
    confidence_threshold=0.3,
)

Balanced (recommended for most use cases):

config = PipelineConfig(
    aggregation="weighted",
    severity_threshold="low",
    confidence_threshold=0.5,
)

High specificity (require strong evidence):

config = PipelineConfig(
    aggregation="majority",
    severity_threshold="medium",
    confidence_threshold=0.7,
)

Aggregation Strategies

aggregate_flags

slopit.pipeline.aggregation.aggregate_flags

aggregate_flags(flags: list[AnalysisFlag], strategy: AggregationStrategy, total_analyzers: int) -> tuple[VerdictStatus, float]

Aggregate flags using the specified strategy.

Parameters:

Name Type Description Default
flags list[AnalysisFlag]

List of flags to aggregate.

required
strategy AggregationStrategy

Aggregation strategy to use.

required
total_analyzers int

Total number of analyzers in the pipeline.

required

Returns:

Type Description
tuple[VerdictStatus, float]

The verdict status and confidence score.

Source code in src/slopit/pipeline/aggregation.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def aggregate_flags(
    flags: list[AnalysisFlag],
    strategy: AggregationStrategy,
    total_analyzers: int,
) -> tuple[VerdictStatus, float]:
    """Aggregate flags using the specified strategy.

    Parameters
    ----------
    flags
        List of flags to aggregate.
    strategy
        Aggregation strategy to use.
    total_analyzers
        Total number of analyzers in the pipeline.

    Returns
    -------
    tuple[VerdictStatus, float]
        The verdict status and confidence score.
    """
    # A session with no surviving flags is trivially clean.
    if not flags:
        return ("clean", 1.0)

    if strategy == "any":
        return _aggregate_any(flags)
    if strategy == "majority":
        return _aggregate_majority(flags, total_analyzers)

    # Anything else falls through to confidence-weighted voting.
    return _aggregate_weighted(flags)

Strategy Details

"any" (most sensitive):

  • Flags the session if any analyzer produces a flag
  • Highest sensitivity, highest false positive rate
  • Returns the maximum confidence among all flags

"majority" (balanced):

  • Flags if more than half of analyzers produce flags
  • Returns "suspicious" if some but not majority flag
  • Confidence is the proportion of flagging analyzers

"weighted" (recommended):

  • Uses confidence-weighted voting
  • Computes the average confidence across all flags
  • Returns "flagged" if the average >= 0.7
  • Returns "suspicious" if the average >= 0.4
  • Returns "clean" otherwise

Example

from slopit.pipeline.aggregation import aggregate_flags

flags = [...]  # List of AnalysisFlag objects
status, confidence = aggregate_flags(
    flags=flags,
    strategy="weighted",
    total_analyzers=4,
)

print(f"Status: {status}, Confidence: {confidence:.2f}")

Result Types

See Schemas > Analysis Result Types for the full API reference.

Working with PipelineResult

result = pipeline.analyze(sessions)

# List of analyzed session IDs
print(result.sessions)

# Results from each analyzer
for analyzer_name, results in result.analyzer_results.items():
    print(f"{analyzer_name}: {len(results)} results")

# Aggregated flags per session
for session_id, flags in result.aggregated_flags.items():
    print(f"{session_id}: {len(flags)} flags")

# Final verdicts
for session_id, verdict in result.verdicts.items():
    print(f"{session_id}: {verdict.status}")

Verdict Status Values

Status Description
clean No flags detected, or all flags below threshold
suspicious Some evidence of AI assistance, requires review
flagged Strong evidence of AI assistance

Reporting

TextReporter

Generate text reports from analysis results.

slopit.pipeline.reporting.TextReporter

Generate text reports from analysis results.

Examples:

>>> reporter = TextReporter()
>>> report = reporter.generate(pipeline_result)
>>> print(report)
Source code in src/slopit/pipeline/reporting.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class TextReporter:
    """Generate text reports from analysis results.

    Examples
    --------
    >>> reporter = TextReporter()
    >>> report = reporter.generate(pipeline_result)
    >>> print(report)
    """

    def generate(self, result: PipelineResult) -> str:
        """Generate a text report.

        Parameters
        ----------
        result
            Pipeline result to report on.

        Returns
        -------
        str
            Formatted text report.
        """
        lines: list[str] = []

        lines.append("=" * 60)
        lines.append("slopit Analysis Report")
        lines.append("=" * 60)
        lines.append("")

        # Summary statistics (single pass over verdicts).
        total = len(result.sessions)
        counts = self._count_statuses(result)
        flagged = counts["flagged"]
        suspicious = counts["suspicious"]

        lines.append(f"Sessions Analyzed: {total}")
        # Guard the whole percentage section: previously each line was
        # `f"..." if total else ""`, which appended three stray blank
        # lines when no sessions were analyzed.
        if total:
            lines.append(f"  Flagged:    {flagged} ({flagged / total * 100:.1f}%)")
            lines.append(f"  Suspicious: {suspicious} ({suspicious / total * 100:.1f}%)")
            lines.append(f"  Clean:      {counts['clean']} ({counts['clean'] / total * 100:.1f}%)")
        lines.append("")

        # Detail sections share one renderer; only the title and the
        # presence of the Summary line differ between statuses.
        if flagged > 0:
            self._append_section(lines, result, "Flagged Sessions", "flagged", include_summary=False)
        if suspicious > 0:
            self._append_section(
                lines, result, "Suspicious Sessions", "suspicious", include_summary=True
            )

        return "\n".join(lines)

    def print_summary(self, result: PipelineResult) -> None:
        """Print a summary table using Rich.

        Parameters
        ----------
        result
            Pipeline result to summarize.
        """
        console = Console()

        table = Table(title="Analysis Summary")
        table.add_column("Status", style="bold")
        table.add_column("Count", justify="right")
        table.add_column("Percentage", justify="right")

        total = len(result.sessions)
        if total == 0:
            console.print("[yellow]No sessions analyzed[/yellow]")
            return

        counts = self._count_statuses(result)

        for label, key, colour in (
            ("Flagged", "flagged", "red"),
            ("Suspicious", "suspicious", "yellow"),
            ("Clean", "clean", "green"),
        ):
            n = counts[key]
            table.add_row(label, str(n), f"{n / total * 100:.1f}%", style=colour)

        console.print(table)

    def _count_statuses(self, result: PipelineResult) -> dict[str, int]:
        """Count verdicts per status in a single pass over the result."""
        counts = {"flagged": 0, "suspicious": 0, "clean": 0}
        for verdict in result.verdicts.values():
            if verdict.status in counts:
                counts[verdict.status] += 1
        return counts

    def _append_section(
        self,
        lines: list[str],
        result: PipelineResult,
        title: str,
        status: str,
        *,
        include_summary: bool,
    ) -> None:
        """Append one titled detail section listing sessions with *status*."""
        lines.append("-" * 60)
        lines.append(title)
        lines.append("-" * 60)
        lines.append("")

        for session_id, verdict in result.verdicts.items():
            if verdict.status != status:
                continue

            lines.append(f"Session: {session_id}")
            lines.append(f"  Status: {verdict.status} (confidence: {verdict.confidence:.2f})")
            if include_summary:
                lines.append(f"  Summary: {verdict.summary}")

            # Flagged sections always print the "Flags:" header; suspicious
            # sections print it only when flags are present (original layout).
            if verdict.flags or not include_summary:
                lines.append("  Flags:")
                for flag in verdict.flags:
                    lines.append(f"    - [{flag.analyzer}] {flag.type}: {flag.message}")

            lines.append("")

generate

generate(result: PipelineResult) -> str

Generate a text report.

Parameters:

Name Type Description Default
result PipelineResult

Pipeline result to report on.

required

Returns:

Type Description
str

Formatted text report.

Source code in src/slopit/pipeline/reporting.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def generate(self, result: PipelineResult) -> str:
    """Render *result* as a formatted plain-text report.

    Parameters
    ----------
    result
        Pipeline result to report on.

    Returns
    -------
    str
        Formatted text report.
    """
    rule = "=" * 60
    divider = "-" * 60
    out: list[str] = [rule, "slopit Analysis Report", rule, ""]

    total = len(result.sessions)

    # Single pass over verdicts to tally the three statuses.
    tally = {"flagged": 0, "suspicious": 0, "clean": 0}
    for verdict in result.verdicts.values():
        if verdict.status in tally:
            tally[verdict.status] += 1

    out.append(f"Sessions Analyzed: {total}")
    for label, key in (
        ("Flagged:   ", "flagged"),
        ("Suspicious:", "suspicious"),
        ("Clean:     ", "clean"),
    ):
        n = tally[key]
        out.append(f"  {label} {n} ({n / total * 100:.1f}%)" if total else "")
    out.append("")

    # Both detail sections share the same layout; only the title and the
    # presence of the Summary line differ.
    for title, status, with_summary in (
        ("Flagged Sessions", "flagged", False),
        ("Suspicious Sessions", "suspicious", True),
    ):
        if tally[status] == 0:
            continue

        out.extend([divider, title, divider, ""])

        for session_id, verdict in result.verdicts.items():
            if verdict.status != status:
                continue

            out.append(f"Session: {session_id}")
            out.append(f"  Status: {verdict.status} (confidence: {verdict.confidence:.2f})")
            if with_summary:
                out.append(f"  Summary: {verdict.summary}")

            if verdict.flags or not with_summary:
                out.append("  Flags:")
                for flag in verdict.flags:
                    out.append(f"    - [{flag.analyzer}] {flag.type}: {flag.message}")

            out.append("")

    return "\n".join(out)

print_summary

print_summary(result: PipelineResult) -> None

Print a summary table using Rich.

Parameters:

Name Type Description Default
result PipelineResult

Pipeline result to summarize.

required
Source code in src/slopit/pipeline/reporting.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def print_summary(self, result: PipelineResult) -> None:
    """Print a colorized verdict-count table to the terminal via Rich.

    Parameters
    ----------
    result
        Pipeline result to summarize.
    """
    term = Console()

    total = len(result.sessions)
    if total == 0:
        term.print("[yellow]No sessions analyzed[/yellow]")
        return

    # Single pass over verdicts to tally the three statuses.
    tally = {"flagged": 0, "suspicious": 0, "clean": 0}
    for verdict in result.verdicts.values():
        if verdict.status in tally:
            tally[verdict.status] += 1

    table = Table(title="Analysis Summary")
    table.add_column("Status", style="bold")
    table.add_column("Count", justify="right")
    table.add_column("Percentage", justify="right")

    for label, key, colour in (
        ("Flagged", "flagged", "red"),
        ("Suspicious", "suspicious", "yellow"),
        ("Clean", "clean", "green"),
    ):
        n = tally[key]
        table.add_row(label, str(n), f"{n / total * 100:.1f}%", style=colour)

    term.print(table)

Example

from slopit.pipeline import TextReporter

reporter = TextReporter()

# Generate full report
report = reporter.generate(result)
print(report)

# Print summary table (uses Rich)
reporter.print_summary(result)

Sample Output

============================================================
slopit Analysis Report
============================================================

Sessions Analyzed: 100
  Flagged:    12 (12.0%)
  Suspicious: 8 (8.0%)
  Clean:      80 (80.0%)

------------------------------------------------------------
Flagged Sessions
------------------------------------------------------------

Session: session_042
  Status: flagged (confidence: 0.85)
  Flags:
    - [keystroke] low_iki_variance: Keystroke timing unusually consistent (std=45.2ms)
    - [focus] blur_paste_pattern: Paste event detected shortly after tab switch

CSVExporter

Export analysis results to CSV format.

slopit.pipeline.reporting.CSVExporter

Export analysis results to CSV format.

Examples:

>>> exporter = CSVExporter()
>>> exporter.export(pipeline_result, "results.csv")
Source code in src/slopit/pipeline/reporting.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class CSVExporter:
    """Export analysis results to CSV format.

    Examples
    --------
    >>> exporter = CSVExporter()
    >>> exporter.export(pipeline_result, "results.csv")
    """

    def export(self, result: PipelineResult, path: str | Path) -> None:
        """Write one CSV row per session verdict to *path*.

        Parameters
        ----------
        result
            Pipeline result to export.
        path
            Output file path.
        """
        target = Path(path)

        records: list[dict[str, CSVValue]] = []
        for session_id, verdict in result.verdicts.items():
            record: dict[str, CSVValue] = {
                "session_id": session_id,
                "status": verdict.status,
                "confidence": verdict.confidence,
                "flag_count": len(verdict.flags),
                "summary": verdict.summary,
            }
            # One boolean column per distinct flag type on this session.
            record.update({f"flag_{f.type}": True for f in verdict.flags})
            records.append(record)

        # Nothing to write; leave the target untouched.
        if not records:
            return

        # Union of all keys across rows determines the flag_* columns.
        seen: set[str] = set()
        for record in records:
            seen.update(record)

        columns = ["session_id", "status", "confidence", "flag_count", "summary"]
        columns.extend(sorted(name for name in seen if name.startswith("flag_")))

        with target.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(records)

    def export_flags(self, result: PipelineResult, path: str | Path) -> None:
        """Write one CSV row per individual flag to *path*.

        Parameters
        ----------
        result
            Pipeline result to export.
        path
            Output file path.
        """
        target = Path(path)

        records: list[dict[str, CSVValue]] = [
            {
                "session_id": session_id,
                "analyzer": flag.analyzer,
                "type": flag.type,
                "severity": flag.severity,
                "message": flag.message,
                "confidence": flag.confidence,
                "trial_ids": ",".join(flag.trial_ids) if flag.trial_ids else "",
            }
            for session_id, verdict in result.verdicts.items()
            for flag in verdict.flags
        ]

        # Nothing to write; leave the target untouched.
        if not records:
            return

        columns = [
            "session_id",
            "analyzer",
            "type",
            "severity",
            "message",
            "confidence",
            "trial_ids",
        ]

        with target.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(handle, fieldnames=columns)
            writer.writeheader()
            writer.writerows(records)

export

export(result: PipelineResult, path: str | Path) -> None

Export results to a CSV file.

Parameters:

Name Type Description Default
result PipelineResult

Pipeline result to export.

required
path str | Path

Output file path.

required
Source code in src/slopit/pipeline/reporting.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def export(self, result: PipelineResult, path: str | Path) -> None:
    """Write one CSV row per session verdict to *path*.

    Parameters
    ----------
    result
        Pipeline result to export.
    path
        Output file path.
    """
    target = Path(path)

    records: list[dict[str, CSVValue]] = []
    for session_id, verdict in result.verdicts.items():
        record: dict[str, CSVValue] = {
            "session_id": session_id,
            "status": verdict.status,
            "confidence": verdict.confidence,
            "flag_count": len(verdict.flags),
            "summary": verdict.summary,
        }
        # One boolean column per distinct flag type on this session.
        record.update({f"flag_{f.type}": True for f in verdict.flags})
        records.append(record)

    # Nothing to write; leave the target untouched.
    if not records:
        return

    # Union of all keys across rows determines the flag_* columns.
    seen: set[str] = set()
    for record in records:
        seen.update(record)

    columns = ["session_id", "status", "confidence", "flag_count", "summary"]
    columns.extend(sorted(name for name in seen if name.startswith("flag_")))

    with target.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(records)

export_flags

export_flags(result: PipelineResult, path: str | Path) -> None

Export individual flags to a CSV file.

Parameters:

Name Type Description Default
result PipelineResult

Pipeline result to export.

required
path str | Path

Output file path.

required
Source code in src/slopit/pipeline/reporting.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def export_flags(self, result: PipelineResult, path: str | Path) -> None:
    """Write one CSV row per individual flag to *path*.

    Parameters
    ----------
    result
        Pipeline result to export.
    path
        Output file path.
    """
    target = Path(path)

    records: list[dict[str, CSVValue]] = [
        {
            "session_id": session_id,
            "analyzer": flag.analyzer,
            "type": flag.type,
            "severity": flag.severity,
            "message": flag.message,
            "confidence": flag.confidence,
            "trial_ids": ",".join(flag.trial_ids) if flag.trial_ids else "",
        }
        for session_id, verdict in result.verdicts.items()
        for flag in verdict.flags
    ]

    # Nothing to write; leave the target untouched.
    if not records:
        return

    columns = [
        "session_id",
        "analyzer",
        "type",
        "severity",
        "message",
        "confidence",
        "trial_ids",
    ]

    with target.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(records)

Example

from slopit.pipeline import CSVExporter

exporter = CSVExporter()

# Export session-level results
exporter.export(result, "results.csv")

# Export individual flags
exporter.export_flags(result, "flags.csv")

Output Format

results.csv:

session_id status confidence flag_count summary flag_low_iki_variance flag_excessive_blur
session_001 clean 1.00 0 No flags detected
session_002 flagged 0.85 2 Detected: low_iki_variance, excessive_blur True True

flags.csv:

session_id analyzer type severity message confidence trial_ids
session_002 keystroke low_iki_variance medium Keystroke timing unusually consistent 0.75 trial-0,trial-1
session_002 focus excessive_blur medium Excessive window switches detected 0.70 trial-0

Complete Pipeline Example

from slopit import load_sessions
from slopit.pipeline import (
    AnalysisPipeline,
    PipelineConfig,
    TextReporter,
    CSVExporter,
)
from slopit.behavioral import (
    KeystrokeAnalyzer,
    FocusAnalyzer,
    TimingAnalyzer,
    PasteAnalyzer,
)

# Load data
sessions = load_sessions("data/")
print(f"Loaded {len(sessions)} sessions")

# Configure pipeline
config = PipelineConfig(
    aggregation="weighted",
    severity_threshold="low",
    confidence_threshold=0.5,
)

# Create pipeline with all analyzers
pipeline = AnalysisPipeline(
    analyzers=[
        KeystrokeAnalyzer(),
        FocusAnalyzer(),
        TimingAnalyzer(),
        PasteAnalyzer(),
    ],
    config=config,
)

# Run analysis
result = pipeline.analyze(sessions)

# Print summary
reporter = TextReporter()
reporter.print_summary(result)

# Export results
exporter = CSVExporter()
exporter.export(result, "results.csv")
exporter.export_flags(result, "flags.csv")

# Print flagged sessions
print("\nFlagged Sessions:")
for session_id, verdict in result.verdicts.items():
    if verdict.status == "flagged":
        print(f"\n{session_id}:")
        print(f"  Confidence: {verdict.confidence:.2f}")
        for flag in verdict.flags:
            print(f"  - [{flag.analyzer}] {flag.message}")