From aef4682b517e25358c6f3793e445bff02b5ba9db Mon Sep 17 00:00:00 2001 From: KyrinCode Date: Tue, 21 Jan 2025 17:02:28 +0800 Subject: [PATCH 1/4] setup xlayer-recover (also as sequencer) --- test/Makefile | 3 +++ test/docker-compose.yml | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/test/Makefile b/test/Makefile index bce4fe035bd..834d4014621 100644 --- a/test/Makefile +++ b/test/Makefile @@ -12,6 +12,7 @@ DOCKER_BRIDGE_UI := xlayer-bridge-ui DOCKER_PROVER := xlayer-prover DOCKER_APPROVE := xlayer-approve DOCKER_SEQ := xlayer-seq +DOCKER_RECOVER := xlayer-recover DOCKER_RPC := xlayer-rpc DOCKER_L1_NETWORK := xlayer-mock-l1-network DOCKER_DATA_AVAILABILITY := xlayer-da @@ -36,6 +37,7 @@ RUN_DOCKER_BRIDGE_UI := $(DOCKER_COMPOSE) up -d $(DOCKER_BRIDGE_UI) RUN_DOCKER_PROVER := $(DOCKER_COMPOSE) up -d $(DOCKER_PROVER) RUN_DOCKER_APPROVE := $(DOCKER_COMPOSE) up -d $(DOCKER_APPROVE) RUN_DOCKER_SEQ := $(DOCKER_COMPOSE) up -d $(DOCKER_SEQ) +RUN_DOCKER_RECOVER := $(DOCKER_COMPOSE) up -d $(DOCKER_RECOVER) RUN_DOCKER_RPC := $(DOCKER_COMPOSE) up -d $(DOCKER_RPC) RUN_DOCKER_L1_NETWORK := $(DOCKER_COMPOSE) up -d $(DOCKER_L1_NETWORK) RUN_DOCKER_DATA_AVAILABILITY := $(DOCKER_COMPOSE) up -d $(DOCKER_DATA_AVAILABILITY) @@ -69,6 +71,7 @@ run: build-docker ## Runs a full node sleep 3 $(RUN_DOCKER_SEQ) $(RUN_DOCKER_PROVER) + $(RUN_DOCKER_RECOVER) sleep 10 $(RUN_DOCKER_DS) $(RUN_DOCKER_SIGNER) diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 914485ad82b..19340569c76 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -114,6 +114,25 @@ services: command: > cdk-erigon --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + xlayer-recover: + container_name: xlayer-recover + image: cdk-erigon + environment: + - CDK_ERIGON_SEQUENCER=1 + ports: + - 6059:6060 + - 8119:8545 + - 6899:6900 + - 9089:9095 + volumes: + - ./data/recover/:/home/erigon/data/ + - ./config/test.erigon.seq.config.yaml:/usr/src/app/config.yaml + - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json + - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json + - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json + command: > + cdk-erigon --zkevm.l1-sync-start-block=353 --zkevm.da-url=http://xlayer-da:8444 --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + xlayer-rpc: container_name: xlayer-rpc image: cdk-erigon From d399fc859a57fac9e3de341f3f3b96f948ca71cd Mon Sep 17 00:00:00 2001 From: KyrinCode Date: Sun, 26 Jan 2025 18:09:29 +0800 Subject: [PATCH 2/4] setup xlayer-lrp --- Dockerfile.local | 3 +- cmd/utils/flags.go | 4 +- test/Makefile | 44 ++++++++++++- test/docker-compose-lrp.yml | 123 ++++++++++++++++++++++++++++++++++++ test/docker-compose.yml | 103 +++++++++++++++++++++++++++--- test/fetch_blocks.sh | 42 ++++++++++++ test/process_log.py | 75 ++++++++++++++++++++++ 7 files changed, 379 insertions(+), 15 deletions(-) create mode 100644 test/docker-compose-lrp.yml create mode 100755 test/fetch_blocks.sh create mode 100644 test/process_log.py diff --git a/Dockerfile.local b/Dockerfile.local index c834e7c3308..69eb76d7ea5 100644 --- a/Dockerfile.local +++ b/Dockerfile.local @@ -15,7 +15,7 @@ ADD . . RUN --mount=type=cache,target=/root/.cache \ --mount=type=cache,target=/tmp/go-build \ --mount=type=cache,target=/go/pkg/mod \ - make BUILD_TAGS=nosqlite,noboltdb,nosilkworm cdk-erigon + make BUILD_TAGS=nosqlite,noboltdb,nosilkworm cdk-erigon integration FROM docker.io/library/golang:1.21-alpine3.17 AS tools-builder RUN apk --no-cache add build-base linux-headers git bash ca-certificates libstdc++ @@ -46,6 +46,7 @@ WORKDIR /home/erigon ## then give each binary its own layer COPY --from=builder /app/build/bin/cdk-erigon /usr/local/bin/cdk-erigon +COPY --from=builder /app/build/bin/integration /usr/local/bin/integration EXPOSE 8545 \ 8551 \ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f4cb4aaa88f..82f887e45d8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1454,7 +1454,7 @@ var ( YieldSizeFlag = cli.Uint64Flag{ Name: "yieldsize", Usage: "transaction count fetched from txpool each time", - Value: 1000, + Value: 30, } ) @@ -1945,7 +1945,7 @@ func setTxPool(ctx *cli.Context, fullCfg *ethconfig.Config) { // For X Layer setTxPoolXLayer(ctx, cfg) - + purgeEvery := ctx.Duration(TxpoolPurgeEveryFlag.Name) purgeDistance := ctx.Duration(TxpoolPurgeDistanceFlag.Name) diff --git a/test/Makefile b/test/Makefile index 834d4014621..a2a23030b40 100644 --- a/test/Makefile +++ b/test/Makefile @@ -12,7 +12,6 @@ DOCKER_BRIDGE_UI := xlayer-bridge-ui DOCKER_PROVER := xlayer-prover DOCKER_APPROVE := xlayer-approve DOCKER_SEQ := xlayer-seq -DOCKER_RECOVER := xlayer-recover DOCKER_RPC := xlayer-rpc DOCKER_L1_NETWORK := xlayer-mock-l1-network DOCKER_DATA_AVAILABILITY := xlayer-da @@ -37,7 +36,6 @@ RUN_DOCKER_BRIDGE_UI := $(DOCKER_COMPOSE) up -d $(DOCKER_BRIDGE_UI) RUN_DOCKER_PROVER := $(DOCKER_COMPOSE) up -d $(DOCKER_PROVER) RUN_DOCKER_APPROVE := $(DOCKER_COMPOSE) up -d $(DOCKER_APPROVE) RUN_DOCKER_SEQ := $(DOCKER_COMPOSE) up -d $(DOCKER_SEQ) -RUN_DOCKER_RECOVER := $(DOCKER_COMPOSE) up -d $(DOCKER_RECOVER) RUN_DOCKER_RPC := $(DOCKER_COMPOSE) up -d $(DOCKER_RPC) RUN_DOCKER_L1_NETWORK := $(DOCKER_COMPOSE) up -d $(DOCKER_L1_NETWORK) RUN_DOCKER_DATA_AVAILABILITY := $(DOCKER_COMPOSE) up -d $(DOCKER_DATA_AVAILABILITY) @@ -71,7 +69,6 @@ run: build-docker ## Runs a full node sleep 3 $(RUN_DOCKER_SEQ) $(RUN_DOCKER_PROVER) - $(RUN_DOCKER_RECOVER) sleep 10 $(RUN_DOCKER_DS) $(RUN_DOCKER_SIGNER) @@ -148,3 +145,44 @@ test-1: stop all ## Runs group 1 e2e tests checking race conditions sleep 3 docker ps -a trap '$(STOP)' EXIT; MallocNanoZone=0 go test -count=1 -failfast -race -v -p 1 -timeout 600s ../ci/e2e-1/... + +DOCKER_COMPOSE_LRP := docker compose -f docker-compose-lrp.yml +DOCKER_LRP_UNWIND := xlayer-lrp-unwind +DOCKER_LRP_RESEQUENCE := xlayer-lrp-resequence + +RUN_DOCKER_LRP_STATELESS_EXECUTOR := $(DOCKER_COMPOSE_LRP) up -d $(DOCKER_STATELESS_EXECUTOR) +RUN_DOCKER_LRP_L1_NETWORK := $(DOCKER_COMPOSE_LRP) up -d $(DOCKER_L1_NETWORK) +RUN_DOCKER_LRP_UNWIND := $(DOCKER_COMPOSE_LRP) up -d $(DOCKER_LRP_UNWIND) +RUN_DOCKER_LRP_RESEQUENCE := $(DOCKER_COMPOSE_LRP) up -d $(DOCKER_LRP_RESEQUENCE) + +STOP_LRP := $(DOCKER_COMPOSE_LRP) down --remove-orphans; sleep 3; rm -rf data + +.PHONY: lrp-run +lrp-run: build-docker + $(RUN_DOCKER_LRP_L1_NETWORK) + $(RUN_DOCKER_LRP_STATELESS_EXECUTOR) + sleep 3 + @read -p "Enter unwind to batch number: " UNWIND_BATCH_NO; \ + if [ "$(shell uname)" = "Darwin" ]; then \ + sed -i '' "s/--unwind-batch-no=[0-9]*/--unwind-batch-no=$$UNWIND_BATCH_NO/" docker-compose-lrp.yml; \ + else \ + sed -i "s/--unwind-batch-no=[0-9]*/--unwind-batch-no=$$UNWIND_BATCH_NO/" docker-compose-lrp.yml; \ + fi + $(RUN_DOCKER_LRP_UNWIND) + sleep 20 + $(RUN_DOCKER_LRP_RESEQUENCE) + +.PHONY: lrp-stop +lrp-stop: + $(STOP_LRP) + + +TARGET_LOG_FILE := filtered.log + +.PHONY: lrp-check +lrp-check: ## Filter logs for LRP & Compute TPS & Check stateroots (configure debugToolsConfig.yaml) + docker logs $(DOCKER_LRP_RESEQUENCE) 2>&1 | grep "\[5/13 Execution\]" > $(TARGET_LOG_FILE) + sleep 3 + python process_log.py $(TARGET_LOG_FILE) + sleep 3 + go run ../zk/debug_tools/rpc-blockhashes-compare/main.go \ No newline at end of file diff --git a/test/docker-compose-lrp.yml b/test/docker-compose-lrp.yml new file mode 100644 index 00000000000..a078e287ea4 --- /dev/null +++ b/test/docker-compose-lrp.yml @@ -0,0 +1,123 @@ +version: "3.5" +networks: + default: + name: erigon-lrp +services: + # xlayer-lrp-sequence: + # container_name: xlayer-lrp-sequence + # image: cdk-erigon + # environment: + # - CDK_ERIGON_SEQUENCER=1 + # ports: + # - 6062:6060 + # - 8126:8545 + # - 6902:6900 + # - 9093:9095 + # volumes: + # - ./data/lrp/:/home/erigon/data/ + # - ./config/test.erigon.seq.config.yaml:/usr/src/app/config.yaml + # - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json + # - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json + # - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json + # command: > + # cdk-erigon --zkevm.l1-cache-enabled=true --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + + xlayer-lrp: + container_name: xlayer-lrp + image: cdk-erigon + environment: + - CDK_ERIGON_SEQUENCER=0 + ports: + - 6062:6060 + - 8126:8545 + - 6902:6900 + - 9093:9095 + volumes: + - ./data/lrp/:/home/erigon/data/ + - ./config/test.erigon.rpc.config.yaml:/usr/src/app/config.yaml + - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json + - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json + - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json + command: > + cdk-erigon --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + + xlayer-lrp-unwind: + container_name: xlayer-lrp-unwind + image: cdk-erigon + environment: + - CDK_ERIGON_SEQUENCER=0 + volumes: + - ./data/lrp/:/home/erigon/data/ + - ./config/test.erigon.rpc.config.yaml:/usr/src/app/config.yaml + - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json + - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json + - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json + command: | + integration state_stages_zkevm \ + --config=/usr/src/app/config.yaml \ + --chain dynamic-mynetwork \ + --datadir /home/erigon/data/ \ + --unwind-batch-no=10 + + xlayer-lrp-resequence: + container_name: xlayer-lrp-resequence + image: cdk-erigon + environment: + - CDK_ERIGON_SEQUENCER=1 + ports: + - 6062:6060 + - 8126:8545 + - 6902:6900 + - 9093:9095 + volumes: + - ./data/lrp/:/home/erigon/data/ + - ./config/test.erigon.seq.config.yaml:/usr/src/app/config.yaml + - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json + - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json + - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json + command: > + cdk-erigon --zkevm.l1-cache-enabled=true --zkevm.sequencer-resequence=true --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + + xlayer-executor: + container_name: xlayer-executor + image: hermeznetwork/zkevm-prover:v9.0.0-RC1-fork.13 + platform: linux/amd64 + environment: + - EXPERIMENTAL_DOCKER_DESKTOP_FORCE_QEMU=1 + ports: + - 0.0.0.0:50061:50061 # MT + - 0.0.0.0:50071:50071 # Executor + volumes: + - ./config/test.stateless_executor.config.json:/usr/src/app/config.json + command: > + zkProver -c /usr/src/app/config.json + + xlayer-mock-l1-network: + container_name: xlayer-mock-l1-network + image: zjg555543/geth:fork13-v0.0.3 + ports: + - 8545:8545 + - 8546:8546 + command: + - "--http" + - "--http.api" + - "admin,eth,debug,miner,net,txpool,personal,web3" + - "--http.addr" + - "0.0.0.0" + - "--http.corsdomain" + - "*" + - "--http.vhosts" + - "*" + - "--ws" + - "--ws.origins" + - "*" + - "--ws.addr" + - "0.0.0.0" + - "--dev" + - "--dev.period" + - "1" + - "--datadir" + - "/geth_data" + - "--syncmode" + - "full" + - "--rpc.allow-unprotected-txs" \ No newline at end of file diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 19340569c76..567d33202b3 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -112,26 +112,59 @@ services: - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json command: > - cdk-erigon --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + cdk-erigon --zkevm.l1-cache-enabled=true --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml - xlayer-recover: - container_name: xlayer-recover + xlayer-seq-resequence: + container_name: xlayer-seq-resequence image: cdk-erigon environment: - CDK_ERIGON_SEQUENCER=1 ports: - - 6059:6060 - - 8119:8545 - - 6899:6900 - - 9089:9095 + - 6060:6060 + - 8123:8545 + - 6900:6900 + - 9092:9095 volumes: - - ./data/recover/:/home/erigon/data/ + - ./data/seq/:/home/erigon/data/ + - ./config/test.erigon.seq.config.yaml:/usr/src/app/config.yaml + - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json + - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json + - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json + command: > + cdk-erigon --zkevm.l1-cache-enabled=true --zkevm.sequencer-resequence=true --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + + xlayer-seq-recover: + container_name: xlayer-seq-recover + image: cdk-erigon + environment: + - CDK_ERIGON_SEQUENCER=1 + ports: + - 6060:6060 + - 8123:8545 + - 6900:6900 + - 9092:9095 + volumes: + - ./data/seq/:/home/erigon/data/ + - ./config/test.erigon.seq.config.yaml:/usr/src/app/config.yaml + - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json + - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json + - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json + command: > + cdk-erigon --zkevm.l1-cache-enabled=true --zkevm.l1-sync-start-block=353 --zkevm.da-url=http://xlayer-da:8444 --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + + xlayer-seq-unwind: + container_name: xlayer-seq-unwind + image: cdk-erigon + environment: + - CDK_ERIGON_SEQUENCER=1 + volumes: + - ./data/seq/:/home/erigon/data/ - ./config/test.erigon.seq.config.yaml:/usr/src/app/config.yaml - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json command: > - cdk-erigon --zkevm.l1-sync-start-block=353 --zkevm.da-url=http://xlayer-da:8444 --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + integration state_stages_zkevm --config=/usr/src/app/config.yaml --chain dynamic-mynetwork --datadir /home/erigon/data/ --unwind-batch-no=1 xlayer-rpc: container_name: xlayer-rpc @@ -152,6 +185,58 @@ services: command: > cdk-erigon --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + xlayer-lrp: + container_name: xlayer-lrp + image: cdk-erigon + environment: + - CDK_ERIGON_SEQUENCER=0 + ports: + - 6062:6060 + - 8126:8545 + - 6902:6900 + - 9093:9095 + volumes: + - ./data/lrp/:/home/erigon/data/ + - ./config/test.erigon.rpc.config.yaml:/usr/src/app/config.yaml + - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json + - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json + - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json + command: > + cdk-erigon --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + + xlayer-lrp-unwind: + container_name: xlayer-lrp-unwind + image: cdk-erigon + environment: + - CDK_ERIGON_SEQUENCER=0 + volumes: + - ./data/lrp/:/home/erigon/data/ + - ./config/test.erigon.rpc.config.yaml:/usr/src/app/config.yaml + - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json + - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json + - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json + command: > + integration state_stages_zkevm --config=/usr/src/app/config.yaml --chain dynamic-mynetwork --datadir /home/erigon/data/ --unwind-batch-no=1 + + xlayer-lrp-resequence: + container_name: xlayer-lrp-resequence + image: cdk-erigon + environment: + - CDK_ERIGON_SEQUENCER=1 + ports: + - 6062:6060 + - 8126:8545 + - 6902:6900 + - 9093:9095 + volumes: + - ./data/lrp/:/home/erigon/data/ + - ./config/test.erigon.seq.config.yaml:/usr/src/app/config.yaml + - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json + - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json + - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json + command: > + cdk-erigon --zkevm.l1-cache-enabled=true --zkevm.sequencer-resequence=true --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + xlayer-mock-l1-network: container_name: xlayer-mock-l1-network image: zjg555543/geth:fork13-v0.0.3 diff --git a/test/fetch_blocks.sh b/test/fetch_blocks.sh new file mode 100755 index 00000000000..af2c1f6865b --- /dev/null +++ b/test/fetch_blocks.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +# 检查是否提供了所有必要的参数 +if [ -z "$1" ] || [ -z "$2" ] || [ -z "$3" ] || [ -z "$4" ]; then + echo "Usage: $0 --to --rpc-url [--from ] [-f ]" + exit 1 +fi + +# 设置默认目标文件名 +START_BLOCKNUMBER=0 +TARGET_FILE="blocks_info.txt" + +# 读取参数 +while [[ "$#" -gt 0 ]]; do + case $1 in + --from) START_BLOCKNUMBER="$2"; shift ;; + --to) END_BLOCKNUMBER="$2"; shift ;; + --rpc-url) RPC_URL="$2"; shift ;; + -f) TARGET_FILE="$2"; shift ;; + *) echo "Unknown parameter passed: $1"; exit 1 ;; + esac + shift +done + +# 检查是否所有参数都已设置 +if [ -z "$END_BLOCKNUMBER" ] || [ -z "$RPC_URL" ]; then + echo "Usage: $0 --to --rpc-url [--from ] [-f ]" + exit 1 +fi + +# 清空目标文件 +> $TARGET_FILE + +# 循环获取区块信息 +for blocknumber in $(seq $START_BLOCKNUMBER $END_BLOCKNUMBER) +do + echo "Fetching block $blocknumber..." + cast block $blocknumber --rpc-url $RPC_URL >> $TARGET_FILE + echo "" >> $TARGET_FILE # 添加一个空行以分隔每个区块的信息 +done + +echo "All blocks fetched and saved to $TARGET_FILE" \ No newline at end of file diff --git a/test/process_log.py b/test/process_log.py new file mode 100644 index 00000000000..61a764a691f --- /dev/null +++ b/test/process_log.py @@ -0,0 +1,75 @@ +import re +import argparse +import logging +from datetime import datetime + +# 配置日志记录 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def parse_time(line): + # 假设时间戳在行的开头,格式为 [MM-DD|HH:MM:SS.SSS] + match = re.search(r'\[(\d{2}-\d{2}\|\d{2}:\d{2}:\d{2}\.\d{3})\]', line) + if match: + return datetime.strptime(match.group(1), '%m-%d|%H:%M:%S.%f') + return None + +def main(log_file): + last_batch = None + highest_batch_in_data_stream = None + start_time = None + end_time = None + tx_count = 0 + + with open(log_file, 'r') as file: + lines = file.readlines() + + if len(lines) < 5: + logging.error("Log file does not contain enough lines.") + return + + # 读取 lastBatch 和 highestBatchInDataStream + last_batch = int(re.search(r'Last batch (\d+)', lines[0]).group(1)) + highest_batch_in_data_stream = int(re.search(r'Highest batch in data stream (\d+)', lines[1]).group(1)) + + # 计算需要读取的批次数量 + batches_to_read = highest_batch_in_data_stream - last_batch + + # 找到 startTime + for i, line in enumerate(lines): + if f"Read {batches_to_read} batches from data stream" in line: + start_time = parse_time(line) + break + + if not start_time: + logging.error("Start time not found.") + return + + # 计算 txCount + for line in lines[i:]: + if "Finish block" in line: + match = re.search(r'Finish block \d+ with (\d+) transactions', line) + if match: + tx_count += int(match.group(1)) + if "Resequencing completed." in line: + end_time = parse_time(line) + break + + if not end_time: + logging.error("End time not found.") + return + + # 计算 TPS + duration = (end_time - start_time).total_seconds() + tps = tx_count / duration if duration > 0 else 0 + + logging.info(f"{'Start Time:':<20} {start_time}") + logging.info(f"{'End Time:':<20} {end_time}") + logging.info(f"{'Total Transactions:':<20} {tx_count}") + logging.info(f"{'Duration:':<20} {duration} seconds") + logging.info(f"{'TPS:':<20} {tps}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Process log file to calculate TPS.') + parser.add_argument('log_file', type=str, help='Path to the log file') + args = parser.parse_args() + main(args.log_file) \ No newline at end of file From 1a4ac07fe17e0dc2071f1c7d46e895246d835c18 Mon Sep 17 00:00:00 2001 From: KyrinCode Date: Thu, 6 Feb 2025 16:55:33 +0800 Subject: [PATCH 3/4] Add specifying toBatchNo for lrp feature by adding a new zkevm.sequencer-resequence-halt-on-batch-number flag to the system, which doesn't interfere with the rest functionalities. --- cmd/utils/flags.go | 5 ++++ eth/ethconfig/config_zkevm.go | 3 ++- test/Makefile | 16 ++++++----- test/docker-compose-lrp.yml | 4 +-- test/process_log.py | 27 +++++++++++++------ turbo/cli/default_flags.go | 2 +- turbo/cli/flags_zkevm.go | 1 + .../stage_sequence_execute_resequence.go | 20 +++++++++++--- 8 files changed, 56 insertions(+), 22 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 82f887e45d8..414dc085317 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -576,6 +576,11 @@ var ( Usage: "When enabled, the sequencer will automatically resequence unseen batches stored in data stream", Value: false, } + SequencerResequenceHaltOnBatchNumber = cli.Uint64Flag{ + Name: "zkevm.sequencer-resequence-halt-on-batch-number", + Usage: "Halt the sequencer on this batch number when resequencing", + Value: 0, + } SequencerResequenceStrict = cli.BoolFlag{ Name: "zkevm.sequencer-resequence-strict", Usage: "Strictly resequence the rolledback batches", diff --git a/eth/ethconfig/config_zkevm.go b/eth/ethconfig/config_zkevm.go index f1233d4ce5f..47731184e24 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -43,6 +43,7 @@ type Zk struct { SequencerTimeoutOnEmptyTxPool time.Duration SequencerHaltOnBatchNumber uint64 SequencerResequence bool + SequencerResequenceHaltOnBatchNumber uint64 SequencerResequenceStrict bool SequencerResequenceReuseL1InfoIndex bool ExecutorUrls []string @@ -90,7 +91,7 @@ type Zk struct { // For X Layer XLayer XLayerConfig - + InitialBatchCfgFile string ACLPrintHistory int InfoTreeUpdateInterval time.Duration diff --git a/test/Makefile b/test/Makefile index a2a23030b40..3e2037b12c8 100644 --- a/test/Makefile +++ b/test/Makefile @@ -162,12 +162,15 @@ lrp-run: build-docker $(RUN_DOCKER_LRP_L1_NETWORK) $(RUN_DOCKER_LRP_STATELESS_EXECUTOR) sleep 3 - @read -p "Enter unwind to batch number: " UNWIND_BATCH_NO; \ - if [ "$(shell uname)" = "Darwin" ]; then \ - sed -i '' "s/--unwind-batch-no=[0-9]*/--unwind-batch-no=$$UNWIND_BATCH_NO/" docker-compose-lrp.yml; \ - else \ - sed -i "s/--unwind-batch-no=[0-9]*/--unwind-batch-no=$$UNWIND_BATCH_NO/" docker-compose-lrp.yml; \ - fi + @read -p "Enter unwind batch number: " UNWIND_BATCH_NO; \ + read -p "Enter halt on batch number: " HALT_BATCH_NO; \ + if [ "$(shell uname)" = "Darwin" ]; then \ + sed -i '' "s/--unwind-batch-no=[0-9]*/--unwind-batch-no=$$UNWIND_BATCH_NO/" docker-compose-lrp.yml; \ + sed -i '' "s/--zkevm.sequencer-resequence-halt-on-batch-number=[0-9]*/--zkevm.sequencer-resequence-halt-on-batch-number=$$HALT_BATCH_NO/" docker-compose-lrp.yml; \ + else \ + sed -i "s/--unwind-batch-no=[0-9]*/--unwind-batch-no=$$UNWIND_BATCH_NO/" docker-compose-lrp.yml; \ + sed -i "s/--zkevm.sequencer-resequence-halt-on-batch-number=[0-9]*/--zkevm.sequencer-resequence-halt-on-batch-number=$$HALT_BATCH_NO/" docker-compose-lrp.yml; \ + fi $(RUN_DOCKER_LRP_UNWIND) sleep 20 $(RUN_DOCKER_LRP_RESEQUENCE) @@ -176,7 +179,6 @@ lrp-run: build-docker lrp-stop: $(STOP_LRP) - TARGET_LOG_FILE := filtered.log .PHONY: lrp-check diff --git a/test/docker-compose-lrp.yml b/test/docker-compose-lrp.yml index a078e287ea4..612c7711b20 100644 --- a/test/docker-compose-lrp.yml +++ b/test/docker-compose-lrp.yml @@ -57,7 +57,7 @@ services: --config=/usr/src/app/config.yaml \ --chain dynamic-mynetwork \ --datadir /home/erigon/data/ \ - --unwind-batch-no=10 + --unwind-batch-no=1200 xlayer-lrp-resequence: container_name: xlayer-lrp-resequence @@ -76,7 +76,7 @@ services: - ./config/dynamic-mynetwork-chainspec.json:/usr/src/app/dynamic-mynetwork-chainspec.json - ./config/dynamic-mynetwork-conf.json:/usr/src/app/dynamic-mynetwork-conf.json command: > - cdk-erigon --zkevm.l1-cache-enabled=true --zkevm.sequencer-resequence=true --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml + cdk-erigon --zkevm.l1-cache-enabled=true --zkevm.sequencer-resequence=true --zkevm.sequencer-resequence-halt-on-batch-number=1250 --http.vhosts=* --http.corsdomain=* --ws --config=/usr/src/app/config.yaml xlayer-executor: container_name: xlayer-executor diff --git a/test/process_log.py b/test/process_log.py index 61a764a691f..fcf4cc2bf95 100644 --- a/test/process_log.py +++ b/test/process_log.py @@ -10,7 +10,9 @@ def parse_time(line): # 假设时间戳在行的开头,格式为 [MM-DD|HH:MM:SS.SSS] match = re.search(r'\[(\d{2}-\d{2}\|\d{2}:\d{2}:\d{2}\.\d{3})\]', line) if match: - return datetime.strptime(match.group(1), '%m-%d|%H:%M:%S.%f') + current_year = datetime.now().year + parsed_time = datetime.strptime(match.group(1), '%m-%d|%H:%M:%S.%f') + return parsed_time.replace(year=current_year) return None def main(log_file): @@ -29,10 +31,16 @@ def main(log_file): # 读取 lastBatch 和 highestBatchInDataStream last_batch = int(re.search(r'Last batch (\d+)', lines[0]).group(1)) - highest_batch_in_data_stream = int(re.search(r'Highest batch in data stream (\d+)', lines[1]).group(1)) + # highest_batch_in_data_stream = int(re.search(r'highest batch in datastream (\d+)', lines[0]).group(1)) + halt_batch = int(re.search(r'resequencing from batch \d+ to (\d+)', lines[0]).group(1)) # 计算需要读取的批次数量 - batches_to_read = highest_batch_in_data_stream - last_batch + batches_to_read = halt_batch - last_batch + + # 计算启动时间 + first_line_time = parse_time(lines[0]) + second_line_time = parse_time(lines[1]) + startup_duration = (second_line_time - first_line_time).total_seconds() # 找到 startTime for i, line in enumerate(lines): @@ -62,11 +70,14 @@ def main(log_file): duration = (end_time - start_time).total_seconds() tps = tx_count / duration if duration > 0 else 0 - logging.info(f"{'Start Time:':<20} {start_time}") - logging.info(f"{'End Time:':<20} {end_time}") - logging.info(f"{'Total Transactions:':<20} {tx_count}") - logging.info(f"{'Duration:':<20} {duration} seconds") - logging.info(f"{'TPS:':<20} {tps}") + logging.info(f"{'From Batch:':<25} {last_batch+1}") + logging.info(f"{'To Batch:':<25} {halt_batch}") + logging.info(f"{'Startup Duration:':<25} {startup_duration} seconds") + logging.info(f"{'Start Time:':<25} {start_time}") + logging.info(f"{'End Time:':<25} {end_time}") + logging.info(f"{'Total Transactions:':<25} {tx_count}") + logging.info(f"{'Re-sequencing Duration:':<25} {duration} seconds") + logging.info(f"{'TPS:':<25} {tps}") if __name__ == "__main__": parser = argparse.ArgumentParser(description='Process log file to calculate TPS.') diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index e3b06026a38..bfec3c6de40 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -211,6 +211,7 @@ var DefaultFlags = []cli.Flag{ &utils.SequencerTimeoutOnEmptyTxPool, &utils.SequencerHaltOnBatchNumber, &utils.SequencerResequence, + &utils.SequencerResequenceHaltOnBatchNumber, &utils.SequencerResequenceStrict, &utils.SequencerResequenceReuseL1InfoIndex, &utils.ExecutorUrls, @@ -287,7 +288,6 @@ var DefaultFlags = []cli.Flag{ &utils.BadBatches, &utils.InitialBatchCfgFile, - // X Layer Flags &utils.AllowInternalTransactions, &utils.TxPoolPackBatchSpecialList, diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index 20c09205fac..6fb8474c2d7 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -186,6 +186,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { SequencerTimeoutOnEmptyTxPool: sequencerTimeoutOnEmptyTxPool, SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name), SequencerResequence: ctx.Bool(utils.SequencerResequence.Name), + SequencerResequenceHaltOnBatchNumber: ctx.Uint64(utils.SequencerResequenceHaltOnBatchNumber.Name), SequencerResequenceStrict: ctx.Bool(utils.SequencerResequenceStrict.Name), SequencerResequenceReuseL1InfoIndex: ctx.Bool(utils.SequencerResequenceReuseL1InfoIndex.Name), ExecutorUrls: strings.Split(strings.ReplaceAll(ctx.String(utils.ExecutorUrls.Name), " ", ""), ","), diff --git a/zk/stages/stage_sequence_execute_resequence.go b/zk/stages/stage_sequence_execute_resequence.go index d7fa2e18ab7..0212a57d64b 100644 --- a/zk/stages/stage_sequence_execute_resequence.go +++ b/zk/stages/stage_sequence_execute_resequence.go @@ -21,18 +21,32 @@ func resequence( panic(fmt.Sprintf("[%s] The node need re-sequencing but this option is disabled.", s.LogPrefix())) } - log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDs)) + haltBatch := uint64(0) + if cfg.zk.SequencerResequenceHaltOnBatchNumber > 0 { + haltBatch = cfg.zk.SequencerResequenceHaltOnBatchNumber + if haltBatch <= lastBatch { + panic(fmt.Sprintf("[%s] The zkevm.sequencer-resequence-halt-on-batch-number is set lower than the last batch number.", s.LogPrefix())) + } else if haltBatch > highestBatchInDs { + panic(fmt.Sprintf("[%s] The zkevm.sequencer-resequence-halt-on-batch-number is set higher than the highest batch in datastream.", s.LogPrefix())) + } + } else { + haltBatch = highestBatchInDs + } - batches, err := cfg.dataStreamServer.ReadBatches(lastBatch+1, highestBatchInDs) + log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing from batch %d to %d ...", s.LogPrefix(), lastBatch, highestBatchInDs, lastBatch+1, haltBatch)) + + batches, err := cfg.dataStreamServer.ReadBatches(lastBatch+1, haltBatch) if err != nil { return err } + log.Info(fmt.Sprintf("[%s] Read %d batches from data stream", s.LogPrefix(), len(batches))) + if err = cfg.dataStreamServer.UnwindToBatchStart(lastBatch + 1); err != nil { return err } - log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDs)) + log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, haltBatch)) for _, batch := range batches { batchJob := NewResequenceBatchJob(batch) subBatchCount := 0 From 9b63a9e063c65c6d0dce7c799fea5fb652f53e90 Mon Sep 17 00:00:00 2001 From: KyrinCode Date: Fri, 7 Feb 2025 19:32:08 +0800 Subject: [PATCH 4/4] Add independent tools to read batches from datastream and write in local database (mdbx) --- test/batch_db/batch.pb.go | 309 ++++++++++++++++++++++ test/batch_db/batch.proto | 24 ++ test/batch_db/batch_db.go | 241 +++++++++++++++++ test/batch_db/batch_db_test.go | 217 +++++++++++++++ test/batch_db/config.go | 30 +++ test/batch_db/datastream_reader.go | 129 +++++++++ test/batch_db/datastream_reader_test.go | 38 +++ test/batch_db/main.go | 53 ++++ test/batch_db/test.datastream.config.yaml | 6 + 9 files changed, 1047 insertions(+) create mode 100644 test/batch_db/batch.pb.go create mode 100644 test/batch_db/batch.proto create mode 100644 test/batch_db/batch_db.go create mode 100644 test/batch_db/batch_db_test.go create mode 100644 test/batch_db/config.go create mode 100644 test/batch_db/datastream_reader.go create mode 100644 test/batch_db/datastream_reader_test.go create mode 100644 test/batch_db/main.go create mode 100644 test/batch_db/test.datastream.config.yaml diff --git a/test/batch_db/batch.pb.go b/test/batch_db/batch.pb.go new file mode 100644 index 00000000000..d52f398df71 --- /dev/null +++ b/test/batch_db/batch.pb.go @@ -0,0 +1,309 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.5 +// protoc v5.29.3 +// source: batch.proto + +package main + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type BatchData struct { + state protoimpl.MessageState `protogen:"open.v1"` + Blocks []*BatchData_FullL2Block `protobuf:"bytes,1,rep,name=blocks,proto3" json:"blocks,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BatchData) Reset() { + *x = BatchData{} + mi := &file_batch_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BatchData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchData) ProtoMessage() {} + +func (x *BatchData) ProtoReflect() protoreflect.Message { + mi := &file_batch_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchData.ProtoReflect.Descriptor instead. +func (*BatchData) Descriptor() ([]byte, []int) { + return file_batch_proto_rawDescGZIP(), []int{0} +} + +func (x *BatchData) GetBlocks() []*BatchData_FullL2Block { + if x != nil { + return x.Blocks + } + return nil +} + +type BatchData_FullL2Block struct { + state protoimpl.MessageState `protogen:"open.v1"` + BatchNumber uint64 `protobuf:"varint,1,opt,name=batch_number,json=batchNumber,proto3" json:"batch_number,omitempty"` + L2BlockNumber uint64 `protobuf:"varint,2,opt,name=l2_block_number,json=l2BlockNumber,proto3" json:"l2_block_number,omitempty"` + Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + DeltaTimestamp uint32 `protobuf:"varint,4,opt,name=delta_timestamp,json=deltaTimestamp,proto3" json:"delta_timestamp,omitempty"` + L1InfoTreeIndex uint32 `protobuf:"varint,5,opt,name=l1_info_tree_index,json=l1InfoTreeIndex,proto3" json:"l1_info_tree_index,omitempty"` + GlobalExitRoot []byte `protobuf:"bytes,6,opt,name=global_exit_root,json=globalExitRoot,proto3" json:"global_exit_root,omitempty"` + Coinbase []byte `protobuf:"bytes,7,opt,name=coinbase,proto3" json:"coinbase,omitempty"` + ForkId uint64 `protobuf:"varint,8,opt,name=fork_id,json=forkId,proto3" json:"fork_id,omitempty"` + L1BlockHash []byte `protobuf:"bytes,9,opt,name=l1_block_hash,json=l1BlockHash,proto3" json:"l1_block_hash,omitempty"` + L2BlockHash []byte `protobuf:"bytes,10,opt,name=l2_block_hash,json=l2BlockHash,proto3" json:"l2_block_hash,omitempty"` + ParentHash []byte `protobuf:"bytes,11,opt,name=parent_hash,json=parentHash,proto3" json:"parent_hash,omitempty"` + StateRoot []byte `protobuf:"bytes,12,opt,name=state_root,json=stateRoot,proto3" json:"state_root,omitempty"` + BlockGasLimit uint64 `protobuf:"varint,13,opt,name=block_gas_limit,json=blockGasLimit,proto3" json:"block_gas_limit,omitempty"` + BlockInfoRoot []byte `protobuf:"bytes,14,opt,name=block_info_root,json=blockInfoRoot,proto3" json:"block_info_root,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BatchData_FullL2Block) Reset() { + *x = BatchData_FullL2Block{} + mi := &file_batch_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BatchData_FullL2Block) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchData_FullL2Block) ProtoMessage() {} + +func (x *BatchData_FullL2Block) ProtoReflect() protoreflect.Message { + mi := &file_batch_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchData_FullL2Block.ProtoReflect.Descriptor instead. +func (*BatchData_FullL2Block) Descriptor() ([]byte, []int) { + return file_batch_proto_rawDescGZIP(), []int{0, 0} +} + +func (x *BatchData_FullL2Block) GetBatchNumber() uint64 { + if x != nil { + return x.BatchNumber + } + return 0 +} + +func (x *BatchData_FullL2Block) GetL2BlockNumber() uint64 { + if x != nil { + return x.L2BlockNumber + } + return 0 +} + +func (x *BatchData_FullL2Block) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *BatchData_FullL2Block) GetDeltaTimestamp() uint32 { + if x != nil { + return x.DeltaTimestamp + } + return 0 +} + +func (x *BatchData_FullL2Block) GetL1InfoTreeIndex() uint32 { + if x != nil { + return x.L1InfoTreeIndex + } + return 0 +} + +func (x *BatchData_FullL2Block) GetGlobalExitRoot() []byte { + if x != nil { + return x.GlobalExitRoot + } + return nil +} + +func (x *BatchData_FullL2Block) GetCoinbase() []byte { + if x != nil { + return x.Coinbase + } + return nil +} + +func (x *BatchData_FullL2Block) GetForkId() uint64 { + if x != nil { + return x.ForkId + } + return 0 +} + +func (x *BatchData_FullL2Block) GetL1BlockHash() []byte { + if x != nil { + return x.L1BlockHash + } + return nil +} + +func (x *BatchData_FullL2Block) GetL2BlockHash() []byte { + if x != nil { + return x.L2BlockHash + } + return nil +} + +func (x *BatchData_FullL2Block) GetParentHash() []byte { + if x != nil { + return x.ParentHash + } + return nil +} + +func (x *BatchData_FullL2Block) GetStateRoot() []byte { + if x != nil { + return x.StateRoot + } + return nil +} + +func (x *BatchData_FullL2Block) GetBlockGasLimit() uint64 { + if x != nil { + return x.BlockGasLimit + } + return 0 +} + +func (x *BatchData_FullL2Block) GetBlockInfoRoot() []byte { + if x != nil { + return x.BlockInfoRoot + } + return nil +} + +var File_batch_proto protoreflect.FileDescriptor + +var file_batch_proto_rawDesc = string([]byte{ + 0x0a, 0x0b, 0x62, 0x61, 0x74, 0x63, 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x62, + 0x61, 0x74, 0x63, 0x68, 0x5f, 0x64, 0x62, 0x22, 0xca, 0x04, 0x0a, 0x09, 0x42, 0x61, 0x74, 0x63, + 0x68, 0x44, 0x61, 0x74, 0x61, 0x12, 0x37, 0x0a, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x64, 0x62, + 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x44, 0x61, 0x74, 0x61, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x4c, + 0x32, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x1a, 0x83, + 0x04, 0x0a, 0x0b, 0x46, 0x75, 0x6c, 0x6c, 0x4c, 0x32, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x21, + 0x0a, 0x0c, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x62, 0x61, 0x74, 0x63, 0x68, 0x4e, 0x75, 0x6d, 0x62, 0x65, + 0x72, 0x12, 0x26, 0x0a, 0x0f, 0x6c, 0x32, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x6e, 0x75, + 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x6c, 0x32, 0x42, 0x6c, + 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x27, 0x0a, 0x0f, 0x64, 0x65, 0x6c, 0x74, 0x61, + 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x0e, 0x64, 0x65, 0x6c, 0x74, 0x61, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x12, 0x2b, 0x0a, 0x12, 0x6c, 0x31, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x74, 0x72, 0x65, 0x65, + 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x6c, 0x31, + 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x72, 0x65, 0x65, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x28, 0x0a, + 0x10, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x5f, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x72, 0x6f, 0x6f, + 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x45, + 0x78, 0x69, 0x74, 0x52, 0x6f, 0x6f, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6f, 0x69, 0x6e, 0x62, + 0x61, 0x73, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x63, 0x6f, 0x69, 0x6e, 0x62, + 0x61, 0x73, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x6f, 0x72, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x08, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x66, 0x6f, 0x72, 0x6b, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, + 0x6c, 0x31, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x09, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x6c, 0x31, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, + 0x12, 0x22, 0x0a, 0x0d, 0x6c, 0x32, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x68, 0x61, 0x73, + 0x68, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x6c, 0x32, 0x42, 0x6c, 0x6f, 0x63, 0x6b, + 0x48, 0x61, 0x73, 0x68, 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x68, + 0x61, 0x73, 0x68, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x65, 0x6e, + 0x74, 0x48, 0x61, 0x73, 0x68, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x65, 0x5f, 0x72, + 0x6f, 0x6f, 0x74, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x74, 0x61, 0x74, 0x65, + 0x52, 0x6f, 0x6f, 0x74, 0x12, 0x26, 0x0a, 0x0f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x67, 0x61, + 0x73, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x62, + 0x6c, 0x6f, 0x63, 0x6b, 0x47, 0x61, 0x73, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x26, 0x0a, 0x0f, + 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x72, 0x6f, 0x6f, 0x74, 0x18, + 0x0e, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x6e, 0x66, 0x6f, + 0x52, 0x6f, 0x6f, 0x74, 0x42, 0x0d, 0x5a, 0x0b, 0x2e, 0x2f, 0x3b, 0x62, 0x61, 0x74, 0x63, 0x68, + 0x5f, 0x64, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +}) + +var ( + file_batch_proto_rawDescOnce sync.Once + file_batch_proto_rawDescData []byte +) + +func file_batch_proto_rawDescGZIP() []byte { + file_batch_proto_rawDescOnce.Do(func() { + file_batch_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_batch_proto_rawDesc), len(file_batch_proto_rawDesc))) + }) + return file_batch_proto_rawDescData +} + +var file_batch_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_batch_proto_goTypes = []any{ + (*BatchData)(nil), // 0: batch_db.BatchData + (*BatchData_FullL2Block)(nil), // 1: batch_db.BatchData.FullL2Block +} +var file_batch_proto_depIdxs = []int32{ + 1, // 0: batch_db.BatchData.blocks:type_name -> batch_db.BatchData.FullL2Block + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_batch_proto_init() } +func file_batch_proto_init() { + if File_batch_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_batch_proto_rawDesc), len(file_batch_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_batch_proto_goTypes, + DependencyIndexes: file_batch_proto_depIdxs, + MessageInfos: file_batch_proto_msgTypes, + }.Build() + File_batch_proto = out.File + file_batch_proto_goTypes = nil + file_batch_proto_depIdxs = nil +} diff --git a/test/batch_db/batch.proto b/test/batch_db/batch.proto new file mode 100644 index 00000000000..0100dc378f1 --- /dev/null +++ b/test/batch_db/batch.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; +package main; + +option go_package = "./;main"; + +message BatchData { + message FullL2Block { + uint64 batch_number = 1; + uint64 l2_block_number = 2; + int64 timestamp = 3; + uint32 delta_timestamp = 4; + uint32 l1_info_tree_index = 5; + bytes global_exit_root = 6; + bytes coinbase = 7; + uint64 fork_id = 8; + bytes l1_block_hash = 9; + bytes l2_block_hash = 10; + bytes parent_hash = 11; + bytes state_root = 12; + uint64 block_gas_limit = 13; + bytes block_info_root = 14; + } + repeated FullL2Block blocks = 1; +} \ No newline at end of file diff --git a/test/batch_db/batch_db.go b/test/batch_db/batch_db.go new file mode 100644 index 00000000000..311b40c694c --- /dev/null +++ b/test/batch_db/batch_db.go @@ -0,0 +1,241 @@ +package main + +import ( + "encoding/binary" + "fmt" + + "github.com/erigontech/mdbx-go/mdbx" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/zk/datastream/types" + "google.golang.org/protobuf/proto" +) + +const ( + BatchDBName = "batches" +) + +type BatchDB struct { + env *mdbx.Env + path string + dbi mdbx.DBI +} + +func NewBatchDB(path string) (*BatchDB, error) { + env, err := mdbx.NewEnv() + if err != nil { + return nil, fmt.Errorf("create env: %w", err) + } + + // 设置最大数据库数量为 1 + if err := env.SetOption(mdbx.OptMaxDB, 1); err != nil { + return nil, fmt.Errorf("set max dbs: %w", err) + } + + flags := uint(mdbx.NoTLS | mdbx.NoReadahead | mdbx.WriteMap) + // 打开/创建数据库文件 + if err := env.Open(path, flags, 0644); err != nil { + return nil, fmt.Errorf("open: %w", err) + } + + // 初始化数据库 + txn, err := env.BeginTxn(nil, 0) + if err != nil { + return nil, fmt.Errorf("begin txn: %w", err) + } + defer txn.Abort() + + // 打开/创建数据库 + dbi, err := txn.OpenDBI(BatchDBName, mdbx.Create, nil, nil) + if err != nil { + return nil, fmt.Errorf("open dbi: %w", err) + } + + if _, err := txn.Commit(); err != nil { + return nil, fmt.Errorf("commit: %w", err) + } + + return &BatchDB{ + env: env, + path: path, + dbi: dbi, + }, nil +} + +func (db *BatchDB) Close() error { + db.env.CloseDBI(db.dbi) + db.env.Close() + return nil +} + +func convertToProtoBlock(block *types.FullL2Block) *BatchData_FullL2Block { + return &BatchData_FullL2Block{ + BatchNumber: block.BatchNumber, + L2BlockNumber: block.L2BlockNumber, + Timestamp: block.Timestamp, + DeltaTimestamp: block.DeltaTimestamp, + L1InfoTreeIndex: block.L1InfoTreeIndex, + GlobalExitRoot: block.GlobalExitRoot.Bytes(), + Coinbase: block.Coinbase.Bytes(), + ForkId: block.ForkId, + L1BlockHash: block.L1BlockHash.Bytes(), + L2BlockHash: block.L2Blockhash.Bytes(), + ParentHash: block.ParentHash.Bytes(), + StateRoot: block.StateRoot.Bytes(), + BlockGasLimit: block.BlockGasLimit, + BlockInfoRoot: block.BlockInfoRoot.Bytes(), + } +} + +func convertFromProtoBlock(block *BatchData_FullL2Block) *types.FullL2Block { + return &types.FullL2Block{ + BatchNumber: block.BatchNumber, + L2BlockNumber: block.L2BlockNumber, + Timestamp: block.Timestamp, + DeltaTimestamp: block.DeltaTimestamp, + L1InfoTreeIndex: block.L1InfoTreeIndex, + GlobalExitRoot: libcommon.BytesToHash(block.GlobalExitRoot), + Coinbase: libcommon.BytesToAddress(block.Coinbase), + ForkId: block.ForkId, + L1BlockHash: libcommon.BytesToHash(block.L1BlockHash), + L2Blockhash: libcommon.BytesToHash(block.L2BlockHash), + ParentHash: libcommon.BytesToHash(block.ParentHash), + StateRoot: libcommon.BytesToHash(block.StateRoot), + BlockGasLimit: block.BlockGasLimit, + BlockInfoRoot: libcommon.BytesToHash(block.BlockInfoRoot), + } +} + +func (db *BatchDB) StoreBatch(batch []*types.FullL2Block) error { + if len(batch) == 0 { + return nil + } + + protoBlocks := make([]*BatchData_FullL2Block, len(batch)) + for i, block := range batch { + protoBlocks[i] = convertToProtoBlock(block) + } + + batchData := &BatchData{ + Blocks: protoBlocks, + } + + data, err := proto.Marshal(batchData) + if err != nil { + return fmt.Errorf("marshal batch %d: %w", batch[0].BatchNumber, err) + } + + return db.env.Update(func(txn *mdbx.Txn) error { + key := make([]byte, 8) + binary.BigEndian.PutUint64(key, batch[0].BatchNumber) + return txn.Put(db.dbi, key, data, 0) + }) +} + +func (db *BatchDB) StoreBatches(batches [][]*types.FullL2Block) error { + if len(batches) == 0 { + return nil + } + + // 使用事务批量写入 + return db.env.Update(func(txn *mdbx.Txn) error { + for _, batch := range batches { + if len(batch) == 0 { + continue + } + + // 转换为 proto 格式 + protoBlocks := make([]*BatchData_FullL2Block, len(batch)) + for i, block := range batch { + protoBlocks[i] = convertToProtoBlock(block) + } + + batchData := &BatchData{ + Blocks: protoBlocks, + } + + // 序列化数据 + data, err := proto.Marshal(batchData) + if err != nil { + return fmt.Errorf("marshal batch %d: %w", batch[0].BatchNumber, err) + } + + // 存储到数据库 + key := make([]byte, 8) + binary.BigEndian.PutUint64(key, batch[0].BatchNumber) + if err := txn.Put(db.dbi, key, data, 0); err != nil { + return fmt.Errorf("store batch %d: %w", batch[0].BatchNumber, err) + } + } + return nil + }) +} + +func (db *BatchDB) GetBatch(batchNumber uint64) ([]*types.FullL2Block, error) { + var data []byte + err := db.env.View(func(txn *mdbx.Txn) error { + key := make([]byte, 8) + binary.BigEndian.PutUint64(key, batchNumber) + + val, err := txn.Get(db.dbi, key) + if err != nil { + return err + } + data = val + return nil + }) + if err != nil { + return nil, err + } + + var batchData BatchData + if err := proto.Unmarshal(data, &batchData); err != nil { + return nil, fmt.Errorf("unmarshal batch %d: %w", batchNumber, err) + } + + result := make([]*types.FullL2Block, len(batchData.Blocks)) + for i, block := range batchData.Blocks { + result[i] = convertFromProtoBlock(block) + } + + return result, nil +} + +func (db *BatchDB) GetBatchRange(fromBatch, toBatch uint64) ([][]*types.FullL2Block, error) { + var result [][]*types.FullL2Block + + err := db.env.View(func(txn *mdbx.Txn) error { + cursor, err := txn.OpenCursor(db.dbi) + if err != nil { + return err + } + defer cursor.Close() + + startKey := make([]byte, 8) + binary.BigEndian.PutUint64(startKey, fromBatch) + + for key, val, err := cursor.Get(startKey, nil, mdbx.SetRange); err == nil; key, val, err = cursor.Get(nil, nil, mdbx.Next) { + if key == nil { + break + } + + batchNum := binary.BigEndian.Uint64(key) + if batchNum > toBatch { + break + } + + var batchData BatchData + if err := proto.Unmarshal(val, &batchData); err != nil { + return fmt.Errorf("unmarshal batch %d: %w", batchNum, err) + } + + blocks := make([]*types.FullL2Block, len(batchData.Blocks)) + for i, block := range batchData.Blocks { + blocks[i] = convertFromProtoBlock(block) + } + result = append(result, blocks) + } + return nil + }) + + return result, err +} diff --git a/test/batch_db/batch_db_test.go b/test/batch_db/batch_db_test.go new file mode 100644 index 00000000000..6d7b59d4971 --- /dev/null +++ b/test/batch_db/batch_db_test.go @@ -0,0 +1,217 @@ +package main + +import ( + "os" + "path/filepath" + "testing" + + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/zk/datastream/types" + "github.com/stretchr/testify/assert" +) + +func createTestBlock(batchNum uint64, blockNum uint64) *types.FullL2Block { + return &types.FullL2Block{ + BatchNumber: batchNum, + L2BlockNumber: blockNum, + Timestamp: 1234567890, + DeltaTimestamp: 10, + L1InfoTreeIndex: 1, + GlobalExitRoot: libcommon.HexToHash("0x1234"), + Coinbase: libcommon.HexToAddress("0x5678"), + ForkId: 1, + L1BlockHash: libcommon.HexToHash("0x9abc"), + L2Blockhash: libcommon.HexToHash("0xdef0"), + ParentHash: libcommon.HexToHash("0x1111"), + StateRoot: libcommon.HexToHash("0x2222"), + BlockGasLimit: 1000000, + BlockInfoRoot: libcommon.HexToHash("0x3333"), + } +} + +func TestBatchDB(t *testing.T) { + // 创建临时目录用于测试 + tmpDir, err := os.MkdirTemp("", "batch_db_test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + dbPath := filepath.Join(tmpDir, "test.mdbx") + + // 测试创建数据库 + db, err := NewBatchDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // 测试存储批次 + batch1 := []*types.FullL2Block{ + createTestBlock(1, 1), + createTestBlock(1, 2), + } + batch2 := []*types.FullL2Block{ + createTestBlock(2, 3), + createTestBlock(2, 4), + } + + err = db.StoreBatch(batch1) + assert.NoError(t, err) + err = db.StoreBatch(batch2) + assert.NoError(t, err) + + // 测试获取单个批次 + blocks, err := db.GetBatch(1) + assert.NoError(t, err) + assert.Equal(t, 2, len(blocks)) + assert.Equal(t, uint64(1), blocks[0].BatchNumber) + assert.Equal(t, uint64(1), blocks[0].L2BlockNumber) + assert.Equal(t, uint64(2), blocks[1].L2BlockNumber) + + // 测试获取批次范围 + batches, err := db.GetBatchRange(1, 2) + assert.NoError(t, err) + assert.Equal(t, 2, len(batches)) + assert.Equal(t, 2, len(batches[0])) + assert.Equal(t, 2, len(batches[1])) + assert.Equal(t, uint64(1), batches[0][0].BatchNumber) + assert.Equal(t, uint64(2), batches[1][0].BatchNumber) + + // 测试空批次 + err = db.StoreBatch(nil) + assert.NoError(t, err) + + // 测试不存在的批次 + blocks, err = db.GetBatch(999) + assert.Error(t, err) + + // 测试范围查询边界情况 + batches, err = db.GetBatchRange(999, 1000) + assert.NoError(t, err) + assert.Equal(t, 0, len(batches)) +} + +func TestBatchDB_StoreBatches(t *testing.T) { + // 创建临时目录用于测试 + tmpDir, err := os.MkdirTemp("", "batch_db_test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + dbPath := filepath.Join(tmpDir, "test.mdbx") + + // 创建数据库实例 + db, err := NewBatchDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + tests := []struct { + name string + batches [][]*types.FullL2Block + wantErr bool + }{ + { + name: "空批次列表", + batches: nil, + wantErr: false, + }, + { + name: "包含空批次", + batches: [][]*types.FullL2Block{ + {}, + { + createTestBlock(1, 1), + }, + {}, + }, + wantErr: false, + }, + { + name: "多个正常批次", + batches: [][]*types.FullL2Block{ + { + createTestBlock(1, 1), + createTestBlock(1, 2), + }, + { + createTestBlock(2, 3), + createTestBlock(2, 4), + }, + }, + wantErr: false, + }, + { + name: "批次号连续性测试", + batches: [][]*types.FullL2Block{ + {createTestBlock(5, 1)}, + {createTestBlock(6, 2)}, + {createTestBlock(7, 3)}, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // 存储批次 + err := db.StoreBatches(tt.batches) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + + // 验证存储结果 + if tt.batches != nil { + for _, batch := range tt.batches { + if len(batch) == 0 { + continue + } + + // 读取并验证每个批次 + batchNum := batch[0].BatchNumber + stored, err := db.GetBatch(batchNum) + assert.NoError(t, err) + assert.Equal(t, len(batch), len(stored)) + + // 验证每个区块的内容 + for i, block := range batch { + assert.Equal(t, block.BatchNumber, stored[i].BatchNumber) + assert.Equal(t, block.L2BlockNumber, stored[i].L2BlockNumber) + assert.Equal(t, block.Timestamp, stored[i].Timestamp) + assert.Equal(t, block.GlobalExitRoot, stored[i].GlobalExitRoot) + assert.Equal(t, block.Coinbase, stored[i].Coinbase) + assert.Equal(t, block.L1BlockHash, stored[i].L1BlockHash) + assert.Equal(t, block.L2Blockhash, stored[i].L2Blockhash) + assert.Equal(t, block.StateRoot, stored[i].StateRoot) + } + } + } + + // 验证范围查询 + if tt.batches != nil && len(tt.batches) > 0 { + var minBatch, maxBatch uint64 = ^uint64(0), 0 + for _, batch := range tt.batches { + if len(batch) > 0 { + batchNum := batch[0].BatchNumber + if batchNum < minBatch { + minBatch = batchNum + } + if batchNum > maxBatch { + maxBatch = batchNum + } + } + } + if minBatch != ^uint64(0) { + batches, err := db.GetBatchRange(minBatch, maxBatch) + assert.NoError(t, err) + assert.NotEmpty(t, batches) + } + } + }) + } +} diff --git a/test/batch_db/config.go b/test/batch_db/config.go new file mode 100644 index 00000000000..757b781675f --- /dev/null +++ b/test/batch_db/config.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" +) + +type Config struct { + DataStreamPort uint16 `yaml:"zkevm.data-stream-port"` + DatastreamVersion uint8 `yaml:"zkevm.datastream-version"` + L2ChainID uint64 `yaml:"zkevm.l2-chain-id"` + DatastreamFile string `yaml:"zkevm.datastream-file"` + BatchDBPath string `yaml:"zkevm.batch-db-path"` +} + +func LoadConfig(filename string) (*Config, error) { + data, err := os.ReadFile(filename) + if err != nil { + return nil, fmt.Errorf("error reading config file: %v", err) + } + + var config Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("error parsing config file: %v", err) + } + + return &config, nil +} diff --git a/test/batch_db/datastream_reader.go b/test/batch_db/datastream_reader.go new file mode 100644 index 00000000000..d42185b244f --- /dev/null +++ b/test/batch_db/datastream_reader.go @@ -0,0 +1,129 @@ +package main + +import ( + "fmt" + "time" + + "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" + "github.com/0xPolygonHermez/zkevm-data-streamer/log" + "github.com/ledgerwatch/erigon/zk/datastream/server" + "github.com/ledgerwatch/erigon/zk/datastream/types" +) + +func PrintBlockInfo(block *types.FullL2Block) { + fmt.Printf(" Block %d:\n", block.L2BlockNumber) + fmt.Printf(" BatchNumber: %d\n", block.BatchNumber) + fmt.Printf(" L2BlockNumber: %d\n", block.L2BlockNumber) + fmt.Printf(" Timestamp: %d\n", block.Timestamp) + fmt.Printf(" DeltaTimestamp: %d\n", block.DeltaTimestamp) + fmt.Printf(" L1InfoTreeIndex: %d\n", block.L1InfoTreeIndex) + fmt.Printf(" GlobalExitRoot: %s\n", block.GlobalExitRoot) + fmt.Printf(" Coinbase: %s\n", block.Coinbase) + fmt.Printf(" ForkId: %d\n", block.ForkId) + fmt.Printf(" L1BlockHash: %s\n", block.L1BlockHash) + fmt.Printf(" L2BlockHash: %s\n", block.L2Blockhash) + fmt.Printf(" ParentHash: %s\n", block.ParentHash) + fmt.Printf(" StateRoot: %s\n", block.StateRoot) + fmt.Printf(" BlockGasLimit: %d\n", block.BlockGasLimit) + fmt.Printf(" BlockInfoRoot: %s\n", block.BlockInfoRoot) + fmt.Printf(" Transactions count: %d\n", len(block.L2Txs)) + fmt.Println() +} + +func PrintBatchInfo(batch []*types.FullL2Block) { + if len(batch) == 0 { + return + } + fmt.Printf("Batch number %d:\n", batch[0].BatchNumber) + for _, block := range batch { + PrintBlockInfo(block) + } + fmt.Println("------------------------") +} + +func PrintBatchesInfo(batches [][]*types.FullL2Block) { + for _, batch := range batches { + PrintBatchInfo(batch) + } +} + +// 将函数名改为大写开头 +func ReadDataStreamBatches(config *Config, fromBatch, toBatch uint64) ([][]*types.FullL2Block, error) { + // 函数内部逻辑保持不变 + // Use hardcoded timeout values + writeTimeout := 20 * time.Second + inactivityTimeout := 10 * time.Minute + inactivityCheckInterval := 5 * time.Minute + + // 设置日志配置 + logConfig := &log.Config{ + Environment: "production", + Level: "warn", + Outputs: nil, + } + + // 创建 stream server factory + factory := server.NewZkEVMDataStreamServerFactory() + + // 创建 stream server + streamServer, err := factory.CreateStreamServer( + config.DataStreamPort, + config.DatastreamVersion, + 1, + datastreamer.StreamType(1), + config.DatastreamFile, + writeTimeout, + inactivityTimeout, + inactivityCheckInterval, + logConfig, + ) + if err != nil { + return nil, fmt.Errorf("failed to create stream server: %v", err) + } + + fmt.Printf("Successfully created stream server with file: %s\n", config.DatastreamFile) + + // 创建 data stream server + dataStreamServer := factory.CreateDataStreamServer(streamServer, config.L2ChainID) + + // 获取数据流中的最高批次号 + highestBatchInDs, err := dataStreamServer.GetHighestBatchNumber() + if err != nil { + return nil, fmt.Errorf("failed to get highest batch number: %v", err) + } + + // 确保 toBatch 不超过最高批次号 + if toBatch > highestBatchInDs { + fmt.Printf("Adjusting toBatch from %d to %d (highest batch in datastream)\n", toBatch, highestBatchInDs) + toBatch = highestBatchInDs + } + + // 读取 batches + batches, err := dataStreamServer.ReadBatches(fromBatch, toBatch) + if err != nil { + return nil, fmt.Errorf("failed to read batches: %v", err) + } + + return batches, nil +} + +// func main() { +// // Load configuration from YAML file +// configPath := "test.datastream.config.yaml" +// config, err := LoadConfig(configPath) +// if err != nil { +// log.Fatalf("Failed to load config: %v", err) +// } + +// // 读取 batches +// fromBatch := uint64(1) // TODO: 填入起始 batch 号 +// toBatch := uint64(10) // TODO: 填入结束 batch 号 + +// batches, err := readDataStreamBatches(config, fromBatch, toBatch) +// if err != nil { +// log.Fatalf("Failed to read data stream batches: %v", err) +// } + +// // Print batch information +// printBatchesInfo(batches) +// } diff --git a/test/batch_db/datastream_reader_test.go b/test/batch_db/datastream_reader_test.go new file mode 100644 index 00000000000..20171cdb55e --- /dev/null +++ b/test/batch_db/datastream_reader_test.go @@ -0,0 +1,38 @@ +package main + +import ( + "testing" +) + +func TestDataStreamReader(t *testing.T) { + // Load configuration from YAML file + configPath := "test.datastream.config.yaml" + config, err := LoadConfig(configPath) + if err != nil { + t.Fatalf("Failed to load config: %v", err) + } + + // 读取 batches + fromBatch := uint64(1) + toBatch := uint64(10) + + batches, err := ReadDataStreamBatches(config, fromBatch, toBatch) + if err != nil { + t.Fatalf("Failed to read data stream batches: %v", err) + } + + // 验证读取的数据 + if len(batches) == 0 { + t.Error("No batches were read") + } + + // 打印批次信息用于调试 + t.Log("Read batches successfully:") + for i, batch := range batches { + if len(batch) == 0 { + t.Errorf("Batch %d is empty", i) + continue + } + t.Logf("Batch %d contains %d blocks", batch[0].BatchNumber, len(batch)) + } +} diff --git a/test/batch_db/main.go b/test/batch_db/main.go new file mode 100644 index 00000000000..44c3e4059b0 --- /dev/null +++ b/test/batch_db/main.go @@ -0,0 +1,53 @@ +package main + +import ( + "fmt" + + "github.com/0xPolygonHermez/zkevm-data-streamer/log" +) + +func main() { + // 加载配置文件 + configPath := "test.datastream.config.yaml" // 修改配置文件路径 + config, err := LoadConfig(configPath) + if err != nil { + log.Fatalf("Failed to load config: %v", err) + } + + // 创建 BatchDB 实例 + db, err := NewBatchDB(config.BatchDBPath) + if err != nil { + log.Fatalf("Failed to create batch db: %v", err) + } + defer db.Close() + + // 读取 datastream 中的 batches + fromBatch := uint64(1) + toBatch := uint64(1000) + + batches, err := ReadDataStreamBatches(config, fromBatch, toBatch) + if err != nil { + log.Fatalf("Failed to read data stream batches: %v", err) + } + + // 存储到数据库 + if err := db.StoreBatches(batches); err != nil { + log.Fatalf("Failed to store batches: %v", err) + } + + fmt.Printf("Successfully stored %d batches to database\n", len(batches)) + + // 打印所有批次的详细信息 + // if len(batches) > 0 { + // fmt.Println("\nAll batches details:") + // PrintBatchesInfo(batches) + // } + + // 验证存储结果 + storedBatches, err := db.GetBatchRange(fromBatch, toBatch) + if err != nil { + log.Fatalf("Failed to verify stored batches: %v", err) + } + + fmt.Printf("Successfully verified %d batches in database\n", len(storedBatches)) +} diff --git a/test/batch_db/test.datastream.config.yaml b/test/batch_db/test.datastream.config.yaml new file mode 100644 index 00000000000..84dc7c3429e --- /dev/null +++ b/test/batch_db/test.datastream.config.yaml @@ -0,0 +1,6 @@ +zkevm.data-stream-port: 6900 +zkevm.datastream-version: 2 +zkevm.l2-chain-id: 195 + +zkevm.datastream-file: "/Users/oker/Documents/xlayer-erigon/test/data/lrp/data-stream" +zkevm.batch-db-path: "/Users/oker/Documents/xlayer-erigon/test/data/lrp/batches.mdbx" \ No newline at end of file