"""
============================================================================
MÓDULO DE MODELOS SUPERVISADOS
============================================================================
Entrenamiento y evaluación de múltiples modelos de clasificación de riesgo
crediticio con integración de características RBM.
Autor: Sistema de Física
Versión: 1.0.0
"""
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import streamlit as st
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import (
classification_report, confusion_matrix, accuracy_score,
precision_score, recall_score, f1_score, roc_auc_score,
roc_curve, precision_recall_curve, cohen_kappa_score,
matthews_corrcoef
)
import xgboost as xgb
import lightgbm as lgb
import pickle
import json
import os
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any
import warnings
warnings.filterwarnings('ignore')
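# NOTE: silencing all warnings also hides useful sklearn messages (e.g. convergence
# and undefined-metric warnings); consider relaxing this while debugging training.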
class SupervisedModelTrainer:
"""Entrenador de modelos supervisados para riesgo crediticio"""
def __init__(self):
"""Inicializa el entrenador"""
self.models = {}
self.results = {}
self.X_train = None
self.X_test = None
self.X_holdout = None
self.y_train = None
self.y_test = None
self.y_holdout = None
self.feature_names = []
self.label_encoder = LabelEncoder()
self.scaler = StandardScaler()
        # Model configuration
self.model_configs = {
'logistic': {
'name': 'Logistic Regression',
'model': LogisticRegression(random_state=42, max_iter=1000),
'params': {
'C': [0.1, 1.0, 10.0],
'penalty': ['l1', 'l2'],
'solver': ['liblinear']
}
},
'random_forest': {
'name': 'Random Forest',
'model': RandomForestClassifier(random_state=42),
'params': {
'n_estimators': [100, 200],
'max_depth': [10, 20, None],
'min_samples_split': [2, 5]
}
},
'xgboost': {
'name': 'XGBoost',
'model': xgb.XGBClassifier(random_state=42, eval_metric='logloss'),
'params': {
'n_estimators': [100, 200],
'max_depth': [3, 6],
'learning_rate': [0.01, 0.1]
}
},
'lightgbm': {
'name': 'LightGBM',
'model': lgb.LGBMClassifier(random_state=42, verbose=-1),
'params': {
'n_estimators': [100, 200],
'max_depth': [3, 6],
'learning_rate': [0.01, 0.1]
}
},
'svm': {
'name': 'Support Vector Machine',
'model': SVC(random_state=42, probability=True),
'params': {
'C': [0.1, 1.0, 10.0],
'kernel': ['rbf', 'linear']
}
},
'mlp': {
'name': 'Multi-Layer Perceptron',
'model': MLPClassifier(random_state=42, max_iter=500),
'params': {
'hidden_layer_sizes': [(100,), (100, 50)],
'alpha': [0.001, 0.01],
'learning_rate': ['constant', 'adaptive']
}
}
}
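        # Further estimators can be registered in self.model_configs using the same
        # shape. As an illustrative, untuned sketch, the GradientBoostingClassifier
        # imported above could be added like this:
        #
        # 'gradient_boosting': {
        #     'name': 'Gradient Boosting',
        #     'model': GradientBoostingClassifier(random_state=42),
        #     'params': {
        #         'n_estimators': [100, 200],
        #         'learning_rate': [0.01, 0.1]
        #     }
        # },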
def prepare_data(self, df: pd.DataFrame, target_col: str = 'nivel_riesgo',
test_size: float = 0.2, holdout_size: float = 0.1) -> bool:
"""
Prepara datos para entrenamiento
Args:
df: DataFrame con datos
target_col: Variable objetivo
test_size: Proporción para testing
holdout_size: Proporción para holdout
Returns:
True si exitoso
"""
try:
            # Check that the target variable exists
if target_col not in df.columns:
raise ValueError(f"Variable objetivo '{target_col}' no encontrada")
            # Build the feature set (numeric columns only)
feature_cols = df.select_dtypes(include=[np.number]).columns.tolist()
            # Drop columns that must not be used as features (score columns prone to
            # leakage and, defensively, the target itself in case it is numeric)
            exclude_cols = ['rechazo_automatico', 'puntaje_riesgo', target_col]
            feature_cols = [col for col in feature_cols if col not in exclude_cols]
X = df[feature_cols].fillna(df[feature_cols].median())
y = df[target_col]
            # Encode the target variable
y_encoded = self.label_encoder.fit_transform(y)
            # Stratified split: 70% train, 20% test, 10% holdout
X_temp, self.X_holdout, y_temp, self.y_holdout = train_test_split(
X, y_encoded, test_size=holdout_size, random_state=42, stratify=y_encoded
)
self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
X_temp, y_temp, test_size=test_size/(1-holdout_size), random_state=42, stratify=y_temp
)
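            # With the defaults this yields roughly 70/20/10: the first split holds out
            # 10%, then test_size/(1-holdout_size) = 0.2/0.9 ≈ 0.222 of the remaining 90%
            # is another ~20% of the total, leaving ~70% for training.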
            # Scale the features
self.X_train_scaled = self.scaler.fit_transform(self.X_train)
self.X_test_scaled = self.scaler.transform(self.X_test)
self.X_holdout_scaled = self.scaler.transform(self.X_holdout)
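            # The scaler is fit on the training split only; test and holdout are merely
            # transformed, so their statistics never leak into preprocessing.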
self.feature_names = feature_cols
print(f"✅ Datos preparados:")
print(f" - Entrenamiento: {self.X_train.shape[0]} muestras")
print(f" - Testing: {self.X_test.shape[0]} muestras")
print(f" - Holdout: {self.X_holdout.shape[0]} muestras")
print(f" - Características: {len(self.feature_names)}")
print(f" - Clases: {len(self.label_encoder.classes_)}")
return True
except Exception as e:
st.error(f"❌ Error preparando datos: {e}")
return False
def train_model(self, model_key: str, use_grid_search: bool = True) -> Dict:
"""
Entrena un modelo específico
Args:
model_key: Clave del modelo a entrenar
use_grid_search: Si usar búsqueda de hiperparámetros
Returns:
Resultados del entrenamiento
"""
if model_key not in self.model_configs:
raise ValueError(f"Modelo '{model_key}' no configurado")
config = self.model_configs[model_key]
print(f"\n🚀 Entrenando {config['name']}...")
if use_grid_search and config['params']:
            # Hyperparameter search
grid_search = GridSearchCV(
config['model'],
config['params'],
cv=5,
scoring='f1_weighted',
n_jobs=-1,
verbose=0
)
grid_search.fit(self.X_train_scaled, self.y_train)
best_model = grid_search.best_estimator_
best_params = grid_search.best_params_
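            # GridSearchCV refits the best candidate on the full training split
            # (refit=True by default), so best_model is ready for prediction.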
else:
            # Direct training with the base configuration
best_model = config['model']
best_model.fit(self.X_train_scaled, self.y_train)
best_params = {}
        # Predictions
y_train_pred = best_model.predict(self.X_train_scaled)
y_test_pred = best_model.predict(self.X_test_scaled)
        # Probabilities (when the model supports them)
try:
y_test_proba = best_model.predict_proba(self.X_test_scaled)
        except Exception:
y_test_proba = None
        # Compute metrics
metrics = self._calculate_metrics(self.y_test, y_test_pred, y_test_proba)
        # Save the model
model_path = f"models/supervised/{model_key}_model.pkl"
os.makedirs("models/supervised", exist_ok=True)
model_data = {
'model': best_model,
'scaler': self.scaler,
'label_encoder': self.label_encoder,
'feature_names': self.feature_names,
'best_params': best_params,
'metrics': metrics,
'timestamp': datetime.now().isoformat()
}
with open(model_path, 'wb') as f:
pickle.dump(model_data, f)
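        # The pickle bundles the fitted model together with the scaler, label encoder
        # and feature names, so inference can reproduce the exact preprocessing.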
        # Save metrics as JSON
metrics_path = f"models/supervised/{model_key}_metrics.json"
with open(metrics_path, 'w') as f:
            # Convert NumPy types to native Python types
metrics_serializable = self._make_serializable(metrics)
json.dump(metrics_serializable, f, indent=2)
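            # Note: np.nan values (e.g. a ROC-AUC that could not be computed) are
            # written by json.dump as the non-standard literal NaN.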
results = {
'model': best_model,
'best_params': best_params,
'metrics': metrics,
'predictions': {
'y_train_pred': y_train_pred,
'y_test_pred': y_test_pred,
'y_test_proba': y_test_proba
}
}
print(f"✅ {config['name']} entrenado")
print(f" - Accuracy: {metrics['accuracy']:.4f}")
print(f" - F1-Score: {metrics['f1_weighted']:.4f}")
return results
def _calculate_metrics(self, y_true: np.ndarray, y_pred: np.ndarray,
y_proba: Optional[np.ndarray] = None) -> Dict:
"""Calcula métricas de evaluación completas"""
metrics = {
'accuracy': accuracy_score(y_true, y_pred),
'precision_macro': precision_score(y_true, y_pred, average='macro'),
'precision_weighted': precision_score(y_true, y_pred, average='weighted'),
'recall_macro': recall_score(y_true, y_pred, average='macro'),
'recall_weighted': recall_score(y_true, y_pred, average='weighted'),
'f1_macro': f1_score(y_true, y_pred, average='macro'),
'f1_weighted': f1_score(y_true, y_pred, average='weighted'),
'cohen_kappa': cohen_kappa_score(y_true, y_pred),
'matthews_corr': matthews_corrcoef(y_true, y_pred)
}
        # ROC-AUC when probabilities are available (one-vs-rest averaging for multiclass)
if y_proba is not None:
try:
if len(self.label_encoder.classes_) == 2:
metrics['roc_auc'] = roc_auc_score(y_true, y_proba[:, 1])
else:
metrics['roc_auc'] = roc_auc_score(y_true, y_proba, multi_class='ovr')
            except Exception:
metrics['roc_auc'] = np.nan
        # Confusion matrix
metrics['confusion_matrix'] = confusion_matrix(y_true, y_pred).tolist()
        # Classification report
class_report = classification_report(y_true, y_pred, output_dict=True)
metrics['classification_report'] = class_report
return metrics
def _make_serializable(self, obj: Any) -> Any:
"""Convierte objetos numpy a tipos serializables"""
if isinstance(obj, dict):
return {key: self._make_serializable(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [self._make_serializable(item) for item in obj]
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, (np.int64, np.int32)):
return int(obj)
elif isinstance(obj, (np.float64, np.float32)):
return float(obj)
else:
return obj
def train_all_models(self, selected_models: List[str], use_grid_search: bool = True) -> Dict:
"""
Entrena todos los modelos seleccionados
Args:
selected_models: Lista de modelos a entrenar
use_grid_search: Si usar búsqueda de hiperparámetros
Returns:
Resultados de todos los modelos
"""
all_results = {}
for model_key in selected_models:
if model_key in self.model_configs:
try:
results = self.train_model(model_key, use_grid_search)
all_results[model_key] = results
except Exception as e:
st.error(f"❌ Error entrenando {self.model_configs[model_key]['name']}: {e}")
return all_results
def create_comparison_visualizations(self, results: Dict) -> Dict:
"""
Crea visualizaciones comparativas de modelos
Args:
results: Resultados de múltiples modelos
Returns:
Diccionario con figuras
"""
figures = {}
if not results:
return figures
        # 1. Comparison of headline metrics
model_names = []
accuracies = []
f1_scores = []
roc_aucs = []
for model_key, result in results.items():
model_names.append(self.model_configs[model_key]['name'])
accuracies.append(result['metrics']['accuracy'])
f1_scores.append(result['metrics']['f1_weighted'])
roc_aucs.append(result['metrics'].get('roc_auc', 0))
        # Grouped bar chart comparison
fig_comparison = go.Figure()
fig_comparison.add_trace(go.Bar(
name='Accuracy',
x=model_names,
y=accuracies,
marker_color='#3498db'
))
fig_comparison.add_trace(go.Bar(
name='F1-Score',
x=model_names,
y=f1_scores,
marker_color='#e74c3c'
))
fig_comparison.add_trace(go.Bar(
name='ROC-AUC',
x=model_names,
y=roc_aucs,
marker_color='#2ecc71'
))
fig_comparison.update_layout(
title="Comparación de Modelos - Métricas Principales",
xaxis_title="Modelos",
yaxis_title="Score",
barmode='group',
template="plotly_white",
height=500
)
figures['model_comparison'] = fig_comparison
        # 2. Overlaid ROC curves (when probabilities are available)
fig_roc = go.Figure()
for model_key, result in results.items():
if result['predictions']['y_test_proba'] is not None:
y_test_proba = result['predictions']['y_test_proba']
                # For multiclass targets, plot a one-vs-rest curve
if len(self.label_encoder.classes_) > 2:
                    # Use the last encoded class as the positive class
                    # (assumed to correspond to the highest risk level)
fpr, tpr, _ = roc_curve(
(self.y_test == len(self.label_encoder.classes_) - 1).astype(int),
y_test_proba[:, -1]
)
else:
fpr, tpr, _ = roc_curve(self.y_test, y_test_proba[:, 1])
auc_score = result['metrics'].get('roc_auc', 0)
fig_roc.add_trace(go.Scatter(
x=fpr,
y=tpr,
mode='lines',
name=f"{self.model_configs[model_key]['name']} (AUC={auc_score:.3f})",
line=dict(width=2)
))
        # Diagonal reference line (random classifier)
fig_roc.add_trace(go.Scatter(
x=[0, 1],
y=[0, 1],
mode='lines',
name='Random',
line=dict(dash='dash', color='gray')
))
fig_roc.update_layout(
title="Curvas ROC - Comparación de Modelos",
xaxis_title="Tasa de Falsos Positivos",
yaxis_title="Tasa de Verdaderos Positivos",
template="plotly_white",
height=500
)
figures['roc_curves'] = fig_roc
return figures
def create_confusion_matrix_plot(self, model_key: str, results: Dict) -> go.Figure:
"""
Crea visualización de matriz de confusión
Args:
model_key: Clave del modelo
results: Resultados del modelo
Returns:
Figura de Plotly
"""
cm = np.array(results['metrics']['confusion_matrix'])
class_names = self.label_encoder.classes_
        # Row-normalize the confusion matrix
cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
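        # Each row is divided by its total, so the diagonal of the normalized
        # matrix corresponds to the per-class recall.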
        # Build the heatmap
fig = go.Figure(data=go.Heatmap(
z=cm_normalized,
x=class_names,
y=class_names,
colorscale='Blues',
text=cm,
texttemplate='%{text}',
textfont={"size": 14},
showscale=True,
colorbar=dict(title="Proporción")
))
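        # Color encodes the row-normalized proportion; the cell text shows raw counts.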
fig.update_layout(
title=f"Matriz de Confusión - {self.model_configs[model_key]['name']}",
xaxis_title="Predicción",
yaxis_title="Real",
template="plotly_white",
height=400
)
return fig
def render_supervised_models():
"""Renderiza el módulo de modelos supervisados en Streamlit"""
st.title("🤖 Modelos Supervisados")
st.markdown("### *Entrenamiento y evaluación de modelos de clasificación*")
    # Check for available datasets
data_paths = [
"data/processed/datos_con_caracteristicas.csv",
"data/processed/datos_con_rbm.csv",
"data/processed/datos_credito_hipotecario_realista.csv"
]
available_datasets = [path for path in data_paths if os.path.exists(path)]
if not available_datasets:
st.error("❌ No hay datos disponibles. Ve a 'Generar Datos' primero.")
return
    # Dataset selection
st.subheader("📊 Selección de Dataset")
dataset_options = {
"data/processed/datos_credito_hipotecario_realista.csv": "📊 Datos Originales",
"data/processed/datos_con_caracteristicas.csv": "🔧 Con Ingeniería de Características",
"data/processed/datos_con_rbm.csv": "⚡ Con Características RBM"
}
available_options = {path: name for path, name in dataset_options.items() if path in available_datasets}
selected_dataset = st.selectbox(
"Selecciona el dataset para entrenamiento:",
options=list(available_options.keys()),
format_func=lambda x: available_options[x],
help="Elige el dataset con las características que deseas usar"
)
    # Load the selected dataset
@st.cache_data
def load_selected_data(path):
return pd.read_csv(path)
df = load_selected_data(selected_dataset)
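    # st.cache_data memoizes the loader by its argument, so each CSV is read from
    # disk only once per path across Streamlit reruns.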
st.success(f"✅ Dataset cargado: {len(df):,} registros, {len(df.columns)} variables")
    # Create the trainer
trainer = SupervisedModelTrainer()
    # Training configuration
st.subheader("⚙️ Configuración de Entrenamiento")
col1, col2 = st.columns(2)
with col1:
st.markdown("**Selección de Modelos:**")
selected_models = st.multiselect(
"Modelos a entrenar:",
options=list(trainer.model_configs.keys()),
default=['logistic', 'random_forest', 'xgboost'],
format_func=lambda x: trainer.model_configs[x]['name']
)
use_grid_search = st.checkbox(
"Optimización de hiperparámetros",
value=True,
help="Usar GridSearchCV para encontrar mejores parámetros"
)
with col2:
st.markdown("**División de Datos:**")
st.markdown("""
- **70%** Entrenamiento (con validación cruzada 5-fold)
- **20%** Testing (evaluación final)
- **10%** Holdout (simulación de producción)
""")
target_col = st.selectbox(
"Variable objetivo:",
options=['nivel_riesgo'],
help="Variable a predecir"
)
    # Training
if selected_models and st.button("🚀 Entrenar Modelos", type="primary"):
with st.spinner("🤖 Entrenando modelos de Machine Learning..."):
try:
                # Prepare the data
if trainer.prepare_data(df, target_col):
                    # Train the selected models
progress_bar = st.progress(0)
status_text = st.empty()
all_results = {}
for i, model_key in enumerate(selected_models):
status_text.text(f"Entrenando {trainer.model_configs[model_key]['name']}...")
results = trainer.train_model(model_key, use_grid_search)
all_results[model_key] = results
progress_bar.progress((i + 1) / len(selected_models))
progress_bar.empty()
status_text.empty()
                    # Persist results in session state
st.session_state.model_results = all_results
st.session_state.model_trainer = trainer
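                    # Keeping results in st.session_state preserves them across Streamlit
                    # reruns, so the detailed-analysis section below works without retraining.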
st.success(f"✅ {len(selected_models)} modelos entrenados exitosamente!")
                    # Show the results table
st.subheader("📊 Resultados de Entrenamiento")
results_data = []
for model_key, result in all_results.items():
results_data.append([
trainer.model_configs[model_key]['name'],
f"{result['metrics']['accuracy']:.4f}",
f"{result['metrics']['f1_weighted']:.4f}",
f"{result['metrics']['precision_weighted']:.4f}",
f"{result['metrics']['recall_weighted']:.4f}",
f"{result['metrics'].get('roc_auc', 0):.4f}"
])
results_df = pd.DataFrame(results_data, columns=[
'Modelo', 'Accuracy', 'F1-Score', 'Precision', 'Recall', 'ROC-AUC'
])
st.dataframe(results_df, use_container_width=True, hide_index=True)
                    # Build comparison visualizations
comparison_figures = trainer.create_comparison_visualizations(all_results)
if comparison_figures:
col1, col2 = st.columns(2)
with col1:
if 'model_comparison' in comparison_figures:
st.plotly_chart(comparison_figures['model_comparison'], use_container_width=True)
with col2:
if 'roc_curves' in comparison_figures:
st.plotly_chart(comparison_figures['roc_curves'], use_container_width=True)
except Exception as e:
st.error(f"❌ Error durante el entrenamiento: {e}")
st.exception(e)
    # Detailed model analysis
if 'model_results' in st.session_state:
st.divider()
st.subheader("🔍 Análisis Detallado de Modelos")
model_results = st.session_state.model_results
trainer = st.session_state.model_trainer
        # Model selector for detailed analysis
selected_model_key = st.selectbox(
"Selecciona modelo para análisis detallado:",
options=list(model_results.keys()),
format_func=lambda x: trainer.model_configs[x]['name']
)
if selected_model_key:
result = model_results[selected_model_key]
            # Show detailed metrics
col1, col2 = st.columns(2)
with col1:
st.markdown("**📊 Métricas de Clasificación:**")
metrics_df = pd.DataFrame([
["Accuracy", f"{result['metrics']['accuracy']:.4f}"],
["F1-Score (Weighted)", f"{result['metrics']['f1_weighted']:.4f}"],
["Precision (Weighted)", f"{result['metrics']['precision_weighted']:.4f}"],
["Recall (Weighted)", f"{result['metrics']['recall_weighted']:.4f}"],
["Cohen's Kappa", f"{result['metrics']['cohen_kappa']:.4f}"],
["Matthews Correlation", f"{result['metrics']['matthews_corr']:.4f}"],
["ROC-AUC", f"{result['metrics'].get('roc_auc', 0):.4f}"]
], columns=["Métrica", "Valor"])
st.dataframe(metrics_df, use_container_width=True, hide_index=True)
with col2:
                # Confusion matrix
fig_cm = trainer.create_confusion_matrix_plot(selected_model_key, result)
st.plotly_chart(fig_cm, use_container_width=True)
            # Feature importances (only tree-based models such as Random Forest,
            # XGBoost or LightGBM expose feature_importances_)
if hasattr(result['model'], 'feature_importances_'):
st.subheader("📊 Importancia de Características")
importances = result['model'].feature_importances_
feature_importance_df = pd.DataFrame({
'Característica': trainer.feature_names,
'Importancia': importances
}).sort_values('Importancia', ascending=False)
                # Top 15 features
top_features = feature_importance_df.head(15)
fig_importance = px.bar(
top_features,
x='Importancia',
y='Característica',
orientation='h',
title=f"Top 15 Características - {trainer.model_configs[selected_model_key]['name']}"
)
fig_importance.update_layout(
template="plotly_white",
height=500,
yaxis={'categoryorder': 'total ascending'}
)
st.plotly_chart(fig_importance, use_container_width=True)
def render_supervised_models_module():
"""Función principal para renderizar el módulo de modelos supervisados"""
render_supervised_models()
if __name__ == "__main__":
print("Módulo de modelos supervisados cargado correctamente")