Skip to content

Commit 0c05bc7

Browse files
committed
fix(sse): replace Coroutine::sleep(1) with event-driven channel blocking
The SSE loop was sleeping for a full second between patch checks, causing visible lag for anything updating faster than 1 Hz (e.g. game-of-life at 200 ms). Replace the fixed sleep with a blocking Channel::pop() timeout so the loop wakes up immediately when a patch is pushed and only paces itself when the queue is empty. - PatchManager: cache poll timeout as $pollTimeout (float seconds) in constructor; use it in getPatch() instead of the previous non-blocking pop(0.01) - Config: add $ssePollIntervalMs (default 100 ms) with withSsePollIntervalMs() / getSsePollIntervalMs() for tuning - SseHandler: remove Coroutine::sleep(1) and now-unused Coroutine import
1 parent dcc188b commit 0c05bc7

3 files changed

Lines changed: 33 additions & 9 deletions

File tree

src/Config.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ class Config {
2020
/** @var array<string, mixed> */
2121
private array $openSwooleSettings = [];
2222

23+
/** Poll interval for the SSE loop in milliseconds (default 100 ms). */
24+
private int $ssePollIntervalMs = 100;
25+
2326
/**
2427
* Set basePath from reverse proxy header.
2528
* Called on first request with X-Base-Path header from Caddy.
@@ -78,6 +81,21 @@ public function withBasePath(string $basePath): self {
7881
return $this;
7982
}
8083

84+
/**
85+
* How long the SSE loop blocks waiting for a patch before looping again.
86+
* Lower values increase responsiveness; higher values reduce CPU overhead.
87+
* Default: 100 ms.
88+
*/
89+
public function withSsePollIntervalMs(int $ms): self {
90+
$this->ssePollIntervalMs = max(1, $ms);
91+
92+
return $this;
93+
}
94+
95+
public function getSsePollIntervalMs(): int {
96+
return $this->ssePollIntervalMs;
97+
}
98+
8199
/**
82100
* @param array<string, mixed> $settings
83101
*/

src/Context/PatchManager.php

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class PatchManager {
2323
/** @var null|array<string, mixed>|Channel */
2424
private array|Channel|null $patchChannel = null;
2525
private bool $useArray = false;
26+
private float $pollTimeout;
2627

2728
public function __construct(
2829
private Context $context,
@@ -40,6 +41,8 @@ public function __construct(
4041
$this->patchChannel = new Channel(5);
4142
$this->useArray = false;
4243
}
44+
45+
$this->pollTimeout = $this->app->getConfig()->getSsePollIntervalMs() / 1000.0;
4346
}
4447

4548
/**
@@ -77,6 +80,13 @@ public function queuePatch(array $patch): void {
7780
*
7881
* @return null|array<string, mixed> Next patch data or null if none available
7982
*/
83+
/**
84+
* Get next patch from the queue.
85+
*
86+
* In production, blocks until a patch arrives or the configured SSE poll
87+
* interval elapses — so the SSE loop wakes up immediately on a new patch
88+
* and paces itself otherwise without a separate Coroutine::sleep().
89+
*/
8090
public function getPatch(): ?array {
8191
if ($this->useArray) {
8292
// Array-based queue for tests
@@ -87,12 +97,10 @@ public function getPatch(): ?array {
8797
return array_shift($this->patchChannel);
8898
}
8999

90-
// OpenSwoole Channel for production
91-
if ($this->patchChannel->isEmpty()) {
92-
return null;
93-
}
100+
// OpenSwoole Channel for production — block until a patch arrives or timeout elapses
101+
$result = $this->patchChannel->pop($this->pollTimeout);
94102

95-
return $this->patchChannel->pop(0.01);
103+
return $result !== false ? $result : null;
96104
}
97105

98106
/**

src/Http/SseHandler.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55
namespace Mbolli\PhpVia\Http;
66

77
use Mbolli\PhpVia\Via;
8-
use OpenSwoole\Coroutine;
98
use OpenSwoole\Http\Request;
109
use OpenSwoole\Http\Response;
11-
use OpenSwoole\Timer;
1210
use starfederation\datastar\enums\ElementPatchMode;
1311
use starfederation\datastar\ServerSentEventGenerator;
1412

@@ -134,8 +132,8 @@ public function handleSSE(Request $request, Response $response): void {
134132
break;
135133
}
136134
}
137-
138-
Coroutine::sleep(1);
135+
// No explicit sleep needed: getPatch() blocks on the channel for up to 100 ms,
136+
// waking immediately when a patch is available.
139137
}
140138

141139
$this->via->log('debug', "SSE connection closed for context: {$context->getId()}");

0 commit comments

Comments
 (0)