automated clickhouse migrations

iuwqyir committed Jan 15, 2025
1 parent 9538ef4 commit 4c9bc7b
Showing 17 changed files with 359 additions and 164 deletions.
6 changes: 6 additions & 0 deletions cmd/orchestrator.go
@@ -6,6 +6,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/thirdweb-dev/indexer/db"
"github.com/thirdweb-dev/indexer/internal/orchestrator"
"github.com/thirdweb-dev/indexer/internal/rpc"
)
@@ -28,6 +29,11 @@ func RunOrchestrator(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("Failed to initialize RPC")
}

err = db.RunMigrations()
if err != nil {
log.Fatal().Err(err).Msg("Failed to run migrations")
}

orchestrator, err := orchestrator.NewOrchestrator(rpc)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create orchestrator")
14 changes: 14 additions & 0 deletions db/README.md
@@ -0,0 +1,14 @@
# Insight DB migrations

## Clickhouse

Migrations are managed using [golang-migrate](https://github.com/golang-migrate/migrate) and the migration files are located in the `ch_migrations` directory.

Each storage type (orchestrator, staging and main) has its own migrations.

To add a new migration, run
```
migrate create -ext sql -dir db/ch_migrations/<storage_type> <migration_name>
```

Migrations are run automatically when the indexer starts (the orchestrator command calls `db.RunMigrations` during startup).
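
If you need to apply or roll back a migration manually, the same files work with the `migrate` CLI. A minimal sketch, assuming a local ClickHouse instance and placeholder credentials:
```
migrate -source file://db/ch_migrations/main \
  -database 'clickhouse://localhost:9000/default?username=default&password=&x-multi-statement=true' \
  up
```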
4 changes: 4 additions & 0 deletions db/ch_migrations/main/20250114110057_init.down.sql
@@ -0,0 +1,4 @@
DROP TABLE IF EXISTS blocks;
DROP TABLE IF EXISTS logs;
DROP TABLE IF EXISTS transactions;
DROP TABLE IF EXISTS traces;
142 changes: 142 additions & 0 deletions db/ch_migrations/main/20250114110057_init.up.sql
@@ -0,0 +1,142 @@
-- create blocks table
CREATE TABLE IF NOT EXISTS blocks (
`chain_id` UInt256,
`number` UInt256,
`timestamp` UInt64 CODEC(Delta, ZSTD),
`hash` FixedString(66),
`parent_hash` FixedString(66),
`sha3_uncles` FixedString(66),
`nonce` FixedString(18),
`mix_hash` FixedString(66),
`miner` FixedString(42),
`state_root` FixedString(66),
`transactions_root` FixedString(66),
`receipts_root` FixedString(66),
`logs_bloom` String,
`size` UInt64,
`extra_data` String,
`difficulty` UInt256,
`total_difficulty` UInt256,
`transaction_count` UInt64,
`gas_limit` UInt256,
`gas_used` UInt256,
`withdrawals_root` Nullable(FixedString(66)),
`base_fee_per_gas` Nullable(UInt64),
`insert_timestamp` DateTime DEFAULT now(),
`is_deleted` UInt8 DEFAULT 0,
INDEX idx_timestamp timestamp TYPE minmax GRANULARITY 1,
INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, number)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

-- create logs table
CREATE TABLE IF NOT EXISTS logs (
`chain_id` UInt256,
`block_number` UInt256,
`block_hash` FixedString(66),
`block_timestamp` UInt64 CODEC(Delta, ZSTD),
`transaction_hash` FixedString(66),
`transaction_index` UInt64,
`log_index` UInt64,
`address` FixedString(42),
`data` String,
`topic_0` String,
`topic_1` Nullable(String),
`topic_2` Nullable(String),
`topic_3` Nullable(String),
`insert_timestamp` DateTime DEFAULT now(),
`is_deleted` UInt8 DEFAULT 0,
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_address address TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic0 topic_0 TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic1 topic_1 TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic2 topic_2 TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic3 topic_3 TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number, transaction_hash, log_index)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

-- create transactions table
CREATE TABLE IF NOT EXISTS transactions (
`chain_id` UInt256,
`hash` FixedString(66),
`nonce` UInt64,
`block_hash` FixedString(66),
`block_number` UInt256,
`block_timestamp` UInt64 CODEC(Delta, ZSTD),
`transaction_index` UInt64,
`from_address` FixedString(42),
`to_address` FixedString(42),
`value` UInt256,
`gas` UInt64,
`gas_price` UInt256,
`data` String,
`function_selector` FixedString(10),
`max_fee_per_gas` UInt128,
`max_priority_fee_per_gas` UInt128,
`transaction_type` UInt8,
`r` UInt256,
`s` UInt256,
`v` UInt256,
`access_list` Nullable(String),
`contract_address` Nullable(FixedString(42)),
`gas_used` Nullable(UInt64),
`cumulative_gas_used` Nullable(UInt64),
`effective_gas_price` Nullable(UInt256),
`blob_gas_used` Nullable(UInt64),
`blob_gas_price` Nullable(UInt256),
`logs_bloom` Nullable(String),
`status` Nullable(UInt64),
`is_deleted` UInt8 DEFAULT 0,
`insert_timestamp` DateTime DEFAULT now(),
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_function_selector function_selector TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number, hash)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

-- create traces table
CREATE TABLE IF NOT EXISTS traces (
`chain_id` UInt256,
`block_number` UInt256,
`block_hash` FixedString(66),
`block_timestamp` UInt64 CODEC(Delta, ZSTD),
`transaction_hash` FixedString(66),
`transaction_index` UInt64,
`subtraces` Int64,
`trace_address` Array(Int64),
`type` LowCardinality(String),
`call_type` LowCardinality(String),
`error` Nullable(String),
`from_address` FixedString(42),
`to_address` FixedString(42),
`gas` UInt64,
`gas_used` UInt64,
`input` String,
`output` Nullable(String),
`value` UInt256,
`author` Nullable(FixedString(42)),
`reward_type` LowCardinality(Nullable(String)),
`refund_address` Nullable(FixedString(42)),
`is_deleted` UInt8 DEFAULT 0,
`insert_timestamp` DateTime DEFAULT now(),
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_type type TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number, transaction_hash, trace_address)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
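
All four tables use `ReplacingMergeTree(insert_timestamp, is_deleted)`, so rows that share a sorting key are deduplicated only when parts merge. Reads that need exactly-one-row-per-key semantics have to deduplicate at query time; a sketch of such a query against the `blocks` table above (illustrative, not part of this commit):

```
-- latest version of each block on chain 1, skipping soft-deleted rows
SELECT number, hash, transaction_count
FROM blocks FINAL
WHERE chain_id = 1 AND is_deleted = 0
ORDER BY number DESC
LIMIT 10;
```

`FINAL` forces merge-time deduplication at read time, which is correct but can be costly on large parts; that trade-off is inherent to the ReplacingMergeTree design used here.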
2 changes: 2 additions & 0 deletions db/ch_migrations/orchestrator/20250114110046_init.down.sql
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS block_failures;
DROP TABLE IF EXISTS cursors;
22 changes: 22 additions & 0 deletions db/ch_migrations/orchestrator/20250114110046_init.up.sql
@@ -0,0 +1,22 @@
-- create block failures table
CREATE TABLE IF NOT EXISTS block_failures (
`chain_id` UInt256,
`block_number` UInt256,
`last_error_timestamp` UInt64 CODEC(Delta, ZSTD),
`count` UInt16,
`reason` String,
`insert_timestamp` DateTime DEFAULT now(),
`is_deleted` UInt8 DEFAULT 0,
INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number)
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

-- create cursors table
CREATE TABLE IF NOT EXISTS cursors (
`chain_id` UInt256,
`cursor_type` String,
`cursor_value` String,
`insert_timestamp` DateTime DEFAULT now(),
) ENGINE = ReplacingMergeTree(insert_timestamp)
ORDER BY (chain_id, cursor_type);
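
Note that `cursors` omits `is_deleted` and versions rows by `insert_timestamp` alone: updating a cursor means inserting a new row for the same `(chain_id, cursor_type)` key and letting the newest row win at merge time. A sketch of that pattern (the cursor type and values are illustrative):

```
INSERT INTO cursors (chain_id, cursor_type, cursor_value)
VALUES (1, 'last_committed_block', '21000000');

-- read the current value, deduplicating rows that have not merged yet
SELECT cursor_value
FROM cursors FINAL
WHERE chain_id = 1 AND cursor_type = 'last_committed_block';
```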
1 change: 1 addition & 0 deletions db/ch_migrations/staging/20250114110054_init.down.sql
@@ -0,0 +1 @@
DROP TABLE IF EXISTS block_data;
db/ch_migrations/staging/20250114110054_init.up.sql
@@ -1,4 +1,5 @@
-CREATE TABLE block_data (
+-- create staging table
+CREATE TABLE IF NOT EXISTS block_data (
`chain_id` UInt256,
`block_number` UInt256,
`data` String,
@@ -8,4 +9,4 @@ CREATE TABLE block_data (
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
139 changes: 139 additions & 0 deletions db/migrations.go
@@ -0,0 +1,139 @@
package db

import (
"fmt"
"io"
"os"
"path/filepath"
"strings"

"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/clickhouse"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
)

func RunMigrations() error {
storageConfigs := []struct {
Type string
Config *config.ClickhouseConfig
}{
{Type: "orchestrator", Config: config.Cfg.Storage.Orchestrator.Clickhouse},
{Type: "staging", Config: config.Cfg.Storage.Staging.Clickhouse},
{Type: "main", Config: config.Cfg.Storage.Main.Clickhouse},
}

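// group configs by connection (host, port, database) so a Clickhouse instance shared between storage types is migrated once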
groupedConfigs := make(map[string][]struct {
Type string
Config *config.ClickhouseConfig
})
for _, cfg := range storageConfigs {
key := fmt.Sprintf("%s:%d:%s", cfg.Config.Host, cfg.Config.Port, cfg.Config.Database)
groupedConfigs[key] = append(groupedConfigs[key], cfg)
}

removeTmpMigrations() // just in case
for _, cfgs := range groupedConfigs {
var types []string
for _, cfg := range cfgs {
if err := copyMigrationsToTmp([]string{"db/ch_migrations/" + cfg.Type}); err != nil {
return err
}
types = append(types, cfg.Type)
}
log.Info().Msgf("Running Clickhouse migrations for %s", strings.Join(types, ", "))
if err := runClickhouseMigrations(cfgs[0].Config); err != nil {
removeTmpMigrations()
return fmt.Errorf("failed to run Clickhouse migrations: %w", err)
}
removeTmpMigrations()
log.Info().Msgf("Clickhouse migration completed for %s", strings.Join(types, ", "))
}

log.Info().Msg("All Clickhouse migrations completed")

return nil
}

func runClickhouseMigrations(cfg *config.ClickhouseConfig) error {
if cfg.Host == "" {
return nil
}

secureParam := "&secure=true"
if cfg.DisableTLS {
secureParam = "&secure=false"
}

url := fmt.Sprintf("clickhouse://%s:%d/%s?username=%s&password=%s%s&x-multi-statement=true&x-migrations-table-engine=MergeTree",
cfg.Host,
cfg.Port,
cfg.Database,
cfg.Username,
cfg.Password,
secureParam,
)

m, err := migrate.New("file://db/ch_migrations/tmp", url)
if err != nil {
return err
}
if err := m.Up(); err != nil && err != migrate.ErrNoChange {
m.Close()
return err
}

m.Close()

return nil
}

func copyMigrationsToTmp(sources []string) error {
destination := "db/ch_migrations/tmp"
err := os.MkdirAll(destination, os.ModePerm)
if err != nil {
return err
}

for _, source := range sources {
err := filepath.Walk(source, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// skip directories
if info.IsDir() {
return nil
}
// determine destination path
relPath, err := filepath.Rel(source, path)
if err != nil {
return err
}
destPath := filepath.Join(destination, relPath)
if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
return err
}

return copyFile(path, destPath)
})
if err != nil {
return err
}
}
return nil
}

func copyFile(src string, dst string) error {
srcFile, err := os.Open(src)
if err != nil {
return err
}
defer srcFile.Close()

dstFile, err := os.Create(dst)
if err != nil {
return err
}
defer dstFile.Close()

_, err = io.Copy(dstFile, srcFile)
return err
}

func removeTmpMigrations() error {
err := os.RemoveAll("db/ch_migrations/tmp")
if err != nil {
return fmt.Errorf("error removing directory: %w", err)
}
return nil
}
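
golang-migrate records applied versions in a `schema_migrations` table that it creates itself; the `x-migrations-table-engine=MergeTree` URL parameter above selects that table's engine. As a rough sketch of the driver's bookkeeping table (based on the golang-migrate ClickHouse driver's documented behavior, not code in this commit):

```
CREATE TABLE IF NOT EXISTS schema_migrations (
    version Int64,
    dirty UInt8,
    sequence UInt64
) ENGINE = MergeTree ORDER BY sequence;
```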
9 changes: 7 additions & 2 deletions go.mod
@@ -7,8 +7,10 @@ require (
github.com/ethereum/go-ethereum v1.14.8
github.com/gin-gonic/gin v1.10.0
github.com/go-redis/redis/v8 v8.11.5
+github.com/golang-migrate/migrate/v4 v4.18.1
github.com/gorilla/schema v1.4.1
github.com/hashicorp/golang-lru/v2 v2.0.7
+github.com/lib/pq v1.10.9
github.com/prometheus/client_golang v1.20.4
github.com/rs/zerolog v1.33.0
github.com/spf13/cobra v1.8.1
@@ -57,6 +59,8 @@ require (
github.com/goccy/go-json v0.10.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
+github.com/hashicorp/errwrap v1.1.0 // indirect
+github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.3.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
@@ -100,8 +104,9 @@ require (
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/urfave/cli/v2 v2.27.4 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
-go.opentelemetry.io/otel v1.26.0 // indirect
-go.opentelemetry.io/otel/trace v1.26.0 // indirect
+go.opentelemetry.io/otel v1.29.0 // indirect
+go.opentelemetry.io/otel/trace v1.29.0 // indirect
+go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.10.0 // indirect
golang.org/x/crypto v0.27.0 // indirect