Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,97 @@
## Dependencies
- Install Python packages into your virtualenv:

```
python3 -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt
```

## Configuration
- Redis connection is configured via CLI flags or environment variables using `redis_connector`.
- Supported env vars:
- `REDIS_HOST` (default: localhost)
- `REDIS_PORT` (default: 6379)
- `REDIS_DB` (default: 0)
- `REDIS_USERNAME`
- `REDIS_PASSWORD`
- `REDIS_TLS` (true/1/yes/on to enable)
- `REDIS_TLS_CA_CERTS` (path to CA bundle)

## Installation
- Use the provided installer script to set up services with proper user/paths:
```bash
sudo ./install.sh [install_user] [install_path]
```
- Defaults: `install_user=rdispatch`, `install_path=/opt/rdispatch/RedisActionDispatcher`
- The installer will:
- Create a system user (if needed)
- Set up the virtualenv and install dependencies
- Install and configure systemd services with proper paths
- Copy the example environment file to `/etc/rdispatch.env`

## Services
- Sample systemd units are included: `action_dispatcher.service`, `action_server_my_server_1.service`, `dispatcher_webmonitor.service`.
- Service files use placeholders (`%INSTALL_USER%`, `%INSTALL_PATH%`) that are substituted during installation.
- Hardened defaults are applied (`NoNewPrivileges`, `PrivateTmp`, `ProtectSystem=full`, `ProtectHome=true`).

### Templated server service
- Use `[email protected]` to avoid per-ID unit files.
- Instance name must be `SERVER_CLASS:SERVER_ID`. Examples:
- Enable and start: `systemctl enable --now action_server@my_server:1`
- Check status: `systemctl status action_server@my_server:1`

### Web monitor behind Nginx (optional)
- The web monitor binds to `127.0.0.1:5000` by default (4 workers, 120s timeout, 600s keepalive).
- A complete Nginx configuration is provided in [nginx-dispatcher.conf](nginx-dispatcher.conf).

**To set up Nginx:**
```bash
# Install nginx
sudo apt install nginx # Debian/Ubuntu
# or
sudo yum install nginx # RHEL/CentOS

# Copy and edit the config
sudo cp nginx-dispatcher.conf /etc/nginx/sites-available/dispatcher
sudo nano /etc/nginx/sites-available/dispatcher # Change server_name

# Enable the site
sudo ln -s /etc/nginx/sites-available/dispatcher /etc/nginx/sites-enabled/

# Test and reload
sudo nginx -t
sudo systemctl reload nginx
```

**Features included:**
- Upstream connection pooling with keepalive
- WebSocket/SSE support on `/stream` endpoint
- Proper proxy headers and timeouts matching Gunicorn
- Static file caching optimization
- Optional HTTPS and basic auth examples (commented)
- Health check endpoint configuration

**Minimal example for quick setup:**
```nginx
server {
listen 80;
server_name dispatcher.local;

location / {
proxy_pass http://127.0.0.1:5000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
```

After installation, edit `/etc/rdispatch.env` for Redis configuration, then enable/start services as shown by the installer.

## Logging
- `action_dispatcher.py` and `action_server.py` use Python `logging` with `--log-level` CLI option.
RedisActionDispatcher
===
New distributed dispatchless implementation of MDSplus action server based on REDIS
Expand Down
54 changes: 29 additions & 25 deletions action_dispatcher.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ActionDispatcher:


def __init__(self, red):
import logging
self.seqActions = {}
self.depActions = {}
self.dependencies = {}
Expand All @@ -98,6 +99,7 @@ def __init__(self, red):
self.pendingSeqActions = {}
self.pendingDepActions = {}


def printTables(self):
print("******Sequential Actions")
print(self.seqActions)
Expand Down Expand Up @@ -601,35 +603,37 @@ def manageNotifications(actDisp):
def manageWatchdog(actDisp):
actDisp.serverWatchdog()

if __name__ == "__main__":
import argparse
import logging
import signal
import redis_connector

