Saltar al contenido principal

REFACTOR

Ver en Git


Plan de Refactor — Detección de Cultivos

Cada mejora incluye: código actual, mediciones reales de performance, vulnerabilidades, código propuesto y justificación.

Escenario base (benchmark real, 2026-03-17): Lote de 500 ha → 23,783 píxeles totales, 2,414 exitosos, 25 GeoTIFFs, modelo de 24 steps. Tiempo total medido: 803 seg (13.39 min), de los cuales 797.85 seg (99.3%) en ciclos fenológicos y 5.58 seg en clasificación. Instancia ECS: 2 vCPU, 8GB RAM ($0.07/hora).


Mejora 1: Apertura excesiva de archivos GeoTIFF ✅ HECHO

Archivo: features/services/seleccionar_ciclo_fenologico.py — función procesar_pixeles_desde_geotiff()

Código actual:

for i, (fila, col) in enumerate(zip(filas, columnas)):  # 23,783 píxeles
serie_temporal = []
for ruta_raster in archivos_raster: # 25 imágenes
with rasterio.open(ruta_raster) as src: # ABRE Y CIERRA cada vez
datos = src.read(window=Window(col, fila, 1, 1)).squeeze()
serie_temporal.append(datos)
array_bruto = np.array(serie_temporal)

Medición real (benchmark 2026-03-17):

MétricaValor medido
Píxeles totales en polígono23,783
GeoTIFFs procesados25
Llamadas a rasterio.open()23,783 × 25 = 594,575
Tiempo total medido (disco local, Docker)797.85 seg (13.30 min)
Tiempo por open (calculado)~1.34 ms/open
Píxeles exitosos2,414 (10.1% del total)
% del tiempo total de la tarea99.3%
Costo ECS por tarea (solo esta etapa)~$0.016

Vulnerabilidades / errores:

  • File descriptor exhaustion: 594,575 open/close puede agotar file descriptors del OS si hay tareas concurrentes. Límite default de Linux: 1024 soft.
  • Race condition: Si otro proceso borra o modifica un GeoTIFF mientras se itera, el rasterio.open() falla sin retry.
  • Sin timeout: Si un archivo está en NFS y el mount se cuelga, el worker queda bloqueado indefinidamente.

Mejora propuesta:

def procesar_pixeles_desde_geotiff(directorio_raster, ruta_geojson, directorio_salida, num_steps=24, debug=False):
os.makedirs(directorio_salida, exist_ok=True)

archivos_raster = sorted(glob.glob(os.path.join(directorio_raster, '*.tif')))
if not archivos_raster:
raise FileNotFoundError(f"No se encontraron archivos .tif en '{directorio_raster}'")

# Cargar geometría y máscara (igual que antes)
with rasterio.open(archivos_raster[0]) as src:
crs_raster = src.crs
transform = src.transform

gdf = gpd.read_file(ruta_geojson).to_crs(crs_raster)
poligono = gdf.geometry.iloc[0]

with rasterio.open(archivos_raster[0]) as src:
mask = rasterize([poligono], out_shape=(src.height, src.width), transform=transform) > 0

filas, columnas = np.where(mask)
total_pixeles = len(filas)

# ✅ MEJORA: Leer cada imagen COMPLETA una sola vez, indexar en memoria
logger.info(f"Cargando {len(archivos_raster)} imágenes en memoria...")
imagenes = []
for ruta_raster in archivos_raster:
with rasterio.open(ruta_raster) as src:
imagenes.append(src.read()) # shape: (8, H, W) — una sola lectura

logger.info(f"Procesando {total_pixeles} píxeles...")
metadata_resultados = []
pixeles_exitosos = 0
transformador = pyproj.Transformer.from_crs(crs_raster, "epsg:4326", always_xy=True)

for i, (fila, col) in enumerate(zip(filas, columnas)):
if i % 500 == 0:
logger.info(f"Procesando píxel {i+1}/{total_pixeles}")

try:
# ✅ Indexación NumPy directa — nanosegundos por píxel
array_bruto = np.array([img[:, fila, col] for img in imagenes])

array_ciclo, status = procesar_pixel_ciclo_fenologico(array_bruto, num_steps)

# ... (resto del procesamiento igual)

Estimado post-refactor:

MétricaValor
Llamadas a rasterio.open()25 (una por imagen)
Tiempo de carga en memoria~2-5 segundos (25 imágenes)
Tiempo por píxel (indexación NumPy)~500 nanosegundos
Tiempo total estimado~3-6 segundos
RAM adicional~50-150 MB (25 imágenes en memoria)
Speedup vs medido~133x a 266x (de 798s a 3-6s)

¿Por qué se propone?

El benchmark confirmó que esta etapa consume el 99.3% del tiempo total (797.85 de 803.43 seg). Cada rasterio.open() implica: syscall open(), lectura de headers TIFF (IFD), parsing de metadata CRS/transform, creación de objeto Python, y luego close(). Multiplicado por 594,575 veces es absurdo cuando las 25 imágenes recortadas al polígono caben cómodamente en RAM. La mejora lee cada imagen una sola vez y luego indexa en memoria con NumPy, que es una operación de nanosegundos.


Mejora 2: Inferencia píxel por píxel en vez de batch ✅ HECHO

Archivo: features/services/clasificacion_suavizado_espacial.py — función build_probability_cube()

Código actual:

def build_probability_cube(model, pixel_data, num_classes):
# ... inicialización del cubo ...

with torch.no_grad():
for pixel in pixel_data: # 2,414 iteraciones
temporal_data = np.load(pixel['path']) # 1 syscall por píxel
temporal_tensor = torch.from_numpy(temporal_data).float().unsqueeze(0) # batch=1

output = model(temporal_tensor) # Forward pass con 1 sola muestra
probs = torch.softmax(output, dim=1).squeeze(0).numpy()

fila_idx = pixel['fila'] - min_fila
col_idx = pixel['columna'] - min_col
prob_cube[fila_idx, col_idx, :] = probs
pixel_mask[fila_idx, col_idx] = True
pixel_map[(fila_idx, col_idx)] = pixel

Medición real (benchmark 2026-03-17):

MétricaValor medido
Píxeles clasificados2,414
Forward passes2,414 (uno por píxel)
np.load() syscalls2,414
Tiempo total clasificación5.58 seg
Tiempo por píxel (calculado)~2.31 ms/píxel
Píxeles clase_99 (dudosos)744 (31% del total)
% del tiempo total de la tarea0.7%

Vulnerabilidades / errores:

  • Archivo .npy corrupto rompe todo: Si un solo archivo .npy está corrupto o tiene shape incorrecto, el loop entero falla sin procesar los píxeles restantes.
  • Sin validación de shape: No se verifica que temporal_data.shape == (num_steps, 7) antes del forward pass. Un array con shape incorrecto causa un error críptico de PyTorch.
  • Memoria fragmentada: 2,414 llamadas a np.load() + torch.from_numpy() crean y destruyen miles de tensores pequeños, fragmentando memoria.

Mejora propuesta:

def build_probability_cube(model, pixel_data, num_classes, num_steps=24, batch_size=512):
filas = [p['fila'] for p in pixel_data]
columnas = [p['columna'] for p in pixel_data]
min_fila, max_fila = min(filas), max(filas)
min_col, max_col = min(columnas), max(columnas)

alto = max_fila - min_fila + 1
ancho = max_col - min_col + 1

prob_cube = np.zeros((alto, ancho, num_classes), dtype=np.float32)
pixel_mask = np.zeros((alto, ancho), dtype=bool)
pixel_map = {}

# ✅ MEJORA 1: Cargar todos los .npy de una vez con validación
all_arrays = []
valid_pixels = []
for pixel in pixel_data:
try:
data = np.load(pixel['path'])
if data.shape != (num_steps, 7):
logger.warning(f"Shape inválido en {pixel['id']}: {data.shape}, esperado ({num_steps}, 7)")
continue
all_arrays.append(data)
valid_pixels.append(pixel)
except Exception as e:
logger.warning(f"Error cargando {pixel['id']}: {e}")
continue

if not all_arrays:
raise FileNotFoundError("No se pudieron cargar archivos .npy válidos")

# ✅ MEJORA 2: Stack en un solo tensor
batch_tensor = torch.from_numpy(np.stack(all_arrays)).float() # (N, num_steps, 7)

# ✅ MEJORA 3: Inferencia en mini-batches
all_probs = []
with torch.no_grad():
for i in range(0, len(batch_tensor), batch_size):
batch = batch_tensor[i:i + batch_size]
output = model(batch)
probs = torch.softmax(output, dim=1).numpy()
all_probs.append(probs)

all_probs = np.concatenate(all_probs, axis=0) # (N, num_classes)

# Asignar al cubo
for i, pixel in enumerate(valid_pixels):
fila_idx = pixel['fila'] - min_fila
col_idx = pixel['columna'] - min_col
prob_cube[fila_idx, col_idx, :] = all_probs[i]
pixel_mask[fila_idx, col_idx] = True
pixel_map[(fila_idx, col_idx)] = pixel

logger.info(f"Cubo construido: {alto}x{ancho}x{num_classes} ({len(valid_pixels)} píxeles válidos)")
return prob_cube, pixel_mask, pixel_map, (min_fila, min_col)

Estimado post-refactor:

MétricaValor
Forward passes5 (2,414 / 512 batches)
np.load() syscalls1 (con Mejora 3, carga .npz único)
Overhead por forward pass~3-5ms × 5 = ~25ms
Tiempo de cómputo real~0.1ms × 2,414 = ~241ms
Tiempo total estimado~0.3-0.5 segundos
Utilización CPU~85% (cómputo domina sobre overhead)
Speedup vs medido~11-19x (de 5.58s a 0.3-0.5s)

¿Por qué se propone?

Aunque la clasificación solo toma 5.58 seg (0.7% del total), el patrón de inferencia píxel-por-píxel es ineficiente. PyTorch está optimizado para operaciones matriciales en batch. Con batch_size=1, el overhead (setup de tensores, sincronización interna del LSTM, allocación de memoria) domina. Con batch_size=512, el LSTM procesa 512 secuencias en paralelo usando instrucciones SIMD del CPU. Además, la validación de shape previene errores crípticos en producción.


Mejora 3: Miles de archivos .npy individuales ✅ HECHO

Archivos:

  • features/services/seleccionar_ciclo_fenologico.py — guarda 1 archivo por píxel
  • features/services/clasificacion_suavizado_espacial.py — carga 1 archivo por píxel

Código actual (guardar):

# En seleccionar_ciclo_fenologico.py
for i, (fila, col) in enumerate(zip(filas, columnas)):
# ... procesamiento ...
if array_ciclo is not None:
ruta_salida = os.path.join(directorio_salida, f"pixel_{i+1}_ciclo.npy")
np.save(ruta_salida, array_ciclo) # 1 syscall write por píxel

Código actual (cargar):

# En clasificacion_suavizado_espacial.py
for pixel in pixel_data:
temporal_data = np.load(pixel['path']) # 1 syscall read por píxel

Medición real (benchmark 2026-03-17):

MétricaValor medido
Archivos .npy creados2,414
Tamaño total en disco3.39 MB
Tamaño por archivo~1.4 KB (24×7×4 bytes + header)
Overhead filesystem por archivo~4 KB (inodo + metadata)
Espacio real en disco3.39 MB datos + ~9.7 MB overhead inodos = ~13 MB
NotaCon modelo 8 steps serían ~0.55 KB/archivo → ~1.33 MB datos

Vulnerabilidades / errores:

  • Límite de inodos: EFS y algunos filesystems tienen límites de inodos. 5,500 archivos por tarea × 100 tareas concurrentes = 550,000 archivos. Puede agotar inodos.
  • Listado lento: os.listdir() en un directorio con 5,500 archivos es O(n). En EFS puede tardar segundos.
  • Sin atomicidad: Si el proceso muere a mitad del guardado, quedan archivos parciales sin forma de saber cuáles están completos.
  • Cleanup manual: No hay limpieza automática. Los archivos se acumulan indefinidamente.

Mejora propuesta (guardar):

# En seleccionar_ciclo_fenologico.py
def procesar_pixeles_desde_geotiff(directorio_raster, ruta_geojson, directorio_salida, num_steps=24):
os.makedirs(directorio_salida, exist_ok=True)

# ... (carga de imágenes y procesamiento igual) ...

# ✅ Acumular en memoria en vez de guardar uno por uno
ciclos_data = []
ciclos_metadata = []

for i, (fila, col) in enumerate(zip(filas, columnas)):
try:
array_bruto = np.array([img[:, fila, col] for img in imagenes])
array_ciclo, status = procesar_pixel_ciclo_fenologico(array_bruto, num_steps)

centro_x, centro_y = transform * (col + 0.5, fila + 0.5)
lon, lat = transformador.transform(centro_x, centro_y)

pixel_meta = {
"pixel_id": f"pixel_{i+1}",
"fila": int(fila),
"columna": int(col),
"latitud": lat,
"longitud": lon,
"status": status,
"idx": len(ciclos_data) if array_ciclo is not None else None
}
ciclos_metadata.append(pixel_meta)

if array_ciclo is not None and status in ['Exitoso', 'Ciclo_Incompleto']:
ciclos_data.append(array_ciclo)

