Deploys and configures TAXII 2.1 servers using Medallion or OpenTAXII to share STIX 2.1 bundles for automated threat intelligence and IOC exchange.
How this skill is triggered — by the user, by Claude, or both
Slash command
/cybersecurity-skills-zh:implementing-taxii-server-with-opentaxii
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
TAXII(可信自动化情报信息交换)是 OASIS 标准协议,用于通过 HTTPS 交换网络威胁情报。OpenTAXII 是 EclecticIQ 开发的开源 TAXII 服务器实现,支持 TAXII 1.x;OASIS cti-taxii-server 提供 TAXII 2.1 参考实现。本技能涵盖部署 TAXII 服务器,配置威胁情报 Feed 集合,发布 STIX 2.1 Bundle,以及与 SIEM/SOAR 平台集成实现自动化指标摄入。
TAXII(可信自动化情报信息交换)是 OASIS 标准协议,用于通过 HTTPS 交换网络威胁情报。OpenTAXII 是 EclecticIQ 开发的开源 TAXII 服务器实现,支持 TAXII 1.x;OASIS cti-taxii-server 提供 TAXII 2.1 参考实现。本技能涵盖部署 TAXII 服务器,配置威胁情报 Feed 集合,发布 STIX 2.1 Bundle,以及与 SIEM/SOAR 平台集成实现自动化指标摄入。
medallion、stix2、taxii2-client、opentaxii、cabby 库
TAXII 2.1 定义三种服务:发现(查找可用 API 根)、API 根(集合的入口点)和集合(CTI 对象的存储库)。集合支持两种访问模式:集合端点允许消费者轮询对象,状态端点跟踪添加操作的结果。TAXII 使用带 application/taxii+json;version=2.1 的 HTTP 内容协商。
TAXII 支持三种共享模型:中心辐射型(hub-and-spoke,中心服务器向消费者分发)、点对点(peer-to-peer,合作伙伴之间双向共享)和源/订阅者(source/subscriber,生产者发布,消费者订阅)。每个集合可以有只读、只写或读写访问控制。
TAXII 传输包含结构化威胁信息对象的 STIX 2.1 Bundle:指标(检测模式)、观测数据、恶意软件、攻击模式、威胁行为者、入侵集合、活动、关系和目击记录。每个对象都有唯一的 STIX ID、创建/修改时间戳和可选的 TLP 标记定义。
# 安装 medallion(OASIS 参考实现)
# pip install medallion
# medallion_config.json
import json
# Request-body size limit in bytes, used in both the server config and the
# API root description below.
_MAX_CONTENT_LENGTH = 10485760

# Media types advertised by the API root and by every collection.
_TAXII_21_MEDIA_TYPE = "application/taxii+json;version=2.1"
_STIX_21_MEDIA_TYPE = "application/stix+json;version=2.1"


def _collection(collection_id, title, description):
    """Return a readable and writable collection entry for STIX 2.1 content."""
    return {
        "id": collection_id,
        "title": title,
        "description": description,
        "can_read": True,
        "can_write": True,
        "media_types": [_STIX_21_MEDIA_TYPE],
    }


# Server configuration written to medallion_config.json.
config = {
    "backend": {
        "module_class": "MemoryBackend",
        "filename": "taxii_data.json",
    },
    # Placeholder credentials: replace every password before deploying.
    "users": {
        "admin": "admin_password_change_me",
        "analyst": "analyst_password_change_me",
        "readonly": "readonly_password_change_me",
    },
    "taxii": {
        "max_content_length": _MAX_CONTENT_LENGTH,
    },
}

# Initial data store written to taxii_data.json.
# NOTE(review): confirm this layout matches the data-file schema expected by
# the MemoryBackend of the medallion version that gets installed.
taxii_data = {
    "discovery": {
        "title": "威胁情报 TAXII 服务器",
        "description": "用于共享 CTI 指标的 TAXII 2.1 服务器",
        # Placeholder address: the value in the original snippet was garbled
        # by e-mail obfuscation; replace with the real contact mailbox.
        "contact": "security@organization.com",
        "default": "https://taxii.organization.com/api/",
        "api_roots": ["https://taxii.organization.com/api/"],
    },
    "api_roots": {
        "api": {
            "title": "威胁情报 API 根",
            "description": "威胁情报共享的主要 API 根",
            "versions": [_TAXII_21_MEDIA_TYPE],
            "max_content_length": _MAX_CONTENT_LENGTH,
            "collections": {
                "malware-iocs": _collection(
                    "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
                    "恶意软件 IOC",
                    "来自恶意软件分析的失陷指标",
                ),
                "apt-intelligence": _collection(
                    "52892447-4d7e-4f70-b94a-5460e242dd23",
                    "APT 情报",
                    "高级持续性威胁组织情报",
                ),
                "phishing-indicators": _collection(
                    "64993447-4d7e-4f70-b94a-5460e242ee34",
                    "网络钓鱼指标",
                    "网络钓鱼 URL、域名和电子邮件指标",
                ),
            },
        },
    },
}

