Skip to content

Commit ba7e565

Browse files
fix(table): atomic per-key writes for executions, plus run-op race fixes
The executions blob on user_table_rows was read-modify-written wholesale on every update. Concurrent writers (a column edit and a manual-retry stamp, two pickup calls, a cancel and a cascade) each computed a merge from their own snapshot, and the last writer clobbered keys it never touched — producing stuck "queued" cells, vanished stamps, and stale completed exec records reappearing after retries. Fixes: - updateRow / batchUpdateRows now apply executionsPatch via a SQL jsonb merge expression. Each writer only mutates the keys it explicitly patches; other keys are preserved. Eliminates the cross-key clobber. - writeWorkflowGroupState bypasses the stale-worker guard for `queued` (new scheduler stamp) and `cancelled` (authoritative cancel) writes — those ARE the new authority for the cell. Previously the new run's stamp was being rejected by the same guard meant to block the OLD worker's writes. - skipScheduler flag on UpdateRowData / BatchUpdateByIdData lets the cancel path and runWorkflowGroupsInternal opt out of the implicit auto-fire pass (cancel was waking up siblings; manual-run was racing its own scheduler). - CELL_CONTENT pinned to h-[22px] so status badges don't grow rows.
1 parent 941364a commit ba7e565

6 files changed

Lines changed: 135 additions & 30 deletions

File tree

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,11 @@ const CELL_HEADER =
8989
'border-[var(--border)] border-r border-b bg-[var(--bg)] px-2 py-[7px] text-left align-middle'
9090
const CELL_HEADER_CHECKBOX =
9191
'sticky left-0 z-[12] border-[var(--border)] border-r border-b bg-[var(--bg)] px-1 py-[7px] text-center align-middle'
92+
// Fixed height (not min-) so a Badge-rendered status pill doesn't make the row
93+
// grow vs a plain-text neighbor. Sized to comfortably contain the badge; the
94+
// flex centers plain text + badges on the same baseline.
9295
const CELL_CONTENT =
93-
'relative min-h-[20px] min-w-0 overflow-clip text-ellipsis whitespace-nowrap text-small'
96+
'relative flex h-[22px] min-w-0 items-center overflow-clip text-ellipsis whitespace-nowrap text-small'
9497
const SELECTION_OVERLAY =
9598
'pointer-events-none absolute -top-px -right-px -bottom-px -left-px z-[5] border-[2px] border-[var(--selection)]'
9699

