rinoxRinox
push · python · scheduled · ✓ IRON 91 · hand-curated

Anomali ThreatStream → Microsoft Sentinel

Push Anomali ThreatStream indicators to Microsoft Sentinel daily

Daily run that pulls indicators from Anomali ThreatStream and forwards them to a Sentinel custom log table for hunting.

anomali · sentinel · ioc · scheduled · push · moderate
# ============================================================
# RINOX INTEGRATION: Anomali ThreatStream -> Microsoft Sentinel
# Generated by Rinox (rinox.io)
# Use Case: Daily indicator forwarding with confidence filter
# Language: python
# ============================================================

# ============================================================
# SECTION 1: LOGGING
# ============================================================
import logging
import os
import sys
import json
import hashlib
import time
from collections import deque
from datetime import datetime, timezone
from pathlib import Path

import requests

# Normalise LOG_LEVEL before handing it to basicConfig: logging only accepts
# the upper-case level names, so "debug", " info " or an empty value would
# otherwise raise ValueError during import. Empty/blank falls back to INFO.
logging.basicConfig(
    level=os.environ.get("LOG_LEVEL", "").strip().upper() or "INFO",
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# Module-wide logger used by every section below.
log = logging.getLogger("rinox.anomali_to_sentinel")

# ============================================================
# SECTION 2: AUTHENTICATION
# ============================================================
def _env_int(name, default, minimum=None):
    """Read an integer setting from the environment.

    Args:
        name: environment variable name.
        default: value used when the variable is unset or blank.
        minimum: optional inclusive lower bound.

    Returns:
        The parsed integer.

    Exits the process with status 1 (after logging the reason) when the value
    is not an integer or is below ``minimum``, mirroring how missing required
    variables are reported below.
    """
    raw = os.environ.get(name, "").strip() or str(default)
    try:
        value = int(raw)
    except ValueError:
        log.error("Env var %s must be an integer, got %r", name, raw)
        sys.exit(1)
    if minimum is not None and value < minimum:
        log.error("Env var %s must be >= %d, got %d", name, minimum, value)
        sys.exit(1)
    return value


# --- Anomali ThreatStream connection ---
ANOMALI_URL = os.environ.get("ANOMALI_URL", "https://api.threatstream.com").rstrip("/")
ANOMALI_USERNAME = os.environ.get("ANOMALI_USERNAME", "")
ANOMALI_API_KEY = os.environ.get("ANOMALI_API_KEY", "")

# --- Azure AD application credentials (client-credentials grant) ---
AZURE_TENANT_ID = os.environ.get("AZURE_TENANT_ID", "")
AZURE_CLIENT_ID = os.environ.get("AZURE_CLIENT_ID", "")
AZURE_CLIENT_SECRET = os.environ.get("AZURE_CLIENT_SECRET", "")

# --- Sentinel Logs Ingestion target ---
SENTINEL_DCE = os.environ.get("SENTINEL_DCE_URL", "").rstrip("/")
SENTINEL_DCR_ID = os.environ.get("SENTINEL_DCR_IMMUTABLE_ID", "")
SENTINEL_STREAM = os.environ.get("SENTINEL_STREAM_NAME", "Custom-AnomaliIndicators_CL")

# --- Tunables ---
MIN_CONFIDENCE = _env_int("MIN_CONFIDENCE", 75)
STATE_PATH = Path(os.environ.get("STATE_PATH", "./anomali_sentinel_state.json"))
# A negative cap is rejected here; deque(maxlen=...) would raise on it later.
FIFO_CAP = _env_int("FIFO_CAP", 50000, minimum=0)
# A page size below 1 would keep the pagination offset from ever advancing.
PAGE_SIZE = _env_int("PAGE_SIZE", 1000, minimum=1)

# Settings with no usable default; the run aborts if any of them is empty.
REQUIRED = {
    "ANOMALI_USERNAME": ANOMALI_USERNAME,
    "ANOMALI_API_KEY": ANOMALI_API_KEY,
    "AZURE_TENANT_ID": AZURE_TENANT_ID,
    "AZURE_CLIENT_ID": AZURE_CLIENT_ID,
    "AZURE_CLIENT_SECRET": AZURE_CLIENT_SECRET,
    "SENTINEL_DCE_URL": SENTINEL_DCE,
    "SENTINEL_DCR_IMMUTABLE_ID": SENTINEL_DCR_ID,
}
missing = [k for k, v in REQUIRED.items() if not v]
if missing:
    log.error("Missing env vars: %s", ", ".join(missing))
    sys.exit(1)


def azure_token():
    """Request an Azure AD access token using the client-credentials grant.

    Posts the configured client ID and secret to the tenant's v2.0 token
    endpoint with the ``https://monitor.azure.com//.default`` scope.

    Returns:
        The ``access_token`` string from the JSON response.

    Raises:
        requests.HTTPError: when the token endpoint answers with an error
            status (via ``raise_for_status``).
    """
    token_endpoint = f"https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token"
    form = {
        "grant_type": "client_credentials",
        "client_id": AZURE_CLIENT_ID,
        "client_secret": AZURE_CLIENT_SECRET,
        "scope": "https://monitor.azure.com//.default",
    }
    response = requests.post(token_endpoint, data=form, timeout=30)
    response.raise_for_status()
    token_payload = response.json()
    return token_payload["access_token"]


def load_state():
    """Load the persisted run state from STATE_PATH.

    Returns:
        A dict with three keys:
          - "last_modified_ts": cursor string ("" when no run has completed),
          - "processed_ids": deque of composite hashes, bounded by FIFO_CAP,
          - "processed_lookup": set holding exactly the IDs in that deque.
        When the state file does not exist, all three are empty.

    Raises:
        json.JSONDecodeError: if the state file exists but is not valid JSON.
            This is left to propagate so a damaged state file is noticed
            rather than silently treated as a first run.
    """
    try:
        raw = STATE_PATH.read_text()
    except FileNotFoundError:
        return {"last_modified_ts": "", "processed_ids": deque(maxlen=FIFO_CAP), "processed_lookup": set()}

    stored = json.loads(raw)
    # `or` guards against explicit JSON nulls, which .get() defaults do not cover.
    processed = deque(stored.get("processed_ids") or [], maxlen=FIFO_CAP)
    return {
        "last_modified_ts": stored.get("last_modified_ts") or "",
        "processed_ids": processed,
        # Build the lookup from the (possibly truncated) deque so both views
        # agree even when the file holds more IDs than the current FIFO_CAP.
        "processed_lookup": set(processed),
    }


def save_state(state):
    """Write the cursor and processed-ID list to STATE_PATH as JSON.

    The JSON is first written to a sibling file with a ".tmp" suffix and then
    moved over STATE_PATH, so the target is replaced in a single rename step.

    Args:
        state: dict providing "last_modified_ts" and an iterable
            "processed_ids"; other keys are not persisted.
    """
    serialized = json.dumps({
        "last_modified_ts": state["last_modified_ts"],
        "processed_ids": list(state["processed_ids"]),
    })
    scratch = STATE_PATH.with_suffix(".tmp")
    scratch.write_text(serialized)
    scratch.replace(STATE_PATH)

def composite_hash(value, indicator_type):
    """Return the SHA-256 hex digest of "<indicator_type>|<value>".

    Used as the de-duplication key for an indicator.
    """
    key = "{}|{}".format(indicator_type, value)
    digest = hashlib.sha256(key.encode())
    return digest.hexdigest()


# ============================================================
# SECTION 3: SOURCE SYSTEM CALLS (Anomali ThreatStream)
# ============================================================
def fetch_indicators(modified_since):
    """Page through active ThreatStream indicators at or above MIN_CONFIDENCE.

    Requests are ordered by ``modified_ts`` and issued with limit/offset
    paging until a page is empty, a page is shorter than PAGE_SIZE, or the
    offset passes 100000.

    Args:
        modified_since: value for the ``modified_ts__gte`` filter; a falsy
            value omits the filter entirely.

    Returns:
        A list of the items from each response's "objects" array, in the
        order received.

    Raises:
        requests.HTTPError: on an error status (via ``raise_for_status``).
    """
    endpoint = f"{ANOMALI_URL}/api/v2/intelligence/"
    auth_headers = {"Authorization": f"apikey {ANOMALI_USERNAME}:{ANOMALI_API_KEY}"}

    # One query dict reused for every page; only "offset" changes between requests.
    query = {
        "limit": PAGE_SIZE,
        "offset": 0,
        "confidence__gte": MIN_CONFIDENCE,
        "status": "active",
        "order_by": "modified_ts",
    }
    if modified_since:
        query["modified_ts__gte"] = modified_since

    collected = []
    while query["offset"] <= 100000:
        response = requests.get(endpoint, headers=auth_headers, params=query, timeout=120)
        response.raise_for_status()
        page = response.json().get("objects", [])
        if not page:
            break
        collected.extend(page)
        if len(page) < PAGE_SIZE:
            # A short page means there is nothing further to fetch.
            break
        query["offset"] += PAGE_SIZE
    else:
        # Reached only when the offset limit ended the loop, not a break.
        log.warning("Pagination cap reached at offset 100000; remaining indicators picked up next run.")
    return collected


# ============================================================
# SECTION 4: TRANSLATION
# ============================================================
def translate(indicators, processed_lookup):
    """Convert ThreatStream indicators into Sentinel custom-log records.

    Args:
        indicators: iterable of indicator dicts as returned by the source API.
        processed_lookup: container of composite hashes already delivered;
            indicators whose hash is in it are not added to the payload.

    Returns:
        A tuple ``(payload, max_modified, all_seen)``:
          - payload: list of record dicts for indicators not yet processed,
          - max_modified: the largest ``modified_ts`` (falling back to
            ``created_ts``) among usable indicators, "" if none had one,
          - all_seen: composite hash of every usable indicator, including
            those skipped as already processed.
        Indicators lacking a value or a type are ignored entirely.
    """
    payload = []
    all_seen = []
    max_modified = ""

    for ind in indicators:
        value = ind.get("value")
        itype = ind.get("itype") or ind.get("type")
        if not value or not itype:
            continue
        cid = composite_hash(value, itype)
        all_seen.append(cid)

        # Track the cursor even for already-processed indicators so the next
        # run's modified-since filter still moves forward.
        modified = ind.get("modified_ts") or ind.get("created_ts") or ""
        if modified > max_modified:
            max_modified = modified

        if cid in processed_lookup:
            continue

        # `or {}` / `or []` cover explicit nulls in the response, which a
        # .get() default alone does not (None has no .get / is not iterable).
        meta = ind.get("meta") or {}
        tags = ind.get("tags") or []

        payload.append({
            "TimeGenerated": modified or datetime.now(timezone.utc).isoformat(),
            "IndicatorValue": value,
            "IndicatorType": itype,
            "Confidence": ind.get("confidence", 0),
            "Severity": meta.get("severity") or ind.get("severity"),
            "Source": ind.get("source"),
            "Tags": [t.get("name") for t in tags if isinstance(t, dict)],
            "ASN": ind.get("asn"),
            "Country": ind.get("country"),
            "Tlp": ind.get("tlp"),
            "RawIndicator": ind,
        })

    return payload, max_modified, all_seen


# ============================================================
# SECTION 5: TARGET SYSTEM CALLS (Sentinel Logs Ingestion)
# ============================================================
def _json_batches(records, chunk, max_bytes):
    """Yield JSON-array request bodies of at most `chunk` records, kept within `max_bytes` where possible."""
    items = []
    size = 2  # the surrounding "[" and "]"
    for record in records:
        encoded = json.dumps(record)
        # json.dumps emits ASCII by default, so character count equals byte
        # count; +1 accounts for the separating comma.
        cost = len(encoded) + 1
        if items and (len(items) >= chunk or size + cost > max_bytes):
            yield "[" + ",".join(items) + "]"
            items = []
            size = 2
        # A single record larger than max_bytes is still sent on its own so
        # the service's rejection is reported instead of dropping it silently.
        items.append(encoded)
        size += cost
    if items:
        yield "[" + ",".join(items) + "]"


def deliver_chunked(records, az_token, chunk=500, max_bytes=1_000_000):
    """POST records to the Sentinel Logs Ingestion endpoint in batches.

    Args:
        records: list of JSON-serialisable record dicts.
        az_token: bearer token for the Authorization header.
        chunk: maximum number of records per request (must be >= 1).
        max_bytes: target upper bound on each request body, chosen to stay
            under the Logs Ingestion API's 1 MB per-call limit.

    Returns:
        True when every batch was accepted (or there was nothing to send);
        False as soon as one batch fails, whether through an HTTP status
        >= 300 or a network-level error. Batches sent before the failure are
        not rolled back.

    Raises:
        ValueError: if ``chunk`` is smaller than 1.
    """
    if not records:
        return True
    if chunk < 1:
        # A negative step previously produced zero batches and reported success.
        raise ValueError("chunk must be >= 1")

    url = f"{SENTINEL_DCE}/dataCollectionRules/{SENTINEL_DCR_ID}/streams/{SENTINEL_STREAM}?api-version=2023-01-01"
    headers = {"Authorization": f"Bearer {az_token}", "Content-Type": "application/json"}

    for index, body in enumerate(_json_batches(records, chunk, max_bytes)):
        try:
            resp = requests.post(url, headers=headers, data=body, timeout=120)
        except requests.RequestException as exc:
            # Timeouts and connection errors are reported through the same
            # False return as HTTP failures so the caller's retry path runs.
            log.error("Sentinel delivery failed at chunk %d: %s", index, exc)
            return False
        if resp.status_code >= 300:
            log.error("Sentinel delivery failed at chunk %d: %s %s", index, resp.status_code, resp.text[:300])
            return False
    return True


# ============================================================
# SECTION 6: RINOX
# ============================================================
# Run identifier: the current Unix time in whole seconds, as a string.
RINOX_RUN_ID = f"{int(time.time())}"
log.info("Rinox run started: id=%s", RINOX_RUN_ID)


# ============================================================
# SECTION 7: MAIN ORCHESTRATOR
# ============================================================
def _commit_state(state, new_cursor, all_seen):
    """Advance the cursor, record newly seen indicator IDs, and persist the state."""
    if new_cursor and new_cursor > state["last_modified_ts"]:
        state["last_modified_ts"] = new_cursor
    for sid in all_seen:
        if sid not in state["processed_lookup"]:
            state["processed_ids"].append(sid)
            state["processed_lookup"].add(sid)
    save_state(state)


def main():
    """Run one fetch -> translate -> deliver cycle.

    State (cursor and processed IDs) is saved only after a successful
    delivery, or when every fetched indicator had already been processed.
    Exits with status 1 when the Anomali fetch, the Azure token request, or
    the Sentinel delivery fails, leaving the saved state untouched so the
    next run retries the same window.
    """
    state = load_state()
    log.info("Fetching Anomali indicators modified since %s", state["last_modified_ts"] or "(initial run)")

    try:
        indicators = fetch_indicators(state["last_modified_ts"])
    except requests.RequestException as e:
        log.error("Anomali fetch failed: %s", e)
        sys.exit(1)

    if not indicators:
        log.info("No new indicators.")
        return

    payload, new_cursor, all_seen = translate(indicators, state["processed_lookup"])

    if not payload:
        # Everything fetched was already delivered; just move the cursor on.
        log.info("No undelivered indicators among %d fetched.", len(indicators))
        _commit_state(state, new_cursor, all_seen)
        return

    # Request the token only once there is something to send.
    try:
        az = azure_token()
    except (requests.RequestException, KeyError, ValueError) as e:
        log.error("Azure token request failed: %s — state NOT updated, will retry next run", e)
        sys.exit(1)

    if not deliver_chunked(payload, az):
        log.error("Delivery failed — state NOT updated, will retry next run")
        sys.exit(1)

    _commit_state(state, new_cursor, all_seen)
    log.info("Run complete: delivered=%d cursor=%s", len(payload), new_cursor)


if __name__ == "__main__":
    main()

Useful?

Used by 0 teams · Viewed 7 times · Last validated 5/17/2026