1. ¿Qué es el Paralelismo en Celery?
El paralelismo es la capacidad de ejecutar múltiples tareas exactamente al mismo tiempo.
En lugar de que una tarea espere a que la anterior termine (secuencial), el paralelismo permite que Celery distribuya la carga de trabajo entre diferentes recursos de tu servidor. Esto es clave para reducir tiempos de respuesta en procesos masivos
2. Ejecución en paralelo con group
En Celery, la herramienta diseñada para lanzar múltiples tareas al mismo tiempo es el group. Un grupo toma una lista de firmas de tareas (signatures) Ejemplo: tasks.s() y las envía al broker de forma masiva para que los workers disponibles las tomen de inmediato.
3. Los modelos de ejecución: Prefork y Gevent (configuración en workers)
Para lograr que las tareas corran "al mismo tiempo", Celery utiliza diferentes "Pools". Los dos más importantes son Prefork y Gevent.
A. Prefork (El estándar de procesos)
Es el modelo por defecto de Celery. Se basa en la creación de procesos hijos independientes del sistema operativo.
-
Cómo funciona: Cada tarea corre en su propio proceso. Esto significa que cada una tiene su propia memoria y acceso directo a un núcleo del CPU.
-
Para qué sirve: Es ideal para tareas de procesamiento (CPU Bound), como cálculos matemáticos, redimensionar imágenes o consultas pesadas a bases de datos SQL donde se requiere potencia real.
B. Gevent (Hilos cooperativos)
Gevent no usa procesos, sino corutinas (hilos verdes muy ligeros). Todo ocurre dentro de un mismo proceso.
-
Cómo funciona: Utiliza un mecanismo llamado "I/O no bloqueante". Cuando una tarea se queda esperando una respuesta (de una API o un servidor), Gevent la pausa y salta a la siguiente tarea instantáneamente sin perder tiempo.
-
Para qué sirve: Es ideal para tareas de red (I/O Bound), como hacer Web Scraping, consultar APIs externas o enviar miles de mensajes HTTP, donde el CPU casi no trabaja y solo está esperando.
4. El parámetro --concurrency
En la configuración del worker, el flag --concurrency define cuántas tareas puede tomar el worker simultáneamente. Su significado cambia según el pool:
-
En Prefork: Si pones --concurrency=4, Celery creará 4 procesos reales. El límite suele ser el número de núcleos de tu servidor.
-
En Gevent: Si pones --concurrency=1000, Celery manejará 1000 conexiones concurrentes dentro de un solo proceso. Aquí el límite es la memoria RAM y la capacidad de tu red.
Regla de oro: > * Prefork = Pocas tareas, pero con mucha potencia (Paralelismo real).
Gevent = Miles de tareas, pero con mucha espera (Concurrencia masiva).
5. Ejemplos
5.1- Ejemplo: Tareas en paralelo con Prefork
Configuración del Worker
celery -A proyecto worker -P prefork --concurrency=4
Ejemplo
# tasks.py
import time
from celery import task
@task
def generar_reporte_pesado(data):
# Simula procesamiento de datos intenso
resultado = sum(i * i for i in range(1000000))
time.sleep(2)
return resultado
# functions.py
from celery import group
from tasks import generar_reporte_pesado
def procesar_lote_reportes():
datos = [1, 2, 3, 4]
# Ejecutamos en paralelo real
job = group(generar_reporte_pesado.s(d) for d in datos)
job.apply_async()
5.2- Ejemplo: Tareas en paralelo con Gevent
Configuración del Worker
celery -A proyecto worker -P gevent --concurrency=100
Ejemplo
# tasks.py
import requests
from celery import task
@task
def solicitar_url(url):
# El 99% del tiempo es espera de red
r = requests.get(url)
return r.status_code
# functions.py (La función que lanza el paralelo)
from celery import group
from tasks import solicitar_url
def lanzar_monitoreo_web():
urls = ["https://google.com", "https://python.org", "https://github.com"]
# Mandamos las 3 en paralelo
job = group(solicitar_url.s(u) for u in urls)
job.apply_async()
6. El Riesgo de Elegir el Pool Incorrecto
Usar la herramienta equivocada no solo es ineficiente, puede romper la lógica de tu aplicación:
-
I/O Bound con Prefork (Ineficiencia): Si intentas manejar miles de tareas de red (como enviar 5,000 correos) con prefork, tu servidor colapsará. Cada proceso de prefork consume mucha RAM. Intentar levantar cientos de procesos para tareas que solo están "esperando" es un desperdicio masivo de recursos.
-
CPU Bound con Gevent (Código Bloqueante): Este es el error más común. Si tu tarea procesa datos o hace cálculos y usas gevent, bloquearás el worker. Al ser un solo hilo, hasta que la tarea 1 no suelte el CPU, la tarea 2 ni siquiera empezará. El paralelismo desaparece.
7.-Caso de Estudio Real: El Problema de las 6,000 Facturas
Imagina que tienes que generar facturas para 6,000 clientes. Cada factura requiere consultas a la base de datos (DB) y procesamiento lógico.
El Problema Inicial
Una sola tarea procesando 6,000 facturas tardaba mas 45 minutos - 1 hora. Era frágil: si la base de datos fallaba en el minuto 30, se perdía todo el progreso.
Intento 1: Fragmentación (Chunks) con Gevent
Se dividio el trabajo en 12 bloques de 500 clientes. Al lanzarlos (incluso usando group), se noto algo algo extraño en Flower:
- Aunque se tenia
concurrency=20, las tareas se ejecutaban secuencialmente (una tras otra).
- Terminaba una de 5 min y apenas empezaba la siguiente.
- ¿Qué pasó? Se tenia configurado
-P gevent. Como la generación de facturas implica procesar datos y el driver de la base de datos bloqueaba el hilo, Gevent no podía saltar a la siguiente tarea. El worker estaba "bloqueado" por un bloque a la vez.
La Solución Definitiva: El cambio a Prefork
Al cambiar la configuración a: worker -A Project -P prefork --concurrency=20
¡Todo funcionó! ¿Por qué?
- Paralelismo Real: prefork creó procesos independientes. Al lanzar las 12 tareas, Celery asignó cada bloque a un proceso distinto.
- Adiós al Bloqueo: El procesamiento de la factura en el Proceso 1 no afectó al Proceso 2. Los 12 bloques empezaron a trabajar al mismo tiempo.
- Resultado: Lo que antes sumaba 45 minutos - 1 hora o se ejecutaba en fila, ahora se completó en lo que tarda el bloque más lento (aprox. 7-15 minutos).
Conclusión: ¿Qué aprendimos?
La problemática no era el código, sino el modelo de ejecución.
Si tu tarea hace queries, cálculos o procesamiento de objetos, necesitas procesos reales: Usa Prefork.
Si usas Gevent para estas tareas, el group o la concurrency alta no servirán de nada, porque el código bloqueante obligará a tus tareas a ir una por una.
La regla de oro de este caso: Para facturación y base de datos, Prefork es el ideal.
8. Plus Extra
Hasta ahora hemos visto cómo ejecutar tareas en paralelo, pero hay un peligro oculto: ¿Qué pasa si 100 usuarios entran a tu vista al mismo tiempo?
Si tu vista de Django lanza un grupo de tareas y se queda esperando el resultado (o gestionando demasiada lógica), podrías agotar los hilos de Gunicorn o uWSGI. El servidor web se quedaría "congelado" esperando a Celery, y tu página dejaría de cargar para todos los demás.
La Solución: El patrón "Tarea Padre" con .join()
La mejor arquitectura no es lanzar el grupo directamente desde la vista, sino delegar esa responsabilidad a una Tarea Padre.
¿Cómo funciona?
- La Vista: Solo lanza una única tarea (tarea_maestra) y responde de inmediato al usuario devolviendo el task_id. El servidor web queda libre en milisegundos.
- La Tarea Padre (Celery): Esta tarea es la que crea el group de tareas hijas.
- El método .join(): Dentro de la tarea padre, usamos el comando de Celery para esperar a las hijas. Lo mejor de esto es que la tarea padre no se marca como "Success" hasta que todas las tareas del grupo han terminado.
¿Por qué esta es la forma correcta?
User Experience: El usuario no ve una página cargando infinitamente; recibe un ID de seguimiento al instante.
Escalabilidad: Puedes tener a miles de usuarios lanzando procesos masivos y tu Django seguirá funcionando fluido, ya que todo el "trabajo sucio" ocurre dentro de la infraestructura de Celery.
* Sencillez de Monitoreo: Solo tienes que consultar el estado de un solo ID (el de la tarea padre). Si el padre está terminado, sabes con total certeza que todas las facturas o procesos hijos están listos.
Ejemplo de cómo implementarlo
Configuración de las tareas
from celery import shared_task, group
@shared_task
def tarea_hija(nombre):
# Simulación de trabajo
return "Resultado de " + nombre
@shared_task
def ejecutar_tareas():
# 1. Definimos el grupo de tareas que queremos en paralelo
job = group([
tarea_hija.s("Tarea A"),
tarea_hija.s("Tarea B"),
tarea_hija.s("Tarea C"),
])
# 2. Ejecutamos el grupo
result_group = job.apply_async()
# 3. Esperamos a que todas terminen (sin importar si fallan o no)
# join() bloquea la ejecución de 'ejecutar_tareas' hasta que el grupo termine
result_group.join(propagate=False)
return "Todas las subtareas han finalizado"
Cómo funciona en tu flujo
- En tu vista: Al llamar a
ejecutar_tareas.delay(), Celery te devolverá inmediatamente un task_id.
def vista1():
task = ejecutar_tareas.delay()
return task.id # Este es el ID que vas a monitorear
El Ciclo de Vida:
- La tarea
ejecutar_tareas pasará a estado STARTED.
- Internamente, lanzará las 3 tareas hijas en paralelo.
ejecutar_tareas se quedará esperando en la línea result_group.join().
- Una vez que las 3 hijas terminen (ya sea con éxito o error),
ejecutar_tareas finalizará y su estado cambiará a SUCCESS.
Consideraciones Importantes
- Propagate=False: Es vital usar propagate=False en el método join(). Esto asegura que aunque una de las tareas hijas falle (levante una excepción), la tarea principal no muera y espere a que las demás terminen.
- Result Backend: Para que esto funcione, debes tener configurado un CELERY_RESULT_BACKEND (como Redis o base de datos), de lo contrario, ejecutar_tareas no podrá saber cuándo terminaron las hijas.
- Worker Resources: Asegúrate de tener suficientes workers (o concurrencia) para ejecutar las 4 tareas (la principal + las 3 hijas) simultáneamente, de lo contrario, podrías causar un deadlock si la principal ocupa un slot esperando a hijas que no pueden iniciar.