Skip to content

Commit

Permalink
add storage implementation back
Browse files Browse the repository at this point in the history
  • Loading branch information
xin.chen committed Aug 30, 2024
1 parent b23fe59 commit b93ac88
Show file tree
Hide file tree
Showing 4 changed files with 423 additions and 0 deletions.
16 changes: 16 additions & 0 deletions python/aibrix/aibrix/batch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2024 The Aibrix Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# NOTE(review): neither "create_batch_input" nor "retrieve_batch_job_content"
# is defined or imported in this module as shown, so `from aibrix.batch
# import *` would raise AttributeError — confirm whether imports are missing.
__all__ = ["create_batch_input", "retrieve_batch_job_content"]
76 changes: 76 additions & 0 deletions python/aibrix/aibrix/batch/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2024 The Aibrix Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from aibrix.batch.storage.batch_storage import (get_job_request_len,
get_storage_job_results,
put_storage_job_results,
read_job_requests,
remove_storage_job_data,
upload_input_data)


def submit_job_input(inputDataFile):
    """Upload a job input data file to storage.

    Args:
        inputDataFile (str): an input file string. Each line is a request.

    Returns:
        The job ID assigned to the uploaded input.
    """
    return upload_input_data(inputDataFile)


def delete_job(job_id):
    """Delete the job with the given ID.

    Removes all data associated with this job, including its input data
    and output results.
    """
    remove_storage_job_data(job_id)


def get_job_input_requests(job_id, start_index, num_requests):
    """Read input requests for the specified job.

    Args:
        job_id : job ID returned by job submission.
        start_index : index of the first request to read.
        num_requests: total number of requests to read.

    Returns:
        The requested slice of job input requests.
    """
    requests = read_job_requests(job_id, start_index, num_requests)
    return requests


def get_job_num_request(job_id):
    """Return the number of valid requests for this submitted job."""
    num_requests = get_job_request_len(job_id)
    return num_requests


def put_job_results(job_id, start_index, requests_results):
    """Write request results for a job to storage.

    Args:
        job_id : job ID returned by job submission.
        start_index : index at which to start writing results.
        requests_results: a list of JSON objects, one result per request.
    """
    put_storage_job_results(job_id, start_index, requests_results)


def get_job_results(job_id, start_index, num_requests):
    """Read request results for a job from storage.

    Args:
        job_id : job ID returned by job submission.
        start_index : index of the first result to read.
        num_requests: total number of request outputs to read.

    Returns:
        The requested slice of job results.
    """
    results = get_storage_job_results(job_id, start_index, num_requests)
    return results
112 changes: 112 additions & 0 deletions python/aibrix/aibrix/batch/storage/batch_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright 2024 The Aibrix Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid

from aibrix.batch.storage.generic_storage import LocalDiskFiles

current_job_offsets = {}
job_input_requests = {}
p_storage = LocalDiskFiles()
NUM_REQUESTS_PER_READ = 1024


def upload_input_data(inputDataFileName):
    """Persist a job input file to storage and register the new job.

    Args:
        inputDataFileName (str): path of the input file; one request per line.

    Returns:
        uuid.UUID: the newly generated job ID.
    """
    new_job_id = uuid.uuid1()
    p_storage.write_job_input_data(new_job_id, inputDataFileName)
    # Start the in-memory read cursor for this job at the first request.
    current_job_offsets[new_job_id] = 0
    return new_job_id


def read_job_requests(job_id, start_index, num_requests):
    """Read up to num_requests job input requests starting at start_index.

    Instead of reading from storage once per request, a contiguous window of
    requests is cached in memory (NUM_REQUESTS_PER_READ at a time). Random
    access is supported: reading backward or past the cached window refills
    the cache from storage. The cache invariant is that
    job_input_requests[job_id] holds the requests in
    [current_job_offsets[job_id], current_job_offsets[job_id] + cache length).

    Args:
        job_id : job ID returned by job submission.
        start_index : index of the first request to read.
        num_requests: number of requests to read.

    Returns:
        list: requests in [start_index, start_index + num_requests), possibly
        shorter if the job has fewer requests; empty list for an unknown job.
    """

    if job_id not in current_job_offsets:
        print(f"Create job {job_id} first. Can not find corresponding job ID!!")
        return []

    # Cold cache: pull an initial window from storage.
    if job_id not in job_input_requests:
        job_input_requests[job_id] = p_storage.read_job_input_data(
            job_id, 0, NUM_REQUESTS_PER_READ
        )

    current_start_idx = current_job_offsets[job_id]
    current_end_idx = current_start_idx + len(job_input_requests[job_id])

    # Backward read: fetch only the part that precedes the cached window.
    if start_index < current_start_idx:
        if start_index + num_requests <= current_start_idx:
            # Requested range is entirely before the cache; replace it.
            job_input_requests[job_id] = p_storage.read_job_input_data(
                job_id, start_index, num_requests
            )
        else:
            # Requested range overlaps the cache; read only the gap
            # [start_index, current_start_idx) and prepend it. Reading one
            # extra element here would duplicate the request at
            # current_start_idx and shift every later cached request.
            gap_len = current_start_idx - start_index
            gap = p_storage.read_job_input_data(job_id, start_index, gap_len)
            job_input_requests[job_id] = gap + job_input_requests[job_id]

        current_job_offsets[job_id], current_start_idx = start_index, start_index
        current_end_idx = current_start_idx + len(job_input_requests[job_id])

    # The cached window is entirely before start_index; throw it away.
    if start_index >= current_end_idx:
        current_job_offsets[job_id] = start_index
        job_input_requests[job_id] = []
        current_start_idx, current_end_idx = start_index, start_index

    # Extend the cache forward so it covers start_index + num_requests.
    if start_index + num_requests > current_end_idx:
        missing_len = start_index + num_requests - current_end_idx
        tail = p_storage.read_job_input_data(job_id, current_end_idx, missing_len)
        job_input_requests[job_id] = job_input_requests[job_id] + tail
        # Storage may return fewer requests than asked near the end of input,
        # so recompute the end from the actual cache length.
        current_end_idx = current_job_offsets[job_id] + len(job_input_requests[job_id])

    available_num_req = min(num_requests, current_end_idx - start_index)
    start_offset = start_index - current_start_idx

    return job_input_requests[job_id][
        start_offset : start_offset + available_num_req
    ]


def put_storage_job_results(job_id, start_index, requests_results):
    """Write job request results to storage starting at the given index."""
    p_storage.write_job_output_data(job_id, start_index, requests_results)


def get_storage_job_results(job_id, start_index, num_requests):
    """Read job request results from storage starting at the given index."""
    results = p_storage.read_job_output_data(job_id, start_index, num_requests)
    return results


def remove_storage_job_data(job_id):
    """Remove all data associated with a job, persisted and cached.

    Deletes the job's input/output data from storage, then evicts the job's
    in-memory cache entries. Without the eviction, a later read_job_requests
    for the deleted job would still serve stale cached requests (the job ID
    stays in current_job_offsets), and both dictionaries would grow without
    bound as jobs come and go.
    """
    p_storage.delete_job_data(job_id)
    # pop(..., None) so deleting a job whose requests were never read
    # (no cache entry) does not raise KeyError.
    current_job_offsets.pop(job_id, None)
    job_input_requests.pop(job_id, None)


def get_job_request_len(job_id):
    """Return the number of requests stored for the given job ID."""
    count = p_storage.get_job_number_requests(job_id)
    return count
Loading

0 comments on commit b93ac88

Please sign in to comment.