@@ -3465,6 +3465,156 @@ async def test_xtrim(self, r: redis.Redis):
3465
3465
# 1 message is trimmed
3466
3466
assert await r .xtrim (stream , 3 , approximate = False ) == 1
3467
3467
3468
+ @skip_if_server_version_lt ("8.1.224" )
3469
+ async def test_xdelex (self , r : redis .Redis ):
3470
+ stream = "stream"
3471
+
3472
+ m1 = await r .xadd (stream , {"foo" : "bar" })
3473
+ m2 = await r .xadd (stream , {"foo" : "bar" })
3474
+ m3 = await r .xadd (stream , {"foo" : "bar" })
3475
+ m4 = await r .xadd (stream , {"foo" : "bar" })
3476
+
3477
+ # Test XDELEX with default ref_policy (KEEPREF)
3478
+ result = await r .xdelex (stream , m1 )
3479
+ assert result == [1 ]
3480
+
3481
+ # Test XDELEX with explicit KEEPREF
3482
+ result = await r .xdelex (stream , m2 , ref_policy = "KEEPREF" )
3483
+ assert result == [1 ]
3484
+
3485
+ # Test XDELEX with DELREF
3486
+ result = await r .xdelex (stream , m3 , ref_policy = "DELREF" )
3487
+ assert result == [1 ]
3488
+
3489
+ # Test XDELEX with ACKED
3490
+ result = await r .xdelex (stream , m4 , ref_policy = "ACKED" )
3491
+ assert result == [1 ]
3492
+
3493
+ # Test with non-existent ID
3494
+ result = await r .xdelex (stream , "999999-0" , ref_policy = "KEEPREF" )
3495
+ assert result == [- 1 ]
3496
+
3497
+ # Test with multiple IDs
3498
+ m5 = await r .xadd (stream , {"foo" : "bar" })
3499
+ m6 = await r .xadd (stream , {"foo" : "bar" })
3500
+ result = await r .xdelex (stream , m5 , m6 , ref_policy = "KEEPREF" )
3501
+ assert result == [1 , 1 ]
3502
+
3503
+ # Test error cases
3504
+ with pytest .raises (redis .DataError ):
3505
+ await r .xdelex (stream , "123-0" , ref_policy = "INVALID" )
3506
+
3507
+ with pytest .raises (redis .DataError ):
3508
+ await r .xdelex (stream ) # No IDs provided
3509
+
3510
+ @skip_if_server_version_lt ("8.1.224" )
3511
+ async def test_xackdel (self , r : redis .Redis ):
3512
+ stream = "stream"
3513
+ group = "group"
3514
+ consumer = "consumer"
3515
+
3516
+ m1 = await r .xadd (stream , {"foo" : "bar" })
3517
+ m2 = await r .xadd (stream , {"foo" : "bar" })
3518
+ m3 = await r .xadd (stream , {"foo" : "bar" })
3519
+ m4 = await r .xadd (stream , {"foo" : "bar" })
3520
+ await r .xgroup_create (stream , group , 0 )
3521
+
3522
+ await r .xreadgroup (group , consumer , streams = {stream : ">" })
3523
+
3524
+ # Test XACKDEL with default ref_policy (KEEPREF)
3525
+ result = await r .xackdel (stream , group , m1 )
3526
+ assert result == [1 ]
3527
+
3528
+ # Test XACKDEL with explicit KEEPREF
3529
+ result = await r .xackdel (stream , group , m2 , ref_policy = "KEEPREF" )
3530
+ assert result == [1 ]
3531
+
3532
+ # Test XACKDEL with DELREF
3533
+ result = await r .xackdel (stream , group , m3 , ref_policy = "DELREF" )
3534
+ assert result == [1 ]
3535
+
3536
+ # Test XACKDEL with ACKED
3537
+ result = await r .xackdel (stream , group , m4 , ref_policy = "ACKED" )
3538
+ assert result == [1 ]
3539
+
3540
+ # Test with non-existent ID
3541
+ result = await r .xackdel (stream , group , "999999-0" , ref_policy = "KEEPREF" )
3542
+ assert result == [- 1 ]
3543
+
3544
+ # Test error cases
3545
+ with pytest .raises (redis .DataError ):
3546
+ await r .xackdel (stream , group , m1 , ref_policy = "INVALID" )
3547
+
3548
+ with pytest .raises (redis .DataError ):
3549
+ await r .xackdel (stream , group ) # No IDs provided
3550
+
3551
+ @skip_if_server_version_lt ("8.1.224" )
3552
+ async def test_xtrim_with_options (self , r : redis .Redis ):
3553
+ stream = "stream"
3554
+
3555
+ await r .xadd (stream , {"foo" : "bar" })
3556
+ await r .xadd (stream , {"foo" : "bar" })
3557
+ await r .xadd (stream , {"foo" : "bar" })
3558
+ await r .xadd (stream , {"foo" : "bar" })
3559
+
3560
+ # Test XTRIM with KEEPREF ref_policy
3561
+ assert (
3562
+ await r .xtrim (stream , maxlen = 2 , approximate = False , ref_policy = "KEEPREF" )
3563
+ == 2
3564
+ )
3565
+
3566
+ await r .xadd (stream , {"foo" : "bar" })
3567
+ await r .xadd (stream , {"foo" : "bar" })
3568
+
3569
+ # Test XTRIM with DELREF ref_policy
3570
+ assert (
3571
+ await r .xtrim (stream , maxlen = 2 , approximate = False , ref_policy = "DELREF" ) == 2
3572
+ )
3573
+
3574
+ await r .xadd (stream , {"foo" : "bar" })
3575
+ await r .xadd (stream , {"foo" : "bar" })
3576
+
3577
+ # Test XTRIM with ACKED ref_policy
3578
+ assert (
3579
+ await r .xtrim (stream , maxlen = 2 , approximate = False , ref_policy = "ACKED" ) == 2
3580
+ )
3581
+
3582
+ # Test error case
3583
+ with pytest .raises (redis .DataError ):
3584
+ await r .xtrim (stream , maxlen = 2 , ref_policy = "INVALID" )
3585
+
3586
+ @skip_if_server_version_lt ("8.1.224" )
3587
+ async def test_xadd_with_options (self , r : redis .Redis ):
3588
+ stream = "stream"
3589
+
3590
+ # Test XADD with KEEPREF ref_policy
3591
+ await r .xadd (
3592
+ stream , {"foo" : "bar" }, maxlen = 2 , approximate = False , ref_policy = "KEEPREF"
3593
+ )
3594
+ await r .xadd (
3595
+ stream , {"foo" : "bar" }, maxlen = 2 , approximate = False , ref_policy = "KEEPREF"
3596
+ )
3597
+ await r .xadd (
3598
+ stream , {"foo" : "bar" }, maxlen = 2 , approximate = False , ref_policy = "KEEPREF"
3599
+ )
3600
+ assert await r .xlen (stream ) == 2
3601
+
3602
+ # Test XADD with DELREF ref_policy
3603
+ await r .xadd (
3604
+ stream , {"foo" : "bar" }, maxlen = 2 , approximate = False , ref_policy = "DELREF"
3605
+ )
3606
+ assert await r .xlen (stream ) == 2
3607
+
3608
+ # Test XADD with ACKED ref_policy
3609
+ await r .xadd (
3610
+ stream , {"foo" : "bar" }, maxlen = 2 , approximate = False , ref_policy = "ACKED"
3611
+ )
3612
+ assert await r .xlen (stream ) == 2
3613
+
3614
+ # Test error case
3615
+ with pytest .raises (redis .DataError ):
3616
+ await r .xadd (stream , {"foo" : "bar" }, ref_policy = "INVALID" )
3617
+
3468
3618
@pytest .mark .onlynoncluster
3469
3619
async def test_bitfield_operations (self , r : redis .Redis ):
3470
3620
# comments show affected bits
0 commit comments