-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: cio optimization #2550
Open
sshanzel
wants to merge
49
commits into
main
Choose a base branch
from
MI-691
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
feat: cio optimization #2550
Changes from 47 commits
Commits
Show all changes
49 commits
Select commit
Hold shift + click to select a range
717fbc3
feat: cio optimization
sshanzel 92971da
feat: retry policy
sshanzel bd96bc1
refactor: function name
sshanzel 3b66822
feat: bq query
sshanzel b9048d8
feat: query from bq
sshanzel a9adb02
test: cron job
sshanzel f2d68b0
chore: prepare cron job
sshanzel bb510ff
chore: no registered test
sshanzel 6af995b
fix: missing await
sshanzel 0c0e4a3
refactor: ordering of variables
sshanzel a1c8939
Merge branch 'main' into MI-691
sshanzel 4df7881
refactor: code cleanup
sshanzel 8073118
refactor: better naming
sshanzel 17dba79
fix: going through missed days
sshanzel 41e7960
chore: finished todo
sshanzel a6dfe3f
fix: date minus 1
sshanzel ef1308f
chore: updated query based on dave
sshanzel a7cab63
revert: application properties
sshanzel b833be6
fix: edge cases on success and failure
sshanzel 3e71803
refactor: to change object
sshanzel 4ba9ff6
refactor: avoid nested looping
sshanzel 9c5c00b
refactor: run the downgrade within a single query
sshanzel 28c3684
fix: import issues
sshanzel c56070e
refactor: reusable function
sshanzel d68a90c
refactor: unneeded columns
sshanzel 383182c
chore: better log message
sshanzel 5a6de1f
fix: removal of personalized digest
sshanzel 494142e
fix: remove digest
sshanzel 65d54fa
fix: unnecessary comment
sshanzel b775005
fix: bq on cron
sshanzel 9735a79
fix: limitations
sshanzel f19be40
fix: only required columns
sshanzel 4802085
fix: test
sshanzel 2da1eaa
chore: removal of console logs
sshanzel 280f250
feat: bin script for initial rollout
sshanzel b7e6ddf
fix: test shape
sshanzel 9fc0d67
Merge branch 'main' into MI-691
sshanzel c6c678b
test: import mock
sshanzel 2db4020
refactor: worker to work with initial run
sshanzel d0ac684
chore: ignore column
sshanzel ec456da
fix: missing transaction
sshanzel 772d21c
refactor: unnecessary limit
sshanzel 9d7c6f4
Merge branch 'main' into MI-691
sshanzel dc0aa64
feat: initial rollout through csv
sshanzel ac4b96e
chore: minor code cleanups
sshanzel f4f036c
chore: uncomment ignoring of cio registered
sshanzel 6cdf31c
chore: unnecessary csv file
sshanzel 47a6b42
fix: condition to not process anything related to cio registered
sshanzel 279773e
Merge branch 'main' into MI-691
sshanzel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,249 @@ | ||
import { crons } from '../../src/cron/index'; | ||
import cron from '../../src/cron/validateActiveUsers'; | ||
import { expectSuccessfulCron, saveFixtures } from '../helpers'; | ||
import * as gcp from '../../src/common/googleCloud'; | ||
import * as cioModule from '../../src/cio'; | ||
import { DataSource, In, JsonContains } from 'typeorm'; | ||
import createOrGetConnection from '../../src/db'; | ||
import { | ||
User, | ||
UserPersonalizedDigest, | ||
UserPersonalizedDigestSendType, | ||
} from '../../src/entity'; | ||
import { badUsersFixture, plusUsersFixture, usersFixture } from '../fixture'; | ||
import { updateFlagsStatement } from '../../src/common'; | ||
import { ioRedisPool } from '../../src/redis'; | ||
|
||
let con: DataSource; | ||
|
||
beforeEach(async () => { | ||
con = await createOrGetConnection(); | ||
await ioRedisPool.execute((client) => client.flushall()); | ||
jest.clearAllMocks(); | ||
jest.resetModules(); | ||
await saveFixtures(con, User, [ | ||
...usersFixture, | ||
...plusUsersFixture, | ||
...badUsersFixture, | ||
]); | ||
}); | ||
|
||
describe('validateActiveUsers', () => { | ||
beforeEach(async () => { | ||
await saveFixtures(con, User, usersFixture); | ||
}); | ||
|
||
it('should NOT be registered yet', () => { | ||
const registeredWorker = crons.find((item) => item.name === cron.name); | ||
|
||
expect(registeredWorker).toBeDefined(); | ||
}); | ||
}); | ||
|
||
describe('users for downgrade', () => { | ||
it('should not do anything if users do not have digest subscription', async () => { | ||
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ | ||
reactivateUsers: [], | ||
inactiveUsers: [], | ||
downgradeUsers: ['4', '1'], | ||
}); | ||
|
||
await con.getRepository(UserPersonalizedDigest).delete({}); | ||
await expectSuccessfulCron(cron); | ||
|
||
const digests = await con.getRepository(UserPersonalizedDigest).find(); | ||
expect(digests.length).toEqual(0); | ||
}); | ||
|
||
it('should downgrade daily digest to weekly digest', async () => { | ||
const downgradeUsers = ['4', '1']; | ||
|
||
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ | ||
reactivateUsers: [], | ||
inactiveUsers: [], | ||
downgradeUsers, | ||
}); | ||
|
||
await con.getRepository(UserPersonalizedDigest).update( | ||
{}, | ||
{ | ||
preferredDay: 1, | ||
preferredHour: 4, | ||
flags: updateFlagsStatement({ | ||
digestSendType: UserPersonalizedDigestSendType.workdays, | ||
}), | ||
}, | ||
); | ||
await expectSuccessfulCron(cron); | ||
|
||
const digests = await con.getRepository(UserPersonalizedDigest).find({ | ||
where: { | ||
flags: JsonContains({ | ||
digestSendType: UserPersonalizedDigestSendType.weekly, | ||
}), | ||
}, | ||
}); | ||
const downgradedOnly = digests.every( | ||
({ userId, preferredDay, preferredHour }) => | ||
downgradeUsers.includes(userId) && | ||
preferredDay === 3 && | ||
preferredHour === 9, | ||
); | ||
expect(downgradedOnly).toEqual(true); | ||
}); | ||
}); | ||
|
||
describe('users for removal', () => { | ||
it('should not do anything if users are removed to CIO already', async () => { | ||
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ | ||
reactivateUsers: ['1', '2'], | ||
inactiveUsers: ['3', '5'], | ||
downgradeUsers: ['4'], | ||
}); | ||
|
||
const postSpy = jest | ||
.spyOn(cioModule.cioV2.request, 'post') | ||
.mockResolvedValue({}); | ||
|
||
await con | ||
.getRepository(User) | ||
.update({ id: In(['3', '5']) }, { cioRegistered: false }); | ||
|
||
await expectSuccessfulCron(cron); | ||
|
||
expect(postSpy).not.toHaveBeenCalled(); | ||
}); | ||
|
||
it('should send removal to cio', async () => { | ||
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ | ||
reactivateUsers: ['1', '2'], | ||
inactiveUsers: ['3', '5', 'vordr'], | ||
downgradeUsers: ['4'], | ||
}); | ||
|
||
const postSpy = jest | ||
.spyOn(cioModule.cioV2.request, 'post') | ||
.mockResolvedValue({}); | ||
|
||
await con | ||
.getRepository(User) | ||
.update({ id: In(['vordr']) }, { cioRegistered: false }); | ||
const digests = await con.getRepository(UserPersonalizedDigest).count(); | ||
expect(digests).toBeGreaterThan(0); | ||
|
||
await expectSuccessfulCron(cron); | ||
|
||
const batch = [ | ||
{ | ||
action: 'destroy', | ||
type: 'person', | ||
identifiers: { id: '3' }, | ||
}, | ||
{ | ||
action: 'destroy', | ||
type: 'person', | ||
identifiers: { id: '5' }, | ||
}, | ||
]; | ||
|
||
expect(postSpy).toHaveBeenCalledWith('/users', { batch }); | ||
|
||
const fromRemovalOnly = postSpy.mock.calls[0][1].batch.every( | ||
({ identifiers }) => ['3', '5'].includes(identifiers.id), | ||
); | ||
expect(fromRemovalOnly).toBeTruthy(); | ||
|
||
const unRegisteredOnly = postSpy.mock.calls[0][1].batch.every( | ||
({ identifiers }) => !['vordr'].includes(identifiers.id), | ||
); | ||
expect(unRegisteredOnly).toBeTruthy(); | ||
|
||
const unregistered = await con | ||
.getRepository(User) | ||
.findOne({ select: ['cioRegistered'], where: { id: '3' } }); | ||
expect(unregistered.cioRegistered).toEqual(false); | ||
|
||
const removed = await con.getRepository(UserPersonalizedDigest).count(); | ||
expect(removed).toBeLessThan(digests); | ||
}); | ||
}); | ||
|
||
describe('users for reactivation', () => { | ||
it('should not do anything if users are registered to CIO already', async () => { | ||
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ | ||
reactivateUsers: ['1', '2'], | ||
inactiveUsers: ['3'], | ||
downgradeUsers: ['4'], | ||
}); | ||
const postSpy = jest | ||
.spyOn(cioModule.cioV2.request, 'post') | ||
.mockResolvedValue({}); | ||
|
||
// to stop running removal of `inactiveUsers` | ||
await con.getRepository(User).update({ id: '3' }, { cioRegistered: false }); | ||
|
||
// default value for `cioRegistered` is true | ||
await expectSuccessfulCron(cron); | ||
|
||
expect(postSpy).not.toHaveBeenCalled(); | ||
}); | ||
|
||
it('should send reactivation to cio', async () => { | ||
jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ | ||
reactivateUsers: ['1', '2'], | ||
inactiveUsers: ['3'], | ||
downgradeUsers: ['4'], | ||
}); | ||
|
||
const postSpy = jest | ||
.spyOn(cioModule.cioV2.request, 'post') | ||
.mockResolvedValue({}); | ||
|
||
// to stop running removal of `inactiveUsers` | ||
await con | ||
.getRepository(User) | ||
.update({ id: In(['3', '1']) }, { cioRegistered: false }); | ||
|
||
// default value for `cioRegistered` is true | ||
await expectSuccessfulCron(cron); | ||
|
||
const batch = [ | ||
{ | ||
action: 'identify', | ||
type: 'person', | ||
identifiers: { id: '1' }, | ||
attributes: { | ||
accepted_marketing: false, | ||
'cio_subscription_preferences.topics.topic_4': false, | ||
'cio_subscription_preferences.topics.topic_7': true, | ||
'cio_subscription_preferences.topics.topic_8': true, | ||
'cio_subscription_preferences.topics.topic_9': true, | ||
created_at: 1656427727, | ||
first_name: 'Ido', | ||
name: 'Ido', | ||
permalink: 'http://localhost:5002/idoshamun', | ||
referral_link: 'http://localhost:5002/join?cid=generic&userid=1', | ||
updated_at: undefined, | ||
username: 'idoshamun', | ||
}, | ||
}, | ||
]; | ||
|
||
expect(postSpy).toHaveBeenCalledWith('/users', { batch }); | ||
|
||
const fromReactivateUserOnly = postSpy.mock.calls[0][1].batch.every( | ||
({ identifiers }) => ['1'].includes(identifiers.id), | ||
); | ||
expect(fromReactivateUserOnly).toBeTruthy(); | ||
|
||
const registeredOnly = postSpy.mock.calls[0][1].batch.every( | ||
({ identifiers }) => identifiers.id !== '3', | ||
); | ||
expect(registeredOnly).toBeTruthy(); | ||
|
||
const reactivated = await con | ||
.getRepository(User) | ||
.findOne({ select: ['cioRegistered'], where: { id: '1' } }); | ||
expect(reactivated.cioRegistered).toEqual(true); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import createOrGetConnection from '../src/db'; | ||
import fs from 'fs'; | ||
import { parse } from 'csv-parse'; | ||
import { syncSubscriptionsWithActiveState } from '../src/common'; | ||
import { | ||
GetUsersActiveState, | ||
UserActiveState, | ||
} from '../src/common/googleCloud'; | ||
|
||
const func = async () => { | ||
const csvFilePath = process.argv[2]; | ||
|
||
if (!csvFilePath) { | ||
throw new Error('CSV file path is required'); | ||
} | ||
|
||
const users: GetUsersActiveState = { | ||
inactiveUsers: [], | ||
downgradeUsers: [], | ||
reactivateUsers: [], | ||
}; | ||
|
||
const stream = fs | ||
.createReadStream(csvFilePath) | ||
.pipe(parse({ delimiter: ',', from_line: 2 })); | ||
|
||
stream.on('error', (err) => { | ||
console.error('failed to read file: ', err.message); | ||
}); | ||
|
||
stream.on('data', function ([id, rawStatus]) { | ||
if (!id || !rawStatus) { | ||
return; | ||
} | ||
|
||
const status = rawStatus.toString() as UserActiveState; | ||
|
||
if (status === UserActiveState.InactiveSince6wAgo) { | ||
users.downgradeUsers.push(id); | ||
} | ||
|
||
if ( | ||
status === UserActiveState.InactiveSince12wAgo || | ||
status === UserActiveState.NeverActive | ||
) { | ||
users.inactiveUsers.push(id); | ||
} | ||
}); | ||
|
||
await new Promise((resolve) => { | ||
stream.on('end', resolve); | ||
}); | ||
|
||
console.log('running cron sync function'); | ||
|
||
const con = await createOrGetConnection(); | ||
|
||
await syncSubscriptionsWithActiveState({ con, users }); | ||
|
||
console.log('finished sync'); | ||
|
||
process.exit(0); | ||
}; | ||
|
||
func(); |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Notice: these are the only attributes we are now sending to CIO for reactivation. Feel free to comment if it is missing any other properties.