Skip to content

Kinesis sink (2) #1543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions assets/svelte/consumers/ShowSink.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
TypesenseConsumer,
ElasticsearchConsumer,
RedisStringConsumer,
KinesisConsumer,
} from "./types";
import SinkCardHttpPush from "../components/SinkCardHttpPush.svelte";
import SqsSinkCard from "../sinks/sqs/SqsSinkCard.svelte";
Expand All @@ -40,6 +41,7 @@
import TypesenseSinkCard from "../sinks/typesense/TypesenseSinkCard.svelte";
import ElasticsearchSinkCard from "../sinks/elasticsearch/ElasticsearchSinkCard.svelte";
import RedisStringSinkCard from "../sinks/redis-string/RedisStringSinkCard.svelte";
import KinesisSinkCard from "../sinks/kinesis/KinesisSinkCard.svelte";
import * as d3 from "d3";
import { onMount } from "svelte";
import HealthAlerts from "$lib/health/HealthAlerts.svelte";
Expand Down Expand Up @@ -135,6 +137,12 @@
return consumer.sink.type === "redis_string";
}

function isKinesisConsumer(
consumer: Consumer,
): consumer is KinesisConsumer {
return consumer.sink.type === "kinesis";
}

function isNatsConsumer(consumer: Consumer): consumer is NatsConsumer {
return consumer.sink.type === "nats";
}
Expand Down Expand Up @@ -1179,6 +1187,8 @@
<ElasticsearchSinkCard {consumer} />
{:else if isRedisStringConsumer(consumer)}
<RedisStringSinkCard {consumer} />
{:else if isKinesisConsumer(consumer)}
<KinesisSinkCard {consumer} />
{/if}

<ShowSequence {consumer} />
Expand Down
3 changes: 3 additions & 0 deletions assets/svelte/consumers/ShowSinkHeader.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import AzureEventHubIcon from "../sinks/azure_event_hub/AzureEventHubIcon.svelte";
import TypesenseIcon from "../sinks/typesense/TypesenseIcon.svelte";
import ElasticsearchIcon from "../sinks/elasticsearch/ElasticsearchIcon.svelte";
import KinesisIcon from "../sinks/kinesis/KinesisIcon.svelte";
import StopSinkModal from "./StopSinkModal.svelte";

export let consumer;
Expand Down Expand Up @@ -156,6 +157,8 @@
<ElasticsearchIcon class="h-6 w-6 mr-2" />
{:else if consumer.sink.type === "sns"}
<SnsIcon class="h-6 w-6 mr-2" />
{:else if consumer.sink.type === "kinesis"}
<KinesisIcon class="h-6 w-6 mr-2" />
{/if}
<h1 class="text-xl font-semibold">
{consumer.name}
Expand Down
3 changes: 3 additions & 0 deletions assets/svelte/consumers/SinkConsumerForm.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import { CircleAlert, Info, Plus } from "lucide-svelte";
import TypesenseSinkForm from "$lib/sinks/typesense/TypesenseSinkForm.svelte";
import ElasticsearchSinkForm from "$lib/sinks/elasticsearch/ElasticsearchSinkForm.svelte";
import KinesisSinkForm from "$lib/sinks/kinesis/KinesisSinkForm.svelte";
import * as Alert from "$lib/components/ui/alert/index.js";
import TableSelector from "../components/TableSelector.svelte";
import * as Tooltip from "$lib/components/ui/tooltip";
Expand Down Expand Up @@ -702,6 +703,8 @@
<TypesenseSinkForm errors={errors.consumer} bind:form />
{:else if consumer.type === "elasticsearch"}
<ElasticsearchSinkForm errors={errors.consumer} bind:form />
{:else if consumer.type === "kinesis"}
<KinesisSinkForm errors={errors.consumer} bind:form />
{/if}

<Card>
Expand Down
9 changes: 8 additions & 1 deletion assets/svelte/consumers/SinkIndex.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import TypesenseIcon from "../sinks/typesense/TypesenseIcon.svelte";
import ElasticsearchIcon from "../sinks/elasticsearch/ElasticsearchIcon.svelte";
import KinesisIcon from "../sinks/kinesis/KinesisIcon.svelte";

