Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/etcdserver/api/rafthttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))

logRaftCommunication(h.lg, h.localID, m, types.ID(m.From), "receive")
if err := h.r.Process(context.TODO(), m); err != nil {
var writerErr writerToResponse
switch {
Expand Down
1 change: 1 addition & 0 deletions server/etcdserver/api/rafthttp/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (p *pipeline) handle() {
for {
select {
case m := <-p.msgc:
logRaftCommunication(p.tr.Logger, p.tr.ID, m, p.peerID, "send")
start := time.Now()
err := p.post(pbutil.MustMarshal(&m))
end := time.Now()
Expand Down
2 changes: 2 additions & 0 deletions server/etcdserver/api/rafthttp/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (cw *streamWriter) run() {
heartbeatc, msgc = nil, nil

case m := <-msgc:
logRaftCommunication(cw.lg, cw.localID, m, cw.peerID, "send")
err := enc.encode(&m)
if err == nil {
unflushed += m.Size()
Expand Down Expand Up @@ -497,6 +498,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
cr.mu.Unlock()
return err
}
logRaftCommunication(cr.lg, cr.tr.ID, m, cr.peerID, "receive")

// gofail: var raftDropHeartbeat struct{}
// continue labelRaftDropHeartbeat
Expand Down
39 changes: 39 additions & 0 deletions server/etcdserver/api/rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package rafthttp

import (
"context"
"encoding/binary"
"net/http"
"sync"
"time"
Expand All @@ -24,6 +25,7 @@ import (
"go.uber.org/zap"
"golang.org/x/time/rate"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
Expand Down Expand Up @@ -451,3 +453,40 @@ func (t *Transport) ActivePeers() (cnt int) {
}
return cnt
}

func logRaftCommunication(lg *zap.Logger, localID types.ID, m raftpb.Message, remote types.ID, direction string) {
if !lg.Core().Enabled(zap.DebugLevel) {
return
}
var requestID uint64
switch m.Type {
case raftpb.MsgBeat, raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
if len(m.Context) != 8 {
return
}
requestID = binary.BigEndian.Uint64(m.Context)
case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
if len(m.Entries) > 0 && len(m.Entries[0].Data) == 8 {
requestID = binary.BigEndian.Uint64(m.Entries[0].Data)
}
case raftpb.MsgProp:
if len(m.Entries) > 0 {
r := pb.InternalRaftRequest{}
if err := r.Unmarshal(m.Entries[0].Data); err == nil {
requestID = r.Header.ID
}
}
default:
}
lg.Debug(
"Raft communication",
zap.String("direction", direction),
zap.String("message-type", m.Type.String()),
zap.String("local-member-id", localID.String()),
zap.String("remote-peer-id", remote.String()),
zap.Uint64("term", m.Term),
zap.Uint64("index", m.Index),
zap.Uint64("commit", m.Commit),
zap.Uint64("request-id", requestID),
)
}
209 changes: 94 additions & 115 deletions server/etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/requestid"
"go.etcd.io/raft/v3"
)

Expand Down Expand Up @@ -77,139 +78,117 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {

func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
requestID := s.RequestID()
ctx = requestid.NewContext(ctx, requestID)
lg := s.Logger()
debug := lg.Core().Enabled(zap.DebugLevel)
if debug {
logUnaryRequest(ctx, lg.Debug, info, req)
}

startTime := time.Now()
resp, err := handler(ctx, req)
lg := s.Logger()
if lg != nil { // acquire stats if debug level is enabled or RequestInfo is expensive
defer logUnaryRequestStats(ctx, lg, s.Cfg.WarningUnaryRequestDuration, info, startTime, req, resp)
duration := time.Since(startTime)

if duration > s.Cfg.WarningUnaryRequestDuration {
var fields []zap.Field
if !debug {
fields = requestLogFields(req)
}
logUnaryResponseStats(ctx, lg.Warn, duration, info, startTime, resp, fields...)
} else if debug {
logUnaryResponseStats(ctx, lg.Debug, duration, info, startTime, resp)
}
return resp, err
}
}

func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, warnLatency time.Duration, info *grpc.UnaryServerInfo, startTime time.Time, req any, resp any) {
duration := time.Since(startTime)
var enabledDebugLevel, expensiveRequest bool
if lg.Core().Enabled(zap.DebugLevel) {
enabledDebugLevel = true
}
if duration > warnLatency {
expensiveRequest = true
}
if !enabledDebugLevel && !expensiveRequest {
return
}
remote := "No remote client info."
func logUnaryRequest(ctx context.Context, log func(msg string, fields ...zap.Field), info *grpc.UnaryServerInfo, req any) {
remote := ""
peerInfo, ok := peer.FromContext(ctx)
if ok {
remote = peerInfo.Addr.String()
}
responseType := info.FullMethod
var reqCount, respCount int64
var reqSize, respSize int
var reqContent string
switch _resp := resp.(type) {
case *pb.RangeResponse:
_req, ok := req.(*pb.RangeRequest)
if ok {
reqCount = 0
reqSize = _req.Size()
reqContent = _req.String()
}
if _resp != nil {
respCount = _resp.GetCount()
respSize = _resp.Size()
}
case *pb.PutResponse:
_req, ok := req.(*pb.PutRequest)
if ok {
reqCount = 1
reqSize = _req.Size()
reqContent = pb.NewLoggablePutRequest(_req).String()
// redact value field from request content, see PR #9821
}
if _resp != nil {
respCount = 0
respSize = _resp.Size()
}
case *pb.DeleteRangeResponse:
_req, ok := req.(*pb.DeleteRangeRequest)
if ok {
reqCount = 0
reqSize = _req.Size()
reqContent = _req.String()
}
if _resp != nil {
respCount = _resp.GetDeleted()
respSize = _resp.Size()
}
case *pb.TxnResponse:
_req, ok := req.(*pb.TxnRequest)
if ok && _resp != nil {
if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
reqCount = int64(len(_req.GetSuccess()))
reqSize = 0
for _, r := range _req.GetSuccess() {
reqSize += r.Size()
}
} else {
reqCount = int64(len(_req.GetFailure()))
reqSize = 0
for _, r := range _req.GetFailure() {
reqSize += r.Size()
}
}
reqContent = pb.NewLoggableTxnRequest(_req).String()
// redact value field from request content, see PR #9821
}
if _resp != nil {
respCount = 0
respSize = _resp.Size()
}
default:
reqCount = -1
reqSize = -1
respCount = -1
respSize = -1
requestID := requestid.FromContext(ctx)
var size int
if msg, ok := req.(Sizer); ok {
size = msg.Size()
}

if enabledDebugLevel {
logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
} else if expensiveRequest {
logExpensiveRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
fields := []zap.Field{
zap.String("method", info.FullMethod),
zap.Uint64("request_id", requestID),
zap.String("remote", remote),
zap.Int("size", size),
}
fields = append(fields, requestLogFields(req)...)
log("request", fields...)
}

func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
reqCount int64, reqSize int, respCount int64, respSize int, reqContent string,
) {
lg.Debug("request stats",
zap.Time("start time", startTime),
zap.Duration("time spent", duration),
zap.String("remote", remote),
zap.String("response type", responseType),
zap.Int64("request count", reqCount),
zap.Int("request size", reqSize),
zap.Int64("response count", respCount),
zap.Int("response size", respSize),
zap.String("request content", reqContent),
)
type Sizer interface {
Size() int
}

func logExpensiveRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
reqCount int64, reqSize int, respCount int64, respSize int, reqContent string,
) {
lg.Warn("request stats",
zap.Time("start time", startTime),
zap.Duration("time spent", duration),
zap.String("remote", remote),
zap.String("response type", responseType),
zap.Int64("request count", reqCount),
zap.Int("request size", reqSize),
zap.Int64("response count", respCount),
zap.Int("response size", respSize),
zap.String("request content", reqContent),
func requestLogFields(req any) []zap.Field {
fields := []zap.Field{}
switch _req := req.(type) {
case *pb.RangeRequest:
fields = append(fields,
zap.String("range_begin", string(_req.GetKey())),
zap.String("range_end", string(_req.GetRangeEnd())),
zap.Int64("range_revision", _req.GetRevision()),
zap.Int64("range_limit", _req.GetLimit()),
zap.Bool("range_count_only", _req.GetCountOnly()),
zap.Bool("range_keys_only", _req.GetKeysOnly()),
)
case *pb.PutRequest:
fields = append(fields,
zap.String("put_key", string(_req.GetKey())),
)
case *pb.DeleteRangeRequest:
fields = append(fields,
zap.String("delete_range_begin", string(_req.GetKey())),
zap.String("delete_range_end", string(_req.GetRangeEnd())),
)
case *pb.TxnRequest:
fields = append(fields,
zap.Int("txn_compare_len", len(_req.GetCompare())),
zap.Int("txn_success_len", len(_req.GetSuccess())),
zap.Int("txn_failure_len", len(_req.GetFailure())),
)
default:
}
return fields
}

func logUnaryResponseStats(ctx context.Context, log func(msg string, fields ...zap.Field), duration time.Duration, info *grpc.UnaryServerInfo, startTime time.Time, resp any, fields ...zap.Field) {
var size int
if msg, ok := resp.(Sizer); ok {
size = msg.Size()
}
fields = append(fields,
zap.String("method", info.FullMethod),
zap.Duration("duration", duration),
zap.Int("size", size),
zap.Uint64("request_id", requestid.FromContext(ctx)),
)
switch _resp := resp.(type) {
case *pb.RangeResponse:
fields = append(fields,
zap.Int("size", _resp.Size()),
zap.Int64("count", _resp.GetCount()),
)
case *pb.PutResponse:
fields = append(fields,
zap.Int("size", _resp.Size()),
)
case *pb.DeleteRangeResponse:
fields = append(fields,
zap.Int64("delete_range_deleted", _resp.GetDeleted()),
)
case *pb.TxnResponse:
default:
}
log("response", fields...)
}

func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
Expand Down
17 changes: 17 additions & 0 deletions server/etcdserver/requestid/requestid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package requestid

import "context"

type requestIDKey struct{}

func NewContext(ctx context.Context, requestID uint64) context.Context {
return context.WithValue(ctx, requestIDKey{}, requestID)
}

func FromContext(ctx context.Context) uint64 {
val := ctx.Value(requestIDKey{})
if val == nil {
return 0
}
return val.(uint64)
}
4 changes: 4 additions & 0 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2464,3 +2464,7 @@ func addFeatureGateMetrics(fg featuregate.FeatureGate, guageVec *prometheus.Gaug
guageVec.With(prometheus.Labels{"name": string(feature), "stage": string(featureSpec.PreRelease)}).Set(metricVal)
}
}

func (s *EtcdServer) RequestID() uint64 {
return s.reqIDGen.Next()
}
4 changes: 3 additions & 1 deletion server/etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool
type notifier struct {
c chan struct{}
err error
readIndexID uint64
}

func newNotifier() *notifier {
Expand All @@ -91,7 +92,8 @@ func newNotifier() *notifier {
}
}

func (nc *notifier) notify(err error) {
func (nc *notifier) notify(err error, readIndexID uint64) {
nc.err = err
nc.readIndexID = readIndexID
close(nc.c)
}
Loading