diff --git a/cassandra/README.md b/cassandra/README.md index 94c3a3d9a9..017d05eedc 100644 --- a/cassandra/README.md +++ b/cassandra/README.md @@ -1,5 +1,5 @@ -# Apache Cassandra 2.x CQL binding +# Apache Cassandra >= 2.1 CQL binding Binding for [Apache Cassandra](http://cassandra.apache.org), using the CQL API -via the [DataStax -driver](http://docs.datastax.com/en/developer/java-driver/2.1/java-driver/whatsNew2.html). +via the [DataStax driver](https://docs.datastax.com/en/developer/java-driver/4.17/manual/index.html) To run against the (deprecated) Cassandra Thrift API, use the `cassandra-10` binding. ## Creating a table for use with YCSB -For keyspace `ycsb`, table `usertable`: - - cqlsh> create keyspace ycsb - WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 3 }; - cqlsh> USE ycsb; - cqlsh> create table usertable ( - y_id varchar primary key, - field0 varchar, - field1 varchar, - field2 varchar, - field3 varchar, - field4 varchar, - field5 varchar, - field6 varchar, - field7 varchar, - field8 varchar, - field9 varchar); - -**Note that `replication_factor` and consistency levels (below) will affect performance.** +For keyspace `ycsb`, table `usertable`, noting you should apply your own replication, +compaction and other settings based on your testing configuration. +```sql +CREATE KEYSPACE ycsb WITH REPLICATION = { + 'class' : 'SimpleStrategy', + 'replication_factor': 3 +}; +``` + +```sql +CREATE TABLE ycsb.usertable ( + y_id VARCHAR PRIMARY KEY, + field0 VARCHAR, + field1 VARCHAR, + field2 VARCHAR, + field3 VARCHAR, + field4 VARCHAR, + field5 VARCHAR, + field6 VARCHAR, + field7 VARCHAR, + field8 VARCHAR, + field9 VARCHAR +); +``` + +**Note that parameters like `replication_factor`, `compaction` etc. will affect performance.** ## Cassandra Configuration Parameters -- `hosts` (**required**) - - Cassandra nodes to connect to. - - No default. - -* `port` - * CQL port for communicating with Cassandra cluster. - * Default is `9042`. - -- `cassandra.keyspace` - Keyspace name - must match the keyspace for the table created (see above). - See http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html for details. - - - Default value is `ycsb` - -- `cassandra.username` -- `cassandra.password` - - Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details. - -* `cassandra.readconsistencylevel` -* `cassandra.writeconsistencylevel` - - * Default value is `QUORUM` - - Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details. - -* `cassandra.maxconnections` -* `cassandra.coreconnections` - * Defaults for max and core connections can be found here: https://datastax.github.io/java-driver/2.1.8/features/pooling/#pool-size. Cassandra 2.0.X falls under protocol V2, Cassandra 2.1+ falls under protocol V3. -* `cassandra.connecttimeoutmillis` -* `cassandra.useSSL` - * Default value is false. - - To connect with SSL set this value to true. -* `cassandra.readtimeoutmillis` - * Defaults for connect and read timeouts can be found here: https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/SocketOptions.html. -* `cassandra.tracing` - * Default is false - * https://docs.datastax.com/en/cql/3.3/cql/cql_reference/tracing_r.html +* `cassandra.driverconfig` (**required**) + * Path to a HOCON configuration for configuring the Cassandra driver. + * See the reference configuration here . + +* `cassandra.driverprofile.` + * `` is one of `read`, `scan`, `insert`, `update` or `delete`. + * Defaults to anonymous global profile. + * Profile within the driver config to use for a specific operation. + * Driver configs have an anonymous config that all profiles inherit from (and you can modify this), which is used by default if this parameter is not specified. + +* `cassandra.tracing.` + * `` is one of `read`, `scan`, `insert`, `update` or `delete`. + * Default is `false`. + * Captures detailed information about the internal operations performed by all nodes in the cluster in order to build the response. + * This is an expensive operation and should only be done on a few queries, adjusted by `cassandra.tracingfrequency`. + +* `cassandra.tracingfrequency.` + * `` is one of `read`, `scan`, `insert`, `update` or `delete`. + * Default is `1000`. + * Determines how often tracing will be performed, i.e. for every `n` queries, tracing will be enabled on that query. diff --git a/cassandra/pom.xml b/cassandra/pom.xml index 46758f3acc..8fde85312d 100644 --- a/cassandra/pom.xml +++ b/cassandra/pom.xml @@ -38,10 +38,21 @@ LICENSE file. + + com.datastax.oss + java-driver-core + ${cassandra.cql.version} + + + com.datastax.oss + java-driver-query-builder + ${cassandra.cql.version} + com.datastax.cassandra cassandra-driver-core - ${cassandra.cql.version} + 3.0.0 + test site.ycsb @@ -59,8 +70,7 @@ LICENSE file. org.slf4j slf4j-simple - 1.7.21 - test + 1.7.25 junit diff --git a/cassandra/src/main/java/site/ycsb/db/CassandraCQLClient.java b/cassandra/src/main/java/site/ycsb/db/CassandraCQLClient.java index cc3b38e4fa..e03488eec5 100644 --- a/cassandra/src/main/java/site/ycsb/db/CassandraCQLClient.java +++ b/cassandra/src/main/java/site/ycsb/db/CassandraCQLClient.java @@ -14,223 +14,192 @@ * the License. See accompanying LICENSE file. * * Submitted by Chrisjan Matser on 10/11/2010. + * Updated by EngineersBox (Jack Kilrain) on 17/07/2024 */ package site.ycsb.db; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ColumnDefinitions; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.Host; -import com.datastax.driver.core.HostDistance; -import com.datastax.driver.core.Metadata; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.querybuilder.Insert; -import com.datastax.driver.core.querybuilder.QueryBuilder; -import com.datastax.driver.core.querybuilder.Select; -import com.datastax.driver.core.querybuilder.Update; +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.config.*; +import com.datastax.oss.driver.api.core.cql.*; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.querybuilder.delete.Delete; +import com.datastax.oss.driver.api.querybuilder.insert.InsertInto; +import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert; +import com.datastax.oss.driver.api.querybuilder.relation.Relation; +import com.datastax.oss.driver.api.querybuilder.select.Select; +import com.datastax.oss.driver.api.querybuilder.update.Assignment; +import com.datastax.oss.driver.api.querybuilder.update.Update; +import com.datastax.oss.driver.api.querybuilder.update.UpdateStart; import site.ycsb.ByteArrayByteIterator; import site.ycsb.ByteIterator; import site.ycsb.DB; import site.ycsb.DBException; import site.ycsb.Status; +import java.io.File; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.Vector; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.helpers.MessageFormatter; /** - * Cassandra 2.x CQL client. + * Cassandra CQL client for versions >= 2.1. * - * See {@code cassandra2/README.md} for details. + * See {@code cassandra/README.md} for details. * * @author cmatser + * @author EngineersBox (Jack Kilrain) */ public class CassandraCQLClient extends DB { private static Logger logger = LoggerFactory.getLogger(CassandraCQLClient.class); - private static Cluster cluster = null; - private static Session session = null; - - private static ConcurrentMap, PreparedStatement> readStmts = - new ConcurrentHashMap, PreparedStatement>(); - private static ConcurrentMap, PreparedStatement> scanStmts = - new ConcurrentHashMap, PreparedStatement>(); - private static ConcurrentMap, PreparedStatement> insertStmts = - new ConcurrentHashMap, PreparedStatement>(); - private static ConcurrentMap, PreparedStatement> updateStmts = - new ConcurrentHashMap, PreparedStatement>(); - private static AtomicReference readAllStmt = - new AtomicReference(); - private static AtomicReference scanAllStmt = - new AtomicReference(); - private static AtomicReference deleteStmt = - new AtomicReference(); + private static CqlSession session = null; + private static DriverExecutionProfile readProfile = null; + private static DriverExecutionProfile scanProfile = null; + private static DriverExecutionProfile insertProfile = null; + private static DriverExecutionProfile updateProfile = null; + private static DriverExecutionProfile deleteProfile = null; + + private static ConcurrentMap, PreparedStatement> readStmts = new ConcurrentHashMap<>(); + private static ConcurrentMap, PreparedStatement> scanStmts = new ConcurrentHashMap<>(); + private static ConcurrentMap, PreparedStatement> insertStmts = new ConcurrentHashMap<>(); + private static ConcurrentMap, PreparedStatement> updateStmts = new ConcurrentHashMap<>(); + private static AtomicReference readAllStmt = new AtomicReference<>(); + private static AtomicReference scanAllStmt = new AtomicReference<>(); + private static AtomicReference deleteStmt = new AtomicReference<>(); private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.QUORUM; private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.QUORUM; public static final String YCSB_KEY = "y_id"; - public static final String KEYSPACE_PROPERTY = "cassandra.keyspace"; - public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb"; - public static final String USERNAME_PROPERTY = "cassandra.username"; - public static final String PASSWORD_PROPERTY = "cassandra.password"; - - public static final String HOSTS_PROPERTY = "hosts"; - public static final String PORT_PROPERTY = "port"; - public static final String PORT_PROPERTY_DEFAULT = "9042"; - - public static final String READ_CONSISTENCY_LEVEL_PROPERTY = - "cassandra.readconsistencylevel"; - public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = readConsistencyLevel.name(); - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = - "cassandra.writeconsistencylevel"; - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = writeConsistencyLevel.name(); - - public static final String MAX_CONNECTIONS_PROPERTY = - "cassandra.maxconnections"; - public static final String CORE_CONNECTIONS_PROPERTY = - "cassandra.coreconnections"; - public static final String CONNECT_TIMEOUT_MILLIS_PROPERTY = - "cassandra.connecttimeoutmillis"; - public static final String READ_TIMEOUT_MILLIS_PROPERTY = - "cassandra.readtimeoutmillis"; - public static final String TRACING_PROPERTY = "cassandra.tracing"; + public static final String DRIVER_CONFIG_PROPERTY = "cassandra.driverconfig"; + public static final String DRIVER_PROFILE_READ_PROPERTY = "cassandra.driverprofile.read"; + public static final String DRIVER_PROFILE_SCAN_PROPERTY = "cassandra.driverprofile.scan"; + public static final String DRIVER_PROFILE_INSERT_PROPERTY = "cassandra.driverprofile.insert"; + public static final String DRIVER_PROFILE_UPDATE_PROPERTY = "cassandra.driverprofile.update"; + public static final String DRIVER_PROFILE_DELETE_PROPERTY = "cassandra.driverprofile.delete"; + public static final String TRACING_READ_PROPERTY = "cassandra.tracing.read"; + public static final String TRACING_SCAN_PROPERTY = "cassandra.tracing.scan"; + public static final String TRACING_INSERT_PROPERTY = "cassandra.tracing.insert"; + public static final String TRACING_UPDATE_PROPERTY = "cassandra.tracing.update"; + public static final String TRACING_DELETE_PROPERTY = "cassandra.tracing.delete"; + public static final String TRACING_FREQUENCY_READ_PROPERTY = "cassandra.tracingfrequency.read"; + public static final String TRACING_FREQUENCY_SCAN_PROPERTY = "cassandra.tracingfrequency.scan"; + public static final String TRACING_FREQUENCY_INSERT_PROPERTY = "cassandra.tracingfrequency.insert"; + public static final String TRACING_FREQUENCY_UPDATE_PROPERTY = "cassandra.tracingfrequency.update"; + public static final String TRACING_FREQUENCY_DELETE_PROPERTY = "cassandra.tracingfrequency.delete"; public static final String TRACING_PROPERTY_DEFAULT = "false"; - - public static final String USE_SSL_CONNECTION = "cassandra.useSSL"; - private static final String DEFAULT_USE_SSL_CONNECTION = "false"; + public static final String TRACING_FREQUENCY_PROPERTY_DEFAULT = "1000"; /** * Count the number of times initialized to teardown on the last * {@link #cleanup()}. */ private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); + private static final AtomicLong READ_TRACE_COUNT = new AtomicLong(0); + private static final AtomicLong SCAN_TRACE_COUNT = new AtomicLong(0); + private static final AtomicLong INSERT_TRACE_COUNT = new AtomicLong(0); + private static final AtomicLong UPDATE_TRACE_COUNT = new AtomicLong(0); + private static final AtomicLong DELETE_TRACE_COUNT = new AtomicLong(0); + private static boolean traceRead = false; + private static boolean traceScan = false; + private static boolean traceInsert = false; + private static boolean traceUpdate = false; + private static boolean traceDelete = false; + private static int traceReadFrequency = 1000; + private static int traceScanFrequency = 1000; + private static int traceInsertFrequency = 1000; + private static int traceUpdateFrequency = 1000; + private static int traceDeleteFrequency = 1000; - private static boolean debug = false; - private static boolean trace = false; - /** * Initialize any state for this DB. Called once per DB instance; there is one * DB instance per client thread. */ @Override public void init() throws DBException { - // Keep track of number of calls to init (for later cleanup) INIT_COUNT.incrementAndGet(); - // Synchronized so that we only have a single // cluster/session instance for all the threads. synchronized (INIT_COUNT) { - // Check if the cluster has already been initialized - if (cluster != null) { + if (session != null) { return; } - try { - - debug = - Boolean.parseBoolean(getProperties().getProperty("debug", "false")); - trace = Boolean.valueOf(getProperties().getProperty(TRACING_PROPERTY, TRACING_PROPERTY_DEFAULT)); - - String host = getProperties().getProperty(HOSTS_PROPERTY); - if (host == null) { - throw new DBException(String.format( - "Required property \"%s\" missing for CassandraCQLClient", - HOSTS_PROPERTY)); - } - String[] hosts = host.split(","); - String port = getProperties().getProperty(PORT_PROPERTY, PORT_PROPERTY_DEFAULT); - - String username = getProperties().getProperty(USERNAME_PROPERTY); - String password = getProperties().getProperty(PASSWORD_PROPERTY); - - String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, - KEYSPACE_PROPERTY_DEFAULT); - - readConsistencyLevel = ConsistencyLevel.valueOf( - getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, - READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - writeConsistencyLevel = ConsistencyLevel.valueOf( - getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, - WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - - Boolean useSSL = Boolean.parseBoolean(getProperties().getProperty(USE_SSL_CONNECTION, - DEFAULT_USE_SSL_CONNECTION)); - - if ((username != null) && !username.isEmpty()) { - Cluster.Builder clusterBuilder = Cluster.builder().withCredentials(username, password) - .withPort(Integer.valueOf(port)).addContactPoints(hosts); - if (useSSL) { - clusterBuilder = clusterBuilder.withSSL(); - } - cluster = clusterBuilder.build(); - } else { - cluster = Cluster.builder().withPort(Integer.valueOf(port)) - .addContactPoints(hosts).build(); - } - - String maxConnections = getProperties().getProperty( - MAX_CONNECTIONS_PROPERTY); - if (maxConnections != null) { - cluster.getConfiguration().getPoolingOptions() - .setMaxConnectionsPerHost(HostDistance.LOCAL, - Integer.valueOf(maxConnections)); - } - - String coreConnections = getProperties().getProperty( - CORE_CONNECTIONS_PROPERTY); - if (coreConnections != null) { - cluster.getConfiguration().getPoolingOptions() - .setCoreConnectionsPerHost(HostDistance.LOCAL, - Integer.valueOf(coreConnections)); - } - - String connectTimoutMillis = getProperties().getProperty( - CONNECT_TIMEOUT_MILLIS_PROPERTY); - if (connectTimoutMillis != null) { - cluster.getConfiguration().getSocketOptions() - .setConnectTimeoutMillis(Integer.valueOf(connectTimoutMillis)); - } - - String readTimoutMillis = getProperties().getProperty( - READ_TIMEOUT_MILLIS_PROPERTY); - if (readTimoutMillis != null) { - cluster.getConfiguration().getSocketOptions() - .setReadTimeoutMillis(Integer.valueOf(readTimoutMillis)); - } - - Metadata metadata = cluster.getMetadata(); - logger.info("Connected to cluster: {}\n", - metadata.getClusterName()); - - for (Host discoveredHost : metadata.getAllHosts()) { - logger.info("Datacenter: {}; Host: {}; Rack: {}\n", - discoveredHost.getDatacenter(), discoveredHost.getAddress(), - discoveredHost.getRack()); + traceRead = Boolean.parseBoolean(getProperties().getProperty( + TRACING_READ_PROPERTY, + TRACING_PROPERTY_DEFAULT + )); + traceReadFrequency = Integer.parseInt(getProperties().getProperty( + TRACING_FREQUENCY_READ_PROPERTY, + TRACING_FREQUENCY_PROPERTY_DEFAULT + )); + traceScan = Boolean.parseBoolean(getProperties().getProperty( + TRACING_SCAN_PROPERTY, + TRACING_PROPERTY_DEFAULT + )); + traceScanFrequency = Integer.parseInt(getProperties().getProperty( + TRACING_FREQUENCY_SCAN_PROPERTY, + TRACING_FREQUENCY_PROPERTY_DEFAULT + )); + traceInsert = Boolean.parseBoolean(getProperties().getProperty( + TRACING_INSERT_PROPERTY, + TRACING_PROPERTY_DEFAULT + )); + traceInsertFrequency = Integer.parseInt(getProperties().getProperty( + TRACING_FREQUENCY_INSERT_PROPERTY, + TRACING_FREQUENCY_PROPERTY_DEFAULT + )); + traceUpdate = Boolean.parseBoolean(getProperties().getProperty( + TRACING_UPDATE_PROPERTY, + TRACING_PROPERTY_DEFAULT + )); + traceUpdateFrequency = Integer.parseInt(getProperties().getProperty( + TRACING_FREQUENCY_UPDATE_PROPERTY, + TRACING_FREQUENCY_PROPERTY_DEFAULT + )); + traceDelete = Boolean.parseBoolean(getProperties().getProperty( + TRACING_DELETE_PROPERTY, + TRACING_PROPERTY_DEFAULT + )); + traceDeleteFrequency = Integer.parseInt(getProperties().getProperty( + TRACING_FREQUENCY_DELETE_PROPERTY, + TRACING_FREQUENCY_PROPERTY_DEFAULT + )); + String driverConfigPath = getProperties().getProperty(DRIVER_CONFIG_PROPERTY); + if (driverConfigPath == null) { + throw new IllegalArgumentException("Missing required driver config file path set via: '" + + DRIVER_CONFIG_PROPERTY + "'"); } - - session = cluster.connect(keyspace); - + File driverConfigFile = new File(driverConfigPath); + DriverConfigLoader configLoader = DriverConfigLoader.fromFile(driverConfigFile); + DriverConfig driverConfig = configLoader.getInitialConfig(); + Function profileSupplier = (String profile) -> profile == null + ? driverConfig.getDefaultProfile() + : driverConfig.getProfile(profile); + readProfile = profileSupplier.apply(getProperties().getProperty(DRIVER_PROFILE_READ_PROPERTY)); + scanProfile = profileSupplier.apply(getProperties().getProperty(DRIVER_PROFILE_SCAN_PROPERTY)); + insertProfile = profileSupplier.apply(getProperties().getProperty(DRIVER_PROFILE_INSERT_PROPERTY)); + updateProfile = profileSupplier.apply(getProperties().getProperty(DRIVER_PROFILE_UPDATE_PROPERTY)); + deleteProfile = profileSupplier.apply(getProperties().getProperty(DRIVER_PROFILE_DELETE_PROPERTY)); + session = CqlSession.builder() + .withConfigLoader(DriverConfigLoader.fromFile(driverConfigFile)) + .build(); } catch (Exception e) { throw new DBException(e); } @@ -253,9 +222,12 @@ public void cleanup() throws DBException { readAllStmt.set(null); scanAllStmt.set(null); deleteStmt.set(null); + READ_TRACE_COUNT.set(0); + SCAN_TRACE_COUNT.set(0); + INSERT_TRACE_COUNT.set(0); + UPDATE_TRACE_COUNT.set(0); + DELETE_TRACE_COUNT.set(0); session.close(); - cluster.close(); - cluster = null; session = null; } if (curInitCount < 0) { @@ -266,6 +238,38 @@ public void cleanup() throws DBException { } } + /** + * Logs the trace events of a query submitted with tracing enabled. + * + * @param rs {@code ResultSet} from a {@code session.execute(...)} invocation. + */ + private void logTraceOutput(final ResultSet rs) { + final ExecutionInfo executionInfo = rs.getExecutionInfo(); + final UUID tracingId = executionInfo.getTracingId(); + final QueryTrace qtrace = executionInfo.getQueryTrace(); + logger.info( + "[{}] '{}' to {} took {}μs", + tracingId, + qtrace.getRequestType(), + qtrace.getCoordinatorAddress(), + qtrace.getDurationMicros() + ); + int eventIndex = 0; + for (final TraceEvent event : qtrace.getEvents()) { + logger.info( + " {} - [{}μs] {} :: {}", + eventIndex++, + event.getSourceElapsedMicros(), + event.getThreadName(), + event.getActivity() + ); + } + } + + private boolean shouldTraceQuery(final boolean flag, final AtomicLong counter, final int frequency) { + return flag && counter.get() % frequency == 0; + } + /** * Read a record from the database. Each field/value pair from the result will * be stored in a HashMap. @@ -283,62 +287,52 @@ public void cleanup() throws DBException { @Override public Status read(String table, String key, Set fields, Map result) { + final boolean shouldTrace = shouldTraceQuery(traceRead, READ_TRACE_COUNT, traceReadFrequency); try { PreparedStatement stmt = (fields == null) ? readAllStmt.get() : readStmts.get(fields); - // Prepare statement on demand if (stmt == null) { - Select.Builder selectBuilder; - + Select select; if (fields == null) { - selectBuilder = QueryBuilder.select().all(); + select = QueryBuilder.selectFrom(table).all(); } else { - selectBuilder = QueryBuilder.select(); - for (String col : fields) { - ((Select.Selection) selectBuilder).column(col); - } - } - - stmt = session.prepare(selectBuilder.from(table) - .where(QueryBuilder.eq(YCSB_KEY, QueryBuilder.bindMarker())) - .limit(1)); - stmt.setConsistencyLevel(readConsistencyLevel); - if (trace) { - stmt.enableTracing(); + select = QueryBuilder.selectFrom(table).columns(fields); } - + SimpleStatement simpleStatement = select.where(Relation.column(YCSB_KEY).isEqualTo(QueryBuilder.bindMarker())) + .limit(1) + .build() + .setConsistencyLevel(readConsistencyLevel) + .setTracing(shouldTrace) + .setExecutionProfile(readProfile); + stmt = session.prepare(simpleStatement); PreparedStatement prevStmt = (fields == null) ? readAllStmt.getAndSet(stmt) : - readStmts.putIfAbsent(new HashSet(fields), stmt); + readStmts.putIfAbsent(new HashSet<>(fields), stmt); if (prevStmt != null) { stmt = prevStmt; } } - - logger.debug(stmt.getQueryString()); + logger.debug(stmt.getQuery()); logger.debug("key = {}", key); - ResultSet rs = session.execute(stmt.bind(key)); - - if (rs.isExhausted()) { - return Status.NOT_FOUND; + if (shouldTrace) { + logTraceOutput(rs); } - // Should be only 1 row Row row = rs.one(); + if (row == null) { + return Status.NOT_FOUND; + } ColumnDefinitions cd = row.getColumnDefinitions(); - - for (ColumnDefinitions.Definition def : cd) { - ByteBuffer val = row.getBytesUnsafe(def.getName()); + for (final ColumnDefinition def : cd) { + ByteBuffer val = row.getBytesUnsafe(def.getName().toString()); if (val != null) { - result.put(def.getName(), new ByteArrayByteIterator(val.array())); + result.put(def.getName().toString(), new ByteArrayByteIterator(val.array())); } else { - result.put(def.getName(), null); + result.put(def.getName().toString(), null); } } - return Status.OK; - } catch (Exception e) { logger.error(MessageFormatter.format("Error reading key: {}", key).getMessage(), e); return Status.ERROR; @@ -369,79 +363,58 @@ public Status read(String table, String key, Set fields, @Override public Status scan(String table, String startkey, int recordcount, Set fields, Vector> result) { - + final boolean shouldTrace = shouldTraceQuery(traceScan, SCAN_TRACE_COUNT, traceScanFrequency); try { PreparedStatement stmt = (fields == null) ? scanAllStmt.get() : scanStmts.get(fields); - // Prepare statement on demand if (stmt == null) { - Select.Builder selectBuilder; - + Select select; if (fields == null) { - selectBuilder = QueryBuilder.select().all(); + select = QueryBuilder.selectFrom(table).all(); } else { - selectBuilder = QueryBuilder.select(); - for (String col : fields) { - ((Select.Selection) selectBuilder).column(col); - } - } - - Select selectStmt = selectBuilder.from(table); - - // The statement builder is not setup right for tokens. - // So, we need to build it manually. - String initialStmt = selectStmt.toString(); - StringBuilder scanStmt = new StringBuilder(); - scanStmt.append(initialStmt.substring(0, initialStmt.length() - 1)); - scanStmt.append(" WHERE "); - scanStmt.append(QueryBuilder.token(YCSB_KEY)); - scanStmt.append(" >= "); - scanStmt.append("token("); - scanStmt.append(QueryBuilder.bindMarker()); - scanStmt.append(")"); - scanStmt.append(" LIMIT "); - scanStmt.append(QueryBuilder.bindMarker()); - - stmt = session.prepare(scanStmt.toString()); - stmt.setConsistencyLevel(readConsistencyLevel); - if (trace) { - stmt.enableTracing(); + select = QueryBuilder.selectFrom(table).columns(fields); } - - PreparedStatement prevStmt = (fields == null) ? + SimpleStatement simpleStatement = select.where(Relation.token(YCSB_KEY) + .isGreaterThanOrEqualTo(QueryBuilder.function( + "\"token\"", + QueryBuilder.bindMarker() + )) + ).limit(QueryBuilder.bindMarker()) + .build() + .setConsistencyLevel(readConsistencyLevel) + .setTracing(shouldTrace) + .setExecutionProfile(scanProfile); + stmt = session.prepare(simpleStatement); + final PreparedStatement prevStmt = (fields == null) ? scanAllStmt.getAndSet(stmt) : - scanStmts.putIfAbsent(new HashSet(fields), stmt); + scanStmts.putIfAbsent(new HashSet<>(fields), stmt); if (prevStmt != null) { stmt = prevStmt; } } - - logger.debug(stmt.getQueryString()); - logger.debug("startKey = {}, recordcount = {}", startkey, recordcount); - - ResultSet rs = session.execute(stmt.bind(startkey, Integer.valueOf(recordcount))); - + logger.debug(stmt.getQuery()); + logger.debug("startKey = {}, record count = {}", startkey, recordcount); + ResultSet rs = session.execute(stmt.bind(startkey, recordcount)); + if (shouldTrace) { + logTraceOutput(rs); + } HashMap tuple; - while (!rs.isExhausted()) { - Row row = rs.one(); - tuple = new HashMap(); - + Row row; + while ((row = rs.one()) != null) { + tuple = new HashMap<>(); ColumnDefinitions cd = row.getColumnDefinitions(); - - for (ColumnDefinitions.Definition def : cd) { - ByteBuffer val = row.getBytesUnsafe(def.getName()); + for (ColumnDefinition def : cd) { + String name = def.getName().toString(); + ByteBuffer val = row.getBytesUnsafe(name); if (val != null) { - tuple.put(def.getName(), new ByteArrayByteIterator(val.array())); + tuple.put(name, new ByteArrayByteIterator(val.array())); } else { - tuple.put(def.getName(), null); + tuple.put(name, null); } } - result.add(tuple); } - return Status.OK; - } catch (Exception e) { logger.error( MessageFormatter.format("Error scanning with startkey: {}", startkey).getMessage(), e); @@ -465,60 +438,60 @@ public Status scan(String table, String startkey, int recordcount, */ @Override public Status update(String table, String key, Map values) { - + final boolean shouldTrace = shouldTraceQuery(traceUpdate, UPDATE_TRACE_COUNT, traceUpdateFrequency); try { Set fields = values.keySet(); PreparedStatement stmt = updateStmts.get(fields); - // Prepare statement on demand if (stmt == null) { - Update updateStmt = QueryBuilder.update(table); - + UpdateStart updateStmt = QueryBuilder.update(table); // Add fields - for (String field : fields) { - updateStmt.with(QueryBuilder.set(field, QueryBuilder.bindMarker())); - } - - // Add key - updateStmt.where(QueryBuilder.eq(YCSB_KEY, QueryBuilder.bindMarker())); - - stmt = session.prepare(updateStmt); - stmt.setConsistencyLevel(writeConsistencyLevel); - if (trace) { - stmt.enableTracing(); - } - - PreparedStatement prevStmt = updateStmts.putIfAbsent(new HashSet(fields), stmt); + List assignments = fields.stream() + .map((final String field) -> Assignment.setColumn(field, QueryBuilder.bindMarker())) + .collect(Collectors.toList()); + // Add keying + Update update = updateStmt.set(assignments) + .where(Relation.column(YCSB_KEY).isEqualTo(QueryBuilder.bindMarker())); + SimpleStatement simpleStatement = update.build() + .setConsistencyLevel(writeConsistencyLevel) + .setTracing(shouldTrace) + .setExecutionProfile(updateProfile); + stmt = session.prepare(simpleStatement); + PreparedStatement prevStmt = updateStmts.putIfAbsent(new HashSet<>(fields), stmt); if (prevStmt != null) { stmt = prevStmt; } } - if (logger.isDebugEnabled()) { - logger.debug(stmt.getQueryString()); + logger.debug(stmt.getQuery()); logger.debug("key = {}", key); for (Map.Entry entry : values.entrySet()) { logger.debug("{} = {}", entry.getKey(), entry.getValue()); } } - // Add fields - ColumnDefinitions vars = stmt.getVariables(); + ColumnDefinitions vars = stmt.getVariableDefinitions(); BoundStatement boundStmt = stmt.bind(); for (int i = 0; i < vars.size() - 1; i++) { - boundStmt.setString(i, values.get(vars.getName(i)).toString()); + boundStmt = boundStmt.setString( + i, + values.get( + vars.get(i) + .getName() + .toString() + ).toString() + ); } - // Add key - boundStmt.setString(vars.size() - 1, key); - - session.execute(boundStmt); - + boundStmt = boundStmt.setString(vars.size() - 1, key); + final ResultSet rs = session.execute(boundStmt); + if (shouldTrace) { + logTraceOutput(rs); + } return Status.OK; } catch (Exception e) { logger.error(MessageFormatter.format("Error updating key: {}", key).getMessage(), e); } - return Status.ERROR; } @@ -537,59 +510,58 @@ public Status update(String table, String key, Map values) */ @Override public Status insert(String table, String key, Map values) { - + final boolean shouldTrace = shouldTraceQuery(traceInsert, INSERT_TRACE_COUNT, traceInsertFrequency); try { Set fields = values.keySet(); PreparedStatement stmt = insertStmts.get(fields); - // Prepare statement on demand if (stmt == null) { - Insert insertStmt = QueryBuilder.insertInto(table); - + InsertInto insertStmt = QueryBuilder.insertInto(table); // Add key - insertStmt.value(YCSB_KEY, QueryBuilder.bindMarker()); - - // Add fields - for (String field : fields) { - insertStmt.value(field, QueryBuilder.bindMarker()); - } - - stmt = session.prepare(insertStmt); - stmt.setConsistencyLevel(writeConsistencyLevel); - if (trace) { - stmt.enableTracing(); - } - - PreparedStatement prevStmt = insertStmts.putIfAbsent(new HashSet(fields), stmt); + RegularInsert regularInsert = insertStmt.value(YCSB_KEY, QueryBuilder.bindMarker()) + .values(fields.stream().collect(Collectors.toMap( + Function.identity(), + (String ignored) -> QueryBuilder.bindMarker() + ))); + SimpleStatement simpleStatement = regularInsert.build() + .setConsistencyLevel(writeConsistencyLevel) + .setTracing(shouldTrace) + .setExecutionProfile(insertProfile); + stmt = session.prepare(simpleStatement); + PreparedStatement prevStmt = insertStmts.putIfAbsent(new HashSet<>(fields), stmt); if (prevStmt != null) { stmt = prevStmt; } } - if (logger.isDebugEnabled()) { - logger.debug(stmt.getQueryString()); + logger.debug(stmt.getQuery()); logger.debug("key = {}", key); for (Map.Entry entry : values.entrySet()) { logger.debug("{} = {}", entry.getKey(), entry.getValue()); } } - // Add key BoundStatement boundStmt = stmt.bind().setString(0, key); - // Add fields - ColumnDefinitions vars = stmt.getVariables(); + ColumnDefinitions vars = stmt.getVariableDefinitions(); for (int i = 1; i < vars.size(); i++) { - boundStmt.setString(i, values.get(vars.getName(i)).toString()); + boundStmt = boundStmt.setString( + i, + values.get( + vars.get(i) + .getName() + .toString() + ).toString() + ); + } + final ResultSet rs = session.execute(boundStmt); + if (shouldTrace) { + logTraceOutput(rs); } - - session.execute(boundStmt); - return Status.OK; } catch (Exception e) { logger.error(MessageFormatter.format("Error inserting key: {}", key).getMessage(), e); } - return Status.ERROR; } @@ -604,35 +576,33 @@ public Status insert(String table, String key, Map values) */ @Override public Status delete(String table, String key) { - + final boolean shouldTrace = shouldTraceQuery(traceDelete, DELETE_TRACE_COUNT, traceDeleteFrequency); try { PreparedStatement stmt = deleteStmt.get(); - // Prepare statement on demand if (stmt == null) { - stmt = session.prepare(QueryBuilder.delete().from(table) - .where(QueryBuilder.eq(YCSB_KEY, QueryBuilder.bindMarker()))); - stmt.setConsistencyLevel(writeConsistencyLevel); - if (trace) { - stmt.enableTracing(); - } - + Delete delete = QueryBuilder.deleteFrom(table) + .where(Relation.column(YCSB_KEY).isEqualTo(QueryBuilder.bindMarker())); + stmt = session.prepare(delete.build() + .setConsistencyLevel(writeConsistencyLevel) + .setTracing(shouldTrace) + .setExecutionProfile(deleteProfile) + ); PreparedStatement prevStmt = deleteStmt.getAndSet(stmt); if (prevStmt != null) { stmt = prevStmt; } } - - logger.debug(stmt.getQueryString()); + logger.debug(stmt.getQuery()); logger.debug("key = {}", key); - - session.execute(stmt.bind(key)); - + final ResultSet rs = session.execute(stmt.bind(key)); + if (shouldTrace) { + logTraceOutput(rs); + } return Status.OK; } catch (Exception e) { logger.error(MessageFormatter.format("Error deleting key: {}", key).getMessage(), e); } - return Status.ERROR; } diff --git a/pom.xml b/pom.xml index 7a24d43d4f..b7b0be10a9 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,16 @@ LICENSE file. + + + + false + + central + Maven Repository Switchboard + http://repo1.maven.org/maven2 + + scm:git:git://github.com/brianfrankcooper/YCSB.git master @@ -117,7 +127,7 @@ LICENSE file. 1.8.2 4.8.0 4.0.0 - 3.0.0 + 4.17.0 6.71.0 1.4.10 2.3.1