Saltar al contenido principal

ARQUITECTURA

Ver en Git


Arquitectura Completa — Detección de Cultivos

1. ¿Qué es este repo?

Un worker de Celery que recibe tareas asíncronas para clasificar cultivos agrícolas a partir de imágenes satelitales HLS (Harmonized Landsat Sentinel). No es una API REST, no expone endpoints HTTP. Solo consume mensajes de una cola Redis y ejecuta un pipeline de inferencia de machine learning.

Cultivos que clasifica: girasol (0), maní (1), maíz (2), pastura (3), soja (4), trigo (5), dudoso (99).


2. Infraestructura y Deploy

GitLab CI → Docker build → ECR (us-west-2) → ECS (cluster Oregon1)
  • Dockerfile: Ubuntu 24.04, instala Python 3 + dependencias, ejecuta run_celery.sh
  • run_celery.sh: Levanta un worker Celery escuchando la cola crop_detection
  • Broker/Backend: Redis (configurado via CELERY_BROKER env var)
  • Base de datos: PostgreSQL (dos pools: lectura y escritura)
  • Storage: S3 para guardar los GeoTIFF resultantes
  • Monitoreo: Sentry para logging y excepciones

3. Estructura de Carpetas

deteccion-de-cultivos/
├── run.py # Entry point: define Celery app y task
├── core/ # Infraestructura compartida (boilerplate Auravant)
│ ├── settings.py # Variables de entorno
│ ├── database_handler.py # Pool de conexiones PostgreSQL
│ ├── boto_handler.py # Cliente S3
│ ├── redis_handler.py # Cliente Redis
│ ├── logging_config.py # Configuración de logging
│ ├── errors.py # Excepciones custom
│ ├── models.py # Helpers de carpetas temporales (NO son modelos de DB)
│ ├── emails.py # Envío de emails SMTP (no se usa en el flujo principal)
│ ├── permissions.py # Validación de permisos via API externa (no se usa)
│ ├── aura_events.py # Logging de eventos a DB (no se usa)
│ └── constants.py # ROOT_PATH, TMP_FOLDER, formatos
├── features/
│ ├── interface/
│ │ └── main.py # Punto de entrada de negocio (startCropDetection)
│ ├── services/ # ⭐ Toda la lógica de negocio
│ │ ├── inferencia_completa.py # Orquestador del pipeline (3 pasos)
│ │ ├── build_dataset.py # Paso 1: Descarga imágenes de NASA Earthdata
│ │ ├── earthdata_token_manager.py # Gestión de tokens Earthdata con Redis
│ │ ├── seleccionar_ciclo_fenologico.py # Paso 2: Extrae ciclos fenológicos
│ │ ├── clasificacion_suavizado_espacial.py # Paso 3: Clasificación + suavizado
│ │ ├── simple_classifier_lstm.py # Definición del modelo LSTM (PyTorch)
│ │ ├── rasterizar_imagen_hsl.py # Genera GeoTIFF UTM y sube a S3
│ │ ├── db.py # Queries a PostgreSQL (update status, insert crops)
│ │ ├── generar_json_inferencia.py # Utilidad CLI (no se usa en el worker)
│ │ ├── generar_geojson.py # Utilidad CLI (no se usa en el worker)
│ │ └── detector_siembras.py # Utilidad de debug/visualización
│ ├── models/ # Pesos del modelo (.pth) y label mappings (.json)
│ ├── test_manual/
│ │ └── main.py # Script de test local sin DB
│ └── utils/ # Scripts auxiliares de análisis y conversión
└── docs/

4. Flujo Completo (de punta a punta)

4.1 Entrada de la tarea

Otro servicio → Redis (cola crop_detection) → Celery worker

run.py define la task de Celery:

@celery_app.task(name='crop_detection')
def crop_detection(id, uuid_detection, wkt, model, start_date):
result = startCropDetection(id, uuid_detection, wkt, model, start_date)
return result

Parámetros que recibe:

ParamTipoDescripción
idstrID interno de la detección
uuid_detectionstrUUID único del proceso
wktstrPolígono en formato WKT (ej: POLYGON((...)))
modelintPasos temporales: 8, 15 o 24
start_datestrFecha de siembra YYYY-MM-DD

4.2 Interface (features/interface/main.py)

startCropDetection hace:

  1. Valida parámetros (id, uuid, wkt, start_date, model ∈ 24)
  2. Calcula fecha_fin: si model=24 → siembra + 180 días (máx hoy), sino → hoy
  3. Actualiza DB → status = "processing"
  4. Llama a inferencia_completa(...) (el orquestador)
  5. Si OK → actualiza DB status="completed" + inserta cultivos detectados
  6. Si falla → actualiza DB status="pending" + error_message

4.3 Pipeline de Inferencia (3 pasos)

