Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions src/commandlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,147 @@ void commandlogPushCurrentCommand(client *c, struct serverCommand *cmd) {
commandlogPushEntryIfNeeded(c, argv, argc, net_output_bytes_curr_cmd, COMMANDLOG_TYPE_LARGE_REPLY);
}

// bigKeyInfo [string | set | hash | list | zset | xstream | all]
void bigkeyInfoCommand(client *c) {
bool isDisplayAll = 0;
bool isDisplayString = 0;
bool isDisplaySet = 0;
bool isDisplayHash = 0;
bool isDisplayList = 0;
bool isDisplayZset = 0;
long totalLine = 0;

long count_string = listLength(server.string_bigkey_info);
long count_list = listLength(server.list_bigkey_info);
long count_hash = listLength(server.hash_bigkey_info);
long count_set = listLength(server.set_bigkey_info);
long count_zset = listLength(server.zset_bigkey_info);

if (c->argc == 1) {
isDisplayAll = 1;
} else {
int j = 1;
while (j < c->argc) {
char *opt = c->argv[j]->ptr;
if (!strcasecmp(opt, "string")) {
isDisplayString = 1;
} else if (!strcasecmp(opt, "set")) {
isDisplaySet = 1;
} else if (!strcasecmp(opt, "hash")) {
isDisplayHash = 1;
} else if (!strcasecmp(opt, "list")) {
isDisplayList = 1;
} else if (!strcasecmp(opt, "zset")) {
isDisplayZset = 1;
} else if (!strcasecmp(opt, "all")) {
isDisplayAll = 1;
break;
} else {
addReplyErrorFormat(c, "Unsupported option %s", opt);
return;
}
j++;
}
}

if (server.big_key_output < count_string) {
count_string = server.big_key_output;
}
if (server.big_key_output < count_list) {
count_list = server.big_key_output;
}
if (server.big_key_output < count_hash) {
count_hash = server.big_key_output;
}
if (server.big_key_output < count_set) {
count_set = server.big_key_output;
}
if (server.big_key_output < count_zset) {
count_zset = server.big_key_output;
}

if (isDisplayAll | isDisplayString) {
totalLine += count_string;
}
if (isDisplayAll | isDisplaySet) {
totalLine += count_set;
}
if (isDisplayAll | isDisplayHash) {
totalLine += count_hash;
}
if (isDisplayAll | isDisplayList) {
totalLine += count_list;
}
if (isDisplayAll | isDisplayZset) {
totalLine += count_zset;
}

addReplyArrayLen(c, totalLine);

listIter li;
listNode *ln;

if (isDisplayAll | isDisplayString) {
listRewindTail(server.string_bigkey_info, &li);
while (count_string--) {
ln = listNext(&li);
bigkeyEntry *bke = ln->value;
addReplyArrayLen(c, 3);
addReplyLongLong(c, bke->value);
addReplyBulkCBuffer(c, bke->key->ptr, sdslen(bke->key->ptr));
addReplyBulkCString(c, "string");
}
}

if (isDisplayAll | isDisplayList) {
listRewindTail(server.list_bigkey_info, &li);
while (count_list--) {
ln = listNext(&li);
bigkeyEntry *bke = ln->value;
addReplyArrayLen(c, 3);
addReplyLongLong(c, bke->value);
addReplyBulkCBuffer(c, bke->key->ptr, sdslen(bke->key->ptr));
addReplyBulkCString(c, "list");
}
}

if (isDisplayAll | isDisplayHash) {
listRewindTail(server.hash_bigkey_info, &li);
while (count_hash--) {
ln = listNext(&li);
bigkeyEntry *bke = ln->value;
addReplyArrayLen(c, 3);
addReplyLongLong(c, bke->value);
addReplyBulkCBuffer(c, bke->key->ptr, sdslen(bke->key->ptr));
addReplyBulkCString(c, "hash");
}
}

if (isDisplayAll | isDisplaySet) {
listRewindTail(server.set_bigkey_info, &li);
while (count_set--) {
ln = listNext(&li);
bigkeyEntry *bke = ln->value;
addReplyArrayLen(c, 3);
addReplyLongLong(c, bke->value);
addReplyBulkCBuffer(c, bke->key->ptr, sdslen(bke->key->ptr));
addReplyBulkCString(c, "set");
}
}

if (isDisplayAll | isDisplayZset) {
listRewindTail(server.list_bigkey_info, &li);
while (count_zset--) {
ln = listNext(&li);
bigkeyEntry *bke = ln->value;
addReplyArrayLen(c, 3);
addReplyLongLong(c, bke->value);
addReplyBulkCBuffer(c, bke->key->ptr, sdslen(bke->key->ptr));
addReplyBulkCString(c, "Sorted Set");
}
}
}

