Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 107 additions & 5 deletions src/cli/commands/datastore_lock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,80 @@ import { Command } from "@cliffy/command";
import { createContext, type GlobalOptions } from "../context.ts";
import {
createDatastoreLock,
createModelLock,
resolveDatastoreForRepo,
} from "../repo_context.ts";
import { UserError } from "../../domain/errors.ts";
import {
type DatastoreLockReleaseData,
type DatastoreLockStatusData,
renderDatastoreLockRelease,
renderDatastoreLockStatus,
} from "../../presentation/output/datastore_output.ts";
import { walk } from "@std/fs";
import { relative } from "@std/path";
import type { LockInfo } from "../../domain/datastore/distributed_lock.ts";

// deno-lint-ignore no-explicit-any
type AnyOptions = any;

/**
 * Scans a filesystem datastore for per-model lock files.
 *
 * A per-model lock is a file named `.lock` located at
 * `data/{modelType}/{modelId}/.lock` relative to `datastorePath`. Files
 * ending in `.lock` anywhere else in the tree are ignored.
 *
 * The scan is best-effort: a missing or unreadable datastore directory yields
 * an empty array, and individual lock files that cannot be read or do not
 * contain a JSON object are skipped rather than reported.
 *
 * @param datastorePath Root directory of the filesystem datastore.
 * @returns One entry per per-model lock found. `lockKey` is the lock file's
 *   path relative to `datastorePath`, using `/` as the separator.
 */
async function scanModelLocks(
  datastorePath: string,
): Promise<
  Array<{
    lockKey: string;
    modelType: string;
    modelId: string;
    info: LockInfo;
  }>
> {
  type ModelLockEntry = {
    lockKey: string;
    modelType: string;
    modelId: string;
    info: LockInfo;
  };
  const found: ModelLockEntry[] = [];

  // `relative()` returns paths using the platform separator, so on Windows
  // the segments are backslash-delimited. On POSIX a backslash is a legal
  // filename character and must not be treated as a separator.
  const segmentSeparator = Deno.build.os === "windows" ? /[\\/]/ : "/";

  try {
    for await (
      const entry of walk(datastorePath, {
        includeDirs: false,
        // data/{modelType}/{modelId}/.lock sits exactly four levels below the
        // root; there is no need to descend into the model data underneath.
        maxDepth: 4,
        match: [/\.lock$/],
      })
    ) {
      const parts = relative(datastorePath, entry.path).split(
        segmentSeparator,
      );
      const isModelLock = parts.length === 4 &&
        parts[0] === "data" &&
        parts[3] === ".lock";
      if (!isModelLock) {
        continue;
      }

      try {
        const parsed: unknown = JSON.parse(
          await Deno.readTextFile(entry.path),
        );
        // A lock file holding `null`, a primitive, or an array cannot be a
        // LockInfo record; treat it like any other unreadable lock file.
        if (
          typeof parsed !== "object" || parsed === null ||
          Array.isArray(parsed)
        ) {
          continue;
        }
        found.push({
          lockKey: parts.join("/"),
          modelType: parts[1],
          modelId: parts[2],
          info: parsed as LockInfo,
        });
      } catch {
        // Skip unreadable lock files (I/O error or malformed JSON)
      }
    }
  } catch {
    // Datastore directory may not exist
  }

  return found;
}

/**
* Shows the current datastore lock status.
*/
Expand All @@ -49,6 +111,8 @@ const datastoreLockStatusCommand = new Command()
const { datastoreConfig: config } = await resolveDatastoreForRepo(
options.repoDir ?? ".",
);

// Check global lock
const lock = createDatastoreLock(config);
const info = await lock.inspect();

Expand All @@ -59,6 +123,22 @@ const datastoreLockStatusCommand = new Command()
};

renderDatastoreLockStatus(data, ctx.outputMode);

// Scan for per-model locks (filesystem only)
if (config.type === "filesystem") {
const modelLocks = await scanModelLocks(config.path);
if (modelLocks.length > 0) {
for (const ml of modelLocks) {
const modelData: DatastoreLockStatusData = {
held: true,
info: ml.info,
datastoreType: config.type,
lockScope: `${ml.modelType}/${ml.modelId}`,
};
renderDatastoreLockStatus(modelData, ctx.outputMode);
}
}
}
});