if len(sys.argv) != 1 and len(sys.argv) != 2:
print('usage: python action_dispatcher.py [redis server]')
sys.exit(0)

if len(sys.argv) == 1:
red = redis.Redis(host='localhost')
else:
red = redis.Redis(host=sys.argv[1])

act = ActionDispatcher(red)
thread = Thread(target = manageNotifications, args = (act, ))
thread.start()
threadWatch = Thread(target = manageWatchdog, args = (act, ))
threadWatch.start()
act.handleCommands()



parser = argparse.ArgumentParser()
redis_connector.add_redis_args(parser)








parser.add_argument("--log-level", default="INFO", choices=["DEBUG","INFO","WARNING","ERROR","CRITICAL"], help="Logging level")

args = parser.parse_args()

logging.basicConfig(level=getattr(logging, args.log_level))

red = redis_connector.connect_redis_from_args(args, prompt_on_auth_failure=True)
logging.info("Redis connected: %s", red.ping())

act = ActionDispatcher(red)

# Basic graceful shutdown on SIGTERM/SIGINT
def _stop(*_):
try:
act.aborted = True
except Exception:
pass
signal.signal(signal.SIGTERM, _stop)
signal.signal(signal.SIGINT, _stop)

thread = Thread(target=manageNotifications, args=(act, ))
thread.start()
threadWatch = Thread(target=manageWatchdog, args=(act, ))
threadWatch.start()
act.handleCommands()
25 changes: 25 additions & 0 deletions action_dispatcher.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[Unit]
Description=Action Dispatcher Service
After=network.target redis.service

[Service]
Type=simple
User=%INSTALL_USER%
WorkingDirectory=%INSTALL_PATH%
EnvironmentFile=/etc/rdispatch.env
ExecStart=/bin/bash -lc ' \
source /etc/profile.d/mdsplus.sh && \
exec %INSTALL_PATH%/.venv/bin/python -u action_dispatcher.py \
'
Restart=always
RestartSec=5
Environment=PYTHONUNBUFFERED=1
StandardOutput=journal
StandardError=inherit
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=full
ProtectHome=true

[Install]
WantedBy=multi-user.target
50 changes: 15 additions & 35 deletions action_server.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -402,57 +402,37 @@ def reportServerOn(red, ident, id):
red.hset('ACTION_SERVER_ACTIVE:'+ident, id, 'ON')
time.sleep(1)

def main(serverClass, serverId, redisServer, sequential, process):
red = redis.Redis(host=redisServer)
def main(redisObject, serverClass, serverId, sequential, process):
import logging
red = redisObject
ident = serverClass
id = serverId
print('Action server started. Server class: '+ident+', Server Id: '+id)
logging.info('Action server started. Server class: %s, Server Id: %s', ident, id)
act = ActionServer(ident, id, red)
#atexit.register(reportExit, red, ident, id)
#red.hset('ACTION_SERVER_ACTIVE:'+ident, id, 'ON')
thread = threading.Thread(target = reportServerOn, args = (red, ident, id,))
thread.start()
act.handleCommands(sequential != 0, process != 0)



if __name__ == "__main__":
import argparse
import logging
import redis_connector
parser = argparse.ArgumentParser()

# positional argument
redis_connector.add_redis_args(parser)

parser.add_argument("serverClass", help="Server Class")
parser.add_argument("serverId", help="ServerId")
parser.add_argument("redisServer", help="REDIS server")
parser.add_argument("--sequential", type=int, default=1, help="Force Mutual exclusion for log consistency")
parser.add_argument("--process", type=int, default=0, help="Force Mutual exclusion for log consistency")
args = parser.parse_args()
print(args.serverClass, args.serverId, args.redisServer, args.sequential, args.process)
main(args.serverClass, args.serverId, args.redisServer, args.sequential, args.process)
























parser.add_argument("--log-level", default="INFO", choices=["DEBUG","INFO","WARNING","ERROR","CRITICAL"], help="Logging level")

args = parser.parse_args()

logging.basicConfig(level=getattr(logging, args.log_level))

