Skip to content

Commit efa54d4

Browse files
authored
Merge pull request #601 from data-integrations/PLUGIN-1893
[PLUGIN-1893] Adding fields in Oracle source and connector which acts like a flag for Backward compatibility issues for Timestamp and number (precisionless)
2 parents 8ffb8d7 + 8c1f8ea commit efa54d4

File tree

7 files changed

+153
-21
lines changed

7 files changed

+153
-21
lines changed

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ protected DBConnectorPath getDBConnectorPath(String path) {
112112

113113
@Override
114114
protected SchemaReader getSchemaReader(String sessionID) {
115-
return new OracleSourceSchemaReader(sessionID);
115+
return new OracleSourceSchemaReader(sessionID, config.getTreatAsOldTimestamp(),
116+
config.getTreatPrecisionlessNumAsDeci());
116117
}
117118

118119
@Override

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import io.cdap.plugin.db.TransactionIsolationLevel;
2323
import io.cdap.plugin.db.connector.AbstractDBSpecificConnectorConfig;
2424

25-
import java.util.HashMap;
26-
import java.util.Map;
2725
import java.util.Properties;
2826
import javax.annotation.Nullable;
2927

@@ -43,12 +41,14 @@ public OracleConnectorConfig(String host, int port, String user, String password
4341

4442
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
4543
String connectionArguments, String connectionType, String database) {
46-
this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null, null);
44+
this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null, null, null,
45+
null);
4746
}
4847

