Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions apps/sim/app/api/cron/cleanup-stale-executions/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { verifyCronAuth } from '@/lib/auth/internal'
import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { workflowMetrics } from '@/lib/monitoring/metrics'

const logger = createLogger('CleanupStaleExecutions')

Expand All @@ -32,6 +33,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
executionId: workflowExecutionLogs.executionId,
workflowId: workflowExecutionLogs.workflowId,
startedAt: workflowExecutionLogs.startedAt,
trigger: workflowExecutionLogs.trigger,
})
.from(workflowExecutionLogs)
.where(
Expand All @@ -53,7 +55,10 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
const staleDurationMinutes = Math.round(staleDurationMs / 60000)
const totalDurationMs = Math.min(staleDurationMs, MAX_INT32)

await db
// Conditional on status='running' so a worker that completes between the
// select and this update keeps its own terminal status (and its own
// ExecutionCompleted point) — no force-fail overwrite, no double count.
const transitioned = await db
.update(workflowExecutionLogs)
.set({
status: 'failed',
Expand All @@ -65,13 +70,33 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
to_jsonb(${`Execution terminated: worker timeout or crash after ${staleDurationMinutes} minutes`}::text)
)`,
})
.where(eq(workflowExecutionLogs.id, execution.id))
.where(
and(
eq(workflowExecutionLogs.id, execution.id),
eq(workflowExecutionLogs.status, 'running')
)
)
.returning({ id: workflowExecutionLogs.id })

if (transitioned.length === 0) {
logger.info(`Skipped stale execution ${execution.executionId}: no longer running`, {
workflowId: execution.workflowId,
})
continue
}

logger.info(`Cleaned up stale execution ${execution.executionId}`, {
workflowId: execution.workflowId,
staleDurationMinutes,
})

// Crashed workers never reach a LoggingSession completion path, so this
// is the only place these failures can be counted toward the error rate.
workflowMetrics.recordExecutionCompleted({
trigger: execution.trigger,
status: 'failed',
})
Comment thread
TheodoreSpeaks marked this conversation as resolved.

cleaned++
} catch (error) {
logger.error(`Failed to clean up execution ${execution.executionId}:`, {
Expand Down
18 changes: 18 additions & 0 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { redactApiKeys } from '@/lib/core/security/redaction'
import { normalizeStringArray } from '@/lib/core/utils/arrays'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { compactExecutionPayload } from '@/lib/execution/payloads/serializer'
import { workflowMetrics } from '@/lib/monitoring/metrics'
import {
containsUserFileWithMetadata,
hydrateUserFilesWithBase64,
Expand Down Expand Up @@ -239,6 +240,7 @@ export class BlockExecutor {
if (normalizedOutput.childTraceSpans && Array.isArray(normalizedOutput.childTraceSpans)) {
blockLog.childTraceSpans = normalizedOutput.childTraceSpans
}
this.recordBlockMetric(block, true, duration)
}

const { childTraceSpans: _traces, ...outputForState } = normalizedOutput
Expand Down Expand Up @@ -284,6 +286,21 @@ export class BlockExecutor {
}
}

private recordBlockMetric(block: SerializedBlock, success: boolean, durationMs: number): void {
const operation = block.config?.params?.operation
workflowMetrics.recordBlockExecuted({
blockType: block.metadata?.id || 'unknown',
// Operation is user-configured; only emit registry-style identifiers so
// dynamic values can't explode CloudWatch dimension cardinality.
operation:
typeof operation === 'string' && /^[a-zA-Z0-9_-]{1,64}$/.test(operation)
? operation
: undefined,
success,
durationMs,
})
}

private buildNodeMetadata(node: DAGNode): WorkflowNodeMetadata {
const metadata = node?.metadata ?? {}
return {
Expand Down Expand Up @@ -371,6 +388,7 @@ export class BlockExecutor {
if (ChildWorkflowError.isChildWorkflowError(error) && error.childTraceSpans.length > 0) {
blockLog.childTraceSpans = error.childTraceSpans
}
this.recordBlockMetric(block, false, duration)
}

this.execLogger.error(
Expand Down
148 changes: 148 additions & 0 deletions apps/sim/lib/logs/execution/logging-session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,24 @@ const {
completeWorkflowExecutionMock,
startWorkflowExecutionMock,
loadWorkflowStateForExecutionMock,
recordExecutionStartedMock,
recordExecutionCompletedMock,
recordExecutionPausedMock,
} = vi.hoisted(() => ({
completeWorkflowExecutionMock: vi.fn(),
startWorkflowExecutionMock: vi.fn(),
loadWorkflowStateForExecutionMock: vi.fn(),
recordExecutionStartedMock: vi.fn(),
recordExecutionCompletedMock: vi.fn(),
recordExecutionPausedMock: vi.fn(),
}))

vi.mock('@/lib/monitoring/metrics', () => ({
workflowMetrics: {
recordExecutionStarted: recordExecutionStartedMock,
recordExecutionCompleted: recordExecutionCompletedMock,
recordExecutionPaused: recordExecutionPausedMock,
},
}))

vi.mock('@sim/db', () => ({
Expand Down Expand Up @@ -613,6 +627,7 @@ describe('completeWithError cancelled-status guard', () => {
describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => {
beforeEach(() => {
vi.clearAllMocks()
dbMocks.selectLimit.mockResolvedValue([{ status: 'running', trigger: 'api' }])
dbMocks.updateWhere.mockResolvedValue(undefined)
})

Expand Down Expand Up @@ -648,3 +663,136 @@ describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => {
expect(combined).toContain('force_failed')
})
})

describe('LoggingSession workflow metrics', () => {
beforeEach(() => {
vi.clearAllMocks()
startWorkflowExecutionMock.mockResolvedValue({})
completeWorkflowExecutionMock.mockResolvedValue({})
loadWorkflowStateForExecutionMock.mockResolvedValue({
blocks: {},
edges: [],
loops: {},
parallels: {},
})
dbMocks.selectLimit.mockResolvedValue([{ status: 'running', trigger: 'api' }])
dbMocks.execute.mockResolvedValue(undefined)
})

it('emits ExecutionStarted on start and not on resume', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
await session.start({ workspaceId: 'ws-1' })
expect(recordExecutionStartedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionStartedMock).toHaveBeenCalledWith({ trigger: 'api' })

recordExecutionStartedMock.mockClear()
const resumeSession = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
await resumeSession.start({ workspaceId: 'ws-1', skipLogCreation: true })
expect(recordExecutionStartedMock).not.toHaveBeenCalled()
})

it('emits a success completion with trigger and duration', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'webhook', 'req-1')
await session.complete({ totalDurationMs: 500 })

expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'webhook',
status: 'success',
durationMs: 500,
})
})

it('emits a failed completion via completeWithError', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'schedule', 'req-1')
await session.completeWithError({ totalDurationMs: 250, error: { message: 'boom' } })

expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'schedule',
status: 'failed',
durationMs: 250,
})
})

it('emits a cancelled completion via completeWithCancellation', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'manual', 'req-1')
await session.completeWithCancellation({ totalDurationMs: 100 })

expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'manual',
status: 'cancelled',
durationMs: 100,
})
})

it('emits ExecutionPaused (not a completion) on pause, then failed if markAsFailed follows', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
await session.completeWithPause({ totalDurationMs: 100 })

expect(recordExecutionPausedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionPausedMock).toHaveBeenCalledWith({ trigger: 'api' })
expect(recordExecutionCompletedMock).not.toHaveBeenCalled()

await session.markAsFailed('pause persistence failed')
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'api',
status: 'failed',
durationMs: undefined,
})
})

it('does not double-emit when markAsFailed runs after a completed session', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
await session.complete({ totalDurationMs: 500 })

dbMocks.selectLimit.mockResolvedValue([{ status: 'completed', trigger: 'api' }])
await session.markAsFailed('timeout')

expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'api',
status: 'success',
durationMs: 500,
})
})

it('emits exactly one completion when the primary write fails and the fallback succeeds', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
completeWorkflowExecutionMock
.mockRejectedValueOnce(new Error('finalize failed'))
.mockResolvedValueOnce({})

await session.safeCompleteWithError({ error: { message: 'boom' } })

expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith(
expect.objectContaining({ trigger: 'api', status: 'failed' })
)
})

it('static markExecutionAsFailed emits failed only for non-terminal rows', async () => {
dbMocks.selectLimit.mockResolvedValue([{ status: 'running', trigger: 'webhook' }])
await LoggingSession.markExecutionAsFailed('exec-1', 'crash', undefined, 'wf-1')

expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'webhook',
status: 'failed',
})

recordExecutionCompletedMock.mockClear()
dbMocks.selectLimit.mockResolvedValue([{ status: 'failed', trigger: 'webhook' }])
await LoggingSession.markExecutionAsFailed('exec-1', 'crash again', undefined, 'wf-1')
expect(recordExecutionCompletedMock).not.toHaveBeenCalled()
})

it('skips the completion metric when the run was already cancelled elsewhere', async () => {
dbMocks.selectLimit.mockResolvedValue([{ status: 'cancelled' }])
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
await session.completeWithError({ error: { message: 'boom' } })

expect(recordExecutionCompletedMock).not.toHaveBeenCalled()
})
})
Loading
Loading