[filter_multiline] [engine] Segmentation fault (SIGSEGV) and/or deadlock in threaded mode #9835


Open
drbugfinder-work opened this issue Jan 15, 2025 · 16 comments

Comments

@drbugfinder-work
Contributor

Bug Report

Describe the bug
When using threaded mode in filter_multiline, segmentation faults or deadlocks occur randomly, especially under high load.
I assume this is caused by a missing thread-safe implementation within the flb_log_event_encoder functions.

There is also an auto-closed issue, #6728, and an open but outdated PR from @nokute78, #6765, both describing a similar problem that is evidently still not fixed.

Example deadlock stacktraces:

flb_log_event_encoder_commit_record

Thread 57 (Thread 0x7fbe132dc6c0 (LWP 113) "flb-in-tail.47-"):
#0 futex_wait (private=0, expected=2, futex_word=0x7fbe4ec16708) at ../sysdeps/nptl/futex-internal.h:146
#1 __GI___lll_lock_wait (futex=futex@entry=0x7fbe4ec16708, private=0) at ./nptl/lowlevellock.c:49
#2 0x00007fbe505a90f1 in lll_mutex_lock_optimized (mutex=0x7fbe4ec16708) at ./nptl/pthread_mutex_lock.c:48
#3 ___pthread_mutex_lock (mutex=0x7fbe4ec16708) at ./nptl/pthread_mutex_lock.c:93
#4 0x00005648f0d551d1 in ?? ()
#5 0x00005648f0d625b0 in ?? ()
#6 0x00005648f0cf2417 in ?? ()
#7 0x00005648f0dd4436 in flb_log_event_encoder_dynamic_field_scope_leave ()
#8 0x00005648f0dd465d in flb_log_event_encoder_dynamic_field_flush ()
#9 0x00005648f0dd2ac6 in flb_log_event_encoder_commit_record ()
#10 0x00005648f0db459d in flb_ml_flush_stream_group ()
#11 0x00005648f0dd6627 in flb_ml_rule_process ()
#12 0x00005648f0db4f9b in ?? ()
#13 0x00005648f0db5458 in ?? ()
#14 0x00005648f0db573d in flb_ml_append_object ()
#15 0x00005648f0eb7963 in ?? ()
#16 0x00005648f0da95bb in flb_processor_run ()
#17 0x00005648f0dcc8e7 in ?? ()
#18 0x00005648f0dcca6c in flb_input_log_append_skip_processor_stages ()
#19 0x00005648f0ebe3dc in ?? ()
#20 0x00005648f0da95bb in flb_processor_run ()
#21 0x00005648f0dcc8e7 in ?? ()
#22 0x00005648f0dcca9d in flb_input_log_append_records ()
#23 0x00005648f0e0b516 in flb_tail_file_chunk ()
#24 0x00005648f0e05c57 in in_tail_collect_event ()

flb_log_event_encoder_dynamic_field_reset

Thread 153 (Thread 0x7fbe4f67f6c0 (LWP 17) "flb-pipeline"):
#0 futex_wait (private=0, expected=2, futex_word=0x7fbe4ec16708) at ../sysdeps/nptl/futex-internal.h:146
#1 __GI___lll_lock_wait (futex=futex@entry=0x7fbe4ec16708, private=0) at ./nptl/lowlevellock.c:49
#2 0x00007fbe505a90f1 in lll_mutex_lock_optimized (mutex=0x7fbe4ec16708) at ./nptl/pthread_mutex_lock.c:48
#3 ___pthread_mutex_lock (mutex=0x7fbe4ec16708) at ./nptl/pthread_mutex_lock.c:93
#4 0x00005648f0d551d1 in ?? ()
#5 0x00005648f0d625b0 in ?? ()
#6 0x00005648f0cf2417 in ?? ()
#7 0x00005648f0dd4436 in flb_log_event_encoder_dynamic_field_scope_leave ()
#8 0x00005648f0dd46aa in flb_log_event_encoder_dynamic_field_reset ()
#9 0x00005648f0dd2891 in flb_log_event_encoder_reset_record ()
#10 0x00005648f0dd2979 in flb_log_event_encoder_emit_record ()
#11 0x00005648f0db459d in flb_ml_flush_stream_group ()
#12 0x00005648f0db4cd5 in flb_ml_flush_parser_instance ()
#13 0x00005648f0db4d91 in flb_ml_flush_pending ()
#14 0x00005648f0da0446 in flb_sched_event_handler ()
#15 0x00005648f0d9c7c8 in flb_engine_start ()
#16 0x00005648f0d79268 in ?? ()
#17 0x00007fbe505a5a94 in start_thread (arg=) at ./nptl/pthread_create.c:447
#18 0x00007fbe50632c3c in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:78

Similar stack traces appear for other flb_log_event_encoder functions.

Example stack trace for a segmentation fault crash:

[2025/01/09 08:36:09] [engine] caught signal (SIGSEGV)
[2025/01/09 08:36:09] [engine] caught signal (SIGSEGV)
#0  0x55a8643027c8      in  cfl_list_add_before() at lib/cfl/include/cfl/cfl_list.h:130
#1  0x55a864302832      in  cfl_list_prepend() at lib/cfl/include/cfl/cfl_list.h:154
#2  0x55a8643063f2      in  flb_log_event_encoder_dynamic_field_scope_enter() at src/flb_log_event_encoder_dynamic_field.c:67
#3  0x55a864306524      in  flb_log_event_encoder_dynamic_field_begin_array() at src/flb_log_event_encoder_dynamic_field.c:124
#4  0x55a8642fbab2      in  flb_log_event_encoder_emit_record() at src/flb_log_event_encoder.c:168
#5  0x55a8642fbd1c      in  flb_log_event_encoder_commit_record() at src/flb_log_event_encoder.c:267
#6  0x55a8642806a0      in  flb_ml_flush_stream_group() at src/multiline/flb_ml.c:1505
#7  0x55a86427d92a      in  flb_ml_flush_parser_instance() at src/multiline/flb_ml.c:117
#8  0x55a86427d9e0      in  flb_ml_flush_pending() at src/multiline/flb_ml.c:137
#9  0x55a86427da93      in  cb_ml_flush_timer() at src/multiline/flb_ml.c:163  
#10 0x55a864225b73      in  flb_sched_event_handler() at src/flb_scheduler.c:624
#11 0x55a864216cf7      in  flb_engine_start() at src/flb_engine.c:1044
#12 0x55a8641ae5d4      in  flb_lib_worker() at src/flb_lib.c:763
#13 0x7f2ac7abaa93      in  start_thread() at c:447
#14 0x7f2ac7b47c3b      in  clone3() at inux/x86_64/clone3.S:78
#15 0xffffffffffffffff  in  ???() at ???:0

@nokute78 (cc @edsiper) Was there a reason for #6765 not being merged (and updated to the current code base)?

To Reproduce

  • Use tail input plugin (we use globs for multiple files)
  • Use multiline filter with threaded mode enabled
  • Put enough load on it and watch it crash, or inspect the deadlock in gdb (e.g. gdb -p <pid> --batch -ex "thread apply all bt" -ex "detach" -ex "quit"; a small capture loop is sketched below)
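
A minimal shell sketch of the inspection step (the gdb invocation is the one from the list above; the pgrep-based PID lookup, output directory and interval are only illustrative):

#!/bin/bash
# Periodically capture full backtraces of a running fluent-bit process.
# The gdb command matches the one above; paths and interval are arbitrary.
PID=$(pgrep -o fluent-bit)
OUT_DIR=/tmp/flb_backtraces
mkdir -p "$OUT_DIR"

while true; do
  gdb -p "$PID" --batch \
      -ex "thread apply all bt" \
      -ex "detach" -ex "quit" \
      > "$OUT_DIR/bt_$(date +%s).txt" 2>&1
  sleep 10
done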

Your Environment

  • Version used: 3.2.4 (but the issue has existed for many versions)

Maybe related:

According to the announcement of v2.0.2, the memory ring buffer (mem_buf_limit) should be no smaller than 20M. As far as I understand the code, in_emitter is used with memrb when the multiline filter runs in threaded mode.
However, as I've already mentioned in #8473, there is this strange (and most probably wrong) assignment:

ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY;

The default value for the flush frequency is 2000, so I assume this would set the ring buffer size to only 2k. Can you please verify this, @nokute78 @edsiper @leonardo-albertovich @pwhelan?
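
For illustration only, a tiny self-contained C sketch of what that assignment appears to do; the value 2000 is the flush-frequency default mentioned above, while the struct and variable names here are hypothetical and not taken from the Fluent Bit sources:

/* Hypothetical illustration of the suspected constant mix-up; not Fluent Bit code. */
#include <stdio.h>

#define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000 /* a time value (ms) */

struct emitter_ctx {
    size_t ring_buffer_size; /* intended to be a capacity, not a time interval */
};

int main(void)
{
    struct emitter_ctx ctx;

    /* The assignment quoted above reuses the flush-frequency constant as the capacity: */
    ctx.ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY;

    printf("ring_buffer_size ends up as %zu (roughly 2k)\n", ctx.ring_buffer_size);
    return 0;
}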

@drbugfinder-work
Contributor Author

@nokute78 @edsiper Our recent observations indicate that, in addition to segfaults and deadlocks, we are also experiencing log corruption, where log entries get mixed up. This appears to be a significant issue.

@github-actions (bot)

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

@leonardo-albertovich
Collaborator

@drbugfinder-work would you mind helping me validate this issue? I tried to reproduce it and, in PR #10237, fixed something that may or may not be its root cause, but I'd like to be able to follow your exact repro steps.

As for @nokute78's PR, I think it might be outdated (at some point there was a bug where certain coroutines were scheduled in the wrong thread), because unless I'm forgetting something critical, there should never be more than one thread operating on a multiline context.

@github-actions github-actions bot removed the Stale label Apr 23, 2025
@shenningz

@leonardo-albertovich I am replying for @drbugfinder-work here: we are still observing intermittent crashes on Fluent Bit 4.0.1, this time with slightly different stack traces:

[engine] caught signal (SIGSEGV)
#0  0x560a88611619      in  __cfl_list_del() at lib/cfl/include/cfl/cfl_list.h:72
#1  0x560a88611650      in  cfl_list_del() at lib/cfl/include/cfl/cfl_list.h:78
#2  0x560a886156b1      in  flb_log_event_encoder_dynamic_field_scope_leave() at src/flb_log_event_encoder_dynamic_field.c:107
#3  0x560a88615899      in  flb_log_event_encoder_dynamic_field_flush_scopes() at src/flb_log_event_encoder_dynamic_field.c:210
#4  0x560a886158cf      in  flb_log_event_encoder_dynamic_field_flush() at src/flb_log_event_encoder_dynamic_field.c:225
#5  0x560a8860a96e      in  flb_log_event_encoder_commit_record() at src/flb_log_event_encoder.c:260
#6  0x560a8858a0cf      in  flb_ml_flush_stream_group() at src/multiline/flb_ml.c:1505
#7  0x560a8858730d      in  flb_ml_flush_parser_instance() at src/multiline/flb_ml.c:117
#8  0x560a885873c3      in  flb_ml_flush_pending() at src/multiline/flb_ml.c:137
#9  0x560a88587476      in  cb_ml_flush_timer() at src/multiline/flb_ml.c:163
#10 0x560a885267c7      in  flb_sched_event_handler() at src/flb_scheduler.c:624
#11 0x560a88516e97      in  flb_engine_start() at src/flb_engine.c:1058
#12 0x560a884a939d      in  flb_lib_worker() at src/flb_lib.c:835
#13 0x7faf8bf45aa3      in  start_thread() at c:447
#14 0x7faf8bfd2c3b      in  clone3() at inux/x86_64/clone3.S:78
#15 0xffffffffffffffff  in  ???() at ???:0

[engine] caught signal (SIGSEGV)
#0  0x5564f0ca8619      in  __cfl_list_del() at lib/cfl/include/cfl/cfl_list.h:72
#1  0x5564f0ca8650      in  cfl_list_del() at lib/cfl/include/cfl/cfl_list.h:78
#2  0x5564f0cac6b1      in  flb_log_event_encoder_dynamic_field_scope_leave() at src/flb_log_event_encoder_dynamic_field.c:107
#3  0x5564f0cac899      in  flb_log_event_encoder_dynamic_field_flush_scopes() at src/flb_log_event_encoder_dynamic_field.c:210
#4  0x5564f0cac92e      in  flb_log_event_encoder_dynamic_field_reset() at src/flb_log_event_encoder_dynamic_field.c:240
#5  0x5564f0ca1421      in  flb_log_event_encoder_reset() at src/flb_log_event_encoder.c:33
#6  0x5564f0c211c0      in  flb_ml_flush_stream_group() at src/multiline/flb_ml.c:1526
#7  0x5564f0c1e30d      in  flb_ml_flush_parser_instance() at src/multiline/flb_ml.c:117
#8  0x5564f0c1e3c3      in  flb_ml_flush_pending() at src/multiline/flb_ml.c:137
#9  0x5564f0c1e476      in  cb_ml_flush_timer() at src/multiline/flb_ml.c:163
#10 0x5564f0bbd7c7      in  flb_sched_event_handler() at src/flb_scheduler.c:624
#11 0x5564f0bade97      in  flb_engine_start() at src/flb_engine.c:1058
#12 0x5564f0b4039d      in  flb_lib_worker() at src/flb_lib.c:835
#13 0x7f747fa11aa3      in  start_thread() at c:447
#14 0x7f747fa9ec3b      in  clone3() at inux/x86_64/clone3.S:78
#15 0xffffffffffffffff  in  ???() at ???:0

This does not reproduce easily on demand in an isolated test; it only happens very intermittently in a complex environment. We will try to come up with a simpler reproducer.
In any case, PR #10237 does not seem to fix the root cause of this.

@leonardo-albertovich
Collaborator

Thank you @shenningz, from your log I can see that the multiline code is running on the main thread.
I'll add some questions to try to clarify the situation:

  1. Are you running a tail input in non-threaded mode?
  2. Are you running the multiline filter explicitly or are you setting multiline as part of the tail input instance?
  3. Are you using hot reload?

@leonardo-albertovich
Collaborator

Actually, going back to the original report, I can see that the multiline filter is running as a processor directly attached to a tail input that also seems to have something like a filter_rewrite_tag (because the emitter is, I think, the only filter that causes processor stages to be skipped when re-ingesting data).

I'm a bit confused because your traces look different @shenningz.

I've found two defects in multiline so far:

  1. This timer is only stopped when the scheduler is destroyed, which can cause issues
  2. This conditional exits without resetting the context, which is not correct (though not fatal)

I'll modify my tests to:

  1. Use multiline as a processor combined with an emitter
  2. Use multiline as a filter combined with one or more high-volume threaded inputs (a config sketch of this variant is included below)

Even then, it'd be really helpful if you could get in touch with me and share more context (configuration, logs, etc.), so feel free to do it here or contact me through the Fluent Slack if you feel more comfortable that way.
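
For reference, a rough config sketch of the second variant (multiline as a standalone filter in front of a threaded tail input); the paths, parser name and stdout output are placeholders, not a tested setup:

# Sketch only: multiline as a filter (not a processor) behind a threaded tail input.
service:
  flush: 1
  log_level: info
  parsers_file: parsers_multiline.conf

pipeline:
  inputs:
    - name: tail
      path: /tmp/fluentbit_source_logs/source*.log
      tag: tailed.logs
      threaded: true
  filters:
    - name: multiline
      match: tailed.*
      multiline.parser: multiline_start
      multiline.key_content: log
      buffer: true
  outputs:
    - name: stdout
      match: tailed.*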

@leonardo-albertovich
Collaborator

It's a nasty threading issue caused by this line.

The reason is that, regardless of whether threading is enabled, processor stacks are initialized from the main thread, which causes those processors (or filters) that create timers to erroneously schedule them on the main pipeline thread instead of the plugin's thread.

That is why @nokute78's patch worked. As I previously mentioned, code running in the wrong thread is the root cause, and I'm glad we didn't merge that PR, because it would have hidden the problem and made it much more difficult to fix in the future.

There is another issue with how the event loop is picked when a collector is created, which causes the emitter to fail once the previously mentioned issue is fixed.

This one is tricky: the event loop is chosen based on the is_threaded attribute, but since the collector for the emitter is created from the initialization callback, and setting threaded to on causes it to create an additional thread (not what we intend when running it as part of a processor stack for a threaded plugin), we can't just fix it directly.

As a proof of concept, I patched it to pick the event loop from TLS instead of taking it from the config structure, but I'm not happy with that, so I'll take a few more hours to think about it and make a proper PR that fixes all of these issues.
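
To illustrate just the general idea (this is not Fluent Bit code and none of these names exist there), here is a generic C sketch in which each worker thread registers its work against a thread-local loop pointer instead of a single global one, which is roughly what picking the event loop from TLS achieves:

/* Generic illustration of "schedule against the current thread's loop, not a
 * global one"; all names here are hypothetical and unrelated to Fluent Bit. */
#include <pthread.h>
#include <stdio.h>

struct event_loop {
    const char *owner; /* which thread this loop belongs to */
};

/* Thread-local pointer to the loop of the calling thread. */
static __thread struct event_loop *tls_loop = NULL;

/* A loop owned by the main thread, mimicking "taking it from the config". */
static struct event_loop main_loop = { "main" };

static void schedule_timer(const char *name)
{
    /* Wrong variant: always use the main thread's loop.
     *     struct event_loop *loop = &main_loop;
     * TLS variant: use whichever loop the calling thread set up. */
    struct event_loop *loop = tls_loop ? tls_loop : &main_loop;

    printf("timer '%s' scheduled on loop owned by '%s'\n", name, loop->owner);
}

static void *worker(void *arg)
{
    struct event_loop my_loop = { (const char *) arg };

    tls_loop = &my_loop;      /* this thread's own event loop */
    schedule_timer("flush");  /* lands on the worker's loop, not the main one */
    return NULL;
}

int main(void)
{
    pthread_t t;

    tls_loop = &main_loop;
    pthread_create(&t, NULL, worker, (void *) "worker-1");
    pthread_join(t, NULL);

    schedule_timer("main-flush"); /* still lands on the main loop */
    return 0;
}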

@drbugfinder-work
Contributor Author

Hi @leonardo-albertovich
I've just written a reproducer:

config.yaml

service:
  flush: 1
  log_level: info
  parsers_file: parsers_multiline.conf

pipeline:
  inputs:
    - name: tail
      path: /tmp/fluentbit_source_logs/source*.log
      read_from_head: false
      refresh_interval: 1
      tag_regex: 'source(?<source_id>[0-9]+)'
      tag: tailed.<source_id>
      threaded: true
      buffer_chunk_size: 512
      buffer_max_size: 512
      processors:
        logs:
          - name: multiline
            multiline.parser: multiline_start
            multiline.key_content: log
            buffer: true
  outputs:
    - name: file
      match: tailed.*
      path: /tmp/fluentbit_final_output/
      file: tailed_output.log
      format: plain
      mkdir: true

parsers_multiline.conf

[MULTILINE_PARSER]
    name          multiline_start
    type          regex
    flush_timeout 1000
    rule      "start_state"   "^START\:.*"    "cont"
    rule      "cont"          "^INFO\:.*"     "cont"

dummy.sh

#!/bin/bash

sources=10
lines_per_entry=5
sleep_duration=0.001

log_dir="/tmp/fluentbit_source_logs"
mkdir -p "$log_dir"

for i in $(seq 1 $sources); do
  : > "$log_dir/source${i}.log"
done
while true
do
  for i in $(seq 1 $sources); do
    {
      echo "START: Source $i initiated $(date +%s.%N)"
      for j in $(seq 1 $((lines_per_entry - 1))); do
        echo "INFO: Source $i processing step $j $(date +%s.%N)"
      done
    } >> "$log_dir/source${i}.log"
    sleep "$sleep_duration"
  done
done
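
This is roughly how I run the repro; the binary name/path and the gdb step are illustrative (any recent build plus the backtrace command from the original report will do):

# Terminal 1: generate load
./dummy.sh

# Terminal 2: run Fluent Bit with the config above
fluent-bit -c config.yaml

# Terminal 3: once it hangs, capture backtraces of all threads
gdb -p "$(pgrep -o fluent-bit)" --batch -ex "thread apply all bt" -ex "detach" -ex "quit"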

@leonardo-albertovich
Collaborator

Great, I was able to reproduce the crash immediately with this, and so far it seems to validate my approach, because the patched version doesn't crash.

@drbugfinder-work
Contributor Author

@leonardo-albertovich

I have a new version of the log generator (this time in Python, for better multithreaded log creation); the old bash version was not fast enough:

import threading
import time
import os

sources = 20
lines_per_entry = 10
entries_per_second = 50  # 50 entries * 10 lines = 500 lines/sec
log_dir = "/tmp/fluentbit_source_logs"
os.makedirs(log_dir, exist_ok=True)


def generate_logs(source_id):
    logfile = os.path.join(log_dir, f"source{source_id}.log")
    sleep_time = 1.0 / entries_per_second

    with open(logfile, "a", buffering=1) as f:
        while True:
            ts = time.time()
            f.write(f"START: Source {source_id} initiated {ts:.9f}\n")
            for i in range(1, lines_per_entry):
                ts = time.time()
                f.write(f"INFO: Source {source_id} processing step {i} {ts:.9f}\n")
            time.sleep(sleep_time)


# Start Threads
threads = []
for i in range(1, sources + 1):
    t = threading.Thread(target=generate_logs, args=(i,), daemon=True)
    t.start()
    threads.append(t)

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Terminate.")

@drbugfinder-work
Contributor Author

Hi @leonardo-albertovich,

I've just confirmed our initial observations regarding mixed-up multiline messages.
I let the test run a couple of times (unfortunately, it crashed multiple times because of the same issue), and finally I got one record that mixes up three different ML streams (14, 20, 19).

The output record looks like this:

{"log":"START: Source 14 initiated 1747311704.282039881\nINFO: Source 20 processing step 1 1747311704.282145500\nINFO: Source 20 processing step 2 1747311704.282228470\nINFO: Source 20 processing step 3 1747311704.282335281\nINFO: Source 20 processing step 4 1747311704.282494068\nINFO: Source 20 processing step 5 1747311704.282578230\nINFO: Source 19 processing step 6 1747311704.276471376\nINFO: Source 19 processing step 7 1747311704.276487827\nINFO: Source 19 processing step 8 1747311704.276503801\nINFO: Source 19 processing step 9 1747311704.276520014"}

You can use this Python script to check your own output files:

import json
import re

inconsistent_lines = 0
total_lines = 0

with open("/tmp/fluentbit_final_output/tailed_output.log", "r") as file:
    for line in file:
        total_lines += 1
        try:
            entry = json.loads(line)
            log_content = entry.get("log", "")
            sources = re.findall(r"Source (\d+)", log_content)

            if not sources:
                continue

            unique_sources = set(sources)
            if len(unique_sources) > 1:
                print(f"Inconsistent line {total_lines}: found different sources {unique_sources} in this line")
                inconsistent_lines += 1

        except json.JSONDecodeError:
            print(f"Invalid JSON at line {total_lines}")

print(f"\nChecked {total_lines} lines.")
print(f"Found {inconsistent_lines} inconsistent multiline blocks.")

This does not happen very often, but it does happen. I suspect it is caused by the same issue.

@leonardo-albertovich
Collaborator

Thank you for the update. The original bash script worked reliably enough for me, but I'll check this new script to see if I can reproduce the mix-up issue and make sure my PR covers both problems.

I'm a bit behind schedule with this PR, but I'll try to wrap it up today.

If I share a branch with my changes would you be able to run some tests on your side to help me validate the patch?

@drbugfinder-work
Contributor Author

If I share a branch with my changes would you be able to run some tests on your side to help me validate the patch?

@leonardo-albertovich Absolutely! Just let me know

@leonardo-albertovich
Collaborator

Sorry about the delay, @drbugfinder-work; these are the PRs for 4.0 and 3.2.

Please let me know how it goes.

These PRs fix the incorrect scheduling issue; they do not fix the leftover collector timer issue, but that shouldn't be the culprit here, and I wanted to keep things properly compartmentalized.

@drbugfinder-work
Contributor Author

@leonardo-albertovich Great! I'll test it!
