Skip to content

Commit e177741

Browse files
Errorhandling and ssl support (#10)
behavior.on.error : Error handling behavior setting. Must be configured to one of the following: scylladb.ssl.keystore.path : Path to the Java Keystore. scylladb.ssl.keystore.password : Password to open the Java Keystore with. scylladb.ssl.cipherSuites : The cipher suites to enable. Defaults to none, resulting in a minimal quality of service according to JDK documentation. scylladb.ssl.openssl.keyCertChain : Path to the SSL certificate file, when using OpenSSL. ssl.openssl.privateKey : Path to the private key file, when using OpenSSL.
1 parent e301ee1 commit e177741

File tree

4 files changed

+270
-61
lines changed

4 files changed

+270
-61
lines changed

src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ DataType dataType(Schema schema) {
124124
dataType = DataType.varchar();
125125
break;
126126
default:
127-
throw new UnsupportedOperationException(
127+
throw new DataException(
128128
String.format("Unsupported type %s", schema.type())
129129
);
130130
}

src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,19 @@
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
1717

18+
import javax.net.ssl.KeyManagerFactory;
1819
import javax.net.ssl.SSLException;
1920
import javax.net.ssl.TrustManagerFactory;
21+
import java.io.BufferedInputStream;
22+
import java.io.File;
2023
import java.io.FileInputStream;
24+
import java.io.FileNotFoundException;
2125
import java.io.IOException;
2226
import java.io.InputStream;
2327
import java.security.KeyStore;
2428
import java.security.KeyStoreException;
2529
import java.security.NoSuchAlgorithmException;
30+
import java.security.UnrecoverableKeyException;
2631
import java.security.cert.CertificateException;
2732

2833
public class ScyllaDbSessionFactory {
@@ -59,25 +64,13 @@ public ScyllaDbSession newSession(ScyllaDbSinkConnectorConfig config) {
5964

6065
if (null != config.trustStorePath) {
6166
log.info("Configuring SSLContext to use Truststore {}", config.trustStorePath);
62-
final KeyStore keyStore;
63-
try {
64-
keyStore = KeyStore.getInstance("JKS");
65-
try (InputStream inputStream = new FileInputStream(config.trustStorePath)) {
66-
keyStore.load(inputStream, config.trustStorePassword);
67-
} catch (IOException e) {
68-
throw new ConnectException("Exception while reading keystore", e);
69-
} catch (CertificateException | NoSuchAlgorithmException e) {
70-
throw new ConnectException("Exception while loading keystore", e);
71-
}
72-
} catch (KeyStoreException e) {
73-
throw new ConnectException("Exception while creating keystore", e);
74-
}
67+
final KeyStore trustKeyStore = createKeyStore(config.trustStorePath, config.trustStorePassword);
7568

7669
final TrustManagerFactory trustManagerFactory;
7770
try {
7871
trustManagerFactory =
7972
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
80-
trustManagerFactory.init(keyStore);
73+
trustManagerFactory.init(trustKeyStore);
8174
} catch (NoSuchAlgorithmException e) {
8275
throw new ConnectException("Exception while creating TrustManagerFactory", e);
8376
} catch (KeyStoreException e) {
@@ -86,6 +79,42 @@ public ScyllaDbSession newSession(ScyllaDbSinkConnectorConfig config) {
8679
sslContextBuilder.trustManager(trustManagerFactory);
8780
}
8881

82+
if (null != config.keyStorePath) {
83+
log.info("Configuring SSLContext to use Keystore {}", config.keyStorePath);
84+
final KeyStore keyStore = createKeyStore(config.keyStorePath, config.keyStorePassword);
85+
86+
final KeyManagerFactory keyManagerFactory;
87+
try {
88+
keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
89+
keyManagerFactory.init(keyStore, config.keyStorePassword);
90+
} catch (NoSuchAlgorithmException e) {
91+
throw new ConnectException("Exception while creating KeyManagerFactory", e);
92+
} catch (UnrecoverableKeyException | KeyStoreException e) {
93+
throw new ConnectException("Exception while calling KeyManagerFactory.init()", e);
94+
}
95+
sslContextBuilder.keyManager(keyManagerFactory);
96+
}
97+
98+
if (config.cipherSuites.size() > 0) {
99+
sslContextBuilder.ciphers(config.cipherSuites);
100+
}
101+
102+
if (config.certFilePath != null && config.privateKeyPath != null) {
103+
try {
104+
sslContextBuilder.keyManager(new BufferedInputStream(new FileInputStream(config.certFilePath)),
105+
new BufferedInputStream(new FileInputStream(config.privateKeyPath)));
106+
}
107+
catch (IllegalArgumentException e) {
108+
throw new ConnectException(String.format("Invalid certificate or private key: %s", e.getMessage()));
109+
} catch (FileNotFoundException e) {
110+
throw new ConnectException("Invalid certificate or private key file path", e);
111+
}
112+
} else if (config.certFilePath == null != (config.privateKeyPath == null)) {
113+
throw new ConnectException(String.format("%s cannot be set without %s and vice-versa: %s is not set",
114+
"scylladb.ssl.openssl.keyCertChain", "scylladb.ssl.openssl.privateKey",
115+
(config.certFilePath == null) ? "scylladb.ssl.openssl.keyCertChain" : "scylladb.ssl.openssl.privateKey"));
116+
}
117+
89118
final SslContext context;
90119
try {
91120
context = sslContextBuilder.build();
@@ -101,4 +130,21 @@ public ScyllaDbSession newSession(ScyllaDbSinkConnectorConfig config) {
101130
final Session session = cluster.newSession();
102131
return new ScyllaDbSessionImpl(config, cluster, session);
103132
}
133+
134+
private KeyStore createKeyStore(File path, char[] password) {
135+
KeyStore keyStore;
136+
try {
137+
keyStore = KeyStore.getInstance("JKS");
138+
try (InputStream inputStream = new FileInputStream(path)) {
139+
keyStore.load(inputStream, password);
140+
} catch (IOException e) {
141+
throw new ConnectException("Exception while reading keystore", e);
142+
} catch (CertificateException | NoSuchAlgorithmException e) {
143+
throw new ConnectException("Exception while loading keystore", e);
144+
}
145+
} catch (KeyStoreException e) {
146+
throw new ConnectException("Exception while creating keystore", e);
147+
}
148+
return keyStore;
149+
}
104150
}

src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java

Lines changed: 128 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.connect.scylladb;
22

33
import java.io.File;
4+
import java.util.Collections;
45
import java.util.HashMap;
56
import java.util.List;
67
import java.util.Map;
@@ -47,18 +48,23 @@ public class ScyllaDbSinkConnectorConfig extends AbstractConfig {
4748
public final TableOptions.CompressionOptions tableCompressionAlgorithm;
4849
public final char[] trustStorePassword;
4950
public final File trustStorePath;
51+
public final char[] keyStorePassword;
52+
public final File keyStorePath;
5053
public final String offsetStorageTable;
5154
public final long statementTimeoutMs;
5255
public final int maxBatchSizeKb;
5356
public final String loadBalancingLocalDc;
5457
public final long timestampResolutionMs;
5558
public final Map<String, TopicConfigs> topicWiseConfigs;
5659
public final Integer ttl;
60+
public final BehaviorOnError behaviourOnError;
61+
public final List<String> cipherSuites;
62+
public final File certFilePath;
63+
public final File privateKeyPath;
5764

5865
private static final Pattern TOPIC_KS_TABLE_SETTING_PATTERN =
5966
Pattern.compile("topic\\.([a-zA-Z0-9._-]+)\\.([^.]+|\"[\"]+\")\\.([^.]+|\"[\"]+\")\\.(mapping|consistencyLevel|ttlSeconds|deletesEnabled)$");
6067

61-
6268
static final Map<String, ProtocolOptions.Compression> CLIENT_COMPRESSION =
6369
ImmutableMap.of(
6470
"NONE", ProtocolOptions.Compression.NONE,
@@ -95,6 +101,18 @@ public ScyllaDbSinkConnectorConfig(Map<?, ?> originals) {
95101
this.trustStorePassword =
96102
this.getPassword(SSL_TRUSTSTORE_PASSWORD_CONFIG).value().toCharArray();
97103

104+
final String keyStorePath = this.getString(SSL_KEYSTORE_PATH_CONFIG);
105+
this.keyStorePath = Strings.isNullOrEmpty(keyStorePath) ? null : new File(keyStorePath);
106+
this.keyStorePassword =
107+
this.getPassword(SSL_KEYSTORE_PASSWORD_CONFIG).value().toCharArray();
108+
109+
this.cipherSuites = getList(SSL_CIPHER_SUITES_CONFIG);
110+
111+
final String certFilePath = this.getString(SSL_OPENSLL_KEYCERTCHAIN_CONFIG);
112+
this.certFilePath = Strings.isNullOrEmpty(certFilePath) ? null : new File(certFilePath);
113+
final String privateKeyPath = this.getString(SSL_OPENSLL_PRIVATEKEY_CONFIG);
114+
this.privateKeyPath = Strings.isNullOrEmpty(privateKeyPath) ? null : new File(privateKeyPath);
115+
98116
final String compression = getString(COMPRESSION_CONFIG);
99117
this.compression = CLIENT_COMPRESSION.get(compression);
100118
this.sslProvider = ConfigUtils.getEnum(SslProvider.class, this, SSL_PROVIDER_CONFIG);
@@ -127,6 +145,8 @@ public ScyllaDbSinkConnectorConfig(Map<?, ?> originals) {
127145
this.maxBatchSizeKb = getInt(MAX_BATCH_SIZE_CONFIG);
128146
this.loadBalancingLocalDc = getString(LOAD_BALANCING_LOCAL_DC_CONFIG);
129147
this.timestampResolutionMs = getLong(TIMESTAMP_RESOLUTION_MS_CONF);
148+
this.behaviourOnError = BehaviorOnError.valueOf(getString(BEHAVIOR_ON_ERROR_CONFIG).toUpperCase());
149+
130150
Map<String, Map<String, String>> topicWiseConfigsMap = new HashMap<>();
131151
for (final Map.Entry<String, String> entry : ((Map<String, String>) originals).entrySet()) {
132152
final String name2 = entry.getKey();
@@ -236,6 +256,22 @@ public ScyllaDbSinkConnectorConfig(Map<?, ?> originals) {
236256
public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "scylladb.ssl.truststore.password";
237257
private static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password to open the Java Truststore with.";
238258

259+
public static final String SSL_KEYSTORE_PATH_CONFIG = "scylladb.ssl.keystore.path";
260+
private static final String SSL_KEYSTORE_PATH_DOC = "Path to the Java Keystore";
261+
262+
public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "scylladb.ssl.keystore.password";
263+
private static final String SSL_KEYSTORE_PASSWORD_DOC = "Password to open the Java Keystore with.";
264+
265+
public static final String SSL_CIPHER_SUITES_CONFIG = "scylladb.ssl.cipherSuites";
266+
private static final String SSL_CIPHER_SUITES_DOC = "The cipher suites to enable. "
267+
+ "Defaults to none, resulting in a ``minimal quality of service`` according to JDK documentation.";
268+
269+
public static final String SSL_OPENSLL_KEYCERTCHAIN_CONFIG = "scylladb.ssl.openssl.keyCertChain";
270+
private static final String SSL_OPENSLL_KEYCERTCHAIN_DOC = "Path to the SSL certificate file, when using OpenSSL.";
271+
272+
public static final String SSL_OPENSLL_PRIVATEKEY_CONFIG = "ssl.openssl.privateKey";
273+
private static final String SSL_OPENSLL_PRIVATEKEY_DOC = "Path to the private key file, when using OpenSSL.";
274+
239275
public static final String TTL_CONFIG = "scylladb.ttl";
240276
/*If TTL value is not specified then skip setting ttl value while making insert query*/
241277
public static final String TTL_DEFAULT = null;
@@ -263,6 +299,23 @@ public ScyllaDbSinkConnectorConfig(Map<?, ?> originals) {
263299
+ "local to the machine on which the connector is running. It is a recommended config if "
264300
+ "we have more than one DC.";
265301

302+
public static final String BEHAVIOR_ON_ERROR_CONFIG = "behavior.on.error";
303+
public static final String BEHAVIOR_ON_ERROR_DEFAULT = BehaviorOnError.FAIL.name();
304+
private static final String BEHAVIOR_ON_ERROR_DISPLAY = "Behavior On Error";
305+
private static final String BEHAVIOR_ON_ERROR_DOC = "Error handling behavior setting. "
306+
+ "Must be configured to one of the following:\n"
307+
+ "``fail``\n"
308+
+ "The Connector throws ConnectException and stops processing records "
309+
+ "when an error occurs while processing or inserting records into ScyllDB.\n"
310+
+ "``ignore``\n"
311+
+ "Continues to process next set of records "
312+
+ "when error occurs while processing or inserting records into ScyllDB.\n"
313+
+ "``log``\n"
314+
+ "Logs the error via connect-reporter when an error occurs while processing or "
315+
+ "inserting records into ScyllDB and continues to process next set of records, "
316+
+ "available in the kafka topics.";
317+
318+
public static final String SCYLLADB_GROUP = "ScyllaDB";
266319
public static final String CONNECTION_GROUP = "Connection";
267320
public static final String SSL_GROUP = "SSL";
268321
public static final String KEYSPACE_GROUP = "Keyspace";
@@ -389,6 +442,56 @@ public static ConfigDef config() {
389442
"SSL Truststore Password")
390443
//TODO .validator(Validators.blankOr(ValidFile.of()))
391444
//TODO .recommender(Recommenders.visibleIf(SSL_ENABLED_CONFIG, true))
445+
.define(
446+
SSL_KEYSTORE_PATH_CONFIG,
447+
ConfigDef.Type.STRING,
448+
"",
449+
ConfigDef.Importance.MEDIUM,
450+
SSL_KEYSTORE_PATH_DOC,
451+
SSL_GROUP,
452+
2,
453+
ConfigDef.Width.SHORT,
454+
"SSL Keystore Path")
455+
.define(
456+
SSL_KEYSTORE_PASSWORD_CONFIG,
457+
ConfigDef.Type.PASSWORD,
458+
"password123",
459+
ConfigDef.Importance.MEDIUM,
460+
SSL_KEYSTORE_PASSWORD_DOC,
461+
SSL_GROUP,
462+
3,
463+
ConfigDef.Width.SHORT,
464+
"SSL Keystore Password")
465+
.define(
466+
SSL_CIPHER_SUITES_CONFIG,
467+
ConfigDef.Type.LIST,
468+
(Object) Collections.EMPTY_LIST,
469+
ConfigDef.Importance.HIGH,
470+
SSL_CIPHER_SUITES_DOC,
471+
SSL_GROUP,
472+
4,
473+
ConfigDef.Width.LONG,
474+
"The cipher suites to enable")
475+
.define(
476+
SSL_OPENSLL_KEYCERTCHAIN_CONFIG,
477+
ConfigDef.Type.STRING,
478+
"",
479+
ConfigDef.Importance.HIGH,
480+
SSL_OPENSLL_KEYCERTCHAIN_DOC,
481+
SSL_GROUP,
482+
5,
483+
ConfigDef.Width.SHORT,
484+
"The path to the certificate chain file")
485+
.define(
486+
SSL_OPENSLL_PRIVATEKEY_CONFIG,
487+
ConfigDef.Type.STRING,
488+
"",
489+
ConfigDef.Importance.HIGH,
490+
SSL_OPENSLL_PRIVATEKEY_DOC,
491+
SSL_GROUP,
492+
6,
493+
ConfigDef.Width.SHORT,
494+
"The path to the private key file")
392495
.define(
393496
CONSISTENCY_LEVEL_CONFIG,
394497
ConfigDef.Type.STRING,
@@ -524,7 +627,21 @@ public static ConfigDef config() {
524627
WRITE_GROUP,
525628
6,
526629
ConfigDef.Width.SHORT,
527-
"Timestamp Threshold in MS");
630+
"Timestamp Threshold in MS")
631+
.define(
632+
BEHAVIOR_ON_ERROR_CONFIG,
633+
ConfigDef.Type.STRING,
634+
BEHAVIOR_ON_ERROR_DEFAULT,
635+
ConfigDef.ValidString.in(BehaviorOnError.FAIL.name(),
636+
BehaviorOnError.LOG.name(), BehaviorOnError.IGNORE.name()),
637+
ConfigDef.Importance.MEDIUM,
638+
BEHAVIOR_ON_ERROR_DOC,
639+
SCYLLADB_GROUP,
640+
0,
641+
ConfigDef.Width.NONE,
642+
BEHAVIOR_ON_ERROR_DISPLAY
643+
//Recommenders.enumValues(BehaviorOnError.class)
644+
);
528645
}
529646

530647
private String tryMatchTopicName(final String name) {
@@ -535,6 +652,15 @@ private String tryMatchTopicName(final String name) {
535652
throw new IllegalArgumentException("The setting: " + name + " does not match topic.keyspace.table nor topic.codec regular expression pattern");
536653
}
537654

655+
/**
656+
* Enums for behavior on error.
657+
*/
658+
public enum BehaviorOnError {
659+
IGNORE,
660+
LOG,
661+
FAIL
662+
}
663+
538664
public boolean isOffsetEnabledInScyllaDb() {
539665
return getBoolean(ENABLE_OFFSET_STORAGE_TABLE);
540666
}

0 commit comments

Comments
 (0)