/* The SLOWLOG command. Implements all the subcommands needed to handle the
* slow log. */
void slowlogCommand(client *c) {
Expand Down
26 changes: 26 additions & 0 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -2045,6 +2045,31 @@ struct COMMAND_ARG SELECT_Args[] = {
{MAKE_ARG("index",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/********** BIGKEYINFO ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* BIGKEYINFO history */
#define BIGKEYINFO_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* BIGKEYINFO tips */
const char *BIGKEYINFO_Tips[] = {
"request_policy:all_nodes",
"nondeterministic_output",
};
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* BIGKEYINFO key specs */
#define BIGKEYINFO_Keyspecs NULL
#endif

/* BIGKEYINFO argument table */
struct COMMAND_ARG BIGKEYINFO_Args[] = {
{MAKE_ARG("count",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
};

/********** COPY ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -11752,6 +11777,7 @@ struct COMMAND_STRUCT serverCommandTable[] = {
{MAKE_CMD("reset","Resets the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,RESET_History,0,RESET_Tips,0,resetCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_ALLOW_BUSY,ACL_CATEGORY_CONNECTION,RESET_Keyspecs,0,NULL,0)},
{MAKE_CMD("select","Changes the selected database.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,SELECT_History,0,SELECT_Tips,0,selectCommand,2,CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_CONNECTION,SELECT_Keyspecs,0,NULL,1),.args=SELECT_Args},
/* generic */
{MAKE_CMD("bigkeyinfo","Returns the big key's entries.","O(N) where N is the number of entries returned","8.0.0",CMD_DOC_NONE,NULL,NULL,"generic",COMMAND_GROUP_GENERIC,BIGKEYINFO_History,0,BIGKEYINFO_Tips,2,bigkeyInfoCommand,-1,CMD_READONLY|CMD_FAST,0,BIGKEYINFO_Keyspecs,0,NULL,1),.args=BIGKEYINFO_Args},
{MAKE_CMD("copy","Copies the value of a key to a new key.","O(N) worst case for collections, where N is the number of nested items. O(1) for string values.","6.2.0",CMD_DOC_NONE,NULL,NULL,"generic",COMMAND_GROUP_GENERIC,COPY_History,0,COPY_Tips,0,copyCommand,-3,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_KEYSPACE,COPY_Keyspecs,2,NULL,4),.args=COPY_Args},
{MAKE_CMD("del","Deletes one or more keys.","O(N) where N is the number of keys that will be removed. When a key to remove holds a value other than a string, the individual complexity for this key is O(M) where M is the number of elements in the list, set, sorted set or hash. Removing a single key that holds a string value is O(1).","1.0.0",CMD_DOC_NONE,NULL,NULL,"generic",COMMAND_GROUP_GENERIC,DEL_History,0,DEL_Tips,2,delCommand,-2,CMD_WRITE,ACL_CATEGORY_KEYSPACE,DEL_Keyspecs,1,NULL,1),.args=DEL_Args},
{MAKE_CMD("dump","Returns a serialized representation of the value stored at a key.","O(1) to access the key and additional O(N*M) to serialize it, where N is the number of objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1).","2.6.0",CMD_DOC_NONE,NULL,NULL,"generic",COMMAND_GROUP_GENERIC,DUMP_History,0,DUMP_Tips,1,dumpCommand,2,CMD_READONLY,ACL_CATEGORY_KEYSPACE,DUMP_Keyspecs,1,NULL,1),.args=DUMP_Args},
Expand Down
26 changes: 26 additions & 0 deletions src/commands/bigKeyList.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"BIGKEYINFO": {
"summary": "Returns the big key's entries.",
"complexity": "O(N) where N is the number of entries returned",
"group": "generic",
"since": "8.0.0",
"arity": -1,
"function": "bigkeyInfoCommand",
"command_flags": [
"READONLY",
"FAST"
],
"command_tips": [
"REQUEST_POLICY:ALL_NODES",
"NONDETERMINISTIC_OUTPUT"
],
"reply_schema": {},
"arguments": [
{
"name": "count",
"type": "integer",
"optional": true
}
]
}
}
3 changes: 3 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3360,6 +3360,9 @@ standardConfig static_configs[] = {
createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort),
createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.completion_vector, -1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("string-memory-use", NULL, IMMUTABLE_CONFIG, 0, INT_MAX, server.string_memory_use, 10240, INTEGER_CONFIG, NULL, NULL),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

string_memory_use does not sound like the thing we are tracking right? we are tracking for cardinality, so maybe "big-key-object-size-min"? or "big-key-object-card-min"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I should delete it, it exists in old design.

createIntConfig("big-key-number-element", NULL, IMMUTABLE_CONFIG, 0, INT_MAX, server.big_key_number_element, 2000, INTEGER_CONFIG, NULL, NULL),
createIntConfig("big-key-output", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.big_key_output, 100, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
Expand Down
136 changes: 136 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,38 @@ void trackInstantaneousMetric(int metric, long long current_value, long long cur
server.inst_metric[metric].last_sample_value = current_value;
}

/*
void displayUpdate(int pre_value, int current_value) {
serverLog(LL_WARNING, "This is for testing, previous item number is %d, and current item number is %d", pre_value,
current_value);
}

void displayDataTypeArray(keysizeInfo *keysize_array, int length) {
serverLog(LL_WARNING, "Current array length is %d", length);
for (int i = 0; i < length; i++) {
serverLog(LL_WARNING, "Item %lld and value is %lld", keysize_array[i].element_size, keysize_array[i].num);
}
}
*/

void updateKeySizeArray(serverDb *db, robj *dstkey) {
robj *t_obj = lookupKeyWrite(db, dstkey);
if (t_obj) {
if (t_obj->type == OBJ_STRING) {
} else if (t_obj->type == OBJ_LIST) {
updateBigKeyList(t_obj, listTypeLength(t_obj), 0, LIST_TYPE);
} else if (t_obj->type == OBJ_SET) {
updateBigKeyList(t_obj, setTypeSize(t_obj), 0, SET_TYPE);
} else if (t_obj->type == OBJ_ZSET) {
updateBigKeyList(t_obj, zsetLength(t_obj), 0, SORTED_SET_TYPE);
} else if (t_obj->type == OBJ_HASH) {
updateBigKeyList(t_obj, hashTypeLength(t_obj), 0, HASH_TYPE);
} else if (t_obj->type == OBJ_STREAM) {
}
}
}


/* Return the mean of all the samples. */
long long getInstantaneousMetric(int metric) {
int j;
Expand Down Expand Up @@ -2825,6 +2857,109 @@ serverDb *createDatabaseIfNeeded(int id) {
return server.db[id];
}

void removeNodeFromList(dataType previous_type, robj *keyobj) {
list *process_list = NULL;
if (previous_type == STRING_TYPE) {
process_list = server.string_bigkey_info;
} else if (previous_type == LIST_TYPE) {
process_list = server.list_bigkey_info;
} else if (previous_type == HASH_TYPE) {
process_list = server.hash_bigkey_info;
} else if (previous_type == SET_TYPE) {
process_list = server.set_bigkey_info;
} else if (previous_type == SORTED_SET_TYPE) {
process_list = server.zset_bigkey_info;
} else {
// TO DO later
}

listIter li;
listNode *ln;
listRewind(process_list, &li);
while ((ln = listNext(&li)) != NULL) {
bigkeyEntry *node = ln->value;
if (sdscmp(node->key->ptr, keyobj->ptr) != 0) continue;
decrRefCount(node->key);
listDelNode(process_list, ln);
zfree(node);
}
Comment on lines +2879 to +2885
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems somewhat inefficient. Maybe we should try and include a b+ tree structure that will maintain the order and the ability to add/remove the object.
Another thing is the rename command. is it possible we might be renaming a key and it's metadata is not updated?

}

void addNodeToList(dataType new_type, robj *keyobj, long curr) {
list *process_list = NULL;
if (new_type == STRING_TYPE) {
process_list = server.string_bigkey_info;
} else if (new_type == LIST_TYPE) {
process_list = server.list_bigkey_info;
} else if (new_type == HASH_TYPE) {
process_list = server.hash_bigkey_info;
} else if (new_type == SET_TYPE) {
process_list = server.set_bigkey_info;
} else if (new_type == SORTED_SET_TYPE) {
process_list = server.zset_bigkey_info;
} else {
// TO DO later
}

bigkeyEntry *bke = zmalloc(sizeof(bigkeyEntry));
incrRefCount(keyobj);

bke->value = curr;
bke->key = keyobj;

if (listLength(process_list) == 0) {
listAddNodeHead(process_list, bke);
} else {
listIter li;
listNode *ln;
listRewind(process_list, &li);
int isInsert = 0;
while ((ln = listNext(&li)) != NULL) {
bigkeyEntry *node = ln->value;
if (curr > node->value) continue;
listInsertNode(process_list, ln, bke, 0);
isInsert = 1;
break;
}
if (!isInsert) listAddNodeTail(process_list, bke);
}
}


void updateBigKeyList(robj *keyobj, long previous, long curr, dataType type) {
if (type == STRING_TYPE) {
if (previous < server.string_memory_use && curr < server.string_memory_use) {
return;
} else if (previous >= server.string_memory_use && curr < server.string_memory_use) {
removeNodeFromList(type, keyobj);
} else if (previous < server.string_memory_use && curr >= server.string_memory_use) {
addNodeToList(type, keyobj, curr);
} else if (previous >= server.string_memory_use && curr >= server.string_memory_use) {
removeNodeFromList(type, keyobj);
addNodeToList(type, keyobj, curr);
}
} else {
if (previous < server.big_key_number_element && curr < server.big_key_number_element) {
return;
} else if (previous >= server.big_key_number_element && curr < server.big_key_number_element) {
removeNodeFromList(type, keyobj);
} else if (previous < server.big_key_number_element && curr >= server.big_key_number_element) {
addNodeToList(type, keyobj, curr);
} else if (previous >= server.big_key_number_element && curr >= server.big_key_number_element) {
removeNodeFromList(type, keyobj);
addNodeToList(type, keyobj, curr);
}
}
}

void bigkeyListInit(void) {
server.string_bigkey_info = listCreate();
server.list_bigkey_info = listCreate();
server.hash_bigkey_info = listCreate();
server.set_bigkey_info = listCreate();
server.zset_bigkey_info = listCreate();
}

void initServer(void) {
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
Expand Down Expand Up @@ -3032,6 +3167,7 @@ void initServer(void) {
evalInit();

commandlogInit();
bigkeyListInit();
latencyMonitorInit();
initSharedQueryBuf();

Expand Down
Loading
Loading