push · python · scheduled · ✓ IRON 91 · hand-curated
Anomali ThreatStream → Microsoft Sentinel
Push Anomali ThreatStream indicators to Microsoft Sentinel daily
Daily run that pulls indicators from Anomali ThreatStream and forwards them to a Sentinel custom log table for hunting.
anomali · sentinel · ioc · scheduled · push · moderate
# ============================================================
# RINOX INTEGRATION: Anomali ThreatStream -> Microsoft Sentinel
# Generated by Rinox (rinox.io)
# Use Case: Daily indicator forwarding with confidence filter
# Language: python
# ============================================================
# ============================================================
# SECTION 1: LOGGING
# ============================================================
import logging
import os
import sys
import json
import hashlib
import time
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
import requests
# Root logging configuration. The level is taken from LOG_LEVEL (default
# "INFO"); each record is rendered as "<time> [<level>] <logger>: <message>".
_LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(level=_LOG_LEVEL, format=_LOG_FORMAT)

# Module-level logger shared by the rest of the script.
log = logging.getLogger("rinox.anomali_to_sentinel")
# ============================================================
# SECTION 2: AUTHENTICATION
# ============================================================
# Anomali ThreatStream endpoint and credentials. Any trailing slash is
# stripped from the base URL so request paths can be appended directly.
ANOMALI_URL = os.getenv("ANOMALI_URL", "https://api.threatstream.com").rstrip("/")
ANOMALI_USERNAME = os.getenv("ANOMALI_USERNAME", "")
ANOMALI_API_KEY = os.getenv("ANOMALI_API_KEY", "")

# Azure AD application credentials used for the client-credentials token.
AZURE_TENANT_ID = os.getenv("AZURE_TENANT_ID", "")
AZURE_CLIENT_ID = os.getenv("AZURE_CLIENT_ID", "")
AZURE_CLIENT_SECRET = os.getenv("AZURE_CLIENT_SECRET", "")

# Sentinel ingestion target: endpoint base URL, rule ID and stream name are
# combined into the delivery URL by deliver_chunked().
SENTINEL_DCE = os.getenv("SENTINEL_DCE_URL", "").rstrip("/")
SENTINEL_DCR_ID = os.getenv("SENTINEL_DCR_IMMUTABLE_ID", "")
SENTINEL_STREAM = os.getenv("SENTINEL_STREAM_NAME", "Custom-AnomaliIndicators_CL")

# Tunables: confidence filter, state file location, size of the processed-ID
# FIFO, and the page size requested from ThreatStream.
MIN_CONFIDENCE = int(os.getenv("MIN_CONFIDENCE", "75"))
STATE_PATH = Path(os.getenv("STATE_PATH", "./anomali_sentinel_state.json"))
FIFO_CAP = int(os.getenv("FIFO_CAP", "50000"))
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "1000"))

# Settings that have no usable default; keyed by the environment variable
# name so the error message tells the operator exactly what to export.
REQUIRED = dict(
    ANOMALI_USERNAME=ANOMALI_USERNAME,
    ANOMALI_API_KEY=ANOMALI_API_KEY,
    AZURE_TENANT_ID=AZURE_TENANT_ID,
    AZURE_CLIENT_ID=AZURE_CLIENT_ID,
    AZURE_CLIENT_SECRET=AZURE_CLIENT_SECRET,
    SENTINEL_DCE_URL=SENTINEL_DCE,
    SENTINEL_DCR_IMMUTABLE_ID=SENTINEL_DCR_ID,
)

# Abort at import time if any required setting is empty.
missing = [key for key in REQUIRED if not REQUIRED[key]]
if missing:
    log.error("Missing env vars: %s", ", ".join(missing))
    sys.exit(1)
