Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cio optimization #2550

Open
wants to merge 49 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
717fbc3
feat: cio optimization
sshanzel Dec 17, 2024
92971da
feat: retry policy
sshanzel Dec 18, 2024
bd96bc1
refactor: function name
sshanzel Dec 18, 2024
3b66822
feat: bq query
sshanzel Dec 18, 2024
b9048d8
feat: query from bq
sshanzel Dec 18, 2024
a9adb02
test: cron job
sshanzel Dec 18, 2024
f2d68b0
chore: prepare cron job
sshanzel Dec 18, 2024
bb510ff
chore: no registered test
sshanzel Dec 18, 2024
6af995b
fix: missing await
sshanzel Dec 18, 2024
0c0e4a3
refactor: ordering of variables
sshanzel Dec 18, 2024
a1c8939
Merge branch 'main' into MI-691
sshanzel Dec 18, 2024
4df7881
refactor: code cleanup
sshanzel Dec 18, 2024
8073118
refactor: better naming
sshanzel Dec 18, 2024
17dba79
fix: going through missed days
sshanzel Dec 19, 2024
41e7960
chore: finished todo
sshanzel Dec 19, 2024
a6dfe3f
fix: date minus 1
sshanzel Dec 19, 2024
ef1308f
chore: updated query based on dave
sshanzel Dec 19, 2024
a7cab63
revert: application properties
sshanzel Dec 19, 2024
b833be6
fix: edge cases on success and failure
sshanzel Dec 19, 2024
3e71803
refactor: to change object
sshanzel Dec 19, 2024
4ba9ff6
refactor: avoid nested looping
sshanzel Dec 20, 2024
9c5c00b
refactor: run the downgrade within a single query
sshanzel Dec 20, 2024
28c3684
fix: import issues
sshanzel Dec 20, 2024
c56070e
refactor: reusable function
sshanzel Dec 20, 2024
d68a90c
refactor: unneeded columns
sshanzel Dec 20, 2024
383182c
chore: better log message
sshanzel Dec 20, 2024
5a6de1f
fix: removal of personalized digest
sshanzel Dec 20, 2024
494142e
fix: remove digest
sshanzel Dec 20, 2024
65d54fa
fix: unnecessary comment
sshanzel Dec 20, 2024
b775005
fix: bq on cron
sshanzel Dec 20, 2024
9735a79
fix: limitations
sshanzel Dec 20, 2024
f19be40
fix: only required columns
sshanzel Dec 20, 2024
4802085
fix: test
sshanzel Dec 20, 2024
2da1eaa
chore: removal of console logs
sshanzel Dec 20, 2024
280f250
feat: bin script for initial rollout
sshanzel Dec 20, 2024
b7e6ddf
fix: test shape
sshanzel Dec 20, 2024
9fc0d67
Merge branch 'main' into MI-691
sshanzel Dec 20, 2024
c6c678b
test: import mock
sshanzel Dec 20, 2024
2db4020
refactor: worker to work with initial run
sshanzel Dec 20, 2024
d0ac684
chore: ignore column
sshanzel Dec 20, 2024
ec456da
fix: missing transaction
sshanzel Jan 2, 2025
772d21c
refactor: unnecessary limit
sshanzel Jan 2, 2025
9d7c6f4
Merge branch 'main' into MI-691
sshanzel Jan 2, 2025
dc0aa64
feat: initial rollout through csv
sshanzel Jan 2, 2025
ac4b96e
chore: minor code cleanups
sshanzel Jan 2, 2025
f4f036c
chore: uncomment ignoring of cio registered
sshanzel Jan 2, 2025
6cdf31c
chore: unnecessary csv file
sshanzel Jan 2, 2025
47a6b42
fix: condition to not process anything related to cio registered
sshanzel Jan 3, 2025
279773e
Merge branch 'main' into MI-691
sshanzel Jan 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .infra/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ debezium.transforms.ReadOperationFilter.condition=!(valueSchema.field('op') && v
debezium.transforms.PostsFilter.type=io.debezium.transforms.Filter
debezium.transforms.PostsFilter.language=jsr223.groovy
debezium.transforms.PostsFilter.condition=!(valueSchema.field('op') && value.op == 'u' && value.source.table == 'post' && value.before.views != value.after.views)
# debezium.transforms.PostsFilter.condition=!(valueSchema.field('op') && value.op == 'u' && value.source.table == 'post' && value.before.views != value.after.views) && !(valueSchema.field('op') && value.op == 'u' && value.source.table == 'user' && value.before.cioRegistered != value.after.cioRegistered && !value.after.cioRegistered)
Copy link
Member

@idoshamun idoshamun Dec 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reactivation is still one-by-one based on this filter right?
We is it commented?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, we are running it also in batches to avoid the rate-limiting as the users can reach up to thousands.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we run in batches, we definitely need to uncomment this line, don't we?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes for that part. This is in part of the preparation, so we can merge this PR once done, and when the initial rollout is about to start, we will just uncomment this line. I will mention this in the confluence page.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we need this to be uncommented for production as well, not just initial rollout. Don't we? We don't rely on CDC anymore

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Yes, moving forward, we also won't have to track the changes for the column. In that case, then yeah, we can always have it ignored.

debezium.sink.type=pubsub
6 changes: 5 additions & 1 deletion .infra/crons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ export const crons: Cron[] = [
name: 'generic-referral-reminder',
schedule: '12 3 * * *',
},
// {
// name: 'validate-active-users',
// schedule: '15 4 * * *',
// },
{
name: 'update-source-tag-view',
schedule: '20 3 * * 0',
Expand Down Expand Up @@ -109,5 +113,5 @@ export const crons: Cron[] = [
{
name: 'calculate-top-readers',
schedule: '0 2 1 * *',
}
},
];
249 changes: 249 additions & 0 deletions __tests__/cron/validateActiveUsers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
import { crons } from '../../src/cron/index';
import cron from '../../src/cron/validateActiveUsers';
import { expectSuccessfulCron, saveFixtures } from '../helpers';
import * as gcp from '../../src/common/googleCloud';
import * as cioModule from '../../src/cio';
import { DataSource, In, JsonContains } from 'typeorm';
import createOrGetConnection from '../../src/db';
import {
User,
UserPersonalizedDigest,
UserPersonalizedDigestSendType,
} from '../../src/entity';
import { badUsersFixture, plusUsersFixture, usersFixture } from '../fixture';
import { updateFlagsStatement } from '../../src/common';
import { ioRedisPool } from '../../src/redis';

let con: DataSource;

beforeEach(async () => {
con = await createOrGetConnection();
await ioRedisPool.execute((client) => client.flushall());
jest.clearAllMocks();
jest.resetModules();
await saveFixtures(con, User, [
...usersFixture,
...plusUsersFixture,
...badUsersFixture,
]);
});

describe('validateActiveUsers', () => {
beforeEach(async () => {
await saveFixtures(con, User, usersFixture);
});

it('should NOT be registered yet', () => {
const registeredWorker = crons.find((item) => item.name === cron.name);

expect(registeredWorker).toBeDefined();
});
});

describe('users for downgrade', () => {
it('should not do anything if users do not have digest subscription', async () => {
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
reactivateUsers: [],
inactiveUsers: [],
downgradeUsers: ['4', '1'],
});

await con.getRepository(UserPersonalizedDigest).delete({});
await expectSuccessfulCron(cron);

const digests = await con.getRepository(UserPersonalizedDigest).find();
expect(digests.length).toEqual(0);
});

it('should downgrade daily digest to weekly digest', async () => {
const downgradeUsers = ['4', '1'];

jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
reactivateUsers: [],
inactiveUsers: [],
downgradeUsers,
});

await con.getRepository(UserPersonalizedDigest).update(
{},
{
preferredDay: 1,
preferredHour: 4,
flags: updateFlagsStatement({
digestSendType: UserPersonalizedDigestSendType.workdays,
}),
},
);
await expectSuccessfulCron(cron);

const digests = await con.getRepository(UserPersonalizedDigest).find({
where: {
flags: JsonContains({
digestSendType: UserPersonalizedDigestSendType.weekly,
}),
},
});
const downgradedOnly = digests.every(
({ userId, preferredDay, preferredHour }) =>
downgradeUsers.includes(userId) &&
preferredDay === 3 &&
preferredHour === 9,
);
expect(downgradedOnly).toEqual(true);
});
});

