Skip to content

Commit ae2cf6e

Browse files
authored
Merge pull request #13 from topcoder-platform/migration
Better handling of basic_trait info
2 parents e76441c + 0d011a1 commit ae2cf6e

File tree

2 files changed

+268
-10
lines changed

2 files changed

+268
-10
lines changed

src/scripts/migrate-dynamo-data.js

Lines changed: 258 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,14 @@ const destructiveApprovals = new Map()
7272

7373
function logWithLevel (level, message, context = null) {
7474
const timestamp = new Date().toISOString()
75-
const contextSuffix = context ? ` | ${JSON.stringify(context)}` : ''
75+
let contextSuffix = ''
76+
if (context) {
77+
try {
78+
contextSuffix = ` | ${JSON.stringify(context, (_, value) => (typeof value === 'bigint' ? value.toString() : value))}`
79+
} catch (serializeErr) {
80+
contextSuffix = ` | ${JSON.stringify({ serializationError: serializeErr.message })}`
81+
}
82+
}
7683
const output = `[${level}] ${timestamp} ${message}${contextSuffix}`
7784
if (level === LOG_LEVELS.ERROR) {
7885
console.error(output)
@@ -95,6 +102,14 @@ function logError (message, context) {
95102
logWithLevel(LOG_LEVELS.ERROR, message, context)
96103
}
97104

105+
function isBigIntSerializationError (err) {
106+
if (!err) {
107+
return false
108+
}
109+
const message = err.message || `${err}`
110+
return typeof message === 'string' && message.toLowerCase().includes('serialize a bigint')
111+
}
112+
98113
async function executeWrite (description, operation, context = {}) {
99114
if (DRY_RUN) {
100115
logInfo(`DRY_RUN active, skipping write: ${description}`, context)
@@ -1169,6 +1184,7 @@ async function importDynamoMember (filename, dateFilter = null) {
11691184
let total = 0
11701185
// count skipped items due to date filter
11711186
let skipped = 0
1187+
let skippedDueToErrors = 0
11721188
// store the temp json object string
11731189
let stringObject = ''
11741190
// store batch items
@@ -1446,6 +1462,29 @@ function isInvalidUtf8Error (err) {
14461462
return message.includes('invalid byte sequence for encoding "UTF8"') || message.includes('0x00')
14471463
}
14481464

1465+
function isUniqueConstraintError (err) {
1466+
if (!err) {
1467+
return false
1468+
}
1469+
if (err.code === 'P2002') {
1470+
return true
1471+
}
1472+
const message = err.message || ''
1473+
return message.includes('Unique constraint failed')
1474+
}
1475+
1476+
function logUniqueConstraintSkip (memberItem, err) {
1477+
const identifier = compactObject({
1478+
userId: memberItem?.userId,
1479+
handle: memberItem?.handle,
1480+
handleLower: memberItem?.handleLower
1481+
})
1482+
logWarn('Skipping member due to unique constraint violation', {
1483+
...identifier,
1484+
target: err?.meta?.target
1485+
})
1486+
}
1487+
14491488
async function createMembersIndividually (memberItems) {
14501489
for (const memberItem of memberItems) {
14511490
try {
@@ -1457,6 +1496,10 @@ async function createMembersIndividually (memberItems) {
14571496
timeout: TRANSACTION_TIMEOUT_MS
14581497
}))
14591498
} catch (err) {
1499+
if (isUniqueConstraintError(err)) {
1500+
logUniqueConstraintSkip(memberItem, err)
1501+
continue
1502+
}
14601503
if (isInvalidUtf8Error(err)) {
14611504
console.warn(`Skipping member ${memberItem.userId || memberItem.handleLower || 'unknown'} due to invalid UTF-8 data`)
14621505
continue
@@ -1495,6 +1538,11 @@ async function createMembers (memberItems) {
14951538
await createMembersIndividually(memberItems)
14961539
return
14971540
}
1541+
if (isUniqueConstraintError(err)) {
1542+
console.warn('Batch insert failed due to unique constraint violation. Falling back to per-member inserts.')
1543+
await createMembersIndividually(memberItems)
1544+
return
1545+
}
14981546
throw err
14991547
}
15001548
}
@@ -2193,15 +2241,198 @@ async function importElasticSearchMember (filename, dateFilter = null) {
21932241
continue
21942242
}
21952243

2196-
await updateMembersWithTraitsAndSkills(dataObj)
2197-
total += 1
2244+
const updated = await updateMembersWithTraitsAndSkills(dataObj)
2245+
if (updated) {
2246+
total += 1
2247+
} else {
2248+
skippedDueToErrors += 1
2249+
}
21982250
}
21992251

22002252
console.log(`\nIt has updated ${total} items totally, skipped ${skipped} items`)
2253+
if (skippedDueToErrors > 0) {
2254+
console.log(`Skipped due to errors: ${skippedDueToErrors}`)
2255+
}
22012256

22022257
console.log(`Finished reading the file: ${filename}\n`)
22032258
}
22042259

2260+
/**
2261+
* Import only the Dynamo basic_info traits and upsert them.
2262+
* @param {String} filename filename
2263+
* @param {Date|null} [dateFilter=null] optional date filter threshold
2264+
*/
2265+
async function importDynamoBasicInfoTraits (filename, dateFilter = null) {
2266+
const traitFilePath = path.join(MIGRATE_DIR, filename)
2267+
2268+
const lineCount = await countFileLines(traitFilePath)
2269+
console.log(`${filename} has ${lineCount} lines in total`)
2270+
2271+
const rlRead = readline.createInterface({
2272+
input: fs.createReadStream(traitFilePath),
2273+
crlfDelay: Infinity
2274+
})
2275+
2276+
let currentLine = 0
2277+
let count = 0
2278+
let processed = 0
2279+
let skipped = 0
2280+
let errors = 0
2281+
let stringObject = ''
2282+
2283+
const processBasicInfoRecord = async (dataItem) => {
2284+
if (dataItem.traitId !== 'basic_info') {
2285+
return
2286+
}
2287+
2288+
if (!shouldProcessRecord(dataItem, dateFilter)) {
2289+
skipped += 1
2290+
return
2291+
}
2292+
2293+
let traitsPayload = dataItem.traits
2294+
if (typeof traitsPayload === 'string') {
2295+
try {
2296+
traitsPayload = JSON.parse(traitsPayload)
2297+
} catch (err) {
2298+
errors += 1
2299+
logWarn('Skipping basic_info trait due to invalid JSON payload', {
2300+
userId: dataItem.userId,
2301+
error: err?.message
2302+
})
2303+
return
2304+
}
2305+
}
2306+
2307+
const traitEntries = Array.isArray(traitsPayload?.data) ? traitsPayload.data : []
2308+
if (!traitEntries.length) {
2309+
skipped += 1
2310+
return
2311+
}
2312+
2313+
const normalizedEntries = []
2314+
let targetUserId = normalizeUserId(dataItem.userId)
2315+
2316+
for (const rawEntry of traitEntries) {
2317+
const normalizedEntry = pick(rawEntry || {}, TRAIT_BASIC_INFO)
2318+
const normalizedUserId = normalizeUserId(normalizedEntry.userId || targetUserId)
2319+
if (!normalizedUserId) {
2320+
continue
2321+
}
2322+
targetUserId = targetUserId || normalizedUserId
2323+
normalizedEntries.push({
2324+
userId: normalizedUserId,
2325+
country: normalizedEntry.country || '',
2326+
primaryInterestInTopcoder: normalizedEntry.primaryInterestInTopcoder || '',
2327+
tshirtSize: normalizedEntry.tshirtSize || null,
2328+
gender: normalizedEntry.gender || null,
2329+
shortBio: normalizedEntry.shortBio || '',
2330+
birthDate: _convert2Date(normalizedEntry.birthDate),
2331+
currentLocation: normalizedEntry.currentLocation || null,
2332+
createdBy: CREATED_BY
2333+
})
2334+
}
2335+
2336+
if (normalizedEntries.length === 0 || !targetUserId) {
2337+
skipped += 1
2338+
return
2339+
}
2340+
2341+
const payload = {
2342+
userId: targetUserId,
2343+
memberTraits: {
2344+
basicInfo: normalizedEntries.map(entry => ({
2345+
...entry,
2346+
userId: targetUserId
2347+
}))
2348+
}
2349+
}
2350+
2351+
try {
2352+
const updated = await updateMembersWithTraitsAndSkills(payload)
2353+
if (updated) {
2354+
processed += 1
2355+
} else {
2356+
errors += 1
2357+
logWarn('Basic info trait update returned falsy result', { userId: targetUserId })
2358+
}
2359+
} catch (err) {
2360+
errors += 1
2361+
logError('Failed to upsert basic_info trait', { userId: targetUserId, error: err?.message })
2362+
}
2363+
}
2364+
2365+
for await (const line of rlRead) {
2366+
currentLine += 1
2367+
if (currentLine % 10 === 0) {
2368+
const percentage = ((currentLine / lineCount) * 100).toFixed(2)
2369+
process.stdout.clearLine()
2370+
process.stdout.cursorTo(0)
2371+
process.stdout.write(`Migrate Progress: ${percentage}%, read ${count}, processed ${processed}, skipped ${skipped}, errors ${errors}`)
2372+
}
2373+
2374+
let trimmedLine = line.trim()
2375+
if (!trimmedLine || trimmedLine === ',' || trimmedLine === '[' || trimmedLine === ']' || trimmedLine === '],') {
2376+
continue
2377+
}
2378+
2379+
if (trimmedLine.startsWith('[')) {
2380+
trimmedLine = trimmedLine.substring(1).trim()
2381+
if (!trimmedLine) {
2382+
continue
2383+
}
2384+
}
2385+
if (trimmedLine.endsWith(']')) {
2386+
trimmedLine = trimmedLine.substring(0, trimmedLine.length - 1).trim()
2387+
if (!trimmedLine) {
2388+
continue
2389+
}
2390+
}
2391+
2392+
if (!stringObject) {
2393+
stringObject = trimmedLine
2394+
} else {
2395+
stringObject += trimmedLine
2396+
}
2397+
2398+
let jsonCandidate = stringObject
2399+
if (jsonCandidate.endsWith(',')) {
2400+
jsonCandidate = jsonCandidate.slice(0, -1)
2401+
}
2402+
2403+
let dataItem
2404+
try {
2405+
dataItem = JSON.parse(jsonCandidate)
2406+
} catch (err) {
2407+
continue
2408+
}
2409+
2410+
stringObject = ''
2411+
count += 1
2412+
await processBasicInfoRecord(dataItem)
2413+
}
2414+
2415+
if (stringObject) {
2416+
let jsonCandidate = stringObject
2417+
if (jsonCandidate.endsWith(',')) {
2418+
jsonCandidate = jsonCandidate.slice(0, -1)
2419+
}
2420+
if (jsonCandidate) {
2421+
try {
2422+
const dataItem = JSON.parse(jsonCandidate)
2423+
count += 1
2424+
await processBasicInfoRecord(dataItem)
2425+
} catch (err) {
2426+
errors += 1
2427+
logWarn('Skipping trailing basic_info trait due to invalid JSON payload', { error: err?.message })
2428+
}
2429+
}
2430+
}
2431+
2432+
console.log(`\nProcessed ${processed} basic_info traits, skipped ${skipped}, errors ${errors}`)
2433+
console.log(`Finished reading the file: ${filename}\n`)
2434+
}
2435+
22052436
/**
22062437
* Update member status values from ElasticSearch snapshot
22072438
* @param {String} filename filename
@@ -2357,9 +2588,12 @@ async function fixMemberUpdateData (memberItem, dbItem) {
23572588
}
23582589
} else if (memberItem.traits.traitId === 'basic_info') {
23592590
const traitData = pick(memberItem.traits.data[0], TRAIT_BASIC_INFO)
2360-
if (traitData.userId && traitData.country && traitData.primaryInterestInTopcoder && traitData.shortBio) {
2591+
if (traitData.userId) {
23612592
memberItemUpdate.memberTraits.basicInfo = [{
23622593
...traitData,
2594+
country: traitData.country || '',
2595+
primaryInterestInTopcoder: traitData.primaryInterestInTopcoder || '',
2596+
shortBio: traitData.shortBio || '',
23632597
birthDate: _convert2Date(traitData.birthDate)
23642598
}]
23652599
}
@@ -2718,6 +2952,10 @@ async function updateMembersWithTraitsAndSkills (memberObj) {
27182952
}))
27192953
} catch (err) {
27202954
logError('Failed to update member with traits and skills', { ...context, error: err?.message })
2955+
if (isBigIntSerializationError(err)) {
2956+
logWarn('Skipping member update due to BigInt serialization error', context)
2957+
return false
2958+
}
27212959
throw err
27222960
}
27232961
}
@@ -2727,9 +2965,15 @@ async function updateMembersWithTraitsAndSkills (memberObj) {
27272965
await syncMemberSkills(memberObj.userId, memberObj.memberSkills, memberObj.handle)
27282966
} catch (err) {
27292967
logError('Failed to sync member skills', { ...context, error: err?.message })
2968+
if (isBigIntSerializationError(err)) {
2969+
logWarn('Skipping member skill sync due to BigInt serialization error', context)
2970+
return false
2971+
}
27302972
throw err
27312973
}
27322974
}
2975+
2976+
return true
27332977
}
27342978

27352979
async function syncMemberAddresses (tx, userId, addresses = []) {
@@ -3649,7 +3893,7 @@ async function runMigrationStep (step, dateFilter, askQuestion) {
36493893
migrationRuntimeState.dateFilter = dateFilter
36503894
destructiveApprovals.delete(step)
36513895

3652-
const shouldRunIntegrityCheck = ['1', '2', '3', '4', '5', '7'].includes(step)
3896+
const shouldRunIntegrityCheck = ['1', '2', '3', '4', '5', '7', '8'].includes(step)
36533897

36543898
try {
36553899
if (shouldRunIntegrityCheck) {
@@ -3777,6 +4021,11 @@ async function runMigrationStep (step, dateFilter, askQuestion) {
37774021
await updateMemberStatusFromElasticSearch(memberElasticsearchFilename, dateFilter)
37784022
break
37794023
}
4024+
case '8': {
4025+
const memberTraitFilename = 'MemberProfileTrait.json'
4026+
await importDynamoBasicInfoTraits(memberTraitFilename, dateFilter)
4027+
break
4028+
}
37804029
default:
37814030
throw new Error(`Unsupported step "${step}"`)
37824031
}
@@ -3813,6 +4062,7 @@ async function main () {
38134062
console.log('5. Import Dynamo MemberStatHistory')
38144063
console.log('6. Import Distribution Stats')
38154064
console.log('7. Update ElasticSearch Member Status')
4065+
console.log('8. Import Dynamo Basic Info Traits')
38164066
console.log('')
38174067
console.log('Destructive clears require --full-reset or ALLOW_DESTRUCTIVE=true and an explicit confirmation. Incremental runs retain existing data by default.')
38184068

@@ -3825,8 +4075,8 @@ async function main () {
38254075

38264076
let selectedStep = null
38274077
try {
3828-
selectedStep = (await askQuestion('Please select your step to run (0-7): ')).trim()
3829-
const validSteps = new Set(['0', '1', '2', '3', '4', '5', '6', '7'])
4078+
selectedStep = (await askQuestion('Please select your step to run (0-8): ')).trim()
4079+
const validSteps = new Set(['0', '1', '2', '3', '4', '5', '6', '7', '8'])
38304080
if (!validSteps.has(selectedStep)) {
38314081
console.log('Unsupported step selected. Script is finished.')
38324082
return
@@ -3840,7 +4090,7 @@ async function main () {
38404090
}
38414091

38424092
let dateFilter = null
3843-
if (['1', '2', '3', '4', '5', '7'].includes(selectedStep)) {
4093+
if (['1', '2', '3', '4', '5', '7', '8'].includes(selectedStep)) {
38444094
const dateFilterInput = (await askQuestion('Enter date filter (YYYY-MM-DD UTC; timestamp-less records will be skipped, or press Enter to skip): ')).trim()
38454095
if (dateFilterInput) {
38464096
const parsed = parseDateFilter(dateFilterInput)

0 commit comments

Comments
 (0)