┌─────────────────────────────────────────────────────────────────┐
│ inferencia_completa.py │
│ │
│ Paso 1: ejecutar_build_dataset() │
│ └─ build_dataset.py → Descarga GeoTIFFs de NASA │
│ │
│ Paso 2: ejecutar_seleccionar_ciclo() │
│ └─ seleccionar_ciclo_fenologico.py → Genera .npy por píxel │
│ │
│ Paso 3: ejecutar_clasificacion() │
│ └─ clasificacion_suavizado_espacial.py → Clasifica + raster │
│ └─ rasterizar_imagen_hsl.py → Sube .tif a S3 │
│ │
│ Return: { uuid, resultados, archivos_generados } │
└─────────────────────────────────────────────────────────────────┘

Si cualquier paso falla, retorna None y el flujo se corta.


5. Services en Detalle ⭐

5.1 build_dataset.py — Descarga de imágenes satelitales

Clase principal: EarthdataProcessor

Responsabilidades:

  • Autenticación con NASA Earthdata (tokens Bearer)
  • Búsqueda de granulos HLS en el catálogo CMR
  • Descarga de bandas espectrales via S3 (con credenciales temporales)
  • Recorte de rasters al polígono del lote
  • Guardado de GeoTIFFs multi-banda

Flujo interno:

1. Recibe WKT → lo convierte a GeoJSON con shapely
2. Calcula bounding box del polígono
3. Busca granulos HLS en CMR (API de NASA) con filtro de nubosidad ≤20%
4. Filtra por tile más cercano al centroide del lote (usa sentinel_grid_centroids.geojson)
5. Para cada granulo:
a. Descarga Fmask → verifica calidad ≥70% píxeles válidos
b. Si pasa → descarga todas las bandas (Red, Green, Blue, NIR, Aerosol, SWIR1, SWIR2, Fmask)
c. Recorta cada banda al polígono (rasterio.mask)
d. Guarda GeoTIFF multi-banda en geotiff_output/lote_{uuid}/
6. Valida mínimo de imágenes: {8: 6, 15: 14, 24: 22}

Acceso a datos — Dos modos:

  • S3 directo (controlado por USE_S3_ACCESS env var): Usa credenciales temporales de s3credentials endpoint
  • HTTP: Usa Bearer token via GDAL headers

Dependencia clave: EarthdataTokenManager para gestión de tokens.

Archivos generados:

geotiff_output/lote_{uuid}/
├── HLS.S30.T21HTB.2024100T134711.v2.0_2024-04-09.tif
├── HLS.L30.T21HTB.2024107T134711.v2.0_2024-04-16.tif
└── ... (N imágenes cronológicas, 8 bandas cada una)

5.2 earthdata_token_manager.py — Gestión de tokens

Clase: EarthdataTokenManager

Implementa el flujo documentado en docs/Playbook/EarthData.md:

1. Buscar token en Redis (key: "earthdata:access_token")
2. Si no hay → listar tokens existentes en Earthdata API
3. Si hay uno válido → guardarlo en Redis y usarlo
4. Si hay 2 tokens (límite) → revocar el más antiguo
5. Solicitar nuevo token → guardarlo en Redis (TTL 30 días)

Detalle: Los tokens de Earthdata duran 30 días. Máximo 2 simultáneos por cuenta.

5.3 seleccionar_ciclo_fenologico.py — Extracción de ciclos

Función principal: procesar_pixeles_desde_geotiff()

Responsabilidades:

  • Leer GeoTIFFs cronológicamente
  • Identificar píxeles dentro del polígono (rasterización de geometría)
  • Para cada píxel: detectar siembra y extraer ciclo fenológico
  • Guardar arrays .npy por píxel

Flujo por píxel:

1. Extraer serie temporal del píxel de todos los GeoTIFFs → array (N_imagenes, 8_bandas)
2. Calcular NDVI = (NIR - Red) / (NIR + Red)
3. Detectar inicio de siembra:
- Suavizar NDVI con media móvil (ventana=3)
- Buscar picos (NDVI > 0.7)
- Para cada pico, buscar hacia atrás un punto con NDVI < 0.35
- Verificar crecimiento sostenido (tendencia > 0.01, magnitud > 0.2)
4. Extraer N pasos desde siembra
5. Filtrar por Fmask (valores válidos: 0, 64, 128, 192)
6. Interpolar a exactamente N pasos temporales
7. Seleccionar bandas finales: [Blue, Green, NIR, Red, SWIR1, SWIR2]
8. Normalizar /10000
9. Calcular NDVI final → agregar como 7ma columna
10. Guardar .npy de shape (N, 7)

Explicación detallada del flujo por píxel:

Paso 1 — Extraer serie temporal

Para cada píxel identificado dentro del polígono, se itera sobre todos los archivos GeoTIFF ordenados cronológicamente. Usando rasterio.Window, se lee únicamente la celda correspondiente a ese píxel (fila, columna) de cada imagen. El resultado es un array de shape (N_imagenes, 8) donde las 8 bandas son: Red, Green, Blue, NIR, Aerosol, SWIR1, SWIR2, Fmask (en ese orden según BAND_MAP).

