Skip to content

Commit

Permalink
Merge pull request #31 from scalableminds/fix-columnFamily-options
Browse files Browse the repository at this point in the history
Fix column family options
  • Loading branch information
fm3 authored Sep 11, 2019
2 parents 6ee35cd + 0f6c94c commit 28687a6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 17 deletions.
8 changes: 8 additions & 0 deletions src/main/protobuf/fossildbapi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ message RestoreFromBackupReply {
optional string errorMessage = 2;
}

message CompactAllDataRequest {}

message CompactAllDataReply {
required bool success = 1;
optional string errorMessage = 2;
}


service FossilDB {
rpc Health (HealthRequest) returns (HealthReply) {}
Expand All @@ -143,4 +150,5 @@ service FossilDB {
rpc ListVersions (ListVersionsRequest) returns (ListVersionsReply) {}
rpc Backup (BackupRequest) returns (BackupReply) {}
rpc RestoreFromBackup (RestoreFromBackupRequest) returns (RestoreFromBackupReply) {}
rpc CompactAllData (CompactAllDataRequest) returns (CompactAllDataReply) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
RestoreFromBackupReply(true)
} {errorMsg => RestoreFromBackupReply(false, errorMsg)}

override def compactAllData(req: CompactAllDataRequest) = withExceptionHandler(req) {
storeManager.compactAllData
CompactAllDataReply(true)
} {errorMsg => CompactAllDataReply(false, errorMsg)}

private def withExceptionHandler [T, R <: GeneratedMessage](request: R)(tryBlock: => T)(onErrorBlock: Option[String] => T): Future[T] = {
try {
Expand Down
35 changes: 18 additions & 17 deletions src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,9 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat

val (db: RocksDB, columnFamilyHandles) = {
RocksDB.loadLibrary()
val columnOptions = new ColumnFamilyOptions()
.setArenaBlockSize(4 * 1024 * 1024) // 4MB
.setTargetFileSizeBase(1024 * 1024 * 1024) // 1GB
.setMaxBytesForLevelBase(10 * 1024 * 1024 * 1024) // 10GB
val columnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).map { columnFamily =>
new ColumnFamilyDescriptor(columnFamily, columnOptions)
}
val columnFamilyHandles = new util.ArrayList[ColumnFamilyHandle]
var options = new DBOptions()
var cfListRef: mutable.Buffer[ColumnFamilyDescriptor] = mutable.Buffer()
optionsFilePathOpt.map { optionsFilePath =>
val options = new DBOptions()
val cfListRef: mutable.Buffer[ColumnFamilyDescriptor] = mutable.Buffer()
optionsFilePathOpt.foreach { optionsFilePath =>
try {
org.rocksdb.OptionsUtil.loadOptionsFromFile(optionsFilePath, Env.getDefault, options, cfListRef.asJava)
logger.info("successfully loaded rocksdb options from " + optionsFilePath)
Expand All @@ -41,10 +33,12 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
}
}
}
options = options
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true)
options.setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
val defaultColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(new ColumnFamilyOptions())
val newColumnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).diff(cfListRef.toList.map(_.getName)).map(new ColumnFamilyDescriptor(_, defaultColumnFamilyOptions))
val columnFamilyDescriptors = cfListRef.toList ::: newColumnFamilyDescriptors
logger.info("Opening RocksDB at " + dataDir.toAbsolutePath)
val columnFamilyHandles = new util.ArrayList[ColumnFamilyHandle]
val db = RocksDB.open(
options,
dataDir.toAbsolutePath.toString,
Expand All @@ -61,7 +55,7 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
if (!Files.exists(backupDir) || !Files.isDirectory(backupDir))
Files.createDirectories(backupDir)

RocksDB.loadLibrary
RocksDB.loadLibrary()
val backupEngine = BackupEngine.open(Env.getDefault, new BackupableDBOptions(backupDir.toString))
backupEngine.createNewBackup(db)
backupEngine.purgeOldBackups(1)
Expand All @@ -71,12 +65,19 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
def restoreFromBackup(backupDir: Path) = {
logger.info("Restoring from backup. RocksDB temporarily unavailable")
close()
RocksDB.loadLibrary
RocksDB.loadLibrary()
val backupEngine = BackupEngine.open(Env.getDefault, new BackupableDBOptions(backupDir.toString))
backupEngine.restoreDbFromLatestBackup(dataDir.toString, dataDir.toString, new RestoreOptions(true))
logger.info("Restoring from backup complete. Reopening RocksDB")
}

def compactAllData() = {
logger.info("Compacting all data")
RocksDB.loadLibrary()
db.compactRange()
logger.info("All data has been compacted to last level containing data")
}

def close(): Future[Unit] = {
logger.info("Closing RocksDB handle")
Future.successful(db.close())
Expand All @@ -100,7 +101,7 @@ class RocksDBIterator(it: RocksIterator, prefix: Option[String]) extends Iterato
override def hasNext: Boolean = it.isValid && prefix.forall(it.key().startsWith(_))

override def next: KeyValuePair[Array[Byte]] = {
val value = KeyValuePair(new String(it.key().map(_.toChar)) , it.value())
val value = KeyValuePair(new String(it.key().map(_.toChar)), it.value())
it.next()
value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ class StoreManager(dataDir: Path, backupDir: Path, columnFamilies: List[String],
}
}

def compactAllData = {
failDuringBackup
failDuringRestore
try {
rocksDBManager.get.compactAllData()
}
}

def close = {
rocksDBManager.map(_.close)
}
Expand Down

0 comments on commit 28687a6

Please sign in to comment.