Skip to content

Commit

Permalink
Return explicit status codes from distribute (#96)
Browse files Browse the repository at this point in the history
This is primarily focused on making sure that the invalid arg ones don't get mapped to 500 errors. Without this, a misconfigured witness can cause a lot of error log spam on the distributor.
  • Loading branch information
mhutchinson authored Feb 13, 2024
1 parent c125524 commit 415c119
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions cmd/internal/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,31 +151,31 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR

l, ok := d.ls[logID]
if !ok {
return fmt.Errorf("unknown log ID %q", logID)
return status.Errorf(codes.InvalidArgument, "unknown unknown log ID %q", logID)
}
wv, ok := d.ws[witID]
if !ok {
return fmt.Errorf("unknown witness ID %q", witID)
return status.Errorf(codes.InvalidArgument, "unknown witness ID %q", witID)
}
newCP, _, n, err := log.ParseCheckpoint(nextRaw, l.Origin, l.Verifier, wv)
if err != nil {
return fmt.Errorf("failed to parse checkpoint: %v", err)
return status.Errorf(codes.InvalidArgument, "failed to parse checkpoint: %v", err)
}
if len(n.Sigs) != 2 {
return fmt.Errorf("failed to verify log and witness signatures; only verified: %v", n.Sigs)
return status.Errorf(codes.InvalidArgument, "failed to verify log and witness signatures; only verified: %v", n.Sigs)
}

// This is a valid checkpoint for this log for this witness
// Now find the previous checkpoint if one exists.

tx, err := d.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
return status.Errorf(codes.Internal, "failed to begin transaction: %v", err)
}
oldBs, err := getLatestCheckpoint(ctx, tx, logID, witID)
if err != nil {
if status.Code(err) != codes.NotFound {
return fmt.Errorf("failed to query for latest checkpoint: %v", err)
return status.Errorf(codes.Internal, "failed to query for latest checkpoint: %v", err)
}
}
if oldBs != nil {
Expand All @@ -184,15 +184,15 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR
if err != nil {
// This really shouldn't ever happen unless the DB is corrupted or the config
// for the log or verifier has changed.
return fmt.Errorf("failed to parse checkpoint: %v", err)
return status.Errorf(codes.Internal, "failed to parse checkpoint: %v", err)
}
if newCP.Size < oldCP.Size {
return fmt.Errorf("checkpoint for log %q and witness %q is for size %d, cannot update to size %d", logID, witID, oldCP.Size, newCP.Size)
return status.Errorf(codes.InvalidArgument, "checkpoint for log %q and witness %q is for size %d, cannot update to size %d", logID, witID, oldCP.Size, newCP.Size)
}
if newCP.Size == oldCP.Size {
if !bytes.Equal(newCP.Hash, oldCP.Hash) {
reportInconsistency(oldBs, nextRaw)
return fmt.Errorf("old checkpoint for tree size %d had hash %x but new one has %x", newCP.Size, oldCP.Hash, newCP.Hash)
return status.Errorf(codes.Internal, "old checkpoint for tree size %d had hash %x but new one has %x", newCP.Size, oldCP.Hash, newCP.Hash)
}
// Nothing to do; checkpoint is equivalent to the old one so avoid DB writes.
counterCheckpointUpdateSuccess.Inc()
Expand All @@ -204,7 +204,7 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR
n.UnverifiedSigs = nil
nextRaw, err = note.Sign(n)
if err != nil {
return fmt.Errorf("failed to serialise note with filtered sigs: %v", err)
return status.Errorf(codes.Internal, "failed to serialise note with filtered sigs: %v", err)
}
glog.V(1).Infof("Accepted: %s", string(nextRaw))

Expand All @@ -213,13 +213,13 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR
// log size to create the checkpoint.N files.

if _, err := tx.ExecContext(ctx, `REPLACE INTO checkpoints_by_witness (logID, witID, treeSize, chkpt) VALUES (?, ?, ?, ?)`, logID, witID, newCP.Size, nextRaw); err != nil {
return fmt.Errorf("ExecContext(): %v", err)
return status.Errorf(codes.Internal, "ExecContext(): %v", err)
}

// Calculate new checkpoint.N given this new checkpoint.
rows, err := tx.QueryContext(ctx, "SELECT witID, chkpt FROM checkpoints_by_witness WHERE logID = ? AND treeSize = ? ORDER BY witID ASC", logID, newCP.Size)
if err != nil {
return fmt.Errorf("QueryContext(): %v", err)
return status.Errorf(codes.Internal, "QueryContext(): %v", err)
}
defer func() {
if err := rows.Close(); err != nil {
Expand All @@ -233,25 +233,25 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR
var witID string
var cp []byte
if err := rows.Scan(&witID, &cp); err != nil {
return fmt.Errorf("failed to scan rows: %v", err)
return status.Errorf(codes.Internal, "failed to scan rows: %v", err)
}
allCheckpoints = append(allCheckpoints, cp)
witnesses = append(witnesses, d.ws[witID])
}

if err := rows.Err(); err != nil {
return fmt.Errorf("rows.Err(): %v", err)
return status.Errorf(codes.Internal, "rows.Err(): %v", err)
}

sigCount := len(witnesses)
row := tx.QueryRowContext(ctx, "SELECT treeSize FROM merged_checkpoints WHERE logID = ? AND sigCount = ?", logID, sigCount)
if row.Err() != nil {
return fmt.Errorf("QueryRowContext(): %v", err)
return status.Errorf(codes.Internal, "QueryRowContext(): %v", err)
}
var lastTreeSize uint64
if err := row.Scan(&lastTreeSize); err != nil {
if err != sql.ErrNoRows {
return fmt.Errorf("Scan(): %v", err)
return status.Errorf(codes.Internal, "Scan(): %v", err)
}
// If there are no rows then that's fine, we'll allow lastTreeSize to stay at 0
}
Expand All @@ -266,7 +266,7 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR
} else {
_, err = tx.ExecContext(ctx, `REPLACE INTO merged_checkpoints (logID, sigCount, treeSize, chkpt) VALUES (?, ?, ?, ?)`, logID, sigCount, newCP.Size, mergedCP)
if err != nil {
return fmt.Errorf("Failed to update checkpoints.%d: %v", sigCount, err)
return status.Errorf(codes.Internal, "Failed to update checkpoints.%d: %v", sigCount, err)
}
}
}
Expand Down

0 comments on commit 415c119

Please sign in to comment.