Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: redesign controller-replica communication #1246

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 27 additions & 136 deletions pkg/dataconn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,14 @@ package dataconn

import (
"errors"
"github.com/longhorn/longhorn-engine/pkg/types"
"github.com/sirupsen/logrus"
"io"
"net"
"time"

"github.com/sirupsen/logrus"

journal "github.com/longhorn/sparse-tools/stats"

"github.com/longhorn/longhorn-engine/pkg/types"
)

var (
//ErrRWTimeout r/w operation timeout
ErrRWTimeout = errors.New("r/w timeout")
const (
queueLength = 4196
)

// Client replica client
Expand All @@ -25,7 +19,8 @@ type Client struct {
send chan *Message
responses chan *Message
seq uint32
messages map[uint32]*Message
messages [queueLength]*Message
SeqChan chan uint32
wires []*Wire
peerAddr string
sharedTimeouts types.SharedTimeouts
Expand All @@ -37,17 +32,21 @@ func NewClient(conns []net.Conn, sharedTimeouts types.SharedTimeouts) *Client {
for _, conn := range conns {
wires = append(wires, NewWire(conn))
}

c := &Client{
wires: wires,
peerAddr: conns[0].RemoteAddr().String(),
end: make(chan struct{}, 1024),
requests: make(chan *Message, 1024),
send: make(chan *Message, 1024),
responses: make(chan *Message, 1024),
messages: map[uint32]*Message{},
messages: [queueLength]*Message{},
SeqChan: make(chan uint32, queueLength),
sharedTimeouts: sharedTimeouts,
}
go c.loop()
for i := uint32(0); i < queueLength; i++ {
c.SeqChan <- i
}
c.write()
c.read()
return c
Expand Down Expand Up @@ -99,7 +98,7 @@ func (c *Client) operation(op uint32, buf []byte, length uint32, offset int64) (
msg.Data = buf
}

c.requests <- &msg
c.handleRequest(&msg)

<-msg.Complete
// Only copy the message if a read is requested
Expand All @@ -112,6 +111,9 @@ func (c *Client) operation(op uint32, buf []byte, length uint32, offset int64) (
if msg.Type == TypeEOF {
return int(msg.Size), io.EOF
}

c.SeqChan <- msg.Seq

return int(msg.Size), nil
}

Expand All @@ -123,145 +125,34 @@ func (c *Client) Close() {
c.end <- struct{}{}
}

// loop is the client's event loop. A single select multiplexes four event
// sources: shutdown (c.end), a one-second ticker used for the r/w timeout
// check, outgoing requests (c.requests) and incoming responses (c.responses).
// All bookkeeping below (clientError, ioInflight, timeOfLastActivity) is local
// to this function and only touched from inside it.
// c.send is closed when the loop returns.
func (c *Client) loop() {
defer close(c.send)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

// clientError, once set, is never reset in this function: every later
// request or response is answered with it.
var clientError error
// ioInflight counts read/write/unmap requests that have been sent and not yet answered.
var ioInflight int
// timeOfLastActivity is the reference point for the timeout check; the zero
// value means "no timeout clock running".
var timeOfLastActivity time.Time

// decremented records that the timeout branch below already ran
// CheckAndDecrement with a positive result, so the deferred Decrement is skipped.
// NOTE(review): presumably CheckAndDecrement decrements the shared counter
// itself when it reports a timeout — confirm against types.SharedTimeouts.
decremented := false
c.sharedTimeouts.Increment()
// Ensure we always decrement the sharedTimeouts counter regardless of how we leave this loop.
defer func() {
if !decremented {
c.sharedTimeouts.Decrement()
}
}()

// handleClientError cleans up all in flight messages
// also stores the error so that future requests/responses get errored immediately.
handleClientError := func(err error) {
clientError = err
for _, msg := range c.messages {
c.replyError(msg, err)
}

// Nothing is pending any more, so stop the timeout clock.
ioInflight = 0
timeOfLastActivity = time.Time{}
}

for {
select {
case <-c.end:
return
case <-ticker.C:
// Skip the timeout check when the clock is not running or no I/O is in flight.
if timeOfLastActivity.IsZero() || ioInflight == 0 {
continue
}

exceededTimeout := c.sharedTimeouts.CheckAndDecrement(time.Since(timeOfLastActivity))
if exceededTimeout > 0 {
decremented = true
logrus.Errorf("R/W Timeout. No response received in %v", exceededTimeout)
handleClientError(ErrRWTimeout)
journal.PrintLimited(1000)
}
case req := <-c.requests:
// After a client error, fail new requests immediately instead of sending them.
if clientError != nil {
c.replyError(req, clientError)
continue
}

// Only read/write/unmap requests take part in timeout accounting.
if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap {
if ioInflight == 0 {
// If nothing is in-flight, we should get a fresh timeout.
timeOfLastActivity = time.Now()
}
ioInflight++
}

c.handleRequest(req)
case resp := <-c.responses:
// A transport error fails every pending message and is remembered in clientError.
if resp.transportErr != nil {
handleClientError(resp.transportErr)
continue
}

// Responses whose sequence number has no pending request are logged and dropped.
req, pending := c.messages[resp.Seq]
if !pending {
logrus.Warnf("Received response message id %v seq %v type %v for non pending request", resp.ID, resp.Seq, resp.Type)
continue
}

// Any answered I/O request counts as activity and restarts the timeout window.
if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap {
ioInflight--
timeOfLastActivity = time.Now()
}

// With a stored client error, the request is failed with it rather than completed from resp.
if clientError != nil {
c.replyError(req, clientError)
continue
}

c.handleResponse(resp)
}
}
}

// nextSeq advances the client's sequence counter by one and returns the
// new value.
func (c *Client) nextSeq() uint32 {
	next := c.seq + 1
	c.seq = next
	return next
}

// replyError completes req as a failure: it removes the request's pending
// journal entry (logging a warning if that fails), drops the request from
// c.messages, rewrites the message as a TypeError carrying err's text, and
// signals req.Complete.
func (c *Client) replyError(req *Message, err error) {
	opErr := journal.RemovePendingOp(req.ID, false)
	if opErr != nil {
		fields := logrus.Fields{
			"seq": req.Seq,
			"id":  req.ID,
		}
		logrus.WithError(opErr).WithFields(fields).Warn("Error removing pending operation")
	}

	delete(c.messages, req.Seq)

	// Turn the request itself into the error reply before signalling completion.
	req.Type = TypeError
	req.Data = []byte(err.Error())
	req.Complete <- struct{}{}
}

func (c *Client) handleRequest(req *Message) {
switch req.Type {
case TypeRead:
req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpRead, int(req.Size))
case TypeWrite:
req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpWrite, int(req.Size))
case TypeUnmap:
req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpUnmap, int(req.Size))
case TypePing:
req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpPing, 0)
}

