Skip to content

Raw tables #213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.3.0 (unreleased)

* Support tables created outside of PowerSync with the `RawTable` API.

## 1.2.2

* Supabase: Avoid creating `Json` serializers multiple times.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.powersync.sync

import app.cash.turbine.turbineScope
import co.touchlab.kermit.ExperimentalKermitApi
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncDatabase
import com.powersync.PowerSyncException
import com.powersync.TestConnector
Expand All @@ -13,6 +14,9 @@ import com.powersync.bucket.OplogEntry
import com.powersync.bucket.WriteCheckpointData
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.db.PowerSyncDatabaseImpl
import com.powersync.db.schema.PendingStatement
import com.powersync.db.schema.PendingStatementParameter
import com.powersync.db.schema.RawTable
import com.powersync.db.schema.Schema
import com.powersync.testutils.UserRow
import com.powersync.testutils.databaseTest
Expand Down Expand Up @@ -650,7 +654,7 @@ abstract class BaseSyncIntegrationTest(
class LegacySyncIntegrationTest : BaseSyncIntegrationTest(false)

class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
// The legacy sync implementation doesn't prefetch credentials.
// The legacy sync implementation doesn't prefetch credentials and doesn't support raw tables.

@OptIn(LegacySyncImplementation::class)
@Test
Expand Down Expand Up @@ -688,4 +692,88 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
turbine.cancel()
}
}

