From 415c119b5002ef07dc36d91cee245e3f2348aabb Mon Sep 17 00:00:00 2001 From: Martin Hutchinson Date: Tue, 13 Feb 2024 14:09:58 +0000 Subject: [PATCH] Return explicit status codes from distribute (#96) This is primarily focused on making sure that the invalid arg ones don't get mapped to 500 errors. Without this, a misconfigured witness can cause a lot of error log spam on the distributor. --- cmd/internal/distributor/distributor.go | 34 ++++++++++++------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/cmd/internal/distributor/distributor.go b/cmd/internal/distributor/distributor.go index 1eecf94..55fd826 100644 --- a/cmd/internal/distributor/distributor.go +++ b/cmd/internal/distributor/distributor.go @@ -151,18 +151,18 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR l, ok := d.ls[logID] if !ok { - return fmt.Errorf("unknown log ID %q", logID) + return status.Errorf(codes.InvalidArgument, "unknown unknown log ID %q", logID) } wv, ok := d.ws[witID] if !ok { - return fmt.Errorf("unknown witness ID %q", witID) + return status.Errorf(codes.InvalidArgument, "unknown witness ID %q", witID) } newCP, _, n, err := log.ParseCheckpoint(nextRaw, l.Origin, l.Verifier, wv) if err != nil { - return fmt.Errorf("failed to parse checkpoint: %v", err) + return status.Errorf(codes.InvalidArgument, "failed to parse checkpoint: %v", err) } if len(n.Sigs) != 2 { - return fmt.Errorf("failed to verify log and witness signatures; only verified: %v", n.Sigs) + return status.Errorf(codes.InvalidArgument, "failed to verify log and witness signatures; only verified: %v", n.Sigs) } // This is a valid checkpoint for this log for this witness @@ -170,12 +170,12 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR tx, err := d.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { - return fmt.Errorf("failed to begin transaction: %v", err) + return status.Errorf(codes.Internal, "failed to begin transaction: %v", err) } oldBs, err := getLatestCheckpoint(ctx, tx, logID, witID) if err != nil { if status.Code(err) != codes.NotFound { - return fmt.Errorf("failed to query for latest checkpoint: %v", err) + return status.Errorf(codes.Internal, "failed to query for latest checkpoint: %v", err) } } if oldBs != nil { @@ -184,15 +184,15 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR if err != nil { // This really shouldn't ever happen unless the DB is corrupted or the config // for the log or verifier has changed. - return fmt.Errorf("failed to parse checkpoint: %v", err) + return status.Errorf(codes.Internal, "failed to parse checkpoint: %v", err) } if newCP.Size < oldCP.Size { - return fmt.Errorf("checkpoint for log %q and witness %q is for size %d, cannot update to size %d", logID, witID, oldCP.Size, newCP.Size) + return status.Errorf(codes.InvalidArgument, "checkpoint for log %q and witness %q is for size %d, cannot update to size %d", logID, witID, oldCP.Size, newCP.Size) } if newCP.Size == oldCP.Size { if !bytes.Equal(newCP.Hash, oldCP.Hash) { reportInconsistency(oldBs, nextRaw) - return fmt.Errorf("old checkpoint for tree size %d had hash %x but new one has %x", newCP.Size, oldCP.Hash, newCP.Hash) + return status.Errorf(codes.Internal, "old checkpoint for tree size %d had hash %x but new one has %x", newCP.Size, oldCP.Hash, newCP.Hash) } // Nothing to do; checkpoint is equivalent to the old one so avoid DB writes. counterCheckpointUpdateSuccess.Inc() @@ -204,7 +204,7 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR n.UnverifiedSigs = nil nextRaw, err = note.Sign(n) if err != nil { - return fmt.Errorf("failed to serialise note with filtered sigs: %v", err) + return status.Errorf(codes.Internal, "failed to serialise note with filtered sigs: %v", err) } glog.V(1).Infof("Accepted: %s", string(nextRaw)) @@ -213,13 +213,13 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR // log size to create the checkpoint.N files. if _, err := tx.ExecContext(ctx, `REPLACE INTO checkpoints_by_witness (logID, witID, treeSize, chkpt) VALUES (?, ?, ?, ?)`, logID, witID, newCP.Size, nextRaw); err != nil { - return fmt.Errorf("ExecContext(): %v", err) + return status.Errorf(codes.Internal, "ExecContext(): %v", err) } // Calculate new checkpoint.N given this new checkpoint. rows, err := tx.QueryContext(ctx, "SELECT witID, chkpt FROM checkpoints_by_witness WHERE logID = ? AND treeSize = ? ORDER BY witID ASC", logID, newCP.Size) if err != nil { - return fmt.Errorf("QueryContext(): %v", err) + return status.Errorf(codes.Internal, "QueryContext(): %v", err) } defer func() { if err := rows.Close(); err != nil { @@ -233,25 +233,25 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR var witID string var cp []byte if err := rows.Scan(&witID, &cp); err != nil { - return fmt.Errorf("failed to scan rows: %v", err) + return status.Errorf(codes.Internal, "failed to scan rows: %v", err) } allCheckpoints = append(allCheckpoints, cp) witnesses = append(witnesses, d.ws[witID]) } if err := rows.Err(); err != nil { - return fmt.Errorf("rows.Err(): %v", err) + return status.Errorf(codes.Internal, "rows.Err(): %v", err) } sigCount := len(witnesses) row := tx.QueryRowContext(ctx, "SELECT treeSize FROM merged_checkpoints WHERE logID = ? AND sigCount = ?", logID, sigCount) if row.Err() != nil { - return fmt.Errorf("QueryRowContext(): %v", err) + return status.Errorf(codes.Internal, "QueryRowContext(): %v", err) } var lastTreeSize uint64 if err := row.Scan(&lastTreeSize); err != nil { if err != sql.ErrNoRows { - return fmt.Errorf("Scan(): %v", err) + return status.Errorf(codes.Internal, "Scan(): %v", err) } // If there are no rows then that's fine, we'll allow lastTreeSize to stay at 0 } @@ -266,7 +266,7 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR } else { _, err = tx.ExecContext(ctx, `REPLACE INTO merged_checkpoints (logID, sigCount, treeSize, chkpt) VALUES (?, ?, ?, ?)`, logID, sigCount, newCP.Size, mergedCP) if err != nil { - return fmt.Errorf("Failed to update checkpoints.%d: %v", sigCount, err) + return status.Errorf(codes.Internal, "Failed to update checkpoints.%d: %v", sigCount, err) } } }