req.MagicVersion = MagicVersion
req.Seq = c.nextSeq()

req.Seq = <-c.SeqChan

c.messages[req.Seq] = req
c.send <- req
}

func (c *Client) handleResponse(resp *Message) {
if req, ok := c.messages[resp.Seq]; ok {
err := journal.RemovePendingOp(req.ID, true)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"seq": resp.Seq,
"id": req.ID,
}).Warn("Error removing pending operation")
}
delete(c.messages, resp.Seq)
req.Type = resp.Type
req.Size = resp.Size
req.Data = resp.Data
req.Complete <- struct{}{}
}
req := c.messages[resp.Seq]

req.Type = resp.Type
req.Size = resp.Size
req.Data = resp.Data
req.Complete <- struct{}{}

Comment on lines +149 to +155
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Possible memory retention
Because c.messages[resp.Seq] is never reset to nil after req.Complete is signaled, a completed request stays referenced from the array until its slot is reused. If large buffers are attached to it, that memory is retained unnecessarily. Consider clearing the reference:

 func (c *Client) handleResponse(resp *Message) {
     req := c.messages[resp.Seq]
     req.Type = resp.Type
     req.Size = resp.Size
     req.Data = resp.Data
     req.Complete <- struct{}{}
+    c.messages[resp.Seq] = nil
 }

Committable suggestion skipped: line range outside the PR's diff.

}

func (c *Client) write() {
Expand Down Expand Up @@ -290,7 +181,7 @@ func (c *Client) read() {
}
break
}
c.responses <- msg
c.handleResponse(msg)
}
}(wire)
}
Expand Down
38 changes: 27 additions & 11 deletions pkg/dataconn/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,45 @@ import (
"github.com/longhorn/longhorn-engine/pkg/types"
)

const (
// threadCount is the number of goroutines NewServer starts, each of which
// receives messages from the server's requests channel.
threadCount = 256
)

// Server holds the state NewServer builds for one connection.
type Server struct {
// wire wraps the net.Conn passed to NewServer.
wire *Wire
// requests carries messages received from the wire to the handler goroutines.
requests chan *Message
// responses is a buffered channel of outgoing messages.
// NOTE(review): its consumer is not visible in this diff — confirm.
responses chan *Message
// done is a buffered signalling channel.
// NOTE(review): presumably used to stop the server — confirm.
done chan struct{}
// data is the DataProcessor supplied by the caller of NewServer.
data types.DataProcessor
}

func NewServer(conn net.Conn, data types.DataProcessor) *Server {
return &Server{
// init threads
server := &Server{
wire: NewWire(conn),
requests: make(chan *Message, 1024),
responses: make(chan *Message, 1024),
done: make(chan struct{}, 5),
data: data,
}
for i := 0; i < threadCount; i++ {
go func(s *Server) {
for {
msg := <-s.requests
switch msg.Type {
case TypeRead:
s.handleRead(msg)
case TypeWrite:
s.handleWrite(msg)
case TypeUnmap:
s.handleUnmap(msg)
case TypePing:
s.handlePing(msg)
}
}
}(server)
}
Comment on lines +33 to +49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add graceful shutdown mechanism for goroutines
Spawning 256 goroutines without a select statement in the loop risks indefinite blocking if requests is never closed. Consider adding a select on a done channel so these goroutines can gracefully exit when the server shuts down.

 for i := 0; i < threadCount; i++ {
     go func(s *Server) {
         for {
-            msg := <-s.requests
+            select {
+            case msg, ok := <-s.requests:
+                if !ok {
+                    return
+                }
                 switch msg.Type {
                     ...
                 }
+            case <-s.done:
+                return
+            }
         }
     }(server)
 }

Committable suggestion skipped: line range outside the PR's diff.

return server
}

func (s *Server) Handle() error {
Expand All @@ -43,16 +68,7 @@ func (s *Server) readFromWire(ret chan<- error) {
ret <- err
return
}
switch msg.Type {
case TypeRead:
go s.handleRead(msg)
case TypeWrite:
go s.handleWrite(msg)
case TypeUnmap:
go s.handleUnmap(msg)
case TypePing:
go s.handlePing(msg)
}
s.requests <- msg
ret <- nil
}

Expand Down
Loading