@Test
@OptIn(ExperimentalPowerSyncAPI::class, LegacySyncImplementation::class)
fun rawTables() = databaseTest(createInitialDatabase = false) {
val db = openDatabase(Schema(listOf(
RawTable(
name = "lists",
put = PendingStatement(
"INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)",
listOf(PendingStatementParameter.Id, PendingStatementParameter.Column("name"))
),
delete = PendingStatement(
"DELETE FROM lists WHERE id = ?", listOf(PendingStatementParameter.Id)
)
)
)))

db.execute("CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT)")
turbineScope(timeout = 10.0.seconds) {
val query = db.watch("SELECT * FROM lists", throttleMs = 0L) {
it.getString(0) to it.getString(1)
}.testIn(this)
query.awaitItem() shouldBe emptyList()

db.connect(connector, options = options)
syncLines.send(SyncLine.FullCheckpoint(Checkpoint(
lastOpId = "1",
checksums = listOf(BucketChecksum("a", checksum = 0)),
)))
syncLines.send(
SyncLine.SyncDataBucket(
bucket = "a",
data =
listOf(
OplogEntry(
checksum = 0L,
data =
JsonUtil.json.encodeToString(
mapOf(
"name" to "custom list",
),
),
op = OpType.PUT,
opId = "1",
rowId = "my_list",
rowType = "lists",
),
),
after = null,
nextAfter = null,
),
)
syncLines.send(SyncLine.CheckpointComplete("1"))

query.awaitItem() shouldBe listOf("my_list" to "custom list")

syncLines.send(SyncLine.FullCheckpoint(Checkpoint(
lastOpId = "2",
checksums = listOf(BucketChecksum("a", checksum = 0)),
)))
syncLines.send(
SyncLine.SyncDataBucket(
bucket = "a",
data =
listOf(
OplogEntry(
checksum = 0L,
data = null,
op = OpType.REMOVE,
opId = "2",
rowId = "my_list",
rowType = "lists",
),
),
after = null,
nextAfter = null,
),
)
syncLines.send(SyncLine.CheckpointComplete("1"))

query.awaitItem() shouldBe emptyList()
query.cancelAndIgnoreRemainingEvents()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ internal class ActiveDatabaseTest(
everySuspend { invalidateCredentials() } returns Unit
}

fun openDatabase(): PowerSyncDatabaseImpl {
fun openDatabase(schema: Schema = Schema(UserRow.table)): PowerSyncDatabaseImpl {
logger.d { "Opening database $databaseName in directory $testDirectory" }
val db =
createPowerSyncDatabaseImpl(
factory = factory,
schema = Schema(UserRow.table),
schema = schema,
dbFilename = databaseName,
dbDirectory = testDirectory,
logger = logger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.powersync.bucket

import com.powersync.db.crud.CrudEntry
import com.powersync.db.internal.PowerSyncTransaction
import com.powersync.db.schema.SerializableSchema
import com.powersync.sync.Instruction
import com.powersync.sync.LegacySyncImplementation
import com.powersync.sync.SyncDataBatch
Expand Down Expand Up @@ -59,6 +60,7 @@ internal sealed interface PowerSyncControlArguments {
@Serializable
class Start(
val parameters: JsonObject,
val schema: SerializableSchema
) : PowerSyncControlArguments

data object Stop : PowerSyncControlArguments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ internal class PowerSyncDatabaseImpl(
uploadScope = scope,
createClient = createClient,
options = options,
schema = schema,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import kotlinx.coroutines.withContext
import kotlinx.serialization.encodeToString
import kotlin.time.Duration.Companion.milliseconds

@OptIn(FlowPreview::class)
internal class InternalDatabaseImpl(
private val factory: DatabaseDriverFactory,
private val scope: CoroutineScope,
Expand Down
13 changes: 13 additions & 0 deletions core/src/commonMain/kotlin/com/powersync/db/schema/BaseTable.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.powersync.db.schema

public sealed interface BaseTable {
/**
* The name of the table.
*/
public val name: String

/**
* Check that there are no issues in the table definition.
*/
public fun validate()
}
88 changes: 88 additions & 0 deletions core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.powersync.db.schema

import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.sync.SyncOptions
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.JsonObjectBuilder
import kotlinx.serialization.json.JsonPrimitive
import kotlinx.serialization.json.buildJsonArray
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put

/**
* A table that is managed by the user instead of being auto-created and migrated by the PowerSync
* SDK.
*
* These tables give application developers full control over the table (including table and
* column constraints). The [put] and [delete] statements the sync client uses to apply operations
* to the local database also need to be set explicitly.
*
* A main benefit of raw tables is that, since they're not backed by JSON views, complex queries on
* them can be much more efficient.
*
* Note that raw tables are only supported when [SyncOptions.newClientImplementation] is enabled.
*/
@ExperimentalPowerSyncAPI
public class RawTable(
override val name: String,
public val put: PendingStatement,
/**
* The statement to run when the sync client wants to delete a row.
*/
public val delete: PendingStatement,
): BaseTable {
override fun validate() {
// We don't currently have any validation for raw tables
}

internal fun serialize(): JsonElement {
return buildJsonObject {
put("name", name)
put("put", put.serialize())
put("delete", delete.serialize())
}
}
}

@ExperimentalPowerSyncAPI
public class PendingStatement(
public val sql: String,
public val parameters: List<PendingStatementParameter>,
) {
internal fun serialize(): JsonElement {
return buildJsonObject {
put("sql", sql)
put("params", buildJsonArray {
for (param in parameters) {
add(when(param) {
is PendingStatementParameter.Column -> buildJsonObject {
put("Column", param.name)
}
PendingStatementParameter.Id -> JsonPrimitive("Id")
})
}
})
}
}
}

/**
* A parameter that can be used in a [PendingStatement].
*/
@ExperimentalPowerSyncAPI
public sealed interface PendingStatementParameter {
/**
* Resolves to the id of the affected row.
*/
public object Id: PendingStatementParameter

/**
* Resolves to the value of a column in the added row.
*
* This is only available for [RawTable.put] - in [RawTable.delete] statements, only the [Id]
* can be used as a value.
*/
public class Column(public val name: String): PendingStatementParameter
}

27 changes: 24 additions & 3 deletions core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
package com.powersync.db.schema

import com.powersync.ExperimentalPowerSyncAPI
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement

/**
* The schema used by the database.
*
* The implementation uses the schema as a "VIEW" on top of JSON data.
* No migrations are required on the client.
*/
public data class Schema(
@OptIn(ExperimentalPowerSyncAPI::class)
public data class Schema internal constructor(
val tables: List<Table>,
val rawTables: List<RawTable>,
) {
public constructor(tables: List<BaseTable>): this(
tables.filterIsInstance<Table>(),
tables.filterIsInstance<RawTable>()
)

init {
validate()
}

internal val allTables: Sequence<BaseTable> get() = sequence {
yieldAll(tables)
yieldAll(rawTables)
}

/**
* Secondary constructor to create a schema with a variable number of tables.
*/
Expand All @@ -28,7 +43,7 @@ public data class Schema(
*/
public fun validate() {
val tableNames = mutableSetOf<String>()
tables.forEach { table ->
allTables.forEach { table ->
if (!tableNames.add(table.name)) {
throw AssertionError("Duplicate table name: ${table.name}")
}
Expand Down Expand Up @@ -56,9 +71,15 @@ public data class Schema(
@Serializable
internal data class SerializableSchema(
val tables: List<SerializableTable>,
@SerialName("raw_tables")
val rawTables: List<JsonElement>,
)

@OptIn(ExperimentalPowerSyncAPI::class)
internal fun Schema.toSerializable(): SerializableSchema =
with(this) {
SerializableSchema(tables.map { it.toSerializable() })
SerializableSchema(
tables = tables.map { it.toSerializable() },
rawTables = rawTables.map { it.serialize() }
)
}
9 changes: 3 additions & 6 deletions core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public data class Table(
/**
* The synced table name, matching sync rules.
*/
var name: String,
override var name: String,
/**
* List of columns.
*/
Expand Down Expand Up @@ -53,7 +53,7 @@ public data class Table(
* CRUD entries.
*/
val ignoreEmptyUpdates: Boolean = false,
) {
): BaseTable {
init {
/**
* Need to set the column definition for each index column.
Expand Down Expand Up @@ -141,10 +141,7 @@ public data class Table(
)
)

/**
* Check that there are no issues in the table definition.
*/
public fun validate() {
public override fun validate() {
if (columns.size > MAX_AMOUNT_OF_COLUMNS) {
throw AssertionError("Table $name has more than $MAX_AMOUNT_OF_COLUMNS columns, which is not supported")
}
Expand Down
9 changes: 8 additions & 1 deletion core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import com.powersync.bucket.PowerSyncControlArguments
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.db.crud.CrudEntry
import com.powersync.db.schema.Schema
import com.powersync.db.schema.SerializableSchema
import com.powersync.db.schema.toSerializable
import com.powersync.utils.JsonUtil
import io.ktor.client.HttpClient
import io.ktor.client.HttpClientConfig
Expand Down Expand Up @@ -62,6 +65,7 @@ internal class SyncStream(
private val params: JsonObject,
private val uploadScope: CoroutineScope,
private val options: SyncOptions,
private val schema: Schema,
createClient: (HttpClientConfig<*>.() -> Unit) -> HttpClient,
) {
private var isUploadingCrud = AtomicReference<PendingCrudUpload?>(null)
Expand Down Expand Up @@ -310,7 +314,10 @@ internal class SyncStream(
}

suspend fun start() {
invokeControl(PowerSyncControlArguments.Start(params))
invokeControl(PowerSyncControlArguments.Start(
parameters = params,
schema = schema.toSerializable(),
))

var hadSyncLine = false
for (line in controlInvocations) {
Expand Down
Loading
Loading