Feat/support aws s3 v3 #115

Merged: 7 commits, merged Jan 30, 2024
2 changes: 2 additions & 0 deletions esbuild.js
@@ -12,6 +12,7 @@ const baseConfig = {
},
inject: ['./esbuild-shims.js'],
minify: true,
mainFields: ["browser", "module", "main"],
platform: 'browser', // default
plugins: [compressionBrowserPlugin, wasmPlugin],
target: "es2020" // default
@@ -27,6 +28,7 @@ const testConfig = {
},
inject: ['./esbuild-shims.js'],
minify: false,
mainFields: ["browser", "module", "main"],
platform: 'browser', // default
plugins: [compressionBrowserPlugin, wasmPlugin],
target: "es2020" // default
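For context on this esbuild change: `mainFields` controls which package.json fields esbuild tries, in order, when resolving a dependency. Putting "browser" first makes the bundle pick up the browser builds that the @aws-sdk/client-s3 packages publish instead of their Node entry points. A minimal standalone sketch of the same setting, not taken from this PR; the entry point and output path are placeholders:

// Minimal esbuild build-script sketch; paths are placeholders.
import * as esbuild from "esbuild";

esbuild.build({
  entryPoints: ["lib/reader.ts"],            // placeholder entry point
  bundle: true,
  platform: "browser",
  // Try each dependency's "browser" build first, then its ESM build ("module"),
  // then the CommonJS build ("main").
  mainFields: ["browser", "module", "main"],
  outfile: "dist/browser/parquet.js",        // placeholder output path
}).catch(() => process.exit(1));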
85 changes: 77 additions & 8 deletions lib/reader.ts
@@ -25,6 +25,13 @@ import {
ColumnChunkExt
} from './declare';
import {Cursor, Options} from './codec/types';
import {
GetObjectCommand,
HeadObjectCommand,
S3Client,
} from "@aws-sdk/client-s3";
import type { Readable } from "stream";
import type { Blob } from "buffer";

const {
getBloomFiltersFor,
@@ -137,13 +144,23 @@ export class ParquetReader {
}

/**
* Open the parquet file from S3 using the supplied aws client and params
* The params have to include `Bucket` and `Key` to the file requested
* This function returns a new parquet reader
* Open the parquet file from S3 using the supplied AWS client and params.
* The params must include the `Bucket` and `Key` of the requested file.
* Both AWS SDK v2 (`AWS.S3`) and v3 (`S3Client`) clients are supported; the SDK
* version is detected automatically. With v3, pass the `S3Client` instance
* directly; the Head/Get object commands are created internally.
*
* This function returns a new parquet reader, or throws an Error if the object
* cannot be accessed.
*/
static async openS3(client: ClientS3, params: ClientParameters, options?: BufferReaderOptions) {
let envelopeReader = await ParquetEnvelopeReader.openS3(client, params, options);
return this.openEnvelopeReader(envelopeReader, options);
static async openS3(client: any, params: ClientParameters, options?: BufferReaderOptions) {
try {
let envelopeReader: ParquetEnvelopeReader =
'function' === typeof client['headObject'] ?
await ParquetEnvelopeReader.openS3(client as ClientS3, params, options) : // S3 client v2
await ParquetEnvelopeReader.openS3v3(client as S3Client, params, options); // S3 client v3
return this.openEnvelopeReader(envelopeReader, options);
} catch (e: any) {
throw new Error(`Error accessing S3 Bucket ${params.Bucket}. Message: ${e.message}`);
}
}
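For reference, a hedged usage sketch of the updated `openS3`, not part of this diff; the package import, region, bucket, and key are assumptions:

// Usage sketch only; package name, region, bucket and key are assumed.
import { S3Client } from "@aws-sdk/client-s3";
import { ParquetReader } from "@dsnp/parquetjs"; // assumed package name

// AWS SDK v3: pass the S3Client instance directly; the Head/Get commands
// are created inside openS3v3.
const client = new S3Client({ region: "us-east-1" });
const reader = await ParquetReader.openS3(client, {
  Bucket: "my-bucket",
  Key: "data/file.parquet",
});

// An SDK v2 client (e.g. new AWS.S3()) still works: it is detected by its
// headObject method and routed to the original openS3 code path.
const cursor = reader.getCursor();
let record;
while ((record = await cursor.next())) {
  console.log(record);
}
await reader.close();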

/**
@@ -181,6 +198,7 @@ export class ParquetReader {
*/
constructor(metadata: FileMetaDataExt, envelopeReader: ParquetEnvelopeReader, opts?: BufferReaderOptions) {
opts = opts || {};

if (!PARQUET_VERSIONS.includes(metadata.version)) {
throw 'invalid parquet version';
}
@@ -448,6 +466,56 @@ export class ParquetEnvelopeReader {
return new ParquetEnvelopeReader(readFn, closeFn, fileStat, options);
}

static async openS3v3(client: S3Client, params: any, options: any) {
const fileStat = async () => {
try {
let headObjectCommand = await client.send(new HeadObjectCommand(params));
return Promise.resolve(headObjectCommand.ContentLength);
}
catch (e: any){
// having params match command names makes e.message clear to user
return Promise.reject("rejected headObjectCommand: " + e.message);
}
}

const readFn = async (offset: number, length: number, file: string|undefined): Promise<Buffer> => {
if (file) {
return Promise.reject("external references are not supported");
}
const Range = `bytes=${offset}-${offset+length-1}`;
const input = { ...{ Range }, ...params };
const response = await client.send(new GetObjectCommand(input));

const body = response.Body;
if (body) {
return ParquetEnvelopeReader.streamToBuffer(body);
}
return Buffer.of();
};

let closeFn = () => ({});

return new ParquetEnvelopeReader(readFn, closeFn, fileStat, options);
}
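The `readFn` above maps each (offset, length) request from the reader onto an HTTP range request, so only the needed byte span of the object is downloaded; HTTP ranges are inclusive on both ends, hence `offset + length - 1`. A small standalone sketch of the same ranged read, with placeholder region, bucket, and key:

// Hedged sketch of the ranged GetObject that readFn performs; values are placeholders.
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";

const client = new S3Client({ region: "us-east-1" });
const offset = 100;
const length = 8;
const response = await client.send(
  new GetObjectCommand({
    Bucket: "my-bucket",
    Key: "data/file.parquet",
    Range: `bytes=${offset}-${offset + length - 1}`, // "bytes=100-107": 8 bytes, inclusive
  })
);
// response.Body is a Readable in Node or a Blob/ReadableStream in the browser.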

static async streamToBuffer(body: any): Promise<Buffer> {
const blob = body as Blob;
if (blob.arrayBuffer !== undefined) {
const arrayBuffer = await blob.arrayBuffer();
const uint8Array: Uint8Array = new Uint8Array(arrayBuffer);
return Buffer.from(uint8Array);
}

// Assumed to be a Readable-like object
const readable = body as Readable;
return await new Promise((resolve, reject) => {
const chunks: Uint8Array[] = [];
readable.on("data", (chunk) => chunks.push(chunk));
readable.on("error", reject);
readable.on("end", () => resolve(Buffer.concat(chunks)));
});
}
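`streamToBuffer` has to handle both shapes the SDK returns for `Body`: a web Blob-like object exposing `arrayBuffer()` in browsers, and a Node `Readable` stream otherwise. A small sketch exercising both paths, assuming Node 18+ where `Blob` is a global:

// Hedged sketch of the two Body shapes streamToBuffer accepts.
import { Readable } from "stream";

// Browser-style body: anything exposing arrayBuffer(), e.g. a Blob.
const blobBody = new Blob([Uint8Array.from([1, 2, 3])]);
const fromBlob = await ParquetEnvelopeReader.streamToBuffer(blobBody);     // <Buffer 01 02 03>

// Node-style body: a Readable stream of chunks.
const streamBody = Readable.from([Buffer.from([4, 5, 6])]);
const fromStream = await ParquetEnvelopeReader.streamToBuffer(streamBody); // <Buffer 04 05 06>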

static async openUrl(url: Parameter | URL | string, options?: BufferReaderOptions) {
let params: Parameter;
if (typeof url === 'string') params = { url };
@@ -695,9 +763,10 @@ export class ParquetEnvelopeReader {

let trailerLen = PARQUET_MAGIC.length + 4;

let trailerBuf = await this.read((this.fileSize as number) - trailerLen, trailerLen);
let offset = (this.fileSize as number) - trailerLen;
let trailerBuf = await this.read(offset, trailerLen);

if (trailerBuf.slice(4).toString() != PARQUET_MAGIC) {
if (trailerBuf.subarray(4).toString() != PARQUET_MAGIC) {
Collaborator (Author) commented: NB: slice has been deprecated in favor of subarray

throw 'not a valid parquet file';
}
