@@ -10,6 +10,8 @@ import (
1010 "context"
1111 "fmt"
1212 "io"
13+ "os"
14+ "path/filepath"
1315 "strings"
1416 "sync"
1517 "time"
@@ -18,7 +20,9 @@ import (
1820 "github.com/hyperledger/fabric-protos-go-apiv2/common"
1921 ab "github.com/hyperledger/fabric-protos-go-apiv2/orderer"
2022 "github.com/hyperledger/fabric-x-orderer/common/tools/armageddon"
23+ "github.com/hyperledger/fabric-x-orderer/common/types"
2124 "github.com/hyperledger/fabric-x-orderer/node/comm"
25+ "github.com/hyperledger/fabric-x-orderer/node/crypto"
2226 "github.com/hyperledger/fabric-x-orderer/testutil/tx"
2327)
2428
@@ -27,6 +31,9 @@ type StreamInfo struct {
2731 totalTxSent uint32
2832 endpoint string
2933 streamLock sync.Mutex
34+ partyID types.PartyID
35+ signer * crypto.ECDSASigner
36+ certBytes []byte
3037}
3138
3239var logger = flogging .MustGetLogger ("BroadcastClient" )
@@ -42,7 +49,7 @@ func NewBroadcastTxClient(userConfigFile *armageddon.UserConfig, timeOut time.Du
4249 return & BroadcastTxClient {
4350 userConfig : userConfigFile ,
4451 timeOut : timeOut ,
45- streamRoutersMap : make (map [string ]* StreamInfo , len (userConfigFile .RouterEndpoints )),
52+ streamRoutersMap : make (map [string ]* StreamInfo , len (userConfigFile .RouterUserConfigs )),
4653 }
4754}
4855
@@ -78,7 +85,7 @@ func (c *BroadcastTxClient) SendTx(txContent []byte) error {
7885 streamInfo .streamLock .Lock ()
7986 defer streamInfo .streamLock .Unlock ()
8087
81- env := tx .CreateStructuredEnvelope (txContent )
88+ env := tx .CreateSignedStructuredEnvelope (txContent , streamInfo . signer , streamInfo . certBytes , fmt . Sprintf ( "org%d" , streamInfo . partyID ) )
8289 err := streamInfo .stream .Send (env )
8390 if err != nil {
8491 updateStateLock .Lock ()
@@ -115,9 +122,9 @@ func (c *BroadcastTxClient) SendTx(txContent []byte) error {
115122
116123 waitForReceiveDone .Wait ()
117124 // check if we had any failures
118- possibleNumOfFailures := len (c .userConfig .RouterEndpoints )
119- if len (c .userConfig .RouterEndpoints ) >= 3 {
120- possibleNumOfFailures = len (c .userConfig .RouterEndpoints ) / 3
125+ possibleNumOfFailures := len (c .userConfig .RouterUserConfigs )
126+ if len (c .userConfig .RouterUserConfigs ) >= 3 {
127+ possibleNumOfFailures = len (c .userConfig .RouterUserConfigs ) / 3
121128 }
122129 if failures > possibleNumOfFailures {
123130 er := fmt .Sprintf ("\n failed to send tx to %d out of %d send streams" , failures , len (c .streamRoutersMap ))
@@ -165,15 +172,20 @@ func (c *BroadcastTxClient) createSendStreams() error {
165172 }
166173
167174 // create gRPC clients and streams to the routers
168- for i := 0 ; i < len (userConfig .RouterEndpoints ); i ++ {
169- routerEndpoint := userConfig .RouterEndpoints [i ]
175+ // routerEnpoints are sorted in the userConfig in the same order as party IDs
176+ for _ , routerUserConfig := range userConfig .RouterUserConfigs {
177+ routerEndpoint := routerUserConfig .Endpoint
170178 streamInfo , ok := c .streamRoutersMap [routerEndpoint ]
171179 if ! ok {
172180 // create a gRPC connection to the router
173181 stream := createSendStream (userConfig , serverRootCAs , routerEndpoint )
174182 // if stream is created successfully, add it to the map
175183 if stream != nil {
176- c .streamRoutersMap [routerEndpoint ] = & StreamInfo {stream : stream , totalTxSent : 0 , endpoint : routerEndpoint }
184+ signer , certBytes , err := buildCryptoMaterials (routerUserConfig .UserMSPDir )
185+ if err != nil {
186+ return fmt .Errorf ("failed to build crypto materials: %v" , err )
187+ }
188+ c .streamRoutersMap [routerEndpoint ] = & StreamInfo {stream : stream , totalTxSent : 0 , endpoint : routerEndpoint , partyID : routerUserConfig .PartyID , signer : signer , certBytes : certBytes }
177189 }
178190 } else {
179191 if streamInfo .stream == nil {
@@ -192,6 +204,24 @@ func (c *BroadcastTxClient) createSendStreams() error {
192204 return nil
193205}
194206
207+ func buildCryptoMaterials (configDir string ) (* crypto.ECDSASigner , []byte , error ) {
208+ keyBytes , err := os .ReadFile (filepath .Join (configDir , "keystore/priv_sk" ))
209+ if err != nil {
210+ return nil , nil , fmt .Errorf ("failed to read private key file: %v" , err )
211+ }
212+ signer , err := tx .CreateECDSASigner (keyBytes )
213+ if err != nil {
214+ return nil , nil , fmt .Errorf ("failed to create signer from private key: %v" , err )
215+ }
216+
217+ certBytes , err := os .ReadFile (filepath .Join (configDir , "signcerts/sign-cert.pem" ))
218+ if err != nil {
219+ return nil , nil , fmt .Errorf ("failed to read sign certificate file: %v" , err )
220+ }
221+
222+ return signer , certBytes , nil
223+ }
224+
195225func (c * BroadcastTxClient ) Stop () {
196226 c .streamsMapLock .Lock ()
197227 defer c .streamsMapLock .Unlock ()
0 commit comments