From bc504fd1a7842a8832bda5e70f40c847944fd668 Mon Sep 17 00:00:00 2001
From: kkondaka <41027584+kkondaka@users.noreply.github.com>
Date: Thu, 9 Nov 2023 15:32:41 -0800
Subject: [PATCH] Add support for OTEL metrics source to use Kafka buffer
 (#3539)

* Add support for OTEL metrics source to use Kafka buffer

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Added tests and fixed test failures

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
---
 .../metric/JacksonExponentialHistogram.java   |   4 +-
 .../model/metric/JacksonHistogram.java        |   2 +-
 .../model/metric/JacksonMetric.java           |   2 +-
 .../otelmetrics/OTelMetricsRawProcessor.java  | 286 ++++--------------
 .../OtelMetricsRawProcessorConfig.java        |   4 +-
 .../otelmetrics/MetricsPluginSumTest.java     |  15 +-
 .../otelmetrics/MetricsPluginSummaryTest.java |   8 +-
 .../otel-metrics-source/build.gradle          |   1 +
 .../otelmetrics/OTelMetricsGrpcService.java   |   6 +-
 .../source/otelmetrics/OTelMetricsSource.java |   9 +
 .../plugins/otel/codec/OTelMetricDecoder.java |  39 +++
 .../plugins/otel/codec/OTelProtoCodec.java    | 276 +++++++++++++++++
 .../otel/codec/OTelProtoCodecTest.java        |  85 +++++-
 .../test/resources/test-gauge-metrics.json    |  44 +++
 .../resources/test-histogram-metrics.json     |  50 +++
 .../src/test/resources/test-sum-metrics.json  |  45 +++
 16 files changed, 626 insertions(+), 250 deletions(-)
 create mode 100644 data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java
 create mode 100644 data-prepper-plugins/otel-proto-common/src/test/resources/test-gauge-metrics.json
 create mode 100644 data-prepper-plugins/otel-proto-common/src/test/resources/test-histogram-metrics.json
 create mode 100644 data-prepper-plugins/otel-proto-common/src/test/resources/test-sum-metrics.json

diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java
index b52c850e46..b865ce0eb5 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java
@@ -27,8 +27,8 @@ public class JacksonExponentialHistogram extends JacksonMetric implements Expone
     private static final String SCALE_KEY = "scale";
     private static final String AGGREGATION_TEMPORALITY_KEY = "aggregationTemporality";
     private static final String ZERO_COUNT_KEY = "zeroCount";
-    private static final String POSITIVE_BUCKETS_KEY = "positiveBuckets";
-    private static final String NEGATIVE_BUCKETS_KEY = "negativeBuckets";
+    public static final String POSITIVE_BUCKETS_KEY = "positiveBuckets";
+    public static final String NEGATIVE_BUCKETS_KEY = "negativeBuckets";
     private static final String NEGATIVE_KEY = "negative";
     private static final String POSITIVE_KEY = "positive";
     private static final String NEGATIVE_OFFSET_KEY = "negativeOffset";
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java
index 0209f7012d..f9e066875d 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java
@@ -29,7 +29,7 @@ public class JacksonHistogram extends JacksonMetric implements Histogram {
     private static final String AGGREGATION_TEMPORALITY_KEY = "aggregationTemporality";
     private static final String BUCKET_COUNTS_KEY = "bucketCounts";
     private static final String EXPLICIT_BOUNDS_COUNT_KEY = "explicitBoundsCount";
-    private static final String BUCKETS_KEY = "buckets";
+    public static final String BUCKETS_KEY = "buckets";
     private static final String BUCKET_COUNTS_LIST_KEY = "bucketCountsList";
     private static final String EXPLICIT_BOUNDS_KEY = "explicitBounds";
 
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java
index 0ab81ed7e0..8d8ebf0f87 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java
@@ -28,7 +28,7 @@ public abstract class JacksonMetric extends JacksonEvent implements Metric {
     protected static final String SERVICE_NAME_KEY = "serviceName";
     protected static final String KIND_KEY = "kind";
     protected static final String UNIT_KEY = "unit";
-    protected static final String ATTRIBUTES_KEY = "attributes";
+    public static final String ATTRIBUTES_KEY = "attributes";
     protected static final String SCHEMA_URL_KEY = "schemaUrl";
     protected static final String EXEMPLARS_KEY = "exemplars";
     protected static final String FLAGS_KEY = "flags";
diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java
index e180e48ac3..679eef3224 100644
--- a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java
+++ b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java
@@ -6,35 +6,32 @@
 package org.opensearch.dataprepper.plugins.processor.otelmetrics;
 
 import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
-import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
-import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
-import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
 import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
 import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
-import org.opensearch.dataprepper.model.metric.JacksonExponentialHistogram;
-import org.opensearch.dataprepper.model.metric.JacksonGauge;
-import org.opensearch.dataprepper.model.metric.JacksonHistogram;
-import org.opensearch.dataprepper.model.metric.JacksonSum;
-import org.opensearch.dataprepper.model.metric.JacksonSummary;
+import static org.opensearch.dataprepper.model.metric.JacksonExponentialHistogram.POSITIVE_BUCKETS_KEY;
+import static org.opensearch.dataprepper.model.metric.JacksonExponentialHistogram.NEGATIVE_BUCKETS_KEY;
+import static org.opensearch.dataprepper.model.metric.JacksonHistogram.BUCKETS_KEY;
 import org.opensearch.dataprepper.model.metric.Metric;
+import static org.opensearch.dataprepper.model.metric.JacksonMetric.ATTRIBUTES_KEY;
 import org.opensearch.dataprepper.model.processor.AbstractProcessor;
 import org.opensearch.dataprepper.model.processor.Processor;
 import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
 import io.micrometer.core.instrument.Counter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 @DataPrepperPlugin(name = "otel_metrics", deprecatedName = "otel_metrics_raw_processor", pluginType = Processor.class, pluginConfigurationType = OtelMetricsRawProcessorConfig.class)
-public class OTelMetricsRawProcessor extends AbstractProcessor<Record<ExportMetricsServiceRequest>, Record<? extends Metric>> {
+public class OTelMetricsRawProcessor extends AbstractProcessor<Record<?>, Record<? extends Metric>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsRawProcessor.class);
     public static final String RECORDS_DROPPED_METRICS_RAW = "recordsDroppedMetricsRaw";
@@ -52,240 +49,61 @@ public OTelMetricsRawProcessor(PluginSetting pluginSetting, final OtelMetricsRaw
         this.flattenAttributesFlag = otelMetricsRawProcessorConfig.getFlattenAttributesFlag();
     }
 
-    @Override
-    public Collection<Record<? extends Metric>> doExecute(Collection<Record<ExportMetricsServiceRequest>> records) {
-        Collection<Record<? extends Metric>> recordsOut = new ArrayList<>();
-        for (Record<ExportMetricsServiceRequest> ets : records) {
-            for (ResourceMetrics rs : ets.getData().getResourceMetricsList()) {
-                final String schemaUrl = rs.getSchemaUrl();
-                final Map<String, Object> resourceAttributes = OTelProtoCodec.getResourceAttributes(rs.getResource());
-                final String serviceName = OTelProtoCodec.getServiceName(rs.getResource()).orElse(null);
+    private void modifyRecord(Record<? extends Metric> record,
+                              boolean flattenAttributes,
+                              boolean calcualteHistogramBuckets,
+                              boolean calcualteExponentialHistogramBuckets) {
+        Event event = (Event)record.getData();
 
-                for (InstrumentationLibraryMetrics is : rs.getInstrumentationLibraryMetricsList()) {
-                    final Map<String, Object> ils = OTelProtoCodec.getInstrumentationLibraryAttributes(is.getInstrumentationLibrary());
-                    recordsOut.addAll(processMetricsList(is.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl));
-                }
+        if (flattenAttributes) {
+            Map<String, Object> attributes = event.get(ATTRIBUTES_KEY, Map.class);
 
-                for (ScopeMetrics sm : rs.getScopeMetricsList()) {
-                    final Map<String, Object> ils = OTelProtoCodec.getInstrumentationScopeAttributes(sm.getScope());
-                    recordsOut.addAll(processMetricsList(sm.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl));
-                }
+            for (Map.Entry<String, Object> entry : attributes.entrySet()) {
+                event.put(entry.getKey(), entry.getValue());
             }
+            event.delete(ATTRIBUTES_KEY);
         }
-        return recordsOut;
-    }
-
-    private List<? extends Record<? extends Metric>> processMetricsList(final List<io.opentelemetry.proto.metrics.v1.Metric> metricsList,
-                                                                        final String serviceName,
-                                                                        final Map<String, Object> ils,
-                                                                        final Map<String, Object> resourceAttributes,
-                                                                        final String schemaUrl) {
-        List<Record<? extends Metric>> recordsOut = new ArrayList<>();
-        for (io.opentelemetry.proto.metrics.v1.Metric metric : metricsList) {
-            try {
-                if (metric.hasGauge()) {
-                    recordsOut.addAll(mapGauge(metric, serviceName, ils, resourceAttributes, schemaUrl));
-                } else if (metric.hasSum()) {
-                    recordsOut.addAll(mapSum(metric, serviceName, ils, resourceAttributes, schemaUrl));
-                } else if (metric.hasSummary()) {
-                    recordsOut.addAll(mapSummary(metric, serviceName, ils, resourceAttributes, schemaUrl));
-                } else if (metric.hasHistogram()) {
-                    recordsOut.addAll(mapHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl));
-                } else if (metric.hasExponentialHistogram()) {
-                    recordsOut.addAll(mapExponentialHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl));
-                }
-            } catch (Exception e) {
-                LOG.warn("Error while processing metrics", e);
-                recordsDroppedMetricsRawCounter.increment();
+        if (!calcualteHistogramBuckets && event.get(BUCKETS_KEY, List.class) != null) {
+            event.delete(BUCKETS_KEY);
+        }
+        if (!calcualteExponentialHistogramBuckets) {
+            if (event.get(POSITIVE_BUCKETS_KEY, List.class) != null) {
+                event.delete(POSITIVE_BUCKETS_KEY);
+            }
+            if (event.get(NEGATIVE_BUCKETS_KEY, List.class) != null) {
+                event.delete(NEGATIVE_BUCKETS_KEY);
             }
         }
-        return recordsOut;
-    }
-
-    private List<? extends Record<? extends Metric>> mapGauge(io.opentelemetry.proto.metrics.v1.Metric metric,
-                                         String serviceName,
-                                         final Map<String, Object> ils,
-                                         final Map<String, Object> resourceAttributes,
-                                         final String schemaUrl) {
-        return metric.getGauge().getDataPointsList().stream()
-                .map(dp -> JacksonGauge.builder()
-                        .withUnit(metric.getUnit())
-                        .withName(metric.getName())
-                        .withDescription(metric.getDescription())
-                        .withStartTime(OTelProtoCodec.getStartTimeISO8601(dp))
-                        .withTime(OTelProtoCodec.getTimeISO8601(dp))
-                        .withServiceName(serviceName)
-                        .withValue(OTelProtoCodec.getValueAsDouble(dp))
-                        .withAttributes(OTelProtoCodec.mergeAllAttributes(
-                                Arrays.asList(
-                                        OTelProtoCodec.convertKeysOfDataPointAttributes(dp),
-                                        resourceAttributes,
-                                        ils
-                                )
-                        ))
-                        .withSchemaUrl(schemaUrl)
-                        .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList()))
-                        .withFlags(dp.getFlags())
-                        .build(flattenAttributesFlag))
-                .map(Record::new)
-                .collect(Collectors.toList());
     }
 
-    private List<? extends Record<? extends Metric>> mapSum(final io.opentelemetry.proto.metrics.v1.Metric metric,
-                                     final String serviceName,
-                                     final Map<String, Object> ils,
-                                     final Map<String, Object> resourceAttributes,
-                                     final String schemaUrl) {
-        return metric.getSum().getDataPointsList().stream()
-                .map(dp -> JacksonSum.builder()
-                        .withUnit(metric.getUnit())
-                        .withName(metric.getName())
-                        .withDescription(metric.getDescription())
-                        .withStartTime(OTelProtoCodec.getStartTimeISO8601(dp))
-                        .withTime(OTelProtoCodec.getTimeISO8601(dp))
-                        .withServiceName(serviceName)
-                        .withIsMonotonic(metric.getSum().getIsMonotonic())
-                        .withValue(OTelProtoCodec.getValueAsDouble(dp))
-                        .withAggregationTemporality(metric.getSum().getAggregationTemporality().toString())
-                        .withAttributes(OTelProtoCodec.mergeAllAttributes(
-                                Arrays.asList(
-                                        OTelProtoCodec.convertKeysOfDataPointAttributes(dp),
-                                        resourceAttributes,
-                                        ils
-                                )
-                        ))
-                        .withSchemaUrl(schemaUrl)
-                        .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList()))
-                        .withFlags(dp.getFlags())
-                        .build(flattenAttributesFlag))
-                .map(Record::new)
-                .collect(Collectors.toList());
-    }
-
-    private List<? extends Record<? extends Metric>> mapSummary(final io.opentelemetry.proto.metrics.v1.Metric metric,
-                                             final String serviceName,
-                                             final Map<String, Object> ils,
-                                             final Map<String, Object> resourceAttributes,
-                                             final String schemaUrl) {
-        return metric.getSummary().getDataPointsList().stream()
-                .map(dp -> JacksonSummary.builder()
-                        .withUnit(metric.getUnit())
-                        .withName(metric.getName())
-                        .withDescription(metric.getDescription())
-                        .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano()))
-                        .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano()))
-                        .withServiceName(serviceName)
-                        .withCount(dp.getCount())
-                        .withSum(dp.getSum())
-                        .withQuantiles(OTelProtoCodec.getQuantileValues(dp.getQuantileValuesList()))
-                        .withQuantilesValueCount(dp.getQuantileValuesCount())
-                        .withAttributes(OTelProtoCodec.mergeAllAttributes(
-                                Arrays.asList(
-                                        OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()),
-                                        resourceAttributes,
-                                        ils
-                                )
-                        ))
-                        .withSchemaUrl(schemaUrl)
-                        .withFlags(dp.getFlags())
-                        .build(flattenAttributesFlag))
-                .map(Record::new)
-                .collect(Collectors.toList());
-    }
-
-    private List<? extends Record<? extends Metric>> mapHistogram(final io.opentelemetry.proto.metrics.v1.Metric metric,
-                                                 final String serviceName,
-                                                 final Map<String, Object> ils,
-                                                 final Map<String, Object> resourceAttributes,
-                                                 final String schemaUrl) {
-        return metric.getHistogram().getDataPointsList().stream()
-                .map(dp -> {
-                    JacksonHistogram.Builder builder = JacksonHistogram.builder()
-                            .withUnit(metric.getUnit())
-                            .withName(metric.getName())
-                            .withDescription(metric.getDescription())
-                            .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano()))
-                            .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano()))
-                            .withServiceName(serviceName)
-                            .withSum(dp.getSum())
-                            .withCount(dp.getCount())
-                            .withBucketCount(dp.getBucketCountsCount())
-                            .withExplicitBoundsCount(dp.getExplicitBoundsCount())
-                            .withAggregationTemporality(metric.getHistogram().getAggregationTemporality().toString())
-                            .withBucketCountsList(dp.getBucketCountsList())
-                            .withExplicitBoundsList(dp.getExplicitBoundsList())
-                            .withAttributes(OTelProtoCodec.mergeAllAttributes(
-                                    Arrays.asList(
-                                            OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()),
-                                            resourceAttributes,
-                                            ils
-                                    )
-                            ))
-                            .withSchemaUrl(schemaUrl)
-                            .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList()))
-                            .withFlags(dp.getFlags());
-                    if (otelMetricsRawProcessorConfig.getCalculateHistogramBuckets()) {
-                        builder.withBuckets(OTelProtoCodec.createBuckets(dp.getBucketCountsList(), dp.getExplicitBoundsList()));
-                    }
-                    JacksonHistogram jh = builder.build(flattenAttributesFlag);
-                    return jh;
-
-                })
-                .map(Record::new)
-                .collect(Collectors.toList());
-    }
-
-    private List<? extends Record<? extends Metric>> mapExponentialHistogram(io.opentelemetry.proto.metrics.v1.Metric metric, String serviceName, Map<String, Object> ils, Map<String, Object> resourceAttributes, String schemaUrl) {
-        return metric.getExponentialHistogram().getDataPointsList().stream()
-                .filter(dp -> {
-                    if (otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets() &&
-                            otelMetricsRawProcessorConfig.getExponentialHistogramMaxAllowedScale() < Math.abs(dp.getScale())){
-                        LOG.error("Exponential histogram can not be processed since its scale of {} is bigger than the configured max of {}.", dp.getScale(), otelMetricsRawProcessorConfig.getExponentialHistogramMaxAllowedScale());
-                        return false;
-                    } else {
-                        return true;
-                    }
-                })
-                .map(dp -> {
-                    JacksonExponentialHistogram.Builder builder = JacksonExponentialHistogram.builder()
-                            .withUnit(metric.getUnit())
-                            .withName(metric.getName())
-                            .withDescription(metric.getDescription())
-                            .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano()))
-                            .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano()))
-                            .withServiceName(serviceName)
-                            .withSum(dp.getSum())
-                            .withCount(dp.getCount())
-                            .withZeroCount(dp.getZeroCount())
-                            .withScale(dp.getScale())
-                            .withPositive(dp.getPositive().getBucketCountsList())
-                            .withPositiveOffset(dp.getPositive().getOffset())
-                            .withNegative(dp.getNegative().getBucketCountsList())
-                            .withNegativeOffset(dp.getNegative().getOffset())
-                            .withAggregationTemporality(metric.getHistogram().getAggregationTemporality().toString())
-                            .withAttributes(OTelProtoCodec.mergeAllAttributes(
-                                    Arrays.asList(
-                                            OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()),
-                                            resourceAttributes,
-                                            ils
-                                    )
-                            ))
-                            .withSchemaUrl(schemaUrl)
-                            .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList()))
-                            .withFlags(dp.getFlags());
+    @Override
+    public Collection<Record<? extends Metric>> doExecute(Collection<Record<?>> records) {
+        Collection<Record<? extends Metric>> recordsOut = new ArrayList<>();
+        OTelProtoCodec.OTelProtoDecoder otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder();
+        AtomicInteger droppedCounter = new AtomicInteger(0);
+
+        for (Record<?> rec : records) {
+            if ((rec.getData() instanceof Event)) {
+                Record<? extends Metric> newRecord = (Record<? extends Metric>)rec;
+                if (otelMetricsRawProcessorConfig.getFlattenAttributesFlag() ||
+                    !otelMetricsRawProcessorConfig.getCalculateHistogramBuckets() ||
+                    !otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets()) {
+                    modifyRecord(newRecord, otelMetricsRawProcessorConfig.getFlattenAttributesFlag(), otelMetricsRawProcessorConfig.getCalculateHistogramBuckets(), otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets());
+                }
+                recordsOut.add(newRecord);
+            }
 
-                    if (otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets()) {
-                        builder.withPositiveBuckets(OTelProtoCodec.createExponentialBuckets(dp.getPositive(), dp.getScale()));
-                        builder.withNegativeBuckets(OTelProtoCodec.createExponentialBuckets(dp.getNegative(), dp.getScale()));
-                    }
+            if (!(rec.getData() instanceof ExportMetricsServiceRequest)) {
+                continue;
+            }
 
-                    return builder.build(flattenAttributesFlag);
-                })
-                .map(Record::new)
-                .collect(Collectors.toList());
+            ExportMetricsServiceRequest request = ((Record<ExportMetricsServiceRequest>)rec).getData();
+            recordsOut.addAll(otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, otelMetricsRawProcessorConfig.getExponentialHistogramMaxAllowedScale(), otelMetricsRawProcessorConfig.getCalculateHistogramBuckets(), otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets(), flattenAttributesFlag));
+        }
+        recordsDroppedMetricsRawCounter.increment(droppedCounter.get());
+        return recordsOut;
     }
 
