
Add openS3 Compatibility for AWS SDK v3 #64

Open · wants to merge 2 commits into master
Conversation

@entitycs commented Jan 28, 2021

Without changing the parameter list of the openS3 function, this allows the function to accept the arguments required for version 3 of the AWS SDK S3 client (S3Client, HeadObjectCommand, and GetObjectCommand).

The code checks for the existence of client.getObject (v2) on the client argument before attempting to destructure it into an S3Client, HeadObjectCommand, and GetObjectCommand. The actual implementation is pushed down from the ParquetReader class (called by the developer) to the ParquetEnvelopeReader class (called by ParquetReader).

The S3Client should be an instantiated client object, and the commands should be passed in just as they are destructured from the require statement. This allows the ParquetEnvelopeReader class to set ranges on getObject calls for v3, as it already does for v2 S3 clients (command inputs are read-only in version 3 of the SDK, so a new command must be constructed per request). It also allows developers to attach middleware to their client if they wish.
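The dispatch described above could be sketched roughly as follows. This is an illustrative reconstruction, not the PR's actual code; the helper names resolveS3Client and readRangeV3 are invented for the example:

```javascript
// Hypothetical sketch of the v2/v3 dispatch described above.
// resolveS3Client and readRangeV3 are illustrative names, not the PR's code.
function resolveS3Client(client) {
  if (typeof client.getObject === 'function') {
    // v2: the client object itself exposes getObject/headObject
    return { version: 2, client };
  }
  // v3: expect { S3Client: <instance>, HeadObjectCommand, GetObjectCommand }
  const { S3Client, HeadObjectCommand, GetObjectCommand } = client;
  if (!S3Client || !HeadObjectCommand || !GetObjectCommand) {
    throw new Error('v3 usage requires an S3Client instance plus both command classes');
  }
  return { version: 3, client: S3Client, HeadObjectCommand, GetObjectCommand };
}

// Because command inputs are read-only in v3, every ranged read builds a
// fresh GetObjectCommand instead of mutating a shared params object:
async function readRangeV3({ client, GetObjectCommand }, params, offset, length) {
  const command = new GetObjectCommand({
    ...params,
    Range: `bytes=${offset}-${offset + length - 1}`,
  });
  return client.send(command);
}
```

Passing the command classes in alongside the instantiated client is what lets the envelope reader construct these per-read commands itself.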

The following example was added to README.md:

// v3 example
const { S3Client, HeadObjectCommand, GetObjectCommand } = require('@aws-sdk/client-s3');
const client = new S3Client({ region: 'us-east-1' });
let reader = await parquet.ParquetReader.openS3(
  { S3Client: client, HeadObjectCommand, GetObjectCommand },
  params
);

@thenickdude

The readable() loop you use to receive the response from S3 resolves with the response buffer as soon as there are no bytes currently waiting to be read. This truncates the read before the response from S3 actually finishes, losing data.

I built a testbed here that demonstrates the problem: it reads a local Parquet file successfully, then reads the same file through an S3 emulator, and that read fails:

https://github.com/thenickdude/parquetjs-s3v3-test

Logging shows that it asks for the byte range '21578-201998', the first chunk available from the S3 stream is 65209 bytes long, and the read loop terminates here without reading the rest of the reply (the full reply length is 180421).

@thenickdude

The test passes if I replace that routine with:

        let body = await new Promise((resolve, reject) => {
          let bodyBuffer;
          getObjectCommand.Body.on('error', reject);
          getObjectCommand.Body.on('end', function () {
            resolve(bodyBuffer);
          });
          getObjectCommand.Body.on('readable', function () {
            let data;
            // Drain whatever is currently buffered, but only resolve
            // once the stream emits 'end':
            while ((data = this.read())) {
              bodyBuffer = bodyBuffer ? Buffer.concat([bodyBuffer, data]) : data;
            }
          });
        });
