Class PostgreSqlFlowRunStore
- Namespace
- FlowOrchestrator.PostgreSQL
- Assembly
- FlowOrchestrator.PostgreSQL.dll
Dapper-based PostgreSQL implementation of all run storage interfaces.
Uses PostgreSQL's INSERT ... ON CONFLICT DO NOTHING for atomic step claim deduplication.
public sealed class PostgreSqlFlowRunStore : IFlowRunStore, IFlowRunRuntimeStore, IFlowRunControlStore, IFlowRetentionStore
- Inheritance
-
PostgreSqlFlowRunStore
- Implements
- Inherited Members
Constructors
PostgreSqlFlowRunStore(string)
public PostgreSqlFlowRunStore(string connectionString)
Parameters
connectionStringstring
Methods
AnnotateDispatchAsync(Guid, string, string, CancellationToken)
Stores the runtime job or message ID returned by the dispatcher alongside the dispatch record. Best-effort — implementations should not throw; failures are silently ignored by the engine.
public Task AnnotateDispatchAsync(Guid runId, string stepKey, string jobId, CancellationToken ct = default)
Parameters
runIdGuidstepKeystringjobIdstringctCancellationToken
Returns
CleanupAsync(DateTimeOffset, CancellationToken)
Deletes all run data (runs, steps, attempts, outputs, events) whose completion time
is older than cutoffUtc.
public Task CleanupAsync(DateTimeOffset cutoffUtc, CancellationToken cancellationToken)
Parameters
cutoffUtcDateTimeOffsetRuns completed before this timestamp are eligible for deletion.
cancellationTokenCancellationTokenPropagates cancellation from the host shutdown signal.
Returns
CompleteRunAsync(Guid, string)
Marks the run as complete (status: Succeeded, Failed, or Cancelled)
and sets CompletedAt.
public Task CompleteRunAsync(Guid runId, string status)
Parameters
Returns
ConfigureRunAsync(Guid, Guid, string, string?, DateTimeOffset?)
Persists the control record for a new run, including an optional idempotency key and an absolute timeout deadline.
public Task ConfigureRunAsync(Guid runId, Guid flowId, string triggerKey, string? idempotencyKey, DateTimeOffset? timeoutAtUtc)
Parameters
runIdGuidflowIdGuidtriggerKeystringidempotencyKeystringtimeoutAtUtcDateTimeOffset?
Returns
FindRunIdByIdempotencyKeyAsync(Guid, string, string)
Looks up an existing run that was started with the given idempotency key. Returns the RunId of the existing run, or null if none exists.
public Task<Guid?> FindRunIdByIdempotencyKeyAsync(Guid flowId, string triggerKey, string idempotencyKey)
Parameters
Returns
GetActiveRunsAsync()
Returns all runs currently in Running status (used for timeout enforcement).
public Task<IReadOnlyList<FlowRunRecord>> GetActiveRunsAsync()
Returns
GetClaimedStepKeysAsync(Guid)
Returns the set of step keys that have been claimed (locked) for execution but not yet completed, used to detect in-progress steps.
public Task<IReadOnlyCollection<string>> GetClaimedStepKeysAsync(Guid runId)
Parameters
runIdGuid
Returns
GetDerivedRunsAsync(Guid)
Returns runs whose SourceRunId equals sourceRunId,
i.e. all re-runs derived from a given run. Used by the dashboard to render lineage.
public Task<IReadOnlyList<FlowRunRecord>> GetDerivedRunsAsync(Guid sourceRunId)
Parameters
sourceRunIdGuid
Returns
GetDispatchedStepKeysAsync(Guid)
Returns the set of step keys that have been dispatched (and not yet released) for a run. Used by the recovery service to avoid re-dispatching already-in-flight steps.
public Task<IReadOnlySet<string>> GetDispatchedStepKeysAsync(Guid runId)
Parameters
runIdGuid
Returns
GetRunControlAsync(Guid)
Returns the control record for the given run, or null if not found.
public Task<FlowRunControlRecord?> GetRunControlAsync(Guid runId)
Parameters
runIdGuid
Returns
GetRunDetailAsync(Guid)
Returns full run detail including all step records and their attempt history,
or null if no run with runId exists.
public Task<FlowRunRecord?> GetRunDetailAsync(Guid runId)
Parameters
runIdGuid
Returns
GetRunStatusAsync(Guid)
Returns the current overall status of the run ("Running", "Succeeded", etc.),
or null if the run does not exist.
public Task<string?> GetRunStatusAsync(Guid runId)
Parameters
runIdGuid
Returns
GetRunTimeseriesAsync(RunTimeseriesGranularity, DateTimeOffset, DateTimeOffset, Guid?)
Returns time-bucketed run counts and duration percentiles for the half-open
interval [since, until). Buckets that contain
no runs are still returned (with zero counts) so the caller can render a contiguous
timeline without gap-filling. When flowId is supplied, only runs
for that flow are included.
public Task<IReadOnlyList<RunTimeseriesBucket>> GetRunTimeseriesAsync(RunTimeseriesGranularity granularity, DateTimeOffset since, DateTimeOffset until, Guid? flowId = null)
Parameters
granularityRunTimeseriesGranularityHour or Day bucket size.
sinceDateTimeOffsetUTC start of the window (inclusive). Aligned to the granularity boundary.
untilDateTimeOffsetUTC end of the window (exclusive). Aligned to the granularity boundary.
flowIdGuid?Optional filter — when non-null, only runs for this flow are counted.
Returns
GetRunsAsync(Guid?, int, int)
Returns a page of run records, optionally filtered by flowId.
Results are ordered by start time descending.
public Task<IReadOnlyList<FlowRunRecord>> GetRunsAsync(Guid? flowId = null, int skip = 0, int take = 50)
Parameters
Returns
GetRunsPageAsync(Guid?, string?, int, int, string?)
Returns a paginated run list with total count, supporting filtering by flow, status, and search text.
public Task<(IReadOnlyList<FlowRunRecord> Runs, int TotalCount)> GetRunsPageAsync(Guid? flowId = null, string? status = null, int skip = 0, int take = 50, string? search = null)
Parameters
Returns
GetStatisticsAsync()
Returns aggregate counts used by the dashboard overview panel.
public Task<DashboardStatistics> GetStatisticsAsync()
Returns
GetStepStatusesAsync(Guid)
Returns the current StepStatus for every step in the run.
public Task<IReadOnlyDictionary<string, StepStatus>> GetStepStatusesAsync(Guid runId)
Parameters
runIdGuid
Returns
MarkTimedOutAsync(Guid, string?)
Marks the run as timed out. Called by the timeout-enforcement background service.
public Task<bool> MarkTimedOutAsync(Guid runId, string? reason)
Parameters
Returns
RecordSkippedStepAsync(Guid, string, string, string?)
Records a step as Skipped without executing it,
used when runAfter conditions cannot be satisfied.
public Task RecordSkippedStepAsync(Guid runId, string stepKey, string stepType, string? reason)
Parameters
Returns
RecordStepCompleteAsync(Guid, string, string, string?, string?)
Records the outcome of a step attempt: updates status, persists output JSON, and sets error message on failure.
public Task RecordStepCompleteAsync(Guid runId, string stepKey, string status, string? outputJson, string? errorMessage)
Parameters
Returns
RecordStepStartAsync(Guid, string, string, string?, string?)
Records the start of a step attempt: sets status to Running, persists inputs, and associates the Hangfire job ID.
public Task RecordStepStartAsync(Guid runId, string stepKey, string stepType, string? inputJson, string? jobId)
Parameters
Returns
ReleaseDispatchAsync(Guid, string, CancellationToken)
Removes the dispatch record for a step, allowing it to be re-dispatched. Called by the engine before rescheduling a Pending (polling) step.
public Task ReleaseDispatchAsync(Guid runId, string stepKey, CancellationToken ct = default)
Parameters
runIdGuidstepKeystringctCancellationToken
Returns
ReleaseStepClaimAsync(Guid, string)
Releases a previously-acquired step claim so a future TryClaimStepAsync(Guid, string) for the
same (runId, stepKey) can succeed. Called by the engine on Pending re-schedule and
retry paths, where the same logical step needs to run again.
public Task ReleaseStepClaimAsync(Guid runId, string stepKey)
Parameters
Returns
Remarks
Idempotent — calling for a key with no claim is a no-op. Default implementation is a no-op so existing custom runtime stores continue to compile; in that case retry/Pending paths behave as before v1.22 (the schedule-time claim was non-strict). New implementations should remove the claim row atomically.
RequestCancelAsync(Guid, string?)
Marks a cancellation request for the run. Steps check this flag before executing.
public Task<bool> RequestCancelAsync(Guid runId, string? reason)
Parameters
Returns
RetryStepAsync(Guid, string)
Resets a step back to a re-runnable state so it can be re-enqueued by the retry flow.
Clears Running/Failed status and increments attempt count.
public Task RetryStepAsync(Guid runId, string stepKey)
Parameters
Returns
StartRunAsync(Guid, string, Guid, string, string?, string?, Guid?)
Creates a new run record in Running status and returns it.
Called once per TriggerAsync invocation.
public Task<FlowRunRecord> StartRunAsync(Guid flowId, string flowName, Guid runId, string triggerKey, string? triggerData, string? jobId, Guid? sourceRunId = null)
Parameters
flowIdGuidThe flow definition this run belongs to.
flowNamestringDisplay name of the flow at trigger time.
runIdGuidUnique identifier for this run, generated by the engine.
triggerKeystringThe manifest trigger key that fired (e.g.
"manual","schedule").triggerDatastringJSON-serialised trigger payload, or null.
jobIdstringRuntime job ID (e.g. Hangfire BackgroundJobId) for cross-referencing, or null.
sourceRunIdGuid?When this run was created via "Re-run all" on a previous run, the ID of that run; otherwise null.
Returns
TryClaimStepAsync(Guid, string)
Atomically claims a step for execution, returning true if this caller acquired the claim or false if another worker already claimed it.
public Task<bool> TryClaimStepAsync(Guid runId, string stepKey)
Parameters
Returns
Remarks
This is the primary guard against duplicate step execution. Since v1.22 the engine calls
this at the top of RunStepAsync (execute time) rather than at schedule time, so it
correctly prevents concurrent execution under at-least-once delivery — including the
Service Bus topic-broadcast case where a single dispatched message reaches multiple
subscriptions. Implementations must use an atomic compare-and-set or equivalent database
primitive. Pair with ReleaseStepClaimAsync(Guid, string) on retry / Pending
re-schedule paths so the SAME step can claim again on a fresh attempt.
TryRecordDispatchAsync(Guid, string, CancellationToken)
Atomically records that a step has been dispatched for execution. Returns true if this is the first dispatch for this step in this run; false if the step was already dispatched (idempotent guard — caller should skip).
public Task<bool> TryRecordDispatchAsync(Guid runId, string stepKey, CancellationToken ct = default)
Parameters
runIdGuidstepKeystringctCancellationToken
Returns
Remarks
Must use an atomic INSERT-if-not-exists primitive (SQL WHERE NOT EXISTS,
PostgreSQL ON CONFLICT DO NOTHING, or TryAdd(TKey, TValue)).
TryRegisterIdempotencyKeyAsync(Guid, string, string, Guid)
Atomically registers an idempotency key for the given run.
public Task<bool> TryRegisterIdempotencyKeyAsync(Guid flowId, string triggerKey, string idempotencyKey, Guid runId)