Paso 2 — Calcular NDVI

El NDVI (Normalized Difference Vegetation Index) es el indicador principal para detectar vegetación activa. Se calcula como:

NDVI = (NIR - Red) / (NIR + Red)

Valores típicos:

  • -1 a 0: Agua, suelo desnudo, nubes
  • 0 a 0.3: Suelo con poca vegetación
  • 0.3 a 0.6: Vegetación moderada
  • 0.6 a 1: Vegetación densa y saludable

En este paso se calcula sobre la serie temporal completa (sin filtrar por calidad) para tener la curva fenológica completa y poder detectar el patrón de siembra.

Paso 3 — Detectar inicio de siembra

Este es el paso más crítico. El algoritmo busca el patrón característico de un cultivo:

  1. Suavizado: Se aplica media móvil con ventana de 3 para reducir ruido de la señal NDVI.

  2. Búsqueda de picos: Usando scipy.signal.find_peaks, se identifican máximos locales donde NDVI > 0.7 (vegetación densa). El parámetro distance=5 evita detectar picos muy cercanos.

  3. Búsqueda hacia atrás: Para cada pico encontrado, se busca hacia atrás (hasta 10 timesteps) un punto donde NDVI < 0.35. Este punto representa el suelo "vacío" antes de la siembra.

  4. Validación de crecimiento: Se verifica que entre el punto de siembra y el pico:

    • La tendencia (pendiente de regresión lineal) sea > 0.01 (crecimiento sostenido)
    • La magnitud del crecimiento (pico - inicio) sea > 0.2 (cambio significativo)
    • El período de crecimiento tenga al menos min_growth_period (4) timesteps

Si no se encuentra ningún patrón válido, el píxel se marca como No_Siembra.

Paso 4 — Extraer N pasos desde siembra

Una vez identificado el índice de inicio de siembra (planting_idx), se extrae un slice del array desde ese punto: array_bruto[planting_idx:]. Si hay más de N pasos disponibles, se toman solo los primeros N. Si hay menos, se procesa lo que haya y luego se aplica padding.

Paso 5 — Filtrar por Fmask

Fmask es la banda de calidad que indica el estado de cada píxel en la imagen original. Los valores válidos son:

  • 0: Píxel limpio
  • 64: Agua
  • 128: Sombra de nube
  • 192: Nieve/hielo

Cualquier otro valor (nubes, aerosoles, etc.) se descarta. Se crea una máscara booleana y se filtran solo los timesteps con datos de calidad. Si quedan menos de 2 observaciones válidas, el píxel se marca como Ciclo_Incompleto.

Paso 6 — Interpolar a N pasos

Después del filtrado por Fmask, la serie temporal tiene "huecos" (timesteps descartados). Para que el modelo LSTM reciba siempre exactamente N pasos, se interpola linealmente:

eje_tiempo_nuevo = np.linspace(indices_validos.min(), indices_validos.max(), num_steps)
for cada_banda:
banda_interpolada = np.interp(eje_tiempo_nuevo, indices_validos, valores_banda)

Esto genera una serie temporal regular de exactamente N puntos, preservando la forma general de la curva fenológica.

Paso 7 — Seleccionar bandas finales

De las 8 bandas originales, se seleccionan solo 6 para el modelo:

  • Blue (índice 2 en original → 0 en final)
  • Green (índice 1 → 1)
  • NIR (índice 3 → 2)
  • Red (índice 0 → 3)
  • SWIR1 (índice 5 → 4)
  • SWIR2 (índice 6 → 5)

Se descartan Aerosol y Fmask (ya cumplieron su función de filtrado).

Paso 8 — Normalizar /10000

Los valores de reflectancia en HLS vienen escalados por 10000 (enteros). Se dividen para obtener valores en rango [0, 1]:

array_final = array_final / 10000.0

Esto es crítico para que el modelo funcione correctamente, ya que fue entrenado con datos normalizados.

Paso 9 — Calcular NDVI final

Se recalcula el NDVI sobre los datos ya filtrados, interpolados y normalizados:

nir_final = array_final[:, 2]  # NIR está en índice 2
red_final = array_final[:, 3] # Red está en índice 3
ndvi_final = (nir_final - red_final) / (nir_final + red_final)

Este NDVI "limpio" se agrega como 7ma columna, quedando el array con shape (N, 7).

Paso 10 — Guardar .npy

El array final se guarda en formato NumPy binario:

np.save(f"{directorio_salida}/pixel_{i}_ciclo.npy", array_final)

Si el ciclo quedó incompleto (menos de N pasos después de todo el procesamiento), se aplica padding repitiendo el último valor válido hasta completar N pasos. El status se marca como Ciclo_Incompleto pero igual se guarda el archivo.