4948
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
5049
String connectionArguments, String connectionType, String database,
51-
String role, Boolean useSSL) {
50+
String role, Boolean useSSL, @Nullable Boolean treatAsOldTimestamp,
51+
@Nullable Boolean treatPrecisionlessNumAsDeci) {
5252

5353
this.host = host;
5454
this.port = port;
@@ -60,6 +60,8 @@ public OracleConnectorConfig(String host, int port, String user, String password
6060
this.database = database;
6161
this.role = role;
6262
this.useSSL = useSSL;
63+
this.treatAsOldTimestamp = treatAsOldTimestamp;
64+
this.treatPrecisionlessNumAsDeci = treatPrecisionlessNumAsDeci;
6365
}
6466

6567
@Override
@@ -86,6 +88,16 @@ public String getConnectionString() {
8688
@Nullable
8789
public Boolean useSSL;
8890

91+
@Name(OracleConstants.TREAT_AS_OLD_TIMESTAMP)
92+
@Description("A hidden field to handle timestamp as CDAP's timestamp micros or string as per old behavior.")
93+
@Nullable
94+
public Boolean treatAsOldTimestamp;
95+
96+
@Name(OracleConstants.TREAT_PRECISIONLESSNUM_AS_DECI)
97+
@Description("A hidden field to handle precision less number as CDAP's decimal per old behavior.")
98+
@Nullable
99+
public Boolean treatPrecisionlessNumAsDeci;
100+
89101
@Override
90102
protected int getDefaultPort() {
91103
return 1521;
@@ -108,6 +120,14 @@ public Boolean getSSlMode() {
108120
return useSSL != null && useSSL;
109121
}
110122

123+
public Boolean getTreatAsOldTimestamp() {
124+
return Boolean.TRUE.equals(treatAsOldTimestamp);
125+
}
126+
127+
public Boolean getTreatPrecisionlessNumAsDeci() {
128+
return Boolean.TRUE.equals(treatPrecisionlessNumAsDeci);
129+
}
130+
111131
@Override
112132
public Properties getConnectionArgumentsProperties() {
113133
Properties prop = super.getConnectionArgumentsProperties();

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ private OracleConstants() {
4343
public static final String TNS_CONNECTION_TYPE = "tns";
4444
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
4545
public static final String USE_SSL = "useSSL";
46+
public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp";
47+
public static final String TREAT_PRECISIONLESSNUM_AS_DECI = "treatPrecisionlessNumAsDeci";
4648

4749
/**
4850
* Constructs the Oracle connection string based on the provided connection type, host, port, and database.

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,12 @@ protected String createConnectionString() {
6363

6464
@Override
6565
protected SchemaReader getSchemaReader() {
66-
return new OracleSourceSchemaReader();
66+
// PLUGIN-1893 : Based on field/properties from Oracle source and Oracle connection we will pass the flag to control
67+
// handle schema to make it backward compatible.
68+
boolean treatAsOldTimestamp = oracleSourceConfig.getConnection().getTreatAsOldTimestamp();
69+
boolean treatPrecisionlessNumAsDeci = oracleSourceConfig.getConnection().getTreatPrecisionlessNumAsDeci();
70+
71+
return new OracleSourceSchemaReader(null, treatAsOldTimestamp, treatPrecisionlessNumAsDeci);
6772
}
6873

6974
@Override
@@ -127,9 +132,11 @@ public OracleSourceConfig(String host, int port, String user, String password, S
127132
String connectionArguments, String connectionType, String database, String role,
128133
int defaultBatchValue, int defaultRowPrefetch,
129134
String importQuery, Integer numSplits, int fetchSize,
130-
String boundingQuery, String splitBy, Boolean useSSL) {
135+
String boundingQuery, String splitBy, Boolean useSSL, Boolean treatAsOldTimestamp,
136+
Boolean treatPrecisionlessNumAsDeci) {
131137
this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments,
132-
connectionType, database, role, useSSL);
138+
connectionType, database, role, useSSL, treatAsOldTimestamp,
139+
treatPrecisionlessNumAsDeci);
133140
this.defaultBatchValue = defaultBatchValue;
134141
this.defaultRowPrefetch = defaultRowPrefetch;
135142
this.fetchSize = fetchSize;

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.sql.SQLException;
2727
import java.sql.Types;
2828
import java.util.Set;
29+
import javax.annotation.Nullable;
2930

3031
/**
3132
* Oracle Source schema reader.
@@ -65,14 +66,17 @@ public class OracleSourceSchemaReader extends CommonSchemaReader {
6566
);
6667

6768
private final String sessionID;
69+
private final Boolean isTimestampOldBehavior;
70+
private final Boolean isPrecisionlessNumAsDecimal;
6871

6972
public OracleSourceSchemaReader() {
70-
this(null);
73+
this(null, false, false);
7174
}
72-
73-
public OracleSourceSchemaReader(String sessionID) {
74-
super();
75+
public OracleSourceSchemaReader(@Nullable String sessionID, boolean isTimestampOldBehavior,
76+
boolean isPrecisionlessNumAsDecimal) {
7577
this.sessionID = sessionID;
78+
this.isTimestampOldBehavior = isTimestampOldBehavior;
79+
this.isPrecisionlessNumAsDecimal = isPrecisionlessNumAsDecimal;
7680
}
7781

7882
@Override
@@ -81,10 +85,12 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
8185

8286
switch (sqlType) {
8387
case TIMESTAMP_TZ:
84-
return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
85-
case Types.TIMESTAMP:
88+
return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
8689
case TIMESTAMP_LTZ:
87-
return Schema.of(Schema.LogicalType.DATETIME);
90+
return isTimestampOldBehavior ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)
91+
: Schema.of(Schema.LogicalType.DATETIME);
92+
case Types.TIMESTAMP:
93+
return isTimestampOldBehavior ? super.getSchema(metadata, index) : Schema.of(Schema.LogicalType.DATETIME);
8894
case BINARY_FLOAT:
8995
return Schema.of(Schema.Type.FLOAT);
9096
case BINARY_DOUBLE:
@@ -107,12 +113,24 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
107113
// For a Number type without specified precision and scale, precision will be 0 and scale will be -127
108114
if (precision == 0) {
109115
// reference : https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT1832
110-
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
111-
+ "converting into STRING type to avoid any precision loss.",
112-
metadata.getColumnName(index),
113-
metadata.getColumnTypeName(index),
114-
metadata.getColumnName(index)));
115-
return Schema.of(Schema.Type.STRING);
116+
if (isPrecisionlessNumAsDecimal) {
117+
precision = 38;
118+
scale = 0;
119+
LOG.warn(String.format("%s type with undefined precision and scale is detected, "
120+
+ "there may be a precision loss while running the pipeline. "
121+
+ "Please define an output precision and scale for field '%s' to avoid "
122+
+ "precision loss.",
123+
metadata.getColumnTypeName(index),
124+
metadata.getColumnName(index)));
125+
return Schema.decimalOf(precision, scale);
126+
} else {
127+
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
128+
+ "converting into STRING type to avoid any precision loss.",
129+
metadata.getColumnName(index),
130+
metadata.getColumnTypeName(index),
131+
metadata.getColumnName(index)));
132+
return Schema.of(Schema.Type.STRING);
133+
}
116134
}
117135
return Schema.decimalOf(precision, scale);
118136
}

oracle-plugin/widgets/Oracle-batchsource.json

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,44 @@
120120
]
121121
}
122122
},
123+
{
124+
"widget-type": "hidden",
125+
"label": "Treat as old timestamp",
126+
"name": "treatAsOldTimestamp",
127+
"widget-attributes": {
128+
"layout": "inline",
129+
"default": "false",
130+
"options": [
131+
{
132+
"id": "true",
133+
"label": "true"
134+
},
135+
{
136+
"id": "false",
137+
"label": "false"
138+
}
139+
]
140+
}
141+
},
142+
{
143+
"widget-type": "hidden",
144+
"label": "Treat precision less number as Decimal(old behavior)",
145+
"name": "treatPrecisionlessNumAsDeci",
146+
"widget-attributes": {
147+
"layout": "inline",
148+
"default": "false",
149+
"options": [
150+
{
151+
"id": "true",
152+
"label": "true"
153+
},
154+
{
155+
"id": "false",
156+
"label": "false"
157+
}
158+
]
159+
}
160+
},
123161
{
124162
"name": "connectionType",
125163
"label": "Connection Type",
@@ -326,6 +364,14 @@
326364
{
327365
"type": "property",
328366
"name": "transactionIsolationLevel"
367+
},
368+
{
369+
"type": "property",
370+
"name": "getTreatAsOldTimestampConn"
371+
},
372+
{
373+
"type": "property",
374+
"name": "treatPrecisionlessNumAsDeci"
329375
}
330376
]
331377
},

oracle-plugin/widgets/Oracle-connector.json

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,44 @@
129129
}
130130
]
131131
}
132+
},
133+
{
134+
"widget-type": "hidden",
135+
"label": "Treat as old timestamp",
136+
"name": "treatAsOldTimestamp",
137+
"widget-attributes": {
138+
"layout": "inline",
139+
"default": "false",
140+
"options": [
141+
{
142+
"id": "true",
143+
"label": "true"
144+
},
145+
{
146+
"id": "false",
147+
"label": "false"
148+
}
149+
]
150+
}
151+
},
152+
{
153+
"widget-type": "hidden",
154+
"label": "Treat precision less number as Decimal(old behavior)",
155+
"name": "treatPrecisionlessNumAsDeci",
156+
"widget-attributes": {
157+
"layout": "inline",
158+
"default": "false",
159+
"options": [
160+
{
161+
"id": "true",
162+
"label": "true"
163+
},
164+
{
165+
"id": "false",
166+
"label": "false"
167+
}
168+
]
169+
}
132170
}
133171
]
134172
},

0 commit comments

Comments
 (0)