Triage Warden
AI-powered security incident triage and response platform
Triage Warden automates the analysis and response to security incidents using AI agents, configurable playbooks, and integrations with your existing security stack.
Features
- AI-Powered Triage: Automated analysis of phishing emails, malware alerts, and suspicious login attempts
- Configurable Playbooks: Define custom investigation and response workflows
- Policy Engine: Role-based approval workflows for sensitive actions
- Connector Framework: Integrate with VirusTotal, Splunk, CrowdStrike, Jira, Microsoft 365, and more
- Web Dashboard: Real-time incident management with approval workflows
- REST API: Programmatic access for automation and integration
- Audit Trail: Complete logging of all actions and decisions
Quick Example
# Analyze a phishing email
tw-cli incident create --type phishing --source "email-gateway" --data '{"subject": "Urgent: Update Account"}'
# Run AI triage
tw-cli triage run --incident INC-2024-001
# View the verdict
tw-cli incident get INC-2024-001 --format json
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│ Web Dashboard │
│ (HTMX + Askama Templates) │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ REST API │
│ (Axum + Tower) │
└─────────────────────────────────────────────────────────────────┘
│
┌───────────────────────┼───────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────────┐ ┌───────────────┐
│ Policy Engine │ │ AI Triage Agent │ │ Actions │
│ (Rust) │ │ (Python) │ │ (Rust) │
└───────────────┘ └───────────────────┘ └───────────────┘
│ │ │
└───────────────────────┼───────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ Connector Layer │
│ (VirusTotal, Splunk, CrowdStrike, Jira, M365) │
└─────────────────────────────────────────────────────────────────┘
Getting Started
- Installation - Install Triage Warden
- Quick Start - Create your first incident
- Configuration - Configure connectors and policies
License
Triage Warden is licensed under the MIT License.
Getting Started
Welcome to Triage Warden! This guide will help you get up and running quickly.
Prerequisites
- Rust 1.75+ (for building from source)
- Python 3.11+ (for AI triage agents)
- SQLite or PostgreSQL (for data storage)
- uv (recommended Python package manager)
Installation Options
- From Source - Build and run locally
- Docker - Run in containers
- Pre-built Binaries - Download releases
Next Steps
- Installation - Detailed installation instructions
- Quick Start - Create your first incident in 5 minutes
- Configuration - Configure connectors and settings
Installation
Building from Source
Prerequisites
# Install Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Install uv (Python package manager)
curl -LsSf https://astral.sh/uv/install.sh | sh
Clone and Build
# Clone the repository
git clone https://github.com/your-org/triage-warden.git
cd triage-warden
# Build Rust components
cargo build --release
# Install Python dependencies
cd python
uv sync
Verify Installation
# Check the CLI
./target/release/tw-cli --version
# Run tests
cargo test
cd python && uv run pytest
Docker
# Build the image
docker build -t triage-warden .
# Run with default settings
docker run -p 8080:8080 triage-warden
# Run with custom configuration
docker run -p 8080:8080 \
-e TW_DATABASE_URL=postgres://user:pass@host/db \
-e TW_VIRUSTOTAL_API_KEY=your-key \
triage-warden
Pre-built Binaries
Download the latest release from the releases page.
Available platforms:
- Linux x86_64 (glibc)
- Linux x86_64 (musl)
- macOS x86_64
- macOS aarch64 (Apple Silicon)
# Example for macOS
curl -LO https://github.com/your-org/triage-warden/releases/latest/download/triage-warden-macos-aarch64.tar.gz
tar xzf triage-warden-macos-aarch64.tar.gz
./tw-cli --version
Database Setup
SQLite (Default)
SQLite is used by default. The database file is created automatically:
# Default location
TW_DATABASE_URL=sqlite://./triage_warden.db
# Custom location
TW_DATABASE_URL=sqlite:///var/lib/triage-warden/data.db
PostgreSQL
For production deployments:
# Create database
createdb triage_warden
# Set connection string
export TW_DATABASE_URL=postgres://user:password@localhost/triage_warden
# Run migrations
tw-cli db migrate
Next Steps
- Quick Start - Create your first incident
- Configuration - Configure the system
Quick Start
Get Triage Warden running and process your first incident in 5 minutes.
1. Start the Server
# Start with default settings (SQLite, mock connectors)
cargo run --bin tw-api
# Or use the release binary
./target/release/tw-api
The web dashboard is now available at http://localhost:8080.
2. Create an Incident
Via Web Dashboard
- Open
http://localhost:8080in your browser - Click "New Incident"
- Fill in the incident details:
- Type: Phishing
- Source: Email Gateway
- Severity: Medium
- Click Create
Via CLI
tw-cli incident create \
--type phishing \
--source "email-gateway" \
--severity medium \
--data '{
"subject": "Urgent: Verify Your Account",
"sender": "security@fake-bank.com",
"recipient": "employee@company.com"
}'
Via API
curl -X POST http://localhost:8080/api/incidents \
-H "Content-Type: application/json" \
-d '{
"incident_type": "phishing",
"source": "email-gateway",
"severity": "medium",
"raw_data": {
"subject": "Urgent: Verify Your Account",
"sender": "security@fake-bank.com"
}
}'
3. Run AI Triage
# Trigger triage for the incident
tw-cli triage run --incident INC-2024-0001
The AI agent will:
- Parse email headers and content
- Check sender reputation
- Analyze URLs and attachments
- Generate a verdict with confidence score
4. View the Verdict
# Get incident with triage results
tw-cli incident get INC-2024-0001
# Example output:
# Incident: INC-2024-0001
# Type: phishing
# Status: triaged
# Verdict: malicious
# Confidence: 0.92
# Recommended Actions:
# - quarantine_email
# - block_sender
# - notify_user
5. Execute Actions
Actions may require approval based on your policy configuration:
# Request to quarantine the email
tw-cli action execute --incident INC-2024-0001 --action quarantine_email
# If auto-approved:
# Action executed: quarantine_email (status: completed)
# If requires approval:
# Action pending approval from: Senior Analyst
Approve pending actions via the dashboard at /approvals.
Next Steps
- Configuration - Set up real connectors
- Playbooks - Create automated workflows
- Policy Engine - Configure approval rules
Configuration
Triage Warden is configured through environment variables and configuration files.
Environment Variables
Core Settings
| Variable | Description | Default |
|---|---|---|
TW_DATABASE_URL | Database connection string | sqlite://./triage_warden.db |
TW_HOST | API server host | 0.0.0.0 |
TW_PORT | API server port | 8080 |
TW_LOG_LEVEL | Logging level (trace, debug, info, warn, error) | info |
TW_ADMIN_PASSWORD | Initial admin password | (generated) |
Connector Selection
| Variable | Description | Values |
|---|---|---|
TW_THREAT_INTEL_MODE | Threat intelligence backend | mock, virustotal |
TW_SIEM_MODE | SIEM backend | mock, splunk |
TW_EDR_MODE | EDR backend | mock, crowdstrike |
TW_EMAIL_GATEWAY_MODE | Email gateway backend | mock, m365 |
TW_TICKETING_MODE | Ticketing backend | mock, jira |
VirusTotal
TW_THREAT_INTEL_MODE=virustotal
TW_VIRUSTOTAL_API_KEY=your-api-key-here
Splunk
TW_SIEM_MODE=splunk
TW_SPLUNK_URL=https://splunk.company.com:8089
TW_SPLUNK_TOKEN=your-token-here
CrowdStrike
TW_EDR_MODE=crowdstrike
TW_CROWDSTRIKE_CLIENT_ID=your-client-id
TW_CROWDSTRIKE_CLIENT_SECRET=your-client-secret
TW_CROWDSTRIKE_REGION=us-1 # us-1, us-2, eu-1
Microsoft 365
TW_EMAIL_GATEWAY_MODE=m365
TW_M365_TENANT_ID=your-tenant-id
TW_M365_CLIENT_ID=your-client-id
TW_M365_CLIENT_SECRET=your-client-secret
Jira
TW_TICKETING_MODE=jira
TW_JIRA_URL=https://company.atlassian.net
TW_JIRA_EMAIL=automation@company.com
TW_JIRA_API_TOKEN=your-api-token
TW_JIRA_PROJECT_KEY=SEC
AI Provider
TW_AI_PROVIDER=anthropic # anthropic, openai
TW_ANTHROPIC_API_KEY=your-api-key
# or
TW_OPENAI_API_KEY=your-api-key
Configuration File
For complex configurations, use a TOML file:
# config.toml
[server]
host = "0.0.0.0"
port = 8080
log_level = "info"
[database]
url = "postgres://user:pass@localhost/triage_warden"
max_connections = 10
[connectors.threat_intel]
mode = "virustotal"
api_key = "${TW_VIRUSTOTAL_API_KEY}"
rate_limit = 4 # requests per minute
[connectors.siem]
mode = "splunk"
url = "https://splunk.company.com:8089"
token = "${TW_SPLUNK_TOKEN}"
[connectors.edr]
mode = "crowdstrike"
client_id = "${TW_CROWDSTRIKE_CLIENT_ID}"
client_secret = "${TW_CROWDSTRIKE_CLIENT_SECRET}"
region = "us-1"
[ai]
provider = "anthropic"
model = "claude-sonnet-4-20250514"
max_tokens = 4096
[policy]
default_action_approval = "auto" # auto, analyst, senior, manager
high_severity_approval = "senior"
critical_action_approval = "manager"
Load with:
tw-api --config config.toml
Policy Rules
Policy rules control action approval requirements. See Policy Engine for details.
# Example policy rule
[[policy.rules]]
name = "isolate_host_requires_manager"
action = "isolate_host"
severity = ["high", "critical"]
approval_level = "manager"
Logging
Configure structured logging:
# JSON output for production
TW_LOG_FORMAT=json
# Pretty output for development
TW_LOG_FORMAT=pretty
# Filter specific modules
RUST_LOG=tw_api=debug,tw_core=info
Next Steps
- Connectors - Detailed connector configuration
- Policy Engine - Configure approval workflows
- API Authentication - Set up API access
Web Dashboard
Browser-based interface for incident management.
Overview
The dashboard provides:
- Real-time incident monitoring
- Approval workflow management
- Playbook configuration
- System settings
Access at: http://localhost:8080
Features
Home Dashboard
The main dashboard displays:
- KPIs: Open incidents, pending approvals, triage rate
- Recent Incidents: Latest incidents with status
- Trend Charts: Incident volume over time
- Quick Actions: Create incident, run playbook
Incident Management
- List view with filtering and sorting
- Detail view with full incident context
- Action execution interface
- Triage results and reasoning
Approval Workflow
- Queue of pending approvals
- One-click approve/reject
- Bulk approval for related actions
- SLA countdown timers
Playbook Management
- Create and edit playbooks
- Visual step editor
- Test with sample data
- Execution history
Settings
- Connector configuration
- Policy rule management
- User administration
- System preferences
Navigation
| Path | Description |
|---|---|
/ | Dashboard home |
/incidents | Incident list |
/incidents/:id | Incident detail |
/approvals | Pending approvals |
/playbooks | Playbook management |
/settings | System settings |
/login | Login page |
Next Steps
- Incidents - Managing incidents
- Approvals - Approval workflow
- Playbooks - Playbook configuration
- Settings - System settings
Incidents
Managing incidents in the web dashboard.
Incident List
Access at /incidents
Filtering
- Status: Open, Triaged, Resolved
- Severity: Low, Medium, High, Critical
- Type: Phishing, Malware, Suspicious Login
- Date Range: Custom time period
Sorting
Click column headers to sort:
- Created (newest/oldest)
- Severity (highest/lowest)
- Status
Bulk Actions
Select multiple incidents for:
- Bulk resolve
- Bulk escalate
- Export to CSV
Incident Detail
Click an incident to view details.
Overview Tab
- Incident metadata
- AI verdict and confidence
- Recommended actions
- Timeline of events
Raw Data Tab
- Original incident data (JSON)
- Parsed email content (for phishing)
- Detection details (for malware)
Actions Tab
- Available actions
- Executed actions with results
- Pending approvals
Enrichment Tab
- Threat intelligence results
- SIEM correlation data
- Related incidents
Creating Incidents
Click "New Incident" button.
Required Fields
- Type: Select incident type
- Source: Origin of the incident
- Severity: Initial severity assessment
Optional Fields
- Description: Free-form description
- Raw Data: JSON payload
- Assignee: Initial assignment
Executing Actions
From the incident detail page:
- Click "Actions" tab
- Select action from dropdown
- Fill in parameters
- Click "Execute"
If approval is required:
- Action appears in pending state
- Notification sent to approvers
- Status updates when approved/rejected
Keyboard Shortcuts
| Shortcut | Action |
|---|---|
j / k | Navigate list |
Enter | Open incident |
Esc | Close modal |
a | Open actions menu |
e | Escalate |
r | Resolve |
Real-time Updates
The dashboard uses HTMX for live updates:
- New incidents appear automatically
- Status changes reflect immediately
- Approval decisions update in real-time
Approvals
Managing action approvals in the web dashboard.
Approval Queue
Access at /approvals
The queue shows all actions pending your approval based on your role level.
Queue Columns
- Action: Type of action requested
- Incident: Related incident
- Requested By: Who/what requested it
- Requested At: When requested
- SLA: Time remaining to respond
Filtering
- Approval Level: Analyst, Senior, Manager
- Action Type: Specific actions
- Incident Type: Phishing, malware, etc.
Approval Detail
Click an approval to see full context.
Context Section
- Full incident details
- AI reasoning (if from triage)
- Related actions already taken
Decision Section
- Approve: Execute the action
- Reject: Decline with reason
- Delegate: Assign to another approver
Approving Actions
Single Approval
- Click on pending action
- Review incident context
- Click "Approve" or "Reject"
- Add optional comment
- Confirm decision
Bulk Approval
For related actions:
- Select multiple actions (checkbox)
- Click "Bulk Approve" or "Bulk Reject"
- Add comment applying to all
- Confirm
Rejection
When rejecting:
- Click "Reject"
- Required: Enter rejection reason
- Optionally suggest alternative
- Confirm
The requester is notified of rejection and reason.
SLA Indicators
| Color | Meaning |
|---|---|
| Green | Plenty of time |
| Yellow | < 50% time remaining |
| Orange | < 25% time remaining |
| Red | SLA exceeded |
Notifications
You receive notifications for:
- New actions requiring your approval
- SLA warnings (50%, 75% elapsed)
- Escalations to your level
Configure notification preferences in Settings.
Delegation
If unavailable:
- Go to Settings > Delegation
- Select delegate user
- Set date range
- Delegate receives your approvals
Audit Trail
All approvals are logged:
- Who approved/rejected
- When decision was made
- Time to approve
- Comments provided
View at Settings > Audit Logs.
Playbooks
Managing playbooks in the web dashboard.
Playbook List
Access at /playbooks
Views
- Active: Currently enabled playbooks
- Inactive: Disabled playbooks
- All: Complete list
Information Displayed
- Name and description
- Trigger conditions
- Last run time
- Success rate
Creating Playbooks
Click "New Playbook" button.
Basic Information
- Name: Unique identifier
- Description: What this playbook does
- Version: Semantic version
Triggers
Configure when playbook runs:
- Incident Type: Phishing, malware, etc.
- Auto Run: Run automatically on new incidents
- Conditions: Additional criteria
Variables
Define playbook variables:
quarantine_threshold: 0.7
notification_channel: "#security"
Step Editor
Visual editor for playbook steps.
Adding Steps
- Click "Add Step"
- Select action type
- Configure parameters
- Set output variable name
Step Types
- Action: Execute an action
- Condition: Branch logic
- AI Analysis: Get AI verdict
- Parallel: Run steps concurrently
Connections
- Drag to reorder steps
- Connect condition branches
- Set dependencies
Testing Playbooks
Dry Run
- Click "Test"
- Select or create test incident
- Toggle "Dry Run"
- View step-by-step execution
With Live Data
- Click "Test"
- Select real incident
- Leave "Dry Run" off
- Actions will execute (with approval)
Execution History
View past executions:
- Execution timestamp
- Incident processed
- Steps completed
- Final verdict
- Duration
Click execution for detailed trace.
Import/Export
Export
- Select playbook
- Click "Export"
- Download YAML file
Import
- Click "Import"
- Upload YAML file
- Review parsed playbook
- Click "Create"
Playbook Versions
Playbooks are versioned:
- Edit playbook
- Bump version number
- Save as new version
- Old version kept for rollback
View version history and compare changes.
Settings
System configuration in the web dashboard.
Settings Tabs
Access at /settings
General
- Instance Name: Display name for this installation
- Time Zone: Default timezone for display
- Date Format: Date/time display format
- Theme: Light/dark mode preference
Connectors
Configure external integrations.
Threat Intelligence
- Mode: Mock or VirusTotal
- API Key (for VirusTotal)
- Rate limit settings
SIEM
- Mode: Mock or Splunk
- URL and authentication
- Default search index
EDR
- Mode: Mock or CrowdStrike
- OAuth credentials
- Region selection
Email Gateway
- Mode: Mock or Microsoft 365
- Azure AD configuration
- Tenant settings
Ticketing
- Mode: Mock or Jira
- Instance URL
- Project configuration
Policies
Manage policy rules.
Creating Rules
- Click "Add Rule"
- Enter rule name
- Define matching criteria
- Set decision (allow/deny/approval)
- Save
Rule Priority
Drag rules to reorder. First matching rule wins.
Users
User management (admin only).
User List
- Username and email
- Role (viewer/analyst/senior/admin)
- Last login
- Status (active/disabled)
Creating Users
- Click "Add User"
- Enter email and username
- Set initial role
- Generate or set password
- Send invitation email
Role Management
Assign roles:
- Viewer: Read-only access
- Analyst: Execute actions, approve analyst-level
- Senior: Approve senior-level
- Admin: Full access
Notifications
Configure notification preferences.
Channels
- Email: SMTP settings
- Slack: Webhook URL
- Teams: Connector URL
- PagerDuty: Integration key
Preferences
For each notification type:
- Enable/disable channel
- Set priority threshold
- Configure quiet hours
Audit Logs
View system audit trail.
Filtering
- Date range
- Event type
- User
- Resource
Export
Export logs to CSV for compliance.
API Keys
Manage API credentials.
Creating Keys
- Click "Create API Key"
- Enter name and description
- Select scopes
- Set expiration (optional)
- Copy generated key
Revoking Keys
Click "Revoke" on any key. Revocation is immediate.
Backup & Restore
Database management.
Backup
- Click "Create Backup"
- Wait for completion
- Download backup file
Restore
- Click "Restore"
- Upload backup file
- Confirm restore
- System restarts
About
System information:
- Version number
- Build information
- License status
- Support links
CLI Reference
Command-line interface for Triage Warden.
Installation
The CLI is built with the main project:
cargo build --release
./target/release/tw-cli --help
Global Options
tw-cli [OPTIONS] <COMMAND>
Options:
-c, --config <FILE> Configuration file path
-v, --verbose Enable verbose output
-q, --quiet Suppress non-error output
--json Output as JSON
-h, --help Print help
-V, --version Print version
Environment Variables
| Variable | Description |
|---|---|
TW_API_URL | API server URL (default: http://localhost:8080) |
TW_API_KEY | API key for authentication |
TW_CONFIG | Path to config file |
Commands Overview
| Command | Description |
|---|---|
incident | Manage incidents |
action | Execute and manage actions |
triage | Run AI triage |
playbook | Manage playbooks |
policy | Manage policy rules |
connector | Manage connectors |
user | User management |
api-key | API key management |
webhook | Webhook management |
config | Configuration management |
db | Database operations |
serve | Start API server |
Quick Examples
# List open incidents
tw-cli incident list --status open
# Create incident
tw-cli incident create --type phishing --severity high
# Run triage
tw-cli triage run --incident INC-2024-001
# Execute action
tw-cli action execute --incident INC-2024-001 --action quarantine_email
# Approve pending action
tw-cli action approve act-abc123
# Start server
tw-cli serve --port 8080
Next Steps
- Commands - Detailed command reference
CLI Commands
Detailed reference for all CLI commands.
incident
Manage security incidents.
list
tw-cli incident list [OPTIONS]
Options:
--status <STATUS> Filter by status (open, triaged, resolved)
--severity <SEVERITY> Filter by severity
--type <TYPE> Filter by incident type
--limit <N> Maximum results (default: 20)
--offset <N> Skip first N results
--sort <FIELD> Sort field (created_at, severity)
--desc Sort descending
get
tw-cli incident get <ID> [OPTIONS]
Options:
--format <FORMAT> Output format (table, json, yaml)
--include-actions Include action history
--include-enrichment Include enrichment data
create
tw-cli incident create [OPTIONS]
Options:
--type <TYPE> Incident type (required)
--source <SOURCE> Incident source (required)
--severity <SEVERITY> Initial severity (default: medium)
--data <JSON> Raw incident data as JSON
--file <FILE> Read data from file
--auto-triage Run triage after creation
update
tw-cli incident update <ID> [OPTIONS]
Options:
--severity <SEVERITY> Update severity
--status <STATUS> Update status
--assignee <USER> Assign to user
resolve
tw-cli incident resolve <ID> [OPTIONS]
Options:
--resolution <TEXT> Resolution notes
--false-positive Mark as false positive
action
Execute and manage actions.
execute
tw-cli action execute [OPTIONS]
Options:
--incident <ID> Associated incident
--action <NAME> Action to execute (required)
--param <KEY=VALUE> Action parameter (repeatable)
--emergency Emergency override (manager only)
list
tw-cli action list [OPTIONS]
Options:
--incident <ID> Filter by incident
--status <STATUS> Filter by status
--pending Show only pending approval
get
tw-cli action get <ID>
approve
tw-cli action approve <ID> [OPTIONS]
Options:
--comment <TEXT> Approval comment
reject
tw-cli action reject <ID> [OPTIONS]
Options:
--reason <TEXT> Rejection reason (required)
rollback
tw-cli action rollback <ID> [OPTIONS]
Options:
--reason <TEXT> Rollback reason
triage
Run AI triage.
run
tw-cli triage run [OPTIONS]
Options:
--incident <ID> Incident to triage (required)
--playbook <NAME> Specific playbook
--model <MODEL> AI model override
--wait Wait for completion
status
tw-cli triage status <TRIAGE_ID>
playbook
Manage playbooks.
list
tw-cli playbook list [OPTIONS]
Options:
--enabled Only enabled playbooks
--trigger-type <TYPE> Filter by trigger type
get
tw-cli playbook get <ID>
add
tw-cli playbook add <FILE>
update
tw-cli playbook update <ID> <FILE>
delete
tw-cli playbook delete <ID>
run
tw-cli playbook run <ID> [OPTIONS]
Options:
--incident <ID> Incident to process
--var <KEY=VALUE> Override variable (repeatable)
--dry-run Don't execute actions
test
tw-cli playbook test <NAME> [OPTIONS]
Options:
--incident <ID> Use existing incident
--data <JSON> Use mock data
--dry-run Don't execute actions
validate
tw-cli playbook validate <FILE>
export
tw-cli playbook export <ID> [OPTIONS]
Options:
-o, --output <FILE> Output file (default: stdout)
policy
Manage policy rules.
list
tw-cli policy list
add
tw-cli policy add [OPTIONS]
Options:
--name <NAME> Rule name (required)
--action <ACTION> Action to match
--pattern <PATTERN> Action pattern (glob)
--severity <SEVERITY> Severity condition
--approval-level <L> Required approval level
--allow Auto-allow
--deny Deny with reason
--reason <TEXT> Denial reason
delete
tw-cli policy delete <NAME>
test
tw-cli policy test [OPTIONS]
Options:
--action <ACTION> Action to test
--severity <SEVERITY> Incident severity
--proposer-type <T> Proposer type
--confidence <N> AI confidence score
connector
Manage connectors.
status
tw-cli connector status
test
tw-cli connector test <NAME>
configure
tw-cli connector configure <NAME> [OPTIONS]
Options:
--mode <MODE> Connector mode
--api-key <KEY> API key
--url <URL> Service URL
user
User management.
list
tw-cli user list
create
tw-cli user create [OPTIONS]
Options:
--username <NAME> Username (required)
--email <EMAIL> Email address
--role <ROLE> User role
--service-account Create as service account
update
tw-cli user update <ID> [OPTIONS]
Options:
--role <ROLE> New role
--enabled Enable user
--disabled Disable user
delete
tw-cli user delete <ID>
api-key
API key management.
list
tw-cli api-key list
create
tw-cli api-key create [OPTIONS]
Options:
--name <NAME> Key name (required)
--scopes <SCOPES> Comma-separated scopes
--user <USER> Associated user
--expires <DATE> Expiration date
revoke
tw-cli api-key revoke <PREFIX>
rotate
tw-cli api-key rotate <PREFIX>
webhook
Webhook management.
list
tw-cli webhook list
add
tw-cli webhook add <SOURCE> [OPTIONS]
Options:
--secret <SECRET> Webhook secret
--auto-triage Enable auto-triage
--playbook <NAME> Playbook to run
test
tw-cli webhook test <SOURCE>
delete
tw-cli webhook delete <SOURCE>
db
Database operations.
migrate
tw-cli db migrate
backup
tw-cli db backup [OPTIONS]
Options:
-o, --output <FILE> Backup file path
restore
tw-cli db restore <FILE>
serve
Start the API server.
tw-cli serve [OPTIONS]
Options:
--host <HOST> Bind address (default: 0.0.0.0)
--port <PORT> Port number (default: 8080)
--config <FILE> Configuration file
Architecture Overview
Triage Warden is built as a modular, layered system combining Rust for performance-critical components and Python for AI capabilities.
System Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ Clients │
│ (Web Browser, CLI, API Consumers) │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ API Layer (tw-api) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ REST API │ │ Web Handlers│ │ Webhooks │ │ Metrics │ │
│ │ (Axum) │ │(HTMX+Askama)│ │ │ │ (Prometheus)│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│
┌──────────────────────────┼──────────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────────┐ ┌───────────────┐
│ Policy Engine │ │ Action Registry │ │ Event Bus │
│ (tw-policy) │ │ (tw-actions) │ │ (tw-core) │
└───────────────┘ └───────────────────┘ └───────────────┘
│ │ │
└──────────────────────────┼──────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────────┐
│ Core Domain (tw-core) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Incidents │ │ Playbooks │ │ Users │ │ Audit │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ Database Layer (SQLx) │
│ (SQLite for dev, PostgreSQL for prod) │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Python Bridge (tw-bridge) │
│ (PyO3 Bindings) │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ AI Layer (tw_ai) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │Triage Agent │ │ Tools │ │ Playbook │ │ Evaluation │ │
│ │ (Claude) │ │ │ │ Engine │ │ Framework │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ Connector Layer (tw-connectors) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ VirusTotal │ │ Splunk │ │ CrowdStrike │ │ Jira │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Crate Structure
| Crate | Purpose |
|---|---|
tw-api | HTTP server, REST API, web handlers, webhooks |
tw-core | Domain models, database repositories, event bus |
tw-actions | Action handlers (quarantine, isolate, notify, etc.) |
tw-policy | Policy engine, approval rules, decision evaluation |
tw-connectors | External service integrations (VirusTotal, Splunk, etc.) |
tw-bridge | PyO3 bindings exposing Rust to Python |
tw-cli | Command-line interface |
tw-observability | Metrics, tracing, logging infrastructure |
Key Design Decisions
Rust + Python Hybrid
- Rust: Core platform, API server, policy engine, actions
- Python: AI agents, LLM integrations, playbook execution
- Bridge: PyO3 enables Python to call Rust connectors and actions
Trait-Based Connectors
All connectors implement traits for testability:
#![allow(unused)] fn main() { #[async_trait] pub trait ThreatIntelConnector: Send + Sync { async fn lookup_hash(&self, hash: &str) -> ConnectorResult<ThreatReport>; async fn lookup_url(&self, url: &str) -> ConnectorResult<ThreatReport>; async fn lookup_domain(&self, domain: &str) -> ConnectorResult<ThreatReport>; } }
Event-Driven Architecture
The event bus enables loose coupling:
#![allow(unused)] fn main() { event_bus.publish(Event::IncidentCreated { id, incident_type }); event_bus.publish(Event::ActionExecuted { action_id, result }); }
Policy-First Actions
All actions pass through the policy engine:
Request → Policy Evaluation → (Allowed | Denied | RequiresApproval) → Execute
Next Steps
- Components - Detailed component descriptions
- Data Flow - How data moves through the system
- Security Model - Authentication and authorization
Components
Detailed description of each major component in Triage Warden.
tw-api
The HTTP server and web interface.
REST API Routes
| Route | Description |
|---|---|
GET /api/incidents | List incidents with filtering |
POST /api/incidents | Create new incident |
GET /api/incidents/:id | Get incident details |
POST /api/incidents/:id/actions | Execute action on incident |
GET /api/playbooks | List playbooks |
POST /api/webhooks/:source | Receive webhook events |
Web Handlers
Server-rendered pages using HTMX and Askama templates:
- Dashboard with KPIs
- Incident list and detail views
- Approval workflow interface
- Playbook management
- Settings configuration
Authentication
- Session-based auth for web dashboard
- API key auth for programmatic access
- Role-based access control (admin, analyst, viewer)
tw-core
Core domain logic and data access.
Domain Models
#![allow(unused)] fn main() { pub struct Incident { pub id: Uuid, pub incident_type: IncidentType, pub severity: Severity, pub status: IncidentStatus, pub source: String, pub raw_data: serde_json::Value, pub verdict: Option<Verdict>, pub confidence: Option<f64>, pub created_at: DateTime<Utc>, } pub struct Action { pub id: Uuid, pub incident_id: Uuid, pub action_type: ActionType, pub status: ActionStatus, pub approval_level: Option<ApprovalLevel>, pub executed_by: Option<String>, } }
Repositories
Database access layer with SQLite and PostgreSQL support:
IncidentRepositoryActionRepositoryPlaybookRepositoryUserRepositoryAuditRepository
Event Bus
Async event distribution:
#![allow(unused)] fn main() { pub enum Event { IncidentCreated { id: Uuid }, IncidentUpdated { id: Uuid }, ActionRequested { id: Uuid }, ActionApproved { id: Uuid, approver: String }, ActionExecuted { id: Uuid, success: bool }, } }
tw-actions
Action handlers for incident response.
Email Actions
| Action | Description |
|---|---|
parse_email | Extract headers, body, attachments |
check_email_authentication | Validate SPF/DKIM/DMARC |
quarantine_email | Move to quarantine |
block_sender | Add to blocklist |
Lookup Actions
| Action | Description |
|---|---|
lookup_sender_reputation | Check sender against threat intel |
lookup_urls | Analyze URLs in content |
lookup_attachments | Hash and check attachments |
Host Actions
| Action | Description |
|---|---|
isolate_host | Network isolation via EDR |
scan_host | Trigger endpoint scan |
Notification Actions
| Action | Description |
|---|---|
notify_user | Send user notification |
notify_reporter | Update incident reporter |
escalate | Route to approval level |
create_ticket | Create Jira ticket |
tw-policy
Policy engine for action approval.
Rule Evaluation
#![allow(unused)] fn main() { pub struct PolicyRule { pub name: String, pub action_type: ActionType, pub conditions: Vec<Condition>, pub approval_level: ApprovalLevel, } pub enum PolicyDecision { Allowed, Denied { reason: String }, RequiresApproval { level: ApprovalLevel }, } }
Approval Levels
- Auto - No approval required
- Analyst - Any analyst can approve
- Senior - Senior analyst required
- Manager - SOC manager required
tw-connectors
External service integrations.
Connector Trait
#![allow(unused)] fn main() { #[async_trait] pub trait Connector: Send + Sync { fn name(&self) -> &str; fn connector_type(&self) -> &str; async fn health_check(&self) -> ConnectorResult<ConnectorHealth>; async fn test_connection(&self) -> ConnectorResult<bool>; } }
Available Connectors
| Type | Implementations |
|---|---|
| Threat Intel | VirusTotal, Mock |
| SIEM | Splunk, Mock |
| EDR | CrowdStrike, Mock |
| Email Gateway | Microsoft 365, Mock |
| Ticketing | Jira, Mock |
tw-bridge
PyO3 bindings for Python integration.
Exposed Classes
from tw_bridge import ThreatIntelBridge, SIEMBridge, EDRBridge
# Use connectors from Python
threat_intel = ThreatIntelBridge("virustotal")
result = threat_intel.lookup_hash("abc123...")
tw_ai (Python)
AI triage and playbook execution.
Triage Agent
Claude-powered agent for incident analysis:
agent = TriageAgent(model="claude-sonnet-4-20250514")
verdict = await agent.analyze(incident)
# Returns: Verdict(classification="malicious", confidence=0.92, ...)
Playbook Engine
YAML-based playbook execution:
name: phishing_triage
steps:
- action: parse_email
- action: check_email_authentication
- action: lookup_sender_reputation
- condition: sender_reputation < 0.3
action: quarantine_email
Data Flow
How data moves through Triage Warden from incident creation to resolution.
Incident Lifecycle
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Created │────▶│ Triaging │────▶│ Triaged │────▶│ Resolved │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │
▼ ▼ ▼ ▼
Webhook/API AI Agent Actions Executed Closed
receives data analyzes (with approval)
Detailed Flow
1. Incident Creation
External Source (Email Gateway, SIEM, EDR)
│
▼
Webhook Endpoint
/api/webhooks/:source
│
▼
┌──────────────────┐
│ Parse & Validate │
│ Incoming Data │
└──────────────────┘
│
▼
┌──────────────────┐
│ Create Incident │
│ Record in DB │
└──────────────────┘
│
▼
┌──────────────────┐
│ Publish Event: │
│ IncidentCreated │
└──────────────────┘
2. AI Triage
┌──────────────────┐
│ Event: Incident │
│ Created │
└──────────────────┘
│
▼
┌──────────────────┐
│ Load Playbook │
│ (based on type) │
└──────────────────┘
│
▼
┌──────────────────┐
│ Execute Playbook │
│ Steps │
└──────────────────┘
│
┌───────────┴───────────┐
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Enrichment │ │ AI Analysis │
│ Actions │ │ (Claude) │
│ - parse_email │ │ │
│ - lookup_* │ │ Generates: │
└───────────────┘ │ - Verdict │
│ │ - Confidence │
│ │ - Reasoning │
│ │ - Actions │
└───────┬───────└───────────────┘
│
▼
┌──────────────────┐
│ Update Incident │
│ with Verdict │
└──────────────────┘
3. Action Execution
┌──────────────────┐
│ Action Request │
│ (from agent or │
│ human) │
└──────────────────┘
│
▼
┌──────────────────┐
│ Build Action │
│ Context │
└──────────────────┘
│
▼
┌──────────────────┐
│ Policy Engine │
│ Evaluation │
└──────────────────┘
│
┌───────────┼───────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│Allowed │ │Denied │ │Requires│
│ │ │ │ │Approval│
└────────┘ └────────┘ └────────┘
│ │ │
▼ ▼ ▼
Execute Return Queue for
Action Error Approval
│ │
│ ▼
│ ┌──────────────┐
│ │ Notify │
│ │ Approvers │
│ └──────────────┘
│ │
│ ▼
│ ┌──────────────┐
│ │ Wait for │
│ │ Approval │
│ └──────────────┘
│ │
│ ┌──────────────┴──────────────┐
│ ▼ ▼
│ ┌────────┐ ┌────────┐
│ │Approved│ │Rejected│
│ └────────┘ └────────┘
│ │ │
│ ▼ ▼
│ Execute Action Update Status
│ │
└────────┴─────────┐
▼
┌──────────────┐
│ Connector │
│ Execution │
│ (External │
│ Service) │
└──────────────┘
│
▼
┌──────────────┐
│ Update │
│ Action │
│ Status │
└──────────────┘
│
▼
┌──────────────┐
│ Audit Log │
│ Entry │
└──────────────┘
Data Stores
Primary Database
| Table | Purpose |
|---|---|
incidents | Incident records |
actions | Action requests and results |
playbooks | Playbook definitions |
users | User accounts |
sessions | Active sessions |
api_keys | API credentials |
audit_logs | Action audit trail |
connectors | Connector configurations |
policies | Policy rules |
notifications | Notification history |
settings | System settings |
Event Bus (In-Memory)
Transient event distribution for real-time updates:
- Incident lifecycle events
- Action status changes
- Approval notifications
- System health events
External Data Flow
Inbound (Webhooks)
Email Gateway ──────┐
SIEM Alerts ────────┼──▶ Webhook Handler ──▶ Incident Creation
EDR Events ─────────┘
Outbound (Connectors)
┌──▶ VirusTotal (threat intel)
Action Execution ──────────┼──▶ Splunk (SIEM queries)
├──▶ CrowdStrike (host actions)
├──▶ M365 (email actions)
└──▶ Jira (ticketing)
Metrics Flow
Rust Components ──┬──▶ Prometheus Registry ──▶ /metrics endpoint
Python Components ─┘
Exposed metrics:
triage_warden_incidents_total{type, severity}triage_warden_actions_total{action, status}triage_warden_triage_duration_seconds{type}triage_warden_connector_requests_total{connector, status}
Security Model
Triage Warden implements defense-in-depth with multiple security layers.
Authentication
Web Dashboard
Session-based authentication with secure cookies:
- Session tokens: Random 256-bit tokens
- Cookie settings: HttpOnly, Secure, SameSite=Lax
- Session duration: 8 hours (configurable)
- CSRF protection: Per-request tokens on all state-changing forms
API Access
API key authentication for programmatic access:
curl -H "Authorization: Bearer tw_abc123_secretkey" \
https://api.example.com/api/incidents
API key features:
- Prefix stored in plain text for lookup (
tw_abc123) - Secret portion hashed with Argon2
- Scopes limit allowed operations
- Expiration dates supported
Authorization
Role-Based Access Control (RBAC)
| Role | Capabilities |
|---|---|
| Viewer | Read incidents, view dashboards |
| Analyst | Viewer + execute low-risk actions, approve analyst-level |
| Senior Analyst | Analyst + execute medium-risk actions, approve senior-level |
| Admin | Full access, user management, system configuration |
Policy-Based Action Control
The policy engine evaluates every action request:
#![allow(unused)] fn main() { // Policy evaluation flow ActionRequest → Build ActionContext (action_type, target, severity, proposer) → Evaluate policy rules → Return PolicyDecision - Allowed: Execute immediately - Denied: Return error with reason - RequiresApproval: Queue for specified approval level }
Example Policy Rules
# Low-risk actions auto-approve
[[policy.rules]]
name = "auto_approve_lookups"
action_patterns = ["lookup_*"]
decision = "allowed"
# High-severity host isolation requires manager
[[policy.rules]]
name = "isolate_requires_manager"
action = "isolate_host"
severity = ["high", "critical"]
approval_level = "manager"
# Block dangerous actions on production
[[policy.rules]]
name = "no_delete_in_prod"
action_patterns = ["delete_*"]
environment = "production"
decision = "denied"
reason = "Deletion not allowed in production"
Data Protection
At Rest
- Database encryption: SQLite with SQLCipher (optional), PostgreSQL with TDE
- Credential storage: All API keys/tokens hashed with Argon2id
- Secrets management: Environment variables or external secret stores
In Transit
- TLS 1.3: Required for all external connections
- Certificate validation: Strict validation for connectors
- Internal traffic: TLS optional for localhost development
Sensitive Data Handling
#![allow(unused)] fn main() { // Credentials redacted in logs impl std::fmt::Debug for ApiKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "ApiKey {{ prefix: {}, secret: [REDACTED] }}", self.prefix) } } }
Audit Trail
All security-relevant actions logged:
| Event | Data Captured |
|---|---|
| Login | user_id, ip_address, success, timestamp |
| Logout | user_id, session_duration |
| Action executed | action_id, user_id, incident_id, result |
| Action approved | action_id, approver_id, decision |
| Policy change | user_id, old_value, new_value |
| User management | admin_id, target_user, operation |
Audit log retention: 90 days (configurable)
Connector Security
Credential Management
Connector credentials stored encrypted:
# Environment variables (recommended)
TW_VIRUSTOTAL_API_KEY=your-key
# Or encrypted in database
tw-cli connector set virustotal --api-key "$(read -s)"
Rate Limiting
Built-in rate limiting prevents API abuse:
| Connector | Default Limit |
|---|---|
| VirusTotal | 4 req/min (free tier) |
| Splunk | 100 req/min |
| CrowdStrike | 50 req/min |
Circuit Breaker
Automatic failure handling:
#![allow(unused)] fn main() { // After 5 consecutive failures, circuit opens // Requests fail fast for 30 seconds // Then half-open state allows test requests }
Input Validation
API Requests
- JSON schema validation on all endpoints
- Size limits on request bodies (1MB default)
- Type coercion disabled (strict typing)
Webhook Payloads
- HMAC signature verification
- Replay attack prevention (timestamp validation)
- Payload size limits
#![allow(unused)] fn main() { // Webhook signature verification fn verify_webhook(payload: &[u8], signature: &str, secret: &str) -> bool { let expected = hmac_sha256(secret, payload); constant_time_compare(signature, &expected) } }
Secure Defaults
- HTTPS enforced in production
- Secure cookie flags enabled
- CORS restricted to configured origins
- Debug endpoints disabled in production
- Verbose errors only in development
Security Headers
Default response headers:
Strict-Transport-Security: max-age=31536000; includeSubDomains
X-Content-Type-Options: nosniff
X-Frame-Options: DENY
X-XSS-Protection: 1; mode=block
Content-Security-Policy: default-src 'self'
Vulnerability Disclosure
Report security vulnerabilities to: security@example.com
We follow responsible disclosure practices and aim to respond within 48 hours.
Connectors
Connectors integrate Triage Warden with external security tools and services.
Overview
Each connector type has a trait interface and multiple implementations:
| Type | Purpose | Implementations |
|---|---|---|
| Threat Intelligence | Hash/URL/domain reputation | VirusTotal, Mock |
| SIEM | Log queries and correlation | Splunk, Mock |
| EDR | Endpoint detection and response | CrowdStrike, Mock |
| Email Gateway | Email security operations | Microsoft 365, Mock |
| Ticketing | Incident ticket management | Jira, Mock |
Configuration
Select connector implementations via environment variables:
# Use real connectors
TW_THREAT_INTEL_MODE=virustotal
TW_SIEM_MODE=splunk
TW_EDR_MODE=crowdstrike
TW_EMAIL_GATEWAY_MODE=m365
TW_TICKETING_MODE=jira
# Or use mocks for testing
TW_THREAT_INTEL_MODE=mock
TW_SIEM_MODE=mock
Connector Trait
All connectors implement the base Connector trait:
#![allow(unused)] fn main() { #[async_trait] pub trait Connector: Send + Sync { /// Unique identifier for this connector instance fn name(&self) -> &str; /// Type of connector (threat_intel, siem, edr, etc.) fn connector_type(&self) -> &str; /// Check connector health async fn health_check(&self) -> ConnectorResult<ConnectorHealth>; /// Test connection to the service async fn test_connection(&self) -> ConnectorResult<bool>; } pub enum ConnectorHealth { Healthy, Degraded { message: String }, Unhealthy { message: String }, } }
Error Handling
Connectors return ConnectorResult<T> with detailed error types:
#![allow(unused)] fn main() { pub enum ConnectorError { /// Service returned an error RequestFailed(String), /// Resource not found NotFound(String), /// Authentication failed AuthenticationFailed(String), /// Rate limit exceeded RateLimited { retry_after: Option<Duration> }, /// Network or connection error NetworkError(String), /// Invalid response from service InvalidResponse(String), } }
Health Monitoring
Check connector health via the API:
curl http://localhost:8080/api/connectors/health
{
"connectors": [
{ "name": "virustotal", "type": "threat_intel", "status": "healthy" },
{ "name": "splunk", "type": "siem", "status": "healthy" },
{ "name": "crowdstrike", "type": "edr", "status": "degraded", "message": "High latency" }
]
}
Next Steps
- Threat Intelligence - VirusTotal configuration
- SIEM - Splunk configuration
- EDR - CrowdStrike configuration
- Email Gateway - Microsoft 365 configuration
- Ticketing - Jira configuration
Threat Intelligence Connector
Query threat intelligence services for reputation data on hashes, URLs, domains, and IP addresses.
Interface
#![allow(unused)] fn main() { #[async_trait] pub trait ThreatIntelConnector: Connector { /// Look up file hash reputation async fn lookup_hash(&self, hash: &str) -> ConnectorResult<ThreatReport>; /// Look up URL reputation async fn lookup_url(&self, url: &str) -> ConnectorResult<ThreatReport>; /// Look up domain reputation async fn lookup_domain(&self, domain: &str) -> ConnectorResult<ThreatReport>; /// Look up IP address reputation async fn lookup_ip(&self, ip: &str) -> ConnectorResult<ThreatReport>; } pub struct ThreatReport { pub indicator: String, pub indicator_type: IndicatorType, pub malicious: bool, pub confidence: f64, pub categories: Vec<String>, pub first_seen: Option<DateTime<Utc>>, pub last_seen: Option<DateTime<Utc>>, pub sources: Vec<ThreatSource>, } }
VirusTotal
Configuration
TW_THREAT_INTEL_MODE=virustotal
TW_VIRUSTOTAL_API_KEY=your-api-key-here
Rate Limits
| Tier | Requests/Minute |
|---|---|
| Free | 4 |
| Premium | 500+ |
The connector automatically handles rate limiting with exponential backoff.
Supported Lookups
| Method | VT Endpoint | Notes |
|---|---|---|
lookup_hash | /files/{hash} | MD5, SHA1, SHA256 |
lookup_url | /urls/{url_id} | Base64-encoded URL |
lookup_domain | /domains/{domain} | Domain reputation |
lookup_ip | /ip_addresses/{ip} | IP reputation |
Example Usage
#![allow(unused)] fn main() { let connector = VirusTotalConnector::new(api_key)?; let report = connector.lookup_hash("44d88612fea8a8f36de82e1278abb02f").await?; println!("Malicious: {}", report.malicious); println!("Confidence: {:.2}", report.confidence); println!("Categories: {:?}", report.categories); }
Response Mapping
VirusTotal detection ratios map to confidence scores:
| Detection Ratio | Confidence | Classification |
|---|---|---|
| 0% | 0.0 | Clean |
| 1-10% | 0.3 | Suspicious |
| 11-50% | 0.6 | Likely Malicious |
| 51-100% | 0.9 | Malicious |
Mock Connector
For testing without external API calls:
TW_THREAT_INTEL_MODE=mock
The mock connector returns predictable results based on indicator patterns:
| Pattern | Result |
|---|---|
| Contains "malicious" | Malicious, confidence 0.95 |
| Contains "suspicious" | Suspicious, confidence 0.5 |
| Contains "clean" | Clean, confidence 0.1 |
| Default | Clean, confidence 0.2 |
Python Bridge
Access from Python via the bridge:
from tw_bridge import ThreatIntelBridge
# Create bridge (uses TW_THREAT_INTEL_MODE env var)
bridge = ThreatIntelBridge()
# Or specify mode explicitly
bridge = ThreatIntelBridge("virustotal")
# Lookup hash
result = bridge.lookup_hash("44d88612fea8a8f36de82e1278abb02f")
print(f"Malicious: {result['malicious']}")
print(f"Confidence: {result['confidence']}")
# Lookup URL
result = bridge.lookup_url("https://example.com/suspicious")
# Lookup domain
result = bridge.lookup_domain("malware-site.com")
Caching
Results are cached to reduce API calls:
| Lookup Type | Cache Duration |
|---|---|
| Hash | 24 hours |
| URL | 1 hour |
| Domain | 6 hours |
| IP | 6 hours |
Cache is stored in the database and shared across instances.
Adding Custom Providers
Implement the ThreatIntelConnector trait:
#![allow(unused)] fn main() { pub struct CustomThreatIntelConnector { client: reqwest::Client, api_key: String, } #[async_trait] impl Connector for CustomThreatIntelConnector { fn name(&self) -> &str { "custom" } fn connector_type(&self) -> &str { "threat_intel" } // ... implement health_check, test_connection } #[async_trait] impl ThreatIntelConnector for CustomThreatIntelConnector { async fn lookup_hash(&self, hash: &str) -> ConnectorResult<ThreatReport> { // Custom implementation } // ... implement other methods } }
See Adding Connectors for full details.
SIEM Connector
Query SIEM platforms for log data, run searches, and correlate events.
Interface
#![allow(unused)] fn main() { #[async_trait] pub trait SIEMConnector: Connector { /// Run a search query async fn search(&self, query: &str, time_range: TimeRange) -> ConnectorResult<SearchResults>; /// Get events by ID async fn get_events(&self, event_ids: &[String]) -> ConnectorResult<Vec<SIEMEvent>>; /// Get related events (correlation) async fn get_related_events( &self, indicator: &str, indicator_type: IndicatorType, time_range: TimeRange, ) -> ConnectorResult<Vec<SIEMEvent>>; } pub struct SIEMEvent { pub id: String, pub timestamp: DateTime<Utc>, pub source: String, pub event_type: String, pub severity: String, pub raw_data: serde_json::Value, } pub struct SearchResults { pub events: Vec<SIEMEvent>, pub total_count: u64, pub search_id: String, } }
Splunk
Configuration
TW_SIEM_MODE=splunk
TW_SPLUNK_URL=https://splunk.company.com:8089
TW_SPLUNK_TOKEN=your-token-here
Token Permissions
The Splunk token requires these capabilities:
search- Run searcheslist_inputs- Health checkrest_access- REST API access
Example Searches
#![allow(unused)] fn main() { let connector = SplunkConnector::new(url, token)?; // Search for events let results = connector.search( r#"index=security sourcetype=firewall action=blocked"#, TimeRange::last_hours(24), ).await?; // Find related events by IP let related = connector.get_related_events( "192.168.1.100", IndicatorType::IpAddress, TimeRange::last_hours(1), ).await?; }
Search Query Translation
Common queries translated to SPL:
| Triage Warden Query | Splunk SPL |
|---|---|
| IP correlation | index=* src_ip="{ip}" OR dest_ip="{ip}" |
| User activity | index=* user="{user}" |
| Hash lookup | index=* (file_hash="{hash}" OR sha256="{hash}") |
Performance Tips
- Use specific indexes in queries
- Limit time ranges when possible
- Use
| head 1000to limit results
Mock Connector
For testing:
TW_SIEM_MODE=mock
The mock returns sample security events matching the query pattern.
Python Bridge
from tw_bridge import SIEMBridge
bridge = SIEMBridge("splunk")
# Run a search
results = bridge.search(
query='index=security action=blocked',
hours=24
)
for event in results['events']:
print(f"{event['timestamp']}: {event['source']}")
# Get related events
related = bridge.get_related_events(
indicator="192.168.1.100",
indicator_type="ip",
hours=1
)
Adding Custom SIEM
Implement the SIEMConnector trait:
#![allow(unused)] fn main() { pub struct ElasticSIEMConnector { client: elasticsearch::Elasticsearch, } #[async_trait] impl SIEMConnector for ElasticSIEMConnector { async fn search(&self, query: &str, time_range: TimeRange) -> ConnectorResult<SearchResults> { // Translate to Elasticsearch DSL and execute } // ... implement other methods } }
EDR Connector
Integrate with Endpoint Detection and Response platforms for host information and response actions.
Interface
#![allow(unused)] fn main() { #[async_trait] pub trait EDRConnector: Connector { /// Get host information async fn get_host(&self, host_id: &str) -> ConnectorResult<HostInfo>; /// Search for hosts async fn search_hosts(&self, query: &str) -> ConnectorResult<Vec<HostInfo>>; /// Get recent detections for a host async fn get_detections(&self, host_id: &str) -> ConnectorResult<Vec<Detection>>; /// Isolate a host from the network async fn isolate_host(&self, host_id: &str) -> ConnectorResult<ActionResult>; /// Remove host isolation async fn unisolate_host(&self, host_id: &str) -> ConnectorResult<ActionResult>; /// Trigger a scan on the host async fn scan_host(&self, host_id: &str) -> ConnectorResult<ActionResult>; } pub struct HostInfo { pub id: String, pub hostname: String, pub platform: String, pub os_version: String, pub agent_version: String, pub last_seen: DateTime<Utc>, pub isolation_status: IsolationStatus, pub tags: Vec<String>, } pub struct Detection { pub id: String, pub timestamp: DateTime<Utc>, pub severity: String, pub tactic: String, pub technique: String, pub description: String, pub process_name: Option<String>, pub file_path: Option<String>, } }
CrowdStrike
Configuration
TW_EDR_MODE=crowdstrike
TW_CROWDSTRIKE_CLIENT_ID=your-client-id
TW_CROWDSTRIKE_CLIENT_SECRET=your-client-secret
TW_CROWDSTRIKE_REGION=us-1 # us-1, us-2, eu-1, usgov-1
API Scopes Required
The API client requires these scopes:
Hosts: Read- Get host informationHosts: Write- Isolation actionsDetections: Read- Get detectionsReal Time Response: Write- Scan actions
OAuth2 Token Management
The connector automatically handles token refresh:
#![allow(unused)] fn main() { // Token refreshed automatically when expired let connector = CrowdStrikeConnector::new(client_id, client_secret, region)?; // All subsequent calls use valid token let host = connector.get_host("abc123").await?; }
Example Usage
#![allow(unused)] fn main() { // Get host information let host = connector.get_host("aid:abc123").await?; println!("Hostname: {}", host.hostname); println!("Last seen: {}", host.last_seen); // Check for detections let detections = connector.get_detections("aid:abc123").await?; for d in detections { println!("{}: {} - {}", d.timestamp, d.severity, d.description); } // Isolate compromised host let result = connector.isolate_host("aid:abc123").await?; if result.success { println!("Host isolated successfully"); } }
Action Confirmation
Isolation and scan actions require policy approval. See Policy Engine.
Mock Connector
TW_EDR_MODE=mock
The mock provides sample hosts and detections for testing.
Python Bridge
from tw_bridge import EDRBridge
bridge = EDRBridge("crowdstrike")
# Get host info
host = bridge.get_host("aid:abc123")
print(f"Hostname: {host['hostname']}")
print(f"Platform: {host['platform']}")
# Get detections
detections = bridge.get_detections("aid:abc123")
for d in detections:
print(f"{d['severity']}: {d['description']}")
# Isolate host (requires policy approval)
result = bridge.isolate_host("aid:abc123")
if result['success']:
print("Host isolated")
Response Actions
| Action | Description | Rollback |
|---|---|---|
isolate_host | Network isolation | unisolate_host |
scan_host | On-demand scan | N/A |
Isolation Behavior
When isolated:
- Host cannot communicate on network
- Falcon agent maintains connection to cloud
- User may see isolation notification
Rate Limits
| Endpoint | Limit |
|---|---|
| Host queries | 100/min |
| Detection queries | 50/min |
| Containment actions | 10/min |
Email Gateway Connector
Manage email security operations including search, quarantine, and sender blocking.
Interface
#![allow(unused)] fn main() { #[async_trait] pub trait EmailGatewayConnector: Connector { /// Search for emails async fn search_emails(&self, query: EmailSearchQuery) -> ConnectorResult<Vec<EmailMessage>>; /// Get specific email by ID async fn get_email(&self, message_id: &str) -> ConnectorResult<EmailMessage>; /// Move email to quarantine async fn quarantine_email(&self, message_id: &str) -> ConnectorResult<ActionResult>; /// Release email from quarantine async fn release_email(&self, message_id: &str) -> ConnectorResult<ActionResult>; /// Block sender async fn block_sender(&self, sender: &str) -> ConnectorResult<ActionResult>; /// Unblock sender async fn unblock_sender(&self, sender: &str) -> ConnectorResult<ActionResult>; /// Get threat data for email async fn get_threat_data(&self, message_id: &str) -> ConnectorResult<EmailThreatData>; } pub struct EmailMessage { pub id: String, pub internet_message_id: String, pub sender: String, pub recipients: Vec<String>, pub subject: String, pub received_at: DateTime<Utc>, pub has_attachments: bool, pub attachments: Vec<EmailAttachment>, pub urls: Vec<String>, pub headers: HashMap<String, String>, pub threat_assessment: Option<ThreatAssessment>, } pub struct EmailSearchQuery { pub sender: Option<String>, pub recipient: Option<String>, pub subject_contains: Option<String>, pub timerange: TimeRange, pub has_attachments: Option<bool>, pub threat_type: Option<String>, pub limit: usize, } }
Microsoft 365
Configuration
TW_EMAIL_GATEWAY_MODE=m365
TW_M365_TENANT_ID=your-tenant-id
TW_M365_CLIENT_ID=your-client-id
TW_M365_CLIENT_SECRET=your-client-secret
App Registration
Create an Azure AD app registration with these API permissions:
| Permission | Type | Purpose |
|---|---|---|
Mail.Read | Application | Read emails |
Mail.ReadWrite | Application | Quarantine actions |
ThreatAssessment.Read.All | Application | Threat data |
Policy.Read.All | Application | Block list management |
Example Usage
#![allow(unused)] fn main() { let connector = M365Connector::new(tenant_id, client_id, client_secret)?; // Search for suspicious emails let query = EmailSearchQuery { sender: Some("suspicious@domain.com".to_string()), timerange: TimeRange::last_hours(24), ..Default::default() }; let emails = connector.search_emails(query).await?; // Quarantine malicious email let result = connector.quarantine_email("AAMkAGI2...").await?; // Block sender let result = connector.block_sender("phisher@malicious.com").await?; }
Quarantine Behavior
When quarantined:
- Email moved to quarantine folder
- User notified (configurable)
- Admin can release if false positive
Mock Connector
TW_EMAIL_GATEWAY_MODE=mock
Provides sample emails with various threat characteristics:
- Phishing with malicious URLs
- Malware with executable attachments
- BEC/impersonation attempts
- Clean legitimate emails
Python Bridge
from tw_bridge import EmailGatewayBridge
bridge = EmailGatewayBridge("m365")
# Search emails
emails = bridge.search_emails(
sender="suspicious@domain.com",
hours=24
)
for email in emails:
print(f"From: {email['sender']}")
print(f"Subject: {email['subject']}")
print(f"Attachments: {len(email['attachments'])}")
# Quarantine email
result = bridge.quarantine_email("AAMkAGI2...")
if result['success']:
print("Email quarantined")
# Block sender
result = bridge.block_sender("phisher@malicious.com")
Response Actions
| Action | Description | Rollback |
|---|---|---|
quarantine_email | Move to quarantine | release_email |
block_sender | Add to blocklist | unblock_sender |
Threat Data
Get detailed threat information:
#![allow(unused)] fn main() { let threat_data = connector.get_threat_data("AAMkAGI2...").await?; println!("Delivery action: {}", threat_data.delivery_action); println!("Threat types: {:?}", threat_data.threat_types); println!("Detection methods: {:?}", threat_data.detection_methods); }
Fields:
delivery_action: Delivered, Quarantined, Blockedthreat_types: Phishing, Malware, Spam, BECdetection_methods: URLAnalysis, AttachmentScanning, ImpersonationDetectionurls_clicked: URLs clicked by recipient (if tracking enabled)
Ticketing Connector
Create and manage security incident tickets in external ticketing systems.
Interface
#![allow(unused)] fn main() { #[async_trait] pub trait TicketingConnector: Connector { /// Create a new ticket async fn create_ticket(&self, ticket: CreateTicketRequest) -> ConnectorResult<Ticket>; /// Get ticket by ID async fn get_ticket(&self, ticket_id: &str) -> ConnectorResult<Ticket>; /// Update ticket fields async fn update_ticket(&self, ticket_id: &str, update: UpdateTicketRequest) -> ConnectorResult<Ticket>; /// Add comment to ticket async fn add_comment(&self, ticket_id: &str, comment: &str) -> ConnectorResult<()>; /// Search tickets async fn search_tickets(&self, query: TicketSearchQuery) -> ConnectorResult<Vec<Ticket>>; } pub struct CreateTicketRequest { pub title: String, pub description: String, pub priority: TicketPriority, pub ticket_type: String, pub labels: Vec<String>, pub assignee: Option<String>, pub custom_fields: HashMap<String, String>, } pub struct Ticket { pub id: String, pub key: String, pub title: String, pub description: String, pub status: String, pub priority: TicketPriority, pub assignee: Option<String>, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, pub url: String, } }
Jira
Configuration
TW_TICKETING_MODE=jira
TW_JIRA_URL=https://company.atlassian.net
TW_JIRA_EMAIL=automation@company.com
TW_JIRA_API_TOKEN=your-api-token
TW_JIRA_PROJECT_KEY=SEC
API Token
Generate an API token at: https://id.atlassian.com/manage-profile/security/api-tokens
Required permissions:
- Create issues
- Edit issues
- Add comments
- Browse project
Example Usage
#![allow(unused)] fn main() { let connector = JiraConnector::new(url, email, token, project_key)?; // Create security ticket let request = CreateTicketRequest { title: "Phishing Incident - INC-2024-001".to_string(), description: "Phishing email detected and quarantined.\n\n## Details\n...".to_string(), priority: TicketPriority::High, ticket_type: "Security Incident".to_string(), labels: vec!["phishing".to_string(), "triage-warden".to_string()], assignee: Some("analyst@company.com".to_string()), custom_fields: HashMap::new(), }; let ticket = connector.create_ticket(request).await?; println!("Created: {} - {}", ticket.key, ticket.url); // Add investigation notes connector.add_comment( &ticket.id, "## Investigation Notes\n\n- Sender reputation: Malicious\n- URLs: 2 phishing links" ).await?; }
Issue Types
Configure the Jira project with these issue types:
| Issue Type | Usage |
|---|---|
| Security Incident | Main incident ticket |
| Investigation | Sub-task for investigation steps |
| Remediation | Sub-task for response actions |
Custom Fields
Map custom fields in configuration:
TW_JIRA_FIELD_SEVERITY=customfield_10001
TW_JIRA_FIELD_INCIDENT_ID=customfield_10002
TW_JIRA_FIELD_VERDICT=customfield_10003
Mock Connector
TW_TICKETING_MODE=mock
Simulates ticket operations with in-memory storage.
Python Bridge
from tw_bridge import TicketingBridge
bridge = TicketingBridge("jira")
# Create ticket
ticket = bridge.create_ticket(
title="Phishing Incident - INC-2024-001",
description="Phishing email detected...",
priority="high",
ticket_type="Security Incident",
labels=["phishing", "triage-warden"]
)
print(f"Created: {ticket['key']}")
print(f"URL: {ticket['url']}")
# Add comment
bridge.add_comment(
ticket_id=ticket['id'],
comment="Investigation complete. Verdict: Malicious"
)
# Update status
bridge.update_ticket(
ticket_id=ticket['id'],
status="Done"
)
# Search tickets
tickets = bridge.search_tickets(
query="project = SEC AND labels = phishing",
limit=10
)
Ticket Templates
Define templates for consistent ticket creation:
# config/ticket_templates.toml
[templates.phishing]
title = "Phishing: {subject}"
description = """
## Incident Summary
- **Type**: Phishing
- **Severity**: {severity}
- **Incident ID**: {incident_id}
## Details
{details}
## Recommended Actions
{recommended_actions}
"""
labels = ["phishing", "triage-warden"]
[templates.malware]
title = "Malware Alert: {hostname}"
description = """
## Incident Summary
- **Type**: Malware
- **Host**: {hostname}
- **Detection**: {detection}
## IOCs
{iocs}
"""
labels = ["malware", "triage-warden"]
Integration with Incidents
Tickets are automatically linked to incidents:
#![allow(unused)] fn main() { // Create ticket action stores the ticket key let action = execute_action("create_ticket", incident_id, params).await?; // Incident updated with ticket reference incident.metadata["ticket_key"] = "SEC-1234"; incident.metadata["ticket_url"] = "https://company.atlassian.net/browse/SEC-1234"; }
Actions
Actions are the executable operations that Triage Warden can perform in response to incidents.
Overview
Actions fall into several categories:
| Category | Purpose | Examples |
|---|---|---|
| Analysis | Extract and parse data | parse_email, check_email_authentication |
| Lookup | Enrich with external data | lookup_sender_reputation, lookup_urls |
| Response | Take containment actions | quarantine_email, isolate_host |
| Notification | Alert stakeholders | notify_user, escalate |
| Ticketing | Create/update tickets | create_ticket, add_ticket_comment |
Action Trait
All actions implement the Action trait:
#![allow(unused)] fn main() { #[async_trait] pub trait Action: Send + Sync { /// Action name (used in playbooks and API) fn name(&self) -> &str; /// Human-readable description fn description(&self) -> &str; /// Required and optional parameters fn required_parameters(&self) -> Vec<ParameterDef>; /// Whether this action supports rollback fn supports_rollback(&self) -> bool; /// Execute the action async fn execute(&self, context: ActionContext) -> Result<ActionResult, ActionError>; /// Rollback the action (if supported) async fn rollback(&self, context: ActionContext) -> Result<ActionResult, ActionError> { Err(ActionError::RollbackNotSupported) } } }
Action Context
Actions receive an ActionContext with:
#![allow(unused)] fn main() { pub struct ActionContext { /// Unique execution ID pub execution_id: Uuid, /// Parameters passed to the action pub parameters: HashMap<String, serde_json::Value>, /// Related incident (if any) pub incident_id: Option<Uuid>, /// User or agent requesting the action pub proposer: String, /// Connectors available for use pub connectors: ConnectorRegistry, } }
Action Result
Actions return an ActionResult:
#![allow(unused)] fn main() { pub struct ActionResult { /// Whether the action succeeded pub success: bool, /// Action name pub action_name: String, /// Human-readable summary pub message: String, /// Execution duration pub duration: Duration, /// Output data (action-specific) pub output: HashMap<String, serde_json::Value>, /// Whether rollback is available pub rollback_available: bool, } }
Policy Integration
All actions pass through the policy engine before execution:
Action Request → Policy Evaluation → Decision
├─ Allowed → Execute
├─ Denied → Return Error
└─ RequiresApproval → Queue
See Policy Engine for approval configuration.
Executing Actions
Via API
curl -X POST http://localhost:8080/api/incidents/{id}/actions \
-H "Content-Type: application/json" \
-d '{
"action": "quarantine_email",
"parameters": {
"message_id": "AAMkAGI2...",
"reason": "Phishing detected"
}
}'
Via CLI
tw-cli action execute \
--incident INC-2024-001 \
--action quarantine_email \
--param message_id=AAMkAGI2... \
--param reason="Phishing detected"
Via Playbook
steps:
- action: quarantine_email
parameters:
message_id: "{{ incident.raw_data.message_id }}"
reason: "Automated response to phishing"
Available Actions
- Email Actions - Email parsing and response
- Host Actions - Endpoint containment
- Lookup Actions - Threat intelligence enrichment
- Notification Actions - Alerts and escalation
Email Actions
Actions for analyzing and responding to email-based threats.
Analysis Actions
parse_email
Extract headers, body, attachments, and URLs from raw email.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
raw_email | string | Yes | Raw email content (RFC 822) |
Output:
{
"headers": {
"From": "sender@example.com",
"To": "recipient@company.com",
"Subject": "Important Document",
"Date": "2024-01-15T10:30:00Z",
"Message-ID": "<abc123@example.com>",
"X-Originating-IP": "[192.168.1.100]"
},
"sender": "sender@example.com",
"recipients": ["recipient@company.com"],
"subject": "Important Document",
"body_text": "Please review the attached document...",
"body_html": "<html>...",
"attachments": [
{
"filename": "document.pdf",
"content_type": "application/pdf",
"size": 102400,
"sha256": "abc123..."
}
],
"urls": [
"https://example.com/document",
"https://suspicious-site.com/login"
]
}
check_email_authentication
Validate SPF, DKIM, and DMARC authentication results.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
headers | object | Yes | Email headers (from parse_email) |
Output:
{
"spf": {
"result": "pass",
"domain": "example.com"
},
"dkim": {
"result": "pass",
"domain": "example.com",
"selector": "default"
},
"dmarc": {
"result": "pass",
"policy": "reject"
},
"authentication_passed": true,
"risk_indicators": []
}
Risk Indicators:
spf_fail- SPF validation faileddkim_fail- DKIM signature invaliddmarc_fail- DMARC policy violationheader_mismatch- From/Reply-To mismatchsuspicious_routing- Unusual mail routing
Response Actions
quarantine_email
Move email to quarantine via email gateway.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
message_id | string | Yes | Email message ID |
reason | string | No | Reason for quarantine |
Output:
{
"quarantine_id": "quar-abc123",
"message_id": "AAMkAGI2...",
"quarantined_at": "2024-01-15T10:35:00Z"
}
Rollback: release_email - Releases email from quarantine
block_sender
Add sender to organization blocklist.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
sender | string | Yes | Email address to block |
scope | string | No | Block scope: organization or user |
Output:
{
"block_id": "block-abc123",
"sender": "phisher@malicious.com",
"scope": "organization",
"blocked_at": "2024-01-15T10:35:00Z"
}
Rollback: unblock_sender - Removes sender from blocklist
Usage Examples
Phishing Response Playbook
name: phishing_response
steps:
- action: parse_email
output: parsed
- action: check_email_authentication
parameters:
headers: "{{ parsed.headers }}"
output: auth
- action: lookup_sender_reputation
parameters:
sender: "{{ parsed.sender }}"
output: reputation
- condition: "reputation.score < 0.3 or not auth.authentication_passed"
action: quarantine_email
parameters:
message_id: "{{ incident.raw_data.message_id }}"
reason: "Failed authentication and low sender reputation"
- condition: "reputation.score < 0.2"
action: block_sender
parameters:
sender: "{{ parsed.sender }}"
scope: organization
CLI Example
# Quarantine suspicious email
tw-cli action execute \
--action quarantine_email \
--param message_id="AAMkAGI2..." \
--param reason="Phishing indicators detected"
# Block malicious sender
tw-cli action execute \
--action block_sender \
--param sender="phisher@malicious.com" \
--param scope=organization
Host Actions
Actions for endpoint containment and investigation.
isolate_host
Network-isolate a compromised host via EDR.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
host_id | string | Yes | EDR host/agent ID |
reason | string | No | Reason for isolation |
Output:
{
"isolation_id": "iso-abc123",
"host_id": "aid:xyz789",
"hostname": "WORKSTATION-01",
"isolated_at": "2024-01-15T10:40:00Z",
"status": "isolated"
}
Behavior:
- Host network access blocked
- EDR agent maintains cloud connectivity
- User notified (configurable)
Rollback: unisolate_host
Policy: Typically requires senior analyst or manager approval.
unisolate_host
Remove network isolation from a host.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
host_id | string | Yes | EDR host/agent ID |
reason | string | No | Reason for removing isolation |
Output:
{
"host_id": "aid:xyz789",
"hostname": "WORKSTATION-01",
"unisolated_at": "2024-01-15T14:00:00Z",
"status": "active"
}
scan_host
Trigger on-demand malware scan on a host.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
host_id | string | Yes | EDR host/agent ID |
scan_type | string | No | quick or full (default: quick) |
Output:
{
"scan_id": "scan-abc123",
"host_id": "aid:xyz789",
"scan_type": "quick",
"started_at": "2024-01-15T10:45:00Z",
"status": "running"
}
Note: Scan results are retrieved separately as they may take time.
Usage Examples
Malware Response Playbook
name: malware_response
steps:
- action: isolate_host
parameters:
host_id: "{{ incident.raw_data.host_id }}"
reason: "Malware detection - automated isolation"
output: isolation
- action: scan_host
parameters:
host_id: "{{ incident.raw_data.host_id }}"
scan_type: full
- action: create_ticket
parameters:
title: "Malware Incident - {{ incident.raw_data.hostname }}"
priority: high
- action: notify_user
parameters:
user: "{{ incident.raw_data.user }}"
message: "Your workstation has been isolated due to a security incident"
CLI Example
# Isolate compromised host
tw-cli action execute \
--action isolate_host \
--param host_id="aid:xyz789" \
--param reason="Active malware infection"
# This action typically requires approval
# Check approval status:
tw-cli action status act-123456
# After investigation, remove isolation:
tw-cli action execute \
--action unisolate_host \
--param host_id="aid:xyz789" \
--param reason="Malware cleaned, host verified"
API Example
# Request host isolation
curl -X POST http://localhost:8080/api/incidents/INC-2024-001/actions \
-H "Content-Type: application/json" \
-d '{
"action": "isolate_host",
"parameters": {
"host_id": "aid:xyz789",
"reason": "Suspected compromise"
}
}'
# Response (if requires approval):
{
"action_id": "act-abc123",
"status": "pending_approval",
"approval_level": "manager",
"message": "Action requires SOC Manager approval"
}
Policy Configuration
Host actions are typically high-impact and require approval:
[[policy.rules]]
name = "isolate_requires_approval"
action = "isolate_host"
approval_level = "senior"
[[policy.rules]]
name = "critical_isolate_requires_manager"
action = "isolate_host"
severity = ["critical"]
approval_level = "manager"
Lookup Actions
Actions for enriching incidents with threat intelligence data.
lookup_sender_reputation
Query threat intelligence for sender domain and IP reputation.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
sender | string | Yes | Email address |
originating_ip | string | No | Sending server IP |
Output:
{
"sender": "suspicious@domain.com",
"domain": "domain.com",
"domain_reputation": {
"score": 0.25,
"categories": ["phishing", "newly-registered"],
"first_seen": "2024-01-10",
"registrar": "NameCheap"
},
"ip_reputation": {
"ip": "192.168.1.100",
"score": 0.3,
"categories": ["spam", "proxy"],
"country": "RU",
"asn": "AS12345"
},
"overall_score": 0.25,
"risk_level": "high"
}
Score Interpretation:
| Score | Risk Level |
|---|---|
| 0.0 - 0.3 | High risk |
| 0.3 - 0.6 | Medium risk |
| 0.6 - 0.8 | Low risk |
| 0.8 - 1.0 | Clean |
lookup_urls
Check URLs against threat intelligence.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
urls | array | Yes | List of URLs to check |
Output:
{
"results": [
{
"url": "https://legitimate-site.com/page",
"malicious": false,
"categories": ["business"],
"confidence": 0.95
},
{
"url": "https://phishing-site.com/login",
"malicious": true,
"categories": ["phishing", "credential-theft"],
"confidence": 0.92,
"threat_details": {
"targeted_brand": "Microsoft",
"first_seen": "2024-01-14"
}
}
],
"malicious_count": 1,
"total_count": 2
}
lookup_attachments
Hash attachments and check against threat intelligence.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
attachments | array | Yes | List of attachment objects with sha256 |
Output:
{
"results": [
{
"filename": "invoice.pdf",
"sha256": "abc123...",
"malicious": false,
"file_type": "PDF document",
"confidence": 0.9
},
{
"filename": "update.exe",
"sha256": "def456...",
"malicious": true,
"file_type": "Windows executable",
"confidence": 0.98,
"threat_details": {
"malware_family": "Emotet",
"first_seen": "2024-01-12",
"detection_engines": 45
}
}
],
"malicious_count": 1,
"total_count": 2
}
lookup_hash
Look up a single file hash.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
hash | string | Yes | MD5, SHA1, or SHA256 hash |
Output:
{
"hash": "abc123...",
"hash_type": "sha256",
"malicious": true,
"confidence": 0.95,
"malware_family": "Emotet",
"categories": ["trojan", "banking"],
"first_seen": "2024-01-12",
"last_seen": "2024-01-15",
"detection_ratio": "45/70"
}
lookup_ip
Query IP address reputation.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
ip | string | Yes | IP address |
Output:
{
"ip": "192.168.1.100",
"malicious": true,
"confidence": 0.8,
"categories": ["c2", "malware-distribution"],
"country": "RU",
"asn": "AS12345",
"asn_org": "Example ISP",
"last_seen": "2024-01-15",
"associated_malware": ["Cobalt Strike"]
}
Usage in Playbooks
name: email_triage
steps:
- action: parse_email
output: parsed
- action: lookup_sender_reputation
parameters:
sender: "{{ parsed.sender }}"
output: sender_rep
- action: lookup_urls
parameters:
urls: "{{ parsed.urls }}"
output: url_results
- action: lookup_attachments
parameters:
attachments: "{{ parsed.attachments }}"
output: attachment_results
# Make decision based on lookups
- condition: >
sender_rep.risk_level == 'high' or
url_results.malicious_count > 0 or
attachment_results.malicious_count > 0
set_verdict:
classification: malicious
confidence: 0.9
Caching
Lookup results are cached to reduce API calls:
| Lookup | Cache Duration |
|---|---|
| Hash | 24 hours |
| URL | 1 hour |
| Domain | 6 hours |
| IP | 6 hours |
Force fresh lookup with skip_cache: true parameter.
Notification Actions
Actions for alerting stakeholders and managing escalation.
notify_user
Send notification to an affected user.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
user | string | Yes | User email or ID |
message | string | Yes | Notification message |
channel | string | No | email, slack, teams (default: email) |
template | string | No | Notification template name |
Output:
{
"notification_id": "notif-abc123",
"recipient": "user@company.com",
"channel": "email",
"sent_at": "2024-01-15T10:50:00Z",
"status": "delivered"
}
Templates:
# templates/notifications.yaml
security_alert:
subject: "Security Alert: Action Required"
body: |
A security incident affecting your account has been detected.
Incident ID: {{ incident_id }}
Type: {{ incident_type }}
{{ message }}
If you did not initiate this activity, please contact IT Security.
notify_reporter
Send status update to the incident reporter.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
incident_id | string | Yes | Incident ID |
status | string | Yes | Status update message |
include_verdict | bool | No | Include AI verdict (default: false) |
Output:
{
"notification_id": "notif-def456",
"reporter": "reporter@company.com",
"status": "delivered"
}
escalate
Route incident to appropriate approval level.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
incident_id | string | Yes | Incident ID |
escalation_level | string | Yes | analyst, senior, manager |
reason | string | Yes | Reason for escalation |
override_assignee | string | No | Specific person to assign |
custom_sla_hours | int | No | Custom SLA (overrides default) |
notify_channels | array | No | Additional channels (slack, pagerduty) |
Output:
{
"escalation_id": "esc-abc123",
"incident_id": "INC-2024-001",
"escalation_level": "senior",
"assigned_to": "senior.analyst@company.com",
"due_date": "2024-01-15T12:50:00Z",
"priority": "high",
"sla_hours": 2
}
Default SLAs:
| Level | SLA |
|---|---|
| Analyst | 4 hours |
| Senior | 2 hours |
| Manager | 1 hour |
create_ticket
Create ticket in external ticketing system.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
title | string | Yes | Ticket title |
description | string | Yes | Ticket description |
priority | string | No | low, medium, high, critical |
assignee | string | No | Initial assignee |
labels | array | No | Ticket labels |
Output:
{
"ticket_id": "12345",
"ticket_key": "SEC-1234",
"url": "https://company.atlassian.net/browse/SEC-1234",
"created_at": "2024-01-15T10:55:00Z"
}
log_false_positive
Record a false positive for tuning.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
incident_id | string | Yes | Incident ID |
reason | string | Yes | Why this is a false positive |
feedback | string | No | Additional feedback for AI improvement |
Output:
{
"fp_id": "fp-abc123",
"incident_id": "INC-2024-001",
"recorded_at": "2024-01-15T11:00:00Z",
"used_for_training": true
}
run_triage_agent
Trigger AI triage agent on an incident.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
incident_id | string | Yes | Incident ID |
playbook | string | No | Specific playbook to use |
model | string | No | AI model override |
Output:
{
"triage_id": "triage-abc123",
"incident_id": "INC-2024-001",
"verdict": "malicious",
"confidence": 0.92,
"reasoning": "Multiple indicators of phishing...",
"recommended_actions": [
"quarantine_email",
"block_sender",
"notify_user"
],
"completed_at": "2024-01-15T10:52:00Z"
}
Usage Examples
Escalation Playbook
name: auto_escalate
trigger:
- verdict: malicious
- confidence: ">= 0.9"
- severity: critical
steps:
- action: escalate
parameters:
incident_id: "{{ incident.id }}"
escalation_level: manager
reason: "High-confidence critical incident requiring immediate attention"
notify_channels:
- slack
- pagerduty
- action: create_ticket
parameters:
title: "CRITICAL: {{ incident.subject }}"
priority: critical
CLI Examples
# Escalate to senior analyst
tw-cli action execute \
--incident INC-2024-001 \
--action escalate \
--param escalation_level=senior \
--param reason="Complex threat requiring expertise"
# Create ticket
tw-cli action execute \
--incident INC-2024-001 \
--action create_ticket \
--param title="Phishing Investigation" \
--param priority=high
# Record false positive
tw-cli action execute \
--incident INC-2024-001 \
--action log_false_positive \
--param reason="Legitimate vendor communication"
Policy Engine
The policy engine controls action approval workflows and enforces security boundaries.
Overview
Every action request passes through the policy engine:
Action Request → Build Context → Evaluate Rules → Decision
├─ Allowed → Execute
├─ Denied → Reject
└─ RequiresApproval → Queue
Policy Decision Types
| Decision | Behavior |
|---|---|
Allowed | Action executes immediately |
Denied | Action rejected with reason |
RequiresApproval | Queued for specified approval level |
Action Context
The policy engine evaluates these attributes:
#![allow(unused)] fn main() { pub struct ActionContext { /// The action being requested pub action_type: String, /// Target of the action (host, email, user, etc.) pub target: String, /// Incident severity (if associated) pub severity: Option<Severity>, /// AI confidence score (if from triage) pub confidence: Option<f64>, /// Who/what is requesting the action pub proposer: Proposer, /// Additional context pub metadata: HashMap<String, Value>, } pub enum Proposer { User { id: String, role: Role }, Agent { name: String }, Playbook { name: String }, System, } }
Default Policies
Without custom rules, these defaults apply:
| Action Category | Default Decision |
|---|---|
| Lookup actions | Allowed |
| Analysis actions | Allowed |
| Notification actions | Allowed |
| Response actions | RequiresApproval (analyst) |
| Host containment | RequiresApproval (senior) |
Next Steps
- Rules - Configure custom policy rules
- Approval Levels - Understanding approval workflow
Policy Rules
Define rules to control when actions require approval.
Rule Structure
[[policy.rules]]
name = "rule_name"
description = "Human-readable description"
# Matching criteria
action = "action_name" # Specific action
action_patterns = ["pattern_*"] # Glob patterns
# Conditions (all must match)
severity = ["high", "critical"] # Incident severity
confidence_min = 0.8 # Minimum AI confidence
proposer_type = "agent" # Who's requesting
proposer_role = "analyst" # Role (if user)
# Decision
decision = "allowed" # or "denied" or "requires_approval"
approval_level = "senior" # If requires_approval
reason = "Explanation" # If denied
Rule Examples
Auto-Approve Lookups
[[policy.rules]]
name = "auto_approve_lookups"
description = "Lookup actions are always allowed"
action_patterns = ["lookup_*"]
decision = "allowed"
Require Approval for Response Actions
[[policy.rules]]
name = "response_needs_analyst"
description = "Response actions require analyst approval"
action_patterns = ["quarantine_*", "block_*"]
decision = "requires_approval"
approval_level = "analyst"
High-Severity Host Isolation
[[policy.rules]]
name = "critical_isolation_needs_manager"
description = "Critical severity host isolation requires manager"
action = "isolate_host"
severity = ["critical"]
decision = "requires_approval"
approval_level = "manager"
Block Dangerous Actions in Production
[[policy.rules]]
name = "no_delete_production"
description = "Deletion actions not allowed in production"
action_patterns = ["delete_*"]
environment = "production"
decision = "denied"
reason = "Deletion actions are not permitted in production"
Trust High-Confidence AI Decisions
[[policy.rules]]
name = "trust_high_confidence_ai"
description = "Auto-approve when AI is highly confident"
proposer_type = "agent"
confidence_min = 0.95
severity = ["low", "medium"]
action_patterns = ["quarantine_email", "block_sender"]
decision = "allowed"
Analyst Self-Service
[[policy.rules]]
name = "analyst_can_notify"
description = "Analysts can send notifications without approval"
action_patterns = ["notify_*"]
proposer_role = "analyst"
decision = "allowed"
Rule Evaluation Order
Rules are evaluated in order. First matching rule wins.
# More specific rules first
[[policy.rules]]
name = "critical_isolation"
action = "isolate_host"
severity = ["critical"]
approval_level = "manager"
# General fallback
[[policy.rules]]
name = "default_isolation"
action = "isolate_host"
approval_level = "senior"
Condition Operators
Severity Matching
severity = ["high", "critical"] # Match any in list
Confidence Ranges
confidence_min = 0.8 # Minimum confidence
confidence_max = 0.95 # Maximum confidence
Pattern Matching
action_patterns = ["lookup_*"] # Prefix match
action_patterns = ["*_email"] # Suffix match
action_patterns = ["*block*"] # Contains
Proposer Conditions
proposer_type = "user" # user, agent, playbook, system
proposer_role = "analyst" # Only for user proposers
Managing Rules
Via Configuration File
# config/policy.toml
tw-api --config config/policy.toml
Via API
# List rules
curl http://localhost:8080/api/policies
# Create rule
curl -X POST http://localhost:8080/api/policies \
-H "Content-Type: application/json" \
-d '{
"name": "new_rule",
"action": "isolate_host",
"approval_level": "senior"
}'
Via CLI
# List rules
tw-cli policy list
# Add rule
tw-cli policy add \
--name "block_needs_approval" \
--action "block_sender" \
--approval-level analyst
Testing Rules
Simulate policy evaluation without executing:
tw-cli policy test \
--action isolate_host \
--severity critical \
--proposer-type agent \
--confidence 0.92
# Output:
# Decision: RequiresApproval
# Level: manager
# Matched Rule: critical_isolation_needs_manager
Approval Levels
Understanding the approval workflow in Triage Warden.
Approval Hierarchy
Manager (SOC Manager)
│
▼
Senior (Senior Analyst)
│
▼
Analyst (Security Analyst)
│
▼
Auto (No approval needed)
Higher levels can approve actions at their level or below.
Level Definitions
| Level | Role | Typical Actions |
|---|---|---|
| Auto | System | Lookups, analysis, low-risk notifications |
| Analyst | Security Analyst | Email quarantine, sender blocking |
| Senior | Senior Analyst | Host isolation, broad blocks |
| Manager | SOC Manager | Critical containment, policy changes |
Approval Workflow
1. Action Requested
tw-cli action execute --incident INC-001 --action isolate_host
2. Policy Evaluation
Policy engine evaluates and returns:
{
"decision": "requires_approval",
"approval_level": "senior",
"reason": "Host isolation requires senior analyst approval"
}
3. Action Queued
Action stored with pending status:
{
"action_id": "act-abc123",
"incident_id": "INC-001",
"action_type": "isolate_host",
"status": "pending_approval",
"approval_level": "senior",
"requested_by": "analyst@company.com",
"requested_at": "2024-01-15T10:30:00Z"
}
4. Approvers Notified
Notification sent to eligible approvers via configured channels.
5. Approval Decision
Approver reviews and decides:
Approve:
tw-cli action approve act-abc123 --comment "Verified threat"
Reject:
tw-cli action reject act-abc123 --reason "False positive, user traveling"
6. Execution or Rejection
- Approved: Action executes automatically
- Rejected: Action marked rejected, requester notified
Approval UI
Access pending approvals at /approvals in the web dashboard.
Features:
- Filterable list of pending actions
- Incident context display
- One-click approve/reject
- Bulk approval for related actions
SLA Tracking
Each approval level has a default SLA:
| Level | Default SLA |
|---|---|
| Analyst | 4 hours |
| Senior | 2 hours |
| Manager | 1 hour |
Overdue approvals are:
- Highlighted in dashboard
- Re-notified to approvers
- Optionally escalated to next level
Delegation
Approvers can delegate when unavailable:
tw-cli approval delegate \
--from senior.analyst@company.com \
--to backup.analyst@company.com \
--until 2024-01-20
Approval Groups
Configure approval groups for redundancy:
[approval_groups]
senior_analysts = [
"alice@company.com",
"bob@company.com",
"charlie@company.com"
]
managers = [
"soc.manager@company.com",
"backup.manager@company.com"
]
Any member of the group can approve.
Audit Trail
All approval decisions are logged:
{
"event": "action_approved",
"action_id": "act-abc123",
"approver": "senior.analyst@company.com",
"decision": "approved",
"comment": "Verified threat indicators",
"timestamp": "2024-01-15T10:45:00Z",
"time_to_approve": "15m"
}
Emergency Override
In emergencies, managers can bypass approval:
tw-cli action execute \
--incident INC-001 \
--action isolate_host \
--emergency \
--reason "Active ransomware, immediate containment required"
Emergency overrides are:
- Logged with high visibility
- Require manager credentials
- Trigger additional notifications
AI Triage
Automated incident analysis using Claude AI agents.
Overview
The triage agent analyzes security incidents to:
- Classify - Determine if the incident is malicious, suspicious, or benign
- Assess confidence - Quantify certainty in the classification
- Explain - Provide reasoning for the verdict
- Recommend - Suggest response actions
How It Works
Incident → Playbook Selection → Tool Execution → AI Analysis → Verdict
- Incident received - New incident created via webhook or API
- Playbook selected - Based on incident type (phishing, malware, etc.)
- Tools executed - Parse data, lookup reputation, check authentication
- AI analysis - Claude analyzes gathered data
- Verdict returned - Classification with confidence and recommendations
Example Verdict
{
"incident_id": "INC-2024-001",
"classification": "malicious",
"confidence": 0.92,
"category": "phishing",
"reasoning": "Multiple indicators suggest this is a credential phishing attempt:\n1. Sender domain registered 2 days ago\n2. SPF and DKIM authentication failed\n3. URL leads to a fake Microsoft login page\n4. Subject uses urgency tactics",
"recommended_actions": [
{
"action": "quarantine_email",
"priority": 1,
"reason": "Prevent user access to phishing content"
},
{
"action": "block_sender",
"priority": 2,
"reason": "Sender has no legitimate history"
},
{
"action": "notify_user",
"priority": 3,
"reason": "Educate user about phishing attempt"
}
],
"iocs": [
{"type": "domain", "value": "phishing-site.com"},
{"type": "ip", "value": "192.168.1.100"}
],
"mitre_attack": ["T1566.001", "T1078"]
}
Triggering Triage
Automatic (Webhook)
Configure webhooks to auto-triage new incidents:
webhooks:
email_gateway:
auto_triage: true
playbook: phishing_triage
Manual (CLI)
tw-cli triage run --incident INC-2024-001
Manual (API)
curl -X POST http://localhost:8080/api/incidents/INC-2024-001/triage
Next Steps
- Triage Agent - Agent architecture and configuration
- Verdict Types - Understanding classifications
- Confidence Scoring - How confidence is calculated
Triage Agent
The AI agent that analyzes security incidents.
Architecture
┌─────────────────────────────────────────────────────────┐
│ Triage Agent │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Claude │ │ Tools │ │ Playbook │ │
│ │ Model │ │ (Bridge) │ │ Engine │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Python Bridge │
│ (ThreatIntelBridge, SIEMBridge, etc.) │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Rust Connectors │
│ (VirusTotal, Splunk, CrowdStrike, etc.) │
└─────────────────────────────────────────────────────────┘
Agent Configuration
# python/tw_ai/agents/config.py
class AgentConfig:
model: str = "claude-sonnet-4-20250514"
max_tokens: int = 4096
temperature: float = 0.1
max_tool_calls: int = 10
timeout_seconds: int = 120
Environment variables:
TW_AI_PROVIDER=anthropic
TW_ANTHROPIC_API_KEY=your-key
TW_AI_MODEL=claude-sonnet-4-20250514
Available Tools
The agent has access to these tools via the Python bridge:
| Tool | Purpose |
|---|---|
parse_email | Extract email components |
check_email_authentication | Validate SPF/DKIM/DMARC |
lookup_sender_reputation | Query sender reputation |
lookup_urls | Check URL reputation |
lookup_attachments | Check attachment hashes |
search_siem | Query SIEM for related events |
get_host_info | Get EDR host information |
Agent Workflow
async def triage(self, incident: Incident) -> Verdict:
# 1. Load appropriate playbook
playbook = self.load_playbook(incident.incident_type)
# 2. Execute playbook steps (tools)
context = {}
for step in playbook.steps:
result = await self.execute_step(step, incident, context)
context[step.output] = result
# 3. Build analysis prompt
prompt = self.build_analysis_prompt(incident, context)
# 4. Get AI verdict
response = await self.client.messages.create(
model=self.config.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=self.config.max_tokens
)
# 5. Parse and return verdict
return self.parse_verdict(response)
System Prompt
The agent uses a specialized system prompt:
You are an expert security analyst assistant. Analyze the provided security
incident data and determine:
1. Classification: Is this malicious, suspicious, benign, or inconclusive?
2. Confidence: How certain are you (0.0 to 1.0)?
3. Category: What type of threat is this (phishing, malware, etc.)?
4. Reasoning: Explain your analysis step by step
5. Recommended Actions: What should be done to respond?
Use the tool results provided to inform your analysis. Be thorough but concise.
Cite specific evidence for your conclusions.
Tool Calling
The agent can call tools during analysis:
# Agent decides to check URL reputation
tool_result = await self.call_tool(
name="lookup_urls",
parameters={"urls": ["https://suspicious-site.com/login"]}
)
# Result used in analysis
# {
# "results": [{
# "url": "https://suspicious-site.com/login",
# "malicious": true,
# "categories": ["phishing"],
# "confidence": 0.95
# }]
# }
Customizing the Agent
Custom System Prompt
agent = TriageAgent(
system_prompt="""
You are a SOC analyst specializing in email security.
Focus on phishing indicators and BEC patterns.
Always check sender authentication carefully.
"""
)
Custom Tools
Register additional tools:
@agent.tool
async def custom_lookup(domain: str) -> dict:
"""Look up domain in internal threat database."""
return await internal_db.query(domain)
Model Selection
# Use different models for different scenarios
if incident.severity == "critical":
agent = TriageAgent(model="claude-opus-4-20250514")
else:
agent = TriageAgent(model="claude-sonnet-4-20250514")
Error Handling
The agent handles failures gracefully:
try:
verdict = await agent.triage(incident)
except ToolError as e:
# Tool failed - continue with available data
verdict = await agent.triage_partial(incident, failed_tools=[e.tool])
except AIError as e:
# AI call failed - return inconclusive
verdict = Verdict.inconclusive(reason=str(e))
Metrics
Agent metrics exported to Prometheus:
triage_duration_seconds- Time to complete triagetriage_tool_calls_total- Tool calls per triagetriage_verdict_total- Verdicts by classificationtriage_confidence_histogram- Confidence score distribution
Verdict Types
Understanding the classification outcomes from AI triage.
Classifications
| Classification | Description | Typical Response |
|---|---|---|
| Malicious | Confirmed threat | Immediate containment |
| Suspicious | Likely threat, needs investigation | Queue for analyst review |
| Benign | Not a threat | Close or archive |
| Inconclusive | Insufficient data | Request more information |
Malicious
The incident is a confirmed security threat.
Criteria:
- Multiple strong threat indicators
- High-confidence threat intelligence matches
- Clear malicious intent (credential theft, malware, etc.)
Example:
{
"classification": "malicious",
"confidence": 0.95,
"category": "phishing",
"reasoning": "Email contains credential phishing page targeting Microsoft 365. Sender domain registered yesterday, fails all email authentication. URL redirects to fake login mimicking Microsoft branding."
}
Response:
- Execute recommended containment actions
- Create incident ticket
- Notify affected users
Suspicious
The incident shows concerning indicators but lacks definitive proof.
Criteria:
- Some threat indicators present
- Mixed or conflicting signals
- Unusual but not clearly malicious behavior
Example:
{
"classification": "suspicious",
"confidence": 0.65,
"category": "potential_phishing",
"reasoning": "Email sender is unknown but domain is 6 months old with valid authentication. URL leads to legitimate document sharing service but file name uses urgency tactics. Recipient has not received email from this sender before."
}
Response:
- Queue for analyst review
- Gather additional context
- Consider temporary quarantine pending review
Benign
The incident is not a security threat.
Criteria:
- No threat indicators found
- Known good sender/source
- Normal expected behavior
Example:
{
"classification": "benign",
"confidence": 0.92,
"category": "legitimate_email",
"reasoning": "Email from known vendor with established sending history. All authentication passes. Attachment is a standard invoice PDF matching expected format. No suspicious URLs or indicators."
}
Response:
- Close incident
- Release from quarantine if held
- Update detection rules if false positive
Inconclusive
Insufficient data to make a determination.
Criteria:
- Missing critical information
- Tool failures preventing analysis
- Conflicting strong indicators
Example:
{
"classification": "inconclusive",
"confidence": 0.3,
"category": "unknown",
"reasoning": "Unable to analyze attachment - file corrupted. Sender reputation service unavailable. Email authentication results are mixed (SPF pass, DKIM fail). Need manual review of attachment content.",
"missing_data": [
"attachment_analysis",
"sender_reputation"
]
}
Response:
- Escalate to analyst
- Retry failed tool calls
- Request additional information
Confidence Scores
Confidence ranges and their meaning:
| Range | Interpretation |
|---|---|
| 0.9 - 1.0 | Very high confidence, clear evidence |
| 0.7 - 0.9 | High confidence, strong indicators |
| 0.5 - 0.7 | Moderate confidence, mixed signals |
| 0.3 - 0.5 | Low confidence, limited evidence |
| 0.0 - 0.3 | Very low confidence, insufficient data |
Category Types
Email Threats
| Category | Description |
|---|---|
phishing | Credential theft attempt |
spear_phishing | Targeted phishing |
bec | Business email compromise |
malware_delivery | Malicious attachment/link |
spam | Unsolicited bulk email |
Endpoint Threats
| Category | Description |
|---|---|
malware | Malicious software detected |
ransomware | Ransomware activity |
cryptominer | Cryptocurrency mining |
rat | Remote access trojan |
pup | Potentially unwanted program |
Access Threats
| Category | Description |
|---|---|
brute_force | Password guessing attempt |
credential_stuffing | Leaked credential use |
impossible_travel | Geographically impossible login |
account_takeover | Compromised account |
Using Verdicts
Automation Rules
# Auto-respond to high-confidence malicious
- trigger:
classification: malicious
confidence: ">= 0.9"
actions:
- quarantine_email
- block_sender
- create_ticket
# Queue suspicious for review
- trigger:
classification: suspicious
actions:
- escalate:
level: analyst
reason: "Suspicious activity requires review"
Metrics
Track verdict distribution:
# Verdict counts by classification
sum by (classification) (triage_verdict_total)
# Average confidence by category
avg by (category) (triage_confidence)
Confidence Scoring
How the AI agent determines confidence in its verdicts.
Confidence Factors
The agent considers multiple factors when calculating confidence:
Evidence Quality
| Factor | Impact |
|---|---|
| Threat intel match (high confidence) | +0.3 |
| Threat intel match (low confidence) | +0.1 |
| Authentication failure | +0.2 |
| Known malicious indicator | +0.3 |
| Suspicious pattern | +0.1 |
Evidence Quantity
| Indicators | Confidence Boost |
|---|---|
| 1 indicator | Base |
| 2-3 indicators | +0.1 |
| 4-5 indicators | +0.2 |
| 6+ indicators | +0.3 |
Data Completeness
| Missing Data | Confidence Penalty |
|---|---|
| None | 0 |
| Minor (sender reputation) | -0.1 |
| Moderate (attachment analysis) | -0.2 |
| Major (multiple tools failed) | -0.3 |
Calculation Example
Phishing Email Analysis:
Base confidence: 0.5
Evidence found:
+ SPF failed: +0.15
+ DKIM failed: +0.15
+ Sender domain < 7 days old: +0.2
+ URL matches phishing pattern: +0.25
+ VirusTotal flags URL as phishing: +0.2
Evidence count (5): +0.2
Data completeness: All tools succeeded: +0
Final confidence: 0.5 + 0.15 + 0.15 + 0.2 + 0.25 + 0.2 + 0.2 = 1.0 (capped at 0.99)
Verdict: malicious, confidence: 0.99
Confidence Thresholds
Policy decisions use confidence thresholds:
# Auto-quarantine high confidence malicious
[[policy.rules]]
name = "auto_quarantine_confident"
classification = "malicious"
confidence_min = 0.9
action = "quarantine_email"
decision = "allowed"
# Require review for lower confidence
[[policy.rules]]
name = "review_uncertain"
confidence_max = 0.7
decision = "requires_approval"
approval_level = "analyst"
Confidence Calibration
The agent is calibrated so confidence correlates with accuracy:
| Stated Confidence | Expected Accuracy |
|---|---|
| 0.9 | ~90% of verdicts correct |
| 0.8 | ~80% of verdicts correct |
| 0.7 | ~70% of verdicts correct |
Monitoring Calibration
Track calibration with metrics:
# Accuracy at confidence level
triage_accuracy_by_confidence{confidence_bucket="0.9-1.0"}
Improving Calibration
- Feedback loop - Log false positives to improve
- Periodic review - Sample low-confidence verdicts
- Model updates - Retrain with corrected examples
Handling Low Confidence
When confidence is low:
Option 1: Escalate
- condition: confidence < 0.6
action: escalate
parameters:
level: analyst
reason: "Low confidence verdict requires human review"
Option 2: Gather More Data
- condition: confidence < 0.6
action: request_additional_data
parameters:
- "sender_history"
- "recipient_context"
Option 3: Conservative Default
- condition: confidence < 0.6
action: quarantine_email
parameters:
reason: "Quarantined pending review due to uncertainty"
Confidence in UI
Dashboard displays confidence visually:
| Confidence | Display |
|---|---|
| 0.9+ | Green badge, "High Confidence" |
| 0.7-0.9 | Yellow badge, "Moderate Confidence" |
| 0.5-0.7 | Orange badge, "Low Confidence" |
| <0.5 | Red badge, "Very Low Confidence" |
Improving Confidence
Actions that help the agent be more confident:
- Complete data - Ensure all tools succeed
- Rich context - Provide incident metadata
- Historical data - Include past incidents with similar patterns
- Clear playbooks - Well-defined analysis steps
Playbooks
Playbooks define automated investigation and response workflows.
Overview
A playbook is a sequence of steps that:
- Gather and analyze incident data
- Enrich with threat intelligence
- Determine verdict and response
- Execute approved actions
Playbook Structure
name: phishing_triage
description: Automated phishing email analysis
version: "1.0"
# When this playbook applies
triggers:
incident_type: phishing
auto_run: true
# Variables available to steps
variables:
quarantine_threshold: 0.7
block_threshold: 0.3
# Execution steps
steps:
- name: Parse Email
action: parse_email
parameters:
raw_email: "{{ incident.raw_data.raw_email }}"
output: parsed
- name: Check Authentication
action: check_email_authentication
parameters:
headers: "{{ parsed.headers }}"
output: auth
- name: Check Sender
action: lookup_sender_reputation
parameters:
sender: "{{ parsed.sender }}"
output: sender_rep
- name: Check URLs
action: lookup_urls
parameters:
urls: "{{ parsed.urls }}"
output: url_results
condition: "{{ parsed.urls | length > 0 }}"
- name: Quarantine if Malicious
action: quarantine_email
parameters:
message_id: "{{ incident.raw_data.message_id }}"
reason: "Automated quarantine - phishing detected"
condition: >
sender_rep.score < variables.quarantine_threshold or
url_results.malicious_count > 0 or
not auth.authentication_passed
# Final verdict generation
verdict:
use_ai: true
model: claude-sonnet-4-20250514
context:
- parsed
- auth
- sender_rep
- url_results
Triggers
Define when a playbook runs:
triggers:
# Run for specific incident types
incident_type: phishing
# Auto-run on incident creation
auto_run: true
# Or require manual trigger
auto_run: false
# Conditions
conditions:
severity: ["medium", "high", "critical"]
source: "email_gateway"
Steps
Basic Step
- name: Step Name
action: action_name
parameters:
key: value
output: variable_name
Conditional Step
- name: Block Known Bad
action: block_sender
parameters:
sender: "{{ parsed.sender }}"
condition: "{{ sender_rep.score < 0.2 }}"
Parallel Steps
- parallel:
- action: lookup_urls
parameters:
urls: "{{ parsed.urls }}"
output: url_results
- action: lookup_attachments
parameters:
attachments: "{{ parsed.attachments }}"
output: attachment_results
Loop Steps
- name: Check Each URL
loop: "{{ parsed.urls }}"
action: lookup_url
parameters:
url: "{{ item }}"
output: url_results
aggregate: list
Variables
Built-in Variables
| Variable | Description |
|---|---|
incident | The incident being processed |
incident.id | Incident ID |
incident.raw_data | Original incident data |
incident.severity | Incident severity |
variables | Playbook-defined variables |
Step Outputs
Each step's output is available to subsequent steps:
- action: parse_email
output: parsed
- action: lookup_urls
parameters:
urls: "{{ parsed.urls }}" # Use previous output
Templates
Use Jinja2-style templates:
parameters:
message: "Alert for {{ incident.id }}: {{ parsed.subject }}"
priority: "{{ 'high' if incident.severity == 'critical' else 'medium' }}"
Next Steps
- Creating Playbooks - Write your own playbooks
- Built-in Playbooks - Ready-to-use playbooks
Creating Playbooks
Guide to writing custom playbooks for your security workflows.
Getting Started
1. Create Playbook File
mkdir -p playbooks
touch playbooks/my_playbook.yaml
2. Define Basic Structure
name: my_playbook
description: Description of what this playbook does
version: "1.0"
triggers:
incident_type: phishing
auto_run: true
steps:
- name: First Step
action: parse_email
output: result
3. Register Playbook
tw-cli playbook add playbooks/my_playbook.yaml
Step Types
Action Step
Execute a registered action:
- name: Parse Email Content
action: parse_email
parameters:
raw_email: "{{ incident.raw_data.raw_email }}"
output: parsed
on_error: continue # or "fail" (default)
Condition Step
Branch based on conditions:
- name: Check if High Risk
condition: "{{ sender_rep.score < 0.3 }}"
then:
- action: quarantine_email
parameters:
message_id: "{{ incident.raw_data.message_id }}"
else:
- action: log_event
parameters:
message: "Low risk, no action needed"
AI Analysis Step
Get AI verdict:
- name: AI Analysis
type: ai_analysis
model: claude-sonnet-4-20250514
context:
- parsed
- auth_results
- reputation
prompt: |
Analyze this email for phishing indicators.
Consider the authentication results and sender reputation.
output: ai_verdict
Notification Step
Send alerts:
- name: Alert Team
action: notify_channel
parameters:
channel: slack
message: |
New {{ incident.severity }} incident detected
ID: {{ incident.id }}
Type: {{ incident.incident_type }}
Error Handling
Per-Step Error Handling
- name: Check Reputation
action: lookup_sender_reputation
parameters:
sender: "{{ parsed.sender }}"
output: reputation
on_error: continue # Don't fail playbook if this fails
default_output: # Use this if step fails
score: 0.5
risk_level: "unknown"
Global Error Handler
on_error:
- action: notify_channel
parameters:
channel: slack
message: "Playbook {{ playbook.name }} failed: {{ error.message }}"
- action: escalate
parameters:
level: analyst
reason: "Automated triage failed"
Variables and Templates
Define Variables
variables:
high_risk_threshold: 0.3
quarantine_enabled: true
notification_channel: "#security-alerts"
Use Variables
- name: Check Risk
condition: "{{ sender_rep.score < variables.high_risk_threshold }}"
then:
- action: quarantine_email
condition: "{{ variables.quarantine_enabled }}"
Template Functions
parameters:
# String manipulation
domain: "{{ parsed.sender | split('@') | last }}"
# Conditionals
priority: "{{ 'critical' if incident.severity == 'critical' else 'high' }}"
# Lists
all_urls: "{{ parsed.urls | join(', ') }}"
url_count: "{{ parsed.urls | length }}"
# Defaults
assignee: "{{ incident.assignee | default('unassigned') }}"
Testing Playbooks
Dry Run
tw-cli playbook test my_playbook \
--incident INC-2024-001 \
--dry-run
With Mock Data
tw-cli playbook test my_playbook \
--data '{"raw_email": "From: test@example.com..."}'
Validate Syntax
tw-cli playbook validate playbooks/my_playbook.yaml
Best Practices
1. Use Descriptive Names
# Good
- name: Check sender domain reputation
# Bad
- name: step1
2. Handle Failures Gracefully
- name: External Lookup
action: lookup_sender_reputation
on_error: continue
default_output:
score: 0.5
3. Add Timeouts
- name: Slow External API
action: custom_lookup
timeout: 30s
4. Log Key Decisions
- name: Log Verdict
action: log_event
parameters:
level: info
message: "Verdict: {{ verdict.classification }} ({{ verdict.confidence }})"
5. Version Your Playbooks
name: phishing_triage
version: "2.1.0"
changelog:
- "2.1.0: Added attachment analysis"
- "2.0.0: Restructured for parallel lookups"
Example: Complete Playbook
name: comprehensive_phishing_triage
description: Full phishing email analysis with all checks
version: "2.0"
triggers:
incident_type: phishing
auto_run: true
variables:
quarantine_threshold: 0.3
block_threshold: 0.2
steps:
# Parse email
- name: Parse Email
action: parse_email
parameters:
raw_email: "{{ incident.raw_data.raw_email }}"
output: parsed
# Parallel enrichment
- name: Enrich Data
parallel:
- action: check_email_authentication
parameters:
headers: "{{ parsed.headers }}"
output: auth
- action: lookup_sender_reputation
parameters:
sender: "{{ parsed.sender }}"
output: sender_rep
- action: lookup_urls
parameters:
urls: "{{ parsed.urls }}"
output: urls
condition: "{{ parsed.urls | length > 0 }}"
- action: lookup_attachments
parameters:
attachments: "{{ parsed.attachments }}"
output: attachments
condition: "{{ parsed.attachments | length > 0 }}"
# AI Analysis
- name: AI Verdict
type: ai_analysis
model: claude-sonnet-4-20250514
context: [parsed, auth, sender_rep, urls, attachments]
output: verdict
# Response actions
- name: Quarantine Malicious
action: quarantine_email
parameters:
message_id: "{{ incident.raw_data.message_id }}"
condition: >
verdict.classification == 'malicious' and
verdict.confidence >= variables.quarantine_threshold
- name: Block Repeat Offender
action: block_sender
parameters:
sender: "{{ parsed.sender }}"
condition: >
sender_rep.score < variables.block_threshold
- name: Create Ticket
action: create_ticket
parameters:
title: "{{ verdict.classification | title }}: {{ parsed.subject | truncate(50) }}"
priority: "{{ incident.severity }}"
condition: "{{ verdict.classification != 'benign' }}"
on_error:
- action: escalate
parameters:
level: analyst
reason: "Playbook execution failed"
Built-in Playbooks
Ready-to-use playbooks included with Triage Warden.
Email Security
phishing_triage
Comprehensive phishing email analysis.
Triggers: incident_type: phishing
Steps:
- Parse email headers and body
- Check SPF/DKIM/DMARC authentication
- Look up sender reputation
- Analyze URLs against threat intel
- Check attachment hashes
- AI analysis and verdict
- Auto-quarantine if malicious (confidence > 0.8)
Usage:
tw-cli playbook run phishing_triage --incident INC-2024-001
spam_triage
Quick spam classification.
Triggers: incident_type: spam
Steps:
- Parse email
- Check spam indicators (bulk headers, suspicious patterns)
- Classify as spam/not spam
- Auto-archive low-confidence spam
bec_detection
Business Email Compromise detection.
Triggers: incident_type: bec
Steps:
- Parse email
- Check for executive impersonation
- Analyze reply-to mismatch
- Check for urgency indicators
- Verify sender against directory
- AI analysis for social engineering patterns
Endpoint Security
malware_triage
Malware alert analysis.
Triggers: incident_type: malware
Steps:
- Get host information from EDR
- Look up file hash
- Check related processes
- Query SIEM for lateral movement
- AI verdict
- Auto-isolate if critical severity + high confidence
suspicious_login
Anomalous login investigation.
Triggers: incident_type: suspicious_login
Steps:
- Get login details
- Check for impossible travel
- Query user's recent activity
- Check IP reputation
- Verify device fingerprint
- AI analysis
Customizing Built-in Playbooks
Override Variables
tw-cli playbook run phishing_triage \
--incident INC-2024-001 \
--var quarantine_threshold=0.9 \
--var auto_block=false
Fork and Modify
# Export built-in playbook
tw-cli playbook export phishing_triage > my_phishing.yaml
# Edit as needed
vim my_phishing.yaml
# Register custom version
tw-cli playbook add my_phishing.yaml
Extend with Hooks
# my_phishing.yaml
extends: phishing_triage
# Add steps after parent playbook
after_steps:
- name: Custom Logging
action: log_to_siem
parameters:
event: phishing_verdict
data: "{{ verdict }}"
# Override variables
variables:
quarantine_threshold: 0.85
Playbook Comparison
| Playbook | AI Used | Auto-Response | Typical Duration |
|---|---|---|---|
| phishing_triage | Yes | Quarantine, Block | 30-60s |
| spam_triage | No | Archive | 5-10s |
| bec_detection | Yes | Escalate | 45-90s |
| malware_triage | Yes | Isolate | 60-120s |
| suspicious_login | Yes | Lock account | 30-60s |
Monitoring Playbooks
Execution Metrics
# Playbook execution count
sum by (playbook) (playbook_executions_total)
# Average duration
avg by (playbook) (playbook_duration_seconds)
# Success rate
sum(playbook_executions_total{status="success"}) /
sum(playbook_executions_total)
Alerts
# Alert on playbook failures
- alert: PlaybookFailureRate
expr: |
sum(rate(playbook_executions_total{status="failed"}[5m])) /
sum(rate(playbook_executions_total[5m])) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "Playbook failure rate above 10%"
REST API
Programmatic access to Triage Warden functionality.
Base URL
http://localhost:8080/api
Authentication
See Authentication for details.
API Key
curl -H "Authorization: Bearer tw_abc123_secretkey" \
http://localhost:8080/api/incidents
Session Cookie
For browser-based access, use session authentication via /login.
Response Format
All responses are JSON:
{
"data": { ... },
"meta": {
"page": 1,
"per_page": 20,
"total": 150
}
}
Error Responses
{
"error": {
"code": "not_found",
"message": "Incident not found",
"details": { ... }
}
}
HTTP Status Codes
| Code | Meaning |
|---|---|
| 200 | Success |
| 201 | Created |
| 400 | Bad Request |
| 401 | Unauthorized |
| 403 | Forbidden |
| 404 | Not Found |
| 422 | Validation Error |
| 429 | Rate Limited |
| 500 | Server Error |
Endpoints Overview
Incidents
| Method | Path | Description |
|---|---|---|
| GET | /incidents | List incidents |
| POST | /incidents | Create incident |
| GET | /incidents/:id | Get incident |
| PUT | /incidents/:id | Update incident |
| DELETE | /incidents/:id | Delete incident |
| POST | /incidents/:id/triage | Run triage |
| POST | /incidents/:id/actions | Execute action |
Actions
| Method | Path | Description |
|---|---|---|
| GET | /actions | List actions |
| GET | /actions/:id | Get action |
| POST | /actions/:id/approve | Approve action |
| POST | /actions/:id/reject | Reject action |
Playbooks
| Method | Path | Description |
|---|---|---|
| GET | /playbooks | List playbooks |
| POST | /playbooks | Create playbook |
| GET | /playbooks/:id | Get playbook |
| PUT | /playbooks/:id | Update playbook |
| DELETE | /playbooks/:id | Delete playbook |
| POST | /playbooks/:id/run | Run playbook |
Webhooks
| Method | Path | Description |
|---|---|---|
| POST | /webhooks/:source | Receive webhook |
System
| Method | Path | Description |
|---|---|---|
| GET | /health | Health check |
| GET | /metrics | Prometheus metrics |
| GET | /connectors/health | Connector status |
Pagination
List endpoints support pagination:
curl "http://localhost:8080/api/incidents?page=2&per_page=50"
Parameters:
page- Page number (default: 1)per_page- Items per page (default: 20, max: 100)
Filtering
Filter list results:
curl "http://localhost:8080/api/incidents?status=open&severity=high"
Common filters:
status- Filter by statusseverity- Filter by severitytype- Filter by incident typecreated_after- Created after datecreated_before- Created before date
Sorting
curl "http://localhost:8080/api/incidents?sort=-created_at"
- Prefix with
-for descending order - Default:
-created_at(newest first)
Rate Limiting
API requests are rate limited:
| Endpoint | Limit |
|---|---|
| Read operations | 100/min |
| Write operations | 20/min |
| Triage requests | 10/min |
Rate limit headers:
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 95
X-RateLimit-Reset: 1705320000
Next Steps
- Authentication - API authentication
- Incidents - Incident endpoints
- Actions - Action endpoints
- Playbooks - Playbook endpoints
- Webhooks - Webhook integration
API Authentication
Authenticate with the Triage Warden API.
API Keys
Creating an API Key
# Via CLI
tw-cli api-key create --name "automation-script" --scopes read,write
# Output:
# API Key created successfully
# Key: tw_abc123_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# WARNING: Store this key securely. It cannot be retrieved again.
Using API Keys
Include in the Authorization header:
curl -H "Authorization: Bearer tw_abc123_secretkey" \
http://localhost:8080/api/incidents
API Key Scopes
| Scope | Permissions |
|---|---|
read | Read incidents, actions, playbooks |
write | Create/update incidents, execute actions |
admin | User management, system configuration |
Managing API Keys
# List keys
tw-cli api-key list
# Revoke key
tw-cli api-key revoke tw_abc123
# Rotate key
tw-cli api-key rotate tw_abc123
Session Authentication
For web dashboard access:
Login
curl -X POST http://localhost:8080/login \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=analyst&password=secret&csrf_token=xxx" \
-c cookies.txt
Using Session
curl -b cookies.txt http://localhost:8080/api/incidents
Logout
curl -X POST http://localhost:8080/logout -b cookies.txt
CSRF Protection
State-changing requests require CSRF tokens:
- Get token from login page or API
- Include in request header or body
# Header
curl -X POST http://localhost:8080/api/incidents \
-H "X-CSRF-Token: abc123" \
-b cookies.txt \
-d '{"type": "phishing"}'
# Form body
curl -X POST http://localhost:8080/api/incidents \
-d "csrf_token=abc123&type=phishing" \
-b cookies.txt
Webhook Authentication
Webhooks use HMAC signatures:
Configuring Webhook Secret
tw-cli webhook add email-gateway \
--url http://localhost:8080/api/webhooks/email-gateway \
--secret "your-secret-key"
Verifying Signatures
Triage Warden validates the X-Webhook-Signature header:
X-Webhook-Signature: sha256=abc123...
Signature is computed as:
HMAC-SHA256(secret, timestamp + "." + body)
Signature Verification Example
import hmac
import hashlib
def verify_signature(payload: bytes, signature: str, secret: str, timestamp: str) -> bool:
expected = hmac.new(
secret.encode(),
f"{timestamp}.{payload.decode()}".encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
Service Accounts
For automated systems:
# Create service account
tw-cli user create \
--username automation-bot \
--role analyst \
--service-account
# Generate API key for service account
tw-cli api-key create \
--user automation-bot \
--name "ci-cd-integration" \
--scopes read,write
Security Best Practices
- Rotate keys regularly - Set up automated rotation
- Use minimal scopes - Only grant necessary permissions
- Secure storage - Use secret managers, not code
- Monitor usage - Review audit logs for suspicious activity
- IP allowlisting - Restrict API access by IP (optional)
# Enable IP allowlist
tw-cli config set api.allowed_ips "10.0.0.0/8,192.168.1.0/24"
Error Responses
401 Unauthorized
Missing or invalid credentials:
{
"error": {
"code": "unauthorized",
"message": "Invalid or missing authentication"
}
}
403 Forbidden
Valid credentials but insufficient permissions:
{
"error": {
"code": "forbidden",
"message": "Insufficient permissions for this operation"
}
}
Incidents API
Create, read, update, and manage security incidents.
List Incidents
GET /api/incidents
Query Parameters
| Parameter | Type | Description |
|---|---|---|
status | string | Filter by status (open, triaged, resolved) |
severity | string | Filter by severity (low, medium, high, critical) |
type | string | Filter by incident type |
created_after | datetime | Created after timestamp |
created_before | datetime | Created before timestamp |
page | integer | Page number |
per_page | integer | Items per page |
sort | string | Sort field (prefix - for desc) |
Example
curl "http://localhost:8080/api/incidents?status=open&severity=high&per_page=10" \
-H "Authorization: Bearer tw_xxx"
Response
{
"data": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"incident_number": "INC-2024-0001",
"incident_type": "phishing",
"severity": "high",
"status": "open",
"source": "email_gateway",
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-15T10:30:00Z"
}
],
"meta": {
"page": 1,
"per_page": 10,
"total": 42
}
}
Get Incident
GET /api/incidents/:id
Example
curl "http://localhost:8080/api/incidents/550e8400-e29b-41d4-a716-446655440000" \
-H "Authorization: Bearer tw_xxx"
Response
{
"data": {
"id": "550e8400-e29b-41d4-a716-446655440000",
"incident_number": "INC-2024-0001",
"incident_type": "phishing",
"severity": "high",
"status": "triaged",
"source": "email_gateway",
"raw_data": {
"message_id": "AAMkAGI2...",
"sender": "phisher@malicious.com",
"subject": "Urgent: Update Account"
},
"verdict": {
"classification": "malicious",
"confidence": 0.92,
"category": "phishing",
"reasoning": "Multiple phishing indicators..."
},
"recommended_actions": [
"quarantine_email",
"block_sender"
],
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-15T10:35:00Z",
"triaged_at": "2024-01-15T10:35:00Z"
}
}
Create Incident
POST /api/incidents
Request Body
{
"incident_type": "phishing",
"source": "email_gateway",
"severity": "medium",
"raw_data": {
"message_id": "AAMkAGI2...",
"sender": "unknown@domain.com",
"recipient": "employee@company.com",
"subject": "Important Document",
"received_at": "2024-01-15T10:00:00Z"
}
}
Example
curl -X POST "http://localhost:8080/api/incidents" \
-H "Authorization: Bearer tw_xxx" \
-H "Content-Type: application/json" \
-d '{
"incident_type": "phishing",
"source": "email_gateway",
"severity": "medium",
"raw_data": {...}
}'
Response
{
"data": {
"id": "550e8400-e29b-41d4-a716-446655440001",
"incident_number": "INC-2024-0002",
"status": "open",
"created_at": "2024-01-15T11:00:00Z"
}
}
Update Incident
PUT /api/incidents/:id
Request Body
{
"severity": "high",
"status": "resolved",
"resolution": "False positive - legitimate vendor email"
}
Delete Incident
DELETE /api/incidents/:id
Note: Requires admin role.
Run Triage
POST /api/incidents/:id/triage
Trigger AI triage on an incident.
Request Body (Optional)
{
"playbook": "custom_phishing",
"force": true
}
Response
{
"data": {
"triage_id": "triage-abc123",
"status": "completed",
"verdict": {
"classification": "malicious",
"confidence": 0.92
},
"duration_ms": 45000
}
}
Execute Action
POST /api/incidents/:id/actions
Execute an action on an incident.
Request Body
{
"action": "quarantine_email",
"parameters": {
"message_id": "AAMkAGI2...",
"reason": "Phishing detected"
}
}
Response (Immediate Execution)
{
"data": {
"action_id": "act-abc123",
"status": "completed",
"result": {
"success": true,
"message": "Email quarantined successfully"
}
}
}
Response (Pending Approval)
{
"data": {
"action_id": "act-abc123",
"status": "pending_approval",
"approval_level": "senior",
"message": "Action requires senior analyst approval"
}
}
Get Incident Actions
GET /api/incidents/:id/actions
List all actions for an incident.
Response
{
"data": [
{
"id": "act-abc123",
"action_type": "quarantine_email",
"status": "completed",
"executed_at": "2024-01-15T10:40:00Z",
"executed_by": "system"
},
{
"id": "act-def456",
"action_type": "block_sender",
"status": "pending_approval",
"approval_level": "analyst",
"requested_at": "2024-01-15T10:41:00Z"
}
]
}
Actions API
Manage action execution and approvals.
List Actions
GET /api/actions
Query Parameters
| Parameter | Type | Description |
|---|---|---|
status | string | pending, pending_approval, completed, failed |
action_type | string | Filter by action type |
incident_id | uuid | Filter by incident |
approval_level | string | analyst, senior, manager |
Example
curl "http://localhost:8080/api/actions?status=pending_approval" \
-H "Authorization: Bearer tw_xxx"
Response
{
"data": [
{
"id": "act-abc123",
"incident_id": "550e8400-e29b-41d4-a716-446655440000",
"action_type": "isolate_host",
"status": "pending_approval",
"approval_level": "senior",
"parameters": {
"host_id": "aid:xyz789",
"reason": "Malware detected"
},
"requested_by": "triage_agent",
"requested_at": "2024-01-15T10:45:00Z"
}
]
}
Get Action
GET /api/actions/:id
Response
{
"data": {
"id": "act-abc123",
"incident_id": "550e8400-e29b-41d4-a716-446655440000",
"action_type": "isolate_host",
"status": "pending_approval",
"approval_level": "senior",
"parameters": {
"host_id": "aid:xyz789",
"reason": "Malware detected"
},
"requested_by": "triage_agent",
"requested_at": "2024-01-15T10:45:00Z",
"incident": {
"incident_number": "INC-2024-0001",
"incident_type": "malware",
"severity": "high"
}
}
}
Approve Action
POST /api/actions/:id/approve
Request Body
{
"comment": "Verified threat, approved for isolation"
}
Response
{
"data": {
"id": "act-abc123",
"status": "completed",
"approved_by": "senior.analyst@company.com",
"approved_at": "2024-01-15T11:00:00Z",
"result": {
"success": true,
"message": "Host isolated successfully"
}
}
}
Errors
403 Forbidden - Insufficient approval level:
{
"error": {
"code": "insufficient_approval_level",
"message": "This action requires senior analyst approval",
"required_level": "senior",
"your_level": "analyst"
}
}
Reject Action
POST /api/actions/:id/reject
Request Body
{
"reason": "False positive - user confirmed legitimate activity"
}
Response
{
"data": {
"id": "act-abc123",
"status": "rejected",
"rejected_by": "senior.analyst@company.com",
"rejected_at": "2024-01-15T11:00:00Z",
"rejection_reason": "False positive - user confirmed legitimate activity"
}
}
Execute Action Directly
POST /api/actions/execute
Execute an action without associating with an incident.
Request Body
{
"action": "block_sender",
"parameters": {
"sender": "spammer@malicious.com"
}
}
Response
{
"data": {
"action_id": "act-ghi789",
"status": "completed",
"result": {
"success": true,
"message": "Sender blocked"
}
}
}
Get Action Types
GET /api/actions/types
List all available action types.
Response
{
"data": [
{
"name": "quarantine_email",
"description": "Move email to quarantine",
"category": "email",
"supports_rollback": true,
"parameters": [
{
"name": "message_id",
"type": "string",
"required": true
},
{
"name": "reason",
"type": "string",
"required": false
}
]
},
{
"name": "isolate_host",
"description": "Network-isolate a host",
"category": "endpoint",
"supports_rollback": true,
"default_approval_level": "senior",
"parameters": [...]
}
]
}
Rollback Action
POST /api/actions/:id/rollback
Rollback a previously executed action.
Request Body
{
"reason": "False positive confirmed"
}
Response
{
"data": {
"rollback_action_id": "act-jkl012",
"original_action_id": "act-abc123",
"status": "completed",
"result": {
"success": true,
"message": "Host unisolated successfully"
}
}
}
Errors
400 Bad Request - Action doesn't support rollback:
{
"error": {
"code": "rollback_not_supported",
"message": "Action type 'notify_user' does not support rollback"
}
}
Playbooks API
Manage and execute playbooks.
List Playbooks
GET /api/playbooks
Response
{
"data": [
{
"id": "pb-abc123",
"name": "phishing_triage",
"description": "Automated phishing email analysis",
"version": "2.0",
"enabled": true,
"triggers": {
"incident_type": "phishing",
"auto_run": true
},
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-10T00:00:00Z"
}
]
}
Get Playbook
GET /api/playbooks/:id
Response
{
"data": {
"id": "pb-abc123",
"name": "phishing_triage",
"description": "Automated phishing email analysis",
"version": "2.0",
"enabled": true,
"triggers": {
"incident_type": "phishing",
"auto_run": true
},
"variables": {
"quarantine_threshold": 0.7
},
"steps": [
{
"name": "Parse Email",
"action": "parse_email",
"parameters": {
"raw_email": "{{ incident.raw_data.raw_email }}"
},
"output": "parsed"
}
],
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-10T00:00:00Z"
}
}
Create Playbook
POST /api/playbooks
Request Body
{
"name": "custom_playbook",
"description": "My custom investigation playbook",
"triggers": {
"incident_type": "phishing",
"auto_run": false
},
"steps": [
{
"name": "Parse Email",
"action": "parse_email",
"output": "parsed"
}
]
}
Response
{
"data": {
"id": "pb-def456",
"name": "custom_playbook",
"version": "1.0",
"created_at": "2024-01-15T12:00:00Z"
}
}
Update Playbook
PUT /api/playbooks/:id
Request Body
{
"description": "Updated description",
"enabled": false
}
Delete Playbook
DELETE /api/playbooks/:id
Note: Built-in playbooks cannot be deleted.
Run Playbook
POST /api/playbooks/:id/run
Execute a playbook on an incident.
Request Body
{
"incident_id": "550e8400-e29b-41d4-a716-446655440000",
"variables": {
"quarantine_threshold": 0.9
}
}
Response
{
"data": {
"execution_id": "exec-abc123",
"playbook_id": "pb-abc123",
"incident_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "completed",
"started_at": "2024-01-15T12:00:00Z",
"completed_at": "2024-01-15T12:00:45Z",
"steps_completed": 5,
"steps_total": 5,
"verdict": {
"classification": "malicious",
"confidence": 0.92
}
}
}
Get Playbook Executions
GET /api/playbooks/:id/executions
Response
{
"data": [
{
"execution_id": "exec-abc123",
"incident_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "completed",
"duration_ms": 45000,
"started_at": "2024-01-15T12:00:00Z"
}
]
}
Validate Playbook
POST /api/playbooks/validate
Validate playbook YAML without creating it.
Request Body
{
"content": "name: test\nsteps:\n - action: parse_email"
}
Response (Valid)
{
"data": {
"valid": true,
"warnings": []
}
}
Response (Invalid)
{
"data": {
"valid": false,
"errors": [
{
"line": 3,
"message": "Unknown action: invalid_action"
}
]
}
}
Export Playbook
GET /api/playbooks/:id/export
Download playbook as YAML file.
Response
name: phishing_triage
description: Automated phishing email analysis
version: "2.0"
...
Webhooks API
Receive events from external security tools.
Endpoint
POST /api/webhooks/:source
Where :source identifies the sending system (e.g., email-gateway, edr, siem).
Authentication
Webhooks are authenticated via HMAC signatures:
X-Webhook-Signature: sha256=abc123...
X-Webhook-Timestamp: 1705320000
Registering Webhook Sources
Via CLI
tw-cli webhook add email-gateway \
--secret "your-secret-key" \
--auto-triage true \
--playbook phishing_triage
Via API
curl -X POST "http://localhost:8080/api/webhooks" \
-H "Authorization: Bearer tw_xxx" \
-d '{
"source": "email-gateway",
"secret": "your-secret-key",
"auto_triage": true,
"playbook": "phishing_triage"
}'
Payload Formats
Generic Format
{
"event_type": "security_alert",
"timestamp": "2024-01-15T10:00:00Z",
"source": "email-gateway",
"data": {
"alert_id": "alert-123",
"severity": "high",
"details": {...}
}
}
Microsoft Defender for Office 365
{
"eventType": "PhishingEmail",
"id": "AAMkAGI2...",
"creationTime": "2024-01-15T10:00:00Z",
"severity": "high",
"category": "Phish",
"entityType": "Email",
"data": {
"sender": "phisher@malicious.com",
"subject": "Urgent Action Required",
"recipients": ["user@company.com"]
}
}
CrowdStrike Falcon
{
"metadata": {
"eventType": "DetectionSummaryEvent",
"eventCreationTime": 1705320000000
},
"event": {
"DetectId": "ldt:abc123",
"Severity": 4,
"HostnameField": "WORKSTATION-01",
"DetectName": "Malicious File Detected"
}
}
Splunk Alert
{
"result": {
"host": "server-01",
"source": "WinEventLog:Security",
"sourcetype": "WinEventLog",
"_raw": "...",
"EventCode": "4625"
},
"search_name": "Failed Login Alert",
"trigger_time": 1705320000
}
Response
Success
{
"status": "accepted",
"incident_id": "550e8400-e29b-41d4-a716-446655440000",
"incident_number": "INC-2024-0001"
}
Queued for Processing
{
"status": "queued",
"queue_id": "queue-abc123",
"message": "Event queued for processing"
}
Configuring Auto-Triage
When auto_triage is enabled, incidents created from webhooks are automatically triaged:
# webhook_config.yaml
sources:
email-gateway:
secret: "${EMAIL_GATEWAY_SECRET}"
auto_triage: true
playbook: phishing_triage
severity_mapping:
critical: critical
high: high
medium: medium
low: low
edr:
secret: "${EDR_SECRET}"
auto_triage: true
playbook: malware_triage
Testing Webhooks
Send Test Event
# Generate signature
TIMESTAMP=$(date +%s)
BODY='{"event_type":"test","data":{}}'
SIGNATURE=$(echo -n "${TIMESTAMP}.${BODY}" | openssl dgst -sha256 -hmac "your-secret")
# Send request
curl -X POST "http://localhost:8080/api/webhooks/email-gateway" \
-H "Content-Type: application/json" \
-H "X-Webhook-Signature: sha256=${SIGNATURE}" \
-H "X-Webhook-Timestamp: ${TIMESTAMP}" \
-d "${BODY}"
Verify Configuration
tw-cli webhook test email-gateway
Error Handling
Invalid Signature
{
"error": {
"code": "invalid_signature",
"message": "Webhook signature verification failed"
}
}
Unknown Source
{
"error": {
"code": "unknown_source",
"message": "Webhook source 'unknown' is not registered"
}
}
Replay Attack
{
"error": {
"code": "timestamp_expired",
"message": "Webhook timestamp is too old (>5 minutes)"
}
}
Monitoring Webhooks
Metrics
# Webhook receive rate
rate(webhook_received_total[5m])
# Error rate by source
rate(webhook_errors_total[5m])
Logs
tw-cli logs --filter webhook --tail 100
Contributing
Guide to contributing to Triage Warden.
Getting Started
- Fork the repository
- Clone your fork
- Set up the development environment
- Create a branch for your changes
- Submit a pull request
Development Setup
Prerequisites
- Rust 1.75+
- Python 3.11+
- uv (Python package manager)
- SQLite (for development)
Initial Setup
# Clone repository
git clone https://github.com/your-username/triage-warden.git
cd triage-warden
# Install Rust dependencies
cargo build
# Install Python dependencies
cd python
uv sync
cd ..
# Run tests
cargo test
cd python && uv run pytest
Code Style
Rust
- Follow standard Rust conventions
- Run
cargo fmtbefore committing - Run
cargo clippyand fix warnings - Document public APIs with doc comments
Python
- Follow PEP 8
- Run
ruff checkandblackbefore committing - Type hints required (mypy strict mode)
- Docstrings for public functions
Pre-commit Hooks
Install pre-commit hooks:
# The project has pre-commit configured in .git/hooks
# It runs automatically on commit:
# - cargo fmt
# - cargo clippy
# - ruff
# - black
# - mypy
Pull Request Process
-
Create a branch
git checkout -b feature/my-feature -
Make changes
- Write code
- Add tests
- Update documentation
-
Run checks
cargo fmt && cargo clippy cargo test cd python && uv run pytest -
Commit
git commit -m "feat: add new feature" -
Push and create PR
git push origin feature/my-feature -
Address review feedback
Commit Messages
Follow conventional commits:
type(scope): description
[optional body]
[optional footer]
Types:
feat: New featurefix: Bug fixdocs: Documentationrefactor: Code refactoringtest: Adding testschore: Maintenance
Testing
Rust Tests
# Run all tests
cargo test
# Run specific crate tests
cargo test -p tw-api
# Run with output
cargo test -- --nocapture
Python Tests
cd python
uv run pytest
# Run specific tests
uv run pytest tests/test_agents.py
# With coverage
uv run pytest --cov=tw_ai
Integration Tests
# Start test server
cargo run --bin tw-api &
# Run integration tests
./scripts/integration-tests.sh
Documentation
- Update docs for API changes
- Add examples for new features
- Keep README.md current
Build docs locally:
cd docs-site
mdbook serve
Issue Reporting
When reporting issues:
- Search existing issues first
- Use issue templates
- Include:
- Version information
- Steps to reproduce
- Expected vs actual behavior
- Relevant logs
Questions
- Open a GitHub Discussion
- Check existing discussions first
- Tag appropriately
License
By contributing, you agree that your contributions will be licensed under the MIT License.
Building from Source
Complete guide to building Triage Warden.
Prerequisites
Rust
# Install Rust via rustup
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Verify installation
rustc --version # Should be 1.75+
Python
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh
# Verify installation
uv --version
System Dependencies
macOS
brew install openssl pkg-config
Ubuntu/Debian
sudo apt-get install build-essential pkg-config libssl-dev
Fedora
sudo dnf install gcc openssl-devel pkgconfig
Building
Debug Build
cargo build
Outputs:
target/debug/tw-apitarget/debug/tw-cli
Release Build
cargo build --release
Outputs:
target/release/tw-apitarget/release/tw-cli
Python Package
cd python
uv sync
uv build
PyO3 Bridge
The bridge is built automatically with cargo:
cd tw-bridge
cargo build --release
Build Options
Feature Flags
# Build with PostgreSQL support only
cargo build --no-default-features --features postgres
# Build with all features
cargo build --all-features
Cross-Compilation
# For Linux (from macOS)
rustup target add x86_64-unknown-linux-gnu
cargo build --release --target x86_64-unknown-linux-gnu
# For musl (static binary)
rustup target add x86_64-unknown-linux-musl
cargo build --release --target x86_64-unknown-linux-musl
Docker Build
Build Image
docker build -t triage-warden .
Multi-Stage Dockerfile
# Builder stage
FROM rust:1.75 as builder
WORKDIR /app
COPY . .
RUN cargo build --release
# Runtime stage
FROM debian:bookworm-slim
COPY --from=builder /app/target/release/tw-api /usr/local/bin/
CMD ["tw-api"]
Verification
Run Tests
# Rust tests
cargo test
# Python tests
cd python && uv run pytest
# All tests
./scripts/test-all.sh
Linting
# Rust
cargo fmt --check
cargo clippy -- -D warnings
# Python
cd python
uv run ruff check
uv run black --check .
uv run mypy .
Smoke Test
# Start server
./target/release/tw-api &
# Health check
curl http://localhost:8080/api/health
# Stop server
kill %1
Troubleshooting
OpenSSL Errors
# macOS
export OPENSSL_DIR=$(brew --prefix openssl)
# Linux
export OPENSSL_DIR=/usr
PyO3 Build Issues
# Ensure Python is found
export PYO3_PYTHON=$(which python3)
# Clean and rebuild
cargo clean -p tw-bridge
cargo build -p tw-bridge
Out of Memory
# Reduce parallel jobs
cargo build -j 2
Testing
Guide to testing Triage Warden.
Test Structure
triage-warden/
├── crates/
│ ├── tw-api/src/
│ │ └── tests/ # API integration tests
│ ├── tw-core/src/
│ │ └── tests/ # Core unit tests
│ └── tw-actions/src/
│ └── tests/ # Action handler tests
└── python/
└── tests/ # Python tests
Running Tests
All Tests
# Rust
cargo test
# Python
cd python && uv run pytest
# Everything
./scripts/test-all.sh
Specific Tests
# Single crate
cargo test -p tw-api
# Single test
cargo test test_incident_creation
# Pattern match
cargo test incident
# With output
cargo test -- --nocapture
Unit Tests
Rust Unit Tests
#![allow(unused)] fn main() { #[cfg(test)] mod tests { use super::*; #[test] fn test_incident_creation() { let incident = Incident::new( IncidentType::Phishing, Severity::High, ); assert_eq!(incident.status, IncidentStatus::Open); } #[tokio::test] async fn test_async_operation() { let result = async_function().await; assert!(result.is_ok()); } } }
Python Unit Tests
import pytest
from tw_ai.agents import TriageAgent
def test_agent_creation():
agent = TriageAgent()
assert agent.model == "claude-sonnet-4-20250514"
@pytest.mark.asyncio
async def test_triage():
agent = TriageAgent()
verdict = await agent.triage(mock_incident)
assert verdict.classification in ["malicious", "benign"]
Integration Tests
API Integration Tests
#![allow(unused)] fn main() { #[tokio::test] async fn test_incident_api() { let app = create_test_app().await; // Create incident let response = app .oneshot( Request::builder() .method("POST") .uri("/api/incidents") .header("Content-Type", "application/json") .body(Body::from(r#"{"type":"phishing"}"#)) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::CREATED); } }
Database Tests
#![allow(unused)] fn main() { #[tokio::test] async fn test_repository() { // Use in-memory SQLite let pool = create_test_pool().await; let repo = SqliteIncidentRepository::new(pool); let incident = repo.create(&new_incident).await.unwrap(); let found = repo.get(incident.id).await.unwrap(); assert_eq!(found.unwrap().id, incident.id); } }
Test Fixtures
Rust Fixtures
#![allow(unused)] fn main() { // tests/fixtures.rs pub fn mock_incident() -> Incident { Incident { id: Uuid::new_v4(), incident_type: IncidentType::Phishing, severity: Severity::High, status: IncidentStatus::Open, raw_data: json!({"subject": "Test"}), ..Default::default() } } }
Python Fixtures
# tests/conftest.py
import pytest
@pytest.fixture
def mock_incident():
return {
"id": "test-123",
"type": "phishing",
"severity": "high",
"raw_data": {"subject": "Test Email"}
}
@pytest.fixture
def mock_connector():
return MockThreatIntelConnector()
Mocking
Rust Mocking
#![allow(unused)] fn main() { use mockall::mock; mock! { ThreatIntelConnector {} #[async_trait] impl ThreatIntelConnector for ThreatIntelConnector { async fn lookup_hash(&self, hash: &str) -> ConnectorResult<ThreatReport>; } } #[tokio::test] async fn test_with_mock() { let mut mock = MockThreatIntelConnector::new(); mock.expect_lookup_hash() .returning(|_| Ok(ThreatReport::clean())); let result = function_using_connector(&mock).await; assert!(result.is_ok()); } }
Python Mocking
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_with_mock():
with patch("tw_ai.agents.tools.lookup_hash") as mock:
mock.return_value = {"malicious": False}
agent = TriageAgent()
verdict = await agent.triage(mock_incident)
mock.assert_called_once()
Test Coverage
Rust Coverage
cargo install cargo-tarpaulin
cargo tarpaulin --out Html
Python Coverage
cd python
uv run pytest --cov=tw_ai --cov-report=html
CI Testing
GitHub Actions runs tests on every PR:
# .github/workflows/test.yml
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- run: cargo test
- run: cargo clippy -- -D warnings
Test Data
Evaluation Test Cases
Test cases for AI triage evaluation:
# python/tw_ai/evaluation/test_cases/phishing.yaml
- name: obvious_phishing
input:
sender: "security@fake-bank.com"
subject: "Urgent: Verify Account"
urls: ["https://phishing-site.com/login"]
auth_results: {spf: fail, dkim: fail}
expected:
classification: malicious
min_confidence: 0.8
Run evaluation:
cd python
uv run pytest tests/test_evaluation.py
Adding Connectors
Guide to implementing new connectors.
Connector Architecture
Connectors follow a trait-based pattern:
Connector Trait (base)
│
├── ThreatIntelConnector
├── SIEMConnector
├── EDRConnector
├── EmailGatewayConnector
└── TicketingConnector
Implementing a Connector
1. Create the File
touch crates/tw-connectors/src/threat_intel/my_provider.rs
2. Implement Base Trait
#![allow(unused)] fn main() { use crate::traits::{Connector, ConnectorError, ConnectorHealth, ConnectorResult}; use async_trait::async_trait; pub struct MyProviderConnector { client: reqwest::Client, api_key: String, base_url: String, } impl MyProviderConnector { pub fn new(api_key: String) -> Result<Self, ConnectorError> { let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(30)) .build() .map_err(|e| ConnectorError::Configuration(e.to_string()))?; Ok(Self { client, api_key, base_url: "https://api.myprovider.com".to_string(), }) } } #[async_trait] impl Connector for MyProviderConnector { fn name(&self) -> &str { "my_provider" } fn connector_type(&self) -> &str { "threat_intel" } async fn health_check(&self) -> ConnectorResult<ConnectorHealth> { let response = self.client .get(format!("{}/health", self.base_url)) .header("Authorization", format!("Bearer {}", self.api_key)) .send() .await .map_err(|e| ConnectorError::NetworkError(e.to_string()))?; if response.status().is_success() { Ok(ConnectorHealth::Healthy) } else { Ok(ConnectorHealth::Unhealthy { message: "Health check failed".to_string(), }) } } async fn test_connection(&self) -> ConnectorResult<bool> { match self.health_check().await? { ConnectorHealth::Healthy => Ok(true), _ => Ok(false), } } } }
3. Implement Specialized Trait
#![allow(unused)] fn main() { use crate::traits::{ThreatIntelConnector, ThreatReport, IndicatorType}; #[async_trait] impl ThreatIntelConnector for MyProviderConnector { async fn lookup_hash(&self, hash: &str) -> ConnectorResult<ThreatReport> { let response = self.client .get(format!("{}/files/{}", self.base_url, hash)) .header("Authorization", format!("Bearer {}", self.api_key)) .send() .await .map_err(|e| ConnectorError::NetworkError(e.to_string()))?; if response.status() == reqwest::StatusCode::NOT_FOUND { return Ok(ThreatReport { indicator: hash.to_string(), indicator_type: IndicatorType::FileHash, malicious: false, confidence: 0.0, categories: vec![], first_seen: None, last_seen: None, sources: vec![], }); } let data: ApiResponse = response.json().await .map_err(|e| ConnectorError::InvalidResponse(e.to_string()))?; Ok(self.convert_response(data)) } async fn lookup_url(&self, url: &str) -> ConnectorResult<ThreatReport> { // Similar implementation todo!() } async fn lookup_domain(&self, domain: &str) -> ConnectorResult<ThreatReport> { // Similar implementation todo!() } async fn lookup_ip(&self, ip: &str) -> ConnectorResult<ThreatReport> { // Similar implementation todo!() } } }
4. Add to Module
#![allow(unused)] fn main() { // crates/tw-connectors/src/threat_intel/mod.rs mod my_provider; pub use my_provider::MyProviderConnector; }
5. Register in Bridge
#![allow(unused)] fn main() { // tw-bridge/src/lib.rs impl ThreatIntelBridge { pub fn new(mode: &str) -> PyResult<Self> { let connector: Arc<dyn ThreatIntelConnector + Send + Sync> = match mode { "virustotal" => Arc::new(VirusTotalConnector::new( std::env::var("TW_VIRUSTOTAL_API_KEY") .map_err(|_| PyErr::new::<pyo3::exceptions::PyValueError, _>( "TW_VIRUSTOTAL_API_KEY not set" ))? )?), "my_provider" => Arc::new(MyProviderConnector::new( std::env::var("TW_MY_PROVIDER_API_KEY") .map_err(|_| PyErr::new::<pyo3::exceptions::PyValueError, _>( "TW_MY_PROVIDER_API_KEY not set" ))? )?), _ => Arc::new(MockThreatIntelConnector::new("mock")), }; Ok(Self { connector }) } } }
Error Handling
Use appropriate error types:
#![allow(unused)] fn main() { pub enum ConnectorError { /// Configuration issue Configuration(String), /// Network/connection error NetworkError(String), /// Authentication failed AuthenticationFailed(String), /// Resource not found NotFound(String), /// Rate limited RateLimited { retry_after: Option<Duration> }, /// Invalid response from service InvalidResponse(String), /// Request failed RequestFailed(String), } }
Rate Limiting
Implement rate limiting in your connector:
#![allow(unused)] fn main() { use governor::{Quota, RateLimiter}; pub struct MyProviderConnector { client: reqwest::Client, api_key: String, rate_limiter: RateLimiter<...>, } impl MyProviderConnector { async fn make_request(&self, url: &str) -> ConnectorResult<Response> { self.rate_limiter.until_ready().await; self.client.get(url) .header("Authorization", format!("Bearer {}", self.api_key)) .send() .await .map_err(|e| ConnectorError::NetworkError(e.to_string())) } } }
Testing
Unit Tests
#![allow(unused)] fn main() { #[cfg(test)] mod tests { use super::*; use wiremock::{MockServer, Mock, ResponseTemplate}; use wiremock::matchers::{method, path}; #[tokio::test] async fn test_lookup_hash() { let mock_server = MockServer::start().await; Mock::given(method("GET")) .and(path("/files/abc123")) .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "malicious": true, "confidence": 0.95 }))) .mount(&mock_server) .await; let connector = MyProviderConnector::with_base_url( "test-key".to_string(), mock_server.uri(), ); let result = connector.lookup_hash("abc123").await.unwrap(); assert!(result.malicious); } } }
Documentation
Document your connector:
#![allow(unused)] fn main() { //! MyProvider threat intelligence connector. //! //! # Configuration //! //! Set `TW_MY_PROVIDER_API_KEY` environment variable. //! //! # Example //! //! ```rust //! let connector = MyProviderConnector::new(api_key)?; //! let report = connector.lookup_hash("abc123").await?; //! ``` }
Adding Actions
Guide to implementing new action handlers.
Action Architecture
Actions implement the Action trait:
#![allow(unused)] fn main() { #[async_trait] pub trait Action: Send + Sync { fn name(&self) -> &str; fn description(&self) -> &str; fn required_parameters(&self) -> Vec<ParameterDef>; fn supports_rollback(&self) -> bool; async fn execute(&self, context: ActionContext) -> Result<ActionResult, ActionError>; async fn rollback(&self, context: ActionContext) -> Result<ActionResult, ActionError> { Err(ActionError::RollbackNotSupported) } } }
Implementing an Action
1. Create the File
touch crates/tw-actions/src/my_action.rs
2. Define the Action
#![allow(unused)] fn main() { use crate::registry::{ Action, ActionContext, ActionError, ActionResult, ParameterDef, ParameterType, }; use async_trait::async_trait; use chrono::Utc; use std::collections::HashMap; use tracing::{info, instrument}; /// My custom action handler. pub struct MyAction; impl MyAction { pub fn new() -> Self { Self } } impl Default for MyAction { fn default() -> Self { Self::new() } } #[async_trait] impl Action for MyAction { fn name(&self) -> &str { "my_action" } fn description(&self) -> &str { "Description of what this action does" } fn required_parameters(&self) -> Vec<ParameterDef> { vec![ ParameterDef::required( "target", "The target of the action", ParameterType::String, ), ParameterDef::optional( "force", "Force the action even if conditions aren't met", ParameterType::Boolean, serde_json::json!(false), ), ] } fn supports_rollback(&self) -> bool { true } #[instrument(skip(self, context))] async fn execute(&self, context: ActionContext) -> Result<ActionResult, ActionError> { let started_at = Utc::now(); // Get required parameter let target = context.require_string("target")?; // Get optional parameter with default let force = context .get_param("force") .and_then(|v| v.as_bool()) .unwrap_or(false); info!("Executing my_action on target: {}", target); // Perform the action // ... // Build output let mut output = HashMap::new(); output.insert("target".to_string(), serde_json::json!(target)); output.insert("success".to_string(), serde_json::json!(true)); Ok(ActionResult::success( self.name(), &format!("Action completed on {}", target), started_at, output, )) } async fn rollback(&self, context: ActionContext) -> Result<ActionResult, ActionError> { let started_at = Utc::now(); let target = context.require_string("target")?; info!("Rolling back my_action on target: {}", target); // Perform rollback // ... let mut output = HashMap::new(); output.insert("target".to_string(), serde_json::json!(target)); Ok(ActionResult::success( &format!("{}_rollback", self.name()), &format!("Rollback completed on {}", target), started_at, output, )) } } }
3. Add to Module
#![allow(unused)] fn main() { // crates/tw-actions/src/lib.rs mod my_action; pub use my_action::MyAction; }
4. Register in Registry
#![allow(unused)] fn main() { // crates/tw-actions/src/registry.rs impl ActionRegistry { pub fn new() -> Self { let mut registry = Self { actions: HashMap::new(), }; // Register built-in actions registry.register(Box::new(QuarantineEmailAction::new())); registry.register(Box::new(BlockSenderAction::new())); registry.register(Box::new(MyAction::new())); // Add here registry } } }
Parameter Types
Available parameter types:
#![allow(unused)] fn main() { pub enum ParameterType { String, Integer, Float, Boolean, List, Object, } }
Define parameters:
#![allow(unused)] fn main() { fn required_parameters(&self) -> Vec<ParameterDef> { vec![ ParameterDef::required("name", "Description", ParameterType::String), ParameterDef::optional("count", "Description", ParameterType::Integer, json!(10)), ParameterDef::optional("tags", "Description", ParameterType::List, json!([])), ] } }
Using Connectors
Actions can use connectors via dependency injection:
#![allow(unused)] fn main() { pub struct MyAction { connector: Arc<dyn MyConnector + Send + Sync>, } impl MyAction { pub fn new(connector: Arc<dyn MyConnector + Send + Sync>) -> Self { Self { connector } } } #[async_trait] impl Action for MyAction { async fn execute(&self, context: ActionContext) -> Result<ActionResult, ActionError> { // Use connector let result = self.connector.do_something().await .map_err(|e| ActionError::ExecutionFailed(e.to_string()))?; // ... } } }
Error Handling
Use appropriate error types:
#![allow(unused)] fn main() { pub enum ActionError { /// Missing or invalid parameters InvalidParameters(String), /// Execution failed ExecutionFailed(String), /// Action timed out Timeout, /// Rollback not supported RollbackNotSupported, /// Policy denied the action PolicyDenied(String), } }
Testing
Unit Tests
#![allow(unused)] fn main() { #[cfg(test)] mod tests { use super::*; use uuid::Uuid; #[tokio::test] async fn test_my_action_success() { let action = MyAction::new(); let context = ActionContext::new(Uuid::new_v4()) .with_param("target", serde_json::json!("test-target")); let result = action.execute(context).await.unwrap(); assert!(result.success); assert_eq!(result.output["target"], "test-target"); } #[tokio::test] async fn test_my_action_missing_param() { let action = MyAction::new(); let context = ActionContext::new(Uuid::new_v4()); let result = action.execute(context).await; assert!(matches!(result, Err(ActionError::InvalidParameters(_)))); } #[tokio::test] async fn test_my_action_rollback() { let action = MyAction::new(); assert!(action.supports_rollback()); let context = ActionContext::new(Uuid::new_v4()) .with_param("target", serde_json::json!("test-target")); let result = action.rollback(context).await.unwrap(); assert!(result.success); } } }
Policy Integration
Actions are automatically evaluated by the policy engine. Configure default approval:
# Default policy for new action
[[policy.rules]]
name = "my_action_default"
action = "my_action"
approval_level = "analyst"
Documentation
Document your action:
#![allow(unused)] fn main() { //! My custom action. //! //! This action performs X on target Y. //! //! # Parameters //! //! - `target` (required): The target to act on //! - `force` (optional): Force execution (default: false) //! //! # Example //! //! ```yaml //! - action: my_action //! parameters: //! target: "example" //! force: true //! ``` //! //! # Rollback //! //! This action supports rollback via `my_action_rollback`. }
Changelog
All notable changes to Triage Warden.
[Unreleased]
Added
- AI-powered triage agent with Claude integration
- Configurable playbooks for automated investigation
- Policy engine with approval workflows
- Connector framework for external integrations
- Web dashboard with HTMX
- REST API for programmatic access
- CLI for command-line operations
Connectors
- VirusTotal threat intelligence
- Splunk SIEM integration
- CrowdStrike EDR integration
- Microsoft 365 email gateway
- Jira ticketing integration
Actions
- Email: parse_email, check_email_authentication, quarantine_email, block_sender
- Lookup: lookup_sender_reputation, lookup_urls, lookup_attachments
- Host: isolate_host, scan_host
- Notification: notify_user, escalate, create_ticket
[0.1.0] - 2024-01-15
Added
- Initial release
- Core incident management
- Basic web interface
- SQLite database support
- Mock connectors for development
Version Numbering
This project follows Semantic Versioning:
- MAJOR: Incompatible API changes
- MINOR: Backwards-compatible new features
- PATCH: Backwards-compatible bug fixes
Upgrade Guide
From 0.x to 1.0
When 1.0 is released, an upgrade guide will be provided here.