Threat Detection Pipeline — Architecture, Directory Structure & Code Skeletons
This document contains a complete architecture diagram (Mermaid), recommended directory layout, and starter code/config skeletons for Zeek, Logstash, Elasticsearch, Kibana, and the Python-based anomaly scoring engine.
1. Architecture Overview
```mermaid
flowchart LR
  subgraph Network
    Net[Network / SPAN Mirror]
  end
  subgraph Sensors
    Zeek[Zeek Sensor]
    Hosts["Endpoints: Wazuh / Osquery / Sysmon"]
  end
  subgraph Ingest
    Filebeat[Filebeat]
    Logstash[Logstash Pipelines]
    Kafka["Optional: Kafka"]
  end
  subgraph Storage
    ES[Elasticsearch]
  end
  subgraph Analytics
    Kibana[Kibana + SIEM]
    PyEngine[Python Anomaly Scoring]
    ML["Elastic ML (optional)"]
  end
  subgraph Response
    SOAR[SOAR / Webhook]
    Notif[Slack / Email]
    Firewall[Firewall API]
  end

  Net --> Zeek
  Net --> Hosts
  Zeek --> Filebeat
  Hosts --> Filebeat
  Filebeat --> Logstash
  Logstash --> ES
  Logstash --> Kafka
  Kafka --> ES
  ES --> Kibana
  ES --> PyEngine
  PyEngine --> ES
  Kibana --> SOAR
  PyEngine --> SOAR
  SOAR --> Firewall
  SOAR --> Notif
```
2. High-level Components & Responsibilities
- **Zeek**: network traffic parsing; extraction of conn, http, dns, ssl, weird, files, and notice logs.
- **Endpoint Agents**: Wazuh/OSQuery/Sysmon collect processes, auth events, and file events.
- **Filebeat**: forwarder for Zeek/host logs to Logstash or Elasticsearch.
- **Logstash**: parsing, normalization, enrichment (GeoIP, threat intel), and indexing.
- **Elasticsearch**: storage, indexing, queries, and ML jobs.
- **Kibana**: dashboards, detection rules, timelion/visualizations.
- **Python Anomaly Engine**: scores aggregated entities and pushes to a threat-scores index.
- **SOAR**: executes playbooks (block IP, quarantine host, notify analysts).
3. Recommended Directory Structure
```
threat-pipeline/
├── infra/
│   ├── docker-compose.yml
│   └── terraform/              # optional cloud infra
├── zeek/
│   ├── scripts/
│   │   └── custom-dns-logger.zeek
│   └── configs/
├── agents/
│   ├── filebeat/
│   │   └── filebeat.yml
│   └── wazuh/
├── logstash/
│   ├── pipelines/
│   │   ├── zeek.conf
│   │   └── hosts.conf
│   └── config/logstash.yml
├── elastic/
│   ├── index_templates/
│   │   └── zeek-template.json
│   └── ilm/policy.json
├── python_engine/
│   ├── requirements.txt
│   ├── scorer.py
│   ├── es_client.py
│   └── jobs/
├── kibana/
│   └── dashboards/             # exported ndjson
├── playbooks/
│   ├── block_ip.md
│   └── quarantine_host.md
└── docs/
    └── architecture.md
```
4. Zeek: Minimal Custom Script (scripts/custom-dns-logger.zeek)
```zeek
@load base/protocols/dns

# Example: print basic DNS request info (orig host, resp host, query).
# Note: Zeek >= 4.1 adds a trailing `original_query: string` parameter
# to this event; add it if your Zeek version complains about the signature.
event dns_request(c: connection, msg: dns_msg, query: string, qtype: count, qclass: count)
    {
    if ( query != "" )
        print fmt("%s\t%s\t%s", c$id$orig_h, c$id$resp_h, query);
    }
```
Save Zeek logs to /opt/zeek/logs/current/ and ensure Filebeat picks them up.
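If you prefer structured output over Zeek's default tab-separated logs, Zeek ships a tuning policy that switches every log stream to JSON (one object per line), which makes the downstream Logstash parsing much simpler. A minimal sketch for `local.zeek`:

```zeek
# local.zeek — emit all Zeek logs as JSON instead of TSV
@load policy/tuning/json-logs

# Equivalent low-level knob, if you prefer an explicit redef:
# redef LogAscii::use_json = T;
```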
5. Filebeat (agents/filebeat/filebeat.yml)
```yaml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /opt/zeek/logs/current/*.log
    fields:
      source_type: zeek

output.logstash:
  hosts: ["logstash:5044"]
```
6. Logstash Pipelines (logstash/pipelines/zeek.conf)
```conf
input {
  beats {
    port => 5044
  }
}

filter {
  if [fields][source_type] == "zeek" {
    # Zeek logs are tab-separated with header lines; use csv as a first pass.
    csv {
      separator => "	"   # literal tab (or enable config.support_escapes and use "\t")
      autogenerate_column_names => true
      skip_empty_columns => true
      target => "zeek"
    }
    mutate {
      rename => { "zeek_1" => "ts" }
      rename => { "zeek_2" => "ip" }
    }
    # GeoIP enrichment
    geoip {
      source => "ip"
      target => "geoip"
    }
    # Threat intel enrichment - e.g. a translate filter or custom enrich step
    # Add MITRE mapping tags if patterns match
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "zeek-%{+YYYY.MM.dd}"
  }
}
```
Note: real-world Zeek parsing often requires handling the header lines, using grok, or switching Zeek to JSON output. This is a simplified skeleton.
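If Zeek is emitting JSON (as suggested in the note above), the csv gymnastics disappear entirely. A sketch of the simpler filter, assuming Filebeat ships each JSON log line in the `message` field:

```conf
filter {
  if [fields][source_type] == "zeek" {
    json {
      source => "message"
      target => "zeek"
    }
    # Zeek's ts field is epoch seconds
    date {
      match => ["[zeek][ts]", "UNIX"]
      target => "@timestamp"
    }
  }
}
```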
7. Elasticsearch Index Template (elastic/index_templates/zeek-template.json)
```json
{
  "index_patterns": ["zeek-*"],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "properties": {
      "ts": { "type": "date" },
      "ip": { "type": "ip" },
      "geoip": { "properties": { "location": { "type": "geo_point" } } }
    }
  }
}
```
8. Python Anomaly Scoring Engine
8.1 requirements.txt

```
elasticsearch>=7.17.0
pandas
scikit-learn
numpy
requests
python-dateutil
```

8.2 es_client.py

```python
from elasticsearch import Elasticsearch, helpers

ES_HOST = 'http://localhost:9200'
es = Elasticsearch(ES_HOST)

def search(index, body):
    # size is expected inside the body
    return es.search(index=index, body=body)

def bulk_index(index, docs):
    # helpers.bulk expects action dicts, not the raw bulk-API line format
    actions = [{"_index": index, "_source": doc} for doc in docs]
    helpers.bulk(es, actions)
```

8.3 scorer.py (core logic)
"""
Lightweight anomaly scorer:
- Aggregates connections by (src_ip, dest_ip, dest_port, host)
- Computes features like count, avg_duration, bytes
- Uses IsolationForest to produce anomaly scores
- Pushes results to
threat-scores-YYYY.MM.ddindex
"""
importtime importpandasaspd importnumpyasnp fromsklearn.ensembleimportIsolationForest fromes_clientimportsearch, bulk_index
Query sample: last 15 minutes
BODY= {
"query": {
"range": {
"ts": {"gte":"now-15m"}
}
},
"size": 10000
}
res= search('zeek-*', BODY) hits= res['hits']['hits'] ifnothits: print('No data') exit(0) rows= [h['_source']forhinhits] df = pd.DataFrame(rows)
Feature engineering (example - adapt to your fields)
df['bytes'] = pd.to_numeric(df.get('orig_bytes', 0)) + pd.to_numeric(df.get('resp_bytes', 0))
6
agg= df.groupby('ip').agg({
'bytes': ['sum','mean','max'],
'ts':'count'
})
agg.columns= ['bytes_sum','bytes_mean','bytes_max','conn_count']
Fill and normalize
agg= agg.fillna(0) model= IsolationForest(contamination=0.02, random_state=42) model.fit(agg) scores= model.decision_function(agg)# higher -> more normal anomaly_score= (1 - (scores- scores.min())/(scores.max()-scores.min()))*100
Prepare docs
now= int(time.time()*1000) docs= [] forip, scinzip(agg.index, anomaly_score): docs.append({ 'ip': ip, 'anomaly_score': float(sc), 'bytes_sum': int(agg.loc[ip,'bytes_sum']), 'conn_count': int(agg.loc[ip,'conn_count']), '@timestamp': now
})
bulk_index('threat-scores', docs) print('Pushed', len(docs),'scores') Notes: - This is a starter; production should use streaming, pagination, error handling, model persistency, and tuning for contamination and features.
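To make the normalization step concrete: `decision_function` returns higher values for more normal points, so the min-max inversion maps the most anomalous entity to 100 and the most normal to 0. A small self-contained sketch with made-up scores:

```python
import numpy as np

# Hypothetical decision_function outputs for four IPs
# (higher = more normal, as with scikit-learn's IsolationForest)
scores = np.array([0.12, 0.10, -0.25, 0.08])

# Invert and min-max scale into a 0-100 "threat" range
anomaly_score = (1 - (scores - scores.min()) / (scores.max() - scores.min())) * 100

print(anomaly_score.round(1))  # roughly [0, 5.4, 100, 10.8]
```

The third IP (score -0.25, the outlier) lands at 100, while the most normal one lands at 0.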
9. Kibana: Visuals & Detection Rules
Create visualizations for:

- Top anomalous IPs (from threat-scores)
- Anomaly score timeline
- DNS request heatmap
- Suspicious user-agent list

Create detection rules in Kibana to alert when:

- anomaly_score >= 80
- Known IOCs match threat intel fields
- conn_count or bytes_sum spikes suddenly for a single IP

Export dashboards as ndjson to kibana/dashboards for version control.
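As a sketch, the `anomaly_score >= 80` rule can be expressed as a plain range query against the threat-scores index (field names assume the documents pushed by scorer.py):

```json
{
  "query": {
    "range": { "anomaly_score": { "gte": 80 } }
  }
}
```

In Kibana's rule editor the equivalent KQL is simply `anomaly_score >= 80` over an index pattern of `threat-scores*`.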
10. Playbook Template (playbooks/block_ip.md)
Playbook: Block Malicious IP
Trigger: Anomaly score >= 90 OR IOC match
Steps:
- Validate: Check `threat-scores` and `zeek-*` for related events (last 1 hour)
- Enrich: Query threat intel (VirusTotal/OTX) for the IP
- Contain: Add IP to firewall blocklist via API
- Notify: Post message to #soc-alerts with evidence
- Investigate: Run endpoint hunt for lateral movement
- Document: Create incident ticket with timeline and artifacts
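The containment step depends entirely on your firewall vendor. As an illustration only, here is a minimal helper that assembles the block request for a hypothetical REST API; the endpoint URL, payload shape, and `ttl_hours` parameter are all invented placeholders, not a real product's API:

```python
import json

# Hypothetical endpoint - replace with your firewall vendor's real API
FIREWALL_API = "https://firewall.example.internal/api/v1/blocklist"

def build_block_request(ip: str, reason: str, ttl_hours: int = 24) -> dict:
    """Assemble a blocklist request; the payload shape is illustrative."""
    return {
        "url": FIREWALL_API,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"ip": ip, "reason": reason, "ttl_hours": ttl_hours}),
    }

req = build_block_request("203.0.113.7", "anomaly_score >= 90")
print(req["body"])
```

A SOAR playbook would POST this (e.g. via `requests.post`) after the validation and enrichment steps pass.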
11. Docker Compose (infra/docker-compose.yml) — Minimal Dev Setup

```yaml
version: '3.7'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.13
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    ports:
      - "9200:9200"
  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.13
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
  logstash:
    image: docker.elastic.co/logstash/logstash:7.17.13
    volumes:
      - ./logstash/pipelines:/usr/share/logstash/pipeline
    ports:
      - "5044:5044"
  filebeat:
    image: docker.elastic.co/beats/filebeat:7.17.13
    volumes:
      - ./agents/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml
    depends_on:
      - logstash
```
12. Testing & Validation
- **Synthetic Traffic**: Use scapy or tcpreplay to replay pcap traffic containing C2-like beaconing, DNS tunneling, and HTTP exfil patterns.
- **IOC Injection**: Add known malicious IPs/domains to threat intel feed files and confirm enrichment and alerting.
- **Performance**: Measure ingestion latency and index sizing. Add ILM policies for retention.
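A quick way to sanity-check the scorer without replaying pcaps is to fabricate beacon-like records directly: one IP connecting at a fixed interval with a tiny constant payload, against a noisy background. A sketch (field names match what the scorer expects; the IPs and counts are arbitrary):

```python
import random

random.seed(7)

def synthetic_conns(n_background=200, beacon_ip="10.0.0.66", interval_s=60):
    rows = []
    base = 1_700_000_000  # arbitrary epoch start

    # Noisy background traffic from random internal hosts
    for _ in range(n_background):
        rows.append({
            "ip": f"10.0.0.{random.randint(1, 50)}",
            "ts": base + random.randint(0, 3600),
            "orig_bytes": random.randint(200, 50_000),
            "resp_bytes": random.randint(200, 500_000),
        })

    # C2-like beacon: fixed interval, small constant payload
    for i in range(60):
        rows.append({
            "ip": beacon_ip,
            "ts": base + i * interval_s,
            "orig_bytes": 128,
            "resp_bytes": 64,
        })
    return rows

rows = synthetic_conns()
print(len(rows), "records,",
      sum(r["ip"] == "10.0.0.66" for r in rows), "beacon connections")
```

Feeding these rows into the scorer's DataFrame stage should surface the beacon IP near the top of the anomaly ranking, since its byte profile differs sharply from the background.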
13. Next Steps & Hardening
- Add authentication and TLS between components.
- Configure role-based access control (RBAC) in Kibana.
- Persist ML models and automate retraining.
- Add a Kafka or Redis buffer for high throughput.
- Implement unit tests for the Python engine and CI/CD.
14. Appendix — Helpful Commands
Start Zeek:

```
zeekctl deploy
```

Test the Filebeat -> Logstash connection:

```
filebeat test output
```

Run the Python scorer (dev):

```
python3 scorer.py
```
If you'd like, I can now:

- Generate full code files for each skeleton (ready to copy).
- Produce a Kibana ndjson export for baseline dashboards.
- Build a Terraform + Ansible playbook for provisioning.

Tell me which of the above you want next.
Faisal
Cybersecurity graduate focused on SOC operations, threat intelligence, and defensive security. Writing practical, no-BS explanations based on real learning and hands-on analysis.