Code Diff
diff --git a/redis.conf b/redis.conf
index 7229d5c27fd..51689b334e8 100644
--- a/redis.conf
+++ b/redis.conf
@@ -2174,6 +2174,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60
#
# client-query-buffer-limit 1gb
+# Defines how many commands of each client's pipeline are decoded and prefetched in advance.
+# lookahead 16
+
# In some scenarios client connections can hog up memory leading to OOM
# errors or data eviction. To avoid this we can cap the accumulated memory
# used by all client connections (all pubsub and normal clients). Once we
diff --git a/src/acl.c b/src/acl.c
index 6bd3f0ee4b2..7caeb28fb8e 100644
--- a/src/acl.c
+++ b/src/acl.c
@@ -421,8 +421,7 @@ user *ACLCreateUser(const char *name, size_t namelen) {
if (raxFind(Users,(unsigned char*)name,namelen,NULL)) return NULL;
user *u = zmalloc(sizeof(*u));
u->name = sdsnewlen(name,namelen);
- u->flags = USER_FLAG_DISABLED;
- u->flags |= USER_FLAG_SANITIZE_PAYLOAD;
+ atomicSet(u->flags, USER_FLAG_DISABLED | USER_FLAG_SANITIZE_PAYLOAD);
u->passwords = listCreate();
u->acl_string = NULL;
listSetMatchMethod(u->passwords,ACLListMatchSds);
@@ -1289,22 +1288,18 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
if (oplen == -1) oplen = strlen(op);
if (oplen == 0) return C_OK; /* Empty string is a no-operation. */
if (!strcasecmp(op,"on")) {
- u->flags |= USER_FLAG_ENABLED;
- u->flags &= ~USER_FLAG_DISABLED;
+ atomicSet(u->flags, (u->flags | USER_FLAG_ENABLED) & ~USER_FLAG_DISABLED);
} else if (!strcasecmp(op,"off")) {
- u->flags |= USER_FLAG_DISABLED;
- u->flags &= ~USER_FLAG_ENABLED;
+ atomicSet(u->flags, (u->flags | USER_FLAG_DISABLED) & ~USER_FLAG_ENABLED);
} else if (!strcasecmp(op,"skip-sanitize-payload")) {
- u->flags |= USER_FLAG_SANITIZE_PAYLOAD_SKIP;
- u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD;
+ atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD_SKIP) & ~USER_FLAG_SANITIZE_PAYLOAD);
} else if (!strcasecmp(op,"sanitize-payload")) {
- u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD_SKIP;
- u->flags |= USER_FLAG_SANITIZE_PAYLOAD;
+ atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD) & ~USER_FLAG_SANITIZE_PAYLOAD_SKIP);
} else if (!strcasecmp(op,"nopass")) {
- u->flags |= USER_FLAG_NOPASS;
+ atomicSet(u->flags, u->flags | USER_FLAG_NOPASS);
listEmpty(u->passwords);
} else if (!strcasecmp(op,"resetpass")) {
- u->flags &= ~USER_FLAG_NOPASS;
+ atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS);
listEmpty(u->passwords);
} else if (op[0] == '>' || op[0] == '#') {
sds newpass;
@@ -1324,7 +1319,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
listAddNodeTail(u->passwords,newpass);
else
sdsfree(newpass);
- u->flags &= ~USER_FLAG_NOPASS;
+ atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS);
} else if (op[0] == '<' || op[0] == '!') {
sds delpass;
if (op[0] == '<') {
@@ -1852,7 +1847,7 @@ int ACLUserCheckChannelPerm(user *u, sds channel, int is_pattern) {
* If the command fails an ACL check, idxptr will be to set to the first argv entry that
* causes the failure, either 0 if the command itself fails or the idx of the key/channel
* that causes the failure */
-int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, int *idxptr) {
+int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, getKeysResult *key_result, int *idxptr) {
listIter li;
listNode *ln;
@@ -1869,6 +1864,10 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
* calls to prevent duplicate lookups. */
aclKeyResultCache cache;
initACLKeyResultCache(&cache);
+ if (key_result) {
+ cache.keys = *key_result;
+ cache.keys_init = 1;
+ }
/* Check each selector sequentially */
listRewind(u->selectors,&li);
@@ -1876,7 +1875,7 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
aclSelector *s = (aclSelector *) listNodeValue(ln);
int acl_retval = ACLSelectorCheckCmd(s, cmd, argv, argc, &local_idxptr, &cache);
if (acl_retval == ACL_OK) {
- cleanupACLKeyResultCache(&cache);
+ if (!key_result) cleanupACLKeyResultCache(&cache);
return ACL_OK;
}
if (acl_retval > relevant_error ||
@@ -1888,13 +1887,13 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
}
*idxptr = last_idx;
- cleanupACLKeyResultCache(&cache);
+ if (!key_result) cleanupACLKeyResultCache(&cache);
return relevant_error;
}
/* High level API for checking if a client can execute the queued up command */
int ACLCheckAllPerm(client *c, int *idxptr) {
- return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, idxptr);
+ return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, getClientCachedKeyResult(c), idxptr);
}
/* If 'new' can access all channels 'original' could then return NULL;
@@ -3144,7 +3143,7 @@ void aclCommand(client *c) {
}
int idx;
- int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, &idx);
+ int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, NULL, &idx);
if (result != ACL_OK) {
sds err = getAclErrorMessage(result, u, cmd, c->argv[idx+3]->ptr, 1);
addReplyBulkSds(c, err);
diff --git a/src/aof.c b/src/aof.c
index 94a28775bf2..90d646bb053 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1641,12 +1641,24 @@ int loadSingleAppendOnlyFile(char *filename) {
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
+ /* queueMultiCommand requires a pendingCommand, so we create a "fake" one here
+ * for it to consume */
+ pendingCommand *pcmd = zmalloc(sizeof(pendingCommand));
+ initPendingCommand(pcmd);
+ addPendingCommand(&fakeClient->pending_cmds, pcmd);
+
+ pcmd->argc = argc;
+ pcmd->argv_len = argc;
+ pcmd->argv = argv;
+ pcmd->cmd = cmd;
+
/* Note: we don't have to attempt calling evalGetCommandFlags,
* since this is AOF, the checks in processCommand are not made
* anyway.*/
queueMultiCommand(fakeClient, cmd->flags);
} else {
cmd->proc(fakeClient);
+ fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */
}
/* The fake client should not have a reply */
diff --git a/src/blocked.c b/src/blocked.c
index ee5a365142b..4f518c9a5a4 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -130,8 +130,7 @@ void processUnblockedClients(void) {
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,
* which calls reqresAppendResponse) */
- reqresAppendResponse(c);
- resetClient(c);
+ prepareForNextCommand(c, 0);
}
if (c->flags & CLIENT_MODULE) {
diff --git a/src/cluster.c b/src/cluster.c
index 330907b60e8..3fb7af5c030 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -1086,6 +1086,31 @@ void clusterCommand(client *c) {
}
}
+/* Return the one slot shared by all keys in 'keys_result'; each key is read from
+ * 'argv' at its 'pos'. Returns INVALID_CLUSTER_SLOT if there are no keys or if the
+ * keys map to different slots (cross-slot error). With keys present but cluster
+ * mode disabled it returns 0. The empty-keys check is done before that one. */
+int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) {
+ if (keys_result->numkeys == 0)
+ return INVALID_CLUSTER_SLOT; /* No keys: there is no slot to report. */
+
+ if (!server.cluster_enabled)
+ return 0; /* Cluster mode off: report slot 0 without hashing any key. */
+
+ int first_slot = INVALID_CLUSTER_SLOT;
+ for (int j = 0; j < keys_result->numkeys; j++) {
+ robj *this_key = argv[keys_result->keys[j].pos];
+ int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr));
+
+ if (first_slot == INVALID_CLUSTER_SLOT) /* First key seen: remember its slot. */
+ first_slot = this_slot;
+ else if (first_slot != this_slot) {
+ return INVALID_CLUSTER_SLOT; /* A later key disagrees with the first one. */
+ }
+ }
+ return first_slot;
+}
+
/* Return the pointer to the cluster node that is able to serve the command.
* For the function to succeed the command should only target either:
*
@@ -1118,13 +1143,16 @@ void clusterCommand(client *c) {
*
* CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
* down but the user attempts to execute a command that addresses one or more keys. */
-clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code) {
+clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
+ getKeysResult *keys_result, uint8_t read_error, uint64_t cmd_flags, int *error_code)
+{
clusterNode *myself = getMyClusterNode();
clusterNode *n = NULL;
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
- multiCmd mc;
+ pendingCommand mc;
+ pendingCommand *mcp = &mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
existing_keys = 0;
int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */
@@ -1152,11 +1180,20 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* structure if the client is not in MULTI/EXEC state, this way
* we have a single codepath below. */
ms = &_ms;
- _ms.commands = &mc;
+ _ms.commands = &mcp;
_ms.count = 1;
+
+ /* Properly initialize the fake pendingCommand */
+ initPendingCommand(&mc);
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
+ mc.slot = hashslot ? *hashslot : INVALID_CLUSTER_SLOT;
+ mc.read_error = read_error;
+ if (keys_result) {
+ mc.keys_result = *keys_result;
+ mc.flags |= PENDING_CMD_KEYS_RESULT_VALID;
+ }
}
/* Check that all the keys are in the same hash slot, and obtain this
@@ -1164,12 +1201,14 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
- int margc, numkeys, j;
+ int margc, j;
keyReference *keyindex;
- mcmd = ms->commands[i].cmd;
- margc = ms->commands[i].argc;
- margv = ms->commands[i].argv;
+ pendingCommand *pcmd = ms->commands[i];
+
+ mcmd = pcmd->cmd;
+ margc = pcmd->argc;
+ margv = pcmd->argv;
/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
if (!pubsubshard_included &&
@@ -1178,14 +1217,29 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
pubsubshard_included = 1;
}
+ /* If we have a cached keys result from preprocessCommand(), use it.
+ * Otherwise, extract keys result. */
+ int use_cache_keys_result = pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID;
getKeysResult result = GETKEYS_RESULT_INIT;
- numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
+ if (use_cache_keys_result)
+ result = pcmd->keys_result;
+ else
+ getKeysFromCommand(mcmd,margv,margc,&result);
keyindex = result.keys;
- for (j = 0; j < numkeys; j++) {
+ for (j = 0; j < result.numkeys; j++) {
+ /* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */
+ if (pcmd->read_error == CLIENT_READ_CROSS_SLOT) {
+ /* Error: multiple keys from different slots. */
+ if (error_code)
+ *error_code = CLUSTER_REDIR_CROSS_SLOT;
+ return NULL;
+ }
+
robj *thiskey = margv[keyindex[j].pos];
- int thisslot = keyHashSlot((char*)thiskey->ptr,
- sdslen(thiskey->ptr));
+ int thisslot = pcmd->slot;
+ if (thisslot == INVALID_CLUSTER_SLOT)
+ thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));
if (firstkey == NULL) {
/* This is the first key we see. Check what is the slot
@@ -1199,7 +1253,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* not trapped earlier in processCommand(). Report the same
* error to the client. */
if (n == NULL) {
- getKeysFreeResult(&result);
+ if (!use_cache_keys_result) getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
@@ -1222,7 +1276,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* the same key/channel as the first we saw. */
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
- getKeysFreeResult(&result);
+ if (!use_cache_keys_result) getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
@@ -1247,7 +1301,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
else existing_keys++;
}
}
- getKeysFreeResult(&result);
+ if (!use_cache_keys_result) getKeysFreeResult(&result);
}
/* No key at all in command? then we can serve the request
diff --git a/src/cluster.h b/src/cluster.h
index 369c1b91add..830dae87b7c 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -22,6 +22,7 @@
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
+#define INVALID_CLUSTER_SLOT (-1) /* Invalid slot number. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
@@ -158,7 +159,9 @@ int clusterCanAccessKeysInSlot(int slot);
struct slotRangeArray *clusterGetLocalSlotRanges(void);
/* functions with shared implementations */
-clusterNode *getNodeByQuery(client
... [truncated]