Status posibles por píxel:

  • Exitoso: Ciclo completo detectado
  • No_Siembra: No se detectó patrón de siembra
  • Ciclo_Incompleto: Siembra detectada pero datos insuficientes (se aplica padding)
  • Error: Excepción durante procesamiento

Archivos generados:

ciclos_fenologicos/lote_{uuid}_{N}steps/
├── pixel_1_ciclo.npy # shape (N, 7)
├── pixel_2_ciclo.npy
├── ...
└── ciclos_fenologicos_metadata.json # coordenadas, status por píxel

5.4 simple_classifier_lstm.py — Modelo de ML

Clase: TemporalClassifierOnly

Arquitectura:

Input (batch, N_steps, 7_bandas)


SpectralEncoder: Linear(7→32) → ReLU → BatchNorm → Dropout(0.4)
│ (se aplica por timestep, reshape batch*N → batch, N, 32)

TemporalEncoder: LSTM(input=32, hidden=8, layers=2, dropout=0.4)
│ output: (batch, N_steps, 8)

Attention: Scaled additive attention → context vector (batch, 8)


Classifier: Linear(8→8) → ReLU → Dropout(0.4) → Linear(8→6_clases)


Output: logits (batch, 6)

Nota: El README dice "bidireccional" pero el código tiene bidirectional comentado. Es unidireccional.

Data augmentation (solo entrenamiento): TimeWarp + Drift + Reverse (via tsaug).

5.5 clasificacion_suavizado_espacial.py — Clasificación + post-procesamiento

Función principal: run_classification_with_smoothing()

Flujo completo:

1. Cargar label_mapping.json → determinar num_classes (6)
2. Cargar modelo .pth
3. Cargar píxeles (.npy) + metadata (coordenadas fila/columna)
4. Construir cubo de probabilidades:
- Para cada píxel: forward pass → softmax → vector de 6 probabilidades
- Organizar en tensor (Alto × Ancho × 6) según posición en grid
5. Suavizado espacial:
- Filtro de media (uniform_filter) por cada canal de clase
- Normalizar por cantidad de vecinos válidos
- Re-normalizar probabilidades (suma=1)
6. Re-clasificación con umbral:
- clase = argmax(probabilidades_suavizadas)
- Si max_prob < threshold → clase = 99 (dudoso)
7. Post-procesamiento morfológico:
- Por cada clase: binary_closing (rellena huecos) → binary_opening (elimina islas)
8. Generar DataFrame de resultados (id_pixel, lon, lat, clase, confianza)
9. Calcular estadísticas por cultivo (superficie_ha, confianza_promedio)
10. Rasterizar → subir a S3

Explicación detallada del flujo de clasificación:

Paso 1 — Cargar label_mapping.json

El archivo JSON contiene el mapeo nombre → índice de clase:

{"girasol": 0, "maní": 1, "maíz": 2, "pastura": 3, "soja": 4, "trigo": 5}

Se usa para:

  • Determinar num_classes = 6 (cantidad de neuronas en la capa de salida)
  • Traducir índices numéricos a nombres de cultivo en los resultados finales
  • El sufijo _ft indica modelos con fine-tuning adicional

Paso 2 — Cargar modelo .pth

Se instancia la arquitectura TemporalClassifierOnly con los hiperparámetros correctos:

model = TemporalClassifierOnly(
num_timestamps=num_steps, # 8, 15 o 24
num_bands=7, # Blue, Green, NIR, Red, SWIR1, SWIR2, NDVI
num_classes=6,
lstm_hidden_size=8,
lstm_layers=2,
lstm_dropout=0.4,
spectral_features=32,
dropout_rate=0.4
)
model.load_state_dict(torch.load(model_path))
model.eval() # Modo evaluación (desactiva dropout)

Paso 3 — Cargar píxeles + metadata

Se lee el archivo ciclos_fenologicos_metadata.json que contiene por cada píxel:

  • pixel_id: Identificador único
  • fila, columna: Posición en el grid raster original
  • latitud, longitud: Coordenadas geográficas WGS84
  • status: Exitoso, No_Siembra, Ciclo_Incompleto
  • archivo_ciclo: Nombre del .npy correspondiente

Solo se cargan los píxeles que tienen archivo .npy asociado (status Exitoso o Ciclo_Incompleto).

Paso 4 — Construir cubo de probabilidades

