Detects and tests OWASP API3:2023 BOPLA vulnerabilities in APIs, including excessive data exposure and mass assignment attacks via Python scripts. Useful for API penetration testing.
How this skill is triggered — by the user, by Claude, or both
Slash command
/cybersecurity-skills-zh:detecting-broken-object-property-level-authorizationThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
对象属性级授权失效(Broken Object Property Level Authorization,BOPLA)被OWASP API安全Top 10归类为API3:2023,结合了两类相关漏洞:过度数据暴露(API返回超出所需的数据)和批量赋值(Mass Assignment,API接受超出预期的数据)。即使API正确地执行了对象级授权,也可能无法控制用户对对象特定属性的读写权限。攻击者利用此漏洞从API响应中读取敏感属性,或向请求体中注入额外属性来修改其无权访问的字段。
对象属性级授权失效(Broken Object Property Level Authorization,BOPLA)被OWASP API安全Top 10归类为API3:2023,结合了两类相关漏洞:过度数据暴露(API返回超出所需的数据)和批量赋值(Mass Assignment,API接受超出预期的数据)。即使API正确地执行了对象级授权,也可能无法控制用户对对象特定属性的读写权限。攻击者利用此漏洞从API响应中读取敏感属性,或向请求体中注入额外属性来修改其无权访问的字段。
API返回的对象属性超出客户端所需:
// GET /api/v1/users/123
// 响应包含UI未显示的敏感字段:
{
"id": 123,
"username": "john_doe",
"email": "[email protected]",
"name": "John Doe",
"ssn": "123-45-6789", // 敏感 - UI不需要
"salary": 95000, // 敏感 - UI不需要
"internal_notes": "VIP client", // 内部 - 不应暴露
"password_hash": "$2b$12...", // 严重 - 永远不应暴露
"role": "admin", // 可能暴露权限信息
"created_by": "system_admin", // 内部元数据
"credit_card_last4": "4242" // PCI合规违规
}
API未过滤地将客户端提供的数据绑定到内部对象属性:
// 普通用户更新请求
PUT /api/v1/users/123
Content-Type: application/json
{
"name": "John Updated",
"email": "[email protected]",
"role": "admin", // 攻击者注入:权限提升
"is_verified": true, // 攻击者注入:绕过验证
"discount_rate": 100, // 攻击者注入:业务逻辑滥用
"account_balance": 999999 // 攻击者注入:金融欺诈
}
#!/usr/bin/env python3
"""BOPLA漏洞扫描器
测试API是否存在对象属性级授权失效(BOPLA)漏洞,
包括过度数据暴露和批量赋值。
"""
import requests
import json
import sys
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from copy import deepcopy
@dataclass
class BOPLAFinding:
endpoint: str
method: str
vulnerability_type: str # "excessive_exposure" 或 "mass_assignment"
severity: str
property_name: str
details: str
class BOPLAScanner:
SENSITIVE_PROPERTY_PATTERNS = {
"critical": [
"password", "password_hash", "secret", "token", "api_key",
"private_key", "secret_key", "access_token", "refresh_token",
],
"high": [
"ssn", "social_security", "tax_id", "credit_card", "card_number",
"cvv", "bank_account", "routing_number",
],
"medium": [
"salary", "income", "internal_notes", "admin_notes",
"created_by", "modified_by", "ip_address", "session_id",
"role", "permissions", "is_admin", "is_superuser", "privilege",
],
"low": [
"phone", "address", "date_of_birth", "dob", "age",
"gender", "ethnicity", "religion",
]
}
MASS_ASSIGNMENT_FIELDS = [
("role", "admin"),
("is_admin", True),
("is_verified", True),
("is_active", True),
("email_verified", True),
("account_type", "premium"),
("discount_rate", 100),
("credit_limit", 999999),
("permissions", ["admin", "write", "delete"]),
("account_balance", 999999),
("subscription_tier", "enterprise"),
("rate_limit", 999999),
]
def __init__(self, base_url: str, auth_headers: Dict[str, str]):
self.base_url = base_url.rstrip('/')
self.auth_headers = auth_headers
self.findings: List[BOPLAFinding] = []
def test_excessive_data_exposure(self, endpoint: str,
expected_fields: Set[str]) -> List[BOPLAFinding]:
"""测试API响应是否包含超出预期字段的内容。"""
findings = []
url = f"{self.base_url}{endpoint}"
try:
response = requests.get(url, headers=self.auth_headers, timeout=10)
if response.status_code != 200:
return findings
data = response.json()
# 处理单对象和列表响应
objects = data if isinstance(data, list) else [data]
if isinstance(data, dict) and "data" in data:
objects = data["data"] if isinstance(data["data"], list) else [data["data"]]
for obj in objects[:5]: # 检查前5个对象
if not isinstance(obj, dict):
continue
response_fields = set(self._flatten_keys(obj))
unexpected_fields = response_fields - expected_fields
for field_name in unexpected_fields:
severity = self._classify_sensitivity(field_name)
if severity:
finding = BOPLAFinding(
endpoint=endpoint,
method="GET",
vulnerability_type="excessive_exposure",
severity=severity,
property_name=field_name,
details=f"响应中出现意外的敏感字段 '{field_name}'"
)
findings.append(finding)
self.findings.append(finding)
except (requests.exceptions.RequestException, json.JSONDecodeError):
pass
return findings
def test_mass_assignment(self, endpoint: str, method: str = "PUT",
original_data: Optional[dict] = None) -> List[BOPLAFinding]:
"""测试API是否接受并处理额外注入的属性。"""
findings = []
url = f"{self.base_url}{endpoint}"
# 首先获取当前对象状态
if original_data is None:
try:
response = requests.get(url, headers=self.auth_headers, timeout=10)
if response.status_code == 200:
original_data = response.json()
else:
original_data = {}
except (requests.exceptions.RequestException, json.JSONDecodeError):
original_data = {}
# 测试每个批量赋值字段
for field_name, injected_value in self.MASS_ASSIGNMENT_FIELDS:
if field_name in original_data:
# 字段存在 - 测试是否可以修改
original_value = original_data[field_name]
if original_value == injected_value:
continue # 已有该值
test_data = deepcopy(original_data)
test_data[field_name] = injected_value
headers = {**self.auth_headers, "Content-Type": "application/json"}
try:
if method == "PUT":
response = requests.put(url, json=test_data,
headers=headers, timeout=10)
elif method == "PATCH":
response = requests.patch(url, json={field_name: injected_value},
headers=headers, timeout=10)
elif method == "POST":
response = requests.post(url, json=test_data,
headers=headers, timeout=10)
if response.status_code in (200, 201, 204):
# 验证字段是否实际被修改
verify_response = requests.get(url, headers=self.auth_headers, timeout=10)
if verify_response.status_code == 200:
updated_data = verify_response.json()
if updated_data.get(field_name) == injected_value:
finding = BOPLAFinding(
endpoint=endpoint,
method=method,
vulnerability_type="mass_assignment",
severity="CRITICAL" if field_name in ["role", "is_admin", "permissions"]
else "HIGH",
property_name=field_name,
details=f"成功注入 '{field_name}={injected_value}'"
)
findings.append(finding)
self.findings.append(finding)
# 如可能则恢复原始值
if field_name in original_data:
restore_data = {field_name: original_data[field_name]}
requests.patch(url, json=restore_data,
headers=headers, timeout=10)
except requests.exceptions.RequestException:
continue
return findings
def test_graphql_property_exposure(self, graphql_endpoint: str,
query: str) -> List[BOPLAFinding]:
"""测试GraphQL API是否存在属性级授权问题。"""
findings = []
url = f"{self.base_url}{graphql_endpoint}"
# 用于发现可用字段的自省查询
introspection = """
{
__schema {
types {
name
fields {
name
type { name kind }
}
}
}
}
"""
try:
response = requests.post(
url,
json={"query": introspection},
headers=self.auth_headers,
timeout=10
)
if response.status_code == 200:
data = response.json()
if "errors" not in data:
finding = BOPLAFinding(
endpoint=graphql_endpoint,
method="POST",
vulnerability_type="excessive_exposure",
severity="MEDIUM",
property_name="__schema",
details="GraphQL自省已启用 - 完整模式已暴露"
)
findings.append(finding)
self.findings.append(finding)
except requests.exceptions.RequestException:
pass
return findings
def _flatten_keys(self, obj: dict, prefix: str = "") -> List[str]:
"""递归展开嵌套字典的键。"""
keys = []
for key, value in obj.items():
full_key = f"{prefix}.{key}" if prefix else key
keys.append(full_key)
if isinstance(value, dict):
keys.extend(self._flatten_keys(value, full_key))
return keys
def _classify_sensitivity(self, field_name: str) -> Optional[str]:
"""对字段名进行敏感度分类。"""
lower_name = field_name.lower().split('.')[-1]
for severity, patterns in self.SENSITIVE_PROPERTY_PATTERNS.items():
for pattern in patterns:
if pattern in lower_name:
return severity.upper()
return None
def generate_report(self) -> dict:
return {
"total_findings": len(self.findings),
"by_type": {
"excessive_exposure": len([f for f in self.findings
if f.vulnerability_type == "excessive_exposure"]),
"mass_assignment": len([f for f in self.findings
if f.vulnerability_type == "mass_assignment"]),
},
"by_severity": {
"CRITICAL": len([f for f in self.findings if f.severity == "CRITICAL"]),
"HIGH": len([f for f in self.findings if f.severity == "HIGH"]),
"MEDIUM": len([f for f in self.findings if f.severity == "MEDIUM"]),
"LOW": len([f for f in self.findings if f.severity == "LOW"]),
},
"findings": [
{
"endpoint": f.endpoint,
"method": f.method,
"type": f.vulnerability_type,
"severity": f.severity,
"property": f.property_name,
"details": f.details,
}
for f in self.findings
]
}
# 服务端:显式属性白名单
class UserSerializer:
# 只暴露这些字段 - 永远不要使用to_json()或to_dict()
PUBLIC_FIELDS = ['id', 'username', 'name', 'avatar_url']
OWNER_FIELDS = PUBLIC_FIELDS + ['email', 'phone', 'preferences']
ADMIN_FIELDS = OWNER_FIELDS + ['role', 'created_at', 'last_login']
def serialize(self, user, requesting_user):
if requesting_user.is_admin:
fields = self.ADMIN_FIELDS
elif requesting_user.id == user.id:
fields = self.OWNER_FIELDS
else:
fields = self.PUBLIC_FIELDS
return {field: getattr(user, field) for field in fields}
# 批量赋值保护 - 可写字段的显式白名单
WRITABLE_FIELDS = {'name', 'email', 'phone', 'avatar_url', 'preferences'}
def update_user(user_id, request_data, requesting_user):
# 过滤掉不在白名单中的字段
safe_data = {k: v for k, v in request_data.items() if k in WRITABLE_FIELDS}
# 仅使用安全数据应用更新
User.objects.filter(id=user_id).update(**safe_data)
npx claudepluginhub killvxk/cybersecurity-skills-zhDetects and tests for OWASP API3:2023 Broken Object Property Level Authorization vulnerabilities including excessive data exposure and mass assignment attacks.
Detects and tests for OWASP API3:2023 Broken Object Property Level Authorization vulnerabilities including excessive data exposure and mass assignment attacks.
Detects and tests OWASP API3:2023 Broken Object Property Level Authorization (BOPLA) vulnerabilities including excessive data exposure and mass assignment attacks in APIs.