-
     @Override
     public void prepareForShutdown() {
     }
diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OtelMetricsRawProcessorConfig.java b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OtelMetricsRawProcessorConfig.java
index b74460d0f7..9935cc9218 100644
--- a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OtelMetricsRawProcessorConfig.java
+++ b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OtelMetricsRawProcessorConfig.java
@@ -4,6 +4,8 @@
  */
 
 package org.opensearch.dataprepper.plugins.processor.otelmetrics;
+
+import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class OtelMetricsRawProcessorConfig {
@@ -15,7 +17,7 @@ public class OtelMetricsRawProcessorConfig {
 
     private Boolean calculateExponentialHistogramBuckets = true;
 
-    private Integer exponentialHistogramMaxAllowedScale = 10;
+    private Integer exponentialHistogramMaxAllowedScale = DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE;
 
     public Boolean getCalculateExponentialHistogramBuckets() {
         return calculateExponentialHistogramBuckets;
diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSumTest.java b/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSumTest.java
index e202219ae1..9c6341f5da 100644
--- a/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSumTest.java
+++ b/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSumTest.java
@@ -24,11 +24,12 @@
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
-import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.metric.JacksonMetric;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -102,8 +103,9 @@ public void test() throws JsonProcessingException {
 
         Record record = new Record<>(exportMetricRequest);
 
-        List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record));
-        Record<Event> firstRecord = rec.get(0);
+        Collection<Record<?>> records = Arrays.asList((Record<?>)record);
+        List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);
+        Record<JacksonMetric> firstRecord = (Record<JacksonMetric>)outputRecords.get(0);
 
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, Object> map = objectMapper.readValue(firstRecord.getData().toJsonString(), Map.class);
@@ -182,8 +184,11 @@ public void missingNameInvalidMetricTest() throws JsonProcessingException {
         Record record = new Record<>(exportMetricRequest);
         Record invalidRecord = new Record<>(exportMetricRequestWithInvalidMetric);
 
-        List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record, invalidRecord));
-        org.hamcrest.MatcherAssert.assertThat(rec.size(), equalTo(1));
+        Collection<Record<?>> records = Arrays.asList((Record<?>)record, invalidRecord);
+        List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);
+
+        //List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record, invalidRecord));
+        org.hamcrest.MatcherAssert.assertThat(outputRecords.size(), equalTo(1));
     }
 
     private void assertSumProcessing(Map<String, Object> map) {
diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSummaryTest.java b/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSummaryTest.java
index d915a48e37..234765e740 100644
--- a/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSummaryTest.java
+++ b/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSummaryTest.java
@@ -21,11 +21,12 @@
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
-import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.metric.JacksonMetric;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -83,8 +84,9 @@ public void testSummaryProcessing() throws JsonProcessingException {
 
         Record record = new Record<>(exportMetricRequest);
 
-        List<Record<Event>> rec =  (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record));
-        Record<Event> firstRecord = rec.get(0);
+        Collection<Record<?>> records = Arrays.asList((Record<?>)record);
+        List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);
+        Record<JacksonMetric> firstRecord = (Record<JacksonMetric>)outputRecords.get(0);
 
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, Object> map = objectMapper.readValue(firstRecord.getData().toJsonString(), Map.class);
diff --git a/data-prepper-plugins/otel-metrics-source/build.gradle b/data-prepper-plugins/otel-metrics-source/build.gradle
index abe038c645..6372395a81 100644
--- a/data-prepper-plugins/otel-metrics-source/build.gradle
+++ b/data-prepper-plugins/otel-metrics-source/build.gradle
@@ -13,6 +13,7 @@ dependencies {
     implementation project(':data-prepper-plugins:blocking-buffer')
     implementation libs.commons.codec
     implementation project(':data-prepper-plugins:armeria-common')
+    implementation project(':data-prepper-plugins:otel-proto-common')
     testImplementation project(':data-prepper-api').sourceSets.test.output
     implementation libs.opentelemetry.proto
     implementation libs.commons.io
diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java
index 8da1ad63f7..0177a57584 100644
--- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java
+++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java
@@ -70,7 +70,11 @@ public void export(final ExportMetricsServiceRequest request, final StreamObserv
 
     private void processRequest(final ExportMetricsServiceRequest request, final StreamObserver<ExportMetricsServiceResponse> responseObserver) {
         try {
-            buffer.write(new Record<>(request), bufferWriteTimeoutInMillis);
+            if (buffer.isByteBuffer()) {
+                buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
+            } else {
+                buffer.write(new Record<>(request), bufferWriteTimeoutInMillis);
+            }
         } catch (Exception e) {
             if (ServiceRequestContext.current().isTimedOut()) {
                 LOG.warn("Exception writing to buffer but request already timed out.", e);
diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java
index fcfd9524d9..33c4023e67 100644
--- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java
+++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java
@@ -32,6 +32,8 @@
 import org.opensearch.dataprepper.model.plugin.PluginFactory;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.source.Source;
+import org.opensearch.dataprepper.model.codec.ByteDecoder;
+import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder;
 import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
 import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
 import org.opensearch.dataprepper.plugins.health.HealthGrpcService;
@@ -62,6 +64,7 @@ public class OTelMetricsSource implements Source<Record<ExportMetricsServiceRequ
     private final CertificateProviderFactory certificateProviderFactory;
     private final GrpcRequestExceptionHandler requestExceptionHandler;
     private Server server;
+    private final ByteDecoder byteDecoder;
 
     @DataPrepperPluginConstructor
     public OTelMetricsSource(final OTelMetricsSourceConfig oTelMetricsSourceConfig, final PluginMetrics pluginMetrics,
@@ -79,6 +82,12 @@ public OTelMetricsSource(final OTelMetricsSourceConfig oTelMetricsSourceConfig,
         this.pipelineName = pipelineDescription.getPipelineName();
         this.authenticationProvider = createAuthenticationProvider(pluginFactory);
         this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
+        this.byteDecoder = new OTelMetricDecoder();
+    }
+
+    @Override
+    public ByteDecoder getDecoder() {
+        return byteDecoder;
     }
 
     @Override
diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java
new file mode 100644
index 0000000000..bdb51cada1
--- /dev/null
+++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.otel.codec;
+
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import org.opensearch.dataprepper.model.codec.ByteDecoder;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.metric.Metric;
+
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.Consumer;
+
+
+public class OTelMetricDecoder implements ByteDecoder {
+    private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
+    public OTelMetricDecoder() {
+        otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder();
+    }
+    public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
+        ExportMetricsServiceRequest request = ExportMetricsServiceRequest.parseFrom(inputStream);
+        AtomicInteger droppedCounter = new AtomicInteger(0);
+        Collection<Record<? extends Metric>> records =
+            otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false);
+        for (Record<? extends Metric> record: records) {
+            final JacksonEvent event = JacksonEvent.fromEvent(record.getData());
+            eventConsumer.accept(new Record<>(event));
+        }
+    }
+
+}
diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java
index 16f596c989..dba17b0851 100644
--- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java
+++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java
@@ -8,6 +8,7 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.ByteString;
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
 import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
 import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
 import io.opentelemetry.proto.common.v1.AnyValue;
@@ -19,6 +20,9 @@
 import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
 import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
 import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
+import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
 import io.opentelemetry.proto.resource.v1.Resource;
 import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans;
 import io.opentelemetry.proto.trace.v1.ResourceSpans;
@@ -26,6 +30,7 @@
 import io.opentelemetry.proto.trace.v1.Status;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
+import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.log.JacksonOtelLog;
 import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
 import org.opensearch.dataprepper.model.metric.Bucket;
@@ -34,6 +39,12 @@
 import org.opensearch.dataprepper.model.metric.DefaultQuantile;
 import org.opensearch.dataprepper.model.metric.Exemplar;
 import org.opensearch.dataprepper.model.metric.Quantile;
+import org.opensearch.dataprepper.model.metric.JacksonExponentialHistogram;
+import org.opensearch.dataprepper.model.metric.JacksonGauge;
+import org.opensearch.dataprepper.model.metric.JacksonHistogram;
+import org.opensearch.dataprepper.model.metric.JacksonSum;
+import org.opensearch.dataprepper.model.metric.JacksonSummary;
+import org.opensearch.dataprepper.model.metric.Metric;
 import org.opensearch.dataprepper.model.trace.DefaultLink;
 import org.opensearch.dataprepper.model.trace.DefaultSpanEvent;
 import org.slf4j.Logger;
@@ -57,6 +68,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -67,6 +79,7 @@
 public class OTelProtoCodec {
 
     private static final Logger LOG = LoggerFactory.getLogger(OTelProtoCodec.class);
+    public static final int DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE = 10;
 
     private static final ObjectMapper OBJECT_MAPPER =  new ObjectMapper();
     private static final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000;
@@ -147,6 +160,7 @@ public static long timeISO8601ToNanos(final String timeISO08601) {
     }
 
     public static class OTelProtoDecoder {
+
         public List<Span> parseExportTraceServiceRequest(final ExportTraceServiceRequest exportTraceServiceRequest) {
             return exportTraceServiceRequest.getResourceSpansList().stream()
                     .flatMap(rs -> parseResourceSpans(rs).stream()).collect(Collectors.toList());
@@ -445,6 +459,268 @@ protected Optional<String> getServiceName(final Resource resource) {
                             && !keyValue.getValue().getStringValue().isEmpty()
             ).findFirst().map(i -> i.getValue().getStringValue());
         }
+
+        public Collection<Record<? extends Metric>> parseExportMetricsServiceRequest(
+                            final ExportMetricsServiceRequest request,
+                            AtomicInteger droppedCounter,
+                            final Integer exponentialHistogramMaxAllowedScale,
+                            final boolean calculateHistogramBuckets,
+                            final boolean calculateExponentialHistogramBuckets,
+                            final boolean flattenAttributes) {
+            Collection<Record<? extends Metric>> recordsOut = new ArrayList<>();
+            for (ResourceMetrics rs : request.getResourceMetricsList()) {
+                final String schemaUrl = rs.getSchemaUrl();
+                final Map<String, Object> resourceAttributes = OTelProtoCodec.getResourceAttributes(rs.getResource());
+                final String serviceName = OTelProtoCodec.getServiceName(rs.getResource()).orElse(null);
+
+                for (InstrumentationLibraryMetrics is : rs.getInstrumentationLibraryMetricsList()) {
+                    final Map<String, Object> ils = OTelProtoCodec.getInstrumentationLibraryAttributes(is.getInstrumentationLibrary());
+                    recordsOut.addAll(processMetricsList(is.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes));
+                }
+
+                for (ScopeMetrics sm : rs.getScopeMetricsList()) {
+                    final Map<String, Object> ils = OTelProtoCodec.getInstrumentationScopeAttributes(sm.getScope());
+                    recordsOut.addAll(processMetricsList(sm.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes));
+                }
+            }
+            return recordsOut;
+        }
+
+        private List<? extends Record<? extends Metric>> processMetricsList(
+                    final List<io.opentelemetry.proto.metrics.v1.Metric> metricsList,
+                    final String serviceName,
+                    final Map<String, Object> ils,
+                    final Map<String, Object> resourceAttributes,
+                    final String schemaUrl,
+                    AtomicInteger droppedCounter,
+                    final Integer exponentialHistogramMaxAllowedScale,
+                    final boolean calculateHistogramBuckets,
+                    final boolean calculateExponentialHistogramBuckets,
+                    final boolean flattenAttributes) {
+            List<Record<? extends Metric>> recordsOut = new ArrayList<>();
+            for (io.opentelemetry.proto.metrics.v1.Metric metric : metricsList) {
+                try {
+                    if (metric.hasGauge()) {
+                        recordsOut.addAll(mapGauge(metric, serviceName, ils, resourceAttributes, schemaUrl, flattenAttributes));
+                    } else if (metric.hasSum()) {
+                        recordsOut.addAll(mapSum(metric, serviceName, ils, resourceAttributes, schemaUrl, flattenAttributes));
+                    } else if (metric.hasSummary()) {
+                        recordsOut.addAll(mapSummary(metric, serviceName, ils, resourceAttributes, schemaUrl, flattenAttributes));
+                    } else if (metric.hasHistogram()) {
+                        recordsOut.addAll(mapHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl, calculateHistogramBuckets, flattenAttributes));
+                    } else if (metric.hasExponentialHistogram()) {
+                        recordsOut.addAll(mapExponentialHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl, exponentialHistogramMaxAllowedScale, calculateExponentialHistogramBuckets, flattenAttributes));
+                    }
+                } catch (Exception e) {
+                    LOG.warn("Error while processing metrics", e);
+                    droppedCounter.incrementAndGet();
+                }
+            }
+            return recordsOut;
+        }
+
+        private List<? extends Record<? extends Metric>> mapGauge(
+                                         io.opentelemetry.proto.metrics.v1.Metric metric,
+                                         String serviceName,
+                                         final Map<String, Object> ils,
+                                         final Map<String, Object> resourceAttributes,
+                                         final String schemaUrl,
+                                         final boolean flattenAttributes) {
+            return metric.getGauge().getDataPointsList().stream()
+                .map(dp -> JacksonGauge.builder()
+                        .withUnit(metric.getUnit())
+                        .withName(metric.getName())
+                        .withDescription(metric.getDescription())
+                        .withStartTime(OTelProtoCodec.getStartTimeISO8601(dp))
+                        .withTime(OTelProtoCodec.getTimeISO8601(dp))
+                        .withServiceName(serviceName)
+                        .withValue(OTelProtoCodec.getValueAsDouble(dp))
+                        .withAttributes(OTelProtoCodec.mergeAllAttributes(
+                                Arrays.asList(
+                                        OTelProtoCodec.convertKeysOfDataPointAttributes(dp),
+                                        resourceAttributes,
+                                        ils
+                                )
+                        ))
+                        .withSchemaUrl(schemaUrl)
+                        .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList()))
+                        .withFlags(dp.getFlags())
+                        .build(flattenAttributes))
+                .map(Record::new)
+                .collect(Collectors.toList());
+        }
+
+        private List<? extends Record<? extends Metric>> mapSum(
+                                     final io.opentelemetry.proto.metrics.v1.Metric metric,
+                                     final String serviceName,
+                                     final Map<String, Object> ils,
+                                     final Map<String, Object> resourceAttributes,
+                                     final String schemaUrl,
+                                     final boolean flattenAttributes) {
+            return metric.getSum().getDataPointsList().stream()
+                .map(dp -> JacksonSum.builder()
+                        .withUnit(metric.getUnit())
+                        .withName(metric.getName())
+                        .withDescription(metric.getDescription())
+                        .withStartTime(OTelProtoCodec.getStartTimeISO8601(dp))
+                        .withTime(OTelProtoCodec.getTimeISO8601(dp))
+                        .withServiceName(serviceName)
+                        .withIsMonotonic(metric.getSum().getIsMonotonic())
+                        .withValue(OTelProtoCodec.getValueAsDouble(dp))
+                        .withAggregationTemporality(metric.getSum().getAggregationTemporality().toString())
+                        .withAttributes(OTelProtoCodec.mergeAllAttributes(
+                                Arrays.asList(
+                                        OTelProtoCodec.convertKeysOfDataPointAttributes(dp),
+                                        resourceAttributes,
+                                        ils
+                                )
+                        ))
+                        .withSchemaUrl(schemaUrl)
+                        .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList()))
+                        .withFlags(dp.getFlags())
+                        .build(flattenAttributes))
+                .map(Record::new)
+                .collect(Collectors.toList());
+        }
+
+        private List<? extends Record<? extends Metric>> mapSummary(
+                                             final io.opentelemetry.proto.metrics.v1.Metric metric,
+                                             final String serviceName,
+                                             final Map<String, Object> ils,
+                                             final Map<String, Object> resourceAttributes,
+                                             final String schemaUrl,
+                                             final boolean flattenAttributes) {
+            return metric.getSummary().getDataPointsList().stream()
+                .map(dp -> JacksonSummary.builder()
+                        .withUnit(metric.getUnit())
+                        .withName(metric.getName())
+                        .withDescription(metric.getDescription())
+                        .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano()))
+                        .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano()))
+                        .withServiceName(serviceName)
+                        .withCount(dp.getCount())
+                        .withSum(dp.getSum())
+                        .withQuantiles(OTelProtoCodec.getQuantileValues(dp.getQuantileValuesList()))
+                        .withQuantilesValueCount(dp.getQuantileValuesCount())
+                        .withAttributes(OTelProtoCodec.mergeAllAttributes(
+                                Arrays.asList(
+                                        OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()),
+                                        resourceAttributes,
+                                        ils
+                                )
+                        ))
+                        .withSchemaUrl(schemaUrl)
+                        .withFlags(dp.getFlags())
+                        .build(flattenAttributes))
+                .map(Record::new)
+                .collect(Collectors.toList());
+        }
+
+        private List<? extends Record<? extends Metric>> mapHistogram(
+                                                 final io.opentelemetry.proto.metrics.v1.Metric metric,
+                                                 final String serviceName,
+                                                 final Map<String, Object> ils,
+                                                 final Map<String, Object> resourceAttributes,
+                                                 final String schemaUrl,
+                                                 final boolean calculateHistogramBuckets,
+                                                 final boolean flattenAttributes) {
+            return metric.getHistogram().getDataPointsList().stream()
+                .map(dp -> {
+                    JacksonHistogram.Builder builder = JacksonHistogram.builder()
+                            .withUnit(metric.getUnit())
+                            .withName(metric.getName())
+                            .withDescription(metric.getDescription())
+                            .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano()))
+                            .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano()))
+                            .withServiceName(serviceName)
+                            .withSum(dp.getSum())
+                            .withCount(dp.getCount())
+                            .withBucketCount(dp.getBucketCountsCount())
+                            .withExplicitBoundsCount(dp.getExplicitBoundsCount())
+                            .withAggregationTemporality(metric.getHistogram().getAggregationTemporality().toString())
+                            .withBucketCountsList(dp.getBucketCountsList())
+                            .withExplicitBoundsList(dp.getExplicitBoundsList())
+                            .withAttributes(OTelProtoCodec.mergeAllAttributes(
+                                    Arrays.asList(
+                                            OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()),
+                                            resourceAttributes,
+                                            ils
+                                    )
+                            ))
+                            .withSchemaUrl(schemaUrl)
+                            .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList()))
+                            .withFlags(dp.getFlags());
+                    if (calculateHistogramBuckets) {
+                        builder.withBuckets(OTelProtoCodec.createBuckets(dp.getBucketCountsList(), dp.getExplicitBoundsList()));
+                    }
+                    JacksonHistogram jh = builder.build(flattenAttributes);
+                    return jh;
+
+                })
+                .map(Record::new)
+                .collect(Collectors.toList());
+        }
+
+        private List<? extends Record<? extends Metric>> mapExponentialHistogram(
+                                            final io.opentelemetry.proto.metrics.v1.Metric metric,
+                                            final String serviceName,
+                                            final Map<String, Object> ils,
+                                            final Map<String, Object> resourceAttributes,
+                                            final String schemaUrl,
+                                            final Integer exponentialHistogramMaxAllowedScale,
+                                            final boolean calculateExponentialHistogramBuckets,
+                                            final boolean flattenAttributes) {
+            return metric.getExponentialHistogram()
+                .getDataPointsList()
+                .stream()
+                .filter(dp -> {
+                    if (calculateExponentialHistogramBuckets &&
+                            exponentialHistogramMaxAllowedScale < Math.abs(dp.getScale())){
+                        LOG.error("Exponential histogram can not be processed since its scale of {} is bigger than the configured max of {}.", dp.getScale(), exponentialHistogramMaxAllowedScale);
+                        return false;
+                    } else {
+                        return true;
+                    }
+                })
+                .map(dp -> {
+                    JacksonExponentialHistogram.Builder builder = JacksonExponentialHistogram.builder()
+                            .withUnit(metric.getUnit())
+                            .withName(metric.getName())
+                            .withDescription(metric.getDescription())
+                            .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano()))
+                            .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano()))
+                            .withServiceName(serviceName)
+                            .withSum(dp.getSum())
+                            .withCount(dp.getCount())
+                            .withZeroCount(dp.getZeroCount())
+                            .withScale(dp.getScale())
+                            .withPositive(dp.getPositive().getBucketCountsList())
+                            .withPositiveOffset(dp.getPositive().getOffset())
+                            .withNegative(dp.getNegative().getBucketCountsList())
+                            .withNegativeOffset(dp.getNegative().getOffset())
+                            .withAggregationTemporality(metric.getHistogram().getAggregationTemporality().toString())
+                            .withAttributes(OTelProtoCodec.mergeAllAttributes(
+                                    Arrays.asList(
+                                            OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()),
+                                            resourceAttributes,
+                                            ils
+                                    )
+                            ))
+                            .withSchemaUrl(schemaUrl)
+                            .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList()))
+                            .withFlags(dp.getFlags());
+
+                    if (calculateExponentialHistogramBuckets) {
+                        builder.withPositiveBuckets(OTelProtoCodec.createExponentialBuckets(dp.getPositive(), dp.getScale()));
+                        builder.withNegativeBuckets(OTelProtoCodec.createExponentialBuckets(dp.getNegative(), dp.getScale()));
+                    }
+
+                    return builder.build(flattenAttributes);
+                })
+                .map(Record::new)
+                .collect(Collectors.toList());
+        }
+
     }
 
     public static class OTelProtoEncoder {
diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java
index afc4cf2ab3..6c9a167ad5 100644
--- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java
+++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java
@@ -12,6 +12,7 @@
 import com.google.protobuf.util.JsonFormat;
 import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
 import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
 import io.opentelemetry.proto.common.v1.AnyValue;
 import io.opentelemetry.proto.common.v1.ArrayValue;
 import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
@@ -34,6 +35,12 @@
 import org.junit.jupiter.api.Test;
 import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
 import org.opensearch.dataprepper.model.metric.Bucket;
+import org.opensearch.dataprepper.model.metric.Metric;
+import org.opensearch.dataprepper.model.metric.JacksonMetric;
+import org.opensearch.dataprepper.model.metric.JacksonGauge;
+import org.opensearch.dataprepper.model.metric.JacksonSum;
+import org.opensearch.dataprepper.model.metric.JacksonHistogram;
+import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.trace.DefaultLink;
 import org.opensearch.dataprepper.model.trace.DefaultSpanEvent;
 import org.opensearch.dataprepper.model.trace.DefaultTraceGroupFields;
@@ -56,12 +63,14 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.entry;
@@ -84,9 +93,10 @@ public class OTelProtoCodecTest {
     private static final String TEST_REQUEST_BOTH_SPAN_TYPES_JSON_FILE = "test-request-both-span-types.json";
     private static final String TEST_REQUEST_NO_SPANS_JSON_FILE = "test-request-no-spans.json";
     private static final String TEST_SPAN_EVENT_JSON_FILE = "test-span-event.json";
-
+    private static final String TEST_REQUEST_GAUGE_METRICS_JSON_FILE = "test-gauge-metrics.json";
+    private static final String TEST_REQUEST_SUM_METRICS_JSON_FILE = "test-sum-metrics.json";
+    private static final String TEST_REQUEST_HISTOGRAM_METRICS_JSON_FILE = "test-histogram-metrics.json";
     private static final String TEST_REQUEST_LOGS_JSON_FILE = "test-request-log.json";
-
     private static final String TEST_REQUEST_LOGS_IS_JSON_FILE = "test-request-log-is.json";
 
 
@@ -124,6 +134,12 @@ private ExportLogsServiceRequest buildExportLogsServiceRequestFromJsonFile(Strin
         return builder.build();
     }
 
+    private ExportMetricsServiceRequest buildExportMetricsServiceRequestFromJsonFile(String requestJsonFileName) throws IOException {
+        final ExportMetricsServiceRequest.Builder builder = ExportMetricsServiceRequest.newBuilder();
+        JsonFormat.parser().merge(getFileAsJsonString(requestJsonFileName), builder);
+        return builder.build();
+    }
+
     private String getFileAsJsonString(String requestJsonFileName) throws IOException {
         final StringBuilder jsonBuilder = new StringBuilder();
         try (final InputStream inputStream = Objects.requireNonNull(
@@ -460,6 +476,71 @@ public void testParseExportLogsServiceRequest_InstrumentationLibrarySpans() thro
             validateSpans(spans);
         }
 
+        @Test
+        public void testParseExportMetricsServiceRequest_Guage() throws IOException {
+            final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_GAUGE_METRICS_JSON_FILE);
+            AtomicInteger droppedCount = new AtomicInteger(0);
+            final Collection<Record<? extends Metric>> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true);
+
+            validateGaugeMetricRequest(metrics);
+        }
+
+        @Test
+        public void testParseExportMetricsServiceRequest_Sum() throws IOException {
+            final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_SUM_METRICS_JSON_FILE);
+            AtomicInteger droppedCount = new AtomicInteger(0);
+            final Collection<Record<? extends Metric>> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true);
+            validateSumMetricRequest(metrics);
+        }
+
+        @Test
+        public void testParseExportMetricsServiceRequest_Histogram() throws IOException {
+            final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_HISTOGRAM_METRICS_JSON_FILE);
+            AtomicInteger droppedCount = new AtomicInteger(0);
+            final Collection<Record<? extends Metric>> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true);
+            validateHistogramMetricRequest(metrics);
+        }
+
+        private void validateGaugeMetricRequest(Collection<Record<? extends Metric>> metrics) {
+            assertThat(metrics.size(), equalTo(1));
+            Record<? extends Metric> record = ((List<Record<? extends Metric>>)metrics).get(0);
+            JacksonMetric metric = (JacksonMetric) record.getData();
+            assertThat(metric.getKind(), equalTo(Metric.KIND.GAUGE.toString()));
+            assertThat(metric.getUnit(), equalTo("1"));
+            assertThat(metric.getName(), equalTo("counter-int"));
+            JacksonGauge gauge = (JacksonGauge)metric;
+            assertThat(gauge.getValue(), equalTo(123.0));
+        }
+
+        private void validateSumMetricRequest(Collection<Record<? extends Metric>> metrics) {
+            assertThat(metrics.size(), equalTo(1));
+            Record<? extends Metric> record = ((List<Record<? extends Metric>>)metrics).get(0);
+            JacksonMetric metric = (JacksonMetric) record.getData();
+            assertThat(metric.getKind(), equalTo(Metric.KIND.SUM.toString()));
+            assertThat(metric.getUnit(), equalTo("1"));
+            assertThat(metric.getName(), equalTo("sum-int"));
+            JacksonSum sum = (JacksonSum)metric;
+            assertThat(sum.getValue(), equalTo(456.0));
+        }
+
+        private void validateHistogramMetricRequest(Collection<Record<? extends Metric>> metrics) {
+            assertThat(metrics.size(), equalTo(1));
+            Record<? extends Metric> record = ((List<Record<? extends Metric>>)metrics).get(0);
+            JacksonMetric metric = (JacksonMetric) record.getData();
+            assertThat(metric.getKind(), equalTo(Metric.KIND.HISTOGRAM.toString()));
+            assertThat(metric.getUnit(), equalTo("1"));
+            assertThat(metric.getName(), equalTo("histogram-int"));
+            JacksonHistogram histogram = (JacksonHistogram)metric;
+            assertThat(histogram.getSum(), equalTo(100.0));
+            assertThat(histogram.getCount(), equalTo(30L));
+            assertThat(histogram.getExemplars(), equalTo(Collections.emptyList()));
+            assertThat(histogram.getExplicitBoundsList(), equalTo(List.of(1.0, 2.0, 3.0, 4.0)));
+            assertThat(histogram.getExplicitBoundsCount(), equalTo(4));
+            assertThat(histogram.getBucketCountsList(), equalTo(List.of(3L, 5L, 15L, 6L, 1L)));
+            assertThat(histogram.getBucketCount(), equalTo(5));
+            assertThat(histogram.getAggregationTemporality(), equalTo("AGGREGATION_TEMPORALITY_CUMULATIVE"));
+        }
+
     }
 
     @Nested
