diff --git a/lib/spark.js b/lib/spark.js index 86e725d..05662c0 100644 --- a/lib/spark.js +++ b/lib/spark.js @@ -230,6 +230,11 @@ export default class Spark { participantAddress: Zinnia.walletAddress, stationId: Zinnia.stationId, } + if (task.isOnDemand) { + payload.excludeFromRSR = true + console.log('Note: This is an on-demand task; marking for RSR exclusion.') + } + console.log('%o', payload) const res = await this.#fetch('https://api.filspark.com/measurements', { method: 'POST', @@ -299,6 +304,14 @@ export default class Spark { } console.error(err) } + /** + * Public wrapper to inject an on-demand task + * + * @param {Task} task - A retrieval task with cid and minerId + */ + queueOnDemandRetrieval(task) { + return this.#tasker.queueOnDemandTask(task) + } } /** diff --git a/lib/tasker.js b/lib/tasker.js index 765a5b9..e3b6731 100644 --- a/lib/tasker.js +++ b/lib/tasker.js @@ -13,6 +13,7 @@ export class Tasker { #lastRoundUrl /** @type {Task[]} */ #remainingRoundTasks + #onDemandTasks #fetch #activity @@ -35,10 +36,17 @@ export class Tasker { // retrieval tasks we have already executed this.#lastRoundUrl = 'unknown' this.#remainingRoundTasks = [] + + this.#onDemandTasks = [] } /** @returns {Task | undefined} */ async next() { + if (this.#onDemandTasks.length > 0) { + console.log('Returning on-demand retrieval task.') + return this.#onDemandTasks.pop() + } + await this.#updateCurrentRound() return this.#remainingRoundTasks.pop() } @@ -88,6 +96,27 @@ export class Tasker { this.#lastRoundUrl = roundUrl } + /** + * Queue a retrieval task for immediate execution. This task will be processed + * before regular round-based tasks. + * + * @param {Task} task - A valid retrieval task ({ cid, minerId }) + */ + queueOnDemandTask(task) { + if ( + !task || + typeof task.cid !== 'string' || + typeof task.minerId !== 'string' + ) { + throw new Error( + 'Invalid on-demand task. Must include cid and minerId as strings.', + ) + } + // for future improvements prevent duplicates or apply rate limiting + console.log('Queued on-demand task for miner:', task.minerId) + task.isOnDemand = true + this.#onDemandTasks.push(task) + } } const textEncoder = new TextEncoder() diff --git a/scripts/trigger-on-demand.js b/scripts/trigger-on-demand.js new file mode 100644 index 0000000..be6a04b --- /dev/null +++ b/scripts/trigger-on-demand.js @@ -0,0 +1,12 @@ +// this’s a manual testing tool useful for demonstration +import Spark from '../lib/spark.js' + +const spark = new Spark() + +const testTask = { + cid: 'bafyreih4wq2ljuzhnn6pzl7tny7khzekqjx7yp6h5rvfbx2hrwrtp6mpcq', + minerId: 't01234', +} + +spark.queueOnDemandRetrieval(testTask) +console.log('On-demand task queued successfully.') diff --git a/test/spark.js b/test/spark.js index 9e62718..23d7b10 100644 --- a/test/spark.js +++ b/test/spark.js @@ -415,6 +415,46 @@ test('submitRetrieval', async () => { ]) }) +test('submitMeasurement adds excludeFromRSR for on-demand tasks', async () => { + let capturedPayload = null + + const spark = new Spark({ + fetch: async (url, allOpts) => { + if (url.includes('/measurements')) { + capturedPayload = JSON.parse(allOpts.body) + return { + status: 200, + ok: true, + async json() { + return { id: 456 } + }, + } + } + + // fallback response for anything else + return { + status: 200, + ok: true, + async json() { + return {} + }, + headers: new Headers(), + } + }, + }) + + const task = { + cid: 'bafytestondemand', + minerId: 't0999', + isOnDemand: true, + } + const stats = {} + + await spark.submitMeasurement(task, stats) + + assertEquals(capturedPayload.excludeFromRSR, true) +}) + test('calculateDelayBeforeNextTask() returns value based on average task duration', () => { const delay = calculateDelayBeforeNextTask({ lastTaskDurationInMs: 3_000, diff --git a/test/tasker.test.js b/test/tasker.test.js index d783cfc..3a5c4f2 100644 --- a/test/tasker.test.js +++ b/test/tasker.test.js @@ -2,7 +2,12 @@ import { test } from 'zinnia:test' import { assertEquals } from 'zinnia:assert' -import { getStationKey, getTaskKey, pickTasks } from '../lib/tasker.js' +import { + getStationKey, + getTaskKey, + pickTasksForNode, + Tasker, +} from '../lib/tasker.js' const RANDOMNESS = 'fc90e50dcdf20886b56c038b30fa921a5e57c532ea448dadcc209e44eec0445e' @@ -36,7 +41,7 @@ test('pickTasksForNode', async () => { { cid: 'bafytwo', minerId: 'f040' }, ] - const selectedTasks = await pickTasks({ + const selectedTasks = await pickTasksForNode({ tasks: allTasks, stationId: 'some-station-id', randomness: RANDOMNESS, @@ -49,3 +54,23 @@ test('pickTasksForNode', async () => { { cid: 'bafytwo', minerId: 'f020' }, ]) }) +test('on-demand tasks are returned before regular tasks', async () => { + const dummyFetch = async () => ({ + status: 302, + headers: new Map([['location', '/mock-round']]), + json: async () => ({ + retrievalTasks: [{ cid: 'bafyregular', minerId: 't0999' }], + maxTasksPerNode: 1, + }), + }) + + const tasker = new Tasker({ fetch: dummyFetch }) + + // Add an on-demand task first + const onDemandTask = { cid: 'bafyondemand', minerId: 't01234' } + tasker.queueOnDemandTask(onDemandTask) + + const task = await tasker.next() + assertEquals(task.cid, onDemandTask.cid) + assertEquals(task.minerId, onDemandTask.minerId) +})