Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
mysql -e 'CREATE DATABASE ${{ env.MYSQL_TEST_E2E_DB }};' -u${{ env.MYSQL_TEST_USER }} -p${{ env.MYSQL_TEST_PASS }}
- uses: actions/setup-go@v3
with:
go-version: '1.22'
go-version: '1.24'
- name: Set up Go
run: |
go get -u golang.org/x/lint/golint
Expand Down Expand Up @@ -137,7 +137,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: '1.22'
go-version: '1.24'
- uses: actions/setup-python@v4
with:
python-version: '3.11'
Expand Down Expand Up @@ -176,7 +176,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: '1.22'
go-version: '1.24'
- uses: actions/setup-python@v4
with:
python-version: '3.11'
Expand Down Expand Up @@ -215,7 +215,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: '1.22'
go-version: '1.24'
- uses: actions/setup-python@v4
with:
python-version: '3.11'
Expand Down Expand Up @@ -245,7 +245,7 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: '1.22'
go-version: '1.24'
- uses: actions/checkout@v3
with:
path: ${{ github.workspace }}/src/github.com/google/fleetspeak
Expand Down
12 changes: 9 additions & 3 deletions fleetspeak/src/client/comms.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
"fmt"

log "github.com/golang/glog"
"google.golang.org/protobuf/proto"
tspb "google.golang.org/protobuf/types/known/timestamppb"

"github.com/google/fleetspeak/fleetspeak/src/client/comms"
"github.com/google/fleetspeak/fleetspeak/src/client/service"
"github.com/google/fleetspeak/fleetspeak/src/client/stats"
"github.com/google/fleetspeak/fleetspeak/src/common"
"google.golang.org/protobuf/proto"
tspb "google.golang.org/protobuf/types/known/timestamppb"

clpb "github.com/google/fleetspeak/fleetspeak/src/client/proto/fleetspeak_client"
fspb "github.com/google/fleetspeak/fleetspeak/src/common/proto/fleetspeak"
Expand Down Expand Up @@ -141,10 +140,17 @@ func (c commsContext) CurrentIdentity() (comms.ClientIdentity, error) {
return comms.ClientIdentity{}, fmt.Errorf("failed to create ClientID: %v", err)
}

labels := c.c.config.Labels()
stringLabels := make([]string, 0, len(labels))
for _, l := range labels {
stringLabels = append(stringLabels, l.Label)
}

return comms.ClientIdentity{
ID: id,
Private: k,
Public: k.Public(),
Labels: stringLabels,
}, nil
}

Expand Down
5 changes: 3 additions & 2 deletions fleetspeak/src/client/comms/comms.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"net/url"
"time"

"github.com/google/fleetspeak/fleetspeak/src/client/stats"
"github.com/google/fleetspeak/fleetspeak/src/common"

clpb "github.com/google/fleetspeak/fleetspeak/src/client/proto/fleetspeak_client"
"github.com/google/fleetspeak/fleetspeak/src/client/stats"
fspb "github.com/google/fleetspeak/fleetspeak/src/common/proto/fleetspeak"
)

Expand Down Expand Up @@ -60,6 +60,7 @@ type ClientIdentity struct {
ID common.ClientID
Private crypto.PrivateKey
Public crypto.PublicKey
Labels []string
}

