Skip to content
3 changes: 2 additions & 1 deletion dpdispatcher/contexts/openapi_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(
self.init_remote_root = remote_root
self.temp_local_root = os.path.abspath(local_root)
self.remote_profile = remote_profile
os.makedirs(DP_CLOUD_SERVER_HOME_DIR, exist_ok=True)
access_key = (
remote_profile.get("access_key", None)
or os.getenv("BOHRIUM_ACCESS_KEY", None)
Expand Down Expand Up @@ -170,7 +171,7 @@ def upload_job(self, job, common_files=None):
object_key = os.path.join(data["storePath"], zip_filename) # type: ignore
job.upload_path = object_key
job.job_id = data["jobId"] # type: ignore
Comment on lines 170 to 172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Inconsistent error handling for API response fields.

Lines 170 and 172 use direct indexing (data["storePath"], data["jobId"]) while lines 167, 168, and 173 use safe .get() access. If the API can omit jobGroupId, it may also omit storePath or jobId, leading to KeyError at runtime.

Apply this diff to use consistent safe access:

-        object_key = os.path.join(data["storePath"], zip_filename)  # type: ignore
+        object_key = os.path.join(data.get("storePath", ""), zip_filename)  # type: ignore
         job.upload_path = object_key
-        job.job_id = data["jobId"]  # type: ignore
+        job.job_id = data.get("jobId", "")  # type: ignore
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
object_key = os.path.join(data["storePath"], zip_filename) # type: ignore
job.upload_path = object_key
job.job_id = data["jobId"] # type: ignore
object_key = os.path.join(data.get("storePath", ""), zip_filename) # type: ignore
job.upload_path = object_key
job.job_id = data.get("jobId", "") # type: ignore
🤖 Prompt for AI Agents
In dpdispatcher/contexts/openapi_context.py around lines 170 to 172, replace the
direct indexing data["storePath"] and data["jobId"] with safe .get() access
consistent with the surrounding code (e.g., data.get("storePath") and
data.get("jobId")); then handle the possibility of None by either raising a
clear ValueError or logging an error and providing a sensible default (or
aborting the operation) so a missing field does not cause an unhandled KeyError
at runtime.

job.jgid = data["jobGroupId"] # type: ignore
job.jgid = data.get("jobGroupId", "") # type: ignore
self.storage.upload_From_file_multi_part(
object_key=object_key, file_path=upload_zip, token=token
)
Expand Down
15 changes: 9 additions & 6 deletions dpdispatcher/machines/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,13 @@ def do_submit(self, job):
),
"out_files": self._gen_backward_files_list(job),
"platform": self.remote_profile.get("platform", "ali"),
"image_address": self.remote_profile.get("image_address", ""),
"image_name": self.remote_profile.get("image_address", ""),
}
if job.job_state == JobStatus.unsubmitted:
openapi_params["job_id"] = job.job_id
if "real_user_id" in self.remote_profile:
openapi_params["real_user_id"] = self.remote_profile["real_user_id"]
if "session_id" in self.remote_profile:
openapi_params["session_id"] = self.remote_profile["session_id"]
openapi_params["job_id"] = job.job_id
data = self.job.insert(**openapi_params)

job.job_id = data.get("jobId", 0) # type: ignore
Expand Down Expand Up @@ -182,8 +185,8 @@ def check_status(self, job):
self.ignore_exit_code,
)
if job_state == JobStatus.finished:
job_log = self.job.log(job_id)
if self.remote_profile.get("output_log"):
job_log = self.job.log(job_id)
print(job_log, end="")
self._download_job(job)
elif self.remote_profile.get("output_log") and job_state == JobStatus.running:
Expand All @@ -193,7 +196,7 @@ def check_status(self, job):

def _download_job(self, job):
data = self.job.detail(job.job_id)
job_url = data["jobFiles"]["outFiles"][0]["url"] # type: ignore
job_url = data["resultUrl"] # type: ignore
if not job_url:
return
job_hash = job.job_hash
Expand Down Expand Up @@ -243,7 +246,7 @@ def map_dp_job_state(status, exit_code, ignore_exit_code=True):
if status not in map_dict:
dlog.error(f"unknown job status {status}")
return JobStatus.unknown
if status == -1 and exit_code != 0 and ignore_exit_code:
if status == -1 and ignore_exit_code:
return JobStatus.finished
return map_dict[status]

Expand Down
Loading