Builds automated malware submission pipeline: collects suspicious files from EDR/email gateways, submits to sandboxes/VirusTotal/Cuckoo, extracts IOCs for SIEM integration. Scales SOC high-volume alert triage beyond manual processes.
How this skill is triggered — by the user, by Claude, or both
Slash command
/cybersecurity-skills-zh:building-automated-malware-submission-pipeline
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
以下情况使用本技能:
以下情况使用本技能:
不适用于在生产环境中分析实时恶意软件样本——请始终使用隔离的沙箱基础设施。
依赖:requests、vt-py、pefile 库。
从多个来源收集可疑文件:
import requests
import hashlib
import os
from pathlib import Path
from datetime import datetime
class MalwareCollector:
    """Collect suspicious files from an EDR API and an e-mail quarantine folder.

    Every collected sample is written into ``quarantine_dir`` under the name
    ``<sha256>.sample``.
    """

    # Seconds to wait for each EDR HTTP request before giving up.
    REQUEST_TIMEOUT = 30
    # Read size used while hashing so large samples are never loaded whole.
    HASH_CHUNK_SIZE = 1024 * 1024

    def __init__(self, quarantine_dir="/opt/malware_quarantine"):
        """Create the collector and make sure the quarantine directory exists.

        Args:
            quarantine_dir: Directory that receives the collected samples.
                Missing parent directories are created as well.
        """
        self.quarantine_dir = Path(quarantine_dir)
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)

    def _store_sample(self, data):
        """Write ``data`` to ``<sha256>.sample`` and return ``(sha256, path)``."""
        sha256 = hashlib.sha256(data).hexdigest()
        filepath = self.quarantine_dir / f"{sha256}.sample"
        filepath.write_bytes(data)
        return sha256, filepath

    def collect_from_edr(self, edr_api_url, api_token):
        """Pull quarantined files from CrowdStrike Falcon.

        Args:
            edr_api_url: Base URL of the EDR API.
            api_token: Bearer token sent in the ``Authorization`` header.

        Yields:
            dict with ``sha256``, ``path`` and ``source`` ("edr") per file.

        Raises:
            requests.HTTPError: If the listing or a download returns an HTTP
                error status. Without this check an error body would be
                stored and reported as if it were a malware sample.
        """
        headers = {"Authorization": f"Bearer {api_token}"}
        # Fetch the most recent quarantine events.
        response = requests.get(
            f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
            headers=headers,
            params={"filter": "state:'quarantined'", "limit": 50},
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        file_ids = response.json().get("resources") or []
        for file_id in file_ids:
            # Download the quarantined file.
            dl_response = requests.get(
                f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
                headers=headers,
                params={"ids": file_id},
                timeout=self.REQUEST_TIMEOUT,
            )
            dl_response.raise_for_status()
            sha256, filepath = self._store_sample(dl_response.content)
            yield {"sha256": sha256, "path": str(filepath), "source": "edr"}

    def collect_from_email_gateway(self, smtp_quarantine_path):
        """Pull attachments from the mail gateway quarantine area.

        Args:
            smtp_quarantine_path: Directory scanned (non-recursively) for
                ``*.eml`` files.

        Yields:
            dict with ``sha256``, ``path``, ``source`` ("email"),
            ``original_filename``, ``sender`` and ``subject`` per attachment.
        """
        import email
        from email import policy

        for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
            # Close the handle promptly; a quarantine folder may hold many files.
            with eml_file.open("rb") as handle:
                msg = email.message_from_binary_file(handle, policy=policy.default)
            for attachment in msg.iter_attachments():
                try:
                    content = attachment.get_content()
                except KeyError:
                    # No content handler for this MIME type (e.g. multipart);
                    # keep the serialized part instead of aborting.
                    content = attachment.as_bytes()
                if isinstance(content, str):
                    content = content.encode()
                elif not isinstance(content, (bytes, bytearray)):
                    # message/rfc822 attachments are returned as message
                    # objects, which cannot be written to disk directly.
                    content = content.as_bytes()
                sha256, filepath = self._store_sample(bytes(content))
                yield {
                    "sha256": sha256,
                    "path": str(filepath),
                    "source": "email",
                    "original_filename": attachment.get_filename() or "unknown",
                    "sender": msg["From"],
                    "subject": msg["Subject"],
                }

    def compute_hashes(self, filepath):
        """Compute the MD5, SHA1 and SHA256 digests of a file.

        The file is read in chunks so that large samples do not have to fit
        in memory.

        Args:
            filepath: Path of the file to hash.

        Returns:
            dict with hex digests under ``md5``, ``sha1``, ``sha256`` and the
            byte count under ``size``.
        """
        md5 = hashlib.md5()
        sha1 = hashlib.sha1()
        sha256 = hashlib.sha256()
        size = 0
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(self.HASH_CHUNK_SIZE), b""):
                md5.update(chunk)
                sha1.update(chunk)
                sha256.update(chunk)
                size += len(chunk)
        return {
            "md5": md5.hexdigest(),
            "sha1": sha1.hexdigest(),
            "sha256": sha256.hexdigest(),
            "size": size,
        }
