diff --git a/internal/common/block.go b/internal/common/block.go index 4bcbe04..6924464 100644 --- a/internal/common/block.go +++ b/internal/common/block.go @@ -5,28 +5,28 @@ import ( ) type Block struct { - ChainId *big.Int `json:"chain_id"` - Number *big.Int `json:"number"` - Hash string `json:"hash"` - ParentHash string `json:"parent_hash"` - Timestamp uint64 `json:"timestamp"` - Nonce string `json:"nonce"` - Sha3Uncles string `json:"sha3_uncles"` - MixHash string `json:"mix_hash"` - Miner string `json:"miner"` - StateRoot string `json:"state_root"` - TransactionsRoot string `json:"transactions_root"` - ReceiptsRoot string `json:"receipts_root"` - LogsBloom string `json:"logs_bloom"` - Size uint64 `json:"size"` - ExtraData string `json:"extra_data"` - Difficulty *big.Int `json:"difficulty"` - TotalDifficulty *big.Int `json:"total_difficulty"` - TransactionCount uint64 `json:"transaction_count"` - GasLimit *big.Int `json:"gas_limit"` - GasUsed *big.Int `json:"gas_used"` - WithdrawalsRoot string `json:"withdrawals_root"` - BaseFeePerGas uint64 `json:"base_fee_per_gas"` + ChainId *big.Int `json:"chain_id" ch:"chain_id"` + Number *big.Int `json:"number" ch:"number"` + Hash string `json:"hash" ch:"hash"` + ParentHash string `json:"parent_hash" ch:"parent_hash"` + Timestamp uint64 `json:"timestamp" ch:"timestamp"` + Nonce string `json:"nonce" ch:"nonce"` + Sha3Uncles string `json:"sha3_uncles" ch:"sha3_uncles"` + MixHash string `json:"mix_hash" ch:"mix_hash"` + Miner string `json:"miner" ch:"miner"` + StateRoot string `json:"state_root" ch:"state_root"` + TransactionsRoot string `json:"transactions_root" ch:"transactions_root"` + ReceiptsRoot string `json:"receipts_root" ch:"receipts_root"` + LogsBloom string `json:"logs_bloom" ch:"logs_bloom"` + Size uint64 `json:"size" ch:"size"` + ExtraData string `json:"extra_data" ch:"extra_data"` + Difficulty *big.Int `json:"difficulty" ch:"difficulty"` + TotalDifficulty *big.Int `json:"total_difficulty" ch:"total_difficulty"` + TransactionCount uint64 `json:"transaction_count" ch:"transaction_count"` + GasLimit *big.Int `json:"gas_limit" ch:"gas_limit"` + GasUsed *big.Int `json:"gas_used" ch:"gas_used"` + WithdrawalsRoot string `json:"withdrawals_root" ch:"withdrawals_root"` + BaseFeePerGas uint64 `json:"base_fee_per_gas" ch:"base_fee_per_gas"` } type BlockData struct { diff --git a/internal/common/log.go b/internal/common/log.go index 0338c1b..d652909 100644 --- a/internal/common/log.go +++ b/internal/common/log.go @@ -11,15 +11,15 @@ import ( ) type Log struct { - ChainId *big.Int `json:"chain_id" swaggertype:"string"` - BlockNumber *big.Int `json:"block_number" swaggertype:"string"` - BlockHash string `json:"block_hash"` - BlockTimestamp uint64 `json:"block_timestamp"` - TransactionHash string `json:"transaction_hash"` - TransactionIndex uint64 `json:"transaction_index"` - LogIndex uint64 `json:"log_index"` - Address string `json:"address"` - Data string `json:"data"` + ChainId *big.Int `json:"chain_id" ch:"chain_id" swaggertype:"string"` + BlockNumber *big.Int `json:"block_number" ch:"block_number" swaggertype:"string"` + BlockHash string `json:"block_hash" ch:"block_hash"` + BlockTimestamp uint64 `json:"block_timestamp" ch:"block_timestamp"` + TransactionHash string `json:"transaction_hash" ch:"transaction_hash"` + TransactionIndex uint64 `json:"transaction_index" ch:"transaction_index"` + LogIndex uint64 `json:"log_index" ch:"log_index"` + Address string `json:"address" ch:"address"` + Data string `json:"data" ch:"data"` Topics []string `json:"topics"` } diff --git a/internal/common/set.go b/internal/common/set.go new file mode 100644 index 0000000..d975982 --- /dev/null +++ b/internal/common/set.go @@ -0,0 +1,42 @@ +package common + +type Set[T comparable] struct { + elements map[T]struct{} +} + +// NewSet creates a new set +func NewSet[T comparable]() *Set[T] { + return &Set[T]{ + elements: make(map[T]struct{}), + } +} + +// Add inserts an element into the set +func (s *Set[T]) Add(value T) { + s.elements[value] = struct{}{} +} + +// Remove deletes an element from the set +func (s *Set[T]) Remove(value T) { + delete(s.elements, value) +} + +// Contains checks if an element is in the set +func (s *Set[T]) Contains(value T) bool { + _, found := s.elements[value] + return found +} + +// Size returns the number of elements in the set +func (s *Set[T]) Size() int { + return len(s.elements) +} + +// List returns all elements in the set as a slice +func (s *Set[T]) List() []T { + keys := make([]T, 0, len(s.elements)) + for key := range s.elements { + keys = append(keys, key) + } + return keys +} diff --git a/internal/common/trace.go b/internal/common/trace.go index 2ca4531..53d6a1f 100644 --- a/internal/common/trace.go +++ b/internal/common/trace.go @@ -5,27 +5,27 @@ import ( ) type Trace struct { - ChainID *big.Int `json:"chain_id"` - BlockNumber *big.Int `json:"block_number"` - BlockHash string `json:"block_hash"` - BlockTimestamp uint64 `json:"block_timestamp"` - TransactionHash string `json:"transaction_hash"` - TransactionIndex uint64 `json:"transaction_index"` - Subtraces int64 `json:"subtraces"` - TraceAddress []uint64 `json:"trace_address"` - TraceType string `json:"trace_type"` - CallType string `json:"call_type"` - Error string `json:"error"` - FromAddress string `json:"from_address"` - ToAddress string `json:"to_address"` - Gas *big.Int `json:"gas"` - GasUsed *big.Int `json:"gas_used"` - Input string `json:"input"` - Output string `json:"output"` - Value *big.Int `json:"value"` - Author string `json:"author"` - RewardType string `json:"reward_type"` - RefundAddress string `json:"refund_address"` + ChainID *big.Int `json:"chain_id" ch:"chain_id"` + BlockNumber *big.Int `json:"block_number" ch:"block_number"` + BlockHash string `json:"block_hash" ch:"block_hash"` + BlockTimestamp uint64 `json:"block_timestamp" ch:"block_timestamp"` + TransactionHash string `json:"transaction_hash" ch:"transaction_hash"` + TransactionIndex uint64 `json:"transaction_index" ch:"transaction_index"` + Subtraces int64 `json:"subtraces" ch:"subtraces"` + TraceAddress []uint64 `json:"trace_address" ch:"trace_address"` + TraceType string `json:"trace_type" ch:"type"` + CallType string `json:"call_type" ch:"call_type"` + Error string `json:"error" ch:"error"` + FromAddress string `json:"from_address" ch:"from_address"` + ToAddress string `json:"to_address" ch:"to_address"` + Gas *big.Int `json:"gas" ch:"gas"` + GasUsed *big.Int `json:"gas_used" ch:"gas_used"` + Input string `json:"input" ch:"input"` + Output string `json:"output" ch:"output"` + Value *big.Int `json:"value" ch:"value"` + Author string `json:"author" ch:"author"` + RewardType string `json:"reward_type" ch:"reward_type"` + RefundAddress string `json:"refund_address" ch:"refund_address"` } type RawTraces = []map[string]interface{} diff --git a/internal/common/transaction.go b/internal/common/transaction.go index 95ee93e..8e91405 100644 --- a/internal/common/transaction.go +++ b/internal/common/transaction.go @@ -10,35 +10,35 @@ import ( ) type Transaction struct { - ChainId *big.Int `json:"chain_id" swaggertype:"string"` - Hash string `json:"hash"` - Nonce uint64 `json:"nonce"` - BlockHash string `json:"block_hash"` - BlockNumber *big.Int `json:"block_number" swaggertype:"string"` - BlockTimestamp uint64 `json:"block_timestamp"` - TransactionIndex uint64 `json:"transaction_index"` - FromAddress string `json:"from_address"` - ToAddress string `json:"to_address"` - Value *big.Int `json:"value" swaggertype:"string"` - Gas uint64 `json:"gas"` - GasPrice *big.Int `json:"gas_price" swaggertype:"string"` - Data string `json:"data"` - FunctionSelector string `json:"function_selector"` - MaxFeePerGas *big.Int `json:"max_fee_per_gas" swaggertype:"string"` - MaxPriorityFeePerGas *big.Int `json:"max_priority_fee_per_gas" swaggertype:"string"` - TransactionType uint8 `json:"transaction_type"` - R *big.Int `json:"r" swaggertype:"string"` - S *big.Int `json:"s" swaggertype:"string"` - V *big.Int `json:"v" swaggertype:"string"` - AccessListJson *string `json:"access_list_json"` - ContractAddress *string `json:"contract_address"` - GasUsed *uint64 `json:"gas_used"` - CumulativeGasUsed *uint64 `json:"cumulative_gas_used"` - EffectiveGasPrice *big.Int `json:"effective_gas_price" swaggertype:"string"` - BlobGasUsed *uint64 `json:"blob_gas_used"` - BlobGasPrice *big.Int `json:"blob_gas_price" swaggertype:"string"` - LogsBloom *string `json:"logs_bloom"` - Status *uint64 `json:"status"` + ChainId *big.Int `json:"chain_id" ch:"chain_id" swaggertype:"string"` + Hash string `json:"hash" ch:"hash"` + Nonce uint64 `json:"nonce" ch:"nonce"` + BlockHash string `json:"block_hash" ch:"block_hash"` + BlockNumber *big.Int `json:"block_number" ch:"block_number" swaggertype:"string"` + BlockTimestamp uint64 `json:"block_timestamp" ch:"block_timestamp"` + TransactionIndex uint64 `json:"transaction_index" ch:"transaction_index"` + FromAddress string `json:"from_address" ch:"from_address"` + ToAddress string `json:"to_address" ch:"to_address"` + Value *big.Int `json:"value" ch:"value" swaggertype:"string"` + Gas uint64 `json:"gas" ch:"gas"` + GasPrice *big.Int `json:"gas_price" ch:"gas_price" swaggertype:"string"` + Data string `json:"data" ch:"data"` + FunctionSelector string `json:"function_selector" ch:"function_selector"` + MaxFeePerGas *big.Int `json:"max_fee_per_gas" ch:"max_fee_per_gas" swaggertype:"string"` + MaxPriorityFeePerGas *big.Int `json:"max_priority_fee_per_gas" ch:"max_priority_fee_per_gas" swaggertype:"string"` + TransactionType uint8 `json:"transaction_type" ch:"transaction_type"` + R *big.Int `json:"r" ch:"r" swaggertype:"string"` + S *big.Int `json:"s" ch:"s" swaggertype:"string"` + V *big.Int `json:"v" ch:"v" swaggertype:"string"` + AccessListJson *string `json:"access_list_json" ch:"access_list_json"` + ContractAddress *string `json:"contract_address" ch:"contract_address"` + GasUsed *uint64 `json:"gas_used" ch:"gas_used"` + CumulativeGasUsed *uint64 `json:"cumulative_gas_used" ch:"cumulative_gas_used"` + EffectiveGasPrice *big.Int `json:"effective_gas_price" ch:"effective_gas_price" swaggertype:"string"` + BlobGasUsed *uint64 `json:"blob_gas_used" ch:"blob_gas_used"` + BlobGasPrice *big.Int `json:"blob_gas_price" ch:"blob_gas_price" swaggertype:"string"` + LogsBloom *string `json:"logs_bloom" ch:"logs_bloom"` + Status *uint64 `json:"status" ch:"status"` } type DecodedTransactionData struct { diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 5a62dbf..52cb7cc 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -64,8 +64,9 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) { Settings: func() clickhouse.Settings { if cfg.AsyncInsert { return clickhouse.Settings{ - "async_insert": "1", - "wait_for_async_insert": "1", + "async_insert": "1", + "wait_for_async_insert": "1", + "lightweight_deletes_sync": "0", } } return clickhouse.Settings{} @@ -954,68 +955,183 @@ func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int, } func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error { - var saveErr error - var saveErrMutex sync.Mutex + var deleteErr error + var deleteErrMutex sync.Mutex var wg sync.WaitGroup wg.Add(4) go func() { defer wg.Done() - if err := c.deleteBatch(chainId, blockNumbers, "blocks", "number"); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting blocks: %v", err) - saveErrMutex.Unlock() + if err := c.deleteBlocksByNumbers(chainId, blockNumbers); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting blocks: %v", err) + deleteErrMutex.Unlock() } }() go func() { defer wg.Done() - if err := c.deleteBatch(chainId, blockNumbers, "logs", "block_number"); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting logs: %v", err) - saveErrMutex.Unlock() + if err := c.deleteLogsByNumbers(chainId, blockNumbers); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting logs: %v", err) + deleteErrMutex.Unlock() } }() go func() { defer wg.Done() - if err := c.deleteBatch(chainId, blockNumbers, "transactions", "block_number"); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting transactions: %v", err) - saveErrMutex.Unlock() + if err := c.deleteTransactionsByNumbers(chainId, blockNumbers); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting transactions: %v", err) + deleteErrMutex.Unlock() } }() go func() { defer wg.Done() - if err := c.deleteBatch(chainId, blockNumbers, "traces", "block_number"); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting traces: %v", err) - saveErrMutex.Unlock() + if err := c.deleteTracesByNumbers(chainId, blockNumbers); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting traces: %v", err) + deleteErrMutex.Unlock() } }() wg.Wait() - if saveErr != nil { - return saveErr + if deleteErr != nil { + return deleteErr + } + return nil +} + +func (c *ClickHouseConnector) deleteBlocksByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { + query := fmt.Sprintf("DELETE FROM %s.blocks WHERE _partition_value.1 = ? AND chain_id = ? AND number IN (?)", c.cfg.Database) + + blockNumbersStr := make([]string, len(blockNumbers)) + for i, bn := range blockNumbers { + blockNumbersStr[i] = bn.String() + } + err := c.conn.Exec(context.Background(), query, chainId, chainId, blockNumbersStr) + if err != nil { + return fmt.Errorf("error deleting blocks: %w", err) } return nil } -func (c *ClickHouseConnector) deleteBatch(chainId *big.Int, blockNumbers []*big.Int, table string, blockNumberColumn string) error { - query := fmt.Sprintf("DELETE FROM %s.%s WHERE chain_id = ? AND %s IN (?)", c.cfg.Database, table, blockNumberColumn) +func (c *ClickHouseConnector) deleteLogsByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { + blockNumbersStr := make([]string, len(blockNumbers)) + for i, bn := range blockNumbers { + blockNumbersStr[i] = bn.String() + } + getQuery := fmt.Sprintf("SELECT block_number, transaction_hash, log_index FROM %s.logs WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String()) + + rows, getErr := c.conn.Query(context.Background(), getQuery, blockNumbersStr) + if getErr != nil { + return getErr + } + defer rows.Close() + + blockNumbersToDelete := common.NewSet[string]() + txHashesToDelete := common.NewSet[string]() + logIndexesToDelete := common.NewSet[uint64]() + for rows.Next() { + var logToDelete common.Log + err := rows.ScanStruct(&logToDelete) + if err != nil { + return err + } + blockNumbersToDelete.Add(logToDelete.BlockNumber.String()) + txHashesToDelete.Add(logToDelete.TransactionHash) + logIndexesToDelete.Add(logToDelete.LogIndex) + } + + if txHashesToDelete.Size() == 0 { + return nil // No logs to delete + } + + deleteQuery := fmt.Sprintf("DELETE FROM %s.logs WHERE _partition_value.1 = ? AND chain_id = ? AND block_number IN (?) AND transaction_hash IN (?) AND log_index IN (?)", c.cfg.Database) + + err := c.conn.Exec(context.Background(), deleteQuery, chainId, chainId, blockNumbersToDelete.List(), txHashesToDelete.List(), logIndexesToDelete.List()) + if err != nil { + return err + } + return nil +} +func (c *ClickHouseConnector) deleteTransactionsByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { blockNumbersStr := make([]string, len(blockNumbers)) for i, bn := range blockNumbers { blockNumbersStr[i] = bn.String() } + getQuery := fmt.Sprintf("SELECT block_number, hash FROM %s.transactions WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String()) - err := c.conn.Exec(context.Background(), query, chainId, blockNumbersStr) + rows, getErr := c.conn.Query(context.Background(), getQuery, blockNumbersStr) + if getErr != nil { + return getErr + } + defer rows.Close() + + blockNumbersToDelete := common.NewSet[string]() + hashesToDelete := common.NewSet[string]() + + for rows.Next() { + var txToDelete common.Transaction + err := rows.ScanStruct(&txToDelete) + if err != nil { + return err + } + blockNumbersToDelete.Add(txToDelete.BlockNumber.String()) + hashesToDelete.Add(txToDelete.Hash) + } + + if hashesToDelete.Size() == 0 { + return nil // No transactions to delete + } + + deleteQuery := fmt.Sprintf("DELETE FROM %s.transactions WHERE _partition_value.1 = ? AND chain_id = ? AND block_number IN (?) AND hash IN (?)", c.cfg.Database) + + err := c.conn.Exec(context.Background(), deleteQuery, chainId, chainId, blockNumbersToDelete.List(), hashesToDelete.List()) if err != nil { - return fmt.Errorf("error deleting from %s: %w", table, err) + return err + } + return nil +} + +func (c *ClickHouseConnector) deleteTracesByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { + blockNumbersStr := make([]string, len(blockNumbers)) + for i, bn := range blockNumbers { + blockNumbersStr[i] = bn.String() + } + getQuery := fmt.Sprintf("SELECT block_number, transaction_hash FROM %s.traces WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String()) + + rows, getErr := c.conn.Query(context.Background(), getQuery, blockNumbersStr) + if getErr != nil { + return getErr + } + defer rows.Close() + + blockNumbersToDelete := common.NewSet[string]() + txHashesToDelete := common.NewSet[string]() + for rows.Next() { + var traceToDelete common.Trace + err := rows.ScanStruct(&traceToDelete) + if err != nil { + return err + } + blockNumbersToDelete.Add(traceToDelete.BlockNumber.String()) + txHashesToDelete.Add(traceToDelete.TransactionHash) + } + + if txHashesToDelete.Size() == 0 { + return nil // No traces to delete } + deleteQuery := fmt.Sprintf("DELETE FROM %s.traces WHERE _partition_value.1 = ? AND chain_id = ? AND block_number IN (?) AND transaction_hash IN (?)", c.cfg.Database) + + err := c.conn.Exec(context.Background(), deleteQuery, chainId, chainId, blockNumbersToDelete.List(), txHashesToDelete.List()) + if err != nil { + return err + } return nil }