diff --git a/lib/sequin/consumers/consumers.ex b/lib/sequin/consumers/consumers.ex index 8d5b6ca34..ca47604ae 100644 --- a/lib/sequin/consumers/consumers.ex +++ b/lib/sequin/consumers/consumers.ex @@ -21,7 +21,6 @@ defmodule Sequin.Consumers do alias Sequin.Consumers.Transform alias Sequin.Databases alias Sequin.Databases.PostgresDatabase - alias Sequin.Databases.PostgresDatabaseTable alias Sequin.Databases.Sequence alias Sequin.Error alias Sequin.Health @@ -1309,16 +1308,6 @@ defmodule Sequin.Consumers do Enum.any?(all_consumers(), fn consumer -> is_nil(consumer.sequence_id) end) end - def group_column_values(%SinkConsumer{} = consumer, record_data) do - table = Sequin.Enum.find!(consumer.postgres_database.tables, &(&1.oid == consumer.sequence.table_oid)) - group_column_attnums = consumer.sequence_filter.group_column_attnums - group_column_names = PostgresDatabaseTable.column_attnums_to_names(table, group_column_attnums) - - Enum.map(group_column_names, fn group_column_name -> - Map.get(record_data.record, group_column_name) - end) - end - def get_backfill(id) do case Repo.get(Backfill, id) do nil -> {:error, Error.not_found(entity: :backfill, params: %{id: id})} diff --git a/lib/sequin/runtime/gcp_pubsub_pipeline.ex b/lib/sequin/runtime/gcp_pubsub_pipeline.ex index 565554b0b..f300710b8 100644 --- a/lib/sequin/runtime/gcp_pubsub_pipeline.ex +++ b/lib/sequin/runtime/gcp_pubsub_pipeline.ex @@ -31,12 +31,12 @@ defmodule Sequin.Runtime.GcpPubsubPipeline do @impl SinkPipeline def handle_message(message, context) do - %{consumer: consumer, test_pid: test_pid} = context + %{test_pid: test_pid} = context setup_allowances(test_pid) record_or_event = message.data - ordering_key = ordering_key(consumer, record_or_event.data) + ordering_key = record_or_event.group_id message = Broadway.Message.put_batch_key(message, ordering_key) {:ok, message, context} @@ -69,7 +69,7 @@ defmodule Sequin.Runtime.GcpPubsubPipeline do "type" => "record", "table_name" => record.data.metadata.table_name }, - ordering_key: ordering_key(consumer, record.data) + ordering_key: record.group_id } end @@ -82,16 +82,10 @@ defmodule Sequin.Runtime.GcpPubsubPipeline do "table_name" => event.data.metadata.table_name, "action" => to_string(event.data.action) }, - ordering_key: ordering_key(consumer, event.data) + ordering_key: event.group_id } end - defp ordering_key(consumer, data) do - consumer - |> Sequin.Consumers.group_column_values(data) - |> Enum.join(":") - end - defp setup_allowances(nil), do: :ok defp setup_allowances(test_pid) do diff --git a/lib/sequin/runtime/message_handler.ex b/lib/sequin/runtime/message_handler.ex index 5cc6542b9..273686aed 100644 --- a/lib/sequin/runtime/message_handler.ex +++ b/lib/sequin/runtime/message_handler.ex @@ -521,13 +521,13 @@ defmodule Sequin.Runtime.MessageHandler do # This should be way more assertive - we should error if we don't find the source table # We have a lot of tests that do not line up consumer source_tables with the message table oid if consumer.sequence_filter.group_column_attnums do - Enum.map_join(consumer.sequence_filter.group_column_attnums, ",", fn attnum -> + Enum.map_join(consumer.sequence_filter.group_column_attnums, ":", fn attnum -> fields = if message.action == :delete, do: message.old_fields, else: message.fields field = Sequin.Enum.find!(fields, &(&1.column_attnum == attnum)) to_string(field.value) end) else - Enum.map_join(message.ids, ",", &to_string/1) + Enum.map_join(message.ids, ":", &to_string/1) end end diff --git a/lib/sequin/runtime/sink_pipeline.ex b/lib/sequin/runtime/sink_pipeline.ex index bf4422f9d..16a696187 100644 --- a/lib/sequin/runtime/sink_pipeline.ex +++ b/lib/sequin/runtime/sink_pipeline.ex @@ -103,7 +103,9 @@ defmodule Sequin.Runtime.SinkPipeline do consumer = opts |> Keyword.fetch!(:consumer) - |> Repo.lazy_preload([:sequence, :postgres_database, :transform, :routing]) + |> Repo.lazy_preload([:sequence, :transform, :routing]) + # Ensure db is not on there + |> Ecto.reset_fields([:postgres_database]) slot_message_store_mod = Keyword.get(opts, :slot_message_store_mod, Sequin.Runtime.SlotMessageStore) producer = Keyword.get(opts, :producer, Sequin.Runtime.ConsumerProducer) diff --git a/lib/sequin/runtime/sns_pipeline.ex b/lib/sequin/runtime/sns_pipeline.ex index df74fd729..5a93e7508 100644 --- a/lib/sequin/runtime/sns_pipeline.ex +++ b/lib/sequin/runtime/sns_pipeline.ex @@ -61,12 +61,7 @@ defmodule Sequin.Runtime.SnsPipeline do } if consumer.sink.is_fifo do - group_id = - consumer - |> Sequin.Consumers.group_column_values(record_or_event.data) - |> Enum.join(",") - - Map.put(message, :message_group_id, group_id) + Map.put(message, :message_group_id, record_or_event.group_id) else message end diff --git a/lib/sequin/runtime/sqs_pipeline.ex b/lib/sequin/runtime/sqs_pipeline.ex index 6d282887d..ab95fe917 100644 --- a/lib/sequin/runtime/sqs_pipeline.ex +++ b/lib/sequin/runtime/sqs_pipeline.ex @@ -61,12 +61,7 @@ defmodule Sequin.Runtime.SqsPipeline do } if consumer.sink.is_fifo do - group_id = - consumer - |> Sequin.Consumers.group_column_values(record_or_event.data) - |> Enum.join(",") - - Map.put(message, :message_group_id, group_id) + Map.put(message, :message_group_id, record_or_event.group_id) else message end diff --git a/lib/sequin/sinks/kafka/kafka.ex b/lib/sequin/sinks/kafka/kafka.ex index 1ca8940e7..5d921a05f 100644 --- a/lib/sequin/sinks/kafka/kafka.ex +++ b/lib/sequin/sinks/kafka/kafka.ex @@ -1,6 +1,5 @@ defmodule Sequin.Sinks.Kafka do @moduledoc false - alias Sequin.Consumers alias Sequin.Consumers.ConsumerEvent alias Sequin.Consumers.ConsumerRecord alias Sequin.Consumers.KafkaSink @@ -43,16 +42,12 @@ defmodule Sequin.Sinks.Kafka do end @spec message_key(SinkConsumer.t(), ConsumerRecord.t() | ConsumerEvent.t()) :: String.t() - def message_key(%SinkConsumer{sink: %KafkaSink{}} = consumer, %ConsumerRecord{} = record) do - consumer - |> Consumers.group_column_values(record.data) - |> Enum.join(":") + def message_key(%SinkConsumer{sink: %KafkaSink{}}, %ConsumerRecord{} = record) do + record.group_id end - def message_key(%SinkConsumer{sink: %KafkaSink{}} = consumer, %ConsumerEvent{} = event) do - consumer - |> Consumers.group_column_values(event.data) - |> Enum.join(":") + def message_key(%SinkConsumer{sink: %KafkaSink{}}, %ConsumerEvent{} = event) do + event.group_id end defp impl do