Autonomous Tier 1 Phishing Triage Pipeline
Executive Summary
Phishing remains the primary initial access vector for enterprise breaches. Typically, a Tier 1 analyst in a Security Operations Center (SOC) checks sender reputation, extracts URLs, queries threat intel, and logs a verdict. This process takes 10 to 20 minutes for each alert. At high volumes, it becomes hard to manage, leading to inconsistent results from analyst fatigue.
This project automates that workflow with a new pipeline. A LangGraph ReAct agent monitors the local inbox. It sends phishing emails to a specialized MCP tool server for IOC extraction and live VirusTotal checks. Then, it creates structured JSON verdicts directly in a Splunk SIEM. This method ensures a consistent decision in under a minute for each email. Each log event includes a correlation ID for easy investigation later.
Target reduction: Tier 1 manual triage time from ~15 minutes per alert to under 60 seconds, with zero analyst involvement for clear-cut cases.
graph TD subgraph Agent Client A[inbox/*.txt] --> B(LangGraph ReAct Agent) B --> |HTTP Port 8001| C end subgraph FastMCP Server C{Tool Router} --> D[extract_iocs_from_email] C --> E[get_url_reputation] C --> F[get_file_hash_reputation] F -.-> |VT API v3| G[(VirusTotal)] end B --> |JSON Log| H[triage_logs.json] subgraph Splunk Enterprise H --> |Continuous Monitor| I[(phishing_triage index)] I --> J[Threat Distribution Dashboard] I --> K[Recent Triage Decisions Table] end
Dashboard Preview
Single Pane of Glass dashboard showing the Threat Distribution pie chart (malicious vs. benign verdict breakdown) and the Recent Triage Decisions queue, populated from live Splunk queries against the phishing_triage index.
SPL queries used:
# Threat distribution pie chart
index="phishing_triage" event_action="complete_triage_task"
| stats count by threat_verdict
# Recent triage decisions table
index="phishing_triage" event_action="complete_triage_task"
| table _time, source_file, threat_verdict, threat_summary
| sort -_timeProject Structure
C:\phishing-triage\
├── mcp_server.py # FastMCP server exposing 3 triage tools
├── agent_client.py # LangGraph ReAct batch processing agent
├── triage_logs.json # SIEM log file (append-only, Splunk monitored)
├── inbox\ # Drop .txt email files here for processing
├── archive\ # Processed emails moved here automatically
├── .env # API keys (not committed)
├── .gitignore
└── pyproject.toml
Data Flow Summary
- Drop
.txtemail files intoinbox/ - Agent reads each file, invokes MCP tools sequentially via HTTP
- MCP server extracts IOCs (Regex), checks URL reputation, queries VirusTotal for file hashes
- Agent synthesizes tool results into a structured
PhishingAnalysisPydantic model (verdict+summary) python-json-loggerwrites the verdict totriage_logs.jsonwith full correlation metadata- Splunk ingests the log file in real time; the dashboard updates automatically
- Processed email is moved to
archive/
Log Schema
Every triage decision produces a JSON event with the following fields:
| Field | Description |
|---|---|
timestamp | ISO-format event time |
log_level | Severity (INFO, WARNING, ERROR) |
message | Human-readable summary |
agent_id | Static agent identifier |
model_id | LLM used for inference |
principal_user_id | Analyst account that triggered the run |
event_correlation_id | UUID linking all tool calls within one triage session |
event_action | Action type (e.g., complete_triage_task) |
threat_verdict | malicious | suspicious | benign |
threat_summary | Agent-generated prose explanation |
source_file | Originating email filename |
Technical Stack
| Component | Technology |
|---|---|
| Runtime | Python 3.12, uv package manager |
| Agent Framework | LangGraph (create_react_agent) |
| LLM | Google Gemini (gemini-3-flash-preview) via langchain-google-genai |
| MCP Server | FastMCP (streamable-http transport, port 8001) |
| MCP Client | langchain-mcp-adapters (MultiServerMCPClient) |
| Threat Intel | VirusTotal API v3 (requests) |
| Structured Logging | python-json-logger with custom formatter |
| SIEM | Splunk Enterprise (local), phishing_triage index |
Prerequisites
- Python 3.12+
uvpackage manager- A Google AI Studio API key (free tier supported)
- A VirusTotal API key (free tier supported)
- Splunk Enterprise (local install, for SIEM integration)
Setup
1. Clone and initialize
git clone https://github.com/eabboa/phishing-triage.git
cd C:\phishing-triage
uv syncNote: Deploy the project root at
C:\phishing-triage\. Splunk’s service account runs under theSYSTEMuser and cannot access paths underC:\Users\. Any other root path will cause Splunk ingestion to fail with aPath is not readableerror.
2. Configure environment variables
Create a .env file in the project root:
GOOGLE_API_KEY=your_google_ai_studio_key_here
VT_API_KEY=your_virustotal_api_key_hereKey naming:
langchain-google-genairesolvesGOOGLE_API_KEY. The variable nameGEMINI_API_KEYis not recognized by the library and will raise a PydanticValidationErrorat startup.
3. Create the inbox and archive directories
mkdir C:\phishing-triage\inbox
mkdir C:\phishing-triage\archive4. Configure Splunk
- Create a dedicated index:
Settings → Indexes → New Index. Name itphishing_triage. - Add a data input:
Settings → Data Inputs → Files & Directories → New Local File & Directory.- Path:
C:\phishing-triage\triage_logs.json - Monitoring:
Continuously Monitor - Source type:
_json - Index:
phishing_triage
- Path:
- Verify ingestion with:
index="phishing_triage"
Usage
Terminal 1 — Start the MCP server
cd C:\phishing-triage
uv run python mcp_server.pyThe server binds to http://127.0.0.1:8001/mcp. Leave this running.
Terminal 2 — Run the batch triage agent
Drop one or more .txt email files into C:\phishing-triage\inbox\, then run:
cd C:\phishing-triage
uv run python agent_client.pyThe agent processes each file sequentially, prints the verdict to the terminal, appends the full structured event to triage_logs.json, and moves the file to archive/.
Example output (terminal):
Found 2 emails in inbox. Starting batch triage...
Scanning: email1.txt
Verdict: BENIGN. Moved to archive.
Scanning: email2.txt
Verdict: MALICIOUS. Moved to archive.
Batch processing complete.
Example log event (triage_logs.json):
{
"message": "Completed phishing triage task for email2.txt.",
"agent_id": "phishing-triage-agent-01",
"agent_name": "PhishingTriageAssistant",
"model_id": "gemini-3-flash-preview",
"principal_user_id": "soc_analyst_jane_doe",
"event_correlation_id": "220f55bb-edb3-4d9e-bd80-ae0e92d7a7c3",
"event_action": "complete_triage_task",
"threat_verdict": "malicious",
"threat_summary": "The email contains a file hash flagged as malicious malware by 42 security vendors. It also uses social engineering tactics like urgency and claims of suspicious activity.",
"source_file": "email2.txt",
"timestamp": "2026-04-05 15:59:48,533",
"log_level": "INFO"
}Lessons Learned
These are the concrete engineering problems encountered during the build, and how each was resolved.
1. Environment variable naming mismatch
Problem: The .env file was configured with GEMINI_API_KEY, but the langchain-google-genai library resolves only GOOGLE_API_KEY. This produced a Pydantic ValidationError at initialization not a missing key warning making the root cause non-obvious.
Fix: Rename the variable to GOOGLE_API_KEY in .env. The GEMINI_API_KEY name is not supported by the LangChain Google GenAI integration.
2. MultiServerMCPClient async context manager removal
Problem: Using async with client: raised NotImplementedError after upgrading to langchain-mcp-adapters>=0.1.0. The async context manager pattern was deprecated and removed in that release.
Fix: Remove the async with block. Initialize the client, set client.connections directly, and call await client.get_tools() on the plain client object:
client = MultiServerMCPClient()
client.connections = {
MCP_SERVER_NAME: StreamableHttpConnection(
transport="streamable_http",
url="http://localhost:8001/mcp",
session_kwargs={"logging_callback": ...}
)
}
tools = await client.get_tools()3. Port conflict between Splunk Web and FastMCP
Problem: FastMCP’s default port (8000) conflicted with the Splunk Web interface, which also binds to 8000 on a default local install.
Fix: Migrate the MCP transport to port 8001 by modifying the server entrypoint and updating the client connection URL:
# mcp_server.py
mcp.run(transport="streamable-http", port=8001)
# agent_client.py
url="http://localhost:8001/mcp"4. Windows ACL blocking Splunk file ingestion
Problem: Splunk’s data input raised Parameter name: Path is not readable when pointed at C:\Users\enesa\phishing-triage\triage_logs.json. Splunk Enterprise runs its service under the SYSTEM account, which does not have read access to paths under C:\Users\ by default on Windows 11.
Fix: Move the entire project to C:\phishing-triage\. Paths under the drive root are accessible to the SYSTEM user without any ACL modification. Update all hardcoded paths in agent_client.py and reconfigure the Splunk data input to point to the new location.
5. Gemini Free Tier rate limiting (HTTP 429)
Problem: Processing a batch of more than ~5 emails in rapid succession hit the Gemini Free Tier daily quota (GenerateRequestsPerDayPerProjectPerModel-FreeTier, limit: 20 requests). The error surface was a RESOURCE_EXHAUSTED exception mid-batch, leaving remaining emails unprocessed.
Fix: Insert an asyncio.sleep(5) between each email in the batch loop. This keeps throughput within the 15 RPM free tier ceiling and prevents mid-batch failures without requiring a paid API plan for a development environment.
# Free tier rate limit protection (15 RPM max)
await asyncio.sleep(5)For production deployments, replace with a paid Gemini API key and implement proper exponential backoff with jitter.
Future Roadmap
The current pipeline handles commodity phishing, malicious URLs, and known-bad file hashes. The following attack vectors require additional detection logic and are planned for future iterations:
-
QR code phishing (quishing): URLs embedded in images bypass regex-based IOC extraction entirely. Mitigation requires an OCR or multimodal vision step before the IOC extraction tool is invoked.
-
Living-off-the-land payload hosting: Attackers increasingly host payloads on legitimate infrastructure (SharePoint, OneDrive, GitHub). File hash reputation checks will return benign results. Detection requires behavioral sandboxing integration (e.g., VirusTotal file submission, Any.run API) rather than static hash lookup.
-
Attachment parsing: The current pipeline processes email body text only. A future
parse_attachmentMCP tool should handle.pdf,.docx, and.htmlattachment types, extracting embedded IOCs before reputation checks. -
Alerting integration: Wire Splunk alert actions (or a Webhook) to a ticketing system (Jira, TheHive) so that
maliciousverdicts automatically open an incident with the correlation ID pre-populated.