Skip to content
13 changes: 13 additions & 0 deletions src/migrations/20220418091527_statuses_deleted_at.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const kTable = 'statuses';

exports.up = async (knex) => {
await knex.schema.alterTable(kTable, (table) => {
table.timestamp('deleted_at');
});
};

exports.down = async (knex) => {
return knex.schema.alterTable(kTable, (table) => {
table.dropColumn('deleted_at');
});
};
16 changes: 12 additions & 4 deletions src/services/storage/twitter-statuses.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ class TwitterStatuses {
return this.knex.upsertItem(this.table, 'id', data);
}

softDelete(criteria) {
return this.knex.table(this.table)
.update('deleted_at', this.knex.fn.now())
.where(criteria);
}

list(data) {
const {
page,
Expand All @@ -27,11 +33,11 @@ class TwitterStatuses {
: data.filter.account.toLowerCase();

const query = this.knex(this.table)
.select()
.whereRaw(rawQuery, [account])
.select('id', 'date', 'text', 'account', 'meta')
.whereNull('deleted_at')
.andWhereRaw(rawQuery, [account])
.orderBy([
{ column: 'id', order },
{ column: 'account' },
])
.limit(pageSize)
.offset(offset);
Expand All @@ -51,7 +57,9 @@ class TwitterStatuses {

byId(tweetId) {
return this.knex(this.table)
.where('id', tweetId)
.select('id', 'date', 'text', 'account', 'meta')
.whereNull('deleted_at')
.andWhere('id', tweetId)
.first();
}
}
Expand Down
19 changes: 15 additions & 4 deletions src/services/twitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class Twitter {
this.onData = (json) => this._onData(json);
this.onError = (err) => this._onError(err);
this.onEnd = () => this._onEnd();
this.onDelete = (json) => this._onDelete(json);
}

async init() {
Expand Down Expand Up @@ -315,14 +316,11 @@ class Twitter {
listener.on('data', this.onData);
listener.on('error', this.onError);
listener.on('end', this.onEnd);
listener.on('delete', this.onDelete);

// attach params
listener.params = params;

// TODO: do this!
// add 'delete' handler
// listener.on('delete', this.onDelete);

// remap stream receiver to add 90 sec timeout
const { receive } = listener;
listener.receive = (chunk) => {
Expand Down Expand Up @@ -446,6 +444,19 @@ class Twitter {
return false;
}

async _onDelete(data) {
this.logger.debug({ data }, 'deleting tweet');
const { id_str: id } = data.delete.status;

if (!id) {
return false;
}

return this.storage
.twitterStatuses()
.softDelete({ id });
}

publish(tweet) {
const account = get(tweet, 'meta.account', false);
const { following } = this;
Expand Down
2 changes: 1 addition & 1 deletion src/utils/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ function collectionResponse(objects, type, options = {}) {

function modelResponse(model, type) {
const response = {
data: model !== null ? transform(model, type) : null,
data: model !== null && model !== undefined ? transform(model, type) : null,
};

return response;
Expand Down
38 changes: 35 additions & 3 deletions test/suites/01.twitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ describe('twitter', function testSuite() {
invalidTweet: {
tweetId: '123-not-number',
},

deletionNotice: {
delete: {
status: {
id: 20,
id_str: '20',
user_id: 12,
user_id_str: '12',
},
},
},
};

let tweetId;
Expand Down Expand Up @@ -236,7 +247,6 @@ describe('twitter', function testSuite() {
assert.strictEqual(text, 'just setting up my twttr');
assert(meta);
assert.strictEqual(meta.id_str, payload.oneTweet.tweetId);
assert(data.attributes.explicit);
});

it('rejects with error on sync non-existing tweet', async () => {
Expand All @@ -260,9 +270,8 @@ describe('twitter', function testSuite() {
assert(data);
assert.strictEqual(data.id, payload.oneTweet.tweetId);
assert.strictEqual(data.type, 'tweet');
const { text, meta, explicit } = data.attributes;
const { text, meta } = data.attributes;
assert.strictEqual(text, 'just setting up my twttr');
assert(explicit);
assert(meta.account_id, '12');
assert(meta.account);
assert(meta.account_name);
Expand All @@ -279,6 +288,29 @@ describe('twitter', function testSuite() {
});
});

it('handle deletion tweet on stream', async () => {
service
.service('twitter')
.listener
.emit('delete', payload.deletionNotice);

const res = await service.amqp.publishAndWait(uri.getOne, payload.oneTweet);
assert(res);
assert.strictEqual(res.data, null);

const id = payload.oneTweet.tweetId;

const hidden = await service
.knex('statuses')
.where({ id })
.first();

assert(hidden);
assert.strictEqual(hidden.id, id);
assert(hidden.explicit);
assert(hidden.deleted_at);
});

after('close consumer', () => listener.close());
after('shutdown service', () => service.close());
});