Skip to content

Commit

Permalink
Metrics for for core distributor ops (#92)
Browse files Browse the repository at this point in the history
This should give us a basic understanding of traffic over time, and allow us to compute basic error ratios.
  • Loading branch information
mhutchinson authored Feb 6, 2024
1 parent 81ecc61 commit 82a83f2
Showing 1 changed file with 44 additions and 1 deletion.
45 changes: 44 additions & 1 deletion cmd/internal/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,43 @@ import (
"golang.org/x/mod/sumdb/note"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// maxSigs is the maximum number of sigs that can be requested.
const maxSigs = 100

var (
counterCheckpointUpdateRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "distributor_update_checkpoint_request",
Help: "The total number of requests to update a checkpoint",
})
counterCheckpointUpdateSuccess = promauto.NewCounter(prometheus.CounterOpts{
Name: "distributor_update_checkpoint_success",
Help: "The total number of successful requests to update a checkpoint",
})

counterCheckpointGetNRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "distributor_get_checkpoint_n_request",
Help: "The total number of requests to GetCheckpointN",
})
counterCheckpointGetNSuccess = promauto.NewCounter(prometheus.CounterOpts{
Name: "distributor_get_checkpoint_n_success",
Help: "The total number of successful requests to GetCheckpointN",
})

counterCheckpointGetByWitRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "distributor_get_checkpoint_wit_request",
Help: "The total number of requests to GetCheckpointWitness",
})
counterCheckpointGetByWitSuccess = promauto.NewCounter(prometheus.CounterOpts{
Name: "distributor_get_checkpoint_wit_success",
Help: "The total number of successful requests to GetCheckpointWitness",
})
)

// NewDistributor returns a distributor that will accept checkpoints from
// the given witnesses, for the given logs, and persist its state in the
// database provided. Callers must call Init() on the returned distributor.
Expand Down Expand Up @@ -70,6 +102,7 @@ func (d *Distributor) GetLogs(ctx context.Context) ([]string, error) {

// GetCheckpointN gets the largest checkpoint for a given log that has at least `n` signatures.
func (d *Distributor) GetCheckpointN(ctx context.Context, logID string, n uint32) ([]byte, error) {
counterCheckpointGetNRequests.Inc()
if n == 0 || n > maxSigs {
return nil, status.Errorf(codes.InvalidArgument, "invalid N %d", n)
}
Expand All @@ -92,22 +125,30 @@ func (d *Distributor) GetCheckpointN(ctx context.Context, logID string, n uint32
}
return nil, fmt.Errorf("Scan(): %v", err)
}
counterCheckpointGetNSuccess.Inc()
return cp, nil
}

// GetCheckpointWitness gets the largest checkpoint for the log that was witnessed by the given witness.
func (d *Distributor) GetCheckpointWitness(ctx context.Context, logID, witID string) ([]byte, error) {
counterCheckpointGetByWitRequests.Inc()
tx, err := d.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, fmt.Errorf("failed to begin transaction: %v", err)
}
return getLatestCheckpoint(ctx, tx, logID, witID)
cp, err := getLatestCheckpoint(ctx, tx, logID, witID)
if err == nil {
counterCheckpointGetByWitSuccess.Inc()
}
return cp, err
}

// Distribute adds a new witnessed checkpoint to be distributed. This checkpoint must be signed
// by both the log and the witness specified, and be larger than any previous checkpoint distributed
// for this pair.
func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextRaw []byte) error {
counterCheckpointUpdateRequests.Inc()

l, ok := d.ls[logID]
if !ok {
return fmt.Errorf("unknown log ID %q", logID)
Expand Down Expand Up @@ -154,6 +195,7 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR
return fmt.Errorf("old checkpoint for tree size %d had hash %x but new one has %x", newCP.Size, oldCP.Hash, newCP.Hash)
}
// Nothing to do; checkpoint is equivalent to the old one so avoid DB writes.
counterCheckpointUpdateSuccess.Inc()
return nil
}
}
Expand Down Expand Up @@ -232,6 +274,7 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR
if err := tx.Commit(); err != nil {
return err
}
counterCheckpointUpdateSuccess.Inc()
return nil
}

Expand Down

0 comments on commit 82a83f2

Please sign in to comment.