apps/sim/background/workflow-column-execution.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,10 +353,10 @@ export const workflowGroupCellTask = task({
353353
machine: 'medium-1x',
354354
retry: { maxAttempts: 1 },
355355
// Combined with `concurrencyKey: tableId`, caps each table's sub-queue to
356-
// 10 in-flight cell jobs while letting different tables run in parallel.
356+
// 20 in-flight cell jobs while letting different tables run in parallel.
357357
queue: {
358358
name: 'workflow-group-cell',
359-
concurrencyLimit: 10,
359+
concurrencyLimit: 20,
360360
},
361361
run: (payload: WorkflowGroupCellPayload, { signal }) =>
362362
executeWorkflowGroupCellJob(payload, signal),

apps/sim/lib/table/cell-write.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,25 @@ export async function writeWorkflowGroupState(
5555
return 'wrote'
5656
}
5757
const current = row.executions?.[groupId] as RowExecutionMetadata | undefined
58+
// Stale-worker guard: only blocks writes FROM an old worker (status =
59+
// running / completed / error / pending). A `queued` stamp is the scheduler
60+
// claiming the cell for a brand-new run — the new executionId is supposed
61+
// to overwrite whatever was there. Same for `cancelled` (authoritative).
62+
// Without this carve-out, the new run's stamp gets rejected and the cell
63+
// is stuck in its old state forever.
64+
const isAuthoritativeNewStamp =
65+
payload.executionState.status === 'queued' || payload.executionState.status === 'cancelled'
66+
if (
67+
!isAuthoritativeNewStamp &&
68+
current &&
69+
current.executionId &&
70+
current.executionId !== executionId
71+
) {
72+
logger.info(
73+
`Skipping group write — stale worker (table=${tableId} row=${rowId} group=${groupId} mine=${executionId} active=${current.executionId})`
74+
)
75+
return 'skipped'
76+
}
5877
if (
5978
current?.status === 'cancelled' &&
6079
current.executionId === executionId &&
@@ -66,11 +85,11 @@ export async function writeWorkflowGroupState(
6685
return 'skipped'
6786
}
6887
// Skip writing `cancelled` state with the guard — that's an authoritative
69-
// write from `cancelWorkflowGroupRuns` and must always land. Cell-task
70-
// writes (running/completed/error) get the SQL guard so an in-flight
71-
// partial can't clobber a stop click that already committed.
72-
const cancellationGuard =
73-
payload.executionState.status === 'cancelled' ? undefined : { groupId, executionId }
88+
// write from `cancelWorkflowGroupRuns` and must always land. New `queued`
89+
// stamps from the scheduler also bypass — they ARE the new authority. Cell-
90+
// task writes (running/completed/error) get the SQL guard so an in-flight
91+
// partial can't clobber a stop click or a newer run that already committed.
92+
const cancellationGuard = isAuthoritativeNewStamp ? undefined : { groupId, executionId }
7493
const result = await updateRow(
7594
{
7695
tableId,

apps/sim/lib/table/service.ts

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { userTableDefinitions, userTableRows, workflowExecutionLogs } from '@sim
1212
import { createLogger } from '@sim/logger'
1313
import { getPostgresErrorCode } from '@sim/utils/errors'
1414
import { generateId } from '@sim/utils/id'
15-
import { and, count, eq, gt, gte, inArray, isNull, sql } from 'drizzle-orm'
15+
import { and, count, eq, gt, gte, inArray, isNull, type SQL, sql } from 'drizzle-orm'
1616
import { env } from '@/lib/core/config/env'
1717
import { generateRestoreName } from '@/lib/core/utils/restore-name'
1818
import { getSocketServerUrl } from '@/lib/core/utils/urls'
@@ -1597,6 +1597,35 @@ function applyExecutionsPatch(
15971597
return next
15981598
}
15991599

1600+
/**
1601+
* Builds a SQL expression that applies the given `executionsPatch` to the
1602+
* row's `executions` jsonb in-place — set keys for non-null values, delete
1603+
* keys for `null` values. Returns null when the patch is empty/missing.
1604+
*
1605+
* Why server-side: read-modify-write on the entire jsonb blob races between
1606+
* concurrent writers (e.g., a column edit and a manual-retry stamp), so the
1607+
* last writer wins for keys it didn't touch and clobbers other writers'
1608+
* exec updates. Patching keys at the SQL level keeps each writer's changes
1609+
* atomic per-key.
1610+
*/
1611+
function buildExecutionsSqlPatch(
1612+
patch: Record<string, RowExecutionMetadata | null> | undefined
1613+
): SQL | null {
1614+
if (!patch) return null
1615+
const entries = Object.entries(patch)
1616+
if (entries.length === 0) return null
1617+
1618+
let expr: SQL = sql`coalesce(${userTableRows.executions}, '{}'::jsonb)`
1619+
for (const [gid, value] of entries) {
1620+
if (value === null) {
1621+
expr = sql`(${expr}) - ${gid}::text`
1622+
} else {
1623+
expr = sql`(${expr}) || jsonb_build_object(${gid}::text, ${JSON.stringify(value)}::jsonb)`
1624+
}
1625+
}
1626+
return expr
1627+
}
1628+
16001629
/**
16011630
* Updates a single row.
16021631
*
@@ -1653,26 +1682,46 @@ export async function updateRow(
16531682
const now = new Date()
16541683

16551684
// Cell-task partial writes pass `cancellationGuard` so the SQL update is a
1656-
// no-op when a stop click already wrote `cancelled` for this run between
1657-
// the in-process read and now. Without this, an in-flight `running`
1658-
// partial-write can land after `cancelled` and clobber it.
1685+
// no-op when (a) a stop click already wrote `cancelled` for this run, or
1686+
// (b) a newer run has taken over the cell with a different executionId. The
1687+
// worker is "this run's writes only land if this run is still the active
1688+
// run on the cell." Authoritative cancel writes from `cancelWorkflowGroupRuns`
1689+
// skip the guard entirely (they don't pass `cancellationGuard`).
1690+
//
1691+
// SQL-level for atomicity: an in-process read + update would race a
1692+
// concurrent stop or rerun. The two clauses are joined by AND because
1693+
// either failing means the worker is no longer authoritative.
16591694
const guard = data.cancellationGuard
1660-
// The guard rejects writes only when the DB *already* shows
1661-
// `cancelled` + matching executionId. Wrap the JSON traversals in
1662-
// `IS DISTINCT FROM` so a missing `executions[groupId]` (NULL) cleanly
1663-
// evaluates as "different" — Postgres three-valued logic would otherwise
1664-
// make the whole expression NULL and the UPDATE would mistakenly become
1665-
// a no-op for any row that has no prior execution record.
16661695
const whereClause = guard
16671696
? and(
16681697
eq(userTableRows.id, data.rowId),
1669-
sql`(executions->${guard.groupId}->>'status' IS DISTINCT FROM 'cancelled' OR executions->${guard.groupId}->>'executionId' IS DISTINCT FROM ${guard.executionId})`
1698+
// Reject writes that would land on top of an already-`cancelled` state
1699+
// for this same run. Wrapped in IS DISTINCT FROM so a missing exec
1700+
// (NULL) cleanly evaluates as "different" rather than NULL-poisoning.
1701+
sql`(executions->${guard.groupId}->>'status' IS DISTINCT FROM 'cancelled' OR executions->${guard.groupId}->>'executionId' IS DISTINCT FROM ${guard.executionId})`,
1702+
// Reject writes from a stale worker — the cell's active run has moved
1703+
// on. `OR exec IS NULL` lets the worker land its first `running`
1704+
// stamp on a row that has no prior exec record (initial stamp from
1705+
// the scheduler may not have committed yet).
1706+
sql`(executions->${guard.groupId} IS NULL OR executions->${guard.groupId}->>'executionId' = ${guard.executionId})`
16701707
)
16711708
: eq(userTableRows.id, data.rowId)
16721709

1710+
// Apply the executions patch at the SQL level — we never overwrite the full
1711+
// executions blob, only the keys the caller explicitly patched. Without
1712+
// this, concurrent updateRow calls (e.g., a column edit and a manual
1713+
// retry's stamp) would each compute `mergedExecutions` from their own
1714+
// in-memory snapshot and the last writer wins, clobbering the other's
1715+
// exec keys. The data field still does last-writer-wins because that's
1716+
// the user's edit, but exec records are independently keyed by groupId.
1717+
const executionsExpr = buildExecutionsSqlPatch(data.executionsPatch)
16731718
const updated = await db
16741719
.update(userTableRows)
1675-
.set({ data: mergedData, executions: mergedExecutions, updatedAt: now })
1720+
.set({
1721+
data: mergedData,
1722+
...(executionsExpr ? { executions: executionsExpr } : {}),
1723+
updatedAt: now,
1724+
})
16761725
.where(whereClause)
16771726
.returning({ id: userTableRows.id })
16781727

@@ -1710,7 +1759,7 @@ export async function updateRow(
17101759
notifyTableRowUpdated(data.tableId, updatedRow)
17111760
// Awaited (not `void`) so cell tasks dispatch their cascade before the
17121761
// trigger.dev worker tears down on `run()` resolve.
1713-
await scheduleRunsForRows(table, [updatedRow])
1762+
if (!data.skipScheduler) await scheduleRunsForRows(table, [updatedRow])
17141763

17151764
return updatedRow
17161765
}
@@ -1928,6 +1977,7 @@ export async function batchUpdateRows(
19281977
rowId: string
19291978
mergedData: RowData
19301979
mergedExecutions: RowExecutions
1980+
executionsPatch?: Record<string, RowExecutionMetadata | null>
19311981
}> = []
19321982
for (const update of data.updates) {
19331983
const existing = existingMap.get(update.rowId)!
@@ -1944,7 +1994,12 @@ export async function batchUpdateRows(
19441994
throw new Error(`Row ${update.rowId}: ${schemaValidation.errors.join(', ')}`)
19451995
}
19461996

1947-
mergedUpdates.push({ rowId: update.rowId, mergedData: merged, mergedExecutions })
1997+
mergedUpdates.push({
1998+
rowId: update.rowId,
1999+
mergedData: merged,
2000+
mergedExecutions,
2001+
executionsPatch: update.executionsPatch,
2002+
})
19482003
}
19492004

19502005
const uniqueColumns = getUniqueColumns(table.schema)
@@ -1968,12 +2023,20 @@ export async function batchUpdateRows(
19682023
await setTableTxTimeouts(trx, { statementMs: 60_000 })
19692024
for (let i = 0; i < mergedUpdates.length; i += TABLE_LIMITS.UPDATE_BATCH_SIZE) {
19702025
const batch = mergedUpdates.slice(i, i + TABLE_LIMITS.UPDATE_BATCH_SIZE)
1971-
const updatePromises = batch.map(({ rowId, mergedData, mergedExecutions }) =>
1972-
trx
2026+
// Same as `updateRow`: patch executions at the SQL level when a patch
2027+
// is set, so concurrent writers don't clobber each other's keys via
2028+
// last-writer-wins on the full jsonb blob.
2029+
const updatePromises = batch.map(({ rowId, mergedData, executionsPatch }) => {
2030+
const executionsExpr = buildExecutionsSqlPatch(executionsPatch)
2031+
return trx
19732032
.update(userTableRows)
1974-
.set({ data: mergedData, executions: mergedExecutions, updatedAt: now })
2033+
.set({
2034+
data: mergedData,
2035+
...(executionsExpr ? { executions: executionsExpr } : {}),
2036+
updatedAt: now,
2037+
})
19752038
.where(eq(userTableRows.id, rowId))
1976-
)
2039+
})
19772040
await Promise.all(updatePromises)
19782041
}
19792042
})
@@ -2006,7 +2069,7 @@ export async function batchUpdateRows(
20062069
// so the scheduler's later per-write notifications (pending/running) land
20072070
// last and stick in the client cache.
20082071
for (const row of updatedRowsForTrigger) notifyTableRowUpdated(data.tableId, row)
2009-
void scheduleRunsForRows(table, updatedRowsForTrigger)
2072+
if (!data.skipScheduler) void scheduleRunsForRows(table, updatedRowsForTrigger)
20102073

20112074
return {
20122075
affectedCount: mergedUpdates.length,

apps/sim/lib/table/types.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,14 @@ export interface UpdateRowData {
308308
* state. `updateRow` returns `null` when the guard rejects the write.
309309
*/
310310
cancellationGuard?: { groupId: string; executionId: string }
311+
/**
312+
* When true, the post-write `scheduleRunsForRows` call is skipped. Used by
313+
* the cancel path (which is tearing rows down, not waking them up) and by
314+
* the manual-run path (which fires its own `scheduleRunsForRows` with
315+
* `isManualRun: true` and doesn't want a duplicate auto-fire pass on the
316+
* cleared cells). Default false: every other write fires the reactor.
317+
*/
318+
skipScheduler?: boolean
311319
}
312320

313321
export interface BulkUpdateData {
@@ -326,6 +334,8 @@ export interface BatchUpdateByIdData {
326334
executionsPatch?: Record<string, RowExecutionMetadata | null>
327335
}>
328336
workspaceId: string
337+
/** Same semantics as `UpdateRowData.skipScheduler`. */
338+
skipScheduler?: boolean
329339
}
330340

331341
export interface BulkDeleteData {

apps/sim/lib/table/workflow-columns.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,8 @@ interface RunGroupCellOptions {
253253
executionId: string
254254
}
255255

256-
/** Per-table concurrency cap. Mirrors trigger.dev's `concurrencyLimit: 10`. */
257-
const TABLE_CONCURRENCY_LIMIT = 10
256+
/** Per-table concurrency cap. Mirrors trigger.dev's `concurrencyLimit: 20`. */
257+
const TABLE_CONCURRENCY_LIMIT = 20
258258

259259
async function stampQueuedOrCancel(
260260
queue: Awaited<ReturnType<typeof getJobQueue>>,
@@ -364,6 +364,11 @@ export async function cancelWorkflowGroupRuns(tableId: string, rowId?: string):
364364
)
365365
)
366366
)
367+
// `skipScheduler: true` — we're tearing rows down, not waking them up. The
368+
// auto-fire reactor would otherwise see independent (row, group) pairs whose
369+
// deps are now satisfied (because the upstream group already wrote its
370+
// output before the cancel) and re-enqueue them, which is exactly what the
371+
// user clicked Stop to prevent.
367372
await Promise.allSettled(
368373
mutations.map((m) =>
369374
updateRow(
@@ -373,6 +378,7 @@ export async function cancelWorkflowGroupRuns(tableId: string, rowId?: string):
373378
data: {},
374379
workspaceId: table.workspaceId,
375380
executionsPatch: m.executionsPatch,
381+
skipScheduler: true,
376382
},
377383
table,
378384
`wfgrp-cancel-${m.rowId}`
@@ -471,7 +477,11 @@ async function runWorkflowGroupsInternal(opts: {
471477

472478
if (updates.length === 0) return { triggered: 0 }
473479

474-
await batchUpdateRows({ tableId, updates, workspaceId }, table, requestId)
480+
// `skipScheduler: true` because we fire `scheduleRunsForRows` ourselves
481+
// below with `isManualRun: true`. Without the skip, batchUpdateRows runs the
482+
// auto-fire reactor first and any autoRun=true sibling group whose deps are
483+
// satisfied would race the manual call.
484+
await batchUpdateRows({ tableId, updates, workspaceId, skipScheduler: true }, table, requestId)
475485

476486
return scheduleRunsForRows(table, clearedRows, { isManualRun: true })
477487
}

0 commit comments

Comments (0)