Skip to content

Commit 9fa239d

Browse files
sattvikctamassolteszanku255
authored
feat: multithreaded bulk import (#136)
* feat: multithreaded bulk import (#128) * feat: Add bulk import queries * fix: PR changes * fix: PR changes * fix: removing restriction of connection pool size for bulk import * feat: enable test run trigger from ci by hand * fix: enable test run trigger from ci by hand * Updated config.yml * fix: reverting CI changes * "fix: reverting CI config changes" This reverts commit 4586b90. * fix: fixing transaction rolled back issues with multithreaded bulk import * fix: changelog * fix: put back accidentally deleted comment * feat: mysql implementation for bulk import * fix: using right transaction isolation level for mysql in bulkimport * fix: review fix, onemillion user test move * fix: fixing imports --------- Co-authored-by: Ankit Tiwari <[email protected]> * Update build.gradle * Update pluginInterfaceSupported.json --------- Co-authored-by: Tamas Soltesz <[email protected]> Co-authored-by: Ankit Tiwari <[email protected]>
1 parent f2482ea commit 9fa239d

22 files changed

+2443
-24
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

88
## [Unreleased]
99

10+
- Adds queries for Bulk Import
11+
- Adds support for multithreaded bulk import
12+
1013
## [7.2.0] - 2024-10-03
1114

1215
- Compatible with plugin interface version 6.3

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ plugins {
22
id 'java-library'
33
}
44

5-
version = "7.2.0"
5+
version = "7.3.0"
66

77
repositories {
88
mavenCentral()
@@ -129,4 +129,4 @@ tasks.withType(Test) {
129129
}
130130
}
131131
}
132-
}
132+
}

