
Zlib Stream API corrupts data. #61202

@LRagji

Description


Version

v24.5.0, and also the previous LTS line (v22)

Platform

Tried on both macOS (M4, arm64) and Windows 11 (Intel, x64)

Subsystem

node:zlib

What steps will reproduce the bug?

I am using zlib inside a Transform stream that compresses multiple logical streams with separate zlib instances, as shown below:

type TSplitStreamContext = {
        redisTempKey: string,
        csvObjectStringifier?: ReturnType<typeof createObjectCsvStringifier>,
        zipper?: Gzip,
        downStream?: Stream[]
    };

const tagsStreamContext = new Map<string, TSplitStreamContext>();

const zipStream = new Transform({
        objectMode: true,
        transform(tagWiseCSVChunks: Record<string, string>, encoding, callback) {
            const tagIdVSChunkRecords = Object.entries(tagWiseCSVChunks);
            let upstreamPressure = 0;
            let downstreamBackPressure = 0;

            for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;

                if (existingContext.zipper == null || existingContext.zipper === undefined) {
                    existingContext.zipper = createGzip();

                    existingContext.zipper.on('data', (chunk: Buffer) => {
                        if (this.push({ [tagId]: chunk }) === false) {
                            downstreamBackPressure++;
                            existingContext.zipper!.pause();
                            this.once('drain', () => {
                                downstreamBackPressure--;
                                if (downstreamBackPressure === 0) {
                                    existingContext.zipper!.resume();
                                }
                            });
                        }
                    });

                    tagsStreamContext.set(tagId, existingContext);
                }

                if (existingContext.zipper.write(csvChunk) === false) {
                    upstreamPressure++;
                    existingContext.zipper.once('drain', () => {
                        upstreamPressure--;
                        if (upstreamPressure === 0) {
                            callback();//This controls upstream flow
                        }
                    });
                }
            }
            if (upstreamPressure === 0) {
                callback();//This controls upstream flow
            }

        },

        final(callback) {
            const promiseHandles = [];

            for (const [tagId, context] of tagsStreamContext) {
                if (context.zipper !== null && context.zipper !== undefined) {
                    promiseHandles.push(new Promise<void>((resolve, reject) => {
                        context.zipper!.once('end', resolve);
                        context.zipper!.once('error', reject);
                        context.zipper!.end();
                    }));
                }
            }

            Promise.all(promiseHandles)
                .then(() => { callback(); console.log('All zippers ended.'); })
                .catch(err => callback(err));
        }
    });

I invoke this code with upstream streams that feed data into this transform (a file reader, a CSV parser, etc.), and downstream I have a file writer. With this setup, only about half of the data from the source file is produced; the other half is never written.
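
For context, the overall wiring looks roughly like this (simplified from the full code under Additional information below):

// Simplified pipeline wiring, condensed from the full repro below:
// CSV file -> csv-parser -> row batching -> per-tag split -> per-tag CSV text ->
// zipStream (one Gzip per tag, shown above) -> per-tag .gz files on disk.
await pipeline(
    inputFileStream,      // createReadStream('./1.csv', { encoding: 'utf8' })
    csvStream,            // csv-parser
    rowAccumulator,       // batches parsed rows into arrays
    tagSegregationStream, // groups each batch by tag id
    outputCSVStream,      // re-stringifies each tag's rows as CSV text
    zipStream,            // the Transform shown above
    fileWriter            // appends each gzipped chunk to ./zipped/<tagId>.gz
);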

How often does it reproduce? Is there a required condition?

This happens every time.

What is the expected behavior? Why is that the expected behavior?

It should compress all of the file data and behave just like a normal pipe operation.
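
For comparison, this is the "normal pipe operation" being referred to: a single Gzip instance in a plain pipeline (a minimal baseline sketch; the single output file ./zipped/all.gz is just for illustration, the real code writes one .gz per tag):

import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream/promises';

// Baseline: one Gzip instance in a plain pipeline compresses the whole file,
// regardless of size. Each per-tag zipper above is expected to behave the same way.
await pipeline(
    createReadStream('./1.csv'),
    createGzip(),
    createWriteStream('./zipped/all.gz')
);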

What do you see instead?

It only transforms half of the file.

The important part is that it works for small files (<100 MB) but fails for any larger file. I assume this has something to do with zlib's internal buffers (speculation).
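
In case it helps narrow that down, here is a reduced sketch of just the write/'drain' and 'data' wiring that the transform above relies on, using a single Gzip instance and synthetic input (hypothetical test code, not part of the original attachment; chunk count and size are arbitrary):

import { createGzip, gunzipSync } from 'node:zlib';

// Push synthetic CSV lines through one Gzip instance with the same manual
// write/'drain' handling as the transform above, then check that the
// decompressed output matches the number of bytes written.
async function isolateGzip(totalChunks = 2_000_000): Promise<void> {
    const gzip = createGzip();
    const compressed: Buffer[] = [];
    let written = 0;

    gzip.on('data', (chunk: Buffer) => compressed.push(chunk));

    const line = 'timestamp,value,status\n1,2,3\n';
    for (let i = 0; i < totalChunks; i++) {
        written += Buffer.byteLength(line);
        if (gzip.write(line) === false) {
            // wait for the zlib stream's writable side to drain before writing more
            await new Promise<void>(resolve => gzip.once('drain', () => resolve()));
        }
    }

    await new Promise<void>((resolve, reject) => {
        gzip.once('end', () => resolve());
        gzip.once('error', reject);
        gzip.end();
    });

    const decompressed = gunzipSync(Buffer.concat(compressed));
    console.log('bytes written     :', written);
    console.log('bytes decompressed:', decompressed.length);
}

isolateGzip().catch(err => console.error(err));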

Rename the attached code file from .ts to .mts; for some reason GitHub is not allowing the .mts extension.

Additional information

The entire code:

import { appendFileSync, createReadStream, createWriteStream, mkdirSync } from 'node:fs';
import Stream, { PassThrough, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createGzip, Gzip } from 'node:zlib';
import csvParser from 'csv-parser';
import redis from 'redis';
import { randomInt } from 'node:crypto';
import { createObjectCsvStringifier } from 'csv-writer';

async function streamTodo(csvFilePath: string, expiryTimeInMilliseconds: number, redisClient: redis.RedisClientType, redisTempKeyPartA = `temp-stream-${randomInt(1e6)}-`, redisTempStreamingTimeout = 10000): Promise<void> {
    type TTagStreamObject = { timestamp: number, value: number, status: number, tag: string };
    type TSplitStreamContext = {
        redisTempKey: string,
        csvObjectStringifier?: ReturnType<typeof createObjectCsvStringifier>,
        zipper?: Gzip,
        downStream?: Stream[]
    };

    const tagsStreamContext = new Map<string, TSplitStreamContext>();
    const rowAccumulatorMap = new WeakMap<Transform, Array<Record<string, string>>>();

    const inputFileStream = createReadStream(csvFilePath, { encoding: 'utf8' });

    const csvStream = csvParser();

    const rowAccumulator = new Transform({
        objectMode: true,
        highWaterMark: 100, // Adjust this value as needed for batch size.(sweet spot between memory and speed)
        transform(row: Record<string, string>, encoding, callback) {
            const acc = rowAccumulatorMap.get(this) ?? new Array<Record<string, string>>();
            acc.push(row);
            if (acc.length >= this.readableHighWaterMark) {
                callback(null, acc);
                acc.length = 0;
            } else {
                rowAccumulatorMap.set(this, acc);
                callback();
            }
        },
        flush(callback) {
            const acc = rowAccumulatorMap.get(this) ?? new Array<Record<string, string>>();
            callback(null, acc);
            acc.length = 0;
        }
    });

    const tagSegregationStream = new Transform({
        objectMode: true,
        transform(rows: Record<string, string>[], encoding, callback) {
            const tagWiseData: Record<string, TTagStreamObject[]> = {};
            for (const row of rows) {
                for (const [key, value] of Object.entries(row)) {
                    if (key !== 'timestamp' && !key.endsWith('_status')) {
                        const existingArray = tagWiseData[key] ?? [];
                        existingArray.push({
                            timestamp: parseFloat(row['timestamp']),
                            value: parseFloat(value),
                            status: parseFloat(row[`${key}_status`]),
                            tag: key
                        });
                        tagWiseData[key] = existingArray;
                    }
                }
            }
            callback(null, tagWiseData);
        }
    });

    const outputCSVStream = new Transform({
        objectMode: true,
        transform(tagWiseData: Record<string, TTagStreamObject[]>, encoding, callback) {

            const tagWiseCSVChunks: Record<string, string> = {};
            for (const [tagId, data] of Object.entries(tagWiseData)) {
                tagWiseCSVChunks[tagId] = "";
                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;
                if (existingContext?.csvObjectStringifier == undefined || existingContext?.csvObjectStringifier == null) {

                    existingContext.csvObjectStringifier = createObjectCsvStringifier({
                        header: [
                            { id: 'timestamp', title: 'timestamp' },
                            { id: 'value', title: 'value' },
                            { id: 'status', title: 'status' }
                        ]
                    });
                    tagsStreamContext.set(tagId, existingContext);
                    tagWiseCSVChunks[tagId] = existingContext.csvObjectStringifier.getHeaderString() ?? "";
                }

                tagWiseCSVChunks[tagId] += existingContext.csvObjectStringifier.stringifyRecords(data);
            }

            callback(null, tagWiseCSVChunks);
        }
    });

    const zipStream = new Transform({
        objectMode: true,
        transform(tagWiseCSVChunks: Record<string, string>, encoding, callback) {
            const tagIdVSChunkRecords = Object.entries(tagWiseCSVChunks);
            let upstreamPressure = 0;
            let downstreamBackPressure = 0;

            for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;

                if (existingContext.zipper == null || existingContext.zipper === undefined) {
                    existingContext.zipper = createGzip();

                    existingContext.zipper.on('data', (chunk: Buffer) => {
                        if (this.push({ [tagId]: chunk }) === false) {
                            downstreamBackPressure++;
                            existingContext.zipper!.pause();
                            this.once('drain', () => {
                                downstreamBackPressure--;
                                if (downstreamBackPressure === 0) {
                                    existingContext.zipper!.resume();
                                }
                            });
                        }
                    });

                    tagsStreamContext.set(tagId, existingContext);
                }

                if (existingContext.zipper.write(csvChunk) === false) {
                    upstreamPressure++;
                    existingContext.zipper.once('drain', () => {
                        upstreamPressure--;
                        if (upstreamPressure === 0) {
                            callback();//This controls upstream flow
                        }
                    });
                }
            }
            if (upstreamPressure === 0) {
                callback();//This controls upstream flow
            }

        },

        final(callback) {
            const promiseHandles = [];

            for (const [tagId, context] of tagsStreamContext) {
                if (context.zipper !== null && context.zipper !== undefined) {
                    promiseHandles.push(new Promise<void>((resolve, reject) => {
                        context.zipper!.once('end', resolve);
                        context.zipper!.once('error', reject);
                        context.zipper!.end();
                    }));
                }
            }

            Promise.all(promiseHandles)
                .then(() => { callback(); console.log('All zippers ended.'); })
                .catch(err => callback(err));
        }
    });

    const redisWriter = new Transform({
        objectMode: true,
        transform(chunk: Record<string, Buffer>, encoding, callback) {
            const promiseHandles = [];
            for (const [tagId, zippedBufferChunk] of Object.entries(chunk)) {
                const redisTempTagKey = `${redisTempKeyPartA}${tagId}`;

                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;
                if (existingContext.redisTempKey == null || existingContext.redisTempKey == undefined) {
                    existingContext.redisTempKey = redisTempTagKey;
                    tagsStreamContext.set(tagId, existingContext);
                }

                promiseHandles.push(
                    redisClient.multi()
                        .append(redisTempTagKey, zippedBufferChunk)
                        .pExpire(redisTempTagKey, redisTempStreamingTimeout)
                        .exec()
                )
            }

            Promise.allSettled(promiseHandles)
                .then(() => callback())
                .catch(err => callback(err));
        },

        final(callback) {
            const promiseHandles = [];
            for (const [tagId, info] of tagsStreamContext) {
                promiseHandles.push(
                    redisClient.multi()
                        .rename(info.redisTempKey, tagId)
                        .pExpire(tagId, expiryTimeInMilliseconds)
                        .exec()
                )
            }

            Promise.allSettled(promiseHandles)
                .then(() => callback())
                .catch(err => callback(err));
        },
    });

    const fileWriter = new Transform({
        objectMode: true,
        transform(chunk: Record<string, Buffer>, encoding, callback) {
            for (const [tagId, zippedBufferChunk] of Object.entries(chunk)) {
                appendFileSync(`./zipped/${tagId}.gz`, zippedBufferChunk, { flag: 'a' });
            }
            callback();
        }
    });

    // const splitStreams = new Transform({
    //     objectMode: true,
    //     construct(callback) {
    //         callback();
    //     },
    //     transform(tagWiseCSVChunks: Record<string, Buffer>, encoding, callback) {

    //         const tagIdVSChunkRecords = Object.entries(tagWiseCSVChunks);
    //         let upstreamPressure = 0;
    //         let downstreamBackPressure = 0;

    //         for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
    //             const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;

    //             if (Array.isArray(existingContext.downStream) === false || existingContext.downStream.length === 0) {
    //                 existingContext.downStream = [new PassThrough({ objectMode: true }), createGzip(), createWriteStream(`./zipped/${tagId}.gz`, { flags: 'a' })];

    //                 existingContext.zipper.on('data', (chunk: Buffer) => {
    //                     if (this.push({ [tagId]: chunk }) === false) {
    //                         downstreamBackPressure++;
    //                         existingContext.zipper!.pause();
    //                         this.once('drain', () => {
    //                             downstreamBackPressure--;
    //                             if (downstreamBackPressure === 0) {
    //                                 existingContext.zipper!.resume();
    //                             }
    //                         });
    //                     }
    //                 });

    //                 tagsStreamContext.set(tagId, existingContext);
    //             }

    //             if (existingContext.zipper.write(csvChunk) === false) {
    //                 upstreamPressure++;
    //                 existingContext.zipper.once('drain', () => {
    //                     upstreamPressure--;
    //                     if (upstreamPressure === 0) {
    //                         callback();//This controls upstream flow
    //                     }
    //                 });
    //             }
    //         }
    //         if (upstreamPressure === 0) {
    //             callback();//This controls upstream flow
    //         }
    //     },
    //     final(callback) {
    //         redisWriter.end();
    //         fileWriter.end();
    //         callback();
    //     }
    // });

    await pipeline(
        inputFileStream,
        csvStream,
        rowAccumulator,
        tagSegregationStream,
        outputCSVStream,
        zipStream,
        fileWriter
    );

    tagsStreamContext.clear();
}

async function mainModule() {
    mkdirSync('./zipped', { recursive: true })
    console.log('Computing...');
    await streamTodo('./1.csv', 3600000, null as any);
    // console.log('Unzipping...');
    // await unzipGzFiles('./zipped');
    // console.log('Comparing 03...');
    // await compareCsvFiles('./1.csv', './zipped/0c6ad90b-e90d-40ac-a277-169c8024a003.gz', './zipped/diff_03.csv');
    // console.log('Comparing 10...');
    // await compareCsvFiles('./1.csv', './zipped/0c6ad90b-e90d-40ac-a277-169c8024a010.gz', './zipped/diff_10.csv');
}

mainModule()
    .then(() => console.log('All operations completed successfully.'))
    .catch(err => console.error('Error during operations:', err));
