Skip to content

Commit cd5814d

Browse files
authored
fix: race condition in taskcollectgarbage potentially prematurely deletes logs for tasks currently running (#828)
1 parent 6e0c201 commit cd5814d

File tree

5 files changed

+29
-18
lines changed

5 files changed

+29
-18
lines changed

internal/api/backresthandler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,7 @@ func TestRestore(t *testing.T) {
849849
}
850850

851851
func TestRunCommand(t *testing.T) {
852+
testutil.InstallZapLogger(t)
852853
sut := createSystemUnderTest(t, createConfigManager(&v1.Config{
853854
Modno: 1234,
854855
Instance: "test",
@@ -1087,7 +1088,7 @@ func createSystemUnderTest(t *testing.T, config *config.ConfigManager) systemUnd
10871088
}
10881089
}
10891090

1090-
h := NewBackrestHandler(config, peerStateManager, orch, oplog, logStore)
1091+
h := NewBackrestHandler(config, peerStateManager, orch, oplog, logStore)
10911092

10921093
return systemUnderTest{
10931094
handler: h,

internal/logstore/logstore.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (ls *LogStore) Create(id string, parentOpID int64, ttl time.Duration) (io.W
154154
defer ls.dbpool.Put(conn)
155155

156156
// potentially prune any expired logs
157-
if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE expiration_ts_unix < ? AND expiration_ts_unix != 0", &sqlitex.ExecOptions{
157+
if err := sqlitex.Execute(conn, "DELETE FROM logs WHERE expiration_ts_unix < ? AND expiration_ts_unix != 0", &sqlitex.ExecOptions{
158158
Args: []any{time.Now().Unix()},
159159
}); err != nil {
160160
return nil, fmt.Errorf("prune expired logs: %v", err)
@@ -171,15 +171,13 @@ func (ls *LogStore) Create(id string, parentOpID int64, ttl time.Duration) (io.W
171171
return nil, fmt.Errorf("create temp file: %v", err)
172172
}
173173

174-
expire_ts_unix := time.Unix(0, 0)
174+
var expire_ts_unix int64 = 0
175175
if ttl != 0 {
176-
expire_ts_unix = time.Now().Add(ttl)
176+
expire_ts_unix = time.Now().Add(ttl).Unix()
177177
}
178178

179-
// fmt.Printf("INSERT INTO logs (id, expiration_ts_unix, owner_opid, data_fname) VALUES (%v, %v, %v, %v)\n", id, expire_ts_unix.Unix(), parentOpID, fname)
180-
181-
if err := sqlitex.ExecuteTransient(conn, "INSERT INTO logs (id, expiration_ts_unix, owner_opid, data_fname) VALUES (?, ?, ?, ?)", &sqlitex.ExecOptions{
182-
Args: []any{id, expire_ts_unix.Unix(), parentOpID, fname},
179+
if err := sqlitex.Execute(conn, "INSERT INTO logs (id, expiration_ts_unix, owner_opid, data_fname) VALUES (?, ?, ?, ?)", &sqlitex.ExecOptions{
180+
Args: []any{id, expire_ts_unix, parentOpID, fname},
183181
}); err != nil {
184182
return nil, fmt.Errorf("insert log: %v", err)
185183
}
@@ -210,7 +208,7 @@ func (ls *LogStore) Open(id string) (io.ReadCloser, error) {
210208
var found bool
211209
var fname string
212210
var dataGz []byte
213-
if err := sqlitex.ExecuteTransient(conn, "SELECT data_fname, data_gz FROM logs WHERE id = ?", &sqlitex.ExecOptions{
211+
if err := sqlitex.Execute(conn, "SELECT data_fname, data_gz FROM logs WHERE id = ?", &sqlitex.ExecOptions{
214212
Args: []any{id},
215213
ResultFunc: func(stmt *sqlite.Stmt) error {
216214
found = true
@@ -267,7 +265,7 @@ func (ls *LogStore) Delete(id string) error {
267265
}
268266
defer ls.dbpool.Put(conn)
269267

270-
if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE id = ?", &sqlitex.ExecOptions{
268+
if err := sqlitex.Execute(conn, "DELETE FROM logs WHERE id = ?", &sqlitex.ExecOptions{
271269
Args: []any{id},
272270
}); err != nil {
273271
return fmt.Errorf("delete log: %v", err)
@@ -286,7 +284,7 @@ func (ls *LogStore) DeleteWithParent(parentOpID int64) error {
286284
}
287285
defer ls.dbpool.Put(conn)
288286

289-
if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE owner_opid = ?", &sqlitex.ExecOptions{
287+
if err := sqlitex.Execute(conn, "DELETE FROM logs WHERE owner_opid = ?", &sqlitex.ExecOptions{
290288
Args: []any{parentOpID},
291289
}); err != nil {
292290
return fmt.Errorf("delete log: %v", err)
@@ -302,7 +300,7 @@ func (ls *LogStore) SelectAll(f func(id string, parentID int64)) error {
302300
}
303301
defer ls.dbpool.Put(conn)
304302

305-
return sqlitex.ExecuteTransient(conn, "SELECT id, owner_opid FROM logs ORDER BY owner_opid", &sqlitex.ExecOptions{
303+
return sqlitex.Execute(conn, "SELECT id, owner_opid FROM logs ORDER BY owner_opid", &sqlitex.ExecOptions{
306304
ResultFunc: func(stmt *sqlite.Stmt) error {
307305
f(stmt.ColumnText(0), stmt.ColumnInt64(1))
308306
return nil
@@ -364,7 +362,7 @@ func (ls *LogStore) finalizeLogFile(id string, fname string) error {
364362
}); e != nil {
365363
return fmt.Errorf("update log: %v", e)
366364
} else if conn.Changes() != 1 {
367-
return fmt.Errorf("expected 1 row to be updated, got %d", conn.Changes())
365+
return fmt.Errorf("expected 1 row to be updated for %q, got %d", id, conn.Changes())
368366
}
369367

370368
return nil

internal/orchestrator/orchestrator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ func (o *Orchestrator) setupTaskContext(ctx context.Context, op *v1.Operation, c
572572
o.taskCancelMu.Unlock()
573573

574574
// Set up logging
575-
logID := uuid.New().String()
575+
logID := "t-" + uuid.New().String()
576576
var err error
577577
logWriter, err = o.logStore.Create(logID, op.Id, defaultTaskLogDuration)
578578
if err != nil {

internal/orchestrator/taskrunnerimpl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (t *taskRunnerImpl) Logger(ctx context.Context) *zap.Logger {
172172
}
173173

174174
func (t *taskRunnerImpl) LogrefWriter() (string, io.WriteCloser, error) {
175-
logID := uuid.New().String()
175+
logID := "c-" + uuid.New().String()
176176
writer, err := t.orchestrator.logStore.Create(logID, t.op.GetId(), time.Duration(0))
177177
return logID, writer, err
178178
}

internal/orchestrator/tasks/taskcollectgarbage.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package tasks
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"reflect"
78
"time"
@@ -212,18 +213,29 @@ func (t *CollectGarbageTask) gcOperations(runner TaskRunner) error {
212213
zap.Any("removed_by_unknown_peer_keyid", deletedByUnknownPeerKeyid))
213214

214215
// cleaning up logstore
215-
toDelete := []string{}
216+
toDelete := make(map[string]int64)
216217
if err := t.logstore.SelectAll(func(id string, parentID int64) {
217218
if parentID == 0 {
218219
return
219220
}
220221
if _, ok := validIDs[parentID]; !ok {
221-
toDelete = append(toDelete, id)
222+
toDelete[id] = parentID // this logstore entry is orphaned, mark it for deletion
222223
}
223224
}); err != nil {
224225
return fmt.Errorf("selecting all logstore entries: %w", err)
225226
}
226-
for _, id := range toDelete {
227+
for id, parentID := range toDelete {
228+
// Confirm that the ID is invalid by trying to get it from the oplog
229+
if _, err := runner.GetOperation(parentID); !errors.Is(err, oplog.ErrNotExist) {
230+
if err != nil {
231+
zap.L().Error("getting operation for logstore entry", zap.String("id", id), zap.Int64("parent_id", parentID), zap.Error(err))
232+
continue
233+
}
234+
zap.L().Debug("logstore entry is still valid, skipping deletion", zap.String("id", id), zap.Error(err))
235+
continue
236+
}
237+
238+
// The logstore entry is orphaned, delete it
227239
if err := t.logstore.Delete(id); err != nil {
228240
zap.L().Error("deleting logstore entry", zap.String("id", id), zap.Error(err))
229241
}

0 commit comments

Comments
 (0)