rinoxRinox
pushpythonscheduled✓ IRON 92hand-curated

CrowdStrike Falcon → Microsoft Sentinel

Forward CrowdStrike detections to Microsoft Sentinel every 15 minutes

Polls the CrowdStrike Falcon detections API every 15 minutes and forwards new detections to a Sentinel custom log table via the Logs Ingestion API.

crowdstrikesentineldetectionscheduledpushmoderate
# ============================================================
# RINOX INTEGRATION: CrowdStrike Falcon -> Microsoft Sentinel
# Generated by Rinox (rinox.io)
# Use Case: 15-min detection forwarding via Logs Ingestion API
# Language: python
# ============================================================

# ============================================================
# SECTION 1: LOGGING
# ============================================================
import logging
import os
import sys
import json
import time
from collections import deque
from datetime import datetime, timezone
from pathlib import Path

import requests

logging.basicConfig(
    level=os.environ.get("LOG_LEVEL", "INFO"),
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
log = logging.getLogger("rinox.cs_to_sentinel")

# ============================================================
# SECTION 2: AUTHENTICATION
# ============================================================
CS_BASE = os.environ.get("CROWDSTRIKE_BASE_URL", "https://api.crowdstrike.com").rstrip("/")
CS_CLIENT_ID = os.environ.get("CROWDSTRIKE_CLIENT_ID", "")
CS_CLIENT_SECRET = os.environ.get("CROWDSTRIKE_CLIENT_SECRET", "")
SENTINEL_DCE = os.environ.get("SENTINEL_DCE_URL", "").rstrip("/")
SENTINEL_DCR_ID = os.environ.get("SENTINEL_DCR_IMMUTABLE_ID", "")
SENTINEL_STREAM = os.environ.get("SENTINEL_STREAM_NAME", "Custom-CrowdStrikeDetections_CL")
AZURE_TENANT_ID = os.environ.get("AZURE_TENANT_ID", "")
AZURE_CLIENT_ID = os.environ.get("AZURE_CLIENT_ID", "")
AZURE_CLIENT_SECRET = os.environ.get("AZURE_CLIENT_SECRET", "")
STATE_PATH = Path(os.environ.get("STATE_PATH", "./cs_sentinel_state.json"))
FIFO_CAP = int(os.environ.get("FIFO_CAP", "10000"))

REQUIRED = {
    "CROWDSTRIKE_CLIENT_ID": CS_CLIENT_ID, "CROWDSTRIKE_CLIENT_SECRET": CS_CLIENT_SECRET,
    "SENTINEL_DCE_URL": SENTINEL_DCE, "SENTINEL_DCR_IMMUTABLE_ID": SENTINEL_DCR_ID,
    "AZURE_TENANT_ID": AZURE_TENANT_ID, "AZURE_CLIENT_ID": AZURE_CLIENT_ID,
    "AZURE_CLIENT_SECRET": AZURE_CLIENT_SECRET,
}
missing = [k for k, v in REQUIRED.items() if not v]
if missing:
    log.error("Missing env vars: %s", ", ".join(missing))
    sys.exit(1)


def cs_token():
    resp = requests.post(
        f"{CS_BASE}/oauth2/token",
        data={"client_id": CS_CLIENT_ID, "client_secret": CS_CLIENT_SECRET},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["access_token"]


def azure_token():
    resp = requests.post(
        f"https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token",
        data={
            "grant_type": "client_credentials",
            "client_id": AZURE_CLIENT_ID,
            "client_secret": AZURE_CLIENT_SECRET,
            "scope": "https://monitor.azure.com//.default",
        },
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["access_token"]


def load_state():
    if STATE_PATH.exists():
        d = json.loads(STATE_PATH.read_text())
        return {
            "last_created_ts": int(d.get("last_created_ts", 0)),
            "processed_ids": deque(d.get("processed_ids", []), maxlen=FIFO_CAP),
            "processed_lookup": set(d.get("processed_ids", [])),
        }
    return {"last_created_ts": 0, "processed_ids": deque(maxlen=FIFO_CAP), "processed_lookup": set()}


def save_state(state):
    payload = {
        "last_created_ts": int(state["last_created_ts"]),
        "processed_ids": list(state["processed_ids"]),
    }
    tmp = STATE_PATH.with_suffix(".tmp")
    tmp.write_text(json.dumps(payload))
    tmp.replace(STATE_PATH)


# ============================================================
# SECTION 3: SOURCE SYSTEM CALLS (CrowdStrike)
# ============================================================
def fetch_detections(token, since_iso):
    """Bulk fetch detection IDs, then bulk hydrate their details."""
    headers = {"Authorization": f"Bearer {token}"}
    fql = f"created_timestamp:>'{since_iso}'+status:'new'"
    ids_resp = requests.get(
        f"{CS_BASE}/detects/queries/detects/v1",
        headers=headers,
        params={"filter": fql, "limit": 200, "sort": "created_timestamp.asc"},
        timeout=60,
    )
    ids_resp.raise_for_status()
    ids = ids_resp.json().get("resources", [])
    if not ids:
        return []

    detail_resp = requests.post(
        f"{CS_BASE}/detects/entities/summaries/GET/v1",
        headers={**headers, "Content-Type": "application/json"},
        json={"ids": ids},
        timeout=60,
    )
    detail_resp.raise_for_status()
    return detail_resp.json().get("resources", [])


# ============================================================
# SECTION 4: TRANSLATION
# ============================================================
def translate(detections, processed_lookup):
    payload = []
    all_seen = []
    max_ts = 0

    for det in detections:
        det_id = det.get("detection_id") or det.get("composite_id")
        if not det_id:
            continue
        all_seen.append(det_id)
        created = det.get("created_timestamp") or det.get("date_updated") or ""
        try:
            ts = int(datetime.fromisoformat(created.replace("Z", "+00:00")).timestamp())
        except (ValueError, TypeError):
            ts = 0
        if ts > max_ts:
            max_ts = ts
        if det_id in processed_lookup:
            continue

        payload.append({
            "TimeGenerated": created or datetime.now(timezone.utc).isoformat(),
            "DetectionId": det_id,
            "Severity": det.get("max_severity_displayname", "Unknown"),
            "SeverityValue": det.get("max_severity", 0),
            "Tactic": det.get("tactic"),
            "Technique": det.get("technique"),
            "DeviceHostname": (det.get("device") or {}).get("hostname"),
            "DeviceId": (det.get("device") or {}).get("device_id"),
            "Status": det.get("status"),
            "Description": det.get("description", ""),
            "RawEvent": det,
        })

    return payload, max_ts, all_seen


# ============================================================
# SECTION 5: TARGET SYSTEM CALLS (Sentinel Logs Ingestion API)
# ============================================================
def deliver(records, az_token):
    if not records:
        return True
    url = f"{SENTINEL_DCE}/dataCollectionRules/{SENTINEL_DCR_ID}/streams/{SENTINEL_STREAM}?api-version=2023-01-01"
    headers = {"Authorization": f"Bearer {az_token}", "Content-Type": "application/json"}
    # The Logs Ingestion API expects a JSON array of records.
    resp = requests.post(url, headers=headers, data=json.dumps(records), timeout=60)
    if resp.status_code >= 300:
        log.error("Sentinel delivery failed: %s %s", resp.status_code, resp.text[:300])
        return False
    return True


# ============================================================
# SECTION 6: RINOX
# ============================================================
RINOX_RUN_ID = str(int(time.time()))
log.info("Rinox run started: id=%s", RINOX_RUN_ID)


# ============================================================
# SECTION 7: MAIN ORCHESTRATOR
# ============================================================
def main():
    state = load_state()
    if state["last_created_ts"]:
        since_iso = datetime.fromtimestamp(state["last_created_ts"], tz=timezone.utc).isoformat()
    else:
        since_iso = (datetime.now(timezone.utc).replace(microsecond=0)).isoformat()

    cs = cs_token()
    az = azure_token()

    try:
        detections = fetch_detections(cs, since_iso)
    except requests.RequestException as e:
        log.error("CrowdStrike fetch failed: %s", e)
        sys.exit(1)

    if not detections:
        log.info("No new detections.")
        return

    payload, new_cursor, all_seen = translate(detections, state["processed_lookup"])

    if not payload:
        if new_cursor > state["last_created_ts"]:
            state["last_created_ts"] = new_cursor
        for sid in all_seen:
            if sid not in state["processed_lookup"]:
                state["processed_ids"].append(sid)
                state["processed_lookup"].add(sid)
        save_state(state)
        return

    if not deliver(payload, az):
        log.error("Delivery failed — state NOT updated, will retry next run")
        sys.exit(1)

    if new_cursor > state["last_created_ts"]:
        state["last_created_ts"] = new_cursor
    for sid in all_seen:
        if sid not in state["processed_lookup"]:
            state["processed_ids"].append(sid)
            state["processed_lookup"].add(sid)
    save_state(state)
    log.info("Run complete: delivered=%d, cursor=%s", len(payload), new_cursor)


if __name__ == "__main__":
    main()

Useful?

Used by 0 teams · Viewed 5 times · Last validated 5/17/2026