Automates responses to AWS GuardDuty findings using EventBridge and Lambda, including EC2 instance isolation and SNS notifications. Useful for real-time cloud security incident handling.
How this skill is triggered — by the user, by Claude, or both
Slash command
/cybersecurity-skills-zh:detecting-aws-guardduty-findings-automation
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
Amazon GuardDuty 是一种威胁检测服务,持续监控 AWS 账户的恶意活动和未授权行为。通过将 GuardDuty 与 Amazon EventBridge 和 AWS Lambda 集成,安全团队实现自动化、实时的威胁响应,将平均响应时间(MTTR)从数小时缩短到数秒。GuardDuty 分析 VPC 流日志、CloudTrail 管理和数据事件、DNS 日志、EKS 审计日志和 S3 数据事件。
Amazon GuardDuty 是一种威胁检测服务,持续监控 AWS 账户的恶意活动和未授权行为。通过将 GuardDuty 与 Amazon EventBridge 和 AWS Lambda 集成,安全团队实现自动化、实时的威胁响应,将平均响应时间(MTTR)从数小时缩短到数秒。GuardDuty 分析 VPC 流日志、CloudTrail 管理和数据事件、DNS 日志、EKS 审计日志和 S3 数据事件。
# Enable GuardDuty
aws guardduty create-detector --enable --finding-publishing-frequency FIFTEEN_MINUTES
# Enable additional protection plans.
# The feature list is used instead of the legacy --data-sources map because
# Runtime Monitoring is not a key of that map (it only knows S3Logs,
# Kubernetes and MalwareProtection).
aws guardduty update-detector \
  --detector-id DETECTOR_ID \
  --features '[
    {"Name": "S3_DATA_EVENTS", "Status": "ENABLED"},
    {"Name": "EKS_AUDIT_LOGS", "Status": "ENABLED"},
    {"Name": "EBS_MALWARE_PROTECTION", "Status": "ENABLED"},
    {"Name": "RUNTIME_MONITORING", "Status": "ENABLED"}
  ]'
{
"source": ["aws.guardduty"],
"detail-type": ["GuardDuty Finding"],
"detail": {
"severity": [{"numeric": [">=", 7.0]}]
}
}
# Match GuardDuty findings with severity >= 7.0.
aws events put-rule \
  --name "guardduty-high-severity" \
  --event-pattern '{
    "source": ["aws.guardduty"],
    "detail-type": ["GuardDuty Finding"],
    "detail": {
      "severity": [{"numeric": [">=", 7.0]}]
    }
  }'
# Grant EventBridge permission to invoke the function. Targets attached via
# the CLI do not get this resource-based permission automatically, and without
# it the rule matches but the Lambda is never invoked.
aws lambda add-permission \
  --function-name "guardduty-response" \
  --statement-id "guardduty-high-severity-invoke" \
  --action "lambda:InvokeFunction" \
  --principal "events.amazonaws.com" \
  --source-arn "arn:aws:events:us-east-1:123456789012:rule/guardduty-high-severity"
# Attach the response Lambda as the rule's target.
aws events put-targets \
  --rule "guardduty-high-severity" \
  --targets "Id"="lambda-handler","Arn"="arn:aws:lambda:us-east-1:123456789012:function:guardduty-response"
