diff --git a/backend/src/main/java/io/opspace/backend/repository/SqliteActivityLogRepository.java b/backend/src/main/java/io/opspace/backend/repository/SqliteActivityLogRepository.java index 66bbc23..d083c01 100644 --- a/backend/src/main/java/io/opspace/backend/repository/SqliteActivityLogRepository.java +++ b/backend/src/main/java/io/opspace/backend/repository/SqliteActivityLogRepository.java @@ -3,14 +3,17 @@ import io.opspace.backend.domain.ActivityAction; import io.opspace.backend.domain.ActivityLog; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; import java.util.List; +import java.util.function.Supplier; @Repository @ConditionalOnProperty(name = "app.storage.type", havingValue = "sqlite", matchIfMissing = true) public class SqliteActivityLogRepository implements ActivityLogRepository { + private static final int SQLITE_LOCK_RETRY_MAX = 8; private static final String INSERT_SQL = """ INSERT INTO activity_logs ( @@ -26,13 +29,27 @@ INSERT INTO activity_logs ( last_updated_by ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + action = excluded.action, + capture_id = excluded.capture_id, + item_id = excluded.item_id, + metadata = excluded.metadata, + created_at = excluded.created_at, + created_by = excluded.created_by, + last_updated_at = excluded.last_updated_at, + last_updated_by = excluded.last_updated_by """; private static final String FIND_ALL_SQL = """ SELECT row_id, id, action, capture_id, item_id, metadata, created_at, created_by, last_updated_at, last_updated_by FROM activity_logs ORDER BY created_at DESC """; - private static final String NEXT_ROW_ID_SQL = "SELECT COALESCE(MAX(row_id), 0) + 1 FROM activity_logs"; + private static final String BACKFILL_ROW_ID_SQL = """ + UPDATE activity_logs + SET row_id = rowid + WHERE id = ? AND row_id IS NULL + """; + private static final String FIND_ROW_ID_BY_ID_SQL = "SELECT row_id FROM activity_logs WHERE id = ?"; private final JdbcTemplate jdbcTemplate; @@ -42,35 +59,47 @@ public SqliteActivityLogRepository(JdbcTemplate jdbcTemplate) { @Override public ActivityLog save(ActivityLog activityLog) { - Long rowId = activityLog.rowId() != null - ? activityLog.rowId() - : jdbcTemplate.queryForObject(NEXT_ROW_ID_SQL, Long.class); - ActivityLog normalized = new ActivityLog( - rowId, - activityLog.id(), - activityLog.action(), - activityLog.captureId(), - activityLog.itemId(), - activityLog.metadata(), - activityLog.createdAt(), - activityLog.createdBy(), - activityLog.lastUpdatedAt(), - activityLog.lastUpdatedBy() - ); - jdbcTemplate.update( - INSERT_SQL, - normalized.rowId(), - normalized.id(), - normalized.action().name(), - normalized.captureId(), - normalized.itemId(), - normalized.metadata(), - normalized.createdAt(), - normalized.createdBy(), - normalized.lastUpdatedAt(), - normalized.lastUpdatedBy() - ); - return normalized; + return withSqliteLockRetry(() -> { + ActivityLog normalized = new ActivityLog( + activityLog.rowId(), + activityLog.id(), + activityLog.action(), + activityLog.captureId(), + activityLog.itemId(), + activityLog.metadata(), + activityLog.createdAt(), + activityLog.createdBy(), + activityLog.lastUpdatedAt(), + activityLog.lastUpdatedBy() + ); + jdbcTemplate.update( + INSERT_SQL, + normalized.rowId(), + normalized.id(), + normalized.action().name(), + normalized.captureId(), + normalized.itemId(), + normalized.metadata(), + normalized.createdAt(), + normalized.createdBy(), + normalized.lastUpdatedAt(), + normalized.lastUpdatedBy() + ); + jdbcTemplate.update(BACKFILL_ROW_ID_SQL, normalized.id()); + Long persistedRowId = jdbcTemplate.queryForObject(FIND_ROW_ID_BY_ID_SQL, Long.class, normalized.id()); + return new ActivityLog( + persistedRowId, + normalized.id(), + normalized.action(), + normalized.captureId(), + normalized.itemId(), + normalized.metadata(), + normalized.createdAt(), + normalized.createdBy(), + normalized.lastUpdatedAt(), + normalized.lastUpdatedBy() + ); + }); } @Override @@ -88,4 +117,45 @@ public List findAllNewestFirst() { rs.getString("last_updated_by") )); } + + private T withSqliteLockRetry(Supplier supplier) { + DataAccessException lastException = null; + for (int attempt = 0; attempt < SQLITE_LOCK_RETRY_MAX; attempt++) { + try { + return supplier.get(); + } catch (DataAccessException exception) { + lastException = exception; + if (!isSqliteLocked(exception) || attempt == SQLITE_LOCK_RETRY_MAX - 1) { + throw exception; + } + sleepBackoff(attempt); + } + } + throw lastException; + } + + private boolean isSqliteLocked(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + String message = current.getMessage(); + String className = current.getClass().getName(); + boolean sqliteCause = className.contains("SQLite"); + if (sqliteCause && message != null + && (message.contains("SQLITE_LOCKED") + || message.contains("SQLITE_BUSY") + || message.contains("database is locked"))) { + return true; + } + current = current.getCause(); + } + return false; + } + + private void sleepBackoff(int attempt) { + try { + Thread.sleep((long) (attempt + 1) * 6L); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + } + } } diff --git a/backend/src/main/java/io/opspace/backend/repository/SqliteCaptureRepository.java b/backend/src/main/java/io/opspace/backend/repository/SqliteCaptureRepository.java index b592b48..1275ea7 100644 --- a/backend/src/main/java/io/opspace/backend/repository/SqliteCaptureRepository.java +++ b/backend/src/main/java/io/opspace/backend/repository/SqliteCaptureRepository.java @@ -2,15 +2,18 @@ import io.opspace.backend.domain.Capture; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; import java.util.List; import java.util.Optional; +import java.util.function.Supplier; @Repository @ConditionalOnProperty(name = "app.storage.type", havingValue = "sqlite", matchIfMissing = true) public class SqliteCaptureRepository implements CaptureRepository { + private static final int SQLITE_LOCK_RETRY_MAX = 8; private static final String UPSERT_SQL = """ INSERT INTO captures ( @@ -46,7 +49,12 @@ ON CONFLICT(id) DO UPDATE SET FROM captures ORDER BY created_at DESC """; - private static final String NEXT_ROW_ID_SQL = "SELECT COALESCE(MAX(row_id), 0) + 1 FROM captures"; + private static final String BACKFILL_ROW_ID_SQL = """ + UPDATE captures + SET row_id = rowid + WHERE id = ? AND row_id IS NULL + """; + private static final String FIND_ROW_ID_BY_ID_SQL = "SELECT row_id FROM captures WHERE id = ?"; private static final String DELETE_BY_ID_SQL = "DELETE FROM captures WHERE id = ?"; private final JdbcTemplate jdbcTemplate; @@ -57,33 +65,47 @@ public SqliteCaptureRepository(JdbcTemplate jdbcTemplate) { @Override public Capture save(Capture capture) { - Long rowId = capture.rowId() != null ? capture.rowId() : jdbcTemplate.queryForObject(NEXT_ROW_ID_SQL, Long.class); - Capture normalizedCapture = new Capture( - rowId, - capture.id(), - capture.content(), - capture.createdAt(), - capture.createdBy(), - capture.lastUpdatedAt(), - capture.lastUpdatedBy(), - capture.convertedAt(), - capture.convertedItemId(), - capture.deletedAt() - ); - jdbcTemplate.update( - UPSERT_SQL, - normalizedCapture.rowId(), - normalizedCapture.id(), - normalizedCapture.content(), - normalizedCapture.createdAt(), - normalizedCapture.createdBy(), - normalizedCapture.lastUpdatedAt(), - normalizedCapture.lastUpdatedBy(), - normalizedCapture.convertedAt(), - normalizedCapture.convertedItemId(), - normalizedCapture.deletedAt() - ); - return normalizedCapture; + return withSqliteLockRetry(() -> { + Capture normalizedCapture = new Capture( + capture.rowId(), + capture.id(), + capture.content(), + capture.createdAt(), + capture.createdBy(), + capture.lastUpdatedAt(), + capture.lastUpdatedBy(), + capture.convertedAt(), + capture.convertedItemId(), + capture.deletedAt() + ); + jdbcTemplate.update( + UPSERT_SQL, + normalizedCapture.rowId(), + normalizedCapture.id(), + normalizedCapture.content(), + normalizedCapture.createdAt(), + normalizedCapture.createdBy(), + normalizedCapture.lastUpdatedAt(), + normalizedCapture.lastUpdatedBy(), + normalizedCapture.convertedAt(), + normalizedCapture.convertedItemId(), + normalizedCapture.deletedAt() + ); + jdbcTemplate.update(BACKFILL_ROW_ID_SQL, normalizedCapture.id()); + Long persistedRowId = jdbcTemplate.queryForObject(FIND_ROW_ID_BY_ID_SQL, Long.class, normalizedCapture.id()); + return new Capture( + persistedRowId, + normalizedCapture.id(), + normalizedCapture.content(), + normalizedCapture.createdAt(), + normalizedCapture.createdBy(), + normalizedCapture.lastUpdatedAt(), + normalizedCapture.lastUpdatedBy(), + normalizedCapture.convertedAt(), + normalizedCapture.convertedItemId(), + normalizedCapture.deletedAt() + ); + }); } @Override @@ -123,4 +145,45 @@ public List findAllNewestFirst() { public void deleteById(String id) { jdbcTemplate.update(DELETE_BY_ID_SQL, id); } + + private T withSqliteLockRetry(Supplier supplier) { + DataAccessException lastException = null; + for (int attempt = 0; attempt < SQLITE_LOCK_RETRY_MAX; attempt++) { + try { + return supplier.get(); + } catch (DataAccessException exception) { + lastException = exception; + if (!isSqliteLocked(exception) || attempt == SQLITE_LOCK_RETRY_MAX - 1) { + throw exception; + } + sleepBackoff(attempt); + } + } + throw lastException; + } + + private boolean isSqliteLocked(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + String message = current.getMessage(); + String className = current.getClass().getName(); + boolean sqliteCause = className.contains("SQLite"); + if (sqliteCause && message != null + && (message.contains("SQLITE_LOCKED") + || message.contains("SQLITE_BUSY") + || message.contains("database is locked"))) { + return true; + } + current = current.getCause(); + } + return false; + } + + private void sleepBackoff(int attempt) { + try { + Thread.sleep((long) (attempt + 1) * 6L); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + } + } } diff --git a/backend/src/main/java/io/opspace/backend/repository/SqliteItemRepository.java b/backend/src/main/java/io/opspace/backend/repository/SqliteItemRepository.java index e654bcb..83432a8 100644 --- a/backend/src/main/java/io/opspace/backend/repository/SqliteItemRepository.java +++ b/backend/src/main/java/io/opspace/backend/repository/SqliteItemRepository.java @@ -4,15 +4,18 @@ import io.opspace.backend.domain.ItemStatus; import io.opspace.backend.domain.ItemType; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; import java.util.List; import java.util.Optional; +import java.util.function.Supplier; @Repository @ConditionalOnProperty(name = "app.storage.type", havingValue = "sqlite", matchIfMissing = true) public class SqliteItemRepository implements ItemRepository { + private static final int SQLITE_LOCK_RETRY_MAX = 8; private static final String UPSERT_SQL = """ INSERT INTO items ( @@ -54,7 +57,12 @@ ON CONFLICT(id) DO UPDATE SET DELETE FROM items WHERE id = ? """; - private static final String NEXT_ROW_ID_SQL = "SELECT COALESCE(MAX(row_id), 0) + 1 FROM items"; + private static final String BACKFILL_ROW_ID_SQL = """ + UPDATE items + SET row_id = rowid + WHERE id = ? AND row_id IS NULL + """; + private static final String FIND_ROW_ID_BY_ID_SQL = "SELECT row_id FROM items WHERE id = ?"; private final JdbcTemplate jdbcTemplate; @@ -64,34 +72,49 @@ public SqliteItemRepository(JdbcTemplate jdbcTemplate) { @Override public Item save(Item item) { - Long rowId = item.rowId() != null ? item.rowId() : jdbcTemplate.queryForObject(NEXT_ROW_ID_SQL, Long.class); - Item normalizedItem = new Item( - rowId, - item.id(), - item.type(), - item.status(), - item.content(), - item.createdAt(), - item.createdBy(), - item.lastUpdatedAt(), - item.lastUpdatedBy(), - item.sourceCaptureId(), - item.deletedAt() - ); - jdbcTemplate.update( - UPSERT_SQL, - normalizedItem.rowId(), - normalizedItem.id(), - normalizedItem.type().name(), - normalizedItem.status().name(), - normalizedItem.content(), - normalizedItem.createdAt(), - normalizedItem.createdBy(), - normalizedItem.lastUpdatedAt(), - normalizedItem.lastUpdatedBy(), - normalizedItem.sourceCaptureId(), - normalizedItem.deletedAt()); - return normalizedItem; + return withSqliteLockRetry(() -> { + Item normalizedItem = new Item( + item.rowId(), + item.id(), + item.type(), + item.status(), + item.content(), + item.createdAt(), + item.createdBy(), + item.lastUpdatedAt(), + item.lastUpdatedBy(), + item.sourceCaptureId(), + item.deletedAt() + ); + jdbcTemplate.update( + UPSERT_SQL, + normalizedItem.rowId(), + normalizedItem.id(), + normalizedItem.type().name(), + normalizedItem.status().name(), + normalizedItem.content(), + normalizedItem.createdAt(), + normalizedItem.createdBy(), + normalizedItem.lastUpdatedAt(), + normalizedItem.lastUpdatedBy(), + normalizedItem.sourceCaptureId(), + normalizedItem.deletedAt()); + jdbcTemplate.update(BACKFILL_ROW_ID_SQL, normalizedItem.id()); + Long persistedRowId = jdbcTemplate.queryForObject(FIND_ROW_ID_BY_ID_SQL, Long.class, normalizedItem.id()); + return new Item( + persistedRowId, + normalizedItem.id(), + normalizedItem.type(), + normalizedItem.status(), + normalizedItem.content(), + normalizedItem.createdAt(), + normalizedItem.createdBy(), + normalizedItem.lastUpdatedAt(), + normalizedItem.lastUpdatedBy(), + normalizedItem.sourceCaptureId(), + normalizedItem.deletedAt() + ); + }); } @Override @@ -131,4 +154,45 @@ public List findAllNewestFirst() { public void deleteById(String id) { jdbcTemplate.update(DELETE_BY_ID_SQL, id); } + + private T withSqliteLockRetry(Supplier supplier) { + DataAccessException lastException = null; + for (int attempt = 0; attempt < SQLITE_LOCK_RETRY_MAX; attempt++) { + try { + return supplier.get(); + } catch (DataAccessException exception) { + lastException = exception; + if (!isSqliteLocked(exception) || attempt == SQLITE_LOCK_RETRY_MAX - 1) { + throw exception; + } + sleepBackoff(attempt); + } + } + throw lastException; + } + + private boolean isSqliteLocked(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + String message = current.getMessage(); + String className = current.getClass().getName(); + boolean sqliteCause = className.contains("SQLite"); + if (sqliteCause && message != null + && (message.contains("SQLITE_LOCKED") + || message.contains("SQLITE_BUSY") + || message.contains("database is locked"))) { + return true; + } + current = current.getCause(); + } + return false; + } + + private void sleepBackoff(int attempt) { + try { + Thread.sleep((long) (attempt + 1) * 6L); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + } + } } diff --git a/backend/src/test/java/io/opspace/backend/repository/SqliteRepositoriesIntegrationTests.java b/backend/src/test/java/io/opspace/backend/repository/SqliteRepositoriesIntegrationTests.java index 202a6e8..5cf5155 100644 --- a/backend/src/test/java/io/opspace/backend/repository/SqliteRepositoriesIntegrationTests.java +++ b/backend/src/test/java/io/opspace/backend/repository/SqliteRepositoriesIntegrationTests.java @@ -1,11 +1,13 @@ package io.opspace.backend.repository; -import io.opspace.backend.domain.ActivityAction; -import io.opspace.backend.domain.ActivityLog; -import io.opspace.backend.domain.Capture; -import io.opspace.backend.domain.Item; -import io.opspace.backend.domain.ItemStatus; -import io.opspace.backend.domain.ItemType; +import java.time.Instant; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -13,11 +15,7 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.test.context.TestPropertySource; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import io.opspace.backend.domain.*; @SpringBootTest @TestPropertySource(properties = { @@ -189,4 +187,114 @@ void activityLogRepositoryAssignsRowIdAndKeepsNewestFirstOrder() { assertEquals("editor", logs.get(0).lastUpdatedBy()); assertEquals("log-1", logs.get(1).id()); } + + @Test + void captureRepositoryHandlesConcurrentInsertsWithoutDuplicateRowIds() throws Exception { + int total = 24; + Set rowIds = ConcurrentHashMap.newKeySet(); + runConcurrent(total, index -> { + Capture saved = captureRepository.save(new Capture( + null, + "concurrent-capture-" + index, + "Capture " + index, + Instant.now().toString(), + "system", + Instant.now().toString(), + "system", + null, + null, + null + )); + rowIds.add(saved.rowId()); + }); + + assertEquals(total, captureRepository.findAllNewestFirst().size()); + assertEquals(total, rowIds.size()); + } + + @Test + void itemRepositoryHandlesConcurrentInsertsWithoutDuplicateRowIds() throws Exception { + captureRepository.save(new Capture( + null, + "shared-capture", + "Shared capture", + Instant.now().toString(), + "system", + Instant.now().toString(), + "system", + null, + null, + null + )); + + int total = 24; + Set rowIds = ConcurrentHashMap.newKeySet(); + runConcurrent(total, index -> { + Item saved = itemRepository.save(new Item( + null, + "concurrent-item-" + index, + ItemType.TASK, + ItemStatus.TODO, + "Item " + index, + Instant.now().toString(), + "system", + Instant.now().toString(), + "system", + "shared-capture", + null + )); + rowIds.add(saved.rowId()); + }); + + assertEquals(total, itemRepository.findAllNewestFirst().size()); + assertEquals(total, rowIds.size()); + } + + @Test + void activityRepositoryHandlesConcurrentInsertsWithoutDuplicateRowIds() throws Exception { + int total = 24; + Set rowIds = ConcurrentHashMap.newKeySet(); + runConcurrent(total, index -> { + ActivityLog saved = activityLogRepository.save(new ActivityLog( + null, + "concurrent-log-" + index, + ActivityAction.CAPTURE_CREATED, + "capture-" + index, + null, + null, + Instant.now().toString(), + "system", + Instant.now().toString(), + "system" + )); + rowIds.add(saved.rowId()); + }); + + assertEquals(total, activityLogRepository.findAllNewestFirst().size()); + assertEquals(total, rowIds.size()); + } + + private void runConcurrent(int total, ConcurrentWork work) throws Exception { + ExecutorService pool = Executors.newFixedThreadPool(Math.min(total, 8)); + CountDownLatch startLatch = new CountDownLatch(1); + List> futures = java.util.stream.IntStream.range(0, total) + .mapToObj(index -> pool.submit(() -> { + startLatch.await(5, TimeUnit.SECONDS); + work.run(index); + return (Object) null; + })) + .toList(); + + startLatch.countDown(); + for (Future future : futures) { + future.get(10, TimeUnit.SECONDS); + } + pool.shutdown(); + assertTrue(pool.awaitTermination(5, TimeUnit.SECONDS)); + } + + @FunctionalInterface + private interface ConcurrentWork { + void run(int index) throws Exception; + } }