@@ -37,6 +37,7 @@ export function initHandlers(handlers) {
3737 */
3838export async function consume ( channel , exchangeName , queue , publishChannel ) {
3939 channel . assertExchange ( exchangeName , 'topic' , { durable : true } ) ;
40+ publishChannel . assertExchange ( exchangeName , 'topic' , { durable : true } ) ;
4041 channel . assertQueue ( queue , { durable : true } ) ;
4142 const bindings = _ . keys ( EVENT_HANDLERS ) ;
4243 const bindingPromises = _ . map ( bindings , rk =>
@@ -78,11 +79,17 @@ export async function consume(channel, exchangeName, queue, publishChannel) {
7879 // we can use cloudamqp console to check the messages and may be manually create SF lead
7980 // nacking here was causing flood of messages to the worker and it keep on consuming high resources
8081 channel . ack ( msg ) ;
81- publishChannel . publish (
82- exchangeName ,
83- EVENT . ROUTING_KEY . CONNECT_TO_SF_FAILED ,
84- new Buffer ( msg . content . toString ( ) )
85- ) ;
82+ try {
83+ publishChannel . publish (
84+ exchangeName ,
85+ EVENT . ROUTING_KEY . CONNECT_TO_SF_FAILED ,
86+ new Buffer ( msg . content . toString ( ) )
87+ ) ;
88+ } catch ( e ) {
89+ // TODO decide if we want nack the original msg here
90+ // for now just ignoring the error in requeue
91+ logger . logFullError ( e , `Error in publising Exchange to ${ exchangeName } ` ) ;
92+ }
8693 }
8794 }
8895 } ) ;
0 commit comments