Skip to content

Commit 8031588

Browse files
committed
Add support for dumping to file
Configurable using 'dumpdir', 'dumpprefix' and 'dumponserver'.
1 parent 3299646 commit 8031588

File tree

10 files changed

+440
-35
lines changed

10 files changed

+440
-35
lines changed

README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,21 @@ JDBC data source. The following configuration options are supported:
141141
* **immediatecommit**: commit immediately after each batch instead of only at
142142
the end.
143143

144+
* **dumpdir**: if this is set, the data is written to this directory instead of
145+
being uploaded to MonetDB. Every partition will create a subdirectory containing
146+
one sql file `copy.sql` with the COPY INTO statement, plus one binary file
147+
per column.
148+
149+
Note: A **url** must still be provided. It will be used to determine the
150+
exact data types.
151+
152+
* **dumpprefix**: used with **dumpdir**. By default, the `copy.sql` generated by
153+
**dumpdir** will refer to the binary files with an absolute path based on **dumpdir**.
154+
**dumpprefix** can be used to override that.
155+
156+
* **dumponserver**: use `ON SERVER` in `copy.sql` rather than `ON CLIENT`.
157+
Default 'false'.
158+
144159
## Building
145160

146161
To build, simply run `make`. This will create `monetdb-spark-X.Y.Z-fat.jar`

monetdb-spark/src/main/java/org/monetdb/spark/bincopy/BinCopyDataWriterFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.monetdb.spark.workerside.MonetDataWriter;
2323
import org.monetdb.spark.workerside.Step;
2424

