Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import taboolib.expansion.lettuce.IRedisClient
import taboolib.expansion.lettuce.cluster.IRedisClusterCommand
import taboolib.expansion.lettuce.cluster.IRedisClusterPubSub
import java.util.concurrent.CompletableFuture
import kotlin.collections.plusAssign
import kotlin.time.toJavaDuration

@Suppress("DuplicatedCode")
Expand All @@ -43,10 +44,10 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie
val resource = DefaultClientResources.builder()

if (redisConfig.ioThreadPoolSize != 0) {
resource.ioThreadPoolSize(4)
resource.ioThreadPoolSize(redisConfig.ioThreadPoolSize)
}
if (redisConfig.computationThreadPoolSize != 0) {
resource.computationThreadPoolSize(4)
resource.computationThreadPoolSize(redisConfig.computationThreadPoolSize)
}

val cluster = redisConfig.cluster
Expand Down Expand Up @@ -78,6 +79,7 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie

resources = resource.build()
client = RedisClusterClient.create(resources, uris)
client.setOptions(clientOptions.build())

// 连接 pub/sub 通道
pubSubConnection = client.connectPubSub()
Expand All @@ -99,14 +101,25 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie
v.readFrom = slaves.readFrom
}
} },
redisConfig.asyncPool.asyncClusterPoolConfig()
redisConfig.asyncPool.poolConfig()
).thenAccept {
asyncPool = it
completableFuture.complete(null)
}
if (autoRelease) {
LettuceRedis.clusterClients += this
}
return completableFuture
}

override fun stop() {
pubSubConnection.close()
asyncPool.close()
pool.close()
client.shutdown()
resources.shutdown()
}

