Improve retryability of DLQ data #4295

Closed
travisbenedict opened this issue Mar 18, 2024 · 9 comments
Labels: bug (Something isn't working), question (Further information is requested)

@travisbenedict
Contributor

travisbenedict commented Mar 18, 2024

Is your feature request related to a problem? Please describe.
I have an OpenSearch cluster where specific indexes intermittently become write-blocked due to miscellaneous failures. To maintain ingestion throughput for the non-blocked indexes during this time, I set opensearch.max_retries to a low value. This way, data for the blocked index is sent to the DLQ quickly and the rest of the data continues being written to my sink.
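
For reference, a rough sketch of the relevant part of my sink configuration (host, region, role, and index names are placeholders):

  sink:
    - opensearch:
        hosts: [ "<<my-host>>" ]
        index: "logs"
        max_retries: 1  # fail fast so data for a write-blocked index goes to the DLQ quickly
        dlq:
          s3:
            bucket: "mydlq"
            region: "<<region>>"
            sts_role_arn: "<<role>>"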

After I resolve the index write blocks, I want to redrive the data from my DLQ using a separate instance of Data Prepper with an S3 Scan source. As far as I can tell, the existing codecs/processors for Data Prepper do not allow directly processing the JSON object that's written to the DLQ and splitting the dlqObjects array into multiple documents.

Here's an example of the data that was written to my DLQ bucket:

{
    "dlqObjects": [{
        "pluginId": "opensearch",
        "pluginName": "opensearch",
        "pipelineName": "apache-log-pipeline",
        "failedData": {
            "index": "logs",
            "indexId": null,
            "status": 0,
            "message": "Number of retries reached the limit of max retries (configured value 1)",
            "document": {
                "time": "2014-08-11T11:40:13+00:00",
                "remote_addr": "xxx.xxx.xx",
                "status": "404",
                "request": "GET http://www.k2proxy.com//hello.html HTTP/1.1",
                "http_user_agent": "Mozilla/4.0 (compatible; WOW64; SLCC2;)",
                "@timestamp": "2024-03-14T21:24:37.498Z"
            }
        },
        "timestamp": "2024-03-14T21:24:37.572Z"
    }]
}

Describe the solution you'd like

I would like my source codec to be able to parse the elements out of the dlqObjects array.

The S3 json codec configuration could add something like an array_source key. The JSON array found at array_source would then be processed by the codec, and the rest of the data in the original JSON object would be ignored/dropped.

An S3 scan pipeline for redriving DLQ data might look something like this:

dlq-pipeline:
  source:
    s3:
      acknowledgments: true
      codec:
        json: 
          array_source: "dlqObjects" # NEW
      scan:
        scheduling:
            interval: PT1H
        buckets:
          - bucket:
              name: "mydlq"

Describe alternatives you've considered (Optional)
Add a new configuration for specifying the format of the data that's written to the DLQ. This might allow Data Prepper to write the DLQ as a JSON array rather than a JSON object.

This might look something like this:

        dlq:
          s3:
            bucket: "mydlq"
            codec:
              json:

or

        dlq:
          format:
            json:
          s3:
            bucket: "mydlq"

The data written to the DLQ would then be the same as the dlqObjects JSON array.

In my case it might look like this:

[{
        "pluginId": "opensearch",
        "pluginName": "opensearch",
        "pipelineName": "apache-log-pipeline",
        "failedData": {
            "index": "logs",
            "indexId": null,
            "status": 0,
            "message": "Number of retries reached the limit of max retries (configured value 1)",
            "document": {
                "time": "2014-08-11T11:40:13+00:00",
                "remote_addr": "xxx.xxx.xx",
                "status": "404",
                "request": "GET http://www.k2proxy.com//hello.html HTTP/1.1",
                "http_user_agent": "Mozilla/4.0 (compatible; WOW64; SLCC2;)",
                "@timestamp": "2024-03-14T21:24:37.498Z"
            }
        },
        "timestamp": "2024-03-14T21:24:37.572Z"
    }]

Additional context
None

@dlvenable
Member

@travisbenedict, can you share an example of a DLQ S3 object with multiple DLQ objects?

dlvenable added the bug (Something isn't working) and question (Further information is requested) labels and removed the untriaged label on Mar 19, 2024
oeyh self-assigned this on Mar 21, 2024
@oeyh
Collaborator

oeyh commented Mar 21, 2024

I do see such an example in my DLQ bucket that has multiple objects:

{
  "dlqObjects": [
    {
      "pluginId": "opensearch",
      "pluginName": "opensearch",
      "pipelineName": "log-pipeline",
      "failedData": {
        "index": "raw-logs-alias",
        "indexId": null,
        "status": 400,
        "message": "no write index is defined for alias [raw-logs-alias]. The write index may be explicitly disabled using is_write_index=false or the alias points to multiple indices without one being designated as a write index",
        "document": {
          "http_user_agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_5_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.1 Mobile/15E148 Safari/604.1"
        }
      },
      "timestamp": "2024-01-10T17:57:19.983Z"
    },
    {
      "pluginId": "opensearch",
      "pluginName": "opensearch",
      "pipelineName": "log-pipeline",
      "failedData": {
        "index": "raw-logs-alias",
        "indexId": null,
        "status": 400,
        "message": "no write index is defined for alias [raw-logs-alias]. The write index may be explicitly disabled using is_write_index=false or the alias points to multiple indices without one being designated as a write index",
        "document": {
          "http_user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 12.6; rv:42.0) Gecko/20100101 Firefox/42.0"
        }
      },
      "timestamp": "2024-01-10T17:57:19.984Z"
    },
    {
      "pluginId": "opensearch",
      "pluginName": "opensearch",
      "pipelineName": "log-pipeline",
      "failedData": {
        "index": "raw-logs-alias",
        "indexId": null,
        "status": 400,
        "message": "no write index is defined for alias [raw-logs-alias]. The write index may be explicitly disabled using is_write_index=false or the alias points to multiple indices without one being designated as a write index",
        "document": {
          "http_user_agent": "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
        }
      },
      "timestamp": "2024-01-10T17:57:19.984Z"
    }
  ]
}

@dlvenable
Member

@travisbenedict, @oeyh,

The existing json source codec already supports this format. It reads a JSON file and then parses out objects from the first array found. This is how it supports CloudTrail logs.

The following example should work.

dlq-pipeline:
  source:
    s3:
      acknowledgments: true
      codec:
        json: 
      scan:
        scheduling:
            interval: PT1H
        buckets:
          - bucket:
              name: "mydlq"

@dlvenable
Member

I just tested this using a very simple pipeline on the latest main for the upcoming 2.7.0 release. I took the sample you shared and saved it as dlq-object.json.

dlq-replay-pipeline:
  workers: 2
  delay: 100
  source:
    file:
      path: /usr/share/data-prepper-input-files/dlq-object.json
      codec:
        json:

  sink:
    - stdout:

Here is a Docker Compose file to help get started:

version: "3.7"
services:
  data-prepper:
    container_name: data-prepper
    image: opensearch-data-prepper:2.7.0-SNAPSHOT
    volumes:
      - ./data-prepper/pipelines:/usr/share/data-prepper/pipelines
      - ./input-files:/usr/share/data-prepper-input-files
    ports:
      - "4900:4900"
    networks:
      - sample_network

networks:
  sample_network:

I see the following logs (note that the data-prepper | prefix is from Docker Compose):

data-prepper  | {"pluginId":"opensearch","pluginName":"opensearch","pipelineName":"log-pipeline","failedData":{"index":"raw-logs-alias","indexId":null,"status":400,"message":"no write index is defined for alias [raw-logs-alias]. The write index may be explicitly disabled using is_write_index=false or the alias points to multiple indices without one being designated as a write index","document":{"http_user_agent":"Mozilla/5.0 (iPhone; CPU iPhone OS 13_5_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.1 Mobile/15E148 Safari/604.1"}},"timestamp":"2024-01-10T17:57:19.983Z"}
data-prepper  | {"pluginId":"opensearch","pluginName":"opensearch","pipelineName":"log-pipeline","failedData":{"index":"raw-logs-alias","indexId":null,"status":400,"message":"no write index is defined for alias [raw-logs-alias]. The write index may be explicitly disabled using is_write_index=false or the alias points to multiple indices without one being designated as a write index","document":{"http_user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 12.6; rv:42.0) Gecko/20100101 Firefox/42.0"}},"timestamp":"2024-01-10T17:57:19.984Z"}
data-prepper  | {"pluginId":"opensearch","pluginName":"opensearch","pipelineName":"log-pipeline","failedData":{"index":"raw-logs-alias","indexId":null,"status":400,"message":"no write index is defined for alias [raw-logs-alias]. The write index may be explicitly disabled using is_write_index=false or the alias points to multiple indices without one being designated as a write index","document":{"http_user_agent":"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"}},"timestamp":"2024-01-10T17:57:19.984Z"}

@travisbenedict
Contributor Author

Thanks @dlvenable, let me try this out. I think there is some room to improve the documentation for the JSON codec. This is all it says now:

The json codec parses each S3 object as a single JSON object from a JSON array and then creates a Data Prepper log event for each object in the array.

https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#json-codec
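
For example, the docs could include a small before/after illustration (hypothetical input, not from my bucket). Given an S3 object containing

{
  "items": [
    { "message": "first" },
    { "message": "second" }
  ]
}

the json codec finds the first array ("items") and emits one log event per element, i.e. {"message": "first"} and {"message": "second"}; the wrapping object itself is not emitted.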

@dlvenable
Member

@travisbenedict, yes, one thought I have is to split out the codec documentation; then we can put some examples in there as well.

@travisbenedict
Contributor Author

Confirmed that S3 scan with the json codec works for retrying data from my DLQ bucket.

@travisbenedict
Contributor Author

Posting my full working DLQ redrive pipeline configuration here in case anyone else comes across this issue:

version: "2"
dlq-pipeline:
  source:
    s3:
      codec:
        json: 
      compression: "none"
      aws:
        region: "<<region>>"
        sts_role_arn: "<<role>>"
      acknowledgments: true
      scan:
        scheduling:
            interval: PT1M # Tune this
        buckets:
          - bucket:
              name: "<<dlq-bucket>>"
  processor:
    # Only retry DLQ records that failed because the max retries were reached on the sink
    - drop_events:
        drop_when: not contains(/failedData/message, "Number of retries reached the limit of max retries")    
  sink:
    - opensearch:
        max_retries: 1
        document_root_key: "/failedData/document" # Exclude the DLQ metadata from being written
        hosts: [ "<<my-host>>" ]
        aws:
          sts_role_arn: "<<role>>"
          region: "<<region>>"
        index: "<<index>>"

I'll create a documentation PR if I get some time.

@claraddepar

claraddepar commented Jun 22, 2024

Hi!

I am currently configuring a DLQ pipeline to redrive failed events from each of my pipelines (which each use a different index template, since they're processing from different DynamoDB tables).

With the final pipeline config you've posted, would I need to define the template again under the sink and have two separate DLQ pipelines, one for each of the indexes? Or would it be enough to just set index to the already-existing index name, so that the template gets linked that way?

Thank you!
