Skip to content

Commit

Permalink
Workaround for the mismatched server identifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-p committed Jan 17, 2025
1 parent 3ea44d5 commit 75a65b1
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
3 changes: 2 additions & 1 deletion pkg/experiment/metastore/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func (c *Client) updateServers(servers []discovery.Server) {
c.logger.Log("msg", "updating servers", "servers", fmt.Sprintf("%+v", servers))
byID := make(map[raft.ServerID][]discovery.Server, len(servers))
for _, srv := range servers {
byID[srv.Raft.ID] = append(byID[srv.Raft.ID], srv)
id := stripPort(string(srv.Raft.ID))
byID[id] = append(byID[id], srv)
}
for k, ss := range byID {
if len(ss) > 1 {
Expand Down
15 changes: 13 additions & 2 deletions pkg/experiment/metastore/client/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"fmt"
"math/rand"
"strings"
"time"

"github.com/go-kit/log/level"
"github.com/hashicorp/raft"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -54,9 +56,9 @@ func invoke[R any](ctx context.Context, cl *Client,
node, ok := raftnode.RaftLeaderFromStatusDetails(err)
if ok {
cl.mu.Lock()
if cl.leader == it.srv.Raft.ID {
if strings.Contains(string(it.srv.Raft.ID), string(cl.leader)) {
cl.logger.Log("msg", "changing metastore client leader", "current", cl.leader, "new", node.Id)
cl.leader = raft.ServerID(node.Id)
cl.leader = stripPort(node.Id)
}
cl.mu.Unlock()
} else {
Expand Down Expand Up @@ -88,6 +90,7 @@ func (c *Client) selectInstance(override bool) *client {
if j == idx {
it = v
c.leader = k
level.Debug(c.logger).Log("msg", "selected a random metastore server", "new_leader", c.leader)
break
}
j++
Expand All @@ -96,6 +99,14 @@ func (c *Client) selectInstance(override bool) *client {
return it
}

func stripPort(server string) raft.ServerID {
serverWithoutPort := server
if idx := strings.LastIndex(serverWithoutPort, ":"); idx != -1 {
serverWithoutPort = serverWithoutPort[:idx]
}
return raft.ServerID(serverWithoutPort)
}

// TODO(kolesnikovae): Interceptor.

func (c *Client) AddBlock(ctx context.Context, in *metastorev1.AddBlockRequest, opts ...grpc.CallOption) (*metastorev1.AddBlockResponse, error) {
Expand Down
12 changes: 7 additions & 5 deletions pkg/experiment/metastore/discovery/kuberesolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package discovery

import (
"fmt"
"github.com/go-kit/log"
kuberesolver2 "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery/kuberesolver"
"github.com/hashicorp/raft"
"google.golang.org/grpc/resolver"
"net/url"
"strings"
"sync"

"github.com/go-kit/log"
"github.com/hashicorp/raft"
"google.golang.org/grpc/resolver"

kuberesolver2 "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery/kuberesolver"
)

type KubeDiscovery struct {
Expand Down Expand Up @@ -97,7 +99,7 @@ func convertEndpoints(e kuberesolver2.Endpoints, ti targetInfo) []Server {
continue
}
podName := addr.TargetRef.Name
raftServerId := fmt.Sprintf("%s.%s.%s:%d", podName, ti.service, ti.namespace, port.Port)
raftServerId := fmt.Sprintf("%s.%s.%s.svc.cluster.local.:%d", podName, ti.service, ti.namespace, port.Port)

servers = append(servers, Server{
ResolvedAddress: fmt.Sprintf("%s:%d", addr.IP, port.Port),
Expand Down

0 comments on commit 75a65b1

Please sign in to comment.