Table of Contents

Class PostgreSqlFlowRunStore

Namespace
FlowOrchestrator.PostgreSQL
Assembly
FlowOrchestrator.PostgreSQL.dll

Dapper-based PostgreSQL implementation of all run storage interfaces. Uses PostgreSQL's INSERT ... ON CONFLICT DO NOTHING for atomic step claim deduplication.

public sealed class PostgreSqlFlowRunStore : IFlowRunStore, IFlowRunRuntimeStore, IFlowRunControlStore, IFlowRetentionStore
Inheritance
PostgreSqlFlowRunStore
Implements
Inherited Members

Constructors

PostgreSqlFlowRunStore(string)

public PostgreSqlFlowRunStore(string connectionString)

Parameters

connectionString string

Methods

AnnotateDispatchAsync(Guid, string, string, CancellationToken)

Stores the runtime job or message ID returned by the dispatcher alongside the dispatch record. Best-effort — implementations should not throw; failures are silently ignored by the engine.

public Task AnnotateDispatchAsync(Guid runId, string stepKey, string jobId, CancellationToken ct = default)

Parameters

runId Guid
stepKey string
jobId string
ct CancellationToken

Returns

Task

CleanupAsync(DateTimeOffset, CancellationToken)

Deletes all run data (runs, steps, attempts, outputs, events) whose completion time is older than cutoffUtc.

public Task CleanupAsync(DateTimeOffset cutoffUtc, CancellationToken cancellationToken)

Parameters

cutoffUtc DateTimeOffset

Runs completed before this timestamp are eligible for deletion.

cancellationToken CancellationToken

Propagates cancellation from the host shutdown signal.

Returns

Task

CompleteRunAsync(Guid, string)

Marks the run as complete (status: Succeeded, Failed, or Cancelled) and sets CompletedAt.

public Task CompleteRunAsync(Guid runId, string status)

Parameters

runId Guid
status string

Returns

Task

ConfigureRunAsync(Guid, Guid, string, string?, DateTimeOffset?)

Persists the control record for a new run, including an optional idempotency key and an absolute timeout deadline.

public Task ConfigureRunAsync(Guid runId, Guid flowId, string triggerKey, string? idempotencyKey, DateTimeOffset? timeoutAtUtc)

Parameters

runId Guid
flowId Guid
triggerKey string
idempotencyKey string
timeoutAtUtc DateTimeOffset?

Returns

Task

FindRunIdByIdempotencyKeyAsync(Guid, string, string)

Looks up an existing run that was started with the given idempotency key. Returns the RunId of the existing run, or null if none exists.

public Task<Guid?> FindRunIdByIdempotencyKeyAsync(Guid flowId, string triggerKey, string idempotencyKey)

Parameters

flowId Guid
triggerKey string
idempotencyKey string

Returns

Task<Guid?>

GetActiveRunsAsync()

Returns all runs currently in Running status (used for timeout enforcement).

public Task<IReadOnlyList<FlowRunRecord>> GetActiveRunsAsync()

Returns

Task<IReadOnlyList<FlowRunRecord>>

GetClaimedStepKeysAsync(Guid)

Returns the set of step keys that have been claimed (locked) for execution but not yet completed, used to detect in-progress steps.

public Task<IReadOnlyCollection<string>> GetClaimedStepKeysAsync(Guid runId)

Parameters

runId Guid

Returns

Task<IReadOnlyCollection<string>>

GetDerivedRunsAsync(Guid)

Returns runs whose SourceRunId equals sourceRunId, i.e. all re-runs derived from a given run. Used by the dashboard to render lineage.

public Task<IReadOnlyList<FlowRunRecord>> GetDerivedRunsAsync(Guid sourceRunId)

Parameters

sourceRunId Guid

Returns

Task<IReadOnlyList<FlowRunRecord>>

GetDispatchedStepKeysAsync(Guid)

Returns the set of step keys that have been dispatched (and not yet released) for a run. Used by the recovery service to avoid re-dispatching already-in-flight steps.

public Task<IReadOnlySet<string>> GetDispatchedStepKeysAsync(Guid runId)

Parameters

runId Guid

Returns

Task<IReadOnlySet<string>>

GetRunControlAsync(Guid)

Returns the control record for the given run, or null if not found.

public Task<FlowRunControlRecord?> GetRunControlAsync(Guid runId)

Parameters

runId Guid

Returns