red = redis_connector.connect_redis_from_args(args)
logging.info("Redis connected: %s", red.ping())

main(red, args.serverClass, args.serverId, args.sequential, args.process)
24 changes: 24 additions & 0 deletions [email protected]
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[Unit]
Description=Action Server (templated) — Class:ID from instance name
After=network.target redis.service

[Service]
Type=simple
User=%INSTALL_USER%
WorkingDirectory=%INSTALL_PATH%
EnvironmentFile=/etc/rdispatch.env
# Usage example: systemctl start action_server@my_server:1
# This shell trick splits %i as CLASS:ID into two args
ExecStart=/bin/sh -c '. /etc/profile.d/mdsplus.sh; exec %INSTALL_PATH%/.venv/bin/python -u action_server.py "${0%%:*}" "${0##*:}"' %i
Restart=always
RestartSec=5
Environment=PYTHONUNBUFFERED=1
StandardOutput=journal
StandardError=inherit
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=full
ProtectHome=true

[Install]
WantedBy=multi-user.target
Empty file modified dispatcher_command.py
100644 → 100755
Empty file.
Empty file modified dispatcher_monitor.py
100644 → 100755
Empty file.
17 changes: 11 additions & 6 deletions dispatcher_webmonitor.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@
import json


import redis_connector
import logging

LOG_FILE = "redis_pubsub.log"
redishost= os.getenv("REDIS_HOST","localhost") #Permits setting a different redishost using env var - default is "localhost"
print(f"Redis host set to: {redishost}")
#redishost= os.getenv("REDIS_HOST","localhost") #Permits setting a different redishost using env var - default is "localhost"
#print(f"Redis host set to: {redishost}")

app = Flask(__name__)
client = redis.StrictRedis(redishost, port=6379, decode_responses=True)
client = redis_connector.connect_from_env(decode_responses=True)
redishost = client.connection_pool.connection_kwargs.get("host")
logging.getLogger(__name__).info("Redis host set to: %s", redishost)
redis_client = client

lock = Lock()
Expand Down Expand Up @@ -100,7 +105,7 @@ def message_handler(message):

def log_all_channels():
setup_logger()
client = redis.StrictRedis(redishost, port=6379, decode_responses=True)
client = redis_connector.connect_from_env(decode_responses=True)
pubsub = client.pubsub()

channels = client.pubsub_channels()
Expand All @@ -125,7 +130,7 @@ def log_all_channels():
def server_list():
setup_logger()

client = redis.StrictRedis(redishost, port=6379, decode_responses=True)
client = redis_connector.connect_from_env(decode_responses=True)
pubsub = client.pubsub()

channels = client.pubsub_channels()
Expand Down Expand Up @@ -1020,7 +1025,7 @@ def publish_message():


#Initialize red to be able to call functions from dispatcher_monitor.py and action_server.py
red = redis.Redis(redishost)
red = redis_connector.connect_from_env()


app.run(debug=True, host='0.0.0.0', port=3000, threaded=True)
Expand Down
31 changes: 31 additions & 0 deletions dispatcher_webmonitor.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[Unit]
Description=Dispatcher Web Monitor (Gunicorn)
After=network.target redis.service

[Service]
Type=simple
User=%INSTALL_USER%
WorkingDirectory=%INSTALL_PATH%
EnvironmentFile=/etc/rdispatch.env
ExecStart=/bin/bash -lc ' \
source /etc/profile.d/mdsplus.sh && \
exec %INSTALL_PATH%/.venv/bin/gunicorn \
-w 4 \
-k gevent \
--keep-alive 600 \
--timeout 120 \
-b 127.0.0.1:5000 \
dispatcher_webmonitor:app \
'
Restart=always
RestartSec=5
Environment=PYTHONUNBUFFERED=1
StandardOutput=journal
StandardError=inherit
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=full
ProtectHome=true

[Install]
WantedBy=multi-user.target
Empty file modified display_logs.py
100644 → 100755
Empty file.
Loading