← Terug naar blog
AI Engineering
20 januari 2026

Webhook-gestuurde AI-pipelines: job queue met exponential backoff

Hoe je inkomende WhatsApp-berichten betrouwbaar verwerkt via een polling job queue — inclusief idempotentie, concurrency en retry-strategie
Aron Heesakkers
Aron HeesakkersFullstack Developer & AI Engineer

Een inkomend WhatsApp-bericht triggert een Twilio-webhook. Die webhook duurt maximaal een paar seconden voordat Twilio een time-out verwacht. Een LLM-aanroep met RAG-context duurt typisch 1–3 seconden — soms meer als het model langzaam is of de context groot is.

Als je de LLM-aanroep direct in de webhook-handler zet, zit je altijd op de grens van wat Twilio tolereert. En als de aanroep mislukt — netwerk-flap, Azure-throttling, database-time-out — is er niets dat het automatisch opnieuw probeert.

De oplossing is een job queue: de webhook schrijft een rij in de database, geeft Twilio meteen een 200 OK terug, en een aparte worker verwerkt de job asynchroon.

De queue-structuur

Een ai_jobs-tabel met vijf relevante kolommen:

create table ai_jobs (
  id          uuid primary key default gen_random_uuid(),
  status      text not null default 'pending', -- pending | processing | done | failed
  payload     jsonb not null,
  attempts    int not null default 0,
  max_attempts int not null default 3,
  next_run_at timestamptz not null default now(),
  last_error  text,
  created_at  timestamptz default now()
);

next_run_at is de sleutel voor backoff: een retry-job krijgt een toekomstige next_run_at zodat de worker hem overslaat totdat de wachttijd verstreken is.

Jobs claimen: atomaire locking

De worker gebruikt een Postgres-functie om jobs te claimen en te lockken in één atomaire operatie:

-- claim_ai_jobs(batch_size int) → geeft claimed jobs terug
update ai_jobs
set status = 'processing', attempts = attempts + 1, updated_at = now()
where id in (
  select id from ai_jobs
  where status = 'pending' and next_run_at <= now()
  order by next_run_at
  limit batch_size
  for update skip locked  -- skip rows locked by concurrent workers
)
returning *;

FOR UPDATE SKIP LOCKED is de Postgres-primitief voor concurrent job queues: meerdere worker-instanties kunnen tegelijkertijd pollen zonder dat ze elkaars jobs claimen. Elk bericht wordt exact één keer verwerkt, ook als er meerdere worker-processen draaien.

Exponential backoff

De backoff-formule is simpel maar effectief:

// min(2^attempts × 5, 300) — cap op 5 minuten
export function calcBackoffSeconds(attempts: number): number {
  return Math.min(Math.pow(2, attempts) * 5, 300);
}
// Poging 1 → 10 seconden
// Poging 2 → 20 seconden
// Poging 3 → 40 seconden (laatste poging)

Na elke mislukte poging (als attempts < max_attempts) zet de worker de job terug op pending met een berekende next_run_at:

export async function requeueWithBackoff(jobId, attempts, err): Promise<void> {
  const delaySecs = calcBackoffSeconds(attempts);
  const nextRunAt = new Date(Date.now() + delaySecs * 1000).toISOString();

  await sb
    .from("ai_jobs")
    .update({
      status: "pending",
      last_error: err,
      next_run_at: nextRunAt,
    })
    .eq("id", jobId);
}

Als attempts >= max_attempts, markeert de worker de job als failed en logt de fout. Beheerders zien mislukte jobs in het dashboard en kunnen ze handmatig herstarten.

Idempotentie: de done-is-noop guard

AI-replies dupliceren is erger dan ze missen. Als een job halverwege crasht — na het opslaan van de reply maar vóór het markeren als done — en de job opnieuw wordt verwerkt, zou de medewerker twee keer hetzelfde antwoord ontvangen.

We vangen dat op met een idempotentie-check aan het begin van elke job:

