Skip to content

Commit 109b6f8

Browse files
authored
feat: adds HMAC authentication support for catalog remote (#568)
Implements KMS-based HMAC authentication for catalog gRPC connections: - Add HMACAuthConfig to catalog configuration - Implement AWS KMS HMAC signature generation using SHA-256 - Add HMAC signature and timestamp to gRPC metadata headers - Refactor catalog stores to support HMAC-enabled DataAccess calls - Updated CI config code
1 parent d06057a commit 109b6f8

24 files changed

+804
-144
lines changed

.changeset/flat-moose-check.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink-deployments-framework": minor
3+
---
4+
5+
feat: adds HMAC authentication support for catalog remote

datastore/catalog/remote/address_ref_store.go

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -53,26 +53,26 @@ func (s *catalogAddressRefStore) get(
5353
ignoreTransaction bool,
5454
key datastore.AddressRefKey,
5555
) (datastore.AddressRef, error) {
56-
// Create a bidirectional stream
57-
stream, err := s.client.DataAccess()
58-
if err != nil {
59-
return datastore.AddressRef{}, fmt.Errorf("failed to create data access stream: %w", err)
60-
}
61-
6256
// Create the find request with the key converted to a filter
6357
filter := s.keyToFilter(key)
6458
findRequest := &pb.AddressReferenceFindRequest{
6559
KeyFilter: filter,
6660
IgnoreTransaction: ignoreTransaction,
6761
}
6862

69-
// Send the request
63+
// Create the request
7064
request := &pb.DataAccessRequest{
7165
Operation: &pb.DataAccessRequest_AddressReferenceFindRequest{
7266
AddressReferenceFindRequest: findRequest,
7367
},
7468
}
7569

70+
// Create a bidirectional stream with the initial request for HMAC
71+
stream, err := s.client.DataAccess(request)
72+
if err != nil {
73+
return datastore.AddressRef{}, fmt.Errorf("failed to create data access stream: %w", err)
74+
}
75+
7676
if sendErr := stream.Send(request); sendErr != nil {
7777
return datastore.AddressRef{}, fmt.Errorf("failed to send find request: %w", sendErr)
7878
}
@@ -120,12 +120,6 @@ func (s *catalogAddressRefStore) get(
120120

121121
// Fetch returns a copy of all AddressRef in the catalog.
122122
func (s *catalogAddressRefStore) Fetch(_ context.Context) ([]datastore.AddressRef, error) {
123-
// Create a bidirectional stream
124-
stream, err := s.client.DataAccess()
125-
if err != nil {
126-
return nil, fmt.Errorf("failed to create data access stream: %w", err)
127-
}
128-
129123
// Create the find request with an empty filter to get all records
130124
// We only filter by domain and environment to get all records for this store's scope
131125
filter := &pb.AddressReferenceKeyFilter{
@@ -138,13 +132,19 @@ func (s *catalogAddressRefStore) Fetch(_ context.Context) ([]datastore.AddressRe
138132
KeyFilter: filter,
139133
}
140134

141-
// Send the request
135+
// Create the request
142136
request := &pb.DataAccessRequest{
143137
Operation: &pb.DataAccessRequest_AddressReferenceFindRequest{
144138
AddressReferenceFindRequest: findRequest,
145139
},
146140
}
147141

142+
// Create a bidirectional stream with the initial request for HMAC
143+
stream, err := s.client.DataAccess(request)
144+
if err != nil {
145+
return nil, fmt.Errorf("failed to create data access stream: %w", err)
146+
}
147+
148148
if sendErr := stream.Send(request); sendErr != nil {
149149
return nil, fmt.Errorf("failed to send find request: %w", sendErr)
150150
}
@@ -212,12 +212,6 @@ func (s *catalogAddressRefStore) Filter(
212212
}
213213

214214
func (s *catalogAddressRefStore) Add(_ context.Context, record datastore.AddressRef) error {
215-
// Create a bidirectional stream
216-
stream, err := s.client.DataAccess()
217-
if err != nil {
218-
return fmt.Errorf("failed to create data access stream: %w", err)
219-
}
220-
221215
// Convert the datastore record to protobuf
222216
protoRef := s.addressRefToProto(record)
223217

@@ -227,13 +221,19 @@ func (s *catalogAddressRefStore) Add(_ context.Context, record datastore.Address
227221
Semantics: pb.EditSemantics_SEMANTICS_INSERT,
228222
}
229223

230-
// Send the edit request
224+
// Create the request
231225
editReq := &pb.DataAccessRequest{
232226
Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{
233227
AddressReferenceEditRequest: editRequest,
234228
},
235229
}
236230

231+
// Create a bidirectional stream with the initial request for HMAC
232+
stream, err := s.client.DataAccess(editReq)
233+
if err != nil {
234+
return fmt.Errorf("failed to create data access stream: %w", err)
235+
}
236+
237237
if sendErr := stream.Send(editReq); sendErr != nil {
238238
return fmt.Errorf("failed to send edit request: %w", sendErr)
239239
}
@@ -259,12 +259,6 @@ func (s *catalogAddressRefStore) Add(_ context.Context, record datastore.Address
259259
}
260260

261261
func (s *catalogAddressRefStore) Upsert(_ context.Context, record datastore.AddressRef) error {
262-
// Create a bidirectional stream
263-
stream, err := s.client.DataAccess()
264-
if err != nil {
265-
return fmt.Errorf("failed to create data access stream: %w", err)
266-
}
267-
268262
// Convert the datastore record to protobuf
269263
protoRef := s.addressRefToProto(record)
270264

@@ -274,13 +268,19 @@ func (s *catalogAddressRefStore) Upsert(_ context.Context, record datastore.Addr
274268
Semantics: pb.EditSemantics_SEMANTICS_UPSERT,
275269
}
276270

277-
// Send the edit request
271+
// Create the request
278272
request := &pb.DataAccessRequest{
279273
Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{
280274
AddressReferenceEditRequest: editRequest,
281275
},
282276
}
283277

278+
// Create a bidirectional stream with the initial request for HMAC
279+
stream, err := s.client.DataAccess(request)
280+
if err != nil {
281+
return fmt.Errorf("failed to create data access stream: %w", err)
282+
}
283+
284284
if sendErr := stream.Send(request); sendErr != nil {
285285
return fmt.Errorf("failed to send edit request: %w", sendErr)
286286
}
@@ -319,12 +319,6 @@ func (s *catalogAddressRefStore) Update(ctx context.Context, record datastore.Ad
319319
}
320320

321321
// Record exists, proceed with updating it
322-
// Create a bidirectional stream
323-
stream, streamErr := s.client.DataAccess()
324-
if streamErr != nil {
325-
return fmt.Errorf("failed to create data access stream: %w", streamErr)
326-
}
327-
328322
// Convert the datastore record to protobuf
329323
protoRef := s.addressRefToProto(record)
330324

@@ -334,13 +328,19 @@ func (s *catalogAddressRefStore) Update(ctx context.Context, record datastore.Ad
334328
Semantics: pb.EditSemantics_SEMANTICS_UPDATE,
335329
}
336330

337-
// Send the edit request
331+
// Create the request
338332
editReq := &pb.DataAccessRequest{
339333
Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{
340334
AddressReferenceEditRequest: editRequest,
341335
},
342336
}
343337

338+
// Create a bidirectional stream with the initial request for HMAC
339+
stream, streamErr := s.client.DataAccess(editReq)
340+
if streamErr != nil {
341+
return fmt.Errorf("failed to create data access stream: %w", streamErr)
342+
}
343+
344344
if sendErr := stream.Send(editReq); sendErr != nil {
345345
return fmt.Errorf("failed to send edit request: %w", sendErr)
346346
}

datastore/catalog/remote/address_ref_store_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ func setupTestStore(t *testing.T, domain, environment string) *catalogAddressRef
637637
}
638638

639639
// Test if the service is actually available by making a simple call
640-
_, err = catalogClient.DataAccess()
640+
_, err = catalogClient.DataAccess(&pb.DataAccessRequest{})
641641
if err != nil {
642642
t.Skipf("gRPC service not available at %s: %v. Skipping integration tests.", address, err)
643643
return nil

datastore/catalog/remote/chain_metadata_store.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,7 @@ func (s *catalogChainMetadataStore) Get(
119119
}
120120

121121
func (s *catalogChainMetadataStore) get(ignoreTransaction bool, key datastore.ChainMetadataKey) (datastore.ChainMetadata, error) {
122-
stream, err := s.client.DataAccess()
123-
if err != nil {
124-
return datastore.ChainMetadata{}, fmt.Errorf("failed to create gRPC stream: %w", err)
125-
}
126-
127-
// Send find request
122+
// Create find request
128123
findReq := &pb.DataAccessRequest{
129124
Operation: &pb.DataAccessRequest_ChainMetadataFindRequest{
130125
ChainMetadataFindRequest: &pb.ChainMetadataFindRequest{
@@ -134,6 +129,12 @@ func (s *catalogChainMetadataStore) get(ignoreTransaction bool, key datastore.Ch
134129
},
135130
}
136131

132+
// Create stream with the initial request for HMAC
133+
stream, err := s.client.DataAccess(findReq)
134+
if err != nil {
135+
return datastore.ChainMetadata{}, fmt.Errorf("failed to create gRPC stream: %w", err)
136+
}
137+
137138
if sendErr := stream.Send(findReq); sendErr != nil {
138139
return datastore.ChainMetadata{}, fmt.Errorf("failed to send find request: %w", sendErr)
139140
}
@@ -181,12 +182,7 @@ func (s *catalogChainMetadataStore) get(ignoreTransaction bool, key datastore.Ch
181182

182183
// Fetch returns a copy of all ChainMetadata in the catalog.
183184
func (s *catalogChainMetadataStore) Fetch(_ context.Context) ([]datastore.ChainMetadata, error) {
184-
stream, err := s.client.DataAccess()
185-
if err != nil {
186-
return nil, fmt.Errorf("failed to create gRPC stream: %w", err)
187-
}
188-
189-
// Send find request with domain and environment filter only (fetch all)
185+
// Create find request with domain and environment filter only (fetch all)
190186
findReq := &pb.DataAccessRequest{
191187
Operation: &pb.DataAccessRequest_ChainMetadataFindRequest{
192188
ChainMetadataFindRequest: &pb.ChainMetadataFindRequest{
@@ -198,6 +194,12 @@ func (s *catalogChainMetadataStore) Fetch(_ context.Context) ([]datastore.ChainM
198194
},
199195
}
200196

197+
// Create stream with the initial request for HMAC
198+
stream, err := s.client.DataAccess(findReq)
199+
if err != nil {
200+
return nil, fmt.Errorf("failed to create gRPC stream: %w", err)
201+
}
202+
201203
if sendErr := stream.Send(findReq); sendErr != nil {
202204
return nil, fmt.Errorf("failed to send find request: %w", sendErr)
203205
}
@@ -347,16 +349,11 @@ func (s *catalogChainMetadataStore) Delete(_ context.Context, _ datastore.ChainM
347349

348350
// editRecord is a helper method that handles Add, Upsert, and Update operations
349351
func (s *catalogChainMetadataStore) editRecord(record datastore.ChainMetadata, semantics pb.EditSemantics) error {
350-
stream, err := s.client.DataAccess()
351-
if err != nil {
352-
return fmt.Errorf("failed to create gRPC stream: %w", err)
353-
}
354-
355352
// Get the current version for this record
356353
key := record.Key()
357354
version := s.getVersion(key)
358355

359-
// Send edit request
356+
// Create edit request
360357
editReq := &pb.DataAccessRequest{
361358
Operation: &pb.DataAccessRequest_ChainMetadataEditRequest{
362359
ChainMetadataEditRequest: &pb.ChainMetadataEditRequest{
@@ -366,6 +363,12 @@ func (s *catalogChainMetadataStore) editRecord(record datastore.ChainMetadata, s
366363
},
367364
}
368365

366+
// Create stream with the initial request for HMAC
367+
stream, err := s.client.DataAccess(editReq)
368+
if err != nil {
369+
return fmt.Errorf("failed to create gRPC stream: %w", err)
370+
}
371+
369372
if sendErr := stream.Send(editReq); sendErr != nil {
370373
return fmt.Errorf("failed to send edit request: %w", sendErr)
371374
}

datastore/catalog/remote/chain_metadata_store_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func setupTestChainStore(t *testing.T, domain, environment string) *catalogChain
6060
}
6161

6262
// Test if the service is actually available by making a simple call
63-
_, err = catalogClient.DataAccess()
63+
_, err = catalogClient.DataAccess(&pb.DataAccessRequest{})
6464
if err != nil {
6565
t.Skipf("gRPC service not available at %s: %v. Skipping integration tests.", address, err)
6666
return nil

datastore/catalog/remote/contract_metadata_store.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,7 @@ func (s *catalogContractMetadataStore) Get(
123123
}
124124

125125
func (s *catalogContractMetadataStore) get(ignoreTransaction bool, key datastore.ContractMetadataKey) (datastore.ContractMetadata, error) {
126-
stream, err := s.client.DataAccess()
127-
if err != nil {
128-
return datastore.ContractMetadata{}, fmt.Errorf("failed to create gRPC stream: %w", err)
129-
}
130-
131-
// Send find request
126+
// Create find request
132127
findReq := &pb.DataAccessRequest{
133128
Operation: &pb.DataAccessRequest_ContractMetadataFindRequest{
134129
ContractMetadataFindRequest: &pb.ContractMetadataFindRequest{
@@ -138,6 +133,12 @@ func (s *catalogContractMetadataStore) get(ignoreTransaction bool, key datastore
138133
},
139134
}
140135

136+
// Create stream with the initial request for HMAC
137+
stream, err := s.client.DataAccess(findReq)
138+
if err != nil {
139+
return datastore.ContractMetadata{}, fmt.Errorf("failed to create gRPC stream: %w", err)
140+
}
141+
141142
if sendErr := stream.Send(findReq); sendErr != nil {
142143
return datastore.ContractMetadata{}, fmt.Errorf("failed to send find request: %w", sendErr)
143144
}
@@ -185,12 +186,7 @@ func (s *catalogContractMetadataStore) get(ignoreTransaction bool, key datastore
185186

186187
// Fetch returns a copy of all ContractMetadata in the catalog.
187188
func (s *catalogContractMetadataStore) Fetch(_ context.Context) ([]datastore.ContractMetadata, error) {
188-
stream, err := s.client.DataAccess()
189-
if err != nil {
190-
return nil, fmt.Errorf("failed to create gRPC stream: %w", err)
191-
}
192-
193-
// Send find request with domain and environment filter only (fetch all)
189+
// Create find request with domain and environment filter only (fetch all)
194190
findReq := &pb.DataAccessRequest{
195191
Operation: &pb.DataAccessRequest_ContractMetadataFindRequest{
196192
ContractMetadataFindRequest: &pb.ContractMetadataFindRequest{
@@ -202,6 +198,12 @@ func (s *catalogContractMetadataStore) Fetch(_ context.Context) ([]datastore.Con
202198
},
203199
}
204200

201+
// Create stream with the initial request for HMAC
202+
stream, err := s.client.DataAccess(findReq)
203+
if err != nil {
204+
return nil, fmt.Errorf("failed to create gRPC stream: %w", err)
205+
}
206+
205207
if sendErr := stream.Send(findReq); sendErr != nil {
206208
return nil, fmt.Errorf("failed to send find request: %w", sendErr)
207209
}
@@ -354,16 +356,11 @@ func (s *catalogContractMetadataStore) Delete(_ context.Context, _ datastore.Con
354356

355357
// editRecord is a helper method that handles Add, Upsert, and Update operations
356358
func (s *catalogContractMetadataStore) editRecord(record datastore.ContractMetadata, semantics pb.EditSemantics) error {
357-
stream, err := s.client.DataAccess()
358-
if err != nil {
359-
return fmt.Errorf("failed to create gRPC stream: %w", err)
360-
}
361-
362359
// Get the current version for this record
363360
key := record.Key()
364361
version := s.getVersion(key)
365362

366-
// Send edit request
363+
// Create edit request
367364
editReq := &pb.DataAccessRequest{
368365
Operation: &pb.DataAccessRequest_ContractMetadataEditRequest{
369366
ContractMetadataEditRequest: &pb.ContractMetadataEditRequest{
@@ -373,6 +370,12 @@ func (s *catalogContractMetadataStore) editRecord(record datastore.ContractMetad
373370
},
374371
}
375372

373+
// Create stream with the initial request for HMAC
374+
stream, err := s.client.DataAccess(editReq)
375+
if err != nil {
376+
return fmt.Errorf("failed to create gRPC stream: %w", err)
377+
}
378+
376379
if sendErr := stream.Send(editReq); sendErr != nil {
377380
return fmt.Errorf("failed to send edit request: %w", sendErr)
378381
}

datastore/catalog/remote/contract_metadata_store_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func setupTestContractStore(t *testing.T, domain, environment string) *catalogCo
6060
}
6161

6262
// Test if the gRPC service is actually available by making a simple call
63-
_, err = catalogClient.DataAccess()
63+
_, err = catalogClient.DataAccess(&pb.DataAccessRequest{})
6464
if err != nil {
6565
t.Skipf("gRPC service not available at %s: %v. Skipping integration tests.", address, err)
6666
return nil

datastore/catalog/remote/datastore_tx_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@ import (
1111

1212
"github.com/stretchr/testify/assert"
1313
"github.com/stretchr/testify/require"
14+
1415
"google.golang.org/grpc/credentials/insecure"
1516

1617
"github.com/smartcontractkit/chainlink-deployments-framework/datastore"
18+
19+
pb "github.com/smartcontractkit/chainlink-protos/op-catalog/v1/datastore"
1720
)
1821

1922
//nolint:paralleltest
@@ -448,7 +451,7 @@ func setupStore(t *testing.T, ctx context.Context) (*catalogDataStore, error) {
448451
return nil, fmt.Errorf("failed to connect to gRPC server at %s: %w. Skipping integration tests", address, err)
449452
}
450453
// Test if the service is actually available by making a simple call
451-
_, err = catalogClient.DataAccess()
454+
_, err = catalogClient.DataAccess(&pb.DataAccessRequest{})
452455
if err != nil {
453456
return nil, fmt.Errorf("gRPC service not available at %s: %w. Skipping integration tests", address, err)
454457
}

0 commit comments

Comments
 (0)