Skip to content

Commit

Permalink
fix: unreadable symbols in DS stream on client side (0xPolygonHermez#…
Browse files Browse the repository at this point in the history
…1698)

* fix: unreadable symbols in DS stream on client side

* fix: improved handling DS stop command and streaming status
  • Loading branch information
IvanBelyakoff authored Feb 13, 2025
1 parent c4fb0e3 commit 2b8ebec
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 77 deletions.
129 changes: 55 additions & 74 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func NewClient(ctx context.Context, server string, useTLS bool, checkTimeout tim
mtxStreaming: &sync.Mutex{},
useTLS: useTLS,
tlsConfig: &tls.Config{},
allowStops: true,
}

// Extract hostname from server address (removing port if present)
Expand Down Expand Up @@ -133,9 +134,6 @@ func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (fullBLock *types.Ful

default:
}
if err := c.stopStreamingIfStarted(); err != nil {
return nil, fmt.Errorf("stopStreamingIfStarted: %w", err)
}

fullBlock, err := c.getL2BlockByNumber(blockNum)
if err != nil {
Expand Down Expand Up @@ -181,10 +179,6 @@ func (c *StreamClient) getL2BlockByNumber(blockNum uint64) (l2Block *types.FullL
return nil, fmt.Errorf("expected block number %d but got %d", blockNum, l2Block.L2BlockNumber)
}

if err := c.Stop(); err != nil {
return nil, fmt.Errorf("Stop: %w", err)
}

return l2Block, nil
}

Expand All @@ -207,25 +201,41 @@ func (c *StreamClient) GetLatestL2Block() (l2Block *types.FullL2Block, err error
return fullBlock, nil
}

func (c *StreamClient) getStreaming() bool {
c.mtxStreaming.Lock()
defer c.mtxStreaming.Unlock()
return c.streaming
}

func (c *StreamClient) setStreaming(val bool) {
c.mtxStreaming.Lock()
defer c.mtxStreaming.Unlock()
c.streaming = val
}

