Triage Warden

AI-powered security incident triage and response platform

Triage Warden automates the analysis of, and response to, security incidents using AI agents, configurable playbooks, and integrations with your existing security stack.

Features

  • AI-Powered Triage: Automated analysis of phishing emails, malware alerts, and suspicious login attempts
  • Configurable Playbooks: Define custom investigation and response workflows
  • Policy Engine: Role-based approval workflows for sensitive actions
  • Connector Framework: Integrate with VirusTotal, Splunk, CrowdStrike, Jira, Microsoft 365, and more
  • Web Dashboard: Real-time incident management with approval workflows
  • REST API: Programmatic access for automation and integration
  • Audit Trail: Complete logging of all actions and decisions

Quick Example

# Analyze a phishing email
tw-cli incident create --type phishing --source "email-gateway" --data '{"subject": "Urgent: Update Account"}'

# Run AI triage
tw-cli triage run --incident INC-2024-001

# View the verdict
tw-cli incident get INC-2024-001 --format json

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                        Web Dashboard                             │
│                    (HTMX + Askama Templates)                     │
└─────────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                         REST API                                 │
│                     (Axum + Tower)                               │
└─────────────────────────────────────────────────────────────────┘
                                │
        ┌───────────────────────┼───────────────────────┐
        ▼                       ▼                       ▼
┌───────────────┐    ┌───────────────────┐    ┌───────────────┐
│ Policy Engine │    │   AI Triage Agent │    │    Actions    │
│    (Rust)     │    │     (Python)      │    │    (Rust)     │
└───────────────┘    └───────────────────┘    └───────────────┘
        │                       │                       │
        └───────────────────────┼───────────────────────┘
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                       Connector Layer                            │
│        (VirusTotal, Splunk, CrowdStrike, Jira, M365)            │
└─────────────────────────────────────────────────────────────────┘

Getting Started

  1. Installation - Install Triage Warden
  2. Quick Start - Create your first incident
  3. Configuration - Configure connectors and policies

License

Triage Warden is licensed under the MIT License.

Getting Started

Welcome to Triage Warden! This guide will help you get up and running quickly.

Prerequisites

  • Rust 1.75+ (for building from source)
  • Python 3.11+ (for AI triage agents)
  • SQLite or PostgreSQL (for data storage)
  • uv (recommended Python package manager)

Installation Options

  1. From Source - Build and run locally
  2. Docker - Run in containers
  3. Pre-built Binaries - Download releases

Installation

Building from Source

Prerequisites

# Install Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Install uv (Python package manager)
curl -LsSf https://astral.sh/uv/install.sh | sh

Clone and Build

# Clone the repository
git clone https://github.com/your-org/triage-warden.git
cd triage-warden

# Build Rust components
cargo build --release

# Install Python dependencies (the subshell keeps you in the repository root)
(cd python && uv sync)

Verify Installation

# Check the CLI
./target/release/tw-cli --version

# Run tests
cargo test
cd python && uv run pytest

Docker

# Build the image
docker build -t triage-warden .

# Run with default settings
docker run -p 8080:8080 triage-warden

# Run with custom configuration
docker run -p 8080:8080 \
  -e TW_DATABASE_URL=postgres://user:pass@host/db \
  -e TW_VIRUSTOTAL_API_KEY=your-key \
  triage-warden

Pre-built Binaries

Download the latest release from the releases page.

Available platforms:

  • Linux x86_64 (glibc)
  • Linux x86_64 (musl)
  • macOS x86_64
  • macOS aarch64 (Apple Silicon)

# Example for macOS (Apple Silicon)
curl -LO https://github.com/your-org/triage-warden/releases/latest/download/triage-warden-macos-aarch64.tar.gz
tar xzf triage-warden-macos-aarch64.tar.gz
./tw-cli --version

Database Setup

SQLite (Default)

SQLite is used by default. The database file is created automatically:

# Default location
TW_DATABASE_URL=sqlite://./triage_warden.db

# Custom location
TW_DATABASE_URL=sqlite:///var/lib/triage-warden/data.db

PostgreSQL

For production deployments:

# Create database
createdb triage_warden

# Set connection string
export TW_DATABASE_URL=postgres://user:password@localhost/triage_warden

# Run migrations
tw-cli db migrate
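
Because the backend is selected by the scheme of TW_DATABASE_URL, a deployment script can check it before starting the server. The following is a minimal Python sketch under stated assumptions, not Triage Warden's own logic: the helper name database_backend is hypothetical, and it only recognizes the sqlite:// and postgres:// forms shown above.

```python
from urllib.parse import urlsplit


def database_backend(url: str) -> str:
    """Return "sqlite" or "postgres" based on the URL scheme (hypothetical helper)."""
    scheme = urlsplit(url).scheme
    if scheme in ("sqlite", "postgres"):
        return scheme
    raise ValueError(f"unsupported database URL scheme: {scheme!r}")


default_backend = database_backend("sqlite://./triage_warden.db")
production_backend = database_backend("postgres://user:password@localhost/triage_warden")
```

A check like this lets a production deploy fail fast if it would otherwise fall back to the SQLite default.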

Quick Start

Get Triage Warden running and process your first incident in 5 minutes.

1. Start the Server

# Start with default settings (SQLite, mock connectors)
cargo run --bin tw-api

# Or use the release binary
./target/release/tw-api

The web dashboard is now available at http://localhost:8080.

2. Create an Incident

Via Web Dashboard

  1. Open http://localhost:8080 in your browser
  2. Click "New Incident"
  3. Fill in the incident details:
    • Type: Phishing
    • Source: Email Gateway
    • Severity: Medium
  4. Click Create

Via CLI

tw-cli incident create \
  --type phishing \
  --source "email-gateway" \
  --severity medium \
  --data '{
    "subject": "Urgent: Verify Your Account",
    "sender": "security@fake-bank.com",
    "recipient": "employee@company.com"
  }'

Via API

curl -X POST http://localhost:8080/api/incidents \
  -H "Content-Type: application/json" \
  -d '{
    "incident_type": "phishing",
    "source": "email-gateway",
    "severity": "medium",
    "raw_data": {
      "subject": "Urgent: Verify Your Account",
      "sender": "security@fake-bank.com"
    }
  }'
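
The same request can be built from Python with only the standard library. This is a sketch that mirrors the curl call above: the endpoint and field names come from that example, the function name build_create_incident_request is hypothetical, and no authentication header is added because the example does not show one.

```python
import json
import urllib.request


def build_create_incident_request(base_url: str, subject: str, sender: str) -> urllib.request.Request:
    """Build (but do not send) the POST /api/incidents request from the curl example."""
    payload = {
        "incident_type": "phishing",
        "source": "email-gateway",
        "severity": "medium",
        "raw_data": {"subject": subject, "sender": sender},
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/incidents",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_incident_request(
    "http://localhost:8080",
    subject="Urgent: Verify Your Account",
    sender="security@fake-bank.com",
)
# To send it against a running server: urllib.request.urlopen(request)
```

Building the request separately from sending it makes the payload easy to inspect in tests without a running server.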

3. Run AI Triage

# Trigger triage for the incident
tw-cli triage run --incident INC-2024-0001

The AI agent will:

  1. Parse email headers and content
  2. Check sender reputation
  3. Analyze URLs and attachments
  4. Generate a verdict with confidence score

4. View the Verdict

# Get incident with triage results
tw-cli incident get INC-2024-0001

# Example output:
# Incident: INC-2024-0001
# Type: phishing
# Status: triaged
# Verdict: malicious
# Confidence: 0.92
# Recommended Actions:
#   - quarantine_email
#   - block_sender
#   - notify_user

5. Execute Actions

Actions may require approval based on your policy configuration:

# Request to quarantine the email
tw-cli action execute --incident INC-2024-0001 --action quarantine_email

# If auto-approved:
# Action executed: quarantine_email (status: completed)

# If requires approval:
# Action pending approval from: Senior Analyst

Approve pending actions via the dashboard at /approvals.

Configuration

Triage Warden is configured through environment variables and configuration files.

Environment Variables

Core Settings

Variable            Description                                       Default
TW_DATABASE_URL     Database connection string                        sqlite://./triage_warden.db
TW_HOST             API server host                                   0.0.0.0
TW_PORT             API server port                                   8080
TW_LOG_LEVEL        Logging level (trace, debug, info, warn, error)   info
TW_ADMIN_PASSWORD   Initial admin password                            (generated)
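
To illustrate how these defaults combine with the environment, here is a small Python sketch. It is an illustration under assumptions, not the server's actual loader: resolve_core_settings is a hypothetical helper, and the two validation checks are added for the example only.

```python
from collections.abc import Mapping

# Defaults copied from the core settings above; TW_ADMIN_PASSWORD is omitted
# because its default is generated at runtime.
CORE_DEFAULTS = {
    "TW_DATABASE_URL": "sqlite://./triage_warden.db",
    "TW_HOST": "0.0.0.0",
    "TW_PORT": "8080",
    "TW_LOG_LEVEL": "info",
}
LOG_LEVELS = ("trace", "debug", "info", "warn", "error")


def resolve_core_settings(env: Mapping[str, str]) -> dict[str, str]:
    """Overlay environment values on the documented defaults (hypothetical helper)."""
    settings = {name: env.get(name, default) for name, default in CORE_DEFAULTS.items()}
    if settings["TW_LOG_LEVEL"] not in LOG_LEVELS:
        raise ValueError(f"invalid TW_LOG_LEVEL: {settings['TW_LOG_LEVEL']!r}")
    if not settings["TW_PORT"].isdigit():
        raise ValueError(f"TW_PORT must be numeric: {settings['TW_PORT']!r}")
    return settings


settings = resolve_core_settings({"TW_LOG_LEVEL": "debug"})
```

In a real script you would pass os.environ instead of a literal dictionary.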

Connector Selection

Variable                Description                   Values
TW_THREAT_INTEL_MODE    Threat intelligence backend   mock, virustotal
TW_SIEM_MODE            SIEM backend                  mock, splunk
TW_EDR_MODE             EDR backend                   mock, crowdstrike
TW_EMAIL_GATEWAY_MODE   Email gateway backend         mock, m365
TW_TICKETING_MODE       Ticketing backend             mock, jira
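
A typo in one of these values is easy to miss, so a pre-flight check can be useful. The sketch below is a hypothetical helper written for this guide (invalid_connector_modes is not part of Triage Warden); it assumes that unset variables are acceptable and only flags values outside the documented sets.

```python
# Allowed values copied from the connector selection list above.
CONNECTOR_MODES = {
    "TW_THREAT_INTEL_MODE": {"mock", "virustotal"},
    "TW_SIEM_MODE": {"mock", "splunk"},
    "TW_EDR_MODE": {"mock", "crowdstrike"},
    "TW_EMAIL_GATEWAY_MODE": {"mock", "m365"},
    "TW_TICKETING_MODE": {"mock", "jira"},
}


def invalid_connector_modes(env: dict[str, str]) -> dict[str, str]:
    """Return every connector variable whose value is not in the documented set."""
    return {
        name: env[name]
        for name, allowed in CONNECTOR_MODES.items()
        if name in env and env[name] not in allowed
    }


problems = invalid_connector_modes({"TW_EDR_MODE": "sentinelone", "TW_TICKETING_MODE": "jira"})
```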

VirusTotal

TW_THREAT_INTEL_MODE=virustotal
TW_VIRUSTOTAL_API_KEY=your-api-key-here

Splunk

TW_SIEM_MODE=splunk
TW_SPLUNK_URL=https://splunk.company.com:8089
TW_SPLUNK_TOKEN=your-token-here

CrowdStrike

TW_EDR_MODE=crowdstrike
TW_CROWDSTRIKE_CLIENT_ID=your-client-id
TW_CROWDSTRIKE_CLIENT_SECRET=your-client-secret
TW_CROWDSTRIKE_REGION=us-1  # us-1, us-2, eu-1

Microsoft 365

TW_EMAIL_GATEWAY_MODE=m365
TW_M365_TENANT_ID=your-tenant-id
TW_M365_CLIENT_ID=your-client-id
TW_M365_CLIENT_SECRET=your-client-secret

Jira

TW_TICKETING_MODE=jira
TW_JIRA_URL=https://company.atlassian.net
TW_JIRA_EMAIL=automation@company.com
TW_JIRA_API_TOKEN=your-api-token
TW_JIRA_PROJECT_KEY=SEC

AI Provider

TW_AI_PROVIDER=anthropic  # anthropic, openai
TW_ANTHROPIC_API_KEY=your-api-key
# or
TW_OPENAI_API_KEY=your-api-key

Configuration File

For complex configurations, use a TOML file:

# config.toml

[server]
host = "0.0.0.0"
port = 8080
log_level = "info"

[database]
url = "postgres://user:pass@localhost/triage_warden"
max_connections = 10

[connectors.threat_intel]
mode = "virustotal"
api_key = "${TW_VIRUSTOTAL_API_KEY}"
rate_limit = 4  # requests per minute

[connectors.siem]
mode = "splunk"
url = "https://splunk.company.com:8089"
token = "${TW_SPLUNK_TOKEN}"

[connectors.edr]
mode = "crowdstrike"
client_id = "${TW_CROWDSTRIKE_CLIENT_ID}"
client_secret = "${TW_CROWDSTRIKE_CLIENT_SECRET}"
region = "us-1"

[ai]
provider = "anthropic"
model = "claude-sonnet-4-20250514"
max_tokens = 4096

[policy]
default_action_approval = "auto"  # auto, analyst, senior, manager
high_severity_approval = "senior"
critical_action_approval = "manager"

Load with:

tw-api --config config.toml

Policy Rules

Policy rules control action approval requirements. See Policy Engine for details.

# Example policy rule
[[policy.rules]]
name = "isolate_host_requires_manager"
action = "isolate_host"
severity = ["high", "critical"]
approval_level = "manager"

Logging

Configure structured logging:

# JSON output for production
TW_LOG_FORMAT=json

# Pretty output for development
TW_LOG_FORMAT=pretty

# Filter specific modules
RUST_LOG=tw_api=debug,tw_core=info

Web Dashboard

Browser-based interface for incident management.

Overview

The dashboard provides:

  • Real-time incident monitoring
  • Approval workflow management
  • Playbook configuration
  • System settings

Access at: http://localhost:8080

Features

Home Dashboard

The main dashboard displays:

  • KPIs: Open incidents, pending approvals, triage rate
  • Recent Incidents: Latest incidents with status
  • Trend Charts: Incident volume over time
  • Quick Actions: Create incident, run playbook

Incident Management

  • List view with filtering and sorting
  • Detail view with full incident context
  • Action execution interface
  • Triage results and reasoning

Approval Workflow

  • Queue of pending approvals
  • One-click approve/reject
  • Bulk approval for related actions
  • SLA countdown timers

Playbook Management

  • Create and edit playbooks
  • Visual step editor
  • Test with sample data
  • Execution history

Settings

  • Connector configuration
  • Policy rule management
  • User administration
  • System preferences

Path             Description
/                Dashboard home
/incidents       Incident list
/incidents/:id   Incident detail
/approvals       Pending approvals
/playbooks       Playbook management
/settings        System settings
/login           Login page

Incidents

Managing incidents in the web dashboard.

Incident List

Access at /incidents

Filtering

  • Status: Open, Triaged, Resolved
  • Severity: Low, Medium, High, Critical
  • Type: Phishing, Malware, Suspicious Login
  • Date Range: Custom time period

Sorting

Click column headers to sort:

  • Created (newest/oldest)
  • Severity (highest/lowest)
  • Status

Bulk Actions

Select multiple incidents for:

  • Bulk resolve
  • Bulk escalate
  • Export to CSV

Incident Detail

Click an incident to view details.

Overview Tab

  • Incident metadata
  • AI verdict and confidence
  • Recommended actions
  • Timeline of events

Raw Data Tab

  • Original incident data (JSON)
  • Parsed email content (for phishing)
  • Detection details (for malware)

Actions Tab

  • Available actions
  • Executed actions with results
  • Pending approvals

Enrichment Tab

  • Threat intelligence results
  • SIEM correlation data
  • Related incidents

Creating Incidents

Click the "New Incident" button.

Required Fields

  • Type: Select incident type
  • Source: Origin of the incident
  • Severity: Initial severity assessment

Optional Fields

  • Description: Free-form description
  • Raw Data: JSON payload
  • Assignee: Initial assignment

Executing Actions

From the incident detail page:

  1. Click "Actions" tab
  2. Select action from dropdown
  3. Fill in parameters
  4. Click "Execute"

If approval is required:

  • Action appears in pending state
  • Notification sent to approvers
  • Status updates when approved/rejected

Keyboard Shortcuts

Shortcut   Action
j / k      Navigate list
Enter      Open incident
Esc        Close modal
a          Open actions menu
e          Escalate
r          Resolve

Real-time Updates

The dashboard uses HTMX for live updates:

  • New incidents appear automatically
  • Status changes reflect immediately
  • Approval decisions update in real-time

Approvals

Managing action approvals in the web dashboard.

Approval Queue

Access at /approvals

The queue shows all actions pending your approval based on your role level.

Queue Columns

  • Action: Type of action requested
  • Incident: Related incident
  • Requested By: Who/what requested it
  • Requested At: When requested
  • SLA: Time remaining to respond

Filtering

  • Approval Level: Analyst, Senior, Manager
  • Action Type: Specific actions
  • Incident Type: Phishing, malware, etc.

Approval Detail

Click an approval to see full context.

Context Section

  • Full incident details
  • AI reasoning (if from triage)
  • Related actions already taken

Decision Section

  • Approve: Execute the action
  • Reject: Decline with reason
  • Delegate: Assign to another approver

Approving Actions

Single Approval

  1. Click on pending action
  2. Review incident context
  3. Click "Approve" or "Reject"
  4. Add optional comment
  5. Confirm decision

Bulk Approval

For related actions:

  1. Select multiple actions (checkbox)
  2. Click "Bulk Approve" or "Bulk Reject"
  3. Add comment applying to all
  4. Confirm

Rejection

When rejecting:

  1. Click "Reject"
  2. Required: Enter rejection reason
  3. Optionally suggest alternative
  4. Confirm

The requester is notified of the rejection and its reason.

SLA Indicators

Color    Meaning
Green    Plenty of time
Yellow   Less than 50% of time remaining
Orange   Less than 25% of time remaining
Red      SLA exceeded
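
The thresholds above translate into a simple function of elapsed time. The Python sketch below is one possible reading, not the dashboard's code: it assumes "plenty of time" means at least 50% of the window remains and that the SLA counts as exceeded once no time is left. The name sla_color is hypothetical.

```python
def sla_color(elapsed_seconds: float, sla_seconds: float) -> str:
    """Map elapsed time against an SLA window to the indicator colors listed above."""
    remaining = 1.0 - elapsed_seconds / sla_seconds
    if remaining <= 0:
        return "red"      # SLA exceeded
    if remaining < 0.25:
        return "orange"   # less than 25% of the window left
    if remaining < 0.50:
        return "yellow"   # less than 50% of the window left
    return "green"        # plenty of time


one_hour = 3600
colors = [sla_color(elapsed, one_hour) for elapsed in (600, 2000, 3000, 4000)]
```

With a one-hour SLA, the four sample values land in the green, yellow, orange, and red bands respectively.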

Notifications

You receive notifications for:

  • New actions requiring your approval
  • SLA warnings (50%, 75% elapsed)
  • Escalations to your level

Configure notification preferences in Settings.

Delegation

If you will be unavailable, delegate your approvals:

  1. Go to Settings > Delegation
  2. Select delegate user
  3. Set date range
  4. Delegate receives your approvals

Audit Trail

All approvals are logged:

  • Who approved/rejected
  • When decision was made
  • Time to approve
  • Comments provided

View at Settings > Audit Logs.

Playbooks

Managing playbooks in the web dashboard.

Playbook List

Access at /playbooks

Views

  • Active: Currently enabled playbooks
  • Inactive: Disabled playbooks
  • All: Complete list

Information Displayed

  • Name and description
  • Trigger conditions
  • Last run time
  • Success rate

Creating Playbooks

Click the "New Playbook" button.

Basic Information

  • Name: Unique identifier
  • Description: What this playbook does
  • Version: Semantic version

Triggers

Configure when the playbook runs:

  • Incident Type: Phishing, malware, etc.
  • Auto Run: Run automatically on new incidents
  • Conditions: Additional criteria

Variables

Define playbook variables:

quarantine_threshold: 0.7
notification_channel: "#security"
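
To show how a variable such as quarantine_threshold might be consumed, here is a hedged Python sketch of a condition step. The rule it encodes (quarantine only when the verdict is malicious and the confidence meets the threshold) is an assumption made for illustration; the guide does not define how playbook conditions are evaluated, and should_quarantine is a hypothetical name.

```python
# Variables from the example above.
variables = {"quarantine_threshold": 0.7, "notification_channel": "#security"}


def should_quarantine(verdict: str, confidence: float, variables: dict) -> bool:
    """Hypothetical condition: quarantine malicious verdicts at or above the threshold."""
    return verdict == "malicious" and confidence >= variables["quarantine_threshold"]


# A verdict like the Quick Start example (malicious, confidence 0.92) passes the check.
decision = should_quarantine("malicious", 0.92, variables)
```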

Step Editor

Visual editor for playbook steps.

Adding Steps

  1. Click "Add Step"
  2. Select action type
  3. Configure parameters
  4. Set output variable name

Step Types

  • Action: Execute an action
  • Condition: Branch logic
  • AI Analysis: Get AI verdict
  • Parallel: Run steps concurrently

Connections

  • Drag to reorder steps
  • Connect condition branches
  • Set dependencies

Testing Playbooks

Dry Run

  1. Click "Test"
  2. Select or create test incident
  3. Toggle "Dry Run"
  4. View step-by-step execution

With Live Data

  1. Click "Test"
  2. Select real incident
  3. Leave "Dry Run" off
  4. Actions will execute (with approval)

Execution History

View past executions:

  • Execution timestamp
  • Incident processed
  • Steps completed
  • Final verdict
  • Duration

Click an execution to see its detailed trace.

Import/Export

Export

  1. Select playbook
  2. Click "Export"
  3. Download YAML file

Import

  1. Click "Import"
  2. Upload YAML file
  3. Review parsed playbook
  4. Click "Create"

Playbook Versions

Playbooks are versioned:

  1. Edit playbook
  2. Bump version number
  3. Save as new version
  4. Old version kept for rollback

View version history and compare changes.

Settings

System configuration in the web dashboard.

Settings Tabs

Access at /settings

General

  • Instance Name: Display name for this installation
  • Time Zone: Default timezone for display
  • Date Format: Date/time display format
  • Theme: Light/dark mode preference

Connectors

Configure external integrations.

Threat Intelligence

  • Mode: Mock or VirusTotal
  • API Key (for VirusTotal)
  • Rate limit settings

SIEM

  • Mode: Mock or Splunk
  • URL and authentication
  • Default search index

EDR

  • Mode: Mock or CrowdStrike
  • OAuth credentials
  • Region selection

Email Gateway

  • Mode: Mock or Microsoft 365
  • Azure AD configuration
  • Tenant settings

Ticketing

  • Mode: Mock or Jira
  • Instance URL
  • Project configuration

Policies

Manage policy rules.

Creating Rules

  1. Click "Add Rule"
  2. Enter rule name
  3. Define matching criteria
  4. Set decision (allow/deny/approval)
  5. Save

Rule Priority

Drag rules to reorder. First matching rule wins.
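
First-match-wins ordering means a broad rule placed too early can shadow a more specific one below it. The Python sketch below models that behavior under stated assumptions: the Rule fields, the glob matching via fnmatch, the sample rules other than isolate_host_requires_manager, and the fallback decision for unmatched requests are all illustrative choices, not the policy engine's real data model.

```python
from dataclasses import dataclass
from fnmatch import fnmatchcase


@dataclass(frozen=True)
class Rule:
    name: str
    pattern: str                 # glob matched against the action name
    severities: frozenset[str]   # empty set means "any severity"
    decision: str                # "allow", "deny", or "approval"


def evaluate(rules: list[Rule], action: str, severity: str, default: str = "approval") -> str:
    """Return the decision of the first rule that matches, in list order."""
    for rule in rules:
        severity_ok = not rule.severities or severity in rule.severities
        if severity_ok and fnmatchcase(action, rule.pattern):
            return rule.decision
    return default  # assumption: unmatched requests fall back to requiring approval


rules = [
    Rule("isolate_host_requires_manager", "isolate_host", frozenset({"high", "critical"}), "approval"),
    Rule("allow_notifications", "notify_*", frozenset(), "allow"),
    Rule("deny_everything_else", "*", frozenset(), "deny"),
]

high_isolation = evaluate(rules, "isolate_host", "high")
low_isolation = evaluate(rules, "isolate_host", "low")
```

Here the low-severity isolation request skips the first rule, fails the notify_* pattern, and is caught by the catch-all, which is why the catch-all belongs at the bottom of the list.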

Users

User management (admin only).

User List

  • Username and email
  • Role (viewer/analyst/senior/admin)
  • Last login
  • Status (active/disabled)

Creating Users

  1. Click "Add User"
  2. Enter email and username
  3. Set initial role
  4. Generate or set password
  5. Send invitation email

Role Management

Assign roles:

  • Viewer: Read-only access
  • Analyst: Execute actions, approve analyst-level
  • Senior: Approve senior-level
  • Admin: Full access

Notifications

Configure notification preferences.

Channels

  • Email: SMTP settings
  • Slack: Webhook URL
  • Teams: Connector URL
  • PagerDuty: Integration key

Preferences

For each notification type:

  • Enable/disable channel
  • Set priority threshold
  • Configure quiet hours

Audit Logs

View system audit trail.

Filtering

  • Date range
  • Event type
  • User
  • Resource

Export

Export logs to CSV for compliance.

API Keys

Manage API credentials.

Creating Keys

  1. Click "Create API Key"
  2. Enter name and description
  3. Select scopes
  4. Set expiration (optional)
  5. Copy generated key

Revoking Keys

Click "Revoke" on any key. Revocation is immediate.

Backup & Restore

Database management.

Backup

  1. Click "Create Backup"
  2. Wait for completion
  3. Download backup file

Restore

  1. Click "Restore"
  2. Upload backup file
  3. Confirm restore
  4. System restarts

About

System information:

  • Version number
  • Build information
  • License status
  • Support links

CLI Reference

Command-line interface for Triage Warden.

Installation

The CLI is built with the main project:

cargo build --release
./target/release/tw-cli --help

Global Options

tw-cli [OPTIONS] <COMMAND>

Options:
  -c, --config <FILE>     Configuration file path
  -v, --verbose           Enable verbose output
  -q, --quiet             Suppress non-error output
  --json                  Output as JSON
  -h, --help              Print help
  -V, --version           Print version

Environment Variables

Variable     Description
TW_API_URL   API server URL (default: http://localhost:8080)
TW_API_KEY   API key for authentication
TW_CONFIG    Path to config file

Commands Overview

Command     Description
incident    Manage incidents
action      Execute and manage actions
triage      Run AI triage
playbook    Manage playbooks
policy      Manage policy rules
connector   Manage connectors
user        User management
api-key     API key management
webhook     Webhook management
config      Configuration management
db          Database operations
serve       Start API server

Quick Examples

# List open incidents
tw-cli incident list --status open

# Create incident (--type and --source are required)
tw-cli incident create --type phishing --source "email-gateway" --severity high

# Run triage
tw-cli triage run --incident INC-2024-001

# Execute action
tw-cli action execute --incident INC-2024-001 --action quarantine_email

# Approve pending action
tw-cli action approve act-abc123

# Start server
tw-cli serve --port 8080

CLI Commands

Detailed reference for all CLI commands.

incident

Manage security incidents.

list

tw-cli incident list [OPTIONS]

Options:
  --status <STATUS>      Filter by status (open, triaged, resolved)
  --severity <SEVERITY>  Filter by severity
  --type <TYPE>          Filter by incident type
  --limit <N>            Maximum results (default: 20)
  --offset <N>           Skip first N results
  --sort <FIELD>         Sort field (created_at, severity)
  --desc                 Sort descending

get

tw-cli incident get <ID> [OPTIONS]

Options:
  --format <FORMAT>      Output format (table, json, yaml)
  --include-actions      Include action history
  --include-enrichment   Include enrichment data

create

tw-cli incident create [OPTIONS]

Options:
  --type <TYPE>          Incident type (required)
  --source <SOURCE>      Incident source (required)
  --severity <SEVERITY>  Initial severity (default: medium)
  --data <JSON>          Raw incident data as JSON
  --file <FILE>          Read data from file
  --auto-triage          Run triage after creation

update

tw-cli incident update <ID> [OPTIONS]

Options:
  --severity <SEVERITY>  Update severity
  --status <STATUS>      Update status
  --assignee <USER>      Assign to user

resolve

tw-cli incident resolve <ID> [OPTIONS]

Options:
  --resolution <TEXT>    Resolution notes
  --false-positive       Mark as false positive

action

Execute and manage actions.

execute

tw-cli action execute [OPTIONS]

Options:
  --incident <ID>        Associated incident
  --action <NAME>        Action to execute (required)
  --param <KEY=VALUE>    Action parameter (repeatable)
  --emergency            Emergency override (manager only)
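
Repeatable KEY=VALUE options are a common CLI pattern. The Python sketch below shows one way such pairs can be collected and split; it illustrates the pattern only and is not how tw-cli (a Rust binary) parses its arguments. The parameter names sender and note are made up for the example.

```python
import argparse


def parse_params(pairs: list[str]) -> dict[str, str]:
    """Turn repeated KEY=VALUE strings into a dict, splitting on the first "=" only."""
    params = {}
    for pair in pairs:
        key, separator, value = pair.partition("=")
        if not separator or not key:
            raise ValueError(f"expected KEY=VALUE, got {pair!r}")
        params[key] = value
    return params


parser = argparse.ArgumentParser(prog="action-execute-sketch")
parser.add_argument("--action", required=True)
parser.add_argument("--incident")
parser.add_argument("--param", action="append", default=[], metavar="KEY=VALUE")

args = parser.parse_args(
    ["--action", "block_sender", "--param", "sender=security@fake-bank.com", "--param", "note=a=b"]
)
params = parse_params(args.param)
```

Splitting on the first "=" only means values may themselves contain "=", as the note parameter shows.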

list

tw-cli action list [OPTIONS]

Options:
  --incident <ID>        Filter by incident
  --status <STATUS>      Filter by status
  --pending              Show only pending approval

get

tw-cli action get <ID>

approve

tw-cli action approve <ID> [OPTIONS]

Options:
  --comment <TEXT>       Approval comment

reject

tw-cli action reject <ID> [OPTIONS]

Options:
  --reason <TEXT>        Rejection reason (required)

rollback

tw-cli action rollback <ID> [OPTIONS]

Options:
  --reason <TEXT>        Rollback reason

triage

Run AI triage.

run

tw-cli triage run [OPTIONS]

Options:
  --incident <ID>        Incident to triage (required)
  --playbook <NAME>      Specific playbook
  --model <MODEL>        AI model override
  --wait                 Wait for completion

status

tw-cli triage status <TRIAGE_ID>

playbook

Manage playbooks.

list

tw-cli playbook list [OPTIONS]

Options:
  --enabled              Only enabled playbooks
  --trigger-type <TYPE>  Filter by trigger type

get

tw-cli playbook get <ID>

add

tw-cli playbook add <FILE>

update

tw-cli playbook update <ID> <FILE>

delete

tw-cli playbook delete <ID>

run

tw-cli playbook run <ID> [OPTIONS]

Options:
  --incident <ID>        Incident to process
  --var <KEY=VALUE>      Override variable (repeatable)
  --dry-run              Don't execute actions

test

tw-cli playbook test <NAME> [OPTIONS]

Options:
  --incident <ID>        Use existing incident
  --data <JSON>          Use mock data
  --dry-run              Don't execute actions

validate

tw-cli playbook validate <FILE>

export

tw-cli playbook export <ID> [OPTIONS]

Options:
  -o, --output <FILE>    Output file (default: stdout)

policy

Manage policy rules.

list

tw-cli policy list

add

tw-cli policy add [OPTIONS]

Options:
  --name <NAME>          Rule name (required)
  --action <ACTION>      Action to match
  --pattern <PATTERN>    Action pattern (glob)
  --severity <SEVERITY>  Severity condition
  --approval-level <L>   Required approval level
  --allow                Auto-allow
  --deny                 Deny with reason
  --reason <TEXT>        Denial reason

delete

tw-cli policy delete <NAME>

test

tw-cli policy test [OPTIONS]

Options:
  --action <ACTION>      Action to test
  --severity <SEVERITY>  Incident severity
  --proposer-type <T>    Proposer type
  --confidence <N>       AI confidence score

connector

Manage connectors.

status

tw-cli connector status

test

tw-cli connector test <NAME>

configure

tw-cli connector configure <NAME> [OPTIONS]

Options:
  --mode <MODE>          Connector mode
  --api-key <KEY>        API key
  --url <URL>            Service URL

user

User management.

list

tw-cli user list

create

tw-cli user create [OPTIONS]

Options:
  --username <NAME>      Username (required)
  --email <EMAIL>        Email address
  --role <ROLE>          User role
  --service-account      Create as service account

update

tw-cli user update <ID> [OPTIONS]

Options:
  --role <ROLE>          New role
  --enabled              Enable user
  --disabled             Disable user

delete

tw-cli user delete <ID>

api-key

API key management.

list

tw-cli api-key list

create

tw-cli api-key create [OPTIONS]

Options:
  --name <NAME>          Key name (required)
  --scopes <SCOPES>      Comma-separated scopes
  --user <USER>          Associated user
  --expires <DATE>       Expiration date

revoke

tw-cli api-key revoke <PREFIX>

rotate

tw-cli api-key rotate <PREFIX>

webhook

Webhook management.

list

tw-cli webhook list

add

tw-cli webhook add <SOURCE> [OPTIONS]

Options:
  --secret <SECRET>      Webhook secret
  --auto-triage          Enable auto-triage
  --playbook <NAME>      Playbook to run

test

tw-cli webhook test <SOURCE>

delete

tw-cli webhook delete <SOURCE>

db

Database operations.

migrate

tw-cli db migrate

backup

tw-cli db backup [OPTIONS]

Options:
  -o, --output <FILE>    Backup file path

restore

tw-cli db restore <FILE>

serve

Start the API server.

tw-cli serve [OPTIONS]

Options:
  --host <HOST>          Bind address (default: 0.0.0.0)
  --port <PORT>          Port number (default: 8080)
  --config <FILE>        Configuration file

Architecture Overview

Triage Warden is built as a modular, layered system combining Rust for performance-critical components and Python for AI capabilities.

System Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                           Clients                                    │
│              (Web Browser, CLI, API Consumers)                       │
└─────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                         API Layer (tw-api)                           │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │  REST API   │  │ Web Handlers│  │  Webhooks   │  │   Metrics   │ │
│  │   (Axum)    │  │(HTMX+Askama)│  │             │  │ (Prometheus)│ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
                                   │
        ┌──────────────────────────┼──────────────────────────┐
        ▼                          ▼                          ▼
┌───────────────┐        ┌───────────────────┐       ┌───────────────┐
│ Policy Engine │        │   Action Registry │       │  Event Bus    │
│   (tw-policy) │        │    (tw-actions)   │       │  (tw-core)    │
└───────────────┘        └───────────────────┘       └───────────────┘
        │                          │                          │
        └──────────────────────────┼──────────────────────────┘
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                       Core Domain (tw-core)                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │  Incidents  │  │  Playbooks  │  │   Users     │  │   Audit     │ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    Database Layer (SQLx)                             │
│              (SQLite for dev, PostgreSQL for prod)                   │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│                      Python Bridge (tw-bridge)                       │
│                          (PyO3 Bindings)                             │
└─────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                       AI Layer (tw_ai)                               │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │Triage Agent │  │    Tools    │  │  Playbook   │  │  Evaluation │ │
│  │  (Claude)   │  │             │  │   Engine    │  │  Framework  │ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    Connector Layer (tw-connectors)                   │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │ VirusTotal  │  │   Splunk    │  │ CrowdStrike │  │    Jira     │ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

Crate Structure

Crate              Purpose
tw-api             HTTP server, REST API, web handlers, webhooks
tw-core            Domain models, database repositories, event bus
tw-actions         Action handlers (quarantine, isolate, notify, etc.)
tw-policy          Policy engine, approval rules, decision evaluation
tw-connectors      External service integrations (VirusTotal, Splunk, etc.)
tw-bridge          PyO3 bindings exposing Rust to Python
tw-cli             Command-line interface
tw-observability   Metrics, tracing, logging infrastructure

Key Design Decisions

Rust + Python Hybrid

  • Rust: Core platform, API server, policy engine, actions
  • Python: AI agents, LLM integrations, playbook execution
  • Bridge: PyO3 enables Python to call Rust connectors and actions

Trait-Based Connectors

All connectors implement traits for testability:

use async_trait::async_trait;

// Any threat intelligence backend (mock or VirusTotal) implements this trait,
// so callers and tests depend on the interface rather than a concrete service.
#[async_trait]
pub trait ThreatIntelConnector: Send + Sync {
    async fn lookup_hash(&self, hash: &str) -> ConnectorResult<ThreatReport>;
    async fn lookup_url(&self, url: &str) -> ConnectorResult<ThreatReport>;
    async fn lookup_domain(&self, domain: &str) -> ConnectorResult<ThreatReport>;
}
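
The testability benefit is easiest to see with a mock in place of the real service. The sketch below is a Python analog of the trait, written for illustration only: it is not the tw-bridge API, the ThreatReport fields are hypothetical because the real type is not shown in this guide, and MockThreatIntel and is_sender_domain_malicious are invented names.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol


@dataclass
class ThreatReport:
    indicator: str
    malicious: bool  # hypothetical field; the real ThreatReport is not shown in this guide


class ThreatIntelConnector(Protocol):
    async def lookup_hash(self, file_hash: str) -> ThreatReport: ...
    async def lookup_url(self, url: str) -> ThreatReport: ...
    async def lookup_domain(self, domain: str) -> ThreatReport: ...


class MockThreatIntel:
    """In-memory stand-in: anything in `known_bad` is reported as malicious."""

    def __init__(self, known_bad: set[str]) -> None:
        self.known_bad = known_bad

    async def lookup_hash(self, file_hash: str) -> ThreatReport:
        return ThreatReport(file_hash, file_hash in self.known_bad)

    async def lookup_url(self, url: str) -> ThreatReport:
        return ThreatReport(url, url in self.known_bad)

    async def lookup_domain(self, domain: str) -> ThreatReport:
        return ThreatReport(domain, domain in self.known_bad)


async def is_sender_domain_malicious(connector: ThreatIntelConnector, sender: str) -> bool:
    """Depends only on the interface, so a mock can replace a live backend in tests."""
    domain = sender.rsplit("@", 1)[-1]
    report = await connector.lookup_domain(domain)
    return report.malicious


result = asyncio.run(
    is_sender_domain_malicious(MockThreatIntel({"fake-bank.com"}), "security@fake-bank.com")
)
```

Because the calling code never names a concrete backend, the same function runs unchanged against a mock in tests and a live connector in production.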

Event-Driven Architecture

The event bus enables loose coupling:

// Publishers emit events without knowing which components subscribe to them.
event_bus.publish(Event::IncidentCreated { id });
event_bus.publish(Event::ActionExecuted { id: action_id, success });
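
For readers less familiar with the pattern, the following Python sketch shows the publish/subscribe idea in a few lines. It is a simplified, synchronous stand-in and not the tw-core event bus, which is asynchronous; the event classes and the EventBus API here are illustrative assumptions.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class IncidentCreated:
    id: str


@dataclass(frozen=True)
class ActionExecuted:
    id: str
    success: bool


class EventBus:
    """Synchronous in-process pub/sub keyed by event type (illustrative only)."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: object) -> None:
        for handler in self._handlers[type(event)]:
            handler(event)


audit_log: list[str] = []
bus = EventBus()
bus.subscribe(IncidentCreated, lambda event: audit_log.append(f"created {event.id}"))
bus.subscribe(ActionExecuted, lambda event: audit_log.append(f"executed {event.id} ok={event.success}"))

bus.publish(IncidentCreated(id="inc-1"))
bus.publish(ActionExecuted(id="act-1", success=True))
```

The publisher never references the audit log directly; adding another subscriber, such as a notifier, requires no change to the publishing code.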

Policy-First Actions

All actions pass through the policy engine:

Request → Policy Evaluation → (Allowed | Denied | RequiresApproval) → Execute
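
The flow above can be sketched as a small gate in front of the executor. This Python example is an illustration under assumptions, not the tw-policy implementation: the status strings, the sample policy, and the delete_mailbox action are invented for the example, and only an allowed decision reaches the execute callback.

```python
from enum import Enum
from typing import Callable


class Decision(Enum):
    ALLOWED = "allowed"
    DENIED = "denied"
    REQUIRES_APPROVAL = "requires_approval"


def handle_request(
    action: str,
    evaluate_policy: Callable[[str], Decision],
    execute: Callable[[str], None],
    pending: list[str],
) -> str:
    """Run policy evaluation first; only an ALLOWED decision reaches `execute`."""
    decision = evaluate_policy(action)
    if decision is Decision.ALLOWED:
        execute(action)
        return "completed"
    if decision is Decision.REQUIRES_APPROVAL:
        pending.append(action)  # executed later, once an approver signs off
        return "pending_approval"
    return "denied"


def sample_policy(action: str) -> Decision:
    # Hypothetical policy: notifications are automatic, host isolation needs approval.
    if action == "notify_user":
        return Decision.ALLOWED
    if action == "isolate_host":
        return Decision.REQUIRES_APPROVAL
    return Decision.DENIED


executed: list[str] = []
pending: list[str] = []
statuses = [
    handle_request(name, sample_policy, executed.append, pending)
    for name in ("notify_user", "isolate_host", "delete_mailbox")
]
```

Routing every request through a single gate like this is what makes the audit trail complete: there is no code path that executes an action without a recorded decision.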

Components

Detailed description of each major component in Triage Warden.

tw-api

The HTTP server and web interface.

REST API Routes

Route                            Description
GET /api/incidents               List incidents with filtering
POST /api/incidents              Create new incident
GET /api/incidents/:id           Get incident details
POST /api/incidents/:id/actions  Execute action on incident
GET /api/playbooks               List playbooks
POST /api/webhooks/:source       Receive webhook events

Web Handlers

Server-rendered pages using HTMX and Askama templates:

  • Dashboard with KPIs
  • Incident list and detail views
  • Approval workflow interface
  • Playbook management
  • Settings configuration

Authentication

  • Session-based auth for web dashboard
  • API key auth for programmatic access
  • Role-based access control (admin, analyst, viewer)

tw-core

Core domain logic and data access.

Domain Models

pub struct Incident {
    pub id: Uuid,
    pub incident_type: IncidentType,
    pub severity: Severity,
    pub status: IncidentStatus,
    pub source: String,
    pub raw_data: serde_json::Value,
    pub verdict: Option<Verdict>,
    pub confidence: Option<f64>,
    pub created_at: DateTime<Utc>,
}

pub struct Action {
    pub id: Uuid,
    pub incident_id: Uuid,
    pub action_type: ActionType,
    pub status: ActionStatus,
    pub approval_level: Option<ApprovalLevel>,
    pub executed_by: Option<String>,
}

Repositories

Database access layer with SQLite and PostgreSQL support:

  • IncidentRepository
  • ActionRepository
  • PlaybookRepository
  • UserRepository
  • AuditRepository

Event Bus

Async event distribution:

pub enum Event {
    IncidentCreated { id: Uuid },
    IncidentUpdated { id: Uuid },
    ActionRequested { id: Uuid },
    ActionApproved { id: Uuid, approver: String },
    ActionExecuted { id: Uuid, success: bool },
}

tw-actions

Action handlers for incident response.

Email Actions

Action                      Description
parse_email                 Extract headers, body, attachments
check_email_authentication  Validate SPF/DKIM/DMARC
quarantine_email            Move to quarantine
block_sender                Add to blocklist

Lookup Actions

Action                    Description
lookup_sender_reputation  Check sender against threat intel
lookup_urls               Analyze URLs in content
lookup_attachments        Hash and check attachments

Host Actions

Action        Description
isolate_host  Network isolation via EDR
scan_host     Trigger endpoint scan

Notification Actions

Action           Description
notify_user      Send user notification
notify_reporter  Update incident reporter
escalate         Route to approval level
create_ticket    Create Jira ticket

tw-policy

Policy engine for action approval.

Rule Evaluation

pub struct PolicyRule {
    pub name: String,
    pub action_type: ActionType,
    pub conditions: Vec<Condition>,
    pub approval_level: ApprovalLevel,
}

pub enum PolicyDecision {
    Allowed,
    Denied { reason: String },
    RequiresApproval { level: ApprovalLevel },
}

Approval Levels

  1. Auto - No approval required
  2. Analyst - Any analyst can approve
  3. Senior - Senior analyst required
  4. Manager - SOC manager required

tw-connectors

External service integrations.

Connector Trait

#[async_trait]
pub trait Connector: Send + Sync {
    fn name(&self) -> &str;
    fn connector_type(&self) -> &str;
    async fn health_check(&self) -> ConnectorResult<ConnectorHealth>;
    async fn test_connection(&self) -> ConnectorResult<bool>;
}

Available Connectors

Type           Implementations
Threat Intel   VirusTotal, Mock
SIEM           Splunk, Mock
EDR            CrowdStrike, Mock
Email Gateway  Microsoft 365, Mock
Ticketing      Jira, Mock

tw-bridge

PyO3 bindings for Python integration.

Exposed Classes

from tw_bridge import ThreatIntelBridge, SIEMBridge, EDRBridge

# Use connectors from Python
threat_intel = ThreatIntelBridge("virustotal")
result = threat_intel.lookup_hash("abc123...")

tw_ai (Python)

AI triage and playbook execution.

Triage Agent

Claude-powered agent for incident analysis:

agent = TriageAgent(model="claude-sonnet-4-20250514")
verdict = await agent.analyze(incident)
# Returns: Verdict(classification="malicious", confidence=0.92, ...)

Playbook Engine

YAML-based playbook execution:

name: phishing_triage
steps:
  - action: parse_email
  - action: check_email_authentication
  - action: lookup_sender_reputation
  - condition: sender_reputation < 0.3
    action: quarantine_email
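
A conditional step like the last one can be evaluated roughly as follows. This is a sketch under stated assumptions, not the tw_ai engine: the playbook is shown as the dictionary a YAML parser would produce, and `condition_holds`, `run_playbook`, `make_handlers`, and the stub handlers are hypothetical. Only the `<name> <op> <number>` condition form is handled.

```python
import operator

# The playbook above, as it would look after YAML parsing.
PLAYBOOK = {
    "name": "phishing_triage",
    "steps": [
        {"action": "parse_email"},
        {"action": "check_email_authentication"},
        {"action": "lookup_sender_reputation"},
        {"condition": "sender_reputation < 0.3", "action": "quarantine_email"},
    ],
}

OPS = {"<": operator.lt, ">": operator.gt, "==": operator.eq}


def condition_holds(condition: str, context: dict) -> bool:
    # Only the "<name> <op> <number>" form is handled in this sketch.
    name, op, value = condition.split()
    return OPS[op](context[name], float(value))


def run_playbook(playbook: dict, handlers: dict) -> list:
    context, executed = {}, []
    for step in playbook["steps"]:
        cond = step.get("condition")
        if cond is not None and not condition_holds(cond, context):
            continue  # condition false: skip this step
        # Each handler returns a dict that is merged into the shared context.
        context.update(handlers[step["action"]](context))
        executed.append(step["action"])
    return executed


def make_handlers(reputation: float) -> dict:
    # Stub handlers standing in for the real actions.
    return {
        "parse_email": lambda ctx: {"sender": "someone@example.com"},
        "check_email_authentication": lambda ctx: {"spf_pass": False},
        "lookup_sender_reputation": lambda ctx: {"sender_reputation": reputation},
        "quarantine_email": lambda ctx: {"quarantined": True},
    }


low = run_playbook(PLAYBOOK, make_handlers(0.1))   # quarantine step runs
high = run_playbook(PLAYBOOK, make_handlers(0.8))  # quarantine step skipped
```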

Data Flow

How data moves through Triage Warden from incident creation to resolution.

Incident Lifecycle

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Created   │────▶│   Triaging  │────▶│   Triaged   │────▶│  Resolved   │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │                   │
       ▼                   ▼                   ▼                   ▼
   Webhook/API        AI Agent          Actions Executed      Closed
   receives data      analyzes          (with approval)

Detailed Flow

1. Incident Creation

External Source (Email Gateway, SIEM, EDR)
                    │
                    ▼
            Webhook Endpoint
            /api/webhooks/:source
                    │
                    ▼
         ┌──────────────────┐
         │  Parse & Validate │
         │  Incoming Data    │
         └──────────────────┘
                    │
                    ▼
         ┌──────────────────┐
         │ Create Incident   │
         │ Record in DB      │
         └──────────────────┘
                    │
                    ▼
         ┌──────────────────┐
         │ Publish Event:    │
         │ IncidentCreated   │
         └──────────────────┘

2. AI Triage

         ┌──────────────────┐
         │ Event: Incident   │
         │ Created           │
         └──────────────────┘
                    │
                    ▼
         ┌──────────────────┐
         │ Load Playbook     │
         │ (based on type)   │
         └──────────────────┘
                    │
                    ▼
         ┌──────────────────┐
         │ Execute Playbook  │
         │ Steps             │
         └──────────────────┘
                    │
        ┌───────────┴───────────┐
        ▼                       ▼
┌───────────────┐       ┌───────────────┐
│ Enrichment    │       │ AI Analysis   │
│ Actions       │       │ (Claude)      │
│ - parse_email │       │               │
│ - lookup_*    │       │ Generates:    │
└───────────────┘       │ - Verdict     │
        │               │ - Confidence  │
        │               │ - Reasoning   │
        │               │ - Actions     │
        │               └───────────────┘
        │                       │
        └───────────┬───────────┘
                    │
                    ▼
         ┌──────────────────┐
         │ Update Incident   │
         │ with Verdict      │
         └──────────────────┘

3. Action Execution

         ┌──────────────────┐
         │ Action Request    │
         │ (from agent or    │
         │  human)           │
         └──────────────────┘
                    │
                    ▼
         ┌──────────────────┐
         │ Build Action      │
         │ Context           │
         └──────────────────┘
                    │
                    ▼
         ┌──────────────────┐
         │ Policy Engine     │
         │ Evaluation        │
         └──────────────────┘
                    │
        ┌───────────┼───────────┐
        ▼           ▼           ▼
   ┌────────┐  ┌────────┐  ┌────────┐
   │Allowed │  │Denied  │  │Requires│
   │        │  │        │  │Approval│
   └────────┘  └────────┘  └────────┘
        │           │           │
        ▼           ▼           ▼
   Execute      Return       Queue for
   Action       Error        Approval
        │                       │
        │                       ▼
        │              ┌──────────────┐
        │              │ Notify       │
        │              │ Approvers    │
        │              └──────────────┘
        │                       │
        │                       ▼
        │              ┌──────────────┐
        │              │ Wait for     │
        │              │ Approval     │
        │              └──────────────┘
        │                       │
        │        ┌──────────────┴──────────────┐
        │        ▼                             ▼
        │   ┌────────┐                    ┌────────┐
        │   │Approved│                    │Rejected│
        │   └────────┘                    └────────┘
        │        │                             │
        │        ▼                             ▼
        │   Execute Action               Update Status
        │        │
        └────────┴─────────┐
                           ▼
                  ┌──────────────┐
                  │ Connector    │
                  │ Execution    │
                  │ (External    │
                  │  Service)    │
                  └──────────────┘
                           │
                           ▼
                  ┌──────────────┐
                  │ Update       │
                  │ Action       │
                  │ Status       │
                  └──────────────┘
                           │
                           ▼
                  ┌──────────────┐
                  │ Audit Log    │
                  │ Entry        │
                  └──────────────┘

Data Stores

Primary Database

Table          Purpose
incidents      Incident records
actions        Action requests and results
playbooks      Playbook definitions
users          User accounts
sessions       Active sessions
api_keys       API credentials
audit_logs     Action audit trail
connectors     Connector configurations
policies       Policy rules
notifications  Notification history
settings       System settings

Event Bus (In-Memory)

Transient event distribution for real-time updates:

  • Incident lifecycle events
  • Action status changes
  • Approval notifications
  • System health events

External Data Flow

Inbound (Webhooks)

Email Gateway ──────┐
SIEM Alerts ────────┼──▶ Webhook Handler ──▶ Incident Creation
EDR Events ─────────┘

Outbound (Connectors)

                           ┌──▶ VirusTotal (threat intel)
Action Execution ──────────┼──▶ Splunk (SIEM queries)
                           ├──▶ CrowdStrike (host actions)
                           ├──▶ M365 (email actions)
                           └──▶ Jira (ticketing)

Metrics Flow

Rust Components ───┬──▶ Prometheus Registry ──▶ /metrics endpoint
Python Components ─┘

Exposed metrics:

  • triage_warden_incidents_total{type, severity}
  • triage_warden_actions_total{action, status}
  • triage_warden_triage_duration_seconds{type}
  • triage_warden_connector_requests_total{connector, status}

Security Model

Triage Warden implements defense-in-depth with multiple security layers.

Authentication

Web Dashboard

Session-based authentication with secure cookies:

  • Session tokens: Random 256-bit tokens
  • Cookie settings: HttpOnly, Secure, SameSite=Lax
  • Session duration: 8 hours (configurable)
  • CSRF protection: Per-request tokens on all state-changing forms

API Access

API key authentication for programmatic access:

curl -H "Authorization: Bearer tw_abc123_secretkey" \
  https://api.example.com/api/incidents

API key features:

  • Prefix stored in plain text for lookup (tw_abc123)
  • Secret portion hashed with Argon2
  • Scopes limit allowed operations
  • Expiration dates supported
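
The prefix-plus-hashed-secret scheme can be sketched as below. Several things here are assumptions: the `tw_<id>_<secret>` layout is inferred from the curl example, the `store` dictionary stands in for the `api_keys` table, and PBKDF2 from the standard library is swapped in for Argon2 so that the example needs no third-party package. Scopes and expiration are left out.

```python
import hashlib
import hmac
import os


def hash_secret(secret: str, salt: bytes) -> bytes:
    # Stand-in for Argon2: PBKDF2-HMAC-SHA256 from the standard library.
    return hashlib.pbkdf2_hmac("sha256", secret.encode(), salt, 100_000)


def split_key(api_key: str):
    # "tw_abc123_secretkey" -> ("tw_abc123", "secretkey")
    scheme, key_id, secret = api_key.split("_", 2)
    return f"{scheme}_{key_id}", secret


def issue_key(store: dict, key_id: str, secret: str) -> str:
    # Only the prefix, a salt, and the hash are stored; never the secret.
    salt = os.urandom(16)
    store[f"tw_{key_id}"] = (salt, hash_secret(secret, salt))
    return f"tw_{key_id}_{secret}"


def verify_key(store: dict, api_key: str) -> bool:
    try:
        prefix, secret = split_key(api_key)
    except ValueError:
        return False  # malformed key
    record = store.get(prefix)
    if record is None:
        return False  # unknown prefix
    salt, expected = record
    # Constant-time comparison of the derived hashes.
    return hmac.compare_digest(hash_secret(secret, salt), expected)
```

Keeping the prefix in plain text lets the server find the right record with one indexed lookup instead of hashing the presented secret against every stored key.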

Authorization

Role-Based Access Control (RBAC)

Role            Capabilities
Viewer          Read incidents, view dashboards
Analyst         Viewer + execute low-risk actions, approve analyst-level
Senior Analyst  Analyst + execute medium-risk actions, approve senior-level
Admin           Full access, user management, system configuration

Policy-Based Action Control

The policy engine evaluates every action request:

ActionRequest
    → Build ActionContext (action_type, target, severity, proposer)
    → Evaluate policy rules
    → Return PolicyDecision
        - Allowed: Execute immediately
        - Denied: Return error with reason
        - RequiresApproval: Queue for specified approval level

Example Policy Rules

# Low-risk actions auto-approve
[[policy.rules]]
name = "auto_approve_lookups"
action_patterns = ["lookup_*"]
decision = "allowed"

# High-severity host isolation requires manager
[[policy.rules]]
name = "isolate_requires_manager"
action = "isolate_host"
severity = ["high", "critical"]
approval_level = "manager"

# Block dangerous actions on production
[[policy.rules]]
name = "no_delete_in_prod"
action_patterns = ["delete_*"]
environment = "production"
decision = "denied"
reason = "Deletion not allowed in production"
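
How rules like these might be matched against a request is sketched below. The field names mirror the TOML above, but the matching logic is an assumption: this sketch lets the first matching rule decide and falls back to analyst approval when nothing matches, which may differ from the actual tw-policy behavior. `rule_matches` and `evaluate` are hypothetical names.

```python
from fnmatch import fnmatchcase

RULES = [
    {"name": "auto_approve_lookups", "action_patterns": ["lookup_*"],
     "decision": "allowed"},
    {"name": "isolate_requires_manager", "action": "isolate_host",
     "severity": ["high", "critical"], "approval_level": "manager"},
    {"name": "no_delete_in_prod", "action_patterns": ["delete_*"],
     "environment": "production", "decision": "denied",
     "reason": "Deletion not allowed in production"},
]


def rule_matches(rule: dict, action: str, severity: str, environment: str) -> bool:
    # A rule names either glob patterns or one exact action.
    patterns = rule.get("action_patterns") or [rule["action"]]
    if not any(fnmatchcase(action, p) for p in patterns):
        return False
    if "severity" in rule and severity not in rule["severity"]:
        return False
    if "environment" in rule and environment != rule["environment"]:
        return False
    return True


def evaluate(action: str, severity: str, environment: str) -> dict:
    # Assumption: the first matching rule decides.
    for rule in RULES:
        if rule_matches(rule, action, severity, environment):
            if "approval_level" in rule:
                return {"decision": "requires_approval",
                        "level": rule["approval_level"], "rule": rule["name"]}
            return {"decision": rule["decision"],
                    "reason": rule.get("reason"), "rule": rule["name"]}
    # Assumption: anything unmatched falls back to analyst approval.
    return {"decision": "requires_approval", "level": "analyst", "rule": None}
```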

Data Protection

At Rest

  • Database encryption: SQLite with SQLCipher (optional), PostgreSQL with TDE
  • Credential storage: All API keys/tokens hashed with Argon2id
  • Secrets management: Environment variables or external secret stores

In Transit

  • TLS 1.3: Required for all external connections
  • Certificate validation: Strict validation for connectors
  • Internal traffic: TLS optional for localhost development

Sensitive Data Handling

// Credentials redacted in logs
impl std::fmt::Debug for ApiKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ApiKey {{ prefix: {}, secret: [REDACTED] }}", self.prefix)
    }
}

Audit Trail

All security-relevant actions are logged:

Event            Data Captured
Login            user_id, ip_address, success, timestamp
Logout           user_id, session_duration
Action executed  action_id, user_id, incident_id, result
Action approved  action_id, approver_id, decision
Policy change    user_id, old_value, new_value
User management  admin_id, target_user, operation

Audit log retention: 90 days (configurable)

Connector Security

Credential Management

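Connector credentials are read from environment variables or stored encrypted in the database:

# Environment variables (recommended)
TW_VIRUSTOTAL_API_KEY=your-key

# Or encrypted in database (prompt for the key so it stays out of shell history)
read -rs TW_KEY && tw-cli connector set virustotal --api-key "$TW_KEY"; unset TW_KEY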

Rate Limiting

Built-in rate limiting prevents API abuse:

Connector    Default Limit
VirusTotal   4 req/min (free tier)
Splunk       100 req/min
CrowdStrike  50 req/min

Circuit Breaker

Automatic failure handling:

  • After 5 consecutive failures, the circuit opens
  • Requests fail fast for 30 seconds
  • Then a half-open state allows test requests
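
The three states can be sketched as a small state machine. This is illustrative only and not the tw-connectors implementation: `CircuitBreaker`, its parameters, and the injected `clock` are hypothetical; the thresholds (5 failures, 30 seconds) come from the description above.

```python
import time


class CircuitBreaker:
    def __init__(self, max_failures=5, reset_after=30.0, clock=time.monotonic):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_after:
            return "half_open"
        return "open"

    def call(self, fn):
        if self.state() == "open":
            raise RuntimeError("circuit open: failing fast")
        try:
            result = fn()
        except Exception:
            self.failures += 1
            # Open on reaching the threshold; a failed half-open probe
            # also re-opens the circuit and restarts the timer.
            if self.failures >= self.max_failures:
                self.opened_at = self.clock()
            raise
        # Success closes the circuit and clears the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

Passing the clock in as a parameter keeps the timing logic testable without real 30-second waits.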

Input Validation

API Requests

  • JSON schema validation on all endpoints
  • Size limits on request bodies (1MB default)
  • Type coercion disabled (strict typing)

Webhook Payloads

  • HMAC signature verification
  • Replay attack prevention (timestamp validation)
  • Payload size limits
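
Signature verification compares the HMAC-SHA256 of the payload with the supplied signature in constant time:

// Webhook signature verification.
// `hmac_sha256` and `constant_time_compare` are helper functions not shown here.
fn verify_webhook(payload: &[u8], signature: &str, secret: &str) -> bool {
    let expected = hmac_sha256(secret, payload);
    constant_time_compare(signature, &expected)
}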
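
The same check, combined with the timestamp validation listed above, might look like this in Python using only the standard library. The signed message layout (`<timestamp>.<payload>`), the hex encoding of the signature, and the 300-second window are assumptions made for this sketch; each webhook source defines its own format.

```python
import hashlib
import hmac
import time

MAX_SKEW_SECONDS = 300  # assumed replay window


def sign(secret: bytes, timestamp: int, payload: bytes) -> str:
    # Sign "<timestamp>.<payload>" so the timestamp cannot be altered.
    message = str(timestamp).encode() + b"." + payload
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def verify_webhook(secret, timestamp, payload, signature, now=None) -> bool:
    now = time.time() if now is None else now
    if abs(now - timestamp) > MAX_SKEW_SECONDS:
        return False  # too old or too far in the future: possible replay
    expected = sign(secret, timestamp, payload)
    # compare_digest runs in constant time; bytes avoid non-ASCII errors.
    return hmac.compare_digest(expected.encode(), signature.encode())
```

Because the timestamp is part of the signed message, an attacker cannot refresh an old capture by rewriting the timestamp header.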

Secure Defaults

  • HTTPS enforced in production
  • Secure cookie flags enabled
  • CORS restricted to configured origins
  • Debug endpoints disabled in production
  • Verbose errors only in development

Security Headers

Default response headers:

Strict-Transport-Security: max-age=31536000; includeSubDomains
X-Content-Type-Options: nosniff
X-Frame-Options: DENY
X-XSS-Protection: 1; mode=block
Content-Security-Policy: default-src 'self'

Vulnerability Disclosure

Report security vulnerabilities to: security@example.com

We follow responsible disclosure practices and aim to respond within 48 hours.

Connectors

Connectors integrate Triage Warden with external security tools and services.

Overview

Each connector type has a trait interface and multiple implementations:

Type                 Purpose                          Implementations
Threat Intelligence  Hash/URL/domain reputation       VirusTotal, Mock
SIEM                 Log queries and correlation      Splunk, Mock
EDR                  Endpoint detection and response  CrowdStrike, Mock
Email Gateway        Email security operations        Microsoft 365, Mock
Ticketing            Incident ticket management       Jira, Mock

Configuration

Select connector implementations via environment variables:

# Use real connectors
TW_THREAT_INTEL_MODE=virustotal
TW_SIEM_MODE=splunk
TW_EDR_MODE=crowdstrike
TW_EMAIL_GATEWAY_MODE=m365
TW_TICKETING_MODE=jira

# Or use mocks for testing
TW_THREAT_INTEL_MODE=mock
TW_SIEM_MODE=mock

Connector Trait

All connectors implement the base Connector trait:

#[async_trait]
pub trait Connector: Send + Sync {
    /// Unique identifier for this connector instance
    fn name(&self) -> &str;

    /// Type of connector (threat_intel, siem, edr, etc.)
    fn connector_type(&self) -> &str;

    /// Check connector health
    async fn health_check(&self) -> ConnectorResult<ConnectorHealth>;

    /// Test connection to the service
    async fn test_connection(&self) -> ConnectorResult<bool>;
}

pub enum ConnectorHealth {
    Healthy,
    Degraded { message: String },
    Unhealthy { message: String },
}

Error Handling

Connectors return ConnectorResult<T> with detailed error types:

pub enum ConnectorError {
    /// Service returned an error
    RequestFailed(String),

    /// Resource not found
    NotFound(String),

    /// Authentication failed
    AuthenticationFailed(String),

    /// Rate limit exceeded
    RateLimited { retry_after: Option<Duration> },

    /// Network or connection error
    NetworkError(String),

    /// Invalid response from service
    InvalidResponse(String),
}

Health Monitoring

Check connector health via the API:

curl http://localhost:8080/api/connectors/health

{
  "connectors": [
    { "name": "virustotal", "type": "threat_intel", "status": "healthy" },
    { "name": "splunk", "type": "siem", "status": "healthy" },
    { "name": "crowdstrike", "type": "edr", "status": "degraded", "message": "High latency" }
  ]
}

Threat Intelligence Connector

Query threat intelligence services for reputation data on hashes, URLs, domains, and IP addresses.

Interface

#[async_trait]
pub trait ThreatIntelConnector: Connector {
    /// Look up file hash reputation
    async fn lookup_hash(&self, hash: &str) -> ConnectorResult<ThreatReport>;

    /// Look up URL reputation
    async fn lookup_url(&self, url: &str) -> ConnectorResult<ThreatReport>;

    /// Look up domain reputation
    async fn lookup_domain(&self, domain: &str) -> ConnectorResult<ThreatReport>;

    /// Look up IP address reputation
    async fn lookup_ip(&self, ip: &str) -> ConnectorResult<ThreatReport>;
}

pub struct ThreatReport {
    pub indicator: String,
    pub indicator_type: IndicatorType,
    pub malicious: bool,
    pub confidence: f64,
    pub categories: Vec<String>,
    pub first_seen: Option<DateTime<Utc>>,
    pub last_seen: Option<DateTime<Utc>>,
    pub sources: Vec<ThreatSource>,
}

VirusTotal

Configuration

TW_THREAT_INTEL_MODE=virustotal
TW_VIRUSTOTAL_API_KEY=your-api-key-here

Rate Limits

Tier     Requests/Minute
Free     4
Premium  500+

The connector automatically handles rate limiting with exponential backoff.
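
A retry loop of this kind can be sketched as follows. It is a generic illustration rather than the connector's code: the `RateLimited` exception mirrors the `ConnectorError::RateLimited { retry_after }` variant, while `backoff_delays`, `call_with_backoff`, the base delay, the cap, and the use of random jitter are all assumptions.

```python
import random


class RateLimited(Exception):
    def __init__(self, retry_after=None):
        super().__init__("rate limited")
        self.retry_after = retry_after  # seconds suggested by the server, if any


def backoff_delays(max_retries=5, base=1.0, cap=60.0, rng=random.random):
    """Yield one delay per retry: exponential growth with full jitter."""
    for attempt in range(max_retries):
        ceiling = min(cap, base * 2 ** attempt)
        yield rng() * ceiling


def call_with_backoff(fn, sleep, max_retries=5, **kwargs):
    """Retry fn on RateLimited, preferring the server's retry_after hint."""
    for delay in backoff_delays(max_retries, **kwargs):
        try:
            return fn()
        except RateLimited as err:
            sleep(err.retry_after if err.retry_after is not None else delay)
    return fn()  # final attempt; any error propagates to the caller
```

In real use `sleep` would be `time.sleep`; it is a parameter here so the loop can be exercised without waiting.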

Supported Lookups

Method         VT Endpoint         Notes
lookup_hash    /files/{hash}       MD5, SHA1, SHA256
lookup_url     /urls/{url_id}      url_id is the URL-safe Base64 of the URL, without padding
lookup_domain  /domains/{domain}   Domain reputation
lookup_ip      /ip_addresses/{ip}  IP reputation
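
For the URL lookup, the identifier is derived from the URL itself: URL-safe Base64 with the trailing `=` padding removed. A short sketch, where `vt_url_id` is a hypothetical helper name and not part of the connector:

```python
import base64


def vt_url_id(url: str) -> str:
    # URL-safe Base64 of the URL with the trailing "=" padding removed.
    return base64.urlsafe_b64encode(url.encode()).decode().rstrip("=")


url_id = vt_url_id("http://example.com")
```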

Example Usage

let connector = VirusTotalConnector::new(api_key)?;

let report = connector.lookup_hash("44d88612fea8a8f36de82e1278abb02f").await?;
println!("Malicious: {}", report.malicious);
println!("Confidence: {:.2}", report.confidence);
println!("Categories: {:?}", report.categories);

Response Mapping

VirusTotal detection ratios map to confidence scores:

Detection Ratio  Confidence  Classification
0%               0.0         Clean
1-10%            0.3         Suspicious
11-50%           0.6         Likely Malicious
51-100%          0.9         Malicious
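
The mapping can be written as a small function. The table does not say how fractional percentages between bands (for example 10.5%) are handled; this sketch assumes they fall into the next band up. `classify` and its arguments are hypothetical names.

```python
def classify(malicious: int, total: int):
    """Map a detection ratio to (confidence, classification)."""
    if total <= 0:
        raise ValueError("total engines must be positive")
    pct = 100.0 * malicious / total
    if pct == 0:
        return 0.0, "Clean"
    if pct <= 10:
        return 0.3, "Suspicious"
    if pct <= 50:
        return 0.6, "Likely Malicious"
    return 0.9, "Malicious"
```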

Mock Connector

For testing without external API calls:

TW_THREAT_INTEL_MODE=mock

The mock connector returns predictable results based on indicator patterns:

Pattern                Result
Contains "malicious"   Malicious, confidence 0.95
Contains "suspicious"  Suspicious, confidence 0.5
Contains "clean"       Clean, confidence 0.1
Default                Clean, confidence 0.2
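
In code, the pattern table amounts to a few substring checks. The order of the checks and the case-insensitive match are assumptions of this sketch; `mock_lookup` is a hypothetical name, not the mock connector's API.

```python
def mock_lookup(indicator: str):
    """Return (classification, confidence) following the pattern table."""
    text = indicator.lower()
    if "malicious" in text:
        return "Malicious", 0.95
    if "suspicious" in text:
        return "Suspicious", 0.5
    if "clean" in text:
        return "Clean", 0.1
    return "Clean", 0.2
```

This makes test fixtures self-describing: an indicator such as `malicious-site.com` always produces a malicious verdict.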

Python Bridge

Access from Python via the bridge:

from tw_bridge import ThreatIntelBridge

# Create bridge (uses TW_THREAT_INTEL_MODE env var)
bridge = ThreatIntelBridge()

# Or specify mode explicitly
bridge = ThreatIntelBridge("virustotal")

# Lookup hash
result = bridge.lookup_hash("44d88612fea8a8f36de82e1278abb02f")
print(f"Malicious: {result['malicious']}")
print(f"Confidence: {result['confidence']}")

# Lookup URL
result = bridge.lookup_url("https://example.com/suspicious")

# Lookup domain
result = bridge.lookup_domain("malware-site.com")

Caching

Results are cached to reduce API calls:

Lookup Type  Cache Duration
Hash         24 hours
URL          1 hour
Domain       6 hours
IP           6 hours

Cache is stored in the database and shared across instances.
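
The per-type expiry can be sketched with an in-memory stand-in. The real cache lives in the database, so this only illustrates the TTL logic; `LookupCache`, its methods, and the injected `clock` are hypothetical.

```python
import time

# Cache durations from the table above, in seconds.
TTL_SECONDS = {"hash": 24 * 3600, "url": 3600, "domain": 6 * 3600, "ip": 6 * 3600}


class LookupCache:
    def __init__(self, clock=time.time):
        self._clock = clock
        self._entries = {}  # (lookup_type, indicator) -> (expires_at, report)

    def get(self, lookup_type, indicator):
        entry = self._entries.get((lookup_type, indicator))
        if entry is None:
            return None
        expires_at, report = entry
        if self._clock() >= expires_at:
            del self._entries[(lookup_type, indicator)]  # expired
            return None
        return report

    def put(self, lookup_type, indicator, report):
        expires_at = self._clock() + TTL_SECONDS[lookup_type]
        self._entries[(lookup_type, indicator)] = (expires_at, report)
```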

Adding Custom Providers

Implement the ThreatIntelConnector trait:

pub struct CustomThreatIntelConnector {
    client: reqwest::Client,
    api_key: String,
}

#[async_trait]
impl Connector for CustomThreatIntelConnector {
    fn name(&self) -> &str { "custom" }
    fn connector_type(&self) -> &str { "threat_intel" }
    // ... implement health_check, test_connection
}

#[async_trait]
impl ThreatIntelConnector for CustomThreatIntelConnector {
    async fn lookup_hash(&self, hash: &str) -> ConnectorResult<ThreatReport> {
        // Custom implementation
        todo!()
    }
    // ... implement other methods
}

See Adding Connectors for full details.

SIEM Connector

Query SIEM platforms for log data, run searches, and correlate events.

Interface

#[async_trait]
pub trait SIEMConnector: Connector {
    /// Run a search query
    async fn search(&self, query: &str, time_range: TimeRange) -> ConnectorResult<SearchResults>;

    /// Get events by ID
    async fn get_events(&self, event_ids: &[String]) -> ConnectorResult<Vec<SIEMEvent>>;

    /// Get related events (correlation)
    async fn get_related_events(
        &self,
        indicator: &str,
        indicator_type: IndicatorType,
        time_range: TimeRange,
    ) -> ConnectorResult<Vec<SIEMEvent>>;
}

pub struct SIEMEvent {
    pub id: String,
    pub timestamp: DateTime<Utc>,
    pub source: String,
    pub event_type: String,
    pub severity: String,
    pub raw_data: serde_json::Value,
}

pub struct SearchResults {
    pub events: Vec<SIEMEvent>,
    pub total_count: u64,
    pub search_id: String,
}

Splunk

Configuration

TW_SIEM_MODE=splunk
TW_SPLUNK_URL=https://splunk.company.com:8089
TW_SPLUNK_TOKEN=your-token-here

Token Permissions

The Splunk token requires these capabilities:

  • search - Run searches
  • list_inputs - Health check
  • rest_access - REST API access

Example Searches

let connector = SplunkConnector::new(url, token)?;

// Search for events
let results = connector.search(
    r#"index=security sourcetype=firewall action=blocked"#,
    TimeRange::last_hours(24),
).await?;

// Find related events by IP
let related = connector.get_related_events(
    "192.168.1.100",
    IndicatorType::IpAddress,
    TimeRange::last_hours(1),
).await?;

Search Query Translation

Common queries are translated to SPL:

Triage Warden Query  Splunk SPL
IP correlation       index=* src_ip="{ip}" OR dest_ip="{ip}"
User activity        index=* user="{user}"
Hash lookup          index=* (file_hash="{hash}" OR sha256="{hash}")
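
The translation is essentially template substitution. The sketch below uses the SPL strings from the table; the `to_spl` helper, the short `kind` keys, and the quote escaping are assumptions added for illustration, on the reasoning that indicator values come from untrusted incident data.

```python
TEMPLATES = {
    "ip": 'index=* src_ip="{value}" OR dest_ip="{value}"',
    "user": 'index=* user="{value}"',
    "hash": 'index=* (file_hash="{value}" OR sha256="{value}")',
}


def to_spl(kind: str, value: str) -> str:
    # Escape backslashes and double quotes so the value stays inside
    # the quoted SPL string.
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return TEMPLATES[kind].format(value=escaped)


ip_query = to_spl("ip", "192.168.1.100")
```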

Performance Tips

  • Use specific indexes in queries
  • Limit time ranges when possible
  • Use | head 1000 to limit results

Mock Connector

For testing:

TW_SIEM_MODE=mock

The mock returns sample security events matching the query pattern.

Python Bridge

from tw_bridge import SIEMBridge

bridge = SIEMBridge("splunk")

# Run a search
results = bridge.search(
    query='index=security action=blocked',
    hours=24
)

for event in results['events']:
    print(f"{event['timestamp']}: {event['source']}")

# Get related events
related = bridge.get_related_events(
    indicator="192.168.1.100",
    indicator_type="ip",
    hours=1
)

Adding Custom SIEM

Implement the SIEMConnector trait:

pub struct ElasticSIEMConnector {
    client: elasticsearch::Elasticsearch,
}

// SIEMConnector extends Connector, so ElasticSIEMConnector must also
// implement the base Connector trait (not shown here).
#[async_trait]
impl SIEMConnector for ElasticSIEMConnector {
    async fn search(&self, query: &str, time_range: TimeRange) -> ConnectorResult<SearchResults> {
        // Translate to Elasticsearch DSL and execute
        todo!()
    }
    // ... implement other methods
}

EDR Connector

Integrate with Endpoint Detection and Response platforms for host information and response actions.

Interface

#[async_trait]
pub trait EDRConnector: Connector {
    /// Get host information
    async fn get_host(&self, host_id: &str) -> ConnectorResult<HostInfo>;

    /// Search for hosts
    async fn search_hosts(&self, query: &str) -> ConnectorResult<Vec<HostInfo>>;

    /// Get recent detections for a host
    async fn get_detections(&self, host_id: &str) -> ConnectorResult<Vec<Detection>>;

    /// Isolate a host from the network
    async fn isolate_host(&self, host_id: &str) -> ConnectorResult<ActionResult>;

    /// Remove host isolation
    async fn unisolate_host(&self, host_id: &str) -> ConnectorResult<ActionResult>;

    /// Trigger a scan on the host
    async fn scan_host(&self, host_id: &str) -> ConnectorResult<ActionResult>;
}

pub struct HostInfo {
    pub id: String,
    pub hostname: String,
    pub platform: String,
    pub os_version: String,
    pub agent_version: String,
    pub last_seen: DateTime<Utc>,
    pub isolation_status: IsolationStatus,
    pub tags: Vec<String>,
}

pub struct Detection {
    pub id: String,
    pub timestamp: DateTime<Utc>,
    pub severity: String,
    pub tactic: String,
    pub technique: String,
    pub description: String,
    pub process_name: Option<String>,
    pub file_path: Option<String>,
}

CrowdStrike

Configuration

TW_EDR_MODE=crowdstrike
TW_CROWDSTRIKE_CLIENT_ID=your-client-id
TW_CROWDSTRIKE_CLIENT_SECRET=your-client-secret
TW_CROWDSTRIKE_REGION=us-1  # us-1, us-2, eu-1, usgov-1

API Scopes Required

The API client requires these scopes:

  • Hosts: Read - Get host information
  • Hosts: Write - Isolation actions
  • Detections: Read - Get detections
  • Real Time Response: Write - Scan actions

OAuth2 Token Management

The connector automatically handles token refresh:

// Token refreshed automatically when expired
let connector = CrowdStrikeConnector::new(client_id, client_secret, region)?;

// All subsequent calls use valid token
let host = connector.get_host("aid:abc123").await?;

Example Usage

// Get host information
let host = connector.get_host("aid:abc123").await?;
println!("Hostname: {}", host.hostname);
println!("Last seen: {}", host.last_seen);

// Check for detections
let detections = connector.get_detections("aid:abc123").await?;
for d in detections {
    println!("{}: {} - {}", d.timestamp, d.severity, d.description);
}

// Isolate compromised host
let result = connector.isolate_host("aid:abc123").await?;
if result.success {
    println!("Host isolated successfully");
}

Action Confirmation

Isolation and scan actions require policy approval. See Policy Engine.

Mock Connector

TW_EDR_MODE=mock

The mock provides sample hosts and detections for testing.

Python Bridge

from tw_bridge import EDRBridge

bridge = EDRBridge("crowdstrike")

# Get host info
host = bridge.get_host("aid:abc123")
print(f"Hostname: {host['hostname']}")
print(f"Platform: {host['platform']}")

# Get detections
detections = bridge.get_detections("aid:abc123")
for d in detections:
    print(f"{d['severity']}: {d['description']}")

# Isolate host (requires policy approval)
result = bridge.isolate_host("aid:abc123")
if result['success']:
    print("Host isolated")

Response Actions

Action        Description        Rollback
isolate_host  Network isolation  unisolate_host
scan_host     On-demand scan     N/A

Isolation Behavior

When isolated:

  • Host cannot communicate on network
  • Falcon agent maintains connection to cloud
  • User may see isolation notification

Rate Limits

Endpoint             Limit
Host queries         100/min
Detection queries    50/min
Containment actions  10/min

Email Gateway Connector

Manage email security operations including search, quarantine, and sender blocking.

Interface

#[async_trait]
pub trait EmailGatewayConnector: Connector {
    /// Search for emails
    async fn search_emails(&self, query: EmailSearchQuery) -> ConnectorResult<Vec<EmailMessage>>;

    /// Get specific email by ID
    async fn get_email(&self, message_id: &str) -> ConnectorResult<EmailMessage>;

    /// Move email to quarantine
    async fn quarantine_email(&self, message_id: &str) -> ConnectorResult<ActionResult>;

    /// Release email from quarantine
    async fn release_email(&self, message_id: &str) -> ConnectorResult<ActionResult>;

    /// Block sender
    async fn block_sender(&self, sender: &str) -> ConnectorResult<ActionResult>;

    /// Unblock sender
    async fn unblock_sender(&self, sender: &str) -> ConnectorResult<ActionResult>;

    /// Get threat data for email
    async fn get_threat_data(&self, message_id: &str) -> ConnectorResult<EmailThreatData>;
}

pub struct EmailMessage {
    pub id: String,
    pub internet_message_id: String,
    pub sender: String,
    pub recipients: Vec<String>,
    pub subject: String,
    pub received_at: DateTime<Utc>,
    pub has_attachments: bool,
    pub attachments: Vec<EmailAttachment>,
    pub urls: Vec<String>,
    pub headers: HashMap<String, String>,
    pub threat_assessment: Option<ThreatAssessment>,
}

pub struct EmailSearchQuery {
    pub sender: Option<String>,
    pub recipient: Option<String>,
    pub subject_contains: Option<String>,
    pub timerange: TimeRange,
    pub has_attachments: Option<bool>,
    pub threat_type: Option<String>,
    pub limit: usize,
}

Microsoft 365

Configuration

TW_EMAIL_GATEWAY_MODE=m365
TW_M365_TENANT_ID=your-tenant-id
TW_M365_CLIENT_ID=your-client-id
TW_M365_CLIENT_SECRET=your-client-secret

App Registration

Create an Azure AD app registration with these API permissions:

Permission                 Type         Purpose
Mail.Read                  Application  Read emails
Mail.ReadWrite             Application  Quarantine actions
ThreatAssessment.Read.All  Application  Threat data
Policy.Read.All            Application  Block list management

Example Usage

let connector = M365Connector::new(tenant_id, client_id, client_secret)?;

// Search for suspicious emails
let query = EmailSearchQuery {
    sender: Some("suspicious@domain.com".to_string()),
    timerange: TimeRange::last_hours(24),
    ..Default::default()
};
let emails = connector.search_emails(query).await?;

// Quarantine malicious email
let result = connector.quarantine_email("AAMkAGI2...").await?;

// Block sender
let result = connector.block_sender("phisher@malicious.com").await?;

Quarantine Behavior

When quarantined:

  • Email moved to quarantine folder
  • User notified (configurable)
  • Admin can release if false positive

Mock Connector

TW_EMAIL_GATEWAY_MODE=mock

Provides sample emails with various threat characteristics:

  • Phishing with malicious URLs
  • Malware with executable attachments
  • BEC/impersonation attempts
  • Clean legitimate emails

Python Bridge

from tw_bridge import EmailGatewayBridge

bridge = EmailGatewayBridge("m365")

# Search emails
emails = bridge.search_emails(
    sender="suspicious@domain.com",
    hours=24
)

for email in emails:
    print(f"From: {email['sender']}")
    print(f"Subject: {email['subject']}")
    print(f"Attachments: {len(email['attachments'])}")

# Quarantine email
result = bridge.quarantine_email("AAMkAGI2...")
if result['success']:
    print("Email quarantined")

# Block sender
result = bridge.block_sender("phisher@malicious.com")

Response Actions

Action            Description         Rollback
quarantine_email  Move to quarantine  release_email
block_sender      Add to blocklist    unblock_sender

Threat Data

Get detailed threat information:

let threat_data = connector.get_threat_data("AAMkAGI2...").await?;

println!("Delivery action: {}", threat_data.delivery_action);
println!("Threat types: {:?}", threat_data.threat_types);
println!("Detection methods: {:?}", threat_data.detection_methods);

Fields:

  • delivery_action: Delivered, Quarantined, Blocked
  • threat_types: Phishing, Malware, Spam, BEC
  • detection_methods: URLAnalysis, AttachmentScanning, ImpersonationDetection
  • urls_clicked: URLs clicked by recipient (if tracking enabled)

Ticketing Connector

Create and manage security incident tickets in external ticketing systems.

Interface

#[async_trait]
pub trait TicketingConnector: Connector {
    /// Create a new ticket
    async fn create_ticket(&self, ticket: CreateTicketRequest) -> ConnectorResult<Ticket>;

    /// Get ticket by ID
    async fn get_ticket(&self, ticket_id: &str) -> ConnectorResult<Ticket>;

    /// Update ticket fields
    async fn update_ticket(&self, ticket_id: &str, update: UpdateTicketRequest) -> ConnectorResult<Ticket>;

    /// Add comment to ticket
    async fn add_comment(&self, ticket_id: &str, comment: &str) -> ConnectorResult<()>;

    /// Search tickets
    async fn search_tickets(&self, query: TicketSearchQuery) -> ConnectorResult<Vec<Ticket>>;
}

pub struct CreateTicketRequest {
    pub title: String,
    pub description: String,
    pub priority: TicketPriority,
    pub ticket_type: String,
    pub labels: Vec<String>,
    pub assignee: Option<String>,
    pub custom_fields: HashMap<String, String>,
}

pub struct Ticket {
    pub id: String,
    pub key: String,
    pub title: String,
    pub description: String,
    pub status: String,
    pub priority: TicketPriority,
    pub assignee: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub url: String,
}

Jira

Configuration

TW_TICKETING_MODE=jira
TW_JIRA_URL=https://company.atlassian.net
TW_JIRA_EMAIL=automation@company.com
TW_JIRA_API_TOKEN=your-api-token
TW_JIRA_PROJECT_KEY=SEC

API Token

Generate an API token at: https://id.atlassian.com/manage-profile/security/api-tokens

Required permissions:

  • Create issues
  • Edit issues
  • Add comments
  • Browse project

Example Usage

let connector = JiraConnector::new(url, email, token, project_key)?;

// Create security ticket
let request = CreateTicketRequest {
    title: "Phishing Incident - INC-2024-001".to_string(),
    description: "Phishing email detected and quarantined.\n\n## Details\n...".to_string(),
    priority: TicketPriority::High,
    ticket_type: "Security Incident".to_string(),
    labels: vec!["phishing".to_string(), "triage-warden".to_string()],
    assignee: Some("analyst@company.com".to_string()),
    custom_fields: HashMap::new(),
};

let ticket = connector.create_ticket(request).await?;
println!("Created: {} - {}", ticket.key, ticket.url);

// Add investigation notes
connector.add_comment(
    &ticket.id,
    "## Investigation Notes\n\n- Sender reputation: Malicious\n- URLs: 2 phishing links"
).await?;

Issue Types

Configure the Jira project with these issue types:

| Issue Type | Usage |
|---|---|
| Security Incident | Main incident ticket |
| Investigation | Sub-task for investigation steps |
| Remediation | Sub-task for response actions |

Custom Fields

Map custom fields in configuration:

TW_JIRA_FIELD_SEVERITY=customfield_10001
TW_JIRA_FIELD_INCIDENT_ID=customfield_10002
TW_JIRA_FIELD_VERDICT=customfield_10003

Mock Connector

TW_TICKETING_MODE=mock

Simulates ticket operations with in-memory storage.

Python Bridge

from tw_bridge import TicketingBridge

bridge = TicketingBridge("jira")

# Create ticket
ticket = bridge.create_ticket(
    title="Phishing Incident - INC-2024-001",
    description="Phishing email detected...",
    priority="high",
    ticket_type="Security Incident",
    labels=["phishing", "triage-warden"]
)
print(f"Created: {ticket['key']}")
print(f"URL: {ticket['url']}")

# Add comment
bridge.add_comment(
    ticket_id=ticket['id'],
    comment="Investigation complete. Verdict: Malicious"
)

# Update status
bridge.update_ticket(
    ticket_id=ticket['id'],
    status="Done"
)

# Search tickets
tickets = bridge.search_tickets(
    query="project = SEC AND labels = phishing",
    limit=10
)

Ticket Templates

Define templates for consistent ticket creation:

# config/ticket_templates.toml

[templates.phishing]
title = "Phishing: {subject}"
description = """
## Incident Summary
- **Type**: Phishing
- **Severity**: {severity}
- **Incident ID**: {incident_id}

## Details
{details}

## Recommended Actions
{recommended_actions}
"""
labels = ["phishing", "triage-warden"]

[templates.malware]
title = "Malware Alert: {hostname}"
description = """
## Incident Summary
- **Type**: Malware
- **Host**: {hostname}
- **Detection**: {detection}

## IOCs
{iocs}
"""
labels = ["malware", "triage-warden"]
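
The `{placeholder}` fields in these templates are filled in from incident data when a ticket is created. The following is a minimal Python sketch of that substitution, assuming the placeholders behave like Python `str.format` fields. `render_ticket` and the sample values are hypothetical and are not part of Triage Warden.

```python
# Hypothetical illustration of filling a ticket template; not Triage Warden's renderer.
PHISHING_TEMPLATE = {
    "title": "Phishing: {subject}",
    "description": (
        "## Incident Summary\n"
        "- **Type**: Phishing\n"
        "- **Severity**: {severity}\n"
        "- **Incident ID**: {incident_id}\n"
    ),
    "labels": ["phishing", "triage-warden"],
}


def render_ticket(template: dict, values: dict) -> dict:
    """Substitute {placeholder} fields in the title and description."""
    return {
        "title": template["title"].format(**values),
        "description": template["description"].format(**values),
        "labels": list(template["labels"]),
    }


ticket = render_ticket(
    PHISHING_TEMPLATE,
    {"subject": "Urgent: Update Account", "severity": "high", "incident_id": "INC-2024-001"},
)
print(ticket["title"])
```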

Integration with Incidents

Tickets are automatically linked to incidents:

// Create ticket action stores the ticket key
let action = execute_action("create_ticket", incident_id, params).await?;

// Incident updated with ticket reference
// (sketch: assumes `incident.metadata` is a string-keyed map)
incident.metadata.insert("ticket_key".to_string(), "SEC-1234".into());
incident.metadata.insert(
    "ticket_url".to_string(),
    "https://company.atlassian.net/browse/SEC-1234".into(),
);

Actions

Actions are the executable operations that Triage Warden can perform in response to incidents.

Overview

Actions fall into several categories:

| Category | Purpose | Examples |
|---|---|---|
| Analysis | Extract and parse data | parse_email, check_email_authentication |
| Lookup | Enrich with external data | lookup_sender_reputation, lookup_urls |
| Response | Take containment actions | quarantine_email, isolate_host |
| Notification | Alert stakeholders | notify_user, escalate |
| Ticketing | Create/update tickets | create_ticket, add_ticket_comment |

Action Trait

All actions implement the Action trait:

#[async_trait]
pub trait Action: Send + Sync {
    /// Action name (used in playbooks and API)
    fn name(&self) -> &str;

    /// Human-readable description
    fn description(&self) -> &str;

    /// Required and optional parameters
    fn required_parameters(&self) -> Vec<ParameterDef>;

    /// Whether this action supports rollback
    fn supports_rollback(&self) -> bool;

    /// Execute the action
    async fn execute(&self, context: ActionContext) -> Result<ActionResult, ActionError>;

    /// Rollback the action (if supported); the default implementation reports no support
    async fn rollback(&self, _context: ActionContext) -> Result<ActionResult, ActionError> {
        Err(ActionError::RollbackNotSupported)
    }
}

Action Context

Actions receive an ActionContext with:

pub struct ActionContext {
    /// Unique execution ID
    pub execution_id: Uuid,

    /// Parameters passed to the action
    pub parameters: HashMap<String, serde_json::Value>,

    /// Related incident (if any)
    pub incident_id: Option<Uuid>,

    /// User or agent requesting the action
    pub proposer: String,

    /// Connectors available for use
    pub connectors: ConnectorRegistry,
}

Action Result

Actions return an ActionResult:

pub struct ActionResult {
    /// Whether the action succeeded
    pub success: bool,

    /// Action name
    pub action_name: String,

    /// Human-readable summary
    pub message: String,

    /// Execution duration
    pub duration: Duration,

    /// Output data (action-specific)
    pub output: HashMap<String, serde_json::Value>,

    /// Whether rollback is available
    pub rollback_available: bool,
}

Policy Integration

All actions pass through the policy engine before execution:

Action Request → Policy Evaluation → Decision
                                       ├─ Allowed → Execute
                                       ├─ Denied → Return Error
                                       └─ RequiresApproval → Queue

See Policy Engine for approval configuration.

Executing Actions

Via API

curl -X POST http://localhost:8080/api/incidents/{id}/actions \
  -H "Content-Type: application/json" \
  -d '{
    "action": "quarantine_email",
    "parameters": {
      "message_id": "AAMkAGI2...",
      "reason": "Phishing detected"
    }
  }'
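
The same request can be built from Python with the standard library only. This is a sketch that mirrors the curl call above: `build_action_request` is a hypothetical helper, and the request is constructed but not sent, so it runs without an API server.

```python
import json
import urllib.request


def build_action_request(base_url: str, incident_id: str, action: str, parameters: dict) -> urllib.request.Request:
    """Build (but do not send) the POST request for an incident action."""
    body = json.dumps({"action": action, "parameters": parameters}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/api/incidents/{incident_id}/actions",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_action_request(
    "http://localhost:8080",
    "INC-2024-001",
    "quarantine_email",
    {"message_id": "AAMkAGI2...", "reason": "Phishing detected"},
)
# To send it against a running server: urllib.request.urlopen(request)
print(request.get_method(), request.full_url)
```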

Via CLI

tw-cli action execute \
  --incident INC-2024-001 \
  --action quarantine_email \
  --param message_id=AAMkAGI2... \
  --param reason="Phishing detected"

Via Playbook

steps:
  - action: quarantine_email
    parameters:
      message_id: "{{ incident.raw_data.message_id }}"
      reason: "Automated response to phishing"

Available Actions

Email Actions

Actions for analyzing and responding to email-based threats.

Analysis Actions

parse_email

Extract headers, body, attachments, and URLs from raw email.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| raw_email | string | Yes | Raw email content (RFC 822) |

Output:

{
  "headers": {
    "From": "sender@example.com",
    "To": "recipient@company.com",
    "Subject": "Important Document",
    "Date": "2024-01-15T10:30:00Z",
    "Message-ID": "<abc123@example.com>",
    "X-Originating-IP": "[192.168.1.100]"
  },
  "sender": "sender@example.com",
  "recipients": ["recipient@company.com"],
  "subject": "Important Document",
  "body_text": "Please review the attached document...",
  "body_html": "<html>...",
  "attachments": [
    {
      "filename": "document.pdf",
      "content_type": "application/pdf",
      "size": 102400,
      "sha256": "abc123..."
    }
  ],
  "urls": [
    "https://example.com/document",
    "https://suspicious-site.com/login"
  ]
}

check_email_authentication

Validate SPF, DKIM, and DMARC authentication results.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| headers | object | Yes | Email headers (from parse_email) |

Output:

{
  "spf": {
    "result": "pass",
    "domain": "example.com"
  },
  "dkim": {
    "result": "pass",
    "domain": "example.com",
    "selector": "default"
  },
  "dmarc": {
    "result": "pass",
    "policy": "reject"
  },
  "authentication_passed": true,
  "risk_indicators": []
}

Risk Indicators:

  • spf_fail - SPF validation failed
  • dkim_fail - DKIM signature invalid
  • dmarc_fail - DMARC policy violation
  • header_mismatch - From/Reply-To mismatch
  • suspicious_routing - Unusual mail routing
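
As a rough illustration of how the per-mechanism results could map to these indicators, here is a Python sketch. It rests on two assumptions that the documentation does not state: a result of "fail" produces the matching indicator, and `authentication_passed` is true only when SPF, DKIM, and DMARC all pass. `summarize_authentication` is a hypothetical name.

```python
def summarize_authentication(spf: str, dkim: str, dmarc: str) -> dict:
    """Derive risk indicators from SPF, DKIM, and DMARC result strings."""
    results = {"spf": spf, "dkim": dkim, "dmarc": dmarc}
    # Assumption: only an explicit "fail" raises an indicator.
    indicators = [f"{name}_fail" for name, result in results.items() if result == "fail"]
    # Assumption: every mechanism must pass for the overall check to pass.
    passed = all(result == "pass" for result in results.values())
    return {"authentication_passed": passed, "risk_indicators": indicators}


print(summarize_authentication("fail", "pass", "fail"))
```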

Response Actions

quarantine_email

Move email to quarantine via email gateway.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| message_id | string | Yes | Email message ID |
| reason | string | No | Reason for quarantine |

Output:

{
  "quarantine_id": "quar-abc123",
  "message_id": "AAMkAGI2...",
  "quarantined_at": "2024-01-15T10:35:00Z"
}

Rollback: release_email - Releases email from quarantine

block_sender

Add sender to organization blocklist.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| sender | string | Yes | Email address to block |
| scope | string | No | Block scope: organization or user |

Output:

{
  "block_id": "block-abc123",
  "sender": "phisher@malicious.com",
  "scope": "organization",
  "blocked_at": "2024-01-15T10:35:00Z"
}

Rollback: unblock_sender - Removes sender from blocklist

Usage Examples

Phishing Response Playbook

name: phishing_response
steps:
  - action: parse_email
    output: parsed

  - action: check_email_authentication
    parameters:
      headers: "{{ parsed.headers }}"
    output: auth

  - action: lookup_sender_reputation
    parameters:
      sender: "{{ parsed.sender }}"
    output: reputation

  - condition: "reputation.overall_score < 0.3 or not auth.authentication_passed"
    action: quarantine_email
    parameters:
      message_id: "{{ incident.raw_data.message_id }}"
      reason: "Failed authentication or low sender reputation"

  - condition: "reputation.overall_score < 0.2"
    action: block_sender
    parameters:
      sender: "{{ parsed.sender }}"
      scope: organization

CLI Example

# Quarantine suspicious email
tw-cli action execute \
  --action quarantine_email \
  --param message_id="AAMkAGI2..." \
  --param reason="Phishing indicators detected"

# Block malicious sender
tw-cli action execute \
  --action block_sender \
  --param sender="phisher@malicious.com" \
  --param scope=organization

Host Actions

Actions for endpoint containment and investigation.

isolate_host

Network-isolate a compromised host via EDR.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| host_id | string | Yes | EDR host/agent ID |
| reason | string | No | Reason for isolation |

Output:

{
  "isolation_id": "iso-abc123",
  "host_id": "aid:xyz789",
  "hostname": "WORKSTATION-01",
  "isolated_at": "2024-01-15T10:40:00Z",
  "status": "isolated"
}

Behavior:

  • Host network access blocked
  • EDR agent maintains cloud connectivity
  • User notified (configurable)

Rollback: unisolate_host

Policy: Typically requires senior analyst or manager approval.

unisolate_host

Remove network isolation from a host.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| host_id | string | Yes | EDR host/agent ID |
| reason | string | No | Reason for removing isolation |

Output:

{
  "host_id": "aid:xyz789",
  "hostname": "WORKSTATION-01",
  "unisolated_at": "2024-01-15T14:00:00Z",
  "status": "active"
}

scan_host

Trigger on-demand malware scan on a host.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| host_id | string | Yes | EDR host/agent ID |
| scan_type | string | No | quick or full (default: quick) |

Output:

{
  "scan_id": "scan-abc123",
  "host_id": "aid:xyz789",
  "scan_type": "quick",
  "started_at": "2024-01-15T10:45:00Z",
  "status": "running"
}

Note: scan_host returns once the scan has started. Scan results are retrieved separately because a scan may take time to complete.

Usage Examples

Malware Response Playbook

name: malware_response
steps:
  - action: isolate_host
    parameters:
      host_id: "{{ incident.raw_data.host_id }}"
      reason: "Malware detection - automated isolation"
    output: isolation

  - action: scan_host
    parameters:
      host_id: "{{ incident.raw_data.host_id }}"
      scan_type: full

  - action: create_ticket
    parameters:
      title: "Malware Incident - {{ incident.raw_data.hostname }}"
      priority: high

  - action: notify_user
    parameters:
      user: "{{ incident.raw_data.user }}"
      message: "Your workstation has been isolated due to a security incident"

CLI Example

# Isolate compromised host
tw-cli action execute \
  --action isolate_host \
  --param host_id="aid:xyz789" \
  --param reason="Active malware infection"

# This action typically requires approval
# Check approval status:
tw-cli action status act-123456

# After investigation, remove isolation:
tw-cli action execute \
  --action unisolate_host \
  --param host_id="aid:xyz789" \
  --param reason="Malware cleaned, host verified"

API Example

# Request host isolation
curl -X POST http://localhost:8080/api/incidents/INC-2024-001/actions \
  -H "Content-Type: application/json" \
  -d '{
    "action": "isolate_host",
    "parameters": {
      "host_id": "aid:xyz789",
      "reason": "Suspected compromise"
    }
  }'

# Response (if requires approval):
{
  "action_id": "act-abc123",
  "status": "pending_approval",
  "approval_level": "manager",
  "message": "Action requires SOC Manager approval"
}

Policy Configuration

Host actions are typically high-impact and require approval:

# More specific rule first: the first matching rule wins
[[policy.rules]]
name = "critical_isolate_requires_manager"
action = "isolate_host"
severity = ["critical"]
approval_level = "manager"

[[policy.rules]]
name = "isolate_requires_approval"
action = "isolate_host"
approval_level = "senior"

Lookup Actions

Actions for enriching incidents with threat intelligence data.

lookup_sender_reputation

Query threat intelligence for sender domain and IP reputation.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| sender | string | Yes | Email address |
| originating_ip | string | No | Sending server IP |

Output:

{
  "sender": "suspicious@domain.com",
  "domain": "domain.com",
  "domain_reputation": {
    "score": 0.25,
    "categories": ["phishing", "newly-registered"],
    "first_seen": "2024-01-10",
    "registrar": "NameCheap"
  },
  "ip_reputation": {
    "ip": "203.0.113.100",
    "score": 0.3,
    "categories": ["spam", "proxy"],
    "country": "RU",
    "asn": "AS12345"
  },
  "overall_score": 0.25,
  "risk_level": "high"
}

Score Interpretation:

| Score | Risk Level |
|---|---|
| 0.0 - 0.3 | High risk |
| 0.3 - 0.6 | Medium risk |
| 0.6 - 0.8 | Low risk |
| 0.8 - 1.0 | Clean |
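
The bands share their boundary values (0.3 appears in two rows), so the exact cutoff behavior is not specified here. The Python sketch below assumes each boundary belongs to the less severe band, which is an assumption rather than documented behavior. `risk_level` is a hypothetical helper.

```python
def risk_level(score: float) -> str:
    """Map a reputation score in [0.0, 1.0] to a risk band."""
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score out of range: {score}")
    # Assumption: a boundary value falls into the less severe band.
    if score < 0.3:
        return "high"
    if score < 0.6:
        return "medium"
    if score < 0.8:
        return "low"
    return "clean"


print(risk_level(0.25))
```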

lookup_urls

Check URLs against threat intelligence.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| urls | array | Yes | List of URLs to check |

Output:

{
  "results": [
    {
      "url": "https://legitimate-site.com/page",
      "malicious": false,
      "categories": ["business"],
      "confidence": 0.95
    },
    {
      "url": "https://phishing-site.com/login",
      "malicious": true,
      "categories": ["phishing", "credential-theft"],
      "confidence": 0.92,
      "threat_details": {
        "targeted_brand": "Microsoft",
        "first_seen": "2024-01-14"
      }
    }
  ],
  "malicious_count": 1,
  "total_count": 2
}

lookup_attachments

Hash attachments and check against threat intelligence.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| attachments | array | Yes | List of attachment objects with sha256 |

Output:

{
  "results": [
    {
      "filename": "invoice.pdf",
      "sha256": "abc123...",
      "malicious": false,
      "file_type": "PDF document",
      "confidence": 0.9
    },
    {
      "filename": "update.exe",
      "sha256": "def456...",
      "malicious": true,
      "file_type": "Windows executable",
      "confidence": 0.98,
      "threat_details": {
        "malware_family": "Emotet",
        "first_seen": "2024-01-12",
        "detection_engines": 45
      }
    }
  ],
  "malicious_count": 1,
  "total_count": 2
}

lookup_hash

Look up a single file hash.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| hash | string | Yes | MD5, SHA1, or SHA256 hash |

Output:

{
  "hash": "abc123...",
  "hash_type": "sha256",
  "malicious": true,
  "confidence": 0.95,
  "malware_family": "Emotet",
  "categories": ["trojan", "banking"],
  "first_seen": "2024-01-12",
  "last_seen": "2024-01-15",
  "detection_ratio": "45/70"
}
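
Because the action accepts three hash formats, a caller may want to check which one it is passing. The hex digest lengths are fixed (32 characters for MD5, 40 for SHA1, 64 for SHA256), so a length check is enough. This is a Python sketch with a hypothetical `detect_hash_type` helper; how Triage Warden itself fills in `hash_type` is not documented here.

```python
import hashlib
import re

# Hex digest lengths of the three supported algorithms.
HASH_TYPES_BY_LENGTH = {32: "md5", 40: "sha1", 64: "sha256"}


def detect_hash_type(value: str) -> str:
    """Return "md5", "sha1", or "sha256" based on the hex digest length."""
    digest = value.strip().lower()
    if not re.fullmatch(r"[0-9a-f]+", digest):
        raise ValueError("hash must be a hexadecimal string")
    hash_type = HASH_TYPES_BY_LENGTH.get(len(digest))
    if hash_type is None:
        raise ValueError(f"unsupported hash length: {len(digest)}")
    return hash_type


sample = hashlib.sha256(b"example attachment").hexdigest()
print(detect_hash_type(sample))
```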

lookup_ip

Query IP address reputation.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| ip | string | Yes | IP address |

Output:

{
  "ip": "203.0.113.100",
  "malicious": true,
  "confidence": 0.8,
  "categories": ["c2", "malware-distribution"],
  "country": "RU",
  "asn": "AS12345",
  "asn_org": "Example ISP",
  "last_seen": "2024-01-15",
  "associated_malware": ["Cobalt Strike"]
}

Usage in Playbooks

name: email_triage
steps:
  - action: parse_email
    output: parsed

  - action: lookup_sender_reputation
    parameters:
      sender: "{{ parsed.sender }}"
    output: sender_rep

  - action: lookup_urls
    parameters:
      urls: "{{ parsed.urls }}"
    output: url_results

  - action: lookup_attachments
    parameters:
      attachments: "{{ parsed.attachments }}"
    output: attachment_results

  # Make decision based on lookups
  - condition: >
      sender_rep.risk_level == 'high' or
      url_results.malicious_count > 0 or
      attachment_results.malicious_count > 0
    set_verdict:
      classification: malicious
      confidence: 0.9
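
The final condition in this playbook is a plain boolean over the three lookup outputs. The Python sketch below restates it using the output fields documented above. `is_malicious` is a hypothetical name, and the playbook engine's own expression evaluator may behave differently.

```python
def is_malicious(sender_rep: dict, url_results: dict, attachment_results: dict) -> bool:
    """True when any one of the three lookups reports a threat."""
    return (
        sender_rep["risk_level"] == "high"
        or url_results["malicious_count"] > 0
        or attachment_results["malicious_count"] > 0
    )


# One malicious URL is enough, even with a clean sender and clean attachments.
print(is_malicious({"risk_level": "low"}, {"malicious_count": 1}, {"malicious_count": 0}))
```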

Caching

Lookup results are cached to reduce API calls:

| Lookup | Cache Duration |
|---|---|
| Hash | 24 hours |
| URL | 1 hour |
| Domain | 6 hours |
| IP | 6 hours |

To force a fresh lookup, set the skip_cache: true parameter on the action.
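
To make the expiry behavior concrete, here is a small in-memory cache in Python with one time-to-live per lookup kind, using the durations listed above. It is a sketch only: `LookupCache` is hypothetical, and the real cache's storage and eviction strategy are not described in this document.

```python
import time

# Cache durations from the table above, in seconds.
CACHE_TTL_SECONDS = {"hash": 24 * 3600, "url": 1 * 3600, "domain": 6 * 3600, "ip": 6 * 3600}


class LookupCache:
    """In-memory cache with one expiry time per lookup kind."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock   # injectable so expiry can be tested
        self._entries = {}    # (kind, key) -> (stored_at, value)

    def put(self, kind, key, value):
        self._entries[(kind, key)] = (self._clock(), value)

    def get(self, kind, key, skip_cache=False):
        """Return the cached value, or None on a miss, expiry, or skip_cache."""
        if skip_cache:
            return None
        entry = self._entries.get((kind, key))
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at >= CACHE_TTL_SECONDS[kind]:
            del self._entries[(kind, key)]
            return None
        return value


now = [0.0]  # fake clock for the demonstration
cache = LookupCache(clock=lambda: now[0])
cache.put("url", "https://example.com/document", {"malicious": False})
fresh = cache.get("url", "https://example.com/document")
now[0] = 3600.0  # one hour later, the URL entry has expired
expired = cache.get("url", "https://example.com/document")
```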

Notification Actions

Actions for alerting stakeholders and managing escalation.

notify_user

Send notification to an affected user.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| user | string | Yes | User email or ID |
| message | string | Yes | Notification message |
| channel | string | No | email, slack, teams (default: email) |
| template | string | No | Notification template name |

Output:

{
  "notification_id": "notif-abc123",
  "recipient": "user@company.com",
  "channel": "email",
  "sent_at": "2024-01-15T10:50:00Z",
  "status": "delivered"
}

Templates:

# templates/notifications.yaml
security_alert:
  subject: "Security Alert: Action Required"
  body: |
    A security incident affecting your account has been detected.

    Incident ID: {{ incident_id }}
    Type: {{ incident_type }}

    {{ message }}

    If you did not initiate this activity, please contact IT Security.
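
The `{{ name }}` placeholders in the template body are replaced with incident values when the notification is sent. The Python sketch below shows one way to do that substitution with a regular expression. `render` is a hypothetical helper, and the template engine Triage Warden actually uses may support more syntax than this.

```python
import re

# Matches {{ name }} with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, values: dict) -> str:
    """Replace each {{ name }} placeholder with the matching value."""
    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"missing template value: {name}")
        return str(values[name])

    return PLACEHOLDER.sub(substitute, template)


body = "Incident ID: {{ incident_id }}\nType: {{ incident_type }}"
print(render(body, {"incident_id": "INC-2024-001", "incident_type": "phishing"}))
```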

notify_reporter

Send status update to the incident reporter.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| incident_id | string | Yes | Incident ID |
| status | string | Yes | Status update message |
| include_verdict | bool | No | Include AI verdict (default: false) |

Output:

{
  "notification_id": "notif-def456",
  "reporter": "reporter@company.com",
  "status": "delivered"
}

escalate

Route incident to appropriate approval level.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| incident_id | string | Yes | Incident ID |
| escalation_level | string | Yes | analyst, senior, manager |
| reason | string | Yes | Reason for escalation |
| override_assignee | string | No | Specific person to assign |
| custom_sla_hours | int | No | Custom SLA (overrides default) |
| notify_channels | array | No | Additional channels (slack, pagerduty) |

Output:

{
  "escalation_id": "esc-abc123",
  "incident_id": "INC-2024-001",
  "escalation_level": "senior",
  "assigned_to": "senior.analyst@company.com",
  "due_date": "2024-01-15T12:50:00Z",
  "priority": "high",
  "sla_hours": 2
}

Default SLAs:

| Level | SLA |
|---|---|
| Analyst | 4 hours |
| Senior | 2 hours |
| Manager | 1 hour |
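
The `due_date` in the escalate output follows from the escalation time plus the SLA for the level. Here is a short Python sketch of that arithmetic, assuming `custom_sla_hours` simply replaces the default when given. `due_date` as a function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_SLA_HOURS = {"analyst": 4, "senior": 2, "manager": 1}


def due_date(escalated_at: datetime, level: str, custom_sla_hours=None) -> datetime:
    """Return the time by which the escalation should be handled."""
    hours = custom_sla_hours if custom_sla_hours is not None else DEFAULT_SLA_HOURS[level]
    return escalated_at + timedelta(hours=hours)


escalated_at = datetime(2024, 1, 15, 10, 50, tzinfo=timezone.utc)
print(due_date(escalated_at, "senior").isoformat())
```

With these inputs, a senior escalation at 10:50 UTC is due at 12:50 UTC, matching the sample output above.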

create_ticket

Create ticket in external ticketing system.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| title | string | Yes | Ticket title |
| description | string | Yes | Ticket description |
| priority | string | No | low, medium, high, critical |
| assignee | string | No | Initial assignee |
| labels | array | No | Ticket labels |

Output:

{
  "ticket_id": "12345",
  "ticket_key": "SEC-1234",
  "url": "https://company.atlassian.net/browse/SEC-1234",
  "created_at": "2024-01-15T10:55:00Z"
}

log_false_positive

Record a false positive for tuning.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| incident_id | string | Yes | Incident ID |
| reason | string | Yes | Why this is a false positive |
| feedback | string | No | Additional feedback for AI improvement |

Output:

{
  "fp_id": "fp-abc123",
  "incident_id": "INC-2024-001",
  "recorded_at": "2024-01-15T11:00:00Z",
  "used_for_training": true
}

run_triage_agent

Trigger AI triage agent on an incident.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| incident_id | string | Yes | Incident ID |
| playbook | string | No | Specific playbook to use |
| model | string | No | AI model override |

Output:

{
  "triage_id": "triage-abc123",
  "incident_id": "INC-2024-001",
  "verdict": "malicious",
  "confidence": 0.92,
  "reasoning": "Multiple indicators of phishing...",
  "recommended_actions": [
    "quarantine_email",
    "block_sender",
    "notify_user"
  ],
  "completed_at": "2024-01-15T10:52:00Z"
}

Usage Examples

Escalation Playbook

name: auto_escalate
trigger:
  - verdict: malicious
  - confidence: ">= 0.9"
  - severity: critical

steps:
  - action: escalate
    parameters:
      incident_id: "{{ incident.id }}"
      escalation_level: manager
      reason: "High-confidence critical incident requiring immediate attention"
      notify_channels:
        - slack
        - pagerduty

  - action: create_ticket
    parameters:
      title: "CRITICAL: {{ incident.subject }}"
      priority: critical

CLI Examples

# Escalate to senior analyst
tw-cli action execute \
  --incident INC-2024-001 \
  --action escalate \
  --param escalation_level=senior \
  --param reason="Complex threat requiring expertise"

# Create ticket
tw-cli action execute \
  --incident INC-2024-001 \
  --action create_ticket \
  --param title="Phishing Investigation" \
  --param priority=high

# Record false positive
tw-cli action execute \
  --incident INC-2024-001 \
  --action log_false_positive \
  --param reason="Legitimate vendor communication"

Policy Engine

The policy engine controls action approval workflows and enforces security boundaries.

Overview

Every action request passes through the policy engine:

Action Request → Build Context → Evaluate Rules → Decision
                                                    ├─ Allowed → Execute
                                                    ├─ Denied → Reject
                                                    └─ RequiresApproval → Queue

Policy Decision Types

| Decision | Behavior |
|---|---|
| Allowed | Action executes immediately |
| Denied | Action rejected with reason |
| RequiresApproval | Queued for specified approval level |

Action Context

The policy engine evaluates these attributes:

pub struct ActionContext {
    /// The action being requested
    pub action_type: String,

    /// Target of the action (host, email, user, etc.)
    pub target: String,

    /// Incident severity (if associated)
    pub severity: Option<Severity>,

    /// AI confidence score (if from triage)
    pub confidence: Option<f64>,

    /// Who/what is requesting the action
    pub proposer: Proposer,

    /// Additional context
    pub metadata: HashMap<String, Value>,
}

pub enum Proposer {
    User { id: String, role: Role },
    Agent { name: String },
    Playbook { name: String },
    System,
}

Default Policies

Without custom rules, these defaults apply:

| Action Category | Default Decision |
|---|---|
| Lookup actions | Allowed |
| Analysis actions | Allowed |
| Notification actions | Allowed |
| Response actions | RequiresApproval (analyst) |
| Host containment | RequiresApproval (senior) |
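
Read as data, the defaults are a lookup from action category to a decision and, where relevant, an approval level. The Python sketch below encodes the table and maps each decision to the next step in the flow described in the overview. The category keys and `next_step` are hypothetical names chosen for illustration.

```python
# Defaults from the table above: category -> (decision, approval level).
DEFAULT_DECISIONS = {
    "lookup": ("allowed", None),
    "analysis": ("allowed", None),
    "notification": ("allowed", None),
    "response": ("requires_approval", "analyst"),
    "host_containment": ("requires_approval", "senior"),
}


def next_step(category: str) -> str:
    """Describe what happens to an action of this category by default."""
    decision, level = DEFAULT_DECISIONS[category]
    if decision == "allowed":
        return "execute"
    if decision == "denied":
        return "reject"
    return f"queue for {level} approval"


print(next_step("host_containment"))
```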

Policy Rules

Define rules to control when actions require approval.

Rule Structure

[[policy.rules]]
name = "rule_name"
description = "Human-readable description"

# Matching criteria
action = "action_name"           # Specific action
action_patterns = ["pattern_*"]  # Glob patterns

# Conditions (all must match)
severity = ["high", "critical"]  # Incident severity
confidence_min = 0.8             # Minimum AI confidence
proposer_type = "agent"          # Who's requesting
proposer_role = "analyst"        # Role (if user)

# Decision
decision = "allowed"             # or "denied" or "requires_approval"
approval_level = "senior"        # If requires_approval
reason = "Explanation"           # If denied

Rule Examples

Auto-Approve Lookups

[[policy.rules]]
name = "auto_approve_lookups"
description = "Lookup actions are always allowed"
action_patterns = ["lookup_*"]
decision = "allowed"

Require Approval for Response Actions

[[policy.rules]]
name = "response_needs_analyst"
description = "Response actions require analyst approval"
action_patterns = ["quarantine_*", "block_*"]
decision = "requires_approval"
approval_level = "analyst"

High-Severity Host Isolation

[[policy.rules]]
name = "critical_isolation_needs_manager"
description = "Critical severity host isolation requires manager"
action = "isolate_host"
severity = ["critical"]
decision = "requires_approval"
approval_level = "manager"

Block Dangerous Actions in Production

[[policy.rules]]
name = "no_delete_production"
description = "Deletion actions not allowed in production"
action_patterns = ["delete_*"]
environment = "production"
decision = "denied"
reason = "Deletion actions are not permitted in production"

Trust High-Confidence AI Decisions

[[policy.rules]]
name = "trust_high_confidence_ai"
description = "Auto-approve when AI is highly confident"
proposer_type = "agent"
confidence_min = 0.95
severity = ["low", "medium"]
action_patterns = ["quarantine_email", "block_sender"]
decision = "allowed"

Analyst Self-Service

[[policy.rules]]
name = "analyst_can_notify"
description = "Analysts can send notifications without approval"
action_patterns = ["notify_*"]
proposer_role = "analyst"
decision = "allowed"

Rule Evaluation Order

Rules are evaluated in order. First matching rule wins.

# More specific rules first
[[policy.rules]]
name = "critical_isolation"
action = "isolate_host"
severity = ["critical"]
approval_level = "manager"

# General fallback
[[policy.rules]]
name = "default_isolation"
action = "isolate_host"
approval_level = "senior"
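
Because the first matching rule wins, swapping these two rules would change the outcome for critical incidents. The Python sketch below models first-match evaluation for just the `action` and `severity` conditions. It is a simplified illustration with hypothetical helper names, not the engine's implementation.

```python
RULES = [
    {"name": "critical_isolation", "action": "isolate_host",
     "severity": ["critical"], "approval_level": "manager"},
    {"name": "default_isolation", "action": "isolate_host",
     "approval_level": "senior"},
]


def rule_matches(rule: dict, action: str, severity: str) -> bool:
    """A rule matches when every condition it declares is satisfied."""
    if "action" in rule and rule["action"] != action:
        return False
    if "severity" in rule and severity not in rule["severity"]:
        return False
    return True


def first_match(rules: list, action: str, severity: str):
    """Return the first rule that matches, or None if no rule does."""
    for rule in rules:
        if rule_matches(rule, action, severity):
            return rule
    return None


print(first_match(RULES, "isolate_host", "critical")["approval_level"])
# With the order reversed, the general rule shadows the specific one.
print(first_match(list(reversed(RULES)), "isolate_host", "critical")["approval_level"])
```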

Condition Operators

Severity Matching

severity = ["high", "critical"]  # Match any in list

Confidence Ranges

confidence_min = 0.8   # Minimum confidence
confidence_max = 0.95  # Maximum confidence

Pattern Matching

action_patterns = ["lookup_*"]        # Prefix match
action_patterns = ["*_email"]         # Suffix match
action_patterns = ["*block*"]         # Contains
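
These patterns can be tried out with Python's `fnmatch` module, whose `*` wildcard behaves the same way for the prefix, suffix, and contains cases shown here. Whether the policy engine supports the other `fnmatch` wildcards (`?`, `[seq]`) is not documented, so treat this as an approximation. `matches_any` is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def matches_any(action: str, patterns: list) -> bool:
    """True when the action name matches at least one glob pattern."""
    return any(fnmatchcase(action, pattern) for pattern in patterns)


print(matches_any("lookup_urls", ["lookup_*"]))       # prefix match
print(matches_any("quarantine_email", ["*_email"]))   # suffix match
print(matches_any("unblock_sender", ["*block*"]))     # contains
```

Note that `*block*` also matches `unblock_sender`, so a rule meant only for blocking actions may be better written as `block_*`.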

Proposer Conditions

proposer_type = "user"      # user, agent, playbook, system
proposer_role = "analyst"   # Only for user proposers

Managing Rules

Via Configuration File

# config/policy.toml
tw-api --config config/policy.toml

Via API

# List rules
curl http://localhost:8080/api/policies

# Create rule
curl -X POST http://localhost:8080/api/policies \
  -H "Content-Type: application/json" \
  -d '{
    "name": "new_rule",
    "action": "isolate_host",
    "approval_level": "senior"
  }'

Via CLI

# List rules
tw-cli policy list

# Add rule
tw-cli policy add \
  --name "block_needs_approval" \
  --action "block_sender" \
  --approval-level analyst

Testing Rules

Simulate policy evaluation without executing:

tw-cli policy test \
  --action isolate_host \
  --severity critical \
  --proposer-type agent \
  --confidence 0.92

# Output:
# Decision: RequiresApproval
# Level: manager
# Matched Rule: critical_isolation_needs_manager

Approval Levels

Understanding the approval workflow in Triage Warden.

Approval Hierarchy

Manager (SOC Manager)
    │
    ▼
Senior (Senior Analyst)
    │
    ▼
Analyst (Security Analyst)
    │
    ▼
Auto (No approval needed)

Higher levels can approve actions at their level or below.
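
That rule amounts to comparing positions in the hierarchy. Below is a minimal Python sketch, with `can_approve` as a hypothetical helper name:

```python
# Approval levels ordered from lowest to highest authority.
LEVELS = ["auto", "analyst", "senior", "manager"]


def can_approve(approver_level: str, required_level: str) -> bool:
    """An approver can act on requests at their own level or below."""
    return LEVELS.index(approver_level) >= LEVELS.index(required_level)


print(can_approve("manager", "senior"))
print(can_approve("analyst", "senior"))
```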

Level Definitions

| Level | Role | Typical Actions |
|---|---|---|
| Auto | System | Lookups, analysis, low-risk notifications |
| Analyst | Security Analyst | Email quarantine, sender blocking |
| Senior | Senior Analyst | Host isolation, broad blocks |
| Manager | SOC Manager | Critical containment, policy changes |

Approval Workflow

1. Action Requested

tw-cli action execute --incident INC-001 --action isolate_host

2. Policy Evaluation

Policy engine evaluates and returns:

{
  "decision": "requires_approval",
  "approval_level": "senior",
  "reason": "Host isolation requires senior analyst approval"
}

3. Action Queued

Action stored with pending status:

{
  "action_id": "act-abc123",
  "incident_id": "INC-001",
  "action_type": "isolate_host",
  "status": "pending_approval",
  "approval_level": "senior",
  "requested_by": "analyst@company.com",
  "requested_at": "2024-01-15T10:30:00Z"
}

4. Approvers Notified

Notification sent to eligible approvers via configured channels.

5. Approval Decision

Approver reviews and decides:

Approve:

tw-cli action approve act-abc123 --comment "Verified threat"

Reject:

tw-cli action reject act-abc123 --reason "False positive, user traveling"

6. Execution or Rejection

  • Approved: Action executes automatically
  • Rejected: Action marked rejected, requester notified

Approval UI

Access pending approvals at /approvals in the web dashboard.

Features:

  • Filterable list of pending actions
  • Incident context display
  • One-click approve/reject
  • Bulk approval for related actions

SLA Tracking

Each approval level has a default SLA:

| Level | Default SLA |
|---|---|
| Analyst | 4 hours |
| Senior | 2 hours |
| Manager | 1 hour |

When an approval is overdue:

  1. It is highlighted in the dashboard
  2. Approvers are notified again
  3. It is optionally escalated to the next level

Delegation

Approvers can delegate when unavailable:

tw-cli approval delegate \
  --from senior.analyst@company.com \
  --to backup.analyst@company.com \
  --until 2024-01-20

Approval Groups

Configure approval groups for redundancy:

[approval_groups]
senior_analysts = [
  "alice@company.com",
  "bob@company.com",
  "charlie@company.com"
]

managers = [
  "soc.manager@company.com",
  "backup.manager@company.com"
]

Any member of the group can approve.

Audit Trail

All approval decisions are logged:

{
  "event": "action_approved",
  "action_id": "act-abc123",
  "approver": "senior.analyst@company.com",
  "decision": "approved",
  "comment": "Verified threat indicators",
  "timestamp": "2024-01-15T10:45:00Z",
  "time_to_approve": "15m"
}
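
The `time_to_approve` value appears to be the gap between the action's `requested_at` time and the approval `timestamp`. Here is a Python sketch of that calculation, assuming both are UTC timestamps in the format shown and that the result is reported in whole minutes. `time_to_approve` as a function is hypothetical.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def time_to_approve(requested_at: str, approved_at: str) -> str:
    """Return the elapsed time between request and approval, in minutes."""
    requested = datetime.strptime(requested_at, TIMESTAMP_FORMAT)
    approved = datetime.strptime(approved_at, TIMESTAMP_FORMAT)
    minutes = int((approved - requested).total_seconds() // 60)
    return f"{minutes}m"


print(time_to_approve("2024-01-15T10:30:00Z", "2024-01-15T10:45:00Z"))
```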

Emergency Override

In emergencies, managers can bypass approval:

tw-cli action execute \
  --incident INC-001 \
  --action isolate_host \
  --emergency \
  --reason "Active ransomware, immediate containment required"

Emergency overrides:

  • Are logged with high visibility
  • Require manager credentials
  • Trigger additional notifications

AI Triage

Automated incident analysis using Claude AI agents.

Overview

The triage agent analyzes security incidents to:

  1. Classify - Determine if the incident is malicious, suspicious, or benign
  2. Assess confidence - Quantify certainty in the classification
  3. Explain - Provide reasoning for the verdict
  4. Recommend - Suggest response actions

How It Works

Incident → Playbook Selection → Tool Execution → AI Analysis → Verdict

  1. Incident received - New incident created via webhook or API
  2. Playbook selected - Based on incident type (phishing, malware, etc.)
  3. Tools executed - Parse data, lookup reputation, check authentication
  4. AI analysis - Claude analyzes gathered data
  5. Verdict returned - Classification with confidence and recommendations

Example Verdict

{
  "incident_id": "INC-2024-001",
  "classification": "malicious",
  "confidence": 0.92,
  "category": "phishing",
  "reasoning": "Multiple indicators suggest this is a credential phishing attempt:\n1. Sender domain registered 2 days ago\n2. SPF and DKIM authentication failed\n3. URL leads to a fake Microsoft login page\n4. Subject uses urgency tactics",
  "recommended_actions": [
    {
      "action": "quarantine_email",
      "priority": 1,
      "reason": "Prevent user access to phishing content"
    },
    {
      "action": "block_sender",
      "priority": 2,
      "reason": "Sender has no legitimate history"
    },
    {
      "action": "notify_user",
      "priority": 3,
      "reason": "Educate user about phishing attempt"
    }
  ],
  "iocs": [
    {"type": "domain", "value": "phishing-site.com"},
    {"type": "ip", "value": "192.168.1.100"}
  ],
  "mitre_attack": ["T1566.001", "T1078"]
}
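
A consumer of this verdict will usually want the recommended actions in execution order. The sketch below assumes that a lower `priority` number means "run first" (the example lists priority 1 first, but the source does not state the rule), and `ordered_actions` is a hypothetical helper, not part of Triage Warden.

```python
import json

# Abbreviated copy of the example verdict, with actions deliberately shuffled.
verdict_json = """
{
  "classification": "malicious",
  "confidence": 0.92,
  "recommended_actions": [
    {"action": "block_sender", "priority": 2},
    {"action": "quarantine_email", "priority": 1},
    {"action": "notify_user", "priority": 3}
  ]
}
"""

def ordered_actions(verdict: dict) -> list[str]:
    """Return action names sorted by ascending priority (assumed: 1 runs first)."""
    actions = sorted(verdict.get("recommended_actions", []),
                     key=lambda a: a["priority"])
    return [a["action"] for a in actions]

verdict = json.loads(verdict_json)
print(ordered_actions(verdict))
```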

Triggering Triage

Automatic (Webhook)

Configure webhooks to auto-triage new incidents:

webhooks:
  email_gateway:
    auto_triage: true
    playbook: phishing_triage

Manual (CLI)

tw-cli triage run --incident INC-2024-001

Manual (API)

curl -X POST http://localhost:8080/api/incidents/INC-2024-001/triage
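
The same request can be built from Python with the standard library. This is a sketch only: `build_triage_request` is a hypothetical helper, and the bearer token header follows the API key format described in the REST API section. The request is constructed but not sent.

```python
import urllib.request

def build_triage_request(base_url: str, incident_id: str,
                         api_key: str) -> urllib.request.Request:
    """Build (but do not send) the POST request that triggers triage."""
    url = f"{base_url}/api/incidents/{incident_id}/triage"
    return urllib.request.Request(
        url,
        method="POST",
        headers={"Authorization": f"Bearer {api_key}"},
    )

req = build_triage_request("http://localhost:8080", "INC-2024-001", "tw_xxx")
print(req.get_method(), req.full_url)

# To actually send it against a running instance:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
```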

Triage Agent

The AI agent that analyzes security incidents.

Architecture

┌─────────────────────────────────────────────────────────┐
│                     Triage Agent                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │   Claude    │  │   Tools     │  │  Playbook   │     │
│  │   Model     │  │   (Bridge)  │  │   Engine    │     │
│  └─────────────┘  └─────────────┘  └─────────────┘     │
└─────────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                    Python Bridge                         │
│           (ThreatIntelBridge, SIEMBridge, etc.)         │
└─────────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                   Rust Connectors                        │
│        (VirusTotal, Splunk, CrowdStrike, etc.)          │
└─────────────────────────────────────────────────────────┘

Agent Configuration

# python/tw_ai/agents/config.py
class AgentConfig:
    model: str = "claude-sonnet-4-20250514"
    max_tokens: int = 4096
    temperature: float = 0.1
    max_tool_calls: int = 10
    timeout_seconds: int = 120

Environment variables:

TW_AI_PROVIDER=anthropic
TW_ANTHROPIC_API_KEY=your-key
TW_AI_MODEL=claude-sonnet-4-20250514

Available Tools

The agent has access to these tools via the Python bridge:

Tool                         Purpose
parse_email                  Extract email components
check_email_authentication   Validate SPF/DKIM/DMARC
lookup_sender_reputation     Query sender reputation
lookup_urls                  Check URL reputation
lookup_attachments           Check attachment hashes
search_siem                  Query SIEM for related events
get_host_info                Get EDR host information

Agent Workflow

async def triage(self, incident: Incident) -> Verdict:
    # 1. Load appropriate playbook
    playbook = self.load_playbook(incident.incident_type)

    # 2. Execute playbook steps (tools)
    context = {}
    for step in playbook.steps:
        result = await self.execute_step(step, incident, context)
        context[step.output] = result

    # 3. Build analysis prompt
    prompt = self.build_analysis_prompt(incident, context)

    # 4. Get AI verdict
    response = await self.client.messages.create(
        model=self.config.model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=self.config.max_tokens
    )

    # 5. Parse and return verdict
    return self.parse_verdict(response)

System Prompt

The agent uses a specialized system prompt:

You are an expert security analyst assistant. Analyze the provided security
incident data and determine:

1. Classification: Is this malicious, suspicious, benign, or inconclusive?
2. Confidence: How certain are you (0.0 to 1.0)?
3. Category: What type of threat is this (phishing, malware, etc.)?
4. Reasoning: Explain your analysis step by step
5. Recommended Actions: What should be done to respond?

Use the tool results provided to inform your analysis. Be thorough but concise.
Cite specific evidence for your conclusions.

Tool Calling

The agent can call tools during analysis:

# Agent decides to check URL reputation
tool_result = await self.call_tool(
    name="lookup_urls",
    parameters={"urls": ["https://suspicious-site.com/login"]}
)

# Result used in analysis
# {
#   "results": [{
#     "url": "https://suspicious-site.com/login",
#     "malicious": true,
#     "categories": ["phishing"],
#     "confidence": 0.95
#   }]
# }

Customizing the Agent

Custom System Prompt

agent = TriageAgent(
    system_prompt="""
    You are a SOC analyst specializing in email security.
    Focus on phishing indicators and BEC patterns.
    Always check sender authentication carefully.
    """
)

Custom Tools

Register additional tools:

@agent.tool
async def custom_lookup(domain: str) -> dict:
    """Look up domain in internal threat database."""
    return await internal_db.query(domain)

Model Selection

# Use different models for different scenarios
if incident.severity == "critical":
    agent = TriageAgent(model="claude-opus-4-20250514")
else:
    agent = TriageAgent(model="claude-sonnet-4-20250514")

Error Handling

The agent handles failures gracefully:

try:
    verdict = await agent.triage(incident)
except ToolError as e:
    # Tool failed - continue with available data
    verdict = await agent.triage_partial(incident, failed_tools=[e.tool])
except AIError as e:
    # AI call failed - return inconclusive
    verdict = Verdict.inconclusive(reason=str(e))

Metrics

Agent metrics are exported to Prometheus:

  • triage_duration_seconds - Time to complete triage
  • triage_tool_calls_total - Tool calls per triage
  • triage_verdict_total - Verdicts by classification
  • triage_confidence_histogram - Confidence score distribution

Verdict Types

Understanding the classification outcomes from AI triage.

Classifications

Classification   Description                          Typical Response
Malicious        Confirmed threat                     Immediate containment
Suspicious       Likely threat, needs investigation   Queue for analyst review
Benign           Not a threat                         Close or archive
Inconclusive     Insufficient data                    Request more information

Malicious

The incident is a confirmed security threat.

Criteria:

  • Multiple strong threat indicators
  • High-confidence threat intelligence matches
  • Clear malicious intent (credential theft, malware, etc.)

Example:

{
  "classification": "malicious",
  "confidence": 0.95,
  "category": "phishing",
  "reasoning": "Email contains credential phishing page targeting Microsoft 365. Sender domain registered yesterday, fails all email authentication. URL redirects to fake login mimicking Microsoft branding."
}

Response:

  • Execute recommended containment actions
  • Create incident ticket
  • Notify affected users

Suspicious

The incident shows concerning indicators but lacks definitive proof.

Criteria:

  • Some threat indicators present
  • Mixed or conflicting signals
  • Unusual but not clearly malicious behavior

Example:

{
  "classification": "suspicious",
  "confidence": 0.65,
  "category": "potential_phishing",
  "reasoning": "Email sender is unknown but domain is 6 months old with valid authentication. URL leads to legitimate document sharing service but file name uses urgency tactics. Recipient has not received email from this sender before."
}

Response:

  • Queue for analyst review
  • Gather additional context
  • Consider temporary quarantine pending review

Benign

The incident is not a security threat.

Criteria:

  • No threat indicators found
  • Known good sender/source
  • Normal expected behavior

Example:

{
  "classification": "benign",
  "confidence": 0.92,
  "category": "legitimate_email",
  "reasoning": "Email from known vendor with established sending history. All authentication passes. Attachment is a standard invoice PDF matching expected format. No suspicious URLs or indicators."
}

Response:

  • Close incident
  • Release from quarantine if held
  • Update detection rules if false positive

Inconclusive

Insufficient data to make a determination.

Criteria:

  • Missing critical information
  • Tool failures preventing analysis
  • Conflicting strong indicators

Example:

{
  "classification": "inconclusive",
  "confidence": 0.3,
  "category": "unknown",
  "reasoning": "Unable to analyze attachment - file corrupted. Sender reputation service unavailable. Email authentication results are mixed (SPF pass, DKIM fail). Need manual review of attachment content.",
  "missing_data": [
    "attachment_analysis",
    "sender_reputation"
  ]
}

Response:

  • Escalate to analyst
  • Retry failed tool calls
  • Request additional information

Confidence Scores

Confidence ranges and their meaning:

Range       Interpretation
0.9 - 1.0   Very high confidence, clear evidence
0.7 - 0.9   High confidence, strong indicators
0.5 - 0.7   Moderate confidence, mixed signals
0.3 - 0.5   Low confidence, limited evidence
0.0 - 0.3   Very low confidence, insufficient data

Category Types

Email Threats

Category           Description
phishing           Credential theft attempt
spear_phishing     Targeted phishing
bec                Business email compromise
malware_delivery   Malicious attachment/link
spam               Unsolicited bulk email

Endpoint Threats

Category      Description
malware       Malicious software detected
ransomware    Ransomware activity
cryptominer   Cryptocurrency mining
rat           Remote access trojan
pup           Potentially unwanted program

Access Threats

Category              Description
brute_force           Password guessing attempt
credential_stuffing   Leaked credential use
impossible_travel     Geographically impossible login
account_takeover      Compromised account

Using Verdicts

Automation Rules

# Auto-respond to high-confidence malicious
- trigger:
    classification: malicious
    confidence: ">= 0.9"
  actions:
    - quarantine_email
    - block_sender
    - create_ticket

# Queue suspicious for review
- trigger:
    classification: suspicious
  actions:
    - escalate:
        level: analyst
        reason: "Suspicious activity requires review"
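
To make the matching semantics concrete, here is a minimal sketch of how such rules could be evaluated against a verdict. It assumes the `confidence` trigger is an operator followed by a number and that a rule without a `confidence` key matches any confidence; the function names are hypothetical and the real rule engine may behave differently.

```python
import operator

# Comparison operators a "confidence" trigger expression may use (assumed set).
OPS = {">=": operator.ge, ">": operator.gt, "<=": operator.le, "<": operator.lt}

def confidence_matches(expr: str, confidence: float) -> bool:
    """Evaluate an expression such as '>= 0.9' against a confidence score."""
    op, threshold = expr.split()
    return OPS[op](confidence, float(threshold))

def matching_actions(rules: list[dict], verdict: dict) -> list:
    """Collect the actions of every rule whose trigger matches the verdict."""
    actions = []
    for rule in rules:
        trigger = rule["trigger"]
        if trigger["classification"] != verdict["classification"]:
            continue
        expr = trigger.get("confidence")
        if expr is not None and not confidence_matches(expr, verdict["confidence"]):
            continue
        actions.extend(rule["actions"])
    return actions

# The two rules above, expressed as already-parsed data.
rules = [
    {"trigger": {"classification": "malicious", "confidence": ">= 0.9"},
     "actions": ["quarantine_email", "block_sender", "create_ticket"]},
    {"trigger": {"classification": "suspicious"},
     "actions": ["escalate"]},
]

print(matching_actions(rules, {"classification": "malicious", "confidence": 0.95}))
print(matching_actions(rules, {"classification": "malicious", "confidence": 0.80}))
```

Note that under these assumptions a malicious verdict below 0.9 matches no rule at all, so a catch-all review rule may be worth adding.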

Metrics

Track verdict distribution:

# Verdict counts by classification
sum by (classification) (triage_verdict_total)

# Average confidence by category
avg by (category) (triage_confidence)

Confidence Scoring

How the AI agent determines confidence in its verdicts.

Confidence Factors

The agent considers multiple factors when calculating confidence:

Evidence Quality

Factor                                 Impact
Threat intel match (high confidence)   +0.3
Threat intel match (low confidence)    +0.1
Authentication failure                 +0.2
Known malicious indicator              +0.3
Suspicious pattern                     +0.1

Evidence Quantity

Indicators       Confidence Boost
1 indicator      Base
2-3 indicators   +0.1
4-5 indicators   +0.2
6+ indicators    +0.3

Data Completeness

Missing Data                     Confidence Penalty
None                             0
Minor (sender reputation)        -0.1
Moderate (attachment analysis)   -0.2
Major (multiple tools failed)    -0.3

Calculation Example

Phishing Email Analysis:

Base confidence: 0.5

Evidence found:
+ SPF failed: +0.15
+ DKIM failed: +0.15
+ Sender domain < 7 days old: +0.2
+ URL matches phishing pattern: +0.25
+ VirusTotal flags URL as phishing: +0.2

Evidence count (5): +0.2

Data completeness: All tools succeeded: +0

Final confidence: 0.5 + 0.15 + 0.15 + 0.2 + 0.25 + 0.2 + 0.2 = 1.65 (capped at 0.99)

Verdict: malicious, confidence: 0.99
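
The arithmetic in this example can be reproduced in a few lines. This is only a worked illustration of the listed numbers, under the assumption that contributions are summed and then clamped; the variable names are hypothetical and the agent's real scoring may not be a literal sum.

```python
BASE = 0.5
CAP = 0.99  # upper bound stated in the example

# Per-indicator contributions as listed in the example.
evidence = {
    "spf_failed": 0.15,
    "dkim_failed": 0.15,
    "sender_domain_under_7_days": 0.20,
    "url_matches_phishing_pattern": 0.25,
    "virustotal_flags_url": 0.20,
}
count_bonus = 0.2           # 5 indicators falls in the "4-5 indicators" row
completeness_penalty = 0.0  # all tools succeeded

raw = BASE + sum(evidence.values()) + count_bonus - completeness_penalty
final = max(0.0, min(raw, CAP))  # clamp into [0.0, CAP]

print(round(raw, 2), final)
```

The raw sum of the listed terms is 1.65, well above the cap, so the reported confidence is 0.99.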

Confidence Thresholds

Policy decisions use confidence thresholds:

# Auto-quarantine high confidence malicious
[[policy.rules]]
name = "auto_quarantine_confident"
classification = "malicious"
confidence_min = 0.9
action = "quarantine_email"
decision = "allowed"

# Require review for lower confidence
[[policy.rules]]
name = "review_uncertain"
confidence_max = 0.7
decision = "requires_approval"
approval_level = "analyst"

Confidence Calibration

The agent is calibrated so confidence correlates with accuracy:

Stated Confidence   Expected Accuracy
0.9                 ~90% of verdicts correct
0.8                 ~80% of verdicts correct
0.7                 ~70% of verdicts correct

Monitoring Calibration

Track calibration with metrics:

# Accuracy at confidence level
triage_accuracy_by_confidence{confidence_bucket="0.9-1.0"}
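
Calibration can also be checked offline from analyst-reviewed verdicts. The sketch below groups (confidence, was_correct) pairs into buckets and reports accuracy per bucket; the bucket edges, the record format, and the function name are assumptions made for illustration, and the sample records are invented.

```python
from collections import defaultdict

def accuracy_by_bucket(records, edges=(0.0, 0.5, 0.7, 0.9, 1.0)):
    """Map each confidence bucket to the fraction of correct verdicts in it.

    records: iterable of (confidence, was_correct) pairs.
    """
    totals = defaultdict(lambda: [0, 0])  # label -> [correct, total]
    for confidence, correct in records:
        for lo, hi in zip(edges, edges[1:]):
            # Buckets are [lo, hi); the last bucket also includes its upper edge.
            if lo <= confidence < hi or (hi == edges[-1] and confidence == hi):
                label = f"{lo}-{hi}"
                totals[label][0] += int(correct)
                totals[label][1] += 1
                break
    return {label: hits / n for label, (hits, n) in totals.items()}

# Invented sample: verdict confidence plus whether an analyst agreed with it.
records = [(0.95, True), (0.92, True), (0.91, False), (0.75, True), (0.72, False)]
result = accuracy_by_bucket(records)
print(result)
```

A well-calibrated agent shows accuracy close to the confidence range of each bucket; a bucket whose accuracy sits far below its range indicates overconfidence.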

Improving Calibration

  1. Feedback loop - Log false positives and false negatives so miscalibrated confidence ranges can be identified
  2. Periodic review - Sample low-confidence verdicts
  3. Model updates - Retrain with corrected examples

Handling Low Confidence

When confidence is low:

Option 1: Escalate

- condition: confidence < 0.6
  action: escalate
  parameters:
    level: analyst
    reason: "Low confidence verdict requires human review"

Option 2: Gather More Data

- condition: confidence < 0.6
  action: request_additional_data
  parameters:
    - "sender_history"
    - "recipient_context"

Option 3: Conservative Default

- condition: confidence < 0.6
  action: quarantine_email
  parameters:
    reason: "Quarantined pending review due to uncertainty"

Confidence in UI

Dashboard displays confidence visually:

Confidence   Display
0.9+         Green badge, "High Confidence"
0.7-0.9      Yellow badge, "Moderate Confidence"
0.5-0.7      Orange badge, "Low Confidence"
<0.5         Red badge, "Very Low Confidence"
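
The badge table maps directly onto a threshold function. This sketch assumes each lower bound is inclusive (so exactly 0.9 is green and exactly 0.7 is yellow), which the table does not specify; `confidence_badge` is a hypothetical name.

```python
def confidence_badge(confidence: float) -> tuple[str, str]:
    """Return (badge color, label) for a confidence score in [0.0, 1.0]."""
    if confidence >= 0.9:
        return ("green", "High Confidence")
    if confidence >= 0.7:
        return ("yellow", "Moderate Confidence")
    if confidence >= 0.5:
        return ("orange", "Low Confidence")
    return ("red", "Very Low Confidence")

for score in (0.92, 0.65, 0.3):
    print(score, confidence_badge(score))
```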

Improving Confidence

Actions that help the agent be more confident:

  1. Complete data - Ensure all tools succeed
  2. Rich context - Provide incident metadata
  3. Historical data - Include past incidents with similar patterns
  4. Clear playbooks - Well-defined analysis steps

Playbooks

Playbooks define automated investigation and response workflows.

Overview

A playbook is a sequence of steps that:

  1. Gather and analyze incident data
  2. Enrich with threat intelligence
  3. Determine verdict and response
  4. Execute approved actions

Playbook Structure

name: phishing_triage
description: Automated phishing email analysis
version: "1.0"

# When this playbook applies
triggers:
  incident_type: phishing
  auto_run: true

# Variables available to steps
variables:
  quarantine_threshold: 0.7
  block_threshold: 0.3

# Execution steps
steps:
  - name: Parse Email
    action: parse_email
    parameters:
      raw_email: "{{ incident.raw_data.raw_email }}"
    output: parsed

  - name: Check Authentication
    action: check_email_authentication
    parameters:
      headers: "{{ parsed.headers }}"
    output: auth

  - name: Check Sender
    action: lookup_sender_reputation
    parameters:
      sender: "{{ parsed.sender }}"
    output: sender_rep

  - name: Check URLs
    action: lookup_urls
    parameters:
      urls: "{{ parsed.urls }}"
    output: url_results
    condition: "{{ parsed.urls | length > 0 }}"

  - name: Quarantine if Malicious
    action: quarantine_email
    parameters:
      message_id: "{{ incident.raw_data.message_id }}"
      reason: "Automated quarantine - phishing detected"
    condition: >
      sender_rep.score < variables.quarantine_threshold or
      url_results.malicious_count > 0 or
      not auth.authentication_passed

# Final verdict generation
verdict:
  use_ai: true
  model: claude-sonnet-4-20250514
  context:
    - parsed
    - auth
    - sender_rep
    - url_results

Triggers

Define when a playbook runs:

triggers:
  # Run for specific incident types
  incident_type: phishing

  # Auto-run on incident creation
  auto_run: true

  # Or require manual trigger instead (a key may only appear once)
  # auto_run: false

  # Conditions
  conditions:
    severity: ["medium", "high", "critical"]
    source: "email_gateway"

Steps

Basic Step

- name: Step Name
  action: action_name
  parameters:
    key: value
  output: variable_name

Conditional Step

- name: Block Known Bad
  action: block_sender
  parameters:
    sender: "{{ parsed.sender }}"
  condition: "{{ sender_rep.score < 0.2 }}"

Parallel Steps

- parallel:
    - action: lookup_urls
      parameters:
        urls: "{{ parsed.urls }}"
      output: url_results

    - action: lookup_attachments
      parameters:
        attachments: "{{ parsed.attachments }}"
      output: attachment_results
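
Conceptually, a parallel block runs its child actions concurrently and stores each result under its `output` name. A minimal sketch with `asyncio.gather`, where the two lookup coroutines are hypothetical stand-ins for the real actions and simply count their inputs:

```python
import asyncio

# Hypothetical stand-ins for the real lookup actions.
async def lookup_urls(urls: list) -> dict:
    await asyncio.sleep(0.01)  # simulate network latency
    return {"checked": len(urls)}

async def lookup_attachments(attachments: list) -> dict:
    await asyncio.sleep(0.01)
    return {"checked": len(attachments)}

async def run_parallel(parsed: dict) -> dict:
    """Run both lookups concurrently and key results by their output names."""
    url_results, attachment_results = await asyncio.gather(
        lookup_urls(parsed["urls"]),
        lookup_attachments(parsed["attachments"]),
    )
    return {"url_results": url_results, "attachment_results": attachment_results}

context = asyncio.run(
    run_parallel({"urls": ["https://example.com"], "attachments": []})
)
print(context)
```

Because `gather` preserves argument order, the results can be unpacked positionally even though the lookups finish in any order.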

Loop Steps

- name: Check Each URL
  loop: "{{ parsed.urls }}"
  action: lookup_url
  parameters:
    url: "{{ item }}"
  output: url_results
  aggregate: list

Variables

Built-in Variables

Variable            Description
incident            The incident being processed
incident.id         Incident ID
incident.raw_data   Original incident data
incident.severity   Incident severity
variables           Playbook-defined variables

Step Outputs

Each step's output is available to subsequent steps:

- action: parse_email
  output: parsed

- action: lookup_urls
  parameters:
    urls: "{{ parsed.urls }}"  # Use previous output

Templates

Use Jinja2-style templates:

parameters:
  message: "Alert for {{ incident.id }}: {{ parsed.subject }}"
  priority: "{{ 'high' if incident.severity == 'critical' else 'medium' }}"

Creating Playbooks

Guide to writing custom playbooks for your security workflows.

Getting Started

1. Create Playbook File

mkdir -p playbooks
touch playbooks/my_playbook.yaml

2. Define Basic Structure

name: my_playbook
description: Description of what this playbook does
version: "1.0"

triggers:
  incident_type: phishing
  auto_run: true

steps:
  - name: First Step
    action: parse_email
    output: result

3. Register Playbook

tw-cli playbook add playbooks/my_playbook.yaml

Step Types

Action Step

Execute a registered action:

- name: Parse Email Content
  action: parse_email
  parameters:
    raw_email: "{{ incident.raw_data.raw_email }}"
  output: parsed
  on_error: continue  # or "fail" (default)

Condition Step

Branch based on conditions:

- name: Check if High Risk
  condition: "{{ sender_rep.score < 0.3 }}"
  then:
    - action: quarantine_email
      parameters:
        message_id: "{{ incident.raw_data.message_id }}"
  else:
    - action: log_event
      parameters:
        message: "Low risk, no action needed"

AI Analysis Step

Get AI verdict:

- name: AI Analysis
  type: ai_analysis
  model: claude-sonnet-4-20250514
  context:
    - parsed
    - auth_results
    - reputation
  prompt: |
    Analyze this email for phishing indicators.
    Consider the authentication results and sender reputation.
  output: ai_verdict

Notification Step

Send alerts:

- name: Alert Team
  action: notify_channel
  parameters:
    channel: slack
    message: |
      New {{ incident.severity }} incident detected
      ID: {{ incident.id }}
      Type: {{ incident.incident_type }}

Error Handling

Per-Step Error Handling

- name: Check Reputation
  action: lookup_sender_reputation
  parameters:
    sender: "{{ parsed.sender }}"
  output: reputation
  on_error: continue  # Don't fail playbook if this fails
  default_output:     # Use this if step fails
    score: 0.5
    risk_level: "unknown"
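
The intended behaviour of `on_error: continue` with `default_output` can be sketched as follows. This is an illustration of the semantics described above, not the playbook engine's code; `run_step` and `failing_lookup` are hypothetical.

```python
def run_step(action, params: dict, on_error: str = "fail", default_output=None):
    """Run one step; on failure either re-raise or fall back to default_output."""
    try:
        return action(**params)
    except Exception:
        if on_error == "continue":
            return default_output  # later steps see the fallback value
        raise

def failing_lookup(sender: str) -> dict:
    # Hypothetical action that always fails, standing in for an outage.
    raise TimeoutError("reputation service unavailable")

reputation = run_step(
    failing_lookup,
    {"sender": "user@example.com"},
    on_error="continue",
    default_output={"score": 0.5, "risk_level": "unknown"},
)
print(reputation)
```

Choosing a neutral default such as a score of 0.5 keeps later conditions from treating a failed lookup as either clean or malicious.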

Global Error Handler

on_error:
  - action: notify_channel
    parameters:
      channel: slack
      message: "Playbook {{ playbook.name }} failed: {{ error.message }}"
  - action: escalate
    parameters:
      level: analyst
      reason: "Automated triage failed"

Variables and Templates

Define Variables

variables:
  high_risk_threshold: 0.3
  quarantine_enabled: true
  notification_channel: "#security-alerts"

Use Variables

- name: Check Risk
  condition: "{{ sender_rep.score < variables.high_risk_threshold }}"
  then:
    - action: quarantine_email
      condition: "{{ variables.quarantine_enabled }}"

Template Functions

parameters:
  # String manipulation
  domain: "{{ parsed.sender | split('@') | last }}"

  # Conditionals
  priority: "{{ 'critical' if incident.severity == 'critical' else 'high' }}"

  # Lists
  all_urls: "{{ parsed.urls | join(', ') }}"
  url_count: "{{ parsed.urls | length }}"

  # Defaults
  assignee: "{{ incident.assignee | default('unassigned') }}"

Testing Playbooks

Dry Run

tw-cli playbook test my_playbook \
  --incident INC-2024-001 \
  --dry-run

With Mock Data

tw-cli playbook test my_playbook \
  --data '{"raw_email": "From: test@example.com..."}'

Validate Syntax

tw-cli playbook validate playbooks/my_playbook.yaml

Best Practices

1. Use Descriptive Names

# Good
- name: Check sender domain reputation

# Bad
- name: step1

2. Handle Failures Gracefully

- name: External Lookup
  action: lookup_sender_reputation
  on_error: continue
  default_output:
    score: 0.5

3. Add Timeouts

- name: Slow External API
  action: custom_lookup
  timeout: 30s

4. Log Key Decisions

- name: Log Verdict
  action: log_event
  parameters:
    level: info
    message: "Verdict: {{ verdict.classification }} ({{ verdict.confidence }})"

5. Version Your Playbooks

name: phishing_triage
version: "2.1.0"
changelog:
  - "2.1.0: Added attachment analysis"
  - "2.0.0: Restructured for parallel lookups"

Example: Complete Playbook

name: comprehensive_phishing_triage
description: Full phishing email analysis with all checks
version: "2.0"

triggers:
  incident_type: phishing
  auto_run: true

variables:
  quarantine_threshold: 0.3
  block_threshold: 0.2

steps:
  # Parse email
  - name: Parse Email
    action: parse_email
    parameters:
      raw_email: "{{ incident.raw_data.raw_email }}"
    output: parsed

  # Parallel enrichment
  - name: Enrich Data
    parallel:
      - action: check_email_authentication
        parameters:
          headers: "{{ parsed.headers }}"
        output: auth

      - action: lookup_sender_reputation
        parameters:
          sender: "{{ parsed.sender }}"
        output: sender_rep

      - action: lookup_urls
        parameters:
          urls: "{{ parsed.urls }}"
        output: urls
        condition: "{{ parsed.urls | length > 0 }}"

      - action: lookup_attachments
        parameters:
          attachments: "{{ parsed.attachments }}"
        output: attachments
        condition: "{{ parsed.attachments | length > 0 }}"

  # AI Analysis
  - name: AI Verdict
    type: ai_analysis
    model: claude-sonnet-4-20250514
    context: [parsed, auth, sender_rep, urls, attachments]
    output: verdict

  # Response actions
  - name: Quarantine Malicious
    action: quarantine_email
    parameters:
      message_id: "{{ incident.raw_data.message_id }}"
    condition: >
      verdict.classification == 'malicious' and
      verdict.confidence >= variables.quarantine_threshold

  - name: Block Repeat Offender
    action: block_sender
    parameters:
      sender: "{{ parsed.sender }}"
    condition: >
      sender_rep.score < variables.block_threshold

  - name: Create Ticket
    action: create_ticket
    parameters:
      title: "{{ verdict.classification | title }}: {{ parsed.subject | truncate(50) }}"
      priority: "{{ incident.severity }}"
    condition: "{{ verdict.classification != 'benign' }}"

on_error:
  - action: escalate
    parameters:
      level: analyst
      reason: "Playbook execution failed"

Built-in Playbooks

Ready-to-use playbooks included with Triage Warden.

Email Security

phishing_triage

Comprehensive phishing email analysis.

Triggers: incident_type: phishing

Steps:

  1. Parse email headers and body
  2. Check SPF/DKIM/DMARC authentication
  3. Look up sender reputation
  4. Analyze URLs against threat intel
  5. Check attachment hashes
  6. AI analysis and verdict
  7. Auto-quarantine if malicious (confidence > 0.8)

Usage:

tw-cli playbook run phishing_triage --incident INC-2024-001

spam_triage

Quick spam classification.

Triggers: incident_type: spam

Steps:

  1. Parse email
  2. Check spam indicators (bulk headers, suspicious patterns)
  3. Classify as spam/not spam
  4. Auto-archive high-confidence spam

bec_detection

Business Email Compromise detection.

Triggers: incident_type: bec

Steps:

  1. Parse email
  2. Check for executive impersonation
  3. Analyze reply-to mismatch
  4. Check for urgency indicators
  5. Verify sender against directory
  6. AI analysis for social engineering patterns

Endpoint Security

malware_triage

Malware alert analysis.

Triggers: incident_type: malware

Steps:

  1. Get host information from EDR
  2. Look up file hash
  3. Check related processes
  4. Query SIEM for lateral movement
  5. AI verdict
  6. Auto-isolate if critical severity + high confidence

suspicious_login

Anomalous login investigation.

Triggers: incident_type: suspicious_login

Steps:

  1. Get login details
  2. Check for impossible travel
  3. Query user's recent activity
  4. Check IP reputation
  5. Verify device fingerprint
  6. AI analysis

Customizing Built-in Playbooks

Override Variables

tw-cli playbook run phishing_triage \
  --incident INC-2024-001 \
  --var quarantine_threshold=0.9 \
  --var auto_block=false

Fork and Modify

# Export built-in playbook
tw-cli playbook export phishing_triage > my_phishing.yaml

# Edit as needed
vim my_phishing.yaml

# Register custom version
tw-cli playbook add my_phishing.yaml

Extend with Hooks

# my_phishing.yaml
extends: phishing_triage

# Add steps after parent playbook
after_steps:
  - name: Custom Logging
    action: log_to_siem
    parameters:
      event: phishing_verdict
      data: "{{ verdict }}"

# Override variables
variables:
  quarantine_threshold: 0.85

Playbook Comparison

Playbook           AI Used   Auto-Response       Typical Duration
phishing_triage    Yes       Quarantine, Block   30-60s
spam_triage        No        Archive             5-10s
bec_detection      Yes       Escalate            45-90s
malware_triage     Yes       Isolate             60-120s
suspicious_login   Yes       Lock account        30-60s

Monitoring Playbooks

Execution Metrics

# Playbook execution count
sum by (playbook) (playbook_executions_total)

# Average duration
avg by (playbook) (playbook_duration_seconds)

# Success rate
sum(playbook_executions_total{status="success"}) /
sum(playbook_executions_total)

Alerts

# Alert on playbook failures
- alert: PlaybookFailureRate
  expr: |
    sum(rate(playbook_executions_total{status="failed"}[5m])) /
    sum(rate(playbook_executions_total[5m])) > 0.1
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Playbook failure rate above 10%"

REST API

Programmatic access to Triage Warden functionality.

Base URL

http://localhost:8080/api

Authentication

See Authentication for details.

API Key

curl -H "Authorization: Bearer tw_abc123_secretkey" \
  http://localhost:8080/api/incidents

For browser-based access, use session authentication via /login.

Response Format

All responses are JSON:

{
  "data": { ... },
  "meta": {
    "page": 1,
    "per_page": 20,
    "total": 150
  }
}

Error Responses

{
  "error": {
    "code": "not_found",
    "message": "Incident not found",
    "details": { ... }
  }
}

HTTP Status Codes

Code   Meaning
200    Success
201    Created
400    Bad Request
401    Unauthorized
403    Forbidden
404    Not Found
422    Validation Error
429    Rate Limited
500    Server Error

Endpoints Overview

Incidents

Method   Path                     Description
GET      /incidents               List incidents
POST     /incidents               Create incident
GET      /incidents/:id           Get incident
PUT      /incidents/:id           Update incident
DELETE   /incidents/:id           Delete incident
POST     /incidents/:id/triage    Run triage
POST     /incidents/:id/actions   Execute action

Actions

Method   Path                   Description
GET      /actions               List actions
GET      /actions/:id           Get action
POST     /actions/:id/approve   Approve action
POST     /actions/:id/reject    Reject action

Playbooks

Method   Path                 Description
GET      /playbooks           List playbooks
POST     /playbooks           Create playbook
GET      /playbooks/:id       Get playbook
PUT      /playbooks/:id       Update playbook
DELETE   /playbooks/:id       Delete playbook
POST     /playbooks/:id/run   Run playbook

Webhooks

Method   Path                Description
POST     /webhooks/:source   Receive webhook

System

Method   Path                 Description
GET      /health              Health check
GET      /metrics             Prometheus metrics
GET      /connectors/health   Connector status

Pagination

List endpoints support pagination:

curl "http://localhost:8080/api/incidents?page=2&per_page=50"

Parameters:

  • page - Page number (default: 1)
  • per_page - Items per page (default: 20, max: 100)
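
A client that needs every result can walk the pages until `meta.total` is reached. The sketch below takes the page fetcher as a parameter so it runs without a server; `iter_all` and `fake_fetch` are hypothetical, and the response shape follows the Response Format section.

```python
from typing import Callable, Iterator

def iter_all(fetch_page: Callable[[int, int], dict],
             per_page: int = 100) -> Iterator[dict]:
    """Yield every item across pages, stopping once meta.total is covered."""
    page = 1
    while True:
        body = fetch_page(page, per_page)
        yield from body["data"]
        if page * per_page >= body["meta"]["total"]:
            return
        page += 1

# In-memory stand-in for GET /api/incidents?page=N&per_page=M.
ITEMS = [{"id": i} for i in range(5)]

def fake_fetch(page: int, per_page: int) -> dict:
    start = (page - 1) * per_page
    return {
        "data": ITEMS[start:start + per_page],
        "meta": {"page": page, "per_page": per_page, "total": len(ITEMS)},
    }

print([item["id"] for item in iter_all(fake_fetch, per_page=2)])
```

In real use, `fetch_page` would issue the HTTP request and return the decoded JSON body, with `per_page` kept at or below the documented maximum of 100.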

Filtering

Filter list results:

curl "http://localhost:8080/api/incidents?status=open&severity=high"

Common filters:

  • status - Filter by status
  • severity - Filter by severity
  • type - Filter by incident type
  • created_after - Created after date
  • created_before - Created before date

Sorting

curl "http://localhost:8080/api/incidents?sort=-created_at"

  • Prefix with - for descending order
  • Default: -created_at (newest first)

Rate Limiting

API requests are rate limited:

Endpoint           Limit
Read operations    100/min
Write operations   20/min
Triage requests    10/min

Rate limit headers:

X-RateLimit-Limit: 100
X-RateLimit-Remaining: 95
X-RateLimit-Reset: 1705320000
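
Clients can use these headers to decide how long to wait before retrying. This sketch assumes `X-RateLimit-Reset` is a Unix timestamp in seconds, which the example value suggests but the source does not state; `seconds_until_reset` is a hypothetical helper.

```python
def seconds_until_reset(headers: dict, now: float) -> float:
    """Return how long to wait before the next request (0.0 if quota remains)."""
    if int(headers["X-RateLimit-Remaining"]) > 0:
        return 0.0
    reset_at = int(headers["X-RateLimit-Reset"])  # assumed: Unix epoch seconds
    return max(0.0, reset_at - now)

exhausted = {
    "X-RateLimit-Limit": "100",
    "X-RateLimit-Remaining": "0",
    "X-RateLimit-Reset": "1705320000",
}
print(seconds_until_reset(exhausted, now=1705319990.0))
```

In a real client, `now` would come from `time.time()` and the result would be passed to `time.sleep()` after a 429 response.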

API Authentication

Authenticate with the Triage Warden API.

API Keys

Creating an API Key

# Via CLI
tw-cli api-key create --name "automation-script" --scopes read,write

# Output:
# API Key created successfully
# Key: tw_abc123_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# WARNING: Store this key securely. It cannot be retrieved again.

Using API Keys

Include in the Authorization header:

curl -H "Authorization: Bearer tw_abc123_secretkey" \
  http://localhost:8080/api/incidents

API Key Scopes

Scope   Permissions
read    Read incidents, actions, playbooks
write   Create/update incidents, execute actions
admin   User management, system configuration

Managing API Keys

# List keys
tw-cli api-key list

# Revoke key
tw-cli api-key revoke tw_abc123

# Rotate key
tw-cli api-key rotate tw_abc123

Session Authentication

For web dashboard access:

Login

curl -X POST http://localhost:8080/login \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "username=analyst&password=secret&csrf_token=xxx" \
  -c cookies.txt

Using Session

curl -b cookies.txt http://localhost:8080/api/incidents

Logout

curl -X POST http://localhost:8080/logout -b cookies.txt

CSRF Protection

State-changing requests require CSRF tokens:

  1. Get token from login page or API
  2. Include in request header or body

# Header
curl -X POST http://localhost:8080/api/incidents \
  -H "X-CSRF-Token: abc123" \
  -b cookies.txt \
  -d '{"type": "phishing"}'

# Form body
curl -X POST http://localhost:8080/api/incidents \
  -d "csrf_token=abc123&type=phishing" \
  -b cookies.txt

Webhook Authentication

Webhooks use HMAC signatures:

Configuring Webhook Secret

tw-cli webhook add email-gateway \
  --url http://localhost:8080/api/webhooks/email-gateway \
  --secret "your-secret-key"

Verifying Signatures

Triage Warden validates the X-Webhook-Signature header:

X-Webhook-Signature: sha256=abc123...

Signature is computed as:

HMAC-SHA256(secret, timestamp + "." + body)

Signature Verification Example

import hmac
import hashlib

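def verify_signature(payload: bytes, signature: str, secret: str, timestamp: str) -> bool:
    # Sign the raw bytes directly so a non-UTF-8 payload cannot fail on decode
    message = timestamp.encode() + b"." + payload
    expected = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking the signature through timing
    return hmac.compare_digest(f"sha256={expected}", signature)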
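
On the sending side, a webhook source has to produce a matching signature. The following sketch computes the header value from the formula above; `sign_webhook` is a hypothetical helper, and how the timestamp is transmitted alongside the request is not specified in this section, so it is passed in explicitly.

```python
import hashlib
import hmac

def sign_webhook(secret: str, timestamp: str, body: bytes) -> str:
    """Compute the X-Webhook-Signature value: HMAC-SHA256(secret, timestamp.body)."""
    message = timestamp.encode() + b"." + body
    digest = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
    return f"sha256={digest}"

body = b'{"subject": "Urgent: Update Account"}'
signature = sign_webhook("your-secret-key", "1705320000", body)
print(signature[:7], len(signature))
```

Signing the exact bytes that go on the wire matters: re-serializing the JSON on either side can change whitespace or key order and invalidate the signature.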

Service Accounts

For automated systems:

# Create service account
tw-cli user create \
  --username automation-bot \
  --role analyst \
  --service-account

# Generate API key for service account
tw-cli api-key create \
  --user automation-bot \
  --name "ci-cd-integration" \
  --scopes read,write

Security Best Practices

  1. Rotate keys regularly - Set up automated rotation
  2. Use minimal scopes - Only grant necessary permissions
  3. Secure storage - Use secret managers, not code
  4. Monitor usage - Review audit logs for suspicious activity
  5. IP allowlisting - Restrict API access by IP (optional)

# Enable IP allowlist
tw-cli config set api.allowed_ips "10.0.0.0/8,192.168.1.0/24"
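
To sanity-check an allowlist value before applying it, the CIDR ranges can be tested with Python's `ipaddress` module. This is a sketch of the matching logic only, assuming a comma-separated list of CIDR blocks as in the command above; `is_allowed` is a hypothetical helper, not how the server evaluates the setting.

```python
import ipaddress

def is_allowed(client_ip: str, allowed: str) -> bool:
    """Return True if client_ip falls inside any comma-separated CIDR block."""
    networks = [ipaddress.ip_network(cidr.strip()) for cidr in allowed.split(",")]
    address = ipaddress.ip_address(client_ip)
    return any(address in network for network in networks)

ALLOWED = "10.0.0.0/8,192.168.1.0/24"

for ip in ("10.1.2.3", "192.168.1.77", "192.168.2.5"):
    print(ip, is_allowed(ip, ALLOWED))
```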

Error Responses

401 Unauthorized

Missing or invalid credentials:

{
  "error": {
    "code": "unauthorized",
    "message": "Invalid or missing authentication"
  }
}

403 Forbidden

Valid credentials but insufficient permissions:

{
  "error": {
    "code": "forbidden",
    "message": "Insufficient permissions for this operation"
  }
}

Incidents API

Create, read, update, and manage security incidents.

List Incidents

GET /api/incidents

Query Parameters

| Parameter | Type | Description |
|---|---|---|
| status | string | Filter by status (open, triaged, resolved) |
| severity | string | Filter by severity (low, medium, high, critical) |
| type | string | Filter by incident type |
| created_after | datetime | Created after timestamp |
| created_before | datetime | Created before timestamp |
| page | integer | Page number |
| per_page | integer | Items per page |
| sort | string | Sort field (prefix with - for descending) |

Example

curl "http://localhost:8080/api/incidents?status=open&severity=high&per_page=10" \
  -H "Authorization: Bearer tw_xxx"
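
The same query string can be assembled programmatically. The sketch below uses only the standard library; the helper name `incidents_url` is hypothetical, and `created_at` as a sort field is an assumption based on the response fields.

```python
from urllib.parse import urlencode


def incidents_url(base_url: str, **filters) -> str:
    """Build a List Incidents URL, dropping filters that are None."""
    params = {key: value for key, value in filters.items() if value is not None}
    query = urlencode(params)
    endpoint = f"{base_url}/api/incidents"
    return f"{endpoint}?{query}" if query else endpoint


url = incidents_url(
    "http://localhost:8080",
    status="open",
    severity="high",
    sort="-created_at",  # leading "-" requests descending order (field name assumed)
    per_page=10,
)
print(url)
```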

Response

{
  "data": [
    {
      "id": "550e8400-e29b-41d4-a716-446655440000",
      "incident_number": "INC-2024-0001",
      "incident_type": "phishing",
      "severity": "high",
      "status": "open",
      "source": "email_gateway",
      "created_at": "2024-01-15T10:30:00Z",
      "updated_at": "2024-01-15T10:30:00Z"
    }
  ],
  "meta": {
    "page": 1,
    "per_page": 10,
    "total": 42
  }
}
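
The `meta` object is enough to drive pagination on the client side. A minimal sketch, assuming `total` counts all matching incidents rather than only those on the current page:

```python
import math


def total_pages(meta: dict) -> int:
    """Number of pages implied by the meta block of a list response."""
    per_page = meta["per_page"]
    if per_page <= 0:
        raise ValueError("per_page must be positive")
    return math.ceil(meta["total"] / per_page)


def has_next_page(meta: dict) -> bool:
    """True while the current page is not the last one."""
    return meta["page"] < total_pages(meta)


meta = {"page": 1, "per_page": 10, "total": 42}
print(total_pages(meta))    # 5
print(has_next_page(meta))  # True
```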

Get Incident

GET /api/incidents/:id

Example

curl "http://localhost:8080/api/incidents/550e8400-e29b-41d4-a716-446655440000" \
  -H "Authorization: Bearer tw_xxx"

Response

{
  "data": {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "incident_number": "INC-2024-0001",
    "incident_type": "phishing",
    "severity": "high",
    "status": "triaged",
    "source": "email_gateway",
    "raw_data": {
      "message_id": "AAMkAGI2...",
      "sender": "phisher@malicious.com",
      "subject": "Urgent: Update Account"
    },
    "verdict": {
      "classification": "malicious",
      "confidence": 0.92,
      "category": "phishing",
      "reasoning": "Multiple phishing indicators..."
    },
    "recommended_actions": [
      "quarantine_email",
      "block_sender"
    ],
    "created_at": "2024-01-15T10:30:00Z",
    "updated_at": "2024-01-15T10:35:00Z",
    "triaged_at": "2024-01-15T10:35:00Z"
  }
}

Create Incident

POST /api/incidents

Request Body

{
  "incident_type": "phishing",
  "source": "email_gateway",
  "severity": "medium",
  "raw_data": {
    "message_id": "AAMkAGI2...",
    "sender": "unknown@domain.com",
    "recipient": "employee@company.com",
    "subject": "Important Document",
    "received_at": "2024-01-15T10:00:00Z"
  }
}

Example

curl -X POST "http://localhost:8080/api/incidents" \
  -H "Authorization: Bearer tw_xxx" \
  -H "Content-Type: application/json" \
  -d '{
    "incident_type": "phishing",
    "source": "email_gateway",
    "severity": "medium",
    "raw_data": {...}
  }'

Response

{
  "data": {
    "id": "550e8400-e29b-41d4-a716-446655440001",
    "incident_number": "INC-2024-0002",
    "status": "open",
    "created_at": "2024-01-15T11:00:00Z"
  }
}

Update Incident

PUT /api/incidents/:id

Request Body

{
  "severity": "high",
  "status": "resolved",
  "resolution": "False positive - legitimate vendor email"
}

Delete Incident

DELETE /api/incidents/:id

Note: Requires admin role.

Run Triage

POST /api/incidents/:id/triage

Trigger AI triage on an incident.

Request Body (Optional)

{
  "playbook": "custom_phishing",
  "force": true
}

Response

{
  "data": {
    "triage_id": "triage-abc123",
    "status": "completed",
    "verdict": {
      "classification": "malicious",
      "confidence": 0.92
    },
    "duration_ms": 45000
  }
}

Execute Action

POST /api/incidents/:id/actions

Execute an action on an incident.

Request Body

{
  "action": "quarantine_email",
  "parameters": {
    "message_id": "AAMkAGI2...",
    "reason": "Phishing detected"
  }
}

Response (Immediate Execution)

{
  "data": {
    "action_id": "act-abc123",
    "status": "completed",
    "result": {
      "success": true,
      "message": "Email quarantined successfully"
    }
  }
}

Response (Pending Approval)

{
  "data": {
    "action_id": "act-abc123",
    "status": "pending_approval",
    "approval_level": "senior",
    "message": "Action requires senior analyst approval"
  }
}
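
A client has to branch on `status` to tell the two responses apart. The following sketch shows one way to do that; the function name `describe_action_response` is hypothetical, and statuses other than the two shown above are handled generically.

```python
def describe_action_response(response: dict) -> str:
    """Summarize an Execute Action response based on its status field."""
    data = response["data"]
    status = data["status"]
    if status == "completed":
        result = data.get("result", {})
        outcome = "succeeded" if result.get("success") else "failed"
        return f"{data['action_id']} {outcome}: {result.get('message', '')}"
    if status == "pending_approval":
        return f"{data['action_id']} awaiting {data['approval_level']} approval"
    return f"{data['action_id']} has status {status}"


immediate = {
    "data": {
        "action_id": "act-abc123",
        "status": "completed",
        "result": {"success": True, "message": "Email quarantined successfully"},
    }
}
pending = {
    "data": {
        "action_id": "act-abc123",
        "status": "pending_approval",
        "approval_level": "senior",
    }
}
print(describe_action_response(immediate))
print(describe_action_response(pending))
```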

Get Incident Actions

GET /api/incidents/:id/actions

List all actions for an incident.

Response

{
  "data": [
    {
      "id": "act-abc123",
      "action_type": "quarantine_email",
      "status": "completed",
      "executed_at": "2024-01-15T10:40:00Z",
      "executed_by": "system"
    },
    {
      "id": "act-def456",
      "action_type": "block_sender",
      "status": "pending_approval",
      "approval_level": "analyst",
      "requested_at": "2024-01-15T10:41:00Z"
    }
  ]
}

Actions API

Manage action execution and approvals.

List Actions

GET /api/actions

Query Parameters

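| Parameter | Type | Description |
|---|---|---|
| status | string | Filter by status (pending, pending_approval, completed, failed) |
| action_type | string | Filter by action type |
| incident_id | uuid | Filter by incident |
| approval_level | string | Filter by approval level (analyst, senior, manager) |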

Example

curl "http://localhost:8080/api/actions?status=pending_approval" \
  -H "Authorization: Bearer tw_xxx"

Response

{
  "data": [
    {
      "id": "act-abc123",
      "incident_id": "550e8400-e29b-41d4-a716-446655440000",
      "action_type": "isolate_host",
      "status": "pending_approval",
      "approval_level": "senior",
      "parameters": {
        "host_id": "aid:xyz789",
        "reason": "Malware detected"
      },
      "requested_by": "triage_agent",
      "requested_at": "2024-01-15T10:45:00Z"
    }
  ]
}

Get Action

GET /api/actions/:id

Response

{
  "data": {
    "id": "act-abc123",
    "incident_id": "550e8400-e29b-41d4-a716-446655440000",
    "action_type": "isolate_host",
    "status": "pending_approval",
    "approval_level": "senior",
    "parameters": {
      "host_id": "aid:xyz789",
      "reason": "Malware detected"
    },
    "requested_by": "triage_agent",
    "requested_at": "2024-01-15T10:45:00Z",
    "incident": {
      "incident_number": "INC-2024-0001",
      "incident_type": "malware",
      "severity": "high"
    }
  }
}

Approve Action

POST /api/actions/:id/approve

Request Body

{
  "comment": "Verified threat, approved for isolation"
}

Response

{
  "data": {
    "id": "act-abc123",
    "status": "completed",
    "approved_by": "senior.analyst@company.com",
    "approved_at": "2024-01-15T11:00:00Z",
    "result": {
      "success": true,
      "message": "Host isolated successfully"
    }
  }
}

Errors

403 Forbidden - Insufficient approval level:

{
  "error": {
    "code": "insufficient_approval_level",
    "message": "This action requires senior analyst approval",
    "required_level": "senior",
    "your_level": "analyst"
  }
}
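
The comparison behind this error can be sketched as a rank check. This assumes the levels are ordered analyst < senior < manager, following the order in which they are listed for the `approval_level` filter; the actual policy engine may apply additional rules.

```python
# Assumed ordering, lowest to highest.
APPROVAL_ORDER = ["analyst", "senior", "manager"]


def can_approve(your_level: str, required_level: str) -> bool:
    """True if your_level ranks at or above required_level."""
    return APPROVAL_ORDER.index(your_level) >= APPROVAL_ORDER.index(required_level)


print(can_approve("analyst", "senior"))  # the case rejected in the error above
print(can_approve("manager", "senior"))
```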

Reject Action

POST /api/actions/:id/reject

Request Body

{
  "reason": "False positive - user confirmed legitimate activity"
}

Response

{
  "data": {
    "id": "act-abc123",
    "status": "rejected",
    "rejected_by": "senior.analyst@company.com",
    "rejected_at": "2024-01-15T11:00:00Z",
    "rejection_reason": "False positive - user confirmed legitimate activity"
  }
}

Execute Action Directly

POST /api/actions/execute

Execute an action without associating it with an incident.

Request Body

{
  "action": "block_sender",
  "parameters": {
    "sender": "spammer@malicious.com"
  }
}

Response

{
  "data": {
    "action_id": "act-ghi789",
    "status": "completed",
    "result": {
      "success": true,
      "message": "Sender blocked"
    }
  }
}

Get Action Types

GET /api/actions/types

List all available action types.

Response

{
  "data": [
    {
      "name": "quarantine_email",
      "description": "Move email to quarantine",
      "category": "email",
      "supports_rollback": true,
      "parameters": [
        {
          "name": "message_id",
          "type": "string",
          "required": true
        },
        {
          "name": "reason",
          "type": "string",
          "required": false
        }
      ]
    },
    {
      "name": "isolate_host",
      "description": "Network-isolate a host",
      "category": "endpoint",
      "supports_rollback": true,
      "default_approval_level": "senior",
      "parameters": [...]
    }
  ]
}

Rollback Action

POST /api/actions/:id/rollback

Roll back a previously executed action.

Request Body

{
  "reason": "False positive confirmed"
}

Response

{
  "data": {
    "rollback_action_id": "act-jkl012",
    "original_action_id": "act-abc123",
    "status": "completed",
    "result": {
      "success": true,
      "message": "Host unisolated successfully"
    }
  }
}

Errors

400 Bad Request - Action doesn't support rollback:

{
  "error": {
    "code": "rollback_not_supported",
    "message": "Action type 'notify_user' does not support rollback"
  }
}

Playbooks API

Manage and execute playbooks.

List Playbooks

GET /api/playbooks

Response

{
  "data": [
    {
      "id": "pb-abc123",
      "name": "phishing_triage",
      "description": "Automated phishing email analysis",
      "version": "2.0",
      "enabled": true,
      "triggers": {
        "incident_type": "phishing",
        "auto_run": true
      },
      "created_at": "2024-01-01T00:00:00Z",
      "updated_at": "2024-01-10T00:00:00Z"
    }
  ]
}

Get Playbook

GET /api/playbooks/:id

Response

{
  "data": {
    "id": "pb-abc123",
    "name": "phishing_triage",
    "description": "Automated phishing email analysis",
    "version": "2.0",
    "enabled": true,
    "triggers": {
      "incident_type": "phishing",
      "auto_run": true
    },
    "variables": {
      "quarantine_threshold": 0.7
    },
    "steps": [
      {
        "name": "Parse Email",
        "action": "parse_email",
        "parameters": {
          "raw_email": "{{ incident.raw_data.raw_email }}"
        },
        "output": "parsed"
      }
    ],
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": "2024-01-10T00:00:00Z"
  }
}

Create Playbook

POST /api/playbooks

Request Body

{
  "name": "custom_playbook",
  "description": "My custom investigation playbook",
  "triggers": {
    "incident_type": "phishing",
    "auto_run": false
  },
  "steps": [
    {
      "name": "Parse Email",
      "action": "parse_email",
      "output": "parsed"
    }
  ]
}

Response

{
  "data": {
    "id": "pb-def456",
    "name": "custom_playbook",
    "version": "1.0",
    "created_at": "2024-01-15T12:00:00Z"
  }
}

Update Playbook

PUT /api/playbooks/:id

Request Body

{
  "description": "Updated description",
  "enabled": false
}

Delete Playbook

DELETE /api/playbooks/:id

Note: Built-in playbooks cannot be deleted.

Run Playbook

POST /api/playbooks/:id/run

Execute a playbook on an incident.

Request Body

{
  "incident_id": "550e8400-e29b-41d4-a716-446655440000",
  "variables": {
    "quarantine_threshold": 0.9
  }
}

Response

{
  "data": {
    "execution_id": "exec-abc123",
    "playbook_id": "pb-abc123",
    "incident_id": "550e8400-e29b-41d4-a716-446655440000",
    "status": "completed",
    "started_at": "2024-01-15T12:00:00Z",
    "completed_at": "2024-01-15T12:00:45Z",
    "steps_completed": 5,
    "steps_total": 5,
    "verdict": {
      "classification": "malicious",
      "confidence": 0.92
    }
  }
}

Get Playbook Executions

GET /api/playbooks/:id/executions

Response

{
  "data": [
    {
      "execution_id": "exec-abc123",
      "incident_id": "550e8400-e29b-41d4-a716-446655440000",
      "status": "completed",
      "duration_ms": 45000,
      "started_at": "2024-01-15T12:00:00Z"
    }
  ]
}

Validate Playbook

POST /api/playbooks/validate

Validate playbook YAML without creating it.

Request Body

{
  "content": "name: test\nsteps:\n  - action: parse_email"
}

Response (Valid)

{
  "data": {
    "valid": true,
    "warnings": []
  }
}

Response (Invalid)

{
  "data": {
    "valid": false,
    "errors": [
      {
        "line": 3,
        "message": "Unknown action: invalid_action"
      }
    ]
  }
}

Export Playbook

GET /api/playbooks/:id/export

Download the playbook as a YAML file.

Response

name: phishing_triage
description: Automated phishing email analysis
version: "2.0"
...

Webhooks API

Receive events from external security tools.

Endpoint

POST /api/webhooks/:source

Where :source identifies the sending system (e.g., email-gateway, edr, siem).

Authentication

Webhooks are authenticated via HMAC signatures:

X-Webhook-Signature: sha256=abc123...
X-Webhook-Timestamp: 1705320000
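
On the sending side, both headers can be produced as in the sketch below. It follows the documented formula `HMAC-SHA256(secret, timestamp + "." + body)`; the function name `sign_webhook` is hypothetical.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign_webhook(secret: str, body: bytes, timestamp: Optional[int] = None) -> dict:
    """Return the signature and timestamp headers for a webhook body."""
    ts = str(int(time.time()) if timestamp is None else timestamp)
    # The signed message is "<timestamp>.<raw body bytes>".
    message = ts.encode() + b"." + body
    digest = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
    return {
        "X-Webhook-Signature": f"sha256={digest}",
        "X-Webhook-Timestamp": ts,
    }


headers = sign_webhook("your-secret-key", b'{"event_type":"test","data":{}}', 1705320000)
print(headers["X-Webhook-Timestamp"])
```

The body must be signed exactly as it is sent; re-serializing the JSON after signing changes the bytes and invalidates the signature.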

Registering Webhook Sources

Via CLI

tw-cli webhook add email-gateway \
  --secret "your-secret-key" \
  --auto-triage true \
  --playbook phishing_triage

Via API

curl -X POST "http://localhost:8080/api/webhooks" \
  -H "Authorization: Bearer tw_xxx" \
  -H "Content-Type: application/json" \
  -d '{
    "source": "email-gateway",
    "secret": "your-secret-key",
    "auto_triage": true,
    "playbook": "phishing_triage"
  }'

Payload Formats

Generic Format

{
  "event_type": "security_alert",
  "timestamp": "2024-01-15T10:00:00Z",
  "source": "email-gateway",
  "data": {
    "alert_id": "alert-123",
    "severity": "high",
    "details": {...}
  }
}

Microsoft Defender for Office 365

{
  "eventType": "PhishingEmail",
  "id": "AAMkAGI2...",
  "creationTime": "2024-01-15T10:00:00Z",
  "severity": "high",
  "category": "Phish",
  "entityType": "Email",
  "data": {
    "sender": "phisher@malicious.com",
    "subject": "Urgent Action Required",
    "recipients": ["user@company.com"]
  }
}

CrowdStrike Falcon

{
  "metadata": {
    "eventType": "DetectionSummaryEvent",
    "eventCreationTime": 1705320000000
  },
  "event": {
    "DetectId": "ldt:abc123",
    "Severity": 4,
    "HostnameField": "WORKSTATION-01",
    "DetectName": "Malicious File Detected"
  }
}

Splunk Alert

{
  "result": {
    "host": "server-01",
    "source": "WinEventLog:Security",
    "sourcetype": "WinEventLog",
    "_raw": "...",
    "EventCode": "4625"
  },
  "search_name": "Failed Login Alert",
  "trigger_time": 1705320000
}
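
The sample payloads carry timestamps in different units: the CrowdStrike `eventCreationTime` value has millisecond magnitude, while the Splunk `trigger_time` value is in seconds. A sketch of normalizing both to ISO 8601 follows; the magnitude threshold is a heuristic assumption, not documented behavior.

```python
from datetime import datetime, timezone


def epoch_to_iso(value: int) -> str:
    """Convert epoch seconds or milliseconds to an ISO 8601 UTC string."""
    # Heuristic (assumption): values above 1e11 are treated as milliseconds.
    seconds = value / 1000 if value > 100_000_000_000 else value
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


print(epoch_to_iso(1705320000000))  # CrowdStrike eventCreationTime
print(epoch_to_iso(1705320000))     # Splunk trigger_time
```

Both calls print `2024-01-15T12:00:00Z`.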

Response

Success

{
  "status": "accepted",
  "incident_id": "550e8400-e29b-41d4-a716-446655440000",
  "incident_number": "INC-2024-0001"
}

Queued for Processing

{
  "status": "queued",
  "queue_id": "queue-abc123",
  "message": "Event queued for processing"
}

Configuring Auto-Triage

When auto_triage is enabled, incidents created from webhooks are automatically triaged:

# webhook_config.yaml
sources:
  email-gateway:
    secret: "${EMAIL_GATEWAY_SECRET}"
    auto_triage: true
    playbook: phishing_triage
    severity_mapping:
      critical: critical
      high: high
      medium: medium
      low: low

  edr:
    secret: "${EDR_SECRET}"
    auto_triage: true
    playbook: malware_triage

Testing Webhooks

Send Test Event

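# Generate signature (use the secret registered for the source)
TIMESTAMP=$(date +%s)
BODY='{"event_type":"test","data":{}}'
# openssl prefixes the digest with a label such as "(stdin)= "; awk keeps only the hex digest
SIGNATURE=$(printf '%s' "${TIMESTAMP}.${BODY}" | openssl dgst -sha256 -hmac "your-secret-key" | awk '{print $NF}')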

# Send request
curl -X POST "http://localhost:8080/api/webhooks/email-gateway" \
  -H "Content-Type: application/json" \
  -H "X-Webhook-Signature: sha256=${SIGNATURE}" \
  -H "X-Webhook-Timestamp: ${TIMESTAMP}" \
  -d "${BODY}"

Verify Configuration

tw-cli webhook test email-gateway

Error Handling

Invalid Signature

{
  "error": {
    "code": "invalid_signature",
    "message": "Webhook signature verification failed"
  }
}

Unknown Source

{
  "error": {
    "code": "unknown_source",
    "message": "Webhook source 'unknown' is not registered"
  }
}

Replay Attack

{
  "error": {
    "code": "timestamp_expired",
    "message": "Webhook timestamp is too old (>5 minutes)"
  }
}
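
The freshness check behind `timestamp_expired` can be sketched as follows. The five-minute window comes from the error message above; rejecting timestamps that lie far in the future as well is an assumption of this sketch.

```python
import time
from typing import Optional

MAX_AGE_SECONDS = 5 * 60  # the ">5 minutes" limit from the error message


def timestamp_is_fresh(timestamp_header: str, now: Optional[float] = None) -> bool:
    """Return True if the X-Webhook-Timestamp value is within the allowed window."""
    try:
        sent_at = int(timestamp_header)
    except ValueError:
        return False  # a non-numeric header can never be fresh
    current = time.time() if now is None else now
    # abs() also rejects timestamps too far in the future (assumption).
    return abs(current - sent_at) <= MAX_AGE_SECONDS


print(timestamp_is_fresh("1705320000", now=1705320060))  # 60 seconds old
print(timestamp_is_fresh("1705320000", now=1705320301))  # 301 seconds old
```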

Monitoring Webhooks

Metrics

# Webhook receive rate
rate(webhook_received_total[5m])

# Error rate by source
rate(webhook_errors_total[5m])

Logs

tw-cli logs --filter webhook --tail 100

Contributing

Guide to contributing to Triage Warden.

Getting Started

  1. Fork the repository
  2. Clone your fork
  3. Set up the development environment
  4. Create a branch for your changes
  5. Submit a pull request

Development Setup

Prerequisites

  • Rust 1.75+
  • Python 3.11+
  • uv (Python package manager)
  • SQLite (for development)

Initial Setup

# Clone repository
git clone https://github.com/your-username/triage-warden.git
cd triage-warden

# Install Rust dependencies
cargo build

# Install Python dependencies
cd python
uv sync
cd ..

# Run tests
cargo test
cd python && uv run pytest

Code Style

Rust

  • Follow standard Rust conventions
  • Run cargo fmt before committing
  • Run cargo clippy and fix warnings
  • Document public APIs with doc comments

Python

  • Follow PEP 8
  • Run ruff check and black before committing
  • Type hints required (mypy strict mode)
  • Docstrings for public functions

Pre-commit Hooks

Pre-commit hooks run the following checks automatically on each commit:

# The project has pre-commit configured in .git/hooks
# It runs automatically on commit:
# - cargo fmt
# - cargo clippy
# - ruff
# - black
# - mypy

Pull Request Process

  1. Create a branch

    git checkout -b feature/my-feature
    
  2. Make changes

    • Write code
    • Add tests
    • Update documentation
  3. Run checks

    cargo fmt && cargo clippy
    cargo test
    cd python && uv run pytest
    
  4. Commit

    git commit -m "feat: add new feature"
    
  5. Push and create PR

    git push origin feature/my-feature
    
  6. Address review feedback

Commit Messages

Follow conventional commits:

type(scope): description

[optional body]

[optional footer]

Types:

  • feat: New feature
  • fix: Bug fix
  • docs: Documentation
  • refactor: Code refactoring
  • test: Adding tests
  • chore: Maintenance
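
A commit header can be checked against this format with a short regular expression. The sketch below accepts only the six types listed above and treats the scope as optional; it is an illustration, not a hook that ships with the project.

```python
import re

COMMIT_TYPES = ("feat", "fix", "docs", "refactor", "test", "chore")
TYPE_ALTERNATION = "|".join(COMMIT_TYPES)

# type, optional "(scope)", then ": " and a non-empty description
HEADER_PATTERN = re.compile(
    r"^(?:" + TYPE_ALTERNATION + r")(?:\([^()\s]+\))?: \S.*$"
)


def is_valid_commit_header(header: str) -> bool:
    """Return True if the first line of a commit message follows the convention."""
    return HEADER_PATTERN.match(header) is not None


print(is_valid_commit_header("feat: add new feature"))
print(is_valid_commit_header("fix(api): handle empty body"))
print(is_valid_commit_header("added stuff"))
```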

Testing

Rust Tests

# Run all tests
cargo test

# Run specific crate tests
cargo test -p tw-api

# Run with output
cargo test -- --nocapture

Python Tests

cd python
uv run pytest

# Run specific tests
uv run pytest tests/test_agents.py

# With coverage
uv run pytest --cov=tw_ai

Integration Tests

# Start test server
cargo run --bin tw-api &

# Run integration tests
./scripts/integration-tests.sh

Documentation

  • Update docs for API changes
  • Add examples for new features
  • Keep README.md current

Build docs locally:

cd docs-site
mdbook serve

Issue Reporting

When reporting issues:

  1. Search existing issues first
  2. Use issue templates
  3. Include:
    • Version information
    • Steps to reproduce
    • Expected vs actual behavior
    • Relevant logs

Questions

  • Open a GitHub Discussion
  • Check existing discussions first
  • Tag appropriately

License

By contributing, you agree that your contributions will be licensed under the MIT License.

Building from Source

Complete guide to building Triage Warden.

Prerequisites

Rust

# Install Rust via rustup
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Verify installation
rustc --version  # Should be 1.75+

Python

# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh

# Verify installation
uv --version

System Dependencies

macOS

brew install openssl pkg-config

Ubuntu/Debian

sudo apt-get install build-essential pkg-config libssl-dev

Fedora

sudo dnf install gcc openssl-devel pkgconfig

Building

Debug Build

cargo build

Outputs:

  • target/debug/tw-api
  • target/debug/tw-cli

Release Build

cargo build --release

Outputs:

  • target/release/tw-api
  • target/release/tw-cli

Python Package

cd python
uv sync
uv build

PyO3 Bridge

The bridge is built automatically with cargo:

cd tw-bridge
cargo build --release

Build Options

Feature Flags

# Build with PostgreSQL support only
cargo build --no-default-features --features postgres

# Build with all features
cargo build --all-features

Cross-Compilation

# For Linux (from macOS)
rustup target add x86_64-unknown-linux-gnu
cargo build --release --target x86_64-unknown-linux-gnu

# For musl (static binary)
rustup target add x86_64-unknown-linux-musl
cargo build --release --target x86_64-unknown-linux-musl

Docker Build

Build Image

docker build -t triage-warden .

Multi-Stage Dockerfile

# Builder stage
FROM rust:1.75 AS builder
WORKDIR /app
COPY . .
RUN cargo build --release

# Runtime stage
FROM debian:bookworm-slim
COPY --from=builder /app/target/release/tw-api /usr/local/bin/
CMD ["tw-api"]

Verification

Run Tests

# Rust tests
cargo test

# Python tests
cd python && uv run pytest

# All tests
./scripts/test-all.sh

Linting

# Rust
cargo fmt --check
cargo clippy -- -D warnings

# Python
cd python
uv run ruff check
uv run black --check .
uv run mypy .

Smoke Test

# Start server
./target/release/tw-api &

# Health check
curl http://localhost:8080/api/health

# Stop server
kill %1

Troubleshooting

OpenSSL Errors

# macOS
export OPENSSL_DIR=$(brew --prefix openssl)

# Linux
export OPENSSL_DIR=/usr

PyO3 Build Issues

# Ensure Python is found
export PYO3_PYTHON=$(which python3)

# Clean and rebuild
cargo clean -p tw-bridge
cargo build -p tw-bridge

Out of Memory

# Reduce parallel jobs
cargo build -j 2

Testing

Guide to testing Triage Warden.

Test Structure

triage-warden/
├── crates/
│   ├── tw-api/src/
│   │   └── tests/           # API integration tests
│   ├── tw-core/src/
│   │   └── tests/           # Core unit tests
│   └── tw-actions/src/
│       └── tests/           # Action handler tests
└── python/
    └── tests/               # Python tests

Running Tests

All Tests

# Rust
cargo test

# Python
cd python && uv run pytest

# Everything
./scripts/test-all.sh

Specific Tests

# Single crate
cargo test -p tw-api

# Single test
cargo test test_incident_creation

# Pattern match
cargo test incident

# With output
cargo test -- --nocapture

Unit Tests

Rust Unit Tests

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_incident_creation() {
        let incident = Incident::new(
            IncidentType::Phishing,
            Severity::High,
        );
        assert_eq!(incident.status, IncidentStatus::Open);
    }

    #[tokio::test]
    async fn test_async_operation() {
        let result = async_function().await;
        assert!(result.is_ok());
    }
}

Python Unit Tests

import pytest
from tw_ai.agents import TriageAgent

def test_agent_creation():
    agent = TriageAgent()
    assert agent.model == "claude-sonnet-4-20250514"

@pytest.mark.asyncio
async def test_triage(mock_incident):
    # mock_incident is the fixture defined in tests/conftest.py
    agent = TriageAgent()
    verdict = await agent.triage(mock_incident)
    assert verdict.classification in ["malicious", "benign"]

Integration Tests

API Integration Tests

use axum::body::Body;
use axum::http::{Request, StatusCode};
use tower::ServiceExt; // provides `oneshot`

#[tokio::test]
async fn test_incident_api() {
    let app = create_test_app().await;

    // Create incident
    let response = app
        .oneshot(
            Request::builder()
                .method("POST")
                .uri("/api/incidents")
                .header("Content-Type", "application/json")
                .body(Body::from(r#"{"type":"phishing"}"#))
                .unwrap(),
        )
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::CREATED);
}

Database Tests

#[tokio::test]
async fn test_repository() {
    // Use in-memory SQLite
    let pool = create_test_pool().await;
    let repo = SqliteIncidentRepository::new(pool);

    let incident = repo.create(&new_incident).await.unwrap();
    let found = repo.get(incident.id).await.unwrap();

    assert_eq!(found.unwrap().id, incident.id);
}

Test Fixtures

Rust Fixtures

// tests/fixtures.rs
pub fn mock_incident() -> Incident {
    Incident {
        id: Uuid::new_v4(),
        incident_type: IncidentType::Phishing,
        severity: Severity::High,
        status: IncidentStatus::Open,
        raw_data: json!({"subject": "Test"}),
        ..Default::default()
    }
}

Python Fixtures

# tests/conftest.py
import pytest

@pytest.fixture
def mock_incident():
    return {
        "id": "test-123",
        "type": "phishing",
        "severity": "high",
        "raw_data": {"subject": "Test Email"}
    }

@pytest.fixture
def mock_connector():
    return MockThreatIntelConnector()

Mocking

Rust Mocking

use async_trait::async_trait;
use mockall::mock;

mock! {
    ThreatIntelConnector {}

    #[async_trait]
    impl ThreatIntelConnector for ThreatIntelConnector {
        async fn lookup_hash(&self, hash: &str) -> ConnectorResult<ThreatReport>;
    }
}

#[tokio::test]
async fn test_with_mock() {
    let mut mock = MockThreatIntelConnector::new();
    mock.expect_lookup_hash()
        .returning(|_| Ok(ThreatReport::clean()));

    let result = function_using_connector(&mock).await;
    assert!(result.is_ok());
}

Python Mocking

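from unittest.mock import patch

import pytest
from tw_ai.agents import TriageAgent

@pytest.mark.asyncio
async def test_with_mock(mock_incident):
    # patch() substitutes an AsyncMock automatically when the target is an async function
    with patch("tw_ai.agents.tools.lookup_hash") as mock:
        mock.return_value = {"malicious": False}

        agent = TriageAgent()
        verdict = await agent.triage(mock_incident)

        mock.assert_called_once()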

Test Coverage

Rust Coverage

cargo install cargo-tarpaulin
cargo tarpaulin --out Html

Python Coverage

cd python
uv run pytest --cov=tw_ai --cov-report=html

CI Testing

GitHub Actions runs tests on every PR:

# .github/workflows/test.yml
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: dtolnay/rust-toolchain@stable
      - run: cargo test
      - run: cargo clippy -- -D warnings

Test Data

Evaluation Test Cases

Test cases for AI triage evaluation:

# python/tw_ai/evaluation/test_cases/phishing.yaml
- name: obvious_phishing
  input:
    sender: "security@fake-bank.com"
    subject: "Urgent: Verify Account"
    urls: ["https://phishing-site.com/login"]
    auth_results: {spf: fail, dkim: fail}
  expected:
    classification: malicious
    min_confidence: 0.8

Run evaluation:

cd python
uv run pytest tests/test_evaluation.py
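
The `expected` block of a test case can be compared against a verdict as in the sketch below. The function name `verdict_matches` is hypothetical, and the verdict is assumed to be a dict with `classification` and `confidence` keys, as in the API responses; the actual evaluation harness may differ.

```python
def verdict_matches(verdict: dict, expected: dict) -> bool:
    """Check a triage verdict against the expected block of a test case."""
    if verdict.get("classification") != expected["classification"]:
        return False
    # min_confidence is optional; without it any confidence passes.
    return verdict.get("confidence", 0.0) >= expected.get("min_confidence", 0.0)


expected = {"classification": "malicious", "min_confidence": 0.8}
print(verdict_matches({"classification": "malicious", "confidence": 0.92}, expected))
print(verdict_matches({"classification": "malicious", "confidence": 0.75}, expected))
```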

Adding Connectors

Guide to implementing new connectors.

Connector Architecture

Connectors follow a trait-based pattern:

Connector Trait (base)
    │
    ├── ThreatIntelConnector
    ├── SIEMConnector
    ├── EDRConnector
    ├── EmailGatewayConnector
    └── TicketingConnector

Implementing a Connector

1. Create the File

touch crates/tw-connectors/src/threat_intel/my_provider.rs

2. Implement Base Trait

use crate::traits::{Connector, ConnectorError, ConnectorHealth, ConnectorResult};
use async_trait::async_trait;

pub struct MyProviderConnector {
    client: reqwest::Client,
    api_key: String,
    base_url: String,
}

impl MyProviderConnector {
    pub fn new(api_key: String) -> Result<Self, ConnectorError> {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(30))
            .build()
            .map_err(|e| ConnectorError::Configuration(e.to_string()))?;

        Ok(Self {
            client,
            api_key,
            base_url: "https://api.myprovider.com".to_string(),
        })
    }
}

#[async_trait]
impl Connector for MyProviderConnector {
    fn name(&self) -> &str {
        "my_provider"
    }

    fn connector_type(&self) -> &str {
        "threat_intel"
    }

    async fn health_check(&self) -> ConnectorResult<ConnectorHealth> {
        let response = self.client
            .get(format!("{}/health", self.base_url))
            .header("Authorization", format!("Bearer {}", self.api_key))
            .send()
            .await
            .map_err(|e| ConnectorError::NetworkError(e.to_string()))?;

        if response.status().is_success() {
            Ok(ConnectorHealth::Healthy)
        } else {
            Ok(ConnectorHealth::Unhealthy {
                message: "Health check failed".to_string(),
            })
        }
    }

    async fn test_connection(&self) -> ConnectorResult<bool> {
        match self.health_check().await? {
            ConnectorHealth::Healthy => Ok(true),
            _ => Ok(false),
        }
    }
}

3. Implement Specialized Trait

use crate::traits::{ThreatIntelConnector, ThreatReport, IndicatorType};

#[async_trait]
impl ThreatIntelConnector for MyProviderConnector {
    async fn lookup_hash(&self, hash: &str) -> ConnectorResult<ThreatReport> {
        let response = self.client
            .get(format!("{}/files/{}", self.base_url, hash))
            .header("Authorization", format!("Bearer {}", self.api_key))
            .send()
            .await
            .map_err(|e| ConnectorError::NetworkError(e.to_string()))?;

        if response.status() == reqwest::StatusCode::NOT_FOUND {
            return Ok(ThreatReport {
                indicator: hash.to_string(),
                indicator_type: IndicatorType::FileHash,
                malicious: false,
                confidence: 0.0,
                categories: vec![],
                first_seen: None,
                last_seen: None,
                sources: vec![],
            });
        }

        let data: ApiResponse = response.json().await
            .map_err(|e| ConnectorError::InvalidResponse(e.to_string()))?;

        Ok(self.convert_response(data))
    }

    async fn lookup_url(&self, url: &str) -> ConnectorResult<ThreatReport> {
        // Similar implementation
        todo!()
    }

    async fn lookup_domain(&self, domain: &str) -> ConnectorResult<ThreatReport> {
        // Similar implementation
        todo!()
    }

    async fn lookup_ip(&self, ip: &str) -> ConnectorResult<ThreatReport> {
        // Similar implementation
        todo!()
    }
}

4. Add to Module

// crates/tw-connectors/src/threat_intel/mod.rs
mod my_provider;
pub use my_provider::MyProviderConnector;

5. Register in Bridge

// tw-bridge/src/lib.rs
impl ThreatIntelBridge {
    pub fn new(mode: &str) -> PyResult<Self> {
        let connector: Arc<dyn ThreatIntelConnector + Send + Sync> = match mode {
            "virustotal" => Arc::new(VirusTotalConnector::new(
                std::env::var("TW_VIRUSTOTAL_API_KEY")
                    .map_err(|_| PyErr::new::<pyo3::exceptions::PyValueError, _>(
                        "TW_VIRUSTOTAL_API_KEY not set"
                    ))?
            )?),
            "my_provider" => Arc::new(MyProviderConnector::new(
                std::env::var("TW_MY_PROVIDER_API_KEY")
                    .map_err(|_| PyErr::new::<pyo3::exceptions::PyValueError, _>(
                        "TW_MY_PROVIDER_API_KEY not set"
                    ))?
            )?),
            _ => Arc::new(MockThreatIntelConnector::new("mock")),
        };

        Ok(Self { connector })
    }
}

Error Handling

Use appropriate error types:

use std::time::Duration;

pub enum ConnectorError {
    /// Configuration issue
    Configuration(String),

    /// Network/connection error
    NetworkError(String),

    /// Authentication failed
    AuthenticationFailed(String),

    /// Resource not found
    NotFound(String),

    /// Rate limited
    RateLimited { retry_after: Option<Duration> },

    /// Invalid response from service
    InvalidResponse(String),

    /// Request failed
    RequestFailed(String),
}

Rate Limiting

Implement rate limiting in your connector:

use governor::{Quota, RateLimiter};

pub struct MyProviderConnector {
    client: reqwest::Client,
    api_key: String,
    rate_limiter: RateLimiter<...>,
}

impl MyProviderConnector {
    async fn make_request(&self, url: &str) -> ConnectorResult<reqwest::Response> {
        // Wait until the limiter permits another request
        self.rate_limiter.until_ready().await;

        self.client.get(url)
            .header("Authorization", format!("Bearer {}", self.api_key))
            .send()
            .await
            .map_err(|e| ConnectorError::NetworkError(e.to_string()))
    }
}

Testing

Unit Tests

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use wiremock::{MockServer, Mock, ResponseTemplate};
    use wiremock::matchers::{method, path};

    #[tokio::test]
    async fn test_lookup_hash() {
        let mock_server = MockServer::start().await;

        Mock::given(method("GET"))
            .and(path("/files/abc123"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "malicious": true,
                "confidence": 0.95
            })))
            .mount(&mock_server)
            .await;

        // Assumes a `with_base_url` constructor that points the connector at the mock server
        let connector = MyProviderConnector::with_base_url(
            "test-key".to_string(),
            mock_server.uri(),
        );

        let result = connector.lookup_hash("abc123").await.unwrap();
        assert!(result.malicious);
    }
}

Documentation

Document your connector:

//! MyProvider threat intelligence connector.
//!
//! # Configuration
//!
//! Set the `TW_MY_PROVIDER_API_KEY` environment variable.
//!
//! # Example
//!
//! ```rust,ignore
//! let connector = MyProviderConnector::new(api_key)?;
//! let report = connector.lookup_hash("abc123").await?;
//! ```

Adding Actions

Guide to implementing new action handlers.

Action Architecture

Actions implement the Action trait:

use async_trait::async_trait;

#[async_trait]
pub trait Action: Send + Sync {
    /// Unique name used to look up the action.
    fn name(&self) -> &str;
    /// Human-readable description.
    fn description(&self) -> &str;
    /// Parameters the action accepts.
    fn required_parameters(&self) -> Vec<ParameterDef>;
    /// Whether `rollback` is implemented.
    fn supports_rollback(&self) -> bool;

    async fn execute(&self, context: ActionContext) -> Result<ActionResult, ActionError>;

    /// Default implementation for actions that cannot be rolled back.
    async fn rollback(&self, _context: ActionContext) -> Result<ActionResult, ActionError> {
        Err(ActionError::RollbackNotSupported)
    }
}

Implementing an Action

1. Create the File

touch crates/tw-actions/src/my_action.rs

2. Define the Action

use crate::registry::{
    Action, ActionContext, ActionError, ActionResult, ParameterDef, ParameterType,
};
use async_trait::async_trait;
use chrono::Utc;
use std::collections::HashMap;
use tracing::{info, instrument};

/// My custom action handler.
pub struct MyAction;

impl MyAction {
    pub fn new() -> Self {
        Self
    }
}

impl Default for MyAction {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl Action for MyAction {
    fn name(&self) -> &str {
        "my_action"
    }

    fn description(&self) -> &str {
        "Description of what this action does"
    }

    fn required_parameters(&self) -> Vec<ParameterDef> {
        vec![
            ParameterDef::required(
                "target",
                "The target of the action",
                ParameterType::String,
            ),
            ParameterDef::optional(
                "force",
                "Force the action even if conditions aren't met",
                ParameterType::Boolean,
                serde_json::json!(false),
            ),
        ]
    }

    fn supports_rollback(&self) -> bool {
        true
    }

    #[instrument(skip(self, context))]
    async fn execute(&self, context: ActionContext) -> Result<ActionResult, ActionError> {
        let started_at = Utc::now();

        // Get required parameter
        let target = context.require_string("target")?;

        // Get optional parameter with default
        let force = context
            .get_param("force")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);

        info!("Executing my_action on target: {} (force: {})", target, force);

        // Perform the action
        // ...

        // Build output
        let mut output = HashMap::new();
        output.insert("target".to_string(), serde_json::json!(target));
        output.insert("success".to_string(), serde_json::json!(true));

        Ok(ActionResult::success(
            self.name(),
            &format!("Action completed on {}", target),
            started_at,
            output,
        ))
    }

    async fn rollback(&self, context: ActionContext) -> Result<ActionResult, ActionError> {
        let started_at = Utc::now();
        let target = context.require_string("target")?;

        info!("Rolling back my_action on target: {}", target);

        // Perform rollback
        // ...

        let mut output = HashMap::new();
        output.insert("target".to_string(), serde_json::json!(target));

        Ok(ActionResult::success(
            &format!("{}_rollback", self.name()),
            &format!("Rollback completed on {}", target),
            started_at,
            output,
        ))
    }
}

3. Add to Module

// crates/tw-actions/src/lib.rs
mod my_action;
pub use my_action::MyAction;

4. Register in Registry

// crates/tw-actions/src/registry.rs
impl ActionRegistry {
    pub fn new() -> Self {
        let mut registry = Self {
            actions: HashMap::new(),
        };

        // Register built-in actions
        registry.register(Box::new(QuarantineEmailAction::new()));
        registry.register(Box::new(BlockSenderAction::new()));
        registry.register(Box::new(MyAction::new())); // Add here

        registry
    }
}
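
To make the registration step concrete, here is a simplified, synchronous sketch of a name-keyed registry. It assumes that register stores each action under its name() and that a get lookup exists. Both are guesses about ActionRegistry internals, and the trait here is reduced to name() so the snippet compiles on its own.

```rust
use std::collections::HashMap;

// Reduced stand-in for the real `Action` trait (only `name` is kept).
pub trait Action {
    fn name(&self) -> &str;
}

pub struct MyAction;

impl Action for MyAction {
    fn name(&self) -> &str {
        "my_action"
    }
}

// Assumed shape of the registry: boxed actions stored by name.
pub struct ActionRegistry {
    actions: HashMap<String, Box<dyn Action>>,
}

impl ActionRegistry {
    pub fn new() -> Self {
        Self {
            actions: HashMap::new(),
        }
    }

    /// Stores the action under its own `name()`. Registering a second
    /// action with the same name replaces the first.
    pub fn register(&mut self, action: Box<dyn Action>) {
        self.actions.insert(action.name().to_string(), action);
    }

    /// Hypothetical lookup by name.
    pub fn get(&self, name: &str) -> Option<&dyn Action> {
        self.actions.get(name).map(|a| a.as_ref())
    }
}
```

Under this assumption, the string returned by name() is the key that playbooks and policy rules would use to refer to the action, so it should be unique across the registry.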

Parameter Types

Available parameter types:

pub enum ParameterType {
    String,
    Integer,
    Float,
    Boolean,
    List,
    Object,
}

Define parameters:

fn required_parameters(&self) -> Vec<ParameterDef> {
    vec![
        ParameterDef::required("name", "Description", ParameterType::String),
        ParameterDef::optional("count", "Description", ParameterType::Integer, json!(10)),
        ParameterDef::optional("tags", "Description", ParameterType::List, json!([])),
    ]
}
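
The sketch below illustrates how a supplied value could be checked against a declared ParameterType. It is not the framework's validation code: the Value enum is a minimal stand-in for a JSON value, and matches_type, including its rule that an integer is accepted where a float is expected, is an assumption made for this example.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ParameterType {
    String,
    Integer,
    Float,
    Boolean,
    List,
    Object,
}

// Minimal stand-in for a JSON value, used only in this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Str(String),
    Int(i64),
    Float(f64),
    Bool(bool),
    List(Vec<Value>),
    Object(Vec<(String, Value)>),
}

/// Hypothetical check: does `value` have the shape `expected` describes?
pub fn matches_type(value: &Value, expected: ParameterType) -> bool {
    match (value, expected) {
        (Value::Str(_), ParameterType::String) => true,
        (Value::Int(_), ParameterType::Integer) => true,
        // Assumption: an integer is acceptable where a float is expected.
        (Value::Int(_), ParameterType::Float) => true,
        (Value::Float(_), ParameterType::Float) => true,
        (Value::Bool(_), ParameterType::Boolean) => true,
        (Value::List(_), ParameterType::List) => true,
        (Value::Object(_), ParameterType::Object) => true,
        _ => false,
    }
}
```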

Using Connectors

Actions can use connectors via dependency injection:

use std::sync::Arc;

pub struct MyAction {
    connector: Arc<dyn MyConnector + Send + Sync>,
}

impl MyAction {
    pub fn new(connector: Arc<dyn MyConnector + Send + Sync>) -> Self {
        Self { connector }
    }
}

#[async_trait]
impl Action for MyAction {
    // name(), description(), required_parameters() and supports_rollback()
    // are omitted here for brevity.

    async fn execute(&self, context: ActionContext) -> Result<ActionResult, ActionError> {
        // Use connector; convert its error into an ActionError
        let result = self.connector.do_something().await
            .map_err(|e| ActionError::ExecutionFailed(e.to_string()))?;

        // ...
    }
}

Error Handling

Return the ActionError variant that matches the failure:

pub enum ActionError {
    /// Missing or invalid parameters
    InvalidParameters(String),

    /// Execution failed
    ExecutionFailed(String),

    /// Action timed out
    Timeout,

    /// Rollback not supported
    RollbackNotSupported,

    /// Policy denied the action
    PolicyDenied(String),
}
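
As a small illustration of picking a variant, the sketch below reports a missing parameter as InvalidParameters. This is the error that the test_my_action_missing_param unit test in the Testing section expects in that case. The free function require_string and the plain string map are simplifications invented for this example; the real ActionContext::require_string may behave differently.

```rust
use std::collections::HashMap;

// Local copy of the enum above so the sketch is self-contained.
#[derive(Debug, PartialEq)]
pub enum ActionError {
    InvalidParameters(String),
    ExecutionFailed(String),
    Timeout,
    RollbackNotSupported,
    PolicyDenied(String),
}

/// Hypothetical helper: fetch a required string parameter, or report
/// its absence as `InvalidParameters`.
pub fn require_string<'a>(
    params: &'a HashMap<String, String>,
    key: &str,
) -> Result<&'a str, ActionError> {
    params
        .get(key)
        .map(String::as_str)
        .ok_or_else(|| {
            ActionError::InvalidParameters(format!("missing required parameter: {}", key))
        })
}
```

Because the helper returns a Result, an action can propagate the error with the ? operator, as execute does in the MyAction example.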

Testing

Unit Tests

#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    #[tokio::test]
    async fn test_my_action_success() {
        let action = MyAction::new();

        let context = ActionContext::new(Uuid::new_v4())
            .with_param("target", serde_json::json!("test-target"));

        let result = action.execute(context).await.unwrap();

        assert!(result.success);
        assert_eq!(result.output["target"], "test-target");
    }

    #[tokio::test]
    async fn test_my_action_missing_param() {
        let action = MyAction::new();
        let context = ActionContext::new(Uuid::new_v4());

        let result = action.execute(context).await;

        assert!(matches!(result, Err(ActionError::InvalidParameters(_))));
    }

    #[tokio::test]
    async fn test_my_action_rollback() {
        let action = MyAction::new();
        assert!(action.supports_rollback());

        let context = ActionContext::new(Uuid::new_v4())
            .with_param("target", serde_json::json!("test-target"));

        let result = action.rollback(context).await.unwrap();
        assert!(result.success);
    }
}

Policy Integration

The policy engine evaluates every action automatically. Set the default approval level for a new action with a policy rule:

# Default policy for new action
[[policy.rules]]
name = "my_action_default"
action = "my_action"
approval_level = "analyst"

Documentation

Document your action:

//! My custom action.
//!
//! This action performs X on target Y.
//!
//! # Parameters
//!
//! - `target` (required): The target to act on
//! - `force` (optional): Force execution (default: false)
//!
//! # Example
//!
//! ```yaml
//! - action: my_action
//!   parameters:
//!     target: "example"
//!     force: true
//! ```
//!
//! # Rollback
//!
//! This action supports rollback via `my_action_rollback`.

Changelog

All notable changes to Triage Warden.

[Unreleased]

Added

  • AI-powered triage agent with Claude integration
  • Configurable playbooks for automated investigation
  • Policy engine with approval workflows
  • Connector framework for external integrations
  • Web dashboard with HTMX
  • REST API for programmatic access
  • CLI for command-line operations

Connectors

  • VirusTotal threat intelligence
  • Splunk SIEM integration
  • CrowdStrike EDR integration
  • Microsoft 365 email gateway
  • Jira ticketing integration

Actions

  • Email: parse_email, check_email_authentication, quarantine_email, block_sender
  • Lookup: lookup_sender_reputation, lookup_urls, lookup_attachments
  • Host: isolate_host, scan_host
  • Notification: notify_user, escalate, create_ticket

[0.1.0] - 2024-01-15

Added

  • Initial release
  • Core incident management
  • Basic web interface
  • SQLite database support
  • Mock connectors for development

Version Numbering

This project follows Semantic Versioning:

  • MAJOR: Incompatible API changes
  • MINOR: Backwards-compatible new features
  • PATCH: Backwards-compatible bug fixes
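
The three components can be compared mechanically. The sketch below parses plain MAJOR.MINOR.PATCH strings and reports which component changed between two releases. The Version, Bump, and bump_kind names are illustrative and not part of Triage Warden, and pre-release and build-metadata suffixes are deliberately not handled.

```rust
// Field order matters: the derived ordering compares major, then minor,
// then patch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Version {
    pub major: u64,
    pub minor: u64,
    pub patch: u64,
}

impl Version {
    /// Parses exactly three dot-separated non-negative integers.
    pub fn parse(s: &str) -> Option<Version> {
        let mut parts = s.split('.');
        let major = parts.next()?.parse().ok()?;
        let minor = parts.next()?.parse().ok()?;
        let patch = parts.next()?.parse().ok()?;
        if parts.next().is_some() {
            return None; // more than three components
        }
        Some(Version { major, minor, patch })
    }
}

#[derive(Debug, PartialEq)]
pub enum Bump {
    Major,
    Minor,
    Patch,
}

/// Returns the most significant component that changed, or `None`
/// when `to` is not newer than `from`.
pub fn bump_kind(from: Version, to: Version) -> Option<Bump> {
    if to <= from {
        return None;
    }
    if to.major != from.major {
        Some(Bump::Major)
    } else if to.minor != from.minor {
        Some(Bump::Minor)
    } else {
        Some(Bump::Patch)
    }
}
```

For example, going from 0.1.0 to 1.0.0 is classified as a MAJOR bump, which is the kind of change the upgrade guide is meant to cover.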

Upgrade Guide

From 0.x to 1.0

When 1.0 is released, an upgrade guide will be provided here.