Use @fastify/fastify-postgres. CheckerNetwork/roadmap#220 #341

Open: wants to merge 15 commits into main (showing changes from 7 commits)

2 changes: 1 addition & 1 deletion db/package.json
@@ -21,4 +21,4 @@
"mocha"
]
}
}
}
Member:
Suggested change
}
}

Member:
ping @Goddhi

22 changes: 18 additions & 4 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion stats/bin/migrate.js
@@ -6,4 +6,4 @@ import {

Member:
please undo this change to keep the diff clean

Author:
Alright, noted

const pgPools = await getPgPools()
await migrateStatsDB(pgPools.stats)
await migrateEvaluateDB(pgPools.evaluate)
await migrateEvaluateDB(pgPools.evaluate)
Member:
Suggested change
await migrateEvaluateDB(pgPools.evaluate)
await migrateEvaluateDB(pgPools.evaluate)

11 changes: 7 additions & 4 deletions stats/bin/spark-stats.js
@@ -1,23 +1,26 @@
import '../lib/instrument.js'
import { createApp } from '../lib/app.js'
import { getPgPools } from '@filecoin-station/spark-stats-db'

const {
PORT = '8080',
HOST = '127.0.0.1',
SPARK_API_BASE_URL = 'https://api.filspark.com/',
REQUEST_LOGGING = 'true'
REQUEST_LOGGING = 'true',
DATABASE_URL,
EVALUATE_DB_URL
Contributor:
Let's add default values for both env variables:

Suggested change
DATABASE_URL,
EVALUATE_DB_URL
DATABASE_URL = 'postgres://localhost:5432/spark_stats',
EVALUATE_DB_URL = 'postgres://localhost:5432/spark_evaluate'

} = process.env

const pgPools = await getPgPools()

const app = await createApp({
SPARK_API_BASE_URL,
pgPools,
DATABASE_URL,
EVALUATE_DB_URL,
logger: {
level: ['1', 'true'].includes(REQUEST_LOGGING) ? 'info' : 'error'
}
})

console.log('Starting the http server on host %j port %s', HOST, PORT)
const baseUrl = app.listen({ port: Number(PORT), host: HOST })
console.log(baseUrl)

30 changes: 22 additions & 8 deletions stats/lib/app.js
@@ -2,28 +2,41 @@ import * as Sentry from '@sentry/node'
import Fastify from 'fastify'
import cors from '@fastify/cors'
import urlData from '@fastify/url-data'
import fastifyPostgres from '@fastify/postgres'

import { addRoutes } from './routes.js'
import { addPlatformRoutes } from './platform-routes.js'

/** @typedef {import('@filecoin-station/spark-stats-db').PgPools} PgPools */
/** @typedef {import('./typings.js').DateRangeFilter} DateRangeFilter */

/**
* @param {object} args
* @param {string} args.SPARK_API_BASE_URL
* @param {import('@filecoin-station/spark-stats-db').PgPools} args.pgPools
* @param {Fastify.FastifyLoggerOptions} args.logger
* @returns
* @param {string} args.DATABASE_URL - Connection string for stats database
* @param {string} args.EVALUATE_DB_URL - Connection string for evaluate database
* @param {import('fastify').FastifyLoggerOptions} args.logger
* @returns {Promise<import('fastify').FastifyInstance>}
*/
export const createApp = ({

export const createApp = async ({
SPARK_API_BASE_URL,
pgPools,
DATABASE_URL,
EVALUATE_DB_URL,
logger
}) => {
const app = Fastify({ logger })
Sentry.setupFastifyErrorHandler(app)

await app.register(fastifyPostgres, {
connectionString: DATABASE_URL,
name: 'stats'
})

await app.register(fastifyPostgres, {
connectionString: EVALUATE_DB_URL,
name: 'evaluate',
})

app.register(cors, {
origin: [
'http://localhost:3000',
@@ -38,11 +51,12 @@ export const createApp = ({
]
})
app.register(urlData)
addRoutes(app, pgPools, SPARK_API_BASE_URL)
addPlatformRoutes(app, pgPools)
addRoutes(app, SPARK_API_BASE_URL)
addPlatformRoutes(app)
app.get('/', (request, reply) => {
reply.send('OK')
})

return app
}
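
For context on the review discussion below: once @fastify/postgres is registered with a name, it decorates the Fastify instance, so the pools become reachable as app.pg.stats and app.pg.evaluate (and as request.server.pg inside handlers). A minimal sketch of that usage, not part of this PR; the route path and query are placeholders:

// Sketch only (assumed usage of the named @fastify/postgres decorators, not PR code):
app.get('/example', async (request, reply) => {
  // request.server is the Fastify instance, so route handlers and plugins
  // can reach the same named pools registered in createApp()
  const { rows } = await request.server.pg.evaluate.query('SELECT 1 AS ok') // placeholder query
  reply.send(rows)
})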

22 changes: 11 additions & 11 deletions stats/lib/platform-routes.js
@@ -14,39 +14,39 @@ import { filterPreHandlerHook, filterOnSendHook } from './request-helpers.js'

/** @typedef {import('./typings.js').RequestWithFilter} RequestWithFilter */

export const addPlatformRoutes = (app, pgPools) => {

export const addPlatformRoutes = (app) => {
app.register(async app => {
app.addHook('preHandler', filterPreHandlerHook)
app.addHook('onSend', filterOnSendHook)

app.get('/stations/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchDailyStationCount(pgPools.evaluate, request.filter))
})
app.get('/stations/monthly', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchMonthlyStationCount(pgPools.evaluate, request.filter))
reply.send(await fetchMonthlyStationCount(request.server.pg, request.filter))
Contributor:
Suggested change
reply.send(await fetchMonthlyStationCount(request.server.pg, request.filter))
reply.send(await fetchMonthlyStationCount(request.server.pg.evaluate, request.filter))

We could also pass the correct database instance instead of passing the whole pg object. In that case we don't need to change the fetchers, as sketched below.
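
A rough sketch of that alternative (illustration only, not code from this PR): the route hands over the specific named pool, and the fetcher keeps its existing (pgPool, filter) signature:

// Sketch only: pass the named pool so the fetcher stays a plain Queryable consumer
app.get('/stations/monthly', async (/** @type {RequestWithFilter} */ request, reply) => {
  reply.send(await fetchMonthlyStationCount(request.server.pg.evaluate, request.filter))
})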

Author:
This throws an error, which is why I went with only the pg object; besides, the fetcher functions expect the full pg object and use pg.stats or pg.evaluate internally.

Member:
What error does it throw?

Member:
ping @Goddhi

})
app.get('/stations/desktop/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchDailyDesktopUsers(pgPools.stats, request.filter))
reply.send(await fetchDailyDesktopUsers(request.server.pg, request.filter))
})
app.get('/measurements/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchDailyStationMeasurementCounts(pgPools.evaluate, request.filter))
reply.send(await fetchDailyStationMeasurementCounts(request.server.pg, request.filter))
})
app.get('/participants/top-measurements', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchParticipantsWithTopMeasurements(pgPools.evaluate, request.filter))
reply.send(await fetchParticipantsWithTopMeasurements(request.server.pg, request.filter))
})
app.get('/participants/top-earning', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchTopEarningParticipants(pgPools.stats, request.filter))
})
reply.send(await fetchTopEarningParticipants(request.server.pg, request.filter)) })

app.get('/participants/accumulative/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchAccumulativeDailyParticipantCount(pgPools.evaluate, request.filter))
reply.send(await fetchAccumulativeDailyParticipantCount(request.server.pg, request.filter))
})
app.get('/transfers/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchDailyRewardTransfers(pgPools.stats, request.filter))
reply.send(await fetchDailyRewardTransfers(request.server.pg, request.filter))
})
})

app.get('/participants/summary', async (request, reply) => {
reply.header('cache-control', `public, max-age=${24 * 3600 /* one day */}`)
reply.send(await fetchParticipantsSummary(pgPools.evaluate))
reply.send(await fetchParticipantsSummary(request.server.pg.evaluate))
})
}
66 changes: 37 additions & 29 deletions stats/lib/platform-stats-fetchers.js
@@ -1,16 +1,24 @@
import assert from 'http-assert'
import { today, yesterday } from './request-helpers.js'

/** @typedef {import('@filecoin-station/spark-stats-db').Queryable} Queryable */
/**
@typedef {import('./typings.js').DateRangeFilter} DateRangeFilter
@typedef {import('@filecoin-station/spark-stats-db').Queryable} Queryable
@typedef {import('./typings.js').FastifyPg} FastifyPg
*/

/**
* @param {FastifyPg} pg - Fastify pg object with database connections
*/
Member:
Suggested change
/**
* @param {FastifyPg} pg - Fastify pg object with database connections
*/

Member:
ping @Goddhi


const ONE_DAY = 24 * 60 * 60 * 1000

/**
* @param {Queryable} pgPool
* @param {FastifyPg} pg
* @param {import('./typings.js').DateRangeFilter} filter
*/
export const fetchDailyStationCount = async (pgPool, filter) => {
const { rows } = await pgPool.query(`
export const fetchDailyStationCount = async (pg, filter) => {
Contributor:
If we pass the correct database instance, we don't need to change this handler.

Member:
ping @Goddhi

const { rows } = await pg.evaluate.query(`
SELECT day::TEXT, station_count
FROM daily_platform_stats
WHERE day >= $1 AND day <= $2
@@ -20,11 +28,11 @@ export const fetchDailyStationCount = async (pgPool, filter) => {
}

/**
* @param {Queryable} pgPool
* @param {FastifyPg} pg
* @param {import('./typings.js').DateRangeFilter} filter
*/
export const fetchMonthlyStationCount = async (pgPool, filter) => {
const { rows } = await pgPool.query(`
export const fetchMonthlyStationCount = async (pg, filter) => {
const { rows } = await pg.evaluate.query(`
SELECT month::TEXT, station_count
FROM monthly_active_station_count
WHERE
@@ -36,11 +44,11 @@ export const fetchMonthlyStationCount = async (pgPool, filter) => {
}

/**
* @param {Queryable} pgPool
* @param {FastifyPg} pg
* @param {import('./typings.js').DateRangeFilter} filter
*/
export const fetchDailyStationMeasurementCounts = async (pgPool, filter) => {
const { rows } = await pgPool.query(`
export const fetchDailyStationMeasurementCounts = async (pg, filter) => {
const { rows } = await pg.evaluate.query(`
SELECT day::TEXT, accepted_measurement_count, total_measurement_count
FROM daily_platform_stats
WHERE day >= $1 AND day <= $2
@@ -50,31 +58,31 @@ export const fetchDailyStationMeasurementCounts = async (pgPool, filter) => {
}

/**
* @param {Queryable} pgPool
* @param {FastifyPg} pg
* @param {import('./typings.js').DateRangeFilter} filter
*/
export const fetchParticipantsWithTopMeasurements = async (pgPool, filter) => {
export const fetchParticipantsWithTopMeasurements = async (pg, filter) => {
assert(filter.to === filter.from, 400, 'Multi-day queries are not supported for this endpoint')
assert(filter.to === yesterday(), 400, 'filter.to must be set to yesterday, other values are not supported yet')
// Ignore the filter for this query
// Get the top measurement stations from the Materialized View
return (await pgPool.query(`
return (await pg.evaluate.query(`
SELECT day::TEXT, participant_address, station_count, accepted_measurement_count, inet_group_count
FROM top_measurement_participants_yesterday_mv
`)).rows
}

/**
* @param {Queryable} pgPool
* @param {FastifyPg} pg
* @param {import('./typings.js').DateRangeFilter} filter
*/
export const fetchDailyRewardTransfers = async (pgPool, filter) => {
export const fetchDailyRewardTransfers = async (pg, filter) => {
assert(
new Date(filter.to).getTime() - new Date(filter.from).getTime() <= 31 * ONE_DAY,
400,
'Date range must be 31 days max'
)
const { rows } = await pgPool.query(`
const { rows } = await pg.stats.query(`
SELECT day::TEXT, to_address, amount
FROM daily_reward_transfers
WHERE day >= $1 AND day <= $2
@@ -99,11 +107,11 @@ export const fetchDailyRewardTransfers = async (pgPool, filter) => {
}

/**
* @param {Queryable} pgPool
* @param {FastifyPg} pg
* @param {import('./typings.js').DateRangeFilter} filter
*/
export const fetchAccumulativeDailyParticipantCount = async (pgPool, filter) => {
const { rows } = await pgPool.query(`
export const fetchAccumulativeDailyParticipantCount = async (pg, filter) => {
const { rows } = await pg.evaluate.query(`
WITH first_appearance AS (
SELECT participant_id, MIN(day) as day
FROM daily_participants
@@ -126,15 +134,15 @@ export const fetchAccumulativeDailyParticipantCount = async (pgPool, filter) =>
}

/**
* @param {Queryable} pgPool
* @param {FastifyPg} pg
* @param {import('./typings.js').DateRangeFilter} filter
*/
export const fetchTopEarningParticipants = async (pgPool, filter) => {
export const fetchTopEarningParticipants = async (pg, filter) => {
// The query combines "transfers until filter.to" with "latest scheduled rewards as of today".
// As a result, it produces incorrect result if `to` is different from `now()`.
// See https://github.com/filecoin-station/spark-stats/pull/170#discussion_r1664080395
assert(filter.to === today(), 400, 'filter.to must be today, other values are not supported')
const { rows } = await pgPool.query(`
const { rows } = await pg.stats.query(`
WITH latest_scheduled_rewards AS (
SELECT DISTINCT ON (participant_address) participant_address, scheduled_rewards
FROM daily_scheduled_rewards
@@ -154,10 +162,10 @@ export const fetchTopEarningParticipants = async (pgPool, filter) => {
}

/**
* @param {Queryable} pgPool
* @param {FastifyPg} pg
*/
export const fetchParticipantsSummary = async (pgPool) => {
const { rows } = await pgPool.query(`
export const fetchParticipantsSummary = async (pg) => {
const { rows } = await pg.evaluate.query(`
SELECT COUNT(DISTINCT participant_id) FROM daily_participants
`)
return {
@@ -166,11 +174,11 @@ export const fetchParticipantsSummary = async (pgPool) => {
}

/**
* @param {Queryable} pgPool
* @param {FastifyPg} pg
* @param {import('./typings.js').DateRangeFilter} filter
*/
export const fetchDailyDesktopUsers = async (pgPool, filter) => {
const { rows } = await pgPool.query(`
export const fetchDailyDesktopUsers = async (pg, filter) => {
const { rows } = await pg.stats.query(`
SELECT
day::TEXT,
user_count
@@ -180,4 +188,4 @@
[filter.from, filter.to])

return rows
}
}
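
The FastifyPg typedef referenced throughout these fetchers comes from stats/lib/typings.js, which is not shown in this part of the diff. As an illustration only (an assumption, not the PR's actual definition), it presumably describes the pg decorator with one named connection per database, roughly:

// Hypothetical sketch of the FastifyPg typedef (typings.js is not included in this diff)
/**
 * @typedef {object} FastifyPg
 * @property {Queryable} stats - named @fastify/postgres connection for the stats database
 * @property {Queryable} evaluate - named @fastify/postgres connection for the evaluate database
 */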