@@ -2,10 +2,16 @@ use crate::db::{DbPool, PoolError};
2
2
use primitives:: { Campaign , CampaignId , ChannelId } ;
3
3
use tokio_postgres:: types:: Json ;
4
4
5
+ pub use campaign_remaining:: CampaignRemaining ;
6
+
7
+ /// ```text
8
+ /// INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to)
9
+ /// VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
10
+ /// ```
5
11
pub async fn insert_campaign ( pool : & DbPool , campaign : & Campaign ) -> Result < bool , PoolError > {
6
12
let client = pool. get ( ) . await ?;
7
13
let ad_units = Json ( campaign. ad_units . clone ( ) ) ;
8
- let stmt = client. prepare ( "INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)" ) . await ?;
14
+ let stmt = client. prepare ( "INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)" ) . await ?;
9
15
let inserted = client
10
16
. execute (
11
17
& stmt,
@@ -49,6 +55,10 @@ pub async fn fetch_campaign(
49
55
}
50
56
51
57
// TODO: We might need to use LIMIT to implement pagination
58
+ /// ```text
59
+ /// SELECT id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to
60
+ /// FROM campaigns WHERE channel_id = $1
61
+ /// ```
52
62
pub async fn get_campaigns_by_channel (
53
63
pool : & DbPool ,
54
64
channel_id : & ChannelId ,
@@ -95,6 +105,277 @@ pub async fn update_campaign(pool: &DbPool, campaign: &Campaign) -> Result<Campa
95
105
Ok ( Campaign :: from ( & updated_row) )
96
106
}
97
107
108
+ /// struct that handles redis calls for the Campaign Remaining Budget
109
+ mod campaign_remaining {
110
+ use crate :: db:: RedisError ;
111
+ use primitives:: { CampaignId , UnifiedNum } ;
112
+ use redis:: aio:: MultiplexedConnection ;
113
+
114
+ #[ derive( Clone ) ]
115
+ pub struct CampaignRemaining {
116
+ redis : MultiplexedConnection ,
117
+ }
118
+
119
+ impl CampaignRemaining {
120
+ pub const CAMPAIGN_REMAINING_KEY : & ' static str = "campaignRemaining" ;
121
+
122
+ pub fn get_key ( campaign : CampaignId ) -> String {
123
+ format ! ( "{}:{}" , Self :: CAMPAIGN_REMAINING_KEY , campaign)
124
+ }
125
+
126
+ pub fn new ( redis : MultiplexedConnection ) -> Self {
127
+ Self { redis }
128
+ }
129
+
130
+ pub async fn set_initial (
131
+ & self ,
132
+ campaign : CampaignId ,
133
+ amount : UnifiedNum ,
134
+ ) -> Result < bool , RedisError > {
135
+ redis:: cmd ( "SETNX" )
136
+ . arg ( & Self :: get_key ( campaign) )
137
+ . arg ( amount. to_u64 ( ) )
138
+ . query_async ( & mut self . redis . clone ( ) )
139
+ . await
140
+ }
141
+
142
+ pub async fn get_remaining_opt (
143
+ & self ,
144
+ campaign : CampaignId ,
145
+ ) -> Result < Option < i64 > , RedisError > {
146
+ redis:: cmd ( "GET" )
147
+ . arg ( & Self :: get_key ( campaign) )
148
+ . query_async :: < _ , Option < i64 > > ( & mut self . redis . clone ( ) )
149
+ . await
150
+ }
151
+
152
+ /// This method uses `max(0, value)` to clamp the value of a campaign, which can be negative and uses `i64`.
153
+ /// In addition, it defaults the campaign keys that were not found to `0`.
154
+ pub async fn get_multiple (
155
+ & self ,
156
+ campaigns : & [ CampaignId ] ,
157
+ ) -> Result < Vec < UnifiedNum > , RedisError > {
158
+ // `MGET` fails on empty keys
159
+ if campaigns. is_empty ( ) {
160
+ return Ok ( vec ! [ ] ) ;
161
+ }
162
+
163
+ let keys: Vec < String > = campaigns
164
+ . iter ( )
165
+ . map ( |campaign| Self :: get_key ( * campaign) )
166
+ . collect ( ) ;
167
+
168
+ let campaigns_remaining = redis:: cmd ( "MGET" )
169
+ . arg ( keys)
170
+ . query_async :: < _ , Vec < Option < i64 > > > ( & mut self . redis . clone ( ) )
171
+ . await ?
172
+ . into_iter ( )
173
+ . map ( |remaining| match remaining {
174
+ Some ( remaining) => UnifiedNum :: from_u64 ( remaining. max ( 0 ) . unsigned_abs ( ) ) ,
175
+ None => UnifiedNum :: from_u64 ( 0 ) ,
176
+ } )
177
+ . collect ( ) ;
178
+
179
+ Ok ( campaigns_remaining)
180
+ }
181
+
182
+ pub async fn increase_by (
183
+ & self ,
184
+ campaign : CampaignId ,
185
+ amount : UnifiedNum ,
186
+ ) -> Result < i64 , RedisError > {
187
+ let key = Self :: get_key ( campaign) ;
188
+ redis:: cmd ( "INCRBY" )
189
+ . arg ( & key)
190
+ . arg ( amount. to_u64 ( ) )
191
+ . query_async ( & mut self . redis . clone ( ) )
192
+ . await
193
+ }
194
+
195
+ pub async fn decrease_by (
196
+ & self ,
197
+ campaign : CampaignId ,
198
+ amount : UnifiedNum ,
199
+ ) -> Result < i64 , RedisError > {
200
+ let key = Self :: get_key ( campaign) ;
201
+ redis:: cmd ( "DECRBY" )
202
+ . arg ( & key)
203
+ . arg ( amount. to_u64 ( ) )
204
+ . query_async ( & mut self . redis . clone ( ) )
205
+ . await
206
+ }
207
+ }
208
+
209
+ #[ cfg( test) ]
210
+ mod test {
211
+ use primitives:: util:: tests:: prep_db:: DUMMY_CAMPAIGN ;
212
+
213
+ use crate :: db:: redis_pool:: TESTS_POOL ;
214
+
215
+ use super :: * ;
216
+
217
+ #[ tokio:: test]
218
+ async fn it_sets_initial_increases_and_decreases_remaining_for_campaign ( ) {
219
+ let redis = TESTS_POOL . get ( ) . await . expect ( "Should return Object" ) ;
220
+
221
+ let campaign = DUMMY_CAMPAIGN . id ;
222
+ let campaign_remaining = CampaignRemaining :: new ( redis. connection . clone ( ) ) ;
223
+
224
+ // Get remaining on a key which was not set
225
+ {
226
+ let get_remaining = campaign_remaining
227
+ . get_remaining_opt ( campaign)
228
+ . await
229
+ . expect ( "Should get None" ) ;
230
+
231
+ assert_eq ! ( None , get_remaining) ;
232
+ }
233
+
234
+ // Set Initial amount on that key
235
+ {
236
+ let initial_amount = UnifiedNum :: from ( 1_000_u64 ) ;
237
+ let set_initial = campaign_remaining
238
+ . set_initial ( campaign, initial_amount)
239
+ . await
240
+ . expect ( "Should set value in redis" ) ;
241
+ assert ! ( set_initial) ;
242
+
243
+ // get the remaining of that key, should be the initial value
244
+ let get_remaining = campaign_remaining
245
+ . get_remaining_opt ( campaign)
246
+ . await
247
+ . expect ( "Should get None" ) ;
248
+
249
+ assert_eq ! (
250
+ Some ( 1_000_i64 ) ,
251
+ get_remaining,
252
+ "should return the initial value that was set"
253
+ ) ;
254
+ }
255
+
256
+ // Set initial on already existing key, should return `false`
257
+ {
258
+ let set_failing_initial = campaign_remaining
259
+ . set_initial ( campaign, UnifiedNum :: from ( 999_u64 ) )
260
+ . await
261
+ . expect ( "Should set value in redis" ) ;
262
+ assert ! ( !set_failing_initial) ;
263
+ }
264
+
265
+ // Decrease by amount
266
+ {
267
+ let decrease_amount = UnifiedNum :: from ( 64 ) ;
268
+ let decrease_by = campaign_remaining
269
+ . decrease_by ( campaign, decrease_amount)
270
+ . await
271
+ . expect ( "Should decrease remaining amount" ) ;
272
+
273
+ assert_eq ! ( 936_i64 , decrease_by) ;
274
+ }
275
+
276
+ // Increase by amount
277
+ {
278
+ let increase_amount = UnifiedNum :: from ( 1064 ) ;
279
+ let increase_by = campaign_remaining
280
+ . increase_by ( campaign, increase_amount)
281
+ . await
282
+ . expect ( "Should increase remaining amount" ) ;
283
+
284
+ assert_eq ! ( 2_000_i64 , increase_by) ;
285
+ }
286
+
287
+ let get_remaining = campaign_remaining
288
+ . get_remaining_opt ( campaign)
289
+ . await
290
+ . expect ( "Should get remaining" ) ;
291
+
292
+ assert_eq ! ( Some ( 2_000_i64 ) , get_remaining) ;
293
+
294
+ // Decrease by amount > than currently set
295
+ {
296
+ let decrease_amount = UnifiedNum :: from ( 5_000 ) ;
297
+ let decrease_by = campaign_remaining
298
+ . decrease_by ( campaign, decrease_amount)
299
+ . await
300
+ . expect ( "Should decrease remaining amount" ) ;
301
+
302
+ assert_eq ! ( -3_000_i64 , decrease_by) ;
303
+ }
304
+
305
+ // Increase the negative value without going > 0
306
+ {
307
+ let increase_amount = UnifiedNum :: from ( 1000 ) ;
308
+ let increase_by = campaign_remaining
309
+ . increase_by ( campaign, increase_amount)
310
+ . await
311
+ . expect ( "Should increase remaining amount" ) ;
312
+
313
+ assert_eq ! ( -2_000_i64 , increase_by) ;
314
+ }
315
+ }
316
+
317
+ #[ tokio:: test]
318
+ async fn it_gets_multiple_campaigns_remaining ( ) {
319
+ let redis = TESTS_POOL . get ( ) . await . expect ( "Should return Object" ) ;
320
+ let campaign_remaining = CampaignRemaining :: new ( redis. connection . clone ( ) ) ;
321
+
322
+ // get multiple with empty campaigns slice
323
+ // `MGET` throws error on an empty keys argument
324
+ assert ! (
325
+ campaign_remaining
326
+ . get_multiple( & [ ] )
327
+ . await
328
+ . expect( "Should get multiple" )
329
+ . is_empty( ) ,
330
+ "Should return an empty result"
331
+ ) ;
332
+
333
+ let campaigns = ( CampaignId :: new ( ) , CampaignId :: new ( ) , CampaignId :: new ( ) ) ;
334
+
335
+ // set initial amounts
336
+ {
337
+ assert ! ( campaign_remaining
338
+ . set_initial( campaigns. 0 , UnifiedNum :: from( 100 ) )
339
+ . await
340
+ . expect( "Should set value in redis" ) ) ;
341
+
342
+ assert ! ( campaign_remaining
343
+ . set_initial( campaigns. 1 , UnifiedNum :: from( 200 ) )
344
+ . await
345
+ . expect( "Should set value in redis" ) ) ;
346
+
347
+ assert ! ( campaign_remaining
348
+ . set_initial( campaigns. 2 , UnifiedNum :: from( 300 ) )
349
+ . await
350
+ . expect( "Should set value in redis" ) ) ;
351
+ }
352
+
353
+ // set campaigns.1 to negative value, should return `0` because of `max(value, 0)`
354
+ assert_eq ! (
355
+ -300_i64 ,
356
+ campaign_remaining
357
+ . decrease_by( campaigns. 1 , UnifiedNum :: from( 500 ) )
358
+ . await
359
+ . expect( "Should decrease remaining" )
360
+ ) ;
361
+
362
+ let multiple = campaign_remaining
363
+ . get_multiple ( & [ campaigns. 0 , campaigns. 1 , campaigns. 2 ] )
364
+ . await
365
+ . expect ( "Should get multiple" ) ;
366
+
367
+ assert_eq ! (
368
+ vec![
369
+ UnifiedNum :: from( 100 ) ,
370
+ UnifiedNum :: from( 0 ) ,
371
+ UnifiedNum :: from( 300 )
372
+ ] ,
373
+ multiple
374
+ ) ;
375
+ }
376
+ }
377
+ }
378
+
98
379
#[ cfg( test) ]
99
380
mod test {
100
381
use primitives:: {
0 commit comments