Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ public final class WorkflowConstants {
public static final String CONFIG_SCHEDULE_AFTER = "schedule." + CONFIG_AFTER;
public static final String CONFIG_BATCH_SIZE = "batch-size";
public static final String CONFIG_SCHEDULE_BATCH_SIZE = "schedule." + CONFIG_BATCH_SIZE;
public static final String CONFIG_LAST_SCHEDULE_RUN = "schedule.lastRun";
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate;
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
import org.keycloak.models.sessions.infinispan.stream.ValueIdentityBiFunction;
import org.keycloak.models.workflow.WorkflowScheduleClusterEvent;
import org.keycloak.sessions.CommonClientSessionModel;
import org.keycloak.storage.UserStorageProviderClusterEvent;
import org.keycloak.storage.UserStorageProviderModel;
Expand Down Expand Up @@ -234,6 +235,9 @@
ValueIdentityBiFunction.class,
LoginFailuresLifespanUpdate.class,

// workflow package
WorkflowScheduleClusterEvent.class,

// infinispan.module.certificates
ReloadCertificateFunction.class,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ private Marshalling() {
public static final int VALUE_IDENTITY_BI_FUNCTION = 65619;
public static final int LOGIN_FAILURES_LIFESPAN_UPDATE = 65620;

/** see {@link org.keycloak.models.workflow.WorkflowScheduleClusterEvent} */
public static final int WORKFLOW_SCHEDULE_CLUSTER_EVENT = 65621;

public static void configure(GlobalConfigurationBuilder builder) {
getSchemas().forEach(builder.serialization()::addContextInitializer);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.keycloak.models.workflow;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -15,6 +14,7 @@

import org.keycloak.common.util.DurationConverter;
import org.keycloak.common.util.MultivaluedHashMap;
import org.keycloak.common.util.Time;
import org.keycloak.component.ComponentFactory;
import org.keycloak.component.ComponentModel;
import org.keycloak.models.KeycloakSession;
Expand All @@ -26,7 +26,6 @@
import org.keycloak.representations.workflows.WorkflowConstants;
import org.keycloak.representations.workflows.WorkflowRepresentation;
import org.keycloak.representations.workflows.WorkflowStepRepresentation;
import org.keycloak.services.scheduled.ClusterAwareScheduledTaskRunner;
import org.keycloak.timer.TimerProvider;

import org.jboss.logging.Logger;
Expand All @@ -36,6 +35,7 @@
public class DefaultWorkflowProvider implements WorkflowProvider {

private static final Logger log = Logger.getLogger(DefaultWorkflowProvider.class);
private static final Logger scheduleLog = Logger.getLogger("org.keycloak.workflow.schedule");

private final KeycloakSession session;
private final WorkflowStateProvider stateProvider;
Expand Down Expand Up @@ -108,10 +108,11 @@ public void updateWorkflow(Workflow workflow, WorkflowRepresentation representat

// finally, update the workflow's config along with the steps' configs
workflow.updateConfig(representation.getConfig(), newSteps);
}

cancelScheduledWorkflow(workflow);
scheduleWorkflow(workflow);
cancelScheduledWorkflow(workflow);
scheduleWorkflow(workflow);
notifyScheduleChange(workflow, false);
}
}

@Override
Expand All @@ -122,6 +123,7 @@ public void removeWorkflow(Workflow workflow) {
realm.removeComponent(component);
stateProvider.removeByWorkflow(workflow.getId());
cancelScheduledWorkflow(workflow);
notifyScheduleChange(workflow, true);
}

@Override
Expand Down Expand Up @@ -480,27 +482,61 @@ private Workflow addWorkflow(Workflow workflow) {
workflow = new Workflow(session, realm.addComponentModel(model));

scheduleWorkflow(workflow);
notifyScheduleChange(workflow, false);

return workflow;
}

private void scheduleWorkflow(Workflow workflow) {
// only start the task if the workflow is enabled and has a schedule configured
String scheduled = workflow.getConfig().getFirst(WorkflowConstants.CONFIG_SCHEDULE_AFTER);

if (workflow.isEnabled() && scheduled != null) {
Duration duration = DurationConverter.parseDuration(scheduled);
initLastScheduleRun(workflow);
int intervalSecs = (int) DurationConverter.parseDuration(scheduled).toSeconds();
int initialDelaySecs = ScheduledWorkflowRunner.computeInitialDelay(workflow, intervalSecs);
TimerProvider timer = session.getProvider(TimerProvider.class);
timer.schedule(new ClusterAwareScheduledTaskRunner(sessionFactory, new ScheduledWorkflowRunner(workflow.getId(), realm.getId()), duration.toMillis()), duration.toMillis());
ScheduledWorkflowRunner runner = new ScheduledWorkflowRunner(workflow.getId(), realm.getId(), intervalSecs);
timer.scheduleTask(runner, initialDelaySecs * 1000L, intervalSecs * 1000L);
scheduleLog.debugf("Scheduled workflow '%s' with interval %d s, initial delay %d s", workflow.getName(), intervalSecs, initialDelaySecs);
}
}

private void initLastScheduleRun(Workflow workflow) {
if (ScheduledWorkflowRunner.getLastScheduleRun(workflow) <= 0) {
ComponentModel component = realm.getComponent(workflow.getId());
component.put(WorkflowConstants.CONFIG_LAST_SCHEDULE_RUN, String.valueOf(Time.currentTime()));
realm.updateComponent(component);
}
}

void cancelScheduledWorkflow(Workflow workflow) {
session.getProvider(TimerProvider.class).cancelTask(new ScheduledWorkflowRunner(workflow.getId(), realm.getId()).getTaskName());
session.getProvider(TimerProvider.class).cancelTask(ScheduledWorkflowRunner.taskName(workflow.getId()));
}

void rescheduleWorkflow(Workflow workflow) {
cancelScheduledWorkflow(workflow);
scheduleWorkflow(workflow);
}

private void notifyScheduleChange(Workflow workflow, boolean removed) {
DefaultWorkflowProviderFactory factory = (DefaultWorkflowProviderFactory) sessionFactory
.getProviderFactory(WorkflowProvider.class, DefaultWorkflowProviderFactory.ID);
WorkflowScheduleEventListener listener = factory.getScheduleEventListener();

if (listener != null) {
int intervalSecs = 0;
int lastScheduleRun = 0;

if (!removed) {
String scheduled = workflow.getConfig().getFirst(WorkflowConstants.CONFIG_SCHEDULE_AFTER);

if (workflow.isEnabled() && scheduled != null) {
intervalSecs = (int) DurationConverter.parseDuration(scheduled).toSeconds();
lastScheduleRun = ScheduledWorkflowRunner.getLastScheduleRun(workflow);
}
}

listener.notifyCluster(session, realm.getId(), workflow.getId(), removed, intervalSecs, lastScheduleRun);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class DefaultWorkflowProviderFactory implements WorkflowProviderFactory<D
private static final long DEFAULT_EXECUTOR_TASK_TIMEOUT = 5000L;

private WorkflowExecutor executor;
private WorkflowScheduleEventListener scheduleEventListener;
private boolean blocking;
private long taskTimeout;

Expand Down Expand Up @@ -51,7 +52,9 @@ public void init(Scope config) {
@Override
public void postInit(KeycloakSessionFactory factory) {
this.executor = new WorkflowExecutor(getTaskExecutor(factory), blocking, taskTimeout);
this.scheduleEventListener = new WorkflowScheduleEventListener(factory);
factory.register(this);
factory.register(scheduleEventListener);
}

@Override
Expand Down Expand Up @@ -82,6 +85,10 @@ public void onEvent(ProviderEvent event) {
}


WorkflowScheduleEventListener getScheduleEventListener() {
return scheduleEventListener;
}

@Override
public void close() {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
package org.keycloak.models.workflow;

import jakarta.ws.rs.BadRequestException;

import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.common.util.DurationConverter;
import org.keycloak.common.util.Time;
import org.keycloak.component.ComponentModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.representations.workflows.WorkflowConstants;
import org.keycloak.timer.ScheduledTask;
import org.keycloak.timer.TimerProvider;

import org.jboss.logging.Logger;

public class ScheduledWorkflowRunner implements ScheduledTask {

private static final Logger log = Logger.getLogger(DefaultWorkflowProvider.class);
private static final Logger log = Logger.getLogger("org.keycloak.workflow.schedule");

private static final int MIN_LOCK_TIMEOUT_SECS = 30;

private final String workflowId;
private final String realmId;
private final int intervalSecs;

public ScheduledWorkflowRunner(String workflowId, String realmId) {
public ScheduledWorkflowRunner(String workflowId, String realmId, int intervalSecs) {
this.workflowId = workflowId;
this.realmId = realmId;
this.intervalSecs = intervalSecs;
}

@Override
Expand All @@ -24,31 +38,130 @@ public void run(KeycloakSession session) {

if (realm == null) {
log.warnf("Realm %s for scheduled workflow %s not found, cancelling task", realmId, workflowId);
throw new IllegalStateException("Realm for scheduled workflow not found: " + realmId);
cancelTask(session);
return;
}

session.getContext().setRealm(realm);
WorkflowProvider provider = session.getProvider(WorkflowProvider.class);
Workflow workflow = provider.getWorkflow(workflowId);
Workflow workflow;

if (workflow == null) {
try {
workflow = provider.getWorkflow(workflowId);
} catch (BadRequestException e) {
log.warnf("Scheduled workflow %s in realm %s not found, cancelling task", workflowId, realmId);
throw new IllegalStateException("Scheduled workflow not found: " + workflowId);
cancelTask(session);
return;
}

log.debugf("Executing scheduled workflow '%s' in realm %s", workflow.getName(), realm.getName());
if (!workflow.isEnabled()) {
log.debugf("Workflow '%s' in realm %s is disabled, cancelling scheduled task", workflow.getName(), realm.getName());
cancelTask(session);
return;
}

try {
provider.activateForAllEligibleResources(workflow);
} catch (Exception e) {
log.errorf(e, "Error while executing scheduled workflow %s in realm %s", workflow.getName(), realm.getName());
String currentSchedule = workflow.getConfig().getFirst(WorkflowConstants.CONFIG_SCHEDULE_AFTER);
if (currentSchedule == null) {
log.debugf("Workflow '%s' in realm %s no longer has a schedule, cancelling task", workflow.getName(), realm.getName());
cancelTask(session);
return;
}

int currentIntervalSecs = (int) DurationConverter.parseDuration(currentSchedule).toSeconds();
if (currentIntervalSecs != intervalSecs) {
log.debugf("Schedule interval for workflow '%s' in realm %s changed from %d to %d s, rescheduling",
workflow.getName(), realm.getName(), intervalSecs, currentIntervalSecs);
cancelTask(session);
scheduleAligned(session, workflow, currentIntervalSecs);
return;
}

if (!isSchedulePeriod(workflow)) {
log.debugf("Skipping scheduled workflow '%s' in realm %s, too soon since last run", workflow.getName(), realm.getName());
return;
}

ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
String taskKey = workflowId + "::schedule";
int lockTimeout = Math.max(MIN_LOCK_TIMEOUT_SECS, intervalSecs);

ExecutionResult<Void> result = clusterProvider.executeIfNotExecuted(taskKey, lockTimeout, () -> {
KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), s -> {
RealmModel r = s.realms().getRealm(realmId);
s.getContext().setRealm(r);
updateLastScheduleRun(s);
});

log.debugf("Executing scheduled workflow '%s' in realm %s", workflow.getName(), realm.getName());

try {
provider.activateForAllEligibleResources(workflow);
} catch (Exception e) {
log.errorf(e, "Error while executing scheduled workflow %s in realm %s", workflow.getName(), realm.getName());
}

log.debugf("Finished executing scheduled workflow '%s' in realm %s", workflow.getName(), realm.getName());
return null;
});

if (!result.isExecuted()) {
log.debugf("Skipping scheduled workflow '%s' in realm %s, already in progress on another node", workflow.getName(), realm.getName());
}
}

private boolean isSchedulePeriod(Workflow workflow) {
int lastRun = getLastScheduleRun(workflow);

if (lastRun <= 0) {
return true;
}

log.debugf("Finished executing scheduled workflow '%s' in realm %s", workflow.getName(), realm.getName());
int elapsed = Time.currentTime() - lastRun;
return elapsed >= (intervalSecs - 1);
}

private void updateLastScheduleRun(KeycloakSession session) {
ComponentModel component = session.getContext().getRealm().getComponent(workflowId);
component.put(WorkflowConstants.CONFIG_LAST_SCHEDULE_RUN, String.valueOf(Time.currentTime()));
session.getContext().getRealm().updateComponent(component);
}

private void cancelTask(KeycloakSession session) {
session.getProvider(TimerProvider.class).cancelTask(getTaskName());
}

private void scheduleAligned(KeycloakSession session, Workflow workflow, int newIntervalSecs) {
TimerProvider timer = session.getProvider(TimerProvider.class);
ScheduledWorkflowRunner newRunner = new ScheduledWorkflowRunner(workflowId, realmId, newIntervalSecs);
long initialDelayMillis = computeInitialDelay(workflow, newIntervalSecs) * 1000L;
timer.scheduleTask(newRunner, initialDelayMillis, newIntervalSecs * 1000L);
}

@Override
public String getTaskName() {
return taskName(workflowId);
}

static String taskName(String workflowId) {
return "workflow-" + workflowId;
}

static int getLastScheduleRun(Workflow workflow) {
String val = workflow.getConfig().getFirst(WorkflowConstants.CONFIG_LAST_SCHEDULE_RUN);
return val == null ? 0 : Integer.parseInt(val);
}

static int computeInitialDelay(Workflow workflow, int intervalSecs) {
return computeInitialDelay(getLastScheduleRun(workflow), intervalSecs);
}

static int computeInitialDelay(int lastRunSecs, int intervalSecs) {
if (lastRunSecs <= 0) {
return intervalSecs;
}

int nextFireTime = lastRunSecs + intervalSecs;
int delay = nextFireTime - Time.currentTime();
return Math.max(0, delay);
}
}
Loading
Loading