← Terug naar blog
Software Engineering
7 oktober 2025

Async job workers met BullMQ en Redis: van queue tot event

Hoe we een roosteroptimalisatie-worker bouwen die jobs asynchroon verwerkt, real-time voortgang rapporteert en graceful degradeert bij fouten.
Aron Heesakkers
Aron HeesakkersFullstack Developer & AI Engineer

Roosteroptimalisatie kost tijd. Een genetisch algoritme over 200 diensten en 80 medewerkers loopt 2–6 seconden. Je wil die berekening niet in een HTTP request stoppen — dat blokkeert de server, geeft een slechte gebruikerservaring bij timeouts en maakt retry-logica complex.

De oplossing is een job queue. De monolith zet een job in Redis via BullMQ. Een aparte Python worker pikt hem op, voert de solver uit, en stuurt events terug via een apart Redis kanaal. De gebruiker ziet live voortgang zonder dat er een HTTP-verbinding open hoeft te blijven.

Dit is hoe die architectuur er in de praktijk uitziet.

Twee Redis-kanalen, één verantwoordelijkheid

We gebruiken twee gescheiden Redis-kanalen:

workforce-solver — de job queue. De Node.js monolith voegt hier jobs aan toe via BullMQ. Elke job bevat de roosterdata: diensten, medewerkers, beschikbaarheid, constraints.

solver-events — het event kanaal. De Python worker schrijft hier progress- en completion-events naartoe. De monolith luistert en stuurt updates door naar de frontend via WebSockets.

Scheiding van queue en events is een bewuste keuze. De job queue heeft retry-logica, job-status tracking en prioriteiten nodig — dat is BullMQ's domein. Events zijn fire-and-forget: als een progress-event verloren gaat, is dat geen ramp, de volgende komt 1 seconde later. Ze mengen zou de semantiek van beide vervuilen.

De worker bootstrap

De worker is een asyncio applicatie die één BullMQ-worker opzet:

async def main():
    redis = Redis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        password=settings.REDIS_PASSWORD,
        ssl=settings.REDIS_TLS,
    )

    worker = Worker(
        settings.SOLVER_JOBS_QUEUE,
        process_job,
        {"connection": redis, "concurrency": settings.WORKER_CONCURRENCY},
    )

    await worker.run()

WORKER_CONCURRENCY staat standaard op 2. Dat betekent dat twee zware solver-runs tegelijk kunnen lopen. Meer zou CPU-contention veroorzaken; één zou queue-buildup geven bij piekbelasting.

Job verwerking: validatie eerst

Elke job die binnenkomt, wordt eerst gevalideerd via Pydantic:

async def process_job(job: Job) -> dict:
    try:
        data = SolverJobQueueDataDto.model_validate(job.data)
    except ValidationError as e:
        raise ValueError(f"Ongeldige job data: {e}") from e

    processor = JobProcessor(event_emitter=EventEmitter(redis))
    return await processor.process(data)

Als validatie faalt, gooit BullMQ de job als failed zonder retry — een ongeldige job wordt niet beter van opnieuw proberen. Voor echte solver-fouten (timeout, onverwachte input) gebruiken we BullMQ's ingebouwde retry met exponential backoff.

Progress events: throttled, niet gebufferd

De gebruiker wil weten dat er iets gebeurt. Maar Redis bombarderen met honderd updates per seconde is zinloos — de UI kan toch niet sneller renderen dan ~30fps, en BullMQ-overhead telt op.

We sturen maximaal één update per seconde, met een beschrijving die roteert op basis van verstreken tijd:

PROGRESS_MESSAGES = [
    "Diensten analyseren...",
    "Beschikbaarheid controleren...",
    "Genetisch algoritme uitvoeren...",
    "Oplossingen evalueren...",
    "Rooster optimaliseren...",
]

async def emit_progress(self, pct: int, gen: int, total: int, pop: int, eta: float):
    elapsed = time.monotonic() - self.start_time
    message_idx = int(elapsed / 5) % len(PROGRESS_MESSAGES)

    await self.redis.xadd(
        self.events_queue,
        {
            "type": "solver.progress",
            "job_id": self.job_id,
            "progress": pct,
            "message": PROGRESS_MESSAGES[message_idx],
            "generation": f"{gen}/{total}",
            "population": pop,
            "eta_seconds": round(eta, 1),
        },
    )

We gebruiken Redis Streams (XADD) in plaats van Pub/Sub. Streams bewaren events tot ze worden geconsumeerd — als de monolith even offline is, mist hij geen updates.

De Strategy Pattern voor solver-keuze

