Builds automated threat intelligence pipelines integrating STIX/TAXII sources, open-source feeds like Abuse.ch, and commercial platforms into SIEM for real-time IOC matching and alerting. For SOC teams standardizing and distributing TI.
How this skill is triggered — by the user, by Claude, or both
Slash command
/cybersecurity-skills-zh:building-threat-intelligence-feed-integrationThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
以下情况使用本技能:
以下情况使用本技能:
不适用于手动 IOC 查询——对于临时查询,请使用专用富化工具(VirusTotal、AbuseIPDB)。
taxii2-client、stix2 Python 包)按类型、格式和更新频率映射可用源:
| 情报源 | 格式 | IOC 类型 | 更新频率 | 费用 |
|---|---|---|---|---|
| AlienVault OTX | STIX/JSON | IP、域名、哈希、URL | 实时 | 免费 |
| Abuse.ch URLhaus | CSV/JSON | URL、域名 | 每 5 分钟 | 免费 |
| Abuse.ch MalwareBazaar | JSON API | 文件哈希 | 实时 | 免费 |
| CISA AIS | STIX/TAXII 2.1 | 全类型 | 每日 | 免费(美国政府) |
| CrowdStrike Intel | STIX/JSON | 全类型 + 威胁行为者 TTP | 实时 | 商业 |
| Mandiant Advantage | STIX 2.1 | 全类型 + 报告 | 实时 | 商业 |
连接到 TAXII 2.1 服务器并下载指标:
from taxii2client.v21 import Server, Collection
from stix2 import parse
# 连接到 TAXII 服务器(示例:CISA AIS)
server = Server(
"https://taxii.cisa.gov/taxii2/",
user="your_username",
password="your_password"
)
# 列出可用集合
for api_root in server.api_roots:
print(f"API Root: {api_root.title}")
for collection in api_root.collections:
print(f" Collection: {collection.title} (ID: {collection.id})")
# 从集合获取指标
collection = Collection(
"https://taxii.cisa.gov/taxii2/collections/COLLECTION_ID/",
user="your_username",
password="your_password"
)
# 获取过去 24 小时添加的指标
from datetime import datetime, timedelta
added_after = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
response = collection.get_objects(added_after=added_after, type=["indicator"])
for obj in response.get("objects", []):
indicator = parse(obj)
print(f"Type: {indicator.type}")
print(f"Pattern: {indicator.pattern}")
print(f"Valid Until: {indicator.valid_until}")
print(f"Confidence: {indicator.confidence}")
print("---")
Abuse.ch URLhaus 源:
import requests
import csv
from io import StringIO
# 下载 URLhaus 近期 URL
response = requests.get("https://urlhaus.abuse.ch/downloads/csv_recent/")
reader = csv.reader(StringIO(response.text), delimiter=',')
indicators = []
for row in reader:
if row[0].startswith("#"):
continue
indicators.append({
"id": row[0],
"dateadded": row[1],
"url": row[2],
"url_status": row[3],
"threat": row[5],
"tags": row[6]
})
print(f"从 URLhaus 接入了 {len(indicators)} 条 URL")
# 仅过滤活跃威胁
active = [i for i in indicators if i["url_status"] == "online"]
print(f"活跃威胁:{len(active)} 条")
AlienVault OTX Pulse 源:
from OTXv2 import OTXv2, IndicatorTypes
otx = OTXv2("YOUR_OTX_API_KEY")
# 获取订阅的 pulse(过去 24 小时)
pulses = otx.getall(modified_since="2024-03-14T00:00:00")
for pulse in pulses:
print(f"Pulse: {pulse['name']}")
print(f"Tags: {pulse['tags']}")
for indicator in pulse["indicators"]:
print(f" IOC: {indicator['indicator']} ({indicator['type']})")
Abuse.ch Feodo Tracker(C2 IP):
response = requests.get("https://feodotracker.abuse.ch/downloads/ipblocklist_recommended.json")
c2_data = response.json()
for entry in c2_data:
print(f"IP: {entry['ip_address']}:{entry['port']}")
print(f"Malware: {entry['malware']}")
print(f"First Seen: {entry['first_seen']}")
print(f"Last Online: {entry['last_online']}")
将所有源转换为 STIX 2.1 格式以实现标准化:
from stix2 import Indicator, Bundle
import hashlib
def create_stix_indicator(ioc_value, ioc_type, source, confidence=50):
"""将原始 IOC 转换为 STIX 2.1 指标"""
pattern_map = {
"ipv4": f"[ipv4-addr:value = '{ioc_value}']",
"domain": f"[domain-name:value = '{ioc_value}']",
"url": f"[url:value = '{ioc_value}']",
"sha256": f"[file:hashes.'SHA-256' = '{ioc_value}']",
"md5": f"[file:hashes.MD5 = '{ioc_value}']",
}
return Indicator(
name=f"{ioc_type}: {ioc_value}",
pattern=pattern_map[ioc_type],
pattern_type="stix",
valid_from="2024-03-15T00:00:00Z",
confidence=confidence,
labels=[source],
custom_properties={"x_source_feed": source}
)
# 跨源去重
seen_iocs = set()
unique_indicators = []
for ioc in all_collected_iocs:
ioc_hash = hashlib.sha256(f"{ioc['type']}:{ioc['value']}".encode()).hexdigest()
if ioc_hash not in seen_iocs:
seen_iocs.add(ioc_hash)
unique_indicators.append(
create_stix_indicator(ioc["value"], ioc["type"], ioc["source"])
)
bundle = Bundle(objects=unique_indicators)
print(f"唯一指标数:{len(unique_indicators)}")
推送至 Splunk ES 威胁情报:
import requests
splunk_url = "https://splunk.company.com:8089"
headers = {"Authorization": f"Bearer {splunk_token}"}
for indicator in unique_indicators:
# 从 STIX 模式中提取 IOC 值
ioc_value = indicator.pattern.split("'")[1]
# 上传至 Splunk ES 威胁情报集合
data = {
"ip": ioc_value,
"description": indicator.name,
"weight": indicator.confidence // 10,
"threat_key": indicator.id,
"source_feed": indicator.get("x_source_feed", "unknown")
}
requests.post(
f"{splunk_url}/services/data/threat_intel/item/ip_intel",
headers=headers, data=data, verify=False
)
推送至 MISP 进行集中管理:
from pymisp import PyMISP, MISPEvent, MISPAttribute
misp = PyMISP("https://misp.company.com", "YOUR_MISP_API_KEY")
# 为源批次创建事件
event = MISPEvent()
event.info = f"TI Feed Import - {datetime.now().strftime('%Y-%m-%d')}"
event.threat_level_id = 2 # 中等
event.analysis = 2 # 已完成
# 将指标作为属性添加
for ioc in unique_indicators:
attr = MISPAttribute()
attr.type = "ip-dst" if "ipv4" in ioc.pattern else "domain"
attr.value = ioc.pattern.split("'")[1]
attr.to_ids = True
attr.comment = f"Source: {ioc.get('x_source_feed', 'mixed')}"
event.add_attribute(**attr)
result = misp.add_event(event)
print(f"MISP 事件已创建:{result['Event']['id']}")
跟踪源有效性指标:
index=threat_intel sourcetype="threat_intel_manager"
| stats count AS total_iocs,
dc(threat_key) AS unique_iocs,
dc(source_feed) AS feed_count
by source_feed
| join source_feed [
search index=notable source="Threat Intelligence"
| stats count AS matches by source_feed
]
| eval match_rate = round(matches / unique_iocs * 100, 2)
| sort - match_rate
| table source_feed, unique_iocs, matches, match_rate
| 术语 | 定义 |
|---|---|
| STIX 2.1 | 结构化威胁信息表达——用于共享威胁情报对象的标准化 JSON 格式 |
| TAXII | 指标信息可信自动化交换——通过 REST API 共享 STIX 数据的传输协议 |
| TIP | 威胁情报平台——用于聚合、评分和分发威胁情报的集中系统 |
| IOC 评分(IOC Scoring) | 根据源可靠性和相互印证为指标分配可信度值的过程 |
| 源去重(Feed Deduplication) | 跨多个源删除重复 IOC,同时保留多源归因信息 |
| IOC 过期(IOC Expiration) | 删除过时指标的生存时间策略(IP:30 天,域名:90 天,哈希:1 年) |
威胁情报源状态 — 每日报告
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
日期: 2024-03-15
IOC 总数: 45,892 个活跃指标
源健康状态:
源名称 IOC 数 匹配数 匹配率 状态
Abuse.ch URLhaus 12,340 47 0.38% 健康
AlienVault OTX 18,567 23 0.12% 健康
Abuse.ch Feodo 1,203 12 1.00% 健康
CISA AIS 8,945 8 0.09% 健康
CrowdStrike Intel 4,837 31 0.64% 健康
今日操作:
新增 IOC: 1,247 条
过期 IOC: 892 条
删除重复: 156 条
SIEM 匹配: 121 个显著事件生成
误报: 3 个(CDN IP 已从源移除)
npx claudepluginhub killvxk/cybersecurity-skills-zhAutomates ingestion, normalization, and distribution of STIX/TAXII threat intelligence feeds into SIEM platforms for real-time IOC matching and alerting. Use when SOC teams need to operationalize threat intel from multiple sources.
Builds pipelines integrating STIX/TAXII feeds, open-source and commercial threat intel into SIEM/security tools for real-time IOC matching and alerting. For SOC teams automating TI ingestion, normalization, scoring, and distribution.
Builds automated threat intelligence feed ingestion pipelines connecting STIX/TAXII feeds, open-source intel, and commercial TI platforms into SIEM tools for real-time IOC matching and alerting. Use when SOC teams need to operationalize threat intelligence.