diff --git a/.gitignore b/.gitignore index 2c36486..2e8dfdf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ *.log node_modules/ .cache/ +.env +api/data/ \ No newline at end of file diff --git a/api/README b/api/README new file mode 100644 index 0000000..51ce525 --- /dev/null +++ b/api/README @@ -0,0 +1,124 @@ +# API Prototype PR + +## Overview + +This is a prototype PR of an API that manages cryptocurrency metadata, current prices, and timeseries data. The API uses: + +- **PM2 Cluster** for load balancing and auto-restart. +- **Local Caching** for metadata (managed in a dedicated module). +- **Elasticsearch** for historical timeseries data. +- **Redis** for current price data. + +## Features + +- **PM2 Cluster Setup** + The API is designed to run in a PM2 cluster. Instance 0 is responsible for fetching the initial metadata refresh (if enabled) while all instances serve the endpoints. + +- **Local Cache Management** + Metadata is cached locally in a file (e.g., `api/data/metadata.json`). The cache is maintained via a dedicated module (`cache/metadataCache.js`). + **Note:** The metadata refresh cron is now managed externally (e.g., via Coolify) using a separate script. + +- **Normalized PID Mapping** + Coin IDs (pids) are normalized using the helper `normalizeCoinId` (from `utils/index.js`) and a mapping is maintained between the input pid and its normalized value. A maximum of 100 pids per request is allowed. + +## Endpoints + +### 1. Metadata Route + +**GET** `/api/coins/metadata?pid=bitcoin,ethereum` + +- **Description:** + Retrieves metadata for the specified coin IDs. + +- **Process:** + - The API checks the local cache for each pid (using normalized values). + - If metadata for a pid is missing, it returns `null` (no fallback to Elasticsearch is performed). + +### 2. 
Current Price Route + +**GET** `/api/coins/current?pid=bitcoin,ethereum&withMetadata=true&withTTL=true` + +- **Description:** + Retrieves the current price of the specified coins from Redis. + +- **Options:** + - `withTTL=true`: Includes TTL information. + +- **Process:** + - The API queries Redis using keys formatted as `price_<pid>`. + +### 3. Timeseries Route + +**GET** `/api/coins/timeseries?pid=bitcoin,ethereum&startDate=2023-01-01&endDate=2023-01-31&scale=5m` +or +**GET** `/api/coins/timeseries?pid=bitcoin,ethereum&timestamp=1673000000` + +- **Description:** + Retrieves historical timeseries data for the specified coin IDs. + +- **Modes:** + - **Timestamp mode:** + If a `timestamp` (in seconds) is provided, the API returns, for each coin, the document whose `ts` is closest to that timestamp. + + - **Range mode:** + If `startDate` and `endDate` are provided, the API returns all matching records (grouped by coin). + + - **Scale mode:** + If a `scale` (e.g., "1m", "5m", "1h", "1d") is provided along with `startDate` and `endDate`, the API performs an aggregation using a `date_histogram`. + The response includes, for each coin, an array of objects per interval containing: + - `timestamp` (in seconds) + - `avg_price` + - `min_price` + - `max_price` + - `count` + + - **Note:** + A check is performed to avoid exceeding the maximum number of buckets (e.g., 5,000). If too many buckets would be created, an error is returned. + +### 4. Earliest Timestamp Route + +**GET** `/api/coins/earliest?pid=bitcoin,ethereum` + +- **Description:** + Retrieves, for each coin, the earliest available timeseries record (i.e., the record with the lowest timestamp). + +- **Process:** + - For each provided pid, the API queries Elasticsearch for the document with the smallest `ts`. + +### 5.
Percentage Change Route + +**GET** `/api/coins/percentage-change?pid=bitcoin,ethereum&timestamp=1656944730&period=3600&lookForward=true` + +- **Description:** + Calculates the percentage change in price for each coin between two points in time. + +- **Parameters:** + - `timestamp`: A reference timestamp in seconds (t₀). + - `period`: The period (in seconds) over which to calculate the change. + - `lookForward`: + - If `true`, the change is measured from t₀ to t₀ + period (i.e., forward in time). + - If `false` or not provided, the change is measured from t₀ to t₀ - period (i.e., backward in time). + +- **Process:** + - For each coin, the API retrieves the document closest to t₀ and another at t₀ + period (or t₀ - period), then calculates the percentage change as: + ``` + ((price_t1 - price_t0) / price_t0) * 100 + ``` + +## Setup Instructions + +1. Install dependencies: + ```sh + npm install + ``` + +2. Start the PM2 cluster: + ```sh + pm2 start ecosystem.config.js + ``` + +## Env Dependencies + ``` + REDIS_CLIENT_CONFIG= + COINS_ELASTICSEARCH_CONFIG= + ``` \ No newline at end of file diff --git a/api/cache/metadataCache.js b/api/cache/metadataCache.js new file mode 100644 index 0000000..8e9c897 --- /dev/null +++ b/api/cache/metadataCache.js @@ -0,0 +1,114 @@ +const fs = require('fs'); +const path = require('path'); +const { getClient } = require('../../db/elastic'); + +let localMetadataCache = {}; + +const DATA_DIR = path.join(__dirname, '../data'); +const METADATA_JSON_PATH = path.join(DATA_DIR, 'metadata.json'); + +function ensureCacheFileExists() { + if (!fs.existsSync(DATA_DIR)) { + fs.mkdirSync(DATA_DIR, { recursive: true }); + console.log("Data directory created:", DATA_DIR); + } + if (!fs.existsSync(METADATA_JSON_PATH)) { + fs.writeFileSync(METADATA_JSON_PATH, '{}', 'utf8'); + console.log("Created empty metadata file:", METADATA_JSON_PATH); + } +} + +function loadLocalMetadataCache() { + try { + ensureCacheFileExists(); + const data =
fs.readFileSync(METADATA_JSON_PATH, 'utf8'); + localMetadataCache = JSON.parse(data); + console.log("Local metadata cache reloaded."); + } catch (err) { + console.error("Error loading metadata cache file:", err); + localMetadataCache = {}; + } +} + +function saveLocalMetadataCache() { + ensureCacheFileExists(); + fs.writeFile( + METADATA_JSON_PATH, + JSON.stringify(localMetadataCache, null, 2), + 'utf8', + (err) => { + if (err) { + console.error("Failed to write local metadata cache file:", err); + } else { + console.log(`Cache saved to ${METADATA_JSON_PATH}`); + } + } + ); +} + + +function initCacheWatcher() { + loadLocalMetadataCache(); + + fs.watchFile(METADATA_JSON_PATH, { interval: 1000 }, (curr, prev) => { + if (curr.mtimeMs !== prev.mtimeMs) { + console.log("Detected change in metadata file, reloading cache..."); + loadLocalMetadataCache(); + } + }); +} + +function getLocalCache() { + return localMetadataCache; +} + +function updateLocalCache(newCache) { + localMetadataCache = newCache; + saveLocalMetadataCache(); +} + +async function getAllMetadata() { + const client = getClient(); + if (!client) { + throw new Error('No Elasticsearch client available.'); + } + + const pageSize = 100000; + const allDocs = []; + + let response = await client.search({ + index: 'coins-metadata', + scroll: '1m', + size: pageSize, + body: { query: { match_all: {} } } + }); + + while (response.hits && response.hits.hits.length > 0) { + allDocs.push(...response.hits.hits.map(hit => hit._source)); + response = await client.scroll({ + scroll_id: response._scroll_id, + scroll: '1m' + }); + } + return allDocs; +} + +async function refreshLocalMetadataFromES() { + const allData = await getAllMetadata(); + console.log(`Fetched ${allData.length} metadata documents from ES.`); + + const newCache = { ...localMetadataCache }; + for (const item of allData) { + if (item.pid) { + newCache[item.pid.toLowerCase()] = item; + } + } + updateLocalCache(newCache); +} + +module.exports = { + 
initCacheWatcher, + getLocalCache, + updateLocalCache, + refreshLocalMetadataFromES +}; diff --git a/api/controllers/coins.js b/api/controllers/coins.js new file mode 100644 index 0000000..093c764 --- /dev/null +++ b/api/controllers/coins.js @@ -0,0 +1,119 @@ +const { getCurrentCoin } = require('../services/getCurrentCoin') +const { getCoinsEarliest } = require('../services/getEarliestCoin') +const { getCoinMetadata } = require('../services/getMetadataCoin') +const { getTimeseries } = require('../services/getTimeseriesCoin') +const { getPercentageChange } = require('../services/getPercentageChangeCoin') + +function sendResponse(res, status, data) { + res.statusCode = status; + res.header('Content-Type', 'application/json'); + res.send(JSON.stringify(data)); +} + +/** + * GET /metadata + * Example: /api/coins/metadata?pid=bitcoin,ethereum + * This endpoint retrieves metadata from the local cache or Elasticsearch for the given pid(s). + */ +async function getCoinsMetadata(req, res) { + try { + const { pid } = req.query; + if (!pid) { + return sendResponse(res, 400, { error: "Missing 'pid' query parameter." }); + } + const data = getCoinMetadata({ pid }); + return sendResponse(res, 200, data); + } catch (error) { + return sendResponse(res, 400, { error: error.message || 'Internal Server Error' }); + } +} + +/** + * GET /current + * Example: /api/coins/current?pid=bitcoin,ethereum&withMetadata=true&withTTL=true + * This endpoint retrieves current coin data from Redis for the given pid(s). + * It can also include metadata or TTL if specified in the query. + */ +async function getCoinsCurrent(req, res) { + try { + const { pid, withTTL = 'false' } = req.query; + if (!pid) { + return sendResponse(res, 400, { error: "Missing 'pid' query parameter." 
}); + } + const includeTTL = (withTTL === 'true'); + const data = await getCurrentCoin(pid, { withTTL: includeTTL }); + return sendResponse(res, 200, data); + } catch (error) { + return sendResponse(res, 400, { error: error.message || 'Internal Server Error' }); + } +} + +/** + * GET /timeseries + * Example: /api/coins/timeseries?pid=bitcoin&startDate=2023-01-01&endDate=2023-01-31 + * Or: /api/coins/timeseries?pid=bitcoin,ethereum&timestamp=1673000000 + * + * pid is required. + * If 'timestamp' is provided, for each pid we return the single document whose 'ts' is closest to that timestamp. + * If 'timestamp' is not provided, we fetch all documents (optionally constrained by startDate/endDate), grouped by pid and sorted by ts ascending. + */ +async function getCoinsTimeseries(req, res) { + try { + const { pid, startDate, endDate, timestamp, scale } = req.query; + if (!pid) { + return sendResponse(res, 400, { error: "Missing 'pid' query parameter." }); + } + const data = await getTimeseries({ pid, startDate, endDate, timestamp, scale }); + return sendResponse(res, 200, data); + } catch (error) { + return sendResponse(res, 400, { error: error.message || 'Internal Server Error' }); + } +} + +/** + * GET /earliest + * Example: /api/coins/earliest?pid=bitcoin,ethereum + * Returns, for each coin, the earliest record (i.e. with the lowest timestamp). + */ +async function getCoinsFirstTimestamp(req, res) { + try { + const { pid } = req.query; + if (!pid) { + return sendResponse(res, 400, { error: "Missing 'pid' query parameter." }); + } + const data = await getCoinsEarliest({ pid }); + return sendResponse(res, 200, data); + } catch (error) { + return sendResponse(res, 400, { error: error.message || 'Internal Server Error' }); + } +} + +/** + * GET /percentage-change + * Example: /api/coins/percentage-change?pid=bitcoin,ethereum&timestamp=1656944730&period=3600&lookForward=true + * Parameters: + * - pid: list of coins. + * - timestamp: a reference timestamp in seconds.
+ * - period: period (in seconds) to calculate the change. + * - lookForward: if true, change is from t0 to t0 + period; otherwise from t0 to t0 - period. + */ +async function getCoinsPercentageChange(req, res) { + try { + const { pid, timestamp, period, lookForward } = req.query; + if (!pid || !timestamp || !period) { + return sendResponse(res, 400, { error: "Missing required query parameters: pid, timestamp, period." }); + } + const data = await getPercentageChange({ pid, timestamp, period, lookForward }); + return sendResponse(res, 200, data); + } catch (error) { + return sendResponse(res, 400, { error: error.message || 'Internal Server Error' }); + } +} + +module.exports = { + getCoinsMetadata, + getCoinsCurrent, + getCoinsTimeseries, + getCoinsFirstTimestamp, + getCoinsPercentageChange +}; \ No newline at end of file diff --git a/api/index.js b/api/index.js new file mode 100644 index 0000000..c2fee5c --- /dev/null +++ b/api/index.js @@ -0,0 +1,45 @@ +const { Server } = require('hyper-express'); +const fs = require('fs'); +const path = require('path'); +const coinsRoutes = require('./routes/coinsRoutes'); +const { refreshLocalMetadataFromES } = require('./cache/metadataCache'); + +const node_instance = process.env.NODE_APP_INSTANCE || 0; + +if (node_instance == 0) { + const dataDir = path.join(__dirname, 'data'); + const metadataFilePath = path.join(dataDir, 'metadata.json'); + if (!fs.existsSync(dataDir)) { + fs.mkdirSync(dataDir, { recursive: true }); + console.log("Data directory created:", dataDir); + } + if (!fs.existsSync(metadataFilePath)) { + fs.writeFileSync(metadataFilePath, '{}', 'utf8'); + console.log("Created empty metadata file:", metadataFilePath); + } + + const metadataContent = fs.readFileSync(metadataFilePath, 'utf8'); + if (metadataContent.trim() === '{}' || metadataContent.trim() === '') { + console.log("Metadata file is empty, refreshing from Elasticsearch..."); + refreshLocalMetadataFromES(); + } +} + +const app = new Server(); + 
+app.use(async (req, res, next) => { + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS'); + res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-Api-Key'); + if (req.method === 'OPTIONS') { + return res.send(200); + } + next(); +}); + +app.use('/api/coins', coinsRoutes); + +const PORT = process.env.PORT || 3000; +app.listen(PORT, () => { + console.log(`Server listening on port http://127.0.0.1:${PORT}/api/coins`); +}); diff --git a/api/routes/coinsRoutes.js b/api/routes/coinsRoutes.js new file mode 100644 index 0000000..3a9314a --- /dev/null +++ b/api/routes/coinsRoutes.js @@ -0,0 +1,12 @@ +const { Router } = require('hyper-express'); +const coins = require('../controllers/coins'); + +const router = new Router(); + +router.get('/metadata', coins.getCoinsMetadata); +router.get('/current', coins.getCoinsCurrent); +router.get('/timeseries', coins.getCoinsTimeseries); +router.get('/earliest', coins.getCoinsFirstTimestamp); +router.get('/percentage-change', coins.getCoinsPercentageChange); + +module.exports = router; diff --git a/api/services/getCurrentCoin.js b/api/services/getCurrentCoin.js new file mode 100644 index 0000000..735d835 --- /dev/null +++ b/api/services/getCurrentCoin.js @@ -0,0 +1,66 @@ +const { getMultipleKeyDetails } = require('../../db/redis'); +const { getCoinMetadata } = require('./getMetadataCoin') +const { getEffectivePids, parsePidMapping } = require('../../utils/index') + +/** + * Retrieve the current coin data from Redis. + * Always attach metadata using getCoinMetadata. + * Returns an object mapping the original PID to its coin data. 
+ */ +async function getCurrentCoin(pidString, options = {}) { + const { withTTL = false } = options; + const { mapping, normalizedPids } = parsePidMapping(pidString); + if (!normalizedPids.length) { + throw new Error("The 'pid' query parameter must contain at least one valid pid."); + } + + const metadataMap = getCoinMetadata({ pid: pidString }); + for (const originalPid in mapping) { + if (!metadataMap[originalPid]) { + metadataMap[originalPid] = {}; + } + } + + const unionCandidatesSet = new Set(); + for (const originalPid in mapping) { + const effectiveCandidates = getEffectivePids(originalPid, mapping, metadataMap); + effectiveCandidates.forEach(candidate => unionCandidatesSet.add(candidate)); + } + + const unionCandidates = Array.from(unionCandidatesSet); + const redisKeys = unionCandidates.map(pid => `price_${pid}`); + const coinsData = await getMultipleKeyDetails(redisKeys, withTTL); + + const coins = {}; + for (const originalPid in mapping) { + const candidates = getEffectivePids(originalPid, mapping, metadataMap); + let data = null; + for (const candidate of candidates) { + const redisKey = `price_${candidate}`; + if (coinsData[redisKey] && coinsData[redisKey].value && Object.keys(coinsData[redisKey].value).length > 0) { + data = coinsData[redisKey]; + break; + } + } + if (!data) { + data = { value: {}, ttl: null }; + } + const { price, confidence, source } = data.value; + const { address, symbol, decimals, chain } = metadataMap[originalPid] + coins[originalPid] = { + pid: originalPid, + address, + symbol, + chain, + decimals, + price, + confidence, + source, + ttl: withTTL ? 
data.ttl : undefined, + }; + } + + return { coins }; +} + +module.exports = { getCurrentCoin } \ No newline at end of file diff --git a/api/services/getEarliestCoin.js b/api/services/getEarliestCoin.js new file mode 100644 index 0000000..ad0c62b --- /dev/null +++ b/api/services/getEarliestCoin.js @@ -0,0 +1,75 @@ +const { getClient } = require('../../db/elastic'); +const { getCoinMetadata } = require('./getMetadataCoin') +const { getEffectivePids, parsePidMapping } = require('../../utils/index') + +function filterTimeseriesDoc(doc) { + if (!doc) return null; + return { + pid: doc.pid, + ts: doc.ts, + price: doc.price, + confidence: doc.confidence + }; +} + +async function getEarliestRecord(client, pid) { + const query = { + index: 'coins-timeseries-*', + size: 1, + body: { + query: { term: { pid } }, + sort: [{ ts: { order: 'asc' } }] + } + }; + const response = await client.search(query); + return filterTimeseriesDoc(response.hits?.hits[0]?._source) || null; +} + +/** + * Retrieve, for each coin (by original PID), the earliest record along with metadata. 
+ */ +async function getCoinsEarliest({ pid }) { + const client = getClient(); + if (!client) { + throw new Error('No Elasticsearch client available.'); + } + const { mapping, normalizedPids } = parsePidMapping(pid); + if (!normalizedPids.length) { + throw new Error("No valid 'pid' provided."); + } + + const metadataMap = getCoinMetadata({ pid }); + for (const originalPid in mapping) { + if (!metadataMap[originalPid]) { + metadataMap[originalPid] = {}; + } + } + + const coins = {}; + for (const originalPid in mapping) { + const effectiveCandidates = getEffectivePids(originalPid, mapping, metadataMap); + let earliestDoc = null; + for (const candidate of effectiveCandidates) { + earliestDoc = await getEarliestRecord(client, candidate); + if (earliestDoc) break; + } + + if (!earliestDoc) { + coins[originalPid] = { pid: originalPid }; + } else { + const { ts: timestamp, price } = earliestDoc; + coins[originalPid] = { + pid: originalPid, + address: metadataMap[originalPid].address, + symbol: metadataMap[originalPid].symbol, + chain: metadataMap[originalPid].chain, + decimals: metadataMap[originalPid].decimals, + timestamp, + price + }; + } + } + return { coins }; +} + +module.exports = { getCoinsEarliest } \ No newline at end of file diff --git a/api/services/getMetadataCoin.js b/api/services/getMetadataCoin.js new file mode 100644 index 0000000..5dfd0fc --- /dev/null +++ b/api/services/getMetadataCoin.js @@ -0,0 +1,37 @@ +const { initCacheWatcher, getLocalCache } = require('../cache/metadataCache'); +const { parsePidMapping } = require('../../utils/index') + +initCacheWatcher(); + +function filterMetadata(meta) { + if (!meta) return null; + return { + pid: meta.pid, + address: meta.address, + symbol: meta.symbol, + decimals: meta.decimals, + chain: meta.chain, + redirects: meta.redirects + }; +} + +/** + * Retrieve coin metadata from the local cache. + * Returns an object mapping the original PID (input) to its filtered metadata. 
+ * If metadata for a normalized PID is missing, the value is null. + */ +function getCoinMetadata(query) { + const { mapping, normalizedPids } = parsePidMapping(query.pid); + if (!normalizedPids.length) { + throw new Error("Parameter 'pid' is required for metadata."); + } + const localCache = getLocalCache(); + const resultMap = {}; + for (const originalPid in mapping) { + const normalized = mapping[originalPid]; + resultMap[originalPid] = filterMetadata(localCache[normalized]) || null; + } + return resultMap; +} + +module.exports = { getCoinMetadata } \ No newline at end of file diff --git a/api/services/getPercentageChangeCoin.js b/api/services/getPercentageChangeCoin.js new file mode 100644 index 0000000..6e59554 --- /dev/null +++ b/api/services/getPercentageChangeCoin.js @@ -0,0 +1,122 @@ +const { getClient } = require('../../db/elastic'); +const { getCoinMetadata } = require('./getMetadataCoin') +const { getEffectivePids, parsePidMapping } = require('../../utils/index') + +function filterTimeseriesDoc(doc) { + if (!doc) return null; + return { + pid: doc.pid, + ts: doc.ts, + price: doc.price, + confidence: doc.confidence + }; +} + +async function findClosestDocForPid(client, pid, timestampMs) { + const query = { + index: 'coins-timeseries-*', + size: 1, + body: { + query: { + bool: { + must: [{ term: { pid } }] + } + }, + sort: [ + { + _script: { + type: "number", + script: { + lang: "painless", + source: "Math.abs(doc['ts'].value.toInstant().toEpochMilli() - params.target)", + params: { target: timestampMs } + }, + order: "asc" + } + } + ] + } + }; + const response = await client.search(query); + return filterTimeseriesDoc(response.hits?.hits[0]?._source) || null; +} + +/** + * Retrieve percentage change in price between two timestamps. + * Parameters: + * - pid: list of coins. + * - timestamp: reference timestamp in seconds. + * - period: period in seconds to look forward (if lookForward is true) or backward. 
+ * - lookForward: if true, change is calculated from t0 to t0 + period, otherwise from t0 to t0 - period. + */ +async function getPercentageChange({ pid, timestamp, period, lookForward }) { + const client = getClient(); + if (!client) { + throw new Error('No Elasticsearch client available.'); + } + const { mapping, normalizedPids } = parsePidMapping(pid); + if (!normalizedPids.length) { + throw new Error("No valid 'pid' provided."); + } + + const t0 = parseInt(timestamp, 10) * 1000; + if (isNaN(t0)) { + throw new Error("Invalid timestamp format. It should be an integer in seconds."); + } + + const periodSec = Number(period); + if (isNaN(periodSec)) { + throw new Error("Invalid period format. It should be a number representing seconds."); + } + + const t1 = (lookForward === 'true' || lookForward === true) + ? t0 + periodSec * 1000 + : t0 - periodSec * 1000; + + const metadataMap = getCoinMetadata({ pid }); + for (const originalPid in mapping) { + if (!metadataMap[originalPid]) { + metadataMap[originalPid] = {}; + } + } + + const coins = {}; + for (const originalPid in mapping) { + const effectiveCandidates = getEffectivePids(originalPid, mapping, metadataMap); + let doc0 = null, doc1 = null; + for (const candidate of effectiveCandidates) { + doc0 = await findClosestDocForPid(client, candidate, t0); + if (doc0) break; + } + for (const candidate of effectiveCandidates) { + doc1 = await findClosestDocForPid(client, candidate, t1); + if (doc1) break; + } + + const { address, symbol, decimals, chain } = metadataMap[originalPid] + + if (!doc0 || !doc1 || doc0.price === 0) { + coins[originalPid] = { + pid: originalPid, + address, + symbol, + chain, + decimals, + percentageChange: null, + }; + } else { + const percentageChange = ((doc1.price - doc0.price) / doc0.price) * 100; + coins[originalPid] = { + pid: originalPid, + address, + symbol, + chain, + decimals, + percentageChange, + }; + } + } + return { coins }; +} + +module.exports = { getPercentageChange } \ No newline 
at end of file diff --git a/api/services/getTimeseriesCoin.js b/api/services/getTimeseriesCoin.js new file mode 100644 index 0000000..11e0d75 --- /dev/null +++ b/api/services/getTimeseriesCoin.js @@ -0,0 +1,222 @@ +const { getClient } = require('../../db/elastic'); +const { getCoinMetadata } = require('./getMetadataCoin') +const { getEffectivePids, parsePidMapping } = require('../../utils/index') + +function filterTimeseriesDoc(doc) { + if (!doc) return null; + return { + pid: doc.pid, + ts: doc.ts, + price: doc.price, + confidence: doc.confidence + }; +} + +async function findClosestDocForPid(client, pid, timestampMs) { + const query = { + index: 'coins-timeseries-*', + size: 1, + body: { + query: { + bool: { + must: [{ term: { pid } }] + } + }, + sort: [ + { + _script: { + type: "number", + script: { + lang: "painless", + source: "Math.abs(doc['ts'].value.toInstant().toEpochMilli() - params.target)", + params: { target: timestampMs } + }, + order: "asc" + } + } + ] + } + }; + const response = await client.search(query); + return filterTimeseriesDoc(response.hits?.hits[0]?._source) || null; +} + +/** + * Convert a scale string (e.g., "1m", "5m", "1h", "1d") to milliseconds. + */ +function scaleToMillis(scale) { + const match = scale.match(/^(\d+)([mhd])$/); + if (!match) { + throw new Error("Invalid scale format. Use e.g., '1m', '5m', '1h', or '1d'."); + } + const value = parseInt(match[1], 10); + const unit = match[2]; + switch (unit) { + case 'm': return value * 60 * 1000; + case 'h': return value * 60 * 60 * 1000; + case 'd': return value * 24 * 60 * 60 * 1000; + default: throw new Error("Unsupported time unit in scale."); + } +} + +/** + * Retrieve timeseries data: + * - If a timestamp (in seconds) is provided, for each PID return the document whose ts is closest. 
+ * - Otherwise, if startDate/endDate are provided as date strings (e.g., "2024-06-16"), + * convert them to UTC (start: T00:00:00Z, end: T23:59:59Z), convert to ms, + * perform a range query and group the matching documents by the original PID. + * Also, metadata is always attached. + */ +async function getTimeseries({ pid, startDate, endDate, timestamp, scale }) { + const client = getClient(); + if (!client) { + throw new Error('No Elasticsearch client available.'); + } + + const { mapping, normalizedPids } = parsePidMapping(pid); + if (!normalizedPids.length) { + throw new Error("No valid 'pid' provided."); + } + + const metadata = getCoinMetadata({ pid }); + for (const originalPid in mapping) { + if (!metadata[originalPid]) { + metadata[originalPid] = {}; + } + } + + let timeseriesData; + + // 1. Mode Timestamp + if (timestamp) { + const timestampMs = parseInt(timestamp, 10) * 1000; + if (isNaN(timestampMs)) { + throw new Error('Invalid timestamp format. It should be an integer in seconds.'); + } + const results = {}; + for (const originalPid in mapping) { + const candidates = getEffectivePids(originalPid, mapping, metadata); + let doc = null; + for (const candidate of candidates) { + doc = await findClosestDocForPid(client, candidate, timestampMs); + if (doc) break; + } + results[originalPid] = doc; + } + timeseriesData = results; + } + // 2. 
Mode Scale aggregation + else { + if (!scale) { + scale = "1h"; + } + + let startMs, endMs; + const now = Date.now(); + if (!startDate && !endDate) { + endMs = now; + startMs = now - 24 * 60 * 60 * 1000; + } else if (startDate && !endDate) { + startMs = new Date(startDate + "T00:00:00Z").getTime(); + endMs = now; + } else { + startMs = new Date(startDate + "T00:00:00Z").getTime(); + endMs = new Date(endDate + "T23:59:59Z").getTime(); + } + + const effectiveMap = {}; + const candidateToOriginal = {}; + for (const originalPid in mapping) { + const eff = getEffectivePids(originalPid, mapping, metadata); + effectiveMap[originalPid] = eff; + eff.forEach(candidate => { + candidateToOriginal[candidate] = originalPid; + }); + } + const unionCandidates = [...new Set(Object.values(effectiveMap).flat())]; + + const scaleMs = scaleToMillis(scale); + const expectedBuckets = Math.ceil((endMs - startMs) / scaleMs); + const maxBuckets = 5000; + if (expectedBuckets > maxBuckets) { + throw new Error(`Too many buckets expected (${expectedBuckets}, limit: ${maxBuckets}). 
Please use a larger scale.`); + } + + const queryBody = { + query: { + bool: { + must: [ + { range: { ts: { gte: startMs, lte: endMs } } }, + { terms: { pid: unionCandidates } } + ] + } + }, + aggs: { + by_pid: { + terms: { + field: "pid", + size: unionCandidates.length + }, + aggs: { + by_interval: { + date_histogram: { + field: "ts", + fixed_interval: scale, + min_doc_count: 0, + extended_bounds: { min: startMs, max: endMs } + }, + aggs: { + avg_price: { avg: { field: "price" } }, + min_price: { min: { field: "price" } }, + max_price: { max: { field: "price" } }, + avg_confidence: { avg: { field: "confidence" } } + } + } + } + } + }, + size: 0 + }; + + const resp = await client.search({ + index: 'coins-timeseries-*', + body: queryBody + }); + + const buckets = resp.aggregations?.by_pid?.buckets || []; + const results = {}; + for (const bucket of buckets) { + const bucketPid = bucket.key; + const originalPid = Object.keys(mapping).find(key => getEffectivePids(key, mapping, metadata)[0] === bucketPid) || bucketPid; + results[originalPid] = bucket.by_interval.buckets.map(b => ({ + timestamp: b.key / 1000, + avg_price: b.avg_price.value, + min_price: b.min_price.value, + max_price: b.max_price.value, + confidence: b.avg_confidence.value, + count: b.doc_count + })); + } + for (const originalPid in mapping) { + if (!results[originalPid]) results[originalPid] = []; + } + timeseriesData = results; + } + + const coins = {}; + for (const originalPid in mapping) { + const { address, symbol, decimals, chain } = metadata[originalPid] + coins[originalPid] = { + pid: originalPid, + address, + symbol, + chain, + decimals, + timeseries: timeseriesData[originalPid] || [] + }; + } + + return { coins }; +} + +module.exports = { getTimeseries } \ No newline at end of file diff --git a/db/elastic.js b/db/elastic.js index 7cbcf1c..97a7161 100644 --- a/db/elastic.js +++ b/db/elastic.js @@ -45,4 +45,41 @@ function addYearAndMonth(index) { return 
`${index}-${date.getUTCFullYear()}-${date.getUTCMonth()}` } -module.exports = { getClient, writeLog } \ No newline at end of file +async function getMetadataForPids(pids) { + const client = getClient(); + if (!client) { + console.error('No Elasticsearch client available.'); + return {}; + } + + try { + const result = await client.search({ + index: 'coins-metadata', + size: pids.length, + body: { + query: { + bool: { + should: pids.map(pid => ({ term: { pid } })), + minimum_should_match: 1 + } + } + } + }); + + const metadataMap = {}; + if (result.hits && result.hits.hits.length) { + for (const hit of result.hits.hits) { + const src = hit._source; + if (src && src.pid) { + metadataMap[src.pid] = src; + } + } + } + return metadataMap; + } catch (error) { + console.error('Error in getMetadataForPids:', error); + return {}; + } +} + +module.exports = { getClient, writeLog, getMetadataForPids } \ No newline at end of file diff --git a/db/redis.js b/db/redis.js new file mode 100644 index 0000000..1abc629 --- /dev/null +++ b/db/redis.js @@ -0,0 +1,66 @@ +const Redis = require("ioredis"); + +let redisClient; + +function getRedis() { + if (!redisClient) { + const redisConfig = process.env.REDIS_CLIENT_CONFIG; + if (!redisConfig) { + throw new Error("Missing REDIS_CLIENT_CONFIG"); + } + const [host, port, password] = redisConfig.split("---"); + redisClient = new Redis({ + host, + port: Number(port), + password + }); + } + return redisClient; +} + +async function listKeys() { + const client = getRedis(); + try { + const keys = await client.keys("*"); + console.log("Keys in Redis:", keys); + return keys; + } catch (error) { + console.error("Error listing keys:", error); + throw error; + } +} + +async function getKeyDetails(key, withTTL = false) { + const client = getRedis(); + try { + const value = await client.hgetall(key); + let ttl = null; + if (withTTL) { + ttl = await client.ttl(key); + } + return withTTL ? 
{ value, ttl } : { value }; + } catch (error) { + console.error(`Error getting key ${key}:`, error); + throw error; + } +} + +async function getMultipleKeyDetails(keys, withTTL = false) { + const client = getRedis(); + try { + const results = {}; + for (const key of keys) { + const value = await client.hgetall(key); + results[key] = { value }; + if (withTTL) { + results[key].ttl = await client.ttl(key); + } + } + return results; + } catch (error) { + console.error("Error getting multiple keys:", error); + throw error; + } +} + +module.exports = { getRedis, listKeys, getKeyDetails, getMultipleKeyDetails }; diff --git a/ecosystem.config.js b/ecosystem.config.js new file mode 100644 index 0000000..cde479f --- /dev/null +++ b/ecosystem.config.js @@ -0,0 +1,15 @@ +module.exports = { + apps: [ + { + name: 'coins-api', + script: 'api/index.js', + instances: '4', + exec_mode: 'cluster', + autorestart: true, + watch: false, + env: { + NODE_ENV: 'production' + } + } + ] +}; diff --git a/package-lock.json b/package-lock.json index d240a62..c85dfec 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "hyper-express": "^6.17.3", "ioredis": "^5.5.0", "kafkajs": "^2.2.4", + "node-cron": "^3.0.3", "pm2": "^5.4.3", "progress": "^2.0.3" } @@ -3088,6 +3089,27 @@ "node": ">= 0.4.0" } }, + "node_modules/node-cron": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-3.0.3.tgz", + "integrity": "sha512-dOal67//nohNgYWb+nWmg5dkFdIwDm8EpeGYMekPMrngV3637lqnX0lbUcCtgibHTz6SEz7DAIjKvKDFYCnO1A==", + "license": "ISC", + "dependencies": { + "uuid": "8.3.2" + }, + "engines": { + "node": ">=6.0.0" + } + }, + "node_modules/node-cron/node_modules/uuid": { + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", + "license": "MIT", + "bin": { + "uuid": "dist/bin/uuid" + } + }, 
"node_modules/normalize-path": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/normalize-path/-/normalize-path-3.0.0.tgz", diff --git a/package.json b/package.json index 1e73c5d..5582b3e 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,8 @@ "version": "1.0.0", "main": "index.js", "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" + "test": "echo \"Error: no test specified\" && exit 1", + "start": "node api/index.js" }, "repository": { "type": "git", @@ -24,6 +25,7 @@ "hyper-express": "^6.17.3", "ioredis": "^5.5.0", "kafkajs": "^2.2.4", + "node-cron": "^3.0.3", "pm2": "^5.4.3", "progress": "^2.0.3" } diff --git a/utils/index.js b/utils/index.js index 2145dd0..5e6f59f 100644 --- a/utils/index.js +++ b/utils/index.js @@ -1,4 +1,3 @@ - function normalizeCoinId(coinId) { coinId = coinId.toLowerCase() const replaceSubStrings = ['asset#', 'coingecko#', 'coingecko:', 'ethereum:'] @@ -20,7 +19,40 @@ async function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)) } +function parsePidMapping(pidInput) { + let inputArray = []; + if (typeof pidInput === 'string') { + inputArray = pidInput.split(',').map(p => p.trim()).filter(Boolean); + } else if (Array.isArray(pidInput)) { + inputArray = pidInput.map(p => p.trim()).filter(Boolean); + } + if (inputArray.length > 100) { + throw new Error("Maximum of 100 PID tokens allowed."); + } + const mapping = {}; + const normalizedPids = inputArray.map(pid => { + const normalized = normalizeCoinId(pid); + mapping[pid] = normalized; + return normalized; + }); + return { mapping, normalizedPids }; +} + +function getEffectivePids(originalPid, mapping, metadata) { + const normalized = mapping[originalPid]; + let candidates = []; + const meta = metadata[originalPid]; + if (meta && Array.isArray(meta.redirects) && meta.redirects.length > 0) { + candidates = [...meta.redirects]; + } + candidates.push(normalized); + return [...new Set(candidates)]; +} + + module.exports = { normalizeCoinId, 
sleep, + parsePidMapping, + getEffectivePids } \ No newline at end of file