except Exception as e:
ciclos_metadata.append({
"pixel_id": f"pixel_{i+1}",
"fila": int(fila),
"columna": int(col),
"status": "Error",
"error": str(e),
"idx": None
})

# ✅ UN solo archivo con todos los ciclos
if ciclos_data:
ciclos_array = np.stack(ciclos_data) # (N_exitosos, num_steps, 7)
np.savez_compressed(
os.path.join(directorio_salida, "ciclos.npz"),
data=ciclos_array
)

# Metadata en JSON (igual que antes)
with open(os.path.join(directorio_salida, "metadata.json"), 'w') as f:
json.dump(ciclos_metadata, f, indent=2)

logger.info(f"Guardados {len(ciclos_data)} ciclos en ciclos.npz")

Mejora propuesta (cargar):

# En clasificacion_suavizado_espacial.py
def get_pixel_data_with_coords(npy_folder: str, metadata_path: str, num_steps: int = 24):
with open(metadata_path, 'r') as f:
metadata = json.load(f)

# ✅ UN solo load
npz_path = os.path.join(npy_folder, "ciclos.npz")
loaded = np.load(npz_path)
all_data = loaded['data'] # (N, num_steps, 7)

pixel_data = []
for pixel in metadata:
if pixel.get('idx') is not None:
pixel_data.append({
'id': pixel['pixel_id'],
'fila': pixel['fila'],
'columna': pixel['columna'],
'lat': pixel['latitud'],
'lon': pixel['longitud'],
'data': all_data[pixel['idx']] # Referencia directa al array
})

return pixel_data, all_data

Estimado post-refactor:

MétricaValor
Archivos creados2 (ciclos.npz + metadata.json)
Syscalls write2
Syscalls read2
Tamaño ciclos.npz (comprimido)~1-2 MB (2,414 ciclos de 24 steps)
Tamaño metadata.json~2 MB (23,783 píxeles con metadata)
Espacio real en disco~4 MB (vs ~13 MB antes)
Tiempo guardar~0.2-0.5 segundos
Tiempo cargar~0.1-0.3 segundos
Reducción de archivos2,414 → 2 (99.9% menos)

¿Por qué se propone?

El benchmark mostró 2,414 archivos de ~1.4 KB cada uno. El overhead de inodos (~4 KB por archivo) es ~3x mayor que los datos mismos. Además, np.savez_compressed usa zlib que comprime muy bien datos numéricos repetitivos (los valores de reflectancia tienen patrones). El resultado es menos espacio, menos I/O, menos inodos, y operaciones atómicas (si el .npz existe, está completo). Con modelo de 8 steps los archivos serían más chicos (~0.55 KB) y el ratio overhead/datos sería aún peor.


Mejora 4: Credenciales AWS impresas en stdout ✅ HECHO

Archivo: features/services/build_dataset.py — función extract_pixel_data()

Código actual:

def extract_pixel_data(self, url, geometry, use_s3=False):
try:
# ...
session = boto3.Session(
aws_access_key_id=self.rasterio_env['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=self.rasterio_env['AWS_SECRET_ACCESS_KEY'],
aws_session_token=self.rasterio_env['AWS_SESSION_TOKEN'],
region_name='us-west-2'
)
print(self.rasterio_env) # ⚠️ IMPRIME ACCESS KEY, SECRET KEY Y SESSION TOKEN
# ...
print(f"URL final para rasterio: {final_url}") # ⚠️ Imprime URLs internas

Estimado actual:

MétricaValor
Llamadas a print(self.rasterio_env) por tarea~240 (8 bandas × 30 granulos)
Datos sensibles expuestos por tareaAccessKeyId, SecretAccessKey, SessionToken
Destino del printstdout → logs de Celery → CloudWatch/Sentry

Vulnerabilidades / errores:

  • CRÍTICO — Exposición de credenciales: print(self.rasterio_env) imprime AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY y AWS_SESSION_TOKEN en texto plano. Estas credenciales van a:
    • Logs de Celery (stdout)
    • CloudWatch Logs (si está configurado)
    • Sentry (el event_level está en INFO, captura todo)
    • Cualquier sistema de log aggregation
  • Credenciales temporales pero peligrosas: Aunque son temporales (1 hora), permiten acceso a lp-prod-protected de NASA con RequestPayer=requester. Un atacante podría generar costos de transferencia.
  • URLs internas expuestas: Las URLs S3 revelan estructura de buckets y paths internos.

Mejora propuesta:

def extract_pixel_data(self, url, geometry, use_s3=False):
try:
# ...
session = boto3.Session(
aws_access_key_id=self.rasterio_env['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=self.rasterio_env['AWS_SECRET_ACCESS_KEY'],
aws_session_token=self.rasterio_env['AWS_SESSION_TOKEN'],
region_name='us-west-2'
)
# ✅ Eliminados los print de credenciales y URLs
logger.debug(f"Accediendo a raster: {url.split('/')[-1]}") # Solo el filename

Nuevo estimado:

MétricaValor
Credenciales expuestas0
Datos sensibles en logsNinguno

¿Por qué se propone?

Es una vulnerabilidad de seguridad directa. Las credenciales AWS en logs son un vector de ataque conocido. Sentry captura eventos de nivel INFO, lo que significa que cada tarea envía ~240 copias de las credenciales temporales a un servicio externo. Aunque son temporales, durante su ventana de validez (1 hora) permiten acceso a datos de NASA con costos de transferencia a cargo de la cuenta.


Mejora 5: Credenciales AWS hardcodeadas en test.py ✅ HECHO

Archivo: test.py (raíz del proyecto)

Código actual:

import boto3

cred = {
"AWS_ACCESS_KEY_ID": "ASIAZLX6ZES4WMYECNB2",
"AWS_SECRET_ACCESS_KEY": "HdjhQU4t0r3bb0gndqNM6VLhIpHWt9G6471bF4xX",
"AWS_SESSION_TOKEN": "IQoJb3JpZ2luX2VjEIL//////////wEaCXVzLXdlc3QtMiJH...",
"AWS_REGION": "us-west-2",
}

s3 = boto3.client("s3",
aws_access_key_id=cred["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=cred["AWS_SECRET_ACCESS_KEY"],
region_name=cred["AWS_REGION"],
aws_session_token=cred["AWS_SESSION_TOKEN"],
)

Vulnerabilidades / errores:

  • CRÍTICO — Credenciales en repositorio Git: Aunque son temporales (session token), el archivo está commiteado. El historial de Git conserva todas las versiones anteriores, incluyendo credenciales que pudieron ser válidas.
  • Account ID expuesto: El AWS_ACCESS_KEY_ID con prefijo ASIA indica credenciales temporales de STS. El account ID 643705676985 se puede extraer del ARN implícito.
  • Bucket de producción: El código accede a lp-prod-protected (bucket de NASA LPDAAC). Si las credenciales fueran válidas, cualquiera con acceso al repo podría generar costos.
  • El archivo se copia al container Docker: El Dockerfile hace COPY ./ /var/async/, lo que incluye test.py en la imagen de producción.

Mejora propuesta:

# test.py — Usar variables de entorno o perfil AWS
import boto3
import os

s3 = boto3.client("s3",
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
region_name=os.getenv("AWS_REGION", "us-west-2"),
aws_session_token=os.getenv("AWS_SESSION_TOKEN"),
)

Y agregar al .gitignore y .dockerignore:

test.py

¿Por qué se propone?

Credenciales en código fuente es el error de seguridad #1 en cloud. Aunque estas son temporales y probablemente ya expiraron, el patrón es peligroso. Si alguien copia el patrón con credenciales de larga duración (IAM user keys), el impacto es severo. Además, el archivo se incluye en la imagen Docker de producción sin necesidad.


Mejora 6: USE_S3_ACCESS se lee como string pero se usa como booleano ✅ HECHO

Archivos:

  • core/settings.py — define la variable
  • features/services/build_dataset.py — la usa

Código actual:

# core/settings.py
USE_S3_ACCESS = os.getenv('USE_S3_ACCESS', 'False') # Siempre es string

# build_dataset.py — se pasa directamente
pixel_data, meta = self.extract_pixel_data(http_url, geometry, use_s3=USE_S3_ACCESS)

# extract_pixel_data recibe:
def extract_pixel_data(self, url, geometry, use_s3=False):
if use_s3: # ⚠️ 'False' (string) es truthy en Python!

Vulnerabilidades / errores:

  • Bug lógico: os.getenv('USE_S3_ACCESS', 'False') retorna el string 'False', no el booleano False. En Python, cualquier string no vacío es truthy:
    bool('False') == True  # ⚠️
    bool('') == False
    bool('True') == True
  • Consecuencia: Si USE_S3_ACCESS=False en el .env, el código interpreta 'False' como True y usa acceso S3 en vez de HTTP.
  • Impacto en costos: El acceso S3 requiere credenciales temporales (request adicional a s3credentials endpoint), mientras que HTTP usa el Bearer token directamente. Si se fuerza S3 sin querer, se hacen requests innecesarios.
  • Falla silenciosa: No hay error, simplemente usa el modo incorrecto.

Mejora propuesta:

# core/settings.py
USE_S3_ACCESS = os.getenv('USE_S3_ACCESS', 'false').lower() in ('true', '1', 'yes')

Nuevo estimado:

MétricaAntesDespués
USE_S3_ACCESS=False en .envUsa S3 (bug)Usa HTTP (correcto)
USE_S3_ACCESS=True en .envUsa S3Usa S3
USE_S3_ACCESS no definidoUsa S3 (bug, default 'False' es truthy)Usa HTTP (correcto)

¿Por qué se propone?

Es un bug silencioso que cambia el comportamiento del sistema. El default debería ser HTTP (más simple, sin credenciales temporales), pero por el bug de string-como-booleano, el default real es S3. Esto genera requests innecesarios al endpoint de credenciales y puede causar fallos si las credenciales S3 no están disponibles.


Mejora 7: db.py carga archivo JSON a nivel de módulo ✅ HECHO

Archivo: features/services/db.py

Código actual:

import json
from pathlib import Path
from core.database_handler import db_query

# ⚠️ Se ejecuta al importar el módulo, no al llamar una función
_LABEL_MAPPING_PATH = Path(__file__).parent.parent / "models" / "label_mapping_24steps.json"
with open(_LABEL_MAPPING_PATH, "r") as f:
_NAME_TO_CROP_ID = json.load(f) # Si el archivo no existe, CRASH al importar

Vulnerabilidades / errores:

  • Crash al importar: Si label_mapping_24steps.json no existe o está corrupto, el import de db.py falla. Como db.py se importa en features/interface/main.py, y este se importa en run.py, el worker de Celery entero no arranca.
  • Hardcodeo de "24steps": Siempre carga el mapping de 24 steps, incluso si la tarea usa modelo de 8 o 15 steps. Si los mappings difieren entre modelos, los crop_ids insertados en DB serían incorrectos.
  • Sin manejo de errores: No hay try/except. Un JSON malformado mata el proceso.
  • Acoplamiento: El módulo de base de datos depende de un archivo de modelo de ML. Si se reorganizan los modelos, db.py se rompe.

Mejora propuesta:

import json
import logging
from pathlib import Path
from core.database_handler import db_query

logger = logging.getLogger(__name__)

_MODELS_DIR = Path(__file__).parent.parent / "models"

def _load_label_mapping(num_steps: int = 24, fine_tuning: bool = True) -> dict:
"""Carga label mapping bajo demanda con fallback"""
suffix = '_ft' if fine_tuning else ''
path = _MODELS_DIR / f"label_mapping_{num_steps}steps{suffix}.json"
try:
with open(path, "r") as f:
return json.load(f)
except FileNotFoundError:
logger.error(f"Label mapping no encontrado: {path}")
return {}
except json.JSONDecodeError as e:
logger.error(f"Label mapping corrupto: {path}{e}")
return {}


def update_db(uuid_detection="", status="", error_message=None, results=None):
query = """
UPDATE sales_adis_detections
SET process_status = %(status)s,
error_message = %(error_message)s,
results = %(results)s
WHERE uuid_detection = %(uuid_detection)s
RETURNING *
"""
db_query(query, {
"status": status,
"error_message": error_message,
"results": json.dumps(results) if results is not None else None,
"uuid_detection": uuid_detection
}, True).fetchone()


def insert_detection_crops(uuid_detection: str, resultados: dict, num_steps: int = 24, fine_tuning: bool = True):
"""Inserta un registro por cultivo en sales_adis_detection_crops"""
name_to_crop_id = _load_label_mapping(num_steps, fine_tuning)
if not name_to_crop_id:
logger.error("No se pudo cargar label mapping, saltando insert de crops")
return

row = db_query(
"SELECT id FROM sales_adis_detections WHERE uuid_detection = %(uuid)s",
{"uuid": uuid_detection}, True
).fetchone()

if not row:
raise ValueError(f"No se encontró detection con uuid_detection={uuid_detection}")

detection_id = row["id"]

for crop_name, data in resultados.items():
if crop_name == "clase_99":
crop_id = 99
else:
crop_id = name_to_crop_id.get(crop_name)
if crop_id is None:
continue

db_query(
"""
INSERT INTO sales_adis_detection_crops (detection_id, crop_id, confidence, area)
VALUES (%(detection_id)s, %(crop_id)s, %(confidence)s, %(area)s)
RETURNING *
""",
{
"detection_id": detection_id,
"crop_id": crop_id,
"confidence": data["confianza_promedio"],
"area": data["superficie_ha"]
}, True
).fetchone()

Estimado:

MétricaAntesDespués
Archivo faltanteWorker no arrancaWarning en log, tarea falla gracefully
JSON corruptoWorker no arrancaWarning en log, tarea falla gracefully
Modelo 8 stepsUsa mapping de 24 (potencial bug)Usa mapping correcto
Overhead de carga0 (carga al import)~1ms (carga bajo demanda, una vez por tarea)

¿Por qué se propone?

Un módulo de base de datos no debería depender de un archivo de ML para poder importarse. La carga a nivel de módulo es un anti-pattern que convierte un error recuperable (archivo faltante) en un error fatal (worker no arranca). La carga bajo demanda con manejo de errores permite que el worker arranque siempre y que las tareas fallen individualmente con mensajes claros.


Mejora 8: Manipulación de sys.path en múltiples archivos ✅ HECHO

Archivos afectados:

  • features/services/inferencia_completa.py
  • features/services/clasificacion_suavizado_espacial.py
  • features/services/seleccionar_ciclo_fenologico.py

Código actual:

# inferencia_completa.py
import os, sys
_services_dir = os.path.dirname(os.path.abspath(__file__))
_project_root = os.path.abspath(os.path.join(_services_dir, '..', '..'))
sys.path.insert(0, _services_dir)
sys.path.insert(0, _project_root)
# ...
from build_dataset import EarthdataProcessor # Import sin prefijo de paquete
from seleccionar_ciclo_fenologico import procesar_pixeles_desde_geotiff
from clasificacion_suavizado_espacial import run_classification_with_smoothing

# clasificacion_suavizado_espacial.py
import os, sys
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'utils'))
# ...
from rasterizar_imagen_hsl import rasterizar_imagen_utm # Import sin prefijo
from simple_classifier_lstm import TemporalClassifierOnly

# seleccionar_ciclo_fenologico.py
import os, sys
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'utils'))