describe('users for removal', () => {
it('should not do anything if users are removed to CIO already', async () => {
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
reactivateUsers: ['1', '2'],
inactiveUsers: ['3', '5'],
downgradeUsers: ['4'],
});

const postSpy = jest
.spyOn(cioModule.cioV2.request, 'post')
.mockResolvedValue({});

await con
.getRepository(User)
.update({ id: In(['3', '5']) }, { cioRegistered: false });

await expectSuccessfulCron(cron);

expect(postSpy).not.toHaveBeenCalled();
});

it('should send removal to cio', async () => {
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
reactivateUsers: ['1', '2'],
inactiveUsers: ['3', '5', 'vordr'],
downgradeUsers: ['4'],
});

const postSpy = jest
.spyOn(cioModule.cioV2.request, 'post')
.mockResolvedValue({});

await con
.getRepository(User)
.update({ id: In(['vordr']) }, { cioRegistered: false });
const digests = await con.getRepository(UserPersonalizedDigest).count();
expect(digests).toBeGreaterThan(0);

await expectSuccessfulCron(cron);

const batch = [
{
action: 'destroy',
type: 'person',
identifiers: { id: '3' },
},
{
action: 'destroy',
type: 'person',
identifiers: { id: '5' },
},
];

expect(postSpy).toHaveBeenCalledWith('/users', { batch });

