Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions src/tendisplus/commands/command_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,216 @@ TEST(Command, testObject) {
#endif
}

void testBrpop(std::shared_ptr<ServerEntry> svr) {
asio::io_context ioContext;
asio::ip::tcp::socket socket(ioContext), socket1(ioContext);
NetSession sess(svr, std::move(socket), 1, false, nullptr, nullptr);
sess.setArgs({"lpush", "list", "a"});
auto expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());
sess.setArgs({"brpop", "list", "10"});
expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());
}

TEST(Command, brpop) {
const auto guard = MakeGuard([] { destroyEnv(); });

EXPECT_TRUE(setupEnv());

auto cfg = makeServerParam();
auto server = makeServerEntry(cfg);

testBrpop(server);

#ifndef _WIN32
server->stop();
EXPECT_EQ(server.use_count(), 1);
#endif
}

void testBlockCommand(std::shared_ptr<ServerEntry> svr) {
asio::io_context ioContext;
asio::ip::tcp::socket socket(ioContext), socket1(ioContext),
socket2(ioContext);
auto sess = std::make_shared<NetSession>(
svr, std::move(socket), 1, false, nullptr, nullptr);
auto sess1 = std::make_shared<NetSession>(
svr, std::move(socket1), 2, false, nullptr, nullptr);
auto sess2 = std::make_shared<NetSession>(
svr, std::move(socket2), 3, false, nullptr, nullptr);
svr->addSession(sess);
svr->addSession(sess1);
svr->addSession(sess2);
{
sess->setArgs({"blpop", "list1", "1"});
auto expect = Command::runSessionCmd(sess.get());
EXPECT_EQ(sess->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
std::this_thread::sleep_for(std::chrono::milliseconds(400));
EXPECT_EQ(sess->isBlocked(), true);
std::this_thread::sleep_for(std::chrono::milliseconds(400));
EXPECT_EQ(sess->isBlocked(), true);
std::this_thread::sleep_for(std::chrono::milliseconds(400));
EXPECT_EQ(sess->isBlocked(), false);
}
{
sess->setArgs({"blpop", "list1", "0"});
auto expect = Command::runSessionCmd(sess.get());
EXPECT_EQ(sess->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
sess1->setArgs({"rpush", "list1", "a"});
expect = Command::runSessionCmd(sess1.get());
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_EQ(sess->isBlocked(), false);
}
{
sess2->setArgs({"brpop", "list1", "0"});
auto expect = Command::runSessionCmd(sess2.get());
EXPECT_EQ(sess2->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
sess->setArgs({"brpop", "list1", "0"});
expect = Command::runSessionCmd(sess.get());
EXPECT_EQ(sess->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);

svr->endSession(sess2->id());
sess1->setArgs({"lpush", "list1", "a"});
expect = Command::runSessionCmd(sess1.get());
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_EQ(sess->isBlocked(), false);
}
}

TEST(Command, testBlockCommand) {
const auto guard = MakeGuard([] { destroyEnv(); });

EXPECT_TRUE(setupEnv());

auto cfg = makeServerParam();
auto server = makeServerEntry(cfg);
testBlockCommand(server);

#ifndef _WIN32
server->stop();
EXPECT_EQ(server.use_count(), 1);
#endif
}

TEST(Command, BlockCommand) {
const auto guard = MakeGuard([] { destroyEnv(); });

EXPECT_TRUE(setupEnv());

auto cfg = makeServerParam();
auto server = makeServerEntry(cfg);
asio::io_context ioContext;
asio::ip::tcp::socket socket(ioContext), socket1(ioContext),
socket2(ioContext);
auto sess = std::make_shared<NetSession>(
server, std::move(socket), 1, false, nullptr, nullptr);
auto sess1 = std::make_shared<NetSession>(
server, std::move(socket1), 2, false, nullptr, nullptr);
auto sess2 = std::make_shared<NetSession>(
server, std::move(socket2), 3, false, nullptr, nullptr);
server->addSession(sess);
server->addSession(sess1);
server->addSession(sess2);
// test blpop case 1: block command nerver block permanently due to lost
// wakeup
for (int i = 0; i < 20; i++) {
sess1->setArgs({"brpop", "list1", "list2", "0"});
auto expect = Command::runSessionCmd(sess1.get());
EXPECT_EQ(sess1->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
sess2->setArgs({"brpop", "list1", "0"});
expect = Command::runSessionCmd(sess2.get());
EXPECT_EQ(sess2->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
sess->setArgs({"lpush", "list2", "b"});
expect = Command::runSessionCmd(sess.get());
sess->setArgs({"lpush", "list1", "a"});
expect = Command::runSessionCmd(sess.get());
std::this_thread::sleep_for(std::chrono::milliseconds(5));
EXPECT_EQ(sess1->isBlocked(), false);
EXPECT_EQ(sess2->isBlocked(), false);
}
// test blpop case 2: block command nerver block session on the same key twice
for (int i = 0; i < 20; i++) {
sess1->setArgs({"brpop", "list1", "list1", "0"});
auto expect = Command::runSessionCmd(sess1.get());
EXPECT_EQ(sess1->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
sess2->setArgs({"brpop", "list1", "0"});
expect = Command::runSessionCmd(sess2.get());
EXPECT_EQ(sess2->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
sess->setArgs({"lpush", "list1", "a", "b"});
expect = Command::runSessionCmd(sess.get());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(sess1->isBlocked(), false);
EXPECT_EQ(sess2->isBlocked(), false);
}
// test blpop case 3: block command will be waked up by any key which is in
// the key list
for (int i = 0; i < 20; i++) {
sess1->setArgs({"brpop", "list1", "list2", "0"});
auto expect = Command::runSessionCmd(sess1.get());
EXPECT_EQ(sess1->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
auto key = genRand() % 2 ? "list1" : "list2";
sess->setArgs({"lpush", key, "a"});
expect = Command::runSessionCmd(sess.get());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(sess1->isBlocked(), false);
}
// test blpop case 4: block command will not be waked up by any key when key
// is was poped
for (int i = 0; i < 20; i++) {
sess1->setArgs({"brpop", "list1", "list2", "0"});
auto expect = Command::runSessionCmd(sess1.get());
EXPECT_EQ(sess1->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
auto key = genRand() % 2 ? "list1" : "list2";
{
std::lock_guard<std::mutex> lock(sess1->_mtx);
sess->setArgs({"lpush", key, "a"});
expect = Command::runSessionCmd(sess.get());
sess->setArgs({"lpop", key});
expect = Command::runSessionCmd(sess.get());
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(sess1->isBlocked(), true);
sess->setArgs({"lpush", key, "a"});
expect = Command::runSessionCmd(sess.get());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(sess1->isBlocked(), false);
}
// test blpop case 5: block command will be timeout or wakeup
for (int i = 0; i < 20; i++) {
sess1->setArgs({"brpop", "list1", "list2", "1"});
auto expect = Command::runSessionCmd(sess1.get());
EXPECT_EQ(sess1->isBlocked(), true);
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
auto key = genRand() % 2 ? "list1" : "list2";
{
std::lock_guard<std::mutex> lock(sess1->_mtx);
sess->setArgs({"lpush", key, "a"});
expect = Command::runSessionCmd(sess.get());
std::this_thread::sleep_for(std::chrono::milliseconds(995));
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(sess1->isBlocked(), false);
sess->setArgs({"lpop", key});
expect = Command::runSessionCmd(sess.get());
}
#ifndef _WIN32
server->stop();
EXPECT_EQ(server.use_count(), 1);
#endif
}


void testRenameCommand(std::shared_ptr<ServerEntry> svr) {
asio::io_context ioContext;
asio::ip::tcp::socket socket(ioContext), socket1(ioContext);
Expand Down
126 changes: 126 additions & 0 deletions src/tendisplus/commands/list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,131 @@ class RPopCommand : public ListPopWrapper {
RPopCommand() : ListPopWrapper(ListPos::LP_TAIL, "wF") {}
} rpopCommand;

class BlockPopWrapper : public Command {
public:
explicit BlockPopWrapper(ListPos pos, const char* sflags)
: Command(pos == ListPos::LP_HEAD ? "blpop" : "brpop", sflags), _pos(pos) {}

ssize_t arity() const {
return -2;
}

int32_t firstkey() const {
return 1;
}

int32_t lastkey() const {
return 1;
}

int32_t keystep() const {
return 1;
}

Expected<std::string> tryPop(Session* sess, const std::string& key) {
SessionCtx* pCtx = sess->getCtx();
INVARIANT(pCtx != nullptr);

auto server = sess->getServerEntry();
auto expdb = server->getSegmentMgr()->getDbWithKeyLock(
sess, key, mgl::LockMode::LOCK_X);
if (!expdb.ok()) {
return expdb.status();
}
Expected<RecordValue> rv =
Command::expireKeyIfNeeded(sess, key, RecordType::RT_LIST_META);
if (!rv.ok()) {
return rv.status();
}

// record exists
RecordKey metaRk(expdb.value().chunkId,
pCtx->getDbId(),
RecordType::RT_LIST_META,
key,
"");
PStore kvstore = expdb.value().store;

auto ptxn = sess->getCtx()->createTransaction(kvstore);
if (!ptxn.ok()) {
return ptxn.status();
}
Expected<std::string> s1 =
genericPop(sess, kvstore, ptxn.value(), metaRk, rv, _pos);
if (!s1.ok()) {
return s1.status();
}
auto s = sess->getCtx()->commitTransaction(ptxn.value());
if (s.ok()) {
std::stringstream ss;
Command::fmtMultiBulkLen(ss, 2);
Command::fmtBulk(ss, key);
Command::fmtBulk(ss, s1.value());
return ss.str();
}
return s.status();
}

Expected<std::string> tryPop(Session* sess,
const std::vector<std::string>& keys) {
for (const auto& key : keys) {
auto status = tryPop(sess, key);
if (status.ok()) {
return status;
}
}
return {ErrorCodes::ERR_NOTFOUND, "not found"};
}

Expected<std::string> run(Session* sess) final {
const std::vector<std::string>& args = sess->getArgs();
std::vector<std::string> keys;
for (size_t i = 1; i + 1 < args.size(); i++) {
keys.push_back(args[i]);
}
Expected<double> timeout = tendisplus::stod(args.back());
if (!timeout.ok()) {
return {timeout.status().code(), "timeout is not a valid float"};
}
auto status = tryPop(sess, keys);
if (status.ok()) {
return status;
}
startBlocking(sess, keys, timeout.value());
return {ErrorCodes::ERR_BLOCKCMD, "block command"};
}

void startBlocking(Session* sess,
const std::vector<std::string>& keys,
double timeout) {
auto nSess = dynamic_cast<NetSession*>(sess);
if (!nSess) {
return;
}
auto executor = [this, nSess](const std::string& key) {
return tryPop(nSess, key);
};

auto duration_sec = std::chrono::duration<double>(timeout);
auto microsec =
std::chrono::duration_cast<std::chrono::microseconds>(duration_sec);
nSess->pauseSession(std::move(executor), keys, microsec);
}

private:
ListPos _pos;
};

class BLPopCommand : public BlockPopWrapper {
public:
BLPopCommand() : BlockPopWrapper(ListPos::LP_HEAD, "wF") {}
} blpopCommand;

class BRPopCommand : public BlockPopWrapper {
public:
BRPopCommand() : BlockPopWrapper(ListPos::LP_TAIL, "wF") {}
} brpopCommand;

class ListPushWrapper : public Command {
public:
explicit ListPushWrapper(const std::string& name,
Expand Down Expand Up @@ -368,6 +493,7 @@ class ListPushWrapper : public Command {
}
auto s = sess->getCtx()->commitTransaction(ptxn.value());
if (s.ok()) {
server->notifyKeyAvailable(key, valargs.size());
return s1.value();
} else if (s.status().code() != ErrorCodes::ERR_COMMIT_RETRY) {
return s.status();
Expand Down
Loading