Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Unhelpful error message initializing OpenSearch Ingestion, OpenSearch sink #4195

Open
Jon-AtAWS opened this issue Feb 27, 2024 · 16 comments
Labels
bug Something isn't working

Comments

@Jon-AtAWS
Copy link
Member

See also: opensearch-project/opensearch-java#473

Looks like we maybe fixed this in the java client, but not in the Python client? Or maybe this is a different code path?

I haven't been able to diagnose exactly what's going on and where the failure is. Here's what's in CloudWatch Logs for the OpenSearch sink initialization

2024-02-27T19:37:23.594 [log-pipeline-sink-worker-2-thread-1] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to initialize OpenSearch sink with a retryable exception. 
org.opensearch.client.opensearch._types.OpenSearchException: Request failed: [security_exception] authentication/authorization failure
	at org.opensearch.client.transport.aws.AwsSdk2Transport.parseResponse(AwsSdk2Transport.java:473) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.executeSync(AwsSdk2Transport.java:392) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.performRequest(AwsSdk2Transport.java:192) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.opensearch.indices.OpenSearchIndicesClient.exists(OpenSearchIndicesClient.java:507) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.NoIsmPolicyManagement.checkIfIndexExistsOnServer(NoIsmPolicyManagement.java:50) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.checkAndCreateIndex(AbstractIndexManager.java:268) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.setupIndex(AbstractIndexManager.java:225) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitializeInternal(OpenSearchSink.java:231) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitialize(OpenSearchSink.java:193) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.model.sink.AbstractSink.initialize(AbstractSink.java:52) ~[data-prepper-api-2.6.1.jar:?]
	at org.opensearch.dataprepper.pipeline.Pipeline.isReady(Pipeline.java:200) ~[data-prepper-core-2.6.1.jar:?]
	at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:252) ~[data-prepper-core-2.6.1.jar:?]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

FWIW, my pipeline role has:

		{
			"Action": [
				"es:DescribeDomain",
				"es:*"
			],
			"Resource": "arn:aws:es:us-west-2:OBSCURED:domain/OBSCURED",
			"Effect": "Allow"
		},
@Jon-AtAWS Jon-AtAWS added bug Something isn't working untriaged labels Feb 27, 2024
@Jon-AtAWS
Copy link
Member Author

Actually, this may not have anything to do with the Python client, it's either the Java client or Data Prepper. Can you re-route?

@dblock dblock transferred this issue from opensearch-project/opensearch-py Feb 28, 2024
@Utkarsh-Aga
Copy link
Contributor

Hello @Jon-AtAWS ,
Can you please once update the Pipeline Role with the following permission, and see if you continue to face the issue ?

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "es:DescribeDomain",
            "Resource": "arn:aws:es:us-west-2:{your-account-id}:domain/{domain-name}"
        },
        {
            "Effect": "Allow",
            "Action": "es:ESHttp*",
            "Resource": "arn:aws:es:us-west-2:{your-account-id}:domain/{domain-name}/*"
        }
    ]
}

@Jon-AtAWS
Copy link
Member Author

Thanks for the fast response!

My pipeline role has (pardon the CDK code)

        pipeline_policy_doc.add_statements(iam.PolicyStatement(**{
            "effect": iam.Effect.ALLOW,
            "resources": [f"{domain.domain_arn}"],
            "actions": [
                "es:ESHttp*",
                "es:DescribeDomain"
            ]
        }))

And I have verified that the statement is correct in the generated policy. It's not deployed right now, so I can't screenshot the IAM console.

The pipeline works if I map the role to FGAC's all_access role, or add "" and "" for index and cluster level permissions to the OpenSearch role I create. There's some permission that I need in OpenSearch FGAC that's causing this error.

Looking at the code for checkIfIndexExists (the failing call), it looks like it's calling HEAD /index_name. I have tried cluster_composite_ops, and a variety of other cluster level permissions for FGAC. At the index level, I have indices_all, and I've also tried crud, read, and write.

My next attempt will be to add indices_all to cluster level permissions. That doesn't make any sense, really, but maybe it will work?

My goal is to find the minimum permissions needed for the FGAC role to work with DataPrepper/OpenSearch Ingestion. If you already know that, then please let me know!

@Jon-AtAWS
Copy link
Member Author

Well, shoot. As I was re-reading that, I realized that my resource is missing a wildcard for the es:ESHttp action. So, I changed to

        pipeline_policy_doc.add_statements(iam.PolicyStatement(**{
            "effect": iam.Effect.ALLOW,
            "resources": [f"{domain.domain_arn}"],
            "actions": [
                "es:DescribeDomain"
            ]
        }))
        pipeline_policy_doc.add_statements(iam.PolicyStatement(**{
            "effect": iam.Effect.ALLOW,
            "resources": [f"{domain.domain_arn}/*"],
            "actions": [
                "es:ESHttp*",
            ]
        }))

