Skip to content

Commit

Permalink
Data type mapping for postgres
Browse files Browse the repository at this point in the history
Data type mapping for postgres

Signed-off-by: Divyansh Bokadia <[email protected]>

Data type mapping for postgres

Signed-off-by: Divyansh Bokadia <[email protected]>

Data type mapping for postgres
  • Loading branch information
divbok committed Feb 4, 2025
1 parent 13e12a2 commit 3d58a1d
Show file tree
Hide file tree
Showing 44 changed files with 1,657 additions and 91 deletions.
1 change: 0 additions & 1 deletion data-prepper-plugins/rds-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,4 @@ dependencies {
testImplementation libs.avro.core
testImplementation libs.parquet.hadoop
testImplementation libs.parquet.avro
// testImplementation 'org.slf4j:slf4j-simple:2.0.9'
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype;
package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql;

import java.util.HashMap;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype;
package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql;

import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;

Expand All @@ -7,7 +7,7 @@
* Implementations of this interface are responsible for converting MySQL column values
* to appropriate string representations based on their data types.
*/
public interface DataTypeHandler {
public interface MySQLDataTypeHandler {
String BYTES_KEY = "bytes";
/**
* Handles the conversion of a MySQL column value to its string representation.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype;
package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql;


import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.BinaryTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.JsonTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.NumericTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.SpatialTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.StringTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler.TemporalTypeHandler;

import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.BinaryTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.JsonTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.NumericTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.SpatialTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.StringTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.TemporalTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;

import java.util.Map;

public class DataTypeHelper {
private static final Map<MySQLDataType.DataCategory, DataTypeHandler> typeHandlers = Map.of(
public class MySQLDataTypeHelper {
private static final Map<MySQLDataType.DataCategory, MySQLDataTypeHandler> typeHandlers = Map.of(
MySQLDataType.DataCategory.NUMERIC, new NumericTypeHandler(),
MySQLDataType.DataCategory.STRING, new StringTypeHandler(),
MySQLDataType.DataCategory.TEMPORAL, new TemporalTypeHandler(),
MySQLDataType.DataCategory.BINARY, new BinaryTypeHandler(),
MySQLDataType.DataCategory.JSON, new JsonTypeHandler(),
MySQLDataType.DataCategory.SPATIAL, new SpatialTypeHandler()
);
);

public static Object getDataByColumnType(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
Expand All @@ -28,4 +30,4 @@ public static Object getDataByColumnType(final MySQLDataType columnType, final S

return typeHandlers.get(columnType.getCategory()).handle(columnType, columnName, value, metadata);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;
package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler;

import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.util.Map;

public class BinaryTypeHandler implements DataTypeHandler {
public class BinaryTypeHandler implements MySQLDataTypeHandler {
@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;
package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler;

import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary;
import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

public class JsonTypeHandler implements DataTypeHandler {
public class JsonTypeHandler implements MySQLDataTypeHandler {

@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;
package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler;

import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Map;

public class NumericTypeHandler implements DataTypeHandler {
public class NumericTypeHandler implements MySQLDataTypeHandler {

@Override
public Number handle(final MySQLDataType columnType, final String columnName, final Object value,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;
package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler;

import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand All @@ -28,7 +28,7 @@
* <li>GEOMETRYCOLLECTION(POINT(x y), LINESTRING(x y, x y))</li>
* </ul>
* */
public class SpatialTypeHandler implements DataTypeHandler {
public class SpatialTypeHandler implements MySQLDataTypeHandler {
// MySQL geometry type constants
private static final int GEOMETRY_POINT = 1;
private static final int GEOMETRY_LINESTRING = 2;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;
package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler;

import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.util.ArrayList;
import java.util.List;

public class StringTypeHandler implements DataTypeHandler {
public class StringTypeHandler implements MySQLDataTypeHandler {

@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;
package org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.handler;

import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -30,7 +30,7 @@
* Note: Fractional seconds are optional for DATETIME and TIMESTAMP
* - YEAR: "yyyy" (Example: "2024")
*/
public class TemporalTypeHandler implements DataTypeHandler {
public class TemporalTypeHandler implements MySQLDataTypeHandler {
private static final String MYSQL_DATE_FORMAT = "yyyy-MM-dd";
private static final String MYSQL_TIME_FORMAT = "HH:mm:ss[.SSSSSS]";
private static final String MYSQL_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss[.SSSSSS]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,37 @@ public enum ColumnType {
DOUBLE_PRECISION(701, "double precision"),
NUMERIC(1700, "numeric"),
TEXT(25, "text"),
BPCHAR(1042, "bpchar"),
VARCHAR(1043, "varchar"),
DATE(1082, "date"),
TIME(1083, "time"),
TIMETZ(1266, "timetz"),
TIMESTAMP(1114, "timestamp"),
TIMESTAMPTZ(1184, "timestamptz"),
UUID(2950, "uuid"),
INTERVAL(1186, "interval"),
JSON(114, "json"),
JSONB(3802, "jsonb");
JSONB(3802, "jsonb"),
MONEY(790,"money"),
BIT(1560, "bit"),
VARBIT(1562, "varbit"),
POINT(600,"point"),
LINE(628,"line"),
LSEG(601,"lseg"),
BOX(603, "box"),
PATH(602, "path"),
POLYGON(604, "polygon"),
CIRCLE(718, "circle"),
CIDR(650, "cidr"),
INET(869, "inet"),
MACADDR(829, "macaddr"),
MACADDR8(774, "macaddr8"),
XML(142, "xml"),
UUID(2950, "uuid"),
PG_LSN(3220, "pg_lsn"),
TSVECTOR(3614, "tsvector"),
TSQUERY(3615, "tsquery"),
BYTEA(17, "bytea");


private final int typeId;
private final String typeName;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres;

import java.util.HashMap;
import java.util.Map;

public enum PostgresDataType {
// Numeric types
SMALLINT("smallint", DataCategory.NUMERIC),
INTEGER("integer", DataCategory.NUMERIC),
BIGINT("bigint", DataCategory.NUMERIC),
SMALLSERIAL("small", DataCategory.NUMERIC),
SERIAL("mediumint unsigned", DataCategory.NUMERIC),
BIGSERIAL("int", DataCategory.NUMERIC),
REAL("real", DataCategory.NUMERIC),
DOUBLE_PRECISION("double precision", DataCategory.NUMERIC),
NUMERIC("numeric", DataCategory.NUMERIC),
MONEY("money", DataCategory.NUMERIC),

//String Data types
TEXT("text", DataCategory.STRING),
VARCHAR("varchar", DataCategory.STRING),
BPCHAR("bpchar", DataCategory.STRING),

//Bit String Data type
BIT("bit",DataCategory.BIT_STRING),
VARBIT("varbit", DataCategory.BIT_STRING),

//Json Data type
JSON("json",DataCategory.JSON),
JSONB("jsonb",DataCategory.JSON),

//Boolean data type
BOOLEAN("boolean", DataCategory.BOOLEAN),

//Date-time data types
DATE("date", DataCategory.TEMPORAL),
TIME("time",DataCategory.TEMPORAL),
TIMETZ("timetz",DataCategory.TEMPORAL),
TIMESTAMP("timestamp",DataCategory.TEMPORAL),
TIMESTAMPTZ("timestamptz",DataCategory.TEMPORAL),
INTERVAL("interval", DataCategory.TEMPORAL),

//Spatial Data types
POINT("point", DataCategory.SPATIAL),
LINE("line", DataCategory.SPATIAL),
LSEG("lseg", DataCategory.SPATIAL),
BOX("box", DataCategory.SPATIAL),
PATH("path", DataCategory.SPATIAL),
POLYGON("polygon", DataCategory.SPATIAL),
CIRCLE("circle", DataCategory.SPATIAL),

//Network Address Data types
CIDR("cidr", DataCategory.NETWORK_ADDRESS),
INET("inet", DataCategory.NETWORK_ADDRESS),
MACADDR("macaddr", DataCategory.NETWORK_ADDRESS),
MACADDR8("macaddr8", DataCategory.NETWORK_ADDRESS),

//Special Data types
UUID( "uuid",DataCategory.SPECIAL),
XML("xml", DataCategory.SPECIAL),
PG_LSN("pg_lsn", DataCategory.SPECIAL),
TSQUERY("tsquery", DataCategory.SPECIAL),

//Binary data type
BYTEA("bytea", DataCategory.BINARY);

private static final Map<String, PostgresDataType> TYPE_MAP;

static {
TYPE_MAP = new HashMap<>(values().length);
for (PostgresDataType dataType : values()) {
TYPE_MAP.put(dataType.dataType, dataType);
}
}

private final String dataType;
private final DataCategory category;

PostgresDataType(String dataType, DataCategory category) {
this.dataType = dataType;
this.category = category;
}

public String getDataType() {
return dataType;
}

public DataCategory getCategory() {
return category;
}


public static PostgresDataType byDataType(final String dataType) {
final PostgresDataType type = TYPE_MAP.get(dataType.toLowerCase());
if (type == null) {
throw new IllegalArgumentException("Unsupported PostgresDataType data type: " + dataType);
}
return type;
}

public enum DataCategory {
NUMERIC,
STRING,
BIT_STRING,
JSON,
BOOLEAN,
TEMPORAL,
SPATIAL,
NETWORK_ADDRESS,
SPECIAL,
BINARY
}


public boolean isNumeric() {
return category == DataCategory.NUMERIC;
}


public boolean isString() {
return category == DataCategory.STRING;
}

public boolean isBitString() {
return category == DataCategory.BIT_STRING;
}

public boolean isJson() {
return category == DataCategory.JSON;
}

public boolean isBoolean() {
return category == DataCategory.BOOLEAN;
}

public boolean isTemporal() {
return category == DataCategory.TEMPORAL;
}

public boolean isSpatial() {
return category == DataCategory.SPATIAL;
}

public boolean isNetworkAddress() {
return category == DataCategory.NETWORK_ADDRESS;
}

public boolean isSpecial() {
return category == DataCategory.SPECIAL;
}

public boolean isBinary() {
return category == DataCategory.BINARY;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres;

/**
* Interface for handling Postgres data type conversions.
* Implementations of this interface are responsible for converting Postgres column values
* to appropriate object representations based on their data types.
*/
public interface PostgresDataTypeHandler {
/**
* Handles the conversion of a Postgres column value to its object representation.
*
* @param columnType The Postgres data type of the column being processed
* @param columnName The name of the column being processed
* @param value The value to be converted, can be null
* @return An object representation of the converted value
*/
Object handle(PostgresDataType columnType, String columnName, Object value);
}
Loading

0 comments on commit 3d58a1d

Please sign in to comment.