|
| 1 | +diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c |
| 2 | +index 2d889e09..cf367d3a 100644 |
| 3 | +--- a/src/rdkafka_partition.c |
| 4 | ++++ b/src/rdkafka_partition.c |
| 5 | +@@ -3612,12 +3612,14 @@ reply: |
| 6 | + |
| 7 | + if (rd_kafka_timer_stop(&rk->rk_timers, &rko->rko_u.leaders.query_tmr, |
| 8 | + RD_DO_LOCK)) |
| 9 | +- rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce, |
| 10 | +- "query timer"); |
| 11 | ++ if (rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce, |
| 12 | ++ "query timer")) |
| 13 | ++ rko->rko_u.leaders.eonce = NULL; |
| 14 | + if (rd_kafka_timer_stop(&rk->rk_timers, &rko->rko_u.leaders.timeout_tmr, |
| 15 | + RD_DO_LOCK)) |
| 16 | +- rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce, |
| 17 | +- "timeout timer"); |
| 18 | ++ if (rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce, |
| 19 | ++ "timeout timer")) |
| 20 | ++ rko->rko_u.leaders.eonce = NULL; |
| 21 | + |
| 22 | + if (rko->rko_u.leaders.eonce) { |
| 23 | + rd_kafka_enq_once_disable(rko->rko_u.leaders.eonce); |
| 24 | +diff --git a/src/rdkafka_queue.h b/src/rdkafka_queue.h |
| 25 | +index 0d50f587..04dddbf9 100644 |
| 26 | +--- a/src/rdkafka_queue.h |
| 27 | ++++ b/src/rdkafka_queue.h |
| 28 | +@@ -983,7 +983,8 @@ rd_kafka_enq_once_add_source(rd_kafka_enq_once_t *eonce, const char *srcdesc) { |
| 29 | + |
| 30 | + |
| 31 | + /** |
| 32 | +- * @brief Decrement refcount for source (non-owner), such as a timer. |
| 33 | ++ * @brief Decrement refcount for source (non-owner), such as a timer |
| 34 | ++ * and return 1 if eonce was destroyed. |
| 35 | + * |
| 36 | + * @param srcdesc a human-readable descriptive string of the source. |
| 37 | + * May be used for future debugging. |
| 38 | +@@ -993,7 +994,7 @@ rd_kafka_enq_once_add_source(rd_kafka_enq_once_t *eonce, const char *srcdesc) { |
| 39 | + * This API is used to undo an add_source() from the |
| 40 | + * same code. |
| 41 | + */ |
| 42 | +-static RD_INLINE RD_UNUSED void |
| 43 | ++static RD_INLINE RD_UNUSED int |
| 44 | + rd_kafka_enq_once_del_source(rd_kafka_enq_once_t *eonce, const char *srcdesc) { |
| 45 | + int do_destroy; |
| 46 | + |
| 47 | +@@ -1006,7 +1007,10 @@ rd_kafka_enq_once_del_source(rd_kafka_enq_once_t *eonce, const char *srcdesc) { |
| 48 | + if (do_destroy) { |
| 49 | + /* We're the last refcount holder, clean up eonce. */ |
| 50 | + rd_kafka_enq_once_destroy0(eonce); |
| 51 | ++ return 1; |
| 52 | + } |
| 53 | ++ |
| 54 | ++ return 0; |
| 55 | + } |
| 56 | + |
| 57 | + /** |
0 commit comments