From 569e3d19a390eca64ae5198e528c3f055fdc0765 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=BCrgen=20Walter?= <juergen.walter@sap.com>
Date: Wed, 30 Oct 2024 14:00:52 +0100
Subject: [PATCH] Add index_types for OTEL logs and metrics #3148 (#3929)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Add ISM policies for logs and metrics

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Add index templates for logs

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Add index templates for metrics

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Add index_types for OTEL logs and metrics

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Test OpenSearch sink

works with log-analytics and metric-analytics
and index type

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Test log and metric index types

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Document log and metrics index_type usage

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Minor: Remove incorrect html tag

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Fix test by adding date_detection false

Fixes testInstantiateSinkMetricsDefaultMetricSink

Alertnative would have been to adjust the test

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Rename test

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Fix metric ism file constants

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Rename index template file for metrics

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Add assertions to tests

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Fix index patterns for logs and metrics

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Update to plugins ISM API

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Add assertion

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Revert "Update to plugins ISM API"

This reverts commit 3fd61af58662bbdd978f05531219880079690c1c.

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Add fields data prepper writes

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

* Use field type data prepper writes

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>

---------

Signed-off-by: Jürgen Walter <juergen.walter@sap.com>
---
 data-prepper-plugins/opensearch/README.md     |  43 ++-
 .../sink/opensearch/OpenSearchSinkIT.java     |  93 +++++++
 .../opensearch/index/IndexConfiguration.java  |   4 +
 .../sink/opensearch/index/IndexConstants.java |  15 +-
 .../opensearch/index/IndexManagerFactory.java |  44 +++
 .../sink/opensearch/index/IndexType.java      |   2 +
 .../logs-otel-v1-index-template.json          | 110 ++++++++
 .../metrics-otel-v1-index-template.json       | 250 ++++++++++++++++++
 .../logs-otel-v1-index-template.json          | 108 ++++++++
 .../logs-policy-no-ism-template.json          |  19 ++
 .../logs-policy-with-ism-template.json        |  22 ++
 .../metrics-otel-v1-index-template.json       | 248 +++++++++++++++++
 .../metrics-policy-no-ism-template.json       |  19 ++
 .../metrics-policy-with-ism-template.json     |  22 ++
 .../sink/opensearch/index/IndexTypeTests.java |   4 +-
 15 files changed, 999 insertions(+), 4 deletions(-)
 create mode 100644 data-prepper-plugins/opensearch/src/main/resources/index-template/logs-otel-v1-index-template.json
 create mode 100644 data-prepper-plugins/opensearch/src/main/resources/index-template/metrics-otel-v1-index-template.json
 create mode 100644 data-prepper-plugins/opensearch/src/main/resources/logs-otel-v1-index-template.json
 create mode 100644 data-prepper-plugins/opensearch/src/main/resources/logs-policy-no-ism-template.json
 create mode 100644 data-prepper-plugins/opensearch/src/main/resources/logs-policy-with-ism-template.json
 create mode 100644 data-prepper-plugins/opensearch/src/main/resources/metrics-otel-v1-index-template.json
 create mode 100644 data-prepper-plugins/opensearch/src/main/resources/metrics-policy-no-ism-template.json
 create mode 100644 data-prepper-plugins/opensearch/src/main/resources/metrics-policy-with-ism-template.json

diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md
index 628a75cc80..05c0951d47 100644
--- a/data-prepper-plugins/opensearch/README.md
+++ b/data-prepper-plugins/opensearch/README.md
@@ -27,7 +27,7 @@ pipeline:
 
 The OpenSearch sink will reserve `otel-v1-apm-span-*` as index pattern and `otel-v1-apm-span` as index alias for record ingestion.
 
-### </a>Service map trace analytics
+### Service map trace analytics
 
 ```
 pipeline:
@@ -45,6 +45,45 @@ pipeline:
 
 The OpenSearch sink will reserve `otel-v1-apm-service-map` as index for record ingestion.
 
+### Log analytics
+
+```
+pipeline:
+  ...
+  sink:
+    opensearch:
+      hosts: ["https://localhost:9200"]
+      cert: path/to/cert
+      username: YOUR_USERNAME_HERE
+      password: YOUR_PASSWORD_HERE
+      index_type: log-analytics
+      dlq_file: /your/local/dlq-file
+      max_retries: 20
+      bulk_size: 4
+```
+
+The OpenSearch sink will reserve `logs-otel-v1-*` as index pattern and `logs-otel-v1` as index alias for record ingestion.
+
+### Metric analytics
+
+```
+pipeline:
+  ...
+  sink:
+    opensearch:
+      hosts: ["https://localhost:9200"]
+      cert: path/to/cert
+      username: YOUR_USERNAME_HERE
+      password: YOUR_PASSWORD_HERE
+      index_type: metric-analytics
+      dlq_file: /your/local/dlq-file
+      max_retries: 20
+      bulk_size: 4
+```
+
+The OpenSearch sink will reserve `metrics-otel-v1-*` as index pattern and `metrics-otel-v1` as index alias for record ingestion.
+
+
 ### Amazon OpenSearch Service
 
 The OpenSearch sink can also be configured for an Amazon OpenSearch Service domain. See [security](security.md) for details.
@@ -93,7 +132,7 @@ Default is null.
 
 - `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.
 
-- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.
+- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `metric-analytics`, `log-analytics`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.
 
 - `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. For `distribution_version` set to `es6`, default value is `false`, otherwise default value is `true`.
 
diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java
index b17c0ea47c..d97feb30ba 100644
--- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java
+++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java
@@ -122,6 +122,8 @@ public class OpenSearchSinkIT {
     private static final String DEFAULT_SERVICE_MAP_FILE = "service-map-1.json";
     private static final String INCLUDE_TYPE_NAME_FALSE_URI = "?include_type_name=false";
     private static final String TRACE_INGESTION_TEST_DISABLED_REASON = "Trace ingestion is not supported for ES 6";
+    private static final String LOG_INGESTION_TEST_DISABLED_REASON = "Log ingestion is not supported for ES 6";
+    private static final String METRIC_INGESTION_TEST_DISABLED_REASON = "Metric ingestion is not supported for ES 6";
 
     private RestClient client;
     private SinkContext sinkContext;
@@ -188,6 +190,7 @@ public void testInstantiateSinkRawSpanDefault() throws IOException {
         final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
         OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
         final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
+        assertThat(indexAlias, equalTo("otel-v1-apm-span"));
         Request request = new Request(HttpMethod.HEAD, indexAlias);
         Response response = client.performRequest(request);
         assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
@@ -226,6 +229,96 @@ public void testInstantiateSinkRawSpanDefault() throws IOException {
         }
     }
 
+    @Test
+    @DisabledIf(value = "isES6", disabledReason = LOG_INGESTION_TEST_DISABLED_REASON)
+    public void testInstantiateSinkLogsDefaultLogSink() throws IOException {
+        final PluginSetting pluginSetting = generatePluginSetting(IndexType.LOG_ANALYTICS.getValue(), null, null);
+        OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
+        final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.LOG_ANALYTICS);
+        assertThat(indexAlias, equalTo("logs-otel-v1"));
+        Request request = new Request(HttpMethod.HEAD, indexAlias);
+        Response response = client.performRequest(request);
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
+        final String index = String.format("%s-000001", indexAlias);
+        final Map<String, Object> mappings = getIndexMappings(index);
+        assertThat(mappings, notNullValue());
+        assertThat((boolean) mappings.get("date_detection"), equalTo(false));
+        sink.shutdown();
+
+        if (isOSBundle()) {
+            // Check managed index
+            await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+                        assertThat(getIndexPolicyId(index), equalTo(IndexConstants.LOGS_ISM_POLICY));
+                    }
+            );
+        }
+
+        // roll over initial index
+        request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias));
+        request.setJsonEntity("{ \"conditions\" : { } }\n");
+        response = client.performRequest(request);
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
+
+        // Instantiate sink again
+        sink = createObjectUnderTest(pluginSetting, true);
+        // Make sure no new write index *-000001 is created under alias
+        final String rolloverIndexName = String.format("%s-000002", indexAlias);
+        request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias");
+        response = client.performRequest(request);
+        assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true));
+        sink.shutdown();
+
+        if (isOSBundle()) {
+            // Check managed index
+            assertThat(getIndexPolicyId(rolloverIndexName), equalTo(IndexConstants.LOGS_ISM_POLICY));
+        }
+    }
+
+    @Test
+    @DisabledIf(value = "isES6", disabledReason = METRIC_INGESTION_TEST_DISABLED_REASON)
+    public void testInstantiateSinkMetricsDefaultMetricSink() throws IOException {
+        final PluginSetting pluginSetting = generatePluginSetting(IndexType.METRIC_ANALYTICS.getValue(), null, null);
+        OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
+        final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.METRIC_ANALYTICS);
+        assertThat(indexAlias, equalTo("metrics-otel-v1"));
+        Request request = new Request(HttpMethod.HEAD, indexAlias);
+        Response response = client.performRequest(request);
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
+        final String index = String.format("%s-000001", indexAlias);
+        final Map<String, Object> mappings = getIndexMappings(index);
+        assertThat(mappings, notNullValue());
+        assertThat((boolean) mappings.get("date_detection"), equalTo(false));
+        sink.shutdown();
+
+        if (isOSBundle()) {
+            // Check managed index
+            await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+                        assertThat(getIndexPolicyId(index), equalTo(IndexConstants.METRICS_ISM_POLICY));
+                    }
+            );
+        }
+
+        // roll over initial index
+        request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias));
+        request.setJsonEntity("{ \"conditions\" : { } }\n");
+        response = client.performRequest(request);
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
+
+        // Instantiate sink again
+        sink = createObjectUnderTest(pluginSetting, true);
+        // Make sure no new write index *-000001 is created under alias
+        final String rolloverIndexName = String.format("%s-000002", indexAlias);
+        request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias");
+        response = client.performRequest(request);
+        assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true));
+        sink.shutdown();
+
+        if (isOSBundle()) {
+            // Check managed index
+            assertThat(getIndexPolicyId(rolloverIndexName), equalTo(IndexConstants.METRICS_ISM_POLICY));
+        }
+    }
+
     @Test
     @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON)
     public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws IOException {
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java
index 392b3bf556..10dd892296 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java
@@ -425,6 +425,10 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
                 templateURL = loadExistingTemplate(templateType, IndexConstants.RAW_DEFAULT_TEMPLATE_FILE);
             } else if (indexType.equals(IndexType.TRACE_ANALYTICS_SERVICE_MAP)) {
                 templateURL = loadExistingTemplate(templateType, IndexConstants.SERVICE_MAP_DEFAULT_TEMPLATE_FILE);
+            } else if (indexType.equals(IndexType.LOG_ANALYTICS)) {
+                templateURL = loadExistingTemplate(templateType, IndexConstants.LOGS_DEFAULT_TEMPLATE_FILE);
+            } else if (indexType.equals(IndexType.METRIC_ANALYTICS)) {
+                templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_DEFAULT_TEMPLATE_FILE);
             } else if (templateFile != null) {
                 if (templateFile.toLowerCase().startsWith(S3_PREFIX)) {
                     FileReader s3FileReader = new S3FileReader(s3Client);
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java
index d20003e222..63529c3419 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java
@@ -18,6 +18,17 @@ public class IndexConstants {
   public static final String RAW_ISM_POLICY = "raw-span-policy";
   public static final String RAW_ISM_FILE_NO_ISM_TEMPLATE = "raw-span-policy-no-ism-template.json";
   public static final String RAW_ISM_FILE_WITH_ISM_TEMPLATE = "raw-span-policy-with-ism-template.json";
+
+  public static final String LOGS_DEFAULT_TEMPLATE_FILE = "logs-otel-v1-index-template.json";
+  public static final String LOGS_ISM_POLICY = "logs-policy";
+  public static final String LOGS_ISM_FILE_NO_ISM_TEMPLATE = "logs-policy-no-ism-template.json";
+  public static final String LOGS_ISM_FILE_WITH_ISM_TEMPLATE = "logs-policy-with-ism-template.json";
+
+  public static final String METRICS_DEFAULT_TEMPLATE_FILE = "metrics-otel-v1-index-template.json";
+  public static final String METRICS_ISM_POLICY = "metrics-policy";
+  public static final String METRICS_ISM_FILE_NO_ISM_TEMPLATE = "metrics-policy-no-ism-template.json";
+  public static final String METRICS_ISM_FILE_WITH_ISM_TEMPLATE = "metrics-policy-with-ism-template.json";
+
   public static final String ISM_ENABLED_SETTING = "opendistro.index_state_management.enabled";
   public static final String ISM_POLICY_ID_SETTING = "opendistro.index_state_management.policy_id";
   public static final String ISM_ROLLOVER_ALIAS_SETTING = "opendistro.index_state_management.rollover_alias";
@@ -26,7 +37,9 @@ public class IndexConstants {
 
   static {
     // TODO: extract out version number into version enum
-    TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_RAW, "otel-v1-apm-span");
     TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_SERVICE_MAP, "otel-v1-apm-service-map");
+    TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_RAW, "otel-v1-apm-span");
+    TYPE_TO_DEFAULT_ALIAS.put(IndexType.LOG_ANALYTICS, "logs-otel-v1");
+    TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS, "metrics-otel-v1");
   }
 }
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java
index 4e4868debf..17574a5f51 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java
@@ -58,6 +58,14 @@ public final IndexManager getIndexManager(final IndexType indexType,
                 indexManager = new TraceAnalyticsServiceMapIndexManager(
                         restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
                 break;
+            case LOG_ANALYTICS:
+                indexManager = new LogAnalyticsIndexManager(
+                        restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
+                break;
+            case METRIC_ANALYTICS:
+                indexManager = new MetricAnalyticsIndexManager(
+                        restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
+                break;
             case MANAGEMENT_DISABLED:
                 indexManager = new ManagementDisabledIndexManager(
                         restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
@@ -140,6 +148,42 @@ public TraceAnalyticsServiceMapIndexManager(final RestHighLevelClient restHighLe
         }
     }
 
+    private static class LogAnalyticsIndexManager extends AbstractIndexManager {
+
+        public LogAnalyticsIndexManager(final RestHighLevelClient restHighLevelClient,
+                                                    final OpenSearchClient openSearchClient,
+                                                    final OpenSearchSinkConfiguration openSearchSinkConfiguration,
+                                                    final ClusterSettingsParser clusterSettingsParser,
+                                                    final TemplateStrategy templateStrategy,
+                                                    final String indexAlias) {
+            super(restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
+            this.ismPolicyManagementStrategy = new IsmPolicyManagement(
+                    openSearchClient,
+                    restHighLevelClient,
+                    IndexConstants.LOGS_ISM_POLICY,
+                    IndexConstants.LOGS_ISM_FILE_WITH_ISM_TEMPLATE,
+                    IndexConstants.LOGS_ISM_FILE_NO_ISM_TEMPLATE);
+        }
+    }
+
+    private static class MetricAnalyticsIndexManager extends AbstractIndexManager {
+
+        public MetricAnalyticsIndexManager(final RestHighLevelClient restHighLevelClient,
+                                                    final OpenSearchClient openSearchClient,
+                                                    final OpenSearchSinkConfiguration openSearchSinkConfiguration,
+                                                    final ClusterSettingsParser clusterSettingsParser,
+                                                    final TemplateStrategy templateStrategy,
+                                                    final String indexAlias) {
+            super(restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
+            this.ismPolicyManagementStrategy = new IsmPolicyManagement(
+                    openSearchClient,
+                    restHighLevelClient,
+                    IndexConstants.METRICS_ISM_POLICY,
+                    IndexConstants.METRICS_ISM_FILE_WITH_ISM_TEMPLATE,
+                    IndexConstants.METRICS_ISM_FILE_NO_ISM_TEMPLATE);
+        }
+    }
+
     private class ManagementDisabledIndexManager extends AbstractIndexManager {
         protected ManagementDisabledIndexManager(final RestHighLevelClient restHighLevelClient,
                                                  final OpenSearchClient openSearchClient,
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java
index 1636dd3163..58cc57abe1 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java
@@ -14,6 +14,8 @@
 public enum IndexType {
     TRACE_ANALYTICS_RAW("trace-analytics-raw"),
     TRACE_ANALYTICS_SERVICE_MAP("trace-analytics-service-map"),
+    LOG_ANALYTICS("log-analytics"),
+    METRIC_ANALYTICS("metric-analytics"),
     CUSTOM("custom"),
     MANAGEMENT_DISABLED("management_disabled");
 
diff --git a/data-prepper-plugins/opensearch/src/main/resources/index-template/logs-otel-v1-index-template.json b/data-prepper-plugins/opensearch/src/main/resources/index-template/logs-otel-v1-index-template.json
new file mode 100644
index 0000000000..d96b7ae9c3
--- /dev/null
+++ b/data-prepper-plugins/opensearch/src/main/resources/index-template/logs-otel-v1-index-template.json
@@ -0,0 +1,110 @@
+{
+  "version": 1,
+  "template": {
+    "mappings": {
+      "date_detection": false,
+      "dynamic_templates": [
+        {
+          "resource_attributes_map": {
+            "mapping": {
+              "type": "keyword"
+            },
+            "path_match": "resource.attributes.*"
+          }
+        },
+        {
+          "log_attributes_map": {
+            "mapping": {
+              "type": "keyword"
+            },
+            "path_match": "log.attributes.*"
+          }
+        }
+      ],
+      "_source": {
+        "enabled": true
+      },
+      "properties": {
+        "severity": {
+          "properties": {
+            "number": {
+              "type": "long"
+            },
+            "text": {
+              "type": "keyword"
+            }
+          }
+        },
+        "body": {
+          "type": "text"
+        },
+        "@timestamp": {
+          "type": "date_nanos"
+        },
+        "time": {
+          "type": "date_nanos"
+        },
+        "observedTimestamp": {
+          "type": "date_nanos"
+        },
+        "observedTime": {
+          "type": "alias",
+          "path": "observedTimestamp"
+        },
+        "traceId": {
+          "ignore_above": 256,
+          "type": "keyword"
+        },
+        "spanId": {
+          "ignore_above": 256,
+          "type": "keyword"
+        },
+        "schemaUrl": {
+          "type": "keyword"
+        },
+        "instrumentationScope": {
+          "properties": {
+            "name": {
+              "type": "keyword"
+            },
+            "version": {
+              "type": "keyword"
+            }
+          }
+        },
+        "event": {
+          "properties": {
+            "kind": {
+              "type": "keyword"
+            },
+            "domain": {
+              "type": "keyword"
+            },
+            "category": {
+              "type": "keyword"
+            },
+            "type": {
+              "type": "keyword"
+            },
+            "result": {
+              "type": "keyword"
+            },
+           "exception": {
+              "properties": {
+                "message": {
+                  "type": "text"
+                },
+                "stacktrace": {
+                  "type": "text"
+                },
+                "type": {
+                  "type": "keyword"
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/opensearch/src/main/resources/index-template/metrics-otel-v1-index-template.json b/data-prepper-plugins/opensearch/src/main/resources/index-template/metrics-otel-v1-index-template.json
new file mode 100644
index 0000000000..a54a512aec
--- /dev/null
+++ b/data-prepper-plugins/opensearch/src/main/resources/index-template/metrics-otel-v1-index-template.json
@@ -0,0 +1,250 @@
+{
+  "version": 1,
+  "template": {
+    "mappings": {
+      "date_detection": false,
+      "dynamic_templates": [
+        {
+          "resources_map": {
+            "mapping": {
+              "type": "keyword"
+            },
+            "path_match": "resource.*"
+          }
+        }
+        ],
+      "_source": {
+        "enabled": true
+      },
+      "properties": {
+        "name": {
+          "type": "text",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        },
+        "attributes": {
+          "type": "object",
+          "properties": {
+            "data_stream": {
+              "properties": {
+                "dataset": {
+                  "ignore_above": 128,
+                  "type": "keyword"
+                },
+                "namespace": {
+                  "ignore_above": 128,
+                  "type": "keyword"
+                },
+                "type": {
+                  "ignore_above": 56,
+                  "type": "keyword"
+                }
+              }
+            }
+          }
+        },
+        "description": {
+          "type": "text",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        },
+        "unit": {
+          "type": "keyword",
+          "ignore_above": 128
+        },
+        "kind": {
+          "type": "keyword",
+          "ignore_above": 128
+        },
+        "aggregationTemporality": {
+          "type": "keyword",
+          "ignore_above": 128
+        },
+        "monotonic": {
+          "type": "boolean"
+        },
+        "startTime": {
+          "type": "date"
+        },
+        "@timestamp": {
+          "type": "date"
+        },
+        "time": {
+          "type": "date_nanos"
+        },
+        "observedTimestamp": {
+          "type": "date_nanos"
+        },
+        "value@int": {
+          "type": "integer"
+        },
+        "value@double": {
+          "type": "double"
+        },
+        "value": {
+          "type": "double"
+        },
+        "buckets": {
+          "type" : "nested",
+          "properties": {
+            "count": {
+              "type": "long"
+            },
+            "sum": {
+              "type": "double"
+            },
+            "max": {
+              "type": "float"
+            },
+            "min": {
+              "type": "float"
+            }
+          }
+        },
+        "bucketCount": {
+          "type": "long"
+        },
+        "bucketCountsList": {
+          "type": "long"
+        },
+        "explicitBoundsList": {
+          "type": "float"
+        },
+        "explicitBoundsCount": {
+          "type": "float"
+        },
+        "quantiles": {
+          "properties": {
+            "quantile": {
+              "type": "double"
+            },
+            "value": {
+              "type": "double"
+            }
+          }
+        },
+        "quantileValuesCount": {
+          "type": "long"
+        },
+        "positiveBuckets": {
+          "type" : "nested",
+          "properties": {
+            "count": {
+              "type": "long"
+            },
+            "max": {
+              "type": "float"
+            },
+            "min": {
+              "type": "float"
+            }
+          }
+        },
+        "negativeBuckets": {
+          "type" : "nested",
+          "properties": {
+            "count": {
+              "type": "long"
+            },
+            "max": {
+              "type": "float"
+            },
+            "min": {
+              "type": "float"
+            }
+          }
+        },
+        "negativeOffset": {
+          "type": "integer"
+        },
+        "positiveOffset": {
+          "type": "integer"
+        },
+        "zeroCount": {
+          "type": "long"
+        },
+        "scale": {
+          "type": "long"
+        },
+        "max": {
+          "type": "float"
+        },
+        "min": {
+          "type": "float"
+        },
+        "sum": {
+          "type": "float"
+        },
+        "count": {
+          "type": "long"
+        },
+        "exemplar": {
+          "properties": {
+            "time": {
+              "type": "date_nanos"
+            },
+            "traceId": {
+              "ignore_above": 256,
+              "type": "keyword"
+            },
+            "spanId": {
+              "ignore_above": 256,
+              "type": "keyword"
+            }
+          }
+        },
+        "instrumentationScope": {
+          "properties": {
+            "name": {
+              "type": "text",
+              "fields": {
+                "keyword": {
+                  "type": "keyword",
+                  "ignore_above": 128
+                }
+              }
+            },
+            "version": {
+              "type": "text",
+              "fields": {
+                "keyword": {
+                  "type": "keyword",
+                  "ignore_above": 256
+                }
+              }
+            },
+            "droppedAttributesCount": {
+              "type": "integer"
+            },
+            "schemaUrl": {
+              "type": "text",
+              "fields": {
+                "keyword": {
+                  "type": "keyword",
+                  "ignore_above": 256
+                }
+              }
+            }
+          }
+        },
+        "schemaUrl": {
+          "type": "text",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/opensearch/src/main/resources/logs-otel-v1-index-template.json b/data-prepper-plugins/opensearch/src/main/resources/logs-otel-v1-index-template.json
new file mode 100644
index 0000000000..d55d8662fc
--- /dev/null
+++ b/data-prepper-plugins/opensearch/src/main/resources/logs-otel-v1-index-template.json
@@ -0,0 +1,108 @@
+{
+  "version": 1,
+  "mappings": {
+    "date_detection": false,
+    "dynamic_templates": [
+      {
+        "resource_attributes_map": {
+          "mapping": {
+            "type": "keyword"
+          },
+          "path_match": "resource.attributes.*"
+        }
+      },
+      {
+        "log_attributes_map": {
+          "mapping": {
+            "type": "keyword"
+          },
+          "path_match": "log.attributes.*"
+        }
+      }
+    ],
+    "_source": {
+      "enabled": true
+    },
+    "properties": {
+      "severity": {
+        "properties": {
+          "number": {
+            "type": "long"
+          },
+          "text": {
+            "type": "keyword"
+          }
+        }
+      },
+      "body": {
+        "type": "text"
+      },
+      "@timestamp": {
+        "type": "date_nanos"
+      },
+      "time": {
+        "type": "date_nanos"
+      },
+      "observedTimestamp": {
+        "type": "date_nanos"
+      },
+      "observedTime": {
+        "type": "alias",
+        "path": "observedTimestamp"
+      },
+      "traceId": {
+        "ignore_above": 256,
+        "type": "keyword"
+      },
+      "spanId": {
+        "ignore_above": 256,
+        "type": "keyword"
+      },
+      "schemaUrl": {
+        "type": "keyword"
+      },
+      "instrumentationScope": {
+        "properties": {
+          "name": {
+            "type": "keyword"
+          },
+          "version": {
+            "type": "keyword"
+          }
+        }
+      },
+      "event": {
+        "properties": {
+          "kind": {
+            "type": "keyword"
+          },
+          "domain": {
+            "type": "keyword"
+          },
+          "category": {
+            "type": "keyword"
+          },
+          "type": {
+            "type": "keyword"
+          },
+          "result": {
+            "type": "keyword"
+          },
+          "exception": {
+            "properties": {
+              "message": {
+                "type": "text"
+              },
+              "stacktrace": {
+                "type": "text"
+              },
+              "type": {
+                "type": "keyword"
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/opensearch/src/main/resources/logs-policy-no-ism-template.json b/data-prepper-plugins/opensearch/src/main/resources/logs-policy-no-ism-template.json
new file mode 100644
index 0000000000..a17daf6b39
--- /dev/null
+++ b/data-prepper-plugins/opensearch/src/main/resources/logs-policy-no-ism-template.json
@@ -0,0 +1,19 @@
+{
+  "policy": {
+    "description": "Managing logs for log analytics",
+    "default_state": "current_write_index",
+    "states": [
+      {
+        "name": "current_write_index",
+        "actions": [
+          {
+            "rollover": {
+              "min_size": "50gb",
+              "min_index_age": "24h"
+            }
+          }
+        ]
+      }
+    ]
+  }
+}
diff --git a/data-prepper-plugins/opensearch/src/main/resources/logs-policy-with-ism-template.json b/data-prepper-plugins/opensearch/src/main/resources/logs-policy-with-ism-template.json
new file mode 100644
index 0000000000..8d89124f4f
--- /dev/null
+++ b/data-prepper-plugins/opensearch/src/main/resources/logs-policy-with-ism-template.json
@@ -0,0 +1,22 @@
+{
+  "policy": {
+    "description": "Managing logs for log analytics",
+    "default_state": "current_write_index",
+    "states": [
+      {
+        "name": "current_write_index",
+        "actions": [
+          {
+            "rollover": {
+              "min_size": "50gb",
+              "min_index_age": "24h"
+            }
+          }
+        ]
+      }
+    ],
+    "ism_template": {
+      "index_patterns": ["logs-otel-v1-*"]
+    }
+  }
+}
diff --git a/data-prepper-plugins/opensearch/src/main/resources/metrics-otel-v1-index-template.json b/data-prepper-plugins/opensearch/src/main/resources/metrics-otel-v1-index-template.json
new file mode 100644
index 0000000000..812fbbd7d9
--- /dev/null
+++ b/data-prepper-plugins/opensearch/src/main/resources/metrics-otel-v1-index-template.json
@@ -0,0 +1,248 @@
+{
+  "version": 1,
+  "mappings": {
+    "date_detection": false,
+    "dynamic_templates": [
+      {
+        "resources_map": {
+          "mapping": {
+            "type": "keyword"
+          },
+          "path_match": "resource.*"
+        }
+      }
+      ],
+    "_source": {
+      "enabled": true
+    },
+    "properties": {
+      "name": {
+        "type": "text",
+        "fields": {
+          "keyword": {
+            "type": "keyword",
+            "ignore_above": 256
+          }
+        }
+      },
+      "attributes": {
+        "type": "object",
+        "properties": {
+          "data_stream": {
+            "properties": {
+              "dataset": {
+                "ignore_above": 128,
+                "type": "keyword"
+              },
+              "namespace": {
+                "ignore_above": 128,
+                "type": "keyword"
+              },
+              "type": {
+                "ignore_above": 56,
+                "type": "keyword"
+              }
+            }
+          }
+        }
+      },
+      "description": {
+        "type": "text",
+        "fields": {
+          "keyword": {
+            "type": "keyword",
+            "ignore_above": 256
+          }
+        }
+      },
+      "unit": {
+        "type": "keyword",
+        "ignore_above": 128
+      },
+      "kind": {
+        "type": "keyword",
+        "ignore_above": 128
+      },
+      "aggregationTemporality": {
+        "type": "keyword",
+        "ignore_above": 128
+      },
+      "monotonic": {
+        "type": "boolean"
+      },
+      "startTime": {
+        "type": "date"
+      },
+      "@timestamp": {
+        "type": "date"
+      },
+      "time": {
+        "type": "date_nanos"
+      },
+      "observedTimestamp": {
+        "type": "date_nanos"
+      },
+      "value@int": {
+        "type": "integer"
+      },
+      "value@double": {
+        "type": "double"
+      },
+      "value": {
+        "type": "double"
+      },
+      "buckets": {
+        "type" : "nested",
+        "properties": {
+          "count": {
+            "type": "long"
+          },
+          "sum": {
+            "type": "double"
+          },
+          "max": {
+            "type": "float"
+          },
+          "min": {
+            "type": "float"
+          }
+        }
+      },
+      "bucketCount": {
+        "type": "long"
+      },
+      "bucketCountsList": {
+        "type": "long"
+      },
+      "explicitBoundsList": {
+        "type": "float"
+      },
+      "explicitBoundsCount": {
+        "type": "float"
+      },
+      "quantiles": {
+        "properties": {
+          "quantile": {
+            "type": "double"
+          },
+          "value": {
+            "type": "double"
+          }
+        }
+      },
+      "quantileValuesCount": {
+        "type": "long"
+      },
+      "positiveBuckets": {
+        "type" : "nested",
+        "properties": {
+          "count": {
+            "type": "long"
+          },
+          "max": {
+            "type": "float"
+          },
+          "min": {
+            "type": "float"
+          }
+        }
+      },
+      "negativeBuckets": {
+        "type" : "nested",
+        "properties": {
+          "count": {
+            "type": "long"
+          },
+          "max": {
+            "type": "float"
+          },
+          "min": {
+            "type": "float"
+          }
+        }
+      },
+      "negativeOffset": {
+        "type": "integer"
+      },
+      "positiveOffset": {
+        "type": "integer"
+      },
+      "zeroCount": {
+        "type": "long"
+      },
+      "scale": {
+        "type": "long"
+      },
+      "max": {
+        "type": "float"
+      },
+      "min": {
+        "type": "float"
+      },
+      "sum": {
+        "type": "float"
+      },
+      "count": {
+        "type": "long"
+      },
+      "exemplar": {
+        "properties": {
+          "time": {
+            "type": "date_nanos"
+          },
+          "traceId": {
+            "ignore_above": 256,
+            "type": "keyword"
+          },
+          "spanId": {
+            "ignore_above": 256,
+            "type": "keyword"
+          }
+        }
+      },
+      "instrumentationScope": {
+        "properties": {
+          "name": {
+            "type": "text",
+            "fields": {
+              "keyword": {
+                "type": "keyword",
+                "ignore_above": 128
+              }
+            }
+          },
+          "version": {
+            "type": "text",
+            "fields": {
+              "keyword": {
+                "type": "keyword",
+                "ignore_above": 256
+              }
+            }
+          },
+          "droppedAttributesCount": {
+            "type": "integer"
+          },
+          "schemaUrl": {
+            "type": "text",
+            "fields": {
+              "keyword": {
+                "type": "keyword",
+                "ignore_above": 256
+              }
+            }
+          }
+        }
+      },
+      "schemaUrl": {
+        "type": "text",
+        "fields": {
+          "keyword": {
+            "type": "keyword",
+            "ignore_above": 256
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/data-prepper-plugins/opensearch/src/main/resources/metrics-policy-no-ism-template.json b/data-prepper-plugins/opensearch/src/main/resources/metrics-policy-no-ism-template.json
new file mode 100644
index 0000000000..1eb4d012f7
--- /dev/null
+++ b/data-prepper-plugins/opensearch/src/main/resources/metrics-policy-no-ism-template.json
@@ -0,0 +1,19 @@
+{
+  "policy": {
+    "description": "Managing metrics for metric analytics",
+    "default_state": "current_write_index",
+    "states": [
+      {
+        "name": "current_write_index",
+        "actions": [
+          {
+            "rollover": {
+              "min_size": "50gb",
+              "min_index_age": "24h"
+            }
+          }
+        ]
+      }
+    ]
+  }
+}
diff --git a/data-prepper-plugins/opensearch/src/main/resources/metrics-policy-with-ism-template.json b/data-prepper-plugins/opensearch/src/main/resources/metrics-policy-with-ism-template.json
new file mode 100644
index 0000000000..ea432fdbf1
--- /dev/null
+++ b/data-prepper-plugins/opensearch/src/main/resources/metrics-policy-with-ism-template.json
@@ -0,0 +1,22 @@
+{
+  "policy": {
+    "description": "Managing metrics for metric analytics",
+    "default_state": "current_write_index",
+    "states": [
+      {
+        "name": "current_write_index",
+        "actions": [
+          {
+            "rollover": {
+              "min_size": "50gb",
+              "min_index_age": "24h"
+            }
+          }
+        ]
+      }
+    ],
+    "ism_template": {
+      "index_patterns": ["metrics-otel-v1-*"]
+    }
+  }
+}
diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java
index 591e3d1311..c6a5fc56d6 100644
--- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java
+++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java
@@ -27,11 +27,13 @@ public void getByValue() {
         assertEquals(Optional.of(IndexType.MANAGEMENT_DISABLED), IndexType.getByValue("management_disabled"));
         assertEquals(Optional.of(IndexType.TRACE_ANALYTICS_RAW), IndexType.getByValue("trace-analytics-raw"));
         assertEquals(Optional.of(IndexType.TRACE_ANALYTICS_SERVICE_MAP), IndexType.getByValue("trace-analytics-service-map"));
+        assertEquals(Optional.of(IndexType.LOG_ANALYTICS), IndexType.getByValue("log-analytics"));
+        assertEquals(Optional.of(IndexType.METRIC_ANALYTICS), IndexType.getByValue("metric-analytics"));
     }
 
     @Test
     public void getIndexTypeValues() {
-        assertEquals("[trace-analytics-raw, trace-analytics-service-map, custom, management_disabled]", IndexType.getIndexTypeValues());
+        assertEquals("[trace-analytics-raw, trace-analytics-service-map, log-analytics, metric-analytics, custom, management_disabled]", IndexType.getIndexTypeValues());
     }
 
     @ParameterizedTest