diff --git a/engine/build.gradle.kts b/engine/build.gradle.kts index 29207ccf..fe480262 100644 --- a/engine/build.gradle.kts +++ b/engine/build.gradle.kts @@ -51,6 +51,9 @@ dependencies { implementation(Dependencies.Logging.SLF4J_API) implementation(Dependencies.Logging.LOGBACK_CLASSIC) + // SlateDB (Maven Central) + implementation("io.slatedb:slatedb:0.11.0") + // HBase implementation(Dependencies.HBase.CLIENT) implementation(Dependencies.HBase.MAPREDUCE) diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/Graph.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/Graph.kt index af92deba..138afcd0 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/Graph.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/Graph.kt @@ -70,6 +70,8 @@ import com.kakao.actionbase.v2.engine.sql.toRowFlux import com.kakao.actionbase.v2.engine.storage.hbase.HBaseConnections import com.kakao.actionbase.v2.engine.storage.hbase.HBaseOptions import com.kakao.actionbase.v2.engine.storage.jdbc.MetadataTable +import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbConnections +import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbOptions import com.kakao.actionbase.v2.engine.util.getLogger import com.kakao.actionbase.v2.engine.wal.Wal import com.kakao.actionbase.v2.engine.wal.WalFactory @@ -630,6 +632,10 @@ class Graph( val options = storage.materialize().options as HBaseOptions options.checkConnection() } + StorageType.SLATEDB -> { + val options = storage.materialize().options as SlateDbOptions + options.checkConnection() + } else -> Mono.just(false) } @@ -903,6 +909,7 @@ class Graph( intervalDisposable?.dispose() log.info("Disposed Flux.interval for reloading metastore - {}", intervalDisposable) HBaseConnections.closeConnections().block() + SlateDbConnections.closeConnections().block() DefaultHBaseCluster.INSTANCE.close() } diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/entity/LabelEntity.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/entity/LabelEntity.kt index 4fa10b50..028ed29a 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/entity/LabelEntity.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/entity/LabelEntity.kt @@ -21,12 +21,15 @@ import com.kakao.actionbase.v2.engine.label.hbase.HBaseIndexedLabel import com.kakao.actionbase.v2.engine.label.metastore.JdbcHashLabel import com.kakao.actionbase.v2.engine.label.metastore.LocalBackedJdbcHashLabel import com.kakao.actionbase.v2.engine.label.nil.NilLabel +import com.kakao.actionbase.v2.engine.label.slatedb.SlateDbHashLabel +import com.kakao.actionbase.v2.engine.label.slatedb.SlateDbIndexedLabel import com.kakao.actionbase.v2.engine.service.ddl.LabelCreateRequest import com.kakao.actionbase.v2.engine.sql.RowWithSchema import com.kakao.actionbase.v2.engine.storage.DatastoreStorage import com.kakao.actionbase.v2.engine.storage.hbase.HBaseStorage import com.kakao.actionbase.v2.engine.storage.jdbc.JdbcStorage import com.kakao.actionbase.v2.engine.storage.local.LocalStorage +import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbStorage import com.kakao.actionbase.v2.engine.util.getLogger import org.slf4j.Logger @@ -79,6 +82,7 @@ data class LabelEntity( is JdbcStorage -> JdbcHashLabel.create(this, graph, storage, block) is HBaseStorage -> HBaseHashLabel.create(this, graph, storage) is DatastoreStorage -> DatastoreHashLabel.create(this, graph, block) + is SlateDbStorage -> SlateDbHashLabel.create(this, graph, storage) else -> { logger.error( "{} supports only Local, Jdbc, HBase storage types. {} is not supported. Fallback to NilLabel", @@ -99,9 +103,10 @@ data class LabelEntity( when (storage) { is HBaseStorage -> HBaseIndexedLabel.create(this, graph, storage) is DatastoreStorage -> DatastoreIndexedLabel.create(this, graph, block) + is SlateDbStorage -> SlateDbIndexedLabel.create(this, graph, storage) else -> { logger.error( - "{} supports only Jdbc, HBase storage types. {} is not supported. Fallback to NilLabel", + "{} supports only Jdbc, HBase, SlateDb storage types. {} is not supported. Fallback to NilLabel", type, storage, ) diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/entity/StorageEntity.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/entity/StorageEntity.kt index 5b0cdec1..2f689668 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/entity/StorageEntity.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/entity/StorageEntity.kt @@ -11,6 +11,7 @@ import com.kakao.actionbase.v2.engine.storage.hbase.HBaseStorage import com.kakao.actionbase.v2.engine.storage.jdbc.JdbcStorage import com.kakao.actionbase.v2.engine.storage.local.LocalStorage import com.kakao.actionbase.v2.engine.storage.nil.NilStorage +import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbStorage import com.kakao.actionbase.v2.engine.util.getLogger import org.slf4j.Logger @@ -41,6 +42,9 @@ data class StorageEntity( StorageType.HBASE -> { HBaseStorage(this) } + StorageType.SLATEDB -> { + SlateDbStorage(this) + } StorageType.DATASTORE -> { DatastoreStorage } diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbHashLabel.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbHashLabel.kt new file mode 100644 index 00000000..b87bb31a --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbHashLabel.kt @@ -0,0 +1,282 @@ +package com.kakao.actionbase.v2.engine.label.slatedb + +import com.kakao.actionbase.v2.core.code.EdgeEncoder +import com.kakao.actionbase.v2.core.code.EncodedKey +import com.kakao.actionbase.v2.core.code.IdEdgeEncoder +import com.kakao.actionbase.v2.core.code.KeyFieldValue +import com.kakao.actionbase.v2.core.code.KeyValue +import com.kakao.actionbase.v2.core.edge.Edge +import com.kakao.actionbase.v2.core.edge.SchemaEdge +import com.kakao.actionbase.v2.core.metadata.Direction +import com.kakao.actionbase.v2.engine.GraphDefaults +import com.kakao.actionbase.v2.engine.edge.decodeByteArray +import com.kakao.actionbase.v2.engine.edge.toRow +import com.kakao.actionbase.v2.engine.entity.LabelEntity +import com.kakao.actionbase.v2.engine.label.AbstractLabel +import com.kakao.actionbase.v2.engine.label.LabelFactory +import com.kakao.actionbase.v2.engine.sql.DataFrame +import com.kakao.actionbase.v2.engine.sql.Row +import com.kakao.actionbase.v2.engine.sql.StatKey +import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbStorage +import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbTable +import com.kakao.actionbase.v2.engine.storage.slatedb.toLong +import com.kakao.actionbase.v2.engine.storage.slatedb.toSlateBytes + +import java.util.Arrays + +import reactor.core.publisher.Mono + +open class SlateDbHashLabel( + entity: LabelEntity, + coder: EdgeEncoder, + private val table: Mono, +) : AbstractLabel(entity, coder) { + override fun findHashEdge(keyField: EncodedKey): Mono { + require(keyField.field == null) { "field must be null" } + return table.flatMap { it.get(keyField.key) } + } + + override fun create( + keyField: EncodedKey, + value: ByteArray, + ): Mono> { + require(keyField.field == null) { "field must be null" } + return table + .flatMap { it.put(keyField.key, value) } + .thenReturn(emptyList()) + } + + override fun update( + keyField: EncodedKey, + value: ByteArray, + ): Mono> = create(keyField, value) + + override fun delete(keyField: EncodedKey): Mono> { + require(keyField.field == null) { "field must be null" } + return table + .flatMap { it.delete(keyField.key) } + .thenReturn(emptyList()) + } + + override fun setnx( + keyField: EncodedKey, + value: ByteArray, + ): Mono { + require(keyField.field == null) { "field must be null" } + return table.flatMap { tbl -> + tbl + .get(keyField.key) + .hasElement() + .flatMap { exists -> + if (exists) { + Mono.just(false) + } else { + tbl.put(keyField.key, value).thenReturn(true) + } + } + } + } + + override fun cad( + keyField: EncodedKey, + value: ByteArray, + ): Mono { + require(keyField.field == null) { "field must be null" } + return table.flatMap { tbl -> + tbl + .get(keyField.key) + .flatMap { existingValue -> + if (Arrays.equals(existingValue, value)) { + tbl.delete(keyField.key).thenReturn(1L) + } else { + Mono.just(0L) + } + }.defaultIfEmpty(0L) + } + } + + override fun findLockValue(keyField: EncodedKey): Mono { + require(keyField.field == null) { "field must be null" } + return table.flatMap { it.get(keyField.key) } + } + + override fun incrby( + key: ByteArray, + acc: Long, + ): Mono> = + table.flatMap { tbl -> + tbl.merge(key, acc.toSlateBytes()).thenReturn(emptyList()) + } + + override fun scanStorage( + prefix: EncodedKey, + limit: Int, + start: EncodedKey?, + end: EncodedKey?, + ): Mono>> = + table + .flatMap { it.scanPrefix(prefix.key, limit + 1) } + .map { results -> + results + // Filter by start key (exclusive) + .dropWhile { (key, _) -> + start?.key?.let { startKey -> Arrays.compareUnsigned(startKey, key) >= 0 } ?: false + } + // Filter by end key (exclusive) + .dropLastWhile { (key, _) -> + end?.key?.let { endKey -> Arrays.compareUnsigned(endKey, key) < 0 } ?: false + }.take(limit) + .map { (key, value) -> KeyFieldValue(key, value) } + } + + override fun encodedEdgeToSchemaEdge(keyFieldValue: KeyFieldValue): SchemaEdge = entity.schema.decodeByteArray(keyFieldValue) + + override fun deleteOnLock(keyField: KeyValue): Mono = cad(EncodedKey(keyField.key), keyField.value).map { it > 0 } + + override fun getSelf( + src: List, + stats: Set, + idEdgeEncoder: IdEdgeEncoder, + ): Mono { + val withAll = stats.contains(StatKey.WITH_ALL) + val withEdgeId = withAll || stats.contains(StatKey.EDGE_ID) + + val keysMono = + Mono.just( + src.map { + val edge = Edge(0L, it, it).ensureType(entity.schema) + coder.encodeHashEdgeKey(edge, entity.id) + }, + ) + + return keysMono + .flatMap { keys -> + table.flatMap { tbl -> + Mono.zip( + keys.map { key -> tbl.get(key.key).map { key to it } }, + ) { results -> + results + .filterIsInstance, ByteArray>>() + .mapNotNull { (key, value) -> + try { + encodedEdgeToSchemaEdge(KeyFieldValue(key.key, value)) + } catch (e: Exception) { + null + } + }.filter { withAll || it.isActive } + .map { + if (withEdgeId) { + it.toRow(withAll, idEdgeEncoder) + } else { + it.toRow(withAll, null) + } + } + } + } + }.map { rows -> + DataFrame( + rows, + if (withAll) { + entity.schema.allStructType + } else if (withEdgeId) { + entity.schema.edgeIdStructType + } else { + entity.schema.structType + }, + ) + }.defaultIfEmpty(DataFrame.empty(entity.schema.allStructType)) + } + + override fun get( + src: Any, + tgt: List, + dir: Direction, + stats: Set, + idEdgeEncoder: IdEdgeEncoder, + ): Mono { + val withAll = stats.contains(StatKey.WITH_ALL) + val withEdgeId = withAll || stats.contains(StatKey.EDGE_ID) + + val keys = + tgt.map { + val edge = Edge(0L, src, it).ensureType(entity.schema) + coder.encodeHashEdgeKey(edge, entity.id) + } + + return table + .flatMap { tbl -> + if (keys.isEmpty()) { + Mono.just(emptyList()) + } else { + Mono.zip( + keys.map { key -> tbl.get(key.key).map { key to it }.defaultIfEmpty(key to ByteArray(0)) }, + ) { results -> + results + .filterIsInstance, ByteArray>>() + .filter { it.second.isNotEmpty() } + .mapNotNull { (key, value) -> + try { + encodedEdgeToSchemaEdge(KeyFieldValue(key.key, value)) + } catch (e: Exception) { + null + } + }.filter { withAll || it.isActive } + .map { + if (withEdgeId) { + it.toRow(withAll, idEdgeEncoder, isMultiEdge) + } else { + it.toRow(withAll, null, isMultiEdge) + } + } + } + } + }.map { rows -> + DataFrame( + rows, + if (withAll) { + entity.schema.allStructType + } else if (withEdgeId) { + entity.schema.edgeIdStructType + } else { + entity.schema.structType + }, + ) + }.defaultIfEmpty(DataFrame.empty(entity.schema.allStructType)) + } + + override fun getCountRows( + srcAndKeys: List>, + dir: Direction, + ): Mono> = + table.flatMap { tbl -> + if (srcAndKeys.isEmpty()) { + Mono.just(emptyList()) + } else { + Mono.zip( + srcAndKeys.map { (src, key) -> + tbl + .get(key) + .map { bytes -> bytes.toLong() } + .defaultIfEmpty(0L) + .map { count -> Row(arrayOf(src, count, dir)) } + }, + ) { results -> results.filterIsInstance() } + } + } + + companion object : LabelFactory { + override fun create( + entity: LabelEntity, + graph: GraphDefaults, + storage: SlateDbStorage, + block: SlateDbHashLabel.() -> Unit, + ): SlateDbHashLabel { + val table = storage.options.getTable() + return SlateDbHashLabel( + entity = entity, + coder = graph.edgeEncoderFactory.bytesKeyValueEncoder, + table = table, + ).apply(block) + } + } +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbIndexedLabel.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbIndexedLabel.kt new file mode 100644 index 00000000..953e3431 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbIndexedLabel.kt @@ -0,0 +1,69 @@ +package com.kakao.actionbase.v2.engine.label.slatedb + +import com.kakao.actionbase.v2.core.code.EdgeEncoder +import com.kakao.actionbase.v2.core.code.IdEdgeEncoder +import com.kakao.actionbase.v2.core.code.Index +import com.kakao.actionbase.v2.engine.GraphDefaults +import com.kakao.actionbase.v2.engine.cdc.CdcContext +import com.kakao.actionbase.v2.engine.entity.LabelEntity +import com.kakao.actionbase.v2.engine.label.AbstractLabel +import com.kakao.actionbase.v2.engine.label.LabelFactory +import com.kakao.actionbase.v2.engine.label.mixin.IndexedLabelMixin +import com.kakao.actionbase.v2.engine.sql.DataFrame +import com.kakao.actionbase.v2.engine.sql.ScanFilter +import com.kakao.actionbase.v2.engine.sql.StatKey +import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbStorage +import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbTable + +import reactor.core.publisher.Mono + +/** + * Manages IndexedEdgeEncoder in SlateDB + */ +class SlateDbIndexedLabel( + entity: LabelEntity, + coder: EdgeEncoder, + override val indices: List, + override val indexNameToIndex: Map, + table: Mono, +) : SlateDbHashLabel( + entity = entity, + coder = coder, + table = table, + ), + IndexedLabelMixin { + override val self: AbstractLabel = this + + override fun finalizeEdgeMutationUnderLock(context: CdcContext): Mono> = mutateIndexedEdges(context) + + override fun scan( + scanFilter: ScanFilter, + stats: Set, + idEdgeEncoder: IdEdgeEncoder, + ): Mono = + scanIndexedEdges( + scanFilter, + stats, + idEdgeEncoder, + ) + + companion object : LabelFactory { + override fun create( + entity: LabelEntity, + graph: GraphDefaults, + storage: SlateDbStorage, + block: SlateDbIndexedLabel.() -> Unit, + ): SlateDbIndexedLabel { + val table = storage.options.getTable() + val indices: List = entity.indices + val indexNameToId = indices.associateBy { it.name } + return SlateDbIndexedLabel( + entity = entity, + coder = graph.edgeEncoderFactory.bytesKeyValueEncoder, + indices = indices, + indexNameToIndex = indexNameToId, + table = table, + ) + } + } +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/metadata/StorageType.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/metadata/StorageType.kt index 55c968f5..f278f38b 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/metadata/StorageType.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/metadata/StorageType.kt @@ -5,6 +5,7 @@ enum class StorageType { LOCAL, JDBC, HBASE, + SLATEDB, DATASTORE, ; diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbConnections.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbConnections.kt new file mode 100644 index 00000000..3dda4dd4 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbConnections.kt @@ -0,0 +1,93 @@ +package com.kakao.actionbase.v2.engine.storage.slatedb + +import com.kakao.actionbase.v2.engine.util.getLogger + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicBoolean + +import io.slatedb.SlateDb +import io.slatedb.SlateDbConfig +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers + +// The SlateDB C library uses a single global Tokio runtime that does not support +// concurrent block_on calls from multiple threads. All native FFI calls are routed +// through this global single-thread scheduler to prevent concurrent runtime entries. +// +// Uses Schedulers.fromExecutorService rather than Schedulers.newSingle to avoid +// marking the worker thread as Reactor NonBlocking. NonBlocking threads are +// monitored by BlockHound, which conflicts with the intentional blocking FFI calls. +object SlateDbScheduler { + val INSTANCE: reactor.core.scheduler.Scheduler = + Schedulers.fromExecutorService( + Executors.newSingleThreadExecutor { r -> Thread(r, "slatedb-worker") }, + "slatedb-worker", + ) +} + +object SlateDbConnections { + private val logger = getLogger() + + private val initialized = AtomicBoolean(false) + private val connections: ConcurrentHashMap> = ConcurrentHashMap() + + fun ensureInitialized() { + if (initialized.compareAndSet(false, true)) { + logger.info("Initializing SlateDB (native library loaded from JAR classpath)") + SlateDb.initLogging(SlateDbConfig.LogLevel.INFO) + } + } + + fun getConnection( + dbPath: String, + url: String, + ): Mono { + val cacheKey = getCacheKey(dbPath, url) + + return connections.computeIfAbsent(cacheKey) { key -> + Mono + .fromCallable { + ensureInitialized() + val db = + SlateDb.builder(dbPath, url, null).use { builder -> + builder.withMergeOperator(incrementMergeOperator) + builder.build() + } + SlateDbTable.create(db) + }.subscribeOn(Schedulers.boundedElastic()) + .doOnSuccess { + logger.info("Successfully opened SlateDB connection for cacheKey: {}", key) + }.doOnError { error -> + logger.error("Failed to open SlateDB connection for cacheKey: {}", key, error) + connections.remove(key) + }.cache() + } + } + + private fun getCacheKey( + dbPath: String, + url: String, + ): String = "$url/$dbPath" + + fun closeConnections(): Mono { + val closeMonos = + connections.entries.map { (key, tableMono) -> + tableMono + .flatMap { table -> + Mono + .fromRunnable { + try { + table.close() + logger.info("Closed SlateDB connection for cacheKey: {}", key) + } catch (e: Exception) { + logger.error("Error closing SlateDB connection for cacheKey: {}", key, e) + } + // Close on the same single-thread scheduler so it is enqueued + // after all pending FFI operations, preventing use-after-close. + }.subscribeOn(SlateDbScheduler.INSTANCE) + } + } + return Mono.`when`(closeMonos).doFinally { connections.clear() } + } +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbOptions.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbOptions.kt new file mode 100644 index 00000000..0864a311 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbOptions.kt @@ -0,0 +1,24 @@ +package com.kakao.actionbase.v2.engine.storage.slatedb + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties + +import reactor.core.publisher.Mono + +@JsonIgnoreProperties(ignoreUnknown = true) +data class SlateDbOptions( + val path: String = "data", + val url: String = "", +) { + fun checkConnection(): Mono = + if (url.isBlank()) { + Mono.just(false) + } else { + Mono.just(true) + } + + fun getTable(): Mono = + SlateDbConnections.getConnection( + dbPath = path, + url = url, + ) +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbStorage.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbStorage.kt new file mode 100644 index 00000000..ea6546b4 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbStorage.kt @@ -0,0 +1,11 @@ +package com.kakao.actionbase.v2.engine.storage.slatedb + +import com.kakao.actionbase.v2.engine.entity.StorageEntity +import com.kakao.actionbase.v2.engine.storage.Storage +import com.kakao.actionbase.v2.engine.storage.Storage.Companion.parseOptions + +class SlateDbStorage( + override val entity: StorageEntity, +) : Storage { + override val options: SlateDbOptions = parseOptions(entity.conf) +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTable.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTable.kt new file mode 100644 index 00000000..ed4b5e75 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTable.kt @@ -0,0 +1,154 @@ +package com.kakao.actionbase.v2.engine.storage.slatedb + +import java.nio.ByteBuffer +import java.nio.ByteOrder + +import io.slatedb.SlateDb +import io.slatedb.SlateDbKeyValue +import io.slatedb.SlateDbMergeOperator +import reactor.core.publisher.Mono + +fun Long.toSlateBytes(): ByteArray = + ByteBuffer + .allocate(Long.SIZE_BYTES) + .order(ByteOrder.BIG_ENDIAN) + .putLong(this) + .array() + +fun ByteArray.toLong(): Long = ByteBuffer.wrap(this).order(ByteOrder.BIG_ENDIAN).long + +val incrementMergeOperator = + SlateDbMergeOperator { _, existingValue, operand -> + val current = existingValue?.toLong() ?: 0L + val delta = operand.toLong() + (current + delta).toSlateBytes() + } + +sealed class BatchOperation { + data class Put( + val key: ByteArray, + val value: ByteArray, + ) : BatchOperation() + + data class Delete( + val key: ByteArray, + ) : BatchOperation() + + data class Increment( + val key: ByteArray, + val delta: Long, + ) : BatchOperation() +} + +interface SlateDbTable : AutoCloseable { + fun get(key: ByteArray): Mono + + fun put( + key: ByteArray, + value: ByteArray, + ): Mono + + fun delete(key: ByteArray): Mono + + fun merge( + key: ByteArray, + value: ByteArray, + ): Mono + + fun flush(): Mono + + fun scanPrefix( + prefix: ByteArray, + limit: Int, + ): Mono>> + + fun batch(operations: List): Mono + + companion object { + fun create(db: SlateDb): SlateDbTable = SlateDbTableImpl(db) + } +} + +internal class SlateDbTableImpl( + private val db: SlateDb, +) : SlateDbTable { + // The SlateDB C library uses a single global Tokio runtime with block_on, which + // does not support concurrent calls from multiple threads. The global single-thread + // scheduler serializes all native FFI calls across all database instances. + private val scheduler = SlateDbScheduler.INSTANCE + + override fun get(key: ByteArray): Mono = + Mono + .fromCallable { db.get(key) } + .flatMap { Mono.justOrEmpty(it) } + .subscribeOn(scheduler) + + override fun put( + key: ByteArray, + value: ByteArray, + ): Mono = + Mono + .fromCallable { db.put(key, value) } + .subscribeOn(scheduler) + .then() + + override fun delete(key: ByteArray): Mono = + Mono + .fromCallable { db.delete(key) } + .subscribeOn(scheduler) + .then() + + override fun merge( + key: ByteArray, + value: ByteArray, + ): Mono = + Mono + .fromCallable { db.merge(key, value) } + .subscribeOn(scheduler) + .then() + + override fun flush(): Mono = + Mono + .fromCallable { db.flush() } + .subscribeOn(scheduler) + .then() + + override fun scanPrefix( + prefix: ByteArray, + limit: Int, + ): Mono>> = + Mono + .fromCallable { + val results = mutableListOf>() + db.scanPrefix(prefix).use { iterator -> + var kv: SlateDbKeyValue? = iterator.next() + var count = 0 + while (kv != null && count < limit) { + results.add(kv.key() to kv.value()) + count++ + kv = iterator.next() + } + } + results.toList() + }.subscribeOn(scheduler) + + override fun batch(operations: List): Mono = + Mono + .fromCallable { + SlateDb.newWriteBatch().use { batch -> + operations.forEach { op -> + when (op) { + is BatchOperation.Put -> batch.put(op.key, op.value) + is BatchOperation.Delete -> batch.delete(op.key) + is BatchOperation.Increment -> batch.merge(op.key, op.delta.toSlateBytes()) + } + } + db.write(batch) + } + }.subscribeOn(scheduler) + .then() + + override fun close() { + db.close() + } +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/engine/datastore/SlateDBDatastoreCompatibilityTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/engine/datastore/SlateDBDatastoreCompatibilityTest.kt new file mode 100644 index 00000000..2322ff24 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/engine/datastore/SlateDBDatastoreCompatibilityTest.kt @@ -0,0 +1,120 @@ +package com.kakao.actionbase.engine.datastore + +import com.kakao.actionbase.v2.engine.storage.slatedb.BatchOperation +import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbTable +import com.kakao.actionbase.v2.engine.storage.slatedb.incrementMergeOperator +import com.kakao.actionbase.v2.engine.storage.slatedb.toLong +import com.kakao.actionbase.v2.engine.storage.slatedb.toSlateBytes + +import java.nio.file.Path + +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Assumptions.assumeTrue +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.TestInstance +import org.junit.jupiter.api.io.TempDir + +import io.slatedb.SlateDb +import io.slatedb.SlateDbConfig + +/** + * SlateDB compatibility test. + * + * Disabled by default. Set SLATEDB_TEST=true to run. + * + * To run: + * SLATEDB_TEST=true ./gradlew :engine:test --tests "*SlateDBDatastoreCompatibilityTest*" + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class SlateDBDatastoreCompatibilityTest : DatastoreCompatibilityTest() { + private var table: SlateDbTable? = null + private lateinit var tempDir: Path + + private val enabled = System.getenv("SLATEDB_TEST") == "true" + + @BeforeAll + fun setUpSlateDB( + @TempDir dir: Path, + ) { + assumeTrue(enabled, "SLATEDB_TEST=true not set") + tempDir = dir + SlateDb.initLogging(SlateDbConfig.LogLevel.INFO) + val db = + SlateDb.builder("data", "file://${tempDir.toAbsolutePath()}", null).use { builder -> + builder.withMergeOperator(incrementMergeOperator) + builder.build() + } + table = SlateDbTable.create(db) + } + + @AfterAll + fun tearDownSlateDB() { + table?.close() + } + + override fun createStore(): StorageOperations = SlateDBOperations(table!!) + + override fun supportsCheckAndMutate() = false + + override fun cleanup() { + table?.let { t -> + t.scanPrefix(ByteArray(0), Int.MAX_VALUE).block()?.forEach { (key, _) -> + t.delete(key).block() + } + } + } + + private class SlateDBOperations( + private val table: SlateDbTable, + ) : StorageOperations { + override fun get(key: ByteArray): ByteArray? = table.get(key).block() + + override fun getAll(keys: List) = keys.mapNotNull { k -> table.get(k).block()?.let { k to it } } + + override fun scan( + prefix: ByteArray, + limit: Int, + ) = table.scanPrefix(prefix, limit).block() ?: emptyList() + + override fun put( + key: ByteArray, + value: ByteArray, + ) { + table.put(key, value).block() + } + + override fun delete(key: ByteArray) { + table.delete(key).block() + } + + override fun increment( + key: ByteArray, + delta: Long, + ): Long { + table.merge(key, delta.toSlateBytes()).block() + return table.get(key).block()?.toLong() ?: 0L + } + + override fun batch(mutations: List) { + val ops = + mutations.map { m -> + when (m) { + is Mutation.Put -> BatchOperation.Put(m.key, m.value) + is Mutation.Delete -> BatchOperation.Delete(m.key) + is Mutation.Increment -> BatchOperation.Increment(m.key, m.delta) + } + } + table.batch(ops).block() + } + + override fun setIfNotExists( + key: ByteArray, + value: ByteArray, + ): Boolean = throw UnsupportedOperationException("SlateDB does not support checkAndMutate") + + override fun deleteIfEquals( + key: ByteArray, + expectedValue: ByteArray, + ): Boolean = throw UnsupportedOperationException("SlateDB does not support checkAndMutate") + } +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbHashLabelTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbHashLabelTest.kt new file mode 100644 index 00000000..fa6e4d56 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbHashLabelTest.kt @@ -0,0 +1,228 @@ +package com.kakao.actionbase.v2.engine.label.slatedb + +import com.kakao.actionbase.v2.core.metadata.DirectionType +import com.kakao.actionbase.v2.core.metadata.EdgeOperation +import com.kakao.actionbase.v2.core.metadata.LabelType +import com.kakao.actionbase.v2.core.types.DataType +import com.kakao.actionbase.v2.core.types.EdgeSchema +import com.kakao.actionbase.v2.core.types.Field +import com.kakao.actionbase.v2.core.types.VertexField +import com.kakao.actionbase.v2.core.types.VertexType +import com.kakao.actionbase.v2.engine.Graph +import com.kakao.actionbase.v2.engine.GraphConfig +import com.kakao.actionbase.v2.engine.client.kafka.impl.DefaultKafkaClientFactory +import com.kakao.actionbase.v2.engine.client.web.impl.DefaultWebClientFactory +import com.kakao.actionbase.v2.engine.entity.EntityName +import com.kakao.actionbase.v2.engine.entity.LabelEntity +import com.kakao.actionbase.v2.engine.label.EdgeOperationStatus +import com.kakao.actionbase.v2.engine.metadata.StorageType +import com.kakao.actionbase.v2.engine.service.ddl.DdlStatus +import com.kakao.actionbase.v2.engine.service.ddl.ServiceCreateRequest +import com.kakao.actionbase.v2.engine.service.ddl.StorageCreateRequest +import com.kakao.actionbase.v2.engine.test.cdc.InMemoryCdcFactory +import com.kakao.actionbase.v2.engine.test.wal.InMemoryWalFactory + +import java.nio.file.Path +import java.util.UUID + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir + +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper + +import io.kotest.matchers.shouldBe +import reactor.kotlin.test.test + +class SlateDbHashLabelTest { + @TempDir + lateinit var tempDir: Path + + private lateinit var graph: Graph + private val serviceName = "slatedb_test_service" + private val storageName = "slatedb_storage" + private val labelName = "slatedb_label" + + private fun createGraph(): Graph { + val config = + GraphConfig + .Builder() + .withMetastoreUrl("jdbc:h2:mem:${UUID.randomUUID()};DB_CLOSE_DELAY=-1;MODE=MYSQL") + .build() + return Graph.create(config, InMemoryWalFactory, InMemoryCdcFactory, DefaultKafkaClientFactory, DefaultWebClientFactory) + } + + @BeforeEach + fun setUp() { + graph = createGraph() + graph.updateAllMetadata().block() + + // Create service + graph.serviceDdl + .create(EntityName.fromOrigin(serviceName), ServiceCreateRequest(desc = "test service")) + .test() + .assertNext { it.status shouldBe DdlStatus.Status.CREATED } + .verifyComplete() + + // Create SlateDB storage + val conf = + jacksonObjectMapper().createObjectNode().apply { + put("path", "test-data") + put("url", "file://${tempDir.toAbsolutePath()}") + } + + graph.storageDdl + .create(EntityName.fromOrigin(storageName), StorageCreateRequest(desc = "slatedb storage", type = StorageType.SLATEDB, conf = conf)) + .test() + .assertNext { it.status shouldBe DdlStatus.Status.CREATED } + .verifyComplete() + + // Create label with SlateDB storage + val labelEntity = + LabelEntity( + active = true, + name = EntityName(serviceName, labelName), + desc = "test slatedb label", + type = LabelType.HASH, + schema = + EdgeSchema( + VertexField(VertexType.LONG), + VertexField(VertexType.LONG), + listOf( + Field("score", DataType.LONG, false), + Field("memo", DataType.STRING, true), + ), + ), + dirType = DirectionType.OUT, + storage = storageName, + ) + + graph.labelDdl + .create(labelEntity.name, labelEntity.toCreateRequest()) + .test() + .assertNext { it.status shouldBe DdlStatus.Status.CREATED } + .verifyComplete() + + graph.updateAllMetadata().block() + } + + @AfterEach + fun tearDown() { + graph.close() + } + + @Test + fun `insert and get edge`() { + val label = graph.getLabel(EntityName(serviceName, labelName)) + val edge = + com.kakao.actionbase.v2.core.edge.Edge( + System.currentTimeMillis(), + 100L, + 200L, + mapOf("score" to 42L, "memo" to "hello"), + ) + + // Insert + label + .mutate(edge.toTraceEdge(), EdgeOperation.INSERT) + .test() + .assertNext { context -> + context.status shouldBe EdgeOperationStatus.CREATED + }.verifyComplete() + + // Get + graph + .queryGet( + EntityName(serviceName, labelName), + 100L, + 200L, + ).test() + .assertNext { df -> + df.rows.size shouldBe 1 + // tgt is at index 1 in the schema (src, tgt, ts, ...) + val row = df.toRowWithSchema().first() + row.getLong("tgt") shouldBe 200L + }.verifyComplete() + } + + @Test + fun `delete edge`() { + val label = graph.getLabel(EntityName(serviceName, labelName)) + val edge = + com.kakao.actionbase.v2.core.edge.Edge( + System.currentTimeMillis(), + 101L, + 201L, + mapOf("score" to 100L), + ) + + // Insert + label.mutate(edge.toTraceEdge(), EdgeOperation.INSERT).block() + + // Delete + label + .mutate(edge.toTraceEdge(), EdgeOperation.DELETE) + .test() + .assertNext { context -> + context.status shouldBe EdgeOperationStatus.DELETED + }.verifyComplete() + + // Verify deleted (should return empty) + graph + .queryGet( + EntityName(serviceName, labelName), + 101L, + 201L, + ).test() + .assertNext { df -> + df.rows.size shouldBe 0 + }.verifyComplete() + } + + @Test + fun `update edge`() { + val label = graph.getLabel(EntityName(serviceName, labelName)) + val ts = System.currentTimeMillis() + + val edge1 = + com.kakao.actionbase.v2.core.edge.Edge( + ts, + 102L, + 202L, + mapOf("score" to 50L, "memo" to "original"), + ) + + val edge2 = + com.kakao.actionbase.v2.core.edge.Edge( + ts + 1, + 102L, + 202L, + mapOf("score" to 100L, "memo" to "updated"), + ) + + // Insert + label.mutate(edge1.toTraceEdge(), EdgeOperation.INSERT).block() + + // Update + label + .mutate(edge2.toTraceEdge(), EdgeOperation.INSERT) + .test() + .assertNext { context -> + context.status shouldBe EdgeOperationStatus.UPDATED + }.verifyComplete() + + // Verify updated + graph + .queryGet( + EntityName(serviceName, labelName), + 102L, + 202L, + ).test() + .assertNext { df -> + df.rows.size shouldBe 1 + val row = df.toRowWithSchema().first() + row.getString("memo") shouldBe "updated" + }.verifyComplete() + } +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbIndexedLabelTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbIndexedLabelTest.kt new file mode 100644 index 00000000..470ae134 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/label/slatedb/SlateDbIndexedLabelTest.kt @@ -0,0 +1,284 @@ +package com.kakao.actionbase.v2.engine.label.slatedb + +import com.kakao.actionbase.v2.core.code.Index +import com.kakao.actionbase.v2.core.code.hbase.Order +import com.kakao.actionbase.v2.core.metadata.DirectionType +import com.kakao.actionbase.v2.core.metadata.EdgeOperation +import com.kakao.actionbase.v2.core.metadata.LabelType +import com.kakao.actionbase.v2.core.types.DataType +import com.kakao.actionbase.v2.core.types.EdgeSchema +import com.kakao.actionbase.v2.core.types.Field +import com.kakao.actionbase.v2.core.types.VertexField +import com.kakao.actionbase.v2.core.types.VertexType +import com.kakao.actionbase.v2.engine.Graph +import com.kakao.actionbase.v2.engine.GraphConfig +import com.kakao.actionbase.v2.engine.client.kafka.impl.DefaultKafkaClientFactory +import com.kakao.actionbase.v2.engine.client.web.impl.DefaultWebClientFactory +import com.kakao.actionbase.v2.engine.entity.EntityName +import com.kakao.actionbase.v2.engine.entity.LabelEntity +import com.kakao.actionbase.v2.engine.label.EdgeOperationStatus +import com.kakao.actionbase.v2.engine.metadata.StorageType +import com.kakao.actionbase.v2.engine.service.ddl.DdlStatus +import com.kakao.actionbase.v2.engine.service.ddl.ServiceCreateRequest +import com.kakao.actionbase.v2.engine.service.ddl.StorageCreateRequest +import com.kakao.actionbase.v2.engine.test.cdc.InMemoryCdcFactory +import com.kakao.actionbase.v2.engine.test.wal.InMemoryWalFactory + +import java.nio.file.Path +import java.util.UUID + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir + +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper + +import io.kotest.matchers.shouldBe +import reactor.kotlin.test.test + +class SlateDbIndexedLabelTest { + @TempDir + lateinit var tempDir: Path + + private lateinit var graph: Graph + private val serviceName = "slatedb_indexed_test_service" + private val storageName = "slatedb_indexed_storage" + private val labelName = "slatedb_indexed_label" + + private fun createGraph(): Graph { + val config = + GraphConfig + .Builder() + .withMetastoreUrl("jdbc:h2:mem:${UUID.randomUUID()};DB_CLOSE_DELAY=-1;MODE=MYSQL") + .build() + return Graph.create(config, InMemoryWalFactory, InMemoryCdcFactory, DefaultKafkaClientFactory, DefaultWebClientFactory) + } + + @BeforeEach + fun setUp() { + graph = createGraph() + graph.updateAllMetadata().block() + + // Create service + graph.serviceDdl + .create(EntityName.fromOrigin(serviceName), ServiceCreateRequest(desc = "test service")) + .test() + .assertNext { it.status shouldBe DdlStatus.Status.CREATED } + .verifyComplete() + + // Create SlateDB storage + val conf = + jacksonObjectMapper().createObjectNode().apply { + put("path", "test-data") + put("url", "file://${tempDir.toAbsolutePath()}") + } + + graph.storageDdl + .create(EntityName.fromOrigin(storageName), StorageCreateRequest(desc = "slatedb indexed storage", type = StorageType.SLATEDB, conf = conf)) + .test() + .assertNext { it.status shouldBe DdlStatus.Status.CREATED } + .verifyComplete() + + // Create INDEXED label with SlateDB storage + val labelEntity = + LabelEntity( + active = true, + name = EntityName(serviceName, labelName), + desc = "test slatedb indexed label", + type = LabelType.INDEXED, + schema = + EdgeSchema( + VertexField(VertexType.LONG), + VertexField(VertexType.LONG), + listOf( + Field("score", DataType.LONG, false), + Field("memo", DataType.STRING, true), + ), + ), + dirType = DirectionType.OUT, + storage = storageName, + indices = + listOf( + Index( + "score_desc", + listOf(Index.Field("score", Order.DESC)), + "Score descending index", + ), + ), + ) + + graph.labelDdl + .create(labelEntity.name, labelEntity.toCreateRequest()) + .test() + .assertNext { it.status shouldBe DdlStatus.Status.CREATED } + .verifyComplete() + + graph.updateAllMetadata().block() + } + + @AfterEach + fun tearDown() { + graph.close() + } + + @Test + fun `insert and get edge with index`() { + val label = graph.getLabel(EntityName(serviceName, labelName)) + val edge = + com.kakao.actionbase.v2.core.edge.Edge( + System.currentTimeMillis(), + 100L, + 200L, + mapOf("score" to 42L, "memo" to "hello"), + ) + + // Insert + label + .mutate(edge.toTraceEdge(), EdgeOperation.INSERT) + .test() + .assertNext { context -> + context.status shouldBe EdgeOperationStatus.CREATED + }.verifyComplete() + + // Get + graph + .queryGet( + EntityName(serviceName, labelName), + 100L, + 200L, + ).test() + .assertNext { df -> + df.rows.size shouldBe 1 + val row = df.toRowWithSchema().first() + row.getLong("tgt") shouldBe 200L + row.getLong("score") shouldBe 42L + }.verifyComplete() + } + + @Test + fun `insert multiple edges and verify index order`() { + val label = graph.getLabel(EntityName(serviceName, labelName)) + val ts = System.currentTimeMillis() + + // Insert edges with different scores + val edges = + listOf( + com.kakao.actionbase.v2.core.edge + .Edge(ts, 100L, 201L, mapOf("score" to 10L, "memo" to "low")), + com.kakao.actionbase.v2.core.edge + .Edge(ts + 1, 100L, 202L, mapOf("score" to 50L, "memo" to "medium")), + com.kakao.actionbase.v2.core.edge + .Edge(ts + 2, 100L, 203L, mapOf("score" to 100L, "memo" to "high")), + ) + + edges.forEach { edge -> + label.mutate(edge.toTraceEdge(), EdgeOperation.INSERT).block() + } + + // Verify all edges exist + edges.forEach { edge -> + graph + .queryGet( + EntityName(serviceName, labelName), + edge.src, + edge.tgt, + ).test() + .assertNext { df -> + df.rows.size shouldBe 1 + }.verifyComplete() + } + } + + @Test + fun `delete edge removes from index`() { + val label = graph.getLabel(EntityName(serviceName, labelName)) + val edge = + com.kakao.actionbase.v2.core.edge.Edge( + System.currentTimeMillis(), + 101L, + 201L, + mapOf("score" to 100L), + ) + + // Insert + label.mutate(edge.toTraceEdge(), EdgeOperation.INSERT).block() + + // Verify exists + graph + .queryGet( + EntityName(serviceName, labelName), + 101L, + 201L, + ).test() + .assertNext { df -> + df.rows.size shouldBe 1 + }.verifyComplete() + + // Delete + label + .mutate(edge.toTraceEdge(), EdgeOperation.DELETE) + .test() + .assertNext { context -> + context.status shouldBe EdgeOperationStatus.DELETED + }.verifyComplete() + + // Verify deleted + graph + .queryGet( + EntityName(serviceName, labelName), + 101L, + 201L, + ).test() + .assertNext { df -> + df.rows.size shouldBe 0 + }.verifyComplete() + } + + @Test + fun `update edge updates index`() { + val label = graph.getLabel(EntityName(serviceName, labelName)) + val ts = System.currentTimeMillis() + + val edge1 = + com.kakao.actionbase.v2.core.edge.Edge( + ts, + 102L, + 202L, + mapOf("score" to 50L, "memo" to "original"), + ) + + val edge2 = + com.kakao.actionbase.v2.core.edge.Edge( + ts + 1, + 102L, + 202L, + mapOf("score" to 100L, "memo" to "updated"), + ) + + // Insert + label.mutate(edge1.toTraceEdge(), EdgeOperation.INSERT).block() + + // Update + label + .mutate(edge2.toTraceEdge(), EdgeOperation.INSERT) + .test() + .assertNext { context -> + context.status shouldBe EdgeOperationStatus.UPDATED + }.verifyComplete() + + // Verify updated + graph + .queryGet( + EntityName(serviceName, labelName), + 102L, + 202L, + ).test() + .assertNext { df -> + df.rows.size shouldBe 1 + val row = df.toRowWithSchema().first() + row.getLong("score") shouldBe 100L + row.getString("memo") shouldBe "updated" + }.verifyComplete() + } +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbStorageTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbStorageTest.kt new file mode 100644 index 00000000..54799982 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbStorageTest.kt @@ -0,0 +1,64 @@ +package com.kakao.actionbase.v2.engine.storage.slatedb + +import com.kakao.actionbase.v2.engine.Graph +import com.kakao.actionbase.v2.engine.GraphConfig +import com.kakao.actionbase.v2.engine.client.kafka.impl.DefaultKafkaClientFactory +import com.kakao.actionbase.v2.engine.client.web.impl.DefaultWebClientFactory +import com.kakao.actionbase.v2.engine.entity.EntityName +import com.kakao.actionbase.v2.engine.metadata.StorageType +import com.kakao.actionbase.v2.engine.test.GraphFixtures +import com.kakao.actionbase.v2.engine.test.cdc.InMemoryCdcFactory +import com.kakao.actionbase.v2.engine.test.wal.InMemoryWalFactory + +import java.util.UUID + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper + +import reactor.kotlin.test.test + +class SlateDbStorageTest { + private lateinit var graph: Graph + + private fun createGraph(): Graph { + val config = + GraphConfig + .Builder() + .withMetastoreUrl("jdbc:h2:mem:${UUID.randomUUID()};DB_CLOSE_DELAY=-1;MODE=MYSQL") + .build() + return Graph.create(config, InMemoryWalFactory, InMemoryCdcFactory, DefaultKafkaClientFactory, DefaultWebClientFactory) + } + + @BeforeEach + fun setUp() { + graph = createGraph() + graph.updateAllMetadata().block() + } + + @AfterEach + fun tearDown() { + graph.close() + } + + @Test + fun `create SlateDB storage entity`() { + val conf = + jacksonObjectMapper().createObjectNode().apply { + put("path", "test-data") + put("url", "file:///tmp/slatedb-test") + } + + GraphFixtures.createStorage(graph, "slatedb_test", StorageType.SLATEDB, conf) + + graph.storageDdl + .getSingle(EntityName.fromOrigin("slatedb_test")) + .test() + .assertNext { storage -> + assert(storage.type == StorageType.SLATEDB) + assert(storage.active) + }.verifyComplete() + } +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTableTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTableTest.kt new file mode 100644 index 00000000..c3cc9e75 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTableTest.kt @@ -0,0 +1,224 @@ +package com.kakao.actionbase.v2.engine.storage.slatedb + +import java.nio.charset.StandardCharsets +import java.nio.file.Path + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir + +import io.slatedb.SlateDb +import io.slatedb.SlateDbConfig +import reactor.test.StepVerifier + +class SlateDbTableTest { + @TempDir + lateinit var tempDir: Path + + private lateinit var table: SlateDbTable + + @BeforeEach + fun setUp() { + SlateDb.initLogging(SlateDbConfig.LogLevel.INFO) + + val db = + SlateDb.builder("data", "file://${tempDir.toAbsolutePath()}", null).use { builder -> + builder.withMergeOperator(incrementMergeOperator) + builder.build() + } + table = SlateDbTable.create(db) + } + + @AfterEach + fun tearDown() { + table.close() + } + + @Test + fun `put and get with reactive API`() { + val key = "hello".toByteArray(StandardCharsets.UTF_8) + val value = "world".toByteArray(StandardCharsets.UTF_8) + + StepVerifier + .create( + table.put(key, value).then(table.get(key)), + ).expectNextMatches { it != null && String(it, StandardCharsets.UTF_8) == "world" } + .verifyComplete() + } + + @Test + fun `get non-existent key returns empty`() { + val key = "nonexistent".toByteArray(StandardCharsets.UTF_8) + + StepVerifier + .create(table.get(key)) + .verifyComplete() + } + + @Test + fun `delete removes key`() { + val key = "to-delete".toByteArray(StandardCharsets.UTF_8) + val value = "value".toByteArray(StandardCharsets.UTF_8) + + StepVerifier + .create( + table + .put(key, value) + .then(table.delete(key)) + .then(table.get(key)), + ).verifyComplete() + } + + @Test + fun `flush persists data`() { + val key = "flush-key".toByteArray(StandardCharsets.UTF_8) + val value = "flush-value".toByteArray(StandardCharsets.UTF_8) + + StepVerifier + .create( + table + .put(key, value) + .then(table.flush()) + .then(table.get(key)), + ).expectNextMatches { it != null && String(it, StandardCharsets.UTF_8) == "flush-value" } + .verifyComplete() + } + + @Test + fun `batch writes multiple keys atomically`() { + val key1 = "batch-key-1".toByteArray(StandardCharsets.UTF_8) + val value1 = "batch-value-1".toByteArray(StandardCharsets.UTF_8) + val key2 = "batch-key-2".toByteArray(StandardCharsets.UTF_8) + val value2 = "batch-value-2".toByteArray(StandardCharsets.UTF_8) + val key3 = "batch-key-3".toByteArray(StandardCharsets.UTF_8) + val value3 = "batch-value-3".toByteArray(StandardCharsets.UTF_8) + + val operations = + listOf( + BatchOperation.Put(key1, value1), + BatchOperation.Put(key2, value2), + BatchOperation.Put(key3, value3), + ) + + StepVerifier + .create( + table + .batch(operations) + .then(table.get(key1)), + ).expectNextMatches { String(it, StandardCharsets.UTF_8) == "batch-value-1" } + .verifyComplete() + + StepVerifier + .create(table.get(key2)) + .expectNextMatches { String(it, StandardCharsets.UTF_8) == "batch-value-2" } + .verifyComplete() + } + + @Test + fun `batch with put and delete`() { + val key1 = "batch-put".toByteArray(StandardCharsets.UTF_8) + val value1 = "value".toByteArray(StandardCharsets.UTF_8) + val key2 = "batch-delete".toByteArray(StandardCharsets.UTF_8) + val value2 = "to-be-deleted".toByteArray(StandardCharsets.UTF_8) + + // First, put key2 + StepVerifier + .create(table.put(key2, value2)) + .verifyComplete() + + // Then batch: put key1, delete key2 + val operations = + listOf( + BatchOperation.Put(key1, value1), + BatchOperation.Delete(key2), + ) + + StepVerifier + .create( + table + .batch(operations) + .then(table.get(key1)), + ).expectNextMatches { String(it, StandardCharsets.UTF_8) == "value" } + .verifyComplete() + + // key2 should be deleted + StepVerifier + .create(table.get(key2)) + .verifyComplete() + } + + // -- merge operator tests (degree counting use case) -- + + @Test + fun `merge on non-existent key initializes from zero`() { + val key = "degree:user:1".toByteArray(StandardCharsets.UTF_8) + + StepVerifier + .create( + table.merge(key, 1L.toSlateBytes()).then(table.get(key)), + ).expectNextMatches { it.toLong() == 1L } + .verifyComplete() + } + + @Test + fun `sequential merges accumulate — edge insert and delete`() { + val key = "degree:user:2".toByteArray(StandardCharsets.UTF_8) + + // 3 edges inserted + StepVerifier + .create( + table + .merge(key, 1L.toSlateBytes()) + .then(table.merge(key, 1L.toSlateBytes())) + .then(table.merge(key, 1L.toSlateBytes())) + .then(table.get(key)), + ).expectNextMatches { it.toLong() == 3L } + .verifyComplete() + + // 1 edge deleted + StepVerifier + .create( + table.merge(key, (-1L).toSlateBytes()).then(table.get(key)), + ).expectNextMatches { it.toLong() == 2L } + .verifyComplete() + } + + @Test + fun `merge survives flush — degree persists across memtable rotation`() { + val key = "degree:user:3".toByteArray(StandardCharsets.UTF_8) + + StepVerifier + .create( + table + .merge(key, 5L.toSlateBytes()) + .then(table.flush()) + .then(table.merge(key, 3L.toSlateBytes())) + .then(table.get(key)), + ).expectNextMatches { it.toLong() == 8L } + .verifyComplete() + } + + @Test + fun `batch increment adds delta to value`() { + val key = "counter".toByteArray(StandardCharsets.UTF_8) + + // Increment non-existent key (starts at 0) + StepVerifier + .create( + table + .batch(listOf(BatchOperation.Increment(key, 5))) + .then(table.get(key)), + ).expectNextMatches { it.toLong() == 5L } + .verifyComplete() + + // Increment existing key + StepVerifier + .create( + table + .batch(listOf(BatchOperation.Increment(key, 3))) + .then(table.get(key)), + ).expectNextMatches { it.toLong() == 8L } + .verifyComplete() + } +} diff --git a/server/src/main/resources/application-slatedb.yaml b/server/src/main/resources/application-slatedb.yaml new file mode 100644 index 00000000..9c2537c6 --- /dev/null +++ b/server/src/main/resources/application-slatedb.yaml @@ -0,0 +1,15 @@ +# SlateDB storage profile +# Usage: --spring.profiles.active=slatedb +# +# Native library is bundled in the slatedb JAR (loaded automatically from classpath). + +actionbase: + tenant: ab-slatedb + +kc: + graph: + defaultStorage: + type: SLATEDB + conf: + path: data + url: file://${SLATEDB_STORAGE_PATH:/tmp/actionbase-slatedb}