rinoxRinox
push · python · scheduled · ✓ IRON 92 · hand-curated

MISP → Splunk

Push MISP IOC attributes to Splunk HEC hourly

Pulls new IOC attributes from MISP every hour and forwards them to Splunk via HEC as NDJSON. Tracks attribute UUIDs (not event UUIDs) for accurate dedup.

misp · splunk · hec · ioc · scheduled · push · moderate
# ============================================================
# RINOX INTEGRATION: MISP -> Splunk HEC
# Generated by Rinox (rinox.io)
# Use Case: Hourly IOC attribute forwarding with state tracking
# Language: python
# ============================================================

# ============================================================
# SECTION 1: LOGGING
# ============================================================
import logging
import os
import sys
import json
import hashlib
from collections import deque
from datetime import datetime, timezone
from pathlib import Path

import requests

# Level names understood by the logging module are upper-case and
# case-sensitive, so LOG_LEVEL is normalised here; an unset or empty
# variable falls back to INFO instead of raising ValueError at import time.
logging.basicConfig(
    level=(os.environ.get("LOG_LEVEL") or "INFO").strip().upper(),
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# Module-wide logger used by every section of this script.
log = logging.getLogger("rinox.misp_to_splunk")

# ============================================================
# SECTION 2: AUTHENTICATION
# ============================================================
# All settings are read from the environment once, at import time.
# Base URLs are stripped of trailing "/" so endpoint paths can be appended
# with a single slash.
_env = os.environ.get

MISP_URL = _env("MISP_URL", "").rstrip("/")
MISP_API_KEY = _env("MISP_API_KEY", "")
SPLUNK_HEC_URL = _env("SPLUNK_HEC_URL", "").rstrip("/")
SPLUNK_HEC_TOKEN = _env("SPLUNK_HEC_TOKEN", "")
SPLUNK_INDEX = _env("SPLUNK_INDEX", "threat_intel")
STATE_PATH = Path(_env("STATE_PATH", "./misp_splunk_state.json"))
FIFO_CAP = int(_env("FIFO_CAP", "10000"))

# Settings that have no usable default; the script refuses to start
# (exit status 1) when any of them is empty.
REQUIRED = dict(
    MISP_URL=MISP_URL,
    MISP_API_KEY=MISP_API_KEY,
    SPLUNK_HEC_URL=SPLUNK_HEC_URL,
    SPLUNK_HEC_TOKEN=SPLUNK_HEC_TOKEN,
)
missing = [name for name in REQUIRED if not REQUIRED[name]]
if missing:
    log.error("Missing env vars: %s", ", ".join(missing))
    sys.exit(1)


def load_state():
    """Read the persisted run state from STATE_PATH.

    Returns a dict with three keys:
      * "last_timestamp"   - integer cursor (0 when none is stored),
      * "processed_ids"    - deque of stored IDs, bounded by FIFO_CAP,
      * "processed_lookup" - set built from the stored ID list.

    When the state file does not exist an empty state is returned.
    """
    if not STATE_PATH.exists():
        return {
            "last_timestamp": 0,
            "processed_ids": deque(maxlen=FIFO_CAP),
            "processed_lookup": set(),
        }

    with STATE_PATH.open() as handle:
        stored = json.load(handle)

    cursor = int(stored.get("last_timestamp", 0))
    stored_ids = stored.get("processed_ids", [])
    return {
        "last_timestamp": cursor,
        "processed_ids": deque(stored_ids, maxlen=FIFO_CAP),
        "processed_lookup": set(stored_ids),
    }


def save_state(state):
    """Persist the cursor and the processed-ID queue to STATE_PATH as JSON.

    The JSON is first written to a sibling file with a ".tmp" suffix and
    then renamed over STATE_PATH, so the real file is replaced in one step
    rather than rewritten in place. The in-memory lookup set is not stored.
    """
    serialised = json.dumps({
        "last_timestamp": int(state["last_timestamp"]),
        "processed_ids": list(state["processed_ids"]),
    })
    scratch = STATE_PATH.with_suffix(".tmp")
    scratch.write_text(serialised)
    scratch.replace(STATE_PATH)


# ============================================================
# SECTION 3: SOURCE SYSTEM CALLS (MISP)
# ============================================================
# Headers sent with every MISP request: the API key is passed as the raw
# Authorization value, and JSON is used for both request and response.
MISP_HEADERS = {
    "Authorization": MISP_API_KEY,
    "Accept": "application/json",
    "Content-Type": "application/json",
}


def fetch_misp_events(since_ts: int):
    """Search MISP events in one POST and return the "response" list.

    `since_ts` is sent as the "timestamp" filter; a falsy value (no cursor
    yet) falls back to the relative window "30d". The request also sets
    enforceWarninglist and includeEventTags.

    Raises requests.HTTPError (via raise_for_status) on an error status;
    other requests exceptions propagate unchanged. Returns [] when the
    decoded body has no "response" key.
    """
    timestamp_filter = "30d" if not since_ts else str(since_ts)
    search = {
        "returnFormat": "json",
        "timestamp": timestamp_filter,
        "enforceWarninglist": True,
        "includeEventTags": True,
    }
    endpoint = f"{MISP_URL}/events/restSearch"
    reply = requests.post(endpoint, headers=MISP_HEADERS, json=search, timeout=120)
    reply.raise_for_status()
    decoded = reply.json()
    return decoded.get("response", [])


# ============================================================
# SECTION 4: TRANSLATION
# ============================================================
def _iter_event_attributes(event):
    """Yield every attribute dict of one MISP event.

    Covers both the event's top-level "Attribute" list and the "Attribute"
    lists nested inside each entry of "Object". Missing or null lists are
    treated as empty.
    """
    yield from event.get("Attribute") or []
    for obj in event.get("Object") or []:
        yield from obj.get("Attribute") or []


def translate(raw_events, processed_lookup):
    """Convert MISP event wrappers into Splunk HEC event dicts.

    Args:
        raw_events: iterable of dicts shaped like {"Event": {...}}.
        processed_lookup: container supporting `in`, holding attribute
            UUIDs that were already delivered in earlier runs.

    Returns:
        (payload, max_ts, all_seen_ids) where
          * payload is the list of HEC event dicts for attributes whose
            UUID is not in processed_lookup,
          * max_ts is the largest attribute timestamp encountered
            (0 if none could be parsed),
          * all_seen_ids lists the UUID of every attribute evaluated,
            including the ones filtered out as already processed.

    This function never persists state; the caller decides when to save.
    """
    payload = []
    all_seen_ids = []
    max_ts = 0

    for event_wrapper in raw_events:
        event = event_wrapper.get("Event", {})
        # Event-level tags are identical for every attribute of the event,
        # so extract the names once per event.
        event_tags = [t.get("name") for t in (event.get("Tag") or [])]

        for attr in _iter_event_attributes(event):
            attr_uuid = attr.get("uuid")
            if not attr_uuid:
                continue
            # Recorded even when the attribute is filtered below, so the
            # caller can advance its cursor on runs that deliver nothing.
            all_seen_ids.append(attr_uuid)

            # The timestamp comes from the attribute itself; unparseable
            # or missing values count as 0 and never move the cursor.
            try:
                ts = int(attr.get("timestamp", "0"))
            except (TypeError, ValueError):
                ts = 0
            if ts > max_ts:
                max_ts = ts

            if attr_uuid in processed_lookup:
                continue  # already sent in a prior run

            payload.append({
                # Fall back to the current UTC time only when the
                # attribute carries no usable timestamp.
                "time": ts or int(datetime.now(timezone.utc).timestamp()),
                "host": "misp",
                "source": "rinox-misp-splunk",
                "sourcetype": "misp:ioc",
                "index": SPLUNK_INDEX,
                "event": {
                    "uuid": attr_uuid,
                    "event_id": event.get("id"),
                    "event_uuid": event.get("uuid"),
                    "type": attr.get("type"),
                    "category": attr.get("category"),
                    "value": attr.get("value"),
                    "comment": attr.get("comment", ""),
                    "to_ids": attr.get("to_ids", False),
                    # Copy so payload entries do not share one list object.
                    "tags": list(event_tags),
                },
            })

    return payload, max_ts, all_seen_ids


# ============================================================
# SECTION 5: TARGET SYSTEM CALLS (Splunk HEC)
# ============================================================
def deliver(events):
    """Send HEC event dicts to Splunk as newline-delimited JSON in one POST.

    Args:
        events: list of JSON-serialisable dicts, one per HEC event.

    Returns:
        True when there is nothing to send or the batch was accepted.
        False on a transport error, an HTTP status >= 300, or a JSON reply
        whose "code" field is not 0. Network failures are reported through
        the return value rather than raised, so the caller can leave its
        state untouched and retry on the next run.
    """
    if not events:
        return True
    headers = {"Authorization": f"Splunk {SPLUNK_HEC_TOKEN}", "Content-Type": "application/json"}
    # One JSON object per line, not a JSON array.
    ndjson = "\n".join(json.dumps(e) for e in events)
    try:
        resp = requests.post(
            f"{SPLUNK_HEC_URL}/services/collector/event",
            headers=headers,
            data=ndjson,
            timeout=60,
        )
    except requests.RequestException as exc:
        log.error("HEC delivery failed: %s", exc)
        return False

    if resp.status_code >= 300:
        log.error("HEC delivery failed: %s %s", resp.status_code, resp.text[:200])
        return False

    try:
        body = resp.json()
    except ValueError:
        # Every JSON decode error Response.json() can raise is a ValueError
        # subclass. A 2xx reply without a JSON body is still accepted on
        # the HTTP status alone, but no longer silently.
        log.warning("HEC returned a non-JSON %s response; treating as delivered", resp.status_code)
        return True

    if not isinstance(body, dict):
        log.warning("HEC returned an unexpected JSON body: %r; treating as delivered", body)
        return True
    if body.get("code") != 0:
        log.error("HEC reported error: %s", body)
        return False
    return True


# ============================================================
# SECTION 6: RINOX
# ============================================================
# Short per-run correlation ID: the first 12 hex characters of the SHA-256
# digest of the current UTC epoch timestamp rendered as text.
_run_started = datetime.now(timezone.utc).timestamp()
RINOX_RUN_ID = hashlib.sha256(str(_run_started).encode()).hexdigest()[:12]
log.info("Rinox run started: id=%s", RINOX_RUN_ID)


# ============================================================
# SECTION 7: MAIN ORCHESTRATOR
# ============================================================
def _commit_state(state, new_cursor, seen_ids):
    """Advance the cursor, record newly seen IDs and persist the state."""
    # The cursor only ever moves forward.
    if new_cursor > state["last_timestamp"]:
        state["last_timestamp"] = new_cursor
    known = state["processed_lookup"]
    for sid in seen_ids:
        if sid not in known:
            state["processed_ids"].append(sid)
            known.add(sid)
    save_state(state)


def main():
    """Run one fetch -> translate -> deliver cycle.

    State is saved only after delivery succeeded, or when there was nothing
    new to deliver. Exits with status 1 when the MISP fetch or the HEC
    delivery fails, leaving the stored state untouched so the next run
    retries the same window.
    """
    state = load_state()
    log.info("Fetching MISP events since ts=%s", state["last_timestamp"])

    try:
        raw = fetch_misp_events(state["last_timestamp"])
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON reply on requests versions whose
        # JSON decode error is not a RequestException subclass.
        log.error("MISP fetch failed: %s", e)
        sys.exit(1)

    if not raw:
        log.info("No new events.")
        return

    payload, new_cursor, all_seen_ids = translate(raw, state["processed_lookup"])

    if not payload:
        log.info("Fetched %d events but all attributes already processed; advancing cursor.", len(raw))
        _commit_state(state, new_cursor, all_seen_ids)
        return

    log.info("Delivering %d IOC attributes to Splunk HEC", len(payload))
    if not deliver(payload):
        log.error("Delivery failed — state NOT updated, will retry next run")
        sys.exit(1)

    _commit_state(state, new_cursor, all_seen_ids)
    log.info("Run complete: delivered=%d, cursor=%s, rinox_id=%s", len(payload), new_cursor, RINOX_RUN_ID)


# Run one sync cycle only when this file is executed as a script;
# importing it as a module does not call main().
if __name__ == "__main__":
    main()

Useful?

Used by 0 teams · Viewed 5 times · Last validated 5/17/2026