@@ -16,8 +16,8 @@ import {
16
16
NodeOptions ,
17
17
NodeRequestManager ,
18
18
Queue ,
19
+ JobHandler ,
19
20
createNode ,
20
- createJobHandler ,
21
21
} from '../../src/index.js' ;
22
22
import { OfferData } from '../../src/shared/types.js' ;
23
23
import { DealStatus , ProtocolContracts } from '../../src/shared/contracts.js' ;
@@ -63,6 +63,14 @@ process.once('unhandledRejection', (error) => {
63
63
process . exit ( 1 ) ;
64
64
} ) ;
65
65
66
+ const createJobHandler =
67
+ < JobData = unknown , HandlerOptions = unknown > (
68
+ handler : JobHandler < JobData , HandlerOptions > ,
69
+ ) =>
70
+ ( options : HandlerOptions = { } as HandlerOptions ) =>
71
+ ( data : JobData ) =>
72
+ handler ( data , options ) ;
73
+
66
74
/**
67
75
* This is interface of object that you want to pass to the job handler as options
68
76
*/
@@ -76,8 +84,18 @@ interface DealHandlerOptions {
76
84
const dealHandler = createJobHandler <
77
85
OfferData < RequestQuery , OfferOptions > ,
78
86
DealHandlerOptions
79
- > ( async ( { name, id, data : offer } , { contracts } ) => {
80
- logger . trace ( `Job "${ name } " #${ id } Checking for a deal. Offer #${ offer . id } ` ) ;
87
+ > ( async ( offer , options ) => {
88
+ if ( ! offer || ! options ) {
89
+ throw new Error ( 'Invalid job execution configuration' ) ;
90
+ }
91
+
92
+ const { contracts } = options ;
93
+
94
+ if ( ! contracts ) {
95
+ throw new Error ( 'Contracts manager must be provided to job handler config' ) ;
96
+ }
97
+
98
+ logger . trace ( `Checking for a deal. Offer #${ offer . id } ` ) ;
81
99
82
100
// Check for a deal
83
101
const [ , , , buyer , , , status ] = await contracts . getDeal ( offer ) ;
@@ -98,10 +116,10 @@ const dealHandler = createJobHandler<
98
116
} ,
99
117
) ;
100
118
101
- return true ; // Returning true means that the job must be stopped
119
+ return false ; // Returning true means that the job must be stopped
102
120
}
103
121
104
- return ; // Job continuing
122
+ return true ; // Job continuing
105
123
} ) ;
106
124
107
125
/**
@@ -155,17 +173,20 @@ const createRequestsHandler =
155
173
checkOut : BigInt ( nowSec ( ) + 2000 ) ,
156
174
} ) ;
157
175
158
- queue . addEventListener ( 'expired ' , ( { detail : job } ) => {
159
- logger . trace ( `Job #${ job . id } is expired` ) ;
176
+ queue . addEventListener ( 'status ' , ( { detail : job } ) => {
177
+ logger . trace ( `Job #${ job . id } status changed` , job ) ;
160
178
} ) ;
161
179
162
180
/**
163
181
* On every published offer we expecting a deal.
164
182
* So, we add a job for detection of deals
165
183
*/
166
- queue . addJob ( 'deal' , offer , {
184
+ queue . add ( {
185
+ handlerName : 'deal' ,
186
+ data : offer ,
187
+ isRecurrent : true ,
188
+ recurrenceInterval : 5000 ,
167
189
expire : Number ( offer . expire ) ,
168
- every : 5000 , // 5 sec
169
190
} ) ;
170
191
} ;
171
192
@@ -211,8 +232,8 @@ const main = async (): Promise<void> => {
211
232
212
233
const queue = new Queue ( {
213
234
storage,
214
- hashKey : 'jobs ' ,
215
- concurrentJobsNumber : 3 ,
235
+ idsKeyName : 'jobsIds ' ,
236
+ concurrencyLimit : 3 ,
216
237
} ) ;
217
238
218
239
const requestManager = new NodeRequestManager < RequestQuery > ( {
@@ -234,7 +255,7 @@ const main = async (): Promise<void> => {
234
255
requestManager . add ( topic , data ) ;
235
256
} ) ;
236
257
237
- queue . addJobHandler ( 'deal' , dealHandler ( { contracts : contractsManager } ) ) ;
258
+ queue . registerHandler ( 'deal' , dealHandler ( { contracts : contractsManager } ) ) ;
238
259
239
260
/**
240
261
* Graceful Shutdown handler
0 commit comments