def azure_token():
    """Request an OAuth2 client-credentials access token from Azure AD.

    Posts AZURE_CLIENT_ID / AZURE_CLIENT_SECRET to the tenant's v2.0 token
    endpoint with the ``https://monitor.azure.com//.default`` scope.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        requests.HTTPError: from ``raise_for_status()`` on an error response.
        KeyError: if the response JSON has no ``access_token`` field.
    """
    token_endpoint = f"https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token"
    form = {
        "grant_type": "client_credentials",
        "client_id": AZURE_CLIENT_ID,
        "client_secret": AZURE_CLIENT_SECRET,
        "scope": "https://monitor.azure.com//.default",
    }
    reply = requests.post(token_endpoint, data=form, timeout=30)
    reply.raise_for_status()
    return reply.json()["access_token"]
def load_state():
    """Load the persisted run state, or build an empty one on first run.

    Returns:
        A dict with three keys:
          * ``"last_modified_ts"`` - cursor string ("" when never set).
          * ``"processed_ids"`` - ``deque`` of composite hashes, bounded to
            ``FIFO_CAP`` entries (oldest entries are dropped first).
          * ``"processed_lookup"`` - ``set`` holding exactly the hashes that
            are in ``processed_ids``, for O(1) membership tests.

    Raises:
        json.JSONDecodeError: if STATE_PATH exists but is not valid JSON.
            This is left to propagate rather than silently resetting the
            dedup history.
    """
    last_modified_ts = ""
    processed_ids = deque(maxlen=FIFO_CAP)

    if STATE_PATH.exists():
        saved = json.loads(STATE_PATH.read_text())
        last_modified_ts = saved.get("last_modified_ts", "")
        # extend() on a bounded deque keeps only the newest FIFO_CAP entries.
        processed_ids.extend(saved.get("processed_ids", []))

    return {
        "last_modified_ts": last_modified_ts,
        "processed_ids": processed_ids,
        # Derive the lookup from the bounded deque, not from the raw saved
        # list, so the two structures agree even when the file holds more
        # than FIFO_CAP entries (e.g. after FIFO_CAP was lowered).
        "processed_lookup": set(processed_ids),
    }
def save_state(state):
    """Write the cursor and processed-ID list to STATE_PATH as JSON.

    Only ``last_modified_ts`` and ``processed_ids`` are persisted; the
    ``processed_lookup`` set is not written. The JSON is first written to a
    sibling file with a ``.tmp`` suffix and then moved over STATE_PATH with
    ``Path.replace`` so a partially written file never takes its place.

    Args:
        state: dict providing ``"last_modified_ts"`` and an iterable
            ``"processed_ids"``.
    """
    snapshot = json.dumps({
        "last_modified_ts": state["last_modified_ts"],
        "processed_ids": list(state["processed_ids"]),
    })
    scratch = STATE_PATH.with_suffix(".tmp")
    scratch.write_text(snapshot)
    scratch.replace(STATE_PATH)
def composite_hash(value, indicator_type):
    """Return the SHA-256 hex digest of ``"<indicator_type>|<value>"``.

    Args:
        value: the indicator value.
        indicator_type: the indicator type; it is placed first in the hashed
            string, separated from the value by ``|``.

    Returns:
        A 64-character lowercase hexadecimal string.
    """
    hasher = hashlib.sha256()
    hasher.update(f"{indicator_type}|{value}".encode())
    return hasher.hexdigest()
# ============================================================
# SECTION 3: SOURCE SYSTEM CALLS (Anomali ThreatStream)
# ============================================================
def fetch_indicators(modified_since):
    """Page through ThreatStream ``/api/v2/intelligence/`` and collect results.

    Every request asks for ``status=active`` indicators with
    ``confidence__gte=MIN_CONFIDENCE``, ordered by ``modified_ts``, using
    ``limit``/``offset`` paging in steps of PAGE_SIZE.

    Args:
        modified_since: cursor string; when truthy it is sent as
            ``modified_ts__gte``. Falsy means no time filter.

    Returns:
        A list with the ``objects`` entries of all fetched pages.

    Raises:
        requests.HTTPError: from ``raise_for_status()`` on an error response.
    """
    auth_headers = {"Authorization": f"apikey {ANOMALI_USERNAME}:{ANOMALI_API_KEY}"}
    endpoint = f"{ANOMALI_URL}/api/v2/intelligence/"
    collected = []
    offset = 0

    # The loop ends via `break` on an empty or short page; if instead the
    # offset climbs past the cap, the `else` branch logs the warning.
    while offset <= 100000:
        query = dict(
            limit=PAGE_SIZE,
            offset=offset,
            confidence__gte=MIN_CONFIDENCE,
            status="active",
            order_by="modified_ts",
        )
        if modified_since:
            query["modified_ts__gte"] = modified_since

        reply = requests.get(endpoint, headers=auth_headers, params=query, timeout=120)
        reply.raise_for_status()
        page = reply.json().get("objects", [])
        if not page:
            break
        collected.extend(page)
        if len(page) < PAGE_SIZE:
            break
        offset += PAGE_SIZE
    else:
        log.warning("Pagination cap reached at offset 100000; remaining indicators picked up next run.")

    return collected
