Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add annotations to subscriptions table #295

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion lib/event_store/sql/init.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,24 @@ defmodule EventStore.Sql.Init do
create_subscription_index(),
create_snapshots_table(column_data_type),
create_schema_migrations_table(),
record_event_store_schema_version()
record_event_store_schema_version(),
add_annotations_to_subscriptions(),
create_subscription_annotations_index()
]
end

defp add_annotations_to_subscriptions do
"""
ALTER TABLE subscriptions ADD COLUMN annotations jsonb;
"""
end

defp create_subscription_annotations_index do
"""
CREATE INDEX subscriptions_annotations_idx ON subscriptions USING gin (annotations);
"""
end

defp create_streams_table do
"""
CREATE TABLE streams
Expand Down
7 changes: 4 additions & 3 deletions lib/event_store/sql/statements/insert_subscription.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ INSERT INTO "<%= schema %>".subscriptions
(
stream_uuid,
subscription_name,
last_seen
last_seen,
annotations
)
VALUES ($1, $2, $3)
RETURNING subscription_id, stream_uuid, subscription_name, last_seen, created_at;
VALUES ($1, $2, $3, $4)
RETURNING subscription_id, stream_uuid, subscription_name, last_seen, created_at, annotations;
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ SELECT
stream_uuid,
subscription_name,
last_seen,
created_at
created_at,
annotations
FROM "<%= schema %>".subscriptions
ORDER BY created_at;
3 changes: 2 additions & 1 deletion lib/event_store/sql/statements/query_subscription.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ SELECT
stream_uuid,
subscription_name,
last_seen,
created_at
created_at,
annotations
FROM "<%= schema %>".subscriptions
WHERE stream_uuid = $1 AND subscription_name = $2;
61 changes: 54 additions & 7 deletions lib/event_store/storage/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,40 @@ defmodule EventStore.Storage.Subscription do
QuerySubscription
}

@typedoc """
A subscription to an event stream.