在提交沙箱前检查文件是否已知:
import vt
class MalwarePreScreener:
    """Look a sample hash up in VirusTotal and MalwareBazaar before sandboxing."""

    # Seconds to wait for the MalwareBazaar HTTP request before giving up.
    REQUEST_TIMEOUT = 30

    def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
        """Open a VirusTotal client and remember the MalwareBazaar endpoint.

        Args:
            vt_api_key: API key passed to ``vt.Client``.
            mb_api_url: URL that MalwareBazaar queries are POSTed to.
        """
        self.vt_client = vt.Client(vt_api_key)
        self.mb_api_url = mb_api_url

    def check_virustotal(self, sha256):
        """Query VirusTotal for a hash.

        Returns:
            dict with ``found`` plus detection counts, threat label and file
            type when the hash is known. When the lookup fails, ``found`` is
            False; any failure other than "not found" additionally carries an
            ``error`` key so quota or authentication problems are not
            mistaken for an unknown sample.
        """
        try:
            file_obj = self.vt_client.get_object(f"/files/{sha256}")
        except vt.APIError as exc:
            result = {"found": False}
            # NOTE(review): assumes vt.APIError exposes the API error code as
            # ``.code`` with "NotFoundError" for unknown hashes - confirm
            # against the installed vt-py version.
            if getattr(exc, "code", None) != "NotFoundError":
                result["error"] = str(exc)
            return result

        stats = file_obj.last_analysis_stats
        classification = getattr(file_obj, "popular_threat_classification", {})
        return {
            "found": True,
            "malicious": stats.get("malicious", 0),
            "suspicious": stats.get("suspicious", 0),
            "undetected": stats.get("undetected", 0),
            "total": sum(stats.values()),
            "threat_label": classification.get("suggested_threat_label", "Unknown"),
            "type": getattr(file_obj, "type_description", "Unknown"),
        }

    def check_malwarebazaar(self, sha256):
        """Query MalwareBazaar for a hash.

        Returns:
            dict with ``found`` plus signature, tags, file type and first-seen
            date when the hash is known. Transport or decoding failures are
            reported as ``found: False`` with an ``error`` key, matching the
            behaviour of ``check_virustotal``.
        """
        try:
            response = requests.post(
                self.mb_api_url,
                data={"query": "get_info", "hash": sha256},
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as exc:
            return {"found": False, "error": str(exc)}

        entries = data.get("data") or []
        if data.get("query_status") != "ok" or not entries:
            return {"found": False}
        entry = entries[0]
        return {
            "found": True,
            "signature": entry.get("signature", "Unknown"),
            # The key can be present with a null value; normalise to a list.
            "tags": entry.get("tags") or [],
            "file_type": entry.get("file_type", "Unknown"),
            "first_seen": entry.get("first_seen", "Unknown"),
        }

    def pre_screen(self, sha256):
        """Run all pre-screening checks and derive a preliminary verdict.

        Verdicts:
            ``KNOWN_MALICIOUS``: VirusTotal reports more than 10 malicious
                detections.
            ``LIKELY_CLEAN``: VirusTotal knows the file with zero malicious
                detections AND MalwareBazaar does not list it. A sample
                listed by MalwareBazaar is never short-cut as clean; it
                stays ``UNKNOWN`` so that it is sent to the sandbox.
            ``UNKNOWN``: everything else.

        Returns:
            dict with ``sha256``, both raw lookup results,
            ``pre_screen_verdict`` and ``needs_sandbox``.
        """
        vt_result = self.check_virustotal(sha256)
        mb_result = self.check_malwarebazaar(sha256)

        vt_found = vt_result["found"]
        vt_malicious = vt_result.get("malicious", 0)
        listed_in_mb = bool(mb_result.get("found"))

        verdict = "UNKNOWN"
        if vt_found and vt_malicious > 10:
            verdict = "KNOWN_MALICIOUS"
        elif vt_found and vt_malicious == 0 and not listed_in_mb:
            verdict = "LIKELY_CLEAN"

        return {
            "sha256": sha256,
            "virustotal": vt_result,
            "malwarebazaar": mb_result,
            "pre_screen_verdict": verdict,
            "needs_sandbox": verdict == "UNKNOWN",
        }

    def close(self):
        """Close the underlying VirusTotal client."""
        self.vt_client.close()
Cuckoo Sandbox 提交:
class SandboxSubmitter:
    """Submit samples to Cuckoo Sandbox / Joe Sandbox and collect the reports."""

    # Seconds to wait for each sandbox HTTP request before giving up.
    REQUEST_TIMEOUT = 60

    def __init__(self, cuckoo_url="http://cuckoo.internal:8090"):
        """Remember the base URL of the Cuckoo REST API."""
        self.cuckoo_url = cuckoo_url

    def submit_to_cuckoo(self, filepath, timeout=300):
        """Submit a file to Cuckoo Sandbox.

        Args:
            filepath: Path of the sample to upload.
            timeout: Analysis timeout (seconds) sent to Cuckoo in the form
                data; this is not the HTTP timeout.

        Returns:
            The ``task_id`` from Cuckoo's JSON response.

        Raises:
            requests.HTTPError: If Cuckoo answers with an HTTP error status.
        """
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{self.cuckoo_url}/tasks/create/file",
                files={"file": f},
                data={
                    "timeout": timeout,
                    "options": "procmemdump=yes,route=none",
                    "priority": 2,
                    "machine": "win10_x64",
                },
                timeout=self.REQUEST_TIMEOUT,
            )
        response.raise_for_status()
        return response.json()["task_id"]

    def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
        """Poll Cuckoo until the task is reported, fails, or ``max_wait`` passes.

        The deadline is measured on a monotonic clock, so time spent inside
        the HTTP requests counts towards ``max_wait`` as well.

        Returns:
            The summarized report (see ``get_report``) once the status is
            ``reported``; ``{"error": "Analysis failed"}`` for any status
            starting with ``failed``; ``{"error": "Analysis timed out"}``
            when the deadline passes first.
        """
        import time

        deadline = time.monotonic() + max_wait
        while time.monotonic() < deadline:
            response = requests.get(
                f"{self.cuckoo_url}/tasks/view/{task_id}",
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            status = response.json()["task"]["status"]
            if status == "reported":
                return self.get_report(task_id)
            # Treat every failed_* state as terminal instead of polling on
            # until the deadline.
            if str(status).startswith("failed"):
                return {"error": "Analysis failed"}
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(poll_interval, remaining))
        return {"error": "Analysis timed out"}

    @staticmethod
    def _summarize_report(task_id, report):
        """Reduce a raw Cuckoo report dict to the key indicators."""
        network = report.get("network") or {}
        behavior = report.get("behavior") or {}
        summary = behavior.get("summary") or {}
        return {
            "task_id": task_id,
            "score": (report.get("info") or {}).get("score", 0),
            "signatures": [
                {
                    "name": s.get("name"),
                    "severity": s.get("severity"),
                    "description": s.get("description"),
                }
                for s in report.get("signatures") or []
            ],
            "network": {
                "dns": [d["request"] for d in network.get("dns") or [] if "request" in d],
                "http": [
                    {"url": h.get("uri"), "method": h.get("method")}
                    for h in network.get("http") or []
                ],
                "hosts": network.get("hosts") or [],
            },
            "dropped_files": [
                {"name": f.get("name"), "sha256": f.get("sha256"), "size": f.get("size")}
                for f in report.get("dropped") or []
            ],
            "processes": [
                {
                    "name": p.get("process_name"),
                    "pid": p.get("pid"),
                    "command_line": p.get("command_line", ""),
                }
                for p in behavior.get("processes") or []
            ],
            "registry_keys": list(summary.get("regkey_written") or []),
        }

    def get_report(self, task_id):
        """Fetch the analysis report and extract the key indicators.

        Returns:
            dict with ``task_id``, ``score``, ``signatures``, ``network``
            (``dns``, ``http``, ``hosts``), ``dropped_files``, ``processes``
            and ``registry_keys``. Missing or null report sections yield
            empty lists rather than raising.

        Raises:
            requests.HTTPError: If Cuckoo answers with an HTTP error status.
        """
        response = requests.get(
            f"{self.cuckoo_url}/tasks/report/{task_id}",
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return self._summarize_report(task_id, response.json())

    def submit_to_joesandbox(self, filepath, joe_api_key, joe_url="https://jbxcloud.joesecurity.org/api"):
        """Submit a file to Joe Sandbox Cloud.

        Returns:
            ``data.webid`` from the JSON response.

        Raises:
            requests.HTTPError: If the service answers with an HTTP error status.
        """
        # NOTE(review): the request shape (Bearer header, boolean form values,
        # ``webid`` in the response) is kept exactly as before; confirm it
        # against the Joe Sandbox API v2 documentation for your deployment.
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{joe_url}/v2/submission/new",
                headers={"Authorization": f"Bearer {joe_api_key}"},
                files={"sample": f},
                data={
                    "systems": "w10_64",
                    "internet-access": False,
                    "report-cache": True,
                },
                timeout=self.REQUEST_TIMEOUT,
            )
        response.raise_for_status()
        return response.json()["data"]["webid"]
