Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Implements Client-Side RowKey translation on import #140

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion com.pilosa.client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>

<dependency>
<groupId>com.pilosa</groupId>
<artifactId>roaring</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,37 @@ public void importRowKeyColumnIDTest() throws IOException {
}
}

@Test
public void fastImportRowKeyColumnIDTest() throws IOException {
try (PilosaClient client = this.getClient()) {
LineDeserializer deserializer = new RowKeyColumnIDDeserializer();
RecordIterator iterator = csvRecordIterator("row_key-column_id.csv", deserializer);
FieldOptions fieldOptions = FieldOptions.builder()
.setKeys(true)
.build();
Field field = this.index.field("importfield-rowkey-colid", fieldOptions);
client.ensureField(field);
ImportOptions importOptions = ImportOptions.builder()
.setRoaring(true)
.setTranslateKeys(true)
.build();
client.importField(field, iterator, importOptions);
PqlBatchQuery bq = index.batchQuery(
field.row("one"),
field.row("five"),
field.row("three")
);
QueryResponse response = client.query(bq);

List<Long> target = Arrays.asList(10L, 20L, 41L);
List<QueryResult> results = response.getResults();
for (int i = 0; i < results.size(); i++) {
RowResult br = results.get(i).getRow();
assertEquals(target.get(i), br.getColumns().get(0));
}
}
}

