Feat/support aws s3 v3 (#115)
Problem
=======
Support AWS S3 V3 streams while retaining support for V2. V2 may be removed later.
Closes #32
with @wilwade, @pfrank13

Change summary:
---------------
* support V3 in reader.ts (see the usage sketch below)
* add tests
* use `it.skip` instead of commenting out test code.
---------

Co-authored-by: Wil Wade <[email protected]>
shannonwells and wilwade authored Jan 30, 2024
1 parent 2622ff1 commit 117e5a5
Showing 6 changed files with 2,997 additions and 121 deletions.
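
A minimal usage sketch of the API this change enables (not part of the commit): it assumes the package's exported `ParquetReader`, the usual parquetjs cursor methods (`getCursor`, `next`, `close`), and placeholder bucket/key/region values. `openS3` below distinguishes v2 from v3 clients by checking for a `headObject` method, so either client type can be passed.

import { S3Client } from "@aws-sdk/client-s3";    // AWS SDK v3
import { ParquetReader } from "@dsnp/parquetjs";  // assumed package entry point

async function readParquetFromS3(): Promise<void> {
  // v3: pass an instantiated S3Client; ParquetReader.openS3 routes it to openS3v3
  const client = new S3Client({ region: "us-east-1" });  // placeholder region
  const reader = await ParquetReader.openS3(client, {
    Bucket: "example-bucket",       // placeholder bucket
    Key: "path/to/file.parquet",    // placeholder key
  });

  // Iterate records with the usual parquetjs cursor API (assumed unchanged by this commit)
  const cursor = reader.getCursor();
  let record: unknown;
  while ((record = await cursor.next())) {
    console.log(record);
  }
  await reader.close();
}

// With AWS SDK v2 the call shape is the same: ParquetReader.openS3(new AWS.S3(), { Bucket, Key })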
2 changes: 2 additions & 0 deletions esbuild.js
@@ -12,6 +12,7 @@ const baseConfig = {
},
inject: ['./esbuild-shims.js'],
minify: true,
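// resolve the "browser" build of dependencies (such as the AWS SDK v3 packages) ahead of "module" and "main"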
mainFields: ["browser", "module", "main"],
platform: 'browser', // default
plugins: [compressionBrowserPlugin, wasmPlugin],
target: "es2020" // default
@@ -27,6 +28,7 @@ const testConfig = {
},
inject: ['./esbuild-shims.js'],
minify: false,
mainFields: ["browser", "module", "main"],
platform: 'browser', // default
plugins: [compressionBrowserPlugin, wasmPlugin],
target: "es2020" // default
85 changes: 77 additions & 8 deletions lib/reader.ts
@@ -25,6 +25,13 @@ import {
ColumnChunkExt
} from './declare';
import {Cursor, Options} from './codec/types';
import {
GetObjectCommand,
HeadObjectCommand,
S3Client,
} from "@aws-sdk/client-s3";
import type { Readable } from "stream";
import type { Blob } from "buffer";

const {
getBloomFiltersFor,
@@ -137,13 +144,23 @@ export class ParquetReader {
}

/**
* Open the parquet file from S3 using the supplied AWS client and params.
* The params must include `Bucket` and `Key` for the requested file.
* Both v2 and v3 of the AWS SDK are supported: pass the instantiated client;
* the command objects used with v3 are imported by the reader and do not need to be supplied.
*
* This function returns a new parquet reader, or throws an Error.
*/
static async openS3(client: any, params: ClientParameters, options?: BufferReaderOptions) {
  try {
    let envelopeReader: ParquetEnvelopeReader =
      'function' === typeof client['headObject'] ?
        await ParquetEnvelopeReader.openS3(client as ClientS3, params, options) :  // S3 client v2
        await ParquetEnvelopeReader.openS3v3(client as S3Client, params, options); // S3 client v3
return this.openEnvelopeReader(envelopeReader, options);
} catch (e: any) {
throw new Error(`Error accessing S3 Bucket ${params.Bucket}. Message: ${e.message}`);
}
}

/**
@@ -181,6 +198,7 @@ export class ParquetReader {
*/
constructor(metadata: FileMetaDataExt, envelopeReader: ParquetEnvelopeReader, opts?: BufferReaderOptions) {
opts = opts || {};

if (!PARQUET_VERSIONS.includes(metadata.version)) {
throw 'invalid parquet version';
}
@@ -448,6 +466,56 @@ export class ParquetEnvelopeReader {
return new ParquetEnvelopeReader(readFn, closeFn, fileStat, options);
}

static async openS3v3(client: S3Client, params: any, options: any) {
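// fileStat resolves to the object's size (ContentLength) from a HeadObject request; the reader uses it to locate the file footer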
const fileStat = async () => {
try {
let headObjectCommand = await client.send(new HeadObjectCommand(params));
return Promise.resolve(headObjectCommand.ContentLength);
}
catch (e: any){
// having params match command names makes e.message clear to user
return Promise.reject("rejected headObjectCommand: " + e.message);
}
}

const readFn = async (offset: number, length: number, file: string|undefined): Promise<Buffer> => {
if (file) {
return Promise.reject("external references are not supported");
}
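// HTTP Range is inclusive on both ends, so this requests exactly `length` bytes starting at `offset`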
const Range = `bytes=${offset}-${offset+length-1}`;
const input = { ...{ Range }, ...params };
const response = await client.send(new GetObjectCommand(input));

const body = response.Body;
if (body) {
return ParquetEnvelopeReader.streamToBuffer(body);
}
return Buffer.of();
};

let closeFn = () => ({});

return new ParquetEnvelopeReader(readFn, closeFn, fileStat, options);
}

static async streamToBuffer(body: any): Promise<Buffer> {
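// A Blob-like body (e.g. in the browser) exposes arrayBuffer() and is converted directly; anything else is treated as a Node Readable stream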
const blob = body as Blob;
if (blob.arrayBuffer !== undefined) {
const arrayBuffer = await blob.arrayBuffer();
const uint8Array: Uint8Array = new Uint8Array(arrayBuffer);
return Buffer.from(uint8Array);
}

// Assumed to be a Readable-like object
const readable = body as Readable;
return await new Promise((resolve, reject) => {
const chunks: Uint8Array[] = [];
readable.on("data", (chunk) => chunks.push(chunk));
readable.on("error", reject);
readable.on("end", () => resolve(Buffer.concat(chunks)));
});
}

static async openUrl(url: Parameter | URL | string, options?: BufferReaderOptions) {
let params: Parameter;
if (typeof url === 'string') params = { url };
@@ -695,9 +763,10 @@ export class ParquetEnvelopeReader {

let trailerLen = PARQUET_MAGIC.length + 4;
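// A parquet file ends with a 4-byte footer length followed by the 4-byte magic "PAR1"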

let offset = (this.fileSize as number) - trailerLen;
let trailerBuf = await this.read(offset, trailerLen);

if (trailerBuf.subarray(4).toString() != PARQUET_MAGIC) {
throw 'not a valid parquet file';
}
