diff --git a/db/package.json b/db/package.json index cca405e..2e37975 100644 --- a/db/package.json +++ b/db/package.json @@ -21,4 +21,4 @@ "mocha" ] } -} +} \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index c16fbbf..9e35a09 100644 --- a/package-lock.json +++ b/package-lock.json @@ -301,6 +301,28 @@ "dequal": "^2.0.3" } }, + "node_modules/@fastify/postgres": { + "version": "6.0.2", + "resolved": "https://registry.npmjs.org/@fastify/postgres/-/postgres-6.0.2.tgz", + "integrity": "sha512-b8JKy/aNcz/iKzFEe0Qwx7xkk2lmcQXxylg6jOctYCpNW/shSJZunIhMrTI3J5GQQubtMmRGXEiwfKdq5h4LNQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "MIT", + "dependencies": { + "fastify-plugin": "^5.0.0" + }, + "peerDependencies": { + "pg": ">=6.0.0" + } + }, "node_modules/@fastify/proxy-addr": { "version": "5.0.0", "license": "MIT", @@ -5721,7 +5743,6 @@ "version": "8.14.1", "resolved": "https://registry.npmjs.org/pg/-/pg-8.14.1.tgz", "integrity": "sha512-0TdbqfjwIun9Fm/r89oB7RFQ0bLgduAhiIqIXOsyKoiC/L54DbuAAzIEN/9Op0f1Po9X7iCPXGoa/Ah+2aI8Xw==", - "license": "MIT", "dependencies": { "pg-connection-string": "^2.7.0", "pg-pool": "^3.8.0", @@ -5771,7 +5792,6 @@ "version": "3.8.0", "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.8.0.tgz", "integrity": "sha512-VBw3jiVm6ZOdLBTIcXLNdSotb6Iy3uOCwDGFAksZCXmi10nyRvnP2v3jl4d+IsLYRyXf6o9hIm/ZtUzlByNUdw==", - "license": "MIT", "peerDependencies": { "pg": ">=8.0" } @@ -5779,8 +5799,7 @@ "node_modules/pg-protocol": { "version": "1.8.0", "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.8.0.tgz", - "integrity": "sha512-jvuYlEkL03NRvOoyoRktBK7+qU5kOvlAwvmrH8sr3wbLrOdVWsRxQfz8mMy9sZFsqJ1hEWNfdWKI4SAmoL+j7g==", - "license": "MIT" + "integrity": "sha512-jvuYlEkL03NRvOoyoRktBK7+qU5kOvlAwvmrH8sr3wbLrOdVWsRxQfz8mMy9sZFsqJ1hEWNfdWKI4SAmoL+j7g==" }, 
"node_modules/pg-types": { "version": "4.0.2", @@ -7418,6 +7437,7 @@ "name": "@filecoin-station/spark-stats", "dependencies": { "@fastify/cors": "^11.0.1", + "@fastify/postgres": "^6.0.2", "@fastify/url-data": "^6.0.3", "@filecoin-station/spark-stats-db": "^1.0.0", "@sentry/node": "^9.10.1", diff --git a/stats/bin/spark-stats.js b/stats/bin/spark-stats.js index 9771a3a..626e4c9 100644 --- a/stats/bin/spark-stats.js +++ b/stats/bin/spark-stats.js @@ -1,23 +1,24 @@ import '../lib/instrument.js' import { createApp } from '../lib/app.js' -import { getPgPools } from '@filecoin-station/spark-stats-db' const { PORT = '8080', HOST = '127.0.0.1', SPARK_API_BASE_URL = 'https://api.filspark.com/', - REQUEST_LOGGING = 'true' + REQUEST_LOGGING = 'true', + DATABASE_URL = 'postgres://localhost:5432/spark_stats', + EVALUATE_DB_URL = 'postgres://localhost:5432/spark_evaluate' } = process.env -const pgPools = await getPgPools() - const app = await createApp({ SPARK_API_BASE_URL, - pgPools, + DATABASE_URL, + EVALUATE_DB_URL, logger: { level: ['1', 'true'].includes(REQUEST_LOGGING) ? 
'info' : 'error' } }) + console.log('Starting the http server on host %j port %s', HOST, PORT) const baseUrl = app.listen({ port: Number(PORT), host: HOST }) console.log(baseUrl) diff --git a/stats/lib/app.js b/stats/lib/app.js index 611f781..942e428 100644 --- a/stats/lib/app.js +++ b/stats/lib/app.js @@ -2,28 +2,41 @@ import * as Sentry from '@sentry/node' import Fastify from 'fastify' import cors from '@fastify/cors' import urlData from '@fastify/url-data' +import fastifyPostgres from '@fastify/postgres' import { addRoutes } from './routes.js' import { addPlatformRoutes } from './platform-routes.js' -/** @typedef {import('@filecoin-station/spark-stats-db').PgPools} PgPools */ /** @typedef {import('./typings.js').DateRangeFilter} DateRangeFilter */ /** * @param {object} args * @param {string} args.SPARK_API_BASE_URL - * @param {import('@filecoin-station/spark-stats-db').PgPools} args.pgPools - * @param {Fastify.FastifyLoggerOptions} args.logger - * @returns + * @param {string} args.DATABASE_URL - Connection string for stats database + * @param {string} args.EVALUATE_DB_URL - Connection string for evaluate database + * @param {import('fastify').FastifyLoggerOptions} args.logger + * @returns {Promise} */ -export const createApp = ({ + +export const createApp = async ({ SPARK_API_BASE_URL, - pgPools, + DATABASE_URL, + EVALUATE_DB_URL, logger }) => { const app = Fastify({ logger }) Sentry.setupFastifyErrorHandler(app) + await app.register(fastifyPostgres, { + connectionString: DATABASE_URL, + name: 'stats' + }) + + await app.register(fastifyPostgres, { + connectionString: EVALUATE_DB_URL, + name: 'evaluate' + }) + app.register(cors, { origin: [ 'http://localhost:3000', @@ -38,8 +51,8 @@ export const createApp = ({ ] }) app.register(urlData) - addRoutes(app, pgPools, SPARK_API_BASE_URL) - addPlatformRoutes(app, pgPools) + addRoutes(app, SPARK_API_BASE_URL) + addPlatformRoutes(app) app.get('/', (request, reply) => { reply.send('OK') }) diff --git 
a/stats/lib/platform-routes.js b/stats/lib/platform-routes.js index 7d1836d..7fe85f0 100644 --- a/stats/lib/platform-routes.js +++ b/stats/lib/platform-routes.js @@ -14,39 +14,40 @@ import { filterPreHandlerHook, filterOnSendHook } from './request-helpers.js' /** @typedef {import('./typings.js').RequestWithFilter} RequestWithFilter */ -export const addPlatformRoutes = (app, pgPools) => { +export const addPlatformRoutes = (app) => { app.register(async app => { app.addHook('preHandler', filterPreHandlerHook) app.addHook('onSend', filterOnSendHook) app.get('/stations/daily', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchDailyStationCount(pgPools.evaluate, request.filter)) + reply.send(await fetchDailyStationCount(request.server.pg, request.filter)) }) app.get('/stations/monthly', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchMonthlyStationCount(pgPools.evaluate, request.filter)) + reply.send(await fetchMonthlyStationCount(request.server.pg, request.filter)) }) app.get('/stations/desktop/daily', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchDailyDesktopUsers(pgPools.stats, request.filter)) + reply.send(await fetchDailyDesktopUsers(request.server.pg, request.filter)) }) app.get('/measurements/daily', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchDailyStationMeasurementCounts(pgPools.evaluate, request.filter)) + reply.send(await fetchDailyStationMeasurementCounts(request.server.pg, request.filter)) }) app.get('/participants/top-measurements', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchParticipantsWithTopMeasurements(pgPools.evaluate, request.filter)) + reply.send(await fetchParticipantsWithTopMeasurements(request.server.pg, request.filter)) }) app.get('/participants/top-earning', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await 
fetchTopEarningParticipants(pgPools.stats, request.filter)) + reply.send(await fetchTopEarningParticipants(request.server.pg, request.filter)) }) + app.get('/participants/accumulative/daily', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchAccumulativeDailyParticipantCount(pgPools.evaluate, request.filter)) + reply.send(await fetchAccumulativeDailyParticipantCount(request.server.pg, request.filter)) }) app.get('/transfers/daily', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchDailyRewardTransfers(pgPools.stats, request.filter)) + reply.send(await fetchDailyRewardTransfers(request.server.pg, request.filter)) }) }) app.get('/participants/summary', async (request, reply) => { reply.header('cache-control', `public, max-age=${24 * 3600 /* one day */}`) - reply.send(await fetchParticipantsSummary(pgPools.evaluate)) + reply.send(await fetchParticipantsSummary(request.server.pg)) }) } diff --git a/stats/lib/platform-stats-fetchers.js b/stats/lib/platform-stats-fetchers.js index d4533f9..1a3e292 100644 --- a/stats/lib/platform-stats-fetchers.js +++ b/stats/lib/platform-stats-fetchers.js @@ -1,16 +1,24 @@ import assert from 'http-assert' import { today, yesterday } from './request-helpers.js' -/** @typedef {import('@filecoin-station/spark-stats-db').Queryable} Queryable */ +/** + @typedef {import('./typings.js').DateRangeFilter} DateRangeFilter + @typedef {import('@filecoin-station/spark-stats-db').Queryable} Queryable + @typedef {import('./typings.js').FastifyPg} FastifyPg +*/ + +/** + * One day in milliseconds. + */ const ONE_DAY = 24 * 60 * 60 * 1000 /** - * @param {Queryable} pgPool + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ -export const fetchDailyStationCount = async (pgPool, filter) => { - const { rows } = await pgPool.query(` +export const fetchDailyStationCount = async (pgPools, filter) => { + const { rows } = await pgPools.evaluate.query(`
SELECT day::TEXT, station_count FROM daily_platform_stats WHERE day >= $1 AND day <= $2 @@ -20,11 +28,11 @@ export const fetchDailyStationCount = async (pgPool, filter) => { } /** - * @param {Queryable} pgPool + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ -export const fetchMonthlyStationCount = async (pgPool, filter) => { - const { rows } = await pgPool.query(` +export const fetchMonthlyStationCount = async (pgPools, filter) => { + const { rows } = await pgPools.evaluate.query(` SELECT month::TEXT, station_count FROM monthly_active_station_count WHERE @@ -36,11 +44,11 @@ export const fetchMonthlyStationCount = async (pgPool, filter) => { } /** - * @param {Queryable} pgPool + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ -export const fetchDailyStationMeasurementCounts = async (pgPool, filter) => { - const { rows } = await pgPool.query(` +export const fetchDailyStationMeasurementCounts = async (pgPools, filter) => { + const { rows } = await pgPools.evaluate.query(` SELECT day::TEXT, accepted_measurement_count, total_measurement_count FROM daily_platform_stats WHERE day >= $1 AND day <= $2 @@ -50,31 +58,31 @@ export const fetchDailyStationMeasurementCounts = async (pgPool, filter) => { } /** - * @param {Queryable} pgPool + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ -export const fetchParticipantsWithTopMeasurements = async (pgPool, filter) => { +export const fetchParticipantsWithTopMeasurements = async (pgPools, filter) => { assert(filter.to === filter.from, 400, 'Multi-day queries are not supported for this endpoint') assert(filter.to === yesterday(), 400, 'filter.to must be set to yesterday, other values are not supported yet') // Ignore the filter for this query // Get the top measurement stations from the Materialized View - return (await pgPool.query(` + return (await pgPools.evaluate.query(` SELECT day::TEXT, 
participant_address, station_count, accepted_measurement_count, inet_group_count FROM top_measurement_participants_yesterday_mv `)).rows } /** - * @param {Queryable} pgPool + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ -export const fetchDailyRewardTransfers = async (pgPool, filter) => { +export const fetchDailyRewardTransfers = async (pgPools, filter) => { assert( new Date(filter.to).getTime() - new Date(filter.from).getTime() <= 31 * ONE_DAY, 400, 'Date range must be 31 days max' ) - const { rows } = await pgPool.query(` + const { rows } = await pgPools.stats.query(` SELECT day::TEXT, to_address, amount FROM daily_reward_transfers WHERE day >= $1 AND day <= $2 @@ -99,11 +107,11 @@ export const fetchDailyRewardTransfers = async (pgPool, filter) => { } /** - * @param {Queryable} pgPool + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ -export const fetchAccumulativeDailyParticipantCount = async (pgPool, filter) => { - const { rows } = await pgPool.query(` +export const fetchAccumulativeDailyParticipantCount = async (pgPools, filter) => { + const { rows } = await pgPools.evaluate.query(` WITH first_appearance AS ( SELECT participant_id, MIN(day) as day FROM daily_participants @@ -126,15 +134,15 @@ export const fetchAccumulativeDailyParticipantCount = async (pgPool, filter) => } /** - * @param {Queryable} pgPool + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ -export const fetchTopEarningParticipants = async (pgPool, filter) => { +export const fetchTopEarningParticipants = async (pgPools, filter) => { // The query combines "transfers until filter.to" with "latest scheduled rewards as of today". // As a result, it produces incorrect result if `to` is different from `now()`. 
// See https://github.com/filecoin-station/spark-stats/pull/170#discussion_r1664080395 assert(filter.to === today(), 400, 'filter.to must be today, other values are not supported') - const { rows } = await pgPool.query(` + const { rows } = await pgPools.stats.query(` WITH latest_scheduled_rewards AS ( SELECT DISTINCT ON (participant_address) participant_address, scheduled_rewards FROM daily_scheduled_rewards @@ -154,10 +162,10 @@ export const fetchTopEarningParticipants = async (pgPool, filter) => { } /** - * @param {Queryable} pgPool + * @param {FastifyPg} pgPools */ -export const fetchParticipantsSummary = async (pgPool) => { - const { rows } = await pgPool.query(` +export const fetchParticipantsSummary = async (pgPools) => { + const { rows } = await pgPools.evaluate.query(` SELECT COUNT(DISTINCT participant_id) FROM daily_participants `) return { @@ -166,11 +174,11 @@ export const fetchParticipantsSummary = async (pgPool) => { } /** - * @param {Queryable} pgPool + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ -export const fetchDailyDesktopUsers = async (pgPool, filter) => { - const { rows } = await pgPool.query(` +export const fetchDailyDesktopUsers = async (pgPools, filter) => { + const { rows } = await pgPools.stats.query(` SELECT day::TEXT, user_count diff --git a/stats/lib/routes.js b/stats/lib/routes.js index 8ba3bdd..095b967 100644 --- a/stats/lib/routes.js +++ b/stats/lib/routes.js @@ -25,64 +25,65 @@ import { /** @typedef {import('./typings.js').RequestWithFilterAndMinerId} RequestWithFilterAndMinerId */ /** @typedef {import('./typings.js').RequestWithFilterAndClientId} RequestWithFilterAndClientId */ -export const addRoutes = (app, pgPools, SPARK_API_BASE_URL) => { +export const addRoutes = (app, SPARK_API_BASE_URL) => { app.register(async app => { app.addHook('preHandler', filterPreHandlerHook) app.addHook('onSend', filterOnSendHook) app.get('/deals/daily', async (/** @type {RequestWithFilter} */ request, 
reply) => { - reply.send(await fetchDailyDealStats(pgPools, request.filter)) + reply.send(await fetchDailyDealStats(request.server.pg, request.filter)) }) + app.get('/deals/summary', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchDealSummary(pgPools, request.filter)) + reply.send(await fetchDealSummary(request.server.pg, request.filter)) }) app.get('/retrieval-success-rate', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchRetrievalSuccessRate(pgPools, request.filter)) + reply.send(await fetchRetrievalSuccessRate(request.server.pg, request.filter)) }) app.get('/participants/daily', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchDailyParticipants(pgPools, request.filter)) + reply.send(await fetchDailyParticipants(request.server.pg, request.filter)) }) app.get('/participants/monthly', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchMonthlyParticipants(pgPools, request.filter)) + reply.send(await fetchMonthlyParticipants(request.server.pg, request.filter)) }) app.get('/participants/change-rates', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchParticipantChangeRates(pgPools, request.filter)) + reply.send(await fetchParticipantChangeRates(request.server.pg, request.filter)) }) app.get('/participant/:address/scheduled-rewards', async (/** @type {RequestWithFilterAndAddress} */ request, reply) => { - reply.send(await fetchParticipantScheduledRewards(pgPools, request.filter, request.params.address)) + reply.send(await fetchParticipantScheduledRewards(request.server.pg, request.filter, request.params.address)) }) app.get('/participant/:address/reward-transfers', async (/** @type {RequestWithFilterAndAddress} */ request, reply) => { - reply.send(await fetchParticipantRewardTransfers(pgPools, request.filter, request.params.address)) + reply.send(await 
fetchParticipantRewardTransfers(request.server.pg, request.filter, request.params.address)) }) app.get('/miners/retrieval-success-rate/summary', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchMinersRSRSummary(pgPools, request.filter)) + reply.send(await fetchMinersRSRSummary(request.server.pg, request.filter)) }) app.get('/miners/retrieval-timings/summary', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchMinersTimingsSummary(pgPools, request.filter)) + reply.send(await fetchMinersTimingsSummary(request.server.pg, request.filter)) }) app.get('/retrieval-result-codes/daily', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchDailyRetrievalResultCodes(pgPools, request.filter)) + reply.send(await fetchDailyRetrievalResultCodes(request.server.pg, request.filter)) }) app.get('/retrieval-timings/daily', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchDailyRetrievalTimings(pgPools, request.filter)) + reply.send(await fetchDailyRetrievalTimings(request.server.pg, request.filter)) }) app.get('/miner/:minerId/retrieval-timings/summary', async (/** @type {RequestWithFilterAndMinerId} */ request, reply) => { - reply.send(await fetchDailyMinerRetrievalTimings(pgPools, request.filter, request.params.minerId)) + reply.send(await fetchDailyMinerRetrievalTimings(request.server.pg, request.filter, request.params.minerId)) }) app.get('/miner/:minerId/retrieval-success-rate/summary', async (/** @type {RequestWithFilterAndMinerId} */ request, reply) => { - reply.send(await fetchDailyMinerRSRSummary(pgPools, request.filter, request.params.minerId)) + reply.send(await fetchDailyMinerRSRSummary(request.server.pg, request.filter, request.params.minerId)) }) app.get('/clients/retrieval-success-rate/summary', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchClientsRSRSummary(pgPools, request.filter)) + 
reply.send(await fetchClientsRSRSummary(request.server.pg, request.filter)) }) app.get('/client/:clientId/retrieval-success-rate/summary', async (/** @type {RequestWithFilterAndClientId} */ request, reply) => { - reply.send(await fetchDailyClientRSRSummary(pgPools, request.filter, request.params.clientId)) + reply.send(await fetchDailyClientRSRSummary(request.server.pg, request.filter, request.params.clientId)) }) app.get('/allocators/retrieval-success-rate/summary', async (/** @type {RequestWithFilter} */ request, reply) => { - reply.send(await fetchAllocatorsRSRSummary(pgPools, request.filter)) + reply.send(await fetchAllocatorsRSRSummary(request.server.pg, request.filter)) }) app.get('/allocator/:allocatorId/retrieval-success-rate/summary', async (/** @type {RequestWithFilterAndClientId} */ request, reply) => { - reply.send(await fetchDailyAllocatorRSRSummary(pgPools, request.filter, request.params.allocatorId)) + reply.send(await fetchDailyAllocatorRSRSummary(request.server.pg, request.filter, request.params.allocatorId)) }) }) diff --git a/stats/lib/stats-fetchers.js b/stats/lib/stats-fetchers.js index 8179231..d79f1fb 100644 --- a/stats/lib/stats-fetchers.js +++ b/stats/lib/stats-fetchers.js @@ -1,8 +1,13 @@ -/** @typedef {import('@filecoin-station/spark-stats-db').PgPools} PgPools */ /** - * @param {PgPools} pgPools + @typedef {import('@filecoin-station/spark-stats-db').Queryable} Queryable + @typedef {import('./typings.js').FastifyPg} FastifyPg + */ + +/** * @param {import('./typings.js').DateRangeFilter & {nonZero?: 'true'}} filter + * @param {FastifyPg} pgPools */ + export const fetchRetrievalSuccessRate = async (pgPools, filter) => { // Fetch the "day" (DATE) as a string (TEXT) to prevent node-postgres for converting it into // a JavaScript Date with a timezone, as that could change the date one day forward or back. 
@@ -36,7 +41,7 @@ export const fetchRetrievalSuccessRate = async (pgPools, filter) => { } /** - * @param {import('@filecoin-station/spark-stats-db').PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ export const fetchDailyDealStats = async (pgPools, filter) => { @@ -63,7 +68,7 @@ export const fetchDailyDealStats = async (pgPools, filter) => { } /** - * @param {import('@filecoin-station/spark-stats-db').PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ export const fetchDealSummary = async (pgPools, filter) => { @@ -115,7 +120,7 @@ export const fetchMonthlyParticipants = async (pgPools, filter) => { } /** - * @param {PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ export const fetchParticipantChangeRates = async (pgPools, filter) => { @@ -183,7 +188,7 @@ export const fetchParticipantChangeRates = async (pgPools, filter) => { } /** - * @param {PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter * @param {string} address */ @@ -197,7 +202,7 @@ export const fetchParticipantScheduledRewards = async (pgPools, { from, to }, ad } /** - * @param {PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter * @param {string} address */ @@ -212,7 +217,7 @@ export const fetchParticipantRewardTransfers = async (pgPools, { from, to }, add /** * Fetches the retrieval stats summary for all miners for given date range. - * @param {PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ export const fetchMinersRSRSummary = async (pgPools, filter) => { @@ -246,7 +251,7 @@ export const fetchMinersRSRSummary = async (pgPools, filter) => { /** * Fetches the retrieval stats summary for a single miner for given date range. 
- * @param {PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter * @param {string} minerId */ @@ -302,7 +307,7 @@ export const fetchDailyRetrievalResultCodes = async (pgPools, filter) => { /** * Fetches daily global retrieval time statistics - * @param {import('@filecoin-station/spark-stats-db').PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ export const fetchDailyRetrievalTimings = async (pgPools, filter) => { @@ -323,7 +328,7 @@ export const fetchDailyRetrievalTimings = async (pgPools, filter) => { /** * Fetches per miner daily retrieval time statistics - * @param {import('@filecoin-station/spark-stats-db').PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter * @param {string} minerId */ @@ -347,9 +352,10 @@ export const fetchDailyMinerRetrievalTimings = async (pgPools, { from, to }, min /** * Fetches retrieval time statistics summary for all miners for given date range. - * @param {import('@filecoin-station/spark-stats-db').PgPools} pgPools + * + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter - */ + */ export const fetchMinersTimingsSummary = async (pgPools, { from, to }) => { const { rows } = await pgPools.evaluate.query(` SELECT @@ -367,10 +373,10 @@ export const fetchMinersTimingsSummary = async (pgPools, { from, to }) => { } /** - * Fetches the retrieval stats summary for all clients for given date range. - * @param {PgPools} pgPools - * @param {import('./typings.js').DateRangeFilter} filter - */ +* Fetches the retrieval stats summary for all clients for given date range.
+* @param {FastifyPg} pgPools +* @param {import('./typings.js').DateRangeFilter} filter +*/ export const fetchClientsRSRSummary = async (pgPools, filter) => { const { rows } = await pgPools.evaluate.query(` SELECT @@ -400,7 +406,7 @@ export const fetchClientsRSRSummary = async (pgPools, filter) => { /** * Fetches the retrieval stats summary for a single client for given date range. - * @param {PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter * @param {string} clientId */ @@ -434,7 +440,7 @@ export const fetchDailyClientRSRSummary = async (pgPools, { from, to }, clientId /** * Fetches the retrieval stats summary for all allocators for given date range. - * @param {PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter */ export const fetchAllocatorsRSRSummary = async (pgPools, filter) => { @@ -466,7 +472,7 @@ export const fetchAllocatorsRSRSummary = async (pgPools, filter) => { /** * Fetches the retrieval stats summary for a single allocator for given date range.
- * @param {PgPools} pgPools + * @param {FastifyPg} pgPools * @param {import('./typings.js').DateRangeFilter} filter * @param {string} allocatorId */ diff --git a/stats/lib/typings.d.ts b/stats/lib/typings.d.ts index 377374e..7237960 100644 --- a/stats/lib/typings.d.ts +++ b/stats/lib/typings.d.ts @@ -1,10 +1,16 @@ import { FastifyRequest } from 'fastify' +import { PostgresDb } from '@fastify/postgres' +import { Queryable } from '@filecoin-station/spark-stats-db' + + +export type FastifyPg = PostgresDb & Record<string, PostgresDb> export interface DateRangeFilter { from: string; to: string; } + export type RequestWithFilter = FastifyRequest<{ Querystring: { from: string, to: string } }> diff --git a/stats/package.json b/stats/package.json index 4521a06..4f3e20c 100644 --- a/stats/package.json +++ b/stats/package.json @@ -16,6 +16,7 @@ "dependencies": { "@fastify/cors": "^11.0.1", "@fastify/url-data": "^6.0.3", + "@fastify/postgres": "^6.0.2", "@filecoin-station/spark-stats-db": "^1.0.0", "@sentry/node": "^9.10.1", "@sentry/profiling-node": "^9.10.1", diff --git a/stats/test/app.test.js b/stats/test/app.test.js index 5fa9dde..e0c5a12 100644 --- a/stats/test/app.test.js +++ b/stats/test/app.test.js @@ -1,5 +1,4 @@ import assert from 'node:assert' -import { getPgPools } from '@filecoin-station/spark-stats-db' import { givenDailyParticipants } from '@filecoin-station/spark-stats-db/test-helpers.js' import { assertResponseStatus } from './test-helpers.js' @@ -7,32 +6,34 @@ import { createApp } from '../lib/app.js' import { today } from '../lib/request-helpers.js' describe('HTTP request handler', () => { - /** @type {import('@filecoin-station/spark-stats-db').PgPools} */ - let pgPools /** @type {import('fastify').FastifyInstance} */ let app /** @type {string} */ let baseUrl + let pgPools before(async () => { - pgPools = await getPgPools() + // Use test database connection strings + const DATABASE_URL = 'postgres://postgres:postgres@localhost:5432/spark_stats' + const EVALUATE_DB_URL =
'postgres://postgres:postgres@localhost:5432/spark_evaluate' - app = createApp({ + app = await createApp({ SPARK_API_BASE_URL: 'https://api.filspark.com/', - pgPools, + DATABASE_URL, + EVALUATE_DB_URL, logger: { level: process.env.DEBUG === '*' || process.env.DEBUG?.includes('test') ? 'debug' : 'error' } }) + pgPools = app.pg baseUrl = await app.listen() }) after(async () => { - await app.close() - await pgPools.end() + await app?.close() }) beforeEach(async () => { @@ -43,6 +44,7 @@ describe('HTTP request handler', () => { await pgPools.stats.query('DELETE FROM daily_scheduled_rewards') await pgPools.stats.query('DELETE FROM daily_reward_transfers') await pgPools.stats.query('DELETE FROM daily_retrieval_result_codes') + await pgPools.evaluate.query('DELETE FROM daily_client_retrieval_stats') }) it('returns 200 for GET /', async () => { @@ -1012,7 +1014,8 @@ describe('HTTP request handler', () => { /** * - * @param {import('../lib/platform-stats-fetchers.js').Queryable} pgPool + * @param {Object} pg - Fastify pg object with database connections + * @param {object} data * @param {string} data.day * @param {string} [data.minerId] @@ -1021,8 +1024,8 @@ describe('HTTP request handler', () => { * @param {number | bigint} [data.successfulHttp] * @param {number | bigint} [data.successfulHttpHead] */ -const givenRetrievalStats = async (pgPool, { day, minerId, total, successful, successfulHttp, successfulHttpHead }) => { - await pgPool.query( +const givenRetrievalStats = async (pg, { day, minerId, total, successful, successfulHttp, successfulHttpHead }) => { + await pg.query( 'INSERT INTO retrieval_stats (day, miner_id, total, successful, successful_http, successful_http_head) VALUES ($1, $2, $3, $4, $5, $6)', [day, minerId ?? 
'f1test', total, successful, successfulHttp, successfulHttpHead] ) @@ -1030,7 +1033,7 @@ const givenRetrievalStats = async (pgPool, { day, minerId, total, successful, su /** * - * @param {import('../lib/platform-stats-fetchers.js').Queryable} pgPool + * @param {Object} pg - Fastify pg object with database connections * @param {object} data * @param {string} data.day * @param {string} [data.clientId] @@ -1038,8 +1041,8 @@ const givenRetrievalStats = async (pgPool, { day, minerId, total, successful, su * @param {number | bigint } data.successful * @param {number | bigint} [data.successfulHttp] */ -const givenClientRetrievalStats = async (pgPool, { day, clientId, total, successful, successfulHttp }) => { - await pgPool.query( +const givenClientRetrievalStats = async (pg, { day, clientId, total, successful, successfulHttp }) => { + await pg.query( 'INSERT INTO daily_client_retrieval_stats (day, client_id, total, successful, successful_http) VALUES ($1, $2, $3, $4, $5)', [day, clientId ?? 'f1ClientTest', total, successful, successfulHttp] ) @@ -1047,7 +1050,7 @@ const givenClientRetrievalStats = async (pgPool, { day, clientId, total, success /** * - * @param {import('../lib/platform-stats-fetchers.js').Queryable} pgPool + * @param {Object} pg - Fastify pg object with database connections * @param {object} data * @param {string} data.day * @param {string} [data.allocatorId] @@ -1055,8 +1058,8 @@ const givenClientRetrievalStats = async (pgPool, { day, clientId, total, success * @param {number | bigint } data.successful * @param {number | bigint} [data.successfulHttp] */ -const givenAllocatorRetrievalStats = async (pgPool, { day, allocatorId, total, successful, successfulHttp }) => { - await pgPool.query( +const givenAllocatorRetrievalStats = async (pg, { day, allocatorId, total, successful, successfulHttp }) => { + await pg.query( 'INSERT INTO daily_allocator_retrieval_stats (day, allocator_id, total, successful, successful_http) VALUES ($1, $2, $3, $4, $5)', [day, 
allocatorId ?? 'f1AllocatorTest', total, successful, successfulHttp] ) @@ -1064,7 +1067,7 @@ const givenAllocatorRetrievalStats = async (pgPool, { day, allocatorId, total, s /** * - * @param {import('@filecoin-station/spark-stats-db').Queryable} pgPool + * @param {import('@filecoin-station/spark-stats-db').Queryable} pg * @param {{ * day: string; * minerId?: string; @@ -1077,7 +1080,7 @@ const givenAllocatorRetrievalStats = async (pgPool, { day, allocatorId, total, s * retrievable?: number; * }} stats */ -const givenDailyDealStats = async (pgPool, { +const givenDailyDealStats = async (pg, { day, minerId, clientId, @@ -1095,7 +1098,7 @@ const givenDailyDealStats = async (pgPool, { retrievable ??= tested retrievalMajorityFound ??= retrievable - await pgPool.query(` + await pg.query(` INSERT INTO daily_deals ( day, miner_id, @@ -1122,14 +1125,14 @@ const givenDailyDealStats = async (pgPool, { /** * - * @param {import('../lib/platform-stats-fetchers.js').Queryable} pgPool + * @param {Object} pg - Fastify pg object with database connections * @param {object} data * @param {string} data.day * @param {string} data.minerId * @param {number[]} data.timeToFirstByteP50 */ -const givenRetrievalTimings = async (pgPool, { day, minerId, timeToFirstByteP50 }) => { - await pgPool.query( +const givenRetrievalTimings = async (pg, { day, minerId, timeToFirstByteP50 }) => { + await pg.query( 'INSERT INTO retrieval_timings (day, miner_id, ttfb_p50) VALUES ($1, $2, $3)', [day, minerId ?? 
'f1test', timeToFirstByteP50] ) diff --git a/stats/test/platform-routes.test.js b/stats/test/platform-routes.test.js index d1542dd..644c19e 100644 --- a/stats/test/platform-routes.test.js +++ b/stats/test/platform-routes.test.js @@ -1,37 +1,39 @@ import assert from 'node:assert' -import { getPgPools } from '@filecoin-station/spark-stats-db' +import { after, before, beforeEach, describe, it } from 'mocha' import { assertResponseStatus } from './test-helpers.js' import { createApp } from '../lib/app.js' import { getLocalDayAsISOString, today, yesterday } from '../lib/request-helpers.js' -import { givenDailyParticipants, givenDailyDesktopUsers } from '@filecoin-station/spark-stats-db/test-helpers.js' describe('Platform Routes HTTP request handler', () => { - /** @type {import('@filecoin-station/spark-stats-db').PgPools} */ - let pgPools + /** @type {import('fastify').FastifyInstance} */ let app /** @type {string} */ let baseUrl + let pgPools before(async () => { - pgPools = await getPgPools() + // Use test database connection strings + const DATABASE_URL = 'postgres://postgres:postgres@localhost:5432/spark_stats' + const EVALUATE_DB_URL = 'postgres://postgres:postgres@localhost:5432/spark_evaluate' - app = createApp({ + // Await the app creation since it's async + app = await createApp({ SPARK_API_BASE_URL: 'https://api.filspark.com/', - pgPools, + DATABASE_URL, + EVALUATE_DB_URL, logger: { level: process.env.DEBUG === '*' || process.env.DEBUG?.includes('test') ?
'debug' : 'error' } }) - - baseUrl = await app.listen() + pgPools = app.pg + baseUrl = await app.listen({ port: 0 }) // Use random port for tests }) after(async () => { - await app.close() - await pgPools.end() + await app?.close() }) beforeEach(async () => { @@ -49,9 +51,97 @@ describe('Platform Routes HTTP request handler', () => { await pgPools.stats.query('DELETE FROM daily_desktop_users') }) + // Helper functions updated to use app's database connections + const givenDailyMeasurementsSummary = async (summaryData) => { + const processedSummaryData = summaryData.map(row => ({ + day: row.day, + accepted_measurement_count: row.accepted_measurement_count ?? 100, + total_measurement_count: row.total_measurement_count ?? 120, + station_count: row.station_count ?? 10, + participant_address_count: row.participant_address_count ?? 5, + inet_group_count: row.inet_group_count ?? 8 + })) + + await pgPools.evaluate.query(` + INSERT INTO daily_platform_stats ( + day, + accepted_measurement_count, + total_measurement_count, + station_count, + participant_address_count, + inet_group_count + ) + SELECT + UNNEST($1::date[]) AS day, + UNNEST($2::int[]) AS accepted_measurement_count, + UNNEST($3::int[]) AS total_measurement_count, + UNNEST($4::int[]) AS station_count, + UNNEST($5::int[]) AS participant_address_count, + UNNEST($6::int[]) AS inet_group_count + ON CONFLICT DO NOTHING + `, [ + processedSummaryData.map(s => s.day), + processedSummaryData.map(s => s.accepted_measurement_count), + processedSummaryData.map(s => s.total_measurement_count), + processedSummaryData.map(s => s.station_count), + processedSummaryData.map(s => s.participant_address_count), + processedSummaryData.map(s => s.inet_group_count) + ]) + } + + const givenMonthlyActiveStationCount = async (month, stationCount) => { + await pgPools.evaluate.query(` + INSERT INTO monthly_active_station_count (month, station_count) + VALUES ($1, $2) + ON CONFLICT DO NOTHING + `, [ + month, + stationCount + ]) + } + + const 
givenDailyRewardTransferMetrics = async (day, transferStats) => { + await pgPools.stats.query(` + INSERT INTO daily_reward_transfers (day, to_address, amount, last_checked_block) + SELECT $1 AS day, UNNEST($2::text[]) AS to_address, UNNEST($3::int[]) AS amount, UNNEST($4::int[]) AS last_checked_block + ON CONFLICT DO NOTHING + `, [ + day, + transferStats.map(s => s.toAddress), + transferStats.map(s => s.amount), + transferStats.map(s => s.lastCheckedBlock) + ]) + } + + // Helper function for participants + const givenDailyParticipants = async (day, participantAddresses) => { + // This is a simplified implementation - you may need to adjust based on your actual schema + for (const address of participantAddresses) { + await pgPools.evaluate.query(` + WITH participant AS ( + INSERT INTO participants (participant_address) + VALUES ($1) + ON CONFLICT (participant_address) DO UPDATE SET participant_address = EXCLUDED.participant_address + RETURNING id + ) + INSERT INTO daily_participants (day, participant_id) + SELECT $2, id FROM participant + ON CONFLICT DO NOTHING + `, [address, day]) + } + } + + const givenDailyDesktopUsers = async (day, count) => { + await pgPools.stats.query(` + INSERT INTO daily_desktop_users (day, user_count) + VALUES ($1, $2) + ON CONFLICT DO NOTHING + `, [day, count]) + } + describe('GET /stations/daily', () => { it('returns daily station metrics for the given date range', async () => { - await givenDailyMeasurementsSummary(pgPools.evaluate, [ + await givenDailyMeasurementsSummary([ { day: '2024-01-10', station_count: 3 }, { day: '2024-01-11', station_count: 1 }, { day: '2024-01-12', station_count: 2 }, @@ -78,12 +168,12 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /stations/monthly', () => { it('returns monthly station metrics for the given date range', async () => { // before the date range - await givenMonthlyActiveStationCount(pgPools.evaluate, '2023-12-01', 10) + await 
givenMonthlyActiveStationCount('2023-12-01', 10) // in the date range - await givenMonthlyActiveStationCount(pgPools.evaluate, '2024-01-01', 3) - await givenMonthlyActiveStationCount(pgPools.evaluate, '2024-02-01', 1) + await givenMonthlyActiveStationCount('2024-01-01', 3) + await givenMonthlyActiveStationCount('2024-02-01', 1) // after the date range - await givenMonthlyActiveStationCount(pgPools.evaluate, '2024-03-01', 5) + await givenMonthlyActiveStationCount('2024-03-01', 5) const res = await fetch( new URL( @@ -104,7 +194,7 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /measurements/daily', () => { it('returns daily total accepted measurement count for the given date range', async () => { - await givenDailyMeasurementsSummary(pgPools.evaluate, [ + await givenDailyMeasurementsSummary([ { day: '2024-01-10', accepted_measurement_count: 5, total_measurement_count: 6 }, { day: '2024-01-11', accepted_measurement_count: 1, total_measurement_count: 2 }, { day: '2024-01-12', accepted_measurement_count: 3, total_measurement_count: 4 }, @@ -210,17 +300,17 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /transfers/daily', () => { it('returns daily total Rewards sent for the given date range', async () => { - await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-10', [ + await givenDailyRewardTransferMetrics('2024-01-10', [ { toAddress: 'to1', amount: 100, lastCheckedBlock: 1 } ]) - await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-11', [ + await givenDailyRewardTransferMetrics('2024-01-11', [ { toAddress: 'to2', amount: 150, lastCheckedBlock: 1 } ]) - await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-12', [ + await givenDailyRewardTransferMetrics('2024-01-12', [ { toAddress: 'to2', amount: 300, lastCheckedBlock: 1 }, { toAddress: 'to3', amount: 250, lastCheckedBlock: 1 } ]) - await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-13', [ + await 
givenDailyRewardTransferMetrics('2024-01-13', [ { toAddress: 'to1', amount: 100, lastCheckedBlock: 1 } ]) @@ -291,21 +381,21 @@ describe('Platform Routes HTTP request handler', () => { } it('returns top earning participants for the given date range', async () => { // First two dates should be ignored - await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-09', [ + await givenDailyRewardTransferMetrics('2024-01-09', [ { toAddress: 'address1', amount: 100, lastCheckedBlock: 1 }, { toAddress: 'address2', amount: 100, lastCheckedBlock: 1 }, { toAddress: 'address3', amount: 100, lastCheckedBlock: 1 } ]) - await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-10', [ + await givenDailyRewardTransferMetrics('2024-01-10', [ { toAddress: 'address1', amount: 100, lastCheckedBlock: 1 } ]) // These should be included in the results - await givenDailyRewardTransferMetrics(pgPools.stats, oneWeekAgo, [ + await givenDailyRewardTransferMetrics(oneWeekAgo, [ { toAddress: 'address2', amount: 150, lastCheckedBlock: 1 }, { toAddress: 'address1', amount: 50, lastCheckedBlock: 1 } ]) - await givenDailyRewardTransferMetrics(pgPools.stats, today(), [ + await givenDailyRewardTransferMetrics(today(), [ { toAddress: 'address3', amount: 200, lastCheckedBlock: 1 }, { toAddress: 'address2', amount: 100, lastCheckedBlock: 1 } ]) @@ -332,7 +422,7 @@ describe('Platform Routes HTTP request handler', () => { it('returns top earning participants for the given date range with no existing reward transfers', async () => { await setupScheduledRewardsData() - await givenDailyRewardTransferMetrics(pgPools.stats, today(), [ + await givenDailyRewardTransferMetrics(today(), [ { toAddress: 'address1', amount: 100, lastCheckedBlock: 1 } ]) @@ -367,11 +457,7 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /participants/summary', () => { it('counts participants', async () => { - await givenDailyParticipants( - pgPools.evaluate, - '2000-01-01', - ['0x1', '0x2', '0x3'] - ) + 
await givenDailyParticipants('2000-01-01', ['0x1', '0x2', '0x3']) const res = await fetch( new URL('/participants/summary', baseUrl), { @@ -391,35 +477,15 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /participants/accumulative/daily', () => { it('counts accumulative daily participants', async () => { // 3 new participants, out of range - await givenDailyParticipants( - pgPools.evaluate, - '1999-01-01', - ['0x10', '0x20', '0x30'] - ) + await givenDailyParticipants('1999-01-01', ['0x10', '0x20', '0x30']) // 3 new participants, 1 old participant -> 6 - await givenDailyParticipants( - pgPools.evaluate, - '2000-01-01', - ['0x1', '0x2', '0x3', '0x10'] - ) + await givenDailyParticipants('2000-01-01', ['0x1', '0x2', '0x3', '0x10']) // 0 new participants, 2 old participants - await givenDailyParticipants( - pgPools.evaluate, - '2000-01-02', - ['0x1', '0x2'] - ) + await givenDailyParticipants('2000-01-02', ['0x1', '0x2']) // 1 new participant, 1 old participant -> 7 - await givenDailyParticipants( - pgPools.evaluate, - '2000-01-03', - ['0x1', '0x4'] - ) + await givenDailyParticipants('2000-01-03', ['0x1', '0x4']) // 1 new participant, out of range - await givenDailyParticipants( - pgPools.evaluate, - '2000-01-04', - ['0x5'] - ) + await givenDailyParticipants('2000-01-04', ['0x5']) const res = await fetch( new URL('/participants/accumulative/daily?from=2000-01-01&to=2000-01-03', baseUrl), { @@ -442,28 +508,12 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /stations/desktop/daily', () => { it('counts daily desktop users', async () => { // out of range - await givenDailyDesktopUsers( - pgPools.stats, - '1999-01-01', - 10 - ) + await givenDailyDesktopUsers('1999-01-01', 10) // in range - await givenDailyDesktopUsers( - pgPools.stats, - '2000-01-01', - 30 - ) - await givenDailyDesktopUsers( - pgPools.stats, - '2000-01-03', - 20 - ) + await givenDailyDesktopUsers('2000-01-01', 30) + await givenDailyDesktopUsers('2000-01-03', 
20) // out of range - await givenDailyDesktopUsers( - pgPools.stats, - '2000-01-04', - 10 - ) + await givenDailyDesktopUsers('2000-01-04', 10) const res = await fetch( new URL('/stations/desktop/daily?from=2000-01-01&to=2000-01-03', baseUrl) @@ -477,64 +527,3 @@ describe('Platform Routes HTTP request handler', () => { }) }) }) - -const givenDailyMeasurementsSummary = async (pgPoolEvaluate, summaryData) => { - const processedSummaryData = summaryData.map(row => ({ - day: row.day, - accepted_measurement_count: row.accepted_measurement_count ?? 100, - total_measurement_count: row.total_measurement_count ?? 120, - station_count: row.station_count ?? 10, - participant_address_count: row.participant_address_count ?? 5, - inet_group_count: row.inet_group_count ?? 8 - })) - - await pgPoolEvaluate.query(` - INSERT INTO daily_platform_stats ( - day, - accepted_measurement_count, - total_measurement_count, - station_count, - participant_address_count, - inet_group_count - ) - SELECT - UNNEST($1::date[]) AS day, - UNNEST($2::int[]) AS accepted_measurement_count, - UNNEST($3::int[]) AS total_measurement_count, - UNNEST($4::int[]) AS station_count, - UNNEST($5::int[]) AS participant_address_count, - UNNEST($6::int[]) AS inet_group_count - ON CONFLICT DO NOTHING - `, [ - processedSummaryData.map(s => s.day), - processedSummaryData.map(s => s.accepted_measurement_count), - processedSummaryData.map(s => s.total_measurement_count), - processedSummaryData.map(s => s.station_count), - processedSummaryData.map(s => s.participant_address_count), - processedSummaryData.map(s => s.inet_group_count) - ]) -} - -const givenMonthlyActiveStationCount = async (pgPoolEvaluate, month, stationCount) => { - await pgPoolEvaluate.query(` - INSERT INTO monthly_active_station_count (month, station_count) - VALUES ($1, $2) - ON CONFLICT DO NOTHING - `, [ - month, - stationCount - ]) -} - -const givenDailyRewardTransferMetrics = async (pgPoolStats, day, transferStats) => { - await pgPoolStats.query(` - 
INSERT INTO daily_reward_transfers (day, to_address, amount, last_checked_block) - SELECT $1 AS day, UNNEST($2::text[]) AS to_address, UNNEST($3::int[]) AS amount, UNNEST($4::int[]) AS last_checked_block - ON CONFLICT DO NOTHING - `, [ - day, - transferStats.map(s => s.toAddress), - transferStats.map(s => s.amount), - transferStats.map(s => s.lastCheckedBlock) - ]) -}