Vulnerabilidades / errores:

  • Colisión de nombres: sys.path.insert(0, _services_dir) pone el directorio de services al inicio del path. Si hay un módulo estándar de Python con el mismo nombre que un service (ej: json, logging), el service lo sobreescribe.
  • Imports no determinísticos: El orden de sys.path depende de qué módulo se importa primero. Si Celery importa en orden diferente al esperado, los imports pueden fallar.
  • No funciona con paquetes: from build_dataset import ... no es un import de paquete Python válido. No se puede usar con herramientas como mypy, pytest, pylint, o IDEs con autocompletado.
  • Fragilidad: Si se mueve un archivo de carpeta, hay que actualizar los sys.path.insert en todos los archivos que lo importan.

Mejora propuesta:

Convertir features/services/ y features/utils/ en paquetes Python reales con __init__.py y usar imports absolutos:

# features/services/__init__.py  (crear archivo vacío)
# features/utils/__init__.py (crear archivo vacío)
# features/services/inferencia_completa.py — SIN sys.path.insert
from features.services.build_dataset import EarthdataProcessor
from features.services.seleccionar_ciclo_fenologico import procesar_pixeles_desde_geotiff
from features.services.clasificacion_suavizado_espacial import run_classification_with_smoothing
from core.settings import EARTHDATA_USERNAME, EARTHDATA_PASSWORD

# features/services/clasificacion_suavizado_espacial.py — SIN sys.path.insert
from features.services.rasterizar_imagen_hsl import rasterizar_imagen_utm
from features.services.simple_classifier_lstm import TemporalClassifierOnly

# features/services/seleccionar_ciclo_fenologico.py — SIN sys.path.insert
from features.utils.utils_logging import configurar_logging
from features.utils.utils_validacion import validar_num_steps

Estimado:

MétricaAntesDespués
sys.path.insert en el proyecto6+ llamadas0
Compatibilidad con pytest❌ Requiere conftest.py con hacks✅ Funciona directo
Compatibilidad con mypy/pylint❌ No resuelve imports✅ Resuelve imports
Autocompletado en IDE❌ Parcial✅ Completo
Riesgo de colisión de nombresAltoNinguno

¿Por qué se propone?

sys.path.insert es un hack que rompe el sistema de paquetes de Python. Hace que los imports sean frágiles, no determinísticos y incompatibles con herramientas de análisis estático. La solución es trivial: crear __init__.py en las carpetas y usar imports absolutos desde la raíz del proyecto (que ya está en sys.path porque Celery se ejecuta desde ahí).


Mejora 9: Archivos temporales sin limpieza automática ✅ HECHO

Archivos afectados:

  • features/services/build_dataset.py — genera geotiff_output/lote_{uuid}/
  • features/services/seleccionar_ciclo_fenologico.py — genera ciclos_fenologicos/lote_{uuid}_{steps}steps/
  • features/services/inferencia_completa.py — genera resultados_clasificacion/{uuid}/ y temp_geojson_{uuid}.geojson

Código actual:

# build_dataset.py — crea directorio pero nunca lo borra
geotiffs_dir = self.output_dir / f"lote_{lote_id}"
geotiffs_dir.mkdir(parents=True, exist_ok=True)
# ... guarda GeoTIFFs ...
# ❌ No hay cleanup

# seleccionar_ciclo_fenologico.py
os.makedirs(directorio_salida, exist_ok=True)
# ... guarda .npy ...
# ❌ No hay cleanup

# inferencia_completa.py — el único que intenta limpiar algo
geojson_path = f"temp_geojson_{uuid_detection}.geojson"
with open(geojson_path, 'w') as f:
json.dump(geojson_temp, f)
# ...
if os.path.exists(geojson_path):
os.remove(geojson_path) # ✅ Solo este se limpia

Estimado actual:

MétricaValor
Espacio por tarea (GeoTIFFs)~50-200 MB
Espacio por tarea (ciclos .npy)~5-50 MB
Espacio por tarea (resultados)~1-5 MB
Total por tarea~60-250 MB
Tareas por día (estimado)~100
Acumulación diaria~6-25 GB
Acumulación mensual~180-750 GB

Vulnerabilidades / errores:

  • Disco lleno: Sin cleanup, el disco se llena eventualmente. En ECS con EFS, esto puede afectar a todos los workers.
  • Costos de storage: EFS cobra por GB almacenado. 750 GB × $0.30/GB = $225/mes solo en archivos temporales.
  • Datos huérfanos: Si una tarea falla a mitad, quedan archivos parciales que nunca se usan ni se limpian.
  • Sin trazabilidad: No hay forma de saber qué archivos corresponden a qué tarea o cuándo se crearon.

Mejora propuesta:

# features/services/inferencia_completa.py
import shutil
from contextlib import contextmanager

@contextmanager
def temp_directories(uuid_detection: str, num_steps: int):
"""Context manager que crea y limpia directorios temporales"""
dirs = {
'geotiff': f"geotiff_output/lote_{uuid_detection}",
'ciclos': f"ciclos_fenologicos/lote_{uuid_detection}_{num_steps}steps",
'resultados': f"resultados_clasificacion/{uuid_detection}",
'temp_geojson': f"temp_geojson_{uuid_detection}.geojson"
}

# Crear directorios
for key, path in dirs.items():
if key != 'temp_geojson':
os.makedirs(path, exist_ok=True)

try:
yield dirs
finally:
# Limpiar todo al terminar (éxito o error)
for key, path in dirs.items():
try:
if os.path.isdir(path):
shutil.rmtree(path)
logger.debug(f"Limpiado directorio: {path}")
elif os.path.isfile(path):
os.remove(path)
logger.debug(f"Limpiado archivo: {path}")
except Exception as e:
logger.warning(f"Error limpiando {path}: {e}")


def inferencia_completa(uuid_detection: str, poligono_wkt: str, ...):
with temp_directories(uuid_detection, num_steps) as dirs:
# Paso 1: Descargar imágenes
if not ejecutar_build_dataset(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, num_steps):
return None # El finally limpia automáticamente

# Paso 2: Ciclos fenológicos
if not ejecutar_seleccionar_ciclo(uuid_detection, poligono_wkt, num_steps):
return None # El finally limpia automáticamente

# Paso 3: Clasificación (sube a S3, no necesita archivos locales después)
stats = ejecutar_clasificacion(...)

return resultado
# Al salir del with, se limpian todos los directorios

Nuevo estimado:

MétricaAntesDespués
Espacio residual por tarea60-250 MB0
Acumulación mensual180-750 GB0
Costo storage mensual$55-225$0
Archivos huérfanosNo

¿Por qué se propone?

Los archivos temporales son exactamente eso: temporales. Una vez que el GeoTIFF final está en S3 y los resultados en la base de datos, los archivos intermedios no tienen valor. El context manager garantiza limpieza incluso si la tarea falla con excepción, evitando acumulación de datos huérfanos.


Mejora 10: Código muerto en build_dataset.py ✅ HECHO

Archivo: features/services/build_dataset.py — clase EarthdataProcessor

Código actual:

La clase EarthdataProcessor tiene ~600 líneas. Las siguientes funciones nunca se llaman desde el flujo del worker:

# NUNCA se llama (show_granule_images está comentado en process_lote_earthdata)
def show_granule_images(self, granule_data, lote_id, geometry):
"""Mostrar imágenes de bandas para un granulo usando matplotlib"""
import matplotlib.pyplot as plt
# ... 60 líneas de código de visualización ...

# NUNCA se llama (save_netcdf=False siempre en el worker)
def save_lote_data(self, lote_result):
"""Guardar datos de un lote"""
# ... 10 líneas ...

# NUNCA se llama (solo desde save_lote_data)
def create_xarray_dataset(self, lote_result):
"""Crear dataset XArray con todos los valores de pixels"""
# ... 90 líneas ...

# NUNCA se llama desde el worker (solo desde CLI con --json)
def process_all_lotes(self, json_path, max_lotes=None):
"""Procesar todos los lotes"""
# ... 50 líneas ...

# NUNCA se llama (main() es para CLI)
def main():
"""Función principal"""
# ... 50 líneas ...

Estimado actual:

MétricaValor
Líneas de código muerto~260 líneas (~43% del archivo)
Imports innecesarios por código muertoxarray, matplotlib, seaborn
Tiempo de import de matplotlib~500ms-1s (primera vez)
RAM de matplotlib cargado~30-50 MB
Tamaño de matplotlib en Docker image~50 MB

Vulnerabilidades / errores:

  • Import de matplotlib en producción: Aunque show_granule_images nunca se llama, matplotlib se importa condicionalmente dentro de la función. Sin embargo, xarray sí se importa a nivel de módulo y arrastra netCDF4 como dependencia.
  • Superficie de ataque: Más código = más dependencias = más CVEs potenciales. matplotlib y xarray tienen historial de vulnerabilidades.
  • Docker image inflada: Las dependencias de visualización (matplotlib, seaborn) y datos (xarray, netCDF4) agregan ~150 MB a la imagen Docker sin necesidad.
  • Confusión de mantenimiento: Un desarrollador nuevo no sabe qué funciones son parte del flujo real y cuáles son legacy.

Mejora propuesta:

# build_dataset.py — Solo lo que usa el worker
import json
import requests
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
import logging
import rasterio
from rasterio.mask import mask
from rasterio.session import AWSSession
from shapely.geometry import shape
import os
import boto3

from features.services.earthdata_token_manager import EarthdataTokenManager
from core.settings import EARTHDATA_USERNAME, EARTHDATA_PASSWORD, USE_S3_ACCESS

logger = logging.getLogger(__name__)


class EarthdataProcessor:
"""Procesador de datos satelitales usando NASA Earthdata"""

FMASK_VALID_VALUES = {0, 64, 128, 192}
MIN_GRANULES = {8: 6, 15: 14, 24: 22}
FMASK_MIN_QUALITY = 0.70

def __init__(self, username, password, output_dir="geotiffs"):
self.username = username
self.password = password
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.aws_credentials = None
self.credentials_expiry = None
self.token_manager = EarthdataTokenManager(username, password)
self.access_token = None
self.hls_base_url = "https://cmr.earthdata.nasa.gov/search/granules.json"
self.tile_centroids = self._load_tile_centroids()

# ✅ Solo los métodos que usa el worker:
# - authenticate_if_needed()
# - search_hls_granules()
# - _load_tile_centroids()
# - filter_granules_by_closest_tile()
# - credentials_expired()
# - get_aws_credentials()
# - configure_gdal_s3()
# - extract_pixel_data()
# - get_all_pixel_coordinates()
# - _fmask_quality_ok()
# - process_lote_earthdata()
# - save_granule_geotiff()

# ❌ ELIMINADOS:
# - show_granule_images() → era debug visual
# - save_lote_data() → save_netcdf nunca es True
# - create_xarray_dataset() → solo desde save_lote_data
# - process_all_lotes() → solo desde CLI
# - main() → solo desde CLI

Y en requirements.txt, eliminar dependencias no usadas por el worker:

- xarray>=2023.1.0
- netCDF4>=1.6.0
- matplotlib>=3.7.0
- seaborn>=0.12.0

Nuevo estimado:

