Denial of Service (Resource Exhaustion) via excessive command lookahead (prefetch)

MEDIUM
redis/redis
Commit: 235e688b010b
Affected: 8.6.2 and earlier (pre-fix)
2026-04-04 11:57 UTC

Description

This commit introduces a configurable lookahead pipeline that parses and prefetches several pipelined commands per client before they execute. In the same change, it limits the lookahead depth for unauthenticated clients to 1. The limit mitigates a potential Denial of Service (DoS): if unauthenticated clients could have many commands parsed ahead of execution, a large number of concurrent pipelining connections could consume server memory without ever authenticating. The new `lookahead` setting defaults to 16, and capping unauthenticated clients at 1 bounds the memory they can tie up in pre-parsed commands. The commit message gives two reasons for the cap: reducing memory overhead and preventing errors, since AUTH can affect how subsequent commands are handled. Most of the change is architectural and performance-oriented, but the explicit Security note and safeguard indicate an intentional mitigation rather than a dependency bump or cleanup.
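To make the bound concrete, the small model below counts how many commands can sit parsed but unexecuted across all connections, with and without the cap. It is a sketch under stated assumptions: the cap is applied per connection, the partially parsed tail command is ignored, and the names (`effective_lookahead`, `worst_case_ready_commands`) are hypothetical, not Redis APIs.

```python
# Hypothetical model of the lookahead cap; names are illustrative, not Redis APIs.
DEFAULT_LOOKAHEAD = 16          # documented default for the `lookahead` setting
UNAUTHENTICATED_LOOKAHEAD = 1   # cap for unauthenticated clients per the commit


def effective_lookahead(authenticated: bool, configured: int = DEFAULT_LOOKAHEAD) -> int:
    """Parsed-ahead commands a single client may hold before execution."""
    if authenticated:
        return configured
    return min(configured, UNAUTHENTICATED_LOOKAHEAD)


def worst_case_ready_commands(n_auth: int, n_unauth: int,
                              configured: int = DEFAULT_LOOKAHEAD) -> int:
    """Upper bound on parsed-but-unexecuted commands across all connections."""
    return (n_auth * effective_lookahead(True, configured)
            + n_unauth * effective_lookahead(False, configured))


print(worst_case_ready_commands(0, 200))   # 200: unauthenticated clients capped at 1
print(200 * DEFAULT_LOOKAHEAD)             # 3200: the same clients without the cap
```

The bound counts commands, not bytes; the actual memory depends on argument sizes and per-command bookkeeping.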

Proof of Concept

PoC steps to demonstrate the potential DoS (unauthenticated clients abusing command lookahead). Because the unauthenticated limit ships in the same commit that introduces lookahead (before it, Redis parsed and executed one command at a time), "before the fix" here means a build that has the lookahead pipeline but not the limit of 1.

Prerequisites:
- A Redis server that requires authentication (for example, `requirepass` is set), so that connections which never send AUTH remain unauthenticated.
- For the pre-mitigation comparison, a build with the lookahead pipeline (default depth 16) but without the unauthenticated limit; the patched build caps unauthenticated clients at 1.
- Many concurrent client connections that never send AUTH, each pipelining commands rapidly to maximize the number of in-flight commands.

Attack vector (without the limit):
- An attacker opens many unauthenticated connections and sends a pipeline of several commands per connection (e.g., 16 PINGs) without waiting for replies. The server parses up to `lookahead` commands ahead for each client, so the memory held by parsed-but-unexecuted commands grows roughly with the number of connections times the lookahead depth. With a large configured lookahead and many clients, this overhead could contribute to memory exhaustion and a DoS. Each PING is rejected with a NOAUTH error when it is processed, but it is parsed first.
- With 16 small PINGs per connection the extra memory is modest, so the script below is illustrative and does not by itself demonstrate exhaustion.

Proof-of-concept (Python raw-socket example):

```python
import socket
import time

HOST = "127.0.0.1"
PORT = 6379
NUM_CLIENTS = 200  # number of concurrent unauthenticated clients
PIPE_LEN = 16      # commands pipelined per client (matches the default lookahead)

PING = b"*1\r\n$4\r\nPING\r\n"  # RESP-encoded PING


def make_pipeline_socket():
    s = socket.create_connection((HOST, PORT))
    # Send PIPE_LEN commands in one write, without waiting for replies, so they
    # are likely to arrive together in the server's query buffer.
    s.sendall(PING * PIPE_LEN)
    return s


clients = [make_pipeline_socket() for _ in range(NUM_CLIENTS)]
print(f"Launched {NUM_CLIENTS} unauthenticated clients, each with pipeline length {PIPE_LEN}.")

# Replies are never read: the server still processes the commands, and the
# unread replies stay in the socket buffers.
time.sleep(10)
print("Done. Observe server memory usage (INFO memory) and watch for instability.")

for s in clients:
    s.close()
```

Note: In a real test, capture `INFO memory` before and after the run to quantify the impact.
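To quantify the effect as the PoC suggests, the server's `used_memory` can be sampled before and during the run. This is a minimal raw-socket sketch that assumes the default RESP2 protocol (so INFO returns a bulk string); the helper names are hypothetical, and if the server requires a password, the sampling connection must authenticate before INFO is allowed.

```python
import socket
from typing import BinaryIO, Dict, Optional


def resp_command(*args: str) -> bytes:
    """Encode a command as a RESP array of bulk strings."""
    parts = [f"*{len(args)}\r\n".encode()]
    for arg in args:
        data = arg.encode()
        parts.append(f"${len(data)}\r\n".encode() + data + b"\r\n")
    return b"".join(parts)


def read_bulk_reply(f: BinaryIO) -> str:
    """Read one RESP bulk-string reply: a '$<len>' line, the payload, then CRLF."""
    header = f.readline()
    if not header.startswith(b"$"):
        raise RuntimeError(f"unexpected reply: {header!r}")
    length = int(header[1:].strip())
    return f.read(length + 2)[:-2].decode()  # drop the trailing CRLF


def parse_info(payload: str) -> Dict[str, str]:
    """Turn INFO 'field:value' lines into a dict, skipping '# Section' headers."""
    fields = {}
    for line in payload.splitlines():
        if line and not line.startswith("#"):
            key, _, value = line.partition(":")
            fields[key] = value
    return fields


def used_memory(host: str = "127.0.0.1", port: int = 6379,
                password: Optional[str] = None) -> int:
    """Return used_memory (bytes) from INFO memory on a live server."""
    with socket.create_connection((host, port)) as s, s.makefile("rb") as f:
        if password is not None:
            s.sendall(resp_command("AUTH", password))
            reply = f.readline()
            if not reply.startswith(b"+OK"):
                raise RuntimeError(f"AUTH failed: {reply!r}")
        s.sendall(resp_command("INFO", "memory"))
        return int(parse_info(read_bulk_reply(f))["used_memory"])
```

Calling `used_memory()` once before launching the clients and again during the sleep gives a rough before/after comparison; an error reply such as NOAUTH surfaces as a `RuntimeError`.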

Commit Details

Author: debing.sun

Date: 2025-10-22 16:16 UTC

Message:

RED-135816: Lookahead pre-fetching (#14440)

## Problem and Motivation

Currently, the client only parses one command, then executes it, then parses new commands until the querybuf is consumed. Doing it this way means we cannot perform memory prefetch when IO threads are not enabled, and when IO threads are enabled, we can only parse the first command in the IO thread, while the remaining command parsing still needs to be done in the main thread.

This describes a limitation in the current Redis command processing pipeline where:

- Without IO threads: Commands are parsed and executed one by one sequentially, preventing memory prefetching optimizations
- With IO threads: Only the first command gets parsed in the IO thread, but subsequent commands from the same client's query buffer must still be parsed in the main thread

## Solution Overview

**Core Innovation**: Parse multiple user commands in advance through a lookahead pipeline.

**Key Insight**: Since Redis already parses commands to extract keys, we can do this parsing earlier and memory prefetch operations before the command reaches execution, allowing multiple I/O operations to run in parallel.

The bulk of the PR is a redesign of the command processing flow for both standalone commands and transactional commands.

### High Level Command Processing Flow

#### Before This PR (processInputBuffer())

- While there is data in the client's query buffer:
  - Read the data and try to parse a complete command (processInlineBuffer() or processMultibulkBuffer()).
  - If the command is incomplete, exit and wait for more data.
  - The Command is complete. Process and potentially execute it (processCommandAndResetClient(), processCommand()):
    - Prepare for the next command (commandProcessed()).
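The pre-PR loop can be modeled in a few lines to show why only one parsed command exists per client at any moment. This is a simplified sketch: it handles only multibulk (RESP array) input, not the inline format, and `parse_multibulk()` / `process_input_buffer_one_at_a_time()` are hypothetical stand-ins for processMultibulkBuffer() and processInputBuffer().

```python
from typing import Callable, List, Optional, Tuple


def parse_multibulk(buf: bytes, pos: int = 0) -> Optional[Tuple[List[bytes], int]]:
    """Parse one RESP array command at buf[pos:].

    Returns (argv, next_pos) for a complete command, or None if more data is needed.
    """
    end = buf.find(b"\r\n", pos)
    if end == -1:
        return None
    if buf[pos:pos + 1] != b"*":
        raise ValueError("only multibulk (RESP array) commands are handled here")
    argc = int(buf[pos + 1:end])
    pos = end + 2
    argv: List[bytes] = []
    for _ in range(argc):
        end = buf.find(b"\r\n", pos)
        if end == -1:
            return None
        if buf[pos:pos + 1] != b"$":
            raise ValueError("expected a bulk string")
        size = int(buf[pos + 1:end])
        start = end + 2
        if len(buf) < start + size + 2:
            return None  # argument (or its trailing CRLF) not fully received yet
        argv.append(buf[start:start + size])
        pos = start + size + 2
    return argv, pos


def process_input_buffer_one_at_a_time(querybuf: bytes,
                                       execute: Callable[[List[bytes]], None]) -> bytes:
    """Pre-PR style loop: parse one command, execute it, then parse the next."""
    pos = 0
    while pos < len(querybuf):
        parsed = parse_multibulk(querybuf, pos)
        if parsed is None:
            break  # incomplete command: keep the remainder and wait for more data
        argv, pos = parsed
        execute(argv)
    return querybuf[pos:]  # unconsumed bytes stay in the query buffer
```

Because each command executes before the next one is parsed, there is never a batch of upcoming keys available to prefetch, which is the limitation the PR targets.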
### Major Changes in the Client's Structure

To support the new command processing flow:

- **New pendingCommand structure**: Since the previous flow processed commands one at a time, it used the client structure to hold the current (and only) parsed command arguments (argv/argc) and other metadata. In the new design, multiple commands are processed, waiting for execution. So, a new pendingCommand structure is introduced to hold a parsed command's arguments and its metadata.
- **New pendingCommandList structure (pending_cmds)** that contains all the pending commands with maintained order and includes a ready_len counter that tracks the number of fully parsed commands ready for execution. All commands are fully parsed except possibly the last one (client's command order is maintained).
- **New pendingCommandPool structure (cmd_pool)** that manages a shared pool for reusing pendingCommand objects to reduce memory allocation overhead.

There is a configurable lookahead limit (server.lookahead) that controls how many fully parsed commands (ready pending commands) to process ahead of time.

#### New High Level Flow for Standalone Commands (processInputBuffer())

- While there is data in the client's query buffer or there are ready pending commands:
  - While there is data in the client's query buffer and we haven't reached the lookahead limit:
    - Read the data and try to parse a complete command (processInlineBuffer() or processMultibulkBuffer()). Allocate a new pending command if needed, store the command's metadata in the pending command, and add the pending command to the client's pending commands list.
    - If the command is incomplete, exit and wait for more data.
    - The command is complete, we have a new ready pending command, preprocess it (preprocessCommand()):
      - Extract the keys of the command and store the results in the pending command (extractKeysAndSlot()).
  - If there are pending commands, continue executing them until the queue is empty.
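The new flow can be sketched as a two-phase loop: fill a pending queue up to the lookahead limit (extracting keys as each command is preprocessed), prefetch the batch's keys, then drain the queue in order. This is a minimal model under several assumptions: the query buffer is represented as already-framed commands, `extract_keys()` treats `argv[1]` as the only key, and `prefetch` is a callback rather than real memory prefetching. The names echo the commit's structures but are hypothetical.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Deque, List


@dataclass
class PendingCommand:
    """Hypothetical mirror of pendingCommand: parsed argv plus cached keys."""
    argv: List[bytes]
    keys: List[bytes] = field(default_factory=list)


def extract_keys(argv: List[bytes]) -> List[bytes]:
    """Assumption for illustration: argv[1] is the only key (GET/SET-style commands)."""
    return argv[1:2]


class Client:
    def __init__(self, lookahead: int):
        if lookahead < 1:
            raise ValueError("lookahead must be at least 1")
        self.lookahead = lookahead
        # Simplification: the query buffer holds already-framed commands.
        self.querybuf: Deque[List[bytes]] = deque()
        self.pending_cmds: Deque[PendingCommand] = deque()


def process_input_buffer(client: Client,
                         execute: Callable[[PendingCommand], None],
                         prefetch: Callable[[List[bytes]], None]) -> None:
    while client.querybuf or client.pending_cmds:
        # Parse ahead until the buffer is empty or the lookahead limit is reached.
        while client.querybuf and len(client.pending_cmds) < client.lookahead:
            pcmd = PendingCommand(client.querybuf.popleft())
            pcmd.keys = extract_keys(pcmd.argv)  # preprocessCommand() analogue
            client.pending_cmds.append(pcmd)
        # Issue prefetches for every key in the batch before running any command.
        prefetch([key for pcmd in client.pending_cmds for key in pcmd.keys])
        # Execute the ready commands in arrival order until the queue is empty.
        while client.pending_cmds:
            execute(client.pending_cmds.popleft())
```

With `lookahead=1`, every batch holds a single command, which is the behavior the commit enforces for unauthenticated clients; larger values let one prefetch pass cover several commands.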
## Transaction Support

### Major Changes in Structures

- The multiState structure now contains an array of pendingCommand pointers instead of multiCmd pointers.
- The multiCmd structure was deleted (no longer needed).

### New Transaction Support

- queueMultiCommand():
  - The pending commands are moved from the client's pending_cmds list to the multiState's commands array.

## Detailed Changes

### Additional Client Structure Changes

- Replaced argv_len_sum with all_argv_len_sum to reflect the total memory consumed by all pending commands.

### Clients and Pending Commands Management

- Clients using pending commands now manage the command arguments via the pendingCommand. Specifically, the memory occupied by argv.
- **Pending commands management functions**:
  - `initPendingCommand()` initializes a newly allocated pending command.
  - `freeClientPendingCommand()` frees a pending command of a client and its associated resources.
  - `freeClientPendingCommands()` receives the number of pending commands to free and calls freeClientPendingCommand() to free them.

### Buffer Processing Changes

- `processInlineBuffer()`, once a full command is read, used to populate the client's command fields (argc, argv, etc.). Now it creates and populates a pendingCommand, and adds it to the client's pending_cmds list.
- `processMultibulkBuffer()`: Similar changes to processInlineBuffer(). The difference is that a pending command may already exist from a previous call to the function, so parsing will continue populating it instead of creating a new one.
- `resetClientInternal()` used to receive a free_argv parameter and pass it to freeClientArgvInternal(), which freed the client's argv if set, and also reset client's command fields. It now receives the number of pending commands to free and handles two cases:
  - The client uses pending commands so they are freed by calling freeClientPendingCommands().
  - The client doesn't use pending commands (e.g., LUA client) so the client's argv is freed by calling freeClientArgvInternal(). It then frees the client's command fields that freeClientArgvInternal() doesn't free now.

### Other Changes

- Simulate lookahead command preprocessing when loading an AOF and queuing transaction commands; This is necessary since queueMultiCommand() now requires a pending command.
- The INVALID_CLUSTER_SLOT constant was defined to indicate an invalid cluster slot. It is used to signal a cross-slot error in preprocessCommand().
- getNodeByQuery() no longer performs cross-slot checks, relying instead on the checks already performed in preprocessCommand(). It also no longer calls getKeysFromCommand() as this was also done in preprocessCommand().

### Debugging

- Added "debug lookahead" command to print the size of the lookahead pipeline for each client.

## New Configuration

- **lookahead**: Runtime-configurable lookahead depth (default: 16)

## Security

- **Limit lookahead for unauthenticated clients to 1**. This is both to reduce memory overhead, and to prevent errors; AUTH can affect the handling of succeeding commands.

---------

Co-authored-by: Slava Koyfman <slava.koyfman@redis.com>
Co-authored-by: Oran Agra <oran@redis.com>
Co-authored-by: Udi Ron <udi.ron@redis.com>
Co-authored-by: moticless <moticless@github.com>
Co-authored-by: Yuan Wang <yuan.wang@redis.com>
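The transaction and cleanup changes described in the commit message can be modeled as ownership transfers between queues: queueMultiCommand() moves pending commands into the transaction array, and freeing N pending commands releases them. This sketch assumes that a freed pending command goes back to the shared pool, which is an assumption based on the description of cmd_pool rather than something the message states; all class and function names are hypothetical.

```python
from collections import deque
from typing import Deque, List


class PendingCommand:
    """Hypothetical stand-in for pendingCommand (argv only)."""
    def __init__(self) -> None:
        self.argv: List[bytes] = []


class CommandPool:
    """Free list that recycles PendingCommand objects (cf. cmd_pool)."""
    def __init__(self) -> None:
        self.free: List[PendingCommand] = []
        self.allocations = 0  # counts objects actually created

    def get(self) -> PendingCommand:
        if self.free:
            return self.free.pop()
        self.allocations += 1
        return PendingCommand()

    def put(self, pcmd: PendingCommand) -> None:
        pcmd.argv = []  # drop argument references before the object is reused
        self.free.append(pcmd)


class Client:
    def __init__(self) -> None:
        self.pending_cmds: Deque[PendingCommand] = deque()
        self.multi_commands: List[PendingCommand] = []  # cf. the multiState commands array


def queue_multi_command(c: Client) -> None:
    """Move (not copy) the oldest pending command into the transaction queue."""
    c.multi_commands.append(c.pending_cmds.popleft())


def free_client_pending_commands(c: Client, n: int, pool: CommandPool) -> None:
    """Release up to n pending commands, oldest first, back to the pool."""
    for _ in range(min(n, len(c.pending_cmds))):
        pool.put(c.pending_cmds.popleft())
```

Moving the object instead of copying its argv matches the message's wording that pending commands are moved into the multiState commands array; the pool only saves object allocations, not argument memory.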

Triage Assessment

Vulnerability Type: Denial of Service (Memory/Resource Exhaustion)

Confidence: MEDIUM

Reasoning:

The commit introduces a configurable lookahead pipeline for command processing and explicitly documents a security-oriented constraint: lookahead for unauthenticated clients is limited to 1, both to reduce memory overhead and to prevent errors, since AUTH can affect how subsequent commands are handled. This addresses resource-exhaustion (DoS) scenarios by limiting how many commands an unauthenticated client can have pre-parsed; authenticated clients still use the configured depth (default 16). Most of the changes are architectural and performance-oriented, but the Security note indicates a deliberate mitigation for DoS risks that lookahead itself would otherwise introduce.

Verification Assessment

Vulnerability Type: Denial of Service (Resource Exhaustion) via excessive command lookahead (prefetch)

Confidence: MEDIUM

Affected Versions: 8.6.2 and earlier (pre-fix)

Code Diff

```diff
diff --git a/redis.conf b/redis.conf
index 7229d5c27fd..51689b334e8 100644
--- a/redis.conf
+++ b/redis.conf
@@ -2174,6 +2174,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60
 #
 # client-query-buffer-limit 1gb
 
+# Defines how many commands in each client pipeline to decode and prefetch
+# lookahead 16
+
 # In some scenarios client connections can hog up memory leading to OOM
 # errors or data eviction. To avoid this we can cap the accumulated memory
 # used by all client connections (all pubsub and normal clients). Once we
diff --git a/src/acl.c b/src/acl.c
index 6bd3f0ee4b2..7caeb28fb8e 100644
--- a/src/acl.c
+++ b/src/acl.c
@@ -421,8 +421,7 @@ user *ACLCreateUser(const char *name, size_t namelen) {
     if (raxFind(Users,(unsigned char*)name,namelen,NULL)) return NULL;
     user *u = zmalloc(sizeof(*u));
     u->name = sdsnewlen(name,namelen);
-    u->flags = USER_FLAG_DISABLED;
-    u->flags |= USER_FLAG_SANITIZE_PAYLOAD;
+    atomicSet(u->flags, USER_FLAG_DISABLED | USER_FLAG_SANITIZE_PAYLOAD);
     u->passwords = listCreate();
     u->acl_string = NULL;
     listSetMatchMethod(u->passwords,ACLListMatchSds);
@@ -1289,22 +1288,18 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
     if (oplen == -1) oplen = strlen(op);
     if (oplen == 0) return C_OK; /* Empty string is a no-operation. */
     if (!strcasecmp(op,"on")) {
-        u->flags |= USER_FLAG_ENABLED;
-        u->flags &= ~USER_FLAG_DISABLED;
+        atomicSet(u->flags, (u->flags | USER_FLAG_ENABLED) & ~USER_FLAG_DISABLED);
     } else if (!strcasecmp(op,"off")) {
-        u->flags |= USER_FLAG_DISABLED;
-        u->flags &= ~USER_FLAG_ENABLED;
+        atomicSet(u->flags, (u->flags | USER_FLAG_DISABLED) & ~USER_FLAG_ENABLED);
     } else if (!strcasecmp(op,"skip-sanitize-payload")) {
-        u->flags |= USER_FLAG_SANITIZE_PAYLOAD_SKIP;
-        u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD;
+        atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD_SKIP) & ~USER_FLAG_SANITIZE_PAYLOAD);
     } else if (!strcasecmp(op,"sanitize-payload")) {
-        u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD_SKIP;
-        u->flags |= USER_FLAG_SANITIZE_PAYLOAD;
+        atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD) & ~USER_FLAG_SANITIZE_PAYLOAD_SKIP);
     } else if (!strcasecmp(op,"nopass")) {
-        u->flags |= USER_FLAG_NOPASS;
+        atomicSet(u->flags, u->flags | USER_FLAG_NOPASS);
         listEmpty(u->passwords);
     } else if (!strcasecmp(op,"resetpass")) {
-        u->flags &= ~USER_FLAG_NOPASS;
+        atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS);
         listEmpty(u->passwords);
     } else if (op[0] == '>' || op[0] == '#') {
         sds newpass;
@@ -1324,7 +1319,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
             listAddNodeTail(u->passwords,newpass);
         else
             sdsfree(newpass);
-        u->flags &= ~USER_FLAG_NOPASS;
+        atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS);
     } else if (op[0] == '<' || op[0] == '!') {
         sds delpass;
         if (op[0] == '<') {
@@ -1852,7 +1847,7 @@ int ACLUserCheckChannelPerm(user *u, sds channel, int is_pattern) {
  * If the command fails an ACL check, idxptr will be to set to the first argv entry that
  * causes the failure, either 0 if the command itself fails or the idx of the key/channel
  * that causes the failure */
-int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, int *idxptr) {
+int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, getKeysResult *key_result, int *idxptr) {
     listIter li;
     listNode *ln;
 
@@ -1869,6 +1864,10 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
      * calls to prevent duplicate lookups. */
     aclKeyResultCache cache;
     initACLKeyResultCache(&cache);
+    if (key_result) {
+        cache.keys = *key_result;
+        cache.keys_init = 1;
+    }
 
     /* Check each selector sequentially */
     listRewind(u->selectors,&li);
@@ -1876,7 +1875,7 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
         aclSelector *s = (aclSelector *) listNodeValue(ln);
         int acl_retval = ACLSelectorCheckCmd(s, cmd, argv, argc, &local_idxptr, &cache);
         if (acl_retval == ACL_OK) {
-            cleanupACLKeyResultCache(&cache);
+            if (!key_result) cleanupACLKeyResultCache(&cache);
             return ACL_OK;
         }
         if (acl_retval > relevant_error ||
@@ -1888,13 +1887,13 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
     }
 
     *idxptr = last_idx;
-    cleanupACLKeyResultCache(&cache);
+    if (!key_result) cleanupACLKeyResultCache(&cache);
     return relevant_error;
 }
 
 /* High level API for checking if a client can execute the queued up command */
 int ACLCheckAllPerm(client *c, int *idxptr) {
-    return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, idxptr);
+    return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, getClientCachedKeyResult(c), idxptr);
 }
 
 /* If 'new' can access all channels 'original' could then return NULL;
@@ -3144,7 +3143,7 @@ void aclCommand(client *c) {
         }
 
         int idx;
-        int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, &idx);
+        int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, NULL, &idx);
         if (result != ACL_OK) {
             sds err = getAclErrorMessage(result, u, cmd, c->argv[idx+3]->ptr, 1);
             addReplyBulkSds(c, err);
diff --git a/src/aof.c b/src/aof.c
index 94a28775bf2..90d646bb053 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1641,12 +1641,24 @@ int loadSingleAppendOnlyFile(char *filename) {
         if (fakeClient->flags & CLIENT_MULTI &&
             fakeClient->cmd->proc != execCommand)
         {
+            /* queueMultiCommand requires a pendingCommand, so we create a "fake" one here
+             * for it to consume */
+            pendingCommand *pcmd = zmalloc(sizeof(pendingCommand));
+            initPendingCommand(pcmd);
+            addPendingCommand(&fakeClient->pending_cmds, pcmd);
+
+            pcmd->argc = argc;
+            pcmd->argv_len = argc;
+            pcmd->argv = argv;
+            pcmd->cmd = cmd;
+
             /* Note: we don't have to attempt calling evalGetCommandFlags,
              * since this is AOF, the checks in processCommand are not made
              * anyway.*/
             queueMultiCommand(fakeClient, cmd->flags);
         } else {
             cmd->proc(fakeClient);
+            fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */
         }
 
         /* The fake client should not have a reply */
diff --git a/src/blocked.c b/src/blocked.c
index ee5a365142b..4f518c9a5a4 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -130,8 +130,7 @@ void processUnblockedClients(void) {
              * call reqresAppendResponse here (for clients blocked on key,
              * unblockClientOnKey is called, which eventually calls processCommand,
              * which calls reqresAppendResponse) */
-            reqresAppendResponse(c);
-            resetClient(c);
+            prepareForNextCommand(c, 0);
         }
 
         if (c->flags & CLIENT_MODULE) {
diff --git a/src/cluster.c b/src/cluster.c
index 330907b60e8..3fb7af5c030 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -1086,6 +1086,31 @@ void clusterCommand(client *c) {
     }
 }
 
+/* Extract slot number from keys in a keys_result structure and return to caller.
+ * Returns INVALID_CLUSTER_SLOT if keys belong to different slots (cross-slot error),
+ * or if there are no keys.
+ */
+int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) {
+    if (keys_result->numkeys == 0)
+        return INVALID_CLUSTER_SLOT;
+
+    if (!server.cluster_enabled)
+        return 0;
+
+    int first_slot = INVALID_CLUSTER_SLOT;
+    for (int j = 0; j < keys_result->numkeys; j++) {
+        robj *this_key = argv[keys_result->keys[j].pos];
+        int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr));
+
+        if (first_slot == INVALID_CLUSTER_SLOT)
+            first_slot = this_slot;
+        else if (first_slot != this_slot) {
+            return INVALID_CLUSTER_SLOT;
+        }
+    }
+    return first_slot;
+}
+
 /* Return the pointer to the cluster node that is able to serve the command.
  * For the function to succeed the command should only target either:
  *
@@ -1118,13 +1143,16 @@ void clusterCommand(client *c) {
  *
  * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
  * down but the user attempts to execute a command that addresses one or more keys. */
-clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code) {
+clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
+                            getKeysResult *keys_result, uint8_t read_error, uint64_t cmd_flags, int *error_code)
+{
     clusterNode *myself = getMyClusterNode();
     clusterNode *n = NULL;
     robj *firstkey = NULL;
     int multiple_keys = 0;
     multiState *ms, _ms;
-    multiCmd mc;
+    pendingCommand mc;
+    pendingCommand *mcp = &mc;
     int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
         existing_keys = 0;
     int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */
@@ -1152,11 +1180,20 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
          * structure if the client is not in MULTI/EXEC state, this way
          * we have a single codepath below. */
         ms = &_ms;
-        _ms.commands = &mc;
+        _ms.commands = &mcp;
         _ms.count = 1;
+
+        /* Properly initialize the fake pendingCommand */
+        initPendingCommand(&mc);
         mc.argv = argv;
         mc.argc = argc;
         mc.cmd = cmd;
+        mc.slot = hashslot ? *hashslot : INVALID_CLUSTER_SLOT;
+        mc.read_error = read_error;
+        if (keys_result) {
+            mc.keys_result = *keys_result;
+            mc.flags |= PENDING_CMD_KEYS_RESULT_VALID;
+        }
     }
 
     /* Check that all the keys are in the same hash slot, and obtain this
@@ -1164,12 +1201,14 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
     for (i = 0; i < ms->count; i++) {
         struct redisCommand *mcmd;
         robj **margv;
-        int margc, numkeys, j;
+        int margc, j;
         keyReference *keyindex;
 
-        mcmd = ms->commands[i].cmd;
-        margc = ms->commands[i].argc;
-        margv = ms->commands[i].argv;
+        pendingCommand *pcmd = ms->commands[i];
+
+        mcmd = pcmd->cmd;
+        margc = pcmd->argc;
+        margv = pcmd->argv;
 
         /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
         if (!pubsubshard_included &&
@@ -1178,14 +1217,29 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
             pubsubshard_included = 1;
         }
 
+        /* If we have a cached keys result from preprocessCommand(), use it.
+         * Otherwise, extract keys result. */
+        int use_cache_keys_result = pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID;
         getKeysResult result = GETKEYS_RESULT_INIT;
-        numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
+        if (use_cache_keys_result)
+            result = pcmd->keys_result;
+        else
+            getKeysFromCommand(mcmd,margv,margc,&result);
         keyindex = result.keys;
 
-        for (j = 0; j < numkeys; j++) {
+        for (j = 0; j < result.numkeys; j++) {
+            /* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */
+            if (pcmd->read_error == CLIENT_READ_CROSS_SLOT) {
+                /* Error: multiple keys from different slots. */
+                if (error_code)
+                    *error_code = CLUSTER_REDIR_CROSS_SLOT;
+                return NULL;
+            }
+
             robj *thiskey = margv[keyindex[j].pos];
-            int thisslot = keyHashSlot((char*)thiskey->ptr,
-                                       sdslen(thiskey->ptr));
+            int thisslot = pcmd->slot;
+            if (thisslot == INVALID_CLUSTER_SLOT)
+                thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));
 
             if (firstkey == NULL) {
                 /* This is the first key we see. Check what is the slot
@@ -1199,7 +1253,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
                  * not trapped earlier in processCommand(). Report the same
                  * error to the client. */
                 if (n == NULL) {
-                    getKeysFreeResult(&result);
+                    if (!use_cache_keys_result) getKeysFreeResult(&result);
                     if (error_code)
                         *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                     return NULL;
@@ -1222,7 +1276,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
                  * the same key/channel as the first we saw. */
                 if (slot != thisslot) {
                     /* Error: multiple keys from different slots. */
-                    getKeysFreeResult(&result);
+                    if (!use_cache_keys_result) getKeysFreeResult(&result);
                     if (error_code)
                         *error_code = CLUSTER_REDIR_CROSS_SLOT;
                     return NULL;
@@ -1247,7 +1301,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
                 else existing_keys++;
             }
         }
-        getKeysFreeResult(&result);
+        if (!use_cache_keys_result) getKeysFreeResult(&result);
     }
 
     /* No key at all in command? then we can serve the request
diff --git a/src/cluster.h b/src/cluster.h
index 369c1b91add..830dae87b7c 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -22,6 +22,7 @@
 #define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
 #define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
 #define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
+#define INVALID_CLUSTER_SLOT (-1) /* Invalid slot number. */
 #define CLUSTER_OK 0 /* Everything looks ok */
 #define CLUSTER_FAIL 1 /* The cluster can't work */
 #define CLUSTER_NAMELEN 40 /* sha1 hex length */
@@ -158,7 +159,9 @@ int clusterCanAccessKeysInSlot(int slot);
 struct slotRangeArray *clusterGetLocalSlotRanges(void);
 
 /* functions with shared implementations */
-clusterNode *getNodeByQuery(client ... [truncated]
```
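The `extractSlotFromKeysResult()` function in the src/cluster.c hunk can be mirrored in Python to show how a cross-slot command collapses to `INVALID_CLUSTER_SLOT`. This sketch assumes the standard Redis Cluster key hashing (CRC-16/XMODEM modulo 16384, hashing only a non-empty `{...}` hash tag when present) as a stand-in for `keyHashSlot()`, and it takes key names directly instead of argv positions from a `getKeysResult`. The Python function names are hypothetical.

```python
from typing import List

INVALID_CLUSTER_SLOT = -1  # mirrors the constant added in cluster.h
CLUSTER_SLOTS = 16384


def crc16_xmodem(data: bytes) -> int:
    """Bitwise CRC-16/XMODEM (poly 0x1021, init 0), the CRC used for cluster key slots."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_hash_slot(key: bytes) -> int:
    """Slot for a key, hashing only a non-empty {hash tag} when one is present."""
    start = key.find(b"{")
    if start != -1:
        end = key.find(b"}", start + 1)
        if end > start + 1:
            key = key[start + 1:end]
    return crc16_xmodem(key) & (CLUSTER_SLOTS - 1)


def extract_slot_from_keys(keys: List[bytes], cluster_enabled: bool = True) -> int:
    """Python analogue of extractSlotFromKeysResult(), taking key names directly."""
    if not keys:
        return INVALID_CLUSTER_SLOT
    if not cluster_enabled:
        return 0
    first_slot = INVALID_CLUSTER_SLOT
    for key in keys:
        slot = key_hash_slot(key)
        if first_slot == INVALID_CLUSTER_SLOT:
            first_slot = slot
        elif first_slot != slot:
            return INVALID_CLUSTER_SLOT  # keys span different slots
    return first_slot


print(key_hash_slot(b"foo"))                             # 12182
print(extract_slot_from_keys([b"foo", b"123456789"]))    # -1 (cross-slot)
```

Because the same sentinel signals both "no keys" and "cross-slot", the diff's getNodeByQuery() does not rely on the slot value alone: it checks `pcmd->read_error == CLIENT_READ_CROSS_SLOT` to report CLUSTER_REDIR_CROSS_SLOT.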