Este es el paso clave que permite el suavizado espacial:

  1. Determinar dimensiones del grid: Se calculan los rangos de filas y columnas de todos los píxeles para crear una matriz 2D que represente el espacio geográfico.

  2. Inicializar estructuras:

    • prob_cube: Tensor de shape (Alto, Ancho, 6) inicializado en ceros
    • pixel_mask: Matriz booleana (Alto, Ancho) que indica qué celdas tienen datos
    • pixel_map: Diccionario {(fila, col): pixel_info} para recuperar metadata después
  3. Clasificar cada píxel:

    with torch.no_grad():
    for pixel in pixel_data:
    # Cargar datos temporales
    temporal_data = np.load(pixel['path']) # shape (N, 7)
    temporal_tensor = torch.from_numpy(temporal_data).float().unsqueeze(0) # (1, N, 7)

    # Forward pass
    output = model(temporal_tensor) # logits (1, 6)
    probs = torch.softmax(output, dim=1).squeeze(0).numpy() # (6,)

    # Ubicar en el cubo según posición espacial
    fila_idx = pixel['fila'] - min_fila
    col_idx = pixel['columna'] - min_col
    prob_cube[fila_idx, col_idx, :] = probs
    pixel_mask[fila_idx, col_idx] = True

El resultado es un "cubo" donde cada celda (x, y) contiene un vector de 6 probabilidades, y las celdas sin datos quedan en cero.

Paso 5 — Suavizado espacial

El objetivo es reducir el ruido "sal y pimienta" (píxeles aislados de una clase rodeados de otra) aprovechando la coherencia espacial de los cultivos (un campo de soja no tiene píxeles aleatorios de maíz en el medio).

  1. Filtro de media por clase: Para cada una de las 6 clases, se aplica uniform_filter con kernel de tamaño configurable (3×3, 5×5 o 7×7):

    for c in range(num_classes):
    smoothed_channel = uniform_filter(prob_cube[:, :, c], size=kernel_size, mode='constant')

    Esto promedia las probabilidades de cada clase con sus vecinos.

  2. Normalizar por vecinos válidos: Como no todas las celdas tienen datos (el polígono no es rectangular), se divide por la cantidad real de vecinos:

    neighbor_count = uniform_filter(pixel_mask.astype(float), size=kernel_size)
    smoothed_cube[:, :, c] = smoothed_channel / neighbor_count
  3. Re-normalizar probabilidades: Después del suavizado, la suma de probabilidades por píxel puede no ser 1. Se normaliza:

    prob_sum = smoothed_cube.sum(axis=2, keepdims=True)
    smoothed_cube = smoothed_cube / prob_sum

Efecto del kernel_size:

  • 3×3: Suavizado conservador, preserva detalles y bordes
  • 5×5: Suavizado moderado, buen balance
  • 7×7: Suavizado agresivo, máxima homogeneidad

Paso 6 — Re-clasificación con umbral

Con las probabilidades suavizadas, se asigna la clase final:

# Clase con mayor probabilidad
class_map = np.argmax(smoothed_cube, axis=2) # (Alto, Ancho)
confidence_map = np.max(smoothed_cube, axis=2) # (Alto, Ancho)

# Aplicar umbral de confianza
low_confidence = confidence_map < threshold # default 0.4
class_map[low_confidence] = 99 # Clase "dudoso"

# Marcar celdas sin datos
class_map[~pixel_mask] = -1

¿Por qué clase 99? Representa píxeles donde el modelo no tiene suficiente confianza. Puede deberse a:

  • Mezcla de cultivos en el píxel (borde entre campos)
  • Datos de mala calidad que pasaron los filtros
  • Cultivo no presente en el entrenamiento
  • Suelo desnudo o barbecho

Paso 7 — Post-procesamiento morfológico

Operaciones de morfología matemática para limpiar el mapa de clases:

  1. Binary closing (cierre): Dilata y luego erosiona. Efecto: rellena pequeños huecos dentro de regiones.

    closed = binary_closing(class_mask, structure=np.ones((3,3)))

    Ejemplo: Un píxel clase 99 rodeado de soja se convierte en soja.

  2. Binary opening (apertura): Erosiona y luego dilata. Efecto: elimina pequeñas islas aisladas.

    opened = binary_opening(closed, structure=np.ones((3,3)))

    Ejemplo: Un píxel de maíz aislado en medio de un campo de soja desaparece.

Se aplica por cada clase (excepto 99) con un elemento estructurante cuadrado de tamaño configurable (morphology_size).

Paso 8 — Generar DataFrame de resultados

Se recorre el pixel_map y se construye un DataFrame con:

results.append({
'id_pixel': pixel['id'],
'lon': pixel['lon'],
'lat': pixel['lat'],
'geometry': Point(pixel['lon'], pixel['lat']),
'clase_predicha': clase, # Con remapeo: 0→6 para girasol
'precision_predicha': confidence
})

Remapeo importante: CROP_ID_REMAP = {0: 6} cambia girasol de 0 a 6 porque algunos frontends usan 0 como nodata.

Paso 9 — Calcular estadísticas

Por cada cultivo detectado se calcula:

