Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions module/database/database-lettuce-redis/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
// LettuceGithub: https://github.com/lettuce-io/lettuce-core

dependencies {
compileOnly("io.lettuce:lettuce-core:6.6.0.RELEASE")
// 使用 api 传递依赖
api("io.lettuce:lettuce-core:7.2.1.RELEASE")
compileOnly("org.apache.commons:commons-pool2:2.12.1")

compileOnly(project(":common"))
compileOnly(project(":common-env"))
compileOnly(project(":common-util"))
Expand All @@ -15,9 +17,11 @@ dependencies {

tasks {
withType<ShadowJar> {
relocate("org.reactivestreams.", "org.reactivestreams_1_0_4.")
relocate("reactor.", "reactor_3_6_6.")
relocate("io.netty.", "io.netty_4_2_5_final.")
relocate("org.apache.commons.pool2.", "org.apache.commons.pool2_2_12_1.")
relocate("io.netty.", "io.netty._4_1_107_final.")
relocate("reactor.", "reactor_3_6_6.")
relocate("org.reactivestreams.", "org.reactivestreams_1_0_4.")
relocate("org.slf4j.", "org.slf4j_1_7_36.")
relocate("redis.clients.authentication.", "redis.clients.authentication_0_1_1_beta2.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie
lateinit var pubSubConnection: StatefulRedisClusterPubSubConnection<String, String>
lateinit var resources: DefaultClientResources

@OptIn(ExperimentalStdlibApi::class)
override fun start(autoRelease: Boolean): CompletableFuture<Void> {
val completableFuture = CompletableFuture<Void>()
val resource = DefaultClientResources.builder()
Expand All @@ -63,11 +64,25 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie

val topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(cluster.enablePeriodicRefresh)
.enableAdaptiveRefreshTrigger(*cluster.enableAdaptiveRefreshTrigger.toTypedArray())
.refreshTriggersReconnectAttempts(cluster.refreshTriggersReconnectAttempts)
.dynamicRefreshSources(cluster.dynamicRefreshSources)
.closeStaleConnections(cluster.closeStaleConnections)

// Lettuce 7.0+ 默认启用所有自适应触发器,需要禁用未配置的触发器
val configuredTriggers = cluster.enableAdaptiveRefreshTrigger.toSet()
if (configuredTriggers.isEmpty()) {
// 如果未配置任何触发器,禁用所有
topologyRefreshOptions.disableAllAdaptiveRefreshTriggers()
} else {
// 禁用未配置的触发器
val triggersToDisable = ClusterTopologyRefreshOptions.RefreshTrigger.entries
.filter { it !in configuredTriggers }
.toTypedArray()
if (triggersToDisable.isNotEmpty()) {
topologyRefreshOptions.disableAdaptiveRefreshTrigger(*triggersToDisable)
}
}

cluster.adaptiveRefreshTriggersTimeout?.toJavaDuration()?.let { topologyRefreshOptions.adaptiveRefreshTriggersTimeout(it) }
cluster.refreshPeriod?.toJavaDuration()?.let { topologyRefreshOptions.refreshPeriod(it) }
clientOptions
Expand Down Expand Up @@ -112,6 +127,89 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie
return completableFuture
}

@OptIn(ExperimentalStdlibApi::class)
override fun startSync(autoRelease: Boolean) {
val resource = DefaultClientResources.builder()

if (redisConfig.ioThreadPoolSize != 0) {
resource.ioThreadPoolSize(redisConfig.ioThreadPoolSize)
}
if (redisConfig.computationThreadPoolSize != 0) {
resource.computationThreadPoolSize(redisConfig.computationThreadPoolSize)
}

val cluster = redisConfig.cluster

val uris = cluster.nodes.map {
it.redisURIBuilder().build()
}
val clientOptions = ClusterClientOptions.builder()

if (redisConfig.ssl) {
clientOptions.sslOptions(redisConfig.sslOptions)
}

val topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(cluster.enablePeriodicRefresh)
.refreshTriggersReconnectAttempts(cluster.refreshTriggersReconnectAttempts)
.dynamicRefreshSources(cluster.dynamicRefreshSources)
.closeStaleConnections(cluster.closeStaleConnections)

// Lettuce 7.0+ 默认启用所有自适应触发器,需要禁用未配置的触发器
val configuredTriggers = cluster.enableAdaptiveRefreshTrigger.toSet()
if (configuredTriggers.isEmpty()) {
// 如果未配置任何触发器,禁用所有
topologyRefreshOptions.disableAllAdaptiveRefreshTriggers()
} else {
// 禁用未配置的触发器
val triggersToDisable = ClusterTopologyRefreshOptions.RefreshTrigger.entries
.filter { it !in configuredTriggers }
.toTypedArray()
if (triggersToDisable.isNotEmpty()) {
topologyRefreshOptions.disableAdaptiveRefreshTrigger(*triggersToDisable)
}
}

cluster.adaptiveRefreshTriggersTimeout?.toJavaDuration()?.let { topologyRefreshOptions.adaptiveRefreshTriggersTimeout(it) }
cluster.refreshPeriod?.toJavaDuration()?.let { topologyRefreshOptions.refreshPeriod(it) }
clientOptions
.topologyRefreshOptions(topologyRefreshOptions.build())
.autoReconnect(redisConfig.autoReconnect)
.maxRedirects(cluster.maxRedirects)
.validateClusterNodeMembership(cluster.validateClusterNodeMembership)
.pingBeforeActivateConnection(redisConfig.pingBeforeActivateConnection)

resources = resource.build()
client = RedisClusterClient.create(resources, uris)
client.setOptions(clientOptions.build())

// 连接 pub/sub 通道
pubSubConnection = client.connectPubSub()
// 连接同步
pool = ConnectionPoolSupport.createGenericObjectPool(
{ client.connect().apply {
if (redisConfig.enableSlaves) {
val slaves = redisConfig.slaves
readFrom = slaves.readFrom
}
} },
redisConfig.pool.clusterPoolConfig()
)
// 连接异步(同步方式创建)
asyncPool = AsyncConnectionPoolSupport.createBoundedObjectPool(
{ client.connectAsync(StringCodec.UTF8).whenComplete { v, _ ->
if (redisConfig.enableSlaves) {
val slaves = redisConfig.slaves
v.readFrom = slaves.readFrom
}
} },
redisConfig.asyncPool.poolConfig()
)
if (autoRelease) {
LettuceRedis.clusterClients += this
}
}

override fun stop() {
pubSubConnection.close()
asyncPool.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,72 +9,99 @@ import taboolib.common.platform.Awake
@Inject
@RuntimeDependencies(
RuntimeDependency(
"!io.lettuce:lettuce-core:6.6.0.RELEASE",
"!io.lettuce:lettuce-core:7.2.1.RELEASE",
test = "!io.lettuce.core.RedisURI",
relocate = ["!io.netty", "!io.netty_4_1_118_final",
relocate = ["!io.netty", "!io.netty_4_2_5_final",
"!org.apache.commons.pool2", "!org.apache.commons.pool2_2_12_1",
"!reactor", "!reactor_3_6_6",
"!org.reactivestreams", "!org.reactivestreams_1_0_4"],
"!org.reactivestreams", "!org.reactivestreams_1_0_4",
"!org.slf4j", "!org.slf4j_1_7_36",
"!redis.clients.authentication", "!redis.clients.authentication_0_1_1_beta2"],
transitive = false
),
RuntimeDependency(
"!org.reactivestreams:reactive-streams:1.0.4",
test = "!org.reactivestreams_1_0_4.Publisher",
relocate = ["!org.reactivestreams", "!org.reactivestreams_1_0_4"],
"!org.apache.commons:commons-pool2:2.12.1",
test = "!org.apache.commons.pool2_2_12_1.BaseObject",
relocate = ["!org.apache.commons.pool2", "!org.apache.commons.pool2_2_12_1"],
transitive = false
),
RuntimeDependency(
"!io.projectreactor:reactor-core:3.6.6",
test = "!reactor_3_6_6.core.CorePublisher",
relocate = ["!reactor", "!reactor_3_6_6", "!org.reactivestreams", "!org.reactivestreams_1_0_4"],
value = "!io.netty:netty-common:4.2.5.Final",
test = "!io.netty_4_2_5_final.util.AbstractConstant",
relocate = ["!io.netty", "!io.netty_4_2_5_final"],
transitive = false
),
RuntimeDependency(
"!org.apache.commons:commons-pool2:2.12.1",
test = "!org.apache.commons.pool2_2_12_1.BaseObject",
relocate = ["!org.apache.commons.pool2", "!org.apache.commons.pool2_2_12_1"],
value = "!io.netty:netty-handler:4.2.5.Final",
test = "!io.netty_4_2_5_final.handler.ssl.SslHandler",
relocate = ["!io.netty", "!io.netty_4_2_5_final"],
transitive = false
),
RuntimeDependency(
value = "!io.netty:netty-resolver-dns:4.2.5.Final",
test = "!io.netty_4_2_5_final.resolver.dns.DnsNameResolver",
relocate = ["!io.netty", "!io.netty_4_2_5_final"],
transitive = false
),
RuntimeDependency(
value = "!io.netty:netty-common:4.1.118.Final",
test = "!io.netty_4_1_118_final.util.AbstractConstant",
relocate = ["!io.netty", "!io.netty_4_1_118_final"],
value = "!io.netty:netty-transport:4.2.5.Final",
test = "!io.netty_4_2_5_final.channel.Channel",
relocate = ["!io.netty", "!io.netty_4_2_5_final"],
transitive = false
),
RuntimeDependency(
value = "!io.netty:netty-buffer:4.1.118.Final",
test = "!io.netty_4_1_118_final.buffer.AbstractByteBuf",
relocate = ["!io.netty", "!io.netty_4_1_118_final"],
value = "!io.netty:netty-buffer:4.2.5.Final",
test = "!io.netty_4_2_5_final.buffer.ByteBuf",
relocate = ["!io.netty", "!io.netty_4_2_5_final"],
transitive = false
),
RuntimeDependency(
value = "!io.netty:netty-codec:4.1.118.Final",
test = "!io.netty_4_1_118_final.handler.codec.AsciiHeadersEncoder",
relocate = ["!io.netty", "!io.netty_4_1_118_final"],
value = "!io.netty:netty-codec-base:4.2.5.Final",
test = "!io.netty_4_2_5_final.handler.codec.ByteToMessageDecoder",
relocate = ["!io.netty", "!io.netty_4_2_5_final"],
transitive = false
),
RuntimeDependency(
value = "!io.netty:netty-handler:4.1.118.Final",
test = "!io.netty_4_1_118_final.handler.address.ResolveAddressHandler",
relocate = ["!io.netty", "!io.netty_4_1_118_final"],
value = "!io.netty:netty-resolver:4.2.5.Final",
test = "!io.netty_4_2_5_final.resolver.AddressResolver",
relocate = ["!io.netty", "!io.netty_4_2_5_final"],
transitive = false
),
RuntimeDependency(
value = "!io.netty:netty-resolver:4.1.118.Final",
test = "!io.netty_4_1_118_final.resolver.AbstractAddressResolver",
relocate = ["!io.netty", "!io.netty_4_1_118_final"],
value = "!io.netty:netty-transport-native-unix-common:4.2.5.Final",
test = "!io.netty_4_2_5_final.channel.unix.UnixChannel",
relocate = ["!io.netty", "!io.netty_4_2_5_final"],
transitive = false
),
RuntimeDependency(
value = "!io.netty:netty-transport:4.1.118.Final",
test = "!io.netty_4_1_118_final.bootstrap.Bootstrap",
relocate = ["!io.netty", "!io.netty_4_1_118_final"],
value = "!io.netty:netty-codec-dns:4.2.5.Final",
test = "!io.netty_4_2_5_final.handler.codec.dns.DnsRecord",
relocate = ["!io.netty", "!io.netty_4_2_5_final"],
transitive = false
),
RuntimeDependency(
value = "!org.reactivestreams:reactive-streams:1.0.4",
test = "!org.reactivestreams_1_0_4.Publisher",
relocate = ["!org.reactivestreams", "!org.reactivestreams_1_0_4"],
transitive = false
),
RuntimeDependency(
value = "!org.slf4j:slf4j-api:1.7.36",
test = "!org.slf4j_1_7_36.Logger",
relocate = ["!org.slf4j", "!org.slf4j_1_7_36"],
transitive = false
),
RuntimeDependency(
value = "!io.projectreactor:reactor-core:3.6.6",
test = "!reactor_3_6_6.core.publisher.Flux",
relocate = ["!reactor", "!reactor_3_6_6",
"!org.reactivestreams", "!org.reactivestreams_1_0_4"],
transitive = false
),
RuntimeDependency(
value = "!io.netty:netty-transport-native-unix-common:4.1.118.Final",
test = "!io.netty_4_1_118_final.channel.unix.Buffer",
relocate = ["!io.netty", "!io.netty_4_1_118_final"],
value = "!redis.clients.authentication:redis-authx-core:0.1.1-beta2",
test = "!redis.clients.authentication_0_1_1_beta2.core.TokenManager",
relocate = ["!redis.clients.authentication", "!redis.clients.authentication_0_1_1_beta2"],
transitive = false
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,67 @@ class LettuceRedisClient(val redisConfig: LettuceRedisConfig): IRedisClient, IRe
return completableFuture
}

override fun startSync(autoRelease: Boolean) {
val resource = DefaultClientResources.builder()

if (redisConfig.ioThreadPoolSize != 0) {
resource.ioThreadPoolSize(redisConfig.ioThreadPoolSize)
}
if (redisConfig.computationThreadPoolSize != 0) {
resource.computationThreadPoolSize(redisConfig.computationThreadPoolSize)
}

val clientOptions = ClientOptions.builder()
.autoReconnect(redisConfig.autoReconnect)
.pingBeforeActivateConnection(redisConfig.pingBeforeActivateConnection)

if (redisConfig.ssl) {
clientOptions.sslOptions(redisConfig.sslOptions)
}
val uri = redisConfig.redisURIBuilder().build()

resources = resource.build()
client = RedisClient.create(resources, uri).apply {
options = clientOptions.build()
}
// 连接 pub/sub 通道
pubSubConnection = client.connectPubSub()

if (redisConfig.enableSlaves) {
enabledSlaves = true
val slaves = redisConfig.slaves

// 连接同步
masterReplicaPool = ConnectionPoolSupport.createGenericObjectPool(
{ MasterReplica.connect(client, StringCodec.UTF8, uri).apply {
readFrom = slaves.readFrom
} },
redisConfig.pool.slavesPoolConfig()
)
// 连接异步(同步方式创建)
masterAsyncReplicaPool = AsyncConnectionPoolSupport.createBoundedObjectPool(
{ MasterReplica.connectAsync(client, StringCodec.UTF8, uri).whenComplete { v, _ ->
v.readFrom = slaves.readFrom
} },
redisConfig.asyncPool.poolConfig()
)
} else {
// 连接同步
pool = ConnectionPoolSupport.createGenericObjectPool(
{ client.connect() },
redisConfig.pool.poolConfig()
)
// 连接异步(同步方式创建)
asyncPool = AsyncConnectionPoolSupport.createBoundedObjectPool(
{ client.connectAsync(StringCodec.UTF8, uri) },
redisConfig.asyncPool.poolConfig()
)
}
if (autoRelease) {
LettuceRedis.clients += this
}
}

override fun stop() {
pubSubConnection.close()
if (enabledSlaves) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ import java.util.concurrent.CompletableFuture
interface IRedisClient {

/**
* 启动 Redis 客户端
* 启动 Redis 客户端(异步)
* @param autoRelease 关服是否自动释放
* */
fun start(autoRelease: Boolean = true): CompletableFuture<Void>

/**
* 启动 Redis 客户端(同步)
* @param autoRelease 关服是否自动释放
* */
fun startSync(autoRelease: Boolean = true)

/**
* 结束 Redis 客户端
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@ fun createLettuceRedisConfig(configurationSection: ConfigurationSection) = Lettu
* 根据 [LettuceRedisConfig] 创建一个 集群 Redis 客户端
*
* 需要使用 ```IRedisClient.start()``` 方法启动
*
* 记得导入 ```compileOnly("io.lettuce:lettuce-core:6.6.0.RELEASE")```
* */
fun LettuceRedisConfig.createClusterClient(): LettuceClusterRedisClient = LettuceClusterRedisClient(this)

/**
* 根据 [LettuceRedisConfig] 创建一个 Redis 客户端
*
* 需要使用 ```IRedisClient.start()``` 方法启动
*
* 记得导入 ```compileOnly("io.lettuce:lettuce-core:6.6.0.RELEASE")```
* */
fun LettuceRedisConfig.createClient(): LettuceRedisClient = LettuceRedisClient(this)
Loading