From b93ac8871d7a768a02930708ba388d03d5d8be2b Mon Sep 17 00:00:00 2001
From: "xin.chen"
Date: Fri, 30 Aug 2024 09:36:36 +0800
Subject: [PATCH] add storage implementation back

---
 python/aibrix/aibrix/batch/__init__.py       |  16 ++
 .../aibrix/aibrix/batch/storage/__init__.py  |  76 ++++++
 .../aibrix/batch/storage/batch_storage.py    | 112 +++++++++
 .../aibrix/batch/storage/generic_storage.py  | 219 ++++++++++++++++++
 4 files changed, 423 insertions(+)
 create mode 100644 python/aibrix/aibrix/batch/__init__.py
 create mode 100644 python/aibrix/aibrix/batch/storage/__init__.py
 create mode 100644 python/aibrix/aibrix/batch/storage/batch_storage.py
 create mode 100644 python/aibrix/aibrix/batch/storage/generic_storage.py

diff --git a/python/aibrix/aibrix/batch/__init__.py b/python/aibrix/aibrix/batch/__init__.py
new file mode 100644
index 00000000..46b40b48
--- /dev/null
+++ b/python/aibrix/aibrix/batch/__init__.py
@@ -0,0 +1,16 @@
+# Copyright 2024 The Aibrix Team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+__all__ = ["create_batch_input", "retrieve_batch_job_content"]
diff --git a/python/aibrix/aibrix/batch/storage/__init__.py b/python/aibrix/aibrix/batch/storage/__init__.py
new file mode 100644
index 00000000..b780b79a
--- /dev/null
+++ b/python/aibrix/aibrix/batch/storage/__init__.py
@@ -0,0 +1,76 @@
+# Copyright 2024 The Aibrix Team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from aibrix.batch.storage.batch_storage import (
+    get_job_request_len,
+    get_storage_job_results,
+    put_storage_job_results,
+    read_job_requests,
+    remove_storage_job_data,
+    upload_input_data,
+)
+
+
+def submit_job_input(input_data_file):
+    """Upload a job input data file to storage.
+
+    Args:
+        input_data_file (str): path to the input file. Each line is a request.
+    """
+    job_id = upload_input_data(input_data_file)
+    return job_id
+
+
+def delete_job(job_id):
+    """Delete the job with the given job ID. This removes all data associated
+    with the job, including input data and output results.
+    """
+    remove_storage_job_data(job_id)
+
+
+def get_job_input_requests(job_id, start_index, num_requests):
+    """Read job input requests for the specified job ID.
+
+    Args:
+        job_id: the job ID returned by job submission.
+        start_index: starting request index for the read.
+        num_requests: total number of requests to read.
+    """
+    return read_job_requests(job_id, start_index, num_requests)
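+
+# A minimal usage sketch of this module's API (illustrative only; the
+# "requests.jsonl" path is a hypothetical input file containing one JSON
+# request per line, and the result objects are made up):
+#
+#   job_id = submit_job_input("requests.jsonl")
+#   total = get_job_num_request(job_id)
+#   inputs = get_job_input_requests(job_id, 0, total)
+#   put_job_results(job_id, 0, [{"status": "done"} for _ in inputs])
+#   outputs = get_job_results(job_id, 0, total)
+#   delete_job(job_id)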
+
+
+def get_job_num_request(job_id):
+    """Get the number of valid requests for this submitted job."""
+    return get_job_request_len(job_id)
+
+
+def put_job_results(job_id, start_index, requests_results):
+    """Write the results of job requests to storage.
+
+    Args:
+        job_id: the job ID returned by job submission.
+        start_index: request index at which to start writing.
+        requests_results: a list of JSON objects to write as request results.
+    """
+    put_storage_job_results(job_id, start_index, requests_results)
+
+
+def get_job_results(job_id, start_index, num_requests):
+    """Read the results of job requests from storage.
+
+    Args:
+        job_id: the job ID returned by job submission.
+        start_index: request index at which to start reading.
+        num_requests: total number of request outputs to read.
+    """
+    return get_storage_job_results(job_id, start_index, num_requests)
diff --git a/python/aibrix/aibrix/batch/storage/batch_storage.py b/python/aibrix/aibrix/batch/storage/batch_storage.py
new file mode 100644
index 00000000..a0d754f5
--- /dev/null
+++ b/python/aibrix/aibrix/batch/storage/batch_storage.py
@@ -0,0 +1,112 @@
+# Copyright 2024 The Aibrix Team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from aibrix.batch.storage.generic_storage import LocalDiskFiles
+
+current_job_offsets = {}
+job_input_requests = {}
+p_storage = LocalDiskFiles()
+NUM_REQUESTS_PER_READ = 1024
+
+
+def upload_input_data(input_data_file_name):
+    """Upload a job input data file to storage.
+
+    Args:
+        input_data_file_name (str): path to the input file.
+    """
+    job_id = uuid.uuid1()
+    p_storage.write_job_input_data(job_id, input_data_file_name)
+
+    current_job_offsets[job_id] = 0
+    return job_id
+
+
+def read_job_requests(job_id, start_index, num_requests):
+    """Read job requests starting at index start_index.
+
+    Instead of hitting storage once per request, this keeps a window of
+    requests in memory, read NUM_REQUESTS_PER_READ at a time. It also
+    supports random access in case a backward read is necessary.
+    """
+
+    if job_id not in current_job_offsets:
+        print(f"Cannot find job {job_id}. Create the job first!")
+        return []
+
+    # If no requests are cached yet, read an initial window from storage.
+    if job_id not in job_input_requests:
+        request_inputs = p_storage.read_job_input_data(job_id, 0, NUM_REQUESTS_PER_READ)
+        job_input_requests[job_id] = request_inputs
+
+    current_start_idx = current_job_offsets[job_id]
+    current_end_idx = current_start_idx + len(job_input_requests[job_id])
+
+    # Backward read: only fetch the part that precedes the cached window.
+    if start_index < current_start_idx:
+        if start_index + num_requests <= current_start_idx:
+            # The requested range is disjoint from the cache; replace it.
+            diff_requests = p_storage.read_job_input_data(job_id, start_index, num_requests)
+            job_input_requests[job_id] = diff_requests
+        else:
+            # The requested range overlaps the cache; prepend only the
+            # missing prefix.
+            temp_len = current_start_idx - start_index
+            diff_requests = p_storage.read_job_input_data(job_id, start_index, temp_len)
+            job_input_requests[job_id] = diff_requests + job_input_requests[job_id]
+
+        current_job_offsets[job_id], current_start_idx = start_index, start_index
+        current_end_idx = current_start_idx + len(job_input_requests[job_id])
+
+    # The cached window is entirely behind the request; discard it.
+    if start_index >= current_end_idx:
+        current_job_offsets[job_id] = start_index
+        job_input_requests[job_id] = []
+        current_start_idx, current_end_idx = start_index, start_index
+
+    # Extend the window forward until it covers num_requests entries.
+    if start_index + num_requests > current_end_idx:
+        temp_len = start_index + num_requests - current_end_idx
+        diff_requests = p_storage.read_job_input_data(job_id, current_end_idx, temp_len)
+        job_input_requests[job_id] = job_input_requests[job_id] + diff_requests
+        current_end_idx = current_job_offsets[job_id] + len(job_input_requests[job_id])
+
+    available_num_req = min(num_requests, current_end_idx - start_index)
+    start_offset = start_index - current_start_idx
+
+    requests = job_input_requests[job_id][
+        start_offset : start_offset + available_num_req
+    ]
+    return requests
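+
+# How the cache window moves (an illustrative walkthrough, assuming a job
+# with 2048 requests and the default NUM_REQUESTS_PER_READ of 1024):
+#
+#   read_job_requests(job_id, 0, 8)     # cold read; window becomes [0, 1024)
+#   read_job_requests(job_id, 1500, 8)  # past the window; reset to [1500, 1508)
+#   read_job_requests(job_id, 100, 8)   # backward, disjoint; window [100, 108)
+#   read_job_requests(job_id, 96, 12)   # backward, overlapping; prefix prepended,
+#                                       # window becomes [96, 108)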
+
+
+def put_storage_job_results(job_id, start_index, requests_results):
+    """Write job results starting at the given request index."""
+    p_storage.write_job_output_data(job_id, start_index, requests_results)
+
+
+def get_storage_job_results(job_id, start_index, num_requests):
+    """Read the results of job requests."""
+    return p_storage.read_job_output_data(job_id, start_index, num_requests)
+
+
+def remove_storage_job_data(job_id):
+    """Remove all data associated with the job."""
+    p_storage.delete_job_data(job_id)
+
+
+def get_job_request_len(job_id):
+    """Get the number of requests for the given job ID."""
+    return p_storage.get_job_number_requests(job_id)
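+
+# The module-level backend above is constructed directly; an equivalent
+# construction would go through the factory on the abstract base class
+# (storage_type=0 currently maps to LocalDiskFiles), e.g.:
+#
+#   from aibrix.batch.storage.generic_storage import PersistentStorage
+#   p_storage = PersistentStorage.create_storage(storage_type=0)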
diff --git a/python/aibrix/aibrix/batch/storage/generic_storage.py b/python/aibrix/aibrix/batch/storage/generic_storage.py
new file mode 100644
index 00000000..5e28e8c6
--- /dev/null
+++ b/python/aibrix/aibrix/batch/storage/generic_storage.py
@@ -0,0 +1,219 @@
+# Copyright 2024 The Aibrix Team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+from abc import ABC, abstractmethod
+
+
+class PersistentStorage(ABC):
+    """Abstract base class for job storage.
+
+    Concrete backends, such as local files, TOS, or S3, implement this
+    class. The choice of storage implementation is transparent to
+    external components.
+    """
+
+    @abstractmethod
+    def write_job_input_data(self, job_id, input_data_file_name):
+        pass
+
+    @abstractmethod
+    def read_job_input_data(self, job_id, start_index=0, num_requests=-1):
+        pass
+
+    @abstractmethod
+    def write_job_output_data(self, job_id, start_index, output_list):
+        pass
+
+    @abstractmethod
+    def read_job_output_data(self, job_id, start_index, num_requests):
+        pass
+
+    @abstractmethod
+    def get_job_number_requests(self, job_id):
+        pass
+
+    @abstractmethod
+    def delete_job_data(self, job_id):
+        pass
+
+    def get_current_storage_type(self):
+        print("Current storage type: ", self._storage_type_name)
+        return self._storage_type_name
+
+    @classmethod
+    def create_storage(cls, storage_type=0):
+        if storage_type == 0:
+            return LocalDiskFiles()
+        else:
+            raise ValueError("Unknown storage type")
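+
+# A sketch of what an alternative backend could look like (hypothetical,
+# e.g. an in-memory implementation for unit tests; not part of this patch):
+#
+#   class InMemoryStorage(PersistentStorage):
+#       def __init__(self):
+#           self._storage_type_name = "InMemoryStorage"
+#           self.inputs, self.outputs = {}, {}
+#
+#       def write_job_input_data(self, job_id, input_data_file_name):
+#           with open(input_data_file_name, "r") as file:
+#               self.inputs[job_id] = [
+#                   json.loads(line) for line in file if len(line) > 1
+#               ]
+#
+#       def read_job_input_data(self, job_id, start_index=0, num_requests=-1):
+#           data = self.inputs.get(job_id, [])
+#           end = len(data) if num_requests < 0 else start_index + num_requests
+#           return data[start_index:end]
+#
+#       # write_job_output_data, read_job_output_data,
+#       # get_job_number_requests, and delete_job_data follow the same
+#       # dictionary-based pattern.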
+
+
+class LocalDiskFiles(PersistentStorage):
+    """Stores all job data as files on local disk."""
+
+    def __init__(self):
+        self._storage_type_name = "LocalDiskFiles"
+        self.directory_path = os.path.abspath(os.path.dirname(__file__))
+        self.directory_path = self.directory_path + "/data/"
+        os.makedirs(self.directory_path, exist_ok=True)
+        print("Storage path: ", self.directory_path)
+
+    def write_job_input_data(self, job_id, input_data_file_name):
+        """Write the requests file to local disk."""
+        request_list = []
+        # Parse each non-empty line of the input file as a JSON request.
+        with open(input_data_file_name, "r") as file:
+            for line in file:
+                if len(line) <= 1:
+                    continue
+                data = json.loads(line)
+                request_list.append(data)
+
+        num_valid_request = len(request_list)
+        print(f"Storage side receives {num_valid_request} requests.")
+
+        directory_path = self.directory_path + str(job_id) + "/"
+        os.makedirs(directory_path)
+
+        input_json_name = directory_path + "input.json"
+        with open(input_json_name, "w") as file:
+            for obj in request_list:
+                file.write(json.dumps(obj) + "\n")
+
+    def read_job_input_data(self, job_id, start_index=0, num_requests=-1):
+        """Read job input requests from local disk."""
+        directory_path = self.directory_path + str(job_id) + "/"
+        input_json_name = directory_path + "input.json"
+
+        request_inputs = []
+        if not os.path.exists(input_json_name):
+            print(f"Job {job_id} does not exist in storage!")
+            return request_inputs
+
+        with open(input_json_name, "r") as file:
+            # Skip to start_index; bail out if the file has fewer lines.
+            for _ in range(start_index):
+                if next(file, None) is None:
+                    print("Read request is out of index: not enough lines.")
+                    return request_inputs
+
+            if num_requests > 0:
+                for _ in range(num_requests):
+                    line = file.readline()
+                    if not line:  # End of file reached
+                        break
+                    request_inputs.append(json.loads(line))
+            else:
+                # A non-positive num_requests reads all remaining requests.
+                for line in file:
+                    if len(line) <= 1:
+                        continue
+                    request_inputs.append(json.loads(line))
+        return request_inputs
+
+    def get_job_number_requests(self, job_id):
+        """Get the number of job requests from local disk."""
+        directory_path = self.directory_path + str(job_id) + "/"
+        input_json_name = directory_path + "input.json"
+
+        if not os.path.exists(input_json_name):
+            print(f"Job {job_id} does not exist in storage!")
+            return 0
+
+        with open(input_json_name, "r") as file:
+            return sum(1 for _ in file)
+
+    def write_job_output_data(self, job_id, start_index, output_list):
+        """Write job results to the output file, starting at start_index."""
+        directory_path = self.directory_path + str(job_id) + "/"
+
+        if not os.path.exists(directory_path):
+            print(f"Error: Job {job_id} does not exist; create the job first!")
+            return
+        output_file_path = directory_path + "output.json"
+
+        # Line-oriented files cannot be patched in place, so read any
+        # existing results, splice the new results in at start_index, and
+        # rewrite the whole file.
+        existing_lines = []
+        if os.path.exists(output_file_path):
+            with open(output_file_path, "r") as file:
+                existing_lines = file.readlines()
+        if start_index > len(existing_lines):
+            # Pad with blank lines when writing past the current end.
+            existing_lines.extend(
+                "\n" for _ in range(start_index - len(existing_lines))
+            )
+
+        new_lines = [json.dumps(obj) + "\n" for obj in output_list]
+        existing_lines[start_index : start_index + len(new_lines)] = new_lines
+
+        with open(output_file_path, "w") as file:
+            file.writelines(existing_lines)
+
+    def read_job_output_data(self, job_id, start_index, num_requests):
+        """Read job results from local disk."""
+        directory_path = self.directory_path + str(job_id) + "/"
+
+        output_data = []
+        if not os.path.exists(directory_path):
+            print(f"Error: Job {job_id} does not exist; create the job first!")
+            return output_data
+        output_file_path = directory_path + "output.json"
+        if not os.path.exists(output_file_path):
+            print(f"Job {job_id} has no output yet.")
+            return output_data
+
+        with open(output_file_path, "r") as file:
+            # Skip to start_index; bail out if the file has fewer lines.
+            for _ in range(start_index):
+                if next(file, None) is None:
+                    print("Reading request output is out of index.")
+                    return output_data
+
+            num_lines = 0
+            for line in file:
+                if len(line) <= 1:
+                    continue
+                output_data.append(json.loads(line))
+                num_lines += 1
+                if num_lines == num_requests:
+                    break
+
+        return output_data
+
+    def delete_job_data(self, job_id):
+        """Delete all input and output files for the job."""
+        directory_path = self.directory_path + str(job_id) + "/"
+
+        input_file_path = directory_path + "input.json"
+        try:
+            os.remove(input_file_path)
+        except FileNotFoundError:
+            print(f"Job input file {input_file_path} does not exist.")
+        except Exception as e:
+            print(f"Error: {e}")
+
+        output_file_path = directory_path + "output.json"
+        if os.path.exists(output_file_path):
+            try:
+                os.remove(output_file_path)
+            except FileNotFoundError:
+                print(f"Job output {output_file_path} does not exist.")
+            except Exception as e:
+                print(f"Error: {e}")
+
+        try:
+            os.rmdir(directory_path)
+        except FileNotFoundError:
+            print(f"Job directory {directory_path} does not exist.")
+        except OSError as e:
+            print(f"Error: {e} - job directory is not empty or cannot be deleted.")
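+
+
+if __name__ == "__main__":
+    # Minimal smoke test of the local-disk backend (an illustrative sketch,
+    # not part of the public API): round-trips one request through the
+    # ./data/ directory next to this file and cleans up afterwards.
+    import tempfile
+
+    storage = PersistentStorage.create_storage(storage_type=0)
+
+    with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as f:
+        f.write(json.dumps({"prompt": "hello"}) + "\n")
+        sample_path = f.name
+
+    storage.write_job_input_data("demo-job", sample_path)
+    print(storage.read_job_input_data("demo-job"))
+    storage.write_job_output_data("demo-job", 0, [{"status": "done"}])
+    print(storage.read_job_output_data("demo-job", 0, 1))
+    storage.delete_job_data("demo-job")
+    os.remove(sample_path)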