class VerdictGenerator:
    """Combine pre-screening and sandbox results into a final verdict."""

    def __init__(self):
        # Cuckoo score threshold. Kept for callers that read it;
        # generate_verdict itself scores with the combined formula below.
        self.malicious_threshold = 7

    def generate_verdict(self, pre_screen, sandbox_report):
        """Produce the final verdict from pre-screen and sandbox results.

        Scoring: ``vt_malicious * 2 + sandbox_score * 10 + signature_count * 5``.
        A score of 100 or more is MALICIOUS, 50 or more SUSPICIOUS, 20 or
        more POTENTIALLY_UNWANTED, anything lower CLEAN.

        Two corrections are applied on top of the score:

        * A sandbox report that carries an ``error`` key (failed or timed-out
          analysis) contributes no findings, and a resulting CLEAN verdict is
          reported with LOW confidence, because the sample was never
          actually observed.
        * When pre-screening already classified the sample as
          ``KNOWN_MALICIOUS`` the verdict is raised to MALICIOUS. The sandbox
          is skipped for such samples, so the score alone would under-rate
          them.

        Args:
            pre_screen: Result dict with a ``virustotal`` entry and
                optionally ``pre_screen_verdict``.
            sandbox_report: Summarized sandbox report, an error dict, or None.

        Returns:
            dict with ``verdict``, ``confidence``, ``combined_score``,
            ``iocs``, ``vt_detections``, ``sandbox_score``, ``signatures``
            and ``sandbox_error`` (None when the sandbox did not fail).
        """
        sandbox_error = None
        if sandbox_report and "error" in sandbox_report:
            sandbox_error = sandbox_report["error"]
            sandbox_report = None  # a failed run carries no findings

        iocs = {
            "ips": [],
            "domains": [],
            "urls": [],
            "hashes": [],
            "registry_keys": [],
            "files_dropped": [],
        }
        signatures = []
        sandbox_score = 0

        # Extract IOCs from the sandbox report.
        if sandbox_report:
            network = sandbox_report.get("network") or {}
            dropped = sandbox_report.get("dropped_files") or []
            iocs["domains"] = network.get("dns") or []
            iocs["ips"] = network.get("hosts") or []
            iocs["urls"] = [h["url"] for h in network.get("http") or [] if h.get("url")]
            iocs["hashes"] = [f["sha256"] for f in dropped if f.get("sha256")]
            iocs["registry_keys"] = (sandbox_report.get("registry_keys") or [])[:10]
            iocs["files_dropped"] = dropped
            signatures = sandbox_report.get("signatures") or []
            sandbox_score = sandbox_report.get("score", 0)

        # Determine the verdict.
        vt_malicious = pre_screen.get("virustotal", {}).get("malicious", 0)
        combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (len(signatures) * 5)

        if combined_score >= 100:
            verdict, confidence = "MALICIOUS", "HIGH"
        elif combined_score >= 50:
            verdict, confidence = "SUSPICIOUS", "MEDIUM"
        elif combined_score >= 20:
            verdict, confidence = "POTENTIALLY_UNWANTED", "LOW"
        else:
            verdict, confidence = "CLEAN", "HIGH"

        if pre_screen.get("pre_screen_verdict") == "KNOWN_MALICIOUS":
            verdict, confidence = "MALICIOUS", "HIGH"
        elif verdict == "CLEAN" and sandbox_error is not None:
            confidence = "LOW"

        return {
            "verdict": verdict,
            "confidence": confidence,
            "combined_score": combined_score,
            "iocs": iocs,
            "vt_detections": vt_malicious,
            "sandbox_score": sandbox_score,
            "signatures": signatures,
            "sandbox_error": sandbox_error,
        }
