Skip to content

Commit 876fc24

Browse files
committed
all tests passing
1 parent 0dcfbd2 commit 876fc24

File tree

8 files changed

+81
-109
lines changed

8 files changed

+81
-109
lines changed

cmd/backrest/backrest.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
v1 "github.com/garethgeorge/backrest/gen/go/v1"
2121
"github.com/garethgeorge/backrest/gen/go/v1/v1connect"
22+
"github.com/garethgeorge/backrest/gen/go/v1sync/v1syncconnect"
2223
"github.com/garethgeorge/backrest/internal/api"
2324
syncapi "github.com/garethgeorge/backrest/internal/api/syncapi"
2425
"github.com/garethgeorge/backrest/internal/auth"
@@ -68,12 +69,11 @@ func runApp() {
6869
go onterm(os.Interrupt, newForceKillHandler())
6970

7071
// Create dependency components
71-
configStore := createConfigStore()
72-
cfg, err := configStore.Get()
72+
configMgr := &config.ConfigManager{Store: createConfigStore()}
73+
cfg, err := configMgr.Get()
7374
if err != nil {
7475
zap.L().Fatal("error loading config", zap.Error(err))
7576
}
76-
configMgr := &config.ConfigManager{Store: configStore}
7777

7878
opLog, opLogStore, err := newOpLog(cfg)
7979
if err != nil {
@@ -108,13 +108,7 @@ func runApp() {
108108
if err != nil {
109109
zap.L().Fatal("error creating peer state manager", zap.Error(err))
110110
}
111-
<<<<<<< HEAD
112111
syncMgr := syncapi.NewSyncManager(configMgr, opLog, orch, peerStateManager)
113-
114-
=======
115-
116-
syncMgr := syncapi.NewSyncManager(configMgr, opLog, logStore, orch, peerStateManager)
117-
>>>>>>> 9041d3c (improve sync api security by using 'Authorization' headers for initial key exchange)
118112
authenticator := newAuthenticator(configMgr)
119113

120114
// Start background services
@@ -251,16 +245,16 @@ func newServer(
251245
func newRootMux(
252246
apiBackrestHandler v1connect.BackrestHandler,
253247
apiAuthenticationHandler v1connect.AuthenticationHandler,
254-
syncHandler v1connect.BackrestSyncServiceHandler,
255-
syncStateHandler v1connect.BackrestSyncStateServiceHandler,
248+
syncHandler v1syncconnect.BackrestSyncServiceHandler,
249+
syncStateHandler v1syncconnect.BackrestSyncStateServiceHandler,
256250
downloadHandler http.Handler,
257251
authenticator *auth.Authenticator,
258252
) *http.ServeMux {
259253
// Authenticated routes
260254
authedMux := http.NewServeMux()
261255
backrestPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler)
262256
authedMux.Handle(backrestPath, backrestHandler)
263-
syncStatePath, syncStateHandlerUnauthed := v1connect.NewBackrestSyncStateServiceHandler(syncStateHandler)
257+
syncStatePath, syncStateHandlerUnauthed := v1syncconnect.NewBackrestSyncStateServiceHandler(syncStateHandler)
264258
authedMux.Handle(syncStatePath, syncStateHandlerUnauthed)
265259
authedMux.Handle("/download/", http.StripPrefix("/download", downloadHandler))
266260
authedMux.Handle("/metrics", metric.GetRegistry().Handler())
@@ -269,7 +263,7 @@ func newRootMux(
269263
unauthedMux := http.NewServeMux()
270264
authPath, authHandler := v1connect.NewAuthenticationHandler(apiAuthenticationHandler)
271265
unauthedMux.Handle(authPath, authHandler)
272-
syncPath, syncHandlerUnauthed := v1connect.NewBackrestSyncServiceHandler(syncHandler)
266+
syncPath, syncHandlerUnauthed := v1syncconnect.NewBackrestSyncServiceHandler(syncHandler)
273267
unauthedMux.Handle(syncPath, syncHandlerUnauthed)
274268

275269
// Root mux to dispatch to authenticated or unauthenticated handlers

internal/api/syncapi/syncapi_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -370,24 +370,24 @@ func TestSimpleOperationSync(t *testing.T) {
370370
tryExpectExactOperations(t, ctx, peerHost, oplog.Query{}.SetInstanceID(defaultClientID).SetRepoGUID(defaultRepoGUID),
371371
testutil.OperationsWithDefaults(basicClientOperationTempl, []*v1.Operation{
372372
{
373-
Id: 3, // b/c of the already inserted host ops the sync'd ops start at 3
374-
FlowId: 3,
373+
Id: 2, // b/c of the already inserted host ops the sync'd ops start at 3
374+
FlowId: 2,
375375
OriginalId: 1,
376376
OriginalFlowId: 1,
377377
OriginalInstanceKeyid: identity2.Keyid,
378378
DisplayMessage: "clientop1",
379379
},
380380
{
381-
Id: 4,
382-
FlowId: 3,
381+
Id: 3,
382+
FlowId: 2,
383383
OriginalId: 2,
384384
OriginalFlowId: 1,
385385
OriginalInstanceKeyid: identity2.Keyid,
386386
DisplayMessage: "clientop2",
387387
},
388388
{
389-
Id: 5,
390-
FlowId: 5,
389+
Id: 4,
390+
FlowId: 4,
391391
OriginalId: 3,
392392
OriginalFlowId: 2,
393393
OriginalInstanceKeyid: identity2.Keyid,
@@ -584,13 +584,15 @@ func tryExpectOperationsSynced(t *testing.T, ctx context.Context, peer1 *peerUnd
584584
op.OriginalId = 0
585585
op.OriginalFlowId = 0
586586
op.OriginalInstanceKeyid = ""
587+
op.Modno = 0
587588
}
588589
for _, op := range peer2Ops {
589590
op.Id = 0
590591
op.FlowId = 0
591592
op.OriginalId = 0
592593
op.OriginalFlowId = 0
593594
op.OriginalInstanceKeyid = ""
595+
op.Modno = 0
594596
}
595597

596598
sortFn := func(a, b *v1.Operation) int {

internal/api/syncapi/synccommon.go

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -290,29 +290,61 @@ func newRemoteOpIDMapper(oplog *oplog.OpLog, cacheSize int) (*remoteOpIDMapper,
290290
}, nil
291291
}
292292

293-
// translateSingleID translates a single ID (either opID or flowID) using the provided cache and query
294-
func (sh *remoteOpIDMapper) translateSingleID(
295-
originalInstanceKeyid string,
296-
originalID int64,
297-
cache *lru.Cache[remoteOpIdCacheKey, int64],
298-
query oplog.Query,
299-
) (int64, error) {
300-
if originalID == 0 {
293+
// translateOpID translates a remote operation ID to a local one.
294+
func (om *remoteOpIDMapper) translateOpID(originalInstanceKeyid string, originalOpId int64) (int64, error) {
295+
if originalOpId == 0 {
301296
return 0, nil
302297
}
303298

304299
cacheKey := remoteOpIdCacheKey{
305300
OriginalInstanceKeyid: unique.Make(originalInstanceKeyid),
306-
ID: originalID,
301+
ID: originalOpId,
307302
}
308303

309304
// Check cache first
310-
if translatedID, ok := cache.Get(cacheKey); ok {
305+
if translatedID, ok := om.opIDLru.Get(cacheKey); ok {
311306
return translatedID, nil
312307
}
313308

314309
// Cache miss - query the database
315-
op, err := sh.oplog.FindOneMetadata(query)
310+
op, err := om.oplog.FindOneMetadata(oplog.Query{
311+
OriginalInstanceKeyid: &originalInstanceKeyid,
312+
OriginalID: &originalOpId,
313+
})
314+
if err != nil {
315+
if errors.Is(err, oplog.ErrNoResults) {
316+
return 0, nil // No results means the ID is not found
317+
}
318+
return 0, err // Other errors should be propagated
319+
}
320+
321+
// Cache the result and return
322+
translatedID := op.ID
323+
om.opIDLru.Add(cacheKey, translatedID)
324+
return translatedID, nil
325+
}
326+
327+
// translateFlowID translates a remote flow ID to a local one.
328+
func (om *remoteOpIDMapper) translateFlowID(originalInstanceKeyid string, originalFlowId int64) (int64, error) {
329+
if originalFlowId == 0 {
330+
return 0, nil
331+
}
332+
333+
cacheKey := remoteOpIdCacheKey{
334+
OriginalInstanceKeyid: unique.Make(originalInstanceKeyid),
335+
ID: originalFlowId,
336+
}
337+
338+
// Check cache first
339+
if translatedID, ok := om.flowIDLru.Get(cacheKey); ok {
340+
return translatedID, nil
341+
}
342+
343+
// Cache miss - query the database
344+
op, err := om.oplog.FindOneMetadata(oplog.Query{
345+
OriginalInstanceKeyid: &originalInstanceKeyid,
346+
OriginalFlowID: &originalFlowId,
347+
})
316348
if err != nil {
317349
if errors.Is(err, oplog.ErrNoResults) {
318350
return 0, nil // No results means the ID is not found
@@ -322,7 +354,7 @@ func (sh *remoteOpIDMapper) translateSingleID(
322354

323355
// Cache the result and return
324356
translatedID := op.FlowID
325-
cache.Add(cacheKey, translatedID)
357+
om.flowIDLru.Add(cacheKey, translatedID)
326358
return translatedID, nil
327359
}
328360

@@ -331,29 +363,13 @@ func (om *remoteOpIDMapper) TranslateOpIdAndFlowID(originalInstanceKeyid string,
331363
defer om.opCacheMu.Unlock()
332364

333365
// Translate opID
334-
opID, err := om.translateSingleID(
335-
originalInstanceKeyid,
336-
originalOpId,
337-
om.opIDLru,
338-
oplog.Query{
339-
OriginalInstanceKeyid: &originalInstanceKeyid,
340-
OriginalID: &originalOpId,
341-
},
342-
)
366+
opID, err := om.translateOpID(originalInstanceKeyid, originalOpId)
343367
if err != nil {
344368
return 0, 0, err
345369
}
346370

347371
// Translate flowID
348-
flowID, err := om.translateSingleID(
349-
originalInstanceKeyid,
350-
originalFlowId,
351-
om.flowIDLru,
352-
oplog.Query{
353-
OriginalInstanceKeyid: &originalInstanceKeyid,
354-
OriginalFlowID: &originalFlowId,
355-
},
356-
)
372+
flowID, err := om.translateFlowID(originalInstanceKeyid, originalFlowId)
357373
if err != nil {
358374
return 0, 0, err
359375
}

internal/api/syncapi/syncserver.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ func (h *syncSessionHandlerServer) HandleReceiveResources(ctx context.Context, s
248248

249249
func (h *syncSessionHandlerServer) insertOrUpdate(op *v1.Operation, isUpdate bool) error {
250250
// Returns a localOpID and localFlowID or 0 if not found in which case a new ID will be assigned by the insert.
251-
localOpID, localFlowID, err := h.mapper.TranslateOpIdAndFlowID(op.OriginalInstanceKeyid, op.Id, op.FlowId)
251+
localOpID, localFlowID, err := h.mapper.TranslateOpIdAndFlowID(h.peer.Keyid, op.Id, op.FlowId)
252252
if err != nil {
253253
return fmt.Errorf("translating operation ID and flow ID: %w", err)
254254
}
@@ -257,13 +257,11 @@ func (h *syncSessionHandlerServer) insertOrUpdate(op *v1.Operation, isUpdate boo
257257
op.OriginalFlowId = op.FlowId
258258
op.Id = localOpID
259259
op.FlowId = localFlowID
260-
if (op.Id == 0) != (op.FlowId == 0) {
261-
return fmt.Errorf("inconsistent operation and flow ID mapping results: op.ID=%d, flow.ID=%d expected both to be 0 or both to be non-zero", op.Id, op.FlowId)
262-
}
263260
if op.Id == 0 {
264261
if isUpdate {
265262
h.l.Sugar().Warnf("received update for non-existent operation %+v, inserting instead", op)
266263
}
264+
op.Modno = 0
267265
return h.mgr.oplog.Add(op)
268266
} else {
269267
if !isUpdate {

internal/oplog/oplog.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,10 @@ func (o *OpLog) Get(opID int64) (*v1.Operation, error) {
135135

136136
func (o *OpLog) Add(ops ...*v1.Operation) error {
137137
for _, o := range ops {
138-
if o.Id != 0 {
139-
return errors.New("operation already has an ID, OpLog.Add is expected to set the ID")
140-
}
141-
if o.Modno == 0 {
142-
o.Modno = NewRandomModno(0)
138+
if o.Id != 0 || o.Modno != 0 {
139+
return errors.New("operation already has an ID or Modno, OpLog.Add is expected to set the ID/Modno")
143140
}
144141
}
145-
146142
if err := o.store.Add(ops...); err != nil {
147143
return err
148144
}
@@ -156,7 +152,6 @@ func (o *OpLog) Update(ops ...*v1.Operation) error {
156152
if o.Id == 0 {
157153
return errors.New("operation does not have an ID, OpLog.Update is expected to have an ID")
158154
}
159-
o.Modno = NewRandomModno(o.Modno)
160155
}
161156

162157
if err := o.store.Update(ops...); err != nil {

internal/oplog/randmodno.go

Lines changed: 0 additions & 24 deletions
This file was deleted.

internal/oplog/sqlitestore/sqlitestore.go

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ const (
3737
)
3838

3939
type SqliteStore struct {
40-
dbpool *sql.DB
41-
lastIDVal atomic.Int64
42-
dblock *flock.Flock
40+
dbpool *sql.DB
41+
dblock *flock.Flock
4342

4443
ogidCache *lru.TwoQueueCache[opGroupInfo, int64]
4544

4645
kvstore kvstore.KvStore
4746
highestModno atomic.Int64
47+
highestOpID atomic.Int64
4848
}
4949

5050
var _ oplog.OpStore = (*SqliteStore)(nil)
@@ -136,27 +136,15 @@ func (m *SqliteStore) init() error {
136136
return err
137137
}
138138

139-
var lastID int64
140-
err := m.dbpool.QueryRowContext(context.Background(), "SELECT operations.id FROM operations ORDER BY operations.id DESC LIMIT 1").Scan(&lastID)
141-
if err != nil && !errors.Is(err, sql.ErrNoRows) {
142-
return fmt.Errorf("init sqlite: %v", err)
143-
}
144-
m.lastIDVal.Store(lastID)
145-
146-
var highestModno int64
147-
err = m.dbpool.QueryRowContext(context.Background(), "SELECT operations.modno FROM operations ORDER BY operations.modno DESC LIMIT 1").Scan(&highestModno)
148-
if err != nil && !errors.Is(err, sql.ErrNoRows) {
149-
return fmt.Errorf("init sqlite: %v", err)
139+
highestID, highestModno, err := m.GetHighestOpIDAndModno(oplog.Query{}.SetOriginalInstanceKeyid(""))
140+
if err != nil {
141+
return err
150142
}
143+
m.highestOpID.Store(highestID)
151144
m.highestModno.Store(highestModno)
152-
153145
return nil
154146
}
155147

156-
func (m *SqliteStore) nextModno() int64 {
157-
return m.highestModno.Add(1)
158-
}
159-
160148
func (m *SqliteStore) GetHighestOpIDAndModno(q oplog.Query) (int64, int64, error) {
161149
var highestID sql.NullInt64
162150
var highestModno sql.NullInt64
@@ -383,8 +371,6 @@ func (m *SqliteStore) Transform(q oplog.Query, f func(*v1.Operation) (*v1.Operat
383371
continue
384372
}
385373

386-
newOp.Modno = m.nextModno()
387-
388374
if err := m.updateInternal(tx, newOp); err != nil {
389375
return err
390376
}
@@ -431,8 +417,8 @@ func (m *SqliteStore) Add(op ...*v1.Operation) error {
431417
defer tx.Rollback()
432418

433419
for _, o := range op {
434-
o.Id = m.lastIDVal.Add(1)
435-
o.Modno = m.nextModno()
420+
o.Id = m.highestOpID.Add(1)
421+
o.Modno = m.highestModno.Add(1)
436422
if o.FlowId == 0 {
437423
o.FlowId = o.Id
438424
}
@@ -464,7 +450,7 @@ func (m *SqliteStore) Update(op ...*v1.Operation) error {
464450

465451
func (m *SqliteStore) updateInternal(tx *sql.Tx, op ...*v1.Operation) error {
466452
for _, o := range op {
467-
o.Modno = m.nextModno()
453+
o.Modno = m.highestModno.Add(1)
468454
if err := protoutil.ValidateOperation(o); err != nil {
469455
return err
470456
}
@@ -595,7 +581,8 @@ func (m *SqliteStore) ResetForTest(t *testing.T) error {
595581
if err != nil {
596582
return fmt.Errorf("reset for test: %v", err)
597583
}
598-
m.lastIDVal.Store(0)
584+
m.highestOpID.Store(0)
585+
m.highestModno.Store(0)
599586
return nil
600587
}
601588

0 commit comments

Comments
 (0)