Código fuente para rbm_model

"""
============================================================================
MÓDULO RBM - Máquina de Boltzmann Restringida
============================================================================

Implementación completa de Restricted Boltzmann Machine (RBM) para 
extracción de características latentes en datos de riesgo crediticio.

Características:
- Implementación desde cero del algoritmo RBM
- Contrastive Divergence (CD-k) para entrenamiento
- Métricas de evaluación completas
- Visualizaciones de diagnóstico
- Interfaz interactiva con Streamlit

Autor: Sistema de Física
Versión: 1.0.0
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import streamlit as st
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import pickle
import os
from typing import Tuple, Dict, List, Optional
import warnings
from datetime import datetime

warnings.filterwarnings('ignore')

class RestrictedBoltzmannMachine:
    """
    Implementación completa de Máquina de Boltzmann Restringida (RBM)
    
    La RBM es un modelo generativo no supervisado que aprende representaciones
    latentes de los datos mediante una arquitectura de dos capas:
    - Capa visible: datos de entrada
    - Capa oculta: características latentes
    
    Entrenamiento mediante Contrastive Divergence (CD-k)
    """
    
    def __init__(self, 
                 n_visible: int,
                 n_hidden: int = 100,
                 learning_rate: float = 0.01,
                 n_epochs: int = 100,
                 batch_size: int = 64,
                 k_cd: int = 1,
                 random_state: int = 42):
        """
        Inicializa la RBM
        
        Args:
            n_visible: Número de unidades visibles (dimensión de entrada)
            n_hidden: Número de unidades ocultas
            learning_rate: Tasa de aprendizaje
            n_epochs: Número de épocas de entrenamiento
            batch_size: Tamaño del batch
            k_cd: Número de pasos de Gibbs sampling en CD
            random_state: Semilla aleatoria
        """
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.learning_rate = learning_rate
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.k_cd = k_cd
        self.random_state = random_state
        
        # Inicializar generador aleatorio
        np.random.seed(random_state)
        
        # Inicializar parámetros
        self._initialize_parameters()
        
        # Métricas de entrenamiento
        self.training_history = {
            'reconstruction_error': [],
            'pseudo_log_likelihood': [],
            'free_energy': []
        }
        
        # Estado del modelo
        self.is_trained = False
        self.scaler = None
        
    def _initialize_parameters(self):
        """Inicializa pesos y sesgos de la RBM"""
        # Pesos: inicialización Xavier/Glorot
        std = np.sqrt(2.0 / (self.n_visible + self.n_hidden))
        self.W = np.random.normal(0, std, (self.n_visible, self.n_hidden))
        
        # Sesgos
        self.visible_bias = np.zeros(self.n_visible)
        self.hidden_bias = np.zeros(self.n_hidden)
        
        print(f"✓ Parámetros inicializados:")
        print(f"  - Pesos W: {self.W.shape}")
        print(f"  - Sesgo visible: {self.visible_bias.shape}")
        print(f"  - Sesgo oculto: {self.hidden_bias.shape}")
    
    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        """Función sigmoide estable numéricamente"""
        return np.where(x >= 0, 
                       1 / (1 + np.exp(-x)),
                       np.exp(x) / (1 + np.exp(x)))
    
    def _sample_hidden(self, visible: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Muestrea unidades ocultas dado el estado visible
        
        Args:
            visible: Estado de las unidades visibles
            
        Returns:
            hidden_probs: Probabilidades de activación de unidades ocultas
            hidden_states: Estados binarios de unidades ocultas
        """
        # Calcular probabilidades: P(h_j = 1 | v)
        hidden_probs = self._sigmoid(np.dot(visible, self.W) + self.hidden_bias)
        
        # Muestrear estados binarios
        hidden_states = (hidden_probs > np.random.random(hidden_probs.shape)).astype(np.float32)
        
        return hidden_probs, hidden_states
    
    def _sample_visible(self, hidden: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Muestrea unidades visibles dado el estado oculto
        
        Args:
            hidden: Estado de las unidades ocultas
            
        Returns:
            visible_probs: Probabilidades de activación de unidades visibles
            visible_states: Estados de unidades visibles
        """
        # Calcular probabilidades: P(v_i = 1 | h)
        visible_probs = self._sigmoid(np.dot(hidden, self.W.T) + self.visible_bias)
        
        # Para datos continuos, usar las probabilidades directamente
        # Para datos binarios, muestrear estados binarios
        visible_states = visible_probs  # Asumiendo datos continuos normalizados
        
        return visible_probs, visible_states
    
    def _contrastive_divergence(self, batch: np.ndarray) -> Dict[str, np.ndarray]:
        """
        Implementa el algoritmo Contrastive Divergence (CD-k)
        
        Args:
            batch: Batch de datos de entrenamiento
            
        Returns:
            Gradientes para actualizar parámetros
        """
        batch_size = batch.shape[0]
        
        # Fase positiva
        pos_hidden_probs, pos_hidden_states = self._sample_hidden(batch)
        
        # Fase negativa: k pasos de Gibbs sampling
        neg_visible = batch.copy()
        for _ in range(self.k_cd):
            neg_hidden_probs, neg_hidden_states = self._sample_hidden(neg_visible)
            neg_visible_probs, neg_visible = self._sample_visible(neg_hidden_states)
        
        # Calcular gradientes
        pos_associations = np.dot(batch.T, pos_hidden_probs)
        neg_associations = np.dot(neg_visible.T, neg_hidden_probs)
        
        # Gradientes
        dW = (pos_associations - neg_associations) / batch_size
        dv_bias = np.mean(batch - neg_visible, axis=0)
        dh_bias = np.mean(pos_hidden_probs - neg_hidden_probs, axis=0)
        
        return {
            'dW': dW,
            'dv_bias': dv_bias,
            'dh_bias': dh_bias,
            'reconstruction': neg_visible
        }
    
    def _compute_reconstruction_error(self, data: np.ndarray) -> float:
        """Calcula el error de reconstrucción"""
        hidden_probs, _ = self._sample_hidden(data)
        visible_probs, _ = self._sample_visible(hidden_probs)
        return mean_squared_error(data, visible_probs)
    
    def _compute_pseudo_log_likelihood(self, data: np.ndarray, n_samples: int = 100) -> float:
        """
        Calcula pseudo log-likelihood como aproximación de la verosimilitud
        
        Args:
            data: Datos de evaluación
            n_samples: Número de muestras para la estimación
            
        Returns:
            Pseudo log-likelihood promedio
        """
        if len(data) > n_samples:
            indices = np.random.choice(len(data), n_samples, replace=False)
            sample_data = data[indices]
        else:
            sample_data = data
        
        pll = 0
        for sample in sample_data:
            # Calcular energía libre para el sample original
            fe_original = self._free_energy(sample.reshape(1, -1))
            
            # Para cada dimensión, calcular energía libre con bit flippeado
            fe_flipped = []
            for i in range(len(sample)):
                sample_flipped = sample.copy()
                sample_flipped[i] = 1 - sample_flipped[i]  # Flip bit
                fe_flipped.append(self._free_energy(sample_flipped.reshape(1, -1)))
            
            # Pseudo log-likelihood para este sample
            pll += self.n_visible * np.log(self._sigmoid(fe_flipped[0] - fe_original))
        
        return pll / len(sample_data)
    
    def _free_energy(self, visible: np.ndarray) -> float:
        """
        Calcula la energía libre: F(v) = -log(sum_h exp(-E(v,h)))
        
        Args:
            visible: Estado visible
            
        Returns:
            Energía libre
        """
        wx_b = np.dot(visible, self.W) + self.hidden_bias
        vbias_term = np.dot(visible, self.visible_bias)
        hidden_term = np.sum(np.log(1 + np.exp(wx_b)), axis=1)
        return -hidden_term - vbias_term
    
    def fit(self, X: np.ndarray, validation_split: float = 0.2, verbose: bool = True) -> Dict:
        """
        Entrena la RBM usando Contrastive Divergence
        
        Args:
            X: Datos de entrenamiento
            validation_split: Proporción de datos para validación
            verbose: Si mostrar progreso
            
        Returns:
            Historia de entrenamiento
        """
        print(f"\n🚀 INICIANDO ENTRENAMIENTO RBM")
        print(f"{'='*50}")
        print(f"Arquitectura: {self.n_visible}{self.n_hidden}")
        print(f"Hiperparámetros:")
        print(f"  - Learning rate: {self.learning_rate}")
        print(f"  - Epochs: {self.n_epochs}")
        print(f"  - Batch size: {self.batch_size}")
        print(f"  - CD steps: {self.k_cd}")
        print(f"{'='*50}")
        
        # Normalizar datos
        self.scaler = MinMaxScaler()
        X_scaled = self.scaler.fit_transform(X)
        
        # Split train/validation
        if validation_split > 0:
            X_train, X_val = train_test_split(X_scaled, test_size=validation_split, 
                                            random_state=self.random_state)
        else:
            X_train = X_scaled
            X_val = None
        
        n_batches = len(X_train) // self.batch_size
        
        # Entrenamiento
        for epoch in range(self.n_epochs):
            epoch_error = 0
            
            # Shuffle datos
            indices = np.random.permutation(len(X_train))
            X_train_shuffled = X_train[indices]
            
            # Procesar batches
            for i in range(n_batches):
                start_idx = i * self.batch_size
                end_idx = start_idx + self.batch_size
                batch = X_train_shuffled[start_idx:end_idx]
                
                # Contrastive Divergence
                gradients = self._contrastive_divergence(batch)
                
                # Actualizar parámetros
                self.W += self.learning_rate * gradients['dW']
                self.visible_bias += self.learning_rate * gradients['dv_bias']
                self.hidden_bias += self.learning_rate * gradients['dh_bias']
                
                # Acumular error
                epoch_error += self._compute_reconstruction_error(batch)
            
            # Métricas de época
            avg_error = epoch_error / n_batches
            self.training_history['reconstruction_error'].append(avg_error)
            
            # Calcular métricas adicionales cada 10 épocas
            if epoch % 10 == 0 or epoch == self.n_epochs - 1:
                if X_val is not None:
                    val_error = self._compute_reconstruction_error(X_val)
                    pll = self._compute_pseudo_log_likelihood(X_val[:100])  # Muestra pequeña
                    free_energy = np.mean(self._free_energy(X_val[:100]))
                    
                    self.training_history['pseudo_log_likelihood'].append(pll)
                    self.training_history['free_energy'].append(free_energy)
                    
                    if verbose:
                        print(f"Época {epoch+1:3d}/{self.n_epochs} | "
                              f"Error: {avg_error:.6f} | "
                              f"Val Error: {val_error:.6f} | "
                              f"PLL: {pll:.3f}")
                else:
                    if verbose:
                        print(f"Época {epoch+1:3d}/{self.n_epochs} | "
                              f"Error: {avg_error:.6f}")
        
        self.is_trained = True
        print(f"\n✅ ENTRENAMIENTO COMPLETADO")
        print(f"Error final: {self.training_history['reconstruction_error'][-1]:.6f}")
        
        return self.training_history
    
    def transform(self, X: np.ndarray) -> np.ndarray:
        """
        Extrae características de la capa oculta
        
        Args:
            X: Datos de entrada
            
        Returns:
            Activaciones de la capa oculta
        """
        if not self.is_trained:
            raise ValueError("El modelo debe ser entrenado primero")
        
        # Normalizar usando el mismo scaler del entrenamiento
        X_scaled = self.scaler.transform(X)
        
        # Obtener activaciones ocultas
        hidden_probs, _ = self._sample_hidden(X_scaled)
        
        return hidden_probs
    
    def reconstruct(self, X: np.ndarray) -> np.ndarray:
        """
        Reconstruye datos desde la representación oculta
        
        Args:
            X: Datos de entrada
            
        Returns:
            Datos reconstruidos
        """
        if not self.is_trained:
            raise ValueError("El modelo debe ser entrenado primero")
        
        # Normalizar
        X_scaled = self.scaler.transform(X)
        
        # Codificar y decodificar
        hidden_probs, _ = self._sample_hidden(X_scaled)
        visible_probs, _ = self._sample_visible(hidden_probs)
        
        # Desnormalizar
        reconstructed = self.scaler.inverse_transform(visible_probs)
        
        return reconstructed
    
    def generate_samples(self, n_samples: int = 100, n_gibbs: int = 1000) -> np.ndarray:
        """
        Genera muestras sintéticas usando Gibbs sampling
        
        Args:
            n_samples: Número de muestras a generar
            n_gibbs: Número de pasos de Gibbs sampling
            
        Returns:
            Muestras generadas
        """
        if not self.is_trained:
            raise ValueError("El modelo debe ser entrenado primero")
        
        # Inicializar con ruido aleatorio
        samples = np.random.random((n_samples, self.n_visible))
        
        # Gibbs sampling
        for _ in range(n_gibbs):
            hidden_probs, hidden_states = self._sample_hidden(samples)
            visible_probs, samples = self._sample_visible(hidden_states)
        
        # Desnormalizar
        samples_denorm = self.scaler.inverse_transform(samples)
        
        return samples_denorm
    
    def save_model(self, filepath: str, feature_names: List[str] = None):
        """Guarda el modelo entrenado"""
        model_data = {
            'W': self.W,
            'visible_bias': self.visible_bias,
            'hidden_bias': self.hidden_bias,
            'n_visible': self.n_visible,
            'n_hidden': self.n_hidden,
            'learning_rate': self.learning_rate,
            'n_epochs': self.n_epochs,
            'batch_size': self.batch_size,
            'k_cd': self.k_cd,
            'random_state': self.random_state,
            'training_history': self.training_history,
            'is_trained': self.is_trained,
            'scaler': self.scaler,
            'feature_names': feature_names,
            'timestamp': datetime.now().isoformat()
        }
        
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, 'wb') as f:
            pickle.dump(model_data, f)
        
        print(f"✅ Modelo guardado en: {filepath}")
    
    @classmethod
    def load_model(cls, filepath: str):
        """Carga un modelo entrenado"""
        with open(filepath, 'rb') as f:
            model_data = pickle.load(f)
        
        # Crear instancia
        rbm = cls(
            n_visible=model_data['n_visible'],
            n_hidden=model_data['n_hidden'],
            learning_rate=model_data['learning_rate'],
            n_epochs=model_data['n_epochs'],
            batch_size=model_data['batch_size'],
            k_cd=model_data['k_cd'],
            random_state=model_data['random_state']
        )
        
        # Restaurar parámetros
        rbm.W = model_data['W']
        rbm.visible_bias = model_data['visible_bias']
        rbm.hidden_bias = model_data['hidden_bias']
        rbm.training_history = model_data['training_history']
        rbm.is_trained = model_data['is_trained']
        rbm.scaler = model_data['scaler']
        rbm.feature_names = model_data.get('feature_names', None)
        
        print(f"✅ Modelo cargado desde: {filepath}")
        if rbm.feature_names:
            print(f"  - Características: {len(rbm.feature_names)}")
        return rbm

[documentos] def create_rbm_visualizations(rbm: RestrictedBoltzmannMachine, X_original: np.ndarray, feature_names: List[str] = None) -> Dict: """ Crea visualizaciones de diagnóstico para la RBM Args: rbm: Modelo RBM entrenado X_original: Datos originales feature_names: Nombres de las características Returns: Diccionario con figuras de Plotly """ figures = {} # 1. Curvas de aprendizaje if rbm.training_history['reconstruction_error']: fig_learning = go.Figure() epochs = list(range(1, len(rbm.training_history['reconstruction_error']) + 1)) fig_learning.add_trace(go.Scatter( x=epochs, y=rbm.training_history['reconstruction_error'], mode='lines+markers', name='Error de Reconstrucción', line=dict(color='#e74c3c', width=2) )) fig_learning.update_layout( title="Curva de Aprendizaje - Error de Reconstrucción", xaxis_title="Época", yaxis_title="Error MSE", template="plotly_white", height=400 ) figures['learning_curve'] = fig_learning # 2. Heatmap de pesos fig_weights = go.Figure(data=go.Heatmap( z=rbm.W.T, colorscale='RdBu', zmid=0, showscale=True, colorbar=dict(title="Peso") )) fig_weights.update_layout( title=f"Matriz de Pesos W ({rbm.n_visible} × {rbm.n_hidden})", xaxis_title="Unidades Visibles", yaxis_title="Unidades Ocultas", template="plotly_white", height=500 ) figures['weights_heatmap'] = fig_weights # 3. Distribución de activaciones ocultas if rbm.is_trained: hidden_activations = rbm.transform(X_original) fig_activations = go.Figure() # Histograma de activaciones promedio por unidad oculta mean_activations = np.mean(hidden_activations, axis=0) fig_activations.add_trace(go.Histogram( x=mean_activations, nbinsx=30, name='Activaciones Promedio', marker_color='#3498db', opacity=0.7 )) fig_activations.update_layout( title="Distribución de Activaciones de Unidades Ocultas", xaxis_title="Activación Promedio", yaxis_title="Frecuencia", template="plotly_white", height=400 ) figures['activations_dist'] = fig_activations # 4. Comparación Original vs Reconstruido if rbm.is_trained and len(X_original) > 0: # Tomar una muestra pequeña para visualización sample_size = min(100, len(X_original)) sample_indices = np.random.choice(len(X_original), sample_size, replace=False) X_sample = X_original[sample_indices] X_reconstructed = rbm.reconstruct(X_sample) # Crear subplots para comparar algunas características n_features_to_show = min(6, X_original.shape[1]) feature_indices = np.random.choice(X_original.shape[1], n_features_to_show, replace=False) fig_comparison = make_subplots( rows=2, cols=3, subplot_titles=[f"Característica {i}" for i in feature_indices], vertical_spacing=0.1 ) for idx, feat_idx in enumerate(feature_indices): row = (idx // 3) + 1 col = (idx % 3) + 1 # Original fig_comparison.add_trace( go.Scatter( x=list(range(len(X_sample))), y=X_sample[:, feat_idx], mode='markers', name='Original', marker=dict(color='#e74c3c', size=4), showlegend=(idx == 0) ), row=row, col=col ) # Reconstruido fig_comparison.add_trace( go.Scatter( x=list(range(len(X_sample))), y=X_reconstructed[:, feat_idx], mode='markers', name='Reconstruido', marker=dict(color='#3498db', size=4), showlegend=(idx == 0) ), row=row, col=col ) fig_comparison.update_layout( title="Comparación: Datos Originales vs Reconstruidos", template="plotly_white", height=600 ) figures['reconstruction_comparison'] = fig_comparison return figures
[documentos] def render_rbm_module(): """Renderiza el módulo completo de RBM en Streamlit""" st.title("⚡ Máquina de Boltzmann Restringida (RBM)") st.markdown("### *Extracción de características latentes mediante aprendizaje generativo*") # Información teórica with st.expander("📚 ¿Qué es una RBM?", expanded=False): st.markdown(""" Una **Máquina de Boltzmann Restringida (RBM)** es un modelo generativo no supervisado que aprende representaciones latentes de los datos. **Arquitectura:** - **Capa Visible:** Representa los datos de entrada - **Capa Oculta:** Captura características latentes - **Sin conexiones** dentro de cada capa (restricción) - **Conexiones bidireccionales** entre capas **Función de Energía:** ``` E(v,h) = -∑ᵢ aᵢvᵢ - ∑ⱼ bⱼhⱼ - ∑ᵢⱼ vᵢWᵢⱼhⱼ ``` **Entrenamiento:** - Algoritmo: **Contrastive Divergence (CD-k)** - Objetivo: Maximizar la verosimilitud de los datos - Aproximación: k pasos de Gibbs sampling """) # Verificar datos if not os.path.exists("data/processed/datos_credito_hipotecario_realista.csv"): st.error("❌ No hay datos disponibles. Ve a 'Generar Datos' primero.") return # Cargar datos @st.cache_data def load_data(): df = pd.read_csv("data/processed/datos_credito_hipotecario_realista.csv") return df df = load_data() st.success(f"✅ Datos cargados: {len(df):,} registros, {len(df.columns)} variables") # Configuración de la RBM st.subheader("⚙️ Configuración de la RBM") col1, col2 = st.columns(2) with col1: st.markdown("**Arquitectura:**") n_hidden = st.slider( "Unidades ocultas", min_value=10, max_value=200, value=100, step=10, help="Número de neuronas en la capa oculta" ) learning_rate = st.select_slider( "Tasa de aprendizaje", options=[0.001, 0.005, 0.01, 0.05, 0.1], value=0.01, help="Velocidad de actualización de parámetros" ) with col2: st.markdown("**Entrenamiento:**") n_epochs = st.slider( "Épocas", min_value=10, max_value=200, value=100, step=10, help="Número de iteraciones de entrenamiento" ) batch_size = st.select_slider( "Tamaño de batch", options=[16, 32, 64, 128, 256], value=64, help="Número de muestras por batch" ) k_cd = st.slider( "Pasos CD", min_value=1, max_value=10, value=1, help="Pasos de Gibbs sampling en Contrastive Divergence" ) # Selección de características st.subheader("📊 Selección de Características") # Filtrar solo columnas numéricas numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist() # Excluir algunas columnas problemáticas exclude_cols = ['nivel_riesgo_cod', 'rechazo_automatico'] if 'nivel_riesgo_cod' in numeric_columns else [] numeric_columns = [col for col in numeric_columns if col not in exclude_cols] selected_features = st.multiselect( "Selecciona las características para entrenar la RBM:", options=numeric_columns, default=numeric_columns[:15] if len(numeric_columns) > 15 else numeric_columns, help="Selecciona las variables numéricas para el entrenamiento" ) if not selected_features: st.warning("⚠️ Selecciona al menos una característica.") return # Preparar datos X = df[selected_features].values n_visible = len(selected_features) st.info(f"📊 Datos preparados: {X.shape[0]} muestras × {X.shape[1]} características") # Entrenamiento st.subheader("🚀 Entrenamiento de la RBM") col1, col2 = st.columns([1, 1]) with col1: if st.button("🎯 Entrenar RBM", type="primary", use_container_width=True): with st.spinner("⏳ Entrenando Máquina de Boltzmann..."): try: # Crear RBM rbm = RestrictedBoltzmannMachine( n_visible=n_visible, n_hidden=n_hidden, learning_rate=learning_rate, n_epochs=n_epochs, batch_size=batch_size, k_cd=k_cd, random_state=42 ) # Entrenar history = rbm.fit(X, validation_split=0.2, verbose=False) # Guardar modelo con nombres de características os.makedirs("models/rbm", exist_ok=True) model_path = f"models/rbm/rbm_h{n_hidden}_lr{learning_rate}_e{n_epochs}.pkl" rbm.save_model(model_path, feature_names=selected_features) # Guardar en session state st.session_state.rbm_model = rbm st.session_state.rbm_features = selected_features st.session_state.rbm_data = X st.success("✅ RBM entrenada exitosamente!") # Mostrar métricas finales col_m1, col_m2, col_m3, col_m4 = st.columns(4) with col_m1: final_error = history['reconstruction_error'][-1] st.metric("Error Reconstrucción", f"{final_error:.6f}") with col_m2: if history['pseudo_log_likelihood']: final_pll = float(history['pseudo_log_likelihood'][-1]) st.metric("Pseudo Log-Likelihood", f"{final_pll:.3f}") with col_m3: if history['free_energy']: final_fe = float(history['free_energy'][-1]) st.metric("Energía Libre", f"{final_fe:.3f}") with col_m4: # Calcular sparsity de activaciones hidden_act = rbm.transform(X[:1000]) sparsity = float(np.mean(hidden_act < 0.1)) st.metric("Sparsity", f"{sparsity:.2%}") except Exception as e: st.error(f"❌ Error durante el entrenamiento: {e}") st.exception(e) with col2: # Cargar modelo existente model_files = [] if os.path.exists("models/rbm"): model_files = [f for f in os.listdir("models/rbm") if f.endswith('.pkl')] if model_files: selected_model = st.selectbox( "O cargar modelo existente:", options=model_files, help="Selecciona un modelo RBM previamente entrenado" ) if st.button("📂 Cargar Modelo", use_container_width=True): try: model_path = f"models/rbm/{selected_model}" rbm = RestrictedBoltzmannMachine.load_model(model_path) st.session_state.rbm_model = rbm st.session_state.rbm_features = selected_features st.session_state.rbm_data = X st.success("✅ Modelo cargado exitosamente!") except Exception as e: st.error(f"❌ Error cargando modelo: {e}") # Visualizaciones y análisis if 'rbm_model' in st.session_state: rbm = st.session_state.rbm_model st.divider() st.subheader("📊 Análisis y Visualizaciones") # Crear visualizaciones with st.spinner("🎨 Generando visualizaciones..."): figures = create_rbm_visualizations(rbm, X, selected_features) # Mostrar visualizaciones en tabs tab1, tab2, tab3, tab4 = st.tabs([ "📈 Aprendizaje", "🎨 Pesos", "📊 Activaciones", "🔄 Reconstrucción" ]) with tab1: if 'learning_curve' in figures: st.plotly_chart(figures['learning_curve'], use_container_width=True) else: st.info("No hay datos de entrenamiento disponibles.") with tab2: if 'weights_heatmap' in figures: st.plotly_chart(figures['weights_heatmap'], use_container_width=True) st.markdown(""" **Interpretación del Heatmap de Pesos:** - **Colores rojos:** Conexiones positivas (activación conjunta) - **Colores azules:** Conexiones negativas (inhibición) - **Patrones:** Indican qué características se agrupan """) with tab3: if 'activations_dist' in figures: st.plotly_chart(figures['activations_dist'], use_container_width=True) st.markdown(""" **Análisis de Activaciones:** - Distribución de activaciones promedio de unidades ocultas - Valores cerca de 0.5 indican unidades balanceadas - Valores extremos (0 o 1) pueden indicar unidades especializadas """) with tab4: if 'reconstruction_comparison' in figures: st.plotly_chart(figures['reconstruction_comparison'], use_container_width=True) st.markdown(""" **Calidad de Reconstrucción:** - Comparación entre datos originales y reconstruidos - Puntos cercanos indican buena reconstrucción - Dispersión indica pérdida de información """) # Extracción de características st.divider() st.subheader("🔧 Extracción de Características") col1, col2 = st.columns(2) with col1: if st.button("🎯 Extraer Características RBM", use_container_width=True): with st.spinner("⚡ Extrayendo características latentes..."): # Extraer características hidden_features = rbm.transform(X) # Crear DataFrame con características originales + RBM feature_names_rbm = [f"RBM_H{i+1}" for i in range(hidden_features.shape[1])] df_rbm = pd.DataFrame(hidden_features, columns=feature_names_rbm) # Combinar con datos originales df_enhanced = pd.concat([df.reset_index(drop=True), df_rbm], axis=1) # Guardar dataset enriquecido os.makedirs("data/processed", exist_ok=True) enhanced_path = "data/processed/datos_con_rbm.csv" df_enhanced.to_csv(enhanced_path, index=False) st.success(f"✅ Características extraídas: {hidden_features.shape[1]} nuevas variables") st.success(f"💾 Dataset enriquecido guardado: {enhanced_path}") # Mostrar estadísticas st.metric("Características RBM", hidden_features.shape[1]) st.metric("Dataset Total", f"{df_enhanced.shape[1]} variables") with col2: if st.button("🎲 Generar Muestras Sintéticas", use_container_width=True): with st.spinner("🎲 Generando muestras sintéticas..."): try: # Generar muestras synthetic_samples = rbm.generate_samples(n_samples=100, n_gibbs=1000) # Crear DataFrame df_synthetic = pd.DataFrame(synthetic_samples, columns=selected_features) # Guardar os.makedirs("data/synthetic", exist_ok=True) synthetic_path = "data/synthetic/rbm_synthetic_samples.csv" df_synthetic.to_csv(synthetic_path, index=False) st.success("✅ Muestras sintéticas generadas!") st.success(f"💾 Guardadas en: {synthetic_path}") # Mostrar muestra st.dataframe(df_synthetic.head(), use_container_width=True) except Exception as e: st.error(f"❌ Error generando muestras: {e}")
if __name__ == "__main__": # Para testing print("Módulo RBM cargado correctamente")