From great-econometrics
End-to-end data pipeline for empirical research: fetch economic data from APIs (FRED, World Bank, IMF, BLS, OECD, Yahoo Finance), clean and transform raw data, construct strategy-specific variables, and validate panel structure. Use when asked to fetch data, download data, clean data, merge datasets, prepare analysis-ready data.
How this skill is triggered — by the user, by Claude, or both
Slash command
/great-econometrics:data-pipelineThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
本 skill 是一体化数据管道,由 `/data` 命令调用,负责完整的数据工作流:
本 skill 是一体化数据管道,由 /data 命令调用,负责完整的数据工作流:
产出:data/clean/[project_name]_clean.[dta|parquet|csv] + data/clean/cleaning_log.md
/data 命令自动调用(主要场景,分别在 Step 1 和 Step 2 调用)执行任何代码前必须先检查所需 API Key。
[plugin_root]/.env 文件(与 CLAUDE.md 同目录,即插件根目录)FRED_API_KEY 和 BLS_API_KEY若 FRED_API_KEY 缺失或为空:
FRED_API_KEY=<value> 追加到 .env若 BLS_API_KEY 缺失:
若 .env 已存在且 Key 已设置:
python-dotenv 注入所有代码中from dotenv import load_dotenv
load_dotenv() # 自动从 CWD 及父目录搜索 .env
⚠️
.env格式要求: 标准KEY=VALUE格式(每行一个),非 JSON。若为 JSON 格式,需先转换。
本 skill 由 /data 传入结构化上下文,无需重新询问用户。
接收参数:
| 参数 | 说明 | 示例 |
|---|---|---|
target_datasets | 目标数据集名称列表 | ["FRED宏观", "World Bank"] |
variables | 变量清单 | Y=gdp_growth, Z=tariff_rate |
time_range | 起止年份与频率 | 2000–2023, 年度 |
geo_scope | 地理范围 | 中国31省 / 美国全国 |
identification_strategy | 识别策略 | DiD / RDD / IV / Panel FE |
output_format | 输出格式 | csv(默认)/ dta / parquet |
根据传入的数据集名称和地理范围,选择对应 API:
| 数据类型 | 最佳数据源 | 方式 |
|---|---|---|
| 美国宏观时间序列 | FRED | fredapi |
| 全球发展指标 | World Bank | wbdata |
| 美国劳动力市场 | BLS API v2 | requests(需分块) |
| 跨国宏观/金融 | IMF WEO | imf-reader |
| 金融资产价格 | Yahoo Finance | yfinance |
无法自动获取的数据源(需用户手动下载):
- OECD:数据端点受 Cloudflare 防护,沙箱不可访问。优先用 IMF WEO 替代。
- 国家统计局(NBS):API 限制中国大陆 IP。请前往 https://data.stats.gov.cn 下载后走手动上传流程。
生成的脚本必须包含:
① 基础要素
python-dotenv),不硬编码② 输出路径规范
from datetime import date
filename = f"data/raw/{source_name}_{date.today().strftime('%Y%m%d')}.csv"
df.to_csv(filename, index=False)
③ 写入 data/raw/data_log.md
log_entry = f"""
## {dataset_name} 获取记录
- 获取日期:{date.today()}
- 数据来源:{source_name}
- 原始文件路径:{filename}
- 变量数:{df.shape[1]},观测数:{df.shape[0]}
- 频率:{frequency}
- 合并键:{merge_key}
"""
os.makedirs("data/raw", exist_ok=True)
with open("data/raw/data_log.md", "a", encoding="utf-8") as f:
f.write(log_entry)
④ 关键变量验证
required_vars = [Y_var, D_var, Z_var] + control_vars
available = df.columns.tolist()
found = [v for v in required_vars if v in available]
missing = [v for v in required_vars if v not in available]
print("=== 变量到位检查 ===")
for v in found:
print(f"✅ {v}")
for v in missing:
print(f"⚠️ {v} — 需在清洗阶段处理")
接收参数(来自 identification-memo.md):
| 参数 | 说明 | 示例 |
|---|---|---|
identification_strategy | 识别策略 | DiD / RDD / IV |
Y_var | 结果变量 | log_gdp_growth |
D_var | 处理变量 | policy_dummy |
Z_var | 识别变量 | tariff_rate_1990 |
control_vars | 协变量列表 | ["log_gdp", "population"] |
id_var | 面板个体标识 | province_code |
time_var | 时间变量 | year |
input_files | 原始数据文件路径 | ["data/raw/fred_*.csv"] |
merge_plan | 多源合并方案 | 主文件 + 辅助文件 |
触发条件: input_files 包含 ≥2 个文件且 merge_plan 已确认
import pandas as pd
df = pd.read_csv("data/raw/[主文件]")
n_before = len(df)
for aux_file, merge_key, how in merge_plan:
aux = pd.read_csv(f"data/raw/{aux_file}")
df = df.merge(aux, on=merge_key, how=how, suffixes=("", "_aux"))
print(f"合并 {aux_file}:{len(df):,} 行({how} join on {merge_key})")
n_after = len(df)
match_rate = n_after / n_before * 100
print(f"匹配率:{match_rate:.1f}%")
dups = df.duplicated(subset=merge_key).sum()
if dups > 0:
print(f"⚠️ 发现 {dups} 行重复")
for v in [Y_var, D_var, Z_var]:
miss_rate = df[v].isna().mean() * 100
print(f"{v} 缺失率:{miss_rate:.1f}%")
若匹配率低于 70%,需向用户报告并说明原因。
按以下顺序执行,每步都在 cleaning_log.md 中记录样本量和关键指标变化。
dup_mask = df.duplicated(subset=[id_var, time_var], keep=False)
n_dups = dup_mask.sum()
print(f"重复观测:{n_dups} 行")
if n_dups > 0:
df_dups = df[dup_mask]
df_dups.to_csv("data/clean/duplicates_removed.csv", index=False)
df = df.drop_duplicates(subset=[id_var, time_var], keep="first")
MISSING_CODES = [-99, -88, -77, -9, 9999, 99999, -9999]
df.replace(MISSING_CODES, pd.NA, inplace=True)
miss_report = df.isnull().mean().sort_values(ascending=False)
miss_high = miss_report[miss_report > 0.05]
print("缺失率 > 5% 的变量:")
for var, rate in miss_high.items():
print(f" {var}: {rate:.1%}")
缺失值处理决策:
| 情形 | 推荐处理 |
|---|---|
| Y / D 缺失 | 明确报告,删除;不得插补 |
| Z 缺失 | 明确报告;可能非随机 |
| 协变量 < 5% | 可考虑均值/中位数插补 |
| 协变量 5–20% | 建议多重插补或敏感性分析 |
| 协变量 > 20% | 寻找替代变量 |
# 列名统一为 snake_case
df.columns = (
df.columns
.str.strip()
.str.lower()
.str.replace(r"\s+", "_", regex=True)
.str.replace(r"[^a-z0-9_]", "", regex=True)
)
# 关键变量类型修正
df[time_var] = pd.to_numeric(df[time_var], errors="coerce").astype("Int64")
df[id_var] = df[id_var].astype(str).str.strip()
for col in numeric_vars:
df[col] = pd.to_numeric(df[col], errors="coerce")
def winsorize(series, lo=0.01, hi=0.99):
"""将连续变量在 lo/hi 分位数处缩尾"""
lo_val = series.quantile(lo)
hi_val = series.quantile(hi)
return series.clip(lo_val, hi_val)
# 仅对连续型协变量缩尾,不对 Y / D / Z 自动缩尾
for col in continuous_controls:
n_outliers = ((df[col] < df[col].quantile(0.01)) |
(df[col] > df[col].quantile(0.99))).sum()
print(f"{col}: {n_outliers} 个观测超出 [p1, p99]")
df[f"{col}_w"] = winsorize(df[col]) # 保留原始变量
⚠️ 经济学提醒:Winsorize 需要经济学判断,不是默认必须步骤。
label variable `Y_var' "结果变量:[描述]"
label variable `D_var' "处理变量:[描述]"
label variable `Z_var' "识别变量:[描述]"
根据传入的 identification_strategy,自动触发对应的变量构建模块。
* DiD 核心变量构建
gen treated = (group_var == "treated_group")
gen post = (year >= policy_year)
gen treated_post = treated * post
gen event_time = year - policy_year
* 交错处理时点
bysort id: egen first_treated_year = min(cond(treated_post == 1, year, .))
gen relative_time = year - first_treated_year
Python 版本:
df["treated"] = (df["group_var"] == "treated_group").astype(int)
df["post"] = (df[time_var] >= policy_year).astype(int)
df["treated_post"] = df["treated"] * df["post"]
df["event_time"] = df[time_var] - policy_year
first_treated = (df[df["treated_post"] == 1]
.groupby(id_var)[time_var].min()
.rename("first_treated_year"))
df = df.merge(first_treated, on=id_var, how="left")
df["relative_time"] = df[time_var] - df["first_treated_year"]
gen running_centered = running_var - cutoff_value
gen above_cutoff = (running_var >= cutoff_value)
gen running_above = running_centered * above_cutoff
Python 版本:
df["running_centered"] = df["running_var"] - cutoff_value
df["above_cutoff"] = (df["running_var"] >= cutoff_value).astype(int)
df["running_above"] = df["running_centered"] * df["above_cutoff"]
print(f"阈值以上样本:{df['above_cutoff'].sum()}")
# 范式 1:历史值作工具变量
df["Z_historical"] = df["D_var_baseyear"]
# 范式 2:Bartik 工具
df["Z_bartik"] = (df["industry_share_base"] * df["national_growth"]).groupby(id_var).transform("sum")
# 范式 3:政策 × 时间外生变动
df["Z_policy_interact"] = df["policy_var"] * df["exogenous_shock"]
# 初步检查:Z 与 D 的相关性
corr = df[[Z_var, D_var]].corr().iloc[0, 1]
print(f"Z-D 相关系数:{corr:.3f}")
if abs(corr) < 0.1:
print("⚠️ 相关性较弱,一阶段 F 可能 < 10")
# 唯一标识符验证
assert not df.duplicated(subset=[id_var, time_var]).any()
# 面板平衡性检查
counts = df.groupby(id_var)[time_var].count()
is_balanced = (counts == counts.max()).all()
print(f"面板平衡性:{'强平衡' if is_balanced else '弱平衡'}")
# Within / Between 方差分解
df["Y_mean_i"] = df.groupby(id_var)[Y_var].transform("mean")
df["Y_within"] = df[Y_var] - df["Y_mean_i"]
var_within = df["Y_within"].var()
var_between = df["Y_mean_i"].var()
print(f"Within 方差占比:{var_within / (var_within + var_between):.1%}")
assert id_var in df.columns and time_var in df.columns
df["is_treated_unit"] = (df[id_var] == treated_unit_id).astype(int)
df["is_donor"] = (1 - df["is_treated_unit"])
df["pre_treatment"] = (df[time_var] < treatment_year).astype(int)
df["post_treatment"] = (df[time_var] >= treatment_year).astype(int)
n_donors = df[df["is_donor"] == 1][id_var].nunique()
T_pre = df[df["pre_treatment"] == 1][time_var].nunique()
print(f"控制池规模:{n_donors},预处理期:{T_pre}")
if T_pre < 5:
print("⚠️ 预处理期较短,匹配质量可能受限")
触发条件: identification_strategy 为 DiD、Panel FE 或 Synthetic Control
print("=== 面板结构验证 ===")
print(f"观测总数:{len(df):,}")
print(f"个体数:{df[id_var].nunique()}")
print(f"时间期数:{df[time_var].nunique()}")
print(f"时间范围:{df[time_var].min()} – {df[time_var].max()}")
obs_per_unit = df.groupby(id_var).size()
print(f"每个体观测数:min={obs_per_unit.min()}, max={obs_per_unit.max()}")
if obs_per_unit.min() == obs_per_unit.max():
print("✅ 强平衡面板")
else:
print(f"⚠️ 不平衡面板")
print("\n=== 关键变量最终状态 ===")
audit_vars = [Y_var, D_var, Z_var] + control_vars
for v in audit_vars:
if v not in df.columns:
print(f"❌ {v}:不存在!")
continue
miss = df[v].isna().mean()
mean = df[v].mean() if df[v].dtype != object else "N/A"
std = df[v].std() if df[v].dtype != object else "N/A"
print(f"{'✅' if miss < 0.05 else '⚠️ '} {v}: 缺失={miss:.1%}, 均值={mean:.4g}, 标准差={std:.4g}")
import os
from datetime import date
os.makedirs("data/clean", exist_ok=True)
output_path_parquet = f"data/clean/{project_name}_clean.parquet"
df.to_parquet(output_path_parquet, index=False)
print(f"✅ 已保存:{output_path_parquet}({len(df):,} 行 × {df.shape[1]} 列)")
# 如需 Stata 格式
try:
import pyreadstat
output_path_dta = f"data/clean/{project_name}_clean.dta"
pyreadstat.write_dta(df, output_path_dta)
print(f"✅ 已保存:{output_path_dta}")
except ImportError:
print("(如需 .dta 格式,请安装 pyreadstat)")
cleaning_log.md每次清洗完成后自动追加:
log = f"""
## 数据清洗记录 — {project_name}
- 清洗日期:{date.today()}
- 识别策略:{identification_strategy}
- 输入文件:{input_files}
- 输出文件:data/clean/{project_name}_clean.parquet
### 样本量变化
| 步骤 | 操作 | 样本量 |
|------|------|--------|
| 原始数据(合并后) | — | {n_raw:,} |
| 删除重复观测 | ({id_var}, {time_var}) | {n_after_dedup:,} |
| 删除 Y/D 缺失 | 结果/处理变量 | {n_after_ymiss:,} |
| 最终样本 | — | {len(df):,} |
### 关键变量缺失率(清洗后)
| 变量 | 角色 | 缺失率 |
|------|------|--------|
{chr(10).join(f"| {v} | {role} | {df[v].isna().mean():.1%} |" for v, role in zip([Y_var, D_var, Z_var], ["Y", "D", "Z"]))}
### 识别策略专属变量
{chr(10).join(f"- `{v}`:已构建" for v in strategy_vars_built)}
### 数据质量问题记录
{quality_notes}
"""
os.makedirs("data/clean", exist_ok=True)
with open("data/clean/cleaning_log.md", "a", encoding="utf-8") as f:
f.write(log)
print("✅ 已写入 data/clean/cleaning_log.md")
| ❌ 常见错误 | ✅ 正确做法 |
|---|---|
| 合并前不检查合并键唯一性 | assert df[key].is_unique |
| 对 Y / D 自动 Winsorize | 缩尾前须有经济学判断 |
| 用全局均值填充 Y / D 缺失 | Y/D 应删除,不应插补 |
| DiD 用全局政策年份,忽视交错处理 | 用个体自身 first_treated_year |
| 覆盖原始数据 | data/raw/ 仅读,data/clean/ 用于保存 |
| 未记录样本量变化 | 每步都在 cleaning_log.md 记录 |
| 合并后不检查匹配率 | 低于 70% 需向用户报告 |
| 对 RDD 忘记密度检验 | McCrary 检验在 EDA 阶段执行 |
pip install pandas pyreadstat python-dotenv fredapi wbdata requests yfinance imf-reader
/data:调用本 skill 的上级命令results-analysis:接收本 skill 的输出,生成描述性统计与探索性分析did-analysis / rdd-analysis / iv-estimation / panel-data:应用本 skill 构建的变量npx claudepluginhub zhouziyue233/great-econometrics --plugin econometricsQueries 800K+ economic time series from FRED (Federal Reserve) including GDP, unemployment, inflation, interest rates. For macroeconomic analysis, financial research, and policy studies.
Queries the FRED (Federal Reserve Economic Data) API for 800,000+ economic time series including GDP, unemployment, inflation, and interest rates. Useful for macroeconomic analysis and financial research.
Provides Stata reference for .do files, data management, econometrics, causal inference, graphics, Mata programming, and packages like reghdfe, estout, did, rdrobust. Aids writing, debugging, explaining code.