def push_to_splunk(verdict_result, splunk_url, splunk_token, verify=True, timeout=30):
    """Send a malware analysis verdict to Splunk through the HTTP Event Collector.

    Args:
        verdict_result: Verdict dict containing ``sha256``, ``verdict``,
            ``confidence``, ``combined_score``, ``vt_detections``,
            ``sandbox_score``, ``iocs``, ``signatures`` and optionally
            ``threat_label``.
        splunk_url: Base URL of the Splunk HEC endpoint.
        splunk_token: HEC token sent in the ``Authorization`` header.
        verify: Passed to ``requests`` as the TLS verification setting:
            True (default), False, or a path to a CA bundle. Verification
            used to be hard-coded off, which exposed the HEC token and the
            event data to interception. For a Splunk instance with a
            private CA, pass the CA bundle path rather than disabling
            verification.
        timeout: Seconds to wait for Splunk before giving up.

    Returns:
        True when Splunk answered with HTTP 200, otherwise False.

    Raises:
        requests.RequestException: On connection, TLS or timeout failures.
    """
    event = {
        "sourcetype": "malware_analysis",
        "source": "malware_pipeline",
        "event": {
            "sha256": verdict_result["sha256"],
            "verdict": verdict_result["verdict"],
            "confidence": verdict_result["confidence"],
            "score": verdict_result["combined_score"],
            "vt_detections": verdict_result["vt_detections"],
            "sandbox_score": verdict_result["sandbox_score"],
            "malware_family": verdict_result.get("threat_label", "Unknown"),
            "iocs": verdict_result["iocs"],
            "signatures": [s["name"] for s in verdict_result["signatures"]],
        },
    }
    response = requests.post(
        f"{splunk_url}/services/collector/event",
        headers={
            "Authorization": f"Splunk {splunk_token}",
            "Content-Type": "application/json",
        },
        json=event,
        verify=verify,
        timeout=timeout,
    )
    return response.status_code == 200
