
Threat Detection Pipeline: Architecture & Code Skeletons

2025-01-06 · Faisal-writeups


This document contains a complete architecture diagram (Mermaid), a recommended directory layout, and starter code/config skeletons for Zeek, Logstash, Elasticsearch, Kibana, and a Python-based anomaly scoring engine.

1. Architecture Overview

```mermaid
flowchart LR
  subgraph Network
    Net[Network / SPAN Mirror]
  end
  subgraph Sensors
    Zeek[Zeek Sensor]
    Hosts["Endpoints: Wazuh / Osquery / Sysmon"]
  end
  subgraph Ingest
    Filebeat[Filebeat]
    Logstash[Logstash Pipelines]
    Kafka["Optional: Kafka"]
  end
  subgraph Storage
    ES[Elasticsearch]
  end
  subgraph Analytics
    Kibana[Kibana + SIEM]
    PyEngine[Python Anomaly Scoring]
    ML["Elastic ML (optional)"]
  end
  subgraph Response
    SOAR[SOAR / Webhook]
    Notif[Slack / Email]
    Firewall[Firewall API]
  end

  Net --> Zeek
  Net --> Hosts
  Zeek --> Filebeat --> Logstash --> ES
  Hosts --> Filebeat --> Logstash --> ES
  Logstash --> Kafka --> ES
  ES --> Kibana
  ES --> PyEngine
  PyEngine --> ES
  Kibana --> SOAR
  PyEngine --> SOAR
  SOAR --> Firewall
  SOAR --> Notif
```

2. High-level Components & Responsibilities

- Zeek: network traffic parsing; extracts conn, http, dns, ssl, weird, files, and notice logs.
- Endpoint agents (Wazuh / Osquery / Sysmon): collect process, authentication, and file events.
- Filebeat: forwards Zeek and host logs to Logstash or Elasticsearch.
- Logstash: parsing, normalization, enrichment (GeoIP, threat intel), and indexing.
- Elasticsearch: storage, indexing, queries, and ML jobs.
- Kibana: dashboards, detection rules, and Timelion/visualizations.
- Python anomaly engine: scores aggregated entities and pushes results to a threat-scores index.
- SOAR: executes playbooks (block IP, quarantine host, notify analysts).

3. Recommended Directory Structure

```
threat-pipeline/
├── infra/
│   ├── docker-compose.yml
│   └── terraform/            (optional cloud infra)
├── zeek/
│   ├── scripts/
│   │   └── custom-dns-logger.zeek
│   └── configs/
├── agents/
│   ├── filebeat/
│   │   └── filebeat.yml
│   └── wazuh/
├── logstash/
│   ├── pipelines/
│   │   ├── zeek.conf
│   │   └── hosts.conf
│   └── config/logstash.yml
├── elastic/
│   ├── index_templates/
│   │   └── zeek-template.json
│   └── ilm/policy.json
├── python_engine/
│   ├── requirements.txt
│   ├── scorer.py
│   ├── es_client.py
│   └── jobs/
├── kibana/
│   └── dashboards/           (exported ndjson)
├── playbooks/
│   ├── block_ip.md
│   └── quarantine_host.md
└── docs/
    └── architecture.md
```

4. Zeek: Minimal Custom Script (scripts/custom-dns-logger.zeek)

```zeek
# custom-dns-logger.zeek
# Minimal example: print the query name for each DNS request,
# alongside the standard dns.log.
@load base/protocols/dns

# Handler signature per current Zeek docs; older releases omit
# the trailing original_query parameter.
event dns_request(c: connection, msg: dns_msg, query: string,
                  qtype: count, qclass: count, original_query: string)
	{
	# Example: print enriched DNS info (orig host, resp host, query name)
	if ( query != "" )
		print fmt("%s\t%s\t%s", c$id$orig_h, c$id$resp_h, query);
	}
```

Save Zeek logs to /opt/zeek/logs/current/ and ensure Filebeat picks them up.

5. Filebeat (agents/filebeat/filebeat.yml)

```yaml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /opt/zeek/logs/current/*.log
    fields:
      source_type: zeek

output.logstash:
  hosts: ["logstash:5044"]
```

6. Logstash Pipelines (logstash/pipelines/zeek.conf)

```conf
input {
  beats {
    port => 5044
  }
}

filter {
  if [fields][source_type] == "zeek" {
    # Zeek logs are tab-separated with a header line; use csv
    csv {
      separator => "\t"
      autogenerate_column_names => true
      skip_empty_columns => true
      target => "zeek"
    }

    # autogenerate_column_names yields column1, column2, ... under [zeek];
    # map the first two to named fields (note: mutate's convert is for type
    # casts only, so rename is the right tool here)
    mutate {
      rename => {
        "[zeek][column1]" => "ts"
        "[zeek][column2]" => "ip"
      }
    }

    # GeoIP enrichment
    geoip {
      source => "ip"
      target => "geoip"
    }

    # Threat intel enrichment - e.g. a lookup via the translate filter or a
    # custom enrichment step; add MITRE ATT&CK mapping tags if patterns match
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "zeek-%{+YYYY.MM.dd}"
  }
}
```

Note: real-world Zeek parsing usually means parsing the header line, using grok, or switching Zeek to JSON output so each log line is a self-describing object. This is a simplified skeleton.
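Zeek's native TSV logs carry their schema in `#fields`/`#types` header lines, so a robust parser reads the header first instead of guessing column positions. A minimal Python sketch of that approach (the log path in the usage comment is illustrative):

```python
# zeek_tsv.py - parse a Zeek TSV log using its '#fields' header line.
import csv

def read_zeek_tsv(path):
    """Yield one dict per record, keyed by the names from the #fields header."""
    fields = None
    with open(path, newline="") as fh:
        for row in csv.reader(fh, delimiter="\t"):
            if not row:
                continue
            if row[0].startswith("#"):
                if row[0] == "#fields":
                    fields = row[1:]  # column names follow the tag
                continue              # skip #separator, #types, #close, ...
            if fields:
                yield dict(zip(fields, row))

# Usage (path is illustrative):
# for rec in read_zeek_tsv("/opt/zeek/logs/current/conn.log"):
#     print(rec["ts"], rec["id.orig_h"], rec.get("orig_bytes"))
```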


7. Elasticsearch Index Template (elastic/index_templates/zeek-template.json)

```json
{
  "index_patterns": ["zeek-*"],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "properties": {
      "ts": { "type": "date" },
      "ip": { "type": "ip" },
      "geoip": {
        "properties": {
          "location": { "type": "geo_point" }
        }
      }
    }
  }
}
```
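In a dev setup you can apply this template through the legacy `_template` API, which matches this template's shape on 7.x clusters. A quick sketch with the Python client (host and file path are assumptions based on the layout above):

```python
# put_template.py - apply the legacy index template to a dev cluster.
import json

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # assumed dev endpoint

with open("elastic/index_templates/zeek-template.json") as fh:
    template = json.load(fh)

es.indices.put_template(name="zeek", body=template)
print("Applied template: zeek")
```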

8. Python Anomaly Scoring Engine

8.1 requirements.txt

```
elasticsearch>=7.17.0
pandas
scikit-learn
numpy
requests
python-dateutil
```

8.2 es_client.py

```python
# es_client.py - thin wrapper around the Elasticsearch client.
from elasticsearch import Elasticsearch, helpers

ES_HOST = 'http://localhost:9200'
es = Elasticsearch(ES_HOST)

def search(index, body):
    """Run a search request and return the raw response."""
    return es.search(index=index, body=body)

def bulk_index(index, docs):
    """Bulk-index an iterable of plain dicts into the given index."""
    actions = ({"_index": index, "_source": doc} for doc in docs)
    helpers.bulk(es, actions)
```

8.3 scorer.py (core logic)

"""

Lightweight anomaly scorer:

  • Aggregates connections by (src_ip, dest_ip, dest_port, host)
  • Computes features like count, avg_duration, bytes
  • Uses IsolationForest to produce anomaly scores
  • Pushes results to threat-scores-YYYY.MM.dd index

"""

importtime importpandasaspd importnumpyasnp fromsklearn.ensembleimportIsolationForest fromes_clientimportsearch, bulk_index

Query sample: last 15 minutes

BODY= {

"query": {

"range": {

"ts": {"gte":"now-15m"}

}

},

"size": 10000

}

res= search('zeek-*', BODY) hits= res['hits']['hits'] ifnothits: print('No data') exit(0) rows= [h['_source']forhinhits] df = pd.DataFrame(rows)

Feature engineering (example - adapt to your fields)

df['bytes'] = pd.to_numeric(df.get('orig_bytes', 0)) + pd.to_numeric(df.get('resp_bytes', 0))

6

agg= df.groupby('ip').agg({

'bytes': ['sum','mean','max'],

'ts':'count'

})

agg.columns= ['bytes_sum','bytes_mean','bytes_max','conn_count']

Fill and normalize

agg= agg.fillna(0) model= IsolationForest(contamination=0.02, random_state=42) model.fit(agg) scores= model.decision_function(agg)# higher -> more normal anomaly_score= (1 - (scores- scores.min())/(scores.max()-scores.min()))*100

Prepare docs

now= int(time.time()*1000) docs= [] forip, scinzip(agg.index, anomaly_score): docs.append({ 'ip': ip, 'anomaly_score': float(sc), 'bytes_sum': int(agg.loc[ip,'bytes_sum']), 'conn_count': int(agg.loc[ip,'conn_count']), '@timestamp': now

})

bulk_index('threat-scores', docs) print('Pushed', len(docs),'scores') Notes: - This is a starter; production should use streaming, pagination, error handling, model persistency, and tuning for contamination and features.
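For the model-persistence point, one common pattern is to save the fitted IsolationForest with joblib (installed as a scikit-learn dependency) and reload it between runs instead of refitting every cycle. A sketch, with an arbitrary path under python_engine/jobs/:

```python
# model_store.py - persist/reload the IsolationForest between scorer runs.
import os

import joblib
from sklearn.ensemble import IsolationForest

MODEL_PATH = "jobs/isoforest.joblib"  # arbitrary location

def load_or_fit(features):
    """Reuse a saved model when present; otherwise fit and persist one."""
    if os.path.exists(MODEL_PATH):
        return joblib.load(MODEL_PATH)
    model = IsolationForest(contamination=0.02, random_state=42)
    model.fit(features)
    joblib.dump(model, MODEL_PATH)
    return model
```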

9. Kibana: Visuals & Detection Rules

Create visualizations for:

- Top anomalous IPs (from threat-scores)
- Anomaly score timeline
- DNS request heatmap
- Suspicious user-agent list

Create detection rules in Kibana to alert when:

- anomaly_score >= 80
- A known IOC matches threat intel fields
- conn_count or bytes_sum spikes suddenly for a single IP

Export dashboards as ndjson to kibana/dashboards/ for version control.
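Under the hood, the anomaly_score rule boils down to a range query against the threat-scores index. For ad-hoc testing of the threshold before wiring up the Kibana rule, a sketch with the Python client (endpoint and time window are assumptions):

```python
# check_scores.py - ad-hoc version of the "anomaly_score >= 80" rule.
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # assumed dev endpoint

query = {
    "query": {
        "bool": {
            "filter": [
                {"range": {"anomaly_score": {"gte": 80}}},
                {"range": {"@timestamp": {"gte": "now-15m"}}},
            ]
        }
    },
    "size": 50,
}

res = es.search(index="threat-scores", body=query)
for hit in res["hits"]["hits"]:
    src = hit["_source"]
    print(src["ip"], src["anomaly_score"])
```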

10. Playbook Template (playbooks/block_ip.md)

Playbook: Block Malicious IP

Trigger: Anomaly score >= 90 OR IOC match

Steps:

1. Validate: check threat-scores and zeek-* for related events (last 1 hour).
2. Enrich: query threat intel (VirusTotal/OTX) for the IP.
3. Contain: add the IP to the firewall blocklist via API (see the sketch below).
4. Notify: post a message to #soc-alerts with evidence.
5. Investigate: run an endpoint hunt for lateral movement.
6. Document: create an incident ticket with timeline and artifacts.
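A minimal sketch of the Contain and Notify steps as a standalone script. The firewall endpoint, token, and Slack webhook URL are placeholders, not a real API; adapt them to whatever your firewall and SOAR actually expose:

```python
# block_ip.py - illustrative Contain + Notify actions for this playbook.
import requests

FIREWALL_API = "https://firewall.example.local/api/v1/blocklist"  # placeholder
FIREWALL_TOKEN = "REPLACE_ME"                                     # placeholder
SLACK_WEBHOOK = "https://hooks.slack.com/services/XXX/YYY/ZZZ"    # placeholder

def block_ip(ip, reason):
    # Contain: add the IP to the firewall blocklist
    resp = requests.post(
        FIREWALL_API,
        headers={"Authorization": f"Bearer {FIREWALL_TOKEN}"},
        json={"ip": ip, "reason": reason},
        timeout=10,
    )
    resp.raise_for_status()

    # Notify: post an evidence summary to #soc-alerts
    requests.post(
        SLACK_WEBHOOK,
        json={"text": f"Blocked {ip}: {reason}"},
        timeout=10,
    ).raise_for_status()

if __name__ == "__main__":
    block_ip("203.0.113.7", "anomaly_score >= 90")  # example values
```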

11. Docker Compose (infra/docker-compose.yml) — Minimal Dev Setup

```yaml
version: '3.7'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.13
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    ports:
      - "9200:9200"
  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.13
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
  logstash:
    image: docker.elastic.co/logstash/logstash:7.17.13
    volumes:
      - ./logstash/pipelines:/usr/share/logstash/pipeline
    ports:
      - "5044:5044"
  filebeat:
    image: docker.elastic.co/beats/filebeat:7.17.13
    volumes:
      - ./agents/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml
    depends_on:
      - logstash
```

12. Testing & Validation

- Synthetic traffic: use scapy or tcpreplay to replay pcaps containing C2-like beaconing, DNS tunneling, and HTTP exfiltration patterns (a scapy sketch follows below).
- IOC injection: add known malicious IPs/domains to the threat intel feed files and confirm enrichment and alerting fire.
- Performance: measure ingestion latency and index sizing; add ILM policies for retention.
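For the synthetic-traffic test, a small scapy sketch that sends periodic DNS queries to mimic C2-style beaconing; the resolver, domain, and interval are placeholders, and it needs root privileges plus a lab network:

```python
# beacon_sim.py - emit periodic DNS queries to simulate C2-style beaconing.
import time

from scapy.all import DNS, DNSQR, IP, UDP, send

RESOLVER = "192.0.2.53"        # placeholder lab resolver (TEST-NET-1)
DOMAIN = "beacon.example.com"  # placeholder C2-style domain
INTERVAL_S = 30                # fixed interval -> detectable periodicity
COUNT = 20

for i in range(COUNT):
    # Unique subdomain per beacon, the way DNS tunnels often behave
    qname = f"{i:04d}.{DOMAIN}"
    pkt = IP(dst=RESOLVER) / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname=qname))
    send(pkt, verbose=False)
    time.sleep(INTERVAL_S)
```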

13. Next Steps & Hardening

- Add authentication and TLS between components.
- Configure role-based access control (RBAC) in Kibana.
- Persist ML models and automate retraining.
- Add a Kafka or Redis buffer for high throughput.
- Implement unit tests for the Python engine and CI/CD.

14. Appendix — Helpful Commands

1. Start Zeek:

```sh
zeekctl deploy
```

2. Test the Filebeat → Logstash output:

```sh
filebeat test output
```

3. Run the Python scorer (dev):

```sh
python3 scorer.py
```

Possible follow-ups to this writeup: full code files for each skeleton (ready to copy), a Kibana ndjson export for baseline dashboards, and a Terraform + Ansible playbook for provisioning.




Faisal

Cybersecurity graduate focused on SOC operations, threat intelligence, and defensive security. Writing practical, no-BS explanations based on real learning and hands-on analysis.