# Write both files into the current working directory. json.dump escapes
# non-ASCII characters by default, so the files stay plain ASCII; the explicit
# encoding just keeps the result independent of the platform default.
with open("medallion_config.json", "w", encoding="utf-8") as f:
    json.dump(config, f, indent=2)
with open("taxii_data.json", "w", encoding="utf-8") as f:
    json.dump(taxii_data, f, indent=2)

print("[+] TAXII 服务器配置已创建")
# docker-compose.yml
# Runs medallion inside a stock Python image. The config file, the data file
# and the certs directory are bind-mounted from the directory that holds this
# compose file.
version: '3.8'
services:
  taxii-server:
    image: python:3.11-slim
    container_name: taxii-server
    working_dir: /app
    volumes:
      - ./medallion_config.json:/app/medallion_config.json
      - ./taxii_data.json:/app/taxii_data.json
      - ./certs:/app/certs
    ports:
      - "6100:6100"
    # medallion is installed on every container start; pin a version or build
    # a dedicated image if start-up time or reproducibility matters.
    # The configuration file is passed with --conf-file (confirm against the
    # CLI help of the medallion release you install).
    command: >
      bash -c "pip install medallion &&
      medallion --host 0.0.0.0 --port 6100
      --conf-file /app/medallion_config.json"
    restart: unless-stopped
    healthcheck:
      # The slim Python image does not include curl, so probe the listening
      # port with the interpreter that is already present. A plain TCP
      # connect also avoids having to embed TAXII credentials in the probe.
      test: ["CMD", "python", "-c", "import socket; socket.create_connection(('localhost', 6100), timeout=5).close()"]
      interval: 30s
      timeout: 10s
      retries: 3
      # Give the start-up pip install time to finish before failures count.
      start_period: 60s
from stix2 import Indicator, Malware, Relationship, Bundle, TLP_WHITE
from taxii2client.v21 import Server, Collection, as_pages
import json
from datetime import datetime
class TAXIIPublisher:
    """Publish STIX 2.1 objects to a TAXII 2.1 server via ``taxii2client``.

    All operations use the first API root advertised by the server.
    """

    def __init__(self, server_url, username, password):
        """Create the server handle.

        Args:
            server_url: URL of the TAXII discovery endpoint.
            username: Account name used for HTTP authentication.
            password: Password for ``username``.
        """
        # Keep our own copy of the credentials so collection handles can be
        # built without reading private attributes of the client library.
        self._username = username
        self._password = password
        self.server = Server(
            server_url,
            user=username,
            password=password,
        )

    def _first_api_root(self):
        """Return the first advertised API root.

        Raises:
            IndexError: If the server advertises no API roots.
        """
        api_roots = self.server.api_roots
        if not api_roots:
            raise IndexError("TAXII server advertises no API roots")
        return api_roots[0]

    @staticmethod
    def _build_envelope(stix_objects):
        """Wrap objects in a TAXII 2.1 envelope (``{"objects": [...]}``).

        Plain dicts are passed through unchanged; anything else is expected
        to provide ``serialize()`` returning its JSON text.
        """
        return {
            "objects": [
                obj if isinstance(obj, dict) else json.loads(obj.serialize())
                for obj in stix_objects
            ]
        }

    def list_collections(self):
        """Print and return the collections of the first API root."""
        api_root = self._first_api_root()
        for collection in api_root.collections:
            print(f" [{collection.id}] {collection.title} "
                  f"(读={collection.can_read}, 写={collection.can_write})")
        return api_root.collections

    def publish_indicators(self, collection_id, indicators):
        """Publish STIX objects to a TAXII collection.

        Args:
            collection_id: ID of the target collection under the first API
                root.
            indicators: Iterable of STIX objects (or plain dicts) to publish.

        Returns:
            The status object returned by ``Collection.add_objects``.
        """
        api_root = self._first_api_root()
        collection = Collection(
            f"{api_root.url}collections/{collection_id}/",
            user=self._username,
            password=self._password,
        )
        # TAXII 2.1 transports objects in an envelope, not in a STIX bundle,
        # so the objects are posted without the bundle's "type"/"id" wrapper.
        envelope = self._build_envelope(indicators)
        response = collection.add_objects(envelope)
        print(f"[+] 已向 {collection_id} 发布 {len(envelope['objects'])} 个对象")
        print(f" 状态:{response.status}")
        return response

    def create_malware_indicators(self):
        """Create the sample SUNBURST objects.

        Returns:
            A list with one malware object, a file-hash indicator, a
            domain-name indicator and an ``indicates`` relationship from the
            hash indicator to the malware.
        """
        malware = Malware(
            name="SUNBURST",
            description="SolarWinds 供应链攻击(2020)中使用的后门。"
                        "被木马化的 SolarWinds.Orion.Core.BusinessLayer.dll 模块。",
            malware_types=["backdoor", "trojan"],
            is_family=True,
            object_marking_refs=[TLP_WHITE],
        )
        indicator_hash = Indicator(
            name="SUNBURST SHA-256 哈希",
            description="被木马化的 SolarWinds Orion DLL 的 SHA-256 哈希",
            pattern="[file:hashes.'SHA-256' = "
                    "'32519b85c0b422e4656de6e6c41878e95fd95026267daab4215ee59c107d6c77']",
            pattern_type="stix",
            valid_from=datetime(2020, 12, 13),
            indicator_types=["malicious-activity"],
            object_marking_refs=[TLP_WHITE],
        )
        indicator_domain = Indicator(
            name="SUNBURST C2 域名模式",
            description="SUNBURST 用于 C2 的 DGA 域名模式",
            # Inside a STIX string literal a backslash must itself be escaped,
            # so the pattern text needs "\\." for the regex to receive "\.".
            # Raw strings keep both backslashes in the pattern text.
            pattern=r"[domain-name:value MATCHES "
                    r"'^[a-z0-9]{4,}\\.appsync-api\\..*\\.avsvmcloud\\.com$']",
            pattern_type="stix",
            valid_from=datetime(2020, 12, 13),
            indicator_types=["malicious-activity"],
            object_marking_refs=[TLP_WHITE],
        )
        rel = Relationship(
            relationship_type="indicates",
            source_ref=indicator_hash.id,
            target_ref=malware.id,
        )
        return [malware, indicator_hash, indicator_domain, rel]
