Result persistence REST API endpoint for JSON-serialized results from deployments #17951
Replies: 2 comments
-
hi @jlwhelan28 - thanks for the detailed write up! if you're storing results in S3, you can use S3's API to read results. We don't store literal result values server-side; instead, the paradigm is that you bring your own disk/blob storage, your client-side workflows write to that location, and the server just knows the location. As a matter of bookkeeping, you can choose an arbitrary result storage key. Here's an example script demonstrating both retrieval paths:

```python
# /// script
# dependencies = ["prefect-aws", "httpx-auth"]
# ///
import base64
import json
import os
import sys
import uuid
from typing import Any
import cloudpickle
import httpx
from httpx_auth import AWS4Auth
from prefect_aws import S3Bucket
import prefect.runtime.task_run
from prefect import task
from prefect.results import ResultStore
# --- Configuration ---
# Use a specific S3 block configured in Prefect
# Ensure this block points to a bucket you have credentials for.
S3_BLOCK_NAME = "test-s3-block"
# For the manual method, AWS credentials and bucket info must be in env vars:
# Required: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION
# Required: AWS_S3_BUCKET_NAME (should match the bucket in the S3_BLOCK_NAME)
# Optional: AWS_SESSION_TOKEN (if using temporary credentials)
# Global to store the task run ID from within the task
task_run_id: uuid.UUID | None = None
def _get_result_storage(name: str) -> S3Bucket:
"""Loads the S3Bucket block synchronously."""
return S3Bucket.load(name) # type: ignore
@task
def test_task() -> dict[str, Any]:
"""A simple task that returns a dictionary and stores its result in S3."""
global task_run_id
task_run_id = prefect.runtime.task_run.id
print(f"Task run {task_run_id} executed.")
return {"message": "Hello, world!"}
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python 17914.py <s3-bucket-name>")
sys.exit(1)
bucket_name = sys.argv[1]
print(f"Loading S3 block: {S3_BLOCK_NAME}")
result_storage = _get_result_storage(S3_BLOCK_NAME)
print("Running task with S3 result storage...")
test_task.with_options(
result_storage=result_storage,
result_storage_key="{prefect.runtime.task_run.id}",
)()
print(f"Task completed. Result stored at key: {task_run_id}")
print("--- Method 1: Easy Retrieval using Prefect ResultStore ---")
# ResultStore uses the provided storage block (and its SDK config)
# to fetch and deserialize the result automatically.
easy_access_result_store = ResultStore(result_storage=result_storage)
easy_result_object = easy_access_result_store.read(str(task_run_id))
easy_result = easy_result_object.result
print(f"Easy method result: {easy_result}")
print("--- Method 2: Manual Retrieval simulating External Process ---")
print("Requires AWS env vars set.")
# This simulates an external process that only has:
# 1. AWS Credentials (via env vars)
# 2. The Bucket Name (via env var)
# 3. The Storage Key (task_run_id)
# It uses httpx directly with AWS SigV4 auth via httpx-auth.
try:
aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]
aws_session_token = os.getenv("AWS_SESSION_TOKEN")
region = os.getenv("AWS_DEFAULT_REGION", "us-east-1")
except KeyError as e:
print(f"Error: Missing required environment variable: {e}")
print(
"Manual retrieval requires: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION "
)
raise
aws_auth = AWS4Auth(
access_id=aws_access_key_id,
secret_key=aws_secret_access_key,
security_token=aws_session_token,
region=region,
service="s3",
)
s3_url = f"https://{bucket_name}.s3.{region}.amazonaws.com/{task_run_id}"
print(f"Attempting manual authenticated fetch from: {s3_url}")
try:
response = httpx.get(s3_url, auth=aws_auth)
response.raise_for_status()
except Exception as e:
print(f"Manual fetch failed: {e}")
print("Check AWS credentials, region, bucket name, and object key.")
raise
# Manually decode and deserialize the raw S3 object content
# Assumes default Prefect serialization (JSON containing base64-encoded cloudpickle)
raw_data_bytes = response.content
try:
data = json.loads(raw_data_bytes.decode("utf-8"))
base64_result = data["result"]
pickled_result = base64.b64decode(base64_result)
manually_deserialized_result = cloudpickle.loads(pickled_result)
except Exception as e:
print(f"Manual deserialization failed: {e}")
print(f"Raw data (first 100 bytes): {raw_data_bytes[:100]}")
raise
print(f"Manual method result: {manually_deserialized_result}")
# --- Verification ---
print("Verifying results match...")
assert easy_result == manually_deserialized_result
print("Success! Both methods produced the same result.") » AWS_ACCESS_KEY_ID=AKIA.... AWS_SECRET_ACCESS_KEY=... uv run repros/17914.py zestoatbucket
Loading S3 block: test-s3-block
Running task with S3 result storage...
Task run ed870d27-f743-495d-96c7-58d04c33bc60 executed.
11:47:53.931 | INFO | Task run 'test_task' - Finished in state Completed()
Task completed. Result stored at key: ed870d27-f743-495d-96c7-58d04c33bc60
--- Method 1: Easy Retrieval using Prefect ResultStore ---
Easy method result: {'message': 'Hello, world!'}
--- Method 2: Manual Retrieval simulating External Process ---
Requires AWS env vars set.
Attempting manual authenticated fetch from: https://zestoatbucket.s3.us-east-1.amazonaws.com/ed870d27-f743-495d-96c7-58d04c33bc60
Manual method result: {'message': 'Hello, world!'}
Verifying results match...
Success! Both methods produced the same result.
```

the trickiest part about fetching the result without the sdk is deserialization, but since you can use a custom serializer, that part is manageable (see the sketch below). what do you think about converting this to a discussion so we can discuss promoting recommendations on fetching from result storage into our docs?
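As a rough illustration, here is a minimal sketch (not from the script above) of what the external fetch could look like if the task were instead configured with `result_serializer="json"`. The exact on-disk envelope can vary by Prefect version, so the `"result"` key is an assumption; inspect a real persisted object first:

```python
import json

import httpx
from httpx_auth import AWS4Auth  # same SigV4 auth setup as in the script above


def fetch_json_result(s3_url: str, auth: AWS4Auth) -> object:
    """Fetch a persisted result without the Prefect SDK or cloudpickle.

    Assumes the task used result_serializer="json" and that the persisted
    record stores the serialized payload under a "result" key; the envelope
    may differ across Prefect versions.
    """
    response = httpx.get(s3_url, auth=auth)
    response.raise_for_status()
    record = json.loads(response.content.decode("utf-8"))
    payload = record["result"]
    # With the JSON serializer the payload is JSON, not base64-encoded pickle.
    return json.loads(payload) if isinstance(payload, str) else payload
```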
-
Thank YOU @zzstoatzz for the detailed response. Converting to a discussion is fine by me. My motivation here is a specific Prefect server deployment.
However, I understand that doing this generally for the whole project is really tricky due to the nature of deserializing results persisted with pickle. It's easy enough to enforce JSON serialization for my specific use case, and the current solution is just to host a custom API endpoint that deserializes the persisted JSON (sketched below). The only downside is a slightly inconsistent user experience in terms of API routes.
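A hypothetical sketch of the kind of custom endpoint described here, assuming a thin FastAPI app that reads the persisted JSON record from S3 with boto3 and returns it in the response body; the bucket name and route are illustrative, not the actual service:

```python
import json

import boto3
from fastapi import FastAPI, HTTPException

app = FastAPI()
s3 = boto3.client("s3")
BUCKET = "my-results-bucket"  # assumption: matches the deployment's result storage bucket


@app.get("/results/{storage_key}")
def get_result(storage_key: str) -> dict:
    """Return a persisted JSON result in the response body."""
    try:
        obj = s3.get_object(Bucket=BUCKET, Key=storage_key)
    except s3.exceptions.NoSuchKey:
        raise HTTPException(status_code=404, detail="result not found")
    record = json.loads(obj["Body"].read())
    # Assumes the result was persisted with the JSON serializer, so no pickle here.
    return {"result": record.get("result")}
```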
Beta Was this translation helpful? Give feedback.
-
Describe the current behavior
Currently it is possible to configure flow/task result persistence: https://docs.prefect.io/v3/develop/results
For a deployment, those results can be retrieved through the Python SDK, for example along the lines of the sketch below.
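A minimal sketch of SDK-based retrieval; the deployment name is hypothetical, and `state.result()` behavior varies slightly across Prefect versions:

```python
from prefect.deployments import run_deployment

# Hypothetical deployment name; assumes the flow has persist_result=True.
flow_run = run_deployment(name="my-flow/my-deployment")

# Fetches the persisted result and deserializes it with the configured serializer.
result = flow_run.state.result()
print(result)
```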
Presumably these methods use some combination of the existing API to recreate the flow result object in Python with the configured serializer, but there isn't a direct API endpoint for result retrieval.
Describe the proposed behavior
For a configuration similar to the sketch below (persisted results with a JSON serializer):
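A minimal sketch of such a configuration, with hypothetical flow and storage-block names:

```python
from prefect import flow


# Hypothetical flow and storage block; the point is persist_result=True
# together with the JSON serializer, so results are stored as plain JSON.
@flow(
    persist_result=True,
    result_serializer="json",
    result_storage="s3-bucket/my-results",  # assumed block slug
)
def my_analytic_flow() -> dict:
    return {"message": "Hello, world!"}
```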
I would like to be able to retrieve my persisted flow results via the REST API in a response body, as well as through the Prefect Python SDK.
Example Use
Persisted results retrieved via Python or the REST API, for example the hypothetical call sketched below, where the endpoint returns HTTP 400 if the flow's serializer is configured as pickle or compressed pickle.
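A sketch of what calling such an endpoint might look like; this is the proposal, not an existing route, and the host, path, and auth header are all hypothetical:

```python
import httpx

# Hypothetical route: this endpoint is the proposal, it does not exist today.
flow_run_id = "ed870d27-f743-495d-96c7-58d04c33bc60"  # e.g. a completed run's id
resp = httpx.get(
    f"https://prefect.example.com/api/flow_runs/{flow_run_id}/result",
    headers={"Authorization": "Bearer <api-key>"},
)
resp.raise_for_status()  # would be HTTP 400 for pickle / compressed-pickle serializers
print(resp.json())       # the JSON result payload in the response body
```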
Additional context
If this suggestion goes against the Prefect philosophy for result persistence, I respect that, but I would appreciate help understanding what patterns this community recommends for exposing deployed flow "output" to a user base. I am working with this platform to deploy analytics in a context where the overall ecosystem of event-driven asynchronous processes is not all that mature. A given deployment in this context may not have anything to "push" its result to, and needs to be used in a simpler call-analytic / get-response-payload pattern. I'd like to be able to do that from any language, without relying on a Python environment with the Prefect SDK installed.