
Commit f1626d6

Merge pull request #89 from hyperflow-wms/master
Merge updates and fixes to redis-v4
2 parents (b3f4d31 + 11d0cd0), commit f1626d6

3 files changed: +70 −27 lines


.github/workflows/main.yml

Lines changed: 2 additions & 2 deletions
```diff
@@ -20,7 +20,7 @@ on:
 
 jobs:
   build:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
       - uses: actions/checkout@v3
         with:
@@ -30,7 +30,7 @@ jobs:
         run: make image
   deploy:
     needs: [ "build" ]
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     if: github.event_name == 'release' && github.event.action == 'created'
     steps:
       - name: Login to Docker Hub
```

examples/RemoteJobs/README.md

Lines changed: 28 additions & 8 deletions
```diff
@@ -1,12 +1,32 @@
-## Communication with remote Jobs using Redis
-This example demonstrates how HyperFlow can communicate with remote job executors using Redis.
+## Distributed execution of workflow Jobs using Redis
+This example explains the distributed execution model of Hyperflow. It demonstrates how HyperFlow can communicate with remote job executors using Redis. It is also useful for testing the implementation of the [Hyperflow job executor](https://github.com/hyperflow-wms/hyperflow-job-executor).
 
-- The workflow invokes function `submitRemoteJob` from `functions.js` 100 times. This function simulates submission of jobs by starting 100 parallel processes of `node handler.js <taskId> <redis_url>`.
-- `handler.js` represents a remote job executor which is passed two parameters: `taskId` and `redis_url`.
-- `handler.js` gets a `jobMessage` from HyperFlow, and then sends back a notification that the job has completed; `taskId` is used to construct appropriate Redis keys for this communication.
-- On the HyperFlow engine side, the Process Function can use two functions: `context.sendMsgToJob` to send a message to the job, and `context.jobResult` to wait for the notification. These functions return a [`Promise`](https://javascript.info/promise-basics), so the async/await syntax can be used as shown in the example.
+The distributed execution architecture consists of:
+1. Master components:
+   - **The Hyperflow engine** - executes the workflow graph; for each workflow task it invokes the **Job invoker** function
+   - **Job invoker** - Javascript function which creates jobs on a (remote) infrastructure to execute workflow tasks
+   - **Redis server** - used for communication between the Hyperflow engine and Job executors on remote workers
+1. Worker components:
+   - **Hyperflow job executor** - receives the job command from the Hyperflow engine and spawns application software
+   - **Application software** - software that actually performs workflow tasks
+
+In this example:
+- The workflow has two tasks (see `workflow.json`): one that executes `job.js`, the other which simply runs `ls -l`. Note that the commands to be executed are specified in `workflow.json`.
+- The engine invokes the function `submitRemoteJob` (Job invoker) from `functions.js`. This function simulates submission of jobs by starting the Hyperflow job executor and communicating with it via Redis to run jobs.
+- `../../../hyperflow-job-executor/handler.js` represents a remote job executor which is passed two parameters: `taskId` and `redis_url`. The executor gets a `jobMessage` from HyperFlow, executes the command in a separate process, and then sends back a notification that the job has completed; `taskId` is used to construct appropriate Redis keys for this communication.
+- On the HyperFlow engine side, the Job invoker can use two functions (provided by Hyperflow): `context.sendMsgToJob` to send a message to the job executor, and `context.jobResult` to wait for the notification from the executor. These functions return a [`Promise`](https://javascript.info/promise-basics), so the async/await syntax can be used as shown in the example.
 - The parameter to the `context.jobResult` function is a timeout in seconds (0 denotes infinity). One can use a retry library, such as [promise-retry](https://www.npmjs.com/package/promise-retry), to implement an exponential retry strategy.
-- The Process Function also gets the Redis URL in `context.redis_url` which can be passed to the remote job executors.
+- The Job invoker also gets the Redis URL in `context.redis_url` which can be passed to the remote job executors.
 
-To run the workflow, simply do `hflow run .` in this directory. You might need to run once `npm install` to install dependencies.
+To run the workflow, execute the following commands:
+1. First, clone the Hyperflow engine and the Hyperflow job executor:
+   - `git clone https://github.com/hyperflow`
+   - `git clone https://github.com/hyperflow-wms/hyperflow-job-executor`
+   - `cd hyperflow; npm install`
+   - `cd ../hyperflow-job-executor; npm install`
+1. Start the redis server
+1. To run the workflow:
+   - `cd ../hyperflow/examples/RemoteJobs`
+   - `npm install` (once)
+   - `hflow run .`
 
```
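The README above describes the engine-side API only in prose. Below is a minimal, hypothetical sketch of a Job invoker in the spirit of `submitRemoteJob`; only `context.sendMsgToJob`, `context.jobResult`, and `context.redis_url` are taken from the README, while the `(ins, outs, context, cb)` signature, the `context.taskId` field, and the job message layout are assumptions made for illustration.

```js
// Hypothetical Job invoker sketch (not the actual contents of functions.js).
const { spawn } = require('child_process');

async function submitRemoteJob(ins, outs, context, cb) {
    const taskId = context.taskId; // assumed field name

    // Start a job executor, passing the task id and the Redis URL provided by the engine.
    spawn('node', ['../../../hyperflow-job-executor/handler.js', taskId, context.redis_url],
        { stdio: 'inherit' });

    // Send the job message to the executor (illustrative payload)...
    const jobMessage = JSON.stringify({ executable: 'ls', args: ['-l'], inputs: ins, outputs: outs });
    await context.sendMsgToJob(jobMessage, taskId);

    // ...and wait for the completion notification (0 = wait indefinitely).
    const result = await context.jobResult(0);
    console.log(`Task ${taskId} finished:`, result);

    cb(null, outs);
}

exports.submitRemoteJob = submitRemoteJob;
```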

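For the exponential retry strategy mentioned alongside `context.jobResult`, a hedged sketch using promise-retry could look as follows; the 60-second per-attempt timeout and the retry options are illustrative, and it assumes `context.jobResult` rejects when no result arrives before the timeout.

```js
// Sketch: exponential backoff around context.jobResult using promise-retry.
const promiseRetry = require('promise-retry');

function waitForJobResult(context) {
    return promiseRetry((retry, attempt) => {
        console.log(`Waiting for job result, attempt ${attempt}`);
        return context.jobResult(60).catch(retry); // retry with backoff on timeout
    }, { retries: 5, factor: 2, minTimeout: 1000 });
}
```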
functions/kubernetes/amqpConnector.js

File mode changed: 100644 → 100755
Lines changed: 40 additions & 17 deletions
```diff
@@ -1,17 +1,42 @@
+
 const amqplib = require('amqplib'),
       createJobMessage = require('../../common/jobMessage').createJobMessage;
-let channels = {};
+
 let conn = null;
+let connPromise = null;
+let channels = {};
+let channelPromises = {};
+
+async function getConnection() {
+    if (conn) return conn;
+    if (!connPromise) {
+        console.log("[AMQP] Creating new connection...");
+        connPromise = amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}`, "heartbeat=60");
+    }
+    conn = await connPromise;
+    return conn;
+}
 
 async function initialize(queue_name) {
+    const connection = await getConnection();
+
+    if (channels[queue_name]) return;
 
-    if (conn === null) {
-        conn = await amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}`, "heartbeat=60");
+    if (!channelPromises[queue_name]) {
+        channelPromises[queue_name] = (async () => {
+            try {
+                console.log(`[AMQP] Creating channel for queue ${queue_name}`);
+                const ch = await connection.createChannel();
+                await ch.assertQueue(queue_name, { durable: false, expires: 6000000 });
+                channels[queue_name] = ch;
+            } catch (err) {
+                delete channelPromises[queue_name]; // retry logic
+                throw err;
+            }
+        })();
     }
-    let ch = await conn.createChannel()
-    await ch.assertQueue(queue_name, {durable: false, expires: 6000000});
-    channels[queue_name] = ch
 
+    await channelPromises[queue_name];
 }
 
 function getQueueName(context) {
@@ -30,28 +55,26 @@ function getQueueName(context) {
 
 async function enqueueJobs(jobArr, taskIdArr, contextArr, customParams) {
     let context = contextArr[0];
-    let queue_name = getQueueName(context)
-    if (conn === null || !(queue_name in channels)) {
-        await initialize(queue_name)
-    }
-    let ch = channels[queue_name]
+    let queue_name = getQueueName(context);
     try {
+        await initialize(queue_name);
+        let ch = channels[queue_name];
 
-        console.log(`jobArr: ${JSON.stringify(jobArr)}, taskIdArr: ${JSON.stringify(taskIdArr)}, contextArr: ${JSON.stringify(contextArr)}, customParams: ${JSON.stringify(customParams)}`)
+        console.log(`jobArr: ${JSON.stringify(jobArr)}, taskIdArr: ${JSON.stringify(taskIdArr)}, contextArr: ${JSON.stringify(contextArr)}, customParams: ${JSON.stringify(customParams)}`);
         let tasks = [];
 
        for (let i = 0; i < jobArr.length; i++) {
            let job = jobArr[i];
            let taskId = taskIdArr[i];
            let jobMessage = createJobMessage(job.ins, job.outs, contextArr[i], taskId);
-           await context.sendMsgToJob(JSON.stringify(jobMessage), taskId) // TODO remove
-           tasks.push({"id": taskId, "message": jobMessage});
+           await context.sendMsgToJob(JSON.stringify(jobMessage), taskId); // TODO remove
+           tasks.push({ "id": taskId, "message": jobMessage });
        }
 
-        await ch.publish('', queue_name, Buffer.from(JSON.stringify({'tasks': tasks})));
+        ch.sendToQueue(queue_name, Buffer.from(JSON.stringify({ 'tasks': tasks })));
     } catch (error) {
-        console.log(error)
+        console.log(error);
     }
 }
 
-exports.enqueueJobs = enqueueJobs
+exports.enqueueJobs = enqueueJobs
```
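The key change in `amqpConnector.js` is that the pending connection and per-queue channel setup are memoized as promises, so concurrent calls to `initialize` share a single connection and a single channel per queue instead of racing to create duplicates; on failure the cached promise is dropped so a later call can retry. The sketch below illustrates the same promise-memoization pattern in isolation; the names (`getResource`, `createResource`) are illustrative and not part of the connector.

```js
// Minimal sketch of the promise-memoization pattern used by the connector:
// concurrent callers await the same pending promise, so the expensive setup runs once.
let resourcePromise = null;

async function createResource() {
    // Stand-in for an expensive async setup such as amqplib.connect().
    return { createdAt: Date.now() };
}

function getResource() {
    if (!resourcePromise) {
        resourcePromise = createResource().catch(err => {
            resourcePromise = null; // drop the cached promise so the next call can retry
            throw err;
        });
    }
    return resourcePromise;
}

// All three callers share the same resource; createResource() runs only once.
Promise.all([getResource(), getResource(), getResource()])
    .then(([a, b, c]) => console.log(a === b && b === c)); // true
```

The switch from `ch.publish('', queue_name, ...)` to `ch.sendToQueue(queue_name, ...)` does not change where the message goes: in amqplib, `sendToQueue(queue, content)` publishes to the default exchange with the queue name as the routing key, which is exactly what publishing to the `''` exchange did.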
