evoq_event_store (evoq v1.23.0)

View Source

Wrapper for event store operations via adapter.

Provides a consistent interface for event store operations, delegating to a configured adapter.

Configuration (Required)

You must configure an adapter in your application config:

  {evoq, [
      {event_store_adapter, evoq_esdb_gater_adapter}
  ]}

Available adapters: - evoq_esdb_gater_adapter (from reckon_evoq package)

Summary

Functions

Conditionally append events under the DCB pseudo-stream (Dynamic Consistency Boundary, paired with reckon-db 3.1.0+).

Read DCB events whose payload field Key equals Value (CCC).

Read DCB events matching a composite payload field combination (CCC).

Check if a stream exists.

Get the configured event store adapter. Crashes if no adapter is configured.

Check if a store contains at least one event.

List all streams in the store.

Payload field key-sets hash-indexed in a store ({payload_hash, Keys}).

Payload field keys individually indexed in a store ({payload, Key}).

Read all events from a stream.

Read all events from a stream with batch size.

Read all events from all streams, sorted by global position. This is useful for projection rebuild.

Read all events across all streams in global order.

Read events across streams whose metadata Key equals Value. The cross-cutting lineage primitive (causation_id / correlation_id / conversation_id). Backed by reckon-db's {meta, Key} index when declared.

Read events across streams by tag match. Match is the atom any (union) or all (intersection).

Read all events of specific types from all streams.

Set the event store adapter (primarily for testing).

Get the current version of a stream.

Types

evoq_event/0

-type evoq_event() ::
          #evoq_event{event_id :: binary(),
                      event_type :: binary() | undefined,
                      stream_id :: binary(),
                      version :: non_neg_integer(),
                      data :: map() | binary(),
                      metadata :: map(),
                      tags :: [binary()] | undefined,
                      timestamp :: integer(),
                      epoch_us :: integer(),
                      data_content_type :: binary(),
                      metadata_content_type :: binary(),
                      prev_event_hash :: binary() | undefined}.

Functions

append(StoreId, StreamId, ExpectedVersion, Events)

-spec append(atom(), binary(), integer(), [map()]) -> {ok, non_neg_integer()} | {error, term()}.

Append events to a stream.

append_if_no_tag_matches(StoreId, TagFilter, SeqCutoff, Events)

-spec append_if_no_tag_matches(atom(), term(), integer(), [map()]) ->
                                  {ok, non_neg_integer()} |
                                  {error, {context_changed, non_neg_integer()}} |
                                  {error, term()}.

Conditionally append events under the DCB pseudo-stream (Dynamic Consistency Boundary, paired with reckon-db 3.1.0+).

TagFilter is the consistency context query (a backend-defined tag-filter term). SeqCutoff is the highest seq the caller saw (or -1 for "saw nothing"). Returns {error, {context_changed, MaxSeq}} if any event matching TagFilter has seq above SeqCutoff.

The typical caller is evoq_decision_runtime; user code uses the evoq_decision behaviour rather than calling this directly.

ccc_read_by_payload(StoreId, Key, Value, BatchSize)

-spec ccc_read_by_payload(atom(), binary(), binary(), pos_integer()) -> {ok, [map()]} | {error, term()}.

Read DCB events whose payload field Key equals Value (CCC).

Backed by reckon-db's {payload, Key} index, evaluated server-side. The consistency-boundary read counterpart to the payload condition that append_if_no_tag_matches/4 already evaluates atomically at append. Requires the store to declare the {payload, Key} index; an undeclared index surfaces as a backend error here, which the decision runtime maps to {payload_index_unavailable, Filter}.

ccc_read_by_payload_hash(StoreId, Keys, Values, BatchSize)

-spec ccc_read_by_payload_hash(atom(), [binary()], [binary()], pos_integer()) ->
                                  {ok, [map()]} | {error, term()}.

Read DCB events matching a composite payload field combination (CCC).

Backed by reckon-db's {payload_hash, Keys} index. All Keys must match their corresponding Values; field order is ignored.

exists(StoreId, StreamId)

-spec exists(atom(), binary()) -> boolean().

Check if a stream exists.

get_adapter()

-spec get_adapter() -> module().

Get the configured event store adapter. Crashes if no adapter is configured.

has_events(StoreId)

-spec has_events(atom()) -> boolean().

Check if a store contains at least one event.

list_streams(StoreId)

-spec list_streams(atom()) -> {ok, [binary()]} | {error, term()}.

List all streams in the store.

payload_hash_indexes(StoreId)

-spec payload_hash_indexes(atom()) -> {ok, [[binary()]]} | {error, term()}.

Payload field key-sets hash-indexed in a store ({payload_hash, Keys}).

payload_indexes(StoreId)

-spec payload_indexes(atom()) -> {ok, [binary()]} | {error, term()}.

Payload field keys individually indexed in a store ({payload, Key}).

Used by the decision runtime to fail early when a payload filter references an undeclared index. Returns {ok, [Key]}.

read(StoreId, StreamId, FromVersion, Count, Direction)

-spec read(atom(), binary(), non_neg_integer(), pos_integer(), forward | backward) ->
              {ok, [map()]} | {error, term()}.

Read events from a stream.

read_all(StoreId, StreamId, Direction)

-spec read_all(atom(), binary(), forward | backward) -> {ok, [map()]} | {error, term()}.

Read all events from a stream.

read_all(StoreId, StreamId, BatchSize, Direction)

-spec read_all(atom(), binary(), pos_integer(), forward | backward) -> {ok, [map()]} | {error, term()}.

Read all events from a stream with batch size.

read_all_events(StoreId, BatchSize)

-spec read_all_events(atom(), pos_integer()) -> {ok, [map()]} | {error, term()}.

Read all events from all streams, sorted by global position. This is useful for projection rebuild.

read_all_global(StoreId, Offset, BatchSize)

-spec read_all_global(atom(), non_neg_integer(), pos_integer()) ->
                         {ok, [evoq_event()]} | {error, term()}.

Read all events across all streams in global order.

Returns events sorted by epoch_us, starting from Offset. Used for catch-up subscriptions and global event replay. Falls back to read_all_events/2 if adapter does not implement the optional read_all_global/3 callback.

read_by_metadata(StoreId, Key, Value)

-spec read_by_metadata(atom(), binary(), binary()) -> {ok, [map()]} | {error, term()}.

Read events across streams whose metadata Key equals Value. The cross-cutting lineage primitive (causation_id / correlation_id / conversation_id). Backed by reckon-db's {meta, Key} index when declared.

read_by_tags(StoreId, Tags, Match, BatchSize)

-spec read_by_tags(atom(), [binary()], any | all, pos_integer()) -> {ok, [map()]} | {error, term()}.

Read events across streams by tag match. Match is the atom any (union) or all (intersection).

read_events_by_types(StoreId, EventTypes, BatchSize)

-spec read_events_by_types(atom(), [binary()], pos_integer()) -> {ok, [map()]} | {error, term()}.

Read all events of specific types from all streams.

Routes through the adapter which uses native filtering when available. Returns events sorted by epoch_us (global ordering).

set_adapter(Adapter)

-spec set_adapter(module()) -> ok.

Set the event store adapter (primarily for testing).

version(StoreId, StreamId)

-spec version(atom(), binary()) -> integer().

Get the current version of a stream.