REFACTOR
Plan de Refactor — Detección de Cultivos
Cada mejora incluye: código actual, mediciones reales de performance, vulnerabilidades, código propuesto y justificación.
Escenario base (benchmark real, 2026-03-17): Lote de
500 ha → 23,783 píxeles totales, 2,414 exitosos, 25 GeoTIFFs, modelo de 24 steps. Tiempo total medido: 803 seg (13.39 min), de los cuales 797.85 seg (99.3%) en ciclos fenológicos y 5.58 seg en clasificación. Instancia ECS: 2 vCPU, 8GB RAM ($0.07/hora).
Mejora 1: Apertura excesiva de archivos GeoTIFF ✅ HECHO
Archivo: features/services/seleccionar_ciclo_fenologico.py — función procesar_pixeles_desde_geotiff()
Código actual:
for i, (fila, col) in enumerate(zip(filas, columnas)): # 23,783 píxeles
serie_temporal = []
for ruta_raster in archivos_raster: # 25 imágenes
with rasterio.open(ruta_raster) as src: # ABRE Y CIERRA cada vez
datos = src.read(window=Window(col, fila, 1, 1)).squeeze()
serie_temporal.append(datos)
array_bruto = np.array(serie_temporal)
Medición real (benchmark 2026-03-17):
| Métrica | Valor medido |
|---|---|
| Píxeles totales en polígono | 23,783 |
| GeoTIFFs procesados | 25 |
Llamadas a rasterio.open() | 23,783 × 25 = 594,575 |
| Tiempo total medido (disco local, Docker) | 797.85 seg (13.30 min) |
| Tiempo por open (calculado) | ~1.34 ms/open |
| Píxeles exitosos | 2,414 (10.1% del total) |
| % del tiempo total de la tarea | 99.3% |
| Costo ECS por tarea (solo esta etapa) | ~$0.016 |
Vulnerabilidades / errores:
- File descriptor exhaustion: 594,575 open/close puede agotar file descriptors del OS si hay tareas concurrentes. Límite default de Linux: 1024 soft.
- Race condition: Si otro proceso borra o modifica un GeoTIFF mientras se itera, el
rasterio.open()falla sin retry. - Sin timeout: Si un archivo está en NFS y el mount se cuelga, el worker queda bloqueado indefinidamente.
Mejora propuesta:
def procesar_pixeles_desde_geotiff(directorio_raster, ruta_geojson, directorio_salida, num_steps=24, debug=False):
os.makedirs(directorio_salida, exist_ok=True)
archivos_raster = sorted(glob.glob(os.path.join(directorio_raster, '*.tif')))
if not archivos_raster:
raise FileNotFoundError(f"No se encontraron archivos .tif en '{directorio_raster}'")
# Cargar geometría y máscara (igual que antes)
with rasterio.open(archivos_raster[0]) as src:
crs_raster = src.crs
transform = src.transform
gdf = gpd.read_file(ruta_geojson).to_crs(crs_raster)
poligono = gdf.geometry.iloc[0]
with rasterio.open(archivos_raster[0]) as src:
mask = rasterize([poligono], out_shape=(src.height, src.width), transform=transform) > 0
filas, columnas = np.where(mask)
total_pixeles = len(filas)
# ✅ MEJORA: Leer cada imagen COMPLETA una sola vez, indexar en memoria
logger.info(f"Cargando {len(archivos_raster)} imágenes en memoria...")
imagenes = []
for ruta_raster in archivos_raster:
with rasterio.open(ruta_raster) as src:
imagenes.append(src.read()) # shape: (8, H, W) — una sola lectura
logger.info(f"Procesando {total_pixeles} píxeles...")
metadata_resultados = []
pixeles_exitosos = 0
transformador = pyproj.Transformer.from_crs(crs_raster, "epsg:4326", always_xy=True)
for i, (fila, col) in enumerate(zip(filas, columnas)):
if i % 500 == 0:
logger.info(f"Procesando píxel {i+1}/{total_pixeles}")
try:
# ✅ Indexación NumPy directa — nanosegundos por píxel
array_bruto = np.array([img[:, fila, col] for img in imagenes])
array_ciclo, status = procesar_pixel_ciclo_fenologico(array_bruto, num_steps)
# ... (resto del procesamiento igual)
Estimado post-refactor:
| Métrica | Valor |
|---|---|
Llamadas a rasterio.open() | 25 (una por imagen) |
| Tiempo de carga en memoria | ~2-5 segundos (25 imágenes) |
| Tiempo por píxel (indexación NumPy) | ~500 nanosegundos |
| Tiempo total estimado | ~3-6 segundos |
| RAM adicional | ~50-150 MB (25 imágenes en memoria) |
| Speedup vs medido | ~133x a 266x (de 798s a 3-6s) |
¿Por qué se propone?
El benchmark confirmó que esta etapa consume el 99.3% del tiempo total (797.85 de 803.43 seg). Cada rasterio.open() implica: syscall open(), lectura de headers TIFF (IFD), parsing de metadata CRS/transform, creación de objeto Python, y luego close(). Multiplicado por 594,575 veces es absurdo cuando las 25 imágenes recortadas al polígono caben cómodamente en RAM. La mejora lee cada imagen una sola vez y luego indexa en memoria con NumPy, que es una operación de nanosegundos.
Mejora 2: Inferencia píxel por píxel en vez de batch ✅ HECHO
Archivo: features/services/clasificacion_suavizado_espacial.py — función build_probability_cube()
Código actual:
def build_probability_cube(model, pixel_data, num_classes):
# ... inicialización del cubo ...
with torch.no_grad():
for pixel in pixel_data: # 2,414 iteraciones
temporal_data = np.load(pixel['path']) # 1 syscall por píxel
temporal_tensor = torch.from_numpy(temporal_data).float().unsqueeze(0) # batch=1
output = model(temporal_tensor) # Forward pass con 1 sola muestra
probs = torch.softmax(output, dim=1).squeeze(0).numpy()
fila_idx = pixel['fila'] - min_fila
col_idx = pixel['columna'] - min_col
prob_cube[fila_idx, col_idx, :] = probs
pixel_mask[fila_idx, col_idx] = True
pixel_map[(fila_idx, col_idx)] = pixel
Medición real (benchmark 2026-03-17):
| Métrica | Valor medido |
|---|---|
| Píxeles clasificados | 2,414 |
| Forward passes | 2,414 (uno por píxel) |
np.load() syscalls | 2,414 |
| Tiempo total clasificación | 5.58 seg |
| Tiempo por píxel (calculado) | ~2.31 ms/píxel |
| Píxeles clase_99 (dudosos) | 744 (31% del total) |
| % del tiempo total de la tarea | 0.7% |
Vulnerabilidades / errores:
- Archivo .npy corrupto rompe todo: Si un solo archivo .npy está corrupto o tiene shape incorrecto, el loop entero falla sin procesar los píxeles restantes.
- Sin validación de shape: No se verifica que
temporal_data.shape == (num_steps, 7)antes del forward pass. Un array con shape incorrecto causa un error críptico de PyTorch. - Memoria fragmentada: 2,414 llamadas a
np.load()+torch.from_numpy()crean y destruyen miles de tensores pequeños, fragmentando memoria.
Mejora propuesta:
def build_probability_cube(model, pixel_data, num_classes, num_steps=24, batch_size=512):
filas = [p['fila'] for p in pixel_data]
columnas = [p['columna'] for p in pixel_data]
min_fila, max_fila = min(filas), max(filas)
min_col, max_col = min(columnas), max(columnas)
alto = max_fila - min_fila + 1
ancho = max_col - min_col + 1
prob_cube = np.zeros((alto, ancho, num_classes), dtype=np.float32)
pixel_mask = np.zeros((alto, ancho), dtype=bool)
pixel_map = {}
# ✅ MEJORA 1: Cargar todos los .npy de una vez con validación
all_arrays = []
valid_pixels = []
for pixel in pixel_data:
try:
data = np.load(pixel['path'])
if data.shape != (num_steps, 7):
logger.warning(f"Shape inválido en {pixel['id']}: {data.shape}, esperado ({num_steps}, 7)")
continue
all_arrays.append(data)
valid_pixels.append(pixel)
except Exception as e:
logger.warning(f"Error cargando {pixel['id']}: {e}")
continue
if not all_arrays:
raise FileNotFoundError("No se pudieron cargar archivos .npy válidos")
# ✅ MEJORA 2: Stack en un solo tensor
batch_tensor = torch.from_numpy(np.stack(all_arrays)).float() # (N, num_steps, 7)
# ✅ MEJORA 3: Inferencia en mini-batches
all_probs = []
with torch.no_grad():
for i in range(0, len(batch_tensor), batch_size):
batch = batch_tensor[i:i + batch_size]
output = model(batch)
probs = torch.softmax(output, dim=1).numpy()
all_probs.append(probs)
all_probs = np.concatenate(all_probs, axis=0) # (N, num_classes)
# Asignar al cubo
for i, pixel in enumerate(valid_pixels):
fila_idx = pixel['fila'] - min_fila
col_idx = pixel['columna'] - min_col
prob_cube[fila_idx, col_idx, :] = all_probs[i]
pixel_mask[fila_idx, col_idx] = True
pixel_map[(fila_idx, col_idx)] = pixel
logger.info(f"Cubo construido: {alto}x{ancho}x{num_classes} ({len(valid_pixels)} píxeles válidos)")
return prob_cube, pixel_mask, pixel_map, (min_fila, min_col)
Estimado post-refactor:
| Métrica | Valor |
|---|---|
| Forward passes | 5 (2,414 / 512 batches) |
np.load() syscalls | 1 (con Mejora 3, carga .npz único) |
| Overhead por forward pass | ~3-5ms × 5 = ~25ms |
| Tiempo de cómputo real | ~0.1ms × 2,414 = ~241ms |
| Tiempo total estimado | ~0.3-0.5 segundos |
| Utilización CPU | ~85% (cómputo domina sobre overhead) |
| Speedup vs medido | ~11-19x (de 5.58s a 0.3-0.5s) |
¿Por qué se propone?
Aunque la clasificación solo toma 5.58 seg (0.7% del total), el patrón de inferencia píxel-por-píxel es ineficiente. PyTorch está optimizado para operaciones matriciales en batch. Con batch_size=1, el overhead (setup de tensores, sincronización interna del LSTM, allocación de memoria) domina. Con batch_size=512, el LSTM procesa 512 secuencias en paralelo usando instrucciones SIMD del CPU. Además, la validación de shape previene errores crípticos en producción.
Mejora 3: Miles de archivos .npy individuales ✅ HECHO
Archivos:
features/services/seleccionar_ciclo_fenologico.py— guarda 1 archivo por píxelfeatures/services/clasificacion_suavizado_espacial.py— carga 1 archivo por píxel
Código actual (guardar):
# En seleccionar_ciclo_fenologico.py
for i, (fila, col) in enumerate(zip(filas, columnas)):
# ... procesamiento ...
if array_ciclo is not None:
ruta_salida = os.path.join(directorio_salida, f"pixel_{i+1}_ciclo.npy")
np.save(ruta_salida, array_ciclo) # 1 syscall write por píxel
Código actual (cargar):
# En clasificacion_suavizado_espacial.py
for pixel in pixel_data:
temporal_data = np.load(pixel['path']) # 1 syscall read por píxel
Medición real (benchmark 2026-03-17):
| Métrica | Valor medido |
|---|---|
| Archivos .npy creados | 2,414 |
| Tamaño total en disco | 3.39 MB |
| Tamaño por archivo | ~1.4 KB (24×7×4 bytes + header) |
| Overhead filesystem por archivo | ~4 KB (inodo + metadata) |
| Espacio real en disco | 3.39 MB datos + ~9.7 MB overhead inodos = ~13 MB |
| Nota | Con modelo 8 steps serían ~0.55 KB/archivo → ~1.33 MB datos |
Vulnerabilidades / errores:
- Límite de inodos: EFS y algunos filesystems tienen límites de inodos. 5,500 archivos por tarea × 100 tareas concurrentes = 550,000 archivos. Puede agotar inodos.
- Listado lento:
os.listdir()en un directorio con 5,500 archivos es O(n). En EFS puede tardar segundos. - Sin atomicidad: Si el proceso muere a mitad del guardado, quedan archivos parciales sin forma de saber cuáles están completos.
- Cleanup manual: No hay limpieza automática. Los archivos se acumulan indefinidamente.
Mejora propuesta (guardar):
# En seleccionar_ciclo_fenologico.py
def procesar_pixeles_desde_geotiff(directorio_raster, ruta_geojson, directorio_salida, num_steps=24):
os.makedirs(directorio_salida, exist_ok=True)
# ... (carga de imágenes y procesamiento igual) ...
# ✅ Acumular en memoria en vez de guardar uno por uno
ciclos_data = []
ciclos_metadata = []
for i, (fila, col) in enumerate(zip(filas, columnas)):
try:
array_bruto = np.array([img[:, fila, col] for img in imagenes])
array_ciclo, status = procesar_pixel_ciclo_fenologico(array_bruto, num_steps)
centro_x, centro_y = transform * (col + 0.5, fila + 0.5)
lon, lat = transformador.transform(centro_x, centro_y)
pixel_meta = {
"pixel_id": f"pixel_{i+1}",
"fila": int(fila),
"columna": int(col),
"latitud": lat,
"longitud": lon,
"status": status,
"idx": len(ciclos_data) if array_ciclo is not None else None
}
ciclos_metadata.append(pixel_meta)
if array_ciclo is not None and status in ['Exitoso', 'Ciclo_Incompleto']:
ciclos_data.append(array_ciclo)
except Exception as e:
ciclos_metadata.append({
"pixel_id": f"pixel_{i+1}",
"fila": int(fila),
"columna": int(col),
"status": "Error",
"error": str(e),
"idx": None
})
# ✅ UN solo archivo con todos los ciclos
if ciclos_data:
ciclos_array = np.stack(ciclos_data) # (N_exitosos, num_steps, 7)
np.savez_compressed(
os.path.join(directorio_salida, "ciclos.npz"),
data=ciclos_array
)
# Metadata en JSON (igual que antes)
with open(os.path.join(directorio_salida, "metadata.json"), 'w') as f:
json.dump(ciclos_metadata, f, indent=2)
logger.info(f"Guardados {len(ciclos_data)} ciclos en ciclos.npz")
Mejora propuesta (cargar):
# En clasificacion_suavizado_espacial.py
def get_pixel_data_with_coords(npy_folder: str, metadata_path: str, num_steps: int = 24):
with open(metadata_path, 'r') as f:
metadata = json.load(f)
# ✅ UN solo load
npz_path = os.path.join(npy_folder, "ciclos.npz")
loaded = np.load(npz_path)
all_data = loaded['data'] # (N, num_steps, 7)
pixel_data = []
for pixel in metadata:
if pixel.get('idx') is not None:
pixel_data.append({
'id': pixel['pixel_id'],
'fila': pixel['fila'],
'columna': pixel['columna'],
'lat': pixel['latitud'],
'lon': pixel['longitud'],
'data': all_data[pixel['idx']] # Referencia directa al array
})
return pixel_data, all_data
Estimado post-refactor:
| Métrica | Valor |
|---|---|
| Archivos creados | 2 (ciclos.npz + metadata.json) |
| Syscalls write | 2 |
| Syscalls read | 2 |
| Tamaño ciclos.npz (comprimido) | ~1-2 MB (2,414 ciclos de 24 steps) |
| Tamaño metadata.json | ~2 MB (23,783 píxeles con metadata) |
| Espacio real en disco | ~4 MB (vs ~13 MB antes) |
| Tiempo guardar | ~0.2-0.5 segundos |
| Tiempo cargar | ~0.1-0.3 segundos |
| Reducción de archivos | 2,414 → 2 (99.9% menos) |
¿Por qué se propone?
El benchmark mostró 2,414 archivos de ~1.4 KB cada uno. El overhead de inodos (~4 KB por archivo) es ~3x mayor que los datos mismos. Además, np.savez_compressed usa zlib que comprime muy bien datos numéricos repetitivos (los valores de reflectancia tienen patrones). El resultado es menos espacio, menos I/O, menos inodos, y operaciones atómicas (si el .npz existe, está completo). Con modelo de 8 steps los archivos serían más chicos (~0.55 KB) y el ratio overhead/datos sería aún peor.
Mejora 4: Credenciales AWS impresas en stdout ✅ HECHO
Archivo: features/services/build_dataset.py — función extract_pixel_data()
Código actual:
def extract_pixel_data(self, url, geometry, use_s3=False):
try:
# ...
session = boto3.Session(
aws_access_key_id=self.rasterio_env['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=self.rasterio_env['AWS_SECRET_ACCESS_KEY'],
aws_session_token=self.rasterio_env['AWS_SESSION_TOKEN'],
region_name='us-west-2'
)
print(self.rasterio_env) # ⚠️ IMPRIME ACCESS KEY, SECRET KEY Y SESSION TOKEN
# ...
print(f"URL final para rasterio: {final_url}") # ⚠️ Imprime URLs internas
Estimado actual:
| Métrica | Valor |
|---|---|
Llamadas a print(self.rasterio_env) por tarea | ~240 (8 bandas × 30 granulos) |
| Datos sensibles expuestos por tarea | AccessKeyId, SecretAccessKey, SessionToken |
| Destino del print | stdout → logs de Celery → CloudWatch/Sentry |
Vulnerabilidades / errores:
- CRÍTICO — Exposición de credenciales:
print(self.rasterio_env)imprimeAWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEYyAWS_SESSION_TOKENen texto plano. Estas credenciales van a:- Logs de Celery (stdout)
- CloudWatch Logs (si está configurado)
- Sentry (el
event_levelestá enINFO, captura todo) - Cualquier sistema de log aggregation
- Credenciales temporales pero peligrosas: Aunque son temporales (1 hora), permiten acceso a
lp-prod-protectedde NASA conRequestPayer=requester. Un atacante podría generar costos de transferencia. - URLs internas expuestas: Las URLs S3 revelan estructura de buckets y paths internos.
Mejora propuesta:
def extract_pixel_data(self, url, geometry, use_s3=False):
try:
# ...
session = boto3.Session(
aws_access_key_id=self.rasterio_env['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=self.rasterio_env['AWS_SECRET_ACCESS_KEY'],
aws_session_token=self.rasterio_env['AWS_SESSION_TOKEN'],
region_name='us-west-2'
)
# ✅ Eliminados los print de credenciales y URLs
logger.debug(f"Accediendo a raster: {url.split('/')[-1]}") # Solo el filename
Nuevo estimado:
| Métrica | Valor |
|---|---|
| Credenciales expuestas | 0 |
| Datos sensibles en logs | Ninguno |
¿Por qué se propone?
Es una vulnerabilidad de seguridad directa. Las credenciales AWS en logs son un vector de ataque conocido. Sentry captura eventos de nivel INFO, lo que significa que cada tarea envía ~240 copias de las credenciales temporales a un servicio externo. Aunque son temporales, durante su ventana de validez (1 hora) permiten acceso a datos de NASA con costos de transferencia a cargo de la cuenta.
Mejora 5: Credenciales AWS hardcodeadas en test.py ✅ HECHO
Archivo: test.py (raíz del proyecto)
Código actual:
import boto3
cred = {
"AWS_ACCESS_KEY_ID": "ASIAZLX6ZES4WMYECNB2",
"AWS_SECRET_ACCESS_KEY": "HdjhQU4t0r3bb0gndqNM6VLhIpHWt9G6471bF4xX",
"AWS_SESSION_TOKEN": "IQoJb3JpZ2luX2VjEIL//////////wEaCXVzLXdlc3QtMiJH...",
"AWS_REGION": "us-west-2",
}
s3 = boto3.client("s3",
aws_access_key_id=cred["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=cred["AWS_SECRET_ACCESS_KEY"],
region_name=cred["AWS_REGION"],
aws_session_token=cred["AWS_SESSION_TOKEN"],
)
Vulnerabilidades / errores:
- CRÍTICO — Credenciales en repositorio Git: Aunque son temporales (session token), el archivo está commiteado. El historial de Git conserva todas las versiones anteriores, incluyendo credenciales que pudieron ser válidas.
- Account ID expuesto: El
AWS_ACCESS_KEY_IDcon prefijoASIAindica credenciales temporales de STS. El account ID643705676985se puede extraer del ARN implícito. - Bucket de producción: El código accede a
lp-prod-protected(bucket de NASA LPDAAC). Si las credenciales fueran válidas, cualquiera con acceso al repo podría generar costos. - El archivo se copia al container Docker: El
DockerfilehaceCOPY ./ /var/async/, lo que incluyetest.pyen la imagen de producción.
Mejora propuesta:
# test.py — Usar variables de entorno o perfil AWS
import boto3
import os
s3 = boto3.client("s3",
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
region_name=os.getenv("AWS_REGION", "us-west-2"),
aws_session_token=os.getenv("AWS_SESSION_TOKEN"),
)
Y agregar al .gitignore y .dockerignore:
test.py
¿Por qué se propone?
Credenciales en código fuente es el error de seguridad #1 en cloud. Aunque estas son temporales y probablemente ya expiraron, el patrón es peligroso. Si alguien copia el patrón con credenciales de larga duración (IAM user keys), el impacto es severo. Además, el archivo se incluye en la imagen Docker de producción sin necesidad.
Mejora 6: USE_S3_ACCESS se lee como string pero se usa como booleano ✅ HECHO
Archivos:
core/settings.py— define la variablefeatures/services/build_dataset.py— la usa
Código actual:
# core/settings.py
USE_S3_ACCESS = os.getenv('USE_S3_ACCESS', 'False') # Siempre es string
# build_dataset.py — se pasa directamente
pixel_data, meta = self.extract_pixel_data(http_url, geometry, use_s3=USE_S3_ACCESS)
# extract_pixel_data recibe:
def extract_pixel_data(self, url, geometry, use_s3=False):
if use_s3: # ⚠️ 'False' (string) es truthy en Python!
Vulnerabilidades / errores:
- Bug lógico:
os.getenv('USE_S3_ACCESS', 'False')retorna el string'False', no el booleanoFalse. En Python, cualquier string no vacío es truthy:bool('False') == True # ⚠️
bool('') == False
bool('True') == True - Consecuencia: Si
USE_S3_ACCESS=Falseen el.env, el código interpreta'False'comoTruey usa acceso S3 en vez de HTTP. - Impacto en costos: El acceso S3 requiere credenciales temporales (request adicional a
s3credentialsendpoint), mientras que HTTP usa el Bearer token directamente. Si se fuerza S3 sin querer, se hacen requests innecesarios. - Falla silenciosa: No hay error, simplemente usa el modo incorrecto.
Mejora propuesta:
# core/settings.py
USE_S3_ACCESS = os.getenv('USE_S3_ACCESS', 'false').lower() in ('true', '1', 'yes')
Nuevo estimado:
| Métrica | Antes | Después |
|---|---|---|
USE_S3_ACCESS=False en .env | Usa S3 (bug) | Usa HTTP (correcto) |
USE_S3_ACCESS=True en .env | Usa S3 | Usa S3 |
USE_S3_ACCESS no definido | Usa S3 (bug, default 'False' es truthy) | Usa HTTP (correcto) |
¿Por qué se propone?
Es un bug silencioso que cambia el comportamiento del sistema. El default debería ser HTTP (más simple, sin credenciales temporales), pero por el bug de string-como-booleano, el default real es S3. Esto genera requests innecesarios al endpoint de credenciales y puede causar fallos si las credenciales S3 no están disponibles.
Mejora 7: db.py carga archivo JSON a nivel de módulo ✅ HECHO
Archivo: features/services/db.py
Código actual:
import json
from pathlib import Path
from core.database_handler import db_query
# ⚠️ Se ejecuta al importar el módulo, no al llamar una función
_LABEL_MAPPING_PATH = Path(__file__).parent.parent / "models" / "label_mapping_24steps.json"
with open(_LABEL_MAPPING_PATH, "r") as f:
_NAME_TO_CROP_ID = json.load(f) # Si el archivo no existe, CRASH al importar
Vulnerabilidades / errores:
- Crash al importar: Si
label_mapping_24steps.jsonno existe o está corrupto, elimportdedb.pyfalla. Comodb.pyse importa enfeatures/interface/main.py, y este se importa enrun.py, el worker de Celery entero no arranca. - Hardcodeo de "24steps": Siempre carga el mapping de 24 steps, incluso si la tarea usa modelo de 8 o 15 steps. Si los mappings difieren entre modelos, los crop_ids insertados en DB serían incorrectos.
- Sin manejo de errores: No hay try/except. Un JSON malformado mata el proceso.
- Acoplamiento: El módulo de base de datos depende de un archivo de modelo de ML. Si se reorganizan los modelos, db.py se rompe.
Mejora propuesta:
import json
import logging
from pathlib import Path
from core.database_handler import db_query
logger = logging.getLogger(__name__)
_MODELS_DIR = Path(__file__).parent.parent / "models"
def _load_label_mapping(num_steps: int = 24, fine_tuning: bool = True) -> dict:
"""Carga label mapping bajo demanda con fallback"""
suffix = '_ft' if fine_tuning else ''
path = _MODELS_DIR / f"label_mapping_{num_steps}steps{suffix}.json"
try:
with open(path, "r") as f:
return json.load(f)
except FileNotFoundError:
logger.error(f"Label mapping no encontrado: {path}")
return {}
except json.JSONDecodeError as e:
logger.error(f"Label mapping corrupto: {path} — {e}")
return {}
def update_db(uuid_detection="", status="", error_message=None, results=None):
query = """
UPDATE sales_adis_detections
SET process_status = %(status)s,
error_message = %(error_message)s,
results = %(results)s
WHERE uuid_detection = %(uuid_detection)s
RETURNING *
"""
db_query(query, {
"status": status,
"error_message": error_message,
"results": json.dumps(results) if results is not None else None,
"uuid_detection": uuid_detection
}, True).fetchone()
def insert_detection_crops(uuid_detection: str, resultados: dict, num_steps: int = 24, fine_tuning: bool = True):
"""Inserta un registro por cultivo en sales_adis_detection_crops"""
name_to_crop_id = _load_label_mapping(num_steps, fine_tuning)
if not name_to_crop_id:
logger.error("No se pudo cargar label mapping, saltando insert de crops")
return
row = db_query(
"SELECT id FROM sales_adis_detections WHERE uuid_detection = %(uuid)s",
{"uuid": uuid_detection}, True
).fetchone()
if not row:
raise ValueError(f"No se encontró detection con uuid_detection={uuid_detection}")
detection_id = row["id"]
for crop_name, data in resultados.items():
if crop_name == "clase_99":
crop_id = 99
else:
crop_id = name_to_crop_id.get(crop_name)
if crop_id is None:
continue
db_query(
"""
INSERT INTO sales_adis_detection_crops (detection_id, crop_id, confidence, area)
VALUES (%(detection_id)s, %(crop_id)s, %(confidence)s, %(area)s)
RETURNING *
""",
{
"detection_id": detection_id,
"crop_id": crop_id,
"confidence": data["confianza_promedio"],
"area": data["superficie_ha"]
}, True
).fetchone()
Estimado:
| Métrica | Antes | Después |
|---|---|---|
| Archivo faltante | Worker no arranca | Warning en log, tarea falla gracefully |
| JSON corrupto | Worker no arranca | Warning en log, tarea falla gracefully |
| Modelo 8 steps | Usa mapping de 24 (potencial bug) | Usa mapping correcto |
| Overhead de carga | 0 (carga al import) | ~1ms (carga bajo demanda, una vez por tarea) |
¿Por qué se propone?
Un módulo de base de datos no debería depender de un archivo de ML para poder importarse. La carga a nivel de módulo es un anti-pattern que convierte un error recuperable (archivo faltante) en un error fatal (worker no arranca). La carga bajo demanda con manejo de errores permite que el worker arranque siempre y que las tareas fallen individualmente con mensajes claros.
Mejora 8: Manipulación de sys.path en múltiples archivos ✅ HECHO
Archivos afectados:
features/services/inferencia_completa.pyfeatures/services/clasificacion_suavizado_espacial.pyfeatures/services/seleccionar_ciclo_fenologico.py
Código actual:
# inferencia_completa.py
import os, sys
_services_dir = os.path.dirname(os.path.abspath(__file__))
_project_root = os.path.abspath(os.path.join(_services_dir, '..', '..'))
sys.path.insert(0, _services_dir)
sys.path.insert(0, _project_root)
# ...
from build_dataset import EarthdataProcessor # Import sin prefijo de paquete
from seleccionar_ciclo_fenologico import procesar_pixeles_desde_geotiff
from clasificacion_suavizado_espacial import run_classification_with_smoothing
# clasificacion_suavizado_espacial.py
import os, sys
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'utils'))
# ...
from rasterizar_imagen_hsl import rasterizar_imagen_utm # Import sin prefijo
from simple_classifier_lstm import TemporalClassifierOnly
# seleccionar_ciclo_fenologico.py
import os, sys
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'utils'))
Vulnerabilidades / errores:
- Colisión de nombres:
sys.path.insert(0, _services_dir)pone el directorio de services al inicio del path. Si hay un módulo estándar de Python con el mismo nombre que un service (ej:json,logging), el service lo sobreescribe. - Imports no determinísticos: El orden de
sys.pathdepende de qué módulo se importa primero. Si Celery importa en orden diferente al esperado, los imports pueden fallar. - No funciona con paquetes:
from build_dataset import ...no es un import de paquete Python válido. No se puede usar con herramientas comomypy,pytest,pylint, o IDEs con autocompletado. - Fragilidad: Si se mueve un archivo de carpeta, hay que actualizar los
sys.path.inserten todos los archivos que lo importan.
Mejora propuesta:
Convertir features/services/ y features/utils/ en paquetes Python reales con __init__.py y usar imports absolutos:
# features/services/__init__.py (crear archivo vacío)
# features/utils/__init__.py (crear archivo vacío)
# features/services/inferencia_completa.py — SIN sys.path.insert
from features.services.build_dataset import EarthdataProcessor
from features.services.seleccionar_ciclo_fenologico import procesar_pixeles_desde_geotiff
from features.services.clasificacion_suavizado_espacial import run_classification_with_smoothing
from core.settings import EARTHDATA_USERNAME, EARTHDATA_PASSWORD
# features/services/clasificacion_suavizado_espacial.py — SIN sys.path.insert
from features.services.rasterizar_imagen_hsl import rasterizar_imagen_utm
from features.services.simple_classifier_lstm import TemporalClassifierOnly
# features/services/seleccionar_ciclo_fenologico.py — SIN sys.path.insert
from features.utils.utils_logging import configurar_logging
from features.utils.utils_validacion import validar_num_steps
Estimado:
| Métrica | Antes | Después |
|---|---|---|
sys.path.insert en el proyecto | 6+ llamadas | 0 |
| Compatibilidad con pytest | ❌ Requiere conftest.py con hacks | ✅ Funciona directo |
| Compatibilidad con mypy/pylint | ❌ No resuelve imports | ✅ Resuelve imports |
| Autocompletado en IDE | ❌ Parcial | ✅ Completo |
| Riesgo de colisión de nombres | Alto | Ninguno |
¿Por qué se propone?
sys.path.insert es un hack que rompe el sistema de paquetes de Python. Hace que los imports sean frágiles, no determinísticos y incompatibles con herramientas de análisis estático. La solución es trivial: crear __init__.py en las carpetas y usar imports absolutos desde la raíz del proyecto (que ya está en sys.path porque Celery se ejecuta desde ahí).
Mejora 9: Archivos temporales sin limpieza automática ✅ HECHO
Archivos afectados:
features/services/build_dataset.py— generageotiff_output/lote_{uuid}/features/services/seleccionar_ciclo_fenologico.py— generaciclos_fenologicos/lote_{uuid}_{steps}steps/features/services/inferencia_completa.py— generaresultados_clasificacion/{uuid}/ytemp_geojson_{uuid}.geojson
Código actual:
# build_dataset.py — crea directorio pero nunca lo borra
geotiffs_dir = self.output_dir / f"lote_{lote_id}"
geotiffs_dir.mkdir(parents=True, exist_ok=True)
# ... guarda GeoTIFFs ...
# ❌ No hay cleanup
# seleccionar_ciclo_fenologico.py
os.makedirs(directorio_salida, exist_ok=True)
# ... guarda .npy ...
# ❌ No hay cleanup
# inferencia_completa.py — el único que intenta limpiar algo
geojson_path = f"temp_geojson_{uuid_detection}.geojson"
with open(geojson_path, 'w') as f:
json.dump(geojson_temp, f)
# ...
if os.path.exists(geojson_path):
os.remove(geojson_path) # ✅ Solo este se limpia
Estimado actual:
| Métrica | Valor |
|---|---|
| Espacio por tarea (GeoTIFFs) | ~50-200 MB |
| Espacio por tarea (ciclos .npy) | ~5-50 MB |
| Espacio por tarea (resultados) | ~1-5 MB |
| Total por tarea | ~60-250 MB |
| Tareas por día (estimado) | ~100 |
| Acumulación diaria | ~6-25 GB |
| Acumulación mensual | ~180-750 GB |
Vulnerabilidades / errores:
- Disco lleno: Sin cleanup, el disco se llena eventualmente. En ECS con EFS, esto puede afectar a todos los workers.
- Costos de storage: EFS cobra por GB almacenado. 750 GB × $0.30/GB = $225/mes solo en archivos temporales.
- Datos huérfanos: Si una tarea falla a mitad, quedan archivos parciales que nunca se usan ni se limpian.
- Sin trazabilidad: No hay forma de saber qué archivos corresponden a qué tarea o cuándo se crearon.
Mejora propuesta:
# features/services/inferencia_completa.py
import shutil
from contextlib import contextmanager
@contextmanager
def temp_directories(uuid_detection: str, num_steps: int):
"""Context manager que crea y limpia directorios temporales"""
dirs = {
'geotiff': f"geotiff_output/lote_{uuid_detection}",
'ciclos': f"ciclos_fenologicos/lote_{uuid_detection}_{num_steps}steps",
'resultados': f"resultados_clasificacion/{uuid_detection}",
'temp_geojson': f"temp_geojson_{uuid_detection}.geojson"
}
# Crear directorios
for key, path in dirs.items():
if key != 'temp_geojson':
os.makedirs(path, exist_ok=True)
try:
yield dirs
finally:
# Limpiar todo al terminar (éxito o error)
for key, path in dirs.items():
try:
if os.path.isdir(path):
shutil.rmtree(path)
logger.debug(f"Limpiado directorio: {path}")
elif os.path.isfile(path):
os.remove(path)
logger.debug(f"Limpiado archivo: {path}")
except Exception as e:
logger.warning(f"Error limpiando {path}: {e}")
def inferencia_completa(uuid_detection: str, poligono_wkt: str, ...):
with temp_directories(uuid_detection, num_steps) as dirs:
# Paso 1: Descargar imágenes
if not ejecutar_build_dataset(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, num_steps):
return None # El finally limpia automáticamente
# Paso 2: Ciclos fenológicos
if not ejecutar_seleccionar_ciclo(uuid_detection, poligono_wkt, num_steps):
return None # El finally limpia automáticamente
# Paso 3: Clasificación (sube a S3, no necesita archivos locales después)
stats = ejecutar_clasificacion(...)
return resultado
# Al salir del with, se limpian todos los directorios
Nuevo estimado:
| Métrica | Antes | Después |
|---|---|---|
| Espacio residual por tarea | 60-250 MB | 0 |
| Acumulación mensual | 180-750 GB | 0 |
| Costo storage mensual | $55-225 | $0 |
| Archivos huérfanos | Sí | No |
¿Por qué se propone?
Los archivos temporales son exactamente eso: temporales. Una vez que el GeoTIFF final está en S3 y los resultados en la base de datos, los archivos intermedios no tienen valor. El context manager garantiza limpieza incluso si la tarea falla con excepción, evitando acumulación de datos huérfanos.
Mejora 10: Código muerto en build_dataset.py ✅ HECHO
Archivo: features/services/build_dataset.py — clase EarthdataProcessor
Código actual:
La clase EarthdataProcessor tiene ~600 líneas. Las siguientes funciones nunca se llaman desde el flujo del worker:
# NUNCA se llama (show_granule_images está comentado en process_lote_earthdata)
def show_granule_images(self, granule_data, lote_id, geometry):
"""Mostrar imágenes de bandas para un granulo usando matplotlib"""
import matplotlib.pyplot as plt
# ... 60 líneas de código de visualización ...
# NUNCA se llama (save_netcdf=False siempre en el worker)
def save_lote_data(self, lote_result):
"""Guardar datos de un lote"""
# ... 10 líneas ...
# NUNCA se llama (solo desde save_lote_data)
def create_xarray_dataset(self, lote_result):
"""Crear dataset XArray con todos los valores de pixels"""
# ... 90 líneas ...
# NUNCA se llama desde el worker (solo desde CLI con --json)
def process_all_lotes(self, json_path, max_lotes=None):
"""Procesar todos los lotes"""
# ... 50 líneas ...
# NUNCA se llama (main() es para CLI)
def main():
"""Función principal"""
# ... 50 líneas ...
Estimado actual:
| Métrica | Valor |
|---|---|
| Líneas de código muerto | ~260 líneas (~43% del archivo) |
| Imports innecesarios por código muerto | xarray, matplotlib, seaborn |
Tiempo de import de matplotlib | ~500ms-1s (primera vez) |
RAM de matplotlib cargado | ~30-50 MB |
Tamaño de matplotlib en Docker image | ~50 MB |
Vulnerabilidades / errores:
- Import de matplotlib en producción: Aunque
show_granule_imagesnunca se llama,matplotlibse importa condicionalmente dentro de la función. Sin embargo,xarraysí se importa a nivel de módulo y arrastranetCDF4como dependencia. - Superficie de ataque: Más código = más dependencias = más CVEs potenciales.
matplotlibyxarraytienen historial de vulnerabilidades. - Docker image inflada: Las dependencias de visualización (
matplotlib,seaborn) y datos (xarray,netCDF4) agregan ~150 MB a la imagen Docker sin necesidad. - Confusión de mantenimiento: Un desarrollador nuevo no sabe qué funciones son parte del flujo real y cuáles son legacy.
Mejora propuesta:
# build_dataset.py — Solo lo que usa el worker
import json
import requests
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
import logging
import rasterio
from rasterio.mask import mask
from rasterio.session import AWSSession
from shapely.geometry import shape
import os
import boto3
from features.services.earthdata_token_manager import EarthdataTokenManager
from core.settings import EARTHDATA_USERNAME, EARTHDATA_PASSWORD, USE_S3_ACCESS
logger = logging.getLogger(__name__)
class EarthdataProcessor:
"""Procesador de datos satelitales usando NASA Earthdata"""
FMASK_VALID_VALUES = {0, 64, 128, 192}
MIN_GRANULES = {8: 6, 15: 14, 24: 22}
FMASK_MIN_QUALITY = 0.70
def __init__(self, username, password, output_dir="geotiffs"):
self.username = username
self.password = password
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.aws_credentials = None
self.credentials_expiry = None
self.token_manager = EarthdataTokenManager(username, password)
self.access_token = None
self.hls_base_url = "https://cmr.earthdata.nasa.gov/search/granules.json"
self.tile_centroids = self._load_tile_centroids()
# ✅ Solo los métodos que usa el worker:
# - authenticate_if_needed()
# - search_hls_granules()
# - _load_tile_centroids()
# - filter_granules_by_closest_tile()
# - credentials_expired()
# - get_aws_credentials()
# - configure_gdal_s3()
# - extract_pixel_data()
# - get_all_pixel_coordinates()
# - _fmask_quality_ok()
# - process_lote_earthdata()
# - save_granule_geotiff()
# ❌ ELIMINADOS:
# - show_granule_images() → era debug visual
# - save_lote_data() → save_netcdf nunca es True
# - create_xarray_dataset() → solo desde save_lote_data
# - process_all_lotes() → solo desde CLI
# - main() → solo desde CLI
Y en requirements.txt, eliminar dependencias no usadas por el worker:
- xarray>=2023.1.0
- netCDF4>=1.6.0
- matplotlib>=3.7.0
- seaborn>=0.12.0
Nuevo estimado:
| Métrica | Antes | Después |
|---|---|---|
| Líneas en build_dataset.py | ~600 | ~340 |
| Dependencias Python | 22 | 18 |
| Tamaño Docker image | ~1.8 GB (estimado) | ~1.5 GB |
| RAM al importar módulo | ~80 MB (con xarray) | ~30 MB |
| Tiempo de build Docker | ~3-5 min | ~2-3 min |
¿Por qué se propone?
Menos código = menos bugs, menos dependencias, menos superficie de ataque, imagen Docker más chica, builds más rápidos. Las funciones de visualización y CLI son útiles para desarrollo local pero no pertenecen al worker de producción. Si se necesitan, pueden vivir en un script separado fuera del paquete principal.
Mejora 11: settings.py con 70% de variables no usadas ✅ HECHO
Archivo: core/settings.py
Código actual:
# Variables que SÍ usa este servicio (~15):
APP_NAME = os.environ.get("APP_NAME", "boilerplate")
ENVIRONMENT = os.environ.get("ENVIRONMENT")
SENTRY_DSN = os.environ.get('SENTRY_DSN', '...')
PG_DATABASE = os.environ.get("PG_DATABASE")
PG_USER = os.environ.get("PG_USER")
PG_HOST = os.environ.get("PG_HOST")
PG_PASSWORD = os.environ.get("PG_PASSWORD")
PG_PORT = os.environ.get("PG_PORT", "5432")
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT")
REDIS_AUTH = os.environ.get("REDIS_AUTH")
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY")
AWS_S3_BUCKET_CROP_DETECTION = os.environ.get("AWS_S3_BUCKET_CROP_DETECTION")
USE_S3_ACCESS = os.getenv('USE_S3_ACCESS', 'False')
EARTHDATA_USERNAME = os.getenv('EARTHDATA_USERNAME')
EARTHDATA_PASSWORD = os.getenv('EARTHDATA_PASSWORD')
CELERY_BROKER = os.environ.get("CELERY_BROKER")
# Variables que NO usa este servicio (~45):
AUTH_JWT_KEY = os.environ.get("AUTH_JWT_KEY")
AUTH_DOMAIN = os.environ.get("AUTH_DOMAIN")
SMTP_HOST = os.environ.get("SMTP_HOST")
SMTP_PORT = os.environ.get("SMTP_PORT")
SMTP_USER = os.environ.get("SMTP_USER")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD")
EMAIL = os.environ.get("EMAIL")
NOTIFICATION_CENTER_URL = os.environ.get("NOTIFICATION_CENTER_URL")
VC_KEY = os.environ.get("VISUALCROSSING_KEY")
AGD_S3_ACCESS_KEY = os.environ.get("AGD_S3_ACCESS_KEY")
AGD_S3_SECRET_KEY = os.environ.get("AGD_S3_SECRET_KEY")
CFT_API_APP_KEY = os.environ.get("CFT_API_APP_KEY", "")
CFT_API_KEY = os.environ.get("CFT_API_KEY", "")
PASS_REPORTES_PERIODICOS = os.environ.get("PASS_REPORTES_PERIODICOS")
CLIENT_REPORTES_PERIODICOS = os.environ.get("CLIENT_REPORTES_PERIODICOS")
FIWARE_KEY_PRESCRIPTION = os.environ.get("FIWARE_KEY_PRESCRIPTION")
FIWARE_KEY_SAMPLING = os.environ.get("FIWARE_KEY_SAMPLING")
PM_ADMIN_USER = os.environ.get("PM_ADMIN_USER")
PM_ADMIN_PASS = os.environ.get("PM_ADMIN_PASS")
PM_BASE_URL = os.environ.get("PM_BASE_URL")
PEDIDOS_INSUMOS_SECRET_ID = os.environ.get("PEDIDOS_INSUMOS_SECRET_ID")
PEDIDOS_INSUMOS_CLIENT_ID = os.environ.get("PEDIDOS_INSUMOS_CLIENT_ID")
NIDERA_SF_STAGING = os.environ.get("NIDERA_SF_STAGING")
NIDERA_SF_PRODUCTION = os.environ.get("NIDERA_SF_PRODUCTION")
NIDERA_SF_URL_STAGING = os.environ.get("NIDERA_SF_URL_STAGING")
NIDERA_SF_URL_PRODUCTION = os.environ.get("NIDERA_SF_URL_PRODUCTION")
EE_CREDENTIALS = os.environ.get("EE_CREDENTIALS")
EE_PRIVATE_KEY = os.environ.get("EE_PRIVATE_KEY")
BCR_API_KEY = os.environ.get("BCR_API_KEY")
BCR_SECRET = os.environ.get("BCR_SECRET")
CLAVE_SIAR = os.environ.get("CLAVE_SIAR")
ALBOR_EXTENSION_CLIENT_ID = os.environ.get("ALBOR_EXTENSION_CLIENT_ID")
ALBOR_CLIENT_ID = os.environ.get("ALBOR_CLIENT_ID")
ALBOR_CLIENT_SECRET = os.environ.get("ALBOR_CLIENT_SECRET")
ALBOR_AUTH_ENDPOINT = os.environ.get("ALBOR_AUTH_ENDPOINT")
ALBOR_TOKEN_ENDPOINT = os.environ.get("ALBOR_TOKEN_ENDPOINT")
FINNEGANS_BASE_URL = os.environ.get("FINNEGANS_BASE_URL")
API_AGLAB = os.environ.get("API_AGLAB")
AGLAB_USER = os.environ.get("AGLAB_USER")
AGLAB_PASS = os.environ.get("AGLAB_PASS")
CALIPSO_USER = os.environ.get("CALIPSO_USER")
CALIPSO_PASS = os.environ.get("CALIPSO_PASS")
PETERSON_USER = os.environ.get("PETERSON_USER")
PETERSON_KEY = os.environ.get("PETERSON_KEY")
PETERSON_USER_DEV = os.environ.get("PETERSON_USER_DEV")
PETERSON_KEY_DEV = os.environ.get("PETERSON_KEY_DEV")
PETERSON_DOMAIN = os.environ.get("PETERSON_DOMAIN")
TOKEN_PUSH_NOTIFICATION = os.environ.get("TOKEN_PUSH_NOTIFICATION")
PERMISSIONS_API_URL = os.environ.get("PERMISSIONS_API_URL", "https://api.auravant.com")
# ... y más
Vulnerabilidades / errores:
- Superficie de ataque innecesaria: El
.envdel container necesita definir ~60 variables aunque solo se usen ~15. Cada variable extra es un secreto más que gestionar, rotar y proteger. - Confusión operativa: Un DevOps que configura el servicio no sabe cuáles son obligatorias y cuáles son basura heredada. Si falta
ALBOR_CLIENT_SECRET¿se rompe algo? No, pero no hay forma de saberlo sin leer todo el código. - Módulos muertos importados:
settings.pyse importa desdecore/__init__.py, que a su vez importalogging_config.py. Peroemails.py,permissions.py,aura_events.pyymodels.pytambién importansettingsy están disponibles aunque nunca se usen.
Mejora propuesta:
# core/settings.py — Solo lo que usa este servicio
import os
# App
APP_NAME = os.environ.get("APP_NAME", "crop-detection-worker")
ENVIRONMENT = os.environ.get("ENVIRONMENT", "development")
SENTRY_DSN = os.environ.get("SENTRY_DSN", "")
# Logging
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
LOG_PATH = os.environ.get("LOG_PATH")
SERVER_SOFTWARE = os.environ.get("SERVER_SOFTWARE", "")
# PostgreSQL (lectura)
DB_CONNECTION_POOL_MINCONN = int(os.environ.get("DB_CONNECTION_POOL_MINCONN", 1))
DB_CONNECTION_POOL_MAXCONN = int(os.environ.get("DB_CONNECTION_POOL_MAXCONN", 1))
PG_DATABASE = os.environ.get("PG_DATABASE")
PG_USER = os.environ.get("PG_USER")
PG_HOST = os.environ.get("PG_HOST")
PG_PASSWORD = os.environ.get("PG_PASSWORD")
PG_PORT = os.environ.get("PG_PORT", "5432")
# Redis
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT", "6379")
REDIS_AUTH = os.environ.get("REDIS_AUTH")
# AWS (resultados S3)
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY")
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")
AWS_S3_BUCKET_CROP_DETECTION = os.environ.get("AWS_S3_BUCKET_CROP_DETECTION")
# NASA Earthdata
EARTHDATA_USERNAME = os.environ.get("EARTHDATA_USERNAME")
EARTHDATA_PASSWORD = os.environ.get("EARTHDATA_PASSWORD")
USE_S3_ACCESS = os.environ.get("USE_S3_ACCESS", "false").lower() in ("true", "1", "yes")
# Celery
CELERY_BROKER = os.environ.get("CELERY_BROKER")
Y eliminar los módulos no usados de core/:
core/
├── __init__.py
├── settings.py
├── database_handler.py
├── boto_handler.py
├── redis_handler.py
├── logging_config.py
├── errors.py
├── constants.py
- ├── emails.py ← NO SE USA
- ├── permissions.py ← NO SE USA
- ├── aura_events.py ← NO SE USA
- └── models.py ← NO SE USA
Estimado:
| Métrica | Antes | Después |
|---|---|---|
| Variables en settings.py | ~60 | ~20 |
| Líneas en settings.py | ~120 | ~35 |
| Módulos en core/ | 11 | 7 |
| Variables de entorno requeridas en .env | ~60 | ~20 |
| Secretos a gestionar | ~45 (muchos innecesarios) | ~8 |
¿Por qué se propone?
El archivo es un copy-paste del boilerplate de Auravant que se usa para todos los microservicios. Este servicio solo necesita: DB, Redis, AWS, Earthdata y Celery. Todo lo demás (SMTP, Fiware, Peterson, Nidera, Albor, Finnegans, etc.) es ruido que complica la configuración, aumenta la superficie de ataque y confunde a cualquiera que intente entender qué necesita el servicio para funcionar.
Mejora 12: Rutas hardcodeadas en múltiples archivos ✅ HECHO
Archivos afectados:
features/services/inferencia_completa.pyfeatures/services/build_dataset.pyfeatures/services/clasificacion_suavizado_espacial.pyfeatures/services/seleccionar_ciclo_fenologico.py
Código actual:
# inferencia_completa.py
from build_dataset import EarthdataProcessor
processor = EarthdataProcessor(username=..., password=..., output_dir="geotiff_output") # hardcoded
# ...
directorio_raster = f"geotiff_output/lote_{uuid_detection}" # hardcoded
directorio_salida = f"ciclos_fenologicos/lote_{uuid_detection}_{num_steps}steps" # hardcoded
# ...
model_path = f"features/models/best_model_{num_steps}steps{model_suffix}.pth" # hardcoded
label_mapping_path = f"features/models/label_mapping_{num_steps}steps{model_suffix}.json" # hardcoded
npy_folder = f"ciclos_fenologicos/lote_{uuid_detection}_{num_steps}steps" # hardcoded (duplicado)
output_folder = f"resultados_clasificacion/{uuid_detection}" # hardcoded
# ...
resultado_path = f"resultados_clasificacion/{uuid_detection}/resultado_completo_{uuid_detection}.json" # hardcoded
geojson_path = f"temp_geojson_{uuid_detection}.geojson" # hardcoded
# seleccionar_ciclo_fenologico.py (CLI)
directorio_raster = f"{args.rasters_dir}/lote_{args.lote_id}" # parcialmente configurable
directorio_salida = f"{args.output}/lote_{args.lote_id}_{args.num_steps}steps" # parcialmente configurable
# clasificacion_suavizado_espacial.py (CLI)
model_path = f"features/models/best_model_{args.num_steps}steps{model_suffix}.pth" # hardcoded
label_mapping_path = f"features/models/label_mapping_{args.num_steps}steps{model_suffix}.json" # hardcoded
Vulnerabilidades / errores:
- Duplicación: La ruta
ciclos_fenologicos/lote_{uuid}_{steps}stepsse construye eninferencia_completa.py(2 veces) y enseleccionar_ciclo_fenologico.py. Si se cambia en uno y no en otro, el pipeline se rompe silenciosamente. - Rutas relativas al CWD: Todas las rutas son relativas al directorio de trabajo. Si Celery se ejecuta desde un directorio diferente (ej: cambio en Dockerfile), todo falla.
- Sin configuración externa: No se pueden cambiar las rutas sin modificar código. Imposible usar volúmenes diferentes en staging vs producción.
- Patrón de nombres inconsistente:
geotiff_outputvsgeotiffs(default enEarthdataProcessor.__init__),ciclos_fenologicosvsciclos, etc.
Mejora propuesta:
# features/services/paths.py — Fuente única de verdad para rutas
import os
from pathlib import Path
# Base configurable via env var, default a directorio del proyecto
BASE_DIR = Path(os.environ.get("CROP_DETECTION_DATA_DIR", "/tmp/crop_detection"))
MODELS_DIR = Path(__file__).parent.parent / "models"
def geotiff_dir(uuid: str) -> Path:
return BASE_DIR / "geotiffs" / f"lote_{uuid}"
def ciclos_dir(uuid: str, num_steps: int) -> Path:
return BASE_DIR / "ciclos" / f"lote_{uuid}_{num_steps}steps"
def resultados_dir(uuid: str) -> Path:
return BASE_DIR / "resultados" / uuid
def model_path(num_steps: int, fine_tuning: bool = True) -> Path:
suffix = "_ft" if fine_tuning else ""
return MODELS_DIR / f"best_model_{num_steps}steps{suffix}.pth"
def label_mapping_path(num_steps: int, fine_tuning: bool = True) -> Path:
suffix = "_ft" if fine_tuning else ""
return MODELS_DIR / f"label_mapping_{num_steps}steps{suffix}.json"
def temp_geojson_path(uuid: str) -> Path:
return BASE_DIR / "tmp" / f"{uuid}.geojson"
def ciclos_metadata_path(uuid: str, num_steps: int) -> Path:
return ciclos_dir(uuid, num_steps) / "ciclos_fenologicos_metadata.json"
# inferencia_completa.py — Usa paths centralizados
from features.services.paths import (
geotiff_dir, ciclos_dir, resultados_dir,
model_path, label_mapping_path, temp_geojson_path, ciclos_metadata_path
)
def ejecutar_build_dataset(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, num_steps):
processor = EarthdataProcessor(
username=EARTHDATA_USERNAME,
password=EARTHDATA_PASSWORD,
output_dir=str(geotiff_dir(uuid_detection).parent) # "geotiffs/"
)
# ...
def ejecutar_clasificacion(uuid_detection, num_steps, fine_tuning, ...):
stats = run_classification_with_smoothing(
npy_folder=str(ciclos_dir(uuid_detection, num_steps)),
metadata_path=str(ciclos_metadata_path(uuid_detection, num_steps)),
model_path=str(model_path(num_steps, fine_tuning)),
label_mapping_path=str(label_mapping_path(num_steps, fine_tuning)),
output_folder=str(resultados_dir(uuid_detection)),
output_name=uuid_detection,
# ...
)
Estimado:
| Métrica | Antes | Después |
|---|---|---|
| Lugares donde se construyen rutas | ~12 (en 4 archivos) | 1 (paths.py) |
| Riesgo de inconsistencia | Alto | Ninguno |
| Configurable via env var | No | Sí (CROP_DETECTION_DATA_DIR) |
| Cambiar directorio base | Editar 4 archivos | 1 variable de entorno |
¿Por qué se propone?
Las rutas duplicadas son una fuente clásica de bugs silenciosos. Si alguien cambia el patrón de nombres en un archivo y no en otro, el paso 3 no encuentra los archivos del paso 2 y la tarea falla. Un módulo centralizado de rutas elimina la duplicación y permite configurar el directorio base via variable de entorno (útil para staging, testing, o usar volúmenes diferentes).
Mejora 13: Typo en nombre de archivo rasterizar_imagen_hsl.py ✅ HECHO
Archivo: features/services/rasterizar_imagen_hsl.py
Código actual:
features/services/rasterizar_imagen_hsl.py ← "hsl" es incorrecto
El archivo se llama hsl pero debería ser hls:
- HLS = Harmonized Landsat Sentinel (el producto satelital que usa el sistema)
- HSL = Hue, Saturation, Lightness (modelo de color, no tiene nada que ver)
Imports actuales:
# clasificacion_suavizado_espacial.py
from rasterizar_imagen_hsl import rasterizar_imagen_utm # typo propagado
Vulnerabilidades / errores:
- Confusión semántica: Un desarrollador nuevo puede pensar que el archivo hace conversión de colores HSL, no rasterización de datos HLS.
- Búsqueda fallida:
grep -r "HLS"no encuentra este archivo.grep -r "rasterizar"sí, pero la asociación con HLS se pierde. - Documentación inconsistente: El README habla de "HLS (Harmonized Landsat Sentinel)" pero el código dice "hsl".
Mejora propuesta:
# Renombrar archivo
mv features/services/rasterizar_imagen_hsl.py features/services/rasterizar_imagen_hls.py
# clasificacion_suavizado_espacial.py — Actualizar import
from features.services.rasterizar_imagen_hls import rasterizar_imagen_utm
Estimado:
| Métrica | Antes | Después |
|---|---|---|
| Nombre correcto | ❌ | ✅ |
| Archivos a modificar | 0 | 2 (renombrar + actualizar import) |
| Riesgo | Bajo (solo confusión) | Ninguno |
¿Por qué se propone?
Es un cambio trivial pero mejora la legibilidad y consistencia del código. El nombre correcto hace que el archivo sea autodocumentado: "rasteriza imágenes HLS" en vez de "rasteriza imágenes... ¿hsl? ¿qué es hsl?".
Mejora 14: No hay tests automatizados
Estado actual:
features/
├── test_manual/
│ └── main.py ← Script de ejecución manual, NO es un test
└── utils/
└── test_flujo_inferencia.py ← Probablemente otro script manual
No hay:
- Tests unitarios
- Tests de integración
- Mocks de servicios externos (NASA, S3, PostgreSQL)
- CI/CD que ejecute tests antes de deploy
Código actual (test_manual/main.py):
# Es un script de ejecución, no un test
if __name__ == "__main__":
result = startCropDetection(
id="1",
uuid_detection="test-uuid-001",
wkt="POLYGON ((-59.7920168 -35.718267, ...))",
model=24,
start_date="2026-01-10",
skip_download=True,
skip_ciclos=True
)
print(result) # ← No hay assertions, solo print
Vulnerabilidades / errores:
- Regresiones silenciosas: Sin tests, un cambio en
detect_planting()puede romper la detección de siembra y nadie se entera hasta que un cliente reporta resultados incorrectos. - Refactor peligroso: Cualquier refactor (como los propuestos en este documento) es riesgoso sin tests que validen que el comportamiento no cambió.
- Deploy a ciegas: El CI/CD hace build y deploy sin validar que el código funciona. Un typo en un import rompe producción.
- Dependencias externas no mockeadas: Para probar manualmente hay que tener credenciales de NASA, conexión a PostgreSQL, bucket S3, etc.
Mejora propuesta:
Crear estructura de tests con pytest:
tests/
├── __init__.py
├── conftest.py # Fixtures compartidos
├── unit/
│ ├── __init__.py
│ ├── test_detect_planting.py # Detección de siembra
│ ├── test_ndvi_calculation.py # Cálculo de NDVI
│ ├── test_fmask_filtering.py # Filtrado por calidad
│ ├── test_interpolation.py # Interpolación temporal
│ └── test_spatial_smoothing.py # Suavizado espacial
├── integration/
│ ├── __init__.py
│ ├── test_ciclo_fenologico.py # Pipeline de ciclos
│ └── test_clasificacion.py # Pipeline de clasificación
└── fixtures/
├── sample_ndvi_series.npy # Serie NDVI de ejemplo
├── sample_pixel_24steps.npy # Píxel procesado de ejemplo
└── sample_geotiff/ # GeoTIFF pequeño de prueba
# tests/conftest.py
import pytest
import numpy as np
@pytest.fixture
def sample_ndvi_with_planting():
"""Serie NDVI con patrón claro de siembra"""
# Simula: suelo bajo → crecimiento → pico → senescencia
return np.array([
0.15, 0.18, 0.20, 0.25, # Suelo/siembra
0.35, 0.45, 0.55, 0.65, # Crecimiento
0.75, 0.82, 0.85, 0.83, # Pico
0.78, 0.70, 0.60, 0.50, # Senescencia
0.40, 0.35, 0.30, 0.28,
0.25, 0.22, 0.20, 0.18
])
@pytest.fixture
def sample_ndvi_no_planting():
"""Serie NDVI sin patrón de siembra (pastura estable)"""
return np.array([0.45 + np.random.normal(0, 0.05) for _ in range(24)])
@pytest.fixture
def sample_pixel_array():
"""Array de píxel (24, 7) con datos realistas"""
return np.load("tests/fixtures/sample_pixel_24steps.npy")
# tests/unit/test_detect_planting.py
import pytest
import numpy as np
from features.services.seleccionar_ciclo_fenologico import detect_planting
class TestDetectPlanting:
def test_detects_clear_planting_pattern(self, sample_ndvi_with_planting):
"""Debe detectar siembra cuando hay patrón claro"""
result = detect_planting(sample_ndvi_with_planting)
assert result is not None
assert 0 <= result <= 5 # Siembra en los primeros timesteps
def test_returns_none_for_stable_vegetation(self, sample_ndvi_no_planting):
"""Debe retornar None para vegetación estable (pastura)"""
result = detect_planting(sample_ndvi_no_planting)
assert result is None
def test_returns_none_for_short_series(self):
"""Debe retornar None si la serie es muy corta"""
short_series = np.array([0.2, 0.3, 0.4])
result = detect_planting(short_series)
assert result is None
def test_respects_min_peak_height(self, sample_ndvi_with_planting):
"""No debe detectar si el pico no alcanza min_peak_height"""
low_peak = sample_ndvi_with_planting * 0.5 # Máximo ~0.42
result = detect_planting(low_peak, min_peak_height=0.7)
assert result is None
def test_respects_max_planting_ndvi(self):
"""No debe detectar si el NDVI inicial es muy alto"""
high_start = np.array([0.5, 0.55, 0.6, 0.7, 0.8, 0.85, 0.8, 0.7] + [0.5]*16)
result = detect_planting(high_start, max_planting_ndvi=0.35)
assert result is None
# tests/unit/test_spatial_smoothing.py
import pytest
import numpy as np
from features.services.clasificacion_suavizado_espacial import spatial_smoothing
class TestSpatialSmoothing:
def test_smoothing_reduces_noise(self):
"""El suavizado debe reducir ruido sal y pimienta"""
# Crear cubo con ruido
prob_cube = np.zeros((10, 10, 6))
prob_cube[:, :, 0] = 0.9 # Casi todo clase 0
prob_cube[5, 5, 0] = 0.1 # Un píxel ruidoso
prob_cube[5, 5, 1] = 0.9
pixel_mask = np.ones((10, 10), dtype=bool)
smoothed = spatial_smoothing(prob_cube, pixel_mask, kernel_size=3)
# El píxel ruidoso debe haberse suavizado
assert smoothed[5, 5, 0] > 0.5 # Ahora más cercano a vecinos
assert smoothed[5, 5, 1] < 0.5
def test_preserves_homogeneous_regions(self):
"""No debe cambiar regiones homogéneas"""
prob_cube = np.zeros((10, 10, 6))
prob_cube[:, :, 2] = 1.0 # Todo clase 2
pixel_mask = np.ones((10, 10), dtype=bool)
smoothed = spatial_smoothing(prob_cube, pixel_mask, kernel_size=3)
np.testing.assert_array_almost_equal(smoothed[:, :, 2], 1.0)
Y actualizar .gitlab-ci.yml:
stages:
- test # ← NUEVO
- security
- build
- deploy
test:
stage: test
image: python:3.11
script:
- pip install -r requirements.txt
- pip install pytest pytest-cov
- pytest tests/ -v --cov=features --cov-report=term-missing
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
- if: $CI_COMMIT_REF_NAME == $CI_DEFAULT_BRANCH
Estimado:
| Métrica | Antes | Después |
|---|---|---|
| Tests automatizados | 0 | ~20-30 |
| Cobertura de código | 0% | ~60-70% (funciones críticas) |
| Tiempo de CI | ~5 min (build+deploy) | ~7 min (+2 min tests) |
| Confianza en refactors | Baja | Alta |
| Regresiones detectadas pre-deploy | 0 | Mayoría |
¿Por qué se propone?
Sin tests, cada deploy es un acto de fe. Los tests unitarios para funciones puras como detect_planting(), calculate_ndvi(), spatial_smoothing() son fáciles de escribir y dan alta confianza. No se necesita mockear NASA ni PostgreSQL para probar que la detección de siembra funciona correctamente con una serie NDVI sintética.
Mejora 15: EarthdataProcessor es un God Object
Archivo: features/services/build_dataset.py
Código actual:
La clase EarthdataProcessor tiene ~600 líneas y maneja 7 responsabilidades diferentes:
class EarthdataProcessor:
# 1. Autenticación con NASA Earthdata
def authenticate_if_needed(self): ...
def get_aws_credentials(self): ...
def configure_gdal_s3(self): ...
# 2. Búsqueda en catálogo CMR
def search_hls_granules(self, bbox, start_date, end_date, max_cloud_cover): ...
# 3. Selección de tiles
def _load_tile_centroids(self): ...
def filter_granules_by_closest_tile(self, granules, geometry): ...
# 4. Descarga y extracción de datos
def extract_pixel_data(self, url, geometry, use_s3): ...
def _fmask_quality_ok(self, fmask_url, geometry, use_s3): ...
# 5. Cálculo de coordenadas
def get_all_pixel_coordinates(self, pixel_data, meta): ...
# 6. Guardado de archivos
def save_granule_geotiff(self, granule_result, lote_id): ...
def save_lote_data(self, lote_result): ... # código muerto
# 7. Orquestación del pipeline
def process_lote_earthdata(self, lote_data, num_steps, save_netcdf): ...
def process_all_lotes(self, json_path, max_lotes): ... # código muerto
# 8. Visualización (código muerto)
def show_granule_images(self, granule_data, lote_id, geometry): ...
# 9. Creación de datasets (código muerto)
def create_xarray_dataset(self, lote_result): ...
Vulnerabilidades / errores:
- Violación de Single Responsibility: Una clase que hace autenticación, búsqueda, descarga, guardado y orquestación es imposible de testear unitariamente.
- Estado mutable compartido:
self.aws_credentials,self.access_token,self.credentials_expiryson estado mutable que se modifica en múltiples métodos. Difícil razonar sobre el estado en un momento dado. - Acoplamiento alto: Para testear
filter_granules_by_closest_tile()hay que instanciar toda la clase con credenciales de NASA. - Difícil de extender: Agregar un nuevo proveedor de datos (ej: Sentinel Hub) requiere modificar esta clase gigante.
Mejora propuesta:
Separar en clases con responsabilidad única:
# features/services/earthdata/auth.py
class EarthdataAuth:
"""Maneja autenticación con NASA Earthdata"""
def __init__(self, username: str, password: str, redis_client=None):
self.username = username
self.password = password
self.token_manager = EarthdataTokenManager(username, password)
self._access_token = None
self._aws_credentials = None
self._credentials_expiry = None
@property
def access_token(self) -> str:
if not self._access_token:
self._access_token = self.token_manager.get_valid_token()
return self._access_token
@property
def aws_credentials(self) -> dict:
if not self._aws_credentials or self._credentials_expired():
self._aws_credentials = self._fetch_aws_credentials()
return self._aws_credentials
def _credentials_expired(self) -> bool:
if not self._credentials_expiry:
return True
return datetime.now() > self._credentials_expiry
def _fetch_aws_credentials(self) -> dict:
response = requests.get(
"https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
headers={"Authorization": f"Bearer {self.access_token}"}
)
response.raise_for_status()
creds = response.json()
self._credentials_expiry = self._parse_expiry(creds.get("expiration"))
return creds
# features/services/earthdata/catalog.py
class HLSCatalog:
"""Busca granulos HLS en el catálogo CMR de NASA"""
CMR_URL = "https://cmr.earthdata.nasa.gov/search/granules.json"
COLLECTION_IDS = ["C2021957657-LPCLOUD", "C2021957295-LPCLOUD"]
def search(self, bbox: tuple, start_date: str, end_date: str,
max_cloud_cover: int = 30) -> list:
"""Busca granulos HLS con paginación"""
all_granules = []
page_num = 1
while True:
params = {
"collection_concept_id": self.COLLECTION_IDS,
"temporal": f"{start_date}T00:00:00Z,{end_date}T23:59:59Z",
"bounding_box": ",".join(map(str, bbox)),
"cloud_cover": f"0,{max_cloud_cover}",
"page_size": 2000,
"page_num": page_num,
}
response = requests.get(self.CMR_URL, params=params)
response.raise_for_status()
granules = response.json().get("feed", {}).get("entry", [])
if not granules:
break
all_granules.extend(granules)
if len(granules) < 2000:
break
page_num += 1
return all_granules
# features/services/earthdata/tile_selector.py
class TileSelector:
"""Selecciona el tile más cercano al centroide del lote"""
def __init__(self, centroids_path: str = None):
self.centroids = self._load_centroids(centroids_path)
def filter_by_closest(self, granules: list, geometry) -> list:
"""Filtra granulos para usar solo el tile más cercano"""
if not granules:
return granules
centroid = geometry.centroid
unique_tiles = self._extract_tiles(granules)
closest = self._find_closest(unique_tiles, centroid.x, centroid.y)
return [g for g in granules if f".T{closest}." in g.get("title", "")]
# features/services/earthdata/downloader.py
class HLSDownloader:
"""Descarga y recorta bandas HLS"""
BAND_MAPPING_LANDSAT = {
"B01": "Aerosol", "B02": "Blue", "B03": "Green", "B04": "Red",
"B05": "NIR", "B06": "SWIR1", "B07": "SWIR2", "Fmask": "Fmask"
}
BAND_MAPPING_SENTINEL = {
"B01": "Aerosol", "B02": "Blue", "B03": "Green", "B04": "Red",
"B8A": "NIR", "B11": "SWIR1", "B12": "SWIR2", "Fmask": "Fmask"
}
def __init__(self, auth: EarthdataAuth):
self.auth = auth
def download_granule(self, granule: dict, geometry, output_dir: Path) -> dict:
"""Descarga todas las bandas de un granulo y las guarda como GeoTIFF"""
# ... implementación ...
def _extract_band(self, url: str, geometry) -> tuple:
"""Extrae y recorta una banda"""
# ... implementación usando self.auth.aws_credentials ...
# features/services/earthdata/processor.py
class EarthdataProcessor:
"""Orquesta el pipeline de descarga de datos HLS"""
MIN_GRANULES = {8: 6, 15: 14, 24: 22}
def __init__(self, username: str, password: str, output_dir: str = "geotiffs"):
self.auth = EarthdataAuth(username, password)
self.catalog = HLSCatalog()
self.tile_selector = TileSelector()
self.downloader = HLSDownloader(self.auth)
self.output_dir = Path(output_dir)
def process_lote(self, lote_data: dict, num_steps: int = 24) -> dict:
"""Procesa un lote: busca, filtra, descarga y guarda"""
geometry = shape(json.loads(lote_data["geojson"]))
bbox = geometry.bounds
# 1. Buscar granulos
granules = self.catalog.search(
bbox=bbox,
start_date=lote_data["fecha_siembra"],
end_date=lote_data["fecha_cosecha"],
max_cloud_cover=20
)
# 2. Filtrar por tile más cercano
granules = self.tile_selector.filter_by_closest(granules, geometry)
# 3. Validar cantidad mínima
min_required = self.MIN_GRANULES.get(num_steps, 6)
if len(granules) < min_required:
raise ValueError(f"Insuficientes granulos: {len(granules)} < {min_required}")
# 4. Descargar cada granulo
output_dir = self.output_dir / f"lote_{lote_data['id_lote']}"
for granule in granules:
self.downloader.download_granule(granule, geometry, output_dir)
return {"processed_granules": len(granules)}
Estimado:
| Métrica | Antes | Después |
|---|---|---|
| Líneas en build_dataset.py | ~600 | ~100 (solo orquestador) |
| Archivos | 1 | 5 (auth, catalog, tile_selector, downloader, processor) |
| Responsabilidades por clase | 7+ | 1 |
| Testeable unitariamente | ❌ | ✅ |
| Acoplamiento | Alto | Bajo |
¿Por qué se propone?
El principio de Single Responsibility dice que una clase debe tener una sola razón para cambiar. EarthdataProcessor tiene al menos 7 razones: cambios en autenticación, en el catálogo CMR, en la selección de tiles, en el formato de descarga, en el guardado, etc. Separar en clases pequeñas permite:
- Testear unitariamente:
TileSelectorse puede testear sin credenciales de NASA. - Reusar componentes:
HLSCatalogse puede usar en otros proyectos. - Extender fácilmente: Agregar
SentinelHubDownloaderno requiere tocar el código existente. - Razonar sobre el código: Cada archivo tiene ~100 líneas y hace una cosa.
Mejora 16: Manejo de errores — try/except excesivos que ocultan errores a Sentry ✅ HECHO
Tipo: Observabilidad / Estabilidad
Severidad: 🔴 Crítica
Archivos afectados:
features/services/build_dataset.py(18 try/except, 16 conexcept Exception)features/services/inferencia_completa.py(5 try/except)features/services/earthdata_token_manager.py(6 try/except)features/services/seleccionar_ciclo_fenologico.py(4 try/except, 2 bareexcept:)features/services/clasificacion_suavizado_espacial.py(1 bareexcept: pass)features/services/rasterizar_imagen_hsl.py(1 try/except)features/interface/main.py(0 try/except — pero tampoco tiene protección)
Total: ~35 bloques try/except en el flujo principal, la mayoría innecesarios.
Problema
Sentry está configurado en run.py para capturar excepciones automáticamente. Pero los try/except distribuidos por todo el código atrapan las excepciones antes de que lleguen a Sentry, las loguean como texto genérico, y devuelven None/False. Resultado:
- Sentry no recibe los errores → no hay alertas, no hay stack traces, no hay contexto
- Los errores se propagan como
None→ el código que llama no sabe qué falló - Mensajes genéricos →
"Error ejecutando build_dataset: <excepción>"no dice en qué paso, con qué datos, ni por qué - Bare
except:atrapa TODO → incluyendoKeyboardInterrupt,SystemExit,MemoryError
Ejemplo 1: Cadena de try/except que silencia errores
ANTES — inferencia_completa.py:
Cada función envuelve todo en try/except y devuelve False/None. El error original se pierde:
# inferencia_completa.py — ANTES
def ejecutar_build_dataset(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, num_steps):
try:
# ... 20 líneas de lógica ...
result = processor.process_lote_earthdata(lote_input, num_steps=num_steps)
if result:
return True
else:
logger.error("Error procesando lote en build_dataset") # ← ¿qué error?
return False
except Exception as e:
logger.error(f"Error ejecutando build_dataset: {e}") # ← Sentry nunca ve esto
return False
def ejecutar_seleccionar_ciclo(uuid_detection, poligono_wkt, num_steps):
try:
# ... lógica ...
return True
except Exception as e:
logger.error(f"Error ejecutando seleccionar_ciclo_fenologico.py: {e}") # ← Sentry nunca ve esto
return False
def ejecutar_clasificacion(uuid_detection, num_steps, fine_tuning, ...):
try:
# ... lógica ...
return stats
except Exception as e:
logger.error(f"Error ejecutando clasificacion_suavizado_espacial: {e}") # ← Sentry nunca ve esto
return None
def inferencia_completa(uuid_detection, poligono_wkt, ...):
try:
if not ejecutar_build_dataset(...): # ← ¿falló por timeout? ¿por auth? ¿por disco lleno?
logger.error("Fallo en descarga de imágenes") # ← mensaje inútil
return None
# ... más pasos ...
except Exception as e:
logger.error(f"Error en flujo de inferencia: {e}") # ← 4to nivel de catch, error ya mutilado
return None
El error viaja así: excepción original → except en process_lote_earthdata → devuelve None → ejecutar_build_dataset ve None → devuelve False → inferencia_completa ve False → devuelve None → main.py ve None → escribe "Inference failed" en DB. Sentry no recibió nada.
DESPUÉS — Sin try/except innecesarios:
# inferencia_completa.py — DESPUÉS
def ejecutar_build_dataset(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, num_steps):
from shapely import wkt
from shapely.geometry import mapping
geometry_wkt = wkt.loads(poligono_wkt)
geometry_geojson = mapping(geometry_wkt)
lote_input = {
'id_lote': uuid_detection,
'fecha_siembra': fecha_ini,
'fecha_cosecha': fecha_fin,
'cultivo': 'unknown',
'geojson': json.dumps(geometry_geojson)
}
if not EARTHDATA_USERNAME or not EARTHDATA_PASSWORD:
raise EnvironmentError("Faltan variables EARTHDATA_USERNAME / EARTHDATA_PASSWORD")
from build_dataset import EarthdataProcessor
processor = EarthdataProcessor(username=EARTHDATA_USERNAME, password=EARTHDATA_PASSWORD, output_dir="geotiff_output")
result = processor.process_lote_earthdata(lote_input, num_steps=num_steps)
if not result:
raise RuntimeError(f"build_dataset no produjo resultado para lote {uuid_detection}")
# Si falla → la excepción sube directo a Sentry con stack trace completo
def ejecutar_seleccionar_ciclo(uuid_detection, poligono_wkt, num_steps):
from shapely import wkt
from shapely.geometry import mapping
geometry_wkt = wkt.loads(poligono_wkt)
geometry_geojson = mapping(geometry_wkt)
geojson_temp = {"type": "Feature", "properties": {}, "geometry": geometry_geojson}
geojson_path = f"temp_geojson_{uuid_detection}.geojson"
with open(geojson_path, 'w') as f:
json.dump(geojson_temp, f)
try:
from seleccionar_ciclo_fenologico import procesar_pixeles_desde_geotiff
procesar_pixeles_desde_geotiff(
f"geotiff_output/lote_{uuid_detection}",
geojson_path,
f"ciclos_fenologicos/lote_{uuid_detection}_{num_steps}steps",
num_steps
)
finally:
# try/finally JUSTIFICADO: limpiar archivo temporal pase lo que pase
if os.path.exists(geojson_path):
os.remove(geojson_path)
def ejecutar_clasificacion(uuid_detection, num_steps, fine_tuning, **kwargs):
model_suffix = '_ft' if fine_tuning else ''
# ... construir rutas ...
from clasificacion_suavizado_espacial import run_classification_with_smoothing
stats = run_classification_with_smoothing(...)
if stats is None:
raise RuntimeError(f"Clasificación no produjo resultados para {uuid_detection}")
return stats
def inferencia_completa(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, **kwargs):
# SIN try/except — las excepciones suben a Sentry con contexto completo
if not kwargs.get('skip_download'):
ejecutar_build_dataset(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, kwargs.get('num_steps', 24))
if not kwargs.get('skip_ciclos'):
ejecutar_seleccionar_ciclo(uuid_detection, poligono_wkt, kwargs.get('num_steps', 24))
stats = ejecutar_clasificacion(uuid_detection, kwargs.get('num_steps', 24), kwargs.get('fine_tuning', False))
resultado = {"uuid_detection": uuid_detection, "resultados": stats, ...}
return resultado
Ejemplo 2: Bare except: que atrapa todo
ANTES — clasificacion_suavizado_espacial.py línea 333-337:
# clasificacion_suavizado_espacial.py — ANTES
try:
from utils_validacion import validar_modelo_existe
validar_modelo_existe(args.num_steps)
except: # ← atrapa ImportError, ValueError, MemoryError, KeyboardInterrupt, TODO
pass # ← silencio total
DESPUÉS:
# clasificacion_suavizado_espacial.py — DESPUÉS
# Si utils_validacion no está disponible, validar inline
try:
from utils_validacion import validar_modelo_existe
validar_modelo_existe(args.num_steps)
except ImportError:
# Fallback: validación manual si el módulo no está disponible
model_suffix = '_ft' if args.fine_tuning else ''
model_path = Path(f"features/models/best_model_{args.num_steps}steps{model_suffix}.pth")
if not model_path.exists():
raise FileNotFoundError(f"Modelo no encontrado: {model_path}")
Ejemplo 3: build_dataset.py — 18 try/except anidados
ANTES — build_dataset.py:
# build_dataset.py — ANTES (patrón repetido 18 veces)
def process_lote_earthdata(self, lote_data, num_steps=24):
try:
# ... 150 líneas ...
for i, granule in enumerate(granules):
try:
# ... 80 líneas ...
for link in http_links:
try:
# ... 40 líneas ...
pixel_data, meta = self.extract_pixel_data(http_url, geometry, use_s3=USE_S3_ACCESS)
except Exception as e:
logger.error(f"Error procesando banda {filename}: {e}") # ← Sentry no ve
continue
except Exception as e:
logger.error(f"Error procesando granulo {i}: {e}") # ← Sentry no ve
continue
except Exception as e:
logger.error(f"Error procesando lote {id_lote}: {e}") # ← Sentry no ve
return None
DESPUÉS — Solo try/except donde se espera fallo y se necesita continuar:
# build_dataset.py — DESPUÉS
def process_lote_earthdata(self, lote_data, num_steps=24):
id_lote = lote_data['id_lote']
geom_dict = json.loads(lote_data['geojson'])
geometry = shape(geom_dict)
# ... setup sin try/except, si falla es un bug real ...
granules = self.search_hls_granules(bbox, start_str, end_str, max_cloud_cover=20)
granules = self.filter_granules_by_closest_tile(granules, geometry)
if not granules:
raise ValueError(f"No se encontraron granulos HLS para lote {id_lote} en {start_str}/{end_str}")
processed_data = []
for i, granule in enumerate(granules):
granule_id = granule.get('title', f'granule_{i}')
# try/except JUSTIFICADO: un granulo malo no debe abortar todo el lote
try:
band_data = self._process_single_granule(granule, geometry)
except Exception as e:
logger.warning(f"Granulo {granule_id} falló, continuando: {type(e).__name__}: {e}")
continue
if band_data:
processed_data.append(band_data)
min_required = self.MIN_GRANULES.get(num_steps, 6)
if len(processed_data) < min_required:
raise ValueError(
f"Imágenes insuficientes para lote {id_lote}: "
f"{len(processed_data)}/{min_required} requeridas"
)
# ... resto sin try/except ...
Ejemplo 4: main.py — Sin protección, errores de DB no controlados
ANTES — features/interface/main.py:
# main.py — ANTES
def startCropDetection(id, uuid_detection, wkt, model, start_date):
# ... validaciones ...
update_db(uuid_detection=uuid_detection, status="processing")
result = inferencia_completa(...) # ← si esto lanza excepción, el status queda en "processing" para siempre
if result is None:
return failed_execution(uuid_detection, "Inference failed") # ← mensaje genérico
update_db(uuid_detection=uuid_detection, status="completed", results=result)
insert_detection_crops(...) # ← si esto falla, status es "completed" pero sin crops
return {"code": "ok", ...}
DESPUÉS — Un solo try/except en el entry point, con error descriptivo:
# main.py — DESPUÉS
def startCropDetection(id="", uuid_detection="", wkt="", model=24, start_date="", fine_tuning=True):
# ... validaciones (sin try/except, si fallan es un bug del caller) ...
update_db(uuid_detection=uuid_detection, status="processing")
try:
result = inferencia_completa(
uuid_detection=uuid_detection,
poligono_wkt=wkt,
fecha_ini=start_date,
fecha_fin=fecha_fin,
num_steps=model,
fine_tuning=fine_tuning
)
except Exception as e:
# try/except JUSTIFICADO: necesitamos actualizar DB a "error" antes de re-lanzar
error_msg = f"{type(e).__name__}: {e}"
failed_execution(uuid_detection, error_msg)
raise # ← re-lanza para que Sentry capture el stack trace completo
update_db(uuid_detection=uuid_detection, status="completed", results=result)
insert_detection_crops(uuid_detection=uuid_detection, resultados=result["resultados"])
return {"code": "ok", "ended_at": int(datetime.now().timestamp()), "data": result}
Ejemplo 5: rasterizar_imagen_hsl.py — Error silenciado en paso crítico
ANTES:
# rasterizar_imagen_hsl.py — ANTES
def rasterizar_imagen_utm(df, nombre_clasificacion):
try:
# ... 60 líneas de rasterización + upload a S3 ...
s3_eu.put_object(Bucket=s3_bucket, Key=s3_key, Body=memfile.read())
logger.info(f"Raster guardado en s3://...")
except Exception as e:
logger.error(f"Error al intentar rasterizar la imagen: {e}") # ← el tiff no se subió a S3 pero nadie se entera
DESPUÉS:
# rasterizar_imagen_hsl.py — DESPUÉS
def rasterizar_imagen_utm(df, nombre_clasificacion):
# Sin try/except — si falla la rasterización o el upload a S3,
# la excepción sube a main.py → Sentry la captura con stack trace
df = df.sort_values(by=['lat', 'lon'], ascending=[False, True]).reset_index(drop=True)
# ... toda la lógica sin envolver ...
s3_eu.put_object(Bucket=s3_bucket, Key=s3_key, Body=memfile.read())
logger.info(f"Raster guardado en s3://{s3_bucket}/{s3_key}")
Regla: Cuándo SÍ usar try/except
| Situación | ¿try/except? | Ejemplo |
|---|---|---|
| Un item de una lista falla pero el resto debe continuar | ✅ Sí | Granulo individual en loop de build_dataset.py |
| Limpiar recurso temporal pase lo que pase | ✅ try/finally | Borrar temp_geojson_*.geojson |
| Necesitar actualizar DB a "error" antes de propagar | ✅ Sí + raise | main.py entry point |
| Fallback cuando un módulo opcional no existe | ✅ except ImportError | utils_validacion no disponible |
| Parsear fecha con múltiples formatos posibles | ✅ except ValueError | Parseo de expiration_str en build_dataset.py |
| Función completa que "puede fallar" | ❌ No | search_hls_granules, extract_pixel_data, etc. |
| Guardar archivo / subir a S3 | ❌ No | rasterizar_imagen_utm, save_granule_geotiff |
| Cualquier lógica de negocio normal | ❌ No | Procesamiento de píxeles, clasificación, etc. |
Excepciones personalizadas (opcional pero recomendado)
# features/exceptions.py
class CropDetectionError(Exception):
"""Base para errores del sistema de detección"""
pass
class EarthdataAuthError(CropDetectionError):
"""Fallo de autenticación con NASA Earthdata"""
pass
class InsufficientDataError(CropDetectionError):
"""No hay suficientes imágenes de calidad para procesar"""
pass
class ClassificationError(CropDetectionError):
"""Error durante la clasificación o post-procesamiento"""
pass
Uso:
# build_dataset.py
if not self.access_token:
raise EarthdataAuthError(f"No se pudo obtener token para usuario {self.username}")
if len(processed_data) < min_required:
raise InsufficientDataError(
f"Lote {id_lote}: {len(processed_data)}/{min_required} imágenes válidas "
f"(período {start_str} a {end_str}, cloud_cover<=20%)"
)
En Sentry se ve: InsufficientDataError: Lote abc123: 3/22 imágenes válidas (período 2024-01-15 a 2024-07-14, cloud_cover<=20%) — inmediatamente sabés qué pasó.
Inventario de try/except a eliminar vs mantener
| Archivo | try/except actuales | Eliminar | Mantener (justificado) |
|---|---|---|---|
build_dataset.py | 18 | 14 | 4 (loop granulos, loop bandas, parseo fecha, fallback credenciales) |
inferencia_completa.py | 5 | 4 | 1 (try/finally para cleanup temp file) |
earthdata_token_manager.py | 6 | 2 | 4 (requests HTTP que pueden fallar, Redis que puede no estar) |
seleccionar_ciclo_fenologico.py | 4 | 3 | 1 (píxel individual en loop) |
clasificacion_suavizado_espacial.py | 1 | 1 | 0 |
rasterizar_imagen_hsl.py | 1 | 1 | 0 |
main.py | 0 | 0 | +1 (agregar try/except + raise para update DB) |
| Total | 35 | 25 | ~10 |
Estimación
- Impacto en observabilidad: De ~0% de errores visibles en Sentry a ~100%
- Impacto en debugging: De "Inference failed" a stack traces completos con contexto
- Riesgo: Bajo — los errores que antes se silenciaban ahora se propagan, pero Sentry los captura y
main.pyactualiza la DB a "error" antes de re-lanzar