@Test
public void importRowKeyColumnKeyTest() throws IOException {
try (PilosaClient client = this.getClient()) {
Expand Down Expand Up @@ -834,7 +865,6 @@ public void importTestWithBatch() throws IOException {
Field field = this.index.field("importfield");
client.ensureField(field);
ImportOptions options = ImportOptions.builder().
setStrategy(ImportOptions.Strategy.BATCH).
setBatchSize(3).
setThreadCount(1).
build();
Expand Down Expand Up @@ -917,8 +947,6 @@ public void run() {
ImportOptions options = ImportOptions.builder()
.setBatchSize(100000)
.setThreadCount(2)
.setStrategy(ImportOptions.Strategy.TIMEOUT)
.setTimeoutMs(5)
.build();
client.importField(field, iterator, options, statusQueue);
monitorThread.interrupt();
Expand All @@ -942,7 +970,6 @@ public void run() {
this.client.ensureField(field);

ImportOptions options = ImportOptions.builder()
.setStrategy(ImportOptions.Strategy.BATCH)
.setBatchSize(500)
.setThreadCount(1)
.build();
Expand Down Expand Up @@ -989,7 +1016,6 @@ public void run() {
this.client.ensureField(field);

ImportOptions options = ImportOptions.builder()
.setStrategy(ImportOptions.Strategy.BATCH)
.setBatchSize(1_000)
.setThreadCount(1)
.build();
Expand Down Expand Up @@ -1037,7 +1063,6 @@ public void getEmptySchemaTest() throws IOException {
@Test
public void syncSchemaTest() throws IOException {
Index index = null;

try (PilosaClient client = this.getClient()) {
Schema schema = client.readSchema();
IndexOptions indexOptions = IndexOptions.builder()
Expand Down Expand Up @@ -1065,8 +1090,6 @@ public void syncSchemaTest() throws IOException {
Index index4 = schema4.index("index11", indexOptions);
client.syncSchema(schema4);
assertEquals(index, index4);


} finally {
try (PilosaClient client = this.getClient()) {
if (index != null) {
Expand Down Expand Up @@ -1195,6 +1218,25 @@ public void warningResponseTest() throws IOException, InterruptedException {
}
}

@Test
public void translateRowKeysTest() throws IOException {
try (PilosaClient client = getClient()) {
FieldOptions options = FieldOptions.builder()
.setKeys(true)
.build();
Field field = this.index.field("translate-rowkey-field", options);
client.syncSchema(this.schema);
client.query(this.index.batchQuery(
field.set("key1", 10),
field.set("key2", 1000)
));

List<Long> rowIDs = client.translateKeys(field, Arrays.asList("key1", "key2"));
List<Long> target = Arrays.asList(1L, 2L);
assertEquals(target, rowIDs);
}
}

@Test(expected = PilosaException.class)
public void importFailNot200() throws IOException {
HttpServer server = runImportFailsHttpServer();
Expand Down Expand Up @@ -1642,21 +1684,13 @@ private PilosaClient getClient() {
String bindAddress = getBindAddress();
Cluster cluster = Cluster.withHost(URI.address(bindAddress));
ClientOptions.Builder optionsBuilder = ClientOptions.builder();
long shardWidth = getShardWidth();
if (shardWidth > 0) {
optionsBuilder.setShardWidth(shardWidth);
}
return new InsecurePilosaClientIT(cluster, optionsBuilder.build());
}

private PilosaClient getClientManualAddress() {
String bindAddress = getBindAddress();
ClientOptions.Builder optionsBuilder = ClientOptions.builder()
.setManualServerAddress(true);
long shardWidth = getShardWidth();
if (shardWidth > 0) {
optionsBuilder.setShardWidth(shardWidth);
}
return new InsecurePilosaClientIT(bindAddress, optionsBuilder.build());
}

Expand All @@ -1668,11 +1702,6 @@ private String getBindAddress() {
return bindAddress;
}

private boolean isLegacyModeOff() {
String legacyModeOffStr = System.getenv("LEGACY_MODE_OFF");
return legacyModeOffStr != null && legacyModeOffStr.equals("true");
}

private long getShardWidth() {
String shardWidthStr = System.getenv("SHARD_WIDTH");
return (shardWidthStr == null) ? 0 : Long.parseLong(shardWidthStr);
Expand Down
2 changes: 1 addition & 1 deletion com.pilosa.client/src/internal/public.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ message Pair {
message FieldRow{
string Field = 1;
uint64 RowID = 2;
string RowKey = 3;
string RowKey = 3;
}

message GroupCount{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,6 @@ public Builder setSslContext(SSLContext sslContext) {
return this;
}

public Builder setShardWidth(long shardWidth) {
this.shardWidth = shardWidth;
return this;
}

public Builder setManualServerAddress(boolean manualServerAddress) {
this.manualServerAddress = manualServerAddress;
return this;
Expand All @@ -138,7 +133,7 @@ public Builder setManualServerAddress(boolean manualServerAddress) {
public ClientOptions build() {
return new ClientOptions(this.socketTimeout, this.connectTimeout,
this.retryCount, this.connectionPoolSizePerRoute, this.connectionPoolTotalSize,
this.sslContext, this.shardWidth, this.manualServerAddress);
this.sslContext, this.manualServerAddress);
}

private int socketTimeout = 300000;
Expand Down Expand Up @@ -185,25 +180,20 @@ public SSLContext getSslContext() {
return this.sslContext;
}

public long getShardWidth() {
return this.shardWidth;
}

public boolean isManualServerAddress() {
return this.manualServerAddress;
}

private ClientOptions(final int socketTimeout, final int connectTimeout, final int retryCount,
final int connectionPoolSizePerRoute, final int connectionPoolTotalSize,
final SSLContext sslContext, final long shardWidth,
final SSLContext sslContext,
final boolean manualServerAddress) {
this.socketTimeout = socketTimeout;
this.connectTimeout = connectTimeout;
this.retryCount = retryCount;
this.connectionPoolSizePerRoute = connectionPoolSizePerRoute;
this.connectionPoolTotalSize = connectionPoolTotalSize;
this.sslContext = sslContext;
this.shardWidth = shardWidth;
this.manualServerAddress = manualServerAddress;
}

Expand All @@ -213,6 +203,5 @@ private ClientOptions(final int socketTimeout, final int connectTimeout, final i
private final int connectionPoolSizePerRoute;
private final int connectionPoolTotalSize;
private final SSLContext sslContext;
private final long shardWidth;
private final boolean manualServerAddress;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class FieldRow {
public static FieldRow create(String fieldName, long rowID) {
return new FieldRow(fieldName, rowID, "");
}

public static FieldRow create(String fieldName, String rowKey) {
return new FieldRow(fieldName, 0, rowKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,24 @@ private Builder() {
}

public ImportOptions build() {
return new ImportOptions(this.threadCount,
this.timeoutMs, this.batchSize, this.strategy,
this.roaring, this.clear);
return new ImportOptions(
this.threadCount,
this.batchSize,
this.roaring,
this.clear,
this.translateKeys);
}

public Builder setThreadCount(int threadCount) {
this.threadCount = threadCount;
return this;
}

public Builder setTimeoutMs(long timeoutMs) {
this.timeoutMs = timeoutMs;
return this;
}

public Builder setBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}

public Builder setStrategy(Strategy strategy) {
this.strategy = (strategy == Strategy.DEFAULT) ? Strategy.BATCH : strategy;
return this;
}

public Builder setRoaring(boolean roaring) {
this.roaring = roaring;
return this;
Expand All @@ -82,26 +75,28 @@ public Builder setClear(boolean clear) {
return this;
}

public Builder setTranslateKeys(boolean translateKeys) {
this.translateKeys = translateKeys;
return this;
}

private int threadCount = 1;
private long timeoutMs = 100;
private int batchSize = 100000;
private Strategy strategy = Strategy.BATCH;
private boolean roaring = false;
private boolean clear = false;
private boolean translateKeys = false;
}

private ImportOptions(int threadCount,
long timeoutMs,
int batchSize,
Strategy strategy,
boolean roaring,
boolean clear) {
boolean clear,
boolean translateKeys) {
this.threadCount = threadCount;
this.timeoutMs = timeoutMs;
this.batchSize = batchSize;
this.strategy = strategy;
this.roaring = roaring;
this.clear = clear;
this.translateKeys = translateKeys;
}

public static Builder builder() {
Expand All @@ -112,18 +107,10 @@ public int getThreadCount() {
return this.threadCount;
}

public long getTimeoutMs() {
return this.timeoutMs;
}

public int getBatchSize() {
return this.batchSize;
}

public Strategy getStrategy() {
return this.strategy;
}

public long getShardWidth() {
return ClientOptions.DEFAULT_SHARD_WIDTH;
}
Expand All @@ -136,10 +123,13 @@ public boolean isClear() {
return this.clear;
}

public boolean isTranslateKeys() {
return this.translateKeys;
}

final private int threadCount;
final private long timeoutMs;
final private int batchSize;
final private Strategy strategy;
final private boolean roaring;
final private boolean clear;
final private boolean translateKeys;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,24 @@
import static com.pilosa.client.PilosaClient.PQL_VERSION;

class ImportRequest {
ImportRequest(final String path, final byte[] payload, final String contentType) {
ImportRequest(final String path, final byte[] payload, final String contentType, final boolean roaring) {
this.path = path;
this.payload = payload;
this.contentType = contentType;
this.roaring = roaring;
}

static ImportRequest createCSVImport(final Field field, final byte[] payload, boolean clear) {
String clearStr = clear ? "?clear=true" : "";
String path = String.format("/index/%s/field/%s/import%s", field.getIndex().getName(), field.getName(), clearStr);
return new ImportRequest(path, payload, "application/x-protobuf");
return new ImportRequest(path, payload, "application/x-protobuf", false);
}

static ImportRequest createRoaringImport(final Field field, long shard, final byte[] payload, boolean clear) {
String clearStr = clear ? "?clear=true" : "";
String path = String.format("/index/%s/field/%s/import-roaring/%d%s",
field.getIndex().getName(), field.getName(), shard, clearStr);
return new ImportRequest(path, payload, "application/x-protobuf");
return new ImportRequest(path, payload, "application/x-protobuf", true);
}

String getPath() {
Expand All @@ -76,7 +77,12 @@ Header[] getHeaders() {
};
}

public boolean isRoaring() {
return this.roaring;
}

protected final String path;
protected final String contentType;
protected final byte[] payload;
protected final boolean roaring;
}
Loading