Skip to content

Commit 95f78c3

Browse files
committed
Merge pull request #24 from cheprasov/dev-with-versions
PubSub is supported now
2 parents 1ec28ea + bdec2c2 commit 95f78c3

18 files changed

+340
-30
lines changed

src/RedisClient/Client/AbstractRedisClient.php

+9-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ abstract class AbstractRedisClient {
3535
*/
3636
protected static $defaultConfig = [
3737
self::CONFIG_SERVER => 'tcp://127.0.0.1:6379', // or 'unix:///tmp/redis.sock'
38-
self::CONFIG_TIMEOUT => 0.1, // in seconds
38+
self::CONFIG_TIMEOUT => 1, // in seconds
3939
];
4040

4141
/**
@@ -128,6 +128,14 @@ public function executeRawString($stringCommand) {
128128
return $this->executeRaw(explode(' ', $stringCommand));
129129
}
130130

131+
/**
132+
* @inheritdoc
133+
*/
134+
protected function subscribeCommand(array $subCommand, array $unsubCommand, array $params = null, $callback) {
135+
$this->getProtocol()->subscribe($this->getStructure($subCommand, $params), $callback);
136+
return $this->executeCommand($unsubCommand, $params);
137+
}
138+
131139
/**
132140
* @param string[] $command
133141
* @param array|null $params

src/RedisClient/Command/Traits/AbstractCommandsTrait.php

+9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@ trait AbstractCommandsTrait {
1919
*/
2020
abstract protected function returnCommand(array $command, array $params = null, $parserId = null);
2121

22+
/**
23+
* @param array $subCommand
24+
* @param array $unsubCommand
25+
* @param array|null $params
26+
* @param \Closure|string|array $callback
27+
* @return mixed
28+
*/
29+
abstract protected function subscribeCommand(array $subCommand, array $unsubCommand, array $params = null, $callback);
30+
2231
/**
2332
* @return string
2433
*/

src/RedisClient/Command/Traits/Version2x6/PubSubCommandsTrait.php

+20-6
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,17 @@ trait PubSubCommandsTrait {
2020
* PSUBSCRIBE pattern [pattern ...]
2121
* Available since 2.0.0.
2222
* Time complexity: O(N) where N is the number of patterns the client is already subscribed to.
23+
* @link http://redis.io/commands/psubscribe
24+
*
25+
* @param string|string[] $patterns
26+
* @param \Closure|string|array $callback
27+
* @return string[]
2328
*/
24-
public function psubscribe($patterns) {
25-
return $this->returnCommand(['PSUBSCRIBE'], (array) $patterns);
29+
public function psubscribe($patterns, $callback) {
30+
if (!is_callable($callback)) {
31+
throw new \InvalidArgumentException('Function $callback is not callable');
32+
}
33+
return $this->subscribeCommand(['PSUBSCRIBE'], ['PUNSUBSCRIBE'], (array) $patterns, $callback);
2634
}
2735

2836
/**
@@ -33,7 +41,7 @@ public function psubscribe($patterns) {
3341
*
3442
* @param string $channel
3543
* @param string $message
36-
* @return int The number of clients that received the message.e
44+
* @return int The number of clients that received the message.
3745
*/
3846
public function publish($channel, $message) {
3947
return $this->returnCommand(['PUBLISH'], [$channel, $message]);
@@ -57,11 +65,17 @@ public function punsubscribe($patterns = null) {
5765
* Available since 2.0.0.
5866
* Time complexity: O(N) where N is the number of channels to subscribe to.
5967
*
68+
* @link http://redis.io/commands/psubscribe
69+
*
6070
* @param string|string[] $channels
61-
* @return
71+
* @param \Closure|string|array $callback
72+
* @return string[]
6273
*/
63-
public function subscribe($channels) {
64-
return $this->returnCommand(['SUBSCRIBE'], (array) $channels);
74+
public function subscribe($channels, $callback) {
75+
if (!is_callable($callback)) {
76+
throw new \InvalidArgumentException('Function $callback is not callable');
77+
}
78+
return $this->subscribeCommand(['SUBSCRIBE'], ['UNSUBSCRIBE'], (array) $channels, $callback);
6579
}
6680

6781
/**

src/RedisClient/Command/Traits/Version2x6/ServerCommandsTrait.php

+5-2
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,12 @@ public function lastsave() {
219219
/**
220220
* MONITOR
221221
* Available since 1.0.0.
222+
*
223+
* @param \Closure $callback
224+
* @return mixed
222225
*/
223-
public function monitor() {
224-
return $this->returnCommand(['MONITOR']);
226+
public function monitor(\Closure $callback) {
227+
return $this->subscribeCommand(['MONITOR'], ['QUIT'], null, $callback);
225228
}
226229

227230
/**

src/RedisClient/Pipeline/AbstractPipeline.php

+8
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ protected function returnCommand(array $command, array $params = null, $parserId
3737
return $this;
3838
}
3939

40+
/**
41+
* @inheritdoc
42+
*/
43+
protected function subscribeCommand(array $subCommand, array $unsubCommand, array $params = null, $callback) {
44+
$this->getProtocol()->subscribe($this->getStructure($subCommand, $params), $callback);
45+
return $this->executeCommand($unsubCommand, $params);
46+
}
47+
4048
/**
4149
* @return array[]
4250
*/

src/RedisClient/Protocol/ProtocolInterface.php

+7
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,11 @@ public function send(array $structure);
2424
*/
2525
public function sendMulti(array $structures);
2626

27+
/**
28+
* @param string[] $structure
29+
* @param \Closure|string|array $callback
30+
* @return mixed
31+
*/
32+
public function subscribe(array $structure, $callback);
33+
2734
}

src/RedisClient/Protocol/RedisProtocol.php

+18-4
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,7 @@ protected function write($raw) {
9797
*/
9898
protected function read() {
9999
if (!$line = $this->Connection->readLine()) {
100-
//todo: timeout usleep
101-
if (!$line = $this->Connection->readLine()) {
102-
throw new EmptyResponseException();
103-
}
100+
throw new EmptyResponseException('Empty response. Please, check connection timeout.');
104101
}
105102

106103
$type = $line[0];
@@ -170,4 +167,21 @@ public function sendMulti(array $structures) {
170167
return $response;
171168
}
172169

170+
/**
171+
* @inheritdoc
172+
*/
173+
public function subscribe(array $structures, $callback) {
174+
$this->write($this->packProtocolArray($structures));
175+
do {
176+
try {
177+
$response = (array) $this->read();
178+
array_push($response, null, null, null, null);
179+
} catch (EmptyResponseException $Ex) {
180+
$response = [null, null, null, null];
181+
}
182+
$continue = call_user_func_array($callback, $response);
183+
} while ($continue);
184+
185+
}
186+
173187
}

tests/Integration/Version2x6/ListsCommandsTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ListsCommandsTest extends \PHPUnit_Framework_TestCase {
3131
public static function setUpBeforeClass() {
3232
static::$Redis = new RedisClient2x6([
3333
'server' => static::TEST_REDIS_SERVER_1,
34-
'timeout' => 2,
34+
'timeout' => 10,
3535
]);
3636
}
3737

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
<?php
2+
/**
3+
* This file is part of RedisClient.
4+
* git: https://github.com/cheprasov/php-redis-client
5+
*
6+
* (C) Alexander Cheprasov <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
namespace Test\Integration\Version2x6;
12+
13+
use RedisClient\Client\Version\RedisClient2x6;
14+
use RedisClient\Exception\ErrorResponseException;
15+
16+
/**
17+
* @see PubSubCommandsTrait
18+
*/
19+
class PubSubCommandsTest extends \PHPUnit_Framework_TestCase {
20+
21+
const TEST_REDIS_SERVER_1 = TEST_REDIS_SERVER_2x6_1;
22+
23+
/**
24+
* @var RedisClient2x6
25+
*/
26+
protected static $Redis;
27+
28+
/**
29+
* @var RedisClient2x6
30+
*/
31+
protected static $Redis2;
32+
33+
/**
34+
* @inheritdoc
35+
*/
36+
public static function setUpBeforeClass() {
37+
static::$Redis = new RedisClient2x6([
38+
'server' => static::TEST_REDIS_SERVER_1,
39+
'timeout' => 2,
40+
]);
41+
static::$Redis2 = new RedisClient2x6([
42+
'server' => static::TEST_REDIS_SERVER_1,
43+
'timeout' => 2,
44+
]);
45+
}
46+
47+
/**
48+
* @inheritdoc
49+
*/
50+
public static function tearDownAfterClass() {
51+
static::$Redis->flushall();
52+
}
53+
54+
/**
55+
* @inheritdoc
56+
*/
57+
protected function setUp() {
58+
static::$Redis->flushall();
59+
}
60+
61+
/**
62+
* @see PubSubCommandsTrait::subscribe
63+
* @see PubSubCommandsTrait::unsubscribe
64+
*/
65+
public function test_subscribe_and_unsubscribe() {
66+
$Redis = static::$Redis;
67+
$Redis2 = static::$Redis2;
68+
69+
$time = time();
70+
$messages = [];
71+
$posts = [];
72+
73+
$result = $Redis->subscribe('channel-foo',
74+
function($type, $channel, $message) use ($Redis2, $time, &$messages, &$posts) {
75+
if (time() - $time > 1 || count($messages) > 20) {
76+
return false;
77+
}
78+
if (!isset($type)) {
79+
sleep(1);
80+
return true;
81+
}
82+
if ($type === 'message') {
83+
$messages[] = $message;
84+
}
85+
$Redis2->publish('channel-foo', $post = md5(rand(1, 9999)));
86+
$Redis2->publish('channel-bar', md5(rand(1, 9999)));
87+
$posts[] = $post;
88+
89+
return true;
90+
});
91+
92+
$this->assertSame(['unsubscribe', 'channel-foo', 0], $result);
93+
array_pop($posts);
94+
$this->assertSame($posts, $messages);
95+
96+
$this->assertSame(true, $Redis->set('foo', 'bar'));
97+
$this->assertSame('bar', $Redis->get('foo'));
98+
$this->assertSame(['foo'], $Redis->keys('*'));
99+
}
100+
101+
/**
102+
* @see PubSubCommandsTrait::psubscribe
103+
* @see PubSubCommandsTrait::punsubscribe
104+
*/
105+
public function test_psubscribe_and_punsubscribe() {
106+
$Redis = static::$Redis;
107+
$Redis2 = static::$Redis2;
108+
109+
$time = time();
110+
$messages = [];
111+
$posts = [];
112+
113+
$result = $Redis->psubscribe('channel-f*',
114+
function($type, $pattern, $channel, $message) use ($Redis2, $time, &$messages, &$posts) {
115+
if (time() - $time > 1 || count($messages) > 20) {
116+
return false;
117+
}
118+
if (!isset($type)) {
119+
sleep(1);
120+
return true;
121+
}
122+
if ($type === 'pmessage') {
123+
$messages[] = $message;
124+
}
125+
$Redis2->publish('channel-foo', $post = md5(rand(1, 9999)));
126+
$Redis2->publish('channel-bar', md5(rand(1, 9999)));
127+
$posts[] = $post;
128+
129+
return true;
130+
});
131+
132+
$this->assertSame(['punsubscribe', 'channel-f*', 0], $result);
133+
array_pop($posts);
134+
$this->assertSame($posts, $messages);
135+
136+
$this->assertSame(true, $Redis->set('foo', 'bar'));
137+
$this->assertSame('bar', $Redis->get('foo'));
138+
$this->assertSame(['foo'], $Redis->keys('*'));
139+
}
140+
141+
}

tests/Integration/Version2x6/ScriptingCommandsTest.php

-10
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,6 @@ class ScriptingCommandsTest extends \PHPUnit_Framework_TestCase {
2525
*/
2626
protected static $Redis;
2727

28-
/**
29-
* @var RedisClient2x6
30-
*/
31-
protected static $Redis2;
32-
33-
/**
34-
* @var array
35-
*/
36-
protected static $fields;
37-
3828
/**
3929
* @inheritdoc
4030
*/

tests/Integration/Version2x6/ServerCommandsTest.php

-2
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,13 @@ public static function setUpBeforeClass() {
5050
*/
5151
public static function tearDownAfterClass() {
5252
static::$Redis->flushall();
53-
static::$Redis->scriptFlush();
5453
}
5554

5655
/**
5756
* @inheritdoc
5857
*/
5958
protected function setUp() {
6059
static::$Redis->flushall();
61-
static::$Redis->scriptFlush();
6260
}
6361

6462
/**

tests/Integration/Version2x8/ListsCommandsTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ListsCommandsTest extends ListsCommandsTestVersion2x6 {
2828
public static function setUpBeforeClass() {
2929
static::$Redis = new RedisClient2x8([
3030
'server' => static::TEST_REDIS_SERVER_1,
31-
'timeout' => 2,
31+
'timeout' => 10,
3232
]);
3333
}
3434
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
/**
3+
* This file is part of RedisClient.
4+
* git: https://github.com/cheprasov/php-redis-client
5+
*
6+
* (C) Alexander Cheprasov <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
namespace Test\Integration\Version2x8;
12+
13+
include_once(__DIR__. '/../Version2x6/PubSubCommandsTest.php');
14+
15+
use RedisClient\Client\Version\RedisClient2x8;
16+
use Test\Integration\Version2x6\PubSubCommandsTest as PubSubCommandsTestVersion2x6;
17+
18+
/**
19+
* @see PubSubCommandsTrait
20+
*/
21+
class PubSubCommandsTest extends PubSubCommandsTestVersion2x6 {
22+
23+
const TEST_REDIS_SERVER_1 = TEST_REDIS_SERVER_2x8_1;
24+
25+
/**
26+
* @inheritdoc
27+
*/
28+
public static function setUpBeforeClass() {
29+
static::$Redis = new RedisClient2x8([
30+
'server' => static::TEST_REDIS_SERVER_1,
31+
'timeout' => 2,
32+
]);
33+
static::$Redis2 = new RedisClient2x8([
34+
'server' => static::TEST_REDIS_SERVER_1,
35+
'timeout' => 2,
36+
]);
37+
}
38+
}

0 commit comments

Comments
 (0)