25+
import java.io.IOException;
2526
import java.io.Serial;
2627
import java.io.Serializable;
2728
import java.sql.SQLException;
@@ -49,10 +50,10 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
4950
String identifier = "part" + partitionId + "-task" + taskId;
5051
Collector collector = new Collector();
5152
sqlstmt.identifier(identifier);
52-
BinCopyUploader uploader = new BinCopyUploader(parms.getDestination(), collector, sqlstmt);
53+
Uploader uploader = parms.getDumpdir() == null ? new BinCopyUploader(parms.getDestination(), collector, sqlstmt) : new BinCopyFileDump(parms.getDumpdir(), parms.getDumpPrefix(), collector, sqlstmt, partitionId, taskId);
5354
collector.registerWithConverters(steps);
5455
return new MonetDataWriter(collector, steps, uploader, parms.isImmediateCommit(), identifier, parms.getBatchSize());
55-
} catch (SQLException e) {
56+
} catch (SQLException | IOException e) {
5657
throw new RuntimeException(e);
5758
}
5859
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* SPDX-License-Identifier: MPL-2.0
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*
8+
* Copyright MonetDB Solutions B.V.
9+
*/
10+
11+
package org.monetdb.spark.bincopy;
12+
13+
import org.jetbrains.annotations.NotNull;
14+
import org.monetdb.spark.workerside.Collector;
15+
16+
import java.io.File;
17+
import java.io.IOException;
18+
import java.io.OutputStream;
19+
import java.nio.charset.StandardCharsets;
20+
import java.nio.file.FileAlreadyExistsException;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.nio.file.Paths;
24+
import java.sql.SQLException;
25+
import java.util.ArrayList;
26+
27+
public class BinCopyFileDump implements Uploader {
28+
private final Path destDir;
29+
private final String copyFileName;
30+
private final String dataFilePrefix;
31+
private final String dataFileSuffix;
32+
private final Collector collector;
33+
private final BinCopySql sqlstmt;
34+
private final ArrayList<OutputStream> outs;
35+
private boolean closed;
36+
private Runnable onStartUpload;
37+
private Runnable onEndUpload;
38+
39+
public BinCopyFileDump(String dumpdir, String dumpPrefix, Collector collector, BinCopySql sqlstmt, int partitionId, long taskId) throws IOException {
40+
this.collector = collector;
41+
this.sqlstmt = sqlstmt;
42+
String part = "part" + partitionId;
43+
String suffix = "tmp" + taskId;
44+
45+
Path topDir = Paths.get(dumpdir);
46+
destDir = topDir.resolve(part);
47+
copyFileName = "copy.sql";
48+
dataFilePrefix = "col";
49+
dataFileSuffix = ".bin";
50+
51+
boolean haveDumpPrefix = dumpPrefix != null && !dumpPrefix.isEmpty();
52+
String prefix = (haveDumpPrefix ? dumpPrefix : ".") + File.separator + part + File.separator + dataFilePrefix;
53+
sqlstmt.nameMapper(i -> getDataFileName(i, prefix, dataFileSuffix));
54+
55+
try {
56+
Files.createDirectory(topDir);
57+
} catch (FileAlreadyExistsException ignored) {
58+
}
59+
Files.createDirectory(destDir);
60+
String sql = sqlstmt.toString();
61+
Files.writeString(getCopyFilePath(), sql + ";", StandardCharsets.UTF_8);
62+
63+
outs = new ArrayList<>(sqlstmt.getColumnCount());
64+
openOutFiles();
65+
}
66+
67+
private @NotNull Path getCopyFilePath() {
68+
return destDir.resolve(copyFileName);
69+
}
70+
71+
private @NotNull Path getDataFilePath(int i) {
72+
String name = getDataFileName(i, dataFilePrefix, dataFileSuffix);
73+
return destDir.resolve(name);
74+
}
75+
76+
private @NotNull String getDataFileName(int i, String prefix, String suffix) {
77+
StringBuilder builder = new StringBuilder(prefix).append(i).append(".");
78+
for (char c : sqlstmt.getColumnName(i).toCharArray()) {
79+
if (!Character.isAlphabetic(c) && !Character.isDigit(c) && c != '_')
80+
c = '~';
81+
builder.append(c);
82+
}
83+
builder.append(suffix);
84+
return builder.toString();
85+
}
86+
87+
private void openOutFiles() throws IOException {
88+
int n = sqlstmt.getColumnCount();
89+
int i = 0;
90+
try {
91+
while (i < n) {
92+
Path p = getDataFilePath(i);
93+
OutputStream s = Files.newOutputStream(p);
94+
outs.add(s);
95+
i++;
96+
}
97+
} catch (IOException e) {
98+
closeOutputFiles(true);
99+
}
100+
}
101+
102+
private void closeOutputFiles(boolean ignoreErrors) throws IOException {
103+
if (closed)
104+
return;
105+
IOException toThrow = null;
106+
for (int i = 0; i < outs.size(); i++) {
107+
OutputStream s = outs.get(i);
108+
if (s != null) {
109+
try {
110+
s.close();
111+
} catch (IOException e) {
112+
if (toThrow == null)
113+
toThrow = e;
114+
}
115+
outs.set(i, null);
116+
}
117+
}
118+
closed = true;
119+
if (toThrow != null && !ignoreErrors)
120+
throw toThrow;
121+
}
122+
123+
@Override
124+
public void uploadBatch() throws SQLException, IOException {
125+
if (onStartUpload != null)
126+
onStartUpload.run();
127+
try {
128+
for (int i = 0; i < outs.size(); i++) {
129+
collector.writeTo(i, outs.get(i));
130+
}
131+
} finally {
132+
if (onEndUpload != null)
133+
onEndUpload.run();
134+
}
135+
}
136+
137+
@Override
138+
public void commit() throws SQLException, IOException {
139+
for (OutputStream s : outs) {
140+
s.flush();
141+
}
142+
closeOutputFiles(false);
143+
}
144+
145+
@Override
146+
public void close() throws IOException {
147+
if (closed) // was committed
148+
return;
149+
closeOutputFiles(true);
150+
silentDeleteFile(getCopyFilePath());
151+
for (int i = 0; i < sqlstmt.getColumnCount(); i++)
152+
silentDeleteFile(getDataFilePath(i));
153+
Files.delete(destDir);
154+
}
155+
156+
private void silentDeleteFile(@NotNull Path p) {
157+
try {
158+
Files.delete(p);
159+
} catch (IOException ignored) {
160+
}
161+
}
162+
163+
@Override
164+
public void setOnStartUpload(Runnable callback) {
165+
onStartUpload = callback;
166+
}
167+
168+
@Override
169+
public void setOnEndUpload(Runnable callback) {
170+
onEndUpload = callback;
171+
172+
}
173+
}

monetdb-spark/src/main/java/org/monetdb/spark/bincopy/BinCopySql.java

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@
1313
import java.io.PrintWriter;
1414
import java.io.Serializable;
1515
import java.io.StringWriter;
16+
import java.util.function.IntFunction;
1617