# Connect with the admin credentials, list the server's collections, then
# publish the sample objects to the collection whose ID is given below.
publisher = TAXIIPublisher(
    server_url="https://taxii.organization.com/taxii2/",
    username="admin",
    password="admin_password_change_me",
)
collections = publisher.list_collections()
indicators = publisher.create_malware_indicators()
publisher.publish_indicators(
    collection_id="91a7b528-80eb-42ed-a74d-c6fbd5a26116",
    indicators=indicators,
)
from taxii2client.v21 import Server, Collection, as_pages
import json
class TAXIIConsumer:
    """Poll a TAXII 2.1 collection and flatten indicators for SIEM ingestion.

    All operations use the first API root advertised by the server.
    """

    def __init__(self, server_url, username, password):
        """Create the server handle.

        Args:
            server_url: URL of the TAXII discovery endpoint.
            username: Account name used for HTTP authentication.
            password: Password for ``username``.
        """
        # Keep our own copy of the credentials so collection handles can be
        # built without reading private attributes of the client library.
        self._username = username
        self._password = password
        self.server = Server(server_url, user=username, password=password)

    @staticmethod
    def _objects_from_page(page):
        """Return the ``objects`` list of one result page.

        The v21 client yields pages as already-parsed dicts; JSON text is
        accepted as well so either form works.
        """
        if isinstance(page, (str, bytes, bytearray)):
            page = json.loads(page)
        return (page or {}).get("objects", [])

    def poll_collection(self, collection_id, added_after=None):
        """Poll a collection and return every STIX object it yields.

        Args:
            collection_id: ID of the collection under the first API root.
            added_after: Optional filter forwarded to the server as
                ``added_after``; omitted from the request when falsy.

        Returns:
            A list of STIX objects as dicts, in the order received.

        Raises:
            IndexError: If the server advertises no API roots.
        """
        api_roots = self.server.api_roots
        if not api_roots:
            raise IndexError("TAXII server advertises no API roots")
        api_root = api_roots[0]
        collection = Collection(
            f"{api_root.url}collections/{collection_id}/",
            user=self._username,
            password=self._password,
        )
        kwargs = {"added_after": added_after} if added_after else {}

        all_objects = []
        for page in as_pages(collection.get_objects, per_request=50, **kwargs):
            all_objects.extend(self._objects_from_page(page))

        # Tally only the three object types reported in the summary line.
        type_counts = {"indicator": 0, "malware": 0, "relationship": 0}
        for obj in all_objects:
            kind = obj.get("type")
            if kind in type_counts:
                type_counts[kind] += 1
        print(f"[+] 已轮询 {len(all_objects)} 个对象:"
              f"{type_counts['indicator']} 个指标,{type_counts['malware']} 个恶意软件,"
              f"{type_counts['relationship']} 个关系")
        return all_objects

    def extract_iocs_for_siem(self, stix_objects):
        """Extract a flat IOC record from every indicator object.

        Args:
            stix_objects: Iterable of STIX objects as dicts.

        Returns:
            One dict per object whose ``type`` is ``"indicator"``, with the
            keys ``id``, ``name``, ``pattern``, ``valid_from``,
            ``indicator_types`` and ``confidence``. Missing fields fall back
            to ``""``, ``[]`` or ``0`` (``id`` falls back to ``None``).
        """
        return [
            {
                "id": obj.get("id"),
                "name": obj.get("name", ""),
                "pattern": obj.get("pattern", ""),
                "valid_from": obj.get("valid_from", ""),
                "indicator_types": obj.get("indicator_types", []),
                "confidence": obj.get("confidence", 0),
            }
            for obj in stix_objects
            if obj.get("type") == "indicator"
        ]
