-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DRAFT] Fix concurrency issue #1591 #4
base: master
Are you sure you want to change the base?
Conversation
WalkthroughThe changes refactor the I/O handling in the messaging layer and improve client concurrency management. In Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant FireMethod as fire()
participant Connection
participant ReadLoop as async readLoop()
Client->>FireMethod: Invoke fire(cmd)
FireMethod->>Connection: Write command (header + data)
Connection-->>ReadLoop: Receive response message
ReadLoop->>PendingMap: Lookup pending channel by command ID
ReadLoop->>FireMethod: Dispatch response via channel
FireMethod->>Client: Return response or timeout error
Assessment against linked issues
Poem
✨ Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
main.go (2)
83-109
: DedicatedreadLoop
for continuously processing responses
- Using a
select
with<-ctx.Done()
ensures the loop terminates cleanly when the client is closed.- Locking around the
pending
map prevents concurrency issues while removing or retrieving the response channel.panic("no id response received")
might be too abrupt for production, but it does surface protocol violations quickly for debugging.- panic("no id response received") + // consider returning an error or logging the issue instead + fmt.Printf("WARN: response received with no matching request: %v\n", resp)
111-141
: Refinedfire
method for sending commands
- Generating a unique correlation ID with
uuid.New()
ensures each request has a distinct channel for handling responses.- The
connWriteMu
lock aroundironhawk.Write()
effectively prevents concurrent writes from interleaving, solving concurrency problems on the TCP stream.- The 10-second timeout in the
select
block is important for preventing indefinite hangs if a response never arrives, but consider making it configurable.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
ironhawk/io.go
(1 hunks)main.go
(5 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
ironhawk/io.go (1)
wire/cmd.pb.go (6)
Response
(77-94)Response
(107-107)Response
(122-124)Command
(24-31)Command
(44-44)Command
(59-61)
🔇 Additional comments (10)
ironhawk/io.go (4)
4-9
: Proper import additions for reading/writing protocol headers
The newly introduced imports"encoding/binary"
,"io"
, and"net"
align well with the new header-based protocol approach. This is a clean, direct way to manage binary-encoded message sizes in a TCP context.
13-14
: Validate the maximum message size
DefiningmaxMessageSize = 32 * 1024 * 1024
andheaderSize = 4
helps guard against excessively large payloads and ensures enough bytes are reserved for the header.
18-41
: Robust ‘Read’ function with size checks
- Reading a fixed-size header, followed by a bounded payload governed by
messageSize
, is a safe way to handle untrusted input.- The check for zero-length messages and the upper bound limit help mitigate potential buffer overflows or denial-of-service vectors.
- Returning a descriptive error in each failure case simplifies troubleshooting.
Overall, this code addresses potential concurrency concerns by cleanly separating read operations and providing explicit error results.
44-60
: Efficient ‘Write’ function design
- Constructing a unified buffer of
[header + message]
is a straightforward way to keep the protocol consistent.- The calls to
binary.BigEndian.PutUint32
andcopy
are idiomatic.- Proper error propagation from
conn.Write(messageBuffer)
ensures robust handling of I/O failures.This approach nicely complements the new read logic, simplifying debugging and providing a consistent message format.
main.go (6)
4-4
: Context import
Adding"context"
makes it possible to coordinate cancellation and timeouts for the client’s networking activities, enhancing concurrency safety.
21-29
: New fields for concurrency control and response tracking
cancel context.CancelFunc
,connWriteMu sync.Mutex
, andpendingMu sync.Mutex
provide critical scaffolding to avoid data races.- The
pending
map, keyed byuuid.UUID
, is an effective way to match each command to its async response channel.
59-62
: Initializepending
map and store it in the client
This ensures everyClient
instance can manage request-response channels without requiring global structures or external caches, thus simplifying concurrency.
69-70
: Context creation andcancel
assignment
Obtaining a cancellable context for theClient
is a clean design choice. It allows the client to gracefully close ongoing operations.
72-78
: Kick offreadLoop
after handshake attempt
Starting the reading goroutine (go client.readLoop(ctx)
) early helps ensure that messages arriving soon after the handshake are handled properly. This approach reduces race conditions between initial writes and reads.
245-246
: Clean client teardown viac.cancel()
Invokingcancel()
ensures that all goroutines listening onctx.Done()
can exit gracefully and promptly, avoiding resource leaks. This is a solid best practice for concurrency.
Fixes DiceDB/dice#1591
The source of the issue is absence of synchronization when Client is writing to and reading from TCP connection.
Need your feedback, to make sure you are ok with the approach I took:
wire.Command
/wire.Response
)Client
side, read responses fromnet.Conn
in a loopClient.Fire()
reads from a corresponding channelHere are the links to all of the PRs (this fix requires changes to multiple repositories):
dicedb-go PR
dice PR
dicedb-protos PR
Before this change:


After this change:
Summary by CodeRabbit
New Features
Refactor