import { Badge } from "$lib/components/ui/badge";
import * as d3 from "d3";
Expand All @@ -50,7 +51,8 @@
| "nats"
| "rabbitmq"
| "typesense"
| "elasticsearch";
| "elasticsearch"
| "kinesis";

status: "active" | "disabled" | "paused";
database_name: string;
Expand Down Expand Up @@ -136,6 +138,11 @@
name: "Elasticsearch",
icon: ElasticsearchIcon,
},
{
id: "kinesis",
name: "Amazon Kinesis",
icon: KinesisIcon,
},
];

function handleConsumerClick(id: string, type: string) {
Expand Down
18 changes: 16 additions & 2 deletions assets/svelte/consumers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ export type BaseConsumer = {
| "rabbitmq"
| "typesense"
| "elasticsearch"
| "redis_string";
| "redis_string"
| "kinesis";
name: string;
annotations: Record<string, boolean>;
status: "active" | "paused" | "disabled";
Expand Down Expand Up @@ -210,6 +211,18 @@ export type ElasticsearchConsumer = BaseConsumer & {
};
};

// Kinesis specific sink
export type KinesisConsumer = BaseConsumer & {
sink: {
type: "kinesis";
stream_name: string;
region: string;
access_key_id: string;
secret_access_key: string;
partition_key_field: string;
};
};

// Union type for all consumer types
export type Consumer =
| HttpPushConsumer
Expand All @@ -224,4 +237,5 @@ export type Consumer =
| TypesenseConsumer
| SnsConsumer
| ElasticsearchConsumer
| RedisStringConsumer;
| RedisStringConsumer
| KinesisConsumer;
30 changes: 30 additions & 0 deletions assets/svelte/sinks/kinesis/KinesisIcon.svelte
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<svg
viewBox="0 0 80 80"
version="1.1"
xmlns="http://www.w3.org/2000/svg"
xmlns:xlink="http://www.w3.org/1999/xlink"
class={$$props.class}
>
<defs>
<linearGradient
x1="0%"
y1="100%"
x2="100%"
y2="0%"
id="kinesis-icon-gradient-1"
>
<stop stop-color="#4D27A8" offset="0%"></stop>
<stop stop-color="#8D62EC" offset="100%"></stop>
</linearGradient>
</defs>
<g stroke="none" stroke-width="1" fill="none" fill-rule="evenodd">
<g fill="url(#kinesis-icon-gradient-1)">
<rect id="Rectangle" x="0" y="0" width="80" height="80"></rect>
</g>
<path
d="M13,40 C13,37.8 14.8,36 17,36 C19.2,36 21,37.8 21,40 C21,42.2 19.2,44 17,44 C14.8,44 13,42.2 13,40 M57,68 C54.8,68 53,66.2 53,64 C53,61.8 54.8,60 57,60 C59.2,60 61,61.8 61,64 C61,66.2 59.2,68 57,68 M63,24 C60.8,24 59,22.2 59,20 C59,17.8 60.8,16 63,16 C65.2,16 67,17.8 67,20 C67,22.2 65.2,24 63,24 M40,64 C38.9,64 38,63.1 38,62 C38,60.9 38.9,60 40,60 C41.1,60 42,60.9 42,62 C42,63.1 41.1,64 40,64 M40,24 C38.9,24 38,23.1 38,22 C38,20.9 38.9,20 40,20 C41.1,20 42,20.9 42,22 C42,23.1 41.1,24 40,24 M23,36 L19,26 L17,26 C13.7,26 11,28.7 11,32 L11,48 C11,51.3 13.7,54 17,54 L23,54 L23,50 L17,50 C15.9,50 15,49.1 15,48 L15,32 C15,30.9 15.9,30 17,30 L22.3,30 L26.3,40 L23,50 L17,54 L23,54 L27,44 L31,54 L40,54 L40,50 L33.7,50 L29.7,40 L33.7,30 L40,30 L40,26 L31,26 L27,36 L23,36 Z M57,54 L57,50 C58.1,50 59,49.1 59,48 L59,32 C59,30.9 58.1,30 57,30 L40,30 L40,34 L57,34 L57,46 L40,46 L40,50 L57,50 M23,36 L27,36 L29.7,40 L26.3,40 L23,36 Z M57,54 C53.7,54 51,51.3 51,48 L51,32 C51,28.7 53.7,26 57,26 L63,26 L63,30 L57,30 M63,54 L63,50 L67,50 L67,30 L63,30 L63,26 L69,26 L69,54 L63,54 Z M15.1,22.7 C16.9,18.3 20.2,14.7 24.4,12.6 C28.8,10.2 33.8,9.7 38.5,11 C47.7,14.1 54,22.9 54,32.7 C54,36.8 52.8,41 50.6,44.5 L47.3,42.7 C49.1,39.8 50,36.2 50,32.7 C50,24.9 45,17.9 37.6,15.4 C33.9,14.2 29.9,14.7 26.4,16.6 C23.2,18.2 20.6,21 19.2,24.3 L15.1,22.7 Z M22,57.3 C20.2,61.7 16.9,65.3 12.7,67.4 C8.3,69.8 3.3,70.3 -1.4,69 C-10.6,65.9 -16.9,57.1 -16.9,47.3 C-16.9,43.2 -15.7,39 -13.5,35.5 L-10.2,37.3 C-12,40.2 -12.9,43.8 -12.9,47.3 C-12.9,55.1 -7.9,62.1 -0.5,64.6 C3.2,65.8 7.2,65.3 10.7,63.4 C13.9,61.8 16.5,59 17.9,55.7 L22,57.3 Z"
id="AWS-Kinesis_Icon_64_Squid"
fill="#FFFFFF"
></path>
</g>
</svg>
55 changes: 55 additions & 0 deletions assets/svelte/sinks/kinesis/KinesisSinkCard.svelte
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<script lang="ts">
import { ExternalLink } from "lucide-svelte";
import { Card, CardContent } from "$lib/components/ui/card";
import { Button } from "$lib/components/ui/button";
import type { KinesisConsumer } from "../../consumers/types";

export let consumer: KinesisConsumer;
</script>

<Card>
<CardContent class="p-6">
<div class="flex justify-between items-center mb-4">
<h2 class="text-lg font-semibold">Kinesis Configuration</h2>
<div class="flex gap-2">
<a
href={`https://${consumer.sink.region}.console.aws.amazon.com/kinesis/home?region=${consumer.sink.region}#/streams/details/${encodeURIComponent(consumer.sink.stream_name)}/monitoring`}
target="_blank"
rel="noopener noreferrer"
>
<Button variant="outline" size="sm">
<ExternalLink class="h-4 w-4 mr-2" />
View in AWS Console
</Button>
</a>
</div>
</div>

<div class="flex flex-col gap-4">
<div class="grid grid-cols-2 gap-4">
<div>
<span class="text-sm text-gray-500">Region</span>
<div class="mt-2">
<span>{consumer.sink.region}</span>
</div>
</div>

<div>
<span class="text-sm text-gray-500">Partition Key Field</span>
<div class="mt-2">
<span>{consumer.sink.partition_key_field}</span>
</div>
</div>
</div>
<div>
<span class="text-sm text-gray-500">Stream Name</span>
<div class="mt-2">
<span
class="font-mono bg-slate-50 pl-1 pr-4 py-1 border border-slate-100 rounded-md whitespace-nowrap"
>{consumer.sink.stream_name}</span
>
</div>
</div>
</div>
</CardContent>
</Card>
92 changes: 92 additions & 0 deletions assets/svelte/sinks/kinesis/KinesisSinkForm.svelte
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
<script lang="ts">
import { Input } from "$lib/components/ui/input";
import {
Card,
CardContent,
CardHeader,
CardTitle,
} from "$lib/components/ui/card";
import { Label } from "$lib/components/ui/label";

export let form;
export let errors: any = {};
</script>

<Card>
<CardHeader>
<CardTitle>Kinesis Configuration</CardTitle>
</CardHeader>
<CardContent class="flex flex-col gap-4">
<div class="flex flex-col gap-2">
<Label for="stream-name">Stream Name</Label>
<Input
id="stream-name"
bind:value={form.sink.stream_name}
placeholder="my-kinesis-stream"
/>
{#if errors.sink?.stream_name}
<p class="text-destructive text-sm">{errors.sink.stream_name}</p>
{/if}
</div>

<div class="flex flex-col gap-2">
<Label for="region">AWS Region</Label>
<Input
id="region"
bind:value={form.sink.region}
placeholder="us-east-1"
/>
{#if errors.sink?.region}
<p class="text-destructive text-sm">{errors.sink.region}</p>
{/if}
</div>

<div class="flex flex-col gap-2">
<Label for="partition-key-field">Partition Key Field</Label>
<Input
id="partition-key-field"
bind:value={form.sink.partition_key_field}
placeholder="id"
/>
<p class="text-sm text-muted-foreground">
Field to use for Kinesis partition key. Defaults to "id" if not specified.
</p>
{#if errors.sink?.partition_key_field}
<p class="text-destructive text-sm">{errors.sink.partition_key_field}</p>
{/if}
</div>

<div class="flex flex-col gap-2">
<Label for="access-key">AWS Access Key ID</Label>
<Input
id="access-key"
data-1p-ignore
bind:value={form.sink.access_key_id}
placeholder="Enter your AWS access key ID"
autocomplete="off"
/>
{#if errors.sink?.access_key_id}
<p class="text-destructive text-sm">
{errors.sink.access_key_id}
</p>
{/if}
</div>

<div class="flex flex-col gap-2">
<Label for="secret-key">AWS Secret Access Key</Label>
<Input
id="secret-key"
data-1p-ignore
type="password"
bind:value={form.sink.secret_access_key}
placeholder="Enter your AWS secret access key"
autocomplete="off"
/>
{#if errors.sink?.secret_access_key}
<p class="text-destructive text-sm">
{errors.sink.secret_access_key}
</p>
{/if}
</div>
</CardContent>
</Card>
4 changes: 4 additions & 0 deletions lib/sequin/consumers/guards.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Sequin.Consumers.Guards do
alias Sequin.Consumers.ConsumerRecord
alias Sequin.Consumers.RedisStreamSink
alias Sequin.Consumers.RedisStringSink
alias Sequin.Consumers.KinesisSink

@doc """
Guard that checks if the given term is either a ConsumerEvent or a ConsumerRecord.
Expand All @@ -16,4 +17,7 @@ defmodule Sequin.Consumers.Guards do

defguard is_redis_sink(sink)
when is_struct(sink, RedisStreamSink) or is_struct(sink, RedisStringSink)

defguard is_kinesis_sink(sink)
when is_struct(sink, KinesisSink)
end
44 changes: 44 additions & 0 deletions lib/sequin/consumers/kinesis_sink.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule Sequin.Consumers.KinesisSink do
@moduledoc "Represents configuration for sinking events to an AWS Kinesis Data Stream."
use Ecto.Schema
use TypedEctoSchema

import Ecto.Changeset

alias Sequin.Aws.HttpClient
alias Sequin.Encrypted

@derive {Jason.Encoder, only: [:stream_name, :region]}
@derive {Inspect, except: [:secret_access_key]}
@primary_key false
typed_embedded_schema do
field :type, Ecto.Enum, values: [:kinesis], default: :kinesis
field :stream_name, :string
field :region, :string
field :access_key_id, :string
field :secret_access_key, Encrypted.Field
field :partition_key_field, :string, default: "id"
end

def changeset(struct, params) do
struct
|> cast(params, [:stream_name, :region, :access_key_id, :secret_access_key, :partition_key_field])
|> validate_required([:stream_name, :region, :access_key_id, :secret_access_key])
|> validate_stream_name()
|> validate_length(:partition_key_field, max: 100)
end

defp validate_stream_name(changeset) do
changeset
|> validate_format(:stream_name, ~r/^[a-zA-Z0-9_.-]+$/,
message: "must contain only alphanumeric characters, hyphens, underscores, and periods"
)
|> validate_length(:stream_name, min: 1, max: 128)
end

def aws_client(%__MODULE__{} = sink) do
sink.access_key_id
|> AWS.Client.create(sink.secret_access_key, sink.region)
|> HttpClient.put_client()
end
end
Loading
Loading