/**
Expand All @@ -68,6 +148,10 @@ const datastoreLockReleaseCommand = new Command()
.description("Force-release a stuck datastore lock")
.option("--repo-dir <dir:string>", "Repository directory", { default: "." })
.option("--force", "Required to confirm force release", { required: true })
.option(
"--model <model:string>",
"Release a specific model's lock (type/id format, e.g. aws-ec2/my-server)",
)
.action(async function (options: AnyOptions) {
const ctx = createContext(options as GlobalOptions, [
"datastore",
Expand All @@ -85,14 +169,32 @@ const datastoreLockReleaseCommand = new Command()
const { datastoreConfig: config } = await resolveDatastoreForRepo(
options.repoDir ?? ".",
);
const lock = createDatastoreLock(config);

const modelSpec = options.model as string | undefined;

let lock;
if (modelSpec) {
// Per-model lock release
const parts = modelSpec.split("/");
if (parts.length !== 2) {
throw new UserError(
`Invalid --model format: "${modelSpec}". Expected "type/id" (e.g. aws-ec2/my-server).`,
);
}
lock = createModelLock(config, parts[0], parts[1]);
} else {
// Global lock release
lock = createDatastoreLock(config);
}

const info = await lock.inspect();

if (!info) {
renderDatastoreLockRelease(
{ released: false, reason: "no lock held" },
ctx.outputMode,
);
const releaseData: DatastoreLockReleaseData = {
released: false,
reason: "no lock held",
};
renderDatastoreLockRelease(releaseData, ctx.outputMode);
return;
}

Expand Down
76 changes: 54 additions & 22 deletions src/cli/commands/model_evaluate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import {
renderModelEvaluateSingle,
} from "../../presentation/output/model_evaluate_output.ts";
import { createContext, type GlobalOptions } from "../context.ts";
import { requireInitializedRepo } from "../repo_context.ts";
import {
acquireModelLocks,
requireInitializedRepo,
requireInitializedRepoUnlocked,
} from "../repo_context.ts";
import { UserError } from "../../domain/errors.ts";
import { ExpressionEvaluationService } from "../../domain/expressions/expression_evaluation_service.ts";
import { findDefinitionByIdOrName } from "../../domain/models/model_lookup.ts";
Expand All @@ -45,20 +49,21 @@ export const modelEvaluateCommand = new Command()
"model",
"evaluate",
]);
const { repoDir, repoContext } = await requireInitializedRepo({
repoDir: options.repoDir ?? ".",
outputMode: ctx.outputMode,
});
const definitionRepo = repoContext.definitionRepo;
const dataRepo = repoContext.unifiedDataRepo;
const evaluationService = new ExpressionEvaluationService(
definitionRepo,
repoDir,
{ dataRepo },
);

// If --all flag or no argument, evaluate all definitions
// If --all flag or no argument, evaluate all definitions (global lock)
if (options.all || !modelIdOrName) {
const { repoDir, repoContext } = await requireInitializedRepo({
repoDir: options.repoDir ?? ".",
outputMode: ctx.outputMode,
});
const definitionRepo = repoContext.definitionRepo;
const dataRepo = repoContext.unifiedDataRepo;
const evaluationService = new ExpressionEvaluationService(
definitionRepo,
repoDir,
{ dataRepo },
);

ctx.logger.debug`Evaluating all model definitions`;

const results = await evaluationService.evaluateAllDefinitions();
Expand Down Expand Up @@ -90,10 +95,18 @@ export const modelEvaluateCommand = new Command()
return;
}

// Single model evaluation
// Single model evaluation — use per-model lock
const { repoDir, repoContext, datastoreConfig } =
await requireInitializedRepoUnlocked({
repoDir: options.repoDir ?? ".",
outputMode: ctx.outputMode,
});
const definitionRepo = repoContext.definitionRepo;
const dataRepo = repoContext.unifiedDataRepo;

ctx.logger.debug`Evaluating model: ${modelIdOrName}`;

// Look up the model definition
// Look up the model definition (reads YAML — no lock needed)
ctx.logger.debug`Looking up model: ${modelIdOrName}`;
const lookupResult = await findDefinitionByIdOrName(
definitionRepo,
Expand All @@ -107,15 +120,34 @@ export const modelEvaluateCommand = new Command()
ctx.logger
.debug`Found model: id=${definition.id}, type=${type.normalized}`;

// Evaluate the definition
const result = await evaluationService.evaluateDefinition(
definition,
type,
);
// Acquire per-model lock (for S3, also pulls model-scoped files)
const flushModelLocks = await acquireModelLocks(datastoreConfig, [
{ modelType: type.normalized, modelId: definition.id },
]);

// Persist evaluated definition for --last-evaluated
const evaluationService = new ExpressionEvaluationService(
definitionRepo,
repoDir,
{ dataRepo },
);
const evaluatedDefRepo = repoContext.evaluatedDefinitionRepo;
await evaluatedDefRepo.save(type, result.definition);

let result: Awaited<
ReturnType<typeof evaluationService.evaluateDefinition>
>;
try {
// Evaluate the definition
result = await evaluationService.evaluateDefinition(
definition,
type,
);

// Persist evaluated definition for --last-evaluated
await evaluatedDefRepo.save(type, result.definition);
} finally {
// Release per-model lock (and push to S3 for S3 datastores)
await flushModelLocks();
}

const item: ModelEvaluateItemData = {
id: result.definition.id,
Expand Down
Loading
Loading