stats_by_crop[crop_name] = {
"crop_id": int(clase),
"superficie_ha": round(len(clase_data) * (30**2) / 10000, 2), # píxeles × 900m² / 10000
"confianza_promedio": round(clase_data['precision_predicha'].mean(), 4),
"cantidad_pixeles": len(clase_data)
}

Cada píxel representa 30m × 30m = 900m² = 0.09 hectáreas.

Paso 10 — Rasterizar y subir a S3

Se llama a rasterizar_imagen_utm() que:

  1. Transforma coordenadas WGS84 → UTM (zona automática según centroide)
  2. Crea un grid raster de 30m de resolución
  3. Asigna el valor de clase a cada celda
  4. Escribe un GeoTIFF COG (Cloud Optimized GeoTIFF) en memoria
  5. Sube directamente a S3 sin guardar en disco local
s3_eu.put_object(
Bucket=settings.AWS_S3_BUCKET_CROP_DETECTION,
Key=f"{uuid}.tif",
Body=memfile.read()
)

Remapeo importante: CROP_ID_REMAP = {0: 6} — El girasol (originalmente 0) se remapea a 6 para evitar conflictos con nodata en el frontend.

5.6 rasterizar_imagen_hsl.py — Generación de GeoTIFF y subida a S3

Función: rasterizar_imagen_utm()

Flujo:

1. Recibe DataFrame con (lon, lat, clase_predicha)
2. Calcula zona UTM automática según centroide
3. Transforma coordenadas WGS84 → UTM
4. Crea grid raster de 30m de resolución
5. Asigna clases a celdas del grid
6. Escribe GeoTIFF COG (Cloud Optimized) en memoria
7. Sube a S3: s3://{AWS_S3_BUCKET_CROP_DETECTION}/{uuid}.tif

No guarda archivo local, va directo a S3 via MemoryFile.

5.7 db.py — Persistencia en PostgreSQL

Dos funciones:

update_db(uuid_detection, status, error_message, results)

  • UPDATE en sales_adis_detections
  • Campos: process_status, error_message, results (JSON)

insert_detection_crops(uuid_detection, resultados)

  • Busca detection_id por uuid
  • INSERT en sales_adis_detection_crops por cada cultivo detectado
  • Campos: detection_id, crop_id, confidence, area
  • Usa label_mapping_24steps.json para mapear nombre → crop_id

Tablas involucradas:

  • sales_adis_detections — Estado del proceso
  • sales_adis_detection_crops — Resultados por cultivo

5.8 generar_json_inferencia.py y generar_geojson.py

Scripts CLI auxiliares. No se usan en el flujo del worker. Sirven para generar archivos de entrada manualmente.

5.9 detector_siembras.py

Script de debug/visualización. Grafica series NDVI con marcadores de siembra. La lógica real está en seleccionar_ciclo_fenologico.py.


6. Core en Detalle

6.1 settings.py

Carga todas las variables de entorno. Es un archivo heredado del boilerplate de Auravant — contiene muchas variables que no se usan en este servicio (FIWARE, PETERSON, NIDERA, ALBOR, etc.).

Variables relevantes para este servicio:

  • EARTHDATA_USERNAME, EARTHDATA_PASSWORD — Credenciales NASA
  • USE_S3_ACCESS — Modo de acceso a datos HLS
  • AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_S3_BUCKET_CROP_DETECTION — S3 para resultados
  • CELERY_BROKER — URL de Redis para Celery
  • PG_* — Conexión a PostgreSQL (lectura)
  • PG_*_W — Conexión a PostgreSQL (escritura)
  • REDIS_HOST, REDIS_PORT, REDIS_AUTH — Redis para cache de tokens
  • SENTRY_DSN — Monitoreo

6.2 database_handler.py

Pool de conexiones PostgreSQL con psycopg2. Dos pools:

  • Lectura (db_query) — Usado por db.py
  • Escritura (db_query_w) — Usado por aura_events.py (no relevante aquí)

Implementa reconexión automática (10 reintentos, 2s entre cada uno).

6.3 boto_handler.py

Cliente S3 global (s3_eu). Usado por rasterizar_imagen_hsl.py para subir el GeoTIFF.

6.4 redis_handler.py

Wrapper de Redis. Usado por EarthdataTokenManager para cachear tokens.

6.5 Módulos NO usados en el flujo principal

  • emails.py — SMTP, no se usa
  • permissions.py — Validación de permisos, no se usa
  • aura_events.py — Logging de eventos a DB, no se usa
  • models.py — Helpers de carpetas temporales, no se usa

7. Diagrama de Dependencias entre Services

run.py
└─ features/interface/main.py (startCropDetection)
├─ features/services/db.py
│ └─ core/database_handler.py
│ └─ features/models/label_mapping_24steps.json

└─ features/services/inferencia_completa.py

├─ [Paso 1] features/services/build_dataset.py
│ ├─ features/services/earthdata_token_manager.py
│ │ └─ core/redis_handler.py
│ ├─ features/utils/sentinel_grid_centroids.geojson
│ └─ core/settings.py (EARTHDATA_*, USE_S3_ACCESS)