MétricaAntesDespués
Líneas en build_dataset.py~600~340
Dependencias Python2218
Tamaño Docker image~1.8 GB (estimado)~1.5 GB
RAM al importar módulo~80 MB (con xarray)~30 MB
Tiempo de build Docker~3-5 min~2-3 min

¿Por qué se propone?

Menos código = menos bugs, menos dependencias, menos superficie de ataque, imagen Docker más chica, builds más rápidos. Las funciones de visualización y CLI son útiles para desarrollo local pero no pertenecen al worker de producción. Si se necesitan, pueden vivir en un script separado fuera del paquete principal.


Mejora 11: settings.py con 70% de variables no usadas ✅ HECHO

Archivo: core/settings.py

Código actual:

# Variables que SÍ usa este servicio (~15):
APP_NAME = os.environ.get("APP_NAME", "boilerplate")
ENVIRONMENT = os.environ.get("ENVIRONMENT")
SENTRY_DSN = os.environ.get('SENTRY_DSN', '...')
PG_DATABASE = os.environ.get("PG_DATABASE")
PG_USER = os.environ.get("PG_USER")
PG_HOST = os.environ.get("PG_HOST")
PG_PASSWORD = os.environ.get("PG_PASSWORD")
PG_PORT = os.environ.get("PG_PORT", "5432")
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT")
REDIS_AUTH = os.environ.get("REDIS_AUTH")
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY")
AWS_S3_BUCKET_CROP_DETECTION = os.environ.get("AWS_S3_BUCKET_CROP_DETECTION")
USE_S3_ACCESS = os.getenv('USE_S3_ACCESS', 'False')
EARTHDATA_USERNAME = os.getenv('EARTHDATA_USERNAME')
EARTHDATA_PASSWORD = os.getenv('EARTHDATA_PASSWORD')
CELERY_BROKER = os.environ.get("CELERY_BROKER")

# Variables que NO usa este servicio (~45):
AUTH_JWT_KEY = os.environ.get("AUTH_JWT_KEY")
AUTH_DOMAIN = os.environ.get("AUTH_DOMAIN")
SMTP_HOST = os.environ.get("SMTP_HOST")
SMTP_PORT = os.environ.get("SMTP_PORT")
SMTP_USER = os.environ.get("SMTP_USER")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD")
EMAIL = os.environ.get("EMAIL")
NOTIFICATION_CENTER_URL = os.environ.get("NOTIFICATION_CENTER_URL")
VC_KEY = os.environ.get("VISUALCROSSING_KEY")
AGD_S3_ACCESS_KEY = os.environ.get("AGD_S3_ACCESS_KEY")
AGD_S3_SECRET_KEY = os.environ.get("AGD_S3_SECRET_KEY")
CFT_API_APP_KEY = os.environ.get("CFT_API_APP_KEY", "")
CFT_API_KEY = os.environ.get("CFT_API_KEY", "")
PASS_REPORTES_PERIODICOS = os.environ.get("PASS_REPORTES_PERIODICOS")
CLIENT_REPORTES_PERIODICOS = os.environ.get("CLIENT_REPORTES_PERIODICOS")
FIWARE_KEY_PRESCRIPTION = os.environ.get("FIWARE_KEY_PRESCRIPTION")
FIWARE_KEY_SAMPLING = os.environ.get("FIWARE_KEY_SAMPLING")
PM_ADMIN_USER = os.environ.get("PM_ADMIN_USER")
PM_ADMIN_PASS = os.environ.get("PM_ADMIN_PASS")
PM_BASE_URL = os.environ.get("PM_BASE_URL")
PEDIDOS_INSUMOS_SECRET_ID = os.environ.get("PEDIDOS_INSUMOS_SECRET_ID")
PEDIDOS_INSUMOS_CLIENT_ID = os.environ.get("PEDIDOS_INSUMOS_CLIENT_ID")
NIDERA_SF_STAGING = os.environ.get("NIDERA_SF_STAGING")
NIDERA_SF_PRODUCTION = os.environ.get("NIDERA_SF_PRODUCTION")
NIDERA_SF_URL_STAGING = os.environ.get("NIDERA_SF_URL_STAGING")
NIDERA_SF_URL_PRODUCTION = os.environ.get("NIDERA_SF_URL_PRODUCTION")
EE_CREDENTIALS = os.environ.get("EE_CREDENTIALS")
EE_PRIVATE_KEY = os.environ.get("EE_PRIVATE_KEY")
BCR_API_KEY = os.environ.get("BCR_API_KEY")
BCR_SECRET = os.environ.get("BCR_SECRET")
CLAVE_SIAR = os.environ.get("CLAVE_SIAR")
ALBOR_EXTENSION_CLIENT_ID = os.environ.get("ALBOR_EXTENSION_CLIENT_ID")
ALBOR_CLIENT_ID = os.environ.get("ALBOR_CLIENT_ID")
ALBOR_CLIENT_SECRET = os.environ.get("ALBOR_CLIENT_SECRET")
ALBOR_AUTH_ENDPOINT = os.environ.get("ALBOR_AUTH_ENDPOINT")
ALBOR_TOKEN_ENDPOINT = os.environ.get("ALBOR_TOKEN_ENDPOINT")
FINNEGANS_BASE_URL = os.environ.get("FINNEGANS_BASE_URL")
API_AGLAB = os.environ.get("API_AGLAB")
AGLAB_USER = os.environ.get("AGLAB_USER")
AGLAB_PASS = os.environ.get("AGLAB_PASS")
CALIPSO_USER = os.environ.get("CALIPSO_USER")
CALIPSO_PASS = os.environ.get("CALIPSO_PASS")
PETERSON_USER = os.environ.get("PETERSON_USER")
PETERSON_KEY = os.environ.get("PETERSON_KEY")
PETERSON_USER_DEV = os.environ.get("PETERSON_USER_DEV")
PETERSON_KEY_DEV = os.environ.get("PETERSON_KEY_DEV")
PETERSON_DOMAIN = os.environ.get("PETERSON_DOMAIN")
TOKEN_PUSH_NOTIFICATION = os.environ.get("TOKEN_PUSH_NOTIFICATION")
PERMISSIONS_API_URL = os.environ.get("PERMISSIONS_API_URL", "https://api.auravant.com")
# ... y más

Vulnerabilidades / errores:

  • Superficie de ataque innecesaria: El .env del container necesita definir ~60 variables aunque solo se usen ~15. Cada variable extra es un secreto más que gestionar, rotar y proteger.
  • Confusión operativa: Un DevOps que configura el servicio no sabe cuáles son obligatorias y cuáles son basura heredada. Si falta ALBOR_CLIENT_SECRET ¿se rompe algo? No, pero no hay forma de saberlo sin leer todo el código.
  • Módulos muertos importados: settings.py se importa desde core/__init__.py, que a su vez importa logging_config.py. Pero emails.py, permissions.py, aura_events.py y models.py también importan settings y están disponibles aunque nunca se usen.

Mejora propuesta:

# core/settings.py — Solo lo que usa este servicio
import os

# App
APP_NAME = os.environ.get("APP_NAME", "crop-detection-worker")
ENVIRONMENT = os.environ.get("ENVIRONMENT", "development")
SENTRY_DSN = os.environ.get("SENTRY_DSN", "")

# Logging
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
LOG_PATH = os.environ.get("LOG_PATH")
SERVER_SOFTWARE = os.environ.get("SERVER_SOFTWARE", "")

# PostgreSQL (lectura)
DB_CONNECTION_POOL_MINCONN = int(os.environ.get("DB_CONNECTION_POOL_MINCONN", 1))
DB_CONNECTION_POOL_MAXCONN = int(os.environ.get("DB_CONNECTION_POOL_MAXCONN", 1))
PG_DATABASE = os.environ.get("PG_DATABASE")
PG_USER = os.environ.get("PG_USER")
PG_HOST = os.environ.get("PG_HOST")
PG_PASSWORD = os.environ.get("PG_PASSWORD")
PG_PORT = os.environ.get("PG_PORT", "5432")

# Redis
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT", "6379")
REDIS_AUTH = os.environ.get("REDIS_AUTH")

# AWS (resultados S3)
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY")
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")
AWS_S3_BUCKET_CROP_DETECTION = os.environ.get("AWS_S3_BUCKET_CROP_DETECTION")

# NASA Earthdata
EARTHDATA_USERNAME = os.environ.get("EARTHDATA_USERNAME")
EARTHDATA_PASSWORD = os.environ.get("EARTHDATA_PASSWORD")
USE_S3_ACCESS = os.environ.get("USE_S3_ACCESS", "false").lower() in ("true", "1", "yes")

# Celery
CELERY_BROKER = os.environ.get("CELERY_BROKER")

Y eliminar los módulos no usados de core/:

  core/
├── __init__.py
├── settings.py
├── database_handler.py
├── boto_handler.py
├── redis_handler.py
├── logging_config.py
├── errors.py
├── constants.py
- ├── emails.py ← NO SE USA
- ├── permissions.py ← NO SE USA
- ├── aura_events.py ← NO SE USA
- └── models.py ← NO SE USA

Estimado:

MétricaAntesDespués
Variables en settings.py~60~20
Líneas en settings.py~120~35
Módulos en core/117
Variables de entorno requeridas en .env~60~20
Secretos a gestionar~45 (muchos innecesarios)~8

¿Por qué se propone?

El archivo es un copy-paste del boilerplate de Auravant que se usa para todos los microservicios. Este servicio solo necesita: DB, Redis, AWS, Earthdata y Celery. Todo lo demás (SMTP, Fiware, Peterson, Nidera, Albor, Finnegans, etc.) es ruido que complica la configuración, aumenta la superficie de ataque y confunde a cualquiera que intente entender qué necesita el servicio para funcionar.


Mejora 12: Rutas hardcodeadas en múltiples archivos ✅ HECHO

Archivos afectados:

  • features/services/inferencia_completa.py
  • features/services/build_dataset.py
  • features/services/clasificacion_suavizado_espacial.py
  • features/services/seleccionar_ciclo_fenologico.py

Código actual:

# inferencia_completa.py
from build_dataset import EarthdataProcessor
processor = EarthdataProcessor(username=..., password=..., output_dir="geotiff_output") # hardcoded
# ...
directorio_raster = f"geotiff_output/lote_{uuid_detection}" # hardcoded
directorio_salida = f"ciclos_fenologicos/lote_{uuid_detection}_{num_steps}steps" # hardcoded
# ...
model_path = f"features/models/best_model_{num_steps}steps{model_suffix}.pth" # hardcoded
label_mapping_path = f"features/models/label_mapping_{num_steps}steps{model_suffix}.json" # hardcoded
npy_folder = f"ciclos_fenologicos/lote_{uuid_detection}_{num_steps}steps" # hardcoded (duplicado)
output_folder = f"resultados_clasificacion/{uuid_detection}" # hardcoded
# ...
resultado_path = f"resultados_clasificacion/{uuid_detection}/resultado_completo_{uuid_detection}.json" # hardcoded
geojson_path = f"temp_geojson_{uuid_detection}.geojson" # hardcoded

# seleccionar_ciclo_fenologico.py (CLI)
directorio_raster = f"{args.rasters_dir}/lote_{args.lote_id}" # parcialmente configurable
directorio_salida = f"{args.output}/lote_{args.lote_id}_{args.num_steps}steps" # parcialmente configurable

# clasificacion_suavizado_espacial.py (CLI)
model_path = f"features/models/best_model_{args.num_steps}steps{model_suffix}.pth" # hardcoded
label_mapping_path = f"features/models/label_mapping_{args.num_steps}steps{model_suffix}.json" # hardcoded

Vulnerabilidades / errores:

  • Duplicación: La ruta ciclos_fenologicos/lote_{uuid}_{steps}steps se construye en inferencia_completa.py (2 veces) y en seleccionar_ciclo_fenologico.py. Si se cambia en uno y no en otro, el pipeline se rompe silenciosamente.
  • Rutas relativas al CWD: Todas las rutas son relativas al directorio de trabajo. Si Celery se ejecuta desde un directorio diferente (ej: cambio en Dockerfile), todo falla.
  • Sin configuración externa: No se pueden cambiar las rutas sin modificar código. Imposible usar volúmenes diferentes en staging vs producción.
  • Patrón de nombres inconsistente: geotiff_output vs geotiffs (default en EarthdataProcessor.__init__), ciclos_fenologicos vs ciclos, etc.

Mejora propuesta:

# features/services/paths.py — Fuente única de verdad para rutas
import os
from pathlib import Path

# Base configurable via env var, default a directorio del proyecto
BASE_DIR = Path(os.environ.get("CROP_DETECTION_DATA_DIR", "/tmp/crop_detection"))

MODELS_DIR = Path(__file__).parent.parent / "models"


def geotiff_dir(uuid: str) -> Path:
return BASE_DIR / "geotiffs" / f"lote_{uuid}"


def ciclos_dir(uuid: str, num_steps: int) -> Path:
return BASE_DIR / "ciclos" / f"lote_{uuid}_{num_steps}steps"


def resultados_dir(uuid: str) -> Path:
return BASE_DIR / "resultados" / uuid


def model_path(num_steps: int, fine_tuning: bool = True) -> Path:
suffix = "_ft" if fine_tuning else ""
return MODELS_DIR / f"best_model_{num_steps}steps{suffix}.pth"