# Connect with the analyst credentials, pull every object from the collection
# whose ID is given below, and reduce the indicators to flat IOC records.
consumer = TAXIIConsumer(
    server_url="https://taxii.organization.com/taxii2/",
    username="analyst",
    password="analyst_password_change_me",
)
objects = consumer.poll_collection(
    collection_id="91a7b528-80eb-42ed-a74d-c6fbd5a26116",
)
iocs = consumer.extract_iocs_for_siem(stix_objects=objects)
import requests
def push_to_splunk(iocs, splunk_url, hec_token, verify=True, timeout=10):
    """Push extracted IOCs to Splunk through the HTTP Event Collector.

    Each IOC is sent as its own event. A failed push is reported and skipped
    so the remaining IOCs are still attempted.

    Args:
        iocs: Iterable of JSON-serialisable IOC records.
        splunk_url: Base URL of the Splunk HEC listener (no trailing slash).
        hec_token: HEC token sent in the ``Authorization`` header.
        verify: Passed to ``requests`` as ``verify``. Defaults to ``True`` so
            the token is never sent over an unverified TLS connection; pass a
            CA bundle path for a private CA, or ``False`` to opt out.
        timeout: Per-request timeout in seconds, so an unresponsive endpoint
            cannot block the loop indefinitely.

    Returns:
        The number of IOCs Splunk accepted (HTTP 200).
    """
    headers = {"Authorization": f"Splunk {hec_token}"}
    endpoint = f"{splunk_url}/services/collector/event"
    pushed = 0
    for ioc in iocs:
        event = {
            "event": ioc,
            "sourcetype": "stix:indicator",
            "source": "taxii-server",
            "index": "threat_intel",
        }
        try:
            resp = requests.post(
                endpoint,
                headers=headers,
                json=event,
                verify=verify,
                timeout=timeout,
            )
        except requests.RequestException as exc:
            print(f"[-] Splunk HEC 错误:{exc}")
            continue
        if resp.status_code != 200:
            print(f"[-] Splunk HEC 错误:{resp.text}")
            continue
        pushed += 1
    # Report what was actually accepted, not how many were attempted.
    print(f"[+] 已向 Splunk 推送 {pushed} 个 IOC")
    return pushed
def push_to_elasticsearch(iocs, es_url, index="threat-intel", timeout=10):
    """Index IOCs into Elasticsearch, one document per IOC.

    A failed request is reported and skipped so the remaining IOCs are still
    attempted.

    Args:
        iocs: Iterable of JSON-serialisable IOC records.
        es_url: Base URL of the Elasticsearch node (no trailing slash).
        index: Name of the target index.
        timeout: Per-request timeout in seconds, so an unresponsive node
            cannot block the loop indefinitely.

    Returns:
        The number of IOCs Elasticsearch accepted (HTTP 200 or 201).
    """
    endpoint = f"{es_url}/{index}/_doc"
    indexed = 0
    for ioc in iocs:
        try:
            resp = requests.post(
                endpoint,
                json=ioc,
                headers={"Content-Type": "application/json"},
                timeout=timeout,
            )
        except requests.RequestException as exc:
            print(f"[-] ES 错误:{exc}")
            continue
        if resp.status_code not in (200, 201):
            print(f"[-] ES 错误:{resp.text}")
            continue
        indexed += 1
    # Report what was actually indexed, not how many were attempted.
    print(f"[+] 已在 Elasticsearch 中索引 {indexed} 个 IOC")
    return indexed
npx claudepluginhub killvxk/cybersecurity-skills-zh
Deploys and configures OpenTAXII or Medallion TAXII 2.1 servers to share and consume STIX-formatted cyber threat intelligence for automated indicator exchange between organizations.
Deploys and configures OpenTAXII or Medallion TAXII 2.1 servers to share and consume STIX-formatted cyber threat intelligence for automated indicator exchange between organizations.
Deploy an OpenTAXII server for STIX 2.1 threat intelligence sharing using Python and Docker. Automates indicator exchange between organizations.