// don't check for errors here, we just need to empty the socket for next reads
func (c *StreamClient) stopStreamingIfStarted() error {
if c.getStreaming() {
func (c *StreamClient) stopStreaming() error {
c.mtxStreaming.Lock()
defer c.mtxStreaming.Unlock()

if c.streaming {
c.streaming = false
if err := c.sendStopCmd(); err != nil {
return fmt.Errorf("sendStopCmd: %w", err)
}
c.setStreaming(false)

// Read packet
packet, err := c.readBuffer(1)
if err != nil {
log.Info("[Datastream client] Error reading packet", "err", err)
return nil
}

// Check packet type
if packet[0] != PtResult {
log.Info("[Datastream client] Expecting result packet type", "packet", packet[0])
return nil
}

// Read server result entry for the command
if _, err := c.readResultEntry(packet); err != nil {
log.Info("[Datastream client] Error reading result entry", "err", err)
return nil
}
}

return nil
Expand Down Expand Up @@ -262,10 +272,6 @@ func (c *StreamClient) getLatestL2Block() (l2Block *types.FullL2Block, err error
return nil, errors.New("no block found")
}

if err := c.Stop(); err != nil {
return nil, fmt.Errorf("Stop: %w", err)
}

return l2Block, nil
}

Expand Down Expand Up @@ -296,7 +302,7 @@ func (c *StreamClient) Stop() error {
if c.conn == nil || !c.allowStops {
return nil
}
if err := c.sendStopCmd(); err != nil {
if err := c.stopStreaming(); err != nil {
return fmt.Errorf("sendStopCmd: %w", err)
}

Expand All @@ -307,8 +313,9 @@ func (c *StreamClient) Stop() error {
// Returns the current status of the header.
// If started, terminate the connection.
func (c *StreamClient) GetHeader() (*types.HeaderEntry, error) {
if err := c.stopStreamingIfStarted(); err != nil {
return nil, fmt.Errorf("stopStreamingIfStarted: %w", err)
log.Info("[Datastream client] Getting header", "client", c.conn)
if err := c.stopStreaming(); err != nil {
return nil, fmt.Errorf("stopStreaming: %w", err)
}

if err := c.sendHeaderCmd(); err != nil {
Expand Down Expand Up @@ -344,6 +351,7 @@ func (c *StreamClient) GetHeader() (*types.HeaderEntry, error) {

// sendEntryCmdWrapper sends CmdEntry command and reads packet type and decodes result entry.
func (c *StreamClient) sendEntryCmdWrapper(entryNum uint64) error {
log.Info("[Datastream client] Sending entry command", "entryNum", entryNum)
if err := c.sendEntryCmd(entryNum); err != nil {
return fmt.Errorf("sendEntryCmd: %w", err)
}
Expand Down Expand Up @@ -434,48 +442,23 @@ func (c *StreamClient) ReadAllEntriesToChannel() (err error) {
return fmt.Errorf("context done - stopping")
default:
}
if err := c.stopStreamingIfStarted(); err != nil {
return fmt.Errorf("stopStreamingIfStarted: %w", err)
}

// first load up the header of the stream
if _, err := c.GetHeader(); err != nil {
return fmt.Errorf("GetHeader: %w", err)
if _, err = c.GetHeader(); err != nil {
err = fmt.Errorf("GetHeader: %w", err)
return err
}

if err = c.readAllEntriesToChannel(); err != nil {
c.lastError = err
return err
}

return nil
}

func (c *StreamClient) handleSocketError(socketErr error) bool {
if socketErr != nil {
log.Warn(fmt.Sprintf("%v", socketErr))
}
if err := c.tryReConnect(); err != nil {
log.Warn(fmt.Sprintf("try reconnect: %v", err))
return false
}

c.RenewEntryChannel()

return true
}

// reads entries to the end of the stream
// at end will wait for new entries to arrive
func (c *StreamClient) readAllEntriesToChannel() (err error) {
defer func() {
if err != nil {
c.setStreaming(false)
c.lastError = err
}
}()

c.setStreaming(true)
c.stopReadingToChannel.Store(false)

var bookmark *types.BookmarkProto
Expand Down Expand Up @@ -505,6 +488,10 @@ func (c *StreamClient) readAllEntriesToChannel() (err error) {

// runs the prerequisites for entries download
func (c *StreamClient) initiateDownloadBookmark(bookmark []byte) (*types.ResultEntry, error) {
if err := c.stopStreaming(); err != nil {
return nil, fmt.Errorf("stopStreaming: %w", err)
}

// send CmdStartBookmark command
if err := c.sendBookmarkCmd(bookmark, true); err != nil {
return nil, fmt.Errorf("sendBookmarkCmd: %w", err)
Expand Down Expand Up @@ -542,14 +529,8 @@ LOOP:
break LOOP
}

var timeout time.Time
if c.checkTimeout < minimumCheckTimeout {
timeout = time.Now().Add(minimumCheckTimeout)
} else {
timeout = time.Now().Add(c.checkTimeout)
}

if err = c.conn.SetReadDeadline(timeout); err != nil {
err = c.resetReadTimeout()
if err != nil {
return err
}

Expand Down Expand Up @@ -893,7 +874,7 @@ func (c *StreamClient) readResultEntry(packet []byte) (re *types.ResultEntry, er
case types.CmdErrInvalidCommand:
return re, fmt.Errorf("%w: %s", types.ErrInvalidCommand, re.ErrorStr)
default:
return re, fmt.Errorf("unknown error code: %s", re.ErrorStr)
return re, fmt.Errorf("unknown error code: %d str: %s", re.ErrorNum, re.ErrorStr)
}
}

Expand Down Expand Up @@ -949,39 +930,39 @@ func (c *StreamClient) writeToConn(data interface{}) error {
}

func (c *StreamClient) resetWriteTimeout() error {
var timeout time.Time
var timeout time.Duration
if c.checkTimeout < minimumCheckTimeout {
timeout = time.Now().Add(minimumCheckTimeout)
timeout = minimumCheckTimeout
} else {
timeout = time.Now().Add(c.checkTimeout)
timeout = c.checkTimeout
}

if err := c.conn.SetWriteDeadline(timeout); err != nil {
if err := c.setWriteTimeout(timeout); err != nil {
return fmt.Errorf("%w: conn.SetWriteDeadline: %v", ErrSocket, err)
}

return nil
}

func (c *StreamClient) resetReadTimeout() error {
var timeout time.Time
var timeout time.Duration
if c.checkTimeout < minimumCheckTimeout {
timeout = time.Now().Add(minimumCheckTimeout)
timeout = minimumCheckTimeout
} else {
timeout = time.Now().Add(c.checkTimeout)
timeout = c.checkTimeout
}

if err := c.conn.SetReadDeadline(timeout); err != nil {
if err := c.setReadTimeout(timeout); err != nil {
return fmt.Errorf("%w: conn.SetReadDeadline: %v", ErrSocket, err)
}

return nil
}

// PrepUnwind handles the state of the client prior to searching to the
// common ancestor block
func (c *StreamClient) PrepUnwind() {
// this is to ensure that the later call to stop streaming if streaming
// is activated.
c.setStreaming(true)
func (c *StreamClient) setReadTimeout(timeout time.Duration) error {
return c.conn.SetReadDeadline(time.Now().Add(timeout))
}

func (c *StreamClient) setWriteTimeout(timeout time.Duration) error {
return c.conn.SetWriteDeadline(time.Now().Add(timeout))
}
7 changes: 5 additions & 2 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ type DatastreamClient interface {
GetProgressAtomic() *atomic.Uint64
Start() error
Stop() error
PrepUnwind()
HandleStart() error
}

Expand Down Expand Up @@ -840,7 +839,11 @@ func newStreamClient(ctx context.Context, cfg BatchesCfg, latestForkId uint64) (
}
} else {
dsClient = cfg.dsClient
stopFn = func() {}
stopFn = func() {
if err := dsClient.Stop(); err != nil {
log.Warn("Failed to stop datastream client", "err", err)
}
}
}

return dsClient, stopFn, nil
Expand Down
2 changes: 1 addition & 1 deletion zk/stages/stage_batches_datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *DatastreamClientRunner) StartRead(errorChan chan struct{}, diffBlock ui
} else {
r.dsClient.RenewEntryChannel()
}
r.dsClient.RenewEntryChannel()

if r.isReading.Load() {
return fmt.Errorf("tried starting datastream client runner thread while another is running")
}
Expand Down

0 comments on commit 2b8ebec

Please sign in to comment.