Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/bindings.properties
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,5 @@ solr7:site.ycsb.db.solr7.SolrClient
tarantool:site.ycsb.db.TarantoolClient
tablestore:site.ycsb.db.tablestore.TableStoreClient
voltdb:site.ycsb.db.voltdb.VoltClient4
zookeeper:site.ycsb.db.zookeeper.ZKClient
zookeeper:site.ycsb.db.zookeeper.ZKClient
oxia:site.ycsb.db.oxia.OxiaClient
3 changes: 2 additions & 1 deletion bin/ycsb
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ DATABASES = {
"solr7" : "site.ycsb.db.solr7.SolrClient",
"tarantool" : "site.ycsb.db.TarantoolClient",
"tablestore" : "site.ycsb.db.tablestore.TableStoreClient",
"zookeeper" : "site.ycsb.db.zookeeper.ZKClient"
"zookeeper" : "site.ycsb.db.zookeeper.ZKClient",
"oxia" : "site.ycsb.db.oxia.OxiaClient"
}

OPTIONS = {
Expand Down
7 changes: 6 additions & 1 deletion distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,13 @@ LICENSE file.
<artifactId>zookeeper-binding</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>site.ycsb</groupId>
<artifactId>oxia-binding</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
98 changes: 98 additions & 0 deletions oxia/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020 YCSB contributors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>site.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.18.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>

<artifactId>oxia-binding</artifactId>
<name>StreamNative Oxia Binding</name>
<packaging>jar</packaging>


<dependencies>
<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-client</artifactId>
<version>${oxia.version}</version>
</dependency>
<dependency>
<groupId>site.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-testcontainers</artifactId>
<version>${oxia.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-buffer -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.111.Final</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
175 changes: 175 additions & 0 deletions oxia/src/main/java/site/ycsb/db/oxia/OxiaClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package site.ycsb.db.oxia;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.GetOption;
import io.streamnative.oxia.client.api.GetResult;
import io.streamnative.oxia.client.api.OxiaClientBuilder;
import io.streamnative.oxia.client.api.PutOption;
import site.ycsb.*;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutionException;

/**
* StreamNative Oxia client for YCSB framework.
*/
public final class OxiaClient extends DB {
private AsyncOxiaClient oxiaClient;


@Override
public void init() throws DBException {
final Properties properties = getProperties();
final var serviceURL = (String) properties.getOrDefault(CONF_KEY_SERVER_URL,
CONF_VALUE_DEFAULT_SERVER_URL);
final var namespace = (String) properties.getOrDefault(CONF_KEY_NAMESPACE,
CONF_VALUE_DEFAULT_NAMESPACE);
try {
this.oxiaClient =
OxiaClientBuilder.create(serviceURL).namespace(namespace)
.asyncClient().get();
} catch (InterruptedException | ExecutionException ex) {
throw new DBException(ex);
}
}

@Override
public void cleanup() throws DBException {
try {
oxiaClient.close();
} catch (Exception ex) {
throw new DBException(ex);
}
}

@Override
public Status read(String table, String key, Set<String> fields,
Map<String, ByteIterator> output) {
final GetResult result =
oxiaClient.get(key, Set.of(GetOption.PartitionKey(key))).join();
if (result == null) {
return Status.NOT_FOUND;
}
deserializeWithFilter(result.getValue(), output, fields);
return Status.OK;
}

@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
return Status.NOT_IMPLEMENTED;
}

@Override
public Status update(String table, String key,
Map<String, ByteIterator> values) {
final GetResult result =
oxiaClient.get(key, Set.of(GetOption.PartitionKey(key))).join();
final Map<String, ByteIterator> existingMap = new HashMap<>();
deserializeWithFilter(result.getValue(), existingMap, null);
existingMap.putAll(values);
final byte[] payload = serializeByMap(existingMap);
try {
// todo: batch operation
oxiaClient.put(key, payload,
Set.of(PutOption.PartitionKey(key))).join();
return Status.OK;
} catch (Throwable ex) {
return Status.ERROR;
}
}

@Override
public Status insert(String table, String key,
Map<String, ByteIterator> values) {
final byte[] payload = serializeByMap(values);
try {
// todo: batch operation
oxiaClient.put(key, payload,
Set.of(PutOption.PartitionKey(key))).join();
return Status.OK;
} catch (Throwable ex) {
return Status.ERROR;
}
}

@Override
public Status delete(String table, String key) {
if (oxiaClient.delete(key).join()) {
return Status.OK;
} else {
return Status.NOT_FOUND;
}
}

/*
* The binary serializer for YCSB payloads, the protocol format is as follows.
* <p>
* +--------------+-----------+--------------+------------+
* | field_length | field | value_length | value |
* +--------------+-----------+--------------+------------+
* | (4)Bytes | (n)Bytes | (4)Bytes | (n)Bytes |
* +--------------+-----------+--------------+------------+
*/

public static byte[] serializeByMap(
Map<String, ByteIterator> structurePayload) {
final ByteBuf b = ByteBufAllocator.DEFAULT.heapBuffer();
try {
structurePayload.forEach((field, value) -> {
final byte[] v = value.toArray();
final byte[] f = field.getBytes(StandardCharsets.UTF_8);
b.writeInt(f.length);
b.writeBytes(f);
b.writeInt(v.length);
b.writeBytes(v);
});
final byte[] payload = new byte[b.readableBytes()];
b.readBytes(payload);
return payload;
} finally {
b.release();
}
}


public static void deserializeWithFilter(byte[] arrays,
Map<String, ByteIterator> output,
Set<String> containField) {
final ByteBuf buf = Unpooled.wrappedBuffer(arrays);
// since oxia has CRC, we don't need check here.
// plus, it's testing, we can let it failed
while (buf.isReadable()) {
final int kl = buf.readInt();
final byte[] bKey = new byte[kl];
buf.readBytes(bKey);
final String field = new String(bKey, StandardCharsets.UTF_8);
final int vl = buf.readInt();

if (containField == null || containField.contains(field)) {
final byte[] bValue = new byte[vl];
buf.readBytes(bValue);
final var value = new ByteArrayByteIterator(bValue);
output.put(field, value);
} else {
// get rid of the value
buf.readerIndex(buf.readerIndex() + vl);
}
}
}

static final String CONF_KEY_SERVER_URL = "oxia.server";
static final String CONF_VALUE_DEFAULT_SERVER_URL = "localhost:6648";

static final String CONF_KEY_NAMESPACE = "oxia.namespace";
static final String CONF_VALUE_DEFAULT_NAMESPACE = "benchmark";
}
22 changes: 22 additions & 0 deletions oxia/src/main/java/site/ycsb/db/oxia/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2020 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/

/**
* The YCSB binding for StreamNative Oxia.
*/
package site.ycsb.db.oxia;

22 changes: 22 additions & 0 deletions oxia/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) 2020 YCSB contributors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
# Root logger option
log4j.rootLogger=INFO, stderr

# Direct log messages to stderr
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.Target=System.err
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
Loading