
feat(concurrency): bullmq based concurrency control system#3605

Open
icecrasher321 wants to merge 11 commits into staging from feat/conc-control

Conversation

@icecrasher321
Collaborator

@icecrasher321 icecrasher321 commented Mar 16, 2026

Summary

  • BullMQ-based concurrency control system for executions that currently run inline (manual executions excluded). Limits can be tuned based on available resources.

  • Global admission gates to prevent crashes caused by rate-limiting of downstream services.

Type of Change

  • New feature
  • Other: Reliability

Testing

Tested manually under different configurations; an extensive test suite was added.

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@vercel

vercel bot commented Mar 16, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project | Deployment | Actions | Updated (UTC)
docs | Skipped | Skipped | Mar 16, 2026 11:40pm


@icecrasher321 icecrasher321 marked this pull request as ready for review March 16, 2026 03:56
@cursor

cursor bot commented Mar 16, 2026

PR Summary

High Risk
High risk because it rewires core execution/scheduling/webhook paths to use a new BullMQ + workspace-dispatch queueing system with new capacity limits and error handling, which can affect job admission, ordering, and availability under load.

Overview
Introduces a BullMQ-backed workspace dispatch system to enforce per-workspace concurrency and global/tenant queue capacity. This adds new dispatch storage adapters (Redis + in-memory), a dispatcher/planner/reconciler loop with leasing and fairness across workspaces, and a worker helper to update dispatch job lifecycle.

Execution entrypoints are updated to enqueue via enqueueWorkspaceDispatch when BullMQ is available (workflow executes, scheduled executes, and mothership jobs), including new direct-execution paths that can synchronously waitForDispatchJob and stream via a buffered SSE reader. The jobs status API now prefers dispatch records and presents dispatcher-aware states/metadata.

Adds an in-process admission/gate (429 + Retry-After) to shed load on select API routes, and returns explicit 503 capacity responses when the dispatch queue is full. Billing metadata parsing is hardened with zod and adds a new cached getWorkspaceConcurrencyLimit (env defaults + enterprise override). Async job backend selection is changed from Redis to BullMQ, removing the Redis async backend implementation/tests and adding a new BullMQ async backend.

Written by Cursor Bugbot for commit 2bf1feb.

@icecrasher321 icecrasher321 requested a review from Sg312 March 16, 2026 03:56
@icecrasher321 icecrasher321 changed the title feat(concurrency): bullmq based queueing system feat(concurrency): bullmq based concurrency control system Mar 16, 2026
@greptile-apps
Contributor

greptile-apps bot commented Mar 16, 2026

Greptile Summary

This PR introduces a comprehensive BullMQ-based queuing and concurrency control system for workflow, webhook, and schedule executions. It replaces the previous Redis/database job queue backends with BullMQ queues backed by Redis, adds a per-workspace fairness dispatcher (with a Lua-script-based atomic claim mechanism), a lease-based concurrency limit (keyed to billing plan), an in-process admission gate for external API requests, and a standalone worker process to consume jobs. It also adds a buffered SSE stream for non-manual executions that now run asynchronously through the dispatch queue. The change is substantial (~5900 lines added) and represents a foundational infrastructure improvement for reliability and rate-limiting protection.

Key changes and issues found:

  • Misleading variable name in schedule route (apps/sim/app/api/schedules/execute/route.ts, lines 118–123): workspaceId is assigned the getWorkflowById function, not a workspace ID string. The code works at runtime but is extremely confusing and should be renamed (e.g. getWorkflowByIdFn).

  • GLOBAL_DEPTH_KEY double-decrement race (apps/sim/lib/core/workspace-dispatch/redis-store.ts): markDispatchJobCompleted / markDispatchJobFailed unconditionally call DECR on GLOBAL_DEPTH_KEY. If the reconciler and the BullMQ worker both process the same job's completion concurrently, the counter is decremented twice. The Math.max(0, …) guard and periodic reconcileGlobalQueueDepth mitigate the visible impact, but a guard that skips the decrement when the record is already terminal would be more correct.

  • Non-atomic BullMQ add + dispatch record update in finalizeAdmittedJob (apps/sim/lib/core/workspace-dispatch/planner.ts): The BullMQ job is added before the dispatch record is marked admitted. If markDispatchJobAdmitted fails, an orphaned BullMQ job exists with a released lease, creating a window where the job could be executed while the reconciler simultaneously restores the dispatch record to waiting. Reversing the order (mark admitted first, then add to BullMQ) makes the failure mode fully safe for the reconciler to recover.
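The reordering described above can be sketched as follows. This is a hypothetical outline, not the actual `planner.ts` implementation: the `DispatchStore` and `Queue` interfaces and the `restoreToWaiting` helper are assumptions standing in for the real dispatch store and BullMQ queue.

```typescript
interface DispatchStore {
  markDispatchJobAdmitted(id: string): Promise<void>
  restoreToWaiting(id: string): Promise<void>
}

interface Queue {
  add(name: string, payload: unknown, opts: { jobId: string }): Promise<void>
}

async function finalizeAdmittedJob(
  store: DispatchStore,
  queue: Queue,
  jobId: string,
  payload: unknown
): Promise<void> {
  // 1. Mark the dispatch record admitted FIRST. If this step fails, nothing
  //    was enqueued, so there is no orphaned BullMQ job to double-execute.
  await store.markDispatchJobAdmitted(jobId)
  try {
    // 2. Only then add the job to BullMQ. If this fails, we are left with an
    //    `admitted` record and no BullMQ job: a state the reconciler can
    //    detect and safely requeue, with a best-effort inline restore here.
    await queue.add('workspace-dispatch', payload, { jobId })
  } catch (err) {
    await store.restoreToWaiting(jobId)
    throw err
  }
}
```

The key property is that every partial-failure state is one the reconciler can recover without risking a second execution of the same job.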

  • Long-polling HTTP handler (apps/sim/app/api/workflows/[id]/execute/route.ts): waitForDispatchJob can block the HTTP handler for up to 5.5 minutes. This is safe on dedicated servers but will be silently dropped by many load balancers/proxies with shorter idle timeouts. Infrastructure timeout requirements should be documented.

  • Per-process admission gate (apps/sim/lib/core/admission/gate.ts): The inflight counter is module-local. In a horizontally scaled deployment the effective global limit is n_pods × ADMISSION_GATE_MAX_INFLIGHT. Operators should be guided to set the env-var to global_limit / n_pods.
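The per-process gate described above amounts to a module-local counter with a release ticket. A minimal sketch (an assumption of the shape of `gate.ts`, not its actual code; the real gate reads `ADMISSION_GATE_MAX_INFLIGHT` from the environment, hardcoded to 2 here for illustration):

```typescript
// Assumption: the real limit comes from ADMISSION_GATE_MAX_INFLIGHT.
const MAX_INFLIGHT = 2

// Module-local, so in a deployment of n pods the effective global limit
// is n_pods x MAX_INFLIGHT; operators should set the env var accordingly.
let inflight = 0

interface AdmissionTicket {
  release(): void
}

// Returns a ticket when under the limit, or null when the caller should
// respond 429 Too Many Requests with a Retry-After header.
function tryAdmit(): AdmissionTicket | null {
  if (inflight >= MAX_INFLIGHT) return null
  inflight += 1
  let released = false
  return {
    release() {
      if (released) return // idempotent: a double release must not underflow
      released = true
      inflight -= 1
    },
  }
}
```

Making `release()` idempotent matters because HTTP handlers often release both in a `finally` block and in error paths.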

Confidence Score: 3/5

  • PR introduces valuable infrastructure but has a few logic issues around counter correctness, partial-failure atomicity, and infrastructure timeout assumptions that should be resolved before production rollout.
  • The core design (Lua-script atomic claim, lease-based concurrency, BullMQ workers) is well-thought-out and the test suite is comprehensive. However, the non-atomic finalizeAdmittedJob step creates a real (if narrow) window for duplicate execution, the GLOBAL_DEPTH_KEY double-decrement weakens the global gate, and the long-polling HTTP handler will silently fail for users behind standard load balancers. These issues are fixable but should be addressed before heavy production traffic.
  • Pay close attention to apps/sim/lib/core/workspace-dispatch/planner.ts (operation ordering), apps/sim/lib/core/workspace-dispatch/redis-store.ts (double-decrement guard), and apps/sim/app/api/workflows/[id]/execute/route.ts (long-polling timeout).

Important Files Changed

Filename Overview
apps/sim/lib/core/workspace-dispatch/dispatcher.ts New per-process dispatcher loop that coalesces wakeups and drives workspace admission; process-local state is intentional but should be noted for multi-pod deployments.
apps/sim/lib/core/workspace-dispatch/redis-store.ts Redis-backed dispatch store using a Lua script for atomic job claiming; GLOBAL_DEPTH_KEY can be double-decremented in a race between the reconciler and worker completing the same job.
apps/sim/lib/core/workspace-dispatch/planner.ts Claims next admissible workspace job via a Redis lock; BullMQ add and markDispatchJobAdmitted are not atomic — a partial failure leaves an orphaned BullMQ job that may be double-executed.
apps/sim/lib/core/workspace-dispatch/reconciler.ts Periodic reconciliation of stranded/stuck dispatch jobs; correctly scoped to admitting/admitted/running states, but can race with the worker on markDispatchJobCompleted.
apps/sim/lib/core/workspace-dispatch/worker.ts Lease heartbeat + job lifecycle hooks for BullMQ workers; solid implementation with proper cleanup in finally block.
apps/sim/lib/core/admission/gate.ts Simple in-process admission gate limiting concurrent external API requests; effective global limit equals n_pods × MAX_INFLIGHT which should be documented for operators.
apps/sim/app/api/workflows/[id]/execute/route.ts Refactored to route non-manual executions through workspace dispatch; long-polling waitForDispatchJob (up to 5.5 min) in the HTTP handler risks infrastructure timeout disconnects.
apps/sim/app/api/schedules/execute/route.ts Updated to route scheduled and mothership jobs through workspace dispatch; contains a misleading variable named workspaceId that holds a function reference (getWorkflowById).
apps/sim/lib/billing/workspace-concurrency.ts New module resolving per-workspace concurrency limits from billing plan, with Redis-backed and in-memory cache (60s TTL); clean implementation.
apps/sim/worker/index.ts New standalone BullMQ worker process with configurable concurrency per queue, graceful shutdown, and periodic dispatcher wake/notification sweep intervals.

Sequence Diagram

sequenceDiagram
    participant Client
    participant RouteHandler as API Route Handler
    participant AdmissionGate as Admission Gate (in-process)
    participant Dispatcher as Workspace Dispatcher
    participant RedisStore as Redis Dispatch Store
    participant BullMQ as BullMQ Queue (Redis)
    participant Worker as BullMQ Worker Process
    participant DispatchWorker as Dispatch Worker (worker.ts)

    Client->>RouteHandler: POST /api/workflows/[id]/execute
    RouteHandler->>AdmissionGate: tryAdmit()
    alt At capacity (inflight >= MAX_INFLIGHT)
        AdmissionGate-->>RouteHandler: null
        RouteHandler-->>Client: 429 Too Many Requests
    else Admitted
        AdmissionGate-->>RouteHandler: ticket
        RouteHandler->>Dispatcher: enqueueWorkspaceDispatch(input)
        Dispatcher->>RedisStore: enqueueWorkspaceDispatchJob()
        RedisStore-->>Dispatcher: jobRecord (status=waiting)
        Dispatcher->>Dispatcher: runDispatcherLoop() [void]
        Dispatcher->>RedisStore: popNextWorkspaceId()
        Dispatcher->>RedisStore: claimWorkspaceJob() [Lua script]
        RedisStore-->>Dispatcher: {type: admitted, record, leaseId}
        Dispatcher->>BullMQ: queue.add(jobName, payload, {jobId})
        Dispatcher->>RedisStore: markDispatchJobAdmitted()
        Dispatcher-->>RouteHandler: dispatchJobId
        RouteHandler->>RouteHandler: waitForDispatchJob(id, timeout) [polls 250ms]
        Worker->>BullMQ: picks up job
        Worker->>DispatchWorker: runDispatchedJob(metadata, fn)
        DispatchWorker->>RedisStore: markDispatchJobRunning()
        DispatchWorker->>DispatchWorker: executeQueuedWorkflowJob() / executeWorkflowJob()
        DispatchWorker->>RedisStore: markDispatchJobCompleted(output)
        DispatchWorker->>RedisStore: releaseWorkspaceLease()
        DispatchWorker->>Dispatcher: wakeWorkspaceDispatcher()
        RedisStore-->>RouteHandler: record (status=completed) [via poll]
        RouteHandler->>AdmissionGate: ticket.release()
        RouteHandler-->>Client: 200 JSON result
    end
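The `waitForDispatchJob(id, timeout) [polls 250ms]` step in the diagram can be sketched as a simple deadline-bounded poll loop. The record shape and store accessor below are assumptions for illustration, not the route handler's actual code:

```typescript
type JobStatus = 'waiting' | 'admitted' | 'running' | 'completed' | 'failed'

interface JobRecord {
  status: JobStatus
  output?: unknown
}

async function waitForDispatchJob(
  getRecord: (id: string) => Promise<JobRecord | null>,
  id: string,
  timeoutMs: number,
  pollMs = 250 // the diagram's 250ms poll interval
): Promise<JobRecord | null> {
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    const record = await getRecord(id)
    if (record && (record.status === 'completed' || record.status === 'failed')) {
      return record
    }
    await new Promise((resolve) => setTimeout(resolve, pollMs))
  }
  // Timed out: the job may still complete later; the caller decides how to
  // report this (and, per the review above, the HTTP connection must survive
  // any proxy idle timeouts for the whole wait).
  return null
}
```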

Last reviewed commit: be83c97

@icecrasher321
Copy link
Collaborator Author

bugbot run



@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.


pipeline.set(jobKey(record.id), JSON.stringify(record), 'EX', JOB_TTL_SECONDS)
pipeline.zadd(workspaceLaneKey(record.workspaceId, record.lane), score, record.id)
pipeline.zadd(ACTIVE_WORKSPACES_KEY, 'NX', sequence, record.workspaceId)
await pipeline.exec()


Global depth counter drifts when jobs are restored

Low Severity

enqueueWorkspaceDispatchJob increments GLOBAL_DEPTH_KEY while markDispatchJobCompleted and markDispatchJobFailed decrement it. However, restoreWorkspaceDispatchJob (used by the reconciler to reset stranded jobs back to waiting) does not increment the counter, even though a restored job is active again and re-enters the queue. Since the completed/failed path already decremented the counter (or the counter was never decremented for a stranded job), each restore can cause the counter to under-count. This gradually drifts the global depth below reality until reconcileGlobalQueueDepth corrects it.

Additional Locations (1)

Collaborator Author

The reconciler never restores terminal jobs, so this can't happen.