// A ServerInfo describes what a Communicator needs to know about the servers
Expand Down Expand Up @@ -112,7 +113,7 @@ type Context interface {
// the previous call.
MakeContactData(msgs []*fspb.Message, baseMessages map[string]uint64) (*fspb.WrappedContactData, map[string]uint64, error)

// ProcessContactData processes a ContactData recevied from the server.
// ProcessContactData processes a ContactData received from the server.
ProcessContactData(ctx context.Context, data *fspb.ContactData, streaming bool) error

// ChainRevoked takes an x509 certificate chain, and returns true if any link
Expand Down
30 changes: 24 additions & 6 deletions fleetspeak/src/client/https/https.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"io"
Expand All @@ -33,10 +34,9 @@ import (
"path"
"time"

log "github.com/golang/glog"
"github.com/google/fleetspeak/fleetspeak/src/client/comms"
"github.com/google/fleetspeak/fleetspeak/src/client/stats"
"github.com/google/fleetspeak/fleetspeak/src/common"

"golang.org/x/net/http2"
)

Expand Down Expand Up @@ -145,10 +145,10 @@ func jitter(seconds int32) time.Duration {
// with the HTTP GET method. The hosts are sequentially dialed until one of them
// successfully responds. If no proper response was received, the function
// returns the most recent error.
func getFileIfModified(ctx context.Context, hosts []string, client *http.Client, service, name string, modSince time.Time, stats stats.CommunicatorCollector) (io.ReadCloser, time.Time, error) {
func getFileIfModified(ctx context.Context, cctx comms.Context, clientCert []byte, hosts []string, client *http.Client, service, name string, modSince time.Time) (io.ReadCloser, time.Time, error) {
var lastErr error
for _, h := range hosts {
body, modSince, err := getFileIfModifiedFromHost(ctx, h, client, service, name, modSince, stats)
body, modSince, err := getFileIfModifiedFromHost(ctx, cctx, clientCert, h, client, service, name, modSince)
if err != nil {
lastErr = err
if ctx.Err() != nil {
Expand All @@ -162,11 +162,11 @@ func getFileIfModified(ctx context.Context, hosts []string, client *http.Client,
return nil, time.Time{}, fmt.Errorf("unable to retrieve file, last attempt failed with: %v", lastErr)
}

func getFileIfModifiedFromHost(ctx context.Context, host string, client *http.Client, service, name string, modSince time.Time, stats stats.CommunicatorCollector) (io.ReadCloser, time.Time, error) {
func getFileIfModifiedFromHost(ctx context.Context, cctx comms.Context, clientCert []byte, host string, client *http.Client, service, name string, modSince time.Time) (io.ReadCloser, time.Time, error) {
var didFetch bool
var err error
defer func() {
stats.AfterGetFileRequest(host, service, name, didFetch, err)
cctx.Stats().AfterGetFileRequest(host, service, name, didFetch, err)
}()

u := url.URL{
Expand All @@ -186,6 +186,24 @@ func getFileIfModifiedFromHost(ctx context.Context, host string, client *http.Cl
req.Header.Set("If-Modified-Since", modSince.Format(http.TimeFormat))
}

if ci, err := cctx.CurrentIdentity(); err == nil {
for _, label := range ci.Labels {
req.Header.Add("X-Fleetspeak-Labels", label)
}
} else {
log.Errorf("Failed to get current identity: %v", err)
}

if si, err := cctx.ServerInfo(); err == nil {
if si.ClientCertificateHeader != "" && clientCert != nil {
bc := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: clientCert})
cc := url.PathEscape(string(bc))
req.Header.Set(si.ClientCertificateHeader, cc)
}
} else {
log.Errorf("Failed to get server info: %v", err)
}

var resp *http.Response
resp, err = client.Do(req)
if err != nil {
Expand Down
66 changes: 56 additions & 10 deletions fleetspeak/src/client/https/https_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (
"testing"
"time"

"github.com/google/fleetspeak/fleetspeak/src/client/comms"
"github.com/google/fleetspeak/fleetspeak/src/client/stats"
)

type testStatsCollector struct {
stats.CommunicatorCollector
stats.Collector
fetches atomic.Int64
}

Expand All @@ -38,12 +39,37 @@ func (c *testStatsCollector) AfterGetFileRequest(_, _, _ string, didFetch bool,
}
}

func (*testStatsCollector) OutboundContactData(string, int, error) {}
type testCommsContext struct {
comms.Context
stats stats.Collector
clientLabels []string
}

func (c *testCommsContext) Stats() stats.Collector {
return c.stats
}

func (c *testCommsContext) CurrentIdentity() (comms.ClientIdentity, error) {
return comms.ClientIdentity{Labels: c.clientLabels}, nil
}

func (*testStatsCollector) InboundContactData(string, int, error) {}
func (c *testCommsContext) ServerInfo() (comms.ServerInfo, error) {
return comms.ServerInfo{}, nil
}

func createFakeServer(lastModified time.Time) (*httptest.Server, []string) {
func createFakeServer(lastModified time.Time, blockedLabels ...string) (*httptest.Server, []string) {
blocked := make(map[string]bool)
for _, label := range blockedLabels {
blocked[label] = true
}
fakeServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for _, label := range r.Header.Values("X-Fleetspeak-Labels") {
if blocked[label] {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
}

content := strings.NewReader("test")
http.ServeContent(w, r, "test.txt", lastModified, content)
}))
Expand All @@ -52,12 +78,12 @@ func createFakeServer(lastModified time.Time) (*httptest.Server, []string) {
return fakeServer, hosts
}

func doRequest(t *testing.T, hosts []string, client *http.Client, lastModifiedOnClient time.Time, stats stats.CommunicatorCollector) (string, time.Time) {
func doRequest(t *testing.T, cctx comms.Context, hosts []string, client *http.Client, lastModifiedOnClient time.Time) (string, time.Time) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second)
defer cancel()

reader, modTime, err := getFileIfModified(ctx, hosts, client, "TestService", "test.txt", lastModifiedOnClient, stats)
reader, modTime, err := getFileIfModified(ctx, cctx, nil, hosts, client, "TestService", "test.txt", lastModifiedOnClient)
if err != nil {
t.Fatalf("getFileIfModified() failed: %v", err)
}
Expand All @@ -74,12 +100,13 @@ func doRequest(t *testing.T, hosts []string, client *http.Client, lastModifiedOn

func TestGetFileIfModified(t *testing.T) {
stats := &testStatsCollector{}
cctx := &testCommsContext{stats: stats}
lastModifiedOnServer := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
lastModifiedOnClient := lastModifiedOnServer.Add(-time.Hour)
fakeServer, hosts := createFakeServer(lastModifiedOnServer)
defer fakeServer.Close()

body, modTime := doRequest(t, hosts, fakeServer.Client(), lastModifiedOnClient, stats)
body, modTime := doRequest(t, cctx, hosts, fakeServer.Client(), lastModifiedOnClient)
if !modTime.Equal(lastModifiedOnServer) {
t.Errorf("Unexpected modTime, got: %v, want: %v", modTime, lastModifiedOnServer)
}
Expand All @@ -94,12 +121,13 @@ func TestGetFileIfModified(t *testing.T) {

func TestGetFileIfNotModified(t *testing.T) {
stats := &testStatsCollector{}
cctx := &testCommsContext{stats: stats}
lastModifiedOnServer := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
lastModifiedOnClient := lastModifiedOnServer
fakeServer, hosts := createFakeServer(lastModifiedOnServer)
defer fakeServer.Close()

body, _ := doRequest(t, hosts, fakeServer.Client(), lastModifiedOnClient, stats)
body, _ := doRequest(t, cctx, hosts, fakeServer.Client(), lastModifiedOnClient)
if want := ""; body != want {
t.Errorf("Unexpected response body, got: %q, want: %q", body, want)
}
Expand All @@ -110,6 +138,7 @@ func TestGetFileIfNotModified(t *testing.T) {
}

func TestGetFileUnreachableHost(t *testing.T) {
cctx := &testCommsContext{stats: &stats.NoopCollector{}}
lastModifiedOnServer := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
lastModifiedOnClient := lastModifiedOnServer.Add(-time.Hour)
fakeServer, hosts := createFakeServer(lastModifiedOnServer)
Expand All @@ -119,11 +148,28 @@ func TestGetFileUnreachableHost(t *testing.T) {
// should still succeed by trying the next one in the list.
hosts = append([]string{"unreachable_host"}, hosts...)

body, modTime := doRequest(t, hosts, fakeServer.Client(), lastModifiedOnClient, stats.NoopCollector{})
body, modTime := doRequest(t, cctx, hosts, fakeServer.Client(), lastModifiedOnClient)
if !modTime.Equal(lastModifiedOnServer) {
t.Errorf("Unexpected modTime, got: %v, want: %v", modTime, lastModifiedOnServer)
}
if want := "test"; body != want {
t.Errorf("Unexpected body, got: %q, want: %q", body, want)
}
}

func TestGetFileUnauthorizedClient(t *testing.T) {
stats := &testStatsCollector{}
cctx := &testCommsContext{stats: stats, clientLabels: []string{"label1"}}
lastModifiedOnServer := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
lastModifiedOnClient := lastModifiedOnServer.Add(-time.Hour)
fakeServer, hosts := createFakeServer(lastModifiedOnServer, "label1")
defer fakeServer.Close()

ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second)
defer cancel()
_, _, err := getFileIfModified(ctx, cctx, nil, hosts, fakeServer.Client(), "TestService", "test.txt", lastModifiedOnClient)

if err == nil {
t.Errorf("getFileIfModified() succeeded, want error")
}
}
2 changes: 1 addition & 1 deletion fleetspeak/src/client/https/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,5 +366,5 @@ func (c *Communicator) GetFileIfModified(ctx context.Context, service, name stri
c.hostLock.RLock()
hosts := append([]string(nil), c.hosts...)
c.hostLock.RUnlock()
return getFileIfModified(ctx, hosts, c.hc, service, name, modSince, c.cctx.Stats())
return getFileIfModified(ctx, c.cctx, nil, hosts, c.hc, service, name, modSince)
}
5 changes: 2 additions & 3 deletions fleetspeak/src/client/https/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ import (
"time"

log "github.com/golang/glog"
"google.golang.org/protobuf/proto"

"github.com/google/fleetspeak/fleetspeak/src/client/comms"
"github.com/google/fleetspeak/fleetspeak/src/client/watchdog"
"github.com/google/fleetspeak/fleetspeak/src/common"
"google.golang.org/protobuf/proto"

clpb "github.com/google/fleetspeak/fleetspeak/src/client/proto/fleetspeak_client"
fspb "github.com/google/fleetspeak/fleetspeak/src/common/proto/fleetspeak"
Expand Down Expand Up @@ -104,7 +103,7 @@ func (c *StreamingCommunicator) GetFileIfModified(ctx context.Context, service,
c.hostLock.RLock()
hosts := append([]string(nil), c.hosts...)
c.hostLock.RUnlock()
return getFileIfModified(ctx, hosts, c.hc, service, name, modSince, c.cctx.Stats())
return getFileIfModified(ctx, c.cctx, c.certBytes, hosts, c.hc, service, name, modSince)
}

func (c *StreamingCommunicator) configure() error {
Expand Down
12 changes: 6 additions & 6 deletions fleetspeak/src/server/components/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/google/fleetspeak/fleetspeak/src/server/service"
"github.com/google/fleetspeak/fleetspeak/src/server/spanner"
"github.com/google/fleetspeak/fleetspeak/src/server/stats"

"github.com/prometheus/client_golang/prometheus/promhttp"

cpb "github.com/google/fleetspeak/fleetspeak/src/server/components/proto/fleetspeak_components"
Expand Down Expand Up @@ -119,11 +118,12 @@ func MakeComponents(cfg *cpb.Config) (*server.Components, error) {
l = &chttps.ProxyListener{Listener: l}
}
comm, err = https.NewCommunicator(https.Params{
Listener: l,
Cert: []byte(hcfg.Certificates),
FrontendConfig: hcfg.GetFrontendConfig(),
Key: []byte(hcfg.Key),
Streaming: !hcfg.DisableStreaming,
Listener: l,
Cert: []byte(hcfg.Certificates),
FrontendConfig: hcfg.GetFrontendConfig(),
Key: []byte(hcfg.Key),
Streaming: !hcfg.DisableStreaming,
FileServerAuthorization: hcfg.EnableFileServerAuthorization,
})
if err != nil {
return nil, fmt.Errorf("failed to create communicator: %v", err)
Expand Down
Loading
Loading