import boto3
import json
import os
# AWS service clients, created once at import time and shared by the handler.
ec2 = boto3.client('ec2')
sns = boto3.client('sns')
# Deployment configuration read from the environment; each value is None
# when its variable is not set.
QUARANTINE_SG = os.getenv('QUARANTINE_SECURITY_GROUP')
SNS_TOPIC = os.getenv('SNS_TOPIC_ARN')
def lambda_handler(event, context):
    """Contain the EC2 instance named in a GuardDuty finding.

    For a finding whose resource type is ``Instance`` the handler records the
    instance's current security groups as tags, replaces them with the
    quarantine group (only when ``QUARANTINE_SG`` is configured), snapshots
    every attached EBS volume and publishes a JSON summary to SNS (only when
    ``SNS_TOPIC`` is configured). Findings for any other resource type are
    acknowledged without action.

    Args:
        event: EventBridge event; the finding is read from ``event['detail']``.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and a human-readable ``body``.

    Raises:
        KeyError: if the finding lacks one of the fields read below.
        Exception: errors raised by the EC2 or SNS clients are not caught.
    """
    finding = event['detail']
    finding_type = finding['type']
    severity = finding['severity']
    account_id = finding['accountId']
    region = finding['region']

    # Extract resource information; anything that is not an instance is
    # acknowledged and left alone.
    resource = finding.get('resource', {})
    if resource.get('resourceType', '') != 'Instance':
        return {'statusCode': 200, 'body': '非 EC2 发现已处理'}

    details = resource['instanceDetails']
    instance_id = details['instanceId']

    # Fast path: the finding itself already shows the quarantine tag.
    finding_tags = {t['key']: t['value'] for t in details.get('tags', [])}
    if finding_tags.get('SecurityStatus') == 'Quarantined':
        return {'statusCode': 200, 'body': 'Already quarantined'}

    described = ec2.describe_instances(InstanceIds=[instance_id])
    instance = described['Reservations'][0]['Instances'][0]

    # The tag list embedded in the event can predate our own tagging (for
    # example when the same event is delivered again), so also check the live
    # tags. Without this a second run would record the quarantine group as the
    # "original" security groups and destroy the evidence.
    live_tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}
    if live_tags.get('SecurityStatus') == 'Quarantined':
        return {'statusCode': 200, 'body': 'Already quarantined'}

    # Current security groups, kept for forensics and later restoration.
    current_sgs = [sg['GroupId'] for sg in instance['SecurityGroups']]

    # Record the evidence tags BEFORE the groups are replaced.
    ec2.create_tags(
        Resources=[instance_id],
        Tags=[
            {'Key': 'GuardDutyFinding', 'Value': finding_type},
            {'Key': 'OriginalSecurityGroups', 'Value': ','.join(current_sgs)},
            {'Key': 'QuarantineTime', 'Value': finding['updatedAt']},
        ],
    )

    # Move to the quarantine security group. The status tag is written only
    # after the swap succeeded, so an instance is never marked as quarantined
    # while it still has its original network access.
    isolated = bool(QUARANTINE_SG)
    if isolated:
        ec2.modify_instance_attribute(
            InstanceId=instance_id,
            Groups=[QUARANTINE_SG],
        )
        ec2.create_tags(
            Resources=[instance_id],
            Tags=[{'Key': 'SecurityStatus', 'Value': 'Quarantined'}],
        )

    # Create EBS snapshots for forensics.
    volumes = ec2.describe_volumes(
        Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
    )
    for vol in volumes['Volumes']:
        ec2.create_snapshot(
            VolumeId=vol['VolumeId'],
            Description=f'GuardDuty forensic snapshot - {finding_type}',
            TagSpecifications=[{
                'ResourceType': 'snapshot',
                'Tags': [
                    {'Key': 'Purpose', 'Value': 'ForensicCapture'},
                    {'Key': 'SourceInstance', 'Value': instance_id},
                    {'Key': 'FindingType', 'Value': finding_type},
                ],
            }],
        )

    if isolated:
        subject = f'[GuardDuty] {finding_type} - 实例 {instance_id} 已隔离'
        body = f'实例 {instance_id} 已隔离并创建快照'
    else:
        subject = f'[GuardDuty] {finding_type} - 实例 {instance_id} 未隔离'
        body = f'实例 {instance_id} 未隔离,仅创建快照'

    # Notify the security team.
    if SNS_TOPIC:
        sns.publish(
            TopicArn=SNS_TOPIC,
            # SNS rejects subjects of 100 characters or more.
            Subject=subject[:99],
            Message=json.dumps({
                'action': ('instance_quarantined' if isolated
                           else 'instance_not_isolated'),
                'isolated': isolated,
                'instance_id': instance_id,
                'finding_type': finding_type,
                'severity': severity,
                'account': account_id,
                'region': region,
                'original_security_groups': current_sgs,
                'description': finding.get('description', ''),
            }, indent=2),
        )
    else:
        # Remediation has already happened; do not fail the invocation just
        # because no topic is configured.
        print('SNS_TOPIC_ARN is not set; skipping notification')

    return {'statusCode': 200, 'body': body}
