Skip to content

Commit bca573b

Browse files
authored
feat: cio optimization (#2550)
CIO optimization by removing/registering users based on their current activity, along with downgrading their digest if we need to. Once we've agreed on the processing code-wise, I will start working on the rollout plan, as it should basically use the same processing.
1 parent 895bd32 commit bca573b

19 files changed

+894
-42
lines changed

.infra/application.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ debezium.transforms.ReadOperationFilter.language=jsr223.groovy
2727
debezium.transforms.ReadOperationFilter.condition=!(valueSchema.field('op') && value.op == 'r')
2828
debezium.transforms.PostsFilter.type=io.debezium.transforms.Filter
2929
debezium.transforms.PostsFilter.language=jsr223.groovy
30-
debezium.transforms.PostsFilter.condition=!(valueSchema.field('op') && value.op == 'u' && value.source.table == 'post' && value.before.views != value.after.views)
30+
debezium.transforms.PostsFilter.condition=!(valueSchema.field('op') && value.op == 'u' && value.source.table == 'post' && value.before.views != value.after.views) && !(valueSchema.field('op') && value.op == 'u' && value.source.table == 'user' && value.before.cioRegistered != value.after.cioRegistered)
3131
debezium.sink.type=pubsub

.infra/crons.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ export const crons: Cron[] = [
5959
name: 'generic-referral-reminder',
6060
schedule: '12 3 * * *',
6161
},
62+
// {
63+
// name: 'validate-active-users',
64+
// schedule: '15 4 * * *',
65+
// },
6266
{
6367
name: 'update-source-tag-view',
6468
schedule: '20 3 * * 0',
@@ -109,5 +113,5 @@ export const crons: Cron[] = [
109113
{
110114
name: 'calculate-top-readers',
111115
schedule: '0 2 1 * *',
112-
}
116+
},
113117
];

__tests__/cron/validateActiveUsers.ts

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
import { crons } from '../../src/cron/index';
2+
import cron from '../../src/cron/validateActiveUsers';
3+
import { expectSuccessfulCron, saveFixtures } from '../helpers';
4+
import * as gcp from '../../src/common/googleCloud';
5+
import * as cioModule from '../../src/cio';
6+
import { DataSource, In, JsonContains } from 'typeorm';
7+
import createOrGetConnection from '../../src/db';
8+
import {
9+
User,
10+
UserPersonalizedDigest,
11+
UserPersonalizedDigestSendType,
12+
} from '../../src/entity';
13+
import { badUsersFixture, plusUsersFixture, usersFixture } from '../fixture';
14+
import { updateFlagsStatement } from '../../src/common';
15+
import { ioRedisPool } from '../../src/redis';
16+
17+
let con: DataSource;
18+
19+
beforeEach(async () => {
20+
con = await createOrGetConnection();
21+
await ioRedisPool.execute((client) => client.flushall());
22+
jest.clearAllMocks();
23+
jest.resetModules();
24+
await saveFixtures(con, User, [
25+
...usersFixture,
26+
...plusUsersFixture,
27+
...badUsersFixture,
28+
]);
29+
});
30+
31+
describe('validateActiveUsers', () => {
32+
beforeEach(async () => {
33+
await saveFixtures(con, User, usersFixture);
34+
});
35+
36+
it('should NOT be registered yet', () => {
37+
const registeredWorker = crons.find((item) => item.name === cron.name);
38+
39+
expect(registeredWorker).toBeDefined();
40+
});
41+
});
42+
43+
describe('users for downgrade', () => {
44+
it('should not do anything if users do not have digest subscription', async () => {
45+
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
46+
reactivateUsers: [],
47+
inactiveUsers: [],
48+
downgradeUsers: ['4', '1'],
49+
});
50+
51+
await con.getRepository(UserPersonalizedDigest).delete({});
52+
await expectSuccessfulCron(cron);
53+
54+
const digests = await con.getRepository(UserPersonalizedDigest).find();
55+
expect(digests.length).toEqual(0);
56+
});
57+
58+
it('should downgrade daily digest to weekly digest', async () => {
59+
const downgradeUsers = ['4', '1'];
60+
61+
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
62+
reactivateUsers: [],
63+
inactiveUsers: [],
64+
downgradeUsers,
65+
});
66+
67+
await con.getRepository(UserPersonalizedDigest).update(
68+
{},
69+
{
70+
preferredDay: 1,
71+
preferredHour: 4,
72+
flags: updateFlagsStatement({
73+
digestSendType: UserPersonalizedDigestSendType.workdays,
74+
}),
75+
},
76+
);
77+
await expectSuccessfulCron(cron);
78+
79+
const digests = await con.getRepository(UserPersonalizedDigest).find({
80+
where: {
81+
flags: JsonContains({
82+
digestSendType: UserPersonalizedDigestSendType.weekly,
83+
}),
84+
},
85+
});
86+
const downgradedOnly = digests.every(
87+
({ userId, preferredDay, preferredHour }) =>
88+
downgradeUsers.includes(userId) &&
89+
preferredDay === 3 &&
90+
preferredHour === 9,
91+
);
92+
expect(downgradedOnly).toEqual(true);
93+
});
94+
});
95+
96+
describe('users for removal', () => {
97+
it('should not do anything if users are removed to CIO already', async () => {
98+
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
99+
reactivateUsers: ['1', '2'],
100+
inactiveUsers: ['3', '5'],
101+
downgradeUsers: ['4'],
102+
});
103+
104+
const postSpy = jest
105+
.spyOn(cioModule.cioV2.request, 'post')
106+
.mockResolvedValue({});
107+
108+
await con
109+
.getRepository(User)
110+
.update({ id: In(['3', '5']) }, { cioRegistered: false });
111+
112+
await expectSuccessfulCron(cron);
113+
114+
expect(postSpy).not.toHaveBeenCalled();
115+
});
116+
117+
it('should send removal to cio', async () => {
118+
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
119+
reactivateUsers: ['1', '2'],
120+
inactiveUsers: ['3', '5', 'vordr'],
121+
downgradeUsers: ['4'],
122+
});
123+
124+
const postSpy = jest
125+
.spyOn(cioModule.cioV2.request, 'post')
126+
.mockResolvedValue({});
127+
128+
await con
129+
.getRepository(User)
130+
.update({ id: In(['vordr']) }, { cioRegistered: false });
131+
const digests = await con.getRepository(UserPersonalizedDigest).count();
132+
expect(digests).toBeGreaterThan(0);
133+
134+
await expectSuccessfulCron(cron);
135+
136+
const batch = [
137+
{
138+
action: 'destroy',
139+
type: 'person',
140+
identifiers: { id: '3' },
141+
},
142+
{
143+
action: 'destroy',
144+
type: 'person',
145+
identifiers: { id: '5' },
146+
},
147+
];
148+
149+
expect(postSpy).toHaveBeenCalledWith('/users', { batch });
150+
151+
const fromRemovalOnly = postSpy.mock.calls[0][1].batch.every(
152+
({ identifiers }) => ['3', '5'].includes(identifiers.id),
153+
);
154+
expect(fromRemovalOnly).toBeTruthy();
155+
156+
const unRegisteredOnly = postSpy.mock.calls[0][1].batch.every(
157+
({ identifiers }) => !['vordr'].includes(identifiers.id),
158+
);
159+
expect(unRegisteredOnly).toBeTruthy();
160+
161+
const unregistered = await con
162+
.getRepository(User)
163+
.findOne({ select: ['cioRegistered'], where: { id: '3' } });
164+
expect(unregistered.cioRegistered).toEqual(false);
165+
166+
const removed = await con.getRepository(UserPersonalizedDigest).count();
167+
expect(removed).toBeLessThan(digests);
168+
});
169+
});
170+
171+
describe('users for reactivation', () => {
172+
it('should not do anything if users are registered to CIO already', async () => {
173+
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
174+
reactivateUsers: ['1', '2'],
175+
inactiveUsers: ['3'],
176+
downgradeUsers: ['4'],
177+
});
178+
const postSpy = jest
179+
.spyOn(cioModule.cioV2.request, 'post')
180+
.mockResolvedValue({});
181+
182+
// to stop running removal of `inactiveUsers`
183+
await con.getRepository(User).update({ id: '3' }, { cioRegistered: false });
184+
185+
// default value for `cioRegistered` is true
186+
await expectSuccessfulCron(cron);
187+
188+
expect(postSpy).not.toHaveBeenCalled();
189+
});
190+
191+
it('should send reactivation to cio', async () => {
192+
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
193+
reactivateUsers: ['1', '2'],
194+
inactiveUsers: ['3'],
195+
downgradeUsers: ['4'],
196+
});
197+
198+
const postSpy = jest
199+
.spyOn(cioModule.cioV2.request, 'post')
200+
.mockResolvedValue({});
201+
202+
// to stop running removal of `inactiveUsers`
203+
await con
204+
.getRepository(User)
205+
.update({ id: In(['3', '1']) }, { cioRegistered: false });
206+
207+
// default value for `cioRegistered` is true
208+
await expectSuccessfulCron(cron);
209+
210+
const batch = [
211+
{
212+
action: 'identify',
213+
type: 'person',
214+
identifiers: { id: '1' },
215+
attributes: {
216+
accepted_marketing: false,
217+
'cio_subscription_preferences.topics.topic_4': false,
218+
'cio_subscription_preferences.topics.topic_7': true,
219+
'cio_subscription_preferences.topics.topic_8': true,
220+
'cio_subscription_preferences.topics.topic_9': true,
221+
created_at: 1656427727,
222+
first_name: 'Ido',
223+
name: 'Ido',
224+
permalink: 'http://localhost:5002/idoshamun',
225+
referral_link: 'http://localhost:5002/join?cid=generic&userid=1',
226+
updated_at: undefined,
227+
username: 'idoshamun',
228+
},
229+
},
230+
];
231+
232+
expect(postSpy).toHaveBeenCalledWith('/users', { batch });
233+
234+
const fromReactivateUserOnly = postSpy.mock.calls[0][1].batch.every(
235+
({ identifiers }) => ['1'].includes(identifiers.id),
236+
);
237+
expect(fromReactivateUserOnly).toBeTruthy();
238+
239+
const registeredOnly = postSpy.mock.calls[0][1].batch.every(
240+
({ identifiers }) => identifiers.id !== '3',
241+
);
242+
expect(registeredOnly).toBeTruthy();
243+
244+
const reactivated = await con
245+
.getRepository(User)
246+
.findOne({ select: ['cioRegistered'], where: { id: '1' } });
247+
expect(reactivated.cioRegistered).toEqual(true);
248+
});
249+
});