def label_mapping_path(num_steps: int, fine_tuning: bool = True) -> Path:
suffix = "_ft" if fine_tuning else ""
return MODELS_DIR / f"label_mapping_{num_steps}steps{suffix}.json"


def temp_geojson_path(uuid: str) -> Path:
return BASE_DIR / "tmp" / f"{uuid}.geojson"


def ciclos_metadata_path(uuid: str, num_steps: int) -> Path:
return ciclos_dir(uuid, num_steps) / "ciclos_fenologicos_metadata.json"
# inferencia_completa.py — Usa paths centralizados
from features.services.paths import (
geotiff_dir, ciclos_dir, resultados_dir,
model_path, label_mapping_path, temp_geojson_path, ciclos_metadata_path
)

def ejecutar_build_dataset(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, num_steps):
processor = EarthdataProcessor(
username=EARTHDATA_USERNAME,
password=EARTHDATA_PASSWORD,
output_dir=str(geotiff_dir(uuid_detection).parent) # "geotiffs/"
)
# ...

def ejecutar_clasificacion(uuid_detection, num_steps, fine_tuning, ...):
stats = run_classification_with_smoothing(
npy_folder=str(ciclos_dir(uuid_detection, num_steps)),
metadata_path=str(ciclos_metadata_path(uuid_detection, num_steps)),
model_path=str(model_path(num_steps, fine_tuning)),
label_mapping_path=str(label_mapping_path(num_steps, fine_tuning)),
output_folder=str(resultados_dir(uuid_detection)),
output_name=uuid_detection,
# ...
)

Estimado:

MétricaAntesDespués
Lugares donde se construyen rutas~12 (en 4 archivos)1 (paths.py)
Riesgo de inconsistenciaAltoNinguno
Configurable via env varNo (CROP_DETECTION_DATA_DIR)
Cambiar directorio baseEditar 4 archivos1 variable de entorno

¿Por qué se propone?

Las rutas duplicadas son una fuente clásica de bugs silenciosos. Si alguien cambia el patrón de nombres en un archivo y no en otro, el paso 3 no encuentra los archivos del paso 2 y la tarea falla. Un módulo centralizado de rutas elimina la duplicación y permite configurar el directorio base via variable de entorno (útil para staging, testing, o usar volúmenes diferentes).


Mejora 13: Typo en nombre de archivo rasterizar_imagen_hsl.py ✅ HECHO

Archivo: features/services/rasterizar_imagen_hsl.py

Código actual:

features/services/rasterizar_imagen_hsl.py  ← "hsl" es incorrecto

El archivo se llama hsl pero debería ser hls:

  • HLS = Harmonized Landsat Sentinel (el producto satelital que usa el sistema)
  • HSL = Hue, Saturation, Lightness (modelo de color, no tiene nada que ver)

Imports actuales:

# clasificacion_suavizado_espacial.py
from rasterizar_imagen_hsl import rasterizar_imagen_utm # typo propagado

Vulnerabilidades / errores:

  • Confusión semántica: Un desarrollador nuevo puede pensar que el archivo hace conversión de colores HSL, no rasterización de datos HLS.
  • Búsqueda fallida: grep -r "HLS" no encuentra este archivo. grep -r "rasterizar" sí, pero la asociación con HLS se pierde.
  • Documentación inconsistente: El README habla de "HLS (Harmonized Landsat Sentinel)" pero el código dice "hsl".

Mejora propuesta:

# Renombrar archivo
mv features/services/rasterizar_imagen_hsl.py features/services/rasterizar_imagen_hls.py
# clasificacion_suavizado_espacial.py — Actualizar import
from features.services.rasterizar_imagen_hls import rasterizar_imagen_utm

Estimado:

MétricaAntesDespués
Nombre correcto
Archivos a modificar02 (renombrar + actualizar import)
RiesgoBajo (solo confusión)Ninguno

¿Por qué se propone?

Es un cambio trivial pero mejora la legibilidad y consistencia del código. El nombre correcto hace que el archivo sea autodocumentado: "rasteriza imágenes HLS" en vez de "rasteriza imágenes... ¿hsl? ¿qué es hsl?".


Mejora 14: No hay tests automatizados

Estado actual:

features/
├── test_manual/
│ └── main.py ← Script de ejecución manual, NO es un test
└── utils/
└── test_flujo_inferencia.py ← Probablemente otro script manual

No hay:

  • Tests unitarios
  • Tests de integración
  • Mocks de servicios externos (NASA, S3, PostgreSQL)
  • CI/CD que ejecute tests antes de deploy

Código actual (test_manual/main.py):

# Es un script de ejecución, no un test
if __name__ == "__main__":
result = startCropDetection(
id="1",
uuid_detection="test-uuid-001",
wkt="POLYGON ((-59.7920168 -35.718267, ...))",
model=24,
start_date="2026-01-10",
skip_download=True,
skip_ciclos=True
)
print(result) # ← No hay assertions, solo print

Vulnerabilidades / errores:

  • Regresiones silenciosas: Sin tests, un cambio en detect_planting() puede romper la detección de siembra y nadie se entera hasta que un cliente reporta resultados incorrectos.
  • Refactor peligroso: Cualquier refactor (como los propuestos en este documento) es riesgoso sin tests que validen que el comportamiento no cambió.
  • Deploy a ciegas: El CI/CD hace build y deploy sin validar que el código funciona. Un typo en un import rompe producción.
  • Dependencias externas no mockeadas: Para probar manualmente hay que tener credenciales de NASA, conexión a PostgreSQL, bucket S3, etc.

Mejora propuesta:

Crear estructura de tests con pytest:

tests/
├── __init__.py
├── conftest.py # Fixtures compartidos
├── unit/
│ ├── __init__.py
│ ├── test_detect_planting.py # Detección de siembra
│ ├── test_ndvi_calculation.py # Cálculo de NDVI
│ ├── test_fmask_filtering.py # Filtrado por calidad
│ ├── test_interpolation.py # Interpolación temporal
│ └── test_spatial_smoothing.py # Suavizado espacial
├── integration/
│ ├── __init__.py
│ ├── test_ciclo_fenologico.py # Pipeline de ciclos
│ └── test_clasificacion.py # Pipeline de clasificación
└── fixtures/
├── sample_ndvi_series.npy # Serie NDVI de ejemplo
├── sample_pixel_24steps.npy # Píxel procesado de ejemplo
└── sample_geotiff/ # GeoTIFF pequeño de prueba
# tests/conftest.py
import pytest
import numpy as np

@pytest.fixture
def sample_ndvi_with_planting():
"""Serie NDVI con patrón claro de siembra"""
# Simula: suelo bajo → crecimiento → pico → senescencia
return np.array([
0.15, 0.18, 0.20, 0.25, # Suelo/siembra
0.35, 0.45, 0.55, 0.65, # Crecimiento
0.75, 0.82, 0.85, 0.83, # Pico
0.78, 0.70, 0.60, 0.50, # Senescencia
0.40, 0.35, 0.30, 0.28,
0.25, 0.22, 0.20, 0.18
])

@pytest.fixture
def sample_ndvi_no_planting():
"""Serie NDVI sin patrón de siembra (pastura estable)"""
return np.array([0.45 + np.random.normal(0, 0.05) for _ in range(24)])

@pytest.fixture
def sample_pixel_array():
"""Array de píxel (24, 7) con datos realistas"""
return np.load("tests/fixtures/sample_pixel_24steps.npy")
# tests/unit/test_detect_planting.py
import pytest
import numpy as np
from features.services.seleccionar_ciclo_fenologico import detect_planting

class TestDetectPlanting:

def test_detects_clear_planting_pattern(self, sample_ndvi_with_planting):
"""Debe detectar siembra cuando hay patrón claro"""
result = detect_planting(sample_ndvi_with_planting)
assert result is not None
assert 0 <= result <= 5 # Siembra en los primeros timesteps

def test_returns_none_for_stable_vegetation(self, sample_ndvi_no_planting):
"""Debe retornar None para vegetación estable (pastura)"""
result = detect_planting(sample_ndvi_no_planting)
assert result is None

def test_returns_none_for_short_series(self):
"""Debe retornar None si la serie es muy corta"""
short_series = np.array([0.2, 0.3, 0.4])
result = detect_planting(short_series)
assert result is None

def test_respects_min_peak_height(self, sample_ndvi_with_planting):
"""No debe detectar si el pico no alcanza min_peak_height"""
low_peak = sample_ndvi_with_planting * 0.5 # Máximo ~0.42
result = detect_planting(low_peak, min_peak_height=0.7)
assert result is None

def test_respects_max_planting_ndvi(self):
"""No debe detectar si el NDVI inicial es muy alto"""
high_start = np.array([0.5, 0.55, 0.6, 0.7, 0.8, 0.85, 0.8, 0.7] + [0.5]*16)
result = detect_planting(high_start, max_planting_ndvi=0.35)
assert result is None
# tests/unit/test_spatial_smoothing.py
import pytest
import numpy as np
from features.services.clasificacion_suavizado_espacial import spatial_smoothing

class TestSpatialSmoothing:

def test_smoothing_reduces_noise(self):
"""El suavizado debe reducir ruido sal y pimienta"""
# Crear cubo con ruido
prob_cube = np.zeros((10, 10, 6))
prob_cube[:, :, 0] = 0.9 # Casi todo clase 0
prob_cube[5, 5, 0] = 0.1 # Un píxel ruidoso
prob_cube[5, 5, 1] = 0.9

pixel_mask = np.ones((10, 10), dtype=bool)

smoothed = spatial_smoothing(prob_cube, pixel_mask, kernel_size=3)

# El píxel ruidoso debe haberse suavizado
assert smoothed[5, 5, 0] > 0.5 # Ahora más cercano a vecinos
assert smoothed[5, 5, 1] < 0.5

def test_preserves_homogeneous_regions(self):
"""No debe cambiar regiones homogéneas"""
prob_cube = np.zeros((10, 10, 6))
prob_cube[:, :, 2] = 1.0 # Todo clase 2

pixel_mask = np.ones((10, 10), dtype=bool)

smoothed = spatial_smoothing(prob_cube, pixel_mask, kernel_size=3)

np.testing.assert_array_almost_equal(smoothed[:, :, 2], 1.0)

Y actualizar .gitlab-ci.yml:

stages:
- test # ← NUEVO
- security
- build
- deploy

test:
stage: test
image: python:3.11
script:
- pip install -r requirements.txt
- pip install pytest pytest-cov
- pytest tests/ -v --cov=features --cov-report=term-missing
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
- if: $CI_COMMIT_REF_NAME == $CI_DEFAULT_BRANCH

Estimado:

MétricaAntesDespués
Tests automatizados0~20-30
Cobertura de código0%~60-70% (funciones críticas)
Tiempo de CI~5 min (build+deploy)~7 min (+2 min tests)
Confianza en refactorsBajaAlta
Regresiones detectadas pre-deploy0Mayoría

¿Por qué se propone?

Sin tests, cada deploy es un acto de fe. Los tests unitarios para funciones puras como detect_planting(), calculate_ndvi(), spatial_smoothing() son fáciles de escribir y dan alta confianza. No se necesita mockear NASA ni PostgreSQL para probar que la detección de siembra funciona correctamente con una serie NDVI sintética.


Mejora 15: EarthdataProcessor es un God Object

Archivo: features/services/build_dataset.py

Código actual:

La clase EarthdataProcessor tiene ~600 líneas y maneja 7 responsabilidades diferentes:

class EarthdataProcessor:
# 1. Autenticación con NASA Earthdata
def authenticate_if_needed(self): ...
def get_aws_credentials(self): ...
def configure_gdal_s3(self): ...

# 2. Búsqueda en catálogo CMR
def search_hls_granules(self, bbox, start_date, end_date, max_cloud_cover): ...

# 3. Selección de tiles
def _load_tile_centroids(self): ...
def filter_granules_by_closest_tile(self, granules, geometry): ...

# 4. Descarga y extracción de datos
def extract_pixel_data(self, url, geometry, use_s3): ...
def _fmask_quality_ok(self, fmask_url, geometry, use_s3): ...

# 5. Cálculo de coordenadas
def get_all_pixel_coordinates(self, pixel_data, meta): ...

# 6. Guardado de archivos
def save_granule_geotiff(self, granule_result, lote_id): ...
def save_lote_data(self, lote_result): ... # código muerto

# 7. Orquestación del pipeline
def process_lote_earthdata(self, lote_data, num_steps, save_netcdf): ...
def process_all_lotes(self, json_path, max_lotes): ... # código muerto

# 8. Visualización (código muerto)
def show_granule_images(self, granule_data, lote_id, geometry): ...

# 9. Creación de datasets (código muerto)
def create_xarray_dataset(self, lote_result): ...

Vulnerabilidades / errores:

  • Violación de Single Responsibility: Una clase que hace autenticación, búsqueda, descarga, guardado y orquestación es imposible de testear unitariamente.
  • Estado mutable compartido: self.aws_credentials, self.access_token, self.credentials_expiry son estado mutable que se modifica en múltiples métodos. Difícil razonar sobre el estado en un momento dado.
  • Acoplamiento alto: Para testear filter_granules_by_closest_tile() hay que instanciar toda la clase con credenciales de NASA.
  • Difícil de extender: Agregar un nuevo proveedor de datos (ej: Sentinel Hub) requiere modificar esta clase gigante.