pluginInterfaceSupported.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"_comment": "contains a list of plugin interfaces branch names that this core supports",
33
"versions": [
4-
"6.3"
4+
"6.4"
55
]
6-
}
6+
}
Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
/*
2+
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
3+
*
4+
* This software is licensed under the Apache License, Version 2.0 (the
5+
* "License") as published by the Apache Software Foundation.
6+
*
7+
* You may not use this file except in compliance with the License. You may
8+
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
18+
package io.supertokens.storage.mysql;
19+
20+
import java.sql.*;
21+
import java.util.Map;
22+
import java.util.Properties;
23+
import java.util.concurrent.Executor;
24+
25+
/**
26+
* BulkImportProxyConnection is a class implementing the Connection interface, serving as a Connection instance in the bulk import user cronjob.
27+
* This cron extensively utilizes existing queries to import users, all of which internally operate within transactions and those query sometimes
28+
* call the commit/rollback method on the connection.
29+
*
30+
* For the purpose of bulkimport cronjob, we aim to employ a single connection for all queries and rollback any operations in case of query failures.
31+
* To achieve this, we use our own proxy Connection instance and override the commit/rollback/close methods to do nothing.
32+
*/
33+
34+
public class BulkImportProxyConnection implements Connection {
35+
private Connection con = null;
36+
37+
public BulkImportProxyConnection(Connection con) {
38+
this.con = con;
39+
}
40+
41+
@Override
42+
public void close() throws SQLException {
43+
// We simply ignore when close is called BulkImportProxyConnection
44+
}
45+
46+
@Override
47+
public void commit() throws SQLException {
48+
// We simply ignore when commit is called BulkImportProxyConnection
49+
}
50+
51+
@Override
52+
public void rollback() throws SQLException {
53+
// We simply ignore when rollback is called BulkImportProxyConnection
54+
}
55+
56+
public void closeForBulkImportProxyStorage() throws SQLException {
57+
this.con.close();
58+
}
59+
60+
public void commitForBulkImportProxyStorage() throws SQLException {
61+
this.con.commit();
62+
}
63+
64+
public void rollbackForBulkImportProxyStorage() throws SQLException {
65+
this.con.rollback();
66+
}
67+
68+
/* Following methods are unchaged */
69+
70+
@Override
71+
public Statement createStatement() throws SQLException {
72+
return this.con.createStatement();
73+
}
74+
75+
@Override
76+
public PreparedStatement prepareStatement(String sql) throws SQLException {
77+
return this.con.prepareStatement(sql);
78+
}
79+
80+
@Override
81+
public CallableStatement prepareCall(String sql) throws SQLException {
82+
return this.con.prepareCall(sql);
83+
}
84+
85+
@Override
86+
public String nativeSQL(String sql) throws SQLException {
87+
return this.con.nativeSQL(sql);
88+
}
89+
90+
@Override
91+
public void setAutoCommit(boolean autoCommit) throws SQLException {
92+
this.con.setAutoCommit(autoCommit);
93+
}
94+
95+
@Override
96+
public boolean getAutoCommit() throws SQLException {
97+
return this.con.getAutoCommit();
98+
}
99+
100+
@Override
101+
public boolean isClosed() throws SQLException {
102+
return this.con.isClosed();
103+
}
104+
105+
@Override
106+
public DatabaseMetaData getMetaData() throws SQLException {
107+
return this.con.getMetaData();
108+
}
109+
110+
@Override
111+
public void setReadOnly(boolean readOnly) throws SQLException {
112+
this.con.setReadOnly(readOnly);
113+
}
114+
115+
@Override
116+
public boolean isReadOnly() throws SQLException {
117+
return this.con.isReadOnly();
118+
}
119+
120+
@Override
121+
public void setCatalog(String catalog) throws SQLException {
122+
this.con.setCatalog(catalog);
123+
}
124+
125+
@Override
126+
public String getCatalog() throws SQLException {
127+
return this.con.getCatalog();
128+
}
129+
130+
@Override
131+
public void setTransactionIsolation(int level) throws SQLException {
132+
this.con.setTransactionIsolation(level);
133+
}
134+
135+
@Override
136+
public int getTransactionIsolation() throws SQLException {
137+
return this.con.getTransactionIsolation();
138+
}
139+
140+
@Override
141+
public SQLWarning getWarnings() throws SQLException {
142+
return this.con.getWarnings();
143+
}
144+
145+
@Override
146+
public void clearWarnings() throws SQLException {
147+
this.con.clearWarnings();
148+
}
149+
150+
@Override
151+
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
152+
return this.con.createStatement(resultSetType, resultSetConcurrency);
153+
}
154+
155+
@Override
156+
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
157+
throws SQLException {
158+
return this.con.prepareStatement(sql, resultSetType, resultSetConcurrency);
159+
}
160+
161+
@Override
162+
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
163+
return this.con.prepareCall(sql, resultSetType, resultSetConcurrency);
164+
}
165+
166+
@Override
167+
public Map<String, Class<?>> getTypeMap() throws SQLException {
168+
return this.con.getTypeMap();
169+
}
170+
171+
@Override
172+
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
173+
this.con.setTypeMap(map);
174+
}
175+
176+
@Override
177+
public void setHoldability(int holdability) throws SQLException {
178+
this.con.setHoldability(holdability);
179+
}
180+
181+
@Override
182+
public int getHoldability() throws SQLException {
183+
return this.con.getHoldability();
184+
}
185+
186+
@Override
187+
public Savepoint setSavepoint() throws SQLException {
188+
return this.con.setSavepoint();
189+
}
190+
191+
@Override
192+
public Savepoint setSavepoint(String name) throws SQLException {
193+
return this.con.setSavepoint(name);
194+
}
195+
196+
@Override
197+
public void rollback(Savepoint savepoint) throws SQLException {
198+
this.con.rollback(savepoint);
199+
}
200+
201+
@Override
202+
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
203+
this.con.releaseSavepoint(savepoint);
204+
}
205+
206+
@Override
207+
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
208+
throws SQLException {
209+
return this.con.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
210+
}
211+
212+
@Override
213+
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
214+
int resultSetHoldability) throws SQLException {
215+
return this.con.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
216+
}
217+
218+
@Override
219+
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
220+
int resultSetHoldability) throws SQLException {
221+
return this.con.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
222+
}
223+
224+
@Override
225+
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
226+
return this.con.prepareStatement(sql, autoGeneratedKeys);
227+
}
228+
229+
@Override
230+
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
231+
return this.con.prepareStatement(sql, columnIndexes);
232+
}
233+
234+
@Override
235+
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
236+
return this.con.prepareStatement(sql, columnNames);
237+
}
238+
239+
@Override
240+
public Clob createClob() throws SQLException {
241+
return this.con.createClob();
242+
}
243+
244+
@Override
245+
public Blob createBlob() throws SQLException {
246+
return this.con.createBlob();
247+
}
248+
249+
@Override
250+
public NClob createNClob() throws SQLException {
251+
return this.con.createNClob();
252+
}
253+
254+
@Override
255+
public SQLXML createSQLXML() throws SQLException {
256+
return this.con.createSQLXML();
257+
}
258+
259+
@Override
260+
public boolean isValid(int timeout) throws SQLException {
261+
return this.con.isValid(timeout);
262+
}
263+
264+
@Override
265+
public void setClientInfo(String name, String value) throws SQLClientInfoException {
266+
this.con.setClientInfo(name, value);
267+
}
268+
269+
@Override
270+
public void setClientInfo(Properties properties) throws SQLClientInfoException {
271+
this.con.setClientInfo(properties);
272+
}
273+
274+
@Override
275+
public String getClientInfo(String name) throws SQLException {
276+
return this.con.getClientInfo(name);
277+
}
278+
279+
@Override
280+
public Properties getClientInfo() throws SQLException {
281+
return this.con.getClientInfo();
282+
}
283+
284+
@Override
285+
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
286+
return this.con.createArrayOf(typeName, elements);
287+
}
288+
289+
@Override
290+
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
291+
return this.con.createStruct(typeName, attributes);
292+
}
293+
294+
@Override
295+
public void setSchema(String schema) throws SQLException {
296+
this.con.setSchema(schema);
297+
}
298+
299+
@Override
300+
public String getSchema() throws SQLException {
301+
return this.con.getSchema();
302+
}
303+
304+
@Override
305+
public void abort(Executor executor) throws SQLException {
306+
this.con.abort(executor);
307+
}
308+
309+
@Override
310+
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
311+
this.con.setNetworkTimeout(executor, milliseconds);
312+
}
313+
314+
@Override
315+
public int getNetworkTimeout() throws SQLException {
316+
return this.con.getNetworkTimeout();
317+
}
318+
319+
@Override
320+
public <T> T unwrap(Class<T> iface) throws SQLException {
321+
return this.con.unwrap(iface);
322+
}
323+
324+
@Override
325+
public boolean isWrapperFor(Class<?> iface) throws SQLException {
326+
return this.con.isWrapperFor(iface);
327+
}
328+
}

0 commit comments

Comments
 (0)