Skip to content

Commit

Permalink
Merge pull request #35 from php-service-bus/connection_monitor
Browse files Browse the repository at this point in the history
Connection monitor
  • Loading branch information
mmasiukevich authored Jan 14, 2022
2 parents d44df87 + 78aa1f9 commit 01d4983
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 25 deletions.
11 changes: 7 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
"require": {
"php": ">=8.0",
"amphp/amp": "v2.6.*",
"amphp/socket": "v1.1.*",
"amphp/socket": "v1.2.*",
"phpinnacle/buffer": "v1.2.*"
},
"require-dev": {
"phpunit/phpunit": "v9.5.*",
"vimeo/psalm": "v4.13.*",
"phpstan/phpstan": "v1.2.*"
"vimeo/psalm": "v4.18.*",
"phpstan/phpstan": "v1.4.*"
},
"prefer-stable": true,
"autoload": {
Expand All @@ -52,6 +52,9 @@
},
"config": {
"sort-packages": true,
"optimize-autoloader": true
"optimize-autoloader": true,
"allow-plugins": {
"composer/package-versions-deprecated": false
}
}
}
2 changes: 1 addition & 1 deletion examples/basic.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
}

yield $channel->consume(function (Message $message, Channel $channel) {
echo $message->content() . \PHP_EOL;
echo $message->content . \PHP_EOL;

yield $channel->ack($message);
}, 'basic_queue');
Expand Down
6 changes: 3 additions & 3 deletions examples/worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
echo '[*] Waiting for messages. To exit press CTRL+C', \PHP_EOL;

$tag = yield $channel->consume(function (Message $message, Channel $channel) {
echo "[x] Received message: {$message->content()}.", \PHP_EOL;
echo "[x] Received message: {$message->content}.", \PHP_EOL;

// Do some work - we generate password hashes with a high cost
// sleep() gets interrupted by Ctrl+C so it's not very good for demos
Expand All @@ -43,12 +43,12 @@
password_hash(random_bytes(255), PASSWORD_BCRYPT, ["cost" => 15]);
}

echo "[x] Done ", $message->content(), \PHP_EOL;
echo "[x] Done ", $message->content, \PHP_EOL;

try {
yield $channel->ack($message);

echo "ACK SUCCESS:: {$message->content()}", \PHP_EOL;
echo "ACK SUCCESS:: {$message->content}", \PHP_EOL;
} catch (\Throwable $error) {

echo "ACK FAILED:: {$error->getMessage()}", \PHP_EOL;
Expand Down
3 changes: 1 addition & 2 deletions phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
parameters:
checkMissingIterableValueType: false
checkGenericClassInNonGenericObjectType: false
ignoreErrors:
- '#Cannot cast mixed to int#'

41 changes: 39 additions & 2 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

namespace PHPinnacle\Ridge;

use Amp\Loop;
use function Amp\asyncCall;
use function Amp\call;
use Amp\Deferred;
Expand All @@ -24,6 +25,8 @@ final class Client
private const STATE_CONNECTED = 2;
private const STATE_DISCONNECTING = 3;

private const CONNECTION_MONITOR_INTERVAL = 5000;

/**
* @var Config
*/
Expand Down Expand Up @@ -54,6 +57,11 @@ final class Client
*/
private $properties;

/**
* @var string|null
*/
private $connectionMonitorWatcherId;

public function __construct(Config $config)
{
$this->config = $config;
Expand Down Expand Up @@ -91,7 +99,7 @@ function () {

$this->state = self::STATE_CONNECTING;

$this->connection = new Connection($this->config->uri(), fn() => $this->state = self::STATE_NOT_CONNECTED);
$this->connection = new Connection($this->config->uri());

yield $this->connection->open(
$this->config->timeout,
Expand Down Expand Up @@ -128,10 +136,22 @@ function () {

$this->connection->write($buffer);
$this->connection->close();

$this->disableConnectionMonitor();
}
);

$this->state = self::STATE_CONNECTED;

$this->connectionMonitorWatcherId = Loop::repeat(
self::CONNECTION_MONITOR_INTERVAL,
function(): void
{
if($this->connection->connected() === false) {
throw Exception\ClientException::disconnected();
}
}
);
}
);
}
Expand All @@ -143,6 +163,8 @@ function () {
*/
public function disconnect(int $code = 0, string $reason = ''): Promise
{
$this->disableConnectionMonitor();

return call(
function () use ($code, $reason) {
if (\in_array($this->state, [self::STATE_NOT_CONNECTED, self::STATE_DISCONNECTING])) {
Expand All @@ -153,6 +175,12 @@ function () use ($code, $reason) {
throw Exception\ClientException::notConnected();
}

if($this->connectionMonitorWatcherId !== null){
Loop::cancel($this->connectionMonitorWatcherId);

$this->connectionMonitorWatcherId = null;
}

$this->state = self::STATE_DISCONNECTING;

if ($code === 0) {
Expand Down Expand Up @@ -231,7 +259,7 @@ function () {

public function isConnected(): bool
{
return $this->state === self::STATE_CONNECTED;
return $this->state === self::STATE_CONNECTED && $this->connection->connected();
}

/**
Expand Down Expand Up @@ -422,4 +450,13 @@ static function (Protocol\AbstractFrame $frame) use ($deferred) {

return $deferred->promise();
}

private function disableConnectionMonitor(): void {
if($this->connectionMonitorWatcherId !== null) {

Loop::cancel($this->connectionMonitorWatcherId);

$this->connectionMonitorWatcherId = null;
}
}
}
19 changes: 6 additions & 13 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,15 @@ final class Connection
*/
private $heartbeatWatcherId;

/**
* @var callable|null
*/
private $connectionLost;

public function __construct(string $uri, ?callable $connectionLost = null)
public function __construct(string $uri)
{
$this->uri = $uri;
$this->parser = new Parser;
$this->connectionLost = $connectionLost;
}

public function connected(): bool
{
return $this->socket !== null && $this->socket->isClosed() === false;
}

/**
Expand Down Expand Up @@ -195,12 +194,10 @@ function (string $watcherId) use ($interval){
}

if (
null !== $this->connectionLost &&
0 !== $this->lastRead &&
$currentTime > ($this->lastRead + $interval + 1000)
)
{
call_user_func($this->connectionLost);
Loop::cancel($watcherId);
}

Expand All @@ -218,10 +215,6 @@ public function close(): void
$this->heartbeatWatcherId = null;
}

if ($this->connectionLost !== null) {
call_user_func($this->connectionLost);
}

if ($this->socket !== null) {
$this->socket->close();
}
Expand Down
4 changes: 4 additions & 0 deletions src/Exception/ClientException.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public static function notConnected(): self
return new self('Client is not connected to server.');
}

public static function disconnected(): self {
return new self('The client was unexpectedly disconnected from the server');
}

public static function alreadyConnected(): self
{
return new self('Client is already connected/connecting.');
Expand Down
2 changes: 2 additions & 0 deletions tests/AsyncTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ protected function runTestAsync(...$args)

$return = yield call([$this, $this->realTestName], ...$args);

yield $client->disconnect();

$info = Loop::getInfo();
$count = $info['enabled_watchers']['referenced'];

Expand Down

0 comments on commit 01d4983

Please sign in to comment.