We hebben twee solvers: een genetisch algoritme (primair) en een greedy matcher (fallback). Ze zijn uitwisselbaar achter een interface:

class SolverStrategy(Protocol):
    async def solve(self, request: PlanRequestDTO) -> PlanResponseDTO:
        ...

class SolverManager:
    def __init__(
        self,
        primary: SolverStrategy,
        fallback: SolverStrategy | None = None,
    ):
        self.primary = primary
        self.fallback = fallback

    async def solve(self, request: PlanRequestDTO) -> PlanResponseDTO:
        try:
            return await self.primary.solve(request)
        except Exception as exc:
            if self.fallback and settings.SOLVER_ALLOW_FALLBACK:
                logger.warning("Primaire solver gefaald, fallback naar greedy: %s", exc)
                return await self.fallback.solve(request)
            raise

In productie is SOLVER_IMPLEMENTATION=generetic met SOLVER_ALLOW_FALLBACK=true. Als de C solver een segfault of onverwachte fout geeft, geeft de gebruiker alsnog een rooster — misschien iets minder optimaal, maar geen foutmelding.

De keuze welke solver je gebruikt is volledig runtime-configureerbaar. Dat maakt het ook makkelijk om tijdens een deploy naar de nieuwe C-versie te switchen zonder code te deployen.

Completion en failure events

Na afloop stuurt de worker een completion-event:

await self.event_emitter.emit({
    "type": "solver.completed",
    "job_id": job.id,
    "result": result.model_dump(),
    "solve_time_ms": int((time.monotonic() - start) * 1000),
    "solver_used": "generetic" if used_primary else "greedy",
})

solver_used is bewust opgenomen. In onze monitoring zien we hoe vaak de fallback inslaat — als dat percentage stijgt, weten we dat er iets mis is met de C solver, lang voordat gebruikers het melden.

Bij failure laat BullMQ het native job-failure mechanisme zijn werk doen. We gooien gewoon een exception; BullMQ logt de stack trace, markeert de job als failed en triggert eventueel retry. We sturen dan ook een solver.failed event zodat de frontend de gebruiker kan informeren zonder te wachten op een timeout.

Graceful shutdown

Workers die midden in een berekening worden gestopt, laten broken jobs achter. BullMQ handelt dat af via SIGTERM-handling: laufende jobs worden afgemaakt, nieuwe worden niet meer opgepikt. Onze Docker container heeft een stop_grace_period van 30 seconden — ruim genoeg voor de langste solver-run.

async def shutdown(worker: Worker, redis: Redis):
    await worker.close()
    await redis.aclose()

loop.add_signal_handler(
    signal.SIGTERM,
    lambda: asyncio.create_task(shutdown(worker, redis)),
)

Wat je vermijdt met deze aanpak

Een paar dingen die we bewust niet doen:

Geen polling vanuit de frontend op job-status. BullMQ heeft een job-status API, maar polling daarvoor vanuit duizenden clients tegelijk schaalt slecht. Events via Streams schalen beter en zijn push-based.

Geen shared state tussen workers. Elke worker-instantie is stateless. State zit in Redis (job-data, events) of in de job zelf. Dat maakt horizontaal schalen triviaal: draai meer worker containers.

Geen business logic in de worker. De worker weet alleen hoe hij een job ophaalt, valideert, naar de solver stuurt en het resultaat terugstuurt. Alle inhoudelijke logica zit in de solver-laag. Als morgen de queue-implementatie verandert van BullMQ naar SQS, hoeft er niets te veranderen aan de solver.

De volledige flow in één overzicht

Monolith (Node.js)
  └─► Queue "workforce-solver" (BullMQ / Redis)
         │
         ▼
  Python Worker
  ├─► Valideer job data (Pydantic)
  ├─► Emit progress: 10% ("Gestart")
  ├─► SolverManager.solve()
  │     ├─► GenereticSolverStrategy (C solver in thread pool)
  │     │     └─► Poll progress elke 100ms → emit events (max 1/s)
  │     └─► GreedySolverStrategy als fallback
  ├─► Emit progress: 100%
  └─► Emit "solver.completed" event → Stream "solver-events"

Monolith (Node.js)
  └─► Consume "solver-events" → WebSocket naar frontend

De monolith en de worker zijn volledig ontkoppeld. Ze spreken alleen via Redis. Je kunt de worker deployen, updaten of schalen zonder de monolith aan te raken.


Project bespreken?

Heb je een use case waar je over twijfelt?

We praten graag over of een AI-aanpak past — en zeggen het eerlijk als het beter zonder kan. Vaste prijs, afgesproken scope.
Plan een gesprek
Probeer AI