Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions design/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,43 @@ attributes:
historySize: ${{ size(data.listVersions('processor', 'result')) }}
```

### data.findBySpec(modelName, specName)

Returns all data records for a model that match a given output spec name.
Commonly used in `forEach` expressions to iterate over variable-length output.

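**Workflow run scoping:** When called inside a workflow run, `findBySpec` only
returns records whose `workflowRunId` tag matches the current run. Records from
previous runs are excluded, and so are records written outside any workflow
(which carry no `workflowRunId` tag). This prevents stale data from leaking into
`forEach` iteration. Outside a workflow context, it returns all matching records
regardless of which run (if any) produced them.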

```yaml
# In a forEach step — only sees data from the current workflow run:
steps:
  - name: dl-${{ self.ep.attributes.title }}
    forEach:
      item: ep
      in: ${{ data.findBySpec("dedup-model", "episode") }}
    task:
      type: model_method
      modelIdOrName: downloader
      methodName: download
      inputs:
        uri: ${{ self.ep.attributes.url }}
```

### data.findByTag(tagKey, tagValue)

Returns all data records across all models with a matching tag. This function is
**not** run-scoped — it always returns all matching data globally. Use it when
you need cross-run data access.

```yaml
# Find all data tagged with env=prod across all models:
inputs:
  prodData: ${{ data.findByTag("env", "prod") }}
```

## Sensitive Data

You should be able to access sensitive data by referencing the storage keys they
Expand Down
11 changes: 11 additions & 0 deletions src/domain/expressions/model_resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ export interface ExpressionContext {
data?: DataNamespace;
/** File namespace for lazy-loading file contents */
file?: FileNamespace;
/**
* Workflow run ID for scoping data queries.
* When set, `data.findBySpec()` only returns data tagged with this run ID.
* Set by the workflow engine after creating the run.
*/
workflowRunId?: string;
/** Index signature for CEL evaluator compatibility */
[key: string]: unknown;
}
Expand Down Expand Up @@ -605,6 +611,7 @@ export class ModelResolver {
if (!allCoords) return [];
const results: DataRecord[] = [];
const seen = new Set<string>();
const runId = context.workflowRunId;
for (const { modelType, modelId } of allCoords) {
const allData = dataRepo.findAllForModelSync(modelType, modelId);
for (const data of allData) {
Expand All @@ -621,6 +628,10 @@ export class ModelResolver {
latestVersion,
);
if (versionData && versionData.tags["specName"] === specName) {
// When inside a workflow run, scope to current run's data only
if (runId && versionData.tags["workflowRunId"] !== runId) {
continue;
}
const dedupeKey =
`${modelType.normalized}:${modelId}:${data.name}`;
if (seen.has(dedupeKey)) continue;
Expand Down
138 changes: 138 additions & 0 deletions src/domain/expressions/model_resolver_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,141 @@ Deno.test("data.* returns null/empty for missing data name", async () => {
assertEquals(ctx.data.listVersions("empty-model", "nonexistent"), []);
});
});

// ============================================================================
// data.findBySpec() run-scoping via workflowRunId
// ============================================================================

Deno.test("findBySpec: scopes to current run when workflowRunId is set", async () => {
  await withTempDir(async (repoDir) => {
    await setupRepoDir(repoDir);
    const definitions = new YamlDefinitionRepository(repoDir);
    const store = new FileSystemUnifiedDataRepository(repoDir);
    const modelType = ModelType.create("test/model");

    const definition = Definition.create({
      name: "dedup-model",
      globalArguments: {},
    });
    await definitions.save(modelType, definition);

    // Persist one "episode"-spec record tagged with the given workflow run ID.
    const saveEpisode = async (
      name: string,
      runId: string,
      title: string,
    ): Promise<void> => {
      const record = Data.create({
        name,
        contentType: "application/json",
        lifetime: "infinite",
        garbageCollection: 10,
        tags: {
          type: "resource",
          specName: "episode",
          workflowRunId: runId,
        },
        ownerDefinition: owner,
      });
      const payload = new TextEncoder().encode(JSON.stringify({ title }));
      await store.save(modelType, definition.id, record, payload);
    };

    // One record per run, so each run ID should isolate exactly one record.
    await saveEpisode("episode-a", "run-1", "Episode A");
    await saveEpisode("episode-b", "run-2", "Episode B");

    const resolver = new ModelResolver(definitions, { repoDir, dataRepo: store });
    const ctx = await resolver.buildContext();
    assertExists(ctx.data);
    const dataNs = ctx.data;

    // No run ID on the context: every matching record is visible.
    assertEquals(dataNs.findBySpec("dedup-model", "episode").length, 2);

    // The run ID is read from the context at call time, so mutating it
    // re-scopes subsequent findBySpec calls to that run's records.
    const perRun = [
      ["run-1", "episode-a"],
      ["run-2", "episode-b"],
    ] as const;
    for (const [runId, expectedName] of perRun) {
      ctx.workflowRunId = runId;
      const scoped = dataNs.findBySpec("dedup-model", "episode");
      assertEquals(scoped.length, 1);
      assertEquals(scoped[0].name, expectedName);
    }
  });
});

Deno.test("findBySpec: returns all data when workflowRunId is not set", async () => {
  await withTempDir(async (repoDir) => {
    await setupRepoDir(repoDir);
    const defRepo = new YamlDefinitionRepository(repoDir);
    const dataRepo = new FileSystemUnifiedDataRepository(repoDir);
    const type = ModelType.create("test/model");

    const model = Definition.create({
      name: "global-model",
      globalArguments: {},
    });
    await defRepo.save(type, model);

    // Data with no workflowRunId tag (e.g., written outside a workflow)
    const dataNoRun = Data.create({
      name: "item-standalone",
      contentType: "application/json",
      lifetime: "infinite",
      garbageCollection: 10,
      tags: { type: "resource", specName: "item" },
      ownerDefinition: owner,
    });
    await dataRepo.save(
      type,
      model.id,
      dataNoRun,
      new TextEncoder().encode(JSON.stringify({ value: 1 })),
    );

    // Data with a workflowRunId tag
    const dataWithRun = Data.create({
      name: "item-from-workflow",
      contentType: "application/json",
      lifetime: "infinite",
      garbageCollection: 10,
      tags: {
        type: "resource",
        specName: "item",
        workflowRunId: "run-abc",
      },
      ownerDefinition: owner,
    });
    await dataRepo.save(
      type,
      model.id,
      dataWithRun,
      new TextEncoder().encode(JSON.stringify({ value: 2 })),
    );

    const resolver = new ModelResolver(defRepo, { repoDir, dataRepo });
    const ctx = await resolver.buildContext();
    assertExists(ctx.data);

    // No workflowRunId set — returns all data regardless of tags
    const allResults = ctx.data.findBySpec("global-model", "item");
    assertEquals(allResults.length, 2);

    // Contrast: once a run ID is set, records without a workflowRunId tag
    // (written outside any workflow) are excluded too, not just other runs'.
    ctx.workflowRunId = "run-abc";
    const scopedResults = ctx.data.findBySpec("global-model", "item");
    assertEquals(scopedResults.length, 1);
    assertEquals(scopedResults[0].name, "item-from-workflow");

    // A run ID that tagged nothing yields an empty result, not a fallback to
    // global data.
    ctx.workflowRunId = "run-unrelated";
    assertEquals(ctx.data.findBySpec("global-model", "item").length, 0);
  });
});
7 changes: 7 additions & 0 deletions src/domain/workflows/execution_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ export class DefaultStepExecutor implements StepExecutor {
...(ctx.workflowTags ?? {}),
source: "step-output",
workflow: ctx.workflowName,
workflowRunId: ctx.workflowRunId,
step: ctx.stepName,
};

Expand Down Expand Up @@ -1139,6 +1140,12 @@ export class WorkflowExecutionService {
};
const run = WorkflowRun.create(workflow, mergedTags);

// Scope data.findBySpec() to this run so forEach expressions
// only see data produced during the current workflow run.
if (expressionContext) {
expressionContext.workflowRunId = run.id;
}

// Create secret redactor — populated during vault resolution, used by log sink and data writers
const secretRedactor = new SecretRedactor();

Expand Down
Loading