@@ -186,24 +186,23 @@ class Notificator {
186186 }
187187 }
188188
189- async parse_connect_file ( connect_filename_with_overrides , decrypt = false ) {
189+ async parse_connect_file ( connection_name , decrypt = false ) {
190190 let connect ;
191- const filename_parts = connect_filename_with_overrides . split ( '?' ) ;
192- const connect_filename_no_overrides = filename_parts [ 0 ] ;
193- const overrides_str = filename_parts [ 1 ] ;
191+ let connect_filename = connection_name ;
192+ let kafka_topic_from_connection_name ;
193+ if ( connection_name . startsWith ( "kafka:::topic/" ) ) {
194+ const connection_parts = connection_name . split ( '/' ) ;
195+ connect_filename = connection_parts [ 1 ] ;
196+ kafka_topic_from_connection_name = connection_parts . length > 2 && connection_parts [ 3 ] ;
197+ }
194198
195199 if ( this . nc_config_fs ) {
196- connect = await this . nc_config_fs . get_connection_by_name ( connect_filename_no_overrides ) ;
200+ connect = await this . nc_config_fs . get_connection_by_name ( connect_filename ) ;
197201 } else {
198- const filepath = path . join ( this . connect_files_dir , connect_filename_no_overrides ) ;
202+ const filepath = path . join ( this . connect_files_dir , connect_filename ) ;
199203 const connect_str = fs . readFileSync ( filepath , 'utf-8' ) ;
200204 connect = JSON . parse ( connect_str ) ;
201205 }
202- if ( overrides_str ) {
203- const overrides_obj = JSON . parse ( overrides_str ) ;
204- _ . merge ( connect , overrides_obj ) ;
205- dbg . log2 ( "effective connect =" , connect ) ;
206- }
207206
208207 //if connect file is encrypted (and decryption is requested),
209208 //decrypt the auth field
@@ -212,6 +211,11 @@ class Notificator {
212211 connect . request_options_object . auth , connect . master_key_id ) ;
213212 }
214213 load_files ( connect ) ;
214+
215+ //use the kafka topic, if it was present in connection_name
216+ if ( kafka_topic_from_connection_name ) {
217+ connect . topic = kafka_topic_from_connection_name ;
218+ }
215219 return connect ;
216220 }
217221}
@@ -371,6 +375,7 @@ async function test_notifications(notifs, nc_config_dir, req) {
371375 let notif_failure ;
372376 try {
373377 connect = await notificator . parse_connect_file ( notif . topic [ 0 ] ) ;
378+ dbg . log0 ( `effective connect for notif ${ notif . id [ 0 ] } is` , connect ) ;
374379 connection = get_connection ( connect ) ;
375380 await connection . connect ( ) ;
376381 await connection . promise_notify ( compose_notification_test ( req ) , async ( notif_cb , err_cb , err ) => {
0 commit comments