Task<FlowRunControlRecord>

GetRunDetailAsync(Guid)

Returns full run detail including all step records and their attempt history, or null if no run with runId exists.

public Task<FlowRunRecord?> GetRunDetailAsync(Guid runId)

Parameters

runId Guid

Returns

Task<FlowRunRecord>

GetRunStatusAsync(Guid)

Returns the current overall status of the run ("Running", "Succeeded", etc.), or null if the run does not exist.

public Task<string?> GetRunStatusAsync(Guid runId)

Parameters

runId Guid

Returns

Task<string>

GetRunTimeseriesAsync(RunTimeseriesGranularity, DateTimeOffset, DateTimeOffset, Guid?)

Returns time-bucketed run counts and duration percentiles for the half-open interval [since, until). Buckets that contain no runs are still returned (with zero counts) so the caller can render a contiguous timeline without gap-filling. When flowId is supplied, only runs for that flow are included.

public Task<IReadOnlyList<RunTimeseriesBucket>> GetRunTimeseriesAsync(RunTimeseriesGranularity granularity, DateTimeOffset since, DateTimeOffset until, Guid? flowId = null)

Parameters

granularity RunTimeseriesGranularity

Hour or Day bucket size.

since DateTimeOffset

UTC start of the window (inclusive). Aligned to the granularity boundary.

until DateTimeOffset

UTC end of the window (exclusive). Aligned to the granularity boundary.

flowId Guid?

Optional filter — when non-null, only runs for this flow are counted.

Returns

Task<IReadOnlyList<RunTimeseriesBucket>>

GetRunsAsync(Guid?, int, int)

Returns a page of run records, optionally filtered by flowId. Results are ordered by start time descending.

public Task<IReadOnlyList<FlowRunRecord>> GetRunsAsync(Guid? flowId = null, int skip = 0, int take = 50)

Parameters

flowId Guid?
skip int
take int

Returns

Task<IReadOnlyList<FlowRunRecord>>

GetRunsPageAsync(Guid?, string?, int, int, string?)

Returns a paginated run list with total count, supporting filtering by flow, status, and search text.

public Task<(IReadOnlyList<FlowRunRecord> Runs, int TotalCount)> GetRunsPageAsync(Guid? flowId = null, string? status = null, int skip = 0, int take = 50, string? search = null)

Parameters

flowId Guid?
status string
skip int
take int
search string

Returns

Task<(IReadOnlyList<FlowRunRecord> Runs, int TotalCount)>

GetStatisticsAsync()

Returns aggregate counts used by the dashboard overview panel.

public Task<DashboardStatistics> GetStatisticsAsync()

Returns

Task<DashboardStatistics>

GetStepStatusesAsync(Guid)

Returns the current StepStatus for every step in the run.

public Task<IReadOnlyDictionary<string, StepStatus>> GetStepStatusesAsync(Guid runId)

Parameters

runId Guid

Returns

Task<IReadOnlyDictionary<string, StepStatus>>

MarkTimedOutAsync(Guid, string?)

Marks the run as timed out. Called by the timeout-enforcement background service.

public Task<bool> MarkTimedOutAsync(Guid runId, string? reason)

Parameters

runId Guid
reason string

Returns

Task<bool>

true if the record was found and updated; false otherwise.

RecordSkippedStepAsync(Guid, string, string, string?)

Records a step as Skipped without executing it, used when runAfter conditions cannot be satisfied.

public Task RecordSkippedStepAsync(Guid runId, string stepKey, string stepType, string? reason)

Parameters

runId Guid
stepKey string
stepType string
reason string

Returns

Task

RecordStepCompleteAsync(Guid, string, string, string?, string?)

Records the outcome of a step attempt: updates status, persists output JSON, and sets error message on failure.

public Task RecordStepCompleteAsync(Guid runId, string stepKey, string status, string? outputJson, string? errorMessage)

Parameters

runId Guid
stepKey string
status string
outputJson string
errorMessage string

Returns

Task

RecordStepStartAsync(Guid, string, string, string?, string?)

Records the start of a step attempt: sets status to Running, persists inputs, and associates the Hangfire job ID.

public Task RecordStepStartAsync(Guid runId, string stepKey, string stepType, string? inputJson, string? jobId)

Parameters

runId Guid
stepKey string
stepType string
inputJson string
jobId string

Returns

Task

ReleaseDispatchAsync(Guid, string, CancellationToken)