Mejora propuesta:

Separar en clases con responsabilidad única:

# features/services/earthdata/auth.py
class EarthdataAuth:
"""Maneja autenticación con NASA Earthdata"""

def __init__(self, username: str, password: str, redis_client=None):
self.username = username
self.password = password
self.token_manager = EarthdataTokenManager(username, password)
self._access_token = None
self._aws_credentials = None
self._credentials_expiry = None

@property
def access_token(self) -> str:
if not self._access_token:
self._access_token = self.token_manager.get_valid_token()
return self._access_token

@property
def aws_credentials(self) -> dict:
if not self._aws_credentials or self._credentials_expired():
self._aws_credentials = self._fetch_aws_credentials()
return self._aws_credentials

def _credentials_expired(self) -> bool:
if not self._credentials_expiry:
return True
return datetime.now() > self._credentials_expiry

def _fetch_aws_credentials(self) -> dict:
response = requests.get(
"https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
headers={"Authorization": f"Bearer {self.access_token}"}
)
response.raise_for_status()
creds = response.json()
self._credentials_expiry = self._parse_expiry(creds.get("expiration"))
return creds
# features/services/earthdata/catalog.py
class HLSCatalog:
"""Busca granulos HLS en el catálogo CMR de NASA"""

CMR_URL = "https://cmr.earthdata.nasa.gov/search/granules.json"
COLLECTION_IDS = ["C2021957657-LPCLOUD", "C2021957295-LPCLOUD"]

def search(self, bbox: tuple, start_date: str, end_date: str,
max_cloud_cover: int = 30) -> list:
"""Busca granulos HLS con paginación"""
all_granules = []
page_num = 1

while True:
params = {
"collection_concept_id": self.COLLECTION_IDS,
"temporal": f"{start_date}T00:00:00Z,{end_date}T23:59:59Z",
"bounding_box": ",".join(map(str, bbox)),
"cloud_cover": f"0,{max_cloud_cover}",
"page_size": 2000,
"page_num": page_num,
}
response = requests.get(self.CMR_URL, params=params)
response.raise_for_status()

granules = response.json().get("feed", {}).get("entry", [])
if not granules:
break

all_granules.extend(granules)
if len(granules) < 2000:
break
page_num += 1

return all_granules
# features/services/earthdata/tile_selector.py
class TileSelector:
"""Selecciona el tile más cercano al centroide del lote"""

def __init__(self, centroids_path: str = None):
self.centroids = self._load_centroids(centroids_path)

def filter_by_closest(self, granules: list, geometry) -> list:
"""Filtra granulos para usar solo el tile más cercano"""
if not granules:
return granules

centroid = geometry.centroid
unique_tiles = self._extract_tiles(granules)
closest = self._find_closest(unique_tiles, centroid.x, centroid.y)

return [g for g in granules if f".T{closest}." in g.get("title", "")]
# features/services/earthdata/downloader.py
class HLSDownloader:
"""Descarga y recorta bandas HLS"""

BAND_MAPPING_LANDSAT = {
"B01": "Aerosol", "B02": "Blue", "B03": "Green", "B04": "Red",
"B05": "NIR", "B06": "SWIR1", "B07": "SWIR2", "Fmask": "Fmask"
}
BAND_MAPPING_SENTINEL = {
"B01": "Aerosol", "B02": "Blue", "B03": "Green", "B04": "Red",
"B8A": "NIR", "B11": "SWIR1", "B12": "SWIR2", "Fmask": "Fmask"
}

def __init__(self, auth: EarthdataAuth):
self.auth = auth

def download_granule(self, granule: dict, geometry, output_dir: Path) -> dict:
"""Descarga todas las bandas de un granulo y las guarda como GeoTIFF"""
# ... implementación ...

def _extract_band(self, url: str, geometry) -> tuple:
"""Extrae y recorta una banda"""
# ... implementación usando self.auth.aws_credentials ...
# features/services/earthdata/processor.py
class EarthdataProcessor:
"""Orquesta el pipeline de descarga de datos HLS"""

MIN_GRANULES = {8: 6, 15: 14, 24: 22}

def __init__(self, username: str, password: str, output_dir: str = "geotiffs"):
self.auth = EarthdataAuth(username, password)
self.catalog = HLSCatalog()
self.tile_selector = TileSelector()
self.downloader = HLSDownloader(self.auth)
self.output_dir = Path(output_dir)

def process_lote(self, lote_data: dict, num_steps: int = 24) -> dict:
"""Procesa un lote: busca, filtra, descarga y guarda"""
geometry = shape(json.loads(lote_data["geojson"]))
bbox = geometry.bounds

# 1. Buscar granulos
granules = self.catalog.search(
bbox=bbox,
start_date=lote_data["fecha_siembra"],
end_date=lote_data["fecha_cosecha"],
max_cloud_cover=20
)

# 2. Filtrar por tile más cercano
granules = self.tile_selector.filter_by_closest(granules, geometry)

# 3. Validar cantidad mínima
min_required = self.MIN_GRANULES.get(num_steps, 6)
if len(granules) < min_required:
raise ValueError(f"Insuficientes granulos: {len(granules)} < {min_required}")

# 4. Descargar cada granulo
output_dir = self.output_dir / f"lote_{lote_data['id_lote']}"
for granule in granules:
self.downloader.download_granule(granule, geometry, output_dir)

return {"processed_granules": len(granules)}

Estimado:

MétricaAntesDespués
Líneas en build_dataset.py~600~100 (solo orquestador)
Archivos15 (auth, catalog, tile_selector, downloader, processor)
Responsabilidades por clase7+1
Testeable unitariamente
AcoplamientoAltoBajo

¿Por qué se propone?

El principio de Single Responsibility dice que una clase debe tener una sola razón para cambiar. EarthdataProcessor tiene al menos 7 razones: cambios en autenticación, en el catálogo CMR, en la selección de tiles, en el formato de descarga, en el guardado, etc. Separar en clases pequeñas permite:

  1. Testear unitariamente: TileSelector se puede testear sin credenciales de NASA.
  2. Reusar componentes: HLSCatalog se puede usar en otros proyectos.
  3. Extender fácilmente: Agregar SentinelHubDownloader no requiere tocar el código existente.
  4. Razonar sobre el código: Cada archivo tiene ~100 líneas y hace una cosa.

Mejora 16: Manejo de errores — try/except excesivos que ocultan errores a Sentry ✅ HECHO

Tipo: Observabilidad / Estabilidad
Severidad: 🔴 Crítica
Archivos afectados:

  • features/services/build_dataset.py (18 try/except, 16 con except Exception)
  • features/services/inferencia_completa.py (5 try/except)
  • features/services/earthdata_token_manager.py (6 try/except)
  • features/services/seleccionar_ciclo_fenologico.py (4 try/except, 2 bare except:)
  • features/services/clasificacion_suavizado_espacial.py (1 bare except: pass)
  • features/services/rasterizar_imagen_hsl.py (1 try/except)
  • features/interface/main.py (0 try/except — pero tampoco tiene protección)

Total: ~35 bloques try/except en el flujo principal, la mayoría innecesarios.


Problema

Sentry está configurado en run.py para capturar excepciones automáticamente. Pero los try/except distribuidos por todo el código atrapan las excepciones antes de que lleguen a Sentry, las loguean como texto genérico, y devuelven None/False. Resultado:

  1. Sentry no recibe los errores → no hay alertas, no hay stack traces, no hay contexto
  2. Los errores se propagan como None → el código que llama no sabe qué falló
  3. Mensajes genéricos"Error ejecutando build_dataset: <excepción>" no dice en qué paso, con qué datos, ni por qué
  4. Bare except: atrapa TODO → incluyendo KeyboardInterrupt, SystemExit, MemoryError

Ejemplo 1: Cadena de try/except que silencia errores

ANTES — inferencia_completa.py:

Cada función envuelve todo en try/except y devuelve False/None. El error original se pierde:

# inferencia_completa.py — ANTES
def ejecutar_build_dataset(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, num_steps):
try:
# ... 20 líneas de lógica ...
result = processor.process_lote_earthdata(lote_input, num_steps=num_steps)
if result:
return True
else:
logger.error("Error procesando lote en build_dataset") # ← ¿qué error?
return False
except Exception as e:
logger.error(f"Error ejecutando build_dataset: {e}") # ← Sentry nunca ve esto
return False

def ejecutar_seleccionar_ciclo(uuid_detection, poligono_wkt, num_steps):
try:
# ... lógica ...
return True
except Exception as e:
logger.error(f"Error ejecutando seleccionar_ciclo_fenologico.py: {e}") # ← Sentry nunca ve esto
return False

def ejecutar_clasificacion(uuid_detection, num_steps, fine_tuning, ...):
try:
# ... lógica ...
return stats
except Exception as e:
logger.error(f"Error ejecutando clasificacion_suavizado_espacial: {e}") # ← Sentry nunca ve esto
return None

def inferencia_completa(uuid_detection, poligono_wkt, ...):
try:
if not ejecutar_build_dataset(...): # ← ¿falló por timeout? ¿por auth? ¿por disco lleno?
logger.error("Fallo en descarga de imágenes") # ← mensaje inútil
return None
# ... más pasos ...
except Exception as e:
logger.error(f"Error en flujo de inferencia: {e}") # ← 4to nivel de catch, error ya mutilado
return None

El error viaja así: excepción original → except en process_lote_earthdata → devuelve Noneejecutar_build_dataset ve None → devuelve Falseinferencia_completa ve False → devuelve Nonemain.py ve None → escribe "Inference failed" en DB. Sentry no recibió nada.

DESPUÉS — Sin try/except innecesarios:

# inferencia_completa.py — DESPUÉS
def ejecutar_build_dataset(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, num_steps):
from shapely import wkt
from shapely.geometry import mapping

geometry_wkt = wkt.loads(poligono_wkt)
geometry_geojson = mapping(geometry_wkt)

lote_input = {
'id_lote': uuid_detection,
'fecha_siembra': fecha_ini,
'fecha_cosecha': fecha_fin,
'cultivo': 'unknown',
'geojson': json.dumps(geometry_geojson)
}

if not EARTHDATA_USERNAME or not EARTHDATA_PASSWORD:
raise EnvironmentError("Faltan variables EARTHDATA_USERNAME / EARTHDATA_PASSWORD")

from build_dataset import EarthdataProcessor
processor = EarthdataProcessor(username=EARTHDATA_USERNAME, password=EARTHDATA_PASSWORD, output_dir="geotiff_output")
result = processor.process_lote_earthdata(lote_input, num_steps=num_steps)

if not result:
raise RuntimeError(f"build_dataset no produjo resultado para lote {uuid_detection}")
# Si falla → la excepción sube directo a Sentry con stack trace completo


def ejecutar_seleccionar_ciclo(uuid_detection, poligono_wkt, num_steps):
from shapely import wkt
from shapely.geometry import mapping

geometry_wkt = wkt.loads(poligono_wkt)
geometry_geojson = mapping(geometry_wkt)

geojson_temp = {"type": "Feature", "properties": {}, "geometry": geometry_geojson}
geojson_path = f"temp_geojson_{uuid_detection}.geojson"
with open(geojson_path, 'w') as f:
json.dump(geojson_temp, f)

try:
from seleccionar_ciclo_fenologico import procesar_pixeles_desde_geotiff
procesar_pixeles_desde_geotiff(
f"geotiff_output/lote_{uuid_detection}",
geojson_path,
f"ciclos_fenologicos/lote_{uuid_detection}_{num_steps}steps",
num_steps
)
finally:
# try/finally JUSTIFICADO: limpiar archivo temporal pase lo que pase
if os.path.exists(geojson_path):
os.remove(geojson_path)


def ejecutar_clasificacion(uuid_detection, num_steps, fine_tuning, **kwargs):
model_suffix = '_ft' if fine_tuning else ''
# ... construir rutas ...

from clasificacion_suavizado_espacial import run_classification_with_smoothing
stats = run_classification_with_smoothing(...)

if stats is None:
raise RuntimeError(f"Clasificación no produjo resultados para {uuid_detection}")
return stats


def inferencia_completa(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, **kwargs):
# SIN try/except — las excepciones suben a Sentry con contexto completo
if not kwargs.get('skip_download'):
ejecutar_build_dataset(uuid_detection, poligono_wkt, fecha_ini, fecha_fin, kwargs.get('num_steps', 24))

if not kwargs.get('skip_ciclos'):
ejecutar_seleccionar_ciclo(uuid_detection, poligono_wkt, kwargs.get('num_steps', 24))

stats = ejecutar_clasificacion(uuid_detection, kwargs.get('num_steps', 24), kwargs.get('fine_tuning', False))

resultado = {"uuid_detection": uuid_detection, "resultados": stats, ...}
return resultado

Ejemplo 2: Bare except: que atrapa todo

ANTES — clasificacion_suavizado_espacial.py línea 333-337:

# clasificacion_suavizado_espacial.py — ANTES
try:
from utils_validacion import validar_modelo_existe
validar_modelo_existe(args.num_steps)
except: # ← atrapa ImportError, ValueError, MemoryError, KeyboardInterrupt, TODO
pass # ← silencio total

