
ocrd network: defer enqueue until workspace is free #1046

@bertsky

In the Processing Server, we currently add jobs to the queue unconditionally, without checking whether any job is already running on the respective workspace:

self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message)

Obviously, this will create inconsistent (or at least surprising) results when another request is made for the same workspace while a job is already running on it, or when two requests for the same workspace are made before either of them is scheduled.

IMO (since everything goes through it anyway) we can simply query the database for effective locking. We could add a method db_has_processing_job(workspace_id) which checks whether there are any QUEUED or RUNNING jobs for that workspace.
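
For illustration, a minimal sketch of such a check against the Processing Server's MongoDB (via Motor); the collection and field names (`processing_jobs`, `workspace_id`, `state`) are assumptions here, not necessarily the actual ocrd_network models:

```python
from motor.motor_asyncio import AsyncIOMotorDatabase

# Job states that should lock the workspace (names are illustrative)
ACTIVE_STATES = ["QUEUED", "RUNNING"]

async def db_has_processing_job(db: AsyncIOMotorDatabase, workspace_id: str) -> bool:
    """True iff any job on this workspace is still queued or running."""
    job = await db.processing_jobs.find_one(
        {"workspace_id": workspace_id, "state": {"$in": ACTIVE_STATES}}
    )
    return job is not None
```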

But what happens then? Ideally, the Processing Server would itself "wait" for the workspace to become available before actually enqueuing the new job. And that, of course, must be atomic and accommodate our asynchronous model.

One idea would be to enqueue the job in a "hidden" queue, which only the Processing Server itself subscribes to and which can be identified via the workspace ID (e.g. queue_name="workspace." + data.workspace_id). When the Processing Worker is done, it sends the OcrdResultMessage not only to the OcrdProcessingMessage's result queue and callback URL, but also to a constant, unique result queue. The Processing Server in turn subscribes to that queue in a background thread, independent of the requests. When a result message is consumed there, the Processing Server checks whether there is any "hidden" OcrdProcessingMessage for the same workspace ID, and if so, consumes it and enqueues it on the actual processor queue.
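
Roughly like this (just a sketch with pika; the queue names, message fields and JSON encoding of the messages are assumptions for illustration, not the actual ocrd_network wire format):

```python
import json
import pika

RESULT_QUEUE = "ocrd-processing-results"  # constant result queue (assumed name)

def defer_or_enqueue(channel, processor_name, workspace_id, encoded_processing_message, workspace_busy):
    """Publish directly if the workspace is free, otherwise park the message
    in the hidden per-workspace queue."""
    if workspace_busy:
        hidden_queue = f"workspace.{workspace_id}"
        channel.queue_declare(queue=hidden_queue, durable=True)
        channel.basic_publish(exchange="", routing_key=hidden_queue, body=encoded_processing_message)
    else:
        channel.basic_publish(exchange="", routing_key=processor_name, body=encoded_processing_message)

def on_result(channel, method, properties, body):
    """Background consumer on RESULT_QUEUE: when a job finishes, move the next
    deferred message for that workspace (if any) onto the actual processor queue."""
    result = json.loads(body)  # OcrdResultMessage, assumed to carry workspace_id
    hidden_queue = f"workspace.{result['workspace_id']}"
    channel.queue_declare(queue=hidden_queue, durable=True)
    _frame, _props, deferred = channel.basic_get(queue=hidden_queue, auto_ack=True)
    if deferred is not None:
        next_job = json.loads(deferred)  # OcrdProcessingMessage, assumed JSON-encoded
        channel.basic_publish(exchange="", routing_key=next_job["processor_name"], body=deferred)
    channel.basic_ack(delivery_tag=method.delivery_tag)

# In a background thread of the Processing Server:
# channel.basic_consume(queue=RESULT_QUEUE, on_message_callback=on_result)
# channel.start_consuming()
```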

(But I think that would still not be atomic. Perhaps, instead of the result-queue mechanism and background thread, we should use the callback mechanism and add a regular endpoint to the Processing Server itself, e.g. a DELETE on /processor/{processor_name}/{job_id}.)
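
For that variant, a minimal FastAPI sketch of such an endpoint on the Processing Server; `db_get_processing_job` and `unqueue_next_job_for_workspace` are hypothetical helpers (the latter would pop the next deferred message for the job's workspace, e.g. via the hidden-queue logic above, and publish it):

```python
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.delete("/processor/{processor_name}/{job_id}")
async def job_finished(processor_name: str, job_id: str):
    """Callback target for the Processing Worker once a job is done:
    look up the job, then release its workspace by enqueuing the next deferred job."""
    job = await db_get_processing_job(job_id)               # hypothetical DB lookup
    if job is None:
        raise HTTPException(status_code=404, detail=f"No such job: {job_id}")
    await unqueue_next_job_for_workspace(job.workspace_id)  # hypothetical helper
    return {"job_id": job_id, "state": job.state}
```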

What I find intriguing is the perspective that, with this additional machinery, we could avoid the need to poll the job status in the Workflow Server: the latter could simply post/push all step jobs for that workspace right away and rely on the Processing Server to ensure they are executed in order. (One still needs conditionality on success status, but that is also currently needed in the Workflow Server.)