const fromRemovalOnly = postSpy.mock.calls[0][1].batch.every(
({ identifiers }) => ['3', '5'].includes(identifiers.id),
);
expect(fromRemovalOnly).toBeTruthy();

const unRegisteredOnly = postSpy.mock.calls[0][1].batch.every(
({ identifiers }) => !['vordr'].includes(identifiers.id),
);
expect(unRegisteredOnly).toBeTruthy();

const unregistered = await con
.getRepository(User)
.findOne({ select: ['cioRegistered'], where: { id: '3' } });
expect(unregistered.cioRegistered).toEqual(false);

const removed = await con.getRepository(UserPersonalizedDigest).count();
expect(removed).toBeLessThan(digests);
});
});

describe('users for reactivation', () => {
it('should not do anything if users are registered to CIO already', async () => {
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
reactivateUsers: ['1', '2'],
inactiveUsers: ['3'],
downgradeUsers: ['4'],
});
const postSpy = jest
.spyOn(cioModule.cioV2.request, 'post')
.mockResolvedValue({});

// to stop running removal of `inactiveUsers`
await con.getRepository(User).update({ id: '3' }, { cioRegistered: false });

// default value for `cioRegistered` is true
await expectSuccessfulCron(cron);

expect(postSpy).not.toHaveBeenCalled();
});

