From 98ef8fb2e69266d6a69bb30b1905102bdde79cd8 Mon Sep 17 00:00:00 2001 From: Anthony Accomazzo Date: Sun, 18 May 2025 20:43:24 -0700 Subject: [PATCH] kinesis-draft --- assets/svelte/consumers/ShowSink.svelte | 10 ++ assets/svelte/consumers/ShowSinkHeader.svelte | 3 + .../svelte/consumers/SinkConsumerForm.svelte | 3 + assets/svelte/consumers/SinkIndex.svelte | 9 +- assets/svelte/consumers/types.ts | 18 +++- .../svelte/sinks/kinesis/KinesisIcon.svelte | 30 ++++++ .../sinks/kinesis/KinesisSinkCard.svelte | 55 +++++++++++ .../sinks/kinesis/KinesisSinkForm.svelte | 92 +++++++++++++++++++ lib/sequin/consumers/guards.ex | 4 + lib/sequin/consumers/kinesis_sink.ex | 44 +++++++++ lib/sequin/runtime/kinesis_pipeline.ex | 66 +++++++++++++ lib/sequin/runtime/sink_pipeline.ex | 1 + lib/sequin/transforms/transforms.ex | 12 +++ 13 files changed, 344 insertions(+), 3 deletions(-) create mode 100644 assets/svelte/sinks/kinesis/KinesisIcon.svelte create mode 100644 assets/svelte/sinks/kinesis/KinesisSinkCard.svelte create mode 100644 assets/svelte/sinks/kinesis/KinesisSinkForm.svelte create mode 100644 lib/sequin/consumers/kinesis_sink.ex create mode 100644 lib/sequin/runtime/kinesis_pipeline.ex diff --git a/assets/svelte/consumers/ShowSink.svelte b/assets/svelte/consumers/ShowSink.svelte index 6a5d6f157..d47083467 100644 --- a/assets/svelte/consumers/ShowSink.svelte +++ b/assets/svelte/consumers/ShowSink.svelte @@ -27,6 +27,7 @@ TypesenseConsumer, ElasticsearchConsumer, RedisStringConsumer, + KinesisConsumer, } from "./types"; import SinkCardHttpPush from "../components/SinkCardHttpPush.svelte"; import SqsSinkCard from "../sinks/sqs/SqsSinkCard.svelte"; @@ -40,6 +41,7 @@ import TypesenseSinkCard from "../sinks/typesense/TypesenseSinkCard.svelte"; import ElasticsearchSinkCard from "../sinks/elasticsearch/ElasticsearchSinkCard.svelte"; import RedisStringSinkCard from "../sinks/redis-string/RedisStringSinkCard.svelte"; + import KinesisSinkCard from "../sinks/kinesis/KinesisSinkCard.svelte"; import * as d3 from "d3"; import { onMount } from "svelte"; import HealthAlerts from "$lib/health/HealthAlerts.svelte"; @@ -135,6 +137,12 @@ return consumer.sink.type === "redis_string"; } + function isKinesisConsumer( + consumer: Consumer, + ): consumer is KinesisConsumer { + return consumer.sink.type === "kinesis"; + } + function isNatsConsumer(consumer: Consumer): consumer is NatsConsumer { return consumer.sink.type === "nats"; } @@ -1179,6 +1187,8 @@ {:else if isRedisStringConsumer(consumer)} + {:else if isKinesisConsumer(consumer)} + {/if} diff --git a/assets/svelte/consumers/ShowSinkHeader.svelte b/assets/svelte/consumers/ShowSinkHeader.svelte index ba557a420..09efeb0fe 100644 --- a/assets/svelte/consumers/ShowSinkHeader.svelte +++ b/assets/svelte/consumers/ShowSinkHeader.svelte @@ -27,6 +27,7 @@ import AzureEventHubIcon from "../sinks/azure_event_hub/AzureEventHubIcon.svelte"; import TypesenseIcon from "../sinks/typesense/TypesenseIcon.svelte"; import ElasticsearchIcon from "../sinks/elasticsearch/ElasticsearchIcon.svelte"; + import KinesisIcon from "../sinks/kinesis/KinesisIcon.svelte"; import StopSinkModal from "./StopSinkModal.svelte"; export let consumer; @@ -156,6 +157,8 @@ {:else if consumer.sink.type === "sns"} + {:else if consumer.sink.type === "kinesis"} + {/if}

