fix workflow system so scheduled triggers run on latest code #2706
shagun-singh-inkeep wants to merge 23 commits into main
Conversation
🦋 Changeset detected — Latest commit: 35bb793. The changes in this PR will be included in the next version bump. This PR includes changesets to release 10 packages.
TL;DR — Replaces the per-trigger daisy-chaining workflow architecture with a single centralized scheduler workflow that polls every 60 seconds, dispatches due triggers as independent one-shot workflows, and stores scheduler state in a singleton runtime-DB table.

Key changes
Summary: 34 files | 9 commits | base: main

Centralized scheduler workflow and trigger dispatcher
The dispatcher atomically advances next_run_at for each due trigger before dispatching its one-shot runner workflow.
| Migration | DB | Change |
|---|---|---|
| 0013_lumpy_apocalypse.sql | manage (Doltgres) | ALTER TABLE scheduled_triggers ADD COLUMN next_run_at timestamptz |
| 0023_broad_sharon_ventura.sql | runtime (Postgres) | CREATE TABLE scheduler_state (singleton) |
manage-schema.ts · computeNextRunAt.ts · scheduledTriggers.ts (DAL)
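The poll-and-dispatch loop described above can be sketched roughly as follows. This is an illustrative reconstruction, not the PR's actual code; `findDue` and `dispatch` stand in for the real DAL and workflow-engine calls.

```typescript
// Minimal sketch of one 60-second scheduler tick, assuming injected helpers.
async function schedulerTick(
  now: Date,
  findDue: (now: Date) => Promise<string[]>,
  dispatch: (triggerId: string) => Promise<void>
): Promise<number> {
  const due = await findDue(now);
  // Dispatch each due trigger independently; one failure must not block the rest.
  await Promise.allSettled(due.map((id) => dispatch(id)));
  return due.length;
}
```

The key property is that each due trigger becomes its own independent workflow start, so a single failing trigger cannot stall the scheduler loop.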
One-shot scheduledTriggerRunnerWorkflow
Before: The runner was a long-lived daisy-chaining workflow: check trigger → compute next time → create invocation → sleep → post-sleep re-check → execute → chain to next iteration. It handled adoption, supersession, and cancellation mid-sleep.
After: The runner is a stateless one-shot: check trigger enabled → create idempotent invocation → retry loop with cancellation checks → mark completed or failed → exit.
All scheduling concerns (when to run, what's due) are now the dispatcher's responsibility. The runner only needs a TriggerPayload with scheduledFor and focuses purely on execution.
scheduledTriggerRunner.ts · scheduledTriggerSteps.ts
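The one-shot control flow can be sketched like this. Step names are illustrative stand-ins for the PR's workflow steps; only the idempotency-key format (`sched_{triggerId}_{scheduledFor}`, mentioned later in the review) is taken from the PR.

```typescript
// Hypothetical sketch of the stateless one-shot runner.
type RunnerSteps = {
  isTriggerEnabled: () => Promise<boolean>;
  createInvocationIdempotent: (key: string) => Promise<'created' | 'duplicate'>;
  execute: () => Promise<void>;
};

async function runOnce(
  triggerId: string,
  scheduledFor: string,
  steps: RunnerSteps
): Promise<'skipped' | 'duplicate' | 'completed'> {
  // Trigger may have been disabled between dispatch and execution.
  if (!(await steps.isTriggerEnabled())) return 'skipped';
  // Invocation creation is the idempotency barrier against double-dispatch.
  const key = `sched_${triggerId}_${scheduledFor}`;
  if ((await steps.createInvocationIdempotent(key)) === 'duplicate') return 'duplicate';
  await steps.execute();
  return 'completed';
}
```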
Deploy restart endpoint and CI integration
Before: No mechanism to move scheduled trigger workflows to a new deployment; they continued running on the old instance until they naturally chained.
After: POST /api/deploy/restart-scheduler starts a fresh scheduler workflow, superseding the old one. The Vercel production workflow calls this after promote + deploy.
The endpoint uses constant-time comparison of INKEEP_AGENTS_RUN_API_BYPASS_SECRET for auth and is registered with noAuth() / security: [] in the OpenAPI spec.
restartScheduler.ts · vercel-production.yml
Significant architectural improvement — moving from per-trigger daisy-chaining workflows to a centralized scheduler with next_run_at on the manage table is a cleaner model. The supersession mechanism and one-shot runner design are well thought out.
There are a few issues to address before merging, roughly in priority order:
- Reconciliation check is broken — listEnabledScheduledTriggers only selects { id, name }, so (t as any).nextRunAt is always undefined and every enabled trigger will be flagged as missing.
- agentFull and projectFull create paths don't set nextRunAt — triggers created through these bulk routes will sit dormant until reconciliation detects them.
- Crash between advance-and-dispatch loses one-time triggers — if the process dies after advanceScheduledTriggerNextRunAt commits but before start(workflow) executes, one-time triggers are permanently disabled with no execution.
- as any casts — nextRunAt is omitted from ScheduledTriggerInsertSchema but passed at the call site, causing multiple as any casts. Clean fix: make nextRunAt an accepted (optional) field in the insert schema.
- Security nits on the deploy endpoint — timing-safe comparison leaks secret length; error responses expose err.message.
```ts
const missingWorkflows = enabledTriggers
  .filter((t) => !workflowsByTriggerId.has(t.id))
  .filter((t) => !(t as any).nextRunAt)
```
Bug: listEnabledScheduledTriggers (in audit-queries.ts) only selects { id, name } — nextRunAt is never present on the returned objects. This means !(t as any).nextRunAt is always true, and every enabled trigger will be reported as missing.
Fix: add nextRunAt: scheduledTriggers.nextRunAt to the select() in listEnabledScheduledTriggers, then remove this as any cast.
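A tiny self-contained illustration of why the check always fires (types and trigger names here are illustrative):

```typescript
// If the select projection omits nextRunAt, the property is undefined on every
// returned row, so the !nextRunAt filter keeps all enabled triggers.
type TriggerRow = { id: string; name: string; nextRunAt?: string | null };

const enabledTriggers: TriggerRow[] = [
  { id: 't1', name: 'daily-digest' },   // nextRunAt absent from the projection
  { id: 't2', name: 'weekly-report' },  // nextRunAt absent from the projection
];

const flaggedAsMissing = enabledTriggers.filter((t) => !t.nextRunAt);
// flaggedAsMissing contains both rows — every enabled trigger is "missing"
```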
```ts
orphanedWorkflows: [],
staleWorkflows: [],
deadWorkflows: [],
verificationFailures: [],
```
These four fields (orphanedWorkflows, staleWorkflows, deadWorkflows, verificationFailures) are now hardcoded empty arrays. Consider updating ScheduledTriggerAuditResult to remove or mark them optional — returning dead-letter fields that can never be populated adds noise.
```ts
  nextRunAt,
} as any);
```
The as any cast is needed because nextRunAt is omitted from ScheduledTriggerInsertSchemaBase. Since the create route now always computes and passes nextRunAt, the insert type should accept it.
Fix: remove nextRunAt from the .omit() in ScheduledTriggerInsertSchemaBase (or add it as .optional()) so the DAL function accepts it without a cast.
```ts
const mergedEnabled = body.enabled !== undefined ? body.enabled : existing.enabled;
const enabledChanged = body.enabled !== undefined && body.enabled !== existing.enabled;

let nextRunAt: string | null | undefined;
if (!mergedEnabled) {
  nextRunAt = null;
} else if (scheduleChanged || enabledChanged) {
  const mergedCron =
    body.cronExpression !== undefined ? body.cronExpression : existing.cronExpression;
  const mergedTimezone =
    body.cronTimezone !== undefined ? body.cronTimezone : existing.cronTimezone;
  const mergedRunAt = body.runAt !== undefined ? body.runAt : existing.runAt;
  nextRunAt = computeNextRunAt({
    cronExpression: mergedCron,
    cronTimezone: mergedTimezone,
    runAt: mergedRunAt,
  });
}
```
nextRunAt is only recomputed when scheduleChanged || enabledChanged. If only payload or messageTemplate changes, nextRunAt is left unchanged — that's correct.
However, the enabled → disabled transition sets nextRunAt = null here, but onTriggerUpdated no longer cancels pending invocations for that case (the old Case 2 was removed from ScheduledTriggerService.ts). Already-queued pending or running invocations will continue executing even though the user disabled the trigger. Consider adding cancelPendingInvocationsForTrigger to the enabled→disabled path, either here or in onTriggerUpdated.
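One way to close that gap, sketched with the canceller injected so the shape is testable (cancelPendingInvocationsForTrigger is the helper the reviewer names; its wiring here is assumed):

```typescript
// Sketch: cancel queued invocations only on the enabled -> disabled transition.
async function onEnabledFlagChanged(
  enabledChanged: boolean,
  mergedEnabled: boolean,
  cancelPendingInvocations: () => Promise<number>
): Promise<number> {
  if (enabledChanged && !mergedEnabled) {
    // Trigger was just disabled: stop anything already queued or running.
    return cancelPendingInvocations();
  }
  return 0; // no-op for enable transitions or unrelated field updates
}
```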
```ts
await withRef(
  manageDbPool,
  resolvedRef,
  (db) =>
    advanceScheduledTriggerNextRunAt(db)({
      scopes: { tenantId, projectId, agentId },
      scheduledTriggerId,
      nextRunAt,
      enabled: isOneTime ? false : undefined,
    }),
  { commit: true, commitMessage: `Advance next_run_at for trigger ${scheduledTriggerId}` }
);
```
Risk: crash-window between advance and dispatch. If the process dies after this advanceScheduledTriggerNextRunAt commit but before start(scheduledTriggerRunnerWorkflow) on line 100, nextRunAt is already advanced but no workflow was started. The rollback on line 102 only runs if start() throws, not on a process crash.
- Cron triggers: miss one execution (the next tick computes a new nextRunAt) — acceptable.
- One-time triggers: permanently disabled (enabled=false, nextRunAt=null) with no execution — data loss.
Consider reversing the order: start the workflow first (idempotent via createInvocationIdempotentStep), then advance nextRunAt. If advance fails, the trigger is picked up again next tick; the idempotency key prevents double-execution.
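The proposed ordering can be sketched like this (helper names are illustrative; the real code calls start(scheduledTriggerRunnerWorkflow) and advanceScheduledTriggerNextRunAt):

```typescript
// Sketch: dispatch first, then advance. A crash after startWorkflow but before
// advanceNextRunAt leaves next_run_at unchanged, so the next tick re-dispatches;
// the runner's invocation idempotency key prevents a double execution.
async function dispatchThenAdvance(
  startWorkflow: () => Promise<void>,
  advanceNextRunAt: () => Promise<void>
): Promise<'dispatched' | 'dispatched-advance-failed'> {
  await startWorkflow(); // safe to repeat: deduplicated by the invocation key
  try {
    await advanceNextRunAt();
    return 'dispatched';
  } catch {
    // Safe to continue: the trigger stays due and is retried next tick.
    return 'dispatched-advance-failed';
  }
}
```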
```ts
await upsertSchedulerState(runDbClient)({
  currentRunId: run.runId,
  deploymentId: getDeploymentId(),
});
```
Minor race: this upsertSchedulerState call is redundant with registerSchedulerStep inside the workflow. Both write currentRunId to the same singleton row. If the deploy endpoint is called twice in quick succession (e.g. CI retry), you can get:
1. Call 1: start() → run C
2. Call 2: start() → run D
3. Call 1: upsertSchedulerState(C)
4. Call 2: upsertSchedulerState(D)
5. Workflow C: registerSchedulerStep(C) — overwrites D
Now both C and D think they're current. Consider removing this outer upsertSchedulerState and relying solely on registerSchedulerStep inside the workflow.
```ts
function constantTimeEqual(a: string, b: string): boolean {
  const bufA = Buffer.from(a);
  const bufB = Buffer.from(b);
  if (bufA.length !== bufB.length) return false;
  return timingSafeEqual(bufA, bufB);
}
```
The early return false on length mismatch exits faster than timingSafeEqual, letting an attacker binary-search the secret's byte length via response timing. Low-severity in practice (network jitter dominates), but cheap to fix:
```ts
import { createHash, timingSafeEqual } from 'node:crypto';

function constantTimeEqual(a: string, b: string): boolean {
  const hash = (s: string) => createHash('sha256').update(s).digest();
  return timingSafeEqual(hash(a), hash(b));
}
```

```ts
  { error: err instanceof Error ? err.message : String(err) },
  'Failed to restart scheduler workflow'
);
return c.json({ error: err instanceof Error ? err.message : 'Internal error' }, 500);
```
err.message is returned to the caller, potentially leaking internal details (DB connection strings, file paths). Since the caller is CI, return a generic message and rely on the server-side log (line 53) for debugging:
```ts
return c.json({ error: 'Failed to restart scheduler workflow' }, 500);
```

```ts
const rows = await db.execute(
  sql`SELECT id, tenant_id, project_id, agent_id,
        cron_expression, cron_timezone, run_at,
        next_run_at, enabled
      FROM scheduled_triggers AS OF ${sql.raw(`'${branchName}'`)}
```
branchName is interpolated unescaped into the query via sql.raw. It's constructed from tenantId/projectId via getProjectScopedRef (simple concatenation, no sanitization). The values come from the runtime DB so they're trusted internal data — not a regression since this pattern exists elsewhere in the dolt module — but worth hardening. Consider a shared helper that validates branch names for AS OF clauses (e.g., reject values containing ').
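A guard along the lines the reviewer suggests might look like this (assumed helper, not in the PR; the allowed character set is a guess at typical Dolt branch names):

```typescript
// Hypothetical validator for refs interpolated into AS OF clauses.
// Permits common branch-name characters; rejects quotes and other metacharacters.
const SAFE_BRANCH_REF = /^[A-Za-z0-9._\/-]+$/;

function assertSafeBranchRef(ref: string): string {
  if (!SAFE_BRANCH_REF.test(ref)) {
    throw new Error(`Refusing to interpolate unsafe branch ref: ${JSON.stringify(ref)}`);
  }
  return ref;
}
```

Callers would wrap the interpolation site, e.g. sql.raw over assertSafeBranchRef(branchName), so an unexpected quote fails loudly instead of reaching the query.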
```ts
}): Promise<DueScheduledTrigger[]> => {
  const allDue: DueScheduledTrigger[] = [];

  for (const project of params.projects) {
```
This loops over every project one-at-a-time with a separate SQL query per project. Fine for small deployments, but could become a bottleneck at scale (N round-trips to Doltgres). Consider adding a log/metric for the iteration count so you can detect when this becomes slow.
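A lightweight version of that instrumentation might look like this (sketch; the logger shape and function boundaries are assumed):

```typescript
// Sketch: time the per-project fan-out and surface the iteration count, so an
// N-round-trip scan shows up in logs before it becomes a latency incident.
async function scanProjects<T>(
  projects: string[],
  queryProject: (projectId: string) => Promise<T[]>,
  log: (fields: Record<string, unknown>) => void
): Promise<T[]> {
  const started = Date.now();
  const allDue: T[] = [];
  for (const projectId of projects) {
    // One query per project, matching the loop flagged above.
    allDue.push(...(await queryProject(projectId)));
  }
  log({
    projectCount: projects.length,
    dueCount: allDue.length,
    durationMs: Date.now() - started,
  });
  return allDue;
}
```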
PR Review Summary
(4) Total Issues | Risk: High
This is a delta review covering 3 commits since the last automated review. The delta addresses several prior issues but leaves critical blocking items unresolved.
✅ Issues Fixed in Delta
| Prior Issue | Status | Evidence |
|---|---|---|
| Reconciliation check broken (nextRunAt missing in select) | ✅ Fixed | audit-queries.ts:18 now selects nextRunAt, as any cast removed |
| Crash between advance-and-dispatch loses one-time triggers | ✅ Fixed | triggerDispatcher.ts:84 now starts workflow before advancing |
| No test coverage for computeNextRunAt | ✅ Fixed | 136 lines of tests added |
| No test coverage for triggerDispatcher | ✅ Fixed | 240 lines of tests added |
| Timing-safe comparison leaks secret length | ✅ Fixed | restartScheduler.ts:11-14 now uses SHA256 hash comparison |
| Error response exposes err.message | ✅ Fixed | restartScheduler.ts:54 now returns generic error |
🔴❗ Critical (1) ❗🔴
🔴 1) 0013_lumpy_apocalypse.sql:1 Missing data migration for existing enabled triggers
Issue: The migration adds a nullable next_run_at column but does NOT backfill existing enabled triggers. All currently-enabled triggers will have next_run_at = NULL after migration.
Why: The scheduler workflow at findDueScheduledTriggersAcrossProjects only dispatches triggers where next_run_at IS NOT NULL AND next_run_at <= now(). Existing enabled triggers will silently stop running after deploy. This is a one-way door causing production outages for customers relying on scheduled triggers.
Fix: Add a data migration to backfill existing triggers:
```sql
-- After the ALTER TABLE, add:
UPDATE scheduled_triggers
SET next_run_at = NOW()
WHERE enabled = true AND next_run_at IS NULL;
```

Or implement startup reconciliation that calls computeNextRunAt for any enabled trigger with NULL next_run_at.
Refs:
- findDueScheduledTriggersAcrossProjects:233 — WHERE clause filters on next_run_at
🟠⚠️ Major (2) 🟠⚠️
🟠 1) agentFull.ts + projectFull.ts Bulk routes don't compute nextRunAt for new triggers
Issue: Triggers created via createFullAgentServerSide and createFullProjectServerSide (the PUT/PATCH bulk routes) call upsertScheduledTrigger without computing nextRunAt. These triggers will have nextRunAt = NULL and won't be dispatched.
Why: SDK push commands and bulk imports use these routes. Triggers will appear enabled in the UI but will never execute until manually updated via the individual trigger PATCH endpoint.
Fix: Compute nextRunAt before upserting in the DAL functions, following the pattern at scheduledTriggers.ts:374-380:
```ts
const nextRunAt = enabled
  ? computeNextRunAt({ cronExpression, cronTimezone, runAt })
  : null;
```

Refs:
- scheduledTriggers.ts:374-380 — correct pattern for individual create
🟠 2) triggerDispatcher.ts:44 Unbounded concurrent dispatches
Issue: All due triggers are dispatched in parallel via Promise.allSettled with no concurrency limit. If many triggers become due simultaneously (scheduler outage recovery, popular cron times), this could spawn hundreds of concurrent workflow starts.
Why: Risks connection pool exhaustion, workflow engine overload, and cascading failures during recovery scenarios.
Fix: Add concurrency limiting:
```ts
import pLimit from 'p-limit';

const limit = pLimit(10);
const results = await Promise.allSettled(
  dueTriggers.map((trigger) => limit(() => dispatchSingleTrigger(trigger)))
);
```
🟡 Minor (1) 🟡
Inline Comments:
- 🟡 Minor: triggerDispatcher.ts:104 — Error log missing correlation context
💭 Consider (2) 💭
Inline Comments:
- 💭 Consider: computeNextRunAt.test.ts:104-126 — DST tests use weak assertions
- 💭 Consider: triggerDispatcher.test.ts:188 — Missing test for advance-failure scenario
🕐 Pending Recommendations (5)
Prior issues from pullfrog and earlier claude review that remain unresolved:
- 🔴 Missing data migration — triggers with NULL next_run_at (pullfrog #1)
- 🟠 Bulk routes don't set nextRunAt — agentFull/projectFull (pullfrog #2)
- 🟠 Unbounded concurrent dispatches — resource exhaustion risk (claude)
- 🟡 Remaining as any cast — scheduledTriggers.ts update path (pullfrog #4)
- 🟡 Reconciliation types weakened — optional fields hide missing implementation
🚫 REQUEST CHANGES
Summary: Good progress on the delta — 6 of 9 prior issues have been addressed, including critical fixes to the reconciliation check, dispatch ordering, test coverage, and security. However, the most critical issue remains unresolved: the data migration that will cause all existing scheduled triggers to silently stop working after deploy. This must be addressed before merge.
Secondary priority: the bulk routes (agentFull/projectFull) still don't compute nextRunAt, which will cause SDK-created triggers to be dormant.
Discarded (3)
| Location | Issue | Reason Discarded |
|---|---|---|
| triggerDispatcher.ts:108 | Success log emitted even when advance fails | Acceptable — log clearly says "dispatched", which is accurate; advance failure is logged separately |
| computeNextRunAt.test.ts | No test for invalid cron expressions | Low priority — invalid crons should be caught at trigger creation validation |
| triggerDispatcher.test.ts | No test for scheduledFor fallback | Low priority — simple fallback unlikely to regress |
Reviewers (4)
| Reviewer | Returned | Main Findings | Consider | While You're Here | Inline Comments | Pending Recs | Discarded |
|---|---|---|---|---|---|---|---|
| pr-review-standards | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| pr-review-tests | 5 | 0 | 1 | 0 | 1 | 0 | 2 |
| pr-review-sre | 4 | 1 | 0 | 0 | 1 | 1 | 1 |
| pr-review-breaking-changes | 3 | 2 | 0 | 0 | 0 | 2 | 0 |
| Total | 12 | 3 | 1 | 0 | 2 | 3 | 3 |
Note: Many findings were deduplicated with prior reviews (pullfrog, earlier claude). Delta-focused review intentionally narrow.
```ts
logger.error(
  { scheduledTriggerId, err },
  'Failed to advance next_run_at after workflow start; next tick will retry (idempotent)'
);
```
🟡 Minor: Misleading "idempotent" comment
Issue: The comment claims "next tick will retry (idempotent)" but the idempotency guarantee is at the workflow layer (via idempotencyKey = sched_{triggerId}_{scheduledFor} in scheduledTriggerRunner.ts), not the dispatcher layer. If advance fails, the next tick will start another workflow instance that must detect the duplicate via createInvocationIdempotentStep.
Why: During incident triage, this comment could mislead engineers into thinking the dispatcher itself prevents duplicates, when it actually relies on downstream workflow-level deduplication.
Fix: Clarify the comment:
```ts
logger.error(
  { scheduledTriggerId, err },
  'Failed to advance next_run_at after workflow start; next tick will re-dispatch (workflow has invocation-level idempotency)'
);
```
```ts
} catch (err) {
  logger.error(
    { scheduledTriggerId, err },
    'Failed to advance next_run_at after workflow start; next tick will retry (idempotent)'
```
🟡 Minor: Error log missing context for incident correlation
Issue: The error log includes scheduledTriggerId and err but lacks tenantId, projectId, agentId, and scheduledFor timestamp. During an incident with multiple failing triggers, correlating these logs to specific customers will be difficult.
Why: All these identifiers are already in scope — including them costs nothing and significantly improves debuggability.
Fix:

```ts
logger.error(
  { scheduledTriggerId, tenantId, projectId, agentId, scheduledFor: trigger.nextRunAt, err },
  'Failed to advance next_run_at after workflow start; next tick will retry (idempotent)'
);
```
```ts
} catch (err) {
  logger.error(
    { scheduledTriggerId, err },
    'Failed to advance next_run_at after workflow start; next tick will retry (idempotent)'
```
🟡 Minor: Error log missing correlation context for incident debugging
Issue: This log includes scheduledTriggerId and err but is missing tenantId, projectId, agentId, and scheduledFor — all of which are available in scope on line 64.
Why: During an incident with multiple failing triggers, operators would need to manually correlate trigger IDs back to tenant/project context. Including all identifiers makes debugging significantly easier and costs nothing.
Fix:

```ts
logger.error(
  { scheduledTriggerId, tenantId, projectId, agentId, scheduledFor: trigger.nextRunAt, err },
  'Failed to advance next_run_at after workflow start; next tick will retry (idempotent)'
);
```
```ts
  return Promise.resolve();
});
mockWithRef.mockImplementation(async (_pool, _ref, fn, _opts) => {
  return fn('mock-branch-db');
```
🟡 Minor: Missing test for advance-failure-after-workflow-start scenario
Issue: This test covers workflow start failure, but there's no test for when advanceScheduledTriggerNextRunAt fails after workflow start succeeds. In triggerDispatcher.ts:88-106, if withRef throws, the code logs an error but still returns 'dispatched'.
Why: Without this test, it's unclear whether the current behavior (count as dispatched, log error) is intentional or accidental. The next scheduler tick will dispatch the same trigger again, relying on workflow-level idempotency. This test would document the expected behavior and catch regressions.
Fix: Add a test case:
```ts
it('counts as dispatched even when advance fails (workflow already started)', async () => {
  mockStart.mockResolvedValue(undefined);
  mockWithRef.mockRejectedValue(new Error('db write failed'));

  const result = await dispatchDueTriggers();

  expect(result).toEqual({ dispatched: 1 });
  expect(mockStart).toHaveBeenCalledTimes(1);
});
```
```ts
it('handles DST spring-forward transition', () => {
  const result = computeNextRunAt({
    cronExpression: '30 2 * * *',
    cronTimezone: 'America/New_York',
    lastScheduledFor: '2026-03-07T07:30:00.000Z',
  });

  expect(result).toBeDefined();
  const nextDate = new Date(result!);
  expect(nextDate.getTime()).toBeGreaterThan(new Date('2026-03-07T07:30:00.000Z').getTime());
});

it('handles DST fall-back transition', () => {
  const result = computeNextRunAt({
    cronExpression: '30 1 * * *',
    cronTimezone: 'America/New_York',
    lastScheduledFor: '2026-10-31T05:30:00.000Z',
  });

  expect(result).toBeDefined();
  const nextDate = new Date(result!);
  expect(nextDate.getTime()).toBeGreaterThan(new Date('2026-10-31T05:30:00.000Z').getTime());
});
```
💭 Consider: DST tests use weak assertions
Issue: These DST tests only assert that the result is defined and later than the input. They don't verify the exact expected next run time, meaning bugs in DST handling (e.g., skipping to the wrong day, firing twice during fall-back) would pass.
Why: DST bugs in scheduler systems are notoriously hard to debug in production. Spring-forward and fall-back transitions can cause triggers to fire at unexpected times or not at all.
Fix: Strengthen assertions to verify exact expected times. For example, for the spring-forward test (March 8, 2026 in America/New_York):
```ts
// 2:30 AM doesn't exist during spring-forward, so next valid occurrence
// after 2026-03-07T07:30:00Z should be 2026-03-09T07:30:00Z
expect(result).toBe('2026-03-09T07:30:00.000Z');
```
Force-pushed feb29ce to d02e0fd
TL;DR — Replaces the per-trigger daisy-chaining workflow model with a single centralized scheduler workflow that polls the runtime DB every 60 seconds and dispatches one-shot workflows for due triggers. This moves scheduling state into the runtime DB.

Key changes

Summary: 60 files | 23 commits | base: main

Scheduler architecture overhaul
The scheduler registers itself in a new scheduler_state table.

Trigger dispatcher and one-shot runner
The dispatcher computes …

Schema migration: manage DB → runtime DB
The manage DB migration (…)

Branch-aware trigger execution
When a branch is deleted, …

Deploy restart endpoint and CI integration
The endpoint uses constant-time comparison for the bearer token and the …

CRUD routes and service simplification

Removed reconciliation and audit infrastructure

UI and cleanup updates
The scheduled triggers table component is rewritten from a …
This pull request has been automatically marked as stale because it has not had recent activity. Thank you for your contributions!

This pull request has been automatically closed due to inactivity. Thank you for your understanding!
No description provided.