import boto3
import json
import os
# AWS service clients, created once at import time and shared by the handler.
iam = boto3.client('iam')
sns = boto3.client('sns')
# Topic ARN read from the environment; None when the variable is not set.
SNS_TOPIC = os.getenv('SNS_TOPIC_ARN')
def lambda_handler(event, context):
    """Lock down the IAM user named in a GuardDuty finding.

    Findings whose type contains neither ``IAMUser`` nor
    ``UnauthorizedAccess`` are ignored, as are findings without a user name.
    For IAM users the reported access key is deactivated and an inline
    deny-all policy is attached. Each step is best-effort and its outcome is
    collected in the returned action list. A summary is published to SNS when
    ``SNS_TOPIC`` is configured.

    Args:
        event: EventBridge event; the finding is read from ``event['detail']``.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and a ``body`` that is either a short
        status string or the JSON-encoded list of actions taken.

    Raises:
        KeyError: if the event has no ``detail`` or the finding has no ``type``.
        Exception: errors raised by the SNS client are not caught.
    """
    finding = event['detail']
    finding_type = finding['type']
    if 'IAMUser' not in finding_type and 'UnauthorizedAccess' not in finding_type:
        return {'statusCode': 200, 'body': 'Not an IAM finding'}

    access_key_details = finding.get('resource', {}).get('accessKeyDetails', {})
    user_name = access_key_details.get('userName', '')
    access_key_id = access_key_details.get('accessKeyId', '')
    user_type = access_key_details.get('userType', '')
    if not user_name:
        return {'statusCode': 200, 'body': 'No user identified'}

    actions_taken = []
    if user_type and user_type != 'IAMUser':
        # For other principal types the reported name is not an IAM user name.
        # Calling the user APIs with it would fail, or lock out an unrelated
        # IAM user that happens to share the name.
        actions_taken.append(f'跳过自动处置: 主体类型为 {user_type}, 不是 IAM 用户')
    else:
        # Deactivate the compromised access key. GuardDuty sample findings
        # carry a placeholder key id that does not exist in IAM.
        if access_key_id and access_key_id != 'GeneratedFindingAccessKeyId':
            try:
                iam.update_access_key(
                    UserName=user_name,
                    AccessKeyId=access_key_id,
                    Status='Inactive',
                )
                actions_taken.append(f'已停用访问密钥 {access_key_id}')
            except Exception as e:
                # Best-effort: record the failure and continue with the policy.
                actions_taken.append(f'停用密钥失败: {str(e)}')

        # Attach an inline deny-all policy to the user.
        deny_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Deny",
                "Action": "*",
                "Resource": "*",
            }],
        }
        try:
            iam.put_user_policy(
                UserName=user_name,
                PolicyName='GuardDuty-DenyAll-Quarantine',
                PolicyDocument=json.dumps(deny_policy),
            )
            actions_taken.append(f'已为 {user_name} 应用拒绝所有策略')
        except Exception as e:
            # Best-effort: the failure is reported in the notification.
            actions_taken.append(f'应用拒绝策略失败: {str(e)}')

    # Notify.
    if SNS_TOPIC:
        subject = f'[GuardDuty] IAM 入侵 - {user_name}'
        sns.publish(
            TopicArn=SNS_TOPIC,
            # SNS rejects subjects of 100 characters or more.
            Subject=subject[:99],
            Message=json.dumps({
                'finding_type': finding_type,
                'user': user_name,
                'access_key': access_key_id,
                'actions_taken': actions_taken,
                # .get so a missing field cannot abort the notification after
                # remediation has already run.
                'severity': finding.get('severity'),
            }, indent=2),
        )
    else:
        print('SNS_TOPIC_ARN is not set; skipping notification')

    return {'statusCode': 200, 'body': json.dumps(actions_taken)}
# GuardDuty detector publishing findings every 15 minutes, with the S3 logs,
# Kubernetes audit logs and EBS malware-scan data sources switched on.
resource "aws_guardduty_detector" "main" {
  enable                       = true
  finding_publishing_frequency = "FIFTEEN_MINUTES"

  datasources {
    s3_logs {
      enable = true
    }

    kubernetes {
      audit_logs {
        enable = true
      }
    }

    malware_protection {
      scan_ec2_instance_with_findings {
        ebs_volumes {
          enable = true
        }
      }
    }
  }
}
# EventBridge rule matching GuardDuty findings with severity >= 7.0.
resource "aws_cloudwatch_event_rule" "guardduty_high" {
  name        = "guardduty-high-severity"
  description = "GuardDuty 高严重性发现"

  event_pattern = jsonencode({
    "source"      = ["aws.guardduty"]
    "detail-type" = ["GuardDuty Finding"]
    "detail" = {
      "severity" = [
        { "numeric" = [">=", 7.0] }
      ]
    }
  })
}
# Send events matched by the high-severity rule to the response Lambda.
resource "aws_cloudwatch_event_target" "lambda" {
  rule = aws_cloudwatch_event_rule.guardduty_high.name
  arn  = aws_lambda_function.guardduty_response.arn
}

# Resource-based permission letting EventBridge invoke the function. Without
# it the rule matches findings but the target invocation is denied.
resource "aws_lambda_permission" "guardduty_eventbridge_invoke" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.guardduty_response.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.guardduty_high.arn
}
| 类别 | 严重性范围 | 示例 |
|---|---|---|
| Backdoor | 5.0 - 8.0 | Backdoor:EC2/C&CActivity |
| CryptoCurrency | 5.0 - 8.0 | CryptoCurrency:EC2/BitcoinTool |
| Trojan | 5.0 - 8.0 | Trojan:EC2/BlackholeTraffic |
| UnauthorizedAccess | 5.0 - 8.0 | UnauthorizedAccess:IAMUser/ConsoleLogin |
| Recon | 2.0 - 5.0 | Recon:EC2/PortProbeUnprotected |
| Persistence | 5.0 - 8.0 | Persistence:IAMUser/AnomalousBehavior |
# Designate the GuardDuty administrator account for the organization
aws guardduty enable-organization-admin-account \
--admin-account-id 111111111111
# Auto-enable GuardDuty for new accounts
# NOTE(review): newer CLI versions may prefer
# --auto-enable-organization-members over --auto-enable; confirm against the
# current CLI reference.
aws guardduty update-organization-configuration \
--detector-id DETECTOR_ID \
--auto-enable
npx claudepluginhub killvxk/cybersecurity-skills-zh
Automates AWS GuardDuty threat findings processing via EventBridge and Lambda for real-time incident response, resource quarantine, and security notifications.
Automate AWS GuardDuty threat detection findings processing using EventBridge and Lambda to enable real-time incident response, automatic quarantine of compromised resources, and security notification workflows.
Automate AWS GuardDuty threat detection findings processing using EventBridge and Lambda to enable real-time incident response, automatic quarantine of compromised resources, and security notification workflows.