Skip to content

Commit 448099c

Browse files
committed
refactor
1 parent c56d18d commit 448099c

File tree

6 files changed

+136
-103
lines changed

6 files changed

+136
-103
lines changed

php-src/ConnectionRepository.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Georgeboot\LaravelEchoApiGateway;
4+
5+
use Aws\ApiGatewayManagementApi\ApiGatewayManagementApiClient;
6+
use Illuminate\Support\Str;
7+
8+
class ConnectionRepository
9+
{
10+
protected ApiGatewayManagementApiClient $apiGatewayManagementApiClient;
11+
12+
public function __construct(array $config)
13+
{
14+
$this->apiGatewayManagementApiClient = new ApiGatewayManagementApiClient(array_merge($config['connection'], [
15+
'version' => '2018-11-29',
16+
'endpoint' => Str::replaceFirst('wss://', 'https://', $config['endpoint']),
17+
]));
18+
}
19+
20+
public function sendMessage(string $connectionId, string $data): void
21+
{
22+
$this->apiGatewayManagementApiClient->postToConnection([
23+
'ConnectionId' => $connectionId,
24+
'Data' => $data,
25+
]);
26+
}
27+
}

php-src/Handler.php

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,23 @@
22

33
namespace Georgeboot\LaravelEchoApiGateway;
44

5-
use Aws\DynamoDb\DynamoDbClient;
65
use Bref\Context\Context;
76
use Bref\Event\ApiGateway\WebsocketEvent;
87
use Bref\Event\ApiGateway\WebsocketHandler;
98
use Bref\Event\Http\HttpResponse;
109
use Illuminate\Contracts\Debug\ExceptionHandler;
11-
use Illuminate\Support\Arr;
1210
use Illuminate\Support\Str;
1311
use Throwable;
1412

1513
class Handler extends WebsocketHandler
1614
{
1715
protected ExceptionHandler $exceptionHandler;
18-
protected DynamoDbClient $dynamoDb;
19-
protected string $table;
16+
protected SubscriptionRepository $connectionRepository;
2017

21-
public function __construct(ExceptionHandler $exceptionHandler)
18+
public function __construct(ExceptionHandler $exceptionHandler, SubscriptionRepository $connectionRepository)
2219
{
23-
$config = config('laravel-echo-api-gateway');
24-
2520
$this->exceptionHandler = $exceptionHandler;
26-
$this->dynamoDb = $this->getDynamoDbClient($config);
27-
$this->table = $config['table'];
28-
}
29-
30-
protected function getDynamoDbClient(array $config): DynamoDbClient
31-
{
32-
return new DynamoDbClient(array_merge($config['connection'], [
33-
'version' => '2012-08-10',
34-
]));
21+
$this->connectionRepository = $connectionRepository;
3522
}
3623

3724
public function handleWebsocket(WebsocketEvent $event, Context $context): HttpResponse
@@ -53,22 +40,7 @@ public function handleWebsocket(WebsocketEvent $event, Context $context): HttpRe
5340

5441
protected function handleDisconnect(WebsocketEvent $event, Context $context): HttpResponse
5542
{
56-
$response = $this->dynamoDb->query([
57-
'TableName' => $this->table,
58-
'IndexName' => 'lookup-by-connection',
59-
'KeyConditionExpression' => 'connectionId = :connectionId',
60-
'ExpressionAttributeValues' => [
61-
':connectionI' => ['S' => $event->getConnectionId()],
62-
],
63-
]);
64-
65-
$this->dynamoDb->batchWriteItem([
66-
$this->table => collect($response['Items'])->map(fn($item) => [
67-
'DeleteRequest' => [
68-
'Key' => Arr::only($item, ['connectionId', 'channel']),
69-
],
70-
])->toArray(),
71-
]);
43+
$this->connectionRepository->clearConnection($event->getConnectionId());
7244

7345
return new HttpResponse('OK');
7446
}
@@ -142,13 +114,7 @@ protected function subscribe(WebsocketEvent $event, Context $context): HttpRespo
142114
}
143115
}
144116

145-
$this->dynamoDb->putItem([
146-
'TableName' => $this->table,
147-
'Item' => [
148-
'connectionId' => ['S' => $event->getConnectionId()],
149-
'channel' => ['S' => $channel],
150-
],
151-
]);
117+
$this->connectionRepository->subscribeToChannel($event->getConnectionId(), $channel);
152118

