@@ -1392,6 +1392,216 @@ TEST(Command, testObject) {
13921392#endif
13931393}
13941394
1395+ void testBrpop (std::shared_ptr<ServerEntry> svr) {
1396+ asio::io_context ioContext;
1397+ asio::ip::tcp::socket socket (ioContext), socket1 (ioContext);
1398+ NetSession sess (svr, std::move (socket), 1 , false , nullptr , nullptr );
1399+ sess.setArgs ({" lpush" , " list" , " a" });
1400+ auto expect = Command::runSessionCmd (&sess);
1401+ EXPECT_TRUE (expect.ok ());
1402+ sess.setArgs ({" brpop" , " list" , " 10" });
1403+ expect = Command::runSessionCmd (&sess);
1404+ EXPECT_TRUE (expect.ok ());
1405+ }
1406+
1407+ TEST (Command, brpop) {
1408+ const auto guard = MakeGuard ([] { destroyEnv (); });
1409+
1410+ EXPECT_TRUE (setupEnv ());
1411+
1412+ auto cfg = makeServerParam ();
1413+ auto server = makeServerEntry (cfg);
1414+
1415+ testBrpop (server);
1416+
1417+ #ifndef _WIN32
1418+ server->stop ();
1419+ EXPECT_EQ (server.use_count (), 1 );
1420+ #endif
1421+ }
1422+
1423+ void testBlockCommand (std::shared_ptr<ServerEntry> svr) {
1424+ asio::io_context ioContext;
1425+ asio::ip::tcp::socket socket (ioContext), socket1 (ioContext),
1426+ socket2 (ioContext);
1427+ auto sess = std::make_shared<NetSession>(
1428+ svr, std::move (socket), 1 , false , nullptr , nullptr );
1429+ auto sess1 = std::make_shared<NetSession>(
1430+ svr, std::move (socket1), 2 , false , nullptr , nullptr );
1431+ auto sess2 = std::make_shared<NetSession>(
1432+ svr, std::move (socket2), 3 , false , nullptr , nullptr );
1433+ svr->addSession (sess);
1434+ svr->addSession (sess1);
1435+ svr->addSession (sess2);
1436+ {
1437+ sess->setArgs ({" blpop" , " list1" , " 1" });
1438+ auto expect = Command::runSessionCmd (sess.get ());
1439+ EXPECT_EQ (sess->isBlocked (), true );
1440+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1441+ std::this_thread::sleep_for (std::chrono::milliseconds (400 ));
1442+ EXPECT_EQ (sess->isBlocked (), true );
1443+ std::this_thread::sleep_for (std::chrono::milliseconds (400 ));
1444+ EXPECT_EQ (sess->isBlocked (), true );
1445+ std::this_thread::sleep_for (std::chrono::milliseconds (400 ));
1446+ EXPECT_EQ (sess->isBlocked (), false );
1447+ }
1448+ {
1449+ sess->setArgs ({" blpop" , " list1" , " 0" });
1450+ auto expect = Command::runSessionCmd (sess.get ());
1451+ EXPECT_EQ (sess->isBlocked (), true );
1452+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1453+ sess1->setArgs ({" rpush" , " list1" , " a" });
1454+ expect = Command::runSessionCmd (sess1.get ());
1455+ std::this_thread::sleep_for (std::chrono::seconds (1 ));
1456+ EXPECT_EQ (sess->isBlocked (), false );
1457+ }
1458+ {
1459+ sess2->setArgs ({" brpop" , " list1" , " 0" });
1460+ auto expect = Command::runSessionCmd (sess2.get ());
1461+ EXPECT_EQ (sess2->isBlocked (), true );
1462+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1463+ sess->setArgs ({" brpop" , " list1" , " 0" });
1464+ expect = Command::runSessionCmd (sess.get ());
1465+ EXPECT_EQ (sess->isBlocked (), true );
1466+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1467+
1468+ svr->endSession (sess2->id ());
1469+ sess1->setArgs ({" lpush" , " list1" , " a" });
1470+ expect = Command::runSessionCmd (sess1.get ());
1471+ std::this_thread::sleep_for (std::chrono::seconds (1 ));
1472+ EXPECT_EQ (sess->isBlocked (), false );
1473+ }
1474+ }
1475+
1476+ TEST (Command, testBlockCommand) {
1477+ const auto guard = MakeGuard ([] { destroyEnv (); });
1478+
1479+ EXPECT_TRUE (setupEnv ());
1480+
1481+ auto cfg = makeServerParam ();
1482+ auto server = makeServerEntry (cfg);
1483+ testBlockCommand (server);
1484+
1485+ #ifndef _WIN32
1486+ server->stop ();
1487+ EXPECT_EQ (server.use_count (), 1 );
1488+ #endif
1489+ }
1490+
1491+ TEST (Command, BlockCommand) {
1492+ const auto guard = MakeGuard ([] { destroyEnv (); });
1493+
1494+ EXPECT_TRUE (setupEnv ());
1495+
1496+ auto cfg = makeServerParam ();
1497+ auto server = makeServerEntry (cfg);
1498+ asio::io_context ioContext;
1499+ asio::ip::tcp::socket socket (ioContext), socket1 (ioContext),
1500+ socket2 (ioContext);
1501+ auto sess = std::make_shared<NetSession>(
1502+ server, std::move (socket), 1 , false , nullptr , nullptr );
1503+ auto sess1 = std::make_shared<NetSession>(
1504+ server, std::move (socket1), 2 , false , nullptr , nullptr );
1505+ auto sess2 = std::make_shared<NetSession>(
1506+ server, std::move (socket2), 3 , false , nullptr , nullptr );
1507+ server->addSession (sess);
1508+ server->addSession (sess1);
1509+ server->addSession (sess2);
1510+ // test blpop case 1: block command nerver block permanently due to lost
1511+ // wakeup
1512+ for (int i = 0 ; i < 20 ; i++) {
1513+ sess1->setArgs ({" brpop" , " list1" , " list2" , " 0" });
1514+ auto expect = Command::runSessionCmd (sess1.get ());
1515+ EXPECT_EQ (sess1->isBlocked (), true );
1516+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1517+ sess2->setArgs ({" brpop" , " list1" , " 0" });
1518+ expect = Command::runSessionCmd (sess2.get ());
1519+ EXPECT_EQ (sess2->isBlocked (), true );
1520+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1521+ sess->setArgs ({" lpush" , " list2" , " b" });
1522+ expect = Command::runSessionCmd (sess.get ());
1523+ sess->setArgs ({" lpush" , " list1" , " a" });
1524+ expect = Command::runSessionCmd (sess.get ());
1525+ std::this_thread::sleep_for (std::chrono::milliseconds (5 ));
1526+ EXPECT_EQ (sess1->isBlocked (), false );
1527+ EXPECT_EQ (sess2->isBlocked (), false );
1528+ }
1529+ // test blpop case 2: block command nerver block session on the same key twice
1530+ for (int i = 0 ; i < 20 ; i++) {
1531+ sess1->setArgs ({" brpop" , " list1" , " list1" , " 0" });
1532+ auto expect = Command::runSessionCmd (sess1.get ());
1533+ EXPECT_EQ (sess1->isBlocked (), true );
1534+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1535+ sess2->setArgs ({" brpop" , " list1" , " 0" });
1536+ expect = Command::runSessionCmd (sess2.get ());
1537+ EXPECT_EQ (sess2->isBlocked (), true );
1538+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1539+ sess->setArgs ({" lpush" , " list1" , " a" , " b" });
1540+ expect = Command::runSessionCmd (sess.get ());
1541+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
1542+ EXPECT_EQ (sess1->isBlocked (), false );
1543+ EXPECT_EQ (sess2->isBlocked (), false );
1544+ }
1545+ // test blpop case 3: block command will be waked up by any key which is in
1546+ // the key list
1547+ for (int i = 0 ; i < 20 ; i++) {
1548+ sess1->setArgs ({" brpop" , " list1" , " list2" , " 0" });
1549+ auto expect = Command::runSessionCmd (sess1.get ());
1550+ EXPECT_EQ (sess1->isBlocked (), true );
1551+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1552+ auto key = genRand () % 2 ? " list1" : " list2" ;
1553+ sess->setArgs ({" lpush" , key, " a" });
1554+ expect = Command::runSessionCmd (sess.get ());
1555+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
1556+ EXPECT_EQ (sess1->isBlocked (), false );
1557+ }
1558+ // test blpop case 4: block command will not be waked up by any key when key
1559+ // is was poped
1560+ for (int i = 0 ; i < 20 ; i++) {
1561+ sess1->setArgs ({" brpop" , " list1" , " list2" , " 0" });
1562+ auto expect = Command::runSessionCmd (sess1.get ());
1563+ EXPECT_EQ (sess1->isBlocked (), true );
1564+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1565+ auto key = genRand () % 2 ? " list1" : " list2" ;
1566+ {
1567+ std::lock_guard<std::mutex> lock (sess1->_mtx );
1568+ sess->setArgs ({" lpush" , key, " a" });
1569+ expect = Command::runSessionCmd (sess.get ());
1570+ sess->setArgs ({" lpop" , key});
1571+ expect = Command::runSessionCmd (sess.get ());
1572+ }
1573+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
1574+ EXPECT_EQ (sess1->isBlocked (), true );
1575+ sess->setArgs ({" lpush" , key, " a" });
1576+ expect = Command::runSessionCmd (sess.get ());
1577+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
1578+ EXPECT_EQ (sess1->isBlocked (), false );
1579+ }
1580+ // test blpop case 5: block command will be timeout or wakeup
1581+ for (int i = 0 ; i < 20 ; i++) {
1582+ sess1->setArgs ({" brpop" , " list1" , " list2" , " 1" });
1583+ auto expect = Command::runSessionCmd (sess1.get ());
1584+ EXPECT_EQ (sess1->isBlocked (), true );
1585+ EXPECT_EQ (expect.status ().code (), ErrorCodes::ERR_BLOCKCMD);
1586+ auto key = genRand () % 2 ? " list1" : " list2" ;
1587+ {
1588+ std::lock_guard<std::mutex> lock (sess1->_mtx );
1589+ sess->setArgs ({" lpush" , key, " a" });
1590+ expect = Command::runSessionCmd (sess.get ());
1591+ std::this_thread::sleep_for (std::chrono::milliseconds (995 ));
1592+ }
1593+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
1594+ EXPECT_EQ (sess1->isBlocked (), false );
1595+ sess->setArgs ({" lpop" , key});
1596+ expect = Command::runSessionCmd (sess.get ());
1597+ }
1598+ #ifndef _WIN32
1599+ server->stop ();
1600+ EXPECT_EQ (server.use_count (), 1 );
1601+ #endif
1602+ }
1603+
1604+
13951605void testRenameCommand (std::shared_ptr<ServerEntry> svr) {
13961606 asio::io_context ioContext;
13971607 asio::ip::tcp::socket socket (ioContext), socket1 (ioContext);
0 commit comments