Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"bn.js": "^5.2.1",
"common-errors": "^1.2.0",
"debug": "^4.3.4",
"fastq": "^1.17.1",
"get-value": "^3.0.1",
"glob": "^10.3.10",
"hwp": "^0.3.0",
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/services/storage/twitter-statuses.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class TwitterStatuses {
async last({ account }) {
return this.knex(this.table)
.select()
.where({ account })
.where({ account: account.toLowerCase() })
.orderBy([{ column: 'id', order: 'desc' }])
.limit(1)
.first();
Expand Down
13 changes: 8 additions & 5 deletions src/services/twitter/nitter/nitter-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,21 @@ class NitterClient {
url = `${url}?${query}`;
}

const response = await this.pool.request({
const {
statusCode,
body
} = await this.pool.request({
path: url,
method: method.toUpperCase()
});

const statusCode = response.statusCode;
const data = await response.body.text();

if (statusCode !== 200) {
throw new Error(`Request failed with status code: ${statusCode}, body: ${data}`);
await body.dump()
throw new HttpStatusError(statusCode, `Request failed with status code: ${statusCode}`);
}

const data = await body.text();

return {
statusCode,
data: JSON.parse(data)
Expand Down
215 changes: 143 additions & 72 deletions src/services/twitter/twitter.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
const get = require('get-value');
const { HttpStatusError } = require('common-errors');
const { merge, find } = require('lodash');
// eslint-disable-next-line no-unused-vars
const hwp = require('hwp');
const fastq = require('fastq');

const StatusFilter = require('./status-filter');
const { transform, TYPE_TWEET } = require('../../utils/response');
Expand All @@ -11,6 +13,7 @@ const { kPublishEvent } = require('../notifier');
const { NitterClient } = require('./nitter/nitter-client');

const SYNC_INTERVAL = parseInt(process.env.SYNC_INTERVAL || '2500', 10);
const BACKOFF_INTERVAL = parseInt(process.env.BACKOFF_INTERVAL || '30000', 10);

function extractAccount(accum, value) {
const accountId = value.meta.account_id;
Expand Down Expand Up @@ -69,6 +72,20 @@ class Twitter {
return tweet;
}

static findTweet(tweets, tweetId) {
if (!tweetId) {
return null;
}

for (const tweet of tweets) {
if (tweetId === tweet.id_str) {
return tweet;
}
}

return null;
}

/**
* @param {Social} core
* @param {object} config
Expand All @@ -86,7 +103,6 @@ class Twitter {
logger: logger.child({ namespace: '@social/nitter' }),
});
this.logger = logger.child({ namespace: '@social/twitter' });
this.loaders = new Map();

const { restrictedTypes = [] } = config.requests || {};
this.restrictedStatusTypes = restrictedTypes.map((name) => TweetTypeByName[name]);
Expand All @@ -97,6 +113,7 @@ class Twitter {
this.following = [];
this.accountIds = {};
this.syncTimer = null;
this.syncQueue = fastq.promise(this, this.syncAccount, 1);

this.syncFeed = this.syncFeed.bind(this);
this.init = this.init.bind(this);
Expand Down Expand Up @@ -158,6 +175,12 @@ class Twitter {
Object.setPrototypeOf(this.accountIds, null);
}

/**
* @description returns a boolean flag whether notification required
* @param event
* @param from
* @returns {boolean}
*/
shouldNotifyFor(event, from) {
const allow = this.notifyConfig[event];

Expand All @@ -179,12 +202,13 @@ class Twitter {

this.logger.trace({ status }, 'saving serialized status data');

// noinspection JSUnresolvedReference
return this.storage
.twitterStatuses()
.save(status);
}

async _onData(data, notify = true) {
async _onTweetData(data, notify = true) {
if (!this.isSyncable()) {
return false;
}
Expand All @@ -199,18 +223,24 @@ class Twitter {
return false;
}

try {
const saved = await this._saveToStatuses(data, tweetType, false);
let saved;

if (notify) {
this.publish(saved);
}

return saved;
try {
saved = await this._saveToStatuses(data, tweetType, false);
} catch (err) {
this.logger.warn({ id: data.id_str, err }, 'failed to save tweet');
return false;
}

if (notify) {
try {
this.publish(saved);
} catch (err) {
this.logger.warn({ err }, 'failed to publish tweet');
}
}

return saved;
}

publish(tweet) {
Expand Down Expand Up @@ -249,15 +279,20 @@ class Twitter {
}

async syncFeed() {
// noinspection JSUnresolvedReference
const feeds = await this.storage
.feeds()
.fetch({ network: 'twitter' });

const accounts = feeds.reduce(extractAccount, []);

await hwp.forEach(accounts, async (item) => {
await this.syncAccount(item.account);
});
for (const item of accounts) {
// This promise could be ignored as it will not lead to a 'unhandledRejection'.
// noinspection ES6MissingAwait
this.syncQueue.push(item.account);
}

await this.syncQueue.drained();

this.logger.debug({ accounts }, 'resolved accounts');

Expand Down Expand Up @@ -300,80 +335,116 @@ class Twitter {
}
}

/**
* @description asynchronous worker for fastq
* @param account
* @returns {Promise<void>}
*/
async syncAccount(account) {
if (this.loaders.has(account)) {
return;
}
// calculate notification on sync
const notify = this.shouldNotifyFor('data', 'sync');

this.loaders.set(account, true);
// recursively syncs account
// noinspection JSUnresolvedReference
const lastTweet = await this.storage
.twitterStatuses()
.last({ account });

try {
// calculate notification on sync
const notify = this.shouldNotifyFor('data', 'sync');
for await (const tweets of this.pageLoader(account, lastTweet?.id)) {
// up to 16 tweets can be saved concurrently by default
await hwp.forEach(tweets, async (tweet) => {
return this._onTweetData(tweet, notify);
}, 16);
}
}

// recursively syncs account
const lastTweet = await this.storage
.twitterStatuses()
.last({ account });
/**
* @description wrapper around nitter method, should introduce backoff flag capability
* @param account
* @param cursor
* @returns {Promise<{cursor: *, tweets: (*[]|[]|*|*[])}>}
*/
async fetchTweets(account, cursor) {
try {
const result = await this.nitter.fetchTweets(account, cursor);
return {
tweets: result?.tweets ?? [],
cursorBottom: result?.cursorBottom,
backoff: false,
};
} catch (err) {
this.logger.warn({ err }, 'error occurred while tweet loading');

if (err.statusCode === 429) {
// whatever happens on the api, should take a backoff to let it calm down
return {
tweets: [],
cursorBottom: null,
backoff: true,
};
}

this.logger.info({
tweet: { id: lastTweet?.id },
account,
}, 'last tweet');
return {
tweets: [],
cursorBottom: null,
backoff: false,
};
}
}

let looped = true;
let page = 1;
let count = 0;
let cursor = null;
/**
* @description tweet page async generator for given account,
* should stop loading as soon as lastTweetId is reached
* @param account
* @param lastTweetId
* @returns {AsyncGenerator<[]|*[], void, *>}
*/
async* pageLoader(account, lastTweetId) {
let total = 0;
let cursor = null;

do {
// eslint-disable-next-line no-await-in-loop
const result = await this.nitter.fetchTweets(account, cursor);
const tweets = result.tweets ?? [];
cursor = result.cursorBottom;
for (let page = 1; page <= this.loaderMaxPages; page += 1) {
// eslint-disable-next-line no-await-in-loop
const { tweets, cursorBottom, backoff } = await this.fetchTweets(account, cursor);

if (backoff) {
this.logger.debug({
account,
page,
tweets: tweets.map((tweet) => tweet.id_str),
}, 'tweets loaded');

if (lastTweet) {
for (const tweet of tweets) {
if (lastTweet.id === tweet.id_str) {
this.logger.debug({
account,
tweet_id: tweet.id_str,
}, 'last known tweet reached, loader terminated');
looped = false;
break;
}
}
if (!looped) {
break;
}
}
lastTweetId,
backoff,
backoffInterval: BACKOFF_INTERVAL,
}, 'pageLoader backoff');

for (const tweet of tweets) {
// eslint-disable-next-line no-await-in-loop
await this._onData(tweet, notify);
}
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => { setTimeout(resolve, BACKOFF_INTERVAL); });

looped = page < this.loaderMaxPages && tweets.length > 0;
count += tweets.length;
yield [];
break;
}

this.logger.debug({
looped, page, cursor, count, account,
}, 'tweet page loaded');
cursor = cursorBottom;
total += tweets.length;
const isLastTweet = !!Twitter.findTweet(tweets, lastTweetId);
const nextPage = tweets.length > 0 && this.isSyncable() && !isLastTweet;

if (looped) {
page += 1;
}
} while (looped && !this.isStopped);
} catch (err) {
this.logger.warn({ err, account }, 'error occurred while tweet loading');
} finally {
this.loaders.delete(account);
this.logger.debug({
nextPage,
page,
cursor,
total,
account,
lastTweetId,
isLastTweet,
...(this.logger.level === 'debug' || this.logger.level === 'trace'
? { tweetIds: tweets.map((tweet) => tweet.id_str) }
: {}),
}, 'tweet page loaded');

yield tweets;

if (!nextPage) {
break;
}
}
}

Expand Down