diff --git a/data-prepper-plugins/otel-proto-common/src/test/resources/test-gauge-metrics.json b/data-prepper-plugins/otel-proto-common/src/test/resources/test-gauge-metrics.json
new file mode 100644
index 0000000000..abca7b0b29
--- /dev/null
+++ b/data-prepper-plugins/otel-proto-common/src/test/resources/test-gauge-metrics.json
@@ -0,0 +1,44 @@
+{
+  "resourceMetrics": [
+    {
+      "resource": {
+        "attributes": [
+          {
+            "key": "resource-attr",
+            "value": {
+              "stringValue": "resource-attr-val-1"
+            }
+          }
+        ]
+      },
+      "scopeMetrics": [
+        {
+          "scope": {},
+          "metrics": [
+            {
+              "name": "counter-int",
+              "unit": 1,
+              "gauge": {
+                "dataPoints": [
+                  {
+                    "attributes": [
+                      {
+                        "key": "label-1",
+                        "value": {
+                          "stringValue": "label-value-1"
+                        }
+                      }
+                    ],
+                    "startTimeUnixNano": "1581452773000000789",
+                    "timeUnixNano": "1581452773000000789",
+                    "asInt": "123"
+                  }
+                ]
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
diff --git a/data-prepper-plugins/otel-proto-common/src/test/resources/test-histogram-metrics.json b/data-prepper-plugins/otel-proto-common/src/test/resources/test-histogram-metrics.json
new file mode 100644
index 0000000000..1220de6214
--- /dev/null
+++ b/data-prepper-plugins/otel-proto-common/src/test/resources/test-histogram-metrics.json
@@ -0,0 +1,50 @@
+{
+  "resourceMetrics": [
+    {
+      "resource": {
+        "attributes": [
+          {
+            "key": "resource-attr",
+            "value": {
+              "stringValue": "resource-attr-val-1"
+            }
+          }
+        ]
+      },
+      "scopeMetrics": [
+        {
+          "scope": {},
+          "metrics": [
+            {
+              "name": "histogram-int",
+              "unit": 1,
+              "histogram": {
+                "dataPoints": [
+                  {
+                    "attributes": [
+                      {
+                        "key": "label-1",
+                        "value": {
+                          "stringValue": "label-value-1"
+                        }
+                      }
+                    ],
+                    "startTimeUnixNano": "1581452773000000789",
+                    "timeUnixNano": "1581452773000000789",
+                    "count": "30",
+                    "sum": "100",
+                    "bucket_counts": [3, 5, 15, 6, 1],
+                    "explicit_bounds": [1.0, 2.0, 3.0, 4.0],
+                    "exemplars": []
+                  }
+                ],
+                "aggregationTemporality":"2"
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
+
diff --git a/data-prepper-plugins/otel-proto-common/src/test/resources/test-sum-metrics.json b/data-prepper-plugins/otel-proto-common/src/test/resources/test-sum-metrics.json
new file mode 100644
index 0000000000..97d3560cc6
--- /dev/null
+++ b/data-prepper-plugins/otel-proto-common/src/test/resources/test-sum-metrics.json
@@ -0,0 +1,45 @@
+{
+  "resourceMetrics": [
+    {
+      "resource": {
+        "attributes": [
+          {
+            "key": "resource-attr",
+            "value": {
+              "stringValue": "resource-attr-val-1"
+            }
+          }
+        ]
+      },
+      "scopeMetrics": [
+        {
+          "scope": {},
+          "metrics": [
+            {
+              "name": "sum-int",
+              "unit": 1,
+              "sum": {
+                "dataPoints": [
+                  {
+                    "attributes": [
+                      {
+                        "key": "label-1",
+                        "value": {
+                          "stringValue": "label-value-1"
+                        }
+                      }
+                    ],
+                    "startTimeUnixNano": "1581452773000000789",
+                    "timeUnixNano": "1581452773000000789",
+                    "asInt": "456"
+                  }
+                ]
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
+