def push_iocs_to_blocklist(iocs, firewall_api):
    """Push extracted IOCs to the blocking infrastructure.

    IP indicators are validated first: values that are not a parseable IP
    address, or that are not globally routable (private, loopback,
    link-local, ...), are skipped so an automated verdict cannot block
    internal infrastructure. Duplicate indicators are sent once. A failed
    request is recorded and the remaining indicators are still processed.

    NOTE(review): domains are forwarded as extracted from the sandbox run;
    consider an allow-list for well-known benign domains before sinkholing.

    Args:
        iocs: dict with optional ``ips`` and ``domains`` lists of strings.
        firewall_api: Base URL of the firewall API; ``/block`` is appended.

    Returns:
        dict with the lists ``blocked``, ``skipped`` and ``failed``.
    """
    import ipaddress

    outcome = {"blocked": [], "skipped": [], "failed": []}

    def _unique_strings(values):
        """Return the string values in first-seen order without duplicates."""
        seen = set()
        ordered = []
        for value in values or []:
            if isinstance(value, str) and value and value not in seen:
                seen.add(value)
                ordered.append(value)
        return ordered

    def _submit(kind, value, action):
        """POST one indicator and record whether the firewall accepted it."""
        try:
            response = requests.post(
                f"{firewall_api}/block",
                json={"type": kind, "value": value, "action": action, "source": "malware_pipeline"},
                timeout=30,
            )
            response.raise_for_status()
        except requests.RequestException:
            outcome["failed"].append(value)
        else:
            outcome["blocked"].append(value)

    for ip in _unique_strings(iocs.get("ips", [])):
        try:
            routable = ipaddress.ip_address(ip).is_global
        except ValueError:
            routable = False
        if not routable:
            outcome["skipped"].append(ip)
            continue
        _submit("ip", ip, "block")

    for domain in _unique_strings(iocs.get("domains", [])):
        _submit("domain", domain, "sinkhole")

    return outcome