DESPUÉS:

# clasificacion_suavizado_espacial.py — DESPUÉS
# Si utils_validacion no está disponible, validar inline
try:
from utils_validacion import validar_modelo_existe
validar_modelo_existe(args.num_steps)
except ImportError:
# Fallback: validación manual si el módulo no está disponible
model_suffix = '_ft' if args.fine_tuning else ''
model_path = Path(f"features/models/best_model_{args.num_steps}steps{model_suffix}.pth")
if not model_path.exists():
raise FileNotFoundError(f"Modelo no encontrado: {model_path}")

Ejemplo 3: build_dataset.py — 18 try/except anidados

ANTES — build_dataset.py:

# build_dataset.py — ANTES (patrón repetido 18 veces)
def process_lote_earthdata(self, lote_data, num_steps=24):
try:
# ... 150 líneas ...
for i, granule in enumerate(granules):
try:
# ... 80 líneas ...
for link in http_links:
try:
# ... 40 líneas ...
pixel_data, meta = self.extract_pixel_data(http_url, geometry, use_s3=USE_S3_ACCESS)
except Exception as e:
logger.error(f"Error procesando banda {filename}: {e}") # ← Sentry no ve
continue
except Exception as e:
logger.error(f"Error procesando granulo {i}: {e}") # ← Sentry no ve
continue
except Exception as e:
logger.error(f"Error procesando lote {id_lote}: {e}") # ← Sentry no ve
return None

DESPUÉS — Solo try/except donde se espera fallo y se necesita continuar:

# build_dataset.py — DESPUÉS
def process_lote_earthdata(self, lote_data, num_steps=24):
id_lote = lote_data['id_lote']
geom_dict = json.loads(lote_data['geojson'])
geometry = shape(geom_dict)
# ... setup sin try/except, si falla es un bug real ...

granules = self.search_hls_granules(bbox, start_str, end_str, max_cloud_cover=20)
granules = self.filter_granules_by_closest_tile(granules, geometry)

if not granules:
raise ValueError(f"No se encontraron granulos HLS para lote {id_lote} en {start_str}/{end_str}")

processed_data = []
for i, granule in enumerate(granules):
granule_id = granule.get('title', f'granule_{i}')

# try/except JUSTIFICADO: un granulo malo no debe abortar todo el lote
try:
band_data = self._process_single_granule(granule, geometry)
except Exception as e:
logger.warning(f"Granulo {granule_id} falló, continuando: {type(e).__name__}: {e}")
continue

if band_data:
processed_data.append(band_data)

min_required = self.MIN_GRANULES.get(num_steps, 6)
if len(processed_data) < min_required:
raise ValueError(
f"Imágenes insuficientes para lote {id_lote}: "
f"{len(processed_data)}/{min_required} requeridas"
)
# ... resto sin try/except ...

Ejemplo 4: main.py — Sin protección, errores de DB no controlados

ANTES — features/interface/main.py:

# main.py — ANTES
def startCropDetection(id, uuid_detection, wkt, model, start_date):
# ... validaciones ...
update_db(uuid_detection=uuid_detection, status="processing")

result = inferencia_completa(...) # ← si esto lanza excepción, el status queda en "processing" para siempre

if result is None:
return failed_execution(uuid_detection, "Inference failed") # ← mensaje genérico

update_db(uuid_detection=uuid_detection, status="completed", results=result)
insert_detection_crops(...) # ← si esto falla, status es "completed" pero sin crops
return {"code": "ok", ...}

DESPUÉS — Un solo try/except en el entry point, con error descriptivo:

# main.py — DESPUÉS
def startCropDetection(id="", uuid_detection="", wkt="", model=24, start_date="", fine_tuning=True):
# ... validaciones (sin try/except, si fallan es un bug del caller) ...

update_db(uuid_detection=uuid_detection, status="processing")

try:
result = inferencia_completa(
uuid_detection=uuid_detection,
poligono_wkt=wkt,
fecha_ini=start_date,
fecha_fin=fecha_fin,
num_steps=model,
fine_tuning=fine_tuning
)
except Exception as e:
# try/except JUSTIFICADO: necesitamos actualizar DB a "error" antes de re-lanzar
error_msg = f"{type(e).__name__}: {e}"
failed_execution(uuid_detection, error_msg)
raise # ← re-lanza para que Sentry capture el stack trace completo

update_db(uuid_detection=uuid_detection, status="completed", results=result)
insert_detection_crops(uuid_detection=uuid_detection, resultados=result["resultados"])
return {"code": "ok", "ended_at": int(datetime.now().timestamp()), "data": result}

Ejemplo 5: rasterizar_imagen_hsl.py — Error silenciado en paso crítico

ANTES:

# rasterizar_imagen_hsl.py — ANTES
def rasterizar_imagen_utm(df, nombre_clasificacion):
try:
# ... 60 líneas de rasterización + upload a S3 ...
s3_eu.put_object(Bucket=s3_bucket, Key=s3_key, Body=memfile.read())
logger.info(f"Raster guardado en s3://...")
except Exception as e:
logger.error(f"Error al intentar rasterizar la imagen: {e}") # ← el tiff no se subió a S3 pero nadie se entera

DESPUÉS:

# rasterizar_imagen_hsl.py — DESPUÉS
def rasterizar_imagen_utm(df, nombre_clasificacion):
# Sin try/except — si falla la rasterización o el upload a S3,
# la excepción sube a main.py → Sentry la captura con stack trace
df = df.sort_values(by=['lat', 'lon'], ascending=[False, True]).reset_index(drop=True)
# ... toda la lógica sin envolver ...
s3_eu.put_object(Bucket=s3_bucket, Key=s3_key, Body=memfile.read())
logger.info(f"Raster guardado en s3://{s3_bucket}/{s3_key}")

Regla: Cuándo SÍ usar try/except

Situación¿try/except?Ejemplo
Un item de una lista falla pero el resto debe continuar✅ SíGranulo individual en loop de build_dataset.py
Limpiar recurso temporal pase lo que pasetry/finallyBorrar temp_geojson_*.geojson
Necesitar actualizar DB a "error" antes de propagar✅ Sí + raisemain.py entry point
Fallback cuando un módulo opcional no existeexcept ImportErrorutils_validacion no disponible
Parsear fecha con múltiples formatos posiblesexcept ValueErrorParseo de expiration_str en build_dataset.py
Función completa que "puede fallar"❌ Nosearch_hls_granules, extract_pixel_data, etc.
Guardar archivo / subir a S3❌ Norasterizar_imagen_utm, save_granule_geotiff
Cualquier lógica de negocio normal❌ NoProcesamiento de píxeles, clasificación, etc.

Excepciones personalizadas (opcional pero recomendado)

# features/exceptions.py
class CropDetectionError(Exception):
"""Base para errores del sistema de detección"""
pass

class EarthdataAuthError(CropDetectionError):
"""Fallo de autenticación con NASA Earthdata"""
pass

class InsufficientDataError(CropDetectionError):
"""No hay suficientes imágenes de calidad para procesar"""
pass

class ClassificationError(CropDetectionError):
"""Error durante la clasificación o post-procesamiento"""
pass

Uso:

# build_dataset.py
if not self.access_token:
raise EarthdataAuthError(f"No se pudo obtener token para usuario {self.username}")

if len(processed_data) < min_required:
raise InsufficientDataError(
f"Lote {id_lote}: {len(processed_data)}/{min_required} imágenes válidas "
f"(período {start_str} a {end_str}, cloud_cover<=20%)"
)

En Sentry se ve: InsufficientDataError: Lote abc123: 3/22 imágenes válidas (período 2024-01-15 a 2024-07-14, cloud_cover<=20%)inmediatamente sabés qué pasó.


Inventario de try/except a eliminar vs mantener

Archivotry/except actualesEliminarMantener (justificado)
build_dataset.py18144 (loop granulos, loop bandas, parseo fecha, fallback credenciales)
inferencia_completa.py541 (try/finally para cleanup temp file)
earthdata_token_manager.py624 (requests HTTP que pueden fallar, Redis que puede no estar)
seleccionar_ciclo_fenologico.py431 (píxel individual en loop)
clasificacion_suavizado_espacial.py110
rasterizar_imagen_hsl.py110
main.py00+1 (agregar try/except + raise para update DB)
Total3525~10

Estimación

  • Impacto en observabilidad: De ~0% de errores visibles en Sentry a ~100%
  • Impacto en debugging: De "Inference failed" a stack traces completos con contexto
  • Riesgo: Bajo — los errores que antes se silenciaban ahora se propagan, pero Sentry los captura y main.py actualiza la DB a "error" antes de re-lanzar

Mejora 17: Excepciones genéricas — usar excepciones custom estilo Aura

Problema: Los raise actuales usan excepciones genéricas de Python (RuntimeError, EnvironmentError, ValueError, FileNotFoundError). Esto dificulta:

  • Distinguir errores de negocio vs errores de infraestructura en Sentry
  • Filtrar/agrupar errores por tipo en dashboards
  • Tomar acciones diferentes según el tipo de error en main.py

Patrón actual (genérico):

# inferencia_completa.py
raise EnvironmentError("Se requieren variables de entorno EARTHDATA_USERNAME y EARTHDATA_PASSWORD")
raise RuntimeError(f"build_dataset no produjo resultado para lote {uuid_detection}")
raise RuntimeError(f"Clasificación no produjo resultados para {uuid_detection}")

# seleccionar_ciclo_fenologico.py
raise FileNotFoundError(f"No se encontraron archivos .tif en '{directorio_raster}'")

# clasificacion_suavizado_espacial.py
raise FileNotFoundError("No se encontraron archivos .npy válidos")

# db.py
raise ValueError(f"No se encontró detection con uuid_detection={uuid_detection}")

Patrón propuesto (estilo Aura):

Crear jerarquía de excepciones custom en core/errors.py (ya existe el archivo con ApiErr):

# core/errors.py — agregar al archivo existente

class CropDetectionError(Exception):
"""Base para todos los errores del sistema de detección de cultivos"""
code = 'CROP_DETECTION_ERROR'
msg = 'Error en detección de cultivos'

def __init__(self, msg=None, data=None, log_msg=''):
self.msg = msg or self.msg
self.data = data
self.log_msg = log_msg

def __str__(self):
return self.msg

def tostring(self):
return f"{self.__class__.__name__} Code: {self.code} Log_msg: {self.log_msg}"


class MissingCredentialsError(CropDetectionError):
code = 'MISSING_CREDENTIALS'
msg = 'Credenciales de Earthdata no configuradas'


class DataDownloadError(CropDetectionError):
code = 'DATA_DOWNLOAD_ERROR'
msg = 'Error descargando imágenes satelitales'


class NoGeoTIFFsError(CropDetectionError):
code = 'NO_GEOTIFFS'
msg = 'No se encontraron archivos GeoTIFF'


class NoCyclesError(CropDetectionError):
code = 'NO_CYCLES'
msg = 'No se encontraron ciclos fenológicos válidos'


class ClassificationError(CropDetectionError):
code = 'CLASSIFICATION_ERROR'
msg = 'Error en clasificación de cultivos'


class DetectionNotFoundError(CropDetectionError):
code = 'DETECTION_NOT_FOUND'
msg = 'Detección no encontrada en base de datos'

Después — uso en servicios:

# inferencia_completa.py
from core.errors import MissingCredentialsError, DataDownloadError, ClassificationError

raise MissingCredentialsError(log_msg="EARTHDATA_USERNAME o EARTHDATA_PASSWORD vacíos")
raise DataDownloadError(msg=f"build_dataset no produjo resultado para lote {uuid_detection}")
raise ClassificationError(msg=f"Clasificación no produjo resultados para {uuid_detection}")

# seleccionar_ciclo_fenologico.py
from core.errors import NoGeoTIFFsError

raise NoGeoTIFFsError(msg=f"No se encontraron archivos .tif en '{directorio_raster}'")

# clasificacion_suavizado_espacial.py
from core.errors import NoCyclesError

raise NoCyclesError(msg=f"No se encontraron archivos .npy válidos en '{npy_folder}'")

# db.py
from core.errors import DetectionNotFoundError

raise DetectionNotFoundError(msg=f"uuid_detection={uuid_detection}", log_msg="update_db")

Después — manejo diferenciado en main.py:

# features/interface/main.py
from core.errors import CropDetectionError, MissingCredentialsError

try:
result = inferencia_completa(...)
except MissingCredentialsError as e:
failed_execution(uuid_detection, f"CONFIG_ERROR: {e}")
raise
except CropDetectionError as e:
failed_execution(uuid_detection, f"{e.code}: {e}")
raise
except Exception as e:
failed_execution(uuid_detection, f"UNEXPECTED: {type(e).__name__}: {e}")
raise

Ventajas:

  • Consistente con el patrón de aura_exceptions.py usado en otros repos (credits, activities)
  • Sentry agrupa automáticamente por tipo de excepción (MissingCredentialsError vs DataDownloadError)
  • main.py puede tomar acciones diferentes (ej: reintentar DataDownloadError, no reintentar MissingCredentialsError)
  • El campo code permite filtrar en dashboards sin parsear strings
  • El campo data permite adjuntar contexto estructurado (ej: {"lote_id": "...", "geotiffs_found": 0})