And it worked. I still don't understand why it also worked when I changed to all_access for FGAC, I will try that again. And, I still think that we need a better error message, including the entity that was presenting credentials, and the API that was called. Even better if the error message specifies the permissions that I need (whether IAM or FGAC)

@Jon-AtAWS
Copy link
Member Author

Nope, something else made it work.

Here's the error

2024-03-01T17:38:09.531 [Thread-11] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to initialize OpenSearch sink with a retryable exception. 
org.opensearch.client.opensearch._types.OpenSearchException: Request failed: [security_exception] authentication/authorization failure
	at org.opensearch.client.transport.aws.AwsSdk2Transport.parseResponse(AwsSdk2Transport.java:473) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.executeSync(AwsSdk2Transport.java:392) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.performRequest(AwsSdk2Transport.java:192) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.opensearch.indices.OpenSearchIndicesClient.exists(OpenSearchIndicesClient.java:507) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.NoIsmPolicyManagement.checkIfIndexExistsOnServer(NoIsmPolicyManagement.java:50) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.checkAndCreateIndex(AbstractIndexManager.java:268) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.setupIndex(AbstractIndexManager.java:225) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitializeInternal(OpenSearchSink.java:231) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitialize(OpenSearchSink.java:193) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.model.sink.SinkThread.run(SinkThread.java:25) ~[data-prepper-api-2.6.1.jar:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

Here is the role's trust relationship

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "osis-pipelines.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Here is the role's permissions

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Action": "es:DescribeDomain",
			"Resource": "arn:aws:es:us-west-2:XXXXXXXXXXX:domain/X",
			"Effect": "Allow"
		},
		{
			"Action": "es:ESHttp*",
			"Resource": "arn:aws:es:us-west-2:XXXXXXXXXXX:domain/X/*",
			"Effect": "Allow"
		},
		{
			"Action": "dynamodb:DescribeExport",
			"Resource": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/X/export/*",
			"Effect": "Allow"
		},
		{
			"Action": [
				"dynamodb:DescribeStream",
				"dynamodb:GetRecords",
				"dynamodb:GetShardIterator"
			],
			"Resource": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/X/stream/*",
			"Effect": "Allow"
		},
		{
			"Action": [
				"dynamodb:DescribeContinuousBackups",
				"dynamodb:DescribeTable",
				"dynamodb:ExportTableToPointInTime"
			],
			"Resource": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/X",
			"Effect": "Allow"
		},
		{
			"Action": [
				"s3:AbortMultipartUpload",
				"s3:GetObject",
				"s3:PutObject",
				"s3:PutObjectAcl"
			],
			"Resource": "arn:aws:s3:::XXXXXXXXX/X/export/*",
			"Effect": "Allow"
		}
	]
}

Here is the FGAC permissions

{
  "pipeline_write_role": {
    "reserved": false,
    "hidden": false,
    "cluster_permissions": [
      "cluster_all",
      "indices_all"
    ],
    "index_permissions": [
      {
        "index_patterns": [
          "*"
        ],
        "dls": "",
        "fls": [],
        "masked_fields": [],
        "allowed_actions": [
          "crud",
          "create_index"
        ]
      }
    ],
    "tenant_permissions": [],
    "static": false
  }
}

@Jon-AtAWS
Copy link
Member Author

This:

PUT _plugins/_security/api/roles/pipeline_write_role
{
  "cluster_permissions": ["cluster_monitor", "indices_all"],
  "index_permissions": [
  {
    "index_patterns": [ "*" ],
    "dls": "",
    "fls": [],
    "masked_fields": [],
    "allowed_actions": [
      "indices_all"
    ]
  }]
}

Allowed the sink to initialize

Summarizing

  1. It's NOT the permissions on the IAM role
  2. crud, and create_index are not enough for FGAC

What is the additional permission I need, and can we fix the error message?

@kkondaka
Copy link
Collaborator

kkondaka commented Mar 5, 2024

DataPrepper can only show the message provided by OpenSearch.

@kkondaka kkondaka removed the untriaged label Mar 5, 2024
@Jon-AtAWS
Copy link
Member Author

It would be good for DP to add context like: "called client's IndexExists method, attempting to validate the non-existence of index 'foo', in order to set the template." Or something like that.

@dlvenable
Copy link
Member

@Jon-AtAWS , Yes, I agree with this.

@kkondaka , I think we could accomplish this by adding a try-catch when making the call. Maybe we can even have a special exception for failed requests?

try {
  final BooleanResponse booleanResponse = openSearchClient.indices().exists(
                new ExistsRequest.Builder().index(indexAlias).build());
} catch(Exception ex) {
  throw new OpenSearchRequestException("checking that index exists.", ex);
}

And

class OpenSearchRequestException extends RuntimeExtension {

  @Override
  public String getMessage() {
    return "Failed while" + message + " with error code: " + innerException.getMessage();
  }
}

@Jon-AtAWS
Copy link
Member Author

throw new OpenSearchRequestException("checking that index exists.", ex);

Thanks @dlvenable !

Nitpicking a bit - let's pack as much information in these messages as possible. For instance, in this message, we can add the index name we're checking. If we know the authenticated entity, and its roles, we should add that as well. More information is better!

@dlvenable
Copy link
Member

throw new OpenSearchRequestException("checking that index exists.", ex);

Thanks @dlvenable !

Nitpicking a bit - let's pack as much information in these messages as possible. For instance, in this message, we can add the index name we're checking. If we know the authenticated entity, and its roles, we should add that as well. More information is better!

@Jon-AtAWS , This is great feedback. We could quite easily include the index name. The role name is a bit more challenging, but it may be provided back in the OpenSearch response.

@krishna-ggk
Copy link

@Jon-AtAWS Based on stacktrace, the failure happened while checking if the index exists which requires indices:admin/exists FGAC permission. I don't see this permission being explicit part of any action group.

@Jon-AtAWS
Copy link
Member Author

I now have this policy

{
  "pipeline_write_role": {
    "reserved": false,
    "hidden": false,
    "cluster_permissions": [
      "indices:admin/exists",
      "cluster_monitor",
      "cluster_composite_ops"
    ],
    "index_permissions": [
      {
        "index_patterns": [
          "*"
        ],
        "dls": "",
        "fls": [],
        "masked_fields": [],
        "allowed_actions": [
          "indices:admin/exists",
          "crud"
        ]
      }
    ],
    "tenant_permissions": [],
    "static": false
  }
}

And receiving this error:

2024-03-07T23:16:55.564 [log-pipeline-sink-worker-2-thread-1] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to initialize OpenSearch sink with a retryable exception. 
org.opensearch.client.opensearch._types.OpenSearchException: Request failed: [security_exception] authentication/authorization failure
	at org.opensearch.client.transport.aws.AwsSdk2Transport.parseResponse(AwsSdk2Transport.java:473) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.executeSync(AwsSdk2Transport.java:392) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.performRequest(AwsSdk2Transport.java:192) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.opensearch.indices.OpenSearchIndicesClient.exists(OpenSearchIndicesClient.java:507) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.NoIsmPolicyManagement.checkIfIndexExistsOnServer(NoIsmPolicyManagement.java:50) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.checkAndCreateIndex(AbstractIndexManager.java:268) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.setupIndex(AbstractIndexManager.java:225) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitializeInternal(OpenSearchSink.java:231) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitialize(OpenSearchSink.java:193) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.model.sink.AbstractSink.initialize(AbstractSink.java:52) ~[data-prepper-api-2.6.1.jar:?]
	at org.opensearch.dataprepper.pipeline.Pipeline.isReady(Pipeline.java:200) ~[data-prepper-core-2.6.1.jar:?]
	at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:252) ~[data-prepper-core-2.6.1.jar:?]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

@Jon-AtAWS
Copy link
Member Author

This policy is successful (changing to "indices_all" at index level)

PUT _plugins/_security/api/roles/pipeline_write_role
{
    "cluster_permissions": [
        "indices:admin/exists",
        "cluster_monitor",
        "cluster_composite_ops"
    ],
    "index_permissions": [
    {
        "index_patterns": [
          "*"
        ],
        "dls": "",
        "fls": [],
        "masked_fields": [],
        "allowed_actions": [
          "indices_all",
          "crud"
        ]
    }
    ],
    "tenant_permissions": []
}

The docs (https://opensearch.org/docs/latest/security/access-control/default-action-groups/) are not very helpful here. indices_all action group is documented as "Grants all permissions on the index. Equates to indices:*". So, I can't really tell which permission I added that enabled the sink to initialize. And, the error doesn't tell me which permission I need.

@krishna-ggk
Copy link

If you have indices_all, ideally you should not require crud. Are you trying to breakdown indices_all further to scope it down for ingestion?

@Jon-AtAWS
Copy link
Member Author

Yes, I am trying to figure out minimum permissions. I understand that I don't need crud, I just threw indices_all in there (as an edit to indices:admin/exists) to verify that the exists perm is not sufficient.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
Development

No branches or pull requests

5 participants