
Commit aa89618

Address translator support and configuration related documentation (#12)
* support for record to table mapping config
* some minor bug fixes
* error handling and ssl config support
* some typo mistake fix
* support for address translator
* documentation changes for new configs
* resolved review comments
* renamed ComposeAddressTranslator to ClusterAddressTranslator
* addressed review comments
1 parent e177741 commit aa89618


6 files changed: +295 -72 lines changed


config/scylladb-sink-quickstart.properties

Lines changed: 24 additions & 2 deletions
@@ -3,7 +3,15 @@ name=scylladb-sink-test
 topics=<comma-separated-kafka-topics-name>
 tasks.max=1
 connector.class=io.connect.scylladb.ScyllaDbSinkConnector
+
 scylladb.contact.points=<scylladb-hosts>
+#Eg. scylladb.contact.points=10.0.24.69,10.0.24.70,10.0.24.71
+# configure this to the public hostname of the Scylla nodes; the port will be taken from the configuration scylladb.port
+
+#scylladb.contact.points={\"private_host1:port1\": \"public_host1:port1\", \"private_host2:port2\": \"public_host2:port2\", ...}
+#Eg. scylladb.contact.points={\"10.0.24.69:9042\": \"sl-eu-lon-2-portal.3.dblayer.com:15227\", \"10.0.24.71:9042\": \"sl-eu-lon-2-portal.2.dblayer.com:15229\", \"10.0.24.70:9042\": \"sl-eu-lon-2-portal.1.dblayer.com:15228\"}
+# configure this to a JSON string having key-value pairs of internal private network address(es) mapped to external network address(es).
+
 scylladb.keyspace=<keyspace-name>
 
 ### Connection based configs:
@@ -20,15 +28,29 @@ scylladb.keyspace=<keyspace-name>
 #scylladb.keyspace.replication.factor=3
 
 ### SSL based configs:
-#scylladb.ssl.truststore.path=<truststore-path>
-#scylladb.ssl.truststore.password=<truststore-passsword>
 #scylladb.ssl.provider=JDK
+#scylladb.ssl.truststore.path=<truststore-path>
+#scylladb.ssl.truststore.password=<truststore-password>
+#scylladb.ssl.keystore.path=<keystore-path>
+#scylladb.ssl.keystore.password=<keystore-password>
+#scylladb.ssl.cipherSuites=<cipher-suites-to-enable>
+#scylladb.ssl.openssl.keyCertChain=<ssl-certificate-path>
+#ssl.openssl.privateKey=<privateKey-path>
+
+### ScyllaDB related configs:
+#behavior.on.error=FAIL
 
 ### Table related configs:
 #scylladb.table.manage.enabled=true
 #scylladb.table.create.compression.algorithm=NONE
 #scylladb.offset.storage.table=kafka_connect_offsets
 
+### Topic to table related configs:
+#topic.my_topic.my_ks.my_table.mapping=column1=key.field1, column2=value.field1, __ttl=value.field2, __timestamp=value.field3, column3=header.field1
+#topic.my_topic.my_ks.my_table.consistencyLevel=LOCAL_ONE
+#topic.my_topic.my_ks.my_table.ttlSeconds=1
+#topic.my_topic.my_ks.my_table.deletesEnabled=true
+
 ### Writer configs
 #scylladb.consistency.level=LOCAL_QUORUM
 #scylladb.deletes.enabled=true

documentation/CONFIG.md

Lines changed: 82 additions & 6 deletions
@@ -13,9 +13,15 @@ Connector-specific configuration properties are described below.
 
 ``scylladb.contact.points``
 
-The ScyllaDB hosts to connect to. Scylla nodes use this list of hosts to find each other and learn the topology of the ring. You must change this if you are running multiple nodes.
-It's essential to put at least 2 hosts in case of bigger cluster, since if first host is down, it will contact second one and get the state of the cluster from it.
-Eg. When using the docker image, connect to the host it uses.
+The ScyllaDB hosts to connect to. Scylla nodes use this list of hosts to find each other and learn the topology of the ring.
+You must change this if you are running multiple nodes.
+It is essential to list at least two hosts for a bigger cluster: if the first host is down,
+the connector will contact the second one and get the state of the cluster from it.
+Eg. When using the docker image, connect to the host it uses.
+To connect to private Scylla nodes, provide a JSON string that maps each internal private network address:port to
+an external network address:port as key-value pairs. It should be passed as
+{\"private_host1:port1\": \"public_host1:port1\", \"private_host2:port2\": \"public_host2:port2\", ...}
+Eg. {\"10.0.24.69:9042\": \"sl-eu-lon-2-portal.3.dblayer.com:15227\", \"10.0.24.71:9042\": \"sl-eu-lon-2-portal.2.dblayer.com:15229\", \"10.0.24.70:9042\": \"sl-eu-lon-2-portal.1.dblayer.com:15228\"}
 
 * Type: List
 * Importance: High
@@ -32,7 +38,9 @@ Connector-specific configuration properties are described below.
 * Valid Values: ValidPort{start=1, end=65535}
 
 ``scylladb.loadbalancing.localdc``
-The case-sensitive Data Center name local to the machine on which the connector is running. It is a recommended config if we have more than one DC.
+
+The case-sensitive Data Center name local to the machine on which the connector is running.
+It is a recommended configuration when there is more than one DC.
 
 * Type: string
 * Default: ""
@@ -44,7 +52,7 @@ Connector-specific configuration properties are described below.
 
 * Type: Boolean
 * Importance: High
-* Default Value: False
+* Default Value: false
 
 ``scylladb.username``

@@ -82,28 +90,38 @@ Connector-specific configuration properties are described below.
 ###SSL
 
 ``scylladb.ssl.truststore.path``
+
 Path to the Java Truststore.
 
 * Type: string
 * Default: ""
 * Importance: medium
 
 ``scylladb.ssl.truststore.password``
+
 Password to open the Java Truststore with.
 
 * Type: password
 * Default: [hidden]
 * Importance: medium
 
 ``scylladb.ssl.provider``
+
 The SSL Provider to use when connecting to ScyllaDB.
 
 * Type: string
 * Default: JDK
 * Valid Values: [JDK, OPENSSL, OPENSSL_REFCNT]
 * Importance: low
 
-### Keyspace
+###Keyspace
+
+**Note**: Both keyspace and table names consist of only alphanumeric characters,
+cannot be empty, and are limited in size to 48 characters (that limit exists
+mostly to prevent filenames, which may include the keyspace and table name,
+from going over the limits of certain file systems). By default, keyspace and table names
+are case insensitive (myTable is equivalent to mytable), but case sensitivity
+can be forced by using double-quotes ("myTable" is different from mytable).
 
 ``scylladb.keyspace``

@@ -158,6 +176,39 @@ Connector-specific configuration properties are described below.
 * Type: String
 * Importance: Low
 * Default: kafka_connect_offsets
+
+###Topic to Table
+
+These configurations can be specified for multiple Kafka topics from which records are being processed.
+Also, these topic-level configurations override the behavior of connector-level configurations such as
+``scylladb.consistency.level``, ``scylladb.deletes.enabled`` and ``scylladb.ttl``.
+
+``topic.my_topic.my_ks.my_table.mapping``
+
+Maps fields from the Kafka record's key, value and headers to a ScyllaDB table and its columns.
+
+**Note**: Ensure that the data types of the Kafka record's fields are compatible with the data types of the ScyllaDB columns.
+In the Kafka topic mapping, you can optionally specify which column should be used as the ttl (time-to-live) and
+timestamp of the record being inserted into the database table using the special properties __ttl and __timestamp.
+By default, the database internally tracks the write time (timestamp) of records inserted into the table.
+However, the __timestamp feature in the mapping supports the scenario where the Kafka records have an explicit
+timestamp field that you want to use as the write time for the database record produced by the connector.
+Eg. "topic.my_topic.my_ks.my_table.mapping":
+"column1=key.field1, column2=value.field1, __ttl=value.field2, __timestamp=value.field3, column3=header.field1"
+
+``topic.my_topic.my_ks.my_table.consistencyLevel``
+
+Use this property to specify a table-wide consistency level.
+
+``topic.my_topic.my_ks.my_table.ttlSeconds``
+
+Use this property to specify a table-wide ttl (time-to-live) in seconds.
+
+``topic.my_topic.my_ks.my_table.deletesEnabled``
+
+Use this property to specify whether tombstone records (records with a null Kafka value)
+should be processed as delete requests.
+
 
 ###Write

@@ -171,6 +222,7 @@ Connector-specific configuration properties are described below.
 * Valid Values: ``ANY``, ``ONE``, ``TWO``, ``THREE``, ``QUORUM``, ``ALL``, ``LOCAL_QUORUM``, ``EACH_QUORUM``, ``SERIAL``, ``LOCAL_SERIAL``, ``LOCAL_ONE``
 
 ``scylladb.deletes.enabled``
+
 Flag to determine if the connector should process deletes.
 The Kafka records with kafka record value as null will result in deletion of ScyllaDB record
 with the primary key present in Kafka record key.
@@ -206,6 +258,7 @@ Connector-specific configuration properties are described below.
 * Default Value: True
 
 ``scylladb.max.batch.size.kb``
+
 Maximum size(in kilobytes) of a single batch consisting ScyllaDB operations. This should be equal to
 batch_size_warn_threshold_in_kb and 1/10th of the batch_size_fail_threshold_in_kb configured in scylla.yaml.
 The default value is set to 5kb, any change in this configuration should be accompanied by change in scylla.yaml.
@@ -224,6 +277,27 @@ Connector-specific configuration properties are described below.
 * Importance: Low
 * Valid Values: [0,...]
 * Default Value: 0
+
+###ScyllaDB
+
+``behavior.on.error``
+
+Error handling behavior setting. Must be configured to one of the following:
+
+``fail``
+The connector throws a ConnectException and stops processing records when an error occurs while processing or inserting records into ScyllaDB.
+
+``ignore``
+Continues to process the next set of records when an error occurs while processing or inserting records into ScyllaDB.
+
+``log``
+Logs the error via connect-reporter when an error occurs while processing or inserting records into ScyllaDB, and continues to process the next set of records available in the Kafka topics.
+
+* Type: string
+* Default: FAIL
+* Valid Values: [FAIL, LOG, IGNORE]
+* Importance: medium
+
 
 ###Confluent Platform Configurations.

@@ -232,6 +306,7 @@ Connector-specific configuration properties are described below.
 The maximum number of tasks to use for the connector that helps in parallelism.
 
 * Type:int
+* Default: 1
 * Importance: high
 
 ``topics``
@@ -246,6 +321,7 @@ The name of the topics to consume data from and write to ScyllaDB.
 A list of host/port pairs to use for establishing the initial connection to the Kafka cluster used for licensing. All servers in the cluster will be discovered from the initial connection. This list should be in the form <code>host1:port1,host2:port2,…</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
 
 * Type: list
+* Default: localhost:9092
 * Importance: high
 
 ------------------------

pom.xml

Lines changed: 6 additions & 0 deletions
@@ -196,6 +196,12 @@
       <version>${netty-tcnative.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.json</groupId>
+      <artifactId>json</artifactId>
+      <version>20160810</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>connect-runtime</artifactId>

src/main/java/io/connect/scylladb/ClusterAddressTranslator.java

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
// Copyright (c) 2016 Compose, an IBM company
//
// MIT License
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

package io.connect.scylladb;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.policies.AddressTranslator;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Translates the internal (private) addresses advertised by Scylla nodes to the
 * external (public) addresses that the connector can actually reach.
 */
class ClusterAddressTranslator implements AddressTranslator {

  public Map<InetSocketAddress, InetSocketAddress> addressMap = new HashMap<>();
  private static final Logger log = LoggerFactory.getLogger(ClusterAddressTranslator.class);

  @Override
  public void init(Cluster cluster) {
  }

  /**
   * Parses the address map from a JSON string: either a JSON array of single-entry
   * objects or a single JSON object of "private_host:port": "public_host:port" pairs.
   */
  public void setMap(String addressMapString) {
    JSONObject jsonmap;

    if (addressMapString.charAt(0) == '[') {
      JSONArray jsonarray = new JSONArray(addressMapString);
      log.trace("Address translation map: {}", jsonarray);
      Iterator jai = jsonarray.iterator();

      while (jai.hasNext()) {
        JSONObject element = (JSONObject) jai.next();
        Iterator subpart = element.keys();
        String internal = (String) subpart.next();
        String external = element.getString(internal);
        addAddresses(internal, external);
      }
    } else {
      jsonmap = new JSONObject(addressMapString);
      Iterator keys = jsonmap.keys();
      while (keys.hasNext()) {
        String internal = (String) keys.next();
        String external = jsonmap.getString(internal);
        addAddresses(internal, external);
      }
    }
  }

  /** Adds one internal "host:port" to external "host:port" pair to the translation map. */
  public void addAddresses(String internal, String external) {
    String[] internalhostport = internal.split(":");
    String[] externalhostport = external.split(":");
    InetSocketAddress internaladdress = new InetSocketAddress(internalhostport[0], Integer.parseInt(internalhostport[1]));
    InetSocketAddress externaladdress = new InetSocketAddress(externalhostport[0], Integer.parseInt(externalhostport[1]));
    addressMap.put(internaladdress, externaladdress);
  }

  /** Returns the external addresses, used as the driver's initial contact points. */
  public Collection<InetSocketAddress> getContactPoints() {
    return Collections.unmodifiableCollection(addressMap.values());
  }

  @Override
  public InetSocketAddress translate(final InetSocketAddress inetSocketAddress) {
    // Fall back to the original address when no mapping is configured for it.
    return addressMap.getOrDefault(inetSocketAddress, inetSocketAddress);
  }

  @Override
  public void close() {
  }
}
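
For reference, a minimal usage sketch of the translator follows (placed in the same package because the class is package-private). The example class name and the addresses are illustrative only, not part of the commit:

package io.connect.scylladb;

import java.net.InetSocketAddress;

public class ClusterAddressTranslatorExample {
  public static void main(String[] args) {
    // Illustrative mapping: private node addresses (keys) to public addresses (values).
    String contactPoints =
        "{\"10.0.24.69:9042\": \"sl-eu-lon-2-portal.3.dblayer.com:15227\", "
            + "\"10.0.24.70:9042\": \"sl-eu-lon-2-portal.1.dblayer.com:15228\"}";

    ClusterAddressTranslator translator = new ClusterAddressTranslator();
    translator.setMap(contactPoints);

    // Public addresses used as the driver's initial contact points.
    System.out.println(translator.getContactPoints());

    // A private address reported by the cluster is translated to its public counterpart.
    System.out.println(translator.translate(new InetSocketAddress("10.0.24.69", 9042)));

    // Addresses without a mapping pass through unchanged.
    System.out.println(translator.translate(new InetSocketAddress("192.168.1.5", 9042)));
  }
}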

src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java

Lines changed: 28 additions & 2 deletions
@@ -12,6 +12,8 @@
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.json.JSONObject;
+import org.json.JSONException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -43,10 +45,18 @@ public class ScyllaDbSessionFactory {
 
   public ScyllaDbSession newSession(ScyllaDbSinkConnectorConfig config) {
     Cluster.Builder clusterBuilder = Cluster.builder()
-        .withPort(config.port)
-        .addContactPoints(config.contactPoints)
         .withProtocolVersion(ProtocolVersion.NEWEST_SUPPORTED)
         .withCodecRegistry(CODEC_REGISTRY);
+
+    try {
+      configureAddressTranslator(config, clusterBuilder);
+    } catch (JSONException e) {
+      log.info("Failed to configure address translator, provide a valid JSON string "
+          + "with internal private network address and port mapped to external network "
+          + "address and port.");
+      configurePublicContactPoints(config, clusterBuilder);
+    }
+
     if (!config.loadBalancingLocalDc.isEmpty()) {
       clusterBuilder.withLoadBalancingPolicy(
           DCAwareRoundRobinPolicy.builder()
@@ -131,6 +141,22 @@ public ScyllaDbSession newSession(ScyllaDbSinkConnectorConfig config) {
     return new ScyllaDbSessionImpl(config, cluster, session);
   }
 
+  private void configurePublicContactPoints(ScyllaDbSinkConnectorConfig config, Cluster.Builder clusterBuilder) {
+    log.info("Configuring public contact points={}", config.contactPoints);
+    String[] contactPointsArray = config.contactPoints.split(",");
+    clusterBuilder.withPort(config.port)
+        .addContactPoints(contactPointsArray);
+  }
+
+  private void configureAddressTranslator(ScyllaDbSinkConnectorConfig config, Cluster.Builder clusterBuilder) {
+    log.info("Trying to configure address translator for private network address and port.");
+    // Throws JSONException if the contact points string is not valid JSON,
+    // in which case the caller falls back to plain public contact points.
+    new JSONObject(config.contactPoints);
+    ClusterAddressTranslator translator = new ClusterAddressTranslator();
+    translator.setMap(config.contactPoints);
+    clusterBuilder.addContactPointsWithPorts(translator.getContactPoints())
+        .withAddressTranslator(translator);
+  }
+
   private KeyStore createKeyStore(File path, char[] password) {
     KeyStore keyStore;
     try {
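
As a quick illustration of the fallback logic added above, assuming the same org.json dependency: a JSON-object value for scylladb.contact.points takes the address-translator path, while a plain comma-separated host list throws JSONException and falls back to public contact points. The class name and sample values below are hypothetical:

package io.connect.scylladb;

import org.json.JSONException;
import org.json.JSONObject;

public class ContactPointsFormatCheck {
  // Mirrors the check in ScyllaDbSessionFactory#newSession: parsing the value as a
  // JSON object selects the address-translator path, otherwise the factory falls
  // back to plain public contact points.
  static String chosenPath(String contactPoints) {
    try {
      new JSONObject(contactPoints);
      return "address translator";
    } catch (JSONException e) {
      return "public contact points";
    }
  }

  public static void main(String[] args) {
    // Illustrative values only.
    System.out.println(chosenPath("{\"10.0.24.69:9042\": \"public-host-1:15227\"}"));  // address translator
    System.out.println(chosenPath("10.0.24.69,10.0.24.70,10.0.24.71"));                // public contact points
  }
}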
