ARQUITECTURA
Arquitectura Completa — Detección de Cultivos
1. ¿Qué es este repo?
Un worker de Celery que recibe tareas asíncronas para clasificar cultivos agrícolas a partir de imágenes satelitales HLS (Harmonized Landsat Sentinel). No es una API REST, no expone endpoints HTTP. Solo consume mensajes de una cola Redis y ejecuta un pipeline de inferencia de machine learning.
Cultivos que clasifica: girasol (0), maní (1), maíz (2), pastura (3), soja (4), trigo (5), dudoso (99).
2. Infraestructura y Deploy
GitLab CI → Docker build → ECR (us-west-2) → ECS (cluster Oregon1)
- Dockerfile: Ubuntu 24.04, instala Python 3 + dependencias, ejecuta
run_celery.sh - run_celery.sh: Levanta un worker Celery escuchando la cola
crop_detection - Broker/Backend: Redis (configurado via
CELERY_BROKERenv var) - Base de datos: PostgreSQL (dos pools: lectura y escritura)
- Storage: S3 para guardar los GeoTIFF resultantes
- Monitoreo: Sentry para logging y excepciones
3. Estructura de Carpetas
deteccion-de-cultivos/
├── run.py # Entry point: define Celery app y task
├── core/ # Infraestructura compartida (boilerplate Auravant)
│ ├── settings.py # Variables de entorno
│ ├── database_handler.py # Pool de conexiones PostgreSQL
│ ├── boto_handler.py # Cliente S3
│ ├── redis_handler.py # Cliente Redis
│ ├── logging_config.py # Configuración de logging
│ ├── errors.py # Excepciones custom
│ ├── models.py # Helpers de carpetas temporales (NO son modelos de DB)
│ ├── emails.py # Envío de emails SMTP (no se usa en el flujo principal)
│ ├── permissions.py # Validación de permisos via API externa (no se usa)
│ ├── aura_events.py # Logging de eventos a DB (no se usa)
│ └── constants.py # ROOT_PATH, TMP_FOLDER, formatos
├── features/
│ ├── interface/
│ │ └── main.py # Punto de entrada de negocio (startCropDetection)
│ ├── services/ # ⭐ Toda la lógica de negocio
│ │ ├── inferencia_completa.py # Orquestador del pipeline (3 pasos)
│ │ ├── build_dataset.py # Paso 1: Descarga imágenes de NASA Earthdata
│ │ ├── earthdata_token_manager.py # Gestión de tokens Earthdata con Redis
│ │ ├── seleccionar_ciclo_fenologico.py # Paso 2: Extrae ciclos fenológicos
│ │ ├── clasificacion_suavizado_espacial.py # Paso 3: Clasificación + suavizado
│ │ ├── simple_classifier_lstm.py # Definición del modelo LSTM (PyTorch)
│ │ ├── rasterizar_imagen_hsl.py # Genera GeoTIFF UTM y sube a S3
│ │ ├── db.py # Queries a PostgreSQL (update status, insert crops)
│ │ ├── generar_json_inferencia.py # Utilidad CLI (no se usa en el worker)
│ │ ├── generar_geojson.py # Utilidad CLI (no se usa en el worker)
│ │ └── detector_siembras.py # Utilidad de debug/visualización
│ ├── models/ # Pesos del modelo (.pth) y label mappings (.json)
│ ├── test_manual/
│ │ └── main.py # Script de test local sin DB
│ └── utils/ # Scripts auxiliares de análisis y conversión
└── docs/
4. Flujo Completo (de punta a punta)
4.1 Entrada de la tarea
Otro servicio → Redis (cola crop_detection) → Celery worker
run.py define la task de Celery:
@celery_app.task(name='crop_detection')
def crop_detection(id, uuid_detection, wkt, model, start_date):
result = startCropDetection(id, uuid_detection, wkt, model, start_date)
return result
Parámetros que recibe:
| Param | Tipo | Descripción |
|---|---|---|
id | str | ID interno de la detección |
uuid_detection | str | UUID único del proceso |
wkt | str | Polígono en formato WKT (ej: POLYGON((...))) |
model | int | Pasos temporales: 8, 15 o 24 |
start_date | str | Fecha de siembra YYYY-MM-DD |
4.2 Interface (features/interface/main.py)
startCropDetection hace:
- Valida parámetros (id, uuid, wkt, start_date, model ∈ 24)
- Calcula fecha_fin: si model=24 → siembra + 180 días (máx hoy), sino → hoy
- Actualiza DB → status =
"processing" - Llama a
inferencia_completa(...)(el orquestador) - Si OK → actualiza DB status=
"completed"+ inserta cultivos detectados - Si falla → actualiza DB status=
"pending"+ error_message
4.3 Pipeline de Inferencia (3 pasos)
┌─────────────────────────────────────────────────────────────────┐
│ inferencia_completa.py │
│ │
│ Paso 1: ejecutar_build_dataset() │
│ └─ build_dataset.py → Descarga GeoTIFFs de NASA │
│ │
│ Paso 2: ejecutar_seleccionar_ciclo() │
│ └─ seleccionar_ciclo_fenologico.py → Genera .npy por píxel │
│ │
│ Paso 3: ejecutar_clasificacion() │
│ └─ clasificacion_suavizado_espacial.py → Clasifica + raster │
│ └─ rasterizar_imagen_hsl.py → Sube .tif a S3 │
│ │
│ Return: { uuid, resultados, archivos_generados } │
└─────────────────────────────────────────────────────────────────┘
Si cualquier paso falla, retorna None y el flujo se corta.
5. Services en Detalle ⭐
5.1 build_dataset.py — Descarga de imágenes satelitales
Clase principal: EarthdataProcessor
Responsabilidades:
- Autenticación con NASA Earthdata (tokens Bearer)
- Búsqueda de granulos HLS en el catálogo CMR
- Descarga de bandas espectrales via S3 (con credenciales temporales)
- Recorte de rasters al polígono del lote
- Guardado de GeoTIFFs multi-banda
Flujo interno:
1. Recibe WKT → lo convierte a GeoJSON con shapely
2. Calcula bounding box del polígono
3. Busca granulos HLS en CMR (API de NASA) con filtro de nubosidad ≤20%
4. Filtra por tile más cercano al centroide del lote (usa sentinel_grid_centroids.geojson)
5. Para cada granulo:
a. Descarga Fmask → verifica calidad ≥70% píxeles válidos
b. Si pasa → descarga todas las bandas (Red, Green, Blue, NIR, Aerosol, SWIR1, SWIR2, Fmask)
c. Recorta cada banda al polígono (rasterio.mask)
d. Guarda GeoTIFF multi-banda en geotiff_output/lote_{uuid}/
6. Valida mínimo de imágenes: {8: 6, 15: 14, 24: 22}
Acceso a datos — Dos modos:
- S3 directo (controlado por
USE_S3_ACCESSenv var): Usa credenciales temporales des3credentialsendpoint - HTTP: Usa Bearer token via GDAL headers
Dependencia clave: EarthdataTokenManager para gestión de tokens.
Archivos generados:
geotiff_output/lote_{uuid}/
├── HLS.S30.T21HTB.2024100T134711.v2.0_2024-04-09.tif
├── HLS.L30.T21HTB.2024107T134711.v2.0_2024-04-16.tif
└── ... (N imágenes cronológicas, 8 bandas cada una)
5.2 earthdata_token_manager.py — Gestión de tokens
Clase: EarthdataTokenManager
Implementa el flujo documentado en docs/Playbook/EarthData.md:
1. Buscar token en Redis (key: "earthdata:access_token")
2. Si no hay → listar tokens existentes en Earthdata API
3. Si hay uno válido → guardarlo en Redis y usarlo
4. Si hay 2 tokens (límite) → revocar el más antiguo
5. Solicitar nuevo token → guardarlo en Redis (TTL 30 días)
Detalle: Los tokens de Earthdata duran 30 días. Máximo 2 simultáneos por cuenta.
5.3 seleccionar_ciclo_fenologico.py — Extracción de ciclos
Función principal: procesar_pixeles_desde_geotiff()
Responsabilidades:
- Leer GeoTIFFs cronológicamente
- Identificar píxeles dentro del polígono (rasterización de geometría)
- Para cada píxel: detectar siembra y extraer ciclo fenológico
- Guardar arrays .npy por píxel
Flujo por píxel:
1. Extraer serie temporal del píxel de todos los GeoTIFFs → array (N_imagenes, 8_bandas)
2. Calcular NDVI = (NIR - Red) / (NIR + Red)
3. Detectar inicio de siembra:
- Suavizar NDVI con media móvil (ventana=3)
- Buscar picos (NDVI > 0.7)
- Para cada pico, buscar hacia atrás un punto con NDVI < 0.35
- Verificar crecimiento sostenido (tendencia > 0.01, magnitud > 0.2)
4. Extraer N pasos desde siembra
5. Filtrar por Fmask (valores válidos: 0, 64, 128, 192)
6. Interpolar a exactamente N pasos temporales
7. Seleccionar bandas finales: [Blue, Green, NIR, Red, SWIR1, SWIR2]
8. Normalizar /10000
9. Calcular NDVI final → agregar como 7ma columna
10. Guardar .npy de shape (N, 7)
Explicación detallada del flujo por píxel:
Paso 1 — Extraer serie temporal
Para cada píxel identificado dentro del polígono, se itera sobre todos los archivos GeoTIFF ordenados cronológicamente. Usando rasterio.Window, se lee únicamente la celda correspondiente a ese píxel (fila, columna) de cada imagen. El resultado es un array de shape (N_imagenes, 8) donde las 8 bandas son: Red, Green, Blue, NIR, Aerosol, SWIR1, SWIR2, Fmask (en ese orden según BAND_MAP).
Paso 2 — Calcular NDVI
El NDVI (Normalized Difference Vegetation Index) es el indicador principal para detectar vegetación activa. Se calcula como:
NDVI = (NIR - Red) / (NIR + Red)
Valores típicos:
-1 a 0: Agua, suelo desnudo, nubes0 a 0.3: Suelo con poca vegetación0.3 a 0.6: Vegetación moderada0.6 a 1: Vegetación densa y saludable
En este paso se calcula sobre la serie temporal completa (sin filtrar por calidad) para tener la curva fenológica completa y poder detectar el patrón de siembra.
Paso 3 — Detectar inicio de siembra
Este es el paso más crítico. El algoritmo busca el patrón característico de un cultivo:
-
Suavizado: Se aplica media móvil con ventana de 3 para reducir ruido de la señal NDVI.
-
Búsqueda de picos: Usando
scipy.signal.find_peaks, se identifican máximos locales donde NDVI > 0.7 (vegetación densa). El parámetrodistance=5evita detectar picos muy cercanos. -
Búsqueda hacia atrás: Para cada pico encontrado, se busca hacia atrás (hasta 10 timesteps) un punto donde NDVI < 0.35. Este punto representa el suelo "vacío" antes de la siembra.
-
Validación de crecimiento: Se verifica que entre el punto de siembra y el pico:
- La tendencia (pendiente de regresión lineal) sea > 0.01 (crecimiento sostenido)
- La magnitud del crecimiento (pico - inicio) sea > 0.2 (cambio significativo)
- El período de crecimiento tenga al menos
min_growth_period(4) timesteps
Si no se encuentra ningún patrón válido, el píxel se marca como No_Siembra.
Paso 4 — Extraer N pasos desde siembra
Una vez identificado el índice de inicio de siembra (planting_idx), se extrae un slice del array desde ese punto: array_bruto[planting_idx:]. Si hay más de N pasos disponibles, se toman solo los primeros N. Si hay menos, se procesa lo que haya y luego se aplica padding.
Paso 5 — Filtrar por Fmask
Fmask es la banda de calidad que indica el estado de cada píxel en la imagen original. Los valores válidos son:
0: Píxel limpio64: Agua128: Sombra de nube192: Nieve/hielo
Cualquier otro valor (nubes, aerosoles, etc.) se descarta. Se crea una máscara booleana y se filtran solo los timesteps con datos de calidad. Si quedan menos de 2 observaciones válidas, el píxel se marca como Ciclo_Incompleto.
Paso 6 — Interpolar a N pasos
Después del filtrado por Fmask, la serie temporal tiene "huecos" (timesteps descartados). Para que el modelo LSTM reciba siempre exactamente N pasos, se interpola linealmente:
eje_tiempo_nuevo = np.linspace(indices_validos.min(), indices_validos.max(), num_steps)
for cada_banda:
banda_interpolada = np.interp(eje_tiempo_nuevo, indices_validos, valores_banda)
Esto genera una serie temporal regular de exactamente N puntos, preservando la forma general de la curva fenológica.
Paso 7 — Seleccionar bandas finales
De las 8 bandas originales, se seleccionan solo 6 para el modelo:
- Blue (índice 2 en original → 0 en final)
- Green (índice 1 → 1)
- NIR (índice 3 → 2)
- Red (índice 0 → 3)
- SWIR1 (índice 5 → 4)
- SWIR2 (índice 6 → 5)
Se descartan Aerosol y Fmask (ya cumplieron su función de filtrado).
Paso 8 — Normalizar /10000
Los valores de reflectancia en HLS vienen escalados por 10000 (enteros). Se dividen para obtener valores en rango [0, 1]:
array_final = array_final / 10000.0
Esto es crítico para que el modelo funcione correctamente, ya que fue entrenado con datos normalizados.
Paso 9 — Calcular NDVI final
Se recalcula el NDVI sobre los datos ya filtrados, interpolados y normalizados:
nir_final = array_final[:, 2] # NIR está en índice 2
red_final = array_final[:, 3] # Red está en índice 3
ndvi_final = (nir_final - red_final) / (nir_final + red_final)
Este NDVI "limpio" se agrega como 7ma columna, quedando el array con shape (N, 7).
Paso 10 — Guardar .npy
El array final se guarda en formato NumPy binario:
np.save(f"{directorio_salida}/pixel_{i}_ciclo.npy", array_final)
Si el ciclo quedó incompleto (menos de N pasos después de todo el procesamiento), se aplica padding repitiendo el último valor válido hasta completar N pasos. El status se marca como Ciclo_Incompleto pero igual se guarda el archivo.
Status posibles por píxel:
Exitoso: Ciclo completo detectadoNo_Siembra: No se detectó patrón de siembraCiclo_Incompleto: Siembra detectada pero datos insuficientes (se aplica padding)Error: Excepción durante procesamiento
Archivos generados:
ciclos_fenologicos/lote_{uuid}_{N}steps/
├── pixel_1_ciclo.npy # shape (N, 7)
├── pixel_2_ciclo.npy
├── ...
└── ciclos_fenologicos_metadata.json # coordenadas, status por píxel
5.4 simple_classifier_lstm.py — Modelo de ML
Clase: TemporalClassifierOnly
Arquitectura:
Input (batch, N_steps, 7_bandas)
│
▼
SpectralEncoder: Linear(7→32) → ReLU → BatchNorm → Dropout(0.4)
│ (se aplica por timestep, reshape batch*N → batch, N, 32)
▼
TemporalEncoder: LSTM(input=32, hidden=8, layers=2, dropout=0.4)
│ output: (batch, N_steps, 8)
▼
Attention: Scaled additive attention → context vector (batch, 8)
│
▼
Classifier: Linear(8→8) → ReLU → Dropout(0.4) → Linear(8→6_clases)
│
▼
Output: logits (batch, 6)
Nota: El README dice "bidireccional" pero el código tiene bidirectional comentado. Es unidireccional.
Data augmentation (solo entrenamiento): TimeWarp + Drift + Reverse (via tsaug).
5.5 clasificacion_suavizado_espacial.py — Clasificación + post-procesamiento
Función principal: run_classification_with_smoothing()
Flujo completo:
1. Cargar label_mapping.json → determinar num_classes (6)
2. Cargar modelo .pth
3. Cargar píxeles (.npy) + metadata (coordenadas fila/columna)
4. Construir cubo de probabilidades:
- Para cada píxel: forward pass → softmax → vector de 6 probabilidades
- Organizar en tensor (Alto × Ancho × 6) según posición en grid
5. Suavizado espacial:
- Filtro de media (uniform_filter) por cada canal de clase
- Normalizar por cantidad de vecinos válidos
- Re-normalizar probabilidades (suma=1)
6. Re-clasificación con umbral:
- clase = argmax(probabilidades_suavizadas)
- Si max_prob < threshold → clase = 99 (dudoso)
7. Post-procesamiento morfológico:
- Por cada clase: binary_closing (rellena huecos) → binary_opening (elimina islas)
8. Generar DataFrame de resultados (id_pixel, lon, lat, clase, confianza)
9. Calcular estadísticas por cultivo (superficie_ha, confianza_promedio)
10. Rasterizar → subir a S3
Explicación detallada del flujo de clasificación:
Paso 1 — Cargar label_mapping.json
El archivo JSON contiene el mapeo nombre → índice de clase:
{"girasol": 0, "maní": 1, "maíz": 2, "pastura": 3, "soja": 4, "trigo": 5}
Se usa para:
- Determinar
num_classes = 6(cantidad de neuronas en la capa de salida) - Traducir índices numéricos a nombres de cultivo en los resultados finales
- El sufijo
_ftindica modelos con fine-tuning adicional
Paso 2 — Cargar modelo .pth
Se instancia la arquitectura TemporalClassifierOnly con los hiperparámetros correctos:
model = TemporalClassifierOnly(
num_timestamps=num_steps, # 8, 15 o 24
num_bands=7, # Blue, Green, NIR, Red, SWIR1, SWIR2, NDVI
num_classes=6,
lstm_hidden_size=8,
lstm_layers=2,
lstm_dropout=0.4,
spectral_features=32,
dropout_rate=0.4
)
model.load_state_dict(torch.load(model_path))
model.eval() # Modo evaluación (desactiva dropout)
Paso 3 — Cargar píxeles + metadata
Se lee el archivo ciclos_fenologicos_metadata.json que contiene por cada píxel:
pixel_id: Identificador únicofila,columna: Posición en el grid raster originallatitud,longitud: Coordenadas geográficas WGS84status: Exitoso, No_Siembra, Ciclo_Incompletoarchivo_ciclo: Nombre del .npy correspondiente
Solo se cargan los píxeles que tienen archivo .npy asociado (status Exitoso o Ciclo_Incompleto).
Paso 4 — Construir cubo de probabilidades
Este es el paso clave que permite el suavizado espacial:
-
Determinar dimensiones del grid: Se calculan los rangos de filas y columnas de todos los píxeles para crear una matriz 2D que represente el espacio geográfico.
-
Inicializar estructuras:
prob_cube: Tensor de shape(Alto, Ancho, 6)inicializado en cerospixel_mask: Matriz booleana(Alto, Ancho)que indica qué celdas tienen datospixel_map: Diccionario{(fila, col): pixel_info}para recuperar metadata después
-
Clasificar cada píxel:
with torch.no_grad():
for pixel in pixel_data:
# Cargar datos temporales
temporal_data = np.load(pixel['path']) # shape (N, 7)
temporal_tensor = torch.from_numpy(temporal_data).float().unsqueeze(0) # (1, N, 7)
# Forward pass
output = model(temporal_tensor) # logits (1, 6)
probs = torch.softmax(output, dim=1).squeeze(0).numpy() # (6,)
# Ubicar en el cubo según posición espacial
fila_idx = pixel['fila'] - min_fila
col_idx = pixel['columna'] - min_col
prob_cube[fila_idx, col_idx, :] = probs
pixel_mask[fila_idx, col_idx] = True
El resultado es un "cubo" donde cada celda (x, y) contiene un vector de 6 probabilidades, y las celdas sin datos quedan en cero.
Paso 5 — Suavizado espacial
El objetivo es reducir el ruido "sal y pimienta" (píxeles aislados de una clase rodeados de otra) aprovechando la coherencia espacial de los cultivos (un campo de soja no tiene píxeles aleatorios de maíz en el medio).
-
Filtro de media por clase: Para cada una de las 6 clases, se aplica
uniform_filtercon kernel de tamaño configurable (3×3, 5×5 o 7×7):for c in range(num_classes):
smoothed_channel = uniform_filter(prob_cube[:, :, c], size=kernel_size, mode='constant')Esto promedia las probabilidades de cada clase con sus vecinos.
-
Normalizar por vecinos válidos: Como no todas las celdas tienen datos (el polígono no es rectangular), se divide por la cantidad real de vecinos:
neighbor_count = uniform_filter(pixel_mask.astype(float), size=kernel_size)
smoothed_cube[:, :, c] = smoothed_channel / neighbor_count -
Re-normalizar probabilidades: Después del suavizado, la suma de probabilidades por píxel puede no ser 1. Se normaliza:
prob_sum = smoothed_cube.sum(axis=2, keepdims=True)
smoothed_cube = smoothed_cube / prob_sum
Efecto del kernel_size:
3×3: Suavizado conservador, preserva detalles y bordes5×5: Suavizado moderado, buen balance7×7: Suavizado agresivo, máxima homogeneidad
Paso 6 — Re-clasificación con umbral
Con las probabilidades suavizadas, se asigna la clase final:
# Clase con mayor probabilidad
class_map = np.argmax(smoothed_cube, axis=2) # (Alto, Ancho)
confidence_map = np.max(smoothed_cube, axis=2) # (Alto, Ancho)
# Aplicar umbral de confianza
low_confidence = confidence_map < threshold # default 0.4
class_map[low_confidence] = 99 # Clase "dudoso"
# Marcar celdas sin datos
class_map[~pixel_mask] = -1
¿Por qué clase 99? Representa píxeles donde el modelo no tiene suficiente confianza. Puede deberse a:
- Mezcla de cultivos en el píxel (borde entre campos)
- Datos de mala calidad que pasaron los filtros
- Cultivo no presente en el entrenamiento
- Suelo desnudo o barbecho
Paso 7 — Post-procesamiento morfológico
Operaciones de morfología matemática para limpiar el mapa de clases:
-
Binary closing (cierre): Dilata y luego erosiona. Efecto: rellena pequeños huecos dentro de regiones.
closed = binary_closing(class_mask, structure=np.ones((3,3)))Ejemplo: Un píxel clase 99 rodeado de soja se convierte en soja.
-
Binary opening (apertura): Erosiona y luego dilata. Efecto: elimina pequeñas islas aisladas.
opened = binary_opening(closed, structure=np.ones((3,3)))Ejemplo: Un píxel de maíz aislado en medio de un campo de soja desaparece.
Se aplica por cada clase (excepto 99) con un elemento estructurante cuadrado de tamaño configurable (morphology_size).
Paso 8 — Generar DataFrame de resultados
Se recorre el pixel_map y se construye un DataFrame con:
results.append({
'id_pixel': pixel['id'],
'lon': pixel['lon'],
'lat': pixel['lat'],
'geometry': Point(pixel['lon'], pixel['lat']),
'clase_predicha': clase, # Con remapeo: 0→6 para girasol
'precision_predicha': confidence
})
Remapeo importante: CROP_ID_REMAP = {0: 6} cambia girasol de 0 a 6 porque algunos frontends usan 0 como nodata.
Paso 9 — Calcular estadísticas
Por cada cultivo detectado se calcula:
stats_by_crop[crop_name] = {
"crop_id": int(clase),
"superficie_ha": round(len(clase_data) * (30**2) / 10000, 2), # píxeles × 900m² / 10000
"confianza_promedio": round(clase_data['precision_predicha'].mean(), 4),
"cantidad_pixeles": len(clase_data)
}
Cada píxel representa 30m × 30m = 900m² = 0.09 hectáreas.
Paso 10 — Rasterizar y subir a S3
Se llama a rasterizar_imagen_utm() que:
- Transforma coordenadas WGS84 → UTM (zona automática según centroide)
- Crea un grid raster de 30m de resolución
- Asigna el valor de clase a cada celda
- Escribe un GeoTIFF COG (Cloud Optimized GeoTIFF) en memoria
- Sube directamente a S3 sin guardar en disco local
s3_eu.put_object(
Bucket=settings.AWS_S3_BUCKET_CROP_DETECTION,
Key=f"{uuid}.tif",
Body=memfile.read()
)
Remapeo importante: CROP_ID_REMAP = {0: 6} — El girasol (originalmente 0) se remapea a 6 para evitar conflictos con nodata en el frontend.
5.6 rasterizar_imagen_hsl.py — Generación de GeoTIFF y subida a S3
Función: rasterizar_imagen_utm()
Flujo:
1. Recibe DataFrame con (lon, lat, clase_predicha)
2. Calcula zona UTM automática según centroide
3. Transforma coordenadas WGS84 → UTM
4. Crea grid raster de 30m de resolución
5. Asigna clases a celdas del grid
6. Escribe GeoTIFF COG (Cloud Optimized) en memoria
7. Sube a S3: s3://{AWS_S3_BUCKET_CROP_DETECTION}/{uuid}.tif
No guarda archivo local, va directo a S3 via MemoryFile.
5.7 db.py — Persistencia en PostgreSQL
Dos funciones:
update_db(uuid_detection, status, error_message, results)
- UPDATE en
sales_adis_detections - Campos:
process_status,error_message,results(JSON)
insert_detection_crops(uuid_detection, resultados)
- Busca
detection_idpor uuid - INSERT en
sales_adis_detection_cropspor cada cultivo detectado - Campos:
detection_id,crop_id,confidence,area - Usa
label_mapping_24steps.jsonpara mapear nombre → crop_id
Tablas involucradas:
sales_adis_detections— Estado del procesosales_adis_detection_crops— Resultados por cultivo
5.8 generar_json_inferencia.py y generar_geojson.py
Scripts CLI auxiliares. No se usan en el flujo del worker. Sirven para generar archivos de entrada manualmente.
5.9 detector_siembras.py
Script de debug/visualización. Grafica series NDVI con marcadores de siembra. La lógica real está en seleccionar_ciclo_fenologico.py.
6. Core en Detalle
6.1 settings.py
Carga todas las variables de entorno. Es un archivo heredado del boilerplate de Auravant — contiene muchas variables que no se usan en este servicio (FIWARE, PETERSON, NIDERA, ALBOR, etc.).
Variables relevantes para este servicio:
EARTHDATA_USERNAME,EARTHDATA_PASSWORD— Credenciales NASAUSE_S3_ACCESS— Modo de acceso a datos HLSAWS_ACCESS_KEY,AWS_SECRET_KEY,AWS_S3_BUCKET_CROP_DETECTION— S3 para resultadosCELERY_BROKER— URL de Redis para CeleryPG_*— Conexión a PostgreSQL (lectura)PG_*_W— Conexión a PostgreSQL (escritura)REDIS_HOST,REDIS_PORT,REDIS_AUTH— Redis para cache de tokensSENTRY_DSN— Monitoreo
6.2 database_handler.py
Pool de conexiones PostgreSQL con psycopg2. Dos pools:
- Lectura (
db_query) — Usado pordb.py - Escritura (
db_query_w) — Usado poraura_events.py(no relevante aquí)
Implementa reconexión automática (10 reintentos, 2s entre cada uno).
6.3 boto_handler.py
Cliente S3 global (s3_eu). Usado por rasterizar_imagen_hsl.py para subir el GeoTIFF.
6.4 redis_handler.py
Wrapper de Redis. Usado por EarthdataTokenManager para cachear tokens.
6.5 Módulos NO usados en el flujo principal
emails.py— SMTP, no se usapermissions.py— Validación de permisos, no se usaaura_events.py— Logging de eventos a DB, no se usamodels.py— Helpers de carpetas temporales, no se usa
7. Diagrama de Dependencias entre Services
run.py
└─ features/interface/main.py (startCropDetection)
├─ features/services/db.py
│ └─ core/database_handler.py
│ └─ features/models/label_mapping_24steps.json
│
└─ features/services/inferencia_completa.py
│
├─ [Paso 1] features/services/build_dataset.py
│ ├─ features/services/earthdata_token_manager.py
│ │ └─ core/redis_handler.py
│ ├─ features/utils/sentinel_grid_centroids.geojson
│ └─ core/settings.py (EARTHDATA_*, USE_S3_ACCESS)
│
├─ [Paso 2] features/services/seleccionar_ciclo_fenologico.py
│ └─ features/utils/utils_logging.py
│ └─ features/utils/utils_validacion.py
│
└─ [Paso 3] features/services/clasificacion_suavizado_espacial.py
├─ features/services/simple_classifier_lstm.py
├─ features/services/rasterizar_imagen_hsl.py
│ ├─ core/boto_handler.py (s3_eu)
│ └─ core/settings.py (AWS_S3_BUCKET_CROP_DETECTION)
└─ features/models/best_model_{N}steps[_ft].pth
8. Formato de Datos en cada Etapa
| Etapa | Input | Output |
|---|---|---|
| Task Celery | (id, uuid, wkt, model, start_date) | {code, data} |
| build_dataset | WKT + fechas | GeoTIFFs multi-banda (8 bandas, 30m) |
| seleccionar_ciclo | GeoTIFFs + GeoJSON | .npy por píxel (N×7) + metadata.json |
| clasificacion | .npy + modelo .pth | DataFrame + stats dict |
| rasterizar | DataFrame (lon, lat, clase) | GeoTIFF COG en S3 |
| DB update | stats dict | Rows en sales_adis_detection_crops |
Resultado final retornado:
{
"uuid_detection": "abc-123",
"fecha_creacion": "2024-01-15T10:30:00",
"periodo": { "fecha_inicio": "2024-01-15", "fecha_fin": "2024-07-14" },
"modelo": { "num_steps": 24, "fine_tuning": true },
"parametros_clasificacion": { "kernel_size": 3, "threshold": 0.4, "morphology_size": 3 },
"resultados": {
"soja": { "crop_id": 4, "superficie_ha": 150.5, "confianza_promedio": 0.87, "cantidad_pixeles": 1672 },
"maíz": { "crop_id": 2, "superficie_ha": 80.2, "confianza_promedio": 0.82, "cantidad_pixeles": 891 }
},
"archivos_generados": { "geotiff": "abc-123.tif" }
}
9. Archivos Temporales y Filesystem
El worker genera archivos en disco durante la ejecución:
geotiff_output/lote_{uuid}/ ← GeoTIFFs descargados (Paso 1)
ciclos_fenologicos/lote_{uuid}_{N}steps/ ← Arrays .npy (Paso 2)
resultados_clasificacion/{uuid}/ ← JSON de resultado (Paso 3)
temp_geojson_{uuid}.geojson ← GeoJSON temporal (se borra)
Problema: No hay limpieza automática de estos archivos después de la ejecución. Se acumulan en disco.
10. Problemas y Oportunidades de Refactor
Arquitectura
- core/ es boilerplate heredado: El 70% de
settings.pyson variables que no se usan.emails.py,permissions.py,aura_events.py,models.pyno participan del flujo. - sys.path.insert por todos lados:
inferencia_completa.py,clasificacion_suavizado_espacial.py,seleccionar_ciclo_fenologico.pymanipulansys.pathpara importar módulos. Indica que la estructura de paquetes no está bien definida. - Imports relativos rotos: Los services se importan entre sí con
from build_dataset import ...(sin prefijo de paquete), lo cual solo funciona porque se inyecta_services_dirensys.path.
Services
- build_dataset.py es un God Object:
EarthdataProcessortiene 500+ líneas, maneja autenticación, búsqueda, descarga, recorte, guardado, visualización y creación de datasets xarray. Debería separarse. - Archivos temporales sin cleanup: GeoTIFFs, .npy y JSONs se acumulan en disco sin limpieza.
- Rutas hardcodeadas:
"geotiff_output","ciclos_fenologicos","resultados_clasificacion"están hardcodeados en varios archivos. - Código muerto:
show_granule_images(),save_lote_data(),create_xarray_dataset(),process_all_lotes()en build_dataset.py no se usan en el flujo del worker. db.pycarga label_mapping al importar: Lee el JSON a nivel de módulo. Si el archivo no existe, el import falla y rompe todo el worker.rasterizar_imagen_hsl.pytiene typo en el nombre: Debería serhls(Harmonized Landsat Sentinel), nohsl.
Seguridad
- test.py tiene credenciales AWS hardcodeadas (access key, secret key, session token). Aunque son temporales, no debería estar en el repo.
USE_S3_ACCESSse lee como string del env pero se usa como booleano en comparaciones. Puede dar comportamiento inesperado.
Testing
- No hay tests automatizados. Solo
test_manual/main.pyque es un script de ejecución manual. features/utils/test_flujo_inferencia.pyexiste pero no lo revisé — probablemente otro test manual.
Performance
Problema 1: Apertura excesiva de archivos en seleccionar_ciclo_fenologico.py
Código actual:
for i, (fila, col) in enumerate(zip(filas, columnas)): # Por cada píxel
serie_temporal = []
for ruta_raster in archivos_raster: # Por cada imagen
with rasterio.open(ruta_raster) as src: # ABRE EL ARCHIVO
datos = src.read(window=Window(col, fila, 1, 1)).squeeze()
serie_temporal.append(datos)
Impacto:
- Para un lote típico: 1000 píxeles × 30 imágenes = 30,000 llamadas a
rasterio.open() - Cada
open()implica: syscall de apertura, lectura de headers TIFF, parsing de metadata, creación de objeto Python - Tiempo estimado: ~50-100ms por open en archivos remotos/NFS, ~5-10ms en SSD local
- Total: 2.5 a 50 minutos solo en I/O de apertura de archivos
Solución propuesta:
# Abrir todos los archivos UNA vez
raster_handles = [rasterio.open(ruta) for ruta in archivos_raster]
try:
for i, (fila, col) in enumerate(zip(filas, columnas)):
serie_temporal = []
for src in raster_handles: # Reutilizar handles abiertos
datos = src.read(window=Window(col, fila, 1, 1)).squeeze()
serie_temporal.append(datos)
finally:
for src in raster_handles:
src.close()
Mejora esperada: De 30,000 opens a 30 opens = ~1000x menos syscalls de apertura
Solución óptima (lectura en bloque):
# Leer TODA la imagen una vez, luego indexar en memoria
for ruta_raster in archivos_raster:
with rasterio.open(ruta_raster) as src:
imagen_completa = src.read() # (8, H, W) en memoria
for i, (fila, col) in enumerate(zip(filas, columnas)):
datos = imagen_completa[:, fila, col] # Indexación NumPy, ~nanosegundos
Trade-off: Usa más RAM (una imagen HLS recortada ~10-50MB), pero elimina completamente el overhead de I/O por píxel.
Problema 2: Inferencia píxel por píxel en clasificacion_suavizado_espacial.py
Código actual:
with torch.no_grad():
for pixel in pixel_data: # Loop secuencial
temporal_data = np.load(pixel['path'])
temporal_tensor = torch.from_numpy(temporal_data).float().unsqueeze(0) # batch_size=1
output = model(temporal_tensor) # Forward pass con 1 muestra
probs = torch.softmax(output, dim=1).squeeze(0).numpy()
Impacto:
- Para 1000 píxeles = 1000 forward passes individuales
- Cada forward pass tiene overhead fijo: transferencia CPU↔GPU (si hay), sincronización, etc.
- El modelo LSTM tiene
batch_first=True, está diseñado para batches pero se usa con batch=1 - PyTorch optimiza operaciones matriciales para batches grandes (SIMD, paralelismo)
Benchmark típico:
| Batch size | Tiempo por muestra | Total 1000 muestras |
|---|---|---|
| 1 | ~5ms | ~5 segundos |
| 32 | ~0.3ms | ~0.3 segundos |
| 128 | ~0.1ms | ~0.1 segundos |
Solución propuesta:
# Cargar todos los píxeles en un solo tensor
all_data = []
for pixel in pixel_data:
temporal_data = np.load(pixel['path'])
all_data.append(temporal_data)
# Stack en un batch único
batch_tensor = torch.from_numpy(np.stack(all_data)).float() # (N_pixels, 24, 7)
# UN solo forward pass
with torch.no_grad():
outputs = model(batch_tensor) # (N_pixels, 6)
all_probs = torch.softmax(outputs, dim=1).numpy() # (N_pixels, 6)
# Asignar al cubo
for i, pixel in enumerate(pixel_data):
fila_idx = pixel['fila'] - min_fila
col_idx = pixel['columna'] - min_col
prob_cube[fila_idx, col_idx, :] = all_probs[i]
Mejora esperada: De ~5 segundos a ~0.1 segundos = ~50x más rápido
Consideración de memoria: Si hay muchos píxeles (>10,000), procesar en mini-batches:
BATCH_SIZE = 512
for i in range(0, len(pixel_data), BATCH_SIZE):
batch = pixel_data[i:i+BATCH_SIZE]
# ... procesar batch
Problema 3: Carga de .npy uno por uno
Código actual:
for pixel in pixel_data:
temporal_data = np.load(pixel['path']) # Syscall por archivo
Impacto: 1000 píxeles = 1000 llamadas a np.load() = 1000 syscalls de lectura
Solución alternativa: Guardar todos los píxeles en un solo archivo:
# En seleccionar_ciclo_fenologico.py (al guardar)
np.savez_compressed(
f"{directorio_salida}/all_pixels.npz",
data=np.stack(all_pixel_arrays), # (N_pixels, 24, 7)
pixel_ids=pixel_ids
)
# En clasificacion_suavizado_espacial.py (al cargar)
loaded = np.load(f"{npy_folder}/all_pixels.npz")
all_data = loaded['data'] # UN solo load
Mejora esperada: De 1000 loads a 1 load = ~100x menos I/O