Detects network attacks on OT historian servers (OSIsoft PI, Ignition, Wonderware) including data tampering, unauthorized queries, lateral movement, and specific vulnerabilities at IT/OT boundaries.
How this skill is triggered — by the user, by Claude, or both
Slash command
/cybersecurity-skills-zh:detecting-attacks-on-historian-serversThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
- 监控连接IT与OT网络的历史数据服务器,检测入侵指标
不适用于一般数据库安全监控(参见数据库安全技能)、历史数据服务器部署与配置,或仅限IT的数据仓库安全。
#!/usr/bin/env python3
"""OT Historian Attack Detector.
Monitors historian servers for unauthorized access, data manipulation,
lateral movement indicators, and exploitation of historian-specific
vulnerabilities. Supports OSIsoft PI and Ignition platforms.
"""
import json
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional
try:
import requests
except ImportError:
print("Install requests: pip install requests")
sys.exit(1)
class HistorianAttackDetector:
"""Detects attacks targeting OT historian servers."""
def __init__(self, historian_type: str, historian_url: str,
api_credentials: dict, verify_ssl: bool = False):
self.historian_type = historian_type
self.historian_url = historian_url.rstrip("/")
self.credentials = api_credentials
self.verify_ssl = verify_ssl
self.alerts = []
self.authorized_clients = set()
self.authorized_queries = {}
def set_baseline(self, authorized_clients: List[str],
authorized_query_patterns: Dict[str, List[str]]):
"""Set baseline of authorized historian clients and query patterns."""
self.authorized_clients = set(authorized_clients)
self.authorized_queries = authorized_query_patterns
def check_active_connections(self) -> List[dict]:
"""Check for unauthorized connections to historian."""
connections = []
if self.historian_type == "osisoft_pi":
try:
resp = requests.get(
f"{self.historian_url}/piwebapi/system/status",
auth=(self.credentials.get("username"), self.credentials.get("password")),
verify=self.verify_ssl,
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
connections = data.get("ConnectedClients", [])
except requests.RequestException as e:
print(f"[!] PI Web API 错误: {e}")
elif self.historian_type == "ignition":
try:
resp = requests.get(
f"{self.historian_url}/data/status/connections",
headers={"Authorization": f"Bearer {self.credentials.get('token')}"},
verify=self.verify_ssl,
timeout=10,
)
if resp.status_code == 200:
connections = resp.json().get("connections", [])
except requests.RequestException as e:
print(f"[!] Ignition API 错误: {e}")
# 检查未授权客户端
for conn in connections:
client_ip = conn.get("client_ip", conn.get("address", ""))
if self.authorized_clients and client_ip not in self.authorized_clients:
self.alerts.append({
"severity": "HIGH",
"type": "UNAUTHORIZED_HISTORIAN_CLIENT",
"timestamp": datetime.now().isoformat(),
"source_ip": client_ip,
"details": f"未授权客户端 {client_ip} 连接到 {self.historian_type} 历史数据服务器",
"mitre": "T0802 - Automated Collection",
})
return connections
def check_data_integrity(self, tags: List[str], hours_back: int = 24):
"""Check historian data for manipulation indicators."""
print(f"[*] 正在检查 {len(tags)} 个标签点过去 {hours_back} 小时的数据完整性")
integrity_issues = []
for tag in tags:
try:
if self.historian_type == "osisoft_pi":
resp = requests.get(
f"{self.historian_url}/piwebapi/streams/{tag}/recorded",
params={"startTime": f"*-{hours_back}h", "endTime": "*"},
auth=(self.credentials.get("username"), self.credentials.get("password")),
verify=self.verify_ssl,
timeout=15,
)
if resp.status_code == 200:
items = resp.json().get("Items", [])
# 检查可疑模式
if len(items) == 0:
integrity_issues.append({
"tag": tag, "issue": "NO_DATA",
"detail": "预期时间段内无数据点 - 可能已被删除",
})
else:
values = [i.get("Value", 0) for i in items if isinstance(i.get("Value"), (int, float))]
if values and len(set(values)) == 1 and len(values) > 100:
integrity_issues.append({
"tag": tag, "issue": "FLATLINE",
"detail": f"恒定值 {values[0]} 持续 {len(values)} 个数据点 - 可能是回放/欺骗攻击",
})
except requests.RequestException:
pass
for issue in integrity_issues:
self.alerts.append({
"severity": "HIGH",
"type": f"DATA_INTEGRITY_{issue['issue']}",
"timestamp": datetime.now().isoformat(),
"tag": issue["tag"],
"details": issue["detail"],
"mitre": "T0809 - Data Destruction" if issue["issue"] == "NO_DATA" else "T0832 - Manipulation of View",
})
return integrity_issues
def check_lateral_movement_indicators(self):
"""Check for indicators of historian being used as pivot point."""
indicators = []
# 检查1:历史数据服务器向一级设备发起出站连接
# (历史数据服务器应接收数据,而不是主动发起到PLC的连接)
indicators.append({
"check": "向PLC子网的出站连接",
"description": "历史数据服务器向一级设备发起连接可能表明已被入侵",
"detection": "监控防火墙日志,检查历史数据服务器IP是否连接PLC端口(502、102、44818)",
})
# 检查2:历史数据服务器上的新进程或服务
indicators.append({
"check": "历史数据服务器上的未授权进程",
"description": "攻击者可能在历史数据服务器上安装工具以进行横向移动",
"detection": "监控历史数据服务器上的进程创建事件(Sysmon EventID 1)",
})
# 检查3:历史数据服务器的异常身份验证
indicators.append({
"check": "来自意外来源的身份验证",
"description": "已入侵的IT系统向历史数据服务器进行身份验证以进行跳板攻击",
"detection": "监控Windows安全事件4624,检查非基线来源的登录",
})
return indicators
def generate_report(self):
"""Generate historian attack detection report."""
print(f"\n{'='*70}")
print("历史数据服务器攻击检测报告")
print(f"{'='*70}")
print(f"历史数据服务器类型: {self.historian_type}")
print(f"历史数据服务器URL: {self.historian_url}")
print(f"报告时间: {datetime.now().isoformat()}")
print(f"告警总数: {len(self.alerts)}")
if self.alerts:
print(f"\n--- 告警 ---")
for alert in self.alerts:
print(f"\n [{alert['severity']}] {alert['type']}")
print(f" 时间: {alert['timestamp']}")
print(f" 详情: {alert['details']}")
print(f" MITRE ICS: {alert.get('mitre', 'N/A')}")
print(f"\n--- 横向移动检查 ---")
for indicator in self.check_lateral_movement_indicators():
print(f"\n 检查: {indicator['check']}")
print(f" 风险: {indicator['description']}")
print(f" 检测: {indicator['detection']}")
if __name__ == "__main__":
detector = HistorianAttackDetector(
historian_type="osisoft_pi",
historian_url="https://pi-server.plant.local",
api_credentials={"username": "pi_reader", "password": "api_key_here"},
)
detector.set_baseline(
authorized_clients=["10.10.2.10", "10.10.2.20", "10.10.3.50", "10.10.150.10"],
authorized_query_patterns={},
)
detector.check_active_connections()
detector.check_data_integrity(tags=["REACTOR_01.TEMP", "PUMP_03.FLOW"], hours_back=24)
detector.generate_report()
| 术语 | 定义 |
|---|---|
| OT历史数据服务器(OT Historian) | 存储SCADA/DCS系统时间序列过程数据的数据库服务器(OSIsoft PI、Ignition、Wonderware) |
| 跳板点(Pivot Point) | 历史数据服务器位于IT和OT网络之间,是攻击者在区域间移动的主要目标 |
| 数据回放攻击(Data Replay Attack) | 向HMI馈送历史数据以掩盖实时过程操控(Stuxnet技术) |
| OSIsoft PI | 最广泛部署的OT历史数据服务器,被全球500强过程工业企业中65%使用 |
| Ignition | Inductive Automation的SCADA平台,带有历史数据模块,因Python脚本功能而日益成为攻击目标 |
| CVE-2025-0921 | Ignition SCADA特权文件系统漏洞,允许通过恶意项目文件进行权限提升 |
历史数据服务器攻击检测报告
====================================
历史数据服务器: [类型和主机名]
日期: YYYY-MM-DD
连接分析:
已授权客户端: [数量]
检测到未授权客户端: [数量及IP]
数据完整性:
检查的标签点: [数量]
完整性问题: [数量]
平线检测: [数量]
数据缺口: [数量]
横向移动指标:
出站PLC连接: [发现/未发现]
未授权进程: [发现/未发现]
异常身份验证: [发现/未发现]
npx claudepluginhub killvxk/cybersecurity-skills-zhDetects cyber attacks on OT historian servers (OSIsoft PI, Ignition, Wonderware) including unauthorized queries, data manipulation, and exploitation of historian-specific vulnerabilities.
Detects cyber attacks on OT historian servers at the IT/OT boundary, including data manipulation, unauthorized queries, and exploitation of historian-specific vulnerabilities like CVE-2025-0921.
Detects cyber attacks on OT historian servers (OSIsoft PI, Ignition, Wonderware) including data manipulation, unauthorized queries, and lateral movement at IT/OT boundaries. Use for monitoring compromise indicators.