* `subscription_id` - Unique identifier for the subscription
* `stream_uuid` - The stream being subscribed to
* `subscription_name` - Name of the subscription
* `last_seen` - Last event seen by this subscription
* `created_at` - When the subscription was created
* `annotations` - Arbitrary non-identifying metadata attached to the
subscription, inspired by Kubernetes annotations. These are key-value pairs
that can be used to store auxiliary information about a subscription that
is not directly part of its core functionality. For example:
* Build/release information (team owner, git sha, etc.)
* Client-specific configuration
* Debugging info
* Tool information
"""
@type t :: %EventStore.Storage.Subscription{
subscription_id: non_neg_integer(),
stream_uuid: String.t(),
subscription_name: String.t(),
last_seen: non_neg_integer() | nil,
created_at: DateTime.t()
created_at: DateTime.t(),
annotations: map()
}

defstruct [:subscription_id, :stream_uuid, :subscription_name, :last_seen, :created_at]
defstruct [
:subscription_id,
:stream_uuid,
:subscription_name,
:last_seen,
:created_at,
:annotations
]

defdelegate subscriptions(conn, opts), to: QueryAllSubscriptions, as: :execute

Expand Down Expand Up @@ -49,7 +74,17 @@ defmodule EventStore.Storage.Subscription do
do: Subscription.Delete.execute(conn, stream_uuid, subscription_name, opts)

defp create_subscription(conn, stream_uuid, subscription_name, start_from, opts) do
case CreateSubscription.execute(conn, stream_uuid, subscription_name, start_from, opts) do
{splitted_opts, opts} = Keyword.split(opts, [:annotations])
annotations = Keyword.get(splitted_opts, :annotations) || %{}

case CreateSubscription.execute(
conn,
stream_uuid,
subscription_name,
start_from,
annotations,
opts
) do
{:ok, %Subscription{}} = reply ->
reply

Expand Down Expand Up @@ -96,7 +131,8 @@ defmodule EventStore.Storage.Subscription do
defmodule CreateSubscription do
@moduledoc false

def execute(conn, stream_uuid, subscription_name, start_from, opts) do
def execute(conn, stream_uuid, subscription_name, start_from, annotations, opts)
when is_map(annotations) do
Logger.debug(
"Attempting to create subscription on stream " <>
inspect(stream_uuid) <>
Expand All @@ -107,7 +143,12 @@ defmodule EventStore.Storage.Subscription do

query = Statements.insert_subscription(schema)

case Postgrex.query(conn, query, [stream_uuid, subscription_name, start_from], opts) do
case Postgrex.query(
conn,
query,
[stream_uuid, subscription_name, start_from, annotations],
opts
) do
{:ok, %Postgrex.Result{rows: rows}} ->
Logger.debug(
"Created subscription on stream \"#{stream_uuid}\" named \"#{subscription_name}\""
Expand Down Expand Up @@ -200,16 +241,22 @@ defmodule EventStore.Storage.Subscription do
stream_uuid,
subscription_name,
last_seen,
created_at
created_at,
annotations
] = row

%Subscription{
subscription_id: subscription_id,
stream_uuid: stream_uuid,
subscription_name: subscription_name,
last_seen: last_seen,
created_at: created_at
created_at: created_at,
annotations: annotations_from_row(annotations)
}
end

defp annotations_from_row(nil), do: %{}
defp annotations_from_row([]), do: %{}
defp annotations_from_row(annotations), do: annotations
end
end
7 changes: 7 additions & 0 deletions priv/event_store/migrations/v1.4.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add annotations field to subscriptions table

ALTER TABLE subscriptions
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused about this file btw, do we even need these files since we use the SQL init module?

ADD COLUMN annotations jsonb DEFAULT '{}'::jsonb NOT NULL;

-- Add index on annotations for better query performance
CREATE INDEX subscriptions_annotations_idx ON subscriptions USING gin (annotations);
37 changes: 33 additions & 4 deletions test/storage/subscription_persistence_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,44 @@ defmodule EventStore.Storage.SubscriptionPersistenceTest do
verify_subscription(subscription, 1)
end

test "create subscription with annotations", context do
annotations = %{"key" => "value", "nested" => %{"data" => 123}}
{:ok, subscription} = subscribe_to_stream(context, annotations: annotations)

verify_subscription(subscription, nil, annotations)
end

test "create subscription when already exists preserves annotations", context do
annotations = %{"key" => "value", "metadata" => %{"version" => 1}}
{:ok, subscription1} = subscribe_to_stream(context, annotations: annotations)

# No annotations provided
{:ok, subscription2} = subscribe_to_stream(context)
# Explicit nil
{:ok, subscription3} = subscribe_to_stream(context, annotations: nil)
# Empty map
{:ok, subscription4} = subscribe_to_stream(context, annotations: %{})

assert subscription1.subscription_id == subscription2.subscription_id
assert subscription2.subscription_id == subscription3.subscription_id
assert subscription3.subscription_id == subscription4.subscription_id
assert subscription1.annotations == annotations
assert subscription2.annotations == annotations
assert subscription3.annotations == annotations
assert subscription4.annotations == annotations
end

def ack_last_seen_event(context, last_seen) do
%{conn: conn, schema: schema} = context

Storage.ack_last_seen_event(conn, @all_stream, @subscription_name, last_seen, schema: schema)
end

defp subscribe_to_stream(context) do
defp subscribe_to_stream(context, opts \\ []) do
%{conn: conn, schema: schema} = context
opts = Keyword.merge([schema: schema], opts)

Storage.subscribe_to_stream(conn, @all_stream, @subscription_name, schema: schema)
Storage.subscribe_to_stream(conn, @all_stream, @subscription_name, opts)
end

defp delete_subscription(context) do
Expand All @@ -93,13 +121,14 @@ defmodule EventStore.Storage.SubscriptionPersistenceTest do
Storage.subscriptions(conn, schema: schema)
end

defp verify_subscription(subscription, last_seen \\ nil)
defp verify_subscription(subscription, last_seen \\ nil, annotations \\ %{})

defp verify_subscription(subscription, last_seen) do
defp verify_subscription(subscription, last_seen, annotations) do
assert subscription.subscription_id > 0
assert subscription.stream_uuid == @all_stream
assert subscription.subscription_name == @subscription_name
assert subscription.last_seen == last_seen
assert subscription.created_at != nil
assert subscription.annotations == annotations
end
end
Loading