Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/composite-file-upload.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@e2b/python-sdk': minor
'e2b': minor
---

Automatically split large file uploads (larger than 64 MB) into chunks, upload the chunks in parallel, and compose them server-side into the final file (async Python SDK and JS SDK only)
53 changes: 53 additions & 0 deletions packages/js-sdk/src/envd/schema.gen.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 85 additions & 2 deletions packages/js-sdk/src/sandbox/filesystem/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ export interface FilesystemReadOpts extends FilesystemRequestOpts {
gzip?: boolean
}

const DEFAULT_CHUNK_SIZE = 64 * 1024 * 1024 // 64 MB

export interface FilesystemListOpts extends FilesystemRequestOpts {
/**
* Depth of the directory to list.
Expand Down Expand Up @@ -388,7 +390,7 @@ export class Filesystem {
typeof pathOrFiles === 'string'
? {
path: pathOrFiles,
writeOpts: opts as FilesystemWriteOpts,
writeOpts: opts as FilesystemWriteOpts | undefined,
writeFiles: [
{
data: dataOrOpts as
Expand All @@ -401,7 +403,7 @@ export class Filesystem {
}
: {
path: undefined,
writeOpts: dataOrOpts as FilesystemWriteOpts,
writeOpts: dataOrOpts as FilesystemWriteOpts | undefined,
writeFiles: pathOrFiles as WriteEntry[],
}

Expand All @@ -418,6 +420,16 @@ export class Filesystem {
const useOctetStream =
compareVersions(this.envdApi.version, ENVD_OCTET_STREAM_UPLOAD) >= 0

// Composite upload: automatically chunk large files, upload parts in parallel, then compose
if (path && useOctetStream) {
const blob = await toBlob(writeFiles[0].data)
if (blob.size > DEFAULT_CHUNK_SIZE) {
return this.compositeWrite(path, blob, user, writeOpts)
}
// Data fits in a single chunk — fall through to normal write path
writeFiles[0] = { data: blob }
}

const results: WriteInfo[] = []

const useGzip = writeOpts?.gzip === true
Expand Down Expand Up @@ -821,4 +833,75 @@ export class Filesystem {
throw handleFilesystemRpcError(err)
}
}

private async compositeWrite(
  destination: string,
  blob: Blob,
  user: Username | undefined,
  opts?: FilesystemWriteOpts
): Promise<WriteInfo> {
  // Upload a payload that exceeds the single-request chunk size: slice the
  // blob into fixed-size chunks, upload each chunk to a unique temporary
  // path in parallel, then ask envd to compose them server-side into the
  // destination file.
  const size = blob.size
  const useGzip = opts?.gzip === true

  const requestHeaders: Record<string, string> = useGzip
    ? {
        'Content-Type': 'application/octet-stream',
        'Content-Encoding': 'gzip',
      }
    : { 'Content-Type': 'application/octet-stream' }

  // One temporary path per chunk, namespaced by a random upload id so that
  // concurrent uploads cannot collide.
  const uploadId = crypto.randomUUID()
  const partCount = Math.ceil(size / DEFAULT_CHUNK_SIZE)
  const partPaths = Array.from(
    { length: partCount },
    (_, i) => `/tmp/.e2b-upload-${uploadId}-${i}`
  )

  const uploadPart = async (partPath: string, index: number): Promise<void> => {
    const offset = index * DEFAULT_CHUNK_SIZE
    const slice = blob.slice(
      offset,
      Math.min(offset + DEFAULT_CHUNK_SIZE, size)
    )
    const payload = await toUploadBody(slice, useGzip)

    const res = await this.envdApi.api.POST('/files', {
      params: {
        query: {
          path: partPath,
          username: user,
        },
      },
      bodySerializer: () => payload,
      headers: requestHeaders,
      signal: this.connectionConfig.getSignal(opts?.requestTimeoutMs),
      body: {},
    })

    const uploadErr = await handleFilesystemEnvdApiError(res)
    if (uploadErr) {
      throw uploadErr
    }
  }

  await Promise.all(partPaths.map((partPath, i) => uploadPart(partPath, i)))

  // NOTE(review): if an upload or the compose call fails, the chunk files
  // remain in /tmp — confirm whether envd garbage-collects them or whether
  // an explicit cleanup call is needed.

  // Server-side concatenation of the uploaded chunks into the final file.
  const composeRes = await this.envdApi.api.POST('/files/compose', {
    body: {
      source_paths: partPaths,
      destination,
      username: user,
    },
    signal: this.connectionConfig.getSignal(opts?.requestTimeoutMs),
  })

  const composeErr = await handleFilesystemEnvdApiError(composeRes)
  if (composeErr) {
    throw composeErr
  }

  return composeRes.data as WriteInfo
}
}
1 change: 1 addition & 0 deletions packages/python-sdk/e2b/envd/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


ENVD_API_FILES_ROUTE = "/files"
ENVD_API_FILES_COMPOSE_ROUTE = "/files/compose"
ENVD_API_HEALTH_ROUTE = "/health"

_DEFAULT_API_ERROR_MAP: dict[int, Callable[[str], Exception]] = {
Expand Down
95 changes: 94 additions & 1 deletion packages/python-sdk/e2b/sandbox_async/filesystem/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import asyncio
import gzip
import uuid

from io import IOBase, TextIOBase
from typing import IO, AsyncIterator, List, Literal, Optional, Union, overload

Expand All @@ -15,7 +18,11 @@
Username,
default_username,
)
from e2b.envd.api import ENVD_API_FILES_ROUTE, ahandle_envd_api_exception
from e2b.envd.api import (
ENVD_API_FILES_COMPOSE_ROUTE,
ENVD_API_FILES_ROUTE,
ahandle_envd_api_exception,
)
from e2b.envd.filesystem import filesystem_connect, filesystem_pb2
from e2b.envd.rpc import authentication_header, handle_rpc_exception
from e2b.envd.versions import (
Expand Down Expand Up @@ -58,6 +65,9 @@ async def _ahandle_filesystem_envd_api_exception(r):
return await ahandle_envd_api_exception(r, _FILESYSTEM_HTTP_ERROR_MAP)


_DEFAULT_CHUNK_SIZE = 64 * 1024 * 1024 # 64 MB


class Filesystem:
"""
Module for interacting with the filesystem in the sandbox.
Expand Down Expand Up @@ -212,6 +222,15 @@ async def write(

:return: Information about the written file
"""
if self._envd_version >= ENVD_OCTET_STREAM_UPLOAD:
content = to_upload_body(data, False)
if len(content) > _DEFAULT_CHUNK_SIZE:
return await self._composite_write(
path, content, user, request_timeout, gzip
)
# Use materialized bytes to avoid consuming IO streams twice
data = content

result = await self.write_files(
[WriteEntry(path=path, data=data)],
user,
Expand Down Expand Up @@ -345,6 +364,80 @@ async def _upload_file(file):

return results

async def _composite_write(
    self,
    destination: str,
    content: bytes,
    user: Optional[Username] = None,
    request_timeout: Optional[float] = None,
    use_gzip: bool = False,
) -> WriteInfo:
    """
    Upload a payload that exceeds the single-request chunk size.

    The payload is split into fixed-size chunks that are uploaded in
    parallel to unique temporary paths inside the sandbox, after which
    envd composes them server-side into the destination file.

    :param destination: Final path of the file in the sandbox
    :param content: Full file content, already materialized as bytes
    :param user: Owner of the written file
    :param request_timeout: Per-request timeout for each HTTP call
    :param use_gzip: Compress each chunk with gzip before uploading

    :return: Information about the written file
    """
    username = user
    if username is None and self._envd_version < ENVD_DEFAULT_USER:
        username = default_username

    total_size = len(content)
    chunk_size = _DEFAULT_CHUNK_SIZE

    headers = {"Content-Type": "application/octet-stream"}
    if use_gzip:
        headers["Content-Encoding"] = "gzip"

    # Unique temporary path per chunk so concurrent uploads cannot collide.
    upload_id = str(uuid.uuid4())
    chunk_count = (total_size + chunk_size - 1) // chunk_size
    chunk_paths = [f"/tmp/.e2b-upload-{upload_id}-{i}" for i in range(chunk_count)]

    async def _upload_chunk(i: int) -> None:
        start = i * chunk_size
        end = min(start + chunk_size, total_size)
        chunk_data = content[start:end]

        params = {"path": chunk_paths[i]}
        if username:
            params["username"] = username

        if use_gzip:
            # gzip.compress is CPU-bound; run it off the event loop.
            upload_content = await asyncio.to_thread(gzip.compress, chunk_data)
        else:
            upload_content = chunk_data

        r = await self._envd_api.post(
            ENVD_API_FILES_ROUTE,
            content=upload_content,
            headers=headers,
            params=params,
            timeout=self._connection_config.get_request_timeout(request_timeout),
        )

        err = await _ahandle_filesystem_envd_api_exception(r)
        if err:
            raise err

    # Use create_task + gather instead of asyncio.TaskGroup: TaskGroup
    # requires Python >= 3.11, while gather works on every interpreter the
    # SDK supports. On the first failure, cancel the remaining uploads to
    # mirror TaskGroup's fail-fast behavior.
    tasks = [asyncio.create_task(_upload_chunk(i)) for i in range(chunk_count)]
    try:
        await asyncio.gather(*tasks)
    except BaseException:
        for task in tasks:
            task.cancel()
        raise

    # NOTE(review): chunk files left in /tmp are not cleaned up if an upload
    # or the compose call fails — confirm whether envd removes them or an
    # explicit delete is needed.

    # Compose the uploaded chunks into the final file server-side.
    body = {
        "source_paths": chunk_paths,
        "destination": destination,
    }
    if username:
        body["username"] = username

    r = await self._envd_api.post(
        ENVD_API_FILES_COMPOSE_ROUTE,
        json=body,
        timeout=self._connection_config.get_request_timeout(request_timeout),
    )

    err = await _ahandle_filesystem_envd_api_exception(r)
    if err:
        raise err

    return WriteInfo(**r.json())

async def list(
self,
path: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
)
from e2b_connect.client import Code

from e2b.envd.api import ENVD_API_FILES_ROUTE, handle_envd_api_exception
from e2b.envd.api import (
ENVD_API_FILES_ROUTE,
handle_envd_api_exception,
)
from e2b.envd.filesystem import filesystem_connect, filesystem_pb2
from e2b.envd.rpc import authentication_header, handle_rpc_exception
from e2b.envd.versions import (
Expand Down
Loading
Loading