Removes the dispatch record for a step, allowing it to be re-dispatched. Called by the engine before rescheduling a Pending (polling) step.

public Task ReleaseDispatchAsync(Guid runId, string stepKey, CancellationToken ct = default)

Parameters

runId Guid
stepKey string
ct CancellationToken

Returns

Task

ReleaseStepClaimAsync(Guid, string)

Releases a previously-acquired step claim so a future TryClaimStepAsync(Guid, string) for the same (runId, stepKey) can succeed. Called by the engine on Pending re-schedule and retry paths, where the same logical step needs to run again.

public Task ReleaseStepClaimAsync(Guid runId, string stepKey)

Parameters

runId Guid
stepKey string

Returns

Task

Remarks

Idempotent — calling for a key with no claim is a no-op. Default implementation is a no-op so existing custom runtime stores continue to compile; in that case retry/Pending paths behave as before v1.22 (the schedule-time claim was non-strict). New implementations should remove the claim row atomically.

RequestCancelAsync(Guid, string?)

Marks a cancellation request for the run. Steps check this flag before executing.

public Task<bool> RequestCancelAsync(Guid runId, string? reason)

Parameters

runId Guid
reason string

Returns

Task<bool>

true if the record was found and updated; false otherwise.

RetryStepAsync(Guid, string)

Resets a step back to a re-runnable state so it can be re-enqueued by the retry flow. Clears Running/Failed status and increments attempt count.

public Task RetryStepAsync(Guid runId, string stepKey)

Parameters

runId Guid
stepKey string

Returns

Task

StartRunAsync(Guid, string, Guid, string, string?, string?, Guid?)

Creates a new run record in Running status and returns it. Called once per TriggerAsync invocation.

public Task<FlowRunRecord> StartRunAsync(Guid flowId, string flowName, Guid runId, string triggerKey, string? triggerData, string? jobId, Guid? sourceRunId = null)

Parameters

flowId Guid

The flow definition this run belongs to.

flowName string

Display name of the flow at trigger time.

runId Guid

Unique identifier for this run, generated by the engine.

triggerKey string

The manifest trigger key that fired (e.g. "manual", "schedule").

triggerData string

JSON-serialised trigger payload, or null.

jobId string

Runtime job ID (e.g. Hangfire BackgroundJobId) for cross-referencing, or null.

sourceRunId Guid?

When this run was created via "Re-run all" on a previous run, the ID of that run; otherwise null.

Returns

Task<FlowRunRecord>

TryClaimStepAsync(Guid, string)

Atomically claims a step for execution, returning true if this caller acquired the claim or false if another worker already claimed it.

public Task<bool> TryClaimStepAsync(Guid runId, string stepKey)

Parameters

runId Guid
stepKey string

Returns

Task<bool>

Remarks

This is the primary guard against duplicate step execution. Since v1.22 the engine calls this at the top of RunStepAsync (execute time) rather than at schedule time, so it correctly prevents concurrent execution under at-least-once delivery — including the Service Bus topic-broadcast case where a single dispatched message reaches multiple subscriptions. Implementations must use an atomic compare-and-set or equivalent database primitive. Pair with ReleaseStepClaimAsync(Guid, string) on retry / Pending re-schedule paths so the SAME step can claim again on a fresh attempt.

TryRecordDispatchAsync(Guid, string, CancellationToken)

Atomically records that a step has been dispatched for execution. Returns true if this is the first dispatch for this step in this run; false if the step was already dispatched (idempotent guard — caller should skip).

public Task<bool> TryRecordDispatchAsync(Guid runId, string stepKey, CancellationToken ct = default)

Parameters

runId Guid
stepKey string
ct CancellationToken

Returns

Task<bool>

Remarks

Must use an atomic INSERT-if-not-exists primitive (SQL WHERE NOT EXISTS, PostgreSQL ON CONFLICT DO NOTHING, or TryAdd(TKey, TValue)).

TryRegisterIdempotencyKeyAsync(Guid, string, string, Guid)

Atomically registers an idempotency key for the given run.

public Task<bool> TryRegisterIdempotencyKeyAsync(Guid flowId, string triggerKey, string idempotencyKey, Guid runId)

Parameters

flowId Guid
triggerKey string
idempotencyKey string
runId Guid

Returns

Task<bool>

true if the key was registered by this call; false if a record with this key already existed (duplicate trigger).