Skip to content

Commit e99faea

Browse files
committed
fix: cont'd work on sync implementation
1 parent cd5814d commit e99faea

File tree

2 files changed

+37
-10
lines changed

2 files changed

+37
-10
lines changed

internal/api/syncapi/syncclient.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,14 @@ func (c *SyncClient) RunSync(ctx context.Context) {
131131
c.mgr.peerStateManager.SetPeerState(c.peer.Keyid, state)
132132
} else {
133133
c.reconnectAttempts = 0
134+
state := c.mgr.peerStateManager.GetPeerState(c.peer.Keyid).Clone()
135+
if state == nil {
136+
state = newPeerState(c.peer.InstanceId, c.peer.Keyid)
137+
}
138+
state.ConnectionState = v1.SyncConnectionState_CONNECTION_STATE_DISCONNECTED
139+
state.ConnectionStateMessage = "disconnected"
140+
state.LastHeartbeat = time.Now()
141+
c.mgr.peerStateManager.SetPeerState(c.peer.Keyid, state)
134142
}
135143

136144
wg.Wait()

internal/api/syncapi/synchandler.go

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,27 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
5858
}()
5959

6060
if err := cmdStream.ConnectStream(ctx, stream); err != nil {
61-
zap.S().Errorf("sync handler stream error: %v", err)
62-
var syncErr *SyncError
63-
if errors.As(err, &syncErr) {
64-
if sessionHandler.peer != nil {
65-
peerState := h.mgr.peerStateManager.GetPeerState(sessionHandler.peer.Keyid).Clone()
66-
if peerState == nil {
67-
peerState = newPeerState(sessionHandler.peer.InstanceId, sessionHandler.peer.Keyid)
68-
}
61+
if sessionHandler.peer != nil {
62+
zap.S().Errorf("sync handler stream error for client %q: %v", sessionHandler.peer.InstanceId, err)
63+
peerState := h.mgr.peerStateManager.GetPeerState(sessionHandler.peer.Keyid).Clone()
64+
if peerState == nil {
65+
peerState = newPeerState(sessionHandler.peer.InstanceId, sessionHandler.peer.Keyid)
66+
}
67+
peerState.LastHeartbeat = time.Now()
68+
peerState.ConnectionState = v1.SyncConnectionState_CONNECTION_STATE_DISCONNECTED
69+
peerState.ConnectionStateMessage = err.Error()
70+
var syncErr *SyncError
71+
if errors.As(err, &syncErr) {
6972
peerState.ConnectionState = syncErr.State
7073
peerState.ConnectionStateMessage = syncErr.Message.Error()
71-
peerState.LastHeartbeat = time.Now()
72-
h.mgr.peerStateManager.SetPeerState(sessionHandler.peer.Keyid, peerState)
7374
}
75+
h.mgr.peerStateManager.SetPeerState(sessionHandler.peer.Keyid, peerState)
76+
} else {
77+
zap.S().Errorf("sync handler stream error for unestablished session: %v", err)
78+
}
79+
80+
var syncErr *SyncError
81+
if errors.As(err, &syncErr) {
7482
switch syncErr.State {
7583
case v1.SyncConnectionState_CONNECTION_STATE_ERROR_AUTH:
7684
return connect.NewError(connect.CodePermissionDenied, syncErr.Message)
@@ -80,6 +88,17 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
8088
return connect.NewError(connect.CodeInternal, syncErr.Message)
8189
}
8290
}
91+
return connect.NewError(connect.CodeInternal, err)
92+
} else {
93+
peerState := h.mgr.peerStateManager.GetPeerState(sessionHandler.peer.Keyid).Clone()
94+
if peerState == nil {
95+
peerState = newPeerState(sessionHandler.peer.InstanceId, sessionHandler.peer.Keyid)
96+
}
97+
peerState.LastHeartbeat = time.Now()
98+
peerState.ConnectionState = v1.SyncConnectionState_CONNECTION_STATE_DISCONNECTED
99+
peerState.ConnectionStateMessage = "disconnected"
100+
h.mgr.peerStateManager.SetPeerState(sessionHandler.peer.Keyid, peerState)
101+
zap.S().Infof("sync handler stream closed for client %q", sessionHandler.peer.InstanceId)
83102
}
84103

85104
return nil

0 commit comments

Comments
 (0)