From 50c979dfdaa57e2320bf752a2bbb82c1aa9e5336 Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Wed, 5 Feb 2025 15:16:45 +0000 Subject: [PATCH 1/6] Move v1 API into dedicated package --- activation/e2e/activation_test.go | 4 +- activation/e2e/atx_merge_test.go | 4 +- activation/e2e/builds_atx_v2_test.go | 4 +- activation/e2e/certifier_client_test.go | 4 +- activation/e2e/checkpoint_merged_test.go | 4 +- activation/e2e/checkpoint_test.go | 4 +- activation/e2e/nipost_test.go | 7 +- activation/e2e/validation_test.go | 4 +- api/grpcserver/http_server_test.go | 15 +-- api/grpcserver/{ => v1}/activation_service.go | 2 +- .../{ => v1}/activation_service_test.go | 2 +- api/grpcserver/{ => v1}/admin_service.go | 2 +- api/grpcserver/{ => v1}/admin_service_test.go | 2 +- api/grpcserver/{ => v1}/debug_service.go | 2 +- api/grpcserver/{ => v1}/events.go | 2 +- .../{ => v1}/globalstate_service.go | 2 +- .../{ => v1}/globalstate_service_test.go | 2 +- api/grpcserver/{ => v1}/grpcserver_test.go | 66 +------------ api/grpcserver/{ => v1}/interface.go | 4 +- api/grpcserver/{ => v1}/mesh_service.go | 2 +- api/grpcserver/{ => v1}/mesh_service_test.go | 2 +- api/grpcserver/{ => v1}/mocks.go | 4 +- api/grpcserver/{ => v1}/node_service.go | 2 +- api/grpcserver/{ => v1}/node_service_test.go | 2 +- api/grpcserver/{ => v1}/post_client.go | 2 +- api/grpcserver/{ => v1}/post_info_service.go | 2 +- .../{ => v1}/post_info_service_test.go | 2 +- api/grpcserver/{ => v1}/post_service.go | 2 +- api/grpcserver/{ => v1}/post_service_test.go | 7 +- api/grpcserver/{ => v1}/smesher_service.go | 2 +- .../{ => v1}/smesher_service_test.go | 58 +++++------ .../{ => v1}/transaction_service.go | 2 +- .../{ => v1}/transaction_service_test.go | 2 +- .../{grpcserver_tls_test.go => v1/v1_test.go} | 96 +++++++++++++------ cmd/bootstrapper/generator_test.go | 8 +- node/node.go | 27 +++--- .../distributed_post_verification_test.go | 5 +- 37 files changed, 173 insertions(+), 190 deletions(-) rename api/grpcserver/{ => v1}/activation_service.go (99%) rename api/grpcserver/{ => v1}/activation_service_test.go (99%) rename api/grpcserver/{ => v1}/admin_service.go (99%) rename api/grpcserver/{ => v1}/admin_service_test.go (99%) rename api/grpcserver/{ => v1}/debug_service.go (99%) rename api/grpcserver/{ => v1}/events.go (98%) rename api/grpcserver/{ => v1}/globalstate_service.go (99%) rename api/grpcserver/{ => v1}/globalstate_service_test.go (99%) rename api/grpcserver/{ => v1}/grpcserver_test.go (97%) rename api/grpcserver/{ => v1}/interface.go (96%) rename api/grpcserver/{ => v1}/mesh_service.go (99%) rename api/grpcserver/{ => v1}/mesh_service_test.go (99%) rename api/grpcserver/{ => v1}/mocks.go (99%) rename api/grpcserver/{ => v1}/node_service.go (99%) rename api/grpcserver/{ => v1}/node_service_test.go (99%) rename api/grpcserver/{ => v1}/post_client.go (99%) rename api/grpcserver/{ => v1}/post_info_service.go (98%) rename api/grpcserver/{ => v1}/post_info_service_test.go (98%) rename api/grpcserver/{ => v1}/post_service.go (99%) rename api/grpcserver/{ => v1}/post_service_test.go (99%) rename api/grpcserver/{ => v1}/smesher_service.go (99%) rename api/grpcserver/{ => v1}/smesher_service_test.go (87%) rename api/grpcserver/{ => v1}/transaction_service.go (99%) rename api/grpcserver/{ => v1}/transaction_service_test.go (99%) rename api/grpcserver/{grpcserver_tls_test.go => v1/v1_test.go} (80%) diff --git a/activation/e2e/activation_test.go b/activation/e2e/activation_test.go index d0c86e5785..426d225fa2 100644 --- a/activation/e2e/activation_test.go +++ b/activation/e2e/activation_test.go @@ -20,7 +20,7 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" "github.com/spacemeshos/go-spacemesh/activation/wire" - "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/atxsdata" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" @@ -64,7 +64,7 @@ func Test_BuilderWithMultipleClients(t *testing.T) { db := statesql.InMemoryTest(t) localDB := localsql.InMemoryTest(t) - svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) + svc := v1.NewPostService(logger, v1.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) diff --git a/activation/e2e/atx_merge_test.go b/activation/e2e/atx_merge_test.go index 2cbc7885d5..d7ac66e127 100644 --- a/activation/e2e/atx_merge_test.go +++ b/activation/e2e/atx_merge_test.go @@ -20,7 +20,7 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" ae2e "github.com/spacemeshos/go-spacemesh/activation/e2e" "github.com/spacemeshos/go-spacemesh/activation/wire" - "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/atxsdata" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" @@ -214,7 +214,7 @@ func Test_MarryAndMerge(t *testing.T) { t.Cleanup(func() { assert.NoError(t, cdb.Close()) }) localDB := localsql.InMemoryTest(t) - svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) + svc := v1.NewPostService(logger, v1.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) diff --git a/activation/e2e/builds_atx_v2_test.go b/activation/e2e/builds_atx_v2_test.go index f3992b6b49..5201780f85 100644 --- a/activation/e2e/builds_atx_v2_test.go +++ b/activation/e2e/builds_atx_v2_test.go @@ -17,7 +17,7 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" ae2e "github.com/spacemeshos/go-spacemesh/activation/e2e" "github.com/spacemeshos/go-spacemesh/activation/wire" - "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/atxsdata" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" @@ -58,7 +58,7 @@ func TestBuilder_SwitchesToBuildV2(t *testing.T) { t.Cleanup(func() { assert.NoError(t, cdb.Close()) }) opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) + svc := v1.NewPostService(logger, v1.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) diff --git a/activation/e2e/certifier_client_test.go b/activation/e2e/certifier_client_test.go index 3a0d8b3350..71612f7cc3 100644 --- a/activation/e2e/certifier_client_test.go +++ b/activation/e2e/certifier_client_test.go @@ -20,7 +20,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/activation" - "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql/localsql" @@ -39,7 +39,7 @@ func TestCertification(t *testing.T) { opts := testPostSetupOpts(t) logger := zaptest.NewLogger(t) - svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) + svc := v1.NewPostService(logger, v1.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) diff --git a/activation/e2e/checkpoint_merged_test.go b/activation/e2e/checkpoint_merged_test.go index 88c2a87238..7498369ceb 100644 --- a/activation/e2e/checkpoint_merged_test.go +++ b/activation/e2e/checkpoint_merged_test.go @@ -17,7 +17,7 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" ae2e "github.com/spacemeshos/go-spacemesh/activation/e2e" "github.com/spacemeshos/go-spacemesh/activation/wire" - "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/atxsdata" "github.com/spacemeshos/go-spacemesh/checkpoint" "github.com/spacemeshos/go-spacemesh/codec" @@ -48,7 +48,7 @@ func Test_CheckpointAfterMerge(t *testing.T) { t.Cleanup(func() { assert.NoError(t, cdb.Close()) }) localDB := localsql.InMemoryTest(t) - svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) + svc := v1.NewPostService(logger, v1.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/activation/e2e/checkpoint_test.go b/activation/e2e/checkpoint_test.go index cf481facb7..00eaa1c8b2 100644 --- a/activation/e2e/checkpoint_test.go +++ b/activation/e2e/checkpoint_test.go @@ -16,7 +16,7 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" ae2e "github.com/spacemeshos/go-spacemesh/activation/e2e" "github.com/spacemeshos/go-spacemesh/activation/wire" - "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/atxsdata" "github.com/spacemeshos/go-spacemesh/checkpoint" "github.com/spacemeshos/go-spacemesh/codec" @@ -51,7 +51,7 @@ func TestCheckpoint_PublishingSoloATXs(t *testing.T) { t.Cleanup(func() { assert.NoError(t, cdb.Close()) }) opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) + svc := v1.NewPostService(logger, v1.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/activation/e2e/nipost_test.go b/activation/e2e/nipost_test.go index a9a13970e9..68e5733f48 100644 --- a/activation/e2e/nipost_test.go +++ b/activation/e2e/nipost_test.go @@ -20,6 +20,7 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" ae2e "github.com/spacemeshos/go-spacemesh/activation/e2e" "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/atxsdata" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/signing" @@ -130,7 +131,7 @@ func initPost( sig *signing.EdSigner, golden types.ATXID, grpcCfg grpcserver.Config, - svc *grpcserver.PostService, + svc *v1.PostService, ) { tb.Helper() @@ -161,7 +162,7 @@ func TestNIPostBuilderWithClients(t *testing.T) { localDb := localsql.InMemoryTest(t) opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) + svc := v1.NewPostService(logger, v1.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) @@ -247,7 +248,7 @@ func Test_NIPostBuilderWithMultipleClients(t *testing.T) { db := statesql.InMemoryTest(t) opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) + svc := v1.NewPostService(logger, v1.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/activation/e2e/validation_test.go b/activation/e2e/validation_test.go index 493aa1cf17..17bcd2a1b7 100644 --- a/activation/e2e/validation_test.go +++ b/activation/e2e/validation_test.go @@ -13,7 +13,7 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" ae2e "github.com/spacemeshos/go-spacemesh/activation/e2e" - "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql/localsql" @@ -34,7 +34,7 @@ func TestValidator_Validate(t *testing.T) { validator := activation.NewMocknipostValidator(gomock.NewController(t)) opts := testPostSetupOpts(t) - svc := grpcserver.NewPostService(logger, grpcserver.PostServiceQueryInterval(100*time.Millisecond)) + svc := v1.NewPostService(logger, v1.PostServiceQueryInterval(100*time.Millisecond)) svc.AllowConnections(true) grpcCfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) diff --git a/api/grpcserver/http_server_test.go b/api/grpcserver/http_server_test.go index 47747e84ad..533e367e08 100644 --- a/api/grpcserver/http_server_test.go +++ b/api/grpcserver/http_server_test.go @@ -16,6 +16,7 @@ import ( "go.uber.org/zap/zaptest" "google.golang.org/protobuf/encoding/protojson" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/datastore" "github.com/spacemeshos/go-spacemesh/sql/statesql" @@ -59,16 +60,16 @@ func TestJsonApi(t *testing.T) { const build = "cafebabe" ctrl, ctx := gomock.WithContext(context.Background(), t) - peerCounter := NewMockpeerCounter(ctrl) - meshAPIMock := NewMockmeshAPI(ctrl) - genTime := NewMockgenesisTimeAPI(ctrl) - syncer := NewMocksyncer(ctrl) - conStateAPI := NewMockconservativeState(ctrl) + peerCounter := v1.NewMockpeerCounter(ctrl) + meshAPIMock := v1.NewMockmeshAPI(ctrl) + genTime := v1.NewMockgenesisTimeAPI(ctrl) + syncer := v1.NewMocksyncer(ctrl) + conStateAPI := v1.NewMockconservativeState(ctrl) cdb := datastore.NewCachedDB(statesql.InMemoryTest(t), zaptest.NewLogger(t)) t.Cleanup(func() { assert.NoError(t, cdb.Close()) }) - svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, version, build) - svc2 := NewMeshService( + svc1 := v1.NewNodeService(peerCounter, meshAPIMock, genTime, syncer, version, build) + svc2 := v1.NewMeshService( cdb, meshAPIMock, conStateAPI, diff --git a/api/grpcserver/activation_service.go b/api/grpcserver/v1/activation_service.go similarity index 99% rename from api/grpcserver/activation_service.go rename to api/grpcserver/v1/activation_service.go index 08fbecb2df..a24de0e378 100644 --- a/api/grpcserver/activation_service.go +++ b/api/grpcserver/v1/activation_service.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/activation_service_test.go b/api/grpcserver/v1/activation_service_test.go similarity index 99% rename from api/grpcserver/activation_service_test.go rename to api/grpcserver/v1/activation_service_test.go index 6d65524c03..42d82ad51a 100644 --- a/api/grpcserver/activation_service_test.go +++ b/api/grpcserver/v1/activation_service_test.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/admin_service.go b/api/grpcserver/v1/admin_service.go similarity index 99% rename from api/grpcserver/admin_service.go rename to api/grpcserver/v1/admin_service.go index 2c525c85b4..dda9664cd6 100644 --- a/api/grpcserver/admin_service.go +++ b/api/grpcserver/v1/admin_service.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/admin_service_test.go b/api/grpcserver/v1/admin_service_test.go similarity index 99% rename from api/grpcserver/admin_service_test.go rename to api/grpcserver/v1/admin_service_test.go index b08b84a183..183e43f62f 100644 --- a/api/grpcserver/admin_service_test.go +++ b/api/grpcserver/v1/admin_service_test.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/debug_service.go b/api/grpcserver/v1/debug_service.go similarity index 99% rename from api/grpcserver/debug_service.go rename to api/grpcserver/v1/debug_service.go index 787734920d..507627b2b1 100644 --- a/api/grpcserver/debug_service.go +++ b/api/grpcserver/v1/debug_service.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/events.go b/api/grpcserver/v1/events.go similarity index 98% rename from api/grpcserver/events.go rename to api/grpcserver/v1/events.go index 3ac23caf73..26aa228059 100644 --- a/api/grpcserver/events.go +++ b/api/grpcserver/v1/events.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/globalstate_service.go b/api/grpcserver/v1/globalstate_service.go similarity index 99% rename from api/grpcserver/globalstate_service.go rename to api/grpcserver/v1/globalstate_service.go index 639f42f31a..dad270859f 100644 --- a/api/grpcserver/globalstate_service.go +++ b/api/grpcserver/v1/globalstate_service.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/globalstate_service_test.go b/api/grpcserver/v1/globalstate_service_test.go similarity index 99% rename from api/grpcserver/globalstate_service_test.go rename to api/grpcserver/v1/globalstate_service_test.go index 08255f8456..728c7bab1b 100644 --- a/api/grpcserver/globalstate_service_test.go +++ b/api/grpcserver/v1/globalstate_service_test.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/grpcserver_test.go b/api/grpcserver/v1/grpcserver_test.go similarity index 97% rename from api/grpcserver/grpcserver_test.go rename to api/grpcserver/v1/grpcserver_test.go index 4b9e15c0bd..53764fc00a 100644 --- a/api/grpcserver/grpcserver_test.go +++ b/api/grpcserver/v1/grpcserver_test.go @@ -1,17 +1,14 @@ -package grpcserver +package v1 import ( "context" "errors" - "fmt" "io" "log" "math" "math/big" - "net" "os" "path/filepath" - "strconv" "testing" "time" @@ -28,13 +25,12 @@ import ( "go.uber.org/zap/zaptest/observer" "golang.org/x/sync/errgroup" "google.golang.org/genproto/googleapis/rpc/code" - "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" "github.com/spacemeshos/go-spacemesh/activation" + "github.com/spacemeshos/go-spacemesh/api/grpcserver" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/datastore" @@ -122,17 +118,6 @@ func genLayerBlock(layerID types.LayerID, txs []types.TransactionID) *types.Bloc return b } -func dialGrpc(tb testing.TB, cfg Config) *grpc.ClientConn { - tb.Helper() - conn, err := grpc.NewClient( - cfg.PublicListener, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - require.NoError(tb, err) - tb.Cleanup(func() { require.NoError(tb, conn.Close()) }) - return conn -} - func TestMain(m *testing.M) { types.SetLayersPerEpoch(layersPerEpoch) @@ -399,47 +384,6 @@ func NewTx(nonce uint64, recipient types.Address, signer *signing.EdSigner) *typ return &tx } -func launchServer(tb testing.TB, services ...ServiceAPI) (Config, func()) { - cfg := DefaultTestConfig() - grpcService, err := NewWithServices(cfg.PublicListener, zaptest.NewLogger(tb).Named("grpc"), cfg, services) - require.NoError(tb, err) - - // start gRPC server - require.NoError(tb, grpcService.Start()) - - // update config with bound addresses - cfg.PublicListener = grpcService.BoundAddress - - return cfg, func() { assert.NoError(tb, grpcService.Close()) } -} - -func getFreePort(optionalPort int) (int, error) { - l, e := net.Listen("tcp", fmt.Sprintf(":%v", optionalPort)) - if e != nil { - l, e = net.Listen("tcp", ":0") - if e != nil { - return 0, fmt.Errorf("listen TCP: %w", e) - } - } - defer l.Close() - return l.Addr().(*net.TCPAddr).Port, nil -} - -func TestNewServersConfig(t *testing.T) { - port1, err := getFreePort(0) - require.NoError(t, err, "Should be able to establish a connection on a port") - - port2, err := getFreePort(0) - require.NoError(t, err, "Should be able to establish a connection on a port") - - grpcService := New(fmt.Sprintf(":%d", port1), zaptest.NewLogger(t).Named("grpc"), DefaultTestConfig()) - jsonService := NewJSONHTTPServer(zaptest.NewLogger(t).Named("grpc.JSON"), fmt.Sprintf(":%d", port2), - []string{}, false) - - require.Contains(t, grpcService.listener, strconv.Itoa(port1), "Expected same port") - require.Contains(t, jsonService.listener, strconv.Itoa(port2), "Expected same port") -} - func TestNewLocalServer(t *testing.T) { tt := []struct { name string @@ -483,10 +427,10 @@ func TestNewLocalServer(t *testing.T) { genTime := NewMockgenesisTimeAPI(ctrl) syncer := NewMocksyncer(ctrl) - cfg := DefaultTestConfig() + cfg := grpcserver.DefaultTestConfig() cfg.PostListener = tc.listener svc := NewNodeService(peerCounter, meshApi, genTime, syncer, "v0.0.0", "cafebabe") - grpcService, err := NewWithServices(cfg.PostListener, logger, cfg, []ServiceAPI{svc}) + _, err := grpcserver.NewWithServices(cfg.PostListener, logger, cfg, []grpcserver.ServiceAPI{svc}) if tc.warn { require.Equal(t, 1, observedLogs.Len(), "Expected a warning log") require.Equal(t, "unsecured grpc server is listening on a public IP address", @@ -495,9 +439,7 @@ func TestNewLocalServer(t *testing.T) { require.Equal(t, tc.listener, observedLogs.All()[0].ContextMap()["address"]) return } - require.NoError(t, err) - require.Equal(t, grpcService.listener, tc.listener, "expected same listener") }) } } diff --git a/api/grpcserver/interface.go b/api/grpcserver/v1/interface.go similarity index 96% rename from api/grpcserver/interface.go rename to api/grpcserver/v1/interface.go index bfa88836b9..b5b841525b 100644 --- a/api/grpcserver/interface.go +++ b/api/grpcserver/v1/interface.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" @@ -15,7 +15,7 @@ import ( "github.com/spacemeshos/go-spacemesh/system" ) -//go:generate mockgen -typed -package=grpcserver -destination=./mocks.go -source=./interface.go +//go:generate mockgen -typed -package=v1 -destination=./mocks.go -source=./interface.go // networkInfo interface. type networkInfo interface { diff --git a/api/grpcserver/mesh_service.go b/api/grpcserver/v1/mesh_service.go similarity index 99% rename from api/grpcserver/mesh_service.go rename to api/grpcserver/v1/mesh_service.go index 5243dbd411..35a337ed09 100644 --- a/api/grpcserver/mesh_service.go +++ b/api/grpcserver/v1/mesh_service.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/mesh_service_test.go b/api/grpcserver/v1/mesh_service_test.go similarity index 99% rename from api/grpcserver/mesh_service_test.go rename to api/grpcserver/v1/mesh_service_test.go index 988750b7cc..c50c163cfe 100644 --- a/api/grpcserver/mesh_service_test.go +++ b/api/grpcserver/v1/mesh_service_test.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/mocks.go b/api/grpcserver/v1/mocks.go similarity index 99% rename from api/grpcserver/mocks.go rename to api/grpcserver/v1/mocks.go index 3867be40c6..22889853b4 100644 --- a/api/grpcserver/mocks.go +++ b/api/grpcserver/v1/mocks.go @@ -3,11 +3,11 @@ // // Generated by this command: // -// mockgen -typed -package=grpcserver -destination=./mocks.go -source=./interface.go +// mockgen -typed -package=v1 -destination=./mocks.go -source=./interface.go // // Package grpcserver is a generated GoMock package. -package grpcserver +package v1 import ( context "context" diff --git a/api/grpcserver/node_service.go b/api/grpcserver/v1/node_service.go similarity index 99% rename from api/grpcserver/node_service.go rename to api/grpcserver/v1/node_service.go index 2fc7c2145b..f2139a3bd3 100644 --- a/api/grpcserver/node_service.go +++ b/api/grpcserver/v1/node_service.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/node_service_test.go b/api/grpcserver/v1/node_service_test.go similarity index 99% rename from api/grpcserver/node_service_test.go rename to api/grpcserver/v1/node_service_test.go index d9b937868f..8132e57998 100644 --- a/api/grpcserver/node_service_test.go +++ b/api/grpcserver/v1/node_service_test.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/post_client.go b/api/grpcserver/v1/post_client.go similarity index 99% rename from api/grpcserver/post_client.go rename to api/grpcserver/v1/post_client.go index 4810ad8c92..c19fc0c9f8 100644 --- a/api/grpcserver/post_client.go +++ b/api/grpcserver/v1/post_client.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "bytes" diff --git a/api/grpcserver/post_info_service.go b/api/grpcserver/v1/post_info_service.go similarity index 98% rename from api/grpcserver/post_info_service.go rename to api/grpcserver/v1/post_info_service.go index 284a459f90..41b99a85ff 100644 --- a/api/grpcserver/post_info_service.go +++ b/api/grpcserver/v1/post_info_service.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/post_info_service_test.go b/api/grpcserver/v1/post_info_service_test.go similarity index 98% rename from api/grpcserver/post_info_service_test.go rename to api/grpcserver/v1/post_info_service_test.go index 319e5b413b..12b4737e6a 100644 --- a/api/grpcserver/post_info_service_test.go +++ b/api/grpcserver/v1/post_info_service_test.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/post_service.go b/api/grpcserver/v1/post_service.go similarity index 99% rename from api/grpcserver/post_service.go rename to api/grpcserver/v1/post_service.go index 102e366fc2..d1591dc7e9 100644 --- a/api/grpcserver/post_service.go +++ b/api/grpcserver/v1/post_service.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/post_service_test.go b/api/grpcserver/v1/post_service_test.go similarity index 99% rename from api/grpcserver/post_service_test.go rename to api/grpcserver/v1/post_service_test.go index 856520dfa9..e55a85393f 100644 --- a/api/grpcserver/post_service_test.go +++ b/api/grpcserver/v1/post_service_test.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc/status" "github.com/spacemeshos/go-spacemesh/activation" + "github.com/spacemeshos/go-spacemesh/api/grpcserver" "github.com/spacemeshos/go-spacemesh/atxsdata" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/signing" @@ -29,7 +30,7 @@ import ( func launchPostSupervisor( tb testing.TB, log *zap.Logger, - cfg Config, + cfg grpcserver.Config, serviceCfg activation.PostSupervisorConfig, postOpts activation.PostSetupOpts, ) (types.NodeID, func()) { @@ -74,7 +75,7 @@ func launchPostSupervisor( func launchPostSupervisorTLS( tb testing.TB, log *zap.Logger, - cfg Config, + cfg grpcserver.Config, serviceCfg activation.PostSupervisorConfig, postOpts activation.PostSetupOpts, ) (types.NodeID, func()) { diff --git a/api/grpcserver/smesher_service.go b/api/grpcserver/v1/smesher_service.go similarity index 99% rename from api/grpcserver/smesher_service.go rename to api/grpcserver/v1/smesher_service.go index 3a557f41b4..6562590405 100644 --- a/api/grpcserver/smesher_service.go +++ b/api/grpcserver/v1/smesher_service.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/smesher_service_test.go b/api/grpcserver/v1/smesher_service_test.go similarity index 87% rename from api/grpcserver/smesher_service_test.go rename to api/grpcserver/v1/smesher_service_test.go index d3c100da4d..913b29f3a5 100644 --- a/api/grpcserver/smesher_service_test.go +++ b/api/grpcserver/v1/smesher_service_test.go @@ -1,4 +1,4 @@ -package grpcserver_test +package v1_test import ( "context" @@ -15,7 +15,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" "github.com/spacemeshos/go-spacemesh/activation" - "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/signing" ) @@ -23,9 +23,9 @@ import ( func TestPostConfig(t *testing.T) { ctrl := gomock.NewController(t) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - postSupervisor := grpcserver.NewMockpostSupervisor(ctrl) - grpcPostService := grpcserver.NewMockgrpcPostService(ctrl) - svc := grpcserver.NewSmesherService( + postSupervisor := v1.NewMockpostSupervisor(ctrl) + grpcPostService := v1.NewMockgrpcPostService(ctrl) + svc := v1.NewSmesherService( smeshingProvider, postSupervisor, grpcPostService, @@ -56,12 +56,12 @@ func TestPostConfig(t *testing.T) { func TestStartSmeshingPassesCorrectSmeshingOpts(t *testing.T) { ctrl := gomock.NewController(t) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - postSupervisor := grpcserver.NewMockpostSupervisor(ctrl) - grpcPostService := grpcserver.NewMockgrpcPostService(ctrl) + postSupervisor := v1.NewMockpostSupervisor(ctrl) + grpcPostService := v1.NewMockgrpcPostService(ctrl) sig, err := signing.NewEdSigner() require.NoError(t, err) cmdCfg := activation.DefaultTestPostServiceConfig() - svc := grpcserver.NewSmesherService( + svc := v1.NewSmesherService( smeshingProvider, postSupervisor, grpcPostService, @@ -104,11 +104,11 @@ func TestStartSmeshingPassesCorrectSmeshingOpts(t *testing.T) { func TestStartSmeshing_ErrorOnMissingPostServiceConfig(t *testing.T) { ctrl := gomock.NewController(t) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - postSupervisor := grpcserver.NewMockpostSupervisor(ctrl) - grpcPostService := grpcserver.NewMockgrpcPostService(ctrl) + postSupervisor := v1.NewMockpostSupervisor(ctrl) + grpcPostService := v1.NewMockgrpcPostService(ctrl) sig, err := signing.NewEdSigner() require.NoError(t, err) - svc := grpcserver.NewSmesherService( + svc := v1.NewSmesherService( smeshingProvider, postSupervisor, grpcPostService, @@ -145,9 +145,9 @@ func TestStartSmeshing_ErrorOnMissingPostServiceConfig(t *testing.T) { func TestStartSmeshing_ErrorOnMultiSmeshingSetup(t *testing.T) { ctrl := gomock.NewController(t) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - postSupervisor := grpcserver.NewMockpostSupervisor(ctrl) - grpcPostService := grpcserver.NewMockgrpcPostService(ctrl) - svc := grpcserver.NewSmesherService( + postSupervisor := v1.NewMockpostSupervisor(ctrl) + grpcPostService := v1.NewMockgrpcPostService(ctrl) + svc := v1.NewSmesherService( smeshingProvider, postSupervisor, grpcPostService, @@ -186,9 +186,9 @@ func TestStartSmeshing_ErrorOnMultiSmeshingSetup(t *testing.T) { func TestSmesherService_PostSetupProviders(t *testing.T) { ctrl := gomock.NewController(t) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - postSupervisor := grpcserver.NewMockpostSupervisor(ctrl) - grpcPostService := grpcserver.NewMockgrpcPostService(ctrl) - svc := grpcserver.NewSmesherService( + postSupervisor := v1.NewMockpostSupervisor(ctrl) + grpcPostService := v1.NewMockgrpcPostService(ctrl) + svc := v1.NewSmesherService( smeshingProvider, postSupervisor, grpcPostService, @@ -235,9 +235,9 @@ func TestSmesherService_PostSetupStatus(t *testing.T) { t.Run("completed", func(t *testing.T) { ctrl := gomock.NewController(t) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - postSupervisor := grpcserver.NewMockpostSupervisor(ctrl) - grpcPostService := grpcserver.NewMockgrpcPostService(ctrl) - svc := grpcserver.NewSmesherService( + postSupervisor := v1.NewMockpostSupervisor(ctrl) + grpcPostService := v1.NewMockgrpcPostService(ctrl) + svc := v1.NewSmesherService( smeshingProvider, postSupervisor, grpcPostService, @@ -261,9 +261,9 @@ func TestSmesherService_PostSetupStatus(t *testing.T) { t.Run("completed with last Opts", func(t *testing.T) { ctrl := gomock.NewController(t) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - postSupervisor := grpcserver.NewMockpostSupervisor(ctrl) - grpcPostService := grpcserver.NewMockgrpcPostService(ctrl) - svc := grpcserver.NewSmesherService( + postSupervisor := v1.NewMockpostSupervisor(ctrl) + grpcPostService := v1.NewMockgrpcPostService(ctrl) + svc := v1.NewSmesherService( smeshingProvider, postSupervisor, grpcPostService, @@ -300,9 +300,9 @@ func TestSmesherService_PostSetupStatus(t *testing.T) { t.Run("in progress", func(t *testing.T) { ctrl := gomock.NewController(t) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - postSupervisor := grpcserver.NewMockpostSupervisor(ctrl) - grpcPostService := grpcserver.NewMockgrpcPostService(ctrl) - svc := grpcserver.NewSmesherService( + postSupervisor := v1.NewMockpostSupervisor(ctrl) + grpcPostService := v1.NewMockgrpcPostService(ctrl) + svc := v1.NewSmesherService( smeshingProvider, postSupervisor, grpcPostService, @@ -340,9 +340,9 @@ func TestSmesherService_PostSetupStatus(t *testing.T) { func TestSmesherService_SmesherID(t *testing.T) { ctrl := gomock.NewController(t) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - postSupervisor := grpcserver.NewMockpostSupervisor(ctrl) - grpcPostService := grpcserver.NewMockgrpcPostService(ctrl) - svc := grpcserver.NewSmesherService( + postSupervisor := v1.NewMockpostSupervisor(ctrl) + grpcPostService := v1.NewMockgrpcPostService(ctrl) + svc := v1.NewSmesherService( smeshingProvider, postSupervisor, grpcPostService, diff --git a/api/grpcserver/transaction_service.go b/api/grpcserver/v1/transaction_service.go similarity index 99% rename from api/grpcserver/transaction_service.go rename to api/grpcserver/v1/transaction_service.go index c56f96af8b..ef41b0e202 100644 --- a/api/grpcserver/transaction_service.go +++ b/api/grpcserver/v1/transaction_service.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "bytes" diff --git a/api/grpcserver/transaction_service_test.go b/api/grpcserver/v1/transaction_service_test.go similarity index 99% rename from api/grpcserver/transaction_service_test.go rename to api/grpcserver/v1/transaction_service_test.go index f6a6786154..12bfcb3ce9 100644 --- a/api/grpcserver/transaction_service_test.go +++ b/api/grpcserver/v1/transaction_service_test.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "context" diff --git a/api/grpcserver/grpcserver_tls_test.go b/api/grpcserver/v1/v1_test.go similarity index 80% rename from api/grpcserver/grpcserver_tls_test.go rename to api/grpcserver/v1/v1_test.go index b637c06c4f..cdae4ef3a1 100644 --- a/api/grpcserver/grpcserver_tls_test.go +++ b/api/grpcserver/v1/v1_test.go @@ -1,4 +1,4 @@ -package grpcserver +package v1 import ( "crypto/rand" @@ -17,6 +17,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/spacemeshos/go-spacemesh/api/grpcserver" ) const ( @@ -28,38 +32,34 @@ const ( clientKeyName = "client.key" ) -func genPrivateKey(tb testing.TB, path string) *rsa.PrivateKey { - caKey, err := rsa.GenerateKey(rand.Reader, 4096) +func launchServer(tb testing.TB, services ...grpcserver.ServiceAPI) (grpcserver.Config, func()) { + cfg := grpcserver.DefaultTestConfig() + grpcService, err := grpcserver.NewWithServices( + cfg.PublicListener, + zaptest.NewLogger(tb).Named("grpc"), + cfg, + services, + ) require.NoError(tb, err) - f, err := os.Create(path) - require.NoError(tb, err) - defer f.Close() - require.NoError(tb, pem.Encode(f, &pem.Block{ - Type: "RSA PRIVATE KEY", - Bytes: x509.MarshalPKCS1PrivateKey(caKey), - })) - return caKey -} + // start gRPC server + require.NoError(tb, grpcService.Start()) -func genCertificate( - tb testing.TB, - template, - parent *x509.Certificate, - pub *rsa.PublicKey, - priv *rsa.PrivateKey, - path string, -) { - caBytes, err := x509.CreateCertificate(rand.Reader, template, parent, pub, priv) - require.NoError(tb, err) + // update config with bound addresses + cfg.PublicListener = grpcService.BoundAddress - f, err := os.Create(path) + return cfg, func() { assert.NoError(tb, grpcService.Close()) } +} + +func dialGrpc(tb testing.TB, cfg grpcserver.Config) *grpc.ClientConn { + tb.Helper() + conn, err := grpc.NewClient( + cfg.PublicListener, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) require.NoError(tb, err) - defer f.Close() - require.NoError(tb, pem.Encode(f, &pem.Block{ - Type: "CERTIFICATE", - Bytes: caBytes, - })) + tb.Cleanup(func() { require.NoError(tb, conn.Close()) }) + return conn } func genKeys(tb testing.TB) string { @@ -142,18 +142,52 @@ func genKeys(tb testing.TB) string { return dir } -func launchTLSServer(tb testing.TB, certDir string, services ...ServiceAPI) (Config, func()) { +func genPrivateKey(tb testing.TB, path string) *rsa.PrivateKey { + caKey, err := rsa.GenerateKey(rand.Reader, 4096) + require.NoError(tb, err) + + f, err := os.Create(path) + require.NoError(tb, err) + defer f.Close() + require.NoError(tb, pem.Encode(f, &pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: x509.MarshalPKCS1PrivateKey(caKey), + })) + return caKey +} + +func genCertificate( + tb testing.TB, + template, + parent *x509.Certificate, + pub *rsa.PublicKey, + priv *rsa.PrivateKey, + path string, +) { + caBytes, err := x509.CreateCertificate(rand.Reader, template, parent, pub, priv) + require.NoError(tb, err) + + f, err := os.Create(path) + require.NoError(tb, err) + defer f.Close() + require.NoError(tb, pem.Encode(f, &pem.Block{ + Type: "CERTIFICATE", + Bytes: caBytes, + })) +} + +func launchTLSServer(tb testing.TB, certDir string, services ...grpcserver.ServiceAPI) (grpcserver.Config, func()) { caCert := filepath.Join(certDir, caCertName) serverCert := filepath.Join(certDir, serverCertName) serverKey := filepath.Join(certDir, serverKeyName) - cfg := DefaultTestConfig() + cfg := grpcserver.DefaultTestConfig() cfg.TLSListener = "127.0.0.1:0" cfg.TLSCACert = caCert cfg.TLSCert = serverCert cfg.TLSKey = serverKey - grpcService, err := NewTLS(zaptest.NewLogger(tb).Named("grpc.TLS"), cfg, services) + grpcService, err := grpcserver.NewTLS(zaptest.NewLogger(tb).Named("grpc.TLS"), cfg, services) require.NoError(tb, err) // start gRPC server diff --git a/cmd/bootstrapper/generator_test.go b/cmd/bootstrapper/generator_test.go index 4651da4241..da328eee0d 100644 --- a/cmd/bootstrapper/generator_test.go +++ b/cmd/bootstrapper/generator_test.go @@ -20,6 +20,7 @@ import ( "go.uber.org/zap/zaptest" "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/bootstrap" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/datastore" @@ -67,9 +68,10 @@ func launchServer(tb testing.TB, db sql.StateDatabase) (grpcserver.Config, func( []string{}, false) cdb := datastore.NewCachedDB(db, zaptest.NewLogger(tb)) tb.Cleanup(func() { assert.NoError(tb, cdb.Close()) }) - s := grpcserver.NewMeshService(cdb, grpcserver.NewMockmeshAPI(gomock.NewController(tb)), nil, nil, - 0, types.Hash20{}, 0, 0, 0) - + s := v1.NewMeshService(cdb, + v1.NewMockmeshAPI(gomock.NewController(tb)), + nil, nil, 0, types.Hash20{}, 0, 0, 0, + ) pb.RegisterMeshServiceServer(grpcService.GrpcServer, s) // start gRPC and json servers err := grpcService.Start() diff --git a/node/node.go b/node/node.go index ff7ccda2e4..3148485542 100644 --- a/node/node.go +++ b/node/node.go @@ -38,6 +38,7 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/api/grpcserver/v2alpha1" "github.com/spacemeshos/go-spacemesh/api/grpcserver/v2beta1" "github.com/spacemeshos/go-spacemesh/atxsdata" @@ -1099,7 +1100,7 @@ func (app *App) initServices(ctx context.Context) error { nipostBuilder, err := activation.NewNIPostBuilder( app.localDB, - grpcPostService.(*grpcserver.PostService), + grpcPostService.(*v1.PostService), nipostLogger, app.Config.POET, app.clock, @@ -1495,15 +1496,15 @@ func (app *App) grpcService(svc grpcserver.Service, lg log.Log) (grpcserver.Serv switch svc { case grpcserver.Debug: - service := grpcserver.NewDebugService(app.db, app.conState, app.host, app.hOracle, app.loggers) + service := v1.NewDebugService(app.db, app.conState, app.host, app.hOracle, app.loggers) app.grpcServices[svc] = service return service, nil case grpcserver.GlobalState: - service := grpcserver.NewGlobalStateService(app.mesh, app.conState) + service := v1.NewGlobalStateService(app.mesh, app.conState) app.grpcServices[svc] = service return service, nil case grpcserver.Mesh: - service := grpcserver.NewMeshService( + service := v1.NewMeshService( app.cachedDB, app.mesh, app.conState, @@ -1517,7 +1518,7 @@ func (app *App) grpcService(svc grpcserver.Service, lg log.Log) (grpcserver.Serv app.grpcServices[svc] = service return service, nil case grpcserver.Node: - service := grpcserver.NewNodeService( + service := v1.NewNodeService( app.host, app.mesh, app.clock, @@ -1528,7 +1529,7 @@ func (app *App) grpcService(svc grpcserver.Service, lg log.Log) (grpcserver.Serv app.grpcServices[svc] = service return service, nil case grpcserver.Admin: - service := grpcserver.NewAdminService(app.db, app.Config.DataDir(), app.host) + service := v1.NewAdminService(app.db, app.Config.DataDir(), app.host) app.grpcServices[svc] = service return service, nil case grpcserver.Smesher: @@ -1541,10 +1542,10 @@ func (app *App) grpcService(svc grpcserver.Service, lg log.Log) (grpcserver.Serv if err != nil { return nil, err } - service := grpcserver.NewSmesherService( + service := v1.NewSmesherService( app.atxBuilder, app.postSupervisor, - postService.(*grpcserver.PostService), + postService.(*v1.PostService), app.Config.API.SmesherStreamInterval, app.Config.SMESHING.Opts, sig, @@ -1552,7 +1553,7 @@ func (app *App) grpcService(svc grpcserver.Service, lg log.Log) (grpcserver.Serv app.grpcServices[svc] = service return service, nil case grpcserver.Post: - service := grpcserver.NewPostService(app.addLogger(PostServiceLogger, lg).Zap()) + service := v1.NewPostService(app.addLogger(PostServiceLogger, lg).Zap()) isCoinbaseSet := app.Config.SMESHING.CoinbaseAccount != "" if !isCoinbaseSet { lg.Warning("coinbase account is not set, connections from remote post services will be rejected") @@ -1561,11 +1562,11 @@ func (app *App) grpcService(svc grpcserver.Service, lg log.Log) (grpcserver.Serv app.grpcServices[svc] = service return service, nil case grpcserver.PostInfo: - service := grpcserver.NewPostInfoService(app.atxBuilder) + service := v1.NewPostInfoService(app.atxBuilder) app.grpcServices[svc] = service return service, nil case grpcserver.Transaction: - service := grpcserver.NewTransactionService( + service := v1.NewTransactionService( app.db, app.host, app.mesh, @@ -1576,7 +1577,7 @@ func (app *App) grpcService(svc grpcserver.Service, lg log.Log) (grpcserver.Serv app.grpcServices[svc] = service return service, nil case grpcserver.Activation: - service := grpcserver.NewActivationService(app.cachedDB, types.ATXID(app.Config.Genesis.GoldenATX())) + service := v1.NewActivationService(app.cachedDB, types.ATXID(app.Config.Genesis.GoldenATX())) app.grpcServices[svc] = service return service, nil case v2alpha1.Activation: @@ -1851,7 +1852,7 @@ func (app *App) startAPIServices(ctx context.Context) error { if err != nil { return err } - svc.(*grpcserver.SmesherService).SetPostServiceConfig(app.Config.POSTService) + svc.(*v1.SmesherService).SetPostServiceConfig(app.Config.POSTService) if app.Config.SMESHING.Start { if app.Config.SMESHING.CoinbaseAccount == "" { return errors.New("smeshing enabled but no coinbase account provided") diff --git a/systest/tests/distributed_post_verification_test.go b/systest/tests/distributed_post_verification_test.go index 4b50846fda..cd2de59db9 100644 --- a/systest/tests/distributed_post_verification_test.go +++ b/systest/tests/distributed_post_verification_test.go @@ -27,6 +27,7 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" "github.com/spacemeshos/go-spacemesh/activation/wire" "github.com/spacemeshos/go-spacemesh/api/grpcserver" + v1 "github.com/spacemeshos/go-spacemesh/api/grpcserver/v1" "github.com/spacemeshos/go-spacemesh/atxsdata" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" @@ -207,9 +208,9 @@ func testPostMalfeasance( defer postSupervisor.Stop(false) // 2. create ATX with invalid POST labels - grpcPostService := grpcserver.NewPostService( + grpcPostService := v1.NewPostService( logger.Named("grpc-post-service"), - grpcserver.PostServiceQueryInterval(500*time.Millisecond), + v1.PostServiceQueryInterval(500*time.Millisecond), ) grpcPostService.AllowConnections(true) From a15474b7c2264146c789676e9bceff83c3596c5c Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Wed, 5 Feb 2025 17:54:15 +0000 Subject: [PATCH 2/6] Migrate system tests to v2 API --- systest/tests/checkpoint_test.go | 9 +- systest/tests/common.go | 268 ++++++------------ .../distributed_post_verification_test.go | 17 +- systest/tests/equivocation_test.go | 15 +- systest/tests/nodes_test.go | 21 +- systest/tests/partition_test.go | 18 +- systest/tests/poets_test.go | 12 +- systest/tests/smeshing_test.go | 5 +- systest/tests/steps_test.go | 46 +-- systest/tests/timeskew_test.go | 12 +- systest/tests/transactions_test.go | 37 ++- 11 files changed, 193 insertions(+), 267 deletions(-) diff --git a/systest/tests/checkpoint_test.go b/systest/tests/checkpoint_test.go index a220ad2325..9c8e20771a 100644 --- a/systest/tests/checkpoint_test.go +++ b/systest/tests/checkpoint_test.go @@ -69,7 +69,14 @@ func TestCheckpoint(t *testing.T) { ctx, cancel := context.WithDeadline(tctx, deadline) defer cancel() require.NoError(t, sendTransactions(ctx, tctx.Log.Desugar(), cl, first, stop, receiver, 1, 100)) - require.NoError(t, waitLayer(tctx, cl.Client(0), snapshotLayer)) + + layerTime := cl.Genesis().Add(time.Duration(snapshotLayer) * layerDuration) + tctx.Log.Debugw("waiting for layer", "layer", snapshotLayer, "layer time", layerTime) + select { + case <-tctx.Done(): + require.FailNow(t, "test context is done") + case <-time.After(time.Until(layerTime)): + } tctx.Log.Debugw("getting account balances") before, err := getBalance(tctx, cl, snapshotLayer) diff --git a/systest/tests/common.go b/systest/tests/common.go index 99950fb61b..38727600b8 100644 --- a/systest/tests/common.go +++ b/systest/tests/common.go @@ -46,11 +46,11 @@ func sendTransactions( if err != nil { return fmt.Errorf("get nonce failed (%s: %s): %w", client.Name, cl.Address(i), err) } - watchLayers(ctx, eg, client, logger, func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number >= stop { + watchLayers(ctx, eg, client, logger, func(layer *pb2.Layer) (bool, error) { + if layer.Number >= stop { return false, nil } - if layer.Layer.Status != pb.Layer_LAYER_STATUS_APPROVED || layer.Layer.Number.Number < first { + if layer.Status != pb2.Layer_LAYER_STATUS_APPLIED || layer.Number < first { return true, nil } // give some time for a previous layer to be applied @@ -62,14 +62,19 @@ func sendTransactions( zap.String("client", client.Name), zap.Stringer("address", cl.Address(i)), ) - if err := submitSpawn(ctx, cl, i, client); err != nil { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if _, err := submitTransaction(ctx, + wallet.SelfSpawn(cl.Private(i), 0, sdk.WithGenesisID(cl.GenesisID())), + client, + ); err != nil { return false, fmt.Errorf("failed to spawn %w", err) } nonce++ return true, nil } logger.Debug("submitting transactions", - zap.Uint32("layer", layer.Layer.Number.Number), + zap.Uint32("layer", layer.Number), zap.String("client", client.Name), zap.Stringer("address", cl.Address(i)), zap.Uint64("nonce", nonce), @@ -102,67 +107,17 @@ func sendTransactions( } func submitTransaction(ctx context.Context, tx []byte, node *cluster.NodeClient) ([]byte, error) { - txclient := pb.NewTransactionServiceClient(node.PubConn()) + client := pb2.NewTransactionServiceClient(node.PubConn()) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - response, err := txclient.SubmitTransaction(ctx, &pb.SubmitTransactionRequest{Transaction: tx}) + resp, err := client.SubmitTransaction(ctx, &pb2.SubmitTransactionRequest{Transaction: tx}) if err != nil { return nil, err } - if response.Txstate == nil { - return nil, errors.New("tx state should not be nil") - } - return response.Txstate.Id.Id, nil -} - -func stateHashStream( - ctx context.Context, - node *cluster.NodeClient, - logger *zap.Logger, - collector func(*pb.GlobalStateStreamResponse) (bool, error), -) error { - retries := 0 -BACKOFF: - stateapi := pb.NewGlobalStateServiceClient(node.PubConn()) - states, err := stateapi.GlobalStateStream(ctx, &pb.GlobalStateStreamRequest{ - GlobalStateDataFlags: uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_GLOBAL_STATE_HASH), - }) - if err != nil { - return err - } - for { - state, err := states.Recv() - s, ok := status.FromError(err) - if !ok { - if ctx.Err() != nil { - return ctx.Err() - } - return fmt.Errorf("unknown error: %w", err) - } - switch s.Code() { - case codes.OK: - if cont, err := collector(state); !cont { - return err - } - case codes.Canceled, codes.DeadlineExceeded: - return nil - case codes.Unavailable: - if retries == attempts { - return errors.New("state stream unavailable") - } - retries++ - time.Sleep(retryBackoff) - goto BACKOFF - default: - logger.Warn( - "global state stream error", - zap.String("client", node.Name), - zap.Error(err), - zap.Any("status", s), - ) - return fmt.Errorf("stream err from client %v: %w", node.Name, err) - } + if resp.TxId == nil { + return nil, errors.New("tx id should not be nil") } + return resp.TxId, nil } func watchLayers( @@ -170,7 +125,7 @@ func watchLayers( eg *errgroup.Group, node *cluster.NodeClient, logger *zap.Logger, - collector func(*pb.LayerStreamResponse) (bool, error), + collector func(*pb2.Layer) (bool, error), ) { eg.Go(func() error { return layersStream(ctx, node, logger, collector) @@ -181,17 +136,20 @@ func layersStream( ctx context.Context, node *cluster.NodeClient, logger *zap.Logger, - collector func(*pb.LayerStreamResponse) (bool, error), + collector func(*pb2.Layer) (bool, error), ) error { retries := 0 BACKOFF: - meshapi := pb.NewMeshServiceClient(node.PubConn()) - layers, err := meshapi.LayerStream(ctx, &pb.LayerStreamRequest{}) + client := pb2.NewLayerStreamServiceClient(node.PubConn()) + stream, err := client.Stream(ctx, &pb2.LayerStreamRequest{ + Watch: true, + }) if err != nil { return err } + defer stream.CloseSend() for { - layer, err := layers.Recv() + layer, err := stream.Recv() s, ok := status.FromError(err) if !ok { if ctx.Err() != nil { @@ -233,13 +191,16 @@ func malfeasanceStream( ) error { retries := 0 BACKOFF: - malapi := pb2.NewMalfeasanceStreamServiceClient(node.PrivConn()) - proofs, err := malapi.Stream(ctx, &pb2.MalfeasanceStreamRequest{Watch: true}) + client := pb2.NewMalfeasanceStreamServiceClient(node.PrivConn()) + stream, err := client.Stream(ctx, &pb2.MalfeasanceStreamRequest{ + Watch: true, + }) if err != nil { return err } + defer stream.CloseSend() for { - proof, err := proofs.Recv() + proof, err := stream.Recv() s, ok := status.FromError(err) if !ok { if ctx.Err() != nil { @@ -273,58 +234,20 @@ BACKOFF: } } -func waitGenesis(ctx *testcontext.Context, node *cluster.NodeClient) error { - svc := pb.NewMeshServiceClient(node.PubConn()) - resp, err := svc.GenesisTime(ctx, &pb.GenesisTimeRequest{}) - if err != nil { - return err - } - genesis := time.Unix(int64(resp.Unixtime.Value), 0) - now := time.Now() - if !genesis.After(now) { - return nil - } - ctx.Log.Debugw("waiting for genesis", "now", now, "genesis", genesis) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(genesis.Sub(now)): - return nil - } -} - -func waitLayer(ctx *testcontext.Context, node *cluster.NodeClient, lid uint32) error { - svc := pb.NewMeshServiceClient(node.PubConn()) - resp, err := svc.GenesisTime(ctx, &pb.GenesisTimeRequest{}) - if err != nil { - return err - } - lyrTime := time.Unix(int64(resp.Unixtime.Value), 0). - Add(time.Duration(lid) * testcontext.LayerDuration.Get(ctx.Parameters)) - - now := time.Now() - if !lyrTime.After(now) { - return nil - } - ctx.Log.Debugw("waiting for layer", "now", now, "layer time", lyrTime, "layer", lid) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(lyrTime.Sub(now)): - return nil - } -} - -func waitTransaction(ctx context.Context, eg *errgroup.Group, client *cluster.NodeClient, id []byte) { +func waitTransaction(ctx context.Context, eg *errgroup.Group, node *cluster.NodeClient, id []byte) { eg.Go(func() error { - api := pb.NewTransactionServiceClient(client.PubConn()) - rsts, err := api.StreamResults(ctx, &pb.TransactionResultsRequest{Watch: true, Id: id}) + client := pb2.NewTransactionStreamServiceClient(node.PubConn()) + stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ + Watch: true, + Txid: [][]byte{id}, + }) if err != nil { return err } - _, err = rsts.Recv() + defer stream.CloseSend() + _, err = stream.Recv() if err != nil { - return fmt.Errorf("stream error on receiving result %s: %w", client.Name, err) + return fmt.Errorf("stream error on receiving result %s: %w", node.Name, err) } return nil }) @@ -332,19 +255,22 @@ func waitTransaction(ctx context.Context, eg *errgroup.Group, client *cluster.No func watchTransactionResults( ctx context.Context, - client *cluster.NodeClient, + node *cluster.NodeClient, log *zap.Logger, - collector func(*pb.TransactionResult) (bool, error), + collector func(*pb2.TransactionResponse) (bool, error), ) error { retries := 0 BACKOFF: - api := pb.NewTransactionServiceClient(client.PubConn()) - rsts, err := api.StreamResults(ctx, &pb.TransactionResultsRequest{Watch: true}) + client := pb2.NewTransactionStreamServiceClient(node.PubConn()) + stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ + Watch: true, + }) if err != nil { return err } + defer stream.CloseSend() for { - rst, err := rsts.Recv() + rst, err := stream.Recv() s, ok := status.FromError(err) if !ok { if ctx.Err() != nil { @@ -369,11 +295,11 @@ BACKOFF: default: log.Warn( "transactions stream error", - zap.String("client", client.Name), + zap.String("client", node.Name), zap.Error(err), zap.Any("status", s), ) - return fmt.Errorf("stream error on receiving result %s: %w", client.Name, err) + return fmt.Errorf("stream error on receiving result %s: %w", node.Name, err) } } } @@ -381,20 +307,21 @@ BACKOFF: func watchProposals( ctx context.Context, eg *errgroup.Group, - client *cluster.NodeClient, + node *cluster.NodeClient, log *zap.Logger, collector func(*pb.Proposal) (bool, error), ) { eg.Go(func() error { retries := 0 BACKOFF: - dbg := pb.NewDebugServiceClient(client.PrivConn()) - proposals, err := dbg.ProposalsStream(ctx, &emptypb.Empty{}) + client := pb.NewDebugServiceClient(node.PrivConn()) + stream, err := client.ProposalsStream(ctx, &emptypb.Empty{}) if err != nil { - return fmt.Errorf("proposal stream for %s: %w", client.Name, err) + return fmt.Errorf("proposal stream for %s: %w", node.Name, err) } + defer stream.CloseSend() for { - proposal, err := proposals.Recv() + proposal, err := stream.Recv() s, ok := status.FromError(err) if !ok { if ctx.Err() != nil { @@ -419,11 +346,11 @@ func watchProposals( default: log.Warn( "proposals stream error", - zap.String("client", client.Name), + zap.String("client", node.Name), zap.Error(err), zap.Any("status", s), ) - return fmt.Errorf("proposal event for %s: %w", client.Name, err) + return fmt.Errorf("proposal event for %s: %w", node.Name, err) } } }) @@ -442,15 +369,15 @@ func scheduleChaos( action func(context.Context) (chaos.Teardown, error), ) { var teardown chaos.Teardown - watchLayers(ctx, eg, client, logger, func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number == from && teardown == nil { + watchLayers(ctx, eg, client, logger, func(layer *pb2.Layer) (bool, error) { + if layer.Number == from && teardown == nil { var err error teardown, err = action(ctx) if err != nil { return false, err } } - if layer.Layer.Number.Number == to { + if layer.Number == to { if err := teardown(ctx); err != nil { return false, err } @@ -462,9 +389,9 @@ func scheduleChaos( func currentLayer(ctx context.Context, tb testing.TB, client *cluster.NodeClient) uint32 { tb.Helper() - response, err := pb.NewMeshServiceClient(client.PubConn()).CurrentLayer(ctx, &pb.CurrentLayerRequest{}) + resp, err := pb2.NewNodeServiceClient(client.PubConn()).Status(ctx, &pb2.NodeStatusRequest{}) require.NoError(tb, err) - return response.Layernum.Number + return resp.CurrentLayer } func waitAll(tctx *testcontext.Context, cl *cluster.Cluster) error { @@ -484,32 +411,24 @@ func nextFirstLayer(current, size uint32) uint32 { return current } -func getNonce(ctx context.Context, client *cluster.NodeClient, address types.Address) (uint64, error) { - gstate := pb.NewGlobalStateServiceClient(client.PubConn()) - resp, err := gstate.Account(ctx, &pb.AccountRequest{AccountId: &pb.AccountId{Address: address.String()}}) +func getNonce(ctx context.Context, node *cluster.NodeClient, address types.Address) (uint64, error) { + resp, err := pb2.NewAccountServiceClient(node.PubConn()).List(ctx, &pb2.AccountRequest{ + Addresses: []string{address.String()}, + }) if err != nil { return 0, err } - return resp.AccountWrapper.StateProjected.Counter, nil + return resp.Accounts[0].Current.Counter, nil } -func currentBalance(ctx context.Context, client *cluster.NodeClient, address types.Address) (uint64, error) { - gstate := pb.NewGlobalStateServiceClient(client.PubConn()) - resp, err := gstate.Account(ctx, &pb.AccountRequest{AccountId: &pb.AccountId{Address: address.String()}}) +func currentBalance(ctx context.Context, node *cluster.NodeClient, address types.Address) (uint64, error) { + resp, err := pb2.NewAccountServiceClient(node.PubConn()).List(ctx, &pb2.AccountRequest{ + Addresses: []string{address.String()}, + }) if err != nil { return 0, err } - return resp.AccountWrapper.StateCurrent.Balance.Value, nil -} - -func submitSpawn(ctx context.Context, cluster *cluster.Cluster, account int, client *cluster.NodeClient) error { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - _, err := submitTransaction(ctx, - wallet.SelfSpawn(cluster.Private(account), 0, sdk.WithGenesisID(cluster.GenesisID())), - client, - ) - return err + return resp.Accounts[0].Current.Balance, nil } func submitSpend( @@ -541,38 +460,37 @@ func syncedNodes(ctx context.Context, cl *cluster.Cluster) []*cluster.NodeClient func isSynced(ctx context.Context, node *cluster.NodeClient) bool { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - svc := pb.NewNodeServiceClient(node.PubConn()) - resp, err := svc.Status(ctx, &pb.StatusRequest{}) + svc := pb2.NewNodeServiceClient(node.PubConn()) + resp, err := svc.Status(ctx, &pb2.NodeStatusRequest{}) if err != nil { return false } - return resp.Status.IsSynced + return resp.Status == pb2.NodeStatusResponse_SYNC_STATUS_SYNCED } -func getLayer(ctx context.Context, node *cluster.NodeClient, lid uint32) (*pb.Layer, error) { +func getLayer(ctx context.Context, node *cluster.NodeClient, lid uint32) (*pb2.Layer, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - layer := &pb.LayerNumber{Number: lid} - msvc := pb.NewMeshServiceClient(node.PubConn()) - lresp, err := msvc.LayersQuery(ctx, &pb.LayersQueryRequest{StartLayer: layer, EndLayer: layer}) + client := pb2.NewLayerServiceClient(node.PubConn()) + resp, err := client.List(ctx, &pb2.LayerRequest{StartLayer: lid, EndLayer: lid}) if err != nil { return nil, err } - if len(lresp.Layer) != 1 { - return nil, fmt.Errorf("request was made for one layer (%d)", layer.Number) + if len(resp.Layers) != 1 { + return nil, fmt.Errorf("request was made for one layer (%d)", lid) } - return lresp.Layer[0], nil + return resp.Layers[0], nil } -func getVerifiedLayer(ctx context.Context, node *cluster.NodeClient) (*pb.Layer, error) { +func getVerifiedLayer(ctx context.Context, node *cluster.NodeClient) (*pb2.Layer, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - svc := pb.NewNodeServiceClient(node.PubConn()) - resp, err := svc.Status(ctx, &pb.StatusRequest{}) + client := pb2.NewNodeServiceClient(node.PubConn()) + resp, err := client.Status(ctx, &pb2.NodeStatusRequest{}) if err != nil { return nil, err } - return getLayer(ctx, node, resp.Status.VerifiedLayer.Number) + return getLayer(ctx, node, resp.AppliedLayer) } type txClient struct { @@ -604,20 +522,21 @@ type txRequest struct { node *cluster.NodeClient txid []byte - rst *pb.TransactionResult + rst *pb2.TransactionResponse } func (r *txRequest) wait(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - client := pb.NewTransactionServiceClient(r.node.PubConn()) - stream, err := client.StreamResults(ctx, &pb.TransactionResultsRequest{ - Id: r.txid, + client := pb2.NewTransactionStreamServiceClient(r.node.PubConn()) + stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ + Txid: [][]byte{r.txid}, Watch: true, }) if err != nil { return err } + defer stream.CloseSend() rst, err := stream.Recv() if err != nil { return err @@ -626,17 +545,18 @@ func (r *txRequest) wait(ctx context.Context) error { return nil } -func (r *txRequest) result(ctx context.Context) (*pb.TransactionResult, error) { +func (r *txRequest) result(ctx context.Context) (*pb2.TransactionResponse, error) { if r.rst != nil { return r.rst, nil } - client := pb.NewTransactionServiceClient(r.node.PubConn()) - stream, err := client.StreamResults(ctx, &pb.TransactionResultsRequest{ - Id: r.txid, + client := pb2.NewTransactionStreamServiceClient(r.node.PubConn()) + stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ + Txid: [][]byte{r.txid}, }) if err != nil { return nil, err } + defer stream.CloseSend() rst, err := stream.Recv() if err != nil { // eof without result - transaction wasn't applied yet diff --git a/systest/tests/distributed_post_verification_test.go b/systest/tests/distributed_post_verification_test.go index cd2de59db9..f65e167e02 100644 --- a/systest/tests/distributed_post_verification_test.go +++ b/systest/tests/distributed_post_verification_test.go @@ -12,8 +12,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" - pb2 "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/spacemeshos/go-scale" "github.com/spacemeshos/post/shared" "github.com/spacemeshos/post/verifying" @@ -358,7 +357,7 @@ func testPostMalfeasance( var ( atx builtAtx - expectedDomain pb2.MalfeasanceProof_MalfeasanceDomain + expectedDomain pb.MalfeasanceProof_MalfeasanceDomain expectedType uint32 ) expectedProperties := make(map[string]string) @@ -375,7 +374,7 @@ func testPostMalfeasance( } watx.Sign(signer) atx = watx - expectedDomain = pb2.MalfeasanceProof_DOMAIN_UNSPECIFIED + expectedDomain = pb.MalfeasanceProof_DOMAIN_UNSPECIFIED expectedType = 4 expectedProperties["atx"] = atx.ID().String() case types.AtxV2: @@ -406,7 +405,7 @@ func testPostMalfeasance( } watx.Sign(signer) atx = watx - expectedDomain = pb2.MalfeasanceProof_DOMAIN_ACTIVATION + expectedDomain = pb.MalfeasanceProof_DOMAIN_ACTIVATION expectedType = 0 expectedProperties["type"] = "InvalidPoSTProof" expectedProperties["atx"] = atx.ID().String() @@ -421,9 +420,9 @@ func testPostMalfeasance( zap.Uint32("epoch", publishEpoch.Uint32()), zap.Uint32("layer", publishEpoch.FirstLayer().Uint32()), ) - err = layersStream(ctx, cl.Client(0), logger, func(resp *pb.LayerStreamResponse) (bool, error) { - logger.Info("new layer", zap.Uint32("layer", resp.Layer.Number.Number)) - return resp.Layer.Number.Number < publishEpoch.FirstLayer().Uint32(), nil + err = layersStream(ctx, cl.Client(0), logger, func(resp *pb.Layer) (bool, error) { + logger.Info("new layer", zap.Uint32("layer", resp.Number)) + return resp.Number < publishEpoch.FirstLayer().Uint32(), nil }) require.NoError(t, err) @@ -451,7 +450,7 @@ func testPostMalfeasance( // 5. Wait for POST malfeasance proof receivedProof := false logger.Info("waiting for malfeasance proof", zap.Duration("timeout", timeout)) - err = malfeasanceStream(publishCtx, cl.Client(0), logger, func(proof *pb2.MalfeasanceProof) (bool, error) { + err = malfeasanceStream(publishCtx, cl.Client(0), logger, func(proof *pb.MalfeasanceProof) (bool, error) { if !bytes.Equal(proof.GetSmesher(), signer.NodeID().Bytes()) { return true, nil } diff --git a/systest/tests/equivocation_test.go b/systest/tests/equivocation_test.go index 7a21628f6a..00a597a21f 100644 --- a/systest/tests/equivocation_test.go +++ b/systest/tests/equivocation_test.go @@ -7,8 +7,7 @@ import ( "time" "github.com/oasisprotocol/curve25519-voi/primitives/ed25519" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" - pb2 "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -89,15 +88,15 @@ func TestEquivocation(t *testing.T) { for i := 0; i < cl.Total(); i++ { client := cl.Client(i) results[client.Name] = make(map[int]string) - watchLayers(cctx, &eg, client, cctx.Log.Desugar(), func(resp *pb.LayerStreamResponse) (bool, error) { - if resp.Layer.Status != pb.Layer_LAYER_STATUS_APPLIED { + watchLayers(cctx, &eg, client, cctx.Log.Desugar(), func(resp *pb.Layer) (bool, error) { + if resp.Status != pb.Layer_LAYER_STATUS_VERIFIED { return true, nil } - if resp.Layer.Number.Number > stopTest { + if resp.Number > stopTest { return false, nil } - num := int(resp.Layer.Number.Number) - consensus := types.BytesToHash(resp.Layer.Hash).ShortString() + num := int(resp.Number) + consensus := types.BytesToHash(resp.StateHash).ShortString() cctx.Log.Debugw("consensus hash collected", "client", client.Name, "layer", num, @@ -126,7 +125,7 @@ func TestEquivocation(t *testing.T) { proofs := make([]types.NodeID, 0, len(malfeasants)) ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - malfeasanceStream(ctx, client, cctx.Log.Desugar(), func(proof *pb2.MalfeasanceProof) (bool, error) { + malfeasanceStream(ctx, client, cctx.Log.Desugar(), func(proof *pb.MalfeasanceProof) (bool, error) { malfeasant := proof.GetSmesher() proofs = append(proofs, types.BytesToNodeID(malfeasant)) return len(proofs) < len(malfeasants), nil diff --git a/systest/tests/nodes_test.go b/systest/tests/nodes_test.go index 63f7f7ac2f..79cc57c21b 100644 --- a/systest/tests/nodes_test.go +++ b/systest/tests/nodes_test.go @@ -6,6 +6,7 @@ import ( "testing" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb2 "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -38,11 +39,11 @@ func TestAddNodes(t *testing.T) { tctx.Log.Info("cluster size changed to ", tctx.ClusterSize) var eg errgroup.Group - watchLayers(tctx, &eg, cl.Client(0), tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number >= beforeAdding { + watchLayers(tctx, &eg, cl.Client(0), tctx.Log.Desugar(), func(layer *pb2.Layer) (bool, error) { + if layer.Number >= beforeAdding { tctx.Log.Debugw("adding new smeshers", "n", addedLater, - "layer", layer.Layer.Number, + "layer", layer.Number, ) // the new smeshers will use the old sync protocol return false, cl.AddSmeshers(tctx, addedLater) @@ -136,22 +137,22 @@ func TestFailedNodes(t *testing.T) { } for i := range cl.Total() - failed { client := cl.Client(i) - watchLayers(ctx, eg, client, tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Status == pb.Layer_LAYER_STATUS_APPLIED { + watchLayers(ctx, eg, client, tctx.Log.Desugar(), func(layer *pb2.Layer) (bool, error) { + if layer.Status == pb2.Layer_LAYER_STATUS_VERIFIED { tctx.Log.Debugw( "layer applied", "client", client.Name, "layer", - layer.Layer.Number.Number, + layer.Number, "hash", - prettyHex(layer.Layer.Hash), + prettyHex(layer.StateHash), ) - if layer.Layer.Number.Number == stopLayer { + if layer.Number == stopLayer { return false, nil } - if layer.Layer.Number.Number <= lastLayer { - hashes[i][layer.Layer.Number.Number] = prettyHex(layer.Layer.Hash) + if layer.Number <= lastLayer { + hashes[i][layer.Number] = prettyHex(layer.StateHash) } } return true, nil diff --git a/systest/tests/partition_test.go b/systest/tests/partition_test.go index 06e393e0e5..5950f073dd 100644 --- a/systest/tests/partition_test.go +++ b/systest/tests/partition_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -84,23 +84,19 @@ func testPartition(tb testing.TB, tctx *testcontext.Context, cl *cluster.Cluster numLayers := stop - types.GetEffectiveGenesis().Uint32() // assuming each client can update state for the same layer up to 10 times stateCh := make(chan *stateUpdate, uint32(cl.Total())*numLayers*10) - tctx.Log.Debug("listening to state hashes...") + tctx.Log.Debug("listening to layers for state hashes...") for i := range cl.Total() { node := cl.Client(i) eg.Go(func() error { - err := stateHashStream(ctx, node, tctx.Log.Desugar(), - func(state *pb.GlobalStateStreamResponse) (bool, error) { - data := state.Datum.Datum - require.IsType(tb, &pb.GlobalStateData_GlobalState{}, data) - - resp := data.(*pb.GlobalStateData_GlobalState) - layer := resp.GlobalState.Layer.Number + err := layersStream(ctx, node, tctx.Log.Desugar(), + func(state *pb.Layer) (bool, error) { + layer := state.Number if layer > stop { return false, nil } - stateHash := types.BytesToHash(resp.GlobalState.RootHash) + stateHash := types.BytesToHash(state.StateHash) tctx.Log.Debugw("state hash collected", "client", node.Name, "layer", layer, @@ -126,7 +122,7 @@ func testPartition(tb testing.TB, tctx *testcontext.Context, cl *cluster.Cluster }, ) if err != nil { - return fmt.Errorf("state hash stream error for %s: %w", node.Name, err) + return fmt.Errorf("layer stream error for %s: %w", node.Name, err) } return nil }) diff --git a/systest/tests/poets_test.go b/systest/tests/poets_test.go index ba04aa566b..59b4b0690b 100644 --- a/systest/tests/poets_test.go +++ b/systest/tests/poets_test.go @@ -8,6 +8,7 @@ import ( "testing" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb2 "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -71,25 +72,24 @@ func TestPoetsFailures(t *testing.T) { }) } - watchLayers(ctx, eg, cl.Client(0), tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { + watchLayers(ctx, eg, cl.Client(0), tctx.Log.Desugar(), func(layer *pb2.Layer) (bool, error) { // Will kill a poet from time to time - if layer.Layer.Number.Number > last { + if layer.Number > last { tctx.Log.Debug("Poet killer is done") return false, nil } - if layer.Layer.GetStatus() != pb.Layer_LAYER_STATUS_APPLIED { + if layer.Status != pb2.Layer_LAYER_STATUS_VERIFIED { return true, nil } // don't kill a poet if this is not ~middle of epoch - if ((layer.Layer.GetNumber().GetNumber() + layersPerEpoch/2) % layersPerEpoch) != 0 { + if ((layer.Number + layersPerEpoch/2) % layersPerEpoch) != 0 { return true, nil } poetToDelete := cl.Poet(0) - tctx.Log.Debugw("deleting poet pod", "poet", poetToDelete.Name, "layer", layer.Layer.GetNumber().GetNumber()) + tctx.Log.Debugw("deleting poet pod", "poet", poetToDelete.Name, "layer", layer.Number) require.NoError(t, cl.DeletePoet(tctx, 0)) require.NoError(t, cl.AddPoet(tctx)) - return true, nil }) diff --git a/systest/tests/smeshing_test.go b/systest/tests/smeshing_test.go index 2d13a3f494..860bc021b7 100644 --- a/systest/tests/smeshing_test.go +++ b/systest/tests/smeshing_test.go @@ -12,6 +12,7 @@ import ( "github.com/oasisprotocol/curve25519-voi/primitives/ed25519" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb2 "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -211,8 +212,8 @@ func testVesting(tb testing.TB, tctx *testcontext.Context, cl *cluster.Cluster, client := cl.Client(i % cl.Total()) eg.Go(func() error { var subeg errgroup.Group - watchLayers(tctx, &subeg, client, tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { - return layer.Layer.Number.Number < uint32(acc.start), nil + watchLayers(tctx, &subeg, client, tctx.Log.Desugar(), func(layer *pb2.Layer) (bool, error) { + return layer.Number < uint32(acc.start), nil }) if err := subeg.Wait(); err != nil { return err diff --git a/systest/tests/steps_test.go b/systest/tests/steps_test.go index 5d284ee264..85399f883a 100644 --- a/systest/tests/steps_test.go +++ b/systest/tests/steps_test.go @@ -11,7 +11,7 @@ import ( "testing" "time" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -100,7 +100,14 @@ func TestStepTransactions(t *testing.T) { tctx := testcontext.New(t, testcontext.SkipClusterLimits()) cl, err := cluster.Reuse(tctx, cluster.WithKeys(tctx.ClusterSize)) require.NoError(t, err) - require.NoError(t, waitGenesis(tctx, cl.Client(0))) + + tctx.Log.Debugw("waiting for genesis", "genesis time", cl.Genesis()) + select { + case <-tctx.Done(): + require.FailNow(t, "context canceled") + case <-time.After(time.Until(cl.Genesis())): // wait for genesis + } + t.Cleanup(cl.CloseClients) clients := make([]*txClient, cl.Accounts()) @@ -142,8 +149,7 @@ func TestStepTransactions(t *testing.T) { if err != nil { return err } - - tctx.Log.Debugw("spawned wallet", "address", client.account, "layer", rst.Layer) + tctx.Log.Debugw("spawned wallet", "address", client.account, "layer", rst.TxResult.Layer) } tctx.Log.Debugw("submitting transactions", "address", client.account, @@ -223,9 +229,9 @@ func TestStepVerifyConsistency(t *testing.T) { require.NoError(t, err) cctx.Log.Debugw("using verified layer as a reference", "node", synced[0].Name, - "layer", reference.Number.Number, - "hash", prettyHex(reference.Hash), - "state hash", prettyHex(reference.RootStateHash), + "layer", reference.Number, + "hash", prettyHex(reference.StateHash), + "state hash", prettyHex(reference.CumulativeStateHash), ) layers := make([]*pb.Layer, len(synced)) @@ -235,22 +241,22 @@ func TestStepVerifyConsistency(t *testing.T) { var eg errgroup.Group for i, node := range synced[1:] { eg.Go(func() error { - layer, err := getLayer(cctx, node, reference.Number.Number) + layer, err := getLayer(cctx, node, reference.Number) if err != nil { return err } layers[i] = layer - if !bytes.Equal(layer.Hash, reference.Hash) { + if !bytes.Equal(layer.StateHash, reference.StateHash) { return fmt.Errorf("hash doesn't match reference %s in layer %d: %x != %x", - node.Name, reference.Number.Number, layer.Hash, reference.Hash) + node.Name, reference.Number, layer.StateHash, reference.StateHash) } - if !bytes.Equal(layer.RootStateHash, reference.RootStateHash) { + if !bytes.Equal(layer.CumulativeStateHash, reference.CumulativeStateHash) { return fmt.Errorf( "state hash doesn't match reference %s in layer %d: %x != %x", node.Name, - reference.Number.Number, - layer.RootStateHash, - reference.RootStateHash, + reference.Number, + layer.CumulativeStateHash, + reference.CumulativeStateHash, ) } return nil @@ -266,12 +272,12 @@ func TestStepVerifyConsistency(t *testing.T) { if i == 0 { continue } - require.NotNil(t, layer, "client %s doesn't have layer %d", - synced[i].Name, reference.Number) - require.Equal(t, reference.Hash, layer.Hash, "consensus hash on client %s", - synced[i].Name) - require.Equal(t, reference.RootStateHash, layer.RootStateHash, "state hash on client %s", - synced[i].Name) + require.NotNil(t, layer, "client %s doesn't have layer %d", synced[i].Name, reference.Number) + require.Equal(t, reference.ConsensusHash, layer.ConsensusHash, "consensus hash on client %s", synced[i].Name) + require.Equal(t, + reference.CumulativeStateHash, layer.CumulativeStateHash, + "state hash on client %s", synced[i].Name, + ) } } diff --git a/systest/tests/timeskew_test.go b/systest/tests/timeskew_test.go index f47b8e9047..f3500c8fc1 100644 --- a/systest/tests/timeskew_test.go +++ b/systest/tests/timeskew_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -70,15 +70,15 @@ func TestShortTimeSkew(t *testing.T) { // abstain on one or two layers. in such case longer delay might be necessary to confirm that layer var confirmed uint32 - watchLayers(ctx, eg, client, tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number >= stopTest { + watchLayers(ctx, eg, client, tctx.Log.Desugar(), func(layer *pb.Layer) (bool, error) { + if layer.Number >= stopTest { return false, nil } - if layer.Layer.Status == pb.Layer_LAYER_STATUS_APPLIED { + if layer.Status == pb.Layer_LAYER_STATUS_VERIFIED { tctx.Log.Debugw( - "layer applied", "layer", layer.Layer.Number.Number, "hash", prettyHex(layer.Layer.Hash), + "layer applied", "layer", layer.Number, "hash", prettyHex(layer.StateHash), ) - confirmed = layer.Layer.Number.Number + confirmed = layer.Number if confirmed >= stopSkew { return false, nil } diff --git a/systest/tests/transactions_test.go b/systest/tests/transactions_test.go index d8d2bece6d..f273147bf3 100644 --- a/systest/tests/transactions_test.go +++ b/systest/tests/transactions_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/require" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -40,13 +40,12 @@ func testTransactions( "expected transactions", expectedCount, ) receiver := types.GenerateAddress([]byte{11, 1, 1}) - state := pb.NewGlobalStateServiceClient(cl.Client(0).PubConn()) - response, err := state.Account( - tctx, - &pb.AccountRequest{AccountId: &pb.AccountId{Address: receiver.String()}}, - ) + state := pb.NewAccountServiceClient(cl.Client(0).PubConn()) + response, err := state.List(tctx, &pb.AccountRequest{ + Addresses: []string{receiver.String()}, + }) require.NoError(tb, err) - before := response.AccountWrapper.StateCurrent.Balance + before := response.Accounts[0].Current.Balance layerDuration := testcontext.LayerDuration.Get(tctx.Parameters) layersPerEpoch := uint32(testcontext.LayersPerEpoch.Get(tctx.Parameters)) @@ -63,11 +62,11 @@ func testTransactions( client := cl.Client(i) eg.Go(func() error { err := watchTransactionResults(ctx, client, tctx.Log.Desugar(), - func(rst *pb.TransactionResult) (bool, error) { + func(rst *pb.TransactionResponse) (bool, error) { txs[i] = append(txs[i], rst.Tx) count := len(txs[i]) tctx.Log.Desugar().Debug("received transaction client", - zap.Uint32("layer", rst.Layer), + zap.Uint32("layer", rst.TxResult.Layer), zap.String("client", client.Name), zap.String("tx", "0x"+hex.EncodeToString(rst.Tx.Id)), zap.Int("count", count), @@ -99,20 +98,18 @@ func testTransactions( diff := batch * amount * int(sendFor-1) * cl.Accounts() for i := 0; i < cl.Total(); i++ { client := cl.Client(i) - state := pb.NewGlobalStateServiceClient(client.PubConn()) - response, err := state.Account( - tctx, - &pb.AccountRequest{AccountId: &pb.AccountId{Address: receiver.String()}}, - ) + state := pb.NewAccountServiceClient(client.PubConn()) + response, err := state.List(tctx, &pb.AccountRequest{ + Addresses: []string{receiver.String()}, + }) require.NoError(tb, err) - after := response.AccountWrapper.StateCurrent.Balance + after := response.Accounts[0].Current.Balance tctx.Log.Infow("receiver state", - "before", before.Value, - "after", after.Value, + "before", before, + "after", after, "expected-diff", diff, - "diff", after.Value-before.Value, + "diff", after-before, ) - require.Equal(tb, int(before.Value)+diff, - int(response.AccountWrapper.StateCurrent.Balance.Value), "client=%s", client.Name) + require.Equal(tb, int(before)+diff, int(after), "client=%s", client.Name) } } From 09497e5307182fe9ca509278f790f4c35893a561 Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Wed, 5 Feb 2025 18:54:30 +0000 Subject: [PATCH 3/6] make generate --- api/grpcserver/v1/mocks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/grpcserver/v1/mocks.go b/api/grpcserver/v1/mocks.go index 22889853b4..234ea8d057 100644 --- a/api/grpcserver/v1/mocks.go +++ b/api/grpcserver/v1/mocks.go @@ -6,7 +6,7 @@ // mockgen -typed -package=v1 -destination=./mocks.go -source=./interface.go // -// Package grpcserver is a generated GoMock package. +// Package v1 is a generated GoMock package. package v1 import ( From 59bc9943c3a3b510fa209cc5eddafb74032f68a7 Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Wed, 5 Feb 2025 19:07:57 +0000 Subject: [PATCH 4/6] Streams are generally on the private connection --- api/grpcserver/config.go | 18 ++++++++++++------ systest/tests/common.go | 10 +++++----- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/api/grpcserver/config.go b/api/grpcserver/config.go index 2e73a6798b..696376a624 100644 --- a/api/grpcserver/config.go +++ b/api/grpcserver/config.go @@ -74,9 +74,12 @@ const ( func DefaultConfig() Config { return Config{ PublicServices: []Service{ - GlobalState, Mesh, Transaction, Node, Activation, ActivationV2Alpha1, - RewardV2Alpha1, NetworkV2Alpha1, NodeV2Alpha1, LayerV2Alpha1, TransactionV2Alpha1, - AccountV2Alpha1, MalfeasanceV2Alpha1, + // v1 + GlobalState, Mesh, Transaction, Node, Activation, + + // v2alpha1 + ActivationV2Alpha1, RewardV2Alpha1, NetworkV2Alpha1, NodeV2Alpha1, + LayerV2Alpha1, TransactionV2Alpha1, AccountV2Alpha1, MalfeasanceV2Alpha1, // v2beta1 ActivationV2Beta1, RewardV2Beta1, NetworkV2Beta1, NodeV2Beta1, @@ -84,9 +87,12 @@ func DefaultConfig() Config { }, PublicListener: "0.0.0.0:9092", PrivateServices: []Service{ - Admin, Smesher, Debug, ActivationStreamV2Alpha1, - RewardStreamV2Alpha1, LayerStreamV2Alpha1, TransactionStreamV2Alpha1, - MalfeasanceStreamV2Alpha1, + // v1 + Admin, Smesher, Debug, + + // v2alpha1 + ActivationStreamV2Alpha1, RewardStreamV2Alpha1, LayerStreamV2Alpha1, + TransactionStreamV2Alpha1, MalfeasanceStreamV2Alpha1, // v2beta1 ActivationStreamV2Beta1, RewardStreamV2Beta1, LayerStreamV2Beta1, diff --git a/systest/tests/common.go b/systest/tests/common.go index 38727600b8..4abcaaa313 100644 --- a/systest/tests/common.go +++ b/systest/tests/common.go @@ -140,7 +140,7 @@ func layersStream( ) error { retries := 0 BACKOFF: - client := pb2.NewLayerStreamServiceClient(node.PubConn()) + client := pb2.NewLayerStreamServiceClient(node.PrivConn()) stream, err := client.Stream(ctx, &pb2.LayerStreamRequest{ Watch: true, }) @@ -236,7 +236,7 @@ BACKOFF: func waitTransaction(ctx context.Context, eg *errgroup.Group, node *cluster.NodeClient, id []byte) { eg.Go(func() error { - client := pb2.NewTransactionStreamServiceClient(node.PubConn()) + client := pb2.NewTransactionStreamServiceClient(node.PrivConn()) stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ Watch: true, Txid: [][]byte{id}, @@ -261,7 +261,7 @@ func watchTransactionResults( ) error { retries := 0 BACKOFF: - client := pb2.NewTransactionStreamServiceClient(node.PubConn()) + client := pb2.NewTransactionStreamServiceClient(node.PrivConn()) stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ Watch: true, }) @@ -528,7 +528,7 @@ type txRequest struct { func (r *txRequest) wait(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - client := pb2.NewTransactionStreamServiceClient(r.node.PubConn()) + client := pb2.NewTransactionStreamServiceClient(r.node.PrivConn()) stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ Txid: [][]byte{r.txid}, Watch: true, @@ -549,7 +549,7 @@ func (r *txRequest) result(ctx context.Context) (*pb2.TransactionResponse, error if r.rst != nil { return r.rst, nil } - client := pb2.NewTransactionStreamServiceClient(r.node.PubConn()) + client := pb2.NewTransactionStreamServiceClient(r.node.PrivConn()) stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ Txid: [][]byte{r.txid}, }) From d93dda5b05e0ba212fb1849d6fef5738980c418d Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Thu, 6 Feb 2025 09:41:37 +0000 Subject: [PATCH 5/6] Fix getNonce request --- systest/tests/common.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/systest/tests/common.go b/systest/tests/common.go index 4abcaaa313..dff6d5a2b2 100644 --- a/systest/tests/common.go +++ b/systest/tests/common.go @@ -44,7 +44,7 @@ func sendTransactions( client := cl.Client(i % cl.Total()) nonce, err := getNonce(ctx, client, cl.Address(i)) if err != nil { - return fmt.Errorf("get nonce failed (%s: %s): %w", client.Name, cl.Address(i), err) + return fmt.Errorf("get nonce failed (%s: %s): %w", client.Name, cl.Address(i).String(), err) } watchLayers(ctx, eg, client, logger, func(layer *pb2.Layer) (bool, error) { if layer.Number >= stop { @@ -414,6 +414,7 @@ func nextFirstLayer(current, size uint32) uint32 { func getNonce(ctx context.Context, node *cluster.NodeClient, address types.Address) (uint64, error) { resp, err := pb2.NewAccountServiceClient(node.PubConn()).List(ctx, &pb2.AccountRequest{ Addresses: []string{address.String()}, + Limit: 1, }) if err != nil { return 0, err From 0d2a705c13077db93d0ad47925c04d45c455a574 Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Thu, 6 Feb 2025 10:45:33 +0000 Subject: [PATCH 6/6] Add more context to errors --- systest/tests/common.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/systest/tests/common.go b/systest/tests/common.go index dff6d5a2b2..59da672059 100644 --- a/systest/tests/common.go +++ b/systest/tests/common.go @@ -145,7 +145,7 @@ BACKOFF: Watch: true, }) if err != nil { - return err + return fmt.Errorf("streaming layers for %s: %w", node.Name, err) } defer stream.CloseSend() for { @@ -196,7 +196,7 @@ BACKOFF: Watch: true, }) if err != nil { - return err + return fmt.Errorf("streaming malfeasance for %s: %w", node.Name, err) } defer stream.CloseSend() for { @@ -266,7 +266,7 @@ BACKOFF: Watch: true, }) if err != nil { - return err + return fmt.Errorf("streaming transactions for %s: %w", node.Name, err) } defer stream.CloseSend() for { @@ -317,7 +317,7 @@ func watchProposals( client := pb.NewDebugServiceClient(node.PrivConn()) stream, err := client.ProposalsStream(ctx, &emptypb.Empty{}) if err != nil { - return fmt.Errorf("proposal stream for %s: %w", node.Name, err) + return fmt.Errorf("streaming proposals for %s: %w", node.Name, err) } defer stream.CloseSend() for {