it('should send reactivation to cio', async () => {
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({
reactivateUsers: ['1', '2'],
inactiveUsers: ['3'],
downgradeUsers: ['4'],
});

const postSpy = jest
.spyOn(cioModule.cioV2.request, 'post')
.mockResolvedValue({});

// to stop running removal of `inactiveUsers`
await con
.getRepository(User)
.update({ id: In(['3', '1']) }, { cioRegistered: false });

// default value for `cioRegistered` is true
await expectSuccessfulCron(cron);

const batch = [
{
action: 'identify',
type: 'person',
identifiers: { id: '1' },
attributes: {
accepted_marketing: false,
'cio_subscription_preferences.topics.topic_4': false,
'cio_subscription_preferences.topics.topic_7': true,
'cio_subscription_preferences.topics.topic_8': true,
'cio_subscription_preferences.topics.topic_9': true,
created_at: 1656427727,
first_name: 'Ido',
name: 'Ido',
permalink: 'http://localhost:5002/idoshamun',
referral_link: 'http://localhost:5002/join?cid=generic&userid=1',
updated_at: undefined,
username: 'idoshamun',
Comment on lines +216 to +227
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice: these are the only attributes we are now sending to CIO for reactivation. Feel free to comment if it is missing any other properties.

},
},
];

expect(postSpy).toHaveBeenCalledWith('/users', { batch });

const fromReactivateUserOnly = postSpy.mock.calls[0][1].batch.every(
({ identifiers }) => ['1'].includes(identifiers.id),
);
expect(fromReactivateUserOnly).toBeTruthy();

const registeredOnly = postSpy.mock.calls[0][1].batch.every(
({ identifiers }) => identifiers.id !== '3',
);
expect(registeredOnly).toBeTruthy();

const reactivated = await con
.getRepository(User)
.findOne({ select: ['cioRegistered'], where: { id: '1' } });
expect(reactivated.cioRegistered).toEqual(true);
});
});
4 changes: 2 additions & 2 deletions __tests__/workers/userCreatedCio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { cio } from '../../src/cio';
import { typedWorkers } from '../../src/workers';
import mocked = jest.mocked;

jest.mock('../../src/common', () => ({
...jest.requireActual('../../src/common'),
jest.mock('../../src/common/links', () => ({
...jest.requireActual('../../src/common/links'),
getShortGenericInviteLink: jest.fn(),
}));

Expand Down
5 changes: 5 additions & 0 deletions __tests__/workers/userUpdatedCio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import { usersFixture } from '../fixture/user';

jest.mock('../../src/common', () => ({
...jest.requireActual('../../src/common'),
resubscribeUser: jest.fn(),
}));

jest.mock('../../src/common/links', () => ({
...jest.requireActual('../../src/common/links'),
getShortGenericInviteLink: jest.fn(),
resubscribeUser: jest.fn(),
}));
Expand Down
66 changes: 66 additions & 0 deletions bin/cioSyncBasedOnActivity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import createOrGetConnection from '../src/db';
import fs from 'fs';
import { parse } from 'csv-parse';
import { syncSubscriptionsWithActiveState } from '../src/common';
import {
GetUsersActiveState,
UserActiveState,
} from '../src/common/googleCloud';

const func = async () => {
const csvFilePath = process.argv[2];

if (!csvFilePath) {
throw new Error('CSV file path is required');
}

const users: GetUsersActiveState = {
inactiveUsers: [],
downgradeUsers: [],
reactivateUsers: [],
};

const stream = fs
.createReadStream(csvFilePath)
.pipe(parse({ delimiter: ',', from_line: 2 }));

stream.on('error', (err) => {
console.error('failed to read file: ', err.message);
});

// eslint-disable-next-line @typescript-eslint/no-unused-vars
stream.on('data', function ([id, rawStatus]) {
if (!id || !rawStatus) {
return;
}

const status = rawStatus.toString() as UserActiveState;

if (status === UserActiveState.InactiveSince6wAgo) {
users.downgradeUsers.push(id);
}

if (
status === UserActiveState.InactiveSince12wAgo ||
status === UserActiveState.NeverActive
) {
users.inactiveUsers.push(id);
}
});

await new Promise((resolve) => {
stream.on('end', resolve);
});

console.log('running cron sync function');

const con = await createOrGetConnection();

await syncSubscriptionsWithActiveState({ con, users });

console.log('finished sync');

process.exit(0);
};

func();
Loading
Loading