__tests__/workers/userCreatedCio.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import { cio } from '../../src/cio';
1212
import { typedWorkers } from '../../src/workers';
1313
import mocked = jest.mocked;
1414

15-
jest.mock('../../src/common', () => ({
16-
...jest.requireActual('../../src/common'),
15+
jest.mock('../../src/common/links', () => ({
16+
...jest.requireActual('../../src/common/links'),
1717
getShortGenericInviteLink: jest.fn(),
1818
}));
1919

__tests__/workers/userUpdatedCio.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ import { usersFixture } from '../fixture/user';
2121

2222
jest.mock('../../src/common', () => ({
2323
...jest.requireActual('../../src/common'),
24+
resubscribeUser: jest.fn(),
25+
}));
26+
27+
jest.mock('../../src/common/links', () => ({
28+
...jest.requireActual('../../src/common/links'),
2429
getShortGenericInviteLink: jest.fn(),
2530
resubscribeUser: jest.fn(),
2631
}));

bin/cioSyncBasedOnActivity.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import createOrGetConnection from '../src/db';
2+
import fs from 'fs';
3+
import { parse } from 'csv-parse';
4+
import { syncSubscriptionsWithActiveState } from '../src/common';
5+
import {
6+
GetUsersActiveState,
7+
UserActiveState,
8+
} from '../src/common/googleCloud';
9+
10+
const func = async () => {
11+
const csvFilePath = process.argv[2];
12+
13+
if (!csvFilePath) {
14+
throw new Error('CSV file path is required');
15+
}
16+
17+
const users: GetUsersActiveState = {
18+
inactiveUsers: [],
19+
downgradeUsers: [],
20+
reactivateUsers: [],
21+
};
22+
23+
const stream = fs
24+
.createReadStream(csvFilePath)
25+
.pipe(parse({ delimiter: ',', from_line: 2 }));
26+
27+
stream.on('error', (err) => {
28+
console.error('failed to read file: ', err.message);
29+
});
30+
31+
stream.on('data', function ([id, rawStatus]) {
32+
if (!id || !rawStatus) {
33+
return;
34+
}
35+
36+
const status = rawStatus.toString() as UserActiveState;
37+
38+
if (status === UserActiveState.InactiveSince6wAgo) {
39+
users.downgradeUsers.push(id);
40+
}
41+
42+
if (
43+
status === UserActiveState.InactiveSince12wAgo ||
44+
status === UserActiveState.NeverActive
45+
) {
46+
users.inactiveUsers.push(id);
47+
}
48+
});
49+
50+
await new Promise((resolve) => {
51+
stream.on('end', resolve);
52+
});
53+
54+
console.log('running cron sync function');
55+
56+
const con = await createOrGetConnection();
57+
58+
await syncSubscriptionsWithActiveState({ con, users });
59+
60+
console.log('finished sync');
61+
62+
process.exit(0);
63+
};
64+
65+
func();

0 commit comments

Comments
 (0)