├─ [Paso 2] features/services/seleccionar_ciclo_fenologico.py
│ └─ features/utils/utils_logging.py
│ └─ features/utils/utils_validacion.py

└─ [Paso 3] features/services/clasificacion_suavizado_espacial.py
├─ features/services/simple_classifier_lstm.py
├─ features/services/rasterizar_imagen_hsl.py
│ ├─ core/boto_handler.py (s3_eu)
│ └─ core/settings.py (AWS_S3_BUCKET_CROP_DETECTION)
└─ features/models/best_model_{N}steps[_ft].pth

8. Formato de Datos en cada Etapa

EtapaInputOutput
Task Celery(id, uuid, wkt, model, start_date){code, data}
build_datasetWKT + fechasGeoTIFFs multi-banda (8 bandas, 30m)
seleccionar_cicloGeoTIFFs + GeoJSON.npy por píxel (N×7) + metadata.json
clasificacion.npy + modelo .pthDataFrame + stats dict
rasterizarDataFrame (lon, lat, clase)GeoTIFF COG en S3
DB updatestats dictRows en sales_adis_detection_crops

Resultado final retornado:

{
"uuid_detection": "abc-123",
"fecha_creacion": "2024-01-15T10:30:00",
"periodo": { "fecha_inicio": "2024-01-15", "fecha_fin": "2024-07-14" },
"modelo": { "num_steps": 24, "fine_tuning": true },
"parametros_clasificacion": { "kernel_size": 3, "threshold": 0.4, "morphology_size": 3 },
"resultados": {
"soja": { "crop_id": 4, "superficie_ha": 150.5, "confianza_promedio": 0.87, "cantidad_pixeles": 1672 },
"maíz": { "crop_id": 2, "superficie_ha": 80.2, "confianza_promedio": 0.82, "cantidad_pixeles": 891 }
},
"archivos_generados": { "geotiff": "abc-123.tif" }
}

9. Archivos Temporales y Filesystem

El worker genera archivos en disco durante la ejecución:

geotiff_output/lote_{uuid}/          ← GeoTIFFs descargados (Paso 1)
ciclos_fenologicos/lote_{uuid}_{N}steps/ ← Arrays .npy (Paso 2)
resultados_clasificacion/{uuid}/ ← JSON de resultado (Paso 3)
temp_geojson_{uuid}.geojson ← GeoJSON temporal (se borra)

Problema: No hay limpieza automática de estos archivos después de la ejecución. Se acumulan en disco.


10. Problemas y Oportunidades de Refactor

Arquitectura

  • core/ es boilerplate heredado: El 70% de settings.py son variables que no se usan. emails.py, permissions.py, aura_events.py, models.py no participan del flujo.
  • sys.path.insert por todos lados: inferencia_completa.py, clasificacion_suavizado_espacial.py, seleccionar_ciclo_fenologico.py manipulan sys.path para importar módulos. Indica que la estructura de paquetes no está bien definida.
  • Imports relativos rotos: Los services se importan entre sí con from build_dataset import ... (sin prefijo de paquete), lo cual solo funciona porque se inyecta _services_dir en sys.path.

Services

  • build_dataset.py es un God Object: EarthdataProcessor tiene 500+ líneas, maneja autenticación, búsqueda, descarga, recorte, guardado, visualización y creación de datasets xarray. Debería separarse.
  • Archivos temporales sin cleanup: GeoTIFFs, .npy y JSONs se acumulan en disco sin limpieza.
  • Rutas hardcodeadas: "geotiff_output", "ciclos_fenologicos", "resultados_clasificacion" están hardcodeados en varios archivos.
  • Código muerto: show_granule_images(), save_lote_data(), create_xarray_dataset(), process_all_lotes() en build_dataset.py no se usan en el flujo del worker.
  • db.py carga label_mapping al importar: Lee el JSON a nivel de módulo. Si el archivo no existe, el import falla y rompe todo el worker.
  • rasterizar_imagen_hsl.py tiene typo en el nombre: Debería ser hls (Harmonized Landsat Sentinel), no hsl.

Seguridad

  • test.py tiene credenciales AWS hardcodeadas (access key, secret key, session token). Aunque son temporales, no debería estar en el repo.
  • USE_S3_ACCESS se lee como string del env pero se usa como booleano en comparaciones. Puede dar comportamiento inesperado.

Testing

  • No hay tests automatizados. Solo test_manual/main.py que es un script de ejecución manual.
  • features/utils/test_flujo_inferencia.py existe pero no lo revisé — probablemente otro test manual.

Performance

Problema 1: Apertura excesiva de archivos en seleccionar_ciclo_fenologico.py

Código actual:

