69 changes: 47 additions & 22 deletions applications/wikipedia/main.py
@@ -24,7 +24,7 @@
 data_dir = f"{cache_dir}/{dataset_name}"
 DATA_PATH = Path(data_dir)

-PUSH_TO_HUB = False
+PUSH_TO_HUB = True
 dataset_name = f"567-labs/wikipedia-embedding-{MODEL_SLUG}-sample"
 dataset_file = "wiki-embeddings.parquet"

@@ -145,7 +145,9 @@ async def embed(self, chunks):


 @stub.function(
-    image=Image.debian_slim().pip_install("datasets", "pyarrow", "tqdm"),
+    image=Image.debian_slim().pip_install(
+        "datasets", "pyarrow", "tqdm", "hf_transfer", "huggingface_hub"
+    ),
     volumes={cache_dir: volume},
     timeout=84600,
     secret=Secret.from_name("huggingface-credentials"),
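A side note on the new image, offered as an assumption rather than part of this diff: installing hf_transfer alone does not enable it, since huggingface_hub only switches to the faster transfer backend when HF_HUB_ENABLE_HF_TRANSFER=1 is set in the environment. If Modal's Image.env helper is available in the version pinned here, the flag could be baked into the image:

```python
# Hypothetical tweak, not part of this PR: opt in to hf_transfer explicitly.
# huggingface_hub only uses the faster transfer backend when this env var is set.
from modal import Image

image = Image.debian_slim().pip_install(
    "datasets", "pyarrow", "tqdm", "hf_transfer", "huggingface_hub"
).env({"HF_HUB_ENABLE_HF_TRANSFER": "1"})
```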
@@ -186,7 +188,13 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
     start = time.perf_counter()
     acc_chunks = []
     embeddings = []
-    for batch_chunks, batch_embeddings in model.embed.map(batches, order_outputs=False):
+    for resp in model.embed.map(batches, order_outputs=False, return_exceptions=True):
+        if isinstance(resp, Exception):
+            print(f"Exception: {resp}")
+            continue
+
+        batch_chunks, batch_embeddings = resp
+
         acc_chunks.extend(batch_chunks)
         embeddings.extend(batch_embeddings)

@@ -207,29 +215,46 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
         "extrapolated_duration": extrapolated_duration_cps_fmt,
     }

+    print(json.dumps(resp, indent=2))
+
     if PUSH_TO_HUB:
-        print(f"Pushing to hub {dataset_name}")
-        table = pa.Table.from_arrays(
-            [
-                pa.array([chunk[0] for chunk in acc_chunks]), # id
-                pa.array([chunk[1] for chunk in acc_chunks]), # url
-                pa.array([chunk[2] for chunk in acc_chunks]), # title
-                pa.array([chunk[3] for chunk in acc_chunks]), # text
-                pa.array(embeddings),
-            ],
-            names=["id", "url", "title", "text", "embedding"],
-        )
-        pq.write_table(table, dataset_file)
-        dataset = load_dataset("parquet", data_files=dataset_file)
-        dataset.push_to_hub(dataset_name, token=os.environ["HUGGINGFACE_TOKEN"])
+        try:
+            print(f"Pushing to hub {dataset_name}")
+            table = pa.Table.from_arrays(
+                [
+                    pa.array([chunk[0] for chunk in acc_chunks]), # id
+                    pa.array([chunk[1] for chunk in acc_chunks]), # url
+                    pa.array([chunk[2] for chunk in acc_chunks]), # title
+                    pa.array([chunk[3] for chunk in acc_chunks]), # text
+                    pa.array(embeddings),
+                ],
+                names=["id", "url", "title", "text", "embedding"],
+            )
+            pq.write_table(table, dataset_file)
+
+            print(f"Uploading to hub {dataset_name}")
+            from huggingface_hub import HfApi, logging
+
+            logging.set_verbosity_debug()
+            hf = HfApi()
+            # ! This is not working but should be
+            hf.upload_file(
[Review comment from the PR author (Collaborator) on the hf.upload_file call: can someone help look at this?]
+                path_or_fileobj=dataset_file,
+                path_in_repo=dataset_file,
+                repo_id="jxnlco/modal-wikipedia",
+                repo_type="dataset",
+            )
+
+        except Exception as e:
+            print(e)
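One possible explanation for the failing upload, offered as an assumption rather than a diagnosis: the Modal secret exposes the token as HUGGINGFACE_TOKEN, which huggingface_hub does not read automatically (it looks for HF_TOKEN or a cached login), and upload_file also needs the target dataset repo to exist. A minimal sketch that passes the token explicitly and creates the repo first:

```python
# Sketch only: assumes the failure is auth- or missing-repo-related.
import os

from huggingface_hub import HfApi

hf = HfApi(token=os.environ["HUGGINGFACE_TOKEN"])  # pass the token explicitly
hf.create_repo(
    "jxnlco/modal-wikipedia",
    repo_type="dataset",
    exist_ok=True,  # no-op if the dataset repo already exists
)
hf.upload_file(
    path_or_fileobj=dataset_file,
    path_in_repo=dataset_file,
    repo_id="jxnlco/modal-wikipedia",
    repo_type="dataset",
)
```

If the direct upload still fails, the load_dataset("parquet", ...).push_to_hub(...) path that this diff removes could be kept as a fallback, since it also accepts an explicit token.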

     return resp


 @stub.local_entrypoint()
 def main():
-    for scale, batch_size in product([0.25], [512 * 50]):
-        with open("benchmarks.json", "a") as f:
-            benchmark = embed_dataset.remote(down_scale=scale, batch_size=batch_size)
-            print(json.dumps(benchmark, indent=2))
-            f.write(json.dumps(benchmark, indent=2) + "\n")
+    scale = 0.01
+    batch_size = 512 * 150
+    with open("benchmarks.json", "a") as f:
+        benchmark = embed_dataset.remote(down_scale=scale, batch_size=batch_size)
+        f.write(json.dumps(benchmark, indent=2) + "\n")