override fun <T> useCommands(block: (RedisClusterCommands<String, String>) -> T): T? {
return useConnection {
block(it.sync())
Expand Down Expand Up @@ -142,6 +155,7 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie
use: ((StatefulRedisConnection<String, String>) -> T)?,
useCluster: ((StatefulRedisClusterConnection<String, String>) -> T)?
): T? {
if (useCluster == null) return null
val connection = try {
pool.borrowObject()
} catch (e: Exception) {
Expand All @@ -155,7 +169,7 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie
}

return try {
useCluster!!(connection)
useCluster(connection)
} catch (e: Exception) {
warning(
"""
Expand All @@ -174,10 +188,11 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie
use: ((StatefulRedisConnection<String, String>) -> T)?,
useCluster: ((StatefulRedisClusterConnection<String, String>) -> T)?
): CompletableFuture<T?> {
if (useCluster == null) return CompletableFuture.completedFuture(null)
return try {
asyncPool.acquire().thenApply { connection ->
try {
useCluster!!(connection)
useCluster(connection)
} catch (e: Exception) {
warning(
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,10 @@ object LettuceRedis {
@Awake(LifeCycle.DISABLE)
internal fun stop() {
clients.forEach {
if (it.enabledSlaves) {
it.masterAsyncReplicaPool.close()
it.masterReplicaPool.close()
} else {
it.asyncPool.close()
it.pool.close()
}
it.client.shutdown()
it.resources.shutdown()
it.stop()
}
clusterClients.forEach {
it.asyncPool.close()
it.pool.close()
it.client.shutdown()
it.resources.shutdown()
it.stop()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ class LettuceRedisClient(val redisConfig: LettuceRedisConfig): IRedisClient, IRe
val resource = DefaultClientResources.builder()

if (redisConfig.ioThreadPoolSize != 0) {
resource.ioThreadPoolSize(4)
resource.ioThreadPoolSize(redisConfig.ioThreadPoolSize)
}
if (redisConfig.computationThreadPoolSize != 0) {
resource.computationThreadPoolSize(4)
resource.computationThreadPoolSize(redisConfig.computationThreadPoolSize)
}

val clientOptions = ClientOptions.builder()
Expand Down Expand Up @@ -86,7 +86,7 @@ class LettuceRedisClient(val redisConfig: LettuceRedisConfig): IRedisClient, IRe
{ MasterReplica.connectAsync(client, StringCodec.UTF8, uri).whenComplete { v, _ ->
v.readFrom = slaves.readFrom
} },
redisConfig.asyncPool.asyncSlavesPoolConfig()
redisConfig.asyncPool.poolConfig()
).thenAccept {
masterAsyncReplicaPool = it
completableFuture.complete(null)
Expand All @@ -100,15 +100,31 @@ class LettuceRedisClient(val redisConfig: LettuceRedisConfig): IRedisClient, IRe
// 连接异步
AsyncConnectionPoolSupport.createBoundedObjectPoolAsync(
{ client.connectAsync(StringCodec.UTF8, uri) },
redisConfig.asyncPool.asyncPoolConfig()
redisConfig.asyncPool.poolConfig()
).thenAccept {
asyncPool = it
completableFuture.complete(null)
}
}
if (autoRelease) {
LettuceRedis.clients += this
}
return completableFuture
}

override fun stop() {
pubSubConnection.close()
if (enabledSlaves) {
masterAsyncReplicaPool.close()
masterReplicaPool.close()
} else {
asyncPool.close()
pool.close()
}
client.shutdown()
resources.shutdown()
}

override fun <T> useCommands(block: (RedisCommands<String, String>) -> T): T? {
return useConnection(
{ block(it.sync()) }
Expand Down Expand Up @@ -144,6 +160,7 @@ class LettuceRedisClient(val redisConfig: LettuceRedisConfig): IRedisClient, IRe
use: ((StatefulRedisConnection<String, String>) -> T)?,
useCluster: ((StatefulRedisClusterConnection<String, String>) -> T)?
): T? {
if (use == null) return null
return if (enabledSlaves) {
val connection = try {
masterReplicaPool.borrowObject()
Expand All @@ -158,7 +175,7 @@ class LettuceRedisClient(val redisConfig: LettuceRedisConfig): IRedisClient, IRe
}

try {
use!!(connection)
use(connection)
} catch (e: Exception) {
warning(
"""
Expand All @@ -184,7 +201,7 @@ class LettuceRedisClient(val redisConfig: LettuceRedisConfig): IRedisClient, IRe
}

try {
use!!(connection)
use(connection)
} catch (e: Exception) {
warning(
"""
Expand All @@ -204,11 +221,12 @@ class LettuceRedisClient(val redisConfig: LettuceRedisConfig): IRedisClient, IRe
use: ((StatefulRedisConnection<String, String>) -> T)?,
useCluster: ((StatefulRedisClusterConnection<String, String>) -> T)?
): CompletableFuture<T?> {
if (use == null) return CompletableFuture.completedFuture(null)
return if (enabledSlaves) {
try {
masterAsyncReplicaPool.acquire().thenApply { obj ->
try {
use!!(obj)
use(obj)
} catch (e: Throwable) {
warning(
"""
Expand All @@ -234,7 +252,7 @@ class LettuceRedisClient(val redisConfig: LettuceRedisConfig): IRedisClient, IRe
try {
asyncPool.acquire().thenApply { obj ->
try {
use!!(obj)
use(obj)
} catch (e: Throwable) {
warning(
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,31 @@ import java.io.File
import kotlin.time.Duration
import kotlin.time.toJavaDuration

private fun parseDuration(value: String?, fieldName: String): Duration {
if (value == null) error("$fieldName must be set")
return try {
Duration.parse(value)
} catch (e: IllegalArgumentException) {
error("Invalid duration format for $fieldName: '$value'. Expected ISO-8601 duration (e.g., 'PT10S' for 10 seconds, 'PT1M' for 1 minute)")
}
}

private fun parseDurationOrNull(value: String?, fieldName: String): Duration? {
if (value == null) return null
return try {
Duration.parse(value)
} catch (e: IllegalArgumentException) {
error("Invalid duration format for $fieldName: '$value'. Expected ISO-8601 duration (e.g., 'PT10S' for 10 seconds, 'PT1M' for 1 minute)")
}
}

class LettuceRedisConfig(val configurationSection: ConfigurationSection) {

val host = configurationSection.getString("host") ?: error("host must be set")
val port = configurationSection.getInt("port", 6379)
val password = configurationSection.getString("password")
val ssl = configurationSection.getBoolean("ssl")
val timeout = Duration.parse(configurationSection.getString("timeout") ?: error("timeout must be set"))
val timeout = parseDuration(configurationSection.getString("timeout"), "timeout")
val database = configurationSection.getInt("database", 0)

val ioThreadPoolSize = configurationSection.getInt("ioThreadPoolSize")
Expand All @@ -33,38 +51,36 @@ class LettuceRedisConfig(val configurationSection: ConfigurationSection) {
val pingBeforeActivateConnection = configurationSection.getBoolean("pingBeforeActivateConnection", true)

val sslOptions: SslOptions by lazy {
val truststoreFile = configurationSection.getString("truststoreFile") ?: "default.jks"
val password = configurationSection.getString("truststorePassword")
SslOptions.builder()
.jdkSslProvider()
.truststore(File(getDataFolder(), "default.jks"), password)
.truststore(File(getDataFolder(), truststoreFile), password)
.build()
}

val pool = Pool(configurationSection.getConfigurationSection("pool")!!)
val pool = Pool(configurationSection.getConfigurationSection("pool"))

class Pool(configurationSection: ConfigurationSection) {
class Pool(configurationSection: ConfigurationSection?) {

val lifo = configurationSection.getBoolean("lifo", true)
val fairness = configurationSection.getBoolean("fairness", false)
val lifo = configurationSection?.getBoolean("lifo", true) ?: true
val fairness = configurationSection?.getBoolean("fairness", false) ?: false

val maxTotal = configurationSection.getInt("maxTotal", 8)
val maxIdle = configurationSection.getInt("maxIdle", 8)
val minIdle = configurationSection.getInt("minIdle", 0)
val maxTotal = configurationSection?.getInt("maxTotal", 8) ?: 8
val maxIdle = configurationSection?.getInt("maxIdle", 8) ?: 8
val minIdle = configurationSection?.getInt("minIdle", 0) ?: 0

val testOnCreate = configurationSection.getBoolean("testOnCreate", false)
val testOnBorrow = configurationSection.getBoolean("testOnCreate", false)
val testOnReturn = configurationSection.getBoolean("testOnCreate", false)
val testWhileIdle = configurationSection.getBoolean("testOnCreate", false)
val testOnCreate = configurationSection?.getBoolean("testOnCreate", false) ?: false
val testOnBorrow = configurationSection?.getBoolean("testOnBorrow", false) ?: false
val testOnReturn = configurationSection?.getBoolean("testOnReturn", false) ?: false
val testWhileIdle = configurationSection?.getBoolean("testWhileIdle", false) ?: false

val maxWaitDuration = configurationSection.getString("maxWaitDuration")?.let { Duration.parse(it) }
val blockWhenExhausted = configurationSection.getBoolean("blockWhenExhausted", true)
val timeBetweenEvictionRuns =
configurationSection.getString("timeBetweenEvictionRuns")?.let { Duration.parse(it) }
val minEvictableIdleDuration =
configurationSection.getString("minEvictableIdleDuration")?.let { Duration.parse(it) }
val softMinEvictableIdleDuration =
configurationSection.getString("softMinEvictableIdleDuration")?.let { Duration.parse(it) }
val numTestsPerEvictionRun = configurationSection.getInt("numTestsPerEvictionRun", 3)
val maxWaitDuration = parseDurationOrNull(configurationSection?.getString("maxWaitDuration"), "pool.maxWaitDuration")
val blockWhenExhausted = configurationSection?.getBoolean("blockWhenExhausted", true) ?: true
val timeBetweenEvictionRuns = parseDurationOrNull(configurationSection?.getString("timeBetweenEvictionRuns"), "pool.timeBetweenEvictionRuns")
val minEvictableIdleDuration = parseDurationOrNull(configurationSection?.getString("minEvictableIdleDuration"), "pool.minEvictableIdleDuration")
val softMinEvictableIdleDuration = parseDurationOrNull(configurationSection?.getString("softMinEvictableIdleDuration"), "pool.softMinEvictableIdleDuration")
val numTestsPerEvictionRun = configurationSection?.getInt("numTestsPerEvictionRun", 3) ?: 3

fun poolConfig(): GenericObjectPoolConfig<StatefulRedisConnection<String, String>> {
return GenericObjectPoolConfig<StatefulRedisConnection<String, String>>().apply {
Expand Down Expand Up @@ -136,35 +152,19 @@ class LettuceRedisConfig(val configurationSection: ConfigurationSection) {
}
}

val asyncPool = AsyncPool(configurationSection.getConfigurationSection("asyncPool")!!)
val asyncPool = AsyncPool(configurationSection.getConfigurationSection("asyncPool"))

class AsyncPool(configurationSection: ConfigurationSection) {
class AsyncPool(configurationSection: ConfigurationSection?) {

val maxTotal = configurationSection.getInt("maxTotal", 8)
val maxIdle = configurationSection.getInt("maxIdle", 8)
val minIdle = configurationSection.getInt("minIdle", 0)

fun asyncPoolConfig(): BoundedPoolConfig {
return BoundedPoolConfig.builder()
.maxTotal([email protected])
.maxIdle([email protected])
.minIdle([email protected])
.build()
}

fun asyncClusterPoolConfig(): BoundedPoolConfig {
return BoundedPoolConfig.builder()
.maxTotal([email protected])
.maxIdle([email protected])
.minIdle([email protected])
.build()
}
val maxTotal = configurationSection?.getInt("maxTotal", 8) ?: 8
val maxIdle = configurationSection?.getInt("maxIdle", 8) ?: 8
val minIdle = configurationSection?.getInt("minIdle", 0) ?: 0

fun asyncSlavesPoolConfig(): BoundedPoolConfig {
fun poolConfig(): BoundedPoolConfig {
return BoundedPoolConfig.builder()
.maxTotal(this@AsyncPool.maxTotal)
.maxIdle(this@AsyncPool.maxIdle)
.minIdle(this@AsyncPool.minIdle)
.maxTotal(maxTotal)
.maxIdle(maxIdle)
.minIdle(minIdle)
.build()
}
}
Expand All @@ -177,8 +177,10 @@ class LettuceRedisConfig(val configurationSection: ConfigurationSection) {

val masterId = configurationSection.getString("masterId") ?: error("masterId must be set")

val nodes = configurationSection.getStringList("nodes").map {
Node(it.split(":")[0], it.split(":")[1].toInt())
val nodes = configurationSection.getStringList("nodes").map { node ->
val parts = node.split(":")
require(parts.size == 2) { "Invalid sentinel node format: '$node'. Expected 'host:port'" }
Node(parts[0], parts[1].toIntOrNull() ?: error("Invalid port in sentinel node: '$node'"))
}

class Node(val host: String, val port: Int)
Expand Down Expand Up @@ -218,7 +220,7 @@ class LettuceRedisConfig(val configurationSection: ConfigurationSection) {
val port = configurationSection.getInt("port", 6379)
val password = configurationSection.getString("password")
val ssl = configurationSection.getBoolean("ssl")
val timeout = Duration.parse(configurationSection.getString("timeout") ?: error("timeout must be set"))
val timeout = parseDuration(configurationSection.getString("timeout"), "cluster node timeout")
val database = configurationSection.getInt("database", 0)

// sentinel
Expand All @@ -229,8 +231,10 @@ class LettuceRedisConfig(val configurationSection: ConfigurationSection) {

val masterId = configurationSection.getString("masterId") ?: error("masterId must be set")

val nodes = configurationSection.getStringList("nodes").map {
Node(it.split(":")[0], it.split(":")[1].toInt())
val nodes = configurationSection.getStringList("nodes").map { node ->
val parts = node.split(":")
require(parts.size == 2) { "Invalid sentinel node format: '$node'. Expected 'host:port'" }
Node(parts[0], parts[1].toIntOrNull() ?: error("Invalid port in sentinel node: '$node'"))
}

class Node(val host: String, val port: Int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,11 @@ interface IRedisClient {
* @param autoRelease 关服是否自动释放
* */
fun start(autoRelease: Boolean = true): CompletableFuture<Void>

/**
* 结束 Redis 客户端
*
* 如果没开启 autoRelease 必须关服前调用
* */
fun stop()
}
Loading
Loading