-
Notifications
You must be signed in to change notification settings - Fork 14
feat: multithreaded bulk import #235
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
sattvikc
merged 26 commits into
feat/bulk-import-base
from
feat/multithreaded-bulk-import
Dec 19, 2024
Merged
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit
Hold shift + click to select a range
1f5d327
feat: Add BulkImport APIs and cron
anku255 cc2ffe1
fix: PR changes
anku255 38882d7
fix: PR changes
anku255 6d4da2e
fix: PR changes
anku255 c39e8e7
fix: PR changes
anku255 5634523
fix: PR changes
anku255 3b0df86
fix: PR changes
anku255 737db91
fix: Update version and changelog
anku255 1375803
fix: PR changes
anku255 c6a9329
Merge branch 'feat/bulk-import-base' into feat/bulk-import-1
anku255 3ecd8ea
fix: PR changes
anku255 ecb078b
fix: PR changes
anku255 4404916
fix: removing restriction of connection pool size for bulk import
tamassoltesz 9c093fb
fix: actually closing the connection
tamassoltesz e115ebe
fix: add bulk import retry logic for postgres too
tamassoltesz bc03e1a
chore: merge master to feature branch
tamassoltesz a6ed05f
fix: fix failing tests
tamassoltesz b83a780
chore: current state save
tamassoltesz dfcdf49
chore: merging master to feature branch
tamassoltesz a5c740b
fix: fixing merge error with changelog
tamassoltesz 57f7d04
feat: bulk inserting the bulk migration data
tamassoltesz 11bd067
fix: fixes and error handling changes
tamassoltesz ec02abb
fix: fixing tests
tamassoltesz 05edffc
chore: changelog and build version update
tamassoltesz 12768c9
fix: handling app/tenant not found
tamassoltesz 9803649
fix: review fix
tamassoltesz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ plugins { | |
id 'java-library' | ||
} | ||
|
||
version = "7.2.0" | ||
version = "7.3.0" | ||
|
||
repositories { | ||
mavenCentral() | ||
|
327 changes: 327 additions & 0 deletions
327
src/main/java/io/supertokens/storage/postgresql/BulkImportProxyConnection.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,327 @@ | ||
/* | ||
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved. | ||
* | ||
* This software is licensed under the Apache License, Version 2.0 (the | ||
* "License") as published by the Apache Software Foundation. | ||
* | ||
* You may not use this file except in compliance with the License. You may | ||
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.supertokens.storage.postgresql; | ||
|
||
import java.sql.*; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
import java.util.concurrent.Executor; | ||
|
||
/** | ||
* BulkImportProxyConnection is a class implementing the Connection interface, serving as a Connection instance in the bulk import user cronjob. | ||
* This cron extensively utilizes existing queries to import users, all of which internally operate within transactions and those query sometimes | ||
* call the commit/rollback method on the connection. | ||
* | ||
* For the purpose of bulkimport cronjob, we aim to employ a single connection for all queries and rollback any operations in case of query failures. | ||
* To achieve this, we use our own proxy Connection instance and override the commit/rollback/close methods to do nothing. | ||
*/ | ||
|
||
public class BulkImportProxyConnection implements Connection { | ||
private Connection con = null; | ||
|
||
public BulkImportProxyConnection(Connection con) { | ||
this.con = con; | ||
} | ||
|
||
@Override | ||
public void close() throws SQLException { | ||
//this.con.close(); // why are we against the close? | ||
} | ||
|
||
@Override | ||
public void commit() throws SQLException { | ||
//this.con.commit(); | ||
} | ||
|
||
@Override | ||
public void rollback() throws SQLException { | ||
//this.con.rollback(); | ||
} | ||
|
||
public void closeForBulkImportProxyStorage() throws SQLException { | ||
this.con.close(); | ||
} | ||
|
||
public void commitForBulkImportProxyStorage() throws SQLException { | ||
this.con.commit(); | ||
} | ||
|
||
public void rollbackForBulkImportProxyStorage() throws SQLException { | ||
this.con.rollback(); | ||
} | ||
|
||
/* Following methods are unchaged */ | ||
|
||
@Override | ||
public Statement createStatement() throws SQLException { | ||
return this.con.createStatement(); | ||
} | ||
|
||
@Override | ||
public PreparedStatement prepareStatement(String sql) throws SQLException { | ||
return this.con.prepareStatement(sql); | ||
} | ||
|
||
@Override | ||
public CallableStatement prepareCall(String sql) throws SQLException { | ||
return this.con.prepareCall(sql); | ||
} | ||
|
||
@Override | ||
public String nativeSQL(String sql) throws SQLException { | ||
return this.con.nativeSQL(sql); | ||
} | ||
|
||
@Override | ||
public void setAutoCommit(boolean autoCommit) throws SQLException { | ||
this.con.setAutoCommit(autoCommit); | ||
} | ||
|
||
@Override | ||
public boolean getAutoCommit() throws SQLException { | ||
return this.con.getAutoCommit(); | ||
} | ||
|
||
@Override | ||
public boolean isClosed() throws SQLException { | ||
return this.con.isClosed(); | ||
} | ||
|
||
@Override | ||
public DatabaseMetaData getMetaData() throws SQLException { | ||
return this.con.getMetaData(); | ||
} | ||
|
||
@Override | ||
public void setReadOnly(boolean readOnly) throws SQLException { | ||
this.con.setReadOnly(readOnly); | ||
} | ||
|
||
@Override | ||
public boolean isReadOnly() throws SQLException { | ||
return this.con.isReadOnly(); | ||
} | ||
|
||
@Override | ||
public void setCatalog(String catalog) throws SQLException { | ||
this.con.setCatalog(catalog); | ||
} | ||
|
||
@Override | ||
public String getCatalog() throws SQLException { | ||
return this.con.getCatalog(); | ||
} | ||
|
||
@Override | ||
public void setTransactionIsolation(int level) throws SQLException { | ||
this.con.setTransactionIsolation(level); | ||
} | ||
|
||
@Override | ||
public int getTransactionIsolation() throws SQLException { | ||
return this.con.getTransactionIsolation(); | ||
} | ||
|
||
@Override | ||
public SQLWarning getWarnings() throws SQLException { | ||
return this.con.getWarnings(); | ||
} | ||
|
||
@Override | ||
public void clearWarnings() throws SQLException { | ||
this.con.clearWarnings(); | ||
} | ||
|
||
@Override | ||
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { | ||
return this.con.createStatement(resultSetType, resultSetConcurrency); | ||
} | ||
|
||
@Override | ||
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) | ||
throws SQLException { | ||
return this.con.prepareStatement(sql, resultSetType, resultSetConcurrency); | ||
} | ||
|
||
@Override | ||
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { | ||
return this.con.prepareCall(sql, resultSetType, resultSetConcurrency); | ||
} | ||
|
||
@Override | ||
public Map<String, Class<?>> getTypeMap() throws SQLException { | ||
return this.con.getTypeMap(); | ||
} | ||
|
||
@Override | ||
public void setTypeMap(Map<String, Class<?>> map) throws SQLException { | ||
this.con.setTypeMap(map); | ||
} | ||
|
||
@Override | ||
public void setHoldability(int holdability) throws SQLException { | ||
this.con.setHoldability(holdability); | ||
} | ||
|
||
@Override | ||
public int getHoldability() throws SQLException { | ||
return this.con.getHoldability(); | ||
} | ||
|
||
@Override | ||
public Savepoint setSavepoint() throws SQLException { | ||
return this.con.setSavepoint(); | ||
} | ||
|
||
@Override | ||
public Savepoint setSavepoint(String name) throws SQLException { | ||
return this.con.setSavepoint(name); | ||
} | ||
|
||
@Override | ||
public void rollback(Savepoint savepoint) throws SQLException { | ||
this.con.rollback(savepoint); | ||
} | ||
|
||
@Override | ||
public void releaseSavepoint(Savepoint savepoint) throws SQLException { | ||
this.con.releaseSavepoint(savepoint); | ||
} | ||
|
||
@Override | ||
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) | ||
throws SQLException { | ||
return this.con.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); | ||
} | ||
|
||
@Override | ||
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, | ||
int resultSetHoldability) throws SQLException { | ||
return this.con.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); | ||
} | ||
|
||
@Override | ||
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, | ||
int resultSetHoldability) throws SQLException { | ||
return this.con.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); | ||
} | ||
|
||
@Override | ||
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { | ||
return this.con.prepareStatement(sql, autoGeneratedKeys); | ||
} | ||
|
||
@Override | ||
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { | ||
return this.con.prepareStatement(sql, columnIndexes); | ||
} | ||
|
||
@Override | ||
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { | ||
return this.con.prepareStatement(sql, columnNames); | ||
} | ||
|
||
@Override | ||
public Clob createClob() throws SQLException { | ||
return this.con.createClob(); | ||
} | ||
|
||
@Override | ||
public Blob createBlob() throws SQLException { | ||
return this.con.createBlob(); | ||
} | ||
|
||
@Override | ||
public NClob createNClob() throws SQLException { | ||
return this.con.createNClob(); | ||
} | ||
|
||
@Override | ||
public SQLXML createSQLXML() throws SQLException { | ||
return this.con.createSQLXML(); | ||
} | ||
|
||
@Override | ||
public boolean isValid(int timeout) throws SQLException { | ||
return this.con.isValid(timeout); | ||
} | ||
|
||
@Override | ||
public void setClientInfo(String name, String value) throws SQLClientInfoException { | ||
this.con.setClientInfo(name, value); | ||
} | ||
|
||
@Override | ||
public void setClientInfo(Properties properties) throws SQLClientInfoException { | ||
this.con.setClientInfo(properties); | ||
} | ||
|
||
@Override | ||
public String getClientInfo(String name) throws SQLException { | ||
return this.con.getClientInfo(name); | ||
} | ||
|
||
@Override | ||
public Properties getClientInfo() throws SQLException { | ||
return this.con.getClientInfo(); | ||
} | ||
|
||
@Override | ||
public Array createArrayOf(String typeName, Object[] elements) throws SQLException { | ||
return this.con.createArrayOf(typeName, elements); | ||
} | ||
|
||
@Override | ||
public Struct createStruct(String typeName, Object[] attributes) throws SQLException { | ||
return this.con.createStruct(typeName, attributes); | ||
} | ||
|
||
@Override | ||
public void setSchema(String schema) throws SQLException { | ||
this.con.setSchema(schema); | ||
} | ||
|
||
@Override | ||
public String getSchema() throws SQLException { | ||
return this.con.getSchema(); | ||
} | ||
|
||
@Override | ||
public void abort(Executor executor) throws SQLException { | ||
this.con.abort(executor); | ||
} | ||
|
||
@Override | ||
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { | ||
this.con.setNetworkTimeout(executor, milliseconds); | ||
} | ||
|
||
@Override | ||
public int getNetworkTimeout() throws SQLException { | ||
return this.con.getNetworkTimeout(); | ||
} | ||
|
||
@Override | ||
public <T> T unwrap(Class<T> iface) throws SQLException { | ||
return this.con.unwrap(iface); | ||
} | ||
|
||
@Override | ||
public boolean isWrapperFor(Class<?> iface) throws SQLException { | ||
return this.con.isWrapperFor(iface); | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.