Archivos a modificar:

  • core/errors.py — agregar clases de excepción
  • features/services/inferencia_completa.py — 3 raise
  • features/services/seleccionar_ciclo_fenologico.py — 1 raise
  • features/services/clasificacion_suavizado_espacial.py — 1 raise
  • features/services/db.py — 1 raise
  • features/interface/main.py — manejo diferenciado

Métricas:

  • Impacto en observabilidad: Errores agrupados por tipo en Sentry en vez de genéricos
  • Impacto en operaciones: Posibilidad de retry selectivo por tipo de error
  • Riesgo: Bajo — cambio de tipo de excepción, no de comportamiento

Resumen de Mejoras

Tabla comparativa: Antes vs Después

#MejoraMétricaAntes (medido)Después (estimado)
1Apertura excesiva de GeoTIFFsTiempo ciclos (disco local)797.85 seg3-6 seg
Llamadas rasterio.open()594,57525
% del tiempo total99.3%~50%
2Inferencia píxel por píxelTiempo clasificación5.58 seg~0.3-0.5 seg
Forward passes2,4145 (batches)
3Miles de archivos .npyArchivos creados2,4142
Espacio en disco3.39 MB (+9.7 MB inodos)~4 MB
4Print de credenciales AWSCredenciales expuestas por tarea~240 copias en logs0
5Credenciales en test.pyCredenciales en repo/DockerSí (hardcodeadas)No
6USE_S3_ACCESS string vs boolModo de acceso correctoNo ('False' = truthy)
7db.py carga JSON al importarRiesgo de crash al arrancar workerAlto (si falta JSON)Ninguno
8sys.path.insertArchivos con hack de path3 archivos0
9Archivos temporales sin cleanupAcumulación mensual180-750 GB0 GB
Costo storage mensual (EFS)$55-225$0
Archivos huérfanosNo
10Código muerto en build_dataset.pyLíneas muertas260 (43% del archivo)0
11settings.py con 70% no usadoVariables innecesarias~45 de 60~15
12Rutas hardcodeadasRutas duplicadas en código~15 en 5 archivos1 (config central)
13Typo hsl → hlsNombre incorrecto1 archivo + imports0
14No hay tests automatizadosCobertura de tests0%~60-70% (flujo crítico)
15God Object EarthdataProcessorResponsabilidades por clase10+ (God Object)1 por clase (5 clases)
16try/except que ocultan erroresErrores visibles en Sentry~0%~100%
17Excepciones genéricasTipos de error en Sentry1 (genérico)6 (específicos)
try/except innecesarios35~10 (justificados)
Detalle del error en DB"Inference failed"Stack trace + tipo
18f-strings + logs verbososf-strings en servicios~1060
Líneas de log por tarea~100~6
logging.basicConfig duplicados50

Resumen general: Antes vs Después

Performance (por tarea, lote típico 500 ha / 5,500 píxeles / 30 imágenes):

EtapaMedido (pre-refactor)Estimado (post-refactor)Speedup
Selección de ciclos fenológicos797.85 seg (13.30 min)3-6 seg~133-266x
Clasificación + suavizado5.58 seg~0.3-0.5 seg~11-19x
Total procesamiento803.43 seg (13.39 min)~4-7 seg~115-201x

Benchmark: 23,783 píxeles, 25 GeoTIFFs, modelo 24 steps, disco local en Docker. No incluye descarga de imágenes de NASA (build_dataset).

Costos mensuales (estimado ~300 tareas/mes, basado en benchmark real):

ConceptoAntes (calculado)Después (estimado)Ahorro
ECS compute por tarea (solo procesamiento)~$0.016 (803s × $0.07/3600)~$0.0001 (5s × $0.07/3600)~$0.016/tarea
ECS compute mensual (300 tareas)~$4.80~$0.03~$4.77/mes
EFS storage (archivos temporales sin cleanup)$55-225$0~$55-225/mes
Total costos evitables~$60-230/mes~$0.03/mes~$60-230/mes

Seguridad:

VulnerabilidadAntesDespués
Credenciales AWS en logs/Sentry~240 copias por tarea0
Credenciales hardcodeadas en repoSí (test.py en Git + Docker)No
Account ID AWS expuestoNo
USE_S3_ACCESS siempre truthySí (bug)Corregido

Observabilidad:

MétricaAntesDespués
Errores capturados por Sentry~0%~100%
Detalle del error en DB"Inference failed""InsufficientDataError: Lote abc: 3/22 imágenes"
try/except silenciando errores35 bloques~10 (justificados)
Bare except: (atrapa todo)60
Tiempo promedio de debuggingHoras (sin contexto)Minutos (stack trace completo)

Mantenibilidad:

MétricaAntesDespués
Líneas de código muerto~2600
Variables de config no usadas~450
Archivos con sys.path.insert30
Rutas hardcodeadas duplicadas~151 (config central)
Cobertura de tests0%~60-70%
Responsabilidades de EarthdataProcessor10+1 por clase

Orden sugerido de implementación:

Fase 1 — Quick wins de seguridad: ✅ COMPLETADA

  • Mejora 4: Eliminar print de credenciales
  • Mejora 5: Eliminar credenciales hardcodeadas de test.py
  • Mejora 6: Corregir USE_S3_ACCESS

Fase 2 — Estabilidad y observabilidad: ✅ COMPLETADA

  • Mejora 7: Carga lazy de label_mapping
  • Mejora 8: Eliminar sys.path.insert, crear init.py
  • Mejora 16: Eliminar try/except innecesarios, agregar excepciones descriptivas

Fase 2b — Excepciones custom: ✅ COMPLETADA

  • Mejora 17: Reemplazar raise genéricos con excepciones custom estilo Aura

Fase 3 — Performance: ✅ COMPLETADA

  • Mejora 1: Lectura de GeoTIFFs en memoria
  • Mejora 2: Batch inference
  • Mejora 3: Archivo .npz único

Fase 4 — Mantenibilidad: ✅ COMPLETADA

  • Mejora 9: Context manager para cleanup
  • Mejora 10: Eliminar código muerto
  • Mejora 11: Limpiar settings.py
  • Mejora 12: Centralizar rutas
  • Mejora 13: Renombrar hsl → hls

Fase 5 — Calidad a largo plazo:

  • Mejora 14: Agregar tests
  • Mejora 15: Separar EarthdataProcessor

Mejora 18: Eliminar f-strings y limpiar logs verbosos ✅ HECHO

Archivos afectados: Todos los archivos de features/services/, features/interface/main.py, features/services/paths.py

Problema 1: f-strings limitan compatibilidad

f-strings requieren Python 3.6+. Aunque el proyecto usa 3.12, el patrón "{}".format() y logger.info("...", arg) son más portables y consistentes con el estilo del resto del codebase (sales-backend).

ANTES:

logger.info(f"Procesando píxel {i+1}/{total_pixeles}")
raise ApiErr(ErrorCodes.X.get("id"), f"{ErrorCodes.X.get('msg')}{uuid}")
path = MODELS_DIR / f"best_model_{num_steps}steps{suffix}.pth"

DESPUÉS:

logger.info("Procesando píxel %d/%d", i+1, total_pixeles)
raise ApiErr(ErrorCodes.X.get("id"), "{} — {}".format(ErrorCodes.X.get('msg'), uuid))
path = MODELS_DIR / "best_model_{}steps{}.pth".format(num_steps, suffix)

Problema 2: Logs excesivamente verbosos

El pipeline generaba ~100 líneas de log por tarea, la mayoría informativas sin valor en producción:

  • Progreso píxel por píxel ("Procesando píxel 501/23783" cada 500 píxeles)
  • Pasos intermedios ("Cargando modelo...", "Construyendo cubo...", "Aplicando suavizado...")
  • Confirmaciones redundantes ("✓ Completado", "✅ Generado", "✨ Clasificación completada")
  • Detalles de configuración ("Clases: 6, Mapeo: {...}")
  • logging.basicConfig() duplicado en 5 archivos

ANTES (output de una tarea):

Processando con 24 pasos temporales
Cargando 25 imágenes en memoria...
Procesando 23783 píxeles...
Procesando píxel 1/23783
Procesando píxel 501/23783
... (46 líneas más de progreso) ...
🎉 Procesamiento completado:
- Ciclos exitosos: 2414
- Sin siembra detectada: 21358
- Ciclo incompleto: 11
- Errores: 0
- Ciclos guardados en ciclos.npz: 2414
✓ Ciclos fenológicos generados exitosamente
Clases: 6, Mapeo: {'girasol': 0, 'maní': 1, ...}
Cargando modelo con 24 pasos temporales...
Cargando píxeles desde 'ciclos_fenologicos/...'...
Construyendo cubo de probabilidades...
Cubo construido: 134x190x6 (2414 píxeles válidos)
Aplicando suavizado espacial (kernel 3x3)...
Re-clasificando con umbral 0.4...
Aplicando post-procesamiento morfológico...
Generando resultados...
✨ Clasificación completada con suavizado espacial
Kernel: 3x3, Umbral: 0.4
Píxeles clase 99: 744
Raster guardado en s3://bucket/uuid.tif (Resolución: 30m, CRS: EPSG:32720)
✓ Clasificación completada exitosamente

DESPUÉS (output de una tarea):

Ciclos: 2414 exitosos, 21358 no_siembra, 11 incompletos, 0 errores
Clasificación: 2414 píxeles, 744 clase_99
Raster s3://bucket/uuid.tif (EPSG:32720)

Cambios realizados:

Archivof-strings eliminadasLogs eliminadosLogs que quedan
inferencia_completa.py78 info1 info (resumen final), warnings/errors
seleccionar_ciclo_fenologico.py128 info1 info (resumen ciclos)
clasificacion_suavizado_espacial.py1510 info1 info (resumen clasificación), warnings
build_dataset.py2512 info0 info, solo errors/warnings
rasterizar_imagen_hls.py301 info (confirmación S3)
db.py40errors solamente
generar_geojson.py42 infoerrors solamente
generar_json_inferencia.py87 infoerrors solamente
earthdata_token_manager.py128 infoerrors/warnings solamente
paths.py50sin logs
main.py201 error
detector_siembras.py90sin cambios (script debug)
Total~106~55~6 info + errors/warnings

También se eliminaron 5 logging.basicConfig() duplicados — la configuración de logging debe estar en un solo lugar (el entry point).

Criterio de filtrado:

Tipo de logDecisiónRazón
logger.error(...)✅ Mantener todosErrores reales para diagnóstico
logger.warning(...)✅ Mantener todosAdvertencias útiles
logger.info(resumen)✅ Mantener 1 por etapaTrazabilidad mínima del pipeline
logger.info(progreso)❌ EliminarRuido sin valor en producción
logger.info(paso intermedio)❌ EliminarRedundante, el código es autodocumentado
logger.info(confirmación)❌ EliminarSi no hay error, se asume éxito
logger.debug(...)❌ EliminarNo se usa en producción

Estimado:

MétricaAntesDespués
f-strings en servicios~1060
Líneas de log por tarea~100~6 (+ errors si hay)
logging.basicConfig() duplicados50
Compatibilidad Python3.6+ (f-strings)2.7+ (.format)

Datos del benchmark de referencia

Pre-refactor (2026-03-17):

{
"timestamp": "2026-03-17T15:14:25",
"version": "pre-refactor",
"params": {"uuid": "test-uuid-001", "model": 24, "start_date": "2026-01-10"},
"stats": {
"geotiffs": 25, "geotiffs_size_mb": 10.77,
"npy_files": 2414, "npy_size_mb": 3.39,
"pixeles_exitosos": 2414, "pixeles_total": 23783
},
"tiempos_seg": {"ciclos": 797.85, "clasificacion": 5.58},
"total_seg": 803.43
}

Post-refactor Fase 3 (2026-03-17):

{
"timestamp": "2026-03-17T17:11:32",
"version": "post-refactor-fase3",
"params": {"uuid": "test-uuid-001", "model": 24, "start_date": "2026-01-10"},
"stats": {
"geotiffs": 25, "geotiffs_size_mb": 10.77,
"npz_file": "ciclos.npz", "npz_size_mb": 1.38,
"pixeles_exitosos": 2414, "pixeles_total": 23783
},
"tiempos_seg": {"ciclos": 5.97, "clasificacion": 0.11},
"total_seg": 6.08,
"speedup": {
"ciclos": "133x (797.85s → 5.97s)",
"clasificacion": "50x (5.58s → 0.11s)",
"total": "132x (803.43s → 6.08s)"
},
"nota": "clasificacion falla en rasterizar_imagen_utm por S3 bucket=None (preexistente)"
}

Comando para reproducir:

docker run --rm --entrypoint /bin/bash --env-file .env \
-v ./features/test_manual:/var/async/features/test_manual \
-v ./geotiff_output:/var/async/geotiff_output \
-v ./ciclos_fenologicos:/var/async/ciclos_fenologicos \
-v ./resultados_clasificacion:/var/async/resultados_clasificacion \
-v ./benchmarks:/var/async/benchmarks \
deteccion-de-cultivos-worker:latest \
-c "cd /var/async && python3 features/test_manual/benchmark.py --skip-download 2>&1"