Skip to content

Commit 47d6d9e

Browse files
authored
Adds Hyperf\AsyncQueue\Job::fail() method (#6124)
1 parent 9d1eeae commit 47d6d9e

File tree

3 files changed

+15
-0
lines changed

3 files changed

+15
-0
lines changed

src/Driver/Driver.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public function consume(): void
5757

5858
while (ProcessManager::isRunning()) {
5959
try {
60+
/** @var MessageInterface $message */
6061
[$data, $message] = $this->pop();
6162

6263
if ($data === false) {
@@ -95,6 +96,10 @@ protected function checkQueueLength(): void
9596
}
9697
}
9798

99+
/**
100+
* @param mixed $data
101+
* @param MessageInterface $message
102+
*/
98103
protected function getCallback($data, $message): callable
99104
{
100105
return function () use ($data, $message) {
@@ -114,6 +119,7 @@ protected function getCallback($data, $message): callable
114119
} else {
115120
$this->event?->dispatch(new FailedHandle($message, $ex));
116121
$this->fail($data);
122+
$message->job()->fail($ex);
117123
}
118124
}
119125
}

src/Job.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,16 @@
1313

1414
use Hyperf\Contract\CompressInterface;
1515
use Hyperf\Contract\UnCompressInterface;
16+
use Throwable;
1617

1718
abstract class Job implements JobInterface, CompressInterface, UnCompressInterface
1819
{
1920
protected int $maxAttempts = 0;
2021

22+
public function fail(Throwable $e): void
23+
{
24+
}
25+
2126
public function setMaxAttempts(int $maxAttempts): static
2227
{
2328
$this->maxAttempts = $maxAttempts;

src/JobInterface.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@
1111
*/
1212
namespace Hyperf\AsyncQueue;
1313

14+
use Throwable;
15+
1416
interface JobInterface
1517
{
18+
public function fail(Throwable $e): void;
19+
1620
/**
1721
* Handle the job.
1822
*/

0 commit comments

Comments
 (0)