Skip to content

feat: support on-demand retrieval tasks without affecting RSR scoring #134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
13 changes: 13 additions & 0 deletions lib/spark.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ export default class Spark {
participantAddress: Zinnia.walletAddress,
stationId: Zinnia.stationId,
}
if (task.isOnDemand) {
payload.excludeFromRSR = true
console.log('Note: This is an on-demand task; marking for RSR exclusion.')
}

console.log('%o', payload)
const res = await this.#fetch('https://api.filspark.com/measurements', {
method: 'POST',
Expand Down Expand Up @@ -299,6 +304,14 @@ export default class Spark {
}
console.error(err)
}
/**
* Public wrapper to inject an on-demand task
*
* @param {Task} task - A retrieval task with cid and minerId
*/
queueOnDemandRetrieval(task) {
return this.#tasker.queueOnDemandTask(task)
}
}

/**
Expand Down
29 changes: 29 additions & 0 deletions lib/tasker.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export class Tasker {
#lastRoundUrl
/** @type {Task[]} */
#remainingRoundTasks
#onDemandTasks
#fetch
#activity

Expand All @@ -35,10 +36,17 @@ export class Tasker {
// retrieval tasks we have already executed
this.#lastRoundUrl = 'unknown'
this.#remainingRoundTasks = []

this.#onDemandTasks = []
}

/** @returns {Task | undefined} */
async next() {
if (this.#onDemandTasks.length > 0) {
console.log('Returning on-demand retrieval task.')
return this.#onDemandTasks.pop()
}

await this.#updateCurrentRound()
return this.#remainingRoundTasks.pop()
}
Expand Down Expand Up @@ -88,6 +96,27 @@ export class Tasker {

this.#lastRoundUrl = roundUrl
}
/**
* Queue a retrieval task for immediate execution. This task will be processed
* before regular round-based tasks.
*
* @param {Task} task - A valid retrieval task ({ cid, minerId })
*/
queueOnDemandTask(task) {
if (
!task ||
typeof task.cid !== 'string' ||
typeof task.minerId !== 'string'
) {
throw new Error(
'Invalid on-demand task. Must include cid and minerId as strings.',
)
}
// for future improvements prevent duplicates or apply rate limiting
console.log('Queued on-demand task for miner:', task.minerId)
task.isOnDemand = true
this.#onDemandTasks.push(task)
}
}

const textEncoder = new TextEncoder()
Expand Down
12 changes: 12 additions & 0 deletions scripts/trigger-on-demand.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// this’s a manual testing tool useful for demonstration
import Spark from '../lib/spark.js'

const spark = new Spark()

const testTask = {
cid: 'bafyreih4wq2ljuzhnn6pzl7tny7khzekqjx7yp6h5rvfbx2hrwrtp6mpcq',
minerId: 't01234',
}

spark.queueOnDemandRetrieval(testTask)
console.log('On-demand task queued successfully.')
40 changes: 40 additions & 0 deletions test/spark.js
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,46 @@ test('submitRetrieval', async () => {
])
})

test('submitMeasurement adds excludeFromRSR for on-demand tasks', async () => {
let capturedPayload = null

const spark = new Spark({
fetch: async (url, allOpts) => {
if (url.includes('/measurements')) {
capturedPayload = JSON.parse(allOpts.body)
return {
status: 200,
ok: true,
async json() {
return { id: 456 }
},
}
}

// fallback response for anything else
return {
status: 200,
ok: true,
async json() {
return {}
},
headers: new Headers(),
}
},
})

const task = {
cid: 'bafytestondemand',
minerId: 't0999',
isOnDemand: true,
}
const stats = {}

await spark.submitMeasurement(task, stats)

assertEquals(capturedPayload.excludeFromRSR, true)
})

test('calculateDelayBeforeNextTask() returns value based on average task duration', () => {
const delay = calculateDelayBeforeNextTask({
lastTaskDurationInMs: 3_000,
Expand Down
29 changes: 27 additions & 2 deletions test/tasker.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

import { test } from 'zinnia:test'
import { assertEquals } from 'zinnia:assert'
import { getStationKey, getTaskKey, pickTasks } from '../lib/tasker.js'
import {
getStationKey,
getTaskKey,
pickTasksForNode,
Tasker,
} from '../lib/tasker.js'

const RANDOMNESS =
'fc90e50dcdf20886b56c038b30fa921a5e57c532ea448dadcc209e44eec0445e'
Expand Down Expand Up @@ -36,7 +41,7 @@ test('pickTasksForNode', async () => {
{ cid: 'bafytwo', minerId: 'f040' },
]

const selectedTasks = await pickTasks({
const selectedTasks = await pickTasksForNode({
tasks: allTasks,
stationId: 'some-station-id',
randomness: RANDOMNESS,
Expand All @@ -49,3 +54,23 @@ test('pickTasksForNode', async () => {
{ cid: 'bafytwo', minerId: 'f020' },
])
})
test('on-demand tasks are returned before regular tasks', async () => {
const dummyFetch = async () => ({
status: 302,
headers: new Map([['location', '/mock-round']]),
json: async () => ({
retrievalTasks: [{ cid: 'bafyregular', minerId: 't0999' }],
maxTasksPerNode: 1,
}),
})

const tasker = new Tasker({ fetch: dummyFetch })

// Add an on-demand task first
const onDemandTask = { cid: 'bafyondemand', minerId: 't01234' }
tasker.queueOnDemandTask(onDemandTask)

const task = await tasker.next()
assertEquals(task.cid, onDemandTask.cid)
assertEquals(task.minerId, onDemandTask.minerId)
})
Loading