mirror of
https://github.com/github/awesome-copilot.git
synced 2026-03-13 04:35:12 +00:00
Data analysis skills specifically designed for the financial risk control field (#823)
* upload skill datanalysis-credit-risk * update skill datanalysis-credit-risk * fix plugin problem * re-run npm start * re-run npm start * change codes description skill.md to english and remove personal path * try to update readme.md * Updating readme --------- Co-authored-by: Aaron Powell <me@aaron-powell.com>
This commit is contained in:
391
skills/datanalysis-credit-risk/scripts/example.py
Normal file
391
skills/datanalysis-credit-risk/scripts/example.py
Normal file
@@ -0,0 +1,391 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Execution script
|
||||
Version: 1.0.0
|
||||
Last modified: 02-03-2026
|
||||
"""
|
||||
import os, sys
|
||||
import time
|
||||
import pandas as pd
|
||||
from typing import Dict, List, Optional, Any, Callable
|
||||
import numpy as np
|
||||
import multiprocessing
|
||||
|
||||
# =============================================================================
|
||||
# System Configuration
|
||||
# =============================================================================
|
||||
CPU_COUNT = multiprocessing.cpu_count()
|
||||
N_JOBS = max(1, CPU_COUNT - 1) # Multi-process parallel count, keep 1 core for system
|
||||
|
||||
def _ensure_references_on_path():
|
||||
script_dir = os.path.dirname(__file__)
|
||||
cur = script_dir
|
||||
for _ in range(8):
|
||||
candidate = os.path.join(cur, 'references')
|
||||
if os.path.isdir(candidate):
|
||||
# add parent folder (which contains `references`) to sys.path
|
||||
sys.path.insert(0, cur)
|
||||
return
|
||||
parent = os.path.dirname(cur)
|
||||
if parent == cur:
|
||||
break
|
||||
cur = parent
|
||||
# fallback: add a reasonable repo-root guess
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(script_dir, '..', '..', '..')))
|
||||
|
||||
|
||||
_ensure_references_on_path()
|
||||
|
||||
from references.func import get_dataset, missing_check, org_analysis
|
||||
from references.analysis import (drop_abnormal_ym, drop_highmiss_features,
|
||||
drop_lowiv_features, drop_highcorr_features,
|
||||
drop_highpsi_features,
|
||||
drop_highnoise_features,
|
||||
export_cleaning_report,
|
||||
iv_distribution_by_org,
|
||||
psi_distribution_by_org,
|
||||
value_ratio_distribution_by_org)
|
||||
|
||||
# ==================== Path Configuration (Interactive Input) ====================
|
||||
# Use 50-column test data as default, support interactive modification in command line
|
||||
default_data_path = ''
|
||||
default_output_dir = ''
|
||||
|
||||
def _get_path_input(prompt, default):
|
||||
try:
|
||||
user_val = input(f"{prompt} (default: {default}): ").strip()
|
||||
except Exception:
|
||||
user_val = ''
|
||||
return user_val if user_val else default
|
||||
|
||||
DATA_PATH = _get_path_input('Please enter data file path DATA_PATH', default_data_path)
|
||||
OUTPUT_DIR = _get_path_input('Please enter output directory OUTPUT_DIR', default_output_dir)
|
||||
REPORT_PATH = os.path.join(OUTPUT_DIR, '数据清洗报告.xlsx')
|
||||
|
||||
# Data column name configuration (adjust according to actual data)
|
||||
DATE_COL = _get_path_input('Please enter date column name in data', 'apply_date')
|
||||
Y_COL = _get_path_input('Please enter label column name in data', 'target')
|
||||
ORG_COL = _get_path_input('Please enter organization column name in data', 'org_info')
|
||||
|
||||
# Support multiple primary key column names input (comma or space separated)
|
||||
def _get_list_input(prompt, default):
|
||||
try:
|
||||
user_val = input(f"{prompt} (default: {default}): ").strip()
|
||||
except Exception:
|
||||
user_val = ''
|
||||
if not user_val:
|
||||
user_val = default
|
||||
# Support comma or space separation
|
||||
parts = [p.strip() for p in user_val.replace(',', ' ').split() if p.strip()]
|
||||
return parts
|
||||
|
||||
KEY_COLS = _get_list_input('Please enter primary key column names in data (multiple columns separated by comma or space)', 'record_id')
|
||||
|
||||
# ==================== Multi-process Configuration Information ====================
|
||||
print("=" * 60)
|
||||
print("Multi-process Configuration")
|
||||
print("=" * 60)
|
||||
print(f" Local CPU cores: {CPU_COUNT}")
|
||||
print(f" Current process count: {N_JOBS}")
|
||||
print("=" * 60)
|
||||
|
||||
# ==================== OOS Organization Configuration (Interactive Input) ====================
|
||||
# Default out-of-sample organization list, users can input custom list in comma-separated format during interaction
|
||||
default_oos = [
|
||||
'orgA', 'orgB', 'orgC', 'orgD', 'orgE',
|
||||
]
|
||||
|
||||
try:
|
||||
oos_input = input('Please enter out-of-sample organization list, comma separated (press Enter to use default list):').strip()
|
||||
except Exception:
|
||||
oos_input = ''
|
||||
if oos_input:
|
||||
OOS_ORGS = [s.strip() for s in oos_input.split(',') if s.strip()]
|
||||
else:
|
||||
OOS_ORGS = default_oos
|
||||
|
||||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||||
|
||||
# ==================== Interactive Hyperparameter Input ====================
|
||||
def get_user_input(prompt, default, dtype=float):
|
||||
"""Get user input, support default value and type conversion"""
|
||||
while True:
|
||||
try:
|
||||
user_input = input(f"{prompt} (default: {default}): ").strip()
|
||||
if not user_input:
|
||||
return default
|
||||
return dtype(user_input)
|
||||
except ValueError:
|
||||
print(f" Invalid input, please enter {dtype.__name__} type")
|
||||
|
||||
# Record cleaning steps
|
||||
steps = []
|
||||
|
||||
# Store parameters for each step
|
||||
params = {}
|
||||
|
||||
# Timer decorator
|
||||
def timer(step_name):
|
||||
"""Timer decorator"""
|
||||
def decorator(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
print(f"\nStarting {step_name}...")
|
||||
start_time = time.time()
|
||||
result = func(*args, **kwargs)
|
||||
elapsed = time.time() - start_time
|
||||
print(f" {step_name} elapsed: {elapsed:.2f} seconds")
|
||||
return result
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
# ==================== Step 1: Get Data ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 1: Get Data")
|
||||
print("=" * 60)
|
||||
step_start = time.time()
|
||||
# Use configuration from global_parameters
|
||||
data = get_dataset(
|
||||
data_pth=DATA_PATH,
|
||||
date_colName=DATE_COL,
|
||||
y_colName=Y_COL,
|
||||
org_colName=ORG_COL,
|
||||
data_encode='utf-8',
|
||||
key_colNames=KEY_COLS,
|
||||
drop_colNames=[],
|
||||
miss_vals=[-1, -999, -1111]
|
||||
)
|
||||
print(f" Original data: {data.shape}")
|
||||
print(f" Abnormal values replaced with NaN: [-1, -999, -1111]")
|
||||
print(f" Step 1 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
|
||||
# ==================== Step 2: Organization Sample Analysis ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 2: Organization Sample Analysis")
|
||||
print("=" * 60)
|
||||
step_start = time.time()
|
||||
org_stat = org_analysis(data, oos_orgs=OOS_ORGS)
|
||||
steps.append(('机构样本统计', org_stat))
|
||||
print(f" Organization count: {data['new_org'].nunique()}, Month count: {data['new_date_ym'].nunique()}")
|
||||
print(f" Out-of-sample organizations: {len(OOS_ORGS)}")
|
||||
print(f" Step 2 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
|
||||
# ==================== Step 3: Separate OOS Data ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 3: Separate OOS Data")
|
||||
print("=" * 60)
|
||||
step_start = time.time()
|
||||
oos_data = data[data['new_org'].isin(OOS_ORGS)]
|
||||
data = data[~data['new_org'].isin(OOS_ORGS)]
|
||||
print(f" OOS samples: {oos_data.shape[0]} rows")
|
||||
print(f" Modeling samples: {data.shape[0]} rows")
|
||||
print(f" OOS organizations: {OOS_ORGS}")
|
||||
print(f" Step 3 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
# Create separation information DataFrame
|
||||
oos_info = pd.DataFrame({'变量': ['OOS样本', '建模样本'], '数量': [oos_data.shape[0], data.shape[0]]})
|
||||
steps.append(('分离OOS数据', oos_info))
|
||||
|
||||
# ==================== Step 4: Filter Abnormal Months (Modeling Data Only) ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 4: Filter Abnormal Months (Modeling Data Only)")
|
||||
print("=" * 60)
|
||||
print(" Press Enter to use default values")
|
||||
print("=" * 60)
|
||||
params['min_ym_bad_sample'] = int(get_user_input("Bad sample count threshold", 10, int))
|
||||
params['min_ym_sample'] = int(get_user_input("Total sample count threshold", 500, int))
|
||||
step_start = time.time()
|
||||
data_filtered, abnormal_ym = drop_abnormal_ym(data.copy(), min_ym_bad_sample=params['min_ym_bad_sample'], min_ym_sample=params['min_ym_sample'])
|
||||
steps.append(('Step4-异常月份处理', abnormal_ym))
|
||||
print(f" After filtering: {data_filtered.shape}")
|
||||
print(f" Parameters: min_ym_bad_sample={params['min_ym_bad_sample']}, min_ym_sample={params['min_ym_sample']}")
|
||||
if len(abnormal_ym) > 0:
|
||||
print(f" Dropped months: {abnormal_ym['年月'].tolist()}")
|
||||
print(f" Removal conditions: {abnormal_ym['去除条件'].tolist()}")
|
||||
print(f" Step 4 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
|
||||
# ==================== Step 5: Calculate Missing Rate ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 5: Calculate Missing Rate")
|
||||
print("=" * 60)
|
||||
step_start = time.time()
|
||||
orgs = data['new_org'].unique().tolist()
|
||||
channel = {'整体': orgs}
|
||||
miss_detail, miss_channel = missing_check(data, channel=channel)
|
||||
# miss_detail: Missing rate details (format: feature, overall, org1, org2, ..., orgn)
|
||||
# miss_channel: Overall missing rate
|
||||
steps.append(('缺失率明细', miss_detail))
|
||||
print(f" Feature count: {len(miss_detail['变量'].unique())}")
|
||||
print(f" Organization count: {len(miss_detail.columns) - 2}") # Subtract '变量' and '整体' columns
|
||||
print(f" Step 5 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
|
||||
# ==================== Step 6: Drop High Missing Rate Features ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 6: Drop High Missing Rate Features")
|
||||
print("=" * 60)
|
||||
print(" Press Enter to use default values")
|
||||
print("=" * 60)
|
||||
params['missing_ratio'] = get_user_input("Missing rate threshold", 0.6)
|
||||
step_start = time.time()
|
||||
data_miss, dropped_miss = drop_highmiss_features(data.copy(), miss_channel, threshold=params['missing_ratio'])
|
||||
steps.append(('Step6-高缺失率处理', dropped_miss))
|
||||
print(f" Dropped: {len(dropped_miss)}")
|
||||
print(f" Threshold: {params['missing_ratio']}")
|
||||
if len(dropped_miss) > 0:
|
||||
print(f" Dropped features: {dropped_miss['变量'].tolist()[:5]}...")
|
||||
print(f" Removal conditions: {dropped_miss['去除条件'].tolist()[:5]}...")
|
||||
print(f" Step 6 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
|
||||
# ==================== Step 7: Drop Low IV Features ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 7: Drop Low IV Features")
|
||||
print("=" * 60)
|
||||
print(" Press Enter to use default values")
|
||||
print("=" * 60)
|
||||
params['overall_iv_threshold'] = get_user_input("Overall IV threshold", 0.1)
|
||||
params['org_iv_threshold'] = get_user_input("Single organization IV threshold", 0.1)
|
||||
params['max_org_threshold'] = int(get_user_input("Maximum tolerated low IV organization count", 2, int))
|
||||
step_start = time.time()
|
||||
# Get feature list (use all features)
|
||||
features = [c for c in data.columns if c.startswith('i_')]
|
||||
data_iv, iv_detail, iv_process = drop_lowiv_features(
|
||||
data.copy(), features,
|
||||
overall_iv_threshold=params['overall_iv_threshold'],
|
||||
org_iv_threshold=params['org_iv_threshold'],
|
||||
max_org_threshold=params['max_org_threshold'],
|
||||
n_jobs=N_JOBS
|
||||
)
|
||||
# iv_detail: IV details (IV value of each feature in each organization and overall)
|
||||
# iv_process: IV processing table (features that do not meet the conditions)
|
||||
steps.append(('Step7-IV处理', iv_process))
|
||||
print(f" Dropped: {len(iv_process)}")
|
||||
print(f" Parameters: overall_iv_threshold={params['overall_iv_threshold']}, org_iv_threshold={params['org_iv_threshold']}, max_org_threshold={params['max_org_threshold']}")
|
||||
if len(iv_process) > 0:
|
||||
print(f" Dropped features: {iv_process['变量'].tolist()[:5]}...")
|
||||
print(f" Processing reasons: {iv_process['处理原因'].tolist()[:5]}...")
|
||||
print(f" Step 7 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
|
||||
# ==================== Step 8: Drop High PSI Features ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 8: Drop High PSI Features (By Organization + Month-by-Month)")
|
||||
print("=" * 60)
|
||||
print(" Press Enter to use default values")
|
||||
print("=" * 60)
|
||||
params['psi_threshold'] = get_user_input("PSI threshold", 0.1)
|
||||
params['max_months_ratio'] = get_user_input("Maximum unstable month ratio", 1/3)
|
||||
params['max_orgs'] = int(get_user_input("Maximum unstable organization count", 6, int))
|
||||
step_start = time.time()
|
||||
# Get features before PSI calculation (use all features)
|
||||
features_for_psi = [c for c in data.columns if c.startswith('i_')]
|
||||
data_psi, psi_detail, psi_process = drop_highpsi_features(
|
||||
data.copy(), features_for_psi,
|
||||
psi_threshold=params['psi_threshold'],
|
||||
max_months_ratio=params['max_months_ratio'],
|
||||
max_orgs=params['max_orgs'],
|
||||
min_sample_per_month=100,
|
||||
n_jobs=N_JOBS
|
||||
)
|
||||
# psi_detail: PSI details (PSI value of each feature in each organization each month)
|
||||
# psi_process: PSI processing table (features that do not meet the conditions)
|
||||
steps.append(('Step8-PSI处理', psi_process))
|
||||
print(f" Dropped: {len(psi_process)}")
|
||||
print(f" Parameters: psi_threshold={params['psi_threshold']}, max_months_ratio={params['max_months_ratio']:.2f}, max_orgs={params['max_orgs']}")
|
||||
if len(psi_process) > 0:
|
||||
print(f" Dropped features: {psi_process['变量'].tolist()[:5]}...")
|
||||
print(f" Processing reasons: {psi_process['处理原因'].tolist()[:5]}...")
|
||||
print(f" PSI details: {len(psi_detail)} records")
|
||||
print(f" Step 8 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
|
||||
# ==================== Step 9: Null Importance Denoising ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 9: Null Importance Remove High Noise Features")
|
||||
print("=" * 60)
|
||||
print(" Press Enter to use default values")
|
||||
print("=" * 60)
|
||||
params['n_estimators'] = int(get_user_input("Number of trees", 100, int))
|
||||
params['max_depth'] = int(get_user_input("Maximum tree depth", 5, int))
|
||||
params['gain_threshold'] = get_user_input("Gain difference threshold", 50)
|
||||
step_start = time.time()
|
||||
# Get feature list (use all features)
|
||||
features = [c for c in data.columns if c.startswith('i_')]
|
||||
data_noise, dropped_noise = drop_highnoise_features(data.copy(), features, n_estimators=params['n_estimators'], max_depth=params['max_depth'], gain_threshold=params['gain_threshold'])
|
||||
steps.append(('Step9-null importance处理', dropped_noise))
|
||||
print(f" Dropped: {len(dropped_noise)}")
|
||||
print(f" Parameters: n_estimators={params['n_estimators']}, max_depth={params['max_depth']}, gain_threshold={params['gain_threshold']}")
|
||||
if len(dropped_noise) > 0:
|
||||
print(f" Dropped features: {dropped_noise['变量'].tolist()}")
|
||||
print(f" Step 9 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
|
||||
# ==================== Step 10: Drop High Correlation Features (Based on Null Importance Original Gain) ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 10: Drop High Correlation Features (Based on Null Importance Original Gain)")
|
||||
print("=" * 60)
|
||||
print(" Press Enter to use default values")
|
||||
print("=" * 60)
|
||||
params['max_corr'] = get_user_input("Correlation threshold", 0.9)
|
||||
params['top_n_keep'] = int(get_user_input("Keep top N features by original gain ranking", 20, int))
|
||||
step_start = time.time()
|
||||
# Get feature list (use all features)
|
||||
features = [c for c in data.columns if c.startswith('i_')]
|
||||
# Get original gain from null importance results
|
||||
if len(dropped_noise) > 0 and '原始gain' in dropped_noise.columns:
|
||||
gain_dict = dict(zip(dropped_noise['变量'], dropped_noise['原始gain']))
|
||||
else:
|
||||
gain_dict = {}
|
||||
data_corr, dropped_corr = drop_highcorr_features(data.copy(), features, threshold=params['max_corr'], gain_dict=gain_dict, top_n_keep=params['top_n_keep'])
|
||||
steps.append(('Step10-高相关性剔除', dropped_corr))
|
||||
print(f" Dropped: {len(dropped_corr)}")
|
||||
print(f" Threshold: {params['max_corr']}")
|
||||
if len(dropped_corr) > 0:
|
||||
print(f" Dropped features: {dropped_corr['变量'].tolist()}")
|
||||
print(f" Removal conditions: {dropped_corr['去除条件'].tolist()[:5]}...")
|
||||
print(f" Step 10 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
|
||||
# ==================== Step 11: Export Report ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 11: Export Report")
|
||||
print("=" * 60)
|
||||
step_start = time.time()
|
||||
|
||||
# Calculate IV distribution statistics
|
||||
print(" Calculating IV distribution statistics...")
|
||||
iv_distribution = iv_distribution_by_org(iv_detail, oos_orgs=OOS_ORGS)
|
||||
print(f" IV distribution statistics: {len(iv_distribution)} records")
|
||||
|
||||
# Calculate PSI distribution statistics
|
||||
print(" Calculating PSI distribution statistics...")
|
||||
psi_distribution = psi_distribution_by_org(psi_detail, oos_orgs=OOS_ORGS)
|
||||
print(f" PSI distribution statistics: {len(psi_distribution)} records")
|
||||
|
||||
# Calculate value ratio distribution statistics (use all features)
|
||||
print(" Calculating value ratio distribution statistics...")
|
||||
features_for_value_ratio = [c for c in data.columns if c.startswith('i_')]
|
||||
value_ratio_distribution = value_ratio_distribution_by_org(data, features_for_value_ratio, oos_orgs=OOS_ORGS)
|
||||
print(f" Value ratio distribution statistics: {len(value_ratio_distribution)} records")
|
||||
|
||||
# Add details and distribution statistics to steps list
|
||||
steps.append(('Step7-IV明细', iv_detail))
|
||||
steps.append(('Step7-IV分布统计', iv_distribution))
|
||||
steps.append(('Step8-PSI明细', psi_detail))
|
||||
steps.append(('Step8-PSI分布统计', psi_distribution))
|
||||
steps.append(('Step5-有值率分布统计', value_ratio_distribution))
|
||||
|
||||
export_cleaning_report(REPORT_PATH, steps,
|
||||
iv_detail=iv_detail,
|
||||
iv_process=iv_process,
|
||||
psi_detail=psi_detail,
|
||||
psi_process=psi_process,
|
||||
params=params,
|
||||
iv_distribution=iv_distribution,
|
||||
psi_distribution=psi_distribution,
|
||||
value_ratio_distribution=value_ratio_distribution)
|
||||
print(f" Report: {REPORT_PATH}")
|
||||
print(f" Step 11 elapsed: {time.time() - step_start:.2f} seconds")
|
||||
|
||||
# ==================== Summary ====================
|
||||
print("\n" + "=" * 60)
|
||||
print("Data Cleaning Completed!")
|
||||
print("=" * 60)
|
||||
print(f" Original data: {data.shape[0]} rows")
|
||||
print(f" Original features: {len([c for c in data.columns if c.startswith('i_')])}")
|
||||
print(f" Cleaning steps (each step executed independently, data not deleted):")
|
||||
for name, df in steps:
|
||||
print(f" - {name}: Dropped {df.shape[0] if hasattr(df, 'shape') else len(df)}")
|
||||
Reference in New Issue
Block a user