def run_malware_pipeline(sample_path, config):
    """Run the complete malware analysis pipeline for one sample.

    Steps: hash and pre-screen the sample, submit it to Cuckoo when the
    pre-screen asks for it, generate the verdict, push the verdict to
    Splunk, and push the IOCs to the blocklist when the verdict is MALICIOUS.

    Args:
        sample_path: Path of the sample file.
        config: dict providing ``vt_key``, ``cuckoo_url``, ``splunk_url``,
            ``splunk_token`` and ``firewall_api``.

    Returns:
        The verdict dict, extended with ``sha256`` and ``threat_label``.
    """
    collector = MalwareCollector()
    screener = MalwarePreScreener(config["vt_key"])
    try:
        submitter = SandboxSubmitter(config["cuckoo_url"])
        generator = VerdictGenerator()

        # Step 1: hash computation and pre-screening.
        hashes = collector.compute_hashes(sample_path)
        pre_screen = screener.pre_screen(hashes["sha256"])

        # Step 2: submit to the sandbox when the sample is unknown.
        sandbox_report = None
        if pre_screen["needs_sandbox"]:
            task_id = submitter.submit_to_cuckoo(sample_path)
            sandbox_report = submitter.wait_for_analysis(task_id)

        # Step 3: generate the verdict.
        verdict = generator.generate_verdict(pre_screen, sandbox_report)
        verdict["sha256"] = hashes["sha256"]
        verdict["threat_label"] = pre_screen.get("virustotal", {}).get("threat_label", "Unknown")

        # Step 4: push to the SIEM.
        push_to_splunk(verdict, config["splunk_url"], config["splunk_token"])

        # Step 5: block when malicious.
        if verdict["verdict"] == "MALICIOUS":
            push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])
        return verdict
    finally:
        # Release the VirusTotal client even when a step above raises.
        screener.close()
| 术语 | 定义 |
|---|---|
| 动态分析(Dynamic Analysis) | 在沙箱中执行恶意软件以观察运行时行为(进程创建、网络、文件系统变更) |
| 静态分析(Static Analysis) | 不执行恶意软件的检查(哈希查询、字符串分析、PE 头检查) |
| 沙箱规避(Sandbox Evasion) | 恶意软件用于检测沙箱环境并改变行为以规避分析的技术 |
| IOC 提取(IOC Extraction) | 从沙箱报告自动识别网络指标、文件取证痕迹和注册表变更的过程 |
| 多 AV 扫描(Multi-AV Scanning) | 将样本提交至多个杀毒引擎(VirusTotal)以进行基于共识的检测 |
| 研判结论(Verdict) | 样本的最终分类:Malicious(恶意)、Suspicious(可疑)、Potentially Unwanted(可能不需要)或 Clean(干净) |
MALWARE ANALYSIS REPORT — Pipeline Submission
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Sample: invoice_march.docx
SHA256: a1b2c3d4e5f6a7b8...
File Type: Microsoft Word Document (macro-enabled)
Pre-Screening:
VirusTotal: 34/72 malicious (Emotet.Downloader)
MalwareBazaar: Tags: emotet, macro, downloader
Sandbox Analysis (Cuckoo):
Score: 9.2/10 (MALICIOUS)
Signatures:
- Macro executes PowerShell download cradle (severity: 8)
- Process injection into explorer.exe (severity: 9)
- Connects to known Emotet C2 server (severity: 9)
Extracted IOCs:
C2 IPs: 185.234.218[.]50:8080, 45.77.123[.]45:443
Domains: update-service[.]evil[.]com
Dropped Files: payload.dll (SHA256: b2c3d4e5...)
Registry: HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update
VERDICT: MALICIOUS (Emotet Downloader) — Confidence: HIGH
ACTIONS:
[DONE] IOCs pushed to Splunk threat intel
[DONE] C2 IPs blocked on firewall
[DONE] Domain sinkholed on DNS
[DONE] Hash blocked on endpoint
npx claudepluginhub killvxk/cybersecurity-skills-zh
Builds automated malware submission pipeline: collects suspicious files from endpoints/email gateways, submits to sandboxes/multi-engine scanners like VirusTotal/Cuckoo, generates IOC verdicts for SIEM. Scales SOC high-volume triage.
Builds an automated malware submission and analysis pipeline for SOC teams, collecting suspicious files and submitting to sandboxes for verdict generation.
Builds an automated malware submission and analysis pipeline for SOC teams, collecting suspicious files and submitting to sandboxes for verdict generation.