Skip to content

Commit 3299646

Browse files
committed
Factor sql generation out of BinCopyUploader
1 parent 8db235a commit 3299646

File tree

6 files changed

+105
-34
lines changed

6 files changed

+105
-34
lines changed

monetdb-spark/src/main/java/org/monetdb/spark/bincopy/BinCopyDataWriterFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,22 @@ public class BinCopyDataWriterFactory implements DataWriterFactory, Serializable
3434
private static final long serialVersionUID = 0L;
3535

3636
private final Parms parms;
37-
private final String[] columns;
3837
private final Step[] steps;
38+
private final BinCopySql sqlstmt;
3939

40-
public BinCopyDataWriterFactory(Parms parms, String[] columns, Step[] steps) {
40+
public BinCopyDataWriterFactory(Parms parms, Step[] steps, BinCopySql sqlstmt) {
4141
this.parms = parms;
42-
this.columns = columns;
4342
this.steps = steps;
43+
this.sqlstmt = sqlstmt;
4444
}
4545

4646
@Override
4747
public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
4848
try {
4949
String identifier = "part" + partitionId + "-task" + taskId;
5050
Collector collector = new Collector();
51-
BinCopyUploader uploader = new BinCopyUploader(parms.getDestination(), collector, identifier, columns);
51+
sqlstmt.identifier(identifier);
52+
BinCopyUploader uploader = new BinCopyUploader(parms.getDestination(), collector, sqlstmt);
5253
collector.registerWithConverters(steps);
5354
return new MonetDataWriter(collector, steps, uploader, parms.isImmediateCommit(), identifier, parms.getBatchSize());
5455
} catch (SQLException e) {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* SPDX-License-Identifier: MPL-2.0
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*
8+
* Copyright MonetDB Solutions B.V.
9+
*/
10+
11+
package org.monetdb.spark.bincopy;
12+
13+
import java.io.PrintWriter;
14+
import java.io.Serializable;
15+
import java.io.StringWriter;
16+
17+
public class BinCopySql implements Serializable {
18+
String quotedTableName;
19+
String[] columnNames;
20+
String identifier;
21+
String prefix = "";
22+
String suffix = "";
23+
boolean onClient = true;
24+
25+
public BinCopySql(String quotedTableName, String[] columnNames) {
26+
this.quotedTableName = quotedTableName;
27+
this.columnNames = columnNames.clone();
28+
}
29+
30+
public BinCopySql identifier(String identifier) {
31+
this.identifier = identifier;
32+
return this;
33+
}
34+
35+
public BinCopySql suffix(String suffix) {
36+
this.suffix = suffix;
37+
return this;
38+
}
39+
40+
public BinCopySql prefix(String prefix) {
41+
this.prefix = prefix;
42+
return this;
43+
}
44+
45+
public BinCopySql onClient(boolean onClient) {
46+
this.onClient = onClient;
47+
return this;
48+
}
49+
50+
public BinCopySql onServer(boolean onServer) {
51+
return onClient(!onServer);
52+
}
53+
54+
@Override
55+
public String toString() {
56+
String sep;
57+
58+
StringWriter sw = new StringWriter();
59+
PrintWriter pw = new PrintWriter(sw);
60+
61+
String identifierComment = identifier != null ? " /* " + identifier.replace("*/", "* /") + " */" : "";
62+
pw.printf("COPY%s LITTLE ENDIAN BINARY INTO %s (", identifierComment, quotedTableName);
63+
sep = "";
64+
for (String col : columnNames) {
65+
pw.printf("%s\"%s\"", sep, col.replace("\"", "\"\""));
66+
sep = ", ";
67+
}
68+
pw.println(")");
69+
70+
pw.print("FROM ");
71+
final String qprefix, qsuffix;
72+
if (prefix.contains("\\") || prefix.contains("'") || suffix.contains("\\") || suffix.contains("'")) {
73+
qprefix = "R'" + prefix.replace("'", "''");
74+
qsuffix = suffix.replace("'", "''") + "'";
75+
} else {
76+
qprefix = "'" + prefix;
77+
qsuffix = suffix + "'";
78+
}
79+
sep = "";
80+
for (int i = 0; i < columnNames.length; i++) {
81+
pw.printf("%s%s%d%s", sep, qprefix, i, qsuffix);
82+
sep = ", ";
83+
} pw.println();
84+
pw.println(onClient ? "ON CLIENT" : "ON SERVER");
85+
86+
return sw.toString();
87+
}
88+
}

monetdb-spark/src/main/java/org/monetdb/spark/bincopy/BinCopyUploader.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
import org.monetdb.spark.common.Destination;
1515
import org.monetdb.spark.workerside.Collector;
1616

17-
import java.io.PrintWriter;
18-
import java.io.StringWriter;
1917
import java.sql.PreparedStatement;
2018
import java.sql.SQLException;
2119

@@ -24,34 +22,14 @@ public class BinCopyUploader {
2422
private final MonetConnection conn;
2523
private final PreparedStatement stmt;
2624

27-
public BinCopyUploader(Destination dest, Collector collector, String identifier, String[] columns) throws SQLException {
25+
public BinCopyUploader(Destination dest, Collector collector, BinCopySql sqlstmt) throws SQLException {
2826
String sep;
2927
this.collector = collector;
3028
this.conn = dest.connect();
3129
conn.setAutoCommit(false);
3230
conn.setUploadHandler(collector);
3331

34-
StringWriter sw = new StringWriter();
35-
PrintWriter pw = new PrintWriter(sw);
36-
37-
pw.printf("COPY /* %s */ LITTLE ENDIAN BINARY INTO %s (", identifier, dest.getTable());
38-
sep = "";
39-
for (String col : columns) {
40-
pw.printf("%s\"%s\"", sep, col.replace("\"", "\"\""));
41-
sep = ", ";
42-
}
43-
pw.println(")");
44-
45-
pw.print("FROM ");
46-
sep = "";
47-
for (int i = 0; i < columns.length; i++) {
48-
pw.printf("%s'%d'", sep, i);
49-
sep = ", ";
50-
}
51-
pw.println();
52-
pw.println("ON CLIENT");
53-
54-
String sql = sw.toString();
32+
String sql = sqlstmt.toString();
5533
stmt = conn.prepareStatement(sql);
5634
}
5735

monetdb-spark/src/main/java/org/monetdb/spark/driverside/MonetBatchWrite.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
1010
import org.apache.spark.sql.connector.write.WriterCommitMessage;
1111
import org.monetdb.spark.bincopy.BinCopyDataWriterFactory;
12+
import org.monetdb.spark.bincopy.BinCopySql;
1213
import org.monetdb.spark.workerside.Step;
1314

1415
/**
@@ -23,18 +24,18 @@
2324
*/
2425
public class MonetBatchWrite implements BatchWrite {
2526
private final Parms parms;
26-
private final String[] columnNames;
2727
private final Step[] steps;
28+
private final BinCopySql sqlstmt;
2829

29-
public MonetBatchWrite(Parms parms, String[] columnNames, Step[] steps) {
30+
public MonetBatchWrite(Parms parms, Step[] steps, BinCopySql sqlstmt) {
3031
this.parms = parms;
31-
this.columnNames = columnNames;
3232
this.steps = steps;
33+
this.sqlstmt = sqlstmt;
3334
}
3435

3536
@Override
3637
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
37-
return new BinCopyDataWriterFactory(parms, columnNames, steps);
38+
return new BinCopyDataWriterFactory(parms, steps, sqlstmt);
3839
}
3940

4041
@Override

monetdb-spark/src/main/java/org/monetdb/spark/driverside/MonetWrite.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.apache.spark.sql.connector.metric.CustomMetric;
88
import org.apache.spark.sql.connector.write.BatchWrite;
99
import org.apache.spark.sql.connector.write.Write;
10+
import org.monetdb.spark.bincopy.BinCopySql;
1011
import org.monetdb.spark.bincopy.PlanBuilder;
1112
import org.monetdb.spark.common.ColumnDescr;
1213
import org.monetdb.spark.common.Destination;
@@ -53,7 +54,8 @@ public MonetWrite(Parms parms) {
5354

5455
@Override
5556
public BatchWrite toBatch() {
56-
return new MonetBatchWrite(parms, builder.getColumns(), builder.getPlan());
57+
BinCopySql sqlstmt = new BinCopySql(this.parms.getDestination().getTable(), builder.getColumns());
58+
return new MonetBatchWrite(parms, builder.getPlan(), sqlstmt);
5759
}
5860

5961
@Override

monetdb-spark/src/test/java/org/monetdb/spark/bincopy/UploadTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ public void testUploadingData() throws SQLException, ConversionError, IOExceptio
6565
Collector collector = new Collector();
6666
collector.registerWithConverters(steps);
6767
String identifier = "uploadtest";
68-
BinCopyUploader uploader = new BinCopyUploader(dest, collector, identifier, builder.getColumns());
68+
BinCopySql sqlstmt = new BinCopySql(dest.getTable(), builder.getColumns()).identifier(identifier);
69+
BinCopyUploader uploader = new BinCopyUploader(dest, collector, sqlstmt);
6970
long batchSize = Long.MAX_VALUE;
7071
MonetDataWriter dataWriter = new MonetDataWriter(collector, steps, uploader, false, identifier, batchSize);
7172

0 commit comments

Comments
 (0)