Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f865ed5
feat(tables): paginated background row-delete jobs via table_jobs
TheodoreSpeaks Jun 9, 2026
3866cad
fix(tables): address review on async row-delete (filtered count, scop…
TheodoreSpeaks Jun 9, 2026
b324ac0
Merge remote-tracking branch 'origin/staging' into improvement/table-…
TheodoreSpeaks Jun 9, 2026
03c98b0
improvement(tables): filter-aware select-all runs, delete-job read ma…
TheodoreSpeaks Jun 9, 2026
3f8a67a
feat(tables): run import/delete/export/backfill jobs on trigger.dev w…
TheodoreSpeaks Jun 9, 2026
e46c5b5
improvement(tables): raise delete page to 10k and export batch to 5k
TheodoreSpeaks Jun 9, 2026
e2dafbc
improvement(tables): raise CSV import batch to 5k rows (param-cap bou…
TheodoreSpeaks Jun 9, 2026
f8a2aee
feat(tables): surface export jobs in the header tray with progress, c…
TheodoreSpeaks Jun 9, 2026
1ea5871
improvement(tables): surface exports as derived tables-scoped toasts …
TheodoreSpeaks Jun 9, 2026
cdbf43f
Revert "improvement(tables): surface exports as derived tables-scoped…
TheodoreSpeaks Jun 10, 2026
a1465d7
fix(tables): preserve export storage key (NoSuchKey) and unify jobs i…
TheodoreSpeaks Jun 10, 2026
e9cae45
improvement(tables): jobs tray icon reflects aggregate state (spinner…
TheodoreSpeaks Jun 10, 2026
ef14a38
fix(tables): restore jobs tray on the tables list (dropped in staging…
TheodoreSpeaks Jun 10, 2026
6a9d409
improvement(tables): keyset-paginate export row reads (offset paging …
TheodoreSpeaks Jun 10, 2026
d9d688c
perf(tables): keyset pagination for grid infinite scroll
TheodoreSpeaks Jun 10, 2026
a0be974
fix(tables): show export in job tray immediately on kickoff
TheodoreSpeaks Jun 10, 2026
194a939
fix(tables): surface real row/column write errors in toasts
TheodoreSpeaks Jun 10, 2026
f9d8e90
Merge remote-tracking branch 'origin/staging' into improvement/table-…
TheodoreSpeaks Jun 10, 2026
38f9470
perf(tables): tenant-bound filtered row counts (12.7s -> 0.6s)
TheodoreSpeaks Jun 10, 2026
1fd7368
perf(tables): tenant-bound Cmd+F search and stream its window (75s ->…
TheodoreSpeaks Jun 10, 2026
5464ce7
perf(tables): tenant-bound sorted pages and filtered write selections
TheodoreSpeaks Jun 10, 2026
30b950d
perf(tables): tenant-scoped containment index (migration 0232)
TheodoreSpeaks Jun 10, 2026
2f44ac0
perf(tables): tenant-bound unique-constraint checks (3.5s -> <1s per …
TheodoreSpeaks Jun 10, 2026
d0abf3e
perf(tables): tenant-bound upsert conflict lookup
TheodoreSpeaks Jun 10, 2026
d339c0e
refactor(tables): consolidate executor types onto planner exports
TheodoreSpeaks Jun 10, 2026
6849408
fix(tests): drop narrow schema mock override in process-contents test
TheodoreSpeaks Jun 10, 2026
fb3c6c6
fix(tables): scope cancels and counts to the filtered selection (review)
TheodoreSpeaks Jun 10, 2026
93ed482
chore: retrigger CI (Actions dropped the previous push events)
TheodoreSpeaks Jun 10, 2026
3a41a03
Merge remote-tracking branch 'origin/staging' into improvement/table-…
TheodoreSpeaks Jun 10, 2026
3a75b9c
chore: bump api-validation route baseline to 807 (staging route + merge)
TheodoreSpeaks Jun 10, 2026
4954807
fix(tables): release job claim when trigger.dev dispatch fails
TheodoreSpeaks Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 43 additions & 17 deletions apps/sim/app/api/cron/cleanup-stale-executions/route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { asyncJobs, db } from '@sim/db'
import { userTableDefinitions, workflowExecutionLogs } from '@sim/db/schema'
import { tableJobs, workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
Expand All @@ -8,12 +8,15 @@ import { verifyCronAuth } from '@/lib/auth/internal'
import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { deleteFile } from '@/lib/uploads/core/storage-service'

const logger = createLogger('CleanupStaleExecutions')

const STALE_THRESHOLD_MS = getMaxExecutionTimeout() + 5 * 60 * 1000
const STALE_THRESHOLD_MINUTES = Math.ceil(STALE_THRESHOLD_MS / 60000)
const MAX_INT32 = 2_147_483_647
/** Terminal table-jobs older than this are pruned; only the latest job per table is ever read. */
const TABLE_JOB_RETENTION_HOURS = 24

export const GET = withRouteHandler(async (request: NextRequest) => {
try {
Expand Down Expand Up @@ -110,33 +113,56 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
})
}

// Mark stale table imports as failed. Imports run detached on the web container and
// are lost if the pod is killed mid-load. `updatedAt` is bumped by progress updates, so
// an `importing` table with no recent update has stalled (not merely slow). Rows are
// left in place (no rollback); the user re-imports.
// Mark stale table jobs (import or delete) as failed. Jobs run detached on the web container
// and are lost if the pod is killed mid-run. `updated_at` is bumped by progress updates, so a
// `running` job with no recent update has stalled (not merely slow). Committed work is left in
// place (no rollback); the user retries. Also prune long-settled terminal jobs so the table
// doesn't grow unbounded (the latest job per table is what list/detail reads surface).
let staleImportsMarkedFailed = 0
try {
const now = new Date()
const staleImports = await db
.update(userTableDefinitions)
.update(tableJobs)
.set({
importStatus: 'failed',
importError: `Import terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
updatedAt: new Date(),
status: 'failed',
error: `Job terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
completedAt: now,
updatedAt: now,
})
.where(and(eq(tableJobs.status, 'running'), lt(tableJobs.updatedAt, staleThreshold)))
Comment thread
cursor[bot] marked this conversation as resolved.
.returning({ id: tableJobs.id })

staleImportsMarkedFailed = staleImports.length
if (staleImportsMarkedFailed > 0) {
logger.info(`Marked ${staleImportsMarkedFailed} stale table jobs as failed`)
}

const terminalRetention = new Date(Date.now() - TABLE_JOB_RETENTION_HOURS * 60 * 60 * 1000)
const pruned = await db
.delete(tableJobs)
.where(
and(
eq(userTableDefinitions.importStatus, 'importing'),
lt(userTableDefinitions.updatedAt, staleThreshold)
inArray(tableJobs.status, ['ready', 'failed', 'canceled']),
lt(tableJobs.updatedAt, terminalRetention)
)
)
.returning({ id: userTableDefinitions.id })

staleImportsMarkedFailed = staleImports.length
if (staleImportsMarkedFailed > 0) {
logger.info(`Marked ${staleImportsMarkedFailed} stale table imports as failed`)
.returning({ type: tableJobs.type, payload: tableJobs.payload })

// Pruned export jobs carry the generated file's storage key — delete the file with the job
// so the exports prefix doesn't accumulate. Best-effort: a miss just orphans one object.
for (const job of pruned) {
if (job.type !== 'export') continue
const resultKey = (job.payload as { resultKey?: string } | null)?.resultKey
if (!resultKey) continue
await deleteFile({ key: resultKey, context: 'workspace' }).catch((err) => {
logger.warn('Failed to delete pruned export file', {
resultKey,
error: toError(err).message,
})
})
}
} catch (error) {
logger.error('Failed to clean up stale table imports:', {
logger.error('Failed to clean up stale table jobs:', {
error: toError(error).message,
})
}
Expand Down
11 changes: 8 additions & 3 deletions apps/sim/app/api/table/[tableId]/cancel-runs/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { cancelWorkflowGroupRuns } from '@/lib/table/workflow-columns'
import { accessError, checkAccess } from '@/app/api/table/utils'
import { accessError, checkAccess, tableFilterError } from '@/app/api/table/utils'

const logger = createLogger('TableCancelRunsAPI')

Expand All @@ -32,7 +32,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
const parsed = await parseRequest(cancelTableRunsContract, request, { params })
if (!parsed.success) return parsed.response
const { tableId } = parsed.data.params
const { workspaceId, scope, rowId } = parsed.data.body
const { workspaceId, scope, rowId, filter } = parsed.data.body

const result = await checkAccess(tableId, authResult.userId, 'write')
if (!result.ok) return accessError(result, requestId, tableId)
Expand All @@ -42,7 +42,12 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
}

const cancelled = await cancelWorkflowGroupRuns(tableId, scope === 'row' ? rowId : undefined)
const filterError = tableFilterError(filter, table.schema.columns)
if (filterError) return filterError

const cancelled = await cancelWorkflowGroupRuns(tableId, scope === 'row' ? rowId : undefined, {
filter,
})
logger.info(
`[${requestId}] cancel-runs: tableId=${tableId} scope=${scope}${
rowId ? ` rowId=${rowId}` : ''
Expand Down
64 changes: 33 additions & 31 deletions apps/sim/app/api/table/[tableId]/columns/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
updateColumnConstraints,
updateColumnType,
} from '@/lib/table'
import { accessError, checkAccess, normalizeColumn } from '@/app/api/table/utils'
import { accessError, checkAccess, normalizeColumn, rootErrorMessage } from '@/app/api/table/utils'

const logger = createLogger('TableColumnsAPI')

Expand Down Expand Up @@ -63,13 +63,17 @@ export const POST = withRouteHandler(async (request: NextRequest, context: Colum
return validationErrorResponse(error, 'Invalid request data')
}

if (error instanceof Error) {
if (error.message.includes('already exists') || error.message.includes('maximum column')) {
return NextResponse.json({ error: error.message }, { status: 400 })
}
if (error.message === 'Table not found') {
return NextResponse.json({ error: error.message }, { status: 404 })
}
const msg = rootErrorMessage(error)
if (
msg.includes('already exists') ||
msg.includes('maximum column') ||
msg.includes('Invalid column') ||
msg.includes('exceeds maximum')
) {
return NextResponse.json({ error: msg }, { status: 400 })
}
if (msg === 'Table not found') {
return NextResponse.json({ error: msg }, { status: 404 })
}

logger.error(`[${requestId}] Error adding column to table ${tableId}:`, error)
Expand Down Expand Up @@ -146,22 +150,21 @@ export const PATCH = withRouteHandler(async (request: NextRequest, context: Colu
return validationErrorResponse(error, 'Invalid request data')
}

if (error instanceof Error) {
const msg = error.message
if (msg.includes('not found') || msg.includes('Table not found')) {
return NextResponse.json({ error: msg }, { status: 404 })
}
if (
msg.includes('already exists') ||
msg.includes('Cannot delete the last column') ||
msg.includes('Cannot set column') ||
msg.includes('Invalid column') ||
msg.includes('exceeds maximum') ||
msg.includes('incompatible') ||
msg.includes('duplicate')
) {
return NextResponse.json({ error: msg }, { status: 400 })
}
const msg = rootErrorMessage(error)
if (msg.includes('not found') || msg.includes('Table not found')) {
return NextResponse.json({ error: msg }, { status: 404 })
}
if (
msg.includes('already exists') ||
msg.includes('Cannot delete the last column') ||
msg.includes('Cannot set column') ||
msg.includes('Cannot set unique column') ||
msg.includes('Invalid column') ||
msg.includes('exceeds maximum') ||
msg.includes('incompatible') ||
msg.includes('duplicate')
) {
return NextResponse.json({ error: msg }, { status: 400 })
}

logger.error(`[${requestId}] Error updating column in table ${tableId}:`, error)
Expand Down Expand Up @@ -211,13 +214,12 @@ export const DELETE = withRouteHandler(
return validationErrorResponse(error, 'Invalid request data')
}

if (error instanceof Error) {
if (error.message.includes('not found') || error.message === 'Table not found') {
return NextResponse.json({ error: error.message }, { status: 404 })
}
if (error.message.includes('Cannot delete') || error.message.includes('last column')) {
return NextResponse.json({ error: error.message }, { status: 400 })
}
const msg = rootErrorMessage(error)
if (msg.includes('not found') || msg === 'Table not found') {
return NextResponse.json({ error: msg }, { status: 404 })
}
if (msg.includes('Cannot delete') || msg.includes('last column')) {
return NextResponse.json({ error: msg }, { status: 400 })
}

logger.error(`[${requestId}] Error deleting column from table ${tableId}:`, error)
Expand Down
9 changes: 7 additions & 2 deletions apps/sim/app/api/table/[tableId]/columns/run/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { runWorkflowColumn } from '@/lib/table/workflow-columns'
import { accessError, checkAccess } from '@/app/api/table/utils'
import { accessError, checkAccess, tableFilterError } from '@/app/api/table/utils'

const logger = createLogger('TableRunColumnAPI')

Expand All @@ -25,16 +25,21 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
const parsed = await parseRequest(runColumnContract, request, { params })
if (!parsed.success) return parsed.response
const { tableId } = parsed.data.params
const { workspaceId, groupIds, runMode, rowIds, limit } = parsed.data.body
const { workspaceId, groupIds, runMode, rowIds, filter, limit } = parsed.data.body
const access = await checkAccess(tableId, auth.userId, 'write')
if (!access.ok) return accessError(access, requestId, tableId)

// Validate the filter up front (the dispatcher reuses it) so a bad field fails fast.
const filterError = tableFilterError(filter, access.table.schema.columns)
if (filterError) return filterError

const { dispatchId } = await runWorkflowColumn({
tableId,
workspaceId,
groupIds,
mode: runMode,
rowIds,
filter,
limit,
requestId,
triggeredByUserId: auth.userId,
Expand Down
Loading
Loading