# ============================================================
# SECTION 4: TRANSLATION
# ============================================================
def translate(indicators, processed_lookup):
    """Convert raw ThreatStream indicators into Sentinel custom-log records.

    Indicators without a value or a type (``itype``, falling back to
    ``type``) are skipped. Each remaining indicator is identified by
    ``composite_hash(value, itype)``; an indicator is emitted only if that
    hash is not in ``processed_lookup`` and has not already been seen earlier
    in this same call.

    Args:
        indicators: iterable of indicator dicts.
        processed_lookup: container of composite hashes that were already
            delivered. It is only read, never modified.

    Returns:
        A 3-tuple ``(payload, max_modified, all_seen)``:
          * ``payload`` - list of record dicts to deliver.
          * ``max_modified`` - the greatest ``modified_ts`` (or ``created_ts``)
            string encountered, "" if none. Compared as plain strings.
          * ``all_seen`` - composite hashes of every valid indicator, each
            listed once in first-seen order.
    """
    payload = []
    all_seen = []
    seen_this_call = set()
    max_modified = ""

    for ind in indicators:
        value = ind.get("value")
        itype = ind.get("itype") or ind.get("type")
        if not value or not itype:
            continue

        # Advance the cursor for every valid indicator, including duplicates
        # and ones that were already delivered.
        modified = ind.get("modified_ts") or ind.get("created_ts") or ""
        if modified > max_modified:
            max_modified = modified

        cid = composite_hash(value, itype)
        if cid in seen_this_call:
            # The same (type, value) pair appeared twice in one fetch;
            # emit it once, matching the cross-run de-duplication.
            continue
        seen_this_call.add(cid)
        all_seen.append(cid)
        if cid in processed_lookup:
            continue

        # "meta" may be missing, null, or not a dict; only a dict can carry
        # a nested severity.
        meta = ind.get("meta")
        meta_severity = meta.get("severity") if isinstance(meta, dict) else None

        payload.append({
            "TimeGenerated": modified or datetime.now(timezone.utc).isoformat(),
            "IndicatorValue": value,
            "IndicatorType": itype,
            "Confidence": ind.get("confidence", 0),
            "Severity": meta_severity or ind.get("severity"),
            "Source": ind.get("source"),
            "Tags": [t.get("name") for t in (ind.get("tags") or []) if isinstance(t, dict)],
            "ASN": ind.get("asn"),
            "Country": ind.get("country"),
            "Tlp": ind.get("tlp"),
            "RawIndicator": ind,
        })

    return payload, max_modified, all_seen
