@@ -1016,7 +1016,10 @@ static int fw_process_message_mode_entry(
10161016 msgpack_object options ;
10171017 int result ;
10181018 msgpack_object chunk ;
1019+ struct flb_in_fw_config * ctx ;
10191020
1021+ /* Save ctx pointer before any operation that might delete the connection */
1022+ ctx = conn -> ctx ;
10201023 metadata = NULL ;
10211024
10221025 if (chunk_id != -1 || metadata_id != -1 ) {
@@ -1030,39 +1033,47 @@ static int fw_process_message_mode_entry(
10301033 result = flb_log_event_decoder_decode_timestamp (ts , & timestamp );
10311034
10321035 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
1033- result = flb_log_event_encoder_begin_record (conn -> ctx -> log_encoder );
1036+ result = flb_log_event_encoder_begin_record (ctx -> log_encoder );
10341037 }
10351038
10361039 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
1037- result = flb_log_event_encoder_set_timestamp (conn -> ctx -> log_encoder ,
1040+ result = flb_log_event_encoder_set_timestamp (ctx -> log_encoder ,
10381041 & timestamp );
10391042 }
10401043
10411044 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
10421045 if (metadata != NULL ) {
10431046 result = flb_log_event_encoder_set_metadata_from_msgpack_object (
1044- conn -> ctx -> log_encoder ,
1047+ ctx -> log_encoder ,
10451048 metadata );
10461049 }
10471050 }
10481051
10491052 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
10501053 result = flb_log_event_encoder_set_body_from_msgpack_object (
1051- conn -> ctx -> log_encoder ,
1054+ ctx -> log_encoder ,
10521055 body );
10531056 }
10541057
10551058 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
1056- result = flb_log_event_encoder_commit_record (conn -> ctx -> log_encoder );
1059+ result = flb_log_event_encoder_commit_record (ctx -> log_encoder );
10571060 }
10581061
10591062 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
10601063 flb_input_log_append (in , tag , tag_len ,
1061- conn -> ctx -> log_encoder -> output_buffer ,
1062- conn -> ctx -> log_encoder -> output_length );
1064+ ctx -> log_encoder -> output_buffer ,
1065+ ctx -> log_encoder -> output_length );
10631066 }
10641067
1065- flb_log_event_encoder_reset (conn -> ctx -> log_encoder );
1068+ flb_log_event_encoder_reset (ctx -> log_encoder );
1069+
1070+ /* Check if plugin was paused during log append (connection may have been deleted) */
1071+ pthread_mutex_lock (& ctx -> conn_mutex );
1072+ if (ctx -> is_paused ) {
1073+ pthread_mutex_unlock (& ctx -> conn_mutex );
1074+ return -1 ;
1075+ }
1076+ pthread_mutex_unlock (& ctx -> conn_mutex );
10661077
10671078 if (chunk_id != -1 ) {
10681079 chunk = options .via .map .ptr [chunk_id ].val ;
@@ -1076,6 +1087,11 @@ static size_t receiver_recv(struct fw_conn *conn, char *buf, size_t try_size) {
10761087 size_t off ;
10771088 size_t actual_size ;
10781089
1090+ /* Safety check: ensure connection is not being deleted and buffer exists */
1091+ if (conn -> being_deleted || !conn -> buf ) {
1092+ return 0 ;
1093+ }
1094+
10791095 off = conn -> buf_len - conn -> rest ;
10801096 actual_size = try_size ;
10811097
@@ -1288,6 +1304,24 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
12881304 conn -> rest = conn -> buf_len ;
12891305
12901306 while (1 ) {
1307+ /* Check if connection is being deleted or plugin is paused */
1308+ if (conn -> being_deleted ) {
1309+ msgpack_unpacker_free (unp );
1310+ msgpack_unpacked_destroy (& result );
1311+ flb_sds_destroy (out_tag );
1312+ return 0 ;
1313+ }
1314+
1315+ pthread_mutex_lock (& ctx -> conn_mutex );
1316+ if (ctx -> is_paused ) {
1317+ pthread_mutex_unlock (& ctx -> conn_mutex );
1318+ msgpack_unpacker_free (unp );
1319+ msgpack_unpacked_destroy (& result );
1320+ flb_sds_destroy (out_tag );
1321+ return 0 ;
1322+ }
1323+ pthread_mutex_unlock (& ctx -> conn_mutex );
1324+
12911325 recv_len = receiver_to_unpacker (conn , EACH_RECV_SIZE , unp );
12921326 if (recv_len == 0 ) {
12931327 /* No more data */
@@ -1445,6 +1479,14 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
14451479 out_tag , flb_sds_len (out_tag ),
14461480 & entry .via .array .ptr [index ],
14471481 chunk_id );
1482+
1483+ /* Check if connection was deleted during processing */
1484+ if (conn -> being_deleted ) {
1485+ msgpack_unpacked_destroy (& result );
1486+ msgpack_unpacker_free (unp );
1487+ flb_sds_destroy (out_tag );
1488+ return 0 ;
1489+ }
14481490 }
14491491
14501492 if (chunk_id != -1 ) {
@@ -1493,11 +1535,31 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
14931535 }
14941536
14951537 /* Process map */
1496- fw_process_message_mode_entry (
1538+ ret = fw_process_message_mode_entry (
14971539 conn -> in , conn ,
14981540 out_tag , flb_sds_len (out_tag ),
14991541 & root , & entry , & map , chunk_id ,
15001542 metadata_id );
1543+
1544+ /* Check if plugin was paused (connection may have been deleted) */
1545+ if (ret == -1 ) {
1546+ pthread_mutex_lock (& ctx -> conn_mutex );
1547+ if (ctx -> is_paused ) {
1548+ pthread_mutex_unlock (& ctx -> conn_mutex );
1549+ msgpack_unpacked_destroy (& result );
1550+ msgpack_unpacker_free (unp );
1551+ flb_sds_destroy (out_tag );
1552+ return 0 ;
1553+ }
1554+ if (conn -> being_deleted ) {
1555+ pthread_mutex_unlock (& ctx -> conn_mutex );
1556+ msgpack_unpacked_destroy (& result );
1557+ msgpack_unpacker_free (unp );
1558+ flb_sds_destroy (out_tag );
1559+ return 0 ;
1560+ }
1561+ pthread_mutex_unlock (& ctx -> conn_mutex );
1562+ }
15011563 }
15021564 else if (entry .type == MSGPACK_OBJECT_STR ||
15031565 entry .type == MSGPACK_OBJECT_BIN ) {
@@ -1618,6 +1680,16 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
16181680
16191681 goto cleanup_decompress ;
16201682 }
1683+
1684+ /* Check if connection was deleted during append */
1685+ if (conn -> being_deleted ) {
1686+ flb_free (decomp_buf );
1687+ msgpack_unpacked_destroy (& result );
1688+ msgpack_unpacker_free (unp );
1689+ flb_sds_destroy (out_tag );
1690+ flb_decompression_context_destroy (conn -> d_ctx );
1691+ return 0 ;
1692+ }
16211693 }
16221694 } while (decomp_len > 0 );
16231695
0 commit comments