diff --git a/.infra/application.properties b/.infra/application.properties index e10113c77..67b5c853d 100644 --- a/.infra/application.properties +++ b/.infra/application.properties @@ -27,5 +27,5 @@ debezium.transforms.ReadOperationFilter.language=jsr223.groovy debezium.transforms.ReadOperationFilter.condition=!(valueSchema.field('op') && value.op == 'r') debezium.transforms.PostsFilter.type=io.debezium.transforms.Filter debezium.transforms.PostsFilter.language=jsr223.groovy -debezium.transforms.PostsFilter.condition=!(valueSchema.field('op') && value.op == 'u' && value.source.table == 'post' && value.before.views != value.after.views) +debezium.transforms.PostsFilter.condition=!(valueSchema.field('op') && value.op == 'u' && value.source.table == 'post' && value.before.views != value.after.views) && !(valueSchema.field('op') && value.op == 'u' && value.source.table == 'user' && value.before.cioRegistered != value.after.cioRegistered) debezium.sink.type=pubsub diff --git a/.infra/crons.ts b/.infra/crons.ts index 304299efa..65139b47f 100644 --- a/.infra/crons.ts +++ b/.infra/crons.ts @@ -59,6 +59,10 @@ export const crons: Cron[] = [ name: 'generic-referral-reminder', schedule: '12 3 * * *', }, + // { + // name: 'validate-active-users', + // schedule: '15 4 * * *', + // }, { name: 'update-source-tag-view', schedule: '20 3 * * 0', @@ -109,5 +113,5 @@ export const crons: Cron[] = [ { name: 'calculate-top-readers', schedule: '0 2 1 * *', - } + }, ]; diff --git a/__tests__/cron/validateActiveUsers.ts b/__tests__/cron/validateActiveUsers.ts new file mode 100644 index 000000000..c572a1b59 --- /dev/null +++ b/__tests__/cron/validateActiveUsers.ts @@ -0,0 +1,249 @@ +import { crons } from '../../src/cron/index'; +import cron from '../../src/cron/validateActiveUsers'; +import { expectSuccessfulCron, saveFixtures } from '../helpers'; +import * as gcp from '../../src/common/googleCloud'; +import * as cioModule from '../../src/cio'; +import { DataSource, In, JsonContains } from 'typeorm'; +import createOrGetConnection from '../../src/db'; +import { + User, + UserPersonalizedDigest, + UserPersonalizedDigestSendType, +} from '../../src/entity'; +import { badUsersFixture, plusUsersFixture, usersFixture } from '../fixture'; +import { updateFlagsStatement } from '../../src/common'; +import { ioRedisPool } from '../../src/redis'; + +let con: DataSource; + +beforeEach(async () => { + con = await createOrGetConnection(); + await ioRedisPool.execute((client) => client.flushall()); + jest.clearAllMocks(); + jest.resetModules(); + await saveFixtures(con, User, [ + ...usersFixture, + ...plusUsersFixture, + ...badUsersFixture, + ]); +}); + +describe('validateActiveUsers', () => { + beforeEach(async () => { + await saveFixtures(con, User, usersFixture); + }); + + it('should NOT be registered yet', () => { + const registeredWorker = crons.find((item) => item.name === cron.name); + + expect(registeredWorker).toBeDefined(); + }); +}); + +describe('users for downgrade', () => { + it('should not do anything if users do not have digest subscription', async () => { + jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ + reactivateUsers: [], + inactiveUsers: [], + downgradeUsers: ['4', '1'], + }); + + await con.getRepository(UserPersonalizedDigest).delete({}); + await expectSuccessfulCron(cron); + + const digests = await con.getRepository(UserPersonalizedDigest).find(); + expect(digests.length).toEqual(0); + }); + + it('should downgrade daily digest to weekly digest', async () => { + const downgradeUsers = ['4', '1']; + + jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ + reactivateUsers: [], + inactiveUsers: [], + downgradeUsers, + }); + + await con.getRepository(UserPersonalizedDigest).update( + {}, + { + preferredDay: 1, + preferredHour: 4, + flags: updateFlagsStatement({ + digestSendType: UserPersonalizedDigestSendType.workdays, + }), + }, + ); + await expectSuccessfulCron(cron); + + const digests = await con.getRepository(UserPersonalizedDigest).find({ + where: { + flags: JsonContains({ + digestSendType: UserPersonalizedDigestSendType.weekly, + }), + }, + }); + const downgradedOnly = digests.every( + ({ userId, preferredDay, preferredHour }) => + downgradeUsers.includes(userId) && + preferredDay === 3 && + preferredHour === 9, + ); + expect(downgradedOnly).toEqual(true); + }); +}); + +describe('users for removal', () => { + it('should not do anything if users are removed to CIO already', async () => { + jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ + reactivateUsers: ['1', '2'], + inactiveUsers: ['3', '5'], + downgradeUsers: ['4'], + }); + + const postSpy = jest + .spyOn(cioModule.cioV2.request, 'post') + .mockResolvedValue({}); + + await con + .getRepository(User) + .update({ id: In(['3', '5']) }, { cioRegistered: false }); + + await expectSuccessfulCron(cron); + + expect(postSpy).not.toHaveBeenCalled(); + }); + + it('should send removal to cio', async () => { + jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ + reactivateUsers: ['1', '2'], + inactiveUsers: ['3', '5', 'vordr'], + downgradeUsers: ['4'], + }); + + const postSpy = jest + .spyOn(cioModule.cioV2.request, 'post') + .mockResolvedValue({}); + + await con + .getRepository(User) + .update({ id: In(['vordr']) }, { cioRegistered: false }); + const digests = await con.getRepository(UserPersonalizedDigest).count(); + expect(digests).toBeGreaterThan(0); + + await expectSuccessfulCron(cron); + + const batch = [ + { + action: 'destroy', + type: 'person', + identifiers: { id: '3' }, + }, + { + action: 'destroy', + type: 'person', + identifiers: { id: '5' }, + }, + ]; + + expect(postSpy).toHaveBeenCalledWith('/users', { batch }); + + const fromRemovalOnly = postSpy.mock.calls[0][1].batch.every( + ({ identifiers }) => ['3', '5'].includes(identifiers.id), + ); + expect(fromRemovalOnly).toBeTruthy(); + + const unRegisteredOnly = postSpy.mock.calls[0][1].batch.every( + ({ identifiers }) => !['vordr'].includes(identifiers.id), + ); + expect(unRegisteredOnly).toBeTruthy(); + + const unregistered = await con + .getRepository(User) + .findOne({ select: ['cioRegistered'], where: { id: '3' } }); + expect(unregistered.cioRegistered).toEqual(false); + + const removed = await con.getRepository(UserPersonalizedDigest).count(); + expect(removed).toBeLessThan(digests); + }); +}); + +describe('users for reactivation', () => { + it('should not do anything if users are registered to CIO already', async () => { + jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ + reactivateUsers: ['1', '2'], + inactiveUsers: ['3'], + downgradeUsers: ['4'], + }); + const postSpy = jest + .spyOn(cioModule.cioV2.request, 'post') + .mockResolvedValue({}); + + // to stop running removal of `inactiveUsers` + await con.getRepository(User).update({ id: '3' }, { cioRegistered: false }); + + // default value for `cioRegistered` is true + await expectSuccessfulCron(cron); + + expect(postSpy).not.toHaveBeenCalled(); + }); + + it('should send reactivation to cio', async () => { + jest.spyOn(gcp, 'getUsersActiveState').mockResolvedValue({ + reactivateUsers: ['1', '2'], + inactiveUsers: ['3'], + downgradeUsers: ['4'], + }); + + const postSpy = jest + .spyOn(cioModule.cioV2.request, 'post') + .mockResolvedValue({}); + + // to stop running removal of `inactiveUsers` + await con + .getRepository(User) + .update({ id: In(['3', '1']) }, { cioRegistered: false }); + + // default value for `cioRegistered` is true + await expectSuccessfulCron(cron); + + const batch = [ + { + action: 'identify', + type: 'person', + identifiers: { id: '1' }, + attributes: { + accepted_marketing: false, + 'cio_subscription_preferences.topics.topic_4': false, + 'cio_subscription_preferences.topics.topic_7': true, + 'cio_subscription_preferences.topics.topic_8': true, + 'cio_subscription_preferences.topics.topic_9': true, + created_at: 1656427727, + first_name: 'Ido', + name: 'Ido', + permalink: 'http://localhost:5002/idoshamun', + referral_link: 'http://localhost:5002/join?cid=generic&userid=1', + updated_at: undefined, + username: 'idoshamun', + }, + }, + ]; + + expect(postSpy).toHaveBeenCalledWith('/users', { batch }); + + const fromReactivateUserOnly = postSpy.mock.calls[0][1].batch.every( + ({ identifiers }) => ['1'].includes(identifiers.id), + ); + expect(fromReactivateUserOnly).toBeTruthy(); + + const registeredOnly = postSpy.mock.calls[0][1].batch.every( + ({ identifiers }) => identifiers.id !== '3', + ); + expect(registeredOnly).toBeTruthy(); + + const reactivated = await con + .getRepository(User) + .findOne({ select: ['cioRegistered'], where: { id: '1' } }); + expect(reactivated.cioRegistered).toEqual(true); + }); +}); diff --git a/__tests__/workers/userCreatedCio.ts b/__tests__/workers/userCreatedCio.ts index 9f2e0dd2c..92758bab9 100644 --- a/__tests__/workers/userCreatedCio.ts +++ b/__tests__/workers/userCreatedCio.ts @@ -12,8 +12,8 @@ import { cio } from '../../src/cio'; import { typedWorkers } from '../../src/workers'; import mocked = jest.mocked; -jest.mock('../../src/common', () => ({ - ...jest.requireActual('../../src/common'), +jest.mock('../../src/common/links', () => ({ + ...jest.requireActual('../../src/common/links'), getShortGenericInviteLink: jest.fn(), })); diff --git a/__tests__/workers/userUpdatedCio.ts b/__tests__/workers/userUpdatedCio.ts index 1df28fbee..c038fe6ff 100644 --- a/__tests__/workers/userUpdatedCio.ts +++ b/__tests__/workers/userUpdatedCio.ts @@ -21,6 +21,11 @@ import { usersFixture } from '../fixture/user'; jest.mock('../../src/common', () => ({ ...jest.requireActual('../../src/common'), + resubscribeUser: jest.fn(), +})); + +jest.mock('../../src/common/links', () => ({ + ...jest.requireActual('../../src/common/links'), getShortGenericInviteLink: jest.fn(), resubscribeUser: jest.fn(), })); diff --git a/bin/cioSyncBasedOnActivity.ts b/bin/cioSyncBasedOnActivity.ts new file mode 100644 index 000000000..e71464100 --- /dev/null +++ b/bin/cioSyncBasedOnActivity.ts @@ -0,0 +1,65 @@ +import createOrGetConnection from '../src/db'; +import fs from 'fs'; +import { parse } from 'csv-parse'; +import { syncSubscriptionsWithActiveState } from '../src/common'; +import { + GetUsersActiveState, + UserActiveState, +} from '../src/common/googleCloud'; + +const func = async () => { + const csvFilePath = process.argv[2]; + + if (!csvFilePath) { + throw new Error('CSV file path is required'); + } + + const users: GetUsersActiveState = { + inactiveUsers: [], + downgradeUsers: [], + reactivateUsers: [], + }; + + const stream = fs + .createReadStream(csvFilePath) + .pipe(parse({ delimiter: ',', from_line: 2 })); + + stream.on('error', (err) => { + console.error('failed to read file: ', err.message); + }); + + stream.on('data', function ([id, rawStatus]) { + if (!id || !rawStatus) { + return; + } + + const status = rawStatus.toString() as UserActiveState; + + if (status === UserActiveState.InactiveSince6wAgo) { + users.downgradeUsers.push(id); + } + + if ( + status === UserActiveState.InactiveSince12wAgo || + status === UserActiveState.NeverActive + ) { + users.inactiveUsers.push(id); + } + }); + + await new Promise((resolve) => { + stream.on('end', resolve); + }); + + console.log('running cron sync function'); + + const con = await createOrGetConnection(); + + await syncSubscriptionsWithActiveState({ con, users }); + + console.log('finished sync'); + + process.exit(0); +}; + +func(); diff --git a/package-lock.json b/package-lock.json index 912101b9a..545f1cc59 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,6 +20,7 @@ "@fastify/http-proxy": "^10.0.1", "@fastify/rate-limit": "^10.1.1", "@fastify/websocket": "^11.0.1", + "@google-cloud/bigquery": "^7.9.1", "@google-cloud/opentelemetry-resource-util": "^2.4.0", "@google-cloud/pubsub": "^4.8.0", "@google-cloud/storage": "^7.13.0", @@ -1328,6 +1329,58 @@ "ws": "^8.16.0" } }, + "node_modules/@google-cloud/bigquery": { + "version": "7.9.1", + "resolved": "https://registry.npmjs.org/@google-cloud/bigquery/-/bigquery-7.9.1.tgz", + "integrity": "sha512-ZkcRMpBoFLxIh6TiQBywA22yT3c2j0f07AHWEMjtYqMQzZQbFrpxuJU2COp3tyjZ91ZIGHe4gY7/dGZL88cltg==", + "dependencies": { + "@google-cloud/common": "^5.0.0", + "@google-cloud/paginator": "^5.0.2", + "@google-cloud/precise-date": "^4.0.0", + "@google-cloud/promisify": "^4.0.0", + "arrify": "^2.0.1", + "big.js": "^6.0.0", + "duplexify": "^4.0.0", + "extend": "^3.0.2", + "is": "^3.3.0", + "stream-events": "^1.0.5", + "uuid": "^9.0.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@google-cloud/bigquery/node_modules/uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "bin": { + "uuid": "dist/bin/uuid" + } + }, + "node_modules/@google-cloud/common": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/@google-cloud/common/-/common-5.0.2.tgz", + "integrity": "sha512-V7bmBKYQyu0eVG2BFejuUjlBt+zrya6vtsKdY+JxMM/dNntPF41vZ9+LhOshEUH01zOHEqBSvI7Dad7ZS6aUeA==", + "dependencies": { + "@google-cloud/projectify": "^4.0.0", + "@google-cloud/promisify": "^4.0.0", + "arrify": "^2.0.1", + "duplexify": "^4.1.1", + "extend": "^3.0.2", + "google-auth-library": "^9.0.0", + "html-entities": "^2.5.2", + "retry-request": "^7.0.0", + "teeny-request": "^9.0.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/@google-cloud/opentelemetry-resource-util": { "version": "2.4.0", "resolved": "https://registry.npmjs.org/@google-cloud/opentelemetry-resource-util/-/opentelemetry-resource-util-2.4.0.tgz", @@ -1344,9 +1397,9 @@ } }, "node_modules/@google-cloud/paginator": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/@google-cloud/paginator/-/paginator-5.0.0.tgz", - "integrity": "sha512-87aeg6QQcEPxGCOthnpUjvw4xAZ57G7pL8FS0C4e/81fr3FjkpUpibf1s2v5XGyGhUVGF4Jfg7yEcxqn2iUw1w==", + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/@google-cloud/paginator/-/paginator-5.0.2.tgz", + "integrity": "sha512-DJS3s0OVH4zFDB1PzjxAsHqJT6sKVbRwwML0ZBP9PbU7Yebtu/7SWMRzvO2J3nUi9pRNITCfu4LJeooM2w4pjg==", "dependencies": { "arrify": "^2.0.0", "extend": "^3.0.2" @@ -4769,6 +4822,18 @@ "node": ">=0.6" } }, + "node_modules/big.js": { + "version": "6.2.2", + "resolved": "https://registry.npmjs.org/big.js/-/big.js-6.2.2.tgz", + "integrity": "sha512-y/ie+Faknx7sZA5MfGA2xKlu0GDv8RWrXGsmlteyJQ2lvoKv9GBK/fpRMc2qlSoBAgNxrixICFCBefIq8WCQpQ==", + "engines": { + "node": "*" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/bigjs" + } + }, "node_modules/bignumber.js": { "version": "9.1.2", "resolved": "https://registry.npmjs.org/bignumber.js/-/bignumber.js-9.1.2.tgz", @@ -7714,6 +7779,14 @@ "node": ">= 0.10" } }, + "node_modules/is": { + "version": "3.3.0", + "resolved": "https://registry.npmjs.org/is/-/is-3.3.0.tgz", + "integrity": "sha512-nW24QBoPcFGGHJGUwnfpI7Yc5CdqWNdsyHQszVE/z2pKHXzh7FZ5GWhJqSyaQ9wMkQnsTx+kAI8bHlCX4tKdbg==", + "engines": { + "node": "*" + } + }, "node_modules/is-arrayish": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/is-arrayish/-/is-arrayish-0.2.1.tgz", diff --git a/package.json b/package.json index 9cc3bf042..34c373a32 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ "@fastify/http-proxy": "^10.0.1", "@fastify/rate-limit": "^10.1.1", "@fastify/websocket": "^11.0.1", + "@google-cloud/bigquery": "^7.9.1", "@google-cloud/opentelemetry-resource-util": "^2.4.0", "@google-cloud/pubsub": "^4.8.0", "@google-cloud/storage": "^7.13.0", diff --git a/src/cio.ts b/src/cio.ts index 9b657b484..510f9b44b 100644 --- a/src/cio.ts +++ b/src/cio.ts @@ -1,18 +1,15 @@ import { CustomerIORequestError, TrackClient } from 'customerio-node'; import { ChangeObject } from './types'; import { + ConnectionManager, User, UserPersonalizedDigest, UserPersonalizedDigestType, UserStreak, } from './entity'; -import { - camelCaseToSnakeCase, - CioUnsubscribeTopic, - debeziumTimeToDate, - getFirstName, - getShortGenericInviteLink, -} from './common'; +import { camelCaseToSnakeCase, getDateBaseFromType } from './common/utils'; +import { CioUnsubscribeTopic, getFirstName } from './common/mailing'; +import { getShortGenericInviteLink } from './common/links'; import type { UserCompany } from './entity/UserCompany'; import type { Company } from './entity/Company'; import { DataSource } from 'typeorm'; @@ -52,6 +49,16 @@ const OMIT_FIELDS: (keyof ChangeObject)[] = [ 'followNotifications', ]; +export const CIO_REQUIRED_FIELDS: (keyof ChangeObject)[] = [ + 'username', + 'name', + 'createdAt', + 'updatedAt', + 'notificationEmail', + 'acceptedMarketing', + 'followingEmail', +]; + export async function identifyUserStreak({ cio, data, @@ -89,15 +96,25 @@ export async function identifyUserStreak({ } } -export async function identifyUser({ - con, - cio, - user, -}: { - con: DataSource; - cio: TrackClient; - user: ChangeObject; -}): Promise { +export const generateIdentifyObject = async ( + con: ConnectionManager, + user: ChangeObject, +) => { + const { id } = user; + const identify = await getIdentifyAttributes(con, user); + + return { + action: 'identify', + type: 'person', + identifiers: { id }, + attributes: identify, + }; +}; + +export const getIdentifyAttributes = async ( + con: ConnectionManager, + user: ChangeObject, +) => { const dup = { ...user }; const id = dup.id; for (const field of OMIT_FIELDS) { @@ -115,24 +132,38 @@ export async function identifyUser({ }), ]); + return { + ...camelCaseToSnakeCase(dup), + first_name: getFirstName(dup.name), + created_at: dateToCioTimestamp(getDateBaseFromType(dup.createdAt)), + updated_at: dup.updatedAt + ? dateToCioTimestamp(getDateBaseFromType(dup.updatedAt)) + : undefined, + referral_link: genericInviteURL, + [`cio_subscription_preferences.topics.topic_${CioUnsubscribeTopic.Marketing}`]: + user.acceptedMarketing, + [`cio_subscription_preferences.topics.topic_${CioUnsubscribeTopic.Notifications}`]: + user.notificationEmail, + [`cio_subscription_preferences.topics.topic_${CioUnsubscribeTopic.Digest}`]: + !!personalizedDigest, + [`cio_subscription_preferences.topics.topic_${CioUnsubscribeTopic.Follow}`]: + user.followingEmail, + }; +}; + +export async function identifyUser({ + con, + cio, + user, +}: { + con: DataSource; + cio: TrackClient; + user: ChangeObject; +}): Promise { + const data = await getIdentifyAttributes(con, user); + try { - await cio.identify(id, { - ...camelCaseToSnakeCase(dup), - first_name: getFirstName(dup.name), - created_at: dateToCioTimestamp(debeziumTimeToDate(dup.createdAt)), - updated_at: dup.updatedAt - ? dateToCioTimestamp(debeziumTimeToDate(dup.updatedAt)) - : undefined, - referral_link: genericInviteURL, - [`cio_subscription_preferences.topics.topic_${CioUnsubscribeTopic.Marketing}`]: - user.acceptedMarketing, - [`cio_subscription_preferences.topics.topic_${CioUnsubscribeTopic.Notifications}`]: - user.notificationEmail, - [`cio_subscription_preferences.topics.topic_${CioUnsubscribeTopic.Digest}`]: - !!personalizedDigest, - [`cio_subscription_preferences.topics.topic_${CioUnsubscribeTopic.Follow}`]: - user.followingEmail, - }); + await cio.identify(user.id, data); } catch (err) { if (err instanceof CustomerIORequestError && err.statusCode === 400) { logger.warn({ err, user }, 'failed to update user in cio'); diff --git a/src/common/async.ts b/src/common/async.ts new file mode 100644 index 000000000..67f919bdd --- /dev/null +++ b/src/common/async.ts @@ -0,0 +1,79 @@ +import { + circuitBreaker, + ConsecutiveBreaker, + ExponentialBackoff, + handleAll, + IFailureEvent, + ISuccessEvent, + retry, + wrap, +} from 'cockatiel'; + +const DEFAULT_BATCH_LIMIT = 40_000; // postgresql params limit is around 65k, to be safe, let's run at 40k. + +type ForceStop = boolean; + +interface BlockingBatchRunnerOptions { + data: T[]; + batchLimit?: number; + runner: (current: T[]) => Promise; +} + +export const blockingBatchRunner = async ({ + batchLimit = DEFAULT_BATCH_LIMIT, + data, + runner, +}: BlockingBatchRunnerOptions) => { + for (let i = 0; i < data.length; i += batchLimit) { + const current = data.slice(i, i + batchLimit); + + if (current.length === 0) { + break; + } + + const shouldStop = await runner(current); + if (shouldStop) { + break; + } + } +}; + +interface CallWithRetryDefaultProps { + callback: () => Promise; + onSuccess?: (data: ISuccessEvent) => void; + onFailure?: (err: IFailureEvent) => void; +} + +export const callWithRetryDefault = ({ + callback, + onFailure, + onSuccess, +}: CallWithRetryDefaultProps) => { + // Create a retry policy that'll try whatever function we execute 3 + // times with a randomized exponential backoff. + const retryPolicy = retry(handleAll, { + maxAttempts: 3, + backoff: new ExponentialBackoff(), + }); + + if (onFailure) { + retryPolicy.onFailure(onFailure); + } + + if (onSuccess) { + retryPolicy.onSuccess(onSuccess); + } + + // Create a circuit breaker that'll stop calling the executed function for 10 + // seconds if it fails 5 times in a row. This can give time for e.g. a database + // to recover without getting tons of traffic. + const circuitBreakerPolicy = circuitBreaker(handleAll, { + halfOpenAfter: 10 * 1000, + breaker: new ConsecutiveBreaker(5), + }); + + // Combine these! Create a policy that retries 3 times, calling through the circuit breaker + const retryWithBreaker = wrap(retryPolicy, circuitBreakerPolicy); + + return retryWithBreaker.execute(callback); +}; diff --git a/src/common/constants.ts b/src/common/constants.ts index 74083dbd3..b3b369051 100644 --- a/src/common/constants.ts +++ b/src/common/constants.ts @@ -5,4 +5,6 @@ export const ONE_WEEK_IN_SECONDS = ONE_DAY_IN_SECONDS * 7; export const MAX_FOLLOWERS_LIMIT = 5_000; +export const SUCCESSFUL_CIO_SYNC_DATE = 'successful_cio_sync_date'; + export const customFeedsPlusDate = new Date('2024-12-11'); diff --git a/src/common/googleCloud.ts b/src/common/googleCloud.ts index 3ee525459..b86a49e55 100644 --- a/src/common/googleCloud.ts +++ b/src/common/googleCloud.ts @@ -1,6 +1,9 @@ import { Storage, DownloadOptions } from '@google-cloud/storage'; import { PropsParameters } from '../types'; import path from 'path'; +import { BigQuery } from '@google-cloud/bigquery'; +import { Query } from '@google-cloud/bigquery/build/src/bigquery'; +import { subDays } from 'date-fns'; export const downloadFile = async ({ url, @@ -29,3 +32,119 @@ export const downloadJsonFile = async ({ return JSON.parse(result); }; + +export enum UserActiveState { + Active = '1', + InactiveSince6wAgo = '2', + InactiveSince12wAgo = '3', + NeverActive = '4', +} + +export const userActiveStateQuery = ` + with d as ( + select u.primary_user_id, + min(last_app_timestamp) as last_app_timestamp, + min(registration_timestamp) as registration_timestamp, + min( + case + when period_end is null then '4' + when period_end between date(@previous_date - interval 6*7 day) and @previous_date then '1' + when period_end between date(@previous_date - interval 12*7 day) and date(@previous_date - interval 6*7 + 1 day) then '2' + when date(u.last_app_timestamp) < date(@previous_date - interval 12*7 day) then '3' + when date(u.registration_timestamp) < date(@previous_date - interval 12*7 day) then '3' + else '4' end + ) as previous_state, + min( + case + when period_end is null then '4' + when period_end between date(@run_date - interval 6*7 day) and @run_date then '1' + when period_end between date(@run_date - interval 12*7 day) and date(@run_date - interval 6*7 + 1 day) then '2' + when date(u.last_app_timestamp) < date(@run_date - interval 12*7 day) then '3' + when date(u.registration_timestamp) < date(@run_date - interval 12*7 day) then '3' + else '4' end + ) as current_state, + from analytics.user as u + left join analytics.user_state_sparse as uss on uss.primary_user_id = u.primary_user_id + and uss.period_end between date(@previous_date - interval 12* 7 day) and @run_date + and uss.period = 'daily' + and uss.app_active_state = 'active' + and uss.registration_state = 'registered' + where u.registration_timestamp is not null + and date(u.registration_timestamp) < @run_date + group by 1 + ) + select * + from d + where current_state != previous_state + and previous_state != '4' +`; + +export const getUserActiveStateQuery = ( + runDate: Date, + query = userActiveStateQuery, +): Query => { + const run_date = runDate.toISOString().split('T')[0]; + const previous_date = subDays(runDate, 1).toISOString().split('T')[0]; + + return { query, params: { previous_date, run_date } }; +}; + +export interface GetUsersActiveState { + inactiveUsers: string[]; + downgradeUsers: string[]; + reactivateUsers: string[]; +} + +export interface UserActiveStateData { + current_state: UserActiveState; + previous_state: UserActiveState; + primary_user_id: string; +} + +const bigquery = new BigQuery(); + +export const queryFromBq = async ( + query: Query, +): Promise => { + const [job] = await bigquery.createQueryJob(query); + const [rows] = await job.getQueryResults(); + + return rows; +}; + +export const sortUsersActiveState = (users: UserActiveStateData[]) => { + const inactiveUsers: string[] = []; + const downgradeUsers: string[] = []; + const reactivateUsers: string[] = []; + + // sort users from bq into active, inactive, downgrade, and reactivate + for (const user of users) { + if ( + user.current_state === UserActiveState.InactiveSince6wAgo && + user.previous_state === UserActiveState.Active + ) { + downgradeUsers.push(user.primary_user_id); + } else if ( + user.current_state === UserActiveState.Active && + user.previous_state !== UserActiveState.Active + ) { + reactivateUsers.push(user.primary_user_id); + } else if ( + user.current_state === UserActiveState.InactiveSince12wAgo && + user.previous_state !== UserActiveState.InactiveSince12wAgo + ) { + inactiveUsers.push(user.primary_user_id); + } + } + + return { inactiveUsers, downgradeUsers, reactivateUsers }; +}; + +export const getUsersActiveState = async ( + runDate: Date, +): Promise => { + const query = getUserActiveStateQuery(runDate); + const usersFromBq = await queryFromBq(query); + + return sortUsersActiveState(usersFromBq); +}; diff --git a/src/common/mailing.ts b/src/common/mailing.ts index d97f5257e..4db52b1c3 100644 --- a/src/common/mailing.ts +++ b/src/common/mailing.ts @@ -7,13 +7,20 @@ import { import CIORequest from 'customerio-node/dist/lib/request'; import { SendEmailRequestOptionalOptions } from 'customerio-node/lib/api/requests'; import { SendEmailRequestWithTemplate } from 'customerio-node/dist/lib/api/requests'; -import { DataSource } from 'typeorm'; +import { DataSource, In, Raw } from 'typeorm'; import { Source, User, UserPersonalizedDigest, + UserPersonalizedDigestSendType, UserPersonalizedDigestType, } from '../entity'; +import { blockingBatchRunner, callWithRetryDefault } from './async'; +import { CIO_REQUIRED_FIELDS, cioV2, generateIdentifyObject } from '../cio'; +import { setTimeout } from 'node:timers/promises'; +import { toChangeObject, updateFlagsStatement } from './utils'; +import { GetUsersActiveState } from './googleCloud'; +import { logger } from '../logger'; export enum CioUnsubscribeTopic { Marketing = '4', @@ -171,3 +178,133 @@ export const addPrivateSourceJoinParams = ({ return urlObj.toString(); }; + +const ITEMS_PER_DESTROY = 4000; +const ITEMS_PER_IDENTIFY = 250; + +interface SyncSubscriptionsWithActiveStateProps { + con: DataSource; + users: GetUsersActiveState; +} + +interface SyncSubscriptionsWithActiveState { + hasAnyFailed: boolean; +} + +export const syncSubscriptionsWithActiveState = async ({ + con, + users: { inactiveUsers, downgradeUsers, reactivateUsers }, +}: SyncSubscriptionsWithActiveStateProps): Promise => { + let hasAnyFailed = false; + + // user is active again: reactivate to CIO + await blockingBatchRunner({ + batchLimit: ITEMS_PER_IDENTIFY, + data: reactivateUsers, + runner: async (batch) => { + const users = await con.getRepository(User).find({ + where: { id: In(batch), cioRegistered: false }, + select: CIO_REQUIRED_FIELDS.concat('id'), + }); + + if (users.length === 0) { + return true; + } + + const data = await Promise.all( + users.map((user) => generateIdentifyObject(con, toChangeObject(user))), + ); + + await callWithRetryDefault({ + callback: () => cioV2.request.post('/users', { batch: data }), + onSuccess: async () => { + const ids = users.map(({ id }) => id); + await con + .getRepository(User) + .update({ id: In(ids) }, { cioRegistered: true }); + }, + onFailure: (err) => { + hasAnyFailed = true; + logger.info({ err }, 'Failed to reactivate users to CIO'); + }, + }); + + await setTimeout(20); // wait for a bit to avoid rate limiting + }, + }); + + // inactive for 12 weeks: remove from CIO + await blockingBatchRunner({ + batchLimit: ITEMS_PER_DESTROY, + data: inactiveUsers, + runner: async (batch) => { + const users = await con.getRepository(User).find({ + select: ['id'], + where: { id: In(batch), cioRegistered: true }, + }); + + if (users.length === 0) { + return true; + } + + const data = users.map(({ id }) => ({ + action: 'destroy', + type: 'person', + identifiers: { id }, + })); + + await callWithRetryDefault({ + callback: () => cioV2.request.post('/users', { batch: data }), + onSuccess: async () => { + const ids = users.map(({ id }) => id); + + await con.transaction(async (manager) => { + await Promise.all([ + manager.getRepository(User).update( + { id: In(ids) }, + { + cioRegistered: false, + acceptedMarketing: false, + followingEmail: false, + notificationEmail: false, + }, + ), + manager + .getRepository(UserPersonalizedDigest) + .delete({ userId: In(ids) }), + ]); + }); + }, + onFailure: (err) => { + hasAnyFailed = true; + logger.info({ err }, 'Failed to remove users from CIO'); + }, + }); + + await setTimeout(20); // wait for a bit to avoid rate limiting + }, + }); + + // inactive for 6 weeks: downgrade from daily to weekly digest + await blockingBatchRunner({ + data: downgradeUsers, + runner: async (current) => { + // set digest to weekly on Wednesday 9am + await con.getRepository(UserPersonalizedDigest).update( + { + userId: In(current), + flags: Raw(() => `flags->>'sendType' = 'daily'`), + }, + { + preferredDay: 3, + preferredHour: 9, + flags: updateFlagsStatement({ + sendType: UserPersonalizedDigestSendType.weekly, + }), + }, + ); + }, + }); + + return { hasAnyFailed }; +}; diff --git a/src/common/users.ts b/src/common/users.ts index 0a0180e72..f5893578e 100644 --- a/src/common/users.ts +++ b/src/common/users.ts @@ -557,7 +557,10 @@ export enum LogoutReason { KratosSessionAlreadyAvailable = 'kratos session already available', } -const getAbsoluteDifferenceInDays: typeof differenceInDays = (date1, date2) => { +export const getAbsoluteDifferenceInDays: typeof differenceInDays = ( + date1, + date2, +) => { const day1 = startOfDay(date1); const day2 = startOfDay(date2); diff --git a/src/common/utils.ts b/src/common/utils.ts index 3f6908620..405182bf8 100644 --- a/src/common/utils.ts +++ b/src/common/utils.ts @@ -3,6 +3,7 @@ import { zonedTimeToUtc } from 'date-fns-tz'; import { snakeCase } from 'lodash'; import { isNullOrUndefined } from './object'; import { remoteConfig } from '../remoteConfig'; +import { ChangeObject } from '../types'; const REMOVE_SPECIAL_CHARACTERS_REGEX = /[^a-zA-Z0-9-_#.]/g; @@ -138,6 +139,9 @@ export const toGQLEnum = (value: Record, name: string) => { return `enum ${name} { ${Object.values(value).join(' ')} }`; }; +export const toChangeObject = (entity: T): ChangeObject => + JSON.parse(Buffer.from(JSON.stringify(entity)).toString('utf-8').trim()); + export function camelCaseToSnakeCase( obj: Record, ): Record { @@ -152,6 +156,14 @@ export function debeziumTimeToDate(time: number): Date { return new Date(Math.floor(time / 1000)); } +export const getDateBaseFromType = (value: number | string | Date) => { + if (typeof value === 'number') { + return debeziumTimeToDate(value); + } + + return new Date(value); +}; + export const safeJSONParse = (json: string): T | undefined => { try { return JSON.parse(json); diff --git a/src/cron/index.ts b/src/cron/index.ts index ee48180d3..e0a270f69 100644 --- a/src/cron/index.ts +++ b/src/cron/index.ts @@ -13,6 +13,7 @@ import updateHighlightedViews from './updateHighlightedViews'; import hourlyNotifications from './hourlyNotifications'; import updateCurrentStreak from './updateCurrentStreak'; import syncSubscriptionWithCIO from './syncSubscriptionWithCIO'; +import validateActiveUsers from './validateActiveUsers'; import { updateSourcePublicThreshold } from './updateSourcePublicThreshold'; import { cleanZombieUserCompany } from './cleanZombieUserCompany'; import { calculateTopReaders } from './calculateTopReaders'; @@ -35,4 +36,5 @@ export const crons: Cron[] = [ cleanZombieUserCompany, updateSourcePublicThreshold, calculateTopReaders, + validateActiveUsers, ]; diff --git a/src/cron/validateActiveUsers.ts b/src/cron/validateActiveUsers.ts new file mode 100644 index 000000000..b5add63ed --- /dev/null +++ b/src/cron/validateActiveUsers.ts @@ -0,0 +1,48 @@ +import { Cron } from './cron'; +import { + getAbsoluteDifferenceInDays, + SUCCESSFUL_CIO_SYNC_DATE, + syncSubscriptionsWithActiveState, +} from '../common'; +import { getUsersActiveState } from '../common/googleCloud'; +import { getRedisObject, setRedisObject } from '../redis'; +import { DataSource } from 'typeorm'; +import { addDays, subDays } from 'date-fns'; + +const runSync = async (con: DataSource, runDate: Date) => { + const users = await getUsersActiveState(runDate); + + const { hasAnyFailed } = await syncSubscriptionsWithActiveState({ + con, + users, + }); + + if (!hasAnyFailed) { + await setRedisObject(SUCCESSFUL_CIO_SYNC_DATE, runDate.toISOString()); + } +}; + +const cron: Cron = { + name: 'validate-active-users', + handler: async (con) => { + const runDate = subDays(new Date(), 1); + const lastSuccessfulDate = await getRedisObject(SUCCESSFUL_CIO_SYNC_DATE); + + if (!lastSuccessfulDate) { + return runSync(con, runDate); + } + + const lastRunDate = new Date(lastSuccessfulDate); + const difference = getAbsoluteDifferenceInDays(lastRunDate, runDate); + + if (difference === 0) { + return; + } + + for (let i = 1; i <= difference; i++) { + await runSync(con, addDays(lastRunDate, i)); + } + }, +}; + +export default cron; diff --git a/src/entity/user/User.ts b/src/entity/user/User.ts index a88844582..482434714 100644 --- a/src/entity/user/User.ts +++ b/src/entity/user/User.ts @@ -131,6 +131,9 @@ export class User { @Column({ default: false }) devcardEligible: boolean; + @Column({ default: true }) + cioRegistered: boolean; + @Column({ type: 'text', nullable: true, default: DEFAULT_TIMEZONE }) timezone?: string; diff --git a/src/migration/1734294820179-UserCioRegistered.ts b/src/migration/1734294820179-UserCioRegistered.ts new file mode 100644 index 000000000..0bfdd0b4f --- /dev/null +++ b/src/migration/1734294820179-UserCioRegistered.ts @@ -0,0 +1,19 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class UserCioRegistered1734294820179 implements MigrationInterface { + name = 'UserCioRegistered1734294820179'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE "user" ADD "cioRegistered" boolean NOT NULL DEFAULT true`, + ); + await queryRunner.query( + `CREATE INDEX "IDX_user_cioRegistered" ON "user" ("cioRegistered") `, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IDX_user_cioRegistered"`); + await queryRunner.query(`ALTER TABLE "user" DROP COLUMN "cioRegistered"`); + } +}