Skip to content

Commit 00d4946

Browse files
committed
add wal event listener
1 parent bdb2a96 commit 00d4946

70 files changed

Lines changed: 3551 additions & 87 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/check.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,17 @@ jobs:
3636
with:
3737
name: jdbc-basic-test-report
3838
path: tests/jdbc-basic/build/reports/tests/test
39+
40+
- name: Upload r2dbc-wal test report
41+
if: failure()
42+
uses: actions/upload-artifact@v4
43+
with:
44+
name: r2dbc-wal-test-report
45+
path: tests/r2dbc-wal/build/reports/tests/test
46+
47+
- name: Upload jdbc-wal test report
48+
if: failure()
49+
uses: actions/upload-artifact@v4
50+
with:
51+
name: jdbc-wal-test-report
52+
path: tests/jdbc-wal/build/reports/tests/test

lib/build.gradle.kts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ enum class SubProjects(val projectName: String) {
1919
JDBC("jdbc"),
2020
R2DBC("r2dbc"),
2121
SHARED("shared"),
22-
INTERN("intern"),
22+
WAL("wal"),
2323
}
2424

2525
tasks.matching { it.name.startsWith("publish") }.configureEach {
@@ -53,7 +53,6 @@ subprojects {
5353

5454
val artifactId = project.name
5555

56-
if (projectType in setOf(SubProjects.INTERN)) return@subprojects
5756
mavenPublishing {
5857
val descriptionStr = when (projectType) {
5958
SubProjects.CORE -> "Core utilities used by all generated code: column/value helpers, constraint " +
@@ -67,7 +66,9 @@ subprojects {
6766
SubProjects.R2DBC ->
6867
"R2DBC-specific helpers and extensions for non-blocking database access with Exposed R2DBC."
6968

70-
SubProjects.INTERN -> error("should not be published")
69+
SubProjects.WAL -> "PostgreSQL logical WAL listener based on wal2json. Manages replication slots and " +
70+
"replica identity per table, streams change events via the JDBC replication API, and publishes " +
71+
"them as a Kotlin Flow for reactive processing in generated code."
7172
}
7273
coordinates(
7374
groupId = project.group as String,
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package de.quati.pgen.core
2+
3+
import de.quati.pgen.shared.TableNameWithSchema
4+
import de.quati.pgen.shared.WalEvent
5+
import kotlinx.serialization.json.JsonObject
6+
7+
public interface PgenTable {
8+
public fun getTableNameWithSchema(): TableNameWithSchema
9+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package de.quati.pgen.core
2+
3+
import de.quati.pgen.shared.WalEvent
4+
import kotlinx.serialization.json.JsonObject
5+
6+
public interface PgenWalEventTable : PgenTable {
7+
public fun walEventMapper(event: WalEvent.Change<JsonObject>): WalEvent.Change<*>
8+
}

lib/core/src/main/kotlin/de/quati/pgen/core/column/DomainColumnType.kt

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@ package de.quati.pgen.core.column
22

33
import org.jetbrains.exposed.v1.core.Column
44
import org.jetbrains.exposed.v1.core.ColumnType
5+
import org.jetbrains.exposed.v1.core.IColumnType
56
import org.jetbrains.exposed.v1.core.Table
67
import kotlin.reflect.KClass
78
import kotlin.reflect.KProperty1
89
import kotlin.reflect.full.memberProperties
910
import kotlin.reflect.full.primaryConstructor
1011

11-
public class DomainColumnType<T : Any>(
12+
public class DomainColumnType<T : Any, R>(
1213
kClass: KClass<T>,
13-
private val sqlType: String,
14-
private val builder: (Any) -> T,
14+
public val sqlType: String,
15+
public val originType: IColumnType<R>,
16+
public val builder: (R?) -> T,
1517
) : ColumnType<T>() {
1618
private val getter: KProperty1<T, Any>
1719

@@ -34,15 +36,24 @@ public class DomainColumnType<T : Any>(
3436
override fun sqlType(): String = sqlType
3537
override fun notNullValueToDB(value: T): Any = getter.get(value)
3638
override fun nonNullValueToString(value: T): String = "'$value'"
37-
override fun valueFromDB(value: Any): T = builder(value)
39+
override fun valueFromDB(value: Any): T {
40+
val innerValue = originType.valueFromDB(value)
41+
return builder(innerValue)
42+
}
3843
}
3944

4045

41-
public inline fun <reified T : Any> Table.domainType(
46+
public inline fun <reified T : Any, R> Table.domainType(
4247
name: String,
4348
sqlType: String,
44-
noinline builder: (Any) -> T,
49+
originType: IColumnType<R>,
50+
noinline builder: (R?) -> T,
4551
): Column<T> {
46-
val type = DomainColumnType(kClass = T::class, sqlType = sqlType, builder = builder)
52+
val type = DomainColumnType(
53+
kClass = T::class,
54+
sqlType = sqlType,
55+
originType = originType,
56+
builder = builder,
57+
)
4758
return registerColumn(name = name, type = type)
4859
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package de.quati.pgen.core.util
2+
3+
import de.quati.pgen.core.column.DomainColumnType
4+
import kotlinx.datetime.LocalDate
5+
import kotlinx.datetime.LocalTime
6+
import kotlinx.serialization.json.JsonArray
7+
import kotlinx.serialization.json.JsonElement
8+
import kotlinx.serialization.json.JsonNull
9+
import kotlinx.serialization.json.JsonObject
10+
import kotlinx.serialization.json.JsonPrimitive
11+
import org.jetbrains.exposed.v1.core.Column
12+
import org.jetbrains.exposed.v1.core.CustomEnumerationColumnType
13+
import org.jetbrains.exposed.v1.core.IColumnType
14+
import org.jetbrains.exposed.v1.core.StringColumnType
15+
import org.jetbrains.exposed.v1.core.UUIDColumnType
16+
import org.jetbrains.exposed.v1.datetime.KotlinInstantColumnType
17+
import org.jetbrains.exposed.v1.datetime.KotlinLocalDateColumnType
18+
import org.jetbrains.exposed.v1.datetime.KotlinLocalTimeColumnType
19+
import org.jetbrains.exposed.v1.datetime.KotlinOffsetDateTimeColumnType
20+
import java.time.Instant
21+
import java.time.LocalDateTime
22+
import java.time.OffsetDateTime
23+
import java.time.ZoneId
24+
import java.time.ZoneOffset
25+
import kotlin.time.ExperimentalTime
26+
import kotlin.time.toKotlinInstant
27+
28+
29+
public fun <T : Any> parseColumnNullable(data: JsonObject, column: Column<T?>): T? {
30+
val data = data[column.name] ?: throw NoSuchElementException("Column ${column.name} not found in JSON")
31+
val result = parse(data, column.columnType)
32+
return result
33+
}
34+
35+
public fun <T : Any> parseColumn(data: JsonObject, column: Column<T>): T {
36+
val data = data[column.name] ?: throw NoSuchElementException("Column ${column.name} not found in JSON")
37+
val result = parse(data, column.columnType)
38+
return result ?: error(
39+
"Column '${column.name}' is required but was null or invalid"
40+
)
41+
}
42+
43+
private fun <T : Any> parse(value: JsonElement, type: IColumnType<T>): T? = when (value) {
44+
JsonNull -> null
45+
is JsonArray, is JsonObject -> error("Unsupported field value '$value' for column type: ${type::class.simpleName}")
46+
is JsonPrimitive -> when {
47+
value.isString -> parseString(value.content, type)
48+
else -> error("Unsupported field value '$value' for column type: ${type::class.simpleName}")
49+
}
50+
}
51+
52+
53+
private fun <T : Any> parseString(value: String, type: IColumnType<T>): T = when (type) {
54+
is StringColumnType -> type.valueFromDB(value)
55+
is UUIDColumnType -> type.valueFromDB(value)
56+
is CustomEnumerationColumnType<*> -> type.valueFromDB(value)
57+
is DomainColumnType<T, *> -> {
58+
@Suppress("UNCHECKED_CAST")
59+
val type = type as DomainColumnType<T, Any>
60+
val inner = parseString(value, type.originType)
61+
type.builder(inner)
62+
}
63+
64+
is KotlinOffsetDateTimeColumnType -> {
65+
@Suppress("UNCHECKED_CAST")
66+
OffsetDateTime.parse(value, pgenTimestampTzFormatter) as T
67+
}
68+
69+
is KotlinInstantColumnType -> {
70+
@Suppress("UNCHECKED_CAST")
71+
@OptIn(ExperimentalTime::class)
72+
LocalDateTime.parse(value, pgenTimestampFormatter)
73+
.atZone(ZoneId.systemDefault())
74+
.toInstant()
75+
.toKotlinInstant() as T
76+
}
77+
78+
is KotlinLocalDateColumnType -> {
79+
@Suppress("UNCHECKED_CAST")
80+
LocalDate.parse(value) as T
81+
}
82+
83+
is KotlinLocalTimeColumnType -> {
84+
@Suppress("UNCHECKED_CAST")
85+
LocalTime.parse(value) as T
86+
}
87+
88+
else -> error("Unsupported column type: ${type::class.simpleName}")
89+
}

lib/core/src/main/kotlin/de/quati/pgen/core/util/Util.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ import org.jetbrains.exposed.v1.core.ExpressionWithColumnType
1818
import org.jetbrains.exposed.v1.core.QueryParameter
1919
import org.jetbrains.exposed.v1.core.anyFrom
2020
import org.jetbrains.exposed.v1.core.QueryBuilder
21+
import java.time.format.DateTimeFormatter
22+
import java.time.format.DateTimeFormatterBuilder
23+
import java.time.temporal.ChronoField
2124

2225
public object IsInsert : Expression<Boolean>() {
2326
override fun toQueryBuilder(queryBuilder: QueryBuilder) {
@@ -119,3 +122,14 @@ public sealed interface UpdateSingleResult {
119122
public class Success(public val data: ResultRow) : UpdateSingleResult
120123
public data object TooMany : UpdateSingleResult
121124
}
125+
126+
public val pgenTimestampTzFormatter: DateTimeFormatter = DateTimeFormatterBuilder()
127+
.appendPattern("yyyy-MM-dd HH:mm:ss")
128+
.optionalStart().appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true).optionalEnd()
129+
.appendOffset("+HH", "Z")
130+
.toFormatter()!!
131+
132+
public val pgenTimestampFormatter: DateTimeFormatter = DateTimeFormatterBuilder()
133+
.appendPattern("yyyy-MM-dd HH:mm:ss")
134+
.optionalStart().appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true).optionalEnd()
135+
.toFormatter()!!

lib/gradle/libs.versions.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ exposed-json = { module = "org.jetbrains.exposed:exposed-json", version.ref = "e
1212
exposed-datetime = { module = "org.jetbrains.exposed:exposed-kotlin-datetime", version.ref = "exposed" }
1313

1414
exposed-jdbc = { module = "org.jetbrains.exposed:exposed-jdbc", version.ref = "exposed" }
15-
jdbc-postgresql = { module = "org.postgresql:postgresql", version = "42.7.4" }
15+
jdbc-postgresql = { module = "org.postgresql:postgresql", version = "42.7.7" }
1616

1717
exposed-r2dbc = { module = "org.jetbrains.exposed:exposed-r2dbc", version.ref = "exposed" }
1818
r2dbc-postgresql = { module = "org.postgresql:r2dbc-postgresql", version = "1.0.7.RELEASE" }
1919
r2dbc-pool = { module = "io.r2dbc:r2dbc-pool", version = "1.0.2.RELEASE" }
2020

21+
slf4j = { module = "org.slf4j:slf4j-api", version = "2.0.17" }
22+
2123
goquati-base = { module = "de.quati:kotlin-util", version.ref = "quati-util" }
2224

2325
kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serialization-core-jvm", version.ref = "kotlinx-serialization" }
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package de.quati.pgen.jdbc.util
2+
3+
import org.jetbrains.exposed.v1.core.BooleanColumnType
4+
import org.jetbrains.exposed.v1.core.TextColumnType
5+
import org.jetbrains.exposed.v1.jdbc.JdbcTransaction
6+
7+
public fun JdbcTransaction.emitLogicalPgMessage(
8+
transactional: Boolean,
9+
prefix: String,
10+
message: String,
11+
flush: Boolean = false,
12+
): String = exec(
13+
stmt = "select pg_logical_emit_message(?, ?, ?, ?)",
14+
args = listOf(
15+
BooleanColumnType() to transactional,
16+
TextColumnType() to prefix,
17+
TextColumnType() to message,
18+
BooleanColumnType() to flush,
19+
),
20+
) {
21+
it.next()
22+
it.getString(1)
23+
} ?: error("no result")
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package de.quati.pgen.r2dbc.util
2+
3+
import kotlinx.coroutines.flow.single
4+
import org.jetbrains.exposed.v1.core.BooleanColumnType
5+
import org.jetbrains.exposed.v1.core.TextColumnType
6+
import org.jetbrains.exposed.v1.r2dbc.R2dbcTransaction
7+
8+
public suspend fun R2dbcTransaction.emitLogicalPgMessage(
9+
transactional: Boolean,
10+
prefix: String,
11+
message: String,
12+
flush: Boolean = false,
13+
): String = exec(
14+
stmt = "select pg_logical_emit_message(?, ?, ?, ?)",
15+
args = listOf(
16+
BooleanColumnType() to transactional,
17+
TextColumnType() to prefix,
18+
TextColumnType() to message,
19+
BooleanColumnType() to flush,
20+
),
21+
) {
22+
it.get(0) as String
23+
}?.single() ?: error("no result")

0 commit comments

Comments
 (0)