# ============================================================
# SECTION 5: TARGET SYSTEM CALLS (Sentinel Logs Ingestion)
# ============================================================
def deliver_chunked(records, az_token, chunk=500):
    """POST records to the Sentinel ingestion stream in fixed-size chunks.

    Delivery stops at the first failing chunk. Chunks posted before the
    failure have already been sent, so a caller that retries the whole list
    will send those records again.

    Args:
        records: list of JSON-serialisable record dicts.
        az_token: bearer token placed in the ``Authorization`` header.
        chunk: maximum number of records per request; must be >= 1.

    Returns:
        True if ``records`` is empty or every chunk got a status below 300;
        False on the first HTTP status >= 300 or transport error.

    Raises:
        ValueError: if ``chunk`` is less than 1.
    """
    if not records:
        return True
    if chunk < 1:
        # A negative step would make range() empty and report success
        # without sending anything; zero would raise an obscure range error.
        raise ValueError(f"chunk must be >= 1, got {chunk}")

    url = f"{SENTINEL_DCE}/dataCollectionRules/{SENTINEL_DCR_ID}/streams/{SENTINEL_STREAM}?api-version=2023-01-01"
    headers = {"Authorization": f"Bearer {az_token}", "Content-Type": "application/json"}

    for index, start in enumerate(range(0, len(records), chunk)):
        batch = records[start:start + chunk]
        try:
            resp = requests.post(url, headers=headers, data=json.dumps(batch), timeout=120)
        except requests.RequestException as exc:
            # Timeouts and connection errors are delivery failures too;
            # report them through the bool return instead of a traceback.
            log.error("Sentinel delivery failed at chunk %d: %s", index, exc)
            return False
        if resp.status_code >= 300:
            log.error("Sentinel delivery failed at chunk %d: %s %s", index, resp.status_code, resp.text[:300])
            return False
    return True
# ============================================================
# SECTION 6: RINOX
# ============================================================
# Run identifier: the current Unix time in whole seconds, as a string,
# captured once when the module is loaded.
_run_started_epoch = int(time.time())
RINOX_RUN_ID = f"{_run_started_epoch}"
log.info("Rinox run started: id=%s", RINOX_RUN_ID)
# ============================================================
# SECTION 7: MAIN ORCHESTRATOR
# ============================================================
def _commit_state(state, new_cursor, all_seen):
    """Advance the cursor, record newly seen hashes, and persist the state."""
    if new_cursor and new_cursor > state["last_modified_ts"]:
        state["last_modified_ts"] = new_cursor
    for sid in all_seen:
        if sid not in state["processed_lookup"]:
            state["processed_ids"].append(sid)
            state["processed_lookup"].add(sid)
    save_state(state)


def main():
    """Run one fetch -> translate -> deliver -> persist cycle.

    Steps:
      1. Load the saved state and fetch indicators from the stored cursor.
      2. Translate them, dropping indicators whose hash was already processed.
      3. If nothing is left to deliver, persist the advanced cursor and stop
         without contacting Azure.
      4. Otherwise obtain an Azure token, deliver the payload, and persist
         state only after delivery succeeded.

    Exits the process with status 1 when the Anomali fetch, the Azure token
    request, or the Sentinel delivery fails; in those cases state is not
    written.
    """
    state = load_state()
    log.info("Fetching Anomali indicators modified since %s", state["last_modified_ts"] or "(initial run)")
    try:
        indicators = fetch_indicators(state["last_modified_ts"])
    except requests.RequestException as e:
        log.error("Anomali fetch failed: %s", e)
        sys.exit(1)
    if not indicators:
        log.info("No new indicators.")
        return

    payload, new_cursor, all_seen = translate(indicators, state["processed_lookup"])

    if not payload:
        # Everything fetched was already processed (or invalid): just move
        # the cursor forward. No token is needed for this path.
        _commit_state(state, new_cursor, all_seen)
        log.info("Nothing new to deliver; cursor=%s", state["last_modified_ts"])
        return

    # Request the token only when there is something to send, and treat a
    # failed token request like any other failed step.
    try:
        az = azure_token()
    except (requests.RequestException, KeyError, ValueError) as e:
        log.error("Azure token request failed: %s", e)
        sys.exit(1)

    if not deliver_chunked(payload, az):
        log.error("Delivery failed — state NOT updated, will retry next run")
        sys.exit(1)

    _commit_state(state, new_cursor, all_seen)
    log.info("Run complete: delivered=%d cursor=%s", len(payload), new_cursor)


if __name__ == "__main__":
    main()
Useful?
Used by 0 teams · Viewed 7 times · Last validated 5/17/2026