From 1f0cb700cddc0186a549db962a544153f933d659 Mon Sep 17 00:00:00 2001 From: Divyansh Bokadia Date: Tue, 4 Feb 2025 13:04:40 -0600 Subject: [PATCH] Data type mapping for postgres Data type mapping for postgres Signed-off-by: Divyansh Bokadia Data type mapping for postgres Signed-off-by: Divyansh Bokadia Data type mapping for postgres Data type mapping for postgres stream Data type mapping for postgres stream --- data-prepper-plugins/rds-source/build.gradle | 1 - .../datatype/{ => mysql}/MySQLDataType.java | 2 +- .../MySQLDataTypeHandler.java} | 4 +- .../MySQLDataTypeHelper.java} | 24 +- .../handler}/BinaryTypeHandler.java | 8 +- .../handler}/JsonTypeHandler.java | 8 +- .../handler}/NumericTypeHandler.java | 8 +- .../handler}/SpatialTypeHandler.java | 8 +- .../handler}/StringTypeHandler.java | 8 +- .../handler}/TemporalTypeHandler.java | 8 +- .../rds/datatype/postgres/ColumnType.java | 27 +- .../datatype/postgres/PostgresDataType.java | 156 ++++++++++++ .../postgres/PostgresDataTypeHandler.java | 23 ++ .../postgres/PostgresDataTypeHelper.java | 39 +++ .../postgres/handler/BinaryTypeHandler.java | 15 ++ .../handler/BitStringTypeHandler.java | 16 ++ .../postgres/handler/BooleanTypeHandler.java | 16 ++ .../postgres/handler/JsonTypeHandler.java | 14 ++ .../handler/NetworkAddressTypeHandler.java | 14 ++ .../postgres/handler/NumericTypeHandler.java | 39 +++ .../postgres/handler/SpatialTypeHandler.java | 232 ++++++++++++++++++ .../postgres/handler/SpecialTypeHandler.java | 28 +++ .../postgres/handler/StringTypeHandler.java | 14 ++ .../postgres/handler/TemporalTypeHandler.java | 221 +++++++++++++++++ .../source/rds/export/DataFileLoader.java | 6 +- .../source/rds/model/TableMetadata.java | 115 +++++---- .../source/rds/resync/ResyncWorker.java | 6 +- .../rds/stream/BinlogEventListener.java | 17 +- .../LogicalReplicationEventProcessor.java | 32 ++- .../datatype/impl/JsonTypeHandlerTest.java | 61 ----- .../handler}/BinaryTypeHandlerTest.java | 47 ++-- .../mysql/handler/JsonTypeHandlerTest.java | 76 ++++++ .../handler}/NumericTypeHandlerTest.java | 75 ++++-- .../handler}/SpatialTypeHandlerTest.java | 38 +-- .../handler}/StringTypeHandlerTest.java | 58 +++-- .../handler}/TemporalTypeHandlerTest.java | 4 +- .../handler/BinaryTypeHandlerTest.java | 28 +++ .../handler/BitStringTypeHandlerTest.java | 48 ++++ .../handler/BooleanTypeHandlerTest.java | 43 ++++ .../postgres/handler/JsonTypeHandlerTest.java | 40 +++ .../NetworkAddressTypeHandlerTest.java | 43 ++++ .../handler/NumericTypeHandlerTest.java | 115 +++++++++ .../handler/SpatialTypeHandlerTest.java | 105 ++++++++ .../handler/SpecialTypeHandlerTest.java | 60 +++++ .../handler/StringTypeHandlerTest.java | 40 +++ .../handler/TemporalTypeHandlerTest.java | 170 +++++++++++++ 46 files changed, 1917 insertions(+), 243 deletions(-) rename data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{ => mysql}/MySQLDataType.java (98%) rename data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{DataTypeHandler.java => mysql/MySQLDataTypeHandler.java} (88%) rename data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{DataTypeHelper.java => mysql/MySQLDataTypeHelper.java} (53%) rename data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/BinaryTypeHandler.java (68%) rename data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/JsonTypeHandler.java (69%) rename data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/NumericTypeHandler.java (91%) rename data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/SpatialTypeHandler.java (97%) rename data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/StringTypeHandler.java (81%) rename data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/TemporalTypeHandler.java (95%) create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataType.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHandler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHelper.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BitStringTypeHandler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/JsonTypeHandler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NetworkAddressTypeHandler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NumericTypeHandler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpatialTypeHandler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpecialTypeHandler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/StringTypeHandler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/TemporalTypeHandler.java delete mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java rename data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/BinaryTypeHandlerTest.java (55%) create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/JsonTypeHandlerTest.java rename data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/NumericTypeHandlerTest.java (72%) rename data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/SpatialTypeHandlerTest.java (89%) rename data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/StringTypeHandlerTest.java (51%) rename data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/{impl => mysql/handler}/TemporalTypeHandlerTest.java (97%) create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandlerTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BitStringTypeHandlerTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandlerTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/JsonTypeHandlerTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NetworkAddressTypeHandlerTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NumericTypeHandlerTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpatialTypeHandlerTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpecialTypeHandlerTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/StringTypeHandlerTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/TemporalTypeHandlerTest.java diff --git a/data-prepper-plugins/rds-source/build.gradle b/data-prepper-plugins/rds-source/build.gradle index 09f734cc91..1b325457bf 100644 --- a/data-prepper-plugins/rds-source/build.gradle +++ b/data-prepper-plugins/rds-source/build.gradle @@ -34,5 +34,4 @@ dependencies { testImplementation libs.avro.core testImplementation libs.parquet.hadoop testImplementation libs.parquet.avro -// testImplementation 'org.slf4j:slf4j-simple:2.0.9' } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/MySQLDataType.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/MySQLDataType.java similarity index 98% rename from data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/MySQLDataType.java rename to data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/MySQLDataType.java index a80ff19ada..44ceb91fc6 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/MySQLDataType.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/MySQLDataType.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql; import java.util.HashMap; import java.util.Map; diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/MySQLDataTypeHandler.java similarity index 88% rename from data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHandler.java rename to data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/MySQLDataTypeHandler.java index 078617892b..fa4d434ac2 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/MySQLDataTypeHandler.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; @@ -7,7 +7,7 @@ * Implementations of this interface are responsible for converting MySQL column values * to appropriate string representations based on their data types. */ -public interface DataTypeHandler { +public interface MySQLDataTypeHandler { String BYTES_KEY = "bytes"; /** * Handles the conversion of a MySQL column value to its string representation. diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHelper.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/MySQLDataTypeHelper.java similarity index 53% rename from data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHelper.java rename to data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/MySQLDataTypeHelper.java index 3279fe74c2..e2af492b11 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHelper.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/MySQLDataTypeHelper.java @@ -1,24 +1,26 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql; + + +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.BinaryTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.JsonTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.NumericTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.SpatialTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.StringTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.TemporalTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.BinaryTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.JsonTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.NumericTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.SpatialTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.StringTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.TemporalTypeHandler; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import java.util.Map; -public class DataTypeHelper { - private static final Map typeHandlers = Map.of( +public class MySQLDataTypeHelper { + private static final Map typeHandlers = Map.of( MySQLDataType.DataCategory.NUMERIC, new NumericTypeHandler(), MySQLDataType.DataCategory.STRING, new StringTypeHandler(), MySQLDataType.DataCategory.TEMPORAL, new TemporalTypeHandler(), MySQLDataType.DataCategory.BINARY, new BinaryTypeHandler(), MySQLDataType.DataCategory.JSON, new JsonTypeHandler(), MySQLDataType.DataCategory.SPATIAL, new SpatialTypeHandler() - ); + ); public static Object getDataByColumnType(final MySQLDataType columnType, final String columnName, final Object value, final TableMetadata metadata) { @@ -28,4 +30,4 @@ public static Object getDataByColumnType(final MySQLDataType columnType, final S return typeHandlers.get(columnType.getCategory()).handle(columnType, columnName, value, metadata); } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/BinaryTypeHandler.java similarity index 68% rename from data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandler.java rename to data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/BinaryTypeHandler.java index de0ddfea54..41ce02e4e6 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/BinaryTypeHandler.java @@ -1,12 +1,12 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; import java.util.Map; -public class BinaryTypeHandler implements DataTypeHandler { +public class BinaryTypeHandler implements MySQLDataTypeHandler { @Override public String handle(final MySQLDataType columnType, final String columnName, final Object value, final TableMetadata metadata) { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/JsonTypeHandler.java similarity index 69% rename from data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandler.java rename to data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/JsonTypeHandler.java index 9ca8789690..ae16687986 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/JsonTypeHandler.java @@ -1,11 +1,11 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; -public class JsonTypeHandler implements DataTypeHandler { +public class JsonTypeHandler implements MySQLDataTypeHandler { @Override public String handle(final MySQLDataType columnType, final String columnName, final Object value, diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/NumericTypeHandler.java similarity index 91% rename from data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandler.java rename to data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/NumericTypeHandler.java index e072fd3b9e..d72d9ba180 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/NumericTypeHandler.java @@ -1,15 +1,15 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; import java.math.BigInteger; import java.util.ArrayList; import java.util.BitSet; import java.util.Map; -public class NumericTypeHandler implements DataTypeHandler { +public class NumericTypeHandler implements MySQLDataTypeHandler { @Override public Number handle(final MySQLDataType columnType, final String columnName, final Object value, diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/SpatialTypeHandler.java similarity index 97% rename from data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java rename to data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/SpatialTypeHandler.java index ff36b1be8b..2d12e9b5fe 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/SpatialTypeHandler.java @@ -1,8 +1,8 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -28,7 +28,7 @@ *
  • GEOMETRYCOLLECTION(POINT(x y), LINESTRING(x y, x y))
  • * * */ -public class SpatialTypeHandler implements DataTypeHandler { +public class SpatialTypeHandler implements MySQLDataTypeHandler { // MySQL geometry type constants private static final int GEOMETRY_POINT = 1; private static final int GEOMETRY_LINESTRING = 2; diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/StringTypeHandler.java similarity index 81% rename from data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandler.java rename to data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/StringTypeHandler.java index e01aee0bea..6410291ef9 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/StringTypeHandler.java @@ -1,13 +1,13 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; import java.util.ArrayList; import java.util.List; -public class StringTypeHandler implements DataTypeHandler { +public class StringTypeHandler implements MySQLDataTypeHandler { @Override public String handle(final MySQLDataType columnType, final String columnName, final Object value, diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/TemporalTypeHandler.java similarity index 95% rename from data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java rename to data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/TemporalTypeHandler.java index cac3ea7072..0f7c5559f8 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/TemporalTypeHandler.java @@ -1,8 +1,8 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; import java.time.LocalDate; import java.time.LocalDateTime; @@ -30,7 +30,7 @@ * Note: Fractional seconds are optional for DATETIME and TIMESTAMP * - YEAR: "yyyy" (Example: "2024") */ -public class TemporalTypeHandler implements DataTypeHandler { +public class TemporalTypeHandler implements MySQLDataTypeHandler { private static final String MYSQL_DATE_FORMAT = "yyyy-MM-dd"; private static final String MYSQL_TIME_FORMAT = "HH:mm:ss[.SSSSSS]"; private static final String MYSQL_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss[.SSSSSS]"; diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/ColumnType.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/ColumnType.java index 7d935ea6f8..c60658a577 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/ColumnType.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/ColumnType.java @@ -22,14 +22,37 @@ public enum ColumnType { DOUBLE_PRECISION(701, "double precision"), NUMERIC(1700, "numeric"), TEXT(25, "text"), + BPCHAR(1042, "bpchar"), VARCHAR(1043, "varchar"), DATE(1082, "date"), TIME(1083, "time"), + TIMETZ(1266, "timetz"), TIMESTAMP(1114, "timestamp"), TIMESTAMPTZ(1184, "timestamptz"), - UUID(2950, "uuid"), + INTERVAL(1186, "interval"), JSON(114, "json"), - JSONB(3802, "jsonb"); + JSONB(3802, "jsonb"), + MONEY(790,"money"), + BIT(1560, "bit"), + VARBIT(1562, "varbit"), + POINT(600,"point"), + LINE(628,"line"), + LSEG(601,"lseg"), + BOX(603, "box"), + PATH(602, "path"), + POLYGON(604, "polygon"), + CIRCLE(718, "circle"), + CIDR(650, "cidr"), + INET(869, "inet"), + MACADDR(829, "macaddr"), + MACADDR8(774, "macaddr8"), + XML(142, "xml"), + UUID(2950, "uuid"), + PG_LSN(3220, "pg_lsn"), + TSVECTOR(3614, "tsvector"), + TSQUERY(3615, "tsquery"), + BYTEA(17, "bytea"); + private final int typeId; private final String typeName; diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataType.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataType.java new file mode 100644 index 0000000000..c1889ac6a1 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataType.java @@ -0,0 +1,156 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres; + +import java.util.HashMap; +import java.util.Map; + +public enum PostgresDataType { + // Numeric types + SMALLINT("smallint", DataCategory.NUMERIC), + INTEGER("integer", DataCategory.NUMERIC), + BIGINT("bigint", DataCategory.NUMERIC), + SMALLSERIAL("small", DataCategory.NUMERIC), + SERIAL("mediumint unsigned", DataCategory.NUMERIC), + BIGSERIAL("int", DataCategory.NUMERIC), + REAL("real", DataCategory.NUMERIC), + DOUBLE_PRECISION("double precision", DataCategory.NUMERIC), + NUMERIC("numeric", DataCategory.NUMERIC), + MONEY("money", DataCategory.NUMERIC), + + //String Data types + TEXT("text", DataCategory.STRING), + VARCHAR("varchar", DataCategory.STRING), + BPCHAR("bpchar", DataCategory.STRING), + + //Bit String Data type + BIT("bit",DataCategory.BIT_STRING), + VARBIT("varbit", DataCategory.BIT_STRING), + + //Json Data type + JSON("json",DataCategory.JSON), + JSONB("jsonb",DataCategory.JSON), + + //Boolean data type + BOOLEAN("boolean", DataCategory.BOOLEAN), + + //Date-time data types + DATE("date", DataCategory.TEMPORAL), + TIME("time",DataCategory.TEMPORAL), + TIMETZ("timetz",DataCategory.TEMPORAL), + TIMESTAMP("timestamp",DataCategory.TEMPORAL), + TIMESTAMPTZ("timestamptz",DataCategory.TEMPORAL), + INTERVAL("interval", DataCategory.TEMPORAL), + + //Spatial Data types + POINT("point", DataCategory.SPATIAL), + LINE("line", DataCategory.SPATIAL), + LSEG("lseg", DataCategory.SPATIAL), + BOX("box", DataCategory.SPATIAL), + PATH("path", DataCategory.SPATIAL), + POLYGON("polygon", DataCategory.SPATIAL), + CIRCLE("circle", DataCategory.SPATIAL), + + //Network Address Data types + CIDR("cidr", DataCategory.NETWORK_ADDRESS), + INET("inet", DataCategory.NETWORK_ADDRESS), + MACADDR("macaddr", DataCategory.NETWORK_ADDRESS), + MACADDR8("macaddr8", DataCategory.NETWORK_ADDRESS), + + //Special Data types + UUID( "uuid",DataCategory.SPECIAL), + XML("xml", DataCategory.SPECIAL), + PG_LSN("pg_lsn", DataCategory.SPECIAL), + TSQUERY("tsquery", DataCategory.SPECIAL), + + //Binary data type + BYTEA("bytea", DataCategory.BINARY); + + private static final Map TYPE_MAP; + + static { + TYPE_MAP = new HashMap<>(values().length); + for (PostgresDataType dataType : values()) { + TYPE_MAP.put(dataType.dataType, dataType); + } + } + + private final String dataType; + private final DataCategory category; + + PostgresDataType(String dataType, DataCategory category) { + this.dataType = dataType; + this.category = category; + } + + public String getDataType() { + return dataType; + } + + public DataCategory getCategory() { + return category; + } + + + public static PostgresDataType byDataType(final String dataType) { + final PostgresDataType type = TYPE_MAP.get(dataType.toLowerCase()); + if (type == null) { + throw new IllegalArgumentException("Unsupported PostgresDataType data type: " + dataType); + } + return type; + } + + public enum DataCategory { + NUMERIC, + STRING, + BIT_STRING, + JSON, + BOOLEAN, + TEMPORAL, + SPATIAL, + NETWORK_ADDRESS, + SPECIAL, + BINARY + } + + + public boolean isNumeric() { + return category == DataCategory.NUMERIC; + } + + + public boolean isString() { + return category == DataCategory.STRING; + } + + public boolean isBitString() { + return category == DataCategory.BIT_STRING; + } + + public boolean isJson() { + return category == DataCategory.JSON; + } + + public boolean isBoolean() { + return category == DataCategory.BOOLEAN; + } + + public boolean isTemporal() { + return category == DataCategory.TEMPORAL; + } + + public boolean isSpatial() { + return category == DataCategory.SPATIAL; + } + + public boolean isNetworkAddress() { + return category == DataCategory.NETWORK_ADDRESS; + } + + public boolean isSpecial() { + return category == DataCategory.SPECIAL; + } + + public boolean isBinary() { + return category == DataCategory.BINARY; + } +} + diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHandler.java new file mode 100644 index 0000000000..edc8ec8b25 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHandler.java @@ -0,0 +1,23 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres; + +/** + * Interface for handling Postgres data type conversions. + * Implementations of this interface are responsible for converting Postgres column values + * to appropriate object representations based on their data types. + */ +public interface PostgresDataTypeHandler { + default Object process(final PostgresDataType columnType, final String columnName, final Object value) { + if(value == null) + return null; + return handle(columnType, columnName, value); + } + /** + * Handles the conversion of a Postgres column value to its object representation. + * + * @param columnType The Postgres data type of the column being processed + * @param columnName The name of the column being processed + * @param value The value to be converted, can be null + * @return An object representation of the converted value + */ + Object handle(PostgresDataType columnType, String columnName, Object value); +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHelper.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHelper.java new file mode 100644 index 0000000000..d30a52e41e --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHelper.java @@ -0,0 +1,39 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.BinaryTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.BitStringTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.BooleanTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.JsonTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.NetworkAddressTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.NumericTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.SpatialTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.SpecialTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.StringTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.TemporalTypeHandler; + + +import java.util.Map; + +public class PostgresDataTypeHelper { + private static final Map typeHandlers = Map.of( + PostgresDataType.DataCategory.NUMERIC, new NumericTypeHandler(), + PostgresDataType.DataCategory.STRING, new StringTypeHandler(), + PostgresDataType.DataCategory.BIT_STRING, new BitStringTypeHandler(), + PostgresDataType.DataCategory.JSON, new JsonTypeHandler(), + PostgresDataType.DataCategory.BOOLEAN, new BooleanTypeHandler(), + PostgresDataType.DataCategory.TEMPORAL, new TemporalTypeHandler(), + PostgresDataType.DataCategory.SPATIAL, new SpatialTypeHandler(), + PostgresDataType.DataCategory.NETWORK_ADDRESS, new NetworkAddressTypeHandler(), + PostgresDataType.DataCategory.SPECIAL, new SpecialTypeHandler(), + PostgresDataType.DataCategory.BINARY, new BinaryTypeHandler() + ); + + public static Object getDataByColumnType(final PostgresDataType columnType, final String columnName, final Object value + ) { + if (value == null) { + return null; + } + + return typeHandlers.get(columnType.getCategory()).handle(columnType, columnName, value); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandler.java new file mode 100644 index 0000000000..ba9cc23721 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandler.java @@ -0,0 +1,15 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; + + +public class BinaryTypeHandler implements PostgresDataTypeHandler { + @Override + public Object handle(PostgresDataType columnType, String columnName, Object value) { + if (!columnType.isBinary()) { + throw new IllegalArgumentException("ColumnType is not Binary : " + columnType); + } + return value.toString(); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BitStringTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BitStringTypeHandler.java new file mode 100644 index 0000000000..fe483af677 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BitStringTypeHandler.java @@ -0,0 +1,16 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; + +import java.math.BigInteger; + +public class BitStringTypeHandler implements PostgresDataTypeHandler { + @Override + public Object handle(PostgresDataType columnType, String columnName, Object value) { + if (!columnType.isBitString()) { + throw new IllegalArgumentException("ColumnType is not Bit String: " + columnType); + } + return new BigInteger(value.toString(), 2); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandler.java new file mode 100644 index 0000000000..1d7d1a5e2a --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandler.java @@ -0,0 +1,16 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; + +import java.util.Objects; + +public class BooleanTypeHandler implements PostgresDataTypeHandler { + @Override + public Object handle(PostgresDataType columnType, String columnName, Object value) { + if (!columnType.isBoolean()) { + throw new IllegalArgumentException("ColumnType is not Boolean: " + columnType); + } + return (Objects.equals(value.toString(), "t")) ? Boolean.TRUE: Boolean.FALSE; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/JsonTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/JsonTypeHandler.java new file mode 100644 index 0000000000..4ec5df595b --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/JsonTypeHandler.java @@ -0,0 +1,14 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; + +public class JsonTypeHandler implements PostgresDataTypeHandler { + @Override + public Object handle(PostgresDataType columnType, String columnName, Object value) { + if (!columnType.isJson()) { + throw new IllegalArgumentException("ColumnType is not Json: " + columnType); + } + return value.toString(); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NetworkAddressTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NetworkAddressTypeHandler.java new file mode 100644 index 0000000000..08449c3be0 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NetworkAddressTypeHandler.java @@ -0,0 +1,14 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; + +public class NetworkAddressTypeHandler implements PostgresDataTypeHandler { + @Override + public Object handle(PostgresDataType columnType, String columnName, Object value) { + if (!columnType.isNetworkAddress()) { + throw new IllegalArgumentException("ColumnType is not Network Address: " + columnType); + } + return value.toString(); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NumericTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NumericTypeHandler.java new file mode 100644 index 0000000000..5381ef06be --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NumericTypeHandler.java @@ -0,0 +1,39 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; + + +public class NumericTypeHandler implements PostgresDataTypeHandler { + @Override + public Object handle(PostgresDataType columnType, String columnName, Object value) { + if (!columnType.isNumeric()) { + throw new IllegalArgumentException("ColumnType is not numeric: " + columnType); + } + return parseNumericValue(columnType, value.toString()); + } + + private Object parseNumericValue(PostgresDataType columnType, String textValue) { + switch (columnType) { + case SMALLINT: + case SMALLSERIAL: + return Short.parseShort(textValue); + case INTEGER: + case SERIAL: + return Integer.parseInt(textValue); + case BIGINT: + case BIGSERIAL: + return Long.parseLong(textValue); + case REAL: + return Float.parseFloat(textValue); + case DOUBLE_PRECISION: + return Double.parseDouble(textValue); + case NUMERIC: + case MONEY: + return textValue; + default: + throw new IllegalArgumentException("Unsupported numeric type: " + columnType); + } + } + +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpatialTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpatialTypeHandler.java new file mode 100644 index 0000000000..48006d426b --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpatialTypeHandler.java @@ -0,0 +1,232 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; +import org.postgresql.geometric.PGbox; +import org.postgresql.geometric.PGcircle; +import org.postgresql.geometric.PGline; +import org.postgresql.geometric.PGlseg; +import org.postgresql.geometric.PGpath; +import org.postgresql.geometric.PGpoint; +import org.postgresql.geometric.PGpolygon; + +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class SpatialTypeHandler implements PostgresDataTypeHandler { + @Override + public Object handle(PostgresDataType columnType, String columnName, Object value) { + if (!columnType.isSpatial()) { + throw new IllegalArgumentException("ColumnType is not spatial: " + columnType); + } + final String val = value.toString(); + final String dataType = columnType.getDataType(); + return parseGeometry(val, columnName, dataType); + } + + private Object parseGeometry(final String val, final String columnName, final String dataType) { + try { + return parseGeometry(val, dataType); + } catch (Exception e) { + throw new RuntimeException("Error processing the geometry data type value for columnName: " + columnName, e); + } + } + + private Object parseGeometry(final String val, final String dataType) { + switch (dataType) { + case "point": + return parsePoint(val); + case "line": + return parseLine(val); + case "lseg": + return parseLseg(val); + case "box": + return parseBox(val); + case "path": + return parsePath(val); + case "polygon": + return parsePolygon(val); + case "circle": + return parseCircle(val); + default: + throw new IllegalArgumentException("Unsupported spatial data type: " + dataType); + } + } + + private String parsePoint(final String val) { + try { + PGpoint point = new PGpoint(val); + return String.format("POINT(%f %f)", point.x, point.y); + } catch (SQLException e) { + throw new RuntimeException("Error converting String to PGpoint object", e); + } + } + + private String parseLine(final String val) { + try { + PGline line = new PGline(val); + PGpoint[] points = getPointsOnLine(line); + return formatLine(points); + } catch (SQLException e) { + throw new RuntimeException("Error converting String to PGpoint object", e); + } + } + + private String parseLseg(final String val) { + try { + PGlseg lseg = new PGlseg(val); + PGpoint[] points = lseg.point; + if (points == null || points.length != 2) + throw new IllegalArgumentException("LineSegment must have at least 2 points"); + return formatLine(points); + } catch (SQLException e) { + throw new RuntimeException("Error converting String to PGlseg object", e); + } + } + + private String parseBox(final String val) { + try { + PGbox box = new PGbox(val); + PGpoint[] points = box.point; + if (points == null || points.length != 2) { + throw new IllegalArgumentException("Box must have exactly 2 points"); + } + return formatBox(points); + } catch (SQLException e) { + throw new RuntimeException("Error converting String to PGbox object", e); + } + } + + private String parsePath(final String val) { + try { + PGpath path = new PGpath(val); + PGpoint[] points = path.points; + if (points == null || points.length == 0) + throw new IllegalArgumentException("Path must have at least 1 point"); + return formatPath(points, path.open); + } catch (SQLException e) { + throw new RuntimeException("Error converting String to PGpath object", e); + } + } + + private Object parseCircle(String val) { + try { + PGcircle circle = new PGcircle(val); + if (circle.center == null ) { + throw new IllegalArgumentException("Circle must have a center point"); + } + return formatCircle(circle); + } catch (SQLException e) { + throw new RuntimeException("Error converting String to PGcircle object", e); + } + } + + private String parsePolygon(String val) { + try { + PGpolygon polygon = new PGpolygon(val); + PGpoint[] points = polygon.points; + if (points == null || points.length == 0) + throw new IllegalArgumentException("Path must have at least 1 point"); + return formatPolygon(points); + } catch (SQLException e) { + throw new RuntimeException("Error converting String to PGpolygon object", e); + } + } + + private PGpoint[] getPointsOnLine(PGline line) { + double a = line.a; + double b = line.b; + double c = line.c; + + // Case 1: Solve for y when x = 0 + double x1 = 0.0; + double y1 = c == 0 ? 0 : -c / b; + + // Case 2: Solve for y when x = 1 + double x2 = 1.0; + double y2 = -(c + a) / b; + + PGpoint p1 = new PGpoint(x1, y1); + PGpoint p2 = new PGpoint(x2, y2); + return new PGpoint[]{p1, p2}; + } + + private static String formatPoint(final PGpoint p) { + return String.format("%f %f", p.x, p.y); + } + + private void appendPoints(final StringBuilder wkt, final PGpoint[] points) { + for (int i = 0; i < points.length; i++) { + if (i > 0) wkt.append(", "); + wkt.append(formatPoint(points[i])); + } + } + + private String formatLine(final PGpoint[] points) { + StringBuilder wkt = new StringBuilder("LINESTRING("); + appendPoints(wkt, points); + wkt.append(")"); + return wkt.toString(); + } + + private String formatBox(final PGpoint[] points) { + StringBuilder wkt = new StringBuilder("POLYGON(("); + appendPoints(wkt, new PGpoint[]{ + points[0], + new PGpoint(points[1].x, points[0].y), + points[1], + new PGpoint(points[0].x, points[1].y), + points[0] // Close the polygon + }); + wkt.append("))"); + return wkt.toString(); + } + + private String formatPath(final PGpoint[] points, final boolean isOpenPath) { + StringBuilder wkt = new StringBuilder(); + wkt.append(isOpenPath ? "LINESTRING(" : "POLYGON(("); + appendPoints(wkt, points); + //For closed paths first and last point must be same + if (!isOpenPath && !points[0].equals(points[points.length - 1])) { + wkt.append(", ").append(formatPoint(points[0])); + } + wkt.append(isOpenPath ? ")" : "))"); + return wkt.toString(); + } + + private String formatPolygon(final PGpoint[] points) { + StringBuilder wkt = new StringBuilder("POLYGON(("); + appendPoints(wkt, points); + if (!points[0].equals(points[points.length - 1])) { + wkt.append(", ").append(formatPoint(points[0])); + } + wkt.append("))"); + return wkt.toString(); + } + /** + * Converts a PostgreSQL circle (PGcircle) object to a Map representation. + * + * This method takes a PGcircle object and creates a Map with two keys: + * 1. "center": A nested Map containing the x and y coordinates of the circle's center. + * 2. "radius": The radius of the circle. + * + * @param circle The PGcircle object to be converted. + * @return A Map containing the circle's center coordinates and radius. + * The structure of the returned Map is: + * { + * "center": { + * "x": (x-coordinate), + * "y": (y-coordinate) + * }, + * "radius": (radius value) + * } + */ + private Object formatCircle(final PGcircle circle) { + PGpoint center = circle.center; + Map circleObject = new HashMap<>(); + circleObject.put("center", Map.of("x", center.x, "y", center.y)); + circleObject.put("radius", circle.radius); + return circleObject; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpecialTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpecialTypeHandler.java new file mode 100644 index 0000000000..f88f8d88d3 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpecialTypeHandler.java @@ -0,0 +1,28 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; + +import java.util.UUID; + +public class SpecialTypeHandler implements PostgresDataTypeHandler { + @Override + public Object handle(PostgresDataType columnType, String columnName, Object value) { + if (!columnType.isSpecial()) { + throw new IllegalArgumentException("ColumnType is not special: " + columnType); + } + return handleSpecialType(columnType, value); + } + + private Object handleSpecialType(PostgresDataType columnType, Object value) { + switch(columnType) { + case UUID: + case XML: + case PG_LSN: + case TSQUERY: + return value.toString(); + default: + throw new IllegalArgumentException("Unsupported special type: " + columnType); + } + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/StringTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/StringTypeHandler.java new file mode 100644 index 0000000000..637672fac0 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/StringTypeHandler.java @@ -0,0 +1,14 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; + +public class StringTypeHandler implements PostgresDataTypeHandler { + @Override + public Object handle(PostgresDataType columnType, String columnName, Object value) { + if (!columnType.isString()) { + throw new IllegalArgumentException("ColumnType is not string: " + columnType); + } + return value.toString(); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/TemporalTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/TemporalTypeHandler.java new file mode 100644 index 0000000000..01e2938649 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/TemporalTypeHandler.java @@ -0,0 +1,221 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; + +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.LocalDateTime; +import java.time.OffsetTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.format.DateTimeParseException; +import java.time.format.TextStyle; +import java.time.temporal.ChronoField; +import java.util.Locale; + +public class TemporalTypeHandler implements PostgresDataTypeHandler { + + private static final String POSTGRES_DATE_FORMAT = "yyyy-MM-dd"; + private static final String POSTGRES_TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + private static final String POSTGRES_DATE_MIN_INFINITY = "1970-01-01"; + private static final String POSTGRES_DATE_MAX_INFINITY = "9999-12-31"; + private static final String POSTGRES_TIMESTAMP_MIN_INFINITY = "1970-01-01 00:00:00"; + private static final String POSTGRES_TIMESTAMP_MAX_INFINITY = "9999-12-31 23:59:59"; + private static final String POSTGRES_TIMESTAMPTZ_MIN_INFINITY = "1970-01-01 00:00:00+00"; + private static final String POSTGRES_TIMESTAMPTZ_MAX_INFINITY = "9999-12-31 23:59:59+00"; + + + @Override + public Object handle(PostgresDataType columnType, String columnName, Object value) { + if (!columnType.isTemporal()) { + throw new IllegalArgumentException("ColumnType is not Temporal: " + columnType); + } + final String stringVal = value.toString(); + final String val = (stringVal.equals("-infinity") || stringVal.equals("infinity")) + ? parseInfinity(stringVal, columnType) + : stringVal; + + try { + switch (columnType) { + case DATE: + return handleDate(val); + case TIME: + return handleTime(val); + case TIMETZ: + return handleTimeWithTimeZone(val); + case TIMESTAMP: + return handleTimeStamp(val); + case TIMESTAMPTZ: + return handleTimeStampWithTimeZone(val); + case INTERVAL: + return handleInterval(val); + default: + throw new IllegalArgumentException("Unsupported Temporal data type: " + columnType); + } + }catch (Exception e) { + throw new IllegalArgumentException( + String.format("Failed to parse %s value: %s", columnType, val), e); + } + + } + + private String parseInfinity(String val, PostgresDataType columnType) { + switch (columnType) { + case DATE: + return val.equals("-infinity") ? POSTGRES_DATE_MIN_INFINITY : POSTGRES_DATE_MAX_INFINITY; + case TIMESTAMP: + return val.equals("-infinity") ? POSTGRES_TIMESTAMP_MIN_INFINITY : POSTGRES_TIMESTAMP_MAX_INFINITY; + case TIMESTAMPTZ: + return val.equals("-infinity") ? POSTGRES_TIMESTAMPTZ_MIN_INFINITY : POSTGRES_TIMESTAMPTZ_MAX_INFINITY; + default: + return val; // For other types, return the original value + } + } + + private DateTimeFormatter createDateFormatter() { + return new DateTimeFormatterBuilder() + .appendPattern(POSTGRES_DATE_FORMAT) + .optionalStart() + .appendLiteral(' ') + .appendText(ChronoField.ERA, TextStyle.SHORT) + .optionalEnd() + .toFormatter(Locale.ENGLISH); + } + + private DateTimeFormatter createTimeFormatter() { + return new DateTimeFormatterBuilder() + .appendPattern("HH:mm:ss") + .optionalStart() + .appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true) + .optionalEnd() + .toFormatter(); + } + + private DateTimeFormatter createTimeWithTimeZoneFormatter() { + return new DateTimeFormatterBuilder() + .appendPattern("HH:mm:ss") + .optionalStart() + .appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true) + .optionalEnd() + .appendOffset("+HH:mm:ss", "+HH") + .toFormatter(); + } + + private DateTimeFormatter createTimeStampWithoutTimeZoneFormatter() { + return new DateTimeFormatterBuilder() + .appendPattern(POSTGRES_TIMESTAMP_FORMAT) + .optionalStart() + .appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true) + .optionalEnd() + .toFormatter(); + } + + private DateTimeFormatter createTimeStampWithTimeZoneFormatter() { + return new DateTimeFormatterBuilder() + .appendPattern(POSTGRES_TIMESTAMP_FORMAT) + .optionalStart() + .appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true) + .optionalEnd() + .optionalStart() + .appendOffset("+HH:mm:ss", "+HH") + .optionalEnd() + .toFormatter(); + } + + private Long handleDate(final String dateStr) { + try { + final LocalDate date = LocalDate.parse(dateStr, createDateFormatter()); + return date.atStartOfDay(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(); + } + catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid date format: " + dateStr, e); + } + } + + private Long handleTime(final String timeStr) { + try { + final LocalTime time = LocalTime.parse(timeStr, createTimeFormatter()); + return time.atDate(LocalDate.EPOCH) + .toInstant(ZoneOffset.UTC) + .toEpochMilli(); + } + catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid time format: " + timeStr, e); + } + } + + private Long handleTimeWithTimeZone(final String timetzStr) { + try { + final OffsetTime time = OffsetTime.parse(timetzStr, createTimeWithTimeZoneFormatter()); + return time.atDate(LocalDate.EPOCH) + .toInstant() + .toEpochMilli(); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid time with timezone format: " + timetzStr, e); + + } + } + + private Long handleTimeStamp(final String timeStampStr) { + try { + return LocalDateTime.parse(timeStampStr, createTimeStampWithoutTimeZoneFormatter()) + .toInstant(ZoneOffset.UTC) + .toEpochMilli(); + } + catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid timestamp format: " + timeStampStr, e); + } + } + + private Long handleTimeStampWithTimeZone(final String timeStampWithTimeZoneStr) { + try { + return OffsetDateTime.parse(timeStampWithTimeZoneStr, createTimeStampWithTimeZoneFormatter()) + .toInstant() + .toEpochMilli(); + } + catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid timestamp with timezone format: " + timeStampWithTimeZoneStr, e); + } + } + + private String handleInterval(final String intervalStr) { + int years = 0, months = 0, days = 0, hours = 0, minutes = 0, seconds = 0; + + String[] parts = intervalStr.split(" "); + for (int i = 0; i < parts.length; i++) { + if (parts[i].endsWith("year") || parts[i].endsWith("years")) { + years = Integer.parseInt(parts[i-1]); + } else if (parts[i].endsWith("mon") || parts[i].endsWith("mons")) { + months = Integer.parseInt(parts[i-1]); + } else if (parts[i].endsWith("day") || parts[i].endsWith("days")) { + days = Integer.parseInt(parts[i-1]); + } else if (parts[i].contains(":")) { + String[] timeParts = parts[i].split(":"); + hours = Integer.parseInt(timeParts[0]); + minutes = Integer.parseInt(timeParts[1]); + seconds = Integer.parseInt(timeParts[2]); + break; + } + } + + StringBuilder result = new StringBuilder("P"); + if (years > 0) result.append(years).append("Y"); + if (months > 0) result.append(months).append("M"); + if (days > 0) result.append(days).append("D"); + + if (hours > 0 || minutes > 0 || seconds > 0) { + result.append("T"); + if (hours > 0) result.append(hours).append("H"); + if (minutes > 0) result.append(minutes).append("M"); + if (seconds > 0) result.append(seconds).append("S"); + } + return result.length() > 1 ? result.toString() : "PT0S"; + } + +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java index b0a205c9ec..a683adf4ec 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java @@ -19,8 +19,8 @@ import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHelper; import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -176,7 +176,7 @@ public void run() { private void transformEvent(final Event event, final String fullTableName) { Map columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName); for (Map.Entry entry : event.toMap().entrySet()) { - final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(), + final Object data = MySQLDataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(), entry.getValue(), null); event.put(entry.getKey(), data); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java index b3bf900eb8..55fe4e5472 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java @@ -12,25 +12,22 @@ public class TableMetadata { public static final String DOT_DELIMITER = "."; - private String databaseName; - private String tableName; - private List columnNames; - private List primaryKeys; - private Map setStrValues; - private Map enumStrValues; - - public TableMetadata(String tableName, String databaseName, List columnNames, List primaryKeys) { - this(tableName, databaseName, columnNames, primaryKeys, Collections.emptyMap(), Collections.emptyMap()); - } - - public TableMetadata(String tableName, String databaseName, List columnNames, List primaryKeys, - Map setStrValues, Map enumStrValues) { - this.tableName = tableName; - this.databaseName = databaseName; - this.columnNames = columnNames; - this.primaryKeys = primaryKeys; - this.setStrValues = setStrValues; - this.enumStrValues = enumStrValues; + private final String databaseName; + private final String tableName; + private final List columnNames; + private final List columnTypes; + private final List primaryKeys; + private final Map setStrValues; + private final Map enumStrValues; + + private TableMetadata(final Builder builder) { + this.databaseName = builder.databaseName; + this.tableName = builder.tableName; + this.columnNames = builder.columnNames; + this.columnTypes = builder.columnTypes; + this.primaryKeys = builder.primaryKeys; + this.setStrValues = builder.setStrValues; + this.enumStrValues = builder.enumStrValues; } public String getDatabaseName() { @@ -49,39 +46,75 @@ public List getColumnNames() { return columnNames; } - public List getPrimaryKeys() { - return primaryKeys; - } - - public void setDatabaseName(String databaseName) { - this.databaseName = databaseName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; + public List getColumnTypes() { + return columnTypes; } - public void setColumnNames(List columnNames) { - this.columnNames = columnNames; - } - - public void setPrimaryKeys(List primaryKeys) { - this.primaryKeys = primaryKeys; + public List getPrimaryKeys() { + return primaryKeys; } public Map getSetStrValues() { return setStrValues; } - public void setSetStrValues(Map setStrValues) { - this.setStrValues = setStrValues; - } - public Map getEnumStrValues() { return enumStrValues; } - public void setEnumStrValues(Map enumStrValues) { - this.enumStrValues = enumStrValues; + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String databaseName; + private String tableName; + private List columnNames = Collections.emptyList(); + private List columnTypes = Collections.emptyList(); + private List primaryKeys = Collections.emptyList(); + private Map setStrValues = Collections.emptyMap(); + private Map enumStrValues = Collections.emptyMap(); + + private Builder() { + } + + public Builder withDatabaseName(String databaseName) { + this.databaseName = databaseName; + return this; + } + + public Builder withTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public Builder withColumnNames(List columnNames) { + this.columnNames = columnNames; + return this; + } + + public Builder withColumnTypes(List columnTypes) { + this.columnTypes = columnTypes; + return this; + } + + public Builder withPrimaryKeys(List primaryKeys) { + this.primaryKeys = primaryKeys; + return this; + } + + public Builder withSetStrValues(Map setStrValues) { + this.setStrValues = setStrValues; + return this; + } + + public Builder withEnumStrValues(Map enumStrValues) { + this.enumStrValues = enumStrValues; + return this; + } + + public TableMetadata build() { + return new TableMetadata(this); + } } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java index f7c9c538d8..afab3dceda 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java @@ -16,8 +16,8 @@ import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHelper; import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager; import org.slf4j.Logger; @@ -158,7 +158,7 @@ private Map mapDataType(final Map rowData, final Map columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName); Map rowDataAfterMapping = new HashMap<>(); for (Map.Entry entry : rowData.entrySet()) { - final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(), + final Object data = MySQLDataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(), entry.getValue(), null); rowDataAfterMapping.put(entry.getKey(), data); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java index 1612e94ec3..039e944c72 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java @@ -30,8 +30,8 @@ import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; import org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHelper; import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.model.ParentTable; @@ -215,9 +215,14 @@ void handleTableMapEvent(com.github.shyiko.mysql.binlog.event.Event event) { final List primaryKeys = tableMapEventMetadata.getSimplePrimaryKeys().stream() .map(columnNames::get) .collect(Collectors.toList()); - final TableMetadata tableMetadata = new TableMetadata( - eventData.getTable(), eventData.getDatabase(), columnNames, primaryKeys, - getSetStrValues(eventData), getEnumStrValues(eventData)); + final TableMetadata tableMetadata = TableMetadata.builder() + .withTableName(eventData.getTable()) + .withDatabaseName(eventData.getDatabase()) + .withColumnNames(columnNames) + .withPrimaryKeys(primaryKeys) + .withSetStrValues(getSetStrValues(eventData)) + .withEnumStrValues(getEnumStrValues(eventData)) + .build(); if (isTableOfInterest(tableMetadata.getFullTableName())) { tableMetadataMap.put(eventData.getTableId(), tableMetadata); } @@ -372,7 +377,7 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event, for (int i = 0; i < rowDataArray.length; i++) { final Map tbColumnDatatypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName()); final String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i)); - final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i), + final Object data = MySQLDataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i), rowDataArray[i], tableMetadata); rowDataMap.put(columnNames.get(i), data); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java index a2a9aa1017..634944ed71 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java @@ -27,6 +27,8 @@ import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.ColumnType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHelper; import org.opensearch.dataprepper.plugins.source.rds.model.MessageType; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import org.postgresql.replication.LogSequenceNumber; @@ -34,6 +36,7 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -197,12 +200,14 @@ void processRelationMessage(ByteBuffer msg) { short numberOfColumns = msg.getShort(); List columnNames = new ArrayList<>(); + List columnTypes = new ArrayList<>(); for (int i = 0; i < numberOfColumns; i++) { int flag = msg.get(); // 1 indicates this column is part of the replica identity // null terminated string String columnName = getNullTerminatedString(msg); ColumnType columnType = ColumnType.getByTypeId(msg.getInt()); String columnTypeName = columnType.getTypeName(); + columnTypes.add(columnTypeName); int typeModifier = msg.getInt(); if (columnType == ColumnType.VARCHAR) { int varcharLength = typeModifier - 4; @@ -214,8 +219,13 @@ void processRelationMessage(ByteBuffer msg) { } final List primaryKeys = getPrimaryKeys(schemaName, tableName); - final TableMetadata tableMetadata = new TableMetadata( - tableName, schemaName, columnNames, primaryKeys); + final TableMetadata tableMetadata = TableMetadata.builder(). + withTableName(tableName). + withDatabaseName(schemaName). + withColumnNames(columnNames). + withColumnTypes(columnTypes). + withPrimaryKeys(primaryKeys). + build(); tableMetadataMap.put((long) tableId, tableMetadata); @@ -270,6 +280,7 @@ void processUpdateMessage(ByteBuffer msg) { final TableMetadata tableMetadata = tableMetadataMap.get((long) tableId); final List columnNames = tableMetadata.getColumnNames(); final List primaryKeys = tableMetadata.getPrimaryKeys(); + final List columnTypes = tableMetadata.getColumnTypes(); final long eventTimestampMillis = currentEventTimestamp; TupleDataType tupleDataType = TupleDataType.fromValue((char) msg.get()); @@ -285,9 +296,9 @@ void processUpdateMessage(ByteBuffer msg) { } else if (tupleDataType == TupleDataType.OLD) { // Replica Identity is set to full, containing both old and new row data - Map oldRowDataMap = getRowDataMap(msg, columnNames); + Map oldRowDataMap = getRowDataMap(msg, columnNames, columnTypes); msg.get(); // should be a char 'N' - Map newRowDataMap = getRowDataMap(msg, columnNames); + Map newRowDataMap = getRowDataMap(msg, columnNames, columnTypes); if (isPrimaryKeyChanged(oldRowDataMap, newRowDataMap, primaryKeys)) { createPipelineEvent(oldRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.DELETE); @@ -322,12 +333,12 @@ private void doProcess(ByteBuffer msg, List columnNames, TableMetadata t List primaryKeys, long eventTimestampMillis, OpenSearchBulkActions bulkAction) { bytesReceived = msg.capacity(); bytesReceivedSummary.record(bytesReceived); - Map rowDataMap = getRowDataMap(msg, columnNames); - + final List columnTypes = tableMetadata.getColumnTypes(); + Map rowDataMap = getRowDataMap(msg, columnNames, columnTypes); createPipelineEvent(rowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, bulkAction); } - private Map getRowDataMap(ByteBuffer msg, List columnNames) { + private Map getRowDataMap(ByteBuffer msg, List columnNames, List columnTypes) { Map rowDataMap = new HashMap<>(); short numberOfColumns = msg.getShort(); for (int i = 0; i < numberOfColumns; i++) { @@ -338,7 +349,12 @@ private Map getRowDataMap(ByteBuffer msg, List columnNam int length = msg.getInt(); byte[] bytes = new byte[length]; msg.get(bytes); - rowDataMap.put(columnNames.get(i), new String(bytes)); + final String value = new String(bytes, StandardCharsets.UTF_8); + final String columnName = columnNames.get(i); + final String columnType = columnTypes.get(i); + final Object data = PostgresDataTypeHelper.getDataByColumnType(PostgresDataType.byDataType(columnType), columnName, + value); + rowDataMap.put(columnNames.get(i), data); } else { LOG.warn("Unknown column type: {}", type); } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java deleted file mode 100644 index 769915a1f3..0000000000 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java +++ /dev/null @@ -1,61 +0,0 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; - -import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; -import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; - -import java.util.Collections; -import java.util.List; -import java.util.UUID; - -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; - -public class JsonTypeHandlerTest { - - @Test - public void testHandleJsonBytes() { - final DataTypeHandler handler = new JsonTypeHandler(); - final MySQLDataType columnType = MySQLDataType.JSON; - final String columnName = "jsonColumn"; - final String jsonValue = "{\"key\":\"value\"}"; - final byte[] testData = jsonValue.getBytes(); - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - Collections.emptyMap(), Collections.emptyMap()); - Object result = handler.handle(columnType, columnName, testData, metadata); - - assertThat(result, is(instanceOf(String.class))); - assertThat(result, is(jsonValue)); - } - - @Test - public void testHandleJsonString() { - final DataTypeHandler handler = new JsonTypeHandler(); - final MySQLDataType columnType = MySQLDataType.JSON; - final String columnName = "jsonColumn"; - final String jsonValue = "{\"key\":\"value\"}"; - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - Collections.emptyMap(), Collections.emptyMap()); - Object result = handler.handle(columnType, columnName, jsonValue, metadata); - - assertThat(result, is(instanceOf(String.class))); - assertThat(result, is(jsonValue)); - } - - @Test - public void testHandleInvalidJsonBytes() { - final DataTypeHandler handler = new JsonTypeHandler(); - final MySQLDataType columnType = MySQLDataType.JSON; - final String columnName = "jsonColumn"; - final byte[] testData = new byte[]{5}; - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - Collections.emptyMap(), Collections.emptyMap()); - assertThrows(RuntimeException.class, () -> handler.handle(columnType, columnName, testData, metadata)); - } -} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/BinaryTypeHandlerTest.java similarity index 55% rename from data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandlerTest.java rename to data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/BinaryTypeHandlerTest.java index 5321c80e3a..5add8c3bf5 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/BinaryTypeHandlerTest.java @@ -1,8 +1,8 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import static org.hamcrest.CoreMatchers.instanceOf; @@ -18,12 +18,16 @@ public class BinaryTypeHandlerTest { @Test public void testHandleByteArrayData() { - final DataTypeHandler handler = new BinaryTypeHandler(); + final MySQLDataTypeHandler handler = new BinaryTypeHandler(); final MySQLDataType columnType = MySQLDataType.BINARY; final String columnName = "binaryColumn"; final String testData = UUID.randomUUID().toString(); - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName)); + final TableMetadata metadata = TableMetadata.builder(). + withTableName(UUID.randomUUID().toString()). + withDatabaseName(UUID.randomUUID().toString()). + withColumnNames(List.of(columnName)). + withPrimaryKeys(List.of(columnName)) + .build(); final Object result = handler.handle(columnType, columnName, testData.getBytes(), metadata); assertThat(result, is(instanceOf(String.class))); @@ -32,11 +36,15 @@ public void testHandleByteArrayData() { @Test public void testHandleMapWithByteArrayData() { - final DataTypeHandler handler = new BinaryTypeHandler(); + final MySQLDataTypeHandler handler = new BinaryTypeHandler(); final MySQLDataType columnType = MySQLDataType.BINARY; final String columnName = "test_column"; - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName)); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .build(); final String testData = UUID.randomUUID().toString(); final Map value = new HashMap<>(); value.put("bytes", testData.getBytes()); @@ -49,11 +57,15 @@ public void testHandleMapWithByteArrayData() { @Test public void testHandleMapValueNotByteArray() { - final DataTypeHandler handler = new BinaryTypeHandler(); + final MySQLDataTypeHandler handler = new BinaryTypeHandler(); final MySQLDataType columnType = MySQLDataType.BINARY; final String columnName = "test_column"; - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName)); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .build(); final Map value = new HashMap<>(); final String testData = UUID.randomUUID().toString(); value.put("bytes", testData); @@ -66,13 +78,16 @@ public void testHandleMapValueNotByteArray() { @Test public void testHandleNonByteArrayNonMapValue() { - final DataTypeHandler handler = new BinaryTypeHandler(); + final MySQLDataTypeHandler handler = new BinaryTypeHandler(); final MySQLDataType columnType = MySQLDataType.BINARY; final String columnName = "test_column"; final Integer value = 42; - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName)); - + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .build(); final Object result = handler.handle(columnType, columnName, value, metadata); assertThat(result, is(instanceOf(String.class))); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/JsonTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/JsonTypeHandlerTest.java new file mode 100644 index 0000000000..27b4809c7e --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/JsonTypeHandlerTest.java @@ -0,0 +1,76 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class JsonTypeHandlerTest { + + @Test + public void testHandleJsonBytes() { + final MySQLDataTypeHandler handler = new JsonTypeHandler(); + final MySQLDataType columnType = MySQLDataType.JSON; + final String columnName = "jsonColumn"; + final String jsonValue = "{\"key\":\"value\"}"; + final byte[] testData = jsonValue.getBytes(); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(Collections.emptyMap()) + .build(); + Object result = handler.handle(columnType, columnName, testData, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(jsonValue)); + } + + @Test + public void testHandleJsonString() { + final MySQLDataTypeHandler handler = new JsonTypeHandler(); + final MySQLDataType columnType = MySQLDataType.JSON; + final String columnName = "jsonColumn"; + final String jsonValue = "{\"key\":\"value\"}"; + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(Collections.emptyMap()) + .build(); + Object result = handler.handle(columnType, columnName, jsonValue, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(jsonValue)); + } + + @Test + public void testHandleInvalidJsonBytes() { + final MySQLDataTypeHandler handler = new JsonTypeHandler(); + final MySQLDataType columnType = MySQLDataType.JSON; + final String columnName = "jsonColumn"; + final byte[] testData = new byte[]{5}; + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(Collections.emptyMap()) + .build(); + assertThrows(RuntimeException.class, () -> handler.handle(columnType, columnName, testData, metadata)); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/NumericTypeHandlerTest.java similarity index 72% rename from data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandlerTest.java rename to data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/NumericTypeHandlerTest.java index faa809894e..2cacc9e4cb 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/NumericTypeHandlerTest.java @@ -1,11 +1,11 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import java.math.BigDecimal; @@ -27,10 +27,15 @@ public class NumericTypeHandlerTest { @ParameterizedTest @MethodSource("provideNumericTypeData") public void test_handle(final MySQLDataType mySQLDataType, final String columnName, final Object value, final Object expectedValue) { - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - Collections.emptyMap(), Collections.emptyMap()); - final DataTypeHandler numericTypeHandler = new NumericTypeHandler(); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(Collections.emptyMap()) + .build(); + final MySQLDataTypeHandler numericTypeHandler = new NumericTypeHandler(); Object result = numericTypeHandler.handle(mySQLDataType, columnName, value, metadata); if (result != null) { @@ -114,11 +119,15 @@ private static Stream provideNumericTypeData() { @Test public void test_handleInvalidType() { - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), - List.of("invalid_col"), List.of("invalid_col"), - Collections.emptyMap(), Collections.emptyMap()); - final DataTypeHandler numericTypeHandler = new NumericTypeHandler(); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of("invalid_col")) + .withPrimaryKeys(List.of("invalid_col")) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(Collections.emptyMap()) + .build(); + final MySQLDataTypeHandler numericTypeHandler = new NumericTypeHandler(); assertThrows(IllegalArgumentException.class, () -> { numericTypeHandler.handle(MySQLDataType.INT_UNSIGNED, "invalid_col", "not_a_number", metadata); @@ -127,11 +136,15 @@ public void test_handleInvalidType() { @Test public void test_handleInvalidValue() { - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), - List.of("int_col"), List.of("int_col"), - Collections.emptyMap(), Collections.emptyMap()); - final DataTypeHandler numericTypeHandler = new NumericTypeHandler(); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of("int_col")) + .withPrimaryKeys(List.of("int_col")) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(Collections.emptyMap()) + .build(); + final MySQLDataTypeHandler numericTypeHandler = new NumericTypeHandler(); assertThrows(IllegalArgumentException.class, () -> { numericTypeHandler.handle(MySQLDataType.INT, "int_col", "not_a_number", metadata); @@ -140,11 +153,15 @@ public void test_handleInvalidValue() { @Test public void test_handleInvalidUnsignedBigInt() { - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), - List.of("int_col"), List.of("int_col"), - Collections.emptyMap(), Collections.emptyMap()); - final DataTypeHandler numericTypeHandler = new NumericTypeHandler(); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of("int_col")) + .withPrimaryKeys(List.of("int_col")) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(Collections.emptyMap()) + .build(); + final MySQLDataTypeHandler numericTypeHandler = new NumericTypeHandler(); assertThrows(IllegalArgumentException.class, () -> { numericTypeHandler.handle(MySQLDataType.BIGINT, "bigint_col", "not_a_number", metadata); @@ -154,11 +171,15 @@ public void test_handleInvalidUnsignedBigInt() { @Test public void test_handleInvalidBit() { - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), - List.of("int_col"), List.of("int_col"), - Collections.emptyMap(), Collections.emptyMap()); - final DataTypeHandler numericTypeHandler = new NumericTypeHandler(); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of("int_col")) + .withPrimaryKeys(List.of("int_col")) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(Collections.emptyMap()) + .build(); + final MySQLDataTypeHandler numericTypeHandler = new NumericTypeHandler(); assertThrows(IllegalArgumentException.class, () -> { numericTypeHandler.handle(MySQLDataType.BIT, "bit_col", "not_a_number", metadata); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/SpatialTypeHandlerTest.java similarity index 89% rename from data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java rename to data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/SpatialTypeHandlerTest.java index 194a4c70ef..760f40495d 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/SpatialTypeHandlerTest.java @@ -1,11 +1,11 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import java.nio.ByteBuffer; @@ -25,10 +25,13 @@ public class SpatialTypeHandlerTest { @Test public void test_handleInvalidType() { - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), - List.of("invalid_col"), List.of("invalid_col")); - final DataTypeHandler spatialTypeHandler = new SpatialTypeHandler(); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of("invalid_col")) + .withPrimaryKeys(List.of("invalid_col")) + .build(); + final MySQLDataTypeHandler spatialTypeHandler = new SpatialTypeHandler(); assertThrows(IllegalArgumentException.class, () -> { spatialTypeHandler.handle(MySQLDataType.GEOMETRY, "invalid_col", "not_a_geometry", metadata); @@ -37,10 +40,13 @@ public void test_handleInvalidType() { @Test public void test_handleInvalidGeometryValue() { - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), - List.of("invalid_col"), List.of("invalid_col")); - final DataTypeHandler spatialTypeHandler = new SpatialTypeHandler(); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of("invalid_col")) + .withPrimaryKeys(List.of("invalid_col")) + .build(); + final MySQLDataTypeHandler spatialTypeHandler = new SpatialTypeHandler(); assertThrows(RuntimeException.class, () -> { spatialTypeHandler.handle(MySQLDataType.GEOMETRY, "invalid_col", "not_a_geometry".getBytes(), metadata); @@ -50,9 +56,13 @@ public void test_handleInvalidGeometryValue() { @ParameterizedTest @MethodSource("provideGeometryTypeData") public void test_handleGeometryTypes_success(final MySQLDataType mySQLDataType, final String columnName, final Object value, final Object expectedValue) { - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName)); - final DataTypeHandler numericTypeHandler = new SpatialTypeHandler(); + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .build(); + final MySQLDataTypeHandler numericTypeHandler = new SpatialTypeHandler(); Object result = numericTypeHandler.handle(mySQLDataType, columnName, value, metadata); if (result != null) { diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/StringTypeHandlerTest.java similarity index 51% rename from data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandlerTest.java rename to data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/StringTypeHandlerTest.java index 434648b871..39eb3cf55c 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/StringTypeHandlerTest.java @@ -1,8 +1,8 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import java.util.Collections; @@ -19,14 +19,18 @@ public class StringTypeHandlerTest { @Test public void test_handle_char_string() { - DataTypeHandler handler = new StringTypeHandler(); + MySQLDataTypeHandler handler = new StringTypeHandler(); String columnName = "testColumn"; MySQLDataType columnType = MySQLDataType.VARCHAR; final String value = "Hello, World!"; - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - Collections.emptyMap(), Collections.emptyMap()); - + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(Collections.emptyMap()) + .build(); Object result = handler.handle(columnType, columnName, value, metadata); assertThat(result, is(instanceOf(String.class))); @@ -35,15 +39,19 @@ public void test_handle_char_string() { @Test public void test_handle_byte_string() { - DataTypeHandler handler = new StringTypeHandler(); + MySQLDataTypeHandler handler = new StringTypeHandler(); String columnName = "testColumn"; MySQLDataType columnType = MySQLDataType.TEXT; final String value = "Hello, World!"; byte[] testBytes = value.getBytes(); - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - Collections.emptyMap(), Collections.emptyMap()); - + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(Collections.emptyMap()) + .build(); Object result = handler.handle(columnType, columnName, testBytes, metadata); assertThat(result, is(instanceOf(String.class))); @@ -57,10 +65,14 @@ public void test_handle_enum_string() { Integer value = 2; String[] enumValues = { "ENUM1", "ENUM2", "ENUM3" }; MySQLDataType columnType = MySQLDataType.ENUM; - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - Collections.emptyMap(), Map.of(columnName, enumValues)); - + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .withEnumStrValues(Map.of(columnName, enumValues)) + .withSetStrValues(Collections.emptyMap()) + .build(); String result = handler.handle(columnType, columnName, value, metadata); assertThat(result, is("ENUM2")); @@ -75,10 +87,14 @@ public void test_handle_set_string() { Map setStrValuesMap = new HashMap<>(); setStrValuesMap.put(columnName, setStrValues); MySQLDataType columnType = MySQLDataType.SET; - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - setStrValuesMap, Collections.emptyMap()); - + final TableMetadata metadata = TableMetadata.builder() + .withTableName(UUID.randomUUID().toString()) + .withDatabaseName(UUID.randomUUID().toString()) + .withColumnNames(List.of(columnName)) + .withPrimaryKeys(List.of(columnName)) + .withEnumStrValues(Collections.emptyMap()) + .withSetStrValues(setStrValuesMap) + .build(); String result = handler.handle(columnType, columnName, value, metadata); assertEquals("[Value1, Value2]", result); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/TemporalTypeHandlerTest.java similarity index 97% rename from data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java rename to data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/TemporalTypeHandlerTest.java index fdee424ed3..848a32794a 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/mysql/handler/TemporalTypeHandlerTest.java @@ -1,8 +1,8 @@ -package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; import java.time.LocalDate; import java.time.ZoneOffset; diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandlerTest.java new file mode 100644 index 0000000000..7ff8589203 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandlerTest.java @@ -0,0 +1,28 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class BinaryTypeHandlerTest { + private BinaryTypeHandler handler; + + @BeforeEach + void setUp() { + handler = new BinaryTypeHandler(); + } + + @Test + public void test_handle_binary_string() { + String columnName = "testColumn"; + PostgresDataType columnType = PostgresDataType.BYTEA; + String value = "\\xDEADBEEF"; + Object result = handler.process(columnType, columnName, value); + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(value)); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BitStringTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BitStringTypeHandlerTest.java new file mode 100644 index 0000000000..d9b79cbb41 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BitStringTypeHandlerTest.java @@ -0,0 +1,48 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; + +import java.math.BigInteger; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class BitStringTypeHandlerTest { + private BitStringTypeHandler handler; + + @BeforeEach + void setUp() { + handler = new BitStringTypeHandler(); + } + + @ParameterizedTest + @MethodSource("provideBitTypeData") + public void test_handle_bit_string(PostgresDataType columnType, String value, BigInteger expected) { + String columnName = "testColumn"; + Object result = handler.process(columnType, columnName, value); + assertThat(result, is(instanceOf(BigInteger.class))); + assertThat(result, is(expected)); + } + + @Test + public void test_handleInvalidType() { + assertThrows(IllegalArgumentException.class, () -> { + handler.process(PostgresDataType.INTEGER, "invalid_col", 123); + }); + } + + private static Stream provideBitTypeData() { + return Stream.of( + Arguments.of(PostgresDataType.BIT, "10101", BigInteger.valueOf(21)), + Arguments.of(PostgresDataType.VARBIT, "1010111", BigInteger.valueOf(87)) + ); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandlerTest.java new file mode 100644 index 0000000000..0bd969d6b7 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandlerTest.java @@ -0,0 +1,43 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class BooleanTypeHandlerTest { + private BooleanTypeHandler handler; + + @BeforeEach + void setUp() { + handler = new BooleanTypeHandler(); + } + + @Test + void test_handle_true_values() { + String value = "t"; + Object result = handler.process(PostgresDataType.BOOLEAN, "testColumn", value); + assertThat(result, is(instanceOf(Boolean.class))); + assertThat(result, is(Boolean.TRUE)); + } + + @Test + void test_handle_false_values() { + String value = "f"; + Object result = handler.process(PostgresDataType.BOOLEAN, "testColumn", value); + assertThat(result, is(instanceOf(Boolean.class))); + assertThat(result, is(Boolean.FALSE));} + + @Test + void test_handle_non_boolean_type() { + assertThrows(IllegalArgumentException.class, () -> + handler.process(PostgresDataType.INTEGER, "testColumn", 123) + ); + } + +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/JsonTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/JsonTypeHandlerTest.java new file mode 100644 index 0000000000..9efe3f9db3 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/JsonTypeHandlerTest.java @@ -0,0 +1,40 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class JsonTypeHandlerTest { + private JsonTypeHandler handler; + + @BeforeEach + void setUp() { + handler = new JsonTypeHandler(); + } + + @ParameterizedTest + @CsvSource({ + "JSON, '{\"key\": \"value\"}'", + "JSONB, '{\"nested\": {\"array\": [1, 2, 3]}}'" + }) + public void test_handle_json(PostgresDataType columnType, String value) { + String columnName = "testColumn"; + Object result = handler.process(columnType, columnName, value); + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(value)); + } + + @Test + public void test_handleInvalidType() { + assertThrows(IllegalArgumentException.class, () -> { + handler.process(PostgresDataType.INTEGER, "invalid_col", 123); + }); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NetworkAddressTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NetworkAddressTypeHandlerTest.java new file mode 100644 index 0000000000..aaf8d1fa31 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NetworkAddressTypeHandlerTest.java @@ -0,0 +1,43 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class NetworkAddressTypeHandlerTest { + private NetworkAddressTypeHandler handler; + + @BeforeEach + void setUp() { + handler = new NetworkAddressTypeHandler(); + } + @ParameterizedTest + @CsvSource({ + "INET, 192.168.0.1", + "INET, 2001:db8::1234", + "CIDR, 192.168.0.0/24", + "CIDR, 2001:db8::/32", + "MACADDR, 08:00:2b:01:02:03", + "MACADDR8, 08:00:2b:01:02:03:04:05" + }) + public void test_handle_network_address_type(PostgresDataType columnType, String value) { + String columnName = "testColumn"; + Object result = handler.process(columnType, columnName, value); + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(value)); + } + + @Test + public void test_handleInvalidType() { + assertThrows(IllegalArgumentException.class, () -> { + handler.process(PostgresDataType.INTEGER, "invalid_col", 123); + }); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NumericTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NumericTypeHandlerTest.java new file mode 100644 index 0000000000..93fab71226 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/NumericTypeHandlerTest.java @@ -0,0 +1,115 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class NumericTypeHandlerTest { + private NumericTypeHandler handler; + + @BeforeEach + void setUp() { + handler = new NumericTypeHandler(); + } + + @ParameterizedTest + @MethodSource("provideNumericTypeData") + public void test_handle(final PostgresDataType postgresDataType, final String columnName, final Object value, final Object expectedValue) { + Object result = handler.process(postgresDataType, columnName, value); + if (result != null) { + assertThat(result, instanceOf(expectedValue.getClass())); + } + assertThat(result, is(expectedValue)); + } + + private static Stream provideNumericTypeData() { + return Stream.of( + // SMALLINT tests (-32768 to 32767) + Arguments.of(PostgresDataType.SMALLINT, "smallint_col", "1", (short)1), + Arguments.of(PostgresDataType.SMALLINT, "smallint_col", "-32768", (short)-32768), + Arguments.of(PostgresDataType.SMALLINT, "smallint_col", "32767", (short)32767), + Arguments.of(PostgresDataType.SMALLINT, "smallint_col", null, null), + + // SMALLSERIAL tests (1 to 32767) + Arguments.of(PostgresDataType.SMALLSERIAL, "smallserial_col", "1", (short)1), + Arguments.of(PostgresDataType.SMALLSERIAL, "smallserial_col", "32767", (short)32767), + Arguments.of(PostgresDataType.SMALLSERIAL, "smallserial_col", null, null), + + // INTEGER tests (-2,147,483,648 to 2,147,483,647) + Arguments.of(PostgresDataType.INTEGER, "int_col", "2147483647", 2147483647), + Arguments.of(PostgresDataType.INTEGER, "int_col", "-2147483648", -2147483648), + Arguments.of(PostgresDataType.INTEGER, "int_col", "0", 0), + Arguments.of(PostgresDataType.INTEGER, "int_col", null, null), + + // SERIAL tests ( 1 to 2,147,483,647) + Arguments.of(PostgresDataType.SERIAL, "serial_col", "2147483647", 2147483647), + Arguments.of(PostgresDataType.SERIAL, "serial_col", "1", 1), + Arguments.of(PostgresDataType.SERIAL, "serial_col", null, null), + + // BIGINT tests (-9,223,372,036,854,775,808 to 9,223,372,036,854,775,807) + Arguments.of(PostgresDataType.BIGINT, "bigint_col", "9223372036854775807", 9223372036854775807L), + Arguments.of(PostgresDataType.BIGINT, "bigint_col", "-9223372036854775808", -9223372036854775808L), + Arguments.of(PostgresDataType.BIGINT, "bigint_col", "0", 0L), + Arguments.of(PostgresDataType.BIGINT, "bigint_col", null, null), + + // BIGSERIAL tests (1 to 9,223,372,036,854,775,807) + Arguments.of(PostgresDataType.BIGSERIAL, "bigserial_col", "9223372036854775807", 9223372036854775807L), + Arguments.of(PostgresDataType.BIGSERIAL, "bigserial_col", "1", 1L), + Arguments.of(PostgresDataType.BIGSERIAL, "bigserial_col", null, null), + + // REAL tests + Arguments.of(PostgresDataType.REAL, "real_col", Float.toString(123.451234f), 123.451234f), + Arguments.of(PostgresDataType.REAL, "real_col", Float.toString(-123.45f), -123.45f), + Arguments.of(PostgresDataType.REAL, "real_col", Float.toString(0.0f), 0.0f), + Arguments.of(PostgresDataType.REAL, "real_col", Float.toString(Float.MAX_VALUE), Float.MAX_VALUE), + Arguments.of(PostgresDataType.REAL, "real_col", "-Infinity", Float.NEGATIVE_INFINITY), + Arguments.of(PostgresDataType.REAL, "real_col", "Infinity", Float.POSITIVE_INFINITY), + Arguments.of(PostgresDataType.REAL, "real_col", "NaN", Float.NaN), + Arguments.of(PostgresDataType.REAL, "real_col", null,null), + + + // DOUBLE PRECISION tests + Arguments.of(PostgresDataType.DOUBLE_PRECISION, "double_precision_col", 123.4567890123412345, 123.4567890123412345), + Arguments.of(PostgresDataType.DOUBLE_PRECISION, "double_precision_col", -123.45678901234, -123.45678901234), + Arguments.of(PostgresDataType.DOUBLE_PRECISION, "double_precision_col", 0.0, 0.0), + Arguments.of(PostgresDataType.DOUBLE_PRECISION, "double_precision_col", Double.MAX_VALUE, Double.MAX_VALUE), + Arguments.of(PostgresDataType.DOUBLE_PRECISION, "double_precision_col", "-Infinity", Double.NEGATIVE_INFINITY), + Arguments.of(PostgresDataType.DOUBLE_PRECISION, "double_precision_col", "Infinity", Double.POSITIVE_INFINITY), + Arguments.of(PostgresDataType.DOUBLE_PRECISION, "double_precision_col", "NaN", Double.NaN), + Arguments.of(PostgresDataType.DOUBLE_PRECISION, "double_precision_col", null, null), + + // NUMERIC tests + Arguments.of(PostgresDataType.NUMERIC, "numeric_col", "123.45", "123.45"), + Arguments.of(PostgresDataType.NUMERIC, "numeric_col", "-123.45", "-123.45"), + Arguments.of(PostgresDataType.NUMERIC, "numeric_col", "0", "0"), + Arguments.of(PostgresDataType.NUMERIC, "numeric_col", null, null), + + // MONEY tests + Arguments.of(PostgresDataType.MONEY, "money_col", "$1200", "$1200"), + Arguments.of(PostgresDataType.MONEY, "money_col", null,null) + ); + } + + @Test + public void test_handleInvalidType() { + assertThrows(IllegalArgumentException.class, () -> { + handler.process(PostgresDataType.TEXT, "invalid_col", "not_a_number"); + }); + } + + @Test + public void test_handleInvalidValue() { + assertThrows(IllegalArgumentException.class, () -> { + handler.process(PostgresDataType.INTEGER, "int_col", "not_a_number"); + }); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpatialTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpatialTypeHandlerTest.java new file mode 100644 index 0000000000..3060b82130 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpatialTypeHandlerTest.java @@ -0,0 +1,105 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; + +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SpatialTypeHandlerTest { + private SpatialTypeHandler handler; + + @BeforeEach + void setUp() { + handler = new SpatialTypeHandler(); + } + + @Test + void testHandleNull() { + assertNull(handler.process(PostgresDataType.POINT, "testColumn", null)); + } + + @Test + void testHandleNonSpatialType() { + assertThrows(IllegalArgumentException.class, () -> + handler.process(PostgresDataType.INTEGER, "testColumn", "(1,2)") + ); + } + + @ParameterizedTest + @MethodSource("provideSpatialTypeData") + void testHandleSpatialTypes(PostgresDataType type, String input, String expected) { + Object result = handler.process(type, "testColumn", input); + assertEquals(expected, result); + } + + @Test + void testHandleCircle() { + PostgresDataType dataType = PostgresDataType.CIRCLE; + Object result = handler.process(dataType, "testColumn", "<(0,0),1>"); + assertTrue(result instanceof Map); + Map circleMap = (Map) result; + assertEquals(1.0, circleMap.get("radius")); + assertTrue(circleMap.get("center") instanceof Map); + Map centerMap = (Map) circleMap.get("center"); + assertEquals(0.0, centerMap.get("x")); + assertEquals(0.0, centerMap.get("y")); + } + + @Test + void testParseLsegWithInvalidPoints() { + assertThrows(RuntimeException.class, () -> + handler.process(PostgresDataType.LSEG, "testColumn", "[(1,1)]") + ); + } + + @Test + void testParseBoxWithInvalidPoints() { + assertThrows(RuntimeException.class, () -> + handler.process(PostgresDataType.BOX, "testColumn", "(1,1)") + ); + } + + @Test + void testParsePathWithNoPoints() { + assertThrows(RuntimeException.class, () -> + handler.process(PostgresDataType.PATH, "testColumn", "[]") + ); + } + + @Test + void testParseCircleWithNoCenter() { + assertThrows(RuntimeException.class, () -> + handler.process(PostgresDataType.CIRCLE, "testColumn", "<>") + ); + } + + @Test + void testParsePolygonWithNoPoints() { + assertThrows(RuntimeException.class, () -> + handler.process(PostgresDataType.POLYGON, "testColumn", "()") + ); + } + + private static Stream provideSpatialTypeData() { + return Stream.of( + Arguments.of(PostgresDataType.POINT, "(1,2)", "POINT(1.000000 2.000000)" ), + Arguments.of(PostgresDataType.LINE, "{1,-1,0}", "LINESTRING(0.000000 0.000000, 1.000000 1.000000)" ), + Arguments.of(PostgresDataType.LSEG, "[(1,1),(2,2)]", "LINESTRING(1.000000 1.000000, 2.000000 2.000000)"), + Arguments.of(PostgresDataType.BOX, "(1,1),(2,2)", "POLYGON((1.000000 1.000000, 2.000000 1.000000, 2.000000 2.000000, 1.000000 2.000000, 1.000000 1.000000))" ), + Arguments.of(PostgresDataType.PATH, "[(1,1),(2,2),(3,3)]", "LINESTRING(1.000000 1.000000, 2.000000 2.000000, 3.000000 3.000000)"), + Arguments.of(PostgresDataType.PATH, "((1,1),(2,2),(3,3))", "POLYGON((1.000000 1.000000, 2.000000 2.000000, 3.000000 3.000000, 1.000000 1.000000))"), + Arguments.of(PostgresDataType.POLYGON, "((0,0),(0,1),(1,1),(1,0))", "POLYGON((0.000000 0.000000, 0.000000 1.000000, 1.000000 1.000000, 1.000000 0.000000, 0.000000 0.000000))") + ); + } + +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpecialTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpecialTypeHandlerTest.java new file mode 100644 index 0000000000..aaa40f304f --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/SpecialTypeHandlerTest.java @@ -0,0 +1,60 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; + +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class SpecialTypeHandlerTest { + private SpecialTypeHandler handler; + + @BeforeEach + void setUp() { + handler = new SpecialTypeHandler(); + } + + @Test + public void test_handle_xml() { + String xmlValue = "Test"; + Object result = handler.process(PostgresDataType.XML, "xmlColumn", xmlValue); + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(xmlValue)); + } + + @Test + public void test_handle_pg_lsn() { + String lsnValue = "16/B374D848"; + Object result = handler.process(PostgresDataType.PG_LSN, "lsnColumn", lsnValue); + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(lsnValue)); + } + + @Test + public void test_handle_uuid() { + UUID uuidValue = UUID.randomUUID(); + Object result = handler.process(PostgresDataType.UUID, "uuidColumn", uuidValue.toString()); + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(uuidValue.toString())); + } + + @Test + public void test_handle_tsquery() { + String tsquery = "'fat' & 'rat'"; + Object result = handler.process(PostgresDataType.TSQUERY, "tsqueryColumn", tsquery); + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(tsquery)); + } + + @Test + public void test_handleInvalidType() { + assertThrows(IllegalArgumentException.class, () -> { + handler.process(PostgresDataType.INTEGER, "invalid_col", 123); + }); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/StringTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/StringTypeHandlerTest.java new file mode 100644 index 0000000000..3b69007227 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/StringTypeHandlerTest.java @@ -0,0 +1,40 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class StringTypeHandlerTest { + private StringTypeHandler handler; + + @BeforeEach + void setUp() { + handler = new StringTypeHandler(); + } + @ParameterizedTest + @CsvSource({ + "TEXT, Hello, World!", + "VARCHAR, Hello, World!", + "BPCHAR, Hello, World!" + }) + public void test_handle_text_string(PostgresDataType columnType, String value) { + String columnName = "testColumn"; + Object result = handler.process(columnType, columnName, value); + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(value)); + } + + @Test + public void test_handleInvalidType() { + assertThrows(IllegalArgumentException.class, () -> { + handler.process(PostgresDataType.INTEGER, "invalid_col", 123); + }); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/TemporalTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/TemporalTypeHandlerTest.java new file mode 100644 index 0000000000..f0a8436ecc --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/TemporalTypeHandlerTest.java @@ -0,0 +1,170 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; + + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class TemporalTypeHandlerTest { + + private TemporalTypeHandler handler; + + @BeforeEach + void setUp() { + handler = new TemporalTypeHandler(); + } + + private static long getEpochMillisFromDate(final int year, final int month, final int day) { + return LocalDate.of(year, month, day) + .atStartOfDay(ZoneOffset.UTC) // Ensure UTC + .toInstant() + .toEpochMilli(); + } + + private static long getEpochMillis(int year, int month, int day, int hour, int minute, int second, int nanoSeconds) { + return LocalDateTime.of(year, month, day, hour, minute, second, nanoSeconds) + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(); + } + + @Test + void handle_whenValueIsNull_returnsNull() { + assertNull(handler.process(PostgresDataType.DATE, "test_column", null)); + } + + @ParameterizedTest + @MethodSource("provideDateTestCases") + void handle_withDateType_returnsCorrectEpochMillis(String input, Long expected) { + Object result = handler.process(PostgresDataType.DATE, "date_column", input); + assertEquals(expected, result); + } + + private static Stream provideDateTestCases() { + return Stream.of( + Arguments.of("2023-12-25", getEpochMillisFromDate(2023, 12, 25)), + Arguments.of("-infinity", getEpochMillisFromDate(1970, 1, 1)), + Arguments.of("2024-02-29", getEpochMillisFromDate(2024, 2, 29)), // Leap year + Arguments.of("infinity", getEpochMillisFromDate(9999, 12, 31)) + ); + } + + @ParameterizedTest + @MethodSource("provideTimeWithoutTimeZoneTestCases") + void handle_withTimeWithoutTimeZoneType_returnsCorrectEpochMillis(String input, long expected) { + Object result = handler.process(PostgresDataType.TIME, "time_column", input); + assertEquals(result, expected); + } + + private static Stream provideTimeWithoutTimeZoneTestCases() { + return Stream.of( + Arguments.of("14:30:00", + getEpochMillis(1970, 1, 1, 14, 30, 0, 0)), + Arguments.of("00:00:00", + getEpochMillis(1970, 1, 1, 0, 0, 0, 0)), + Arguments.of("23:59:59", + getEpochMillis(1970, 1, 1, 23, 59, 59, 0)), + Arguments.of("23:59:59.123456", + getEpochMillis(1970, 1, 1, 23, 59, 59, 123456000)), + Arguments.of("23:59:59.123", + getEpochMillis(1970, 1, 1, 23, 59, 59, 123000000)) + ); + } + + @ParameterizedTest + @MethodSource("provideTimeStampWithoutTimeZoneTestCases") + void handle_withTimeStampWithoutTimeZoneType_returnsCorrectEpochMillis(String input, long expected) { + Object result = handler.process(PostgresDataType.TIMESTAMP, "timestamp_column", input); + assertEquals(expected, result); + } + + private static Stream provideTimeStampWithoutTimeZoneTestCases() { + return Stream.of( + Arguments.of("2023-12-25 14:30:00.123456", getEpochMillis(2023, 12, 25, 14, 30, 0, 123456000)), + Arguments.of("2023-12-25 14:30:00.123", getEpochMillis(2023, 12, 25, 14, 30, 0, 123000000)), + Arguments.of("1970-01-01 00:00:00", getEpochMillis(1970, 1, 1, 0, 0, 0, 0)), + Arguments.of("-infinity", getEpochMillis(1970, 1, 1, 0, 0, 0, 0)), + Arguments.of("infinity", getEpochMillis(9999, 12, 31, 23, 59, 59, 0)) + ); + } + + @ParameterizedTest + @MethodSource("provideTimeWithTimeZoneTestCases") + void handle_withTimeWithTimeZoneType_returnsCorrectEpochMillis(String input, long expected) { + Object result = handler.process(PostgresDataType.TIMETZ, "timetz_column", input); + assertEquals(expected, result); + } + + private static Stream provideTimeWithTimeZoneTestCases() { + return Stream.of( + Arguments.of("14:30:00+01", getEpochMillis(1970, 1, 1, 13, 30, 0, 0)), + Arguments.of("23:59:59.999999+05:30", getEpochMillis(1970, 1, 1, 18, 29, 59, 999999000)), + Arguments.of("00:00:00-08:00", getEpochMillis(1970, 1, 1, 8, 0, 0, 0)), + Arguments.of("12:34:56.789+01:00", getEpochMillis(1970, 1, 1, 11, 34, 56, 789000000)) + ); + } + + @ParameterizedTest + @MethodSource("provideTimeStampWithTimeZoneTestCases") + void handle_withTimeStampWithTimeZoneType_returnsCorrectEpochMillis(String input, long expected) { + Object result = handler.process(PostgresDataType.TIMESTAMPTZ, "timestamptz_column", input); + assertEquals(expected, result); + } + + private static Stream provideTimeStampWithTimeZoneTestCases() { + return Stream.of( + Arguments.of("2023-12-25 14:30:00+00:00", getEpochMillis(2023, 12, 25, 14, 30, 0, 0)), + Arguments.of("2023-12-25 23:59:59.999999+05:30", getEpochMillis(2023, 12, 25, 18, 29, 59, 999999000)), + Arguments.of("1970-01-01 00:00:00-08:00", getEpochMillis(1970, 1, 1, 8, 0, 0, 0)), + Arguments.of("2024-02-29 12:34:56.789+01:00", getEpochMillis(2024, 2, 29, 11, 34, 56, 789000000)), + Arguments.of("-infinity", getEpochMillis(1970, 1, 1, 0, 0, 0, 0)), + Arguments.of("infinity", getEpochMillis(9999, 12, 31, 23, 59, 59, 0)) + ); + } + + + @ParameterizedTest + @MethodSource("provideIntervalTestCases") + void handle_withInterval_returnsCorrectISO8601Format(String input, String expected) { + Object result = handler.process(PostgresDataType.INTERVAL, "interval_column", input); + assertEquals(expected, result); + } + + private static Stream provideIntervalTestCases() { + return Stream.of( + Arguments.of("1 year 2 mons 3 days 04:05:06", "P1Y2M3DT4H5M6S"), + Arguments.of("3 days 04:05:06", "P3DT4H5M6S" ), + Arguments.of("1 year", "P1Y") + ); + } + + @Test + void handle_withInvalidFormat_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + handler.process(PostgresDataType.DATE, "date_column", "invalid-date")); + } + + @Test + void handle_withUnsupportedType_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + handler.process(PostgresDataType.VARCHAR, "varchar_column", "2023-12-25")); + } + + @Test + void handle_withEmptyString_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + handler.process(PostgresDataType.DATE, "date_column", "")); + } +}