Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
774d058
feat(storage): implement dual-write async database coordinator with t…
Siddharth2812 Feb 27, 2026
863dc0c
feat: migrate secondary dual-write databases to ThebeDB
Siddharth2812 Feb 27, 2026
4ac12b0
chore: update go-kzg-4844 dependency to v1.1.0 and reorder Go imports.
Siddharth2812 Feb 27, 2026
dda6b83
feat: Persist account data atomically in both KV and SQL stores in Th…
Siddharth2812 Mar 2, 2026
153a44e
Merge branch 'main' into fix/DB_DualWrite
Siddharth2812 Mar 2, 2026
1953e3f
push proper tag for thebe in go mod
Siddharth2812 Mar 2, 2026
a7892f8
Merge branch 'main' into fix/DB_DualWrite
Siddharth2812 Mar 6, 2026
cdbd2ae
Merge branch 'main' into fix/DB_DualWrite
Siddharth2812 Mar 6, 2026
9fbe398
fix: prevent uint64 underflow in CheckNonceAndGetLatest block scan
saishibunb Mar 23, 2026
93752e7
Fix fetching first from immu rather than thebe
Siddharth2812 Mar 4, 2026
201a8ab
feat: Implement a data migration system with backfill, verification, …
Siddharth2812 Mar 6, 2026
34331b1
feat: Add BackfillManager with admin HTTP API and explicit lifecycle …
Siddharth2812 Mar 31, 2026
64e6339
fix: Resolve nil DB connections, IPv4 loopback, and promote lib/pq de…
Siddharth2812 Mar 31, 2026
c03f3ae
feat: Wire ThebeDB PostgresDSN into unified settings system
Siddharth2812 Mar 31, 2026
65a2326
Merge branch 'fix/uint64-underflow-checknonce' of github.com:JupiterM…
Siddharth2812 Apr 1, 2026
66a2473
Port Conflict for PG
Siddharth2812 Apr 1, 2026
d7b0836
Fix loader for the new PG
Siddharth2812 Apr 1, 2026
54d05cd
refactor: rename Immu-bypass DB_OPs functions to clarify write target
Siddharth2812 Apr 1, 2026
4c8d66a
Merge branch 'main' into fix/DB_DualWrite
Siddharth2812 Apr 2, 2026
7529efc
docs: add dual-write & ThebeDB documentation suite
Siddharth2812 Apr 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions DB_OPs/BlockLogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ import (

// GetLogs retrieves logs based on filter criteria
func GetLogs(mainDBClient *config.PooledConnection, filterQuery Types.FilterQuery) ([]Types.Log, error) {

// DEFINE NEW GLOBAL REPO USAGE:
if repo, ok := GlobalRepo.(interface {
GetLogs(context.Context, Types.FilterQuery) ([]Types.Log, error)
}); ok {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
logs, err := repo.GetLogs(ctx, filterQuery)
if err == nil {
return logs, nil
}
// If custom repo fails, fall through to legacy logic
}

var err error
var shouldReturnConnection = false

Expand Down
4 changes: 4 additions & 0 deletions DB_OPs/DBConstants.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ var (
ErrTokenExpired = errors.New("authentication token expired")
ErrNoAvailableConn = errors.New("no available connections in pool")
)

// GlobalRepo is the centralized Coordinator interface for all data operations.
// It will be initialized at node startup.
var GlobalRepo interface{}
64 changes: 58 additions & 6 deletions DB_OPs/account_immuclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func CreateAccount(PooledConnection *config.PooledConnection, DIDAddress string,
// Debugging
// fmt.Println("AccountDoc: ", AccountDoc)
// Store the account document
err = storeAccount(PooledConnection, AccountDoc)
err = StoreAccount(PooledConnection, AccountDoc)
if err != nil {
return err
}
Expand All @@ -145,7 +145,25 @@ func CreateAccount(PooledConnection *config.PooledConnection, DIDAddress string,
}

// StoreAccount stores a Key document in the accounts database and creates a DID reference
func storeAccount(PooledConnection *config.PooledConnection, KeyDoc *Account) error {
func StoreAccount(PooledConnection *config.PooledConnection, KeyDoc *Account) error {

// DEFINE NEW GLOBAL REPO USAGE:
// If GlobalRepo is initialized (meaning the new DB architecture is activated),
// route this request through the coordinator interfaces instead of standard ImmuDB.
if repo, ok := GlobalRepo.(interface {
StoreAccount(context.Context, *Account) error
}); ok {
// Create a generous context for the distributed transaction
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

return repo.StoreAccount(ctx, KeyDoc)
}

// ==========================================
// LEGACY IMMUDB OPERATION FALLBACK
// ==========================================

var err error
var AccountDoc *Account
var shouldReturnConnection = false
Expand All @@ -163,7 +181,7 @@ func storeAccount(PooledConnection *config.PooledConnection, KeyDoc *Account) er
}

// Try to use connection pool if available, otherwise fall back to traditional approach
if PooledConnection.Client == nil {
if PooledConnection == nil || PooledConnection.Client == nil {
PooledConnection, err = GetAccountConnectionandPutBack(ctx)
if err != nil {
return fmt.Errorf("failed to get accounts connection: %w - StoreAccount", err)
Expand Down Expand Up @@ -697,7 +715,10 @@ func loadAccountByKey(PooledConnection *config.PooledConnection, key []byte, log
return &acc, nil
}

// GetAccountByDID retrieves an account by DID directly from ImmuDB.
// Read routing (ImmuDB → ThebeDB fallback) is handled at the MasterRepository level.
func GetAccountByDID(PooledConnection *config.PooledConnection, did string) (*Account, error) {

var err error
var shouldReturnConnection = false

Expand Down Expand Up @@ -731,7 +752,10 @@ func GetAccountByDID(PooledConnection *config.PooledConnection, did string) (*Ac
return loadAccountByKey(PooledConnection, didKey, "DB_OPs.GetAccountByDID")
}

// GetAccount retrieves an account by address directly from ImmuDB.
// Read routing (ImmuDB → ThebeDB fallback) is handled at the MasterRepository level.
func GetAccount(PooledConnection *config.PooledConnection, address common.Address) (*Account, error) {

var err error
var shouldReturnConnection = false

Expand Down Expand Up @@ -765,8 +789,31 @@ func GetAccount(PooledConnection *config.PooledConnection, address common.Addres
return loadAccountByKey(PooledConnection, key, "DB_OPs.GetAccount")
}

// UpdateAccountBalance updates the balance for a Account
// UpdateAccountBalance updates the balance. If GlobalRepo (MasterRepository) is set
// it delegates there so that all stores (ImmuDB + ThebeDB async) are updated.
// Falls back to UpdateAccountBalanceImmu when GlobalRepo is not yet initialised.
func UpdateAccountBalance(PooledConnection *config.PooledConnection, address common.Address, newBalance string) error {
if repo, ok := GlobalRepo.(interface {
UpdateAccountBalance(context.Context, common.Address, string) error
}); ok {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
return repo.UpdateAccountBalance(ctx, address, newBalance)
}
return UpdateAccountBalanceImmu(PooledConnection, address, newBalance)
}

// UpdateAccountBalanceImmu updates the balance directly in ImmuDB, bypassing GlobalRepo.
// Called by ImmuRepository.UpdateAccountBalance to avoid the circular call:
//
// ImmuRepository → DB_OPs.UpdateAccountBalance → GlobalRepo (MasterRepository)
// → ImmuRepository → … (stack overflow)
func UpdateAccountBalanceImmu(PooledConnection *config.PooledConnection, address common.Address, newBalance string) error {

// ==========================================
// IMMUDB DIRECT WRITE
// ==========================================

fmt.Printf("=== DEBUG: UpdateAccountBalance called for address %s with balance %s ===\n", address.Hex(), newBalance)

// Define Function wide context for timeout
Expand Down Expand Up @@ -2015,8 +2062,13 @@ func CheckNonceAndGetLatest(PooledConnection *config.PooledConnection, fromAddr
startBlock = 0
}

// Process current batch of blocks (in reverse order)
for i := currentBlock; i >= startBlock; i-- {
// Process current batch of blocks (in reverse order).
// Loop is written as a top-decrement to avoid uint64 underflow: if startBlock
// is 0 and the condition were checked as "i >= startBlock" after decrement,
// i would wrap to uint64 max on the iteration where i==0, causing an infinite
// loop that attempts to fetch non-existent blocks near ^uint64(0).
for i := currentBlock + 1; i > startBlock; {
i--
block, err := GetZKBlockByNumber(PooledConnection, i)
if err != nil {
loggerCtx, cancel := context.WithCancel(context.Background())
Expand Down
43 changes: 37 additions & 6 deletions DB_OPs/immuclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -1870,8 +1870,31 @@ func Ping(ic *config.ImmuClient) error {
return nil
}

// StoreZKBlock stores a complete ZK block in the main database (UNCHANGED)
// StoreZKBlock stores a ZK block. If GlobalRepo (MasterRepository) is set it
// delegates there so that all stores (ImmuDB + ThebeDB async) are written.
// Falls back to StoreZKBlockImmu when GlobalRepo is not yet initialised.
func StoreZKBlock(mainDBClient *config.PooledConnection, block *config.ZKBlock) error {
if repo, ok := GlobalRepo.(interface {
StoreZKBlock(context.Context, *config.ZKBlock) error
}); ok {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
return repo.StoreZKBlock(ctx, block)
}
return StoreZKBlockImmu(mainDBClient, block)
}

// StoreZKBlockImmu writes a block directly to ImmuDB, bypassing GlobalRepo.
// Called by ImmuRepository.StoreZKBlock to avoid the circular call:
//
// ImmuRepository → DB_OPs.StoreZKBlock → GlobalRepo (MasterRepository)
// → ImmuRepository → … (stack overflow)
func StoreZKBlockImmu(mainDBClient *config.PooledConnection, block *config.ZKBlock) error {

// ==========================================
// IMMUDB DIRECT WRITE
// ==========================================

var err error
var shouldReturnConnection = false
// Create a unique key for the block
Expand Down Expand Up @@ -2008,9 +2031,11 @@ func StoreZKBlock(mainDBClient *config.PooledConnection, block *config.ZKBlock)
return nil
}

// GetZKBlockByNumber retrieves a ZK block by its number (UNCHANGED)
// GetZKBlockByNumber retrieves a ZK block by its number directly from ImmuDB.
// Read routing (ImmuDB → ThebeDB fallback) is handled at the MasterRepository level.
func GetZKBlockByNumber(mainDBClient *config.PooledConnection, blockNumber uint64) (*config.ZKBlock, error) {
var shouldReturnConnection = false

var shouldReturnConnection bool = false
var err error
blockKey := fmt.Sprintf("%s%d", PREFIX_BLOCK, blockNumber)

Expand Down Expand Up @@ -2083,8 +2108,10 @@ func GetZKBlockByNumber(mainDBClient *config.PooledConnection, blockNumber uint6
return block, nil
}

// GetZKBlockByHash retrieves a ZK block by its hash (UNCHANGED)
// GetZKBlockByHash retrieves a ZK block by its hash directly from ImmuDB.
// Read routing (ImmuDB → ThebeDB fallback) is handled at the MasterRepository level.
func GetZKBlockByHash(mainDBClient *config.PooledConnection, blockHash string) (*config.ZKBlock, error) {

// First get the block number from the hash
var shouldReturnConnection = false
var err error
Expand Down Expand Up @@ -2164,8 +2191,10 @@ func GetZKBlockByHash(mainDBClient *config.PooledConnection, blockHash string) (
return block, nil
}

// GetLatestBlockNumber returns the latest block number (UNCHANGED)
// GetLatestBlockNumber returns the latest block number directly from ImmuDB.
// Read routing (ImmuDB → ThebeDB fallback) is handled at the MasterRepository level.
func GetLatestBlockNumber(mainDBClient *config.PooledConnection) (uint64, error) {

var err error
var shouldReturnConnection = false

Expand Down Expand Up @@ -2326,8 +2355,10 @@ func GetTransactionBlock(mainDBClient *config.PooledConnection, txHash string) (
return GetZKBlockByNumber(mainDBClient, blockNumber)
}

// Get Transaction by hash
// GetTransactionByHash retrieves a transaction by hash directly from ImmuDB.
// Read routing (ImmuDB → ThebeDB fallback) is handled at the MasterRepository level.
func GetTransactionByHash(mainDBClient *config.PooledConnection, txHash string) (*config.Transaction, error) {

// Get the block that contains the transaction.
var err error
var shouldReturnConnection = false
Expand Down
Loading
Loading