153119
return new HttpResponse(json_encode([
154120
'event' => 'subscription_succeeded',
@@ -162,13 +128,7 @@ protected function unsubscribe(WebsocketEvent $event, Context $context): HttpRes
162128
$eventBody = json_decode($event->getBody(), true);
163129
$channel = $eventBody['data']['channel'];
164130

165-
$this->dynamoDb->deleteItem([
166-
'TableName' => $this->table,
167-
'Key' => [
168-
'connectionId' => ['S' => $event->getConnectionId()],
169-
'channel' => ['S' => $channel],
170-
],
171-
]);
131+
$this->connectionRepository->unsubscribeFromChannel($event->getConnectionId(), $channel);
172132

173133
return new HttpResponse(json_encode([
174134
'event' => 'unsubscription_succeeded',

php-src/Jobs/QueueMessageToChannels.php

Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@
22

33
namespace Georgeboot\LaravelEchoApiGateway\Jobs;
44

5-
use Aws\DynamoDb\DynamoDbClient;
6-
use GuzzleHttp\Promise\Utils;
5+
use Georgeboot\LaravelEchoApiGateway\SubscriptionRepository;
76
use Illuminate\Bus\Queueable;
87
use Illuminate\Contracts\Queue\ShouldQueue;
98
use Illuminate\Foundation\Bus\Dispatchable;
109
use Illuminate\Queue\InteractsWithQueue;
1110
use Illuminate\Queue\SerializesModels;
12-
use Illuminate\Support\Collection;
1311

1412
class QueueMessageToChannels implements ShouldQueue
1513
{
@@ -18,55 +16,21 @@ class QueueMessageToChannels implements ShouldQueue
1816
use Queueable;
1917
use SerializesModels;
2018

21-
protected DynamoDbClient $dynamoDb;
22-
protected string $table;
2319
protected array $channels;
2420
protected string $data;
2521
protected ?string $skipConnectionId;
2622

2723
public function __construct(array $channels, string $data, string $skipConnectionId = null)
2824
{
29-
$config = config('laravel-echo-api-gateway');
30-
31-
$this->dynamoDb = $this->getDynamoDbClient($config);
32-
$this->table = $config['table'];
33-
3425
$this->channels = $channels;
3526
$this->data = $data;
3627
$this->skipConnectionId = $skipConnectionId;
3728
}
3829

39-
protected function getDynamoDbClient(array $config): DynamoDbClient
40-
{
41-
return new DynamoDbClient(array_merge($config['connection'], [
42-
'version' => '2012-08-10',
43-
]));
44-
}
45-
46-
47-
public function handle()
30+
public function handle(SubscriptionRepository $connectionRepository)
4831
{
49-
$this->getConnectionIdsForChannels($this->channels)
32+
$connectionRepository->getConnectionIdsForChannels($this->channels)
5033
->reject(fn($connectionId) => $connectionId === $this->skipConnectionId)
5134
->each(fn($connectionId) => dispatch(new SendMessageToConnection($connectionId, $this->data)));
5235
}
53-
54-
protected function getConnectionIdsForChannels(string ...$channels): Collection
55-
{
56-
$promises = collect($channels)->map(fn($channel) => $this->dynamoDb->queryAsync([
57-
'TableName' => $this->table,
58-
'IndexName' => 'lookup-by-channel',
59-
'KeyConditionExpression' => 'channel = :channel',
60-
'ExpressionAttributeValues' => [
61-
':channel' => ['S' => $channel],
62-
],
63-
]))->toArray();
64-
65-
$responses = Utils::all($promises)->wait();
66-
67-
return collect($responses)
68-
->flatmap(fn($result) => $result['Items'])
69-
->map(fn($item) => $item['connectionId']['S'])
70-
->unique();
71-
}
7236
}

php-src/Jobs/SendMessageToConnection.php

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@
22

33
namespace Georgeboot\LaravelEchoApiGateway\Jobs;
44

5-
use Aws\ApiGatewayManagementApi\ApiGatewayManagementApiClient;
65
use Aws\ApiGatewayManagementApi\Exception\ApiGatewayManagementApiException;
6+
use Georgeboot\LaravelEchoApiGateway\ConnectionRepository;
77
use Illuminate\Bus\Queueable;
88
use Illuminate\Contracts\Queue\ShouldQueue;
99
use Illuminate\Foundation\Bus\Dispatchable;
1010
use Illuminate\Queue\InteractsWithQueue;
1111
use Illuminate\Queue\SerializesModels;
12-
use Illuminate\Support\Str;
1312

1413
class SendMessageToConnection implements ShouldQueue
1514
{
@@ -18,30 +17,19 @@ class SendMessageToConnection implements ShouldQueue
1817
use Queueable;
1918
use SerializesModels;
2019

21-
protected ApiGatewayManagementApiClient $apiGatewayManagementApiClient;
2220
protected string $connectionId;
2321
protected string $data;
2422

2523
public function __construct(string $connectionId, string $data)
2624
{
27-
$config = config('laravel-echo-api-gateway');
28-
29-
$this->apiGatewayManagementApiClient = new ApiGatewayManagementApiClient(array_merge($config['connection'], [
30-
'version' => '2018-11-29',
31-
'endpoint' => Str::replaceFirst('wss://', 'https://', $config['endpoint']),
32-
]));
33-
3425
$this->connectionId = $connectionId;
3526
$this->data = $data;
3627
}
3728

38-
public function handle()
29+
public function handle(ConnectionRepository $connectionRepository)
3930
{
4031
try {
41-
$this->apiGatewayManagementApiClient->postToConnection([
42-
'ConnectionId' => $this->connectionId,
43-
'Data' => $this->data,
44-
]);
32+
$connectionRepository->sendMessage($this->connectionId, $this->data);
4533
} catch (ApiGatewayManagementApiException $exception) {
4634
// $exception->getErrorCode() is one of:
4735
// GoneException

php-src/ServiceProvider.php

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,24 @@ public function register()
2424
__DIR__ . '/../config/laravel-echo-api-gateway.php' => config_path('laravel-echo-api-gateway.php'),
2525
], 'laravel-echo-api-gateway-config');
2626
}
27+
28+
$this->app->bind(ConnectionRepository::class, function () {
29+
return new ConnectionRepository(
30+
config('laravel-echo-api-gateway')
31+
);
32+
});
33+
34+
$this->app->bind(SubscriptionRepository::class, function () {
35+
return new SubscriptionRepository(
36+
config('laravel-echo-api-gateway')
37+
);
38+
});
2739
}
2840

2941
public function boot(BroadcastManager $broadcastManager)
3042
{
3143
$broadcastManager->extend('laravel-echo-api-gateway', function (): Broadcaster {
32-
return new Driver(
33-
config('laravel-echo-api-gateway')
34-
);
44+
return new Driver();
3545
});
3646
}
3747
}

php-src/SubscriptionRepository.php

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
<?php
2+
3+
namespace Georgeboot\LaravelEchoApiGateway;
4+
5+
use Aws\DynamoDb\DynamoDbClient;
6+
use GuzzleHttp\Promise\Utils;
7+
use Illuminate\Support\Arr;
8+
use Illuminate\Support\Collection;
9+
10+
class SubscriptionRepository
11+
{
12+
protected DynamoDbClient $dynamoDb;
13+
protected string $table;
14+
15+
public function __construct(array $config)
16+
{
17+
$this->dynamoDb = new DynamoDbClient(array_merge($config['connection'], [
18+
'version' => '2012-08-10',
19+
]));
20+
21+
$this->table = $config['table'];
22+
}
23+
24+
public function getConnectionIdsForChannels(string ...$channels): Collection
25+
{
26+
$promises = collect($channels)->map(fn($channel) => $this->dynamoDb->queryAsync([
27+
'TableName' => $this->table,
28+
'IndexName' => 'lookup-by-channel',
29+
'KeyConditionExpression' => 'channel = :channel',
30+
'ExpressionAttributeValues' => [
31+
':channel' => ['S' => $channel],
32+
],
33+
]))->toArray();
34+
35+
$responses = Utils::all($promises)->wait();
36+
37+
return collect($responses)
38+
->flatmap(fn($result) => $result['Items'])
39+
->map(fn($item) => $item['connectionId']['S'])
40+
->unique();
41+
}
42+
43+
public function clearConnection(string $connectionId): void
44+
{
45+
$response = $this->dynamoDb->query([
46+
'TableName' => $this->table,
47+
'IndexName' => 'lookup-by-connection',
48+
'KeyConditionExpression' => 'connectionId = :connectionId',
49+
'ExpressionAttributeValues' => [
50+
':connectionI' => ['S' => $connectionId],
51+
],
52+
]);
53+
54+
$this->dynamoDb->batchWriteItem([
55+
$this->table => collect($response['Items'])->map(fn($item) => [
56+
'DeleteRequest' => [
57+
'Key' => Arr::only($item, ['connectionId', 'channel']),
58+
],
59+
])->toArray(),
60+
]);
61+
}
62+
63+
public function subscribeToChannel(string $connectionId, string $channel): void
64+
{
65+
$this->dynamoDb->putItem([
66+
'TableName' => $this->table,
67+
'Item' => [
68+
'connectionId' => ['S' => $connectionId],
69+
'channel' => ['S' => $channel],
70+
],
71+
]);
72+
}
73+
74+
public function unsubscribeFromChannel(string $connectionId, string $channel): void
75+
{
76+
$this->dynamoDb->deleteItem([
77+
'TableName' => $this->table,
78+
'Key' => [
79+
'connectionId' => ['S' => $connectionId],
80+
'channel' => ['S' => $channel],
81+
],
82+
]);
83+
}
84+
}

0 commit comments

Comments
 (0)