async function aiReplyAlreadyExists(
  conversationId,
  inboundMessageCreatedAt,
): Promise<boolean> {
  const { data } = await sb
    .from("messages")
    .select("id")
    .eq("conversation_id", conversationId)
    .eq("from_self", true)
    .eq("source", "ai")
    .gt("created_at", inboundMessageCreatedAt)
    .limit(1);
  return (data ?? []).length > 0;
}

// Begin van executeJob:
const alreadyDone = await aiReplyAlreadyExists(
  conversationId,
  inboundCreatedAt,
);
if (alreadyDone) {
  await markDone(job.id);
  return; // reply al verstuurd — niet opnieuw doen
}

De check: bestaat er al een AI-bericht in dit gesprek, aangemaakt ná het inkomende bericht? Zo ja, dan is de job al klaar. Dit maakt de job-verwerking idempotent: het maakt niet uit hoe vaak hij wordt verwerkt — het resultaat is hetzelfde.

Concurrency en slow-job monitoring

De poll-functie claimt en verwerkt meerdere jobs parallel:

export async function pollOnce(concurrency: number): Promise<number> {
  const jobs = await claimNext(concurrency);
  if (jobs.length === 0) return 0;

  await Promise.all(
    jobs.map(async (job) => {
      try {
        const durationMs = await executeJob(job);
        if (durationMs > SLOW_JOB_THRESHOLD_MS) {
          warn(`SLOW job ${job.id}: ${durationMs}ms`);
        }
      } catch (err) {
        const isFinal = job.attempts >= job.max_attempts;
        if (isFinal) {
          await markFailed(job.id, err.message);
        } else {
          await requeueWithBackoff(job.id, job.attempts, err.message);
        }
      }
    }),
  );
  return jobs.length;
}

SLOW_JOB_THRESHOLD_MS is 8.000ms. Jobs die langer duren, worden gelogd als waarschuwing — ze mislukken niet, maar de trage uitvoering is een signaal dat context te groot is of het model traag reageert.

Job-typen in één worker

De worker verwerkt vier job-typen in dezelfde poll-loop:

const durationMs =
  job.payload.type === "embed_record"
    ? await executeEmbedJob(job)
    : job.payload.type === "import_people"
      ? await executeImportJob(job)
      : job.payload.type === "suggest"
        ? await executeSuggestJob(job)
        : /* 'autoreply' */ await executeJob(job);
  • autoreply — inkomend bericht verwerken, LLM aanroepen, via Twilio versturen
  • suggest — AI-suggestie genereren voor admin zonder versturen
  • embed_record — medewerker of document embedden naar pgvector
  • import_people — mensen importeren uit CSV of externe bron

Eén worker-loop voor alle typen betekent dat prioriteiten centraal beheerd worden (via next_run_at en created_at) en dat er geen aparte infrastructuur nodig is per job-type.

Waarom Postgres in plaats van een echte queue?

De voor de hand liggende vraag: waarom geen Redis, RabbitMQ, of een managed queue-service? Drie redenen:

  1. Eenvoud — de queue-tabel zit in dezelfde database als de rest van de applicatie. Geen extra service om te beheren, te monitoren of te betalen.
  2. Inzichtelijkheid — beheerders kunnen de queue direct inspecteren via de admin-UI met een gewone select-query.
  3. Schaal — voor honderden berichten per dag is Postgres meer dan snel genoeg. FOR UPDATE SKIP LOCKED is precies hiervoor ontworpen en werkt feilloos tot tienduizenden jobs per dag.

Als de schaal naar honderdduizenden berichten per dag groeit, is overstappen naar een dedicated queue gerechtvaardigd. Tot die tijd voegt een extra service alleen complexiteit toe. Vertel ons over je messaging-use case als je wilt sparren over de architectuur.


Project bespreken?

Heb je een use case waar je over twijfelt?

We praten graag over of een AI-aanpak past — en zeggen het eerlijk als het beter zonder kan. Vaste prijs, afgesproken scope.
Plan een gesprek
Probeer AI