Problems with command processor #3766

@static-moonlight

I'm using a command processor to run a script. Unfortunately, I have some problems with it which, in combination, might be a showstopper for what I want to do.

Below is a simplified version of my stream config around the command processor (simplified as in: I removed everything that isn't relevant). The command processor sits inside a for_each, which is inside a branch, which is inside a switch.

```yaml
input:
  nats_jetstream:
    [...]

pipeline:
  processors:
    - switch:
        - check: [...]
          processors:
            - [...]
        - processors:
            - branch:
                request_map: |
                  root = this.package.files.map_each(f -> f.path)
                processors:
                  - unarchive:
                      format: json_array
                  - for_each:
                      - command:
                          name: transfer-file
                      - switch:
                          - check: !errored()
                            processors:
                              - log:
                                  level: DEBUG
                                  message: "file transferred"
                                  fields_mapping: |
                                    root.output = @command_stderr
                                    root.metadata = metadata()
                          - check: errored()
                            processors:
                              - log:
                                  level: ERROR
                                  message: "file transfer failed: ${! error() }"
                                  fields_mapping: |
                                    root.metadata = metadata()
                  - archive:
                      format: lines
            [...]
output:
  reject_errored:
    nats_jetstream:
      [...]
```

When the command processor fails, I see logs like this for a message with 3 files:

```
DEBU[2025-11-11T08:32:41Z] Processor failed: execution error: exit status 66: [...]  label="" path=root.pipeline.processors.0.switch.1.processors.2.branch.processors.1.for_each.1
DEBU[2025-11-11T08:32:41Z] file transferred                              custom_source=true label="" metadata="[...]" output="<nil>" path=root.pipeline.processors.0.switch.1.processors.2.branch.processors.1.for_each.2.switch.0.processors.0
DEBU[2025-11-11T08:32:41Z] Processor failed: execution error: exit status 66: [...]  label="" path=root.pipeline.processors.0.switch.1.processors.2.branch.processors.1.for_each.1
DEBU[2025-11-11T08:32:41Z] file transferred                              custom_source=true label="" metadata="[...]" output="<nil>" path=root.pipeline.processors.0.switch.1.processors.2.branch.processors.1.for_each.2.switch.0.processors.0
DEBU[2025-11-11T08:32:42Z] Processor failed: execution error: exit status 66: [...]  label="" path=root.pipeline.processors.0.switch.1.processors.2.branch.processors.1.for_each.1
DEBU[2025-11-11T08:32:42Z] file transferred                              custom_source=true label="" metadata="[...]" output="<nil>" path=root.pipeline.processors.0.switch.1.processors.2.branch.processors.1.for_each.2.switch.0.processors.0
DEBU[2025-11-11T08:32:42Z] Branch error: processors failed: execution error: exit status 66: [...]  label="" path=root.pipeline.processors.0.switch.1.processors.2
```

There are several things wrong here:

Problem 1: logging

In this case the process fails with exit code 66, yet the "Processor failed" message is logged at DEBUG level. Why? A failure of a task like this should be logged as an error, or at least a warning. I had to lower the log level to DEBUG just to see the problem. The way I see it, the log level for the "Processor failed" message should be changed to ERROR, or, even better, made configurable with ERROR as the default.

This would also simplify my config, since I wouldn't need an additional log processor for the error case.
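To make the request concrete, a configurable level could look roughly like the sketch below. The `failure_log_level` field is hypothetical: it is only a placeholder name for illustration and is not an existing option of the command processor.

```yaml
- command:
    name: transfer-file
    # HYPOTHETICAL field (not an existing option): the level used for the
    # "Processor failed" log line, which would default to ERROR.
    failure_log_level: ERROR
```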

Problem 2: error state

The process executed by the command processor exited with code 66, yet the message is not errored(). Why? As you can see from my config and the logs, "file transferred" was logged, which should only happen when the message is NOT errored().

So the question is: why is the message batch not errored(), despite the fact that the process exited with a non-zero code?

What makes this even weirder (and it's how I noticed that something was wrong in the first place) is that the messages are rejected, as my config specifies for errored messages. So the message is errored() enough to be rejected, but not errored() enough to take the error logging path? This doesn't make any sense to me. Please explain this. What am I missing?
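One thing worth ruling out: in YAML, an unquoted value that starts with `!` is parsed as a tag rather than as text. So `check: !errored()` may reach the switch as an empty check, and an empty switch check passes for every message. Since a switch stops at the first matching case by default, that would be consistent with "file transferred" being logged for errored messages and the ERROR case never running. A sketch of the same switch with the expression quoted, assuming this is the cause:

```yaml
- switch:
    # Quoted so YAML does not treat the leading "!" as a tag.
    - check: '!errored()'
      processors:
        - log:
            level: DEBUG
            message: "file transferred"
    - check: errored()
      processors:
        - log:
            level: ERROR
            message: "file transfer failed: ${! error() }"
```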

Problem 3: exit code

Since errored() seems a bit unreliable, I wanted to use the exit code directly to decide what to log. Then I realized that I can't get the exit code. I also plan to publish certain metrics later based on the exit code, so I need it anyway. Is there already a way to get it? If not, could it be added?
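For illustration, if the command processor exposed the exit code as metadata, the logging decision could be made on it directly. The `command_exit_code` key below is hypothetical; it is not documented as being set by the command processor today.

```yaml
- switch:
    # HYPOTHETICAL metadata key "command_exit_code". The check is quoted
    # because a plain YAML scalar cannot start with "@".
    - check: '@command_exit_code.number() != 0'
      processors:
        - log:
            level: ERROR
            message: 'file transfer failed with exit code ${! @command_exit_code }'
```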

CONCLUSION:

  1. The log level for the command processor's "Processor failed" message should be configurable, with ERROR as the default.
  2. The message not being errored() when the process fails (non-zero exit code) is a problem. Please at least explain this, or fix it if it's an actual bug.
  3. Make the command processor's exit code available, always (even when it's zero), or update the documentation if this is already possible.
