Skip to content

Commit aa5092e

Browse files
authored
Merge pull request #71 from chrisryan/unordered
Add UnorderedQueue class.
2 parents a9aca1e + 0065ca0 commit aa5092e

File tree

4 files changed

+538
-4
lines changed

4 files changed

+538
-4
lines changed

.github/workflows/php.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ on:
88

99
jobs:
1010
build:
11-
runs-on: ubuntu-18.04
11+
runs-on: ubuntu-latest
1212
strategy:
1313
matrix:
1414
php-versions: ['7.2', '7.3', '7.4', '8.0', '8.1', '8.2']

src/AbstractQueue.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ final public function get(array $query, array $options = []) : array
157157
$query
158158
);
159159

160-
$options += self::DEFAULT_GET_OPTIONS;
160+
$options += static::DEFAULT_GET_OPTIONS;
161161
$update = ['$set' => ['earliestGet' => $this->calculateEarliestGet($options['runningResetDuration'])]];
162162
$end = $this->calculateEndTime($options['waitDurationInMillis']);
163163
$sleepTime = $this->calculateSleepTime($options['pollDurationInMillis']);
@@ -305,12 +305,12 @@ private function calculateEarliestGet(int $runningResetDuration) : UTCDateTime
305305
{
306306
$resetTimestamp = time() + $runningResetDuration;
307307
//ints overflow to floats, max at PHP_INT_MAX
308-
return new UTCDateTime(min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX));
308+
return new UTCDateTime(min(max(0, $resetTimestamp * 1000), static::MONGO_INT32_MAX));
309309
}
310310

311311
private function tryFindOneAndUpdate(array $query, array $update, ArrayObject $messages) : bool
312312
{
313-
$document = $this->collection->findOneAndUpdate($query, $update, self::FIND_ONE_AND_UPDATE_OPTIONS);
313+
$document = $this->collection->findOneAndUpdate($query, $update, static::FIND_ONE_AND_UPDATE_OPTIONS);
314314
if ($document === null) {
315315
return false;
316316
}

src/UnorderedQueue.php

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
/**
3+
* Defines the TraderInteractive\Mongo\UnorderedQueue class.
4+
*/
5+
6+
namespace TraderInteractive\Mongo;
7+
8+
use MongoDB\Client;
9+
use MongoDB\Operation\FindOneAndUpdate;
10+
11+
/**
12+
* Abstraction of mongo db collection as unordered queue.
13+
*/
14+
final class UnorderedQueue extends AbstractQueue implements QueueInterface
15+
{
16+
/**
17+
* Override from AbstractQueue class to remove sort options.
18+
*
19+
* @var array
20+
*/
21+
const FIND_ONE_AND_UPDATE_OPTIONS = [
22+
'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
23+
'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
24+
];
25+
26+
/**
27+
* Construct queue.
28+
*
29+
* @param \MongoDB\Collection|string $collectionOrUrl A MongoCollection instance or the mongo connection url.
30+
* @param string $db the mongo db name
31+
* @param string $collection the collection name to use for the queue
32+
*
33+
* @throws \InvalidArgumentException $collectionOrUrl, $db or $collection was not a string
34+
*/
35+
public function __construct($collectionOrUrl, string $db = null, string $collection = null)
36+
{
37+
if ($collectionOrUrl instanceof \MongoDB\Collection) {
38+
$this->collection = $collectionOrUrl;
39+
return;
40+
}
41+
42+
if (!is_string($collectionOrUrl)) {
43+
throw new \InvalidArgumentException('$collectionOrUrl was not a string');
44+
}
45+
46+
$this->collection = (new Client($collectionOrUrl))->selectDatabase($db)->selectCollection($collection);
47+
}
48+
}

0 commit comments

Comments
 (0)