for i, (fila, col) in enumerate(zip(filas, columnas)):  # Por cada píxel
serie_temporal = []
for ruta_raster in archivos_raster: # Por cada imagen
with rasterio.open(ruta_raster) as src: # ABRE EL ARCHIVO
datos = src.read(window=Window(col, fila, 1, 1)).squeeze()
serie_temporal.append(datos)

Impacto:

  • Para un lote típico: 1000 píxeles × 30 imágenes = 30,000 llamadas a rasterio.open()
  • Cada open() implica: syscall de apertura, lectura de headers TIFF, parsing de metadata, creación de objeto Python
  • Tiempo estimado: ~50-100ms por open en archivos remotos/NFS, ~5-10ms en SSD local
  • Total: 2.5 a 50 minutos solo en I/O de apertura de archivos

Solución propuesta:

# Abrir todos los archivos UNA vez
raster_handles = [rasterio.open(ruta) for ruta in archivos_raster]

try:
for i, (fila, col) in enumerate(zip(filas, columnas)):
serie_temporal = []
for src in raster_handles: # Reutilizar handles abiertos
datos = src.read(window=Window(col, fila, 1, 1)).squeeze()
serie_temporal.append(datos)
finally:
for src in raster_handles:
src.close()

Mejora esperada: De 30,000 opens a 30 opens = ~1000x menos syscalls de apertura

Solución óptima (lectura en bloque):

# Leer TODA la imagen una vez, luego indexar en memoria
for ruta_raster in archivos_raster:
with rasterio.open(ruta_raster) as src:
imagen_completa = src.read() # (8, H, W) en memoria
for i, (fila, col) in enumerate(zip(filas, columnas)):
datos = imagen_completa[:, fila, col] # Indexación NumPy, ~nanosegundos

Trade-off: Usa más RAM (una imagen HLS recortada ~10-50MB), pero elimina completamente el overhead de I/O por píxel.


Problema 2: Inferencia píxel por píxel en clasificacion_suavizado_espacial.py

Código actual:

with torch.no_grad():
for pixel in pixel_data: # Loop secuencial
temporal_data = np.load(pixel['path'])
temporal_tensor = torch.from_numpy(temporal_data).float().unsqueeze(0) # batch_size=1
output = model(temporal_tensor) # Forward pass con 1 muestra
probs = torch.softmax(output, dim=1).squeeze(0).numpy()

Impacto:

  • Para 1000 píxeles = 1000 forward passes individuales
  • Cada forward pass tiene overhead fijo: transferencia CPU↔GPU (si hay), sincronización, etc.
  • El modelo LSTM tiene batch_first=True, está diseñado para batches pero se usa con batch=1
  • PyTorch optimiza operaciones matriciales para batches grandes (SIMD, paralelismo)

Benchmark típico:

Batch sizeTiempo por muestraTotal 1000 muestras
1~5ms~5 segundos
32~0.3ms~0.3 segundos
128~0.1ms~0.1 segundos

Solución propuesta:

# Cargar todos los píxeles en un solo tensor
all_data = []
for pixel in pixel_data:
temporal_data = np.load(pixel['path'])
all_data.append(temporal_data)

# Stack en un batch único
batch_tensor = torch.from_numpy(np.stack(all_data)).float() # (N_pixels, 24, 7)

# UN solo forward pass
with torch.no_grad():
outputs = model(batch_tensor) # (N_pixels, 6)
all_probs = torch.softmax(outputs, dim=1).numpy() # (N_pixels, 6)

# Asignar al cubo
for i, pixel in enumerate(pixel_data):
fila_idx = pixel['fila'] - min_fila
col_idx = pixel['columna'] - min_col
prob_cube[fila_idx, col_idx, :] = all_probs[i]

Mejora esperada: De ~5 segundos a ~0.1 segundos = ~50x más rápido

Consideración de memoria: Si hay muchos píxeles (>10,000), procesar en mini-batches:

BATCH_SIZE = 512
for i in range(0, len(pixel_data), BATCH_SIZE):
batch = pixel_data[i:i+BATCH_SIZE]
# ... procesar batch

Problema 3: Carga de .npy uno por uno

Código actual:

for pixel in pixel_data:
temporal_data = np.load(pixel['path']) # Syscall por archivo

Impacto: 1000 píxeles = 1000 llamadas a np.load() = 1000 syscalls de lectura

Solución alternativa: Guardar todos los píxeles en un solo archivo:

# En seleccionar_ciclo_fenologico.py (al guardar)
np.savez_compressed(
f"{directorio_salida}/all_pixels.npz",
data=np.stack(all_pixel_arrays), # (N_pixels, 24, 7)
pixel_ids=pixel_ids
)

# En clasificacion_suavizado_espacial.py (al cargar)
loaded = np.load(f"{npy_folder}/all_pixels.npz")
all_data = loaded['data'] # UN solo load

Mejora esperada: De 1000 loads a 1 load = ~100x menos I/O