1718
public class BinCopySql implements Serializable {
18-
String quotedTableName;
19-
String[] columnNames;
20-
String identifier;
21-
String prefix = "";
22-
String suffix = "";
23-
boolean onClient = true;
19+
private final String quotedTableName;
20+
private final String[] columnNames;
21+
private String identifier;
22+
private boolean onClient = true;
23+
private transient IntFunction<String> nameMapper;
2424

2525
public BinCopySql(String quotedTableName, String[] columnNames) {
2626
this.quotedTableName = quotedTableName;
@@ -32,13 +32,8 @@ public BinCopySql identifier(String identifier) {
3232
return this;
3333
}
3434

35-
public BinCopySql suffix(String suffix) {
36-
this.suffix = suffix;
37-
return this;
38-
}
39-
40-
public BinCopySql prefix(String prefix) {
41-
this.prefix = prefix;
35+
public BinCopySql nameMapper(IntFunction<String> f) {
36+
nameMapper = f;
4237
return this;
4338
}
4439

@@ -51,6 +46,14 @@ public BinCopySql onServer(boolean onServer) {
5146
return onClient(!onServer);
5247
}
5348

49+
public int getColumnCount() {
50+
return columnNames.length;
51+
}
52+
53+
public String getColumnName(int i) {
54+
return columnNames[i];
55+
}
56+
5457
@Override
5558
public String toString() {
5659
String sep;
@@ -67,22 +70,23 @@ public String toString() {
6770
}
6871
pw.println(")");
6972

70-
pw.print("FROM ");
71-
final String qprefix, qsuffix;
72-
if (prefix.contains("\\") || prefix.contains("'") || suffix.contains("\\") || suffix.contains("'")) {
73-
qprefix = "R'" + prefix.replace("'", "''");
74-
qsuffix = suffix.replace("'", "''") + "'";
75-
} else {
76-
qprefix = "'" + prefix;
77-
qsuffix = suffix + "'";
78-
}
79-
sep = "";
73+
pw.println("FROM");
74+
sep = "\t";
8075
for (int i = 0; i < columnNames.length; i++) {
81-
pw.printf("%s%s%d%s", sep, qprefix, i, qsuffix);
82-
sep = ", ";
76+
String name = nameMapper != null ? nameMapper.apply(i) : Integer.toString(i);
77+
String escaped = escape(name);
78+
pw.printf("%s%s", sep, escaped);
79+
sep = ",\n\t";
8380
} pw.println();
8481
pw.println(onClient ? "ON CLIENT" : "ON SERVER");
8582

8683
return sw.toString();
8784
}
85+
86+
private String escape(String s) {
87+
if (s.contains("'") || s.contains("\\"))
88+
return "R'" + s.replace("'", "''") + "'";
89+
else
90+
return "'" + s + "'";
91+
}
8892
}

monetdb-spark/src/main/java/org/monetdb/spark/bincopy/BinCopyUploader.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import java.sql.PreparedStatement;
1818
import java.sql.SQLException;
1919

20-
public class BinCopyUploader {
20+
public class BinCopyUploader implements Uploader {
2121
private final Collector collector;
2222
private final MonetConnection conn;
2323
private final PreparedStatement stmt;
@@ -33,27 +33,32 @@ public BinCopyUploader(Destination dest, Collector collector, BinCopySql sqlstmt
3333
stmt = conn.prepareStatement(sql);
3434
}
3535

36+
@Override
3637
public void uploadBatch() throws SQLException {
3738
if (collector.getRowCount() == 0)
3839
return;
3940
stmt.execute();
4041
collector.clear();
4142
}
4243

44+
@Override
4345
public void commit() throws SQLException {
4446
uploadBatch();
4547
conn.commit();
4648
}
4749

50+
@Override
4851
public void close() throws SQLException {
4952
stmt.close();
5053
conn.close();
5154
}
5255

56+
@Override
5357
public void setOnStartUpload(Runnable callback) {
5458
collector.setOnStartUpload(callback);
5559
}
5660

61+
@Override
5762
public void setOnEndUpload(Runnable callback) {
5863
collector.setOnEndUpload(callback);
5964
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: MPL-2.0
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*
8+
* Copyright MonetDB Solutions B.V.
9+
*/
10+
11+
package org.monetdb.spark.bincopy;
12+
13+
import java.io.IOException;
14+
import java.sql.SQLException;
15+
16+
public interface Uploader {
17+
void uploadBatch() throws SQLException, IOException;
18+
19+
void commit() throws SQLException, IOException;
20+
21+
void close() throws SQLException, IOException;
22+
23+
void setOnStartUpload(Runnable callback);
24+
25+
void setOnEndUpload(Runnable callback);
26+
}

0 commit comments

Comments
 (0)