Skip to content

Commit

Permalink
fix: configure ethers batching (#301)
Browse files Browse the repository at this point in the history
# What ❔

Add a configuration for ethers batching. Set the default batching config
in case it's not defined.

## Why ❔

In ethers v6 batching is enabled by default and default values don't
work well in our case. That's why we need a way to configure ethers
batching and decrease the batching throughput by default.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [X] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [X] Tests for the changes have been added / updated.
  • Loading branch information
vasyl-ivanchuk authored Oct 29, 2024
1 parent 8085be9 commit 1388956
Show file tree
Hide file tree
Showing 13 changed files with 291 additions and 20 deletions.
6 changes: 3 additions & 3 deletions packages/app/tests/e2e/features/copying.feature
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ Feature: Copying

Examples:
| Page |
| /tx/0xe84dcdaa4e40c9899763c8f55255376fd77f3a588be0fe0afa69f153a0ae3f10 |
| /address/0xed7175341f123f7718aBaCF1702d6980CFc08784 |
| /tx/0xe239f4cc4ddbaad475d0ef3e23114a89387864bcde5da5b5ca4d2c140bfc4bc4 |
| /tx/0xe7286a5fca5b1f5eb19ab5e63752de713059a042efc4cd3725213051951a77b0 |
| /address/0x000000000000000000000000000000000000800A |
| /tx/0xf47aaa3fde4cec0015cad3a38a46e047e667f81753ecf8642a0e60a5901eb00f |
| /block/1 |
| /address/0x574343B3d1544477f2C4dF38c2Ef720Ab33e782b |

Expand Down
4 changes: 4 additions & 0 deletions packages/data-fetcher/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ RPC_CALLS_RETRIES_MAX_TOTAL_TIMEOUT=120000
RPC_CALLS_CONNECTION_TIMEOUT=60000
RPC_CALLS_CONNECTION_QUICK_TIMEOUT=10000

RPC_BATCH_MAX_COUNT=10
RPC_BATCH_MAX_SIZE_BYTES=1048576
RPC_BATCH_STALL_TIME_MS=0

MAX_BLOCKS_BATCH_SIZE=20
74 changes: 67 additions & 7 deletions packages/data-fetcher/src/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,12 @@ import config from "./config";

describe("config", () => {
const env = process.env;
let defaultConfig;

beforeAll(() => {
process.env = {};
});

afterAll(() => {
process.env = env;
});

it("sets default values", () => {
expect(config()).toEqual({
defaultConfig = {
port: 3040,
blockchain: {
rpcUrl: "http://localhost:3050",
Expand All @@ -21,9 +16,74 @@ describe("config", () => {
rpcCallRetriesMaxTotalTimeout: 120000,
rpcCallConnectionTimeout: 60000,
rpcCallConnectionQuickTimeout: 10000,
rpcBatchMaxCount: 10,
rpcBatchMaxSizeBytes: 1048576,
rpcBatchStallTimeMs: 0,
},
maxBlocksBatchSize: 20,
gracefulShutdownTimeoutMs: 0,
};
});

afterAll(() => {
process.env = env;
});

it("sets default values", () => {
expect(config()).toEqual(defaultConfig);
});

describe("when RPC_BATCH_MAX_COUNT is specified in env vars", () => {
beforeEach(() => {
process.env = {
RPC_BATCH_MAX_COUNT: "40",
};
});

it("sets rpcBatchMaxCount to the value from env vars", () => {
expect(config()).toEqual({
...defaultConfig,
blockchain: {
...defaultConfig.blockchain,
rpcBatchMaxCount: 40,
},
});
});
});

describe("when RPC_BATCH_MAX_SIZE_BYTES is specified in env vars", () => {
beforeEach(() => {
process.env = {
RPC_BATCH_MAX_SIZE_BYTES: "50",
};
});

it("sets rpcBatchMaxSizeBytes to the value from env vars", () => {
expect(config()).toEqual({
...defaultConfig,
blockchain: {
...defaultConfig.blockchain,
rpcBatchMaxSizeBytes: 50,
},
});
});
});

describe("when RPC_BATCH_STALL_TIME_MS is specified in env vars", () => {
beforeEach(() => {
process.env = {
RPC_BATCH_STALL_TIME_MS: "7",
};
});

it("sets rpcBatchMaxSizeBytes to the value from env vars", () => {
expect(config()).toEqual({
...defaultConfig,
blockchain: {
...defaultConfig.blockchain,
rpcBatchStallTimeMs: 7,
},
});
});
});
});
14 changes: 14 additions & 0 deletions packages/data-fetcher/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ export default () => {
RPC_CALLS_RETRIES_MAX_TOTAL_TIMEOUT,
RPC_CALLS_CONNECTION_TIMEOUT,
RPC_CALLS_CONNECTION_QUICK_TIMEOUT,
RPC_BATCH_MAX_SIZE_BYTES,
RPC_BATCH_MAX_COUNT,
RPC_BATCH_STALL_TIME_MS,
MAX_BLOCKS_BATCH_SIZE,
GRACEFUL_SHUTDOWN_TIMEOUT_MS,
} = process.env;
Expand All @@ -25,6 +28,17 @@ export default () => {

rpcCallConnectionTimeout: parseInt(RPC_CALLS_CONNECTION_TIMEOUT, 10) || 60000,
rpcCallConnectionQuickTimeout: parseInt(RPC_CALLS_CONNECTION_QUICK_TIMEOUT, 10) || 10000,

// maximum number of requests to allow in a batch.
// If rpcBatchMaxCount = 1, then batching is disabled.
rpcBatchMaxCount: parseInt(RPC_BATCH_MAX_COUNT, 10) || 10,
// target maximum size (bytes) to allow per batch request (default: 1Mb)
// If rpcBatchMaxCount = 1, this is ignored.
rpcBatchMaxSizeBytes: parseInt(RPC_BATCH_MAX_SIZE_BYTES, 10) || 1048576,
// how long (ms) to aggregate requests into a single batch.
// 0 indicates batching will only encompass the current event loop.
// If rpcBatchMaxCount = 1, this is ignored.
rpcBatchStallTimeMs: parseInt(RPC_BATCH_STALL_TIME_MS, 10) || 0,
},
maxBlocksBatchSize: parseInt(MAX_BLOCKS_BATCH_SIZE, 10) || 20,
gracefulShutdownTimeoutMs: parseInt(GRACEFUL_SHUTDOWN_TIMEOUT_MS, 10) || 0,
Expand Down
12 changes: 11 additions & 1 deletion packages/data-fetcher/src/rpcProvider/jsonRpcProvider.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@ import { JsonRpcProviderBase, JsonRpcProviderExtended } from "./index";
const providerUrl = configService.get<string>("blockchain.rpcUrl");
const connectionTimeout = configService.get<number>("blockchain.rpcCallConnectionTimeout");
const connectionQuickTimeout = configService.get<number>("blockchain.rpcCallConnectionQuickTimeout");
const batchMaxSizeBytes = configService.get<number>("blockchain.rpcBatchMaxSizeBytes");
const batchMaxCount = configService.get<number>("blockchain.rpcBatchMaxCount");
const batchStallTimeMs = configService.get<number>("blockchain.rpcBatchStallTimeMs");
const providerUrlProtocol = new URL(providerUrl).protocol;

logger.debug(`Initializing RPC provider with the following URL: ${providerUrl}.`, "RpcProviderModule");

if (providerUrlProtocol === "http:" || providerUrlProtocol === "https:") {
return new JsonRpcProviderExtended(providerUrl, connectionTimeout, connectionQuickTimeout);
return new JsonRpcProviderExtended(
providerUrl,
connectionTimeout,
connectionQuickTimeout,
batchMaxCount,
batchMaxSizeBytes,
batchStallTimeMs
);
}

throw new Error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,21 @@ describe("JsonRpcProviderExtended", () => {
let jsonRpcProvider: JsonRpcProviderExtended;
const timer = mock<NodeJS.Timeout>();
let lastCallback: () => void;
const quickTimeout = 10_000;
const batchMaxCount = 10;
const batchMaxSizeBytes = 5_000;
const batchStallTimeMs = 7_000;
const rpcUrl = "url";

beforeEach(async () => {
jsonRpcProvider = new JsonRpcProviderExtended("url", 120_000, 10_000);
jsonRpcProvider = new JsonRpcProviderExtended(
rpcUrl,
120_000,
quickTimeout,
batchMaxCount,
batchMaxSizeBytes,
batchStallTimeMs
);

jest.spyOn(global, "setTimeout").mockImplementation((callback: () => void) => {
lastCallback = callback;
Expand Down Expand Up @@ -46,7 +58,7 @@ describe("JsonRpcProviderExtended", () => {
it("starts quick timeout", async () => {
await jsonRpcProvider.send("method", [1, 2]);
expect(global.setTimeout).toBeCalledTimes(1);
expect(global.setTimeout).toBeCalledWith(expect.any(Function), 10_000);
expect(global.setTimeout).toBeCalledWith(expect.any(Function), quickTimeout);
});

it("clears quick timeout", async () => {
Expand Down Expand Up @@ -85,7 +97,7 @@ describe("JsonRpcProviderExtended", () => {
it("waits for internal timeout and returns the result of the second call", async () => {
const result = await jsonRpcProvider.send("method", [1, 2]);
expect(global.setTimeout).toBeCalledTimes(1);
expect(global.setTimeout).toBeCalledWith(expect.any(Function), 10_000);
expect(global.setTimeout).toBeCalledWith(expect.any(Function), quickTimeout);
expect(result).toStrictEqual({
method: "method2",
params: [2, 3],
Expand Down
13 changes: 12 additions & 1 deletion packages/data-fetcher/src/rpcProvider/jsonRpcProviderExtended.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,20 @@ export class QuickTimeoutError extends Error {

export class JsonRpcProviderExtended extends Provider implements JsonRpcProviderBase {
private readonly connectionQuickTimeout;
constructor(providerUrl: string, connectionTimeout: number, connectionQuickTimeout: number) {
constructor(
providerUrl: string,
connectionTimeout: number,
connectionQuickTimeout: number,
batchMaxCount: number,
batchMaxSizeBytes: number,
batchStallTimeMs: number
) {
super(providerUrl, undefined, {
timeout: connectionTimeout,
batchMaxSize: batchMaxSizeBytes,
batchMaxCount: batchMaxCount,
staticNetwork: true,
batchStallTime: batchStallTimeMs,
});
this.connectionQuickTimeout = connectionQuickTimeout;
}
Expand Down
3 changes: 3 additions & 0 deletions packages/worker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ RPC_CALLS_DEFAULT_RETRY_TIMEOUT=30000
RPC_CALLS_QUICK_RETRY_TIMEOUT=500
RPC_CALLS_CONNECTION_TIMEOUT=20000
RPC_CALLS_CONNECTION_QUICK_TIMEOUT=10000
RPC_BATCH_MAX_COUNT=10
RPC_BATCH_MAX_SIZE_BYTES=1048576
RPC_BATCH_STALL_TIME_MS=0

COLLECT_DB_CONNECTION_POOL_METRICS_INTERVAL=10000
COLLECT_BLOCKS_TO_PROCESS_METRIC_INTERVAL=10000
Expand Down
110 changes: 110 additions & 0 deletions packages/worker/src/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,62 @@ import config from "./config";

describe("config", () => {
const env = process.env;
let defaultConfig;

beforeAll(() => {
process.env = {};

defaultConfig = {
port: 3001,
blockchain: {
rpcUrl: "http://localhost:3050",
rpcCallDefaultRetryTimeout: 30000,
rpcCallQuickRetryTimeout: 500,
rpcCallConnectionTimeout: 20000,
rpcCallConnectionQuickTimeout: 10000,
rpcBatchMaxCount: 10,
rpcBatchMaxSizeBytes: 1048576,
rpcBatchStallTimeMs: 0,
},
dataFetcher: {
url: "http://localhost:3040",
requestTimeout: 150_000,
},
blocks: {
waitForBlocksInterval: 1000,
blocksProcessingBatchSize: 50,
fromBlock: 0,
toBlock: null,
disableBlocksRevert: false,
numberOfBlocksPerDbTransaction: 50,
},
batches: {
batchesProcessingPollingInterval: 60000,
disableBatchesProcessing: false,
},
balances: {
deleteBalancesInterval: 300000,
disableOldBalancesCleaner: false,
},
counters: {
recordsBatchSize: 20000,
updateInterval: 30000,
disableCountersProcessing: false,
},
tokens: {
enableTokenOffChainDataSaver: false,
updateTokenOffChainDataInterval: 86_400_000,
tokenOffChainDataProviders: ["coingecko", "portalsFi"],
selectedTokenOffChainDataProvider: "coingecko",
coingecko: {
isProPlan: false,
},
},
metrics: {
collectDbConnectionPoolMetricsInterval: 10000,
collectBlocksToProcessMetricInterval: 10000,
},
};
});

afterAll(() => {
Expand All @@ -20,6 +73,9 @@ describe("config", () => {
rpcCallQuickRetryTimeout: 500,
rpcCallConnectionTimeout: 20000,
rpcCallConnectionQuickTimeout: 10000,
rpcBatchMaxCount: 10,
rpcBatchMaxSizeBytes: 1048576,
rpcBatchStallTimeMs: 0,
},
dataFetcher: {
url: "http://localhost:3040",
Expand Down Expand Up @@ -61,4 +117,58 @@ describe("config", () => {
},
});
});

describe("when RPC_BATCH_MAX_COUNT is specified in env vars", () => {
beforeEach(() => {
process.env = {
RPC_BATCH_MAX_COUNT: "40",
};
});

it("sets rpcBatchMaxCount to the value from env vars", () => {
expect(config()).toEqual({
...defaultConfig,
blockchain: {
...defaultConfig.blockchain,
rpcBatchMaxCount: 40,
},
});
});
});

describe("when RPC_BATCH_MAX_SIZE_BYTES is specified in env vars", () => {
beforeEach(() => {
process.env = {
RPC_BATCH_MAX_SIZE_BYTES: "50",
};
});

it("sets rpcBatchMaxSizeBytes to the value from env vars", () => {
expect(config()).toEqual({
...defaultConfig,
blockchain: {
...defaultConfig.blockchain,
rpcBatchMaxSizeBytes: 50,
},
});
});
});

describe("when RPC_BATCH_STALL_TIME_MS is specified in env vars", () => {
beforeEach(() => {
process.env = {
RPC_BATCH_STALL_TIME_MS: "7",
};
});

it("sets rpcBatchMaxSizeBytes to the value from env vars", () => {
expect(config()).toEqual({
...defaultConfig,
blockchain: {
...defaultConfig.blockchain,
rpcBatchStallTimeMs: 7,
},
});
});
});
});
Loading

0 comments on commit 1388956

Please sign in to comment.