Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Active interface #62

Open
wants to merge 16 commits into
base: mainnet
Choose a base branch
from
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
"dependencies": {
"@crustio/type-definitions": "1.1.0",
"@polkadot/api": "4.2.2-4",
"@types/express": "^4.17.13",
"@types/seedrandom": "^2.4.28",
"axios": "^0.21.1",
"bignumber.js": "^9.0.1",
"bluebird": "^3.7.2",
"dayjs": "^1.10.6",
"express": "^4.17.2",
"fs-extra": "^9.1.0",
"ipfs-http-client": "^48.1.3",
"joi": "^17.4.1",
Expand Down
2 changes: 1 addition & 1 deletion src/chain/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ export default class CrustApi {
* @returns Option<MarketFileInfo>
* @throws ApiPromise error or type conversing error
*/
async maybeGetFileUsedInfo(cid: string): Promise<MarketFileInfo | null> {
public async maybeGetFileUsedInfo(cid: string): Promise<MarketFileInfo | null> {
await this.withApiReady();

try {
Expand Down
67 changes: 67 additions & 0 deletions src/http/http-interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import express from 'express';
import { Logger } from 'winston';
import { AppContext } from '../types/context';
import * as bodyParser from 'body-parser';
import { FileInfo } from '../chain';
import { createFileOrderOperator } from '../db/file-record';
import { bytesToMb } from '../utils';
import { ChainFileInfo } from '../types/chain';
import BigNumber from 'bignumber.js';

export async function startHttp(
context: AppContext,
loggerParent: Logger,
): Promise<void> {
const logger = loggerParent.child({ moduleId: "http" });
const app = express();
const PORT = 42087;
const fileOrderOp = createFileOrderOperator(context.database);

app.use(bodyParser.json({ limit: '50mb' }));
app.use(bodyParser.urlencoded({ limit: '50mb', extended: true }));
app.use(bodyParser.json());

logger.info("Configure smanager interface");

app.post('/api/v0/insert', async (req, res) => {
try {
const cid = req.body['cid'];
if (!cid) {
return res.status(400).send('please provide cid in the request');
}
logger.info("try to insert wanted file: %s", cid);

const file: any = await context.api.chainApi().query.market.files(cid); // eslint-disable-line
if (file.isEmpty) {
logger.warn('wanted file %s not exist on chain', cid);
return res.status(400).send(`wanted file ${cid} not exist on chain`);
}

const fi = file.toJSON() as any; // eslint-disable-line
const fileInfo = {
...fi,
amount: new BigNumber(fi.amount.toString()),
} as ChainFileInfo;

logger.info(`wanted file chain info: ${JSON.stringify(fileInfo)}`);

const sfi: FileInfo[] = [{
cid,
size: bytesToMb(fileInfo.file_size),
tips: fileInfo.amount.toNumber(),
owner: null,
replicas: fileInfo.reported_replica_count,
expiredAt: fileInfo.expired_at,
}];

await fileOrderOp.addFiles(sfi, 'wanted', true);
return res.status(200).send(`insert wanted file: ${cid} success`);
} catch (e) {
return res.status(500).send("internal server error");
}
});

app.listen(PORT, () => {
logger.info(`Smanager interface run on http://localhost:${PORT}`);
});
}
5 changes: 5 additions & 0 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { SimpleTask, Task } from './types/tasks';
import { Dayjs } from './utils/datetime';
import { logger } from './utils/logger';
import { timeout, timeoutOrError } from './utils/promise-utils';
import { startHttp } from './http/http-interface';

const MaxTickTimout = 15 * 1000;
const IpfsTimeout = 8000 * 1000; // 8000s
Expand Down Expand Up @@ -78,6 +79,10 @@ async function main() {
// start tasks
_.forEach(simpleTasks, (t) => t.start(context));
_.forEach(tasks, (t) => t.start(context));

// start http
startHttp(context, logger);

// start event loop after chain is synced
await doEventLoop(context, tasks);
} catch (e) {
Expand Down
34 changes: 32 additions & 2 deletions src/tasks/pull-scheduler-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ async function handlePulling(
await fileOrderOps.updateFileInfoStatus(record.id, 'insufficient_space');
continue;
}

await sealFile(
context,
logger,
Expand Down Expand Up @@ -218,6 +219,16 @@ async function getOneFileByStrategy(
): Promise<FileRecord | null> {
const { strategy } = options;
do {
// Accapt all wanted records
const wantedRecord = await getWantedPendingFile(fileOrderOps, options);
if (wantedRecord) {
if (await isSealDone(wantedRecord.cid, context.sworkerApi, logger)) {
await fileOrderOps.updateFileInfoStatus(wantedRecord.id, 'handled');
return null;
}
return wantedRecord;
}

const record = await getPendingFile(fileOrderOps, options);
if (!record) {
return null;
Expand Down Expand Up @@ -284,6 +295,25 @@ async function getFreeSpace(context: AppContext): Promise<[number, number]> {
return [gbToMb(freeGBSize), gbToMb(sysFreeGBSize)];
}

async function getWantedPendingFile(
fileOrderOps: DbOrderOperator,
sealOptions: SealOption,
): DbResult<FileRecord> {
if (sealOptions.sealLarge) {
const record = await fileOrderOps.getPendingFileRecord('wanted', false);
if (record) {
return record;
}

return await fileOrderOps.getPendingFileRecord('wanted', true);
}

if (sealOptions.sealSmall) {
return await fileOrderOps.getPendingFileRecord('wanted', true);
}
return null;
}

async function getPendingFile(
fileOrderOps: DbOrderOperator,
sealOptions: SealOption,
Expand All @@ -303,7 +333,7 @@ async function getPendingFile(
}

if (sealSmall) {
return getPendingFileByStrategy(fileOrderOps, strategy, true);
return await getPendingFileByStrategy(fileOrderOps, strategy, true);
}
return null;
}
Expand Down Expand Up @@ -374,7 +404,7 @@ export async function createPullSchedulerTask(
const pullingInterval = 1 * 60 * 1000; // trival, period run it if there is no pending files in the db

return makeIntervalTask(
60 * 1000,
70 * 1000,
pullingInterval,
'files-pulling',
context,
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/seal-cleanup-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export async function createSealCleanupTask(
hours: 1,
}).asMilliseconds();
return makeIntervalTask(
60 * 1000,
40 * 1000,
cleanupInterval,
'seal-cleanup',
context,
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/seal-status-updater-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export async function createSealStatuUpdater(
): Promise<SimpleTask> {
const sealStatusUpdateInterval = 2 * 60 * 1000; // update seal status every 2 minutes
return makeIntervalTask(
1 * 60 * 1000,
50 * 1000,
sealStatusUpdateInterval,
'seal-updater',
context,
Expand Down
3 changes: 2 additions & 1 deletion src/types/indexing.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
// indexers types
// dbScan - indexer by scanning the market.Files map
// chainEvent - indexer by subscribing to the latest storage orders events
export type Indexer = 'dbScan' | 'chainEvent';
// wanted - trigger by user
export type Indexer = 'dbScan' | 'chainEvent' | 'wanted';
5 changes: 3 additions & 2 deletions src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ export * as consts from './consts';
* Parse object into JSON object
* @param o any object
*/
export function parseObj<T>(o: unknown): T {
export function parseObj(o: unknown): any { // eslint-disable-line
if (typeof o !== 'string') {
return o as T;
return JSON.parse(JSON.stringify(o));
}

return JSON.parse(o);
}

Expand Down