{consumer.name} diff --git a/assets/svelte/consumers/SinkConsumerForm.svelte b/assets/svelte/consumers/SinkConsumerForm.svelte index bfb95903b..90e6ca7b1 100644 --- a/assets/svelte/consumers/SinkConsumerForm.svelte +++ b/assets/svelte/consumers/SinkConsumerForm.svelte @@ -35,6 +35,7 @@ import { CircleAlert, Info, Plus } from "lucide-svelte"; import TypesenseSinkForm from "$lib/sinks/typesense/TypesenseSinkForm.svelte"; import ElasticsearchSinkForm from "$lib/sinks/elasticsearch/ElasticsearchSinkForm.svelte"; + import KinesisSinkForm from "$lib/sinks/kinesis/KinesisSinkForm.svelte"; import * as Alert from "$lib/components/ui/alert/index.js"; import TableSelector from "../components/TableSelector.svelte"; import * as Tooltip from "$lib/components/ui/tooltip"; @@ -702,6 +703,8 @@ {:else if consumer.type === "elasticsearch"} + {:else if consumer.type === "kinesis"} + {/if} diff --git a/assets/svelte/consumers/SinkIndex.svelte b/assets/svelte/consumers/SinkIndex.svelte index 79b4549d3..e7059d1c6 100644 --- a/assets/svelte/consumers/SinkIndex.svelte +++ b/assets/svelte/consumers/SinkIndex.svelte @@ -30,6 +30,7 @@ import TypesenseIcon from "../sinks/typesense/TypesenseIcon.svelte"; import ElasticsearchIcon from "../sinks/elasticsearch/ElasticsearchIcon.svelte"; + import KinesisIcon from "../sinks/kinesis/KinesisIcon.svelte"; import { Badge } from "$lib/components/ui/badge"; import * as d3 from "d3"; @@ -50,7 +51,8 @@ | "nats" | "rabbitmq" | "typesense" - | "elasticsearch"; + | "elasticsearch" + | "kinesis"; status: "active" | "disabled" | "paused"; database_name: string; @@ -136,6 +138,11 @@ name: "Elasticsearch", icon: ElasticsearchIcon, }, + { + id: "kinesis", + name: "Amazon Kinesis", + icon: KinesisIcon, + }, ]; function handleConsumerClick(id: string, type: string) { diff --git a/assets/svelte/consumers/types.ts b/assets/svelte/consumers/types.ts index b374a9c1b..8b906d52a 100644 --- a/assets/svelte/consumers/types.ts +++ b/assets/svelte/consumers/types.ts @@ -16,7 +16,8 @@ export type BaseConsumer = { | "rabbitmq" | "typesense" | "elasticsearch" - | "redis_string"; + | "redis_string" + | "kinesis"; name: string; annotations: Record; status: "active" | "paused" | "disabled"; @@ -210,6 +211,18 @@ export type ElasticsearchConsumer = BaseConsumer & { }; }; +// Kinesis specific sink +export type KinesisConsumer = BaseConsumer & { + sink: { + type: "kinesis"; + stream_name: string; + region: string; + access_key_id: string; + secret_access_key: string; + partition_key_field: string; + }; +}; + // Union type for all consumer types export type Consumer = | HttpPushConsumer @@ -224,4 +237,5 @@ export type Consumer = | TypesenseConsumer | SnsConsumer | ElasticsearchConsumer - | RedisStringConsumer; + | RedisStringConsumer + | KinesisConsumer; diff --git a/assets/svelte/sinks/kinesis/KinesisIcon.svelte b/assets/svelte/sinks/kinesis/KinesisIcon.svelte new file mode 100644 index 000000000..5914bac84 --- /dev/null +++ b/assets/svelte/sinks/kinesis/KinesisIcon.svelte @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + diff --git a/assets/svelte/sinks/kinesis/KinesisSinkCard.svelte b/assets/svelte/sinks/kinesis/KinesisSinkCard.svelte new file mode 100644 index 000000000..70024e882 --- /dev/null +++ b/assets/svelte/sinks/kinesis/KinesisSinkCard.svelte @@ -0,0 +1,55 @@ + + + + +
+

Kinesis Configuration

+ +
+ +
+
+
+ Region +
+ {consumer.sink.region} +
+
+ +
+ Partition Key Field +
+ {consumer.sink.partition_key_field} +
+
+
+
+ Stream Name +
+ {consumer.sink.stream_name} +
+
+
+
+
diff --git a/assets/svelte/sinks/kinesis/KinesisSinkForm.svelte b/assets/svelte/sinks/kinesis/KinesisSinkForm.svelte new file mode 100644 index 000000000..65d42f7ab --- /dev/null +++ b/assets/svelte/sinks/kinesis/KinesisSinkForm.svelte @@ -0,0 +1,92 @@ + + + + + Kinesis Configuration + + +
+ + + {#if errors.sink?.stream_name} +

{errors.sink.stream_name}

+ {/if} +
+ +
+ + + {#if errors.sink?.region} +

{errors.sink.region}

+ {/if} +
+ +
+ + +

+ Field to use for Kinesis partition key. Defaults to "id" if not specified. +

+ {#if errors.sink?.partition_key_field} +

{errors.sink.partition_key_field}

+ {/if} +
+ +
+ + + {#if errors.sink?.access_key_id} +

+ {errors.sink.access_key_id} +

+ {/if} +
+ +
+ + + {#if errors.sink?.secret_access_key} +

+ {errors.sink.secret_access_key} +

+ {/if} +
+
+
diff --git a/lib/sequin/consumers/guards.ex b/lib/sequin/consumers/guards.ex index 61e2d7823..55b7189f0 100644 --- a/lib/sequin/consumers/guards.ex +++ b/lib/sequin/consumers/guards.ex @@ -7,6 +7,7 @@ defmodule Sequin.Consumers.Guards do alias Sequin.Consumers.ConsumerRecord alias Sequin.Consumers.RedisStreamSink alias Sequin.Consumers.RedisStringSink + alias Sequin.Consumers.KinesisSink @doc """ Guard that checks if the given term is either a ConsumerEvent or a ConsumerRecord. @@ -16,4 +17,7 @@ defmodule Sequin.Consumers.Guards do defguard is_redis_sink(sink) when is_struct(sink, RedisStreamSink) or is_struct(sink, RedisStringSink) + + defguard is_kinesis_sink(sink) + when is_struct(sink, KinesisSink) end diff --git a/lib/sequin/consumers/kinesis_sink.ex b/lib/sequin/consumers/kinesis_sink.ex new file mode 100644 index 000000000..c9324a2c8 --- /dev/null +++ b/lib/sequin/consumers/kinesis_sink.ex @@ -0,0 +1,44 @@ +defmodule Sequin.Consumers.KinesisSink do + @moduledoc "Represents configuration for sinking events to an AWS Kinesis Data Stream." + use Ecto.Schema + use TypedEctoSchema + + import Ecto.Changeset + + alias Sequin.Aws.HttpClient + alias Sequin.Encrypted + + @derive {Jason.Encoder, only: [:stream_name, :region]} + @derive {Inspect, except: [:secret_access_key]} + @primary_key false + typed_embedded_schema do + field :type, Ecto.Enum, values: [:kinesis], default: :kinesis + field :stream_name, :string + field :region, :string + field :access_key_id, :string + field :secret_access_key, Encrypted.Field + field :partition_key_field, :string, default: "id" + end + + def changeset(struct, params) do + struct + |> cast(params, [:stream_name, :region, :access_key_id, :secret_access_key, :partition_key_field]) + |> validate_required([:stream_name, :region, :access_key_id, :secret_access_key]) + |> validate_stream_name() + |> validate_length(:partition_key_field, max: 100) + end + + defp validate_stream_name(changeset) do + changeset + |> validate_format(:stream_name, ~r/^[a-zA-Z0-9_.-]+$/, + message: "must contain only alphanumeric characters, hyphens, underscores, and periods" + ) + |> validate_length(:stream_name, min: 1, max: 128) + end + + def aws_client(%__MODULE__{} = sink) do + sink.access_key_id + |> AWS.Client.create(sink.secret_access_key, sink.region) + |> HttpClient.put_client() + end +end diff --git a/lib/sequin/runtime/kinesis_pipeline.ex b/lib/sequin/runtime/kinesis_pipeline.ex new file mode 100644 index 000000000..918f8e79a --- /dev/null +++ b/lib/sequin/runtime/kinesis_pipeline.ex @@ -0,0 +1,66 @@ +defmodule Sequin.Runtime.KinesisPipeline do + @moduledoc false + alias Sequin.Consumers.KinesisSink + alias Sequin.Consumers.SinkConsumer + alias Sequin.Runtime.SinkPipeline + alias Sequin.Transforms.Message + + def send(%SinkConsumer{sink: %KinesisSink{} = sink} = consumer, messages) when is_list(messages) do + client = KinesisSink.aws_client(sink) + + messages + |> Stream.map(fn msg -> + case Message.to_external(consumer, msg.data) do + {:ok, data} -> {msg, data} + {:error, reason} -> {msg, {:error, reason}} + end + end) + |> Stream.filter(fn {_, data} -> !match?({:error, _}, data) end) + |> Stream.map(fn {msg, data} -> + partition_key = extract_partition_key(data, sink.partition_key_field) + + {msg, %{ + "Data" => Jason.encode!(data), + "PartitionKey" => partition_key, + "StreamName" => sink.stream_name + }} + end) + |> SinkPipeline.batch_and_send_to_service("Kinesis", client, &put_records/2) + end + + defp extract_partition_key(data, partition_key_field) do + cond do + is_map(data) && Map.has_key?(data, partition_key_field) -> + to_string(data[partition_key_field]) + + is_map(data) && Map.has_key?(data, String.to_atom(partition_key_field)) -> + to_string(data[String.to_atom(partition_key_field)]) + + true -> + # If the key doesn't exist, generate a random partition key + :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower) + end + end + + defp put_records(client, records) do + request_body = %{ + "Records" => records + } + + case AWS.Kinesis.put_records(client, request_body) do + {:ok, %{"FailedRecordCount" => 0}} -> + {:ok, length(records)} + + {:ok, %{"FailedRecordCount" => failed_count, "Records" => failed_records}} -> + failed_indices = failed_records + |> Enum.with_index() + |> Enum.filter(fn {record, _index} -> Map.has_key?(record, "ErrorCode") end) + |> Enum.map(fn {_record, index} -> index end) + + {:partial_error, length(records) - failed_count, failed_indices} + + {:error, error} -> + {:error, error} + end + end +end diff --git a/lib/sequin/runtime/sink_pipeline.ex b/lib/sequin/runtime/sink_pipeline.ex index bf4422f9d..2dfc1bded 100644 --- a/lib/sequin/runtime/sink_pipeline.ex +++ b/lib/sequin/runtime/sink_pipeline.ex @@ -319,6 +319,7 @@ defmodule Sequin.Runtime.SinkPipeline do :gcp_pubsub -> Sequin.Runtime.GcpPubsubPipeline :http_push -> Sequin.Runtime.HttpPushPipeline :kafka -> Sequin.Runtime.KafkaPipeline + :kinesis -> Sequin.Runtime.KinesisPipeline :nats -> Sequin.Runtime.NatsPipeline :rabbitmq -> Sequin.Runtime.RabbitMqPipeline :redis_stream -> Sequin.Runtime.RedisStreamPipeline diff --git a/lib/sequin/transforms/transforms.ex b/lib/sequin/transforms/transforms.ex index be9ef7031..9fbe1b88f 100644 --- a/lib/sequin/transforms/transforms.ex +++ b/lib/sequin/transforms/transforms.ex @@ -11,6 +11,7 @@ defmodule Sequin.Transforms do alias Sequin.Consumers.HttpEndpoint alias Sequin.Consumers.HttpPushSink alias Sequin.Consumers.KafkaSink + alias Sequin.Consumers.KinesisSink alias Sequin.Consumers.NatsSink alias Sequin.Consumers.PathTransform alias Sequin.Consumers.RabbitMqSink @@ -242,6 +243,17 @@ defmodule Sequin.Transforms do }) end + def to_external(%KinesisSink{} = sink, show_sensitive) do + Sequin.Map.reject_nil_values(%{ + type: "kinesis", + stream_name: sink.stream_name, + region: sink.region, + access_key_id: maybe_obfuscate(sink.access_key_id, show_sensitive), + secret_access_key: maybe_obfuscate(sink.secret_access_key, show_sensitive), + partition_key_field: sink.partition_key_field + }) + end + def to_external(%GcpPubsubSink{} = sink, _show_sensitive) do Sequin.Map.reject_nil_values(%{ type: "gcp_pubsub",