diff --git a/assets/svelte/consumers/ShowSink.svelte b/assets/svelte/consumers/ShowSink.svelte
index 6a5d6f157..d47083467 100644
--- a/assets/svelte/consumers/ShowSink.svelte
+++ b/assets/svelte/consumers/ShowSink.svelte
@@ -27,6 +27,7 @@
TypesenseConsumer,
ElasticsearchConsumer,
RedisStringConsumer,
+ KinesisConsumer,
} from "./types";
import SinkCardHttpPush from "../components/SinkCardHttpPush.svelte";
import SqsSinkCard from "../sinks/sqs/SqsSinkCard.svelte";
@@ -40,6 +41,7 @@
import TypesenseSinkCard from "../sinks/typesense/TypesenseSinkCard.svelte";
import ElasticsearchSinkCard from "../sinks/elasticsearch/ElasticsearchSinkCard.svelte";
import RedisStringSinkCard from "../sinks/redis-string/RedisStringSinkCard.svelte";
+ import KinesisSinkCard from "../sinks/kinesis/KinesisSinkCard.svelte";
import * as d3 from "d3";
import { onMount } from "svelte";
import HealthAlerts from "$lib/health/HealthAlerts.svelte";
@@ -135,6 +137,12 @@
return consumer.sink.type === "redis_string";
}
+ function isKinesisConsumer(
+ consumer: Consumer,
+ ): consumer is KinesisConsumer {
+ return consumer.sink.type === "kinesis";
+ }
+
function isNatsConsumer(consumer: Consumer): consumer is NatsConsumer {
return consumer.sink.type === "nats";
}
@@ -1179,6 +1187,8 @@
{:else if isRedisStringConsumer(consumer)}
+ {:else if isKinesisConsumer(consumer)}
+
{/if}
diff --git a/assets/svelte/consumers/ShowSinkHeader.svelte b/assets/svelte/consumers/ShowSinkHeader.svelte
index ba557a420..09efeb0fe 100644
--- a/assets/svelte/consumers/ShowSinkHeader.svelte
+++ b/assets/svelte/consumers/ShowSinkHeader.svelte
@@ -27,6 +27,7 @@
import AzureEventHubIcon from "../sinks/azure_event_hub/AzureEventHubIcon.svelte";
import TypesenseIcon from "../sinks/typesense/TypesenseIcon.svelte";
import ElasticsearchIcon from "../sinks/elasticsearch/ElasticsearchIcon.svelte";
+ import KinesisIcon from "../sinks/kinesis/KinesisIcon.svelte";
import StopSinkModal from "./StopSinkModal.svelte";
export let consumer;
@@ -156,6 +157,8 @@
{:else if consumer.sink.type === "sns"}
+ {:else if consumer.sink.type === "kinesis"}
+
{/if}
{consumer.name}
diff --git a/assets/svelte/consumers/SinkConsumerForm.svelte b/assets/svelte/consumers/SinkConsumerForm.svelte
index bfb95903b..90e6ca7b1 100644
--- a/assets/svelte/consumers/SinkConsumerForm.svelte
+++ b/assets/svelte/consumers/SinkConsumerForm.svelte
@@ -35,6 +35,7 @@
import { CircleAlert, Info, Plus } from "lucide-svelte";
import TypesenseSinkForm from "$lib/sinks/typesense/TypesenseSinkForm.svelte";
import ElasticsearchSinkForm from "$lib/sinks/elasticsearch/ElasticsearchSinkForm.svelte";
+ import KinesisSinkForm from "$lib/sinks/kinesis/KinesisSinkForm.svelte";
import * as Alert from "$lib/components/ui/alert/index.js";
import TableSelector from "../components/TableSelector.svelte";
import * as Tooltip from "$lib/components/ui/tooltip";
@@ -702,6 +703,8 @@
{:else if consumer.type === "elasticsearch"}
+ {:else if consumer.type === "kinesis"}
+
{/if}
diff --git a/assets/svelte/consumers/SinkIndex.svelte b/assets/svelte/consumers/SinkIndex.svelte
index 79b4549d3..e7059d1c6 100644
--- a/assets/svelte/consumers/SinkIndex.svelte
+++ b/assets/svelte/consumers/SinkIndex.svelte
@@ -30,6 +30,7 @@
import TypesenseIcon from "../sinks/typesense/TypesenseIcon.svelte";
import ElasticsearchIcon from "../sinks/elasticsearch/ElasticsearchIcon.svelte";
+ import KinesisIcon from "../sinks/kinesis/KinesisIcon.svelte";
import { Badge } from "$lib/components/ui/badge";
import * as d3 from "d3";
@@ -50,7 +51,8 @@
| "nats"
| "rabbitmq"
| "typesense"
- | "elasticsearch";
+ | "elasticsearch"
+ | "kinesis";
status: "active" | "disabled" | "paused";
database_name: string;
@@ -136,6 +138,11 @@
name: "Elasticsearch",
icon: ElasticsearchIcon,
},
+ {
+ id: "kinesis",
+ name: "Amazon Kinesis",
+ icon: KinesisIcon,
+ },
];
function handleConsumerClick(id: string, type: string) {
diff --git a/assets/svelte/consumers/types.ts b/assets/svelte/consumers/types.ts
index b374a9c1b..8b906d52a 100644
--- a/assets/svelte/consumers/types.ts
+++ b/assets/svelte/consumers/types.ts
@@ -16,7 +16,8 @@ export type BaseConsumer = {
| "rabbitmq"
| "typesense"
| "elasticsearch"
- | "redis_string";
+ | "redis_string"
+ | "kinesis";
name: string;
annotations: Record;
status: "active" | "paused" | "disabled";
@@ -210,6 +211,18 @@ export type ElasticsearchConsumer = BaseConsumer & {
};
};
+// Kinesis specific sink
+export type KinesisConsumer = BaseConsumer & {
+ sink: {
+ type: "kinesis";
+ stream_name: string;
+ region: string;
+ access_key_id: string;
+ secret_access_key: string;
+ partition_key_field: string;
+ };
+};
+
// Union type for all consumer types
export type Consumer =
| HttpPushConsumer
@@ -224,4 +237,5 @@ export type Consumer =
| TypesenseConsumer
| SnsConsumer
| ElasticsearchConsumer
- | RedisStringConsumer;
+ | RedisStringConsumer
+ | KinesisConsumer;
diff --git a/assets/svelte/sinks/kinesis/KinesisIcon.svelte b/assets/svelte/sinks/kinesis/KinesisIcon.svelte
new file mode 100644
index 000000000..5914bac84
--- /dev/null
+++ b/assets/svelte/sinks/kinesis/KinesisIcon.svelte
@@ -0,0 +1,30 @@
+
diff --git a/assets/svelte/sinks/kinesis/KinesisSinkCard.svelte b/assets/svelte/sinks/kinesis/KinesisSinkCard.svelte
new file mode 100644
index 000000000..70024e882
--- /dev/null
+++ b/assets/svelte/sinks/kinesis/KinesisSinkCard.svelte
@@ -0,0 +1,55 @@
+
+
+
+
+
+
Kinesis Configuration
+
+
+
+
+
+
+
Region
+
+ {consumer.sink.region}
+
+
+
+
+
Partition Key Field
+
+ {consumer.sink.partition_key_field}
+
+
+
+
+
Stream Name
+
+ {consumer.sink.stream_name}
+
+
+
+
+
diff --git a/assets/svelte/sinks/kinesis/KinesisSinkForm.svelte b/assets/svelte/sinks/kinesis/KinesisSinkForm.svelte
new file mode 100644
index 000000000..65d42f7ab
--- /dev/null
+++ b/assets/svelte/sinks/kinesis/KinesisSinkForm.svelte
@@ -0,0 +1,92 @@
+
+
+
+
+ Kinesis Configuration
+
+
+
+
+
+ {#if errors.sink?.stream_name}
+
{errors.sink.stream_name}
+ {/if}
+
+
+
+
+
+ {#if errors.sink?.region}
+
{errors.sink.region}
+ {/if}
+
+
+
+
+
+
+ Field to use for Kinesis partition key. Defaults to "id" if not specified.
+
+ {#if errors.sink?.partition_key_field}
+
{errors.sink.partition_key_field}
+ {/if}
+
+
+
+
+
+ {#if errors.sink?.access_key_id}
+
+ {errors.sink.access_key_id}
+
+ {/if}
+
+
+
+
+
+ {#if errors.sink?.secret_access_key}
+
+ {errors.sink.secret_access_key}
+
+ {/if}
+
+
+
diff --git a/lib/sequin/consumers/guards.ex b/lib/sequin/consumers/guards.ex
index 61e2d7823..55b7189f0 100644
--- a/lib/sequin/consumers/guards.ex
+++ b/lib/sequin/consumers/guards.ex
@@ -7,6 +7,7 @@ defmodule Sequin.Consumers.Guards do
alias Sequin.Consumers.ConsumerRecord
alias Sequin.Consumers.RedisStreamSink
alias Sequin.Consumers.RedisStringSink
+ alias Sequin.Consumers.KinesisSink
@doc """
Guard that checks if the given term is either a ConsumerEvent or a ConsumerRecord.
@@ -16,4 +17,7 @@ defmodule Sequin.Consumers.Guards do
defguard is_redis_sink(sink)
when is_struct(sink, RedisStreamSink) or is_struct(sink, RedisStringSink)
+
+ defguard is_kinesis_sink(sink)
+ when is_struct(sink, KinesisSink)
end
diff --git a/lib/sequin/consumers/kinesis_sink.ex b/lib/sequin/consumers/kinesis_sink.ex
new file mode 100644
index 000000000..c9324a2c8
--- /dev/null
+++ b/lib/sequin/consumers/kinesis_sink.ex
@@ -0,0 +1,44 @@
+defmodule Sequin.Consumers.KinesisSink do
+ @moduledoc "Represents configuration for sinking events to an AWS Kinesis Data Stream."
+ use Ecto.Schema
+ use TypedEctoSchema
+
+ import Ecto.Changeset
+
+ alias Sequin.Aws.HttpClient
+ alias Sequin.Encrypted
+
+ @derive {Jason.Encoder, only: [:stream_name, :region]}
+ @derive {Inspect, except: [:secret_access_key]}
+ @primary_key false
+ typed_embedded_schema do
+ field :type, Ecto.Enum, values: [:kinesis], default: :kinesis
+ field :stream_name, :string
+ field :region, :string
+ field :access_key_id, :string
+ field :secret_access_key, Encrypted.Field
+ field :partition_key_field, :string, default: "id"
+ end
+
+ def changeset(struct, params) do
+ struct
+ |> cast(params, [:stream_name, :region, :access_key_id, :secret_access_key, :partition_key_field])
+ |> validate_required([:stream_name, :region, :access_key_id, :secret_access_key])
+ |> validate_stream_name()
+ |> validate_length(:partition_key_field, max: 100)
+ end
+
+ defp validate_stream_name(changeset) do
+ changeset
+ |> validate_format(:stream_name, ~r/^[a-zA-Z0-9_.-]+$/,
+ message: "must contain only alphanumeric characters, hyphens, underscores, and periods"
+ )
+ |> validate_length(:stream_name, min: 1, max: 128)
+ end
+
+ def aws_client(%__MODULE__{} = sink) do
+ sink.access_key_id
+ |> AWS.Client.create(sink.secret_access_key, sink.region)
+ |> HttpClient.put_client()
+ end
+end
diff --git a/lib/sequin/runtime/kinesis_pipeline.ex b/lib/sequin/runtime/kinesis_pipeline.ex
new file mode 100644
index 000000000..918f8e79a
--- /dev/null
+++ b/lib/sequin/runtime/kinesis_pipeline.ex
@@ -0,0 +1,66 @@
+defmodule Sequin.Runtime.KinesisPipeline do
+ @moduledoc false
+ alias Sequin.Consumers.KinesisSink
+ alias Sequin.Consumers.SinkConsumer
+ alias Sequin.Runtime.SinkPipeline
+ alias Sequin.Transforms.Message
+
+ def send(%SinkConsumer{sink: %KinesisSink{} = sink} = consumer, messages) when is_list(messages) do
+ client = KinesisSink.aws_client(sink)
+
+ messages
+ |> Stream.map(fn msg ->
+ case Message.to_external(consumer, msg.data) do
+ {:ok, data} -> {msg, data}
+ {:error, reason} -> {msg, {:error, reason}}
+ end
+ end)
+ |> Stream.filter(fn {_, data} -> !match?({:error, _}, data) end)
+ |> Stream.map(fn {msg, data} ->
+ partition_key = extract_partition_key(data, sink.partition_key_field)
+
+ {msg, %{
+ "Data" => Jason.encode!(data),
+ "PartitionKey" => partition_key,
+ "StreamName" => sink.stream_name
+ }}
+ end)
+ |> SinkPipeline.batch_and_send_to_service("Kinesis", client, &put_records/2)
+ end
+
+ defp extract_partition_key(data, partition_key_field) do
+ cond do
+ is_map(data) && Map.has_key?(data, partition_key_field) ->
+ to_string(data[partition_key_field])
+
+ is_map(data) && Map.has_key?(data, String.to_atom(partition_key_field)) ->
+ to_string(data[String.to_atom(partition_key_field)])
+
+ true ->
+ # If the key doesn't exist, generate a random partition key
+ :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower)
+ end
+ end
+
+ defp put_records(client, records) do
+ request_body = %{
+ "Records" => records
+ }
+
+ case AWS.Kinesis.put_records(client, request_body) do
+ {:ok, %{"FailedRecordCount" => 0}} ->
+ {:ok, length(records)}
+
+ {:ok, %{"FailedRecordCount" => failed_count, "Records" => failed_records}} ->
+ failed_indices = failed_records
+ |> Enum.with_index()
+ |> Enum.filter(fn {record, _index} -> Map.has_key?(record, "ErrorCode") end)
+ |> Enum.map(fn {_record, index} -> index end)
+
+ {:partial_error, length(records) - failed_count, failed_indices}
+
+ {:error, error} ->
+ {:error, error}
+ end
+ end
+end
diff --git a/lib/sequin/runtime/sink_pipeline.ex b/lib/sequin/runtime/sink_pipeline.ex
index bf4422f9d..2dfc1bded 100644
--- a/lib/sequin/runtime/sink_pipeline.ex
+++ b/lib/sequin/runtime/sink_pipeline.ex
@@ -319,6 +319,7 @@ defmodule Sequin.Runtime.SinkPipeline do
:gcp_pubsub -> Sequin.Runtime.GcpPubsubPipeline
:http_push -> Sequin.Runtime.HttpPushPipeline
:kafka -> Sequin.Runtime.KafkaPipeline
+ :kinesis -> Sequin.Runtime.KinesisPipeline
:nats -> Sequin.Runtime.NatsPipeline
:rabbitmq -> Sequin.Runtime.RabbitMqPipeline
:redis_stream -> Sequin.Runtime.RedisStreamPipeline
diff --git a/lib/sequin/transforms/transforms.ex b/lib/sequin/transforms/transforms.ex
index be9ef7031..9fbe1b88f 100644
--- a/lib/sequin/transforms/transforms.ex
+++ b/lib/sequin/transforms/transforms.ex
@@ -11,6 +11,7 @@ defmodule Sequin.Transforms do
alias Sequin.Consumers.HttpEndpoint
alias Sequin.Consumers.HttpPushSink
alias Sequin.Consumers.KafkaSink
+ alias Sequin.Consumers.KinesisSink
alias Sequin.Consumers.NatsSink
alias Sequin.Consumers.PathTransform
alias Sequin.Consumers.RabbitMqSink
@@ -242,6 +243,17 @@ defmodule Sequin.Transforms do
})
end
+ def to_external(%KinesisSink{} = sink, show_sensitive) do
+ Sequin.Map.reject_nil_values(%{
+ type: "kinesis",
+ stream_name: sink.stream_name,
+ region: sink.region,
+ access_key_id: maybe_obfuscate(sink.access_key_id, show_sensitive),
+ secret_access_key: maybe_obfuscate(sink.secret_access_key, show_sensitive),
+ partition_key_field: sink.partition_key_field
+ })
+ end
+
def to_external(%GcpPubsubSink{} = sink, _show_sensitive) do
Sequin.Map.reject_nil_values(%{
type: "gcp_pubsub",