Skip to content

Commit 30c23b7

Browse files
committed
adding support for async job execution to nfcli shells
1 parent 9530e9a commit 30c23b7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+322
-447
lines changed

docs/testing/norfab_testing_framework.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ kill -9 <PID> # Kill process
369369
Check worker status using the MMI service:
370370

371371
```python
372-
status = nfclient.get("mmi.service.broker", "show_workers")
372+
status = nfclient.mmi("mmi.service.broker", "show_workers")
373373
```
374374

375375
## Related Documentation

norfab/clients/picle_shell_client.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ class PicleConfig:
6262
@staticmethod
6363
def run(*args, **kwargs):
6464
if kwargs.get("version"):
65-
reply = NFCLIENT.get("mmi.service.broker", "show_broker_version")
65+
reply = NFCLIENT.mmi("mmi.service.broker", "show_broker_version")
6666
elif kwargs.get("inventory"):
67-
reply = NFCLIENT.get("mmi.service.broker", "show_broker_inventory")
67+
reply = NFCLIENT.mmi("mmi.service.broker", "show_broker_inventory")
6868
else:
69-
reply = NFCLIENT.get("mmi.service.broker", "show_broker")
69+
reply = NFCLIENT.mmi("mmi.service.broker", "show_broker")
7070
if reply["errors"]:
7171
return "\n".join(reply["errors"])
7272
else:
@@ -157,7 +157,6 @@ def show_client():
157157
"status": "connected",
158158
"name": NFCLIENT.name,
159159
"zmq-name": NFCLIENT.zmq_name,
160-
"recv-queue-size": NFCLIENT.recv_queue.qsize(),
161160
"broker": {
162161
"endpoint": NFCLIENT.broker,
163162
"reconnects": NFCLIENT.stats_reconnect_to_broker,
@@ -166,7 +165,6 @@ def show_client():
166165
},
167166
"directories": {
168167
"base-dir": NFCLIENT.base_dir,
169-
"events-dir": NFCLIENT.events_dir,
170168
"public-keys-dir": NFCLIENT.public_keys_dir,
171169
"private-keys-dir": NFCLIENT.private_keys_dir,
172170
},
@@ -314,7 +312,7 @@ def source_filepath():
314312
@staticmethod
315313
def source_workers():
316314
NFCLIENT = builtins.NFCLIENT
317-
reply = NFCLIENT.get("mmi.service.broker", "show_workers")
315+
reply = NFCLIENT.mmi("mmi.service.broker", "show_workers")
318316
workers = [i["name"] for i in reply["results"]]
319317

320318
return ["all", "any"] + workers

norfab/clients/shell_clients/common.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,10 @@ def wrapper(*args, **kwargs):
102102
events_thread_stop = threading.Event()
103103
uuid = uuid4().hex
104104
progress = kwargs.get("progress", True)
105+
nowait = kwargs.get("nowait", False)
105106

106107
# start events thread to handle job events printing
107-
if progress:
108+
if progress and nowait is False:
108109
events_thread = threading.Thread(
109110
target=listen_events_thread,
110111
name="NornirCliShell_events_listen_thread",
@@ -121,7 +122,7 @@ def wrapper(*args, **kwargs):
121122
res = fun(uuid, *args, **kwargs)
122123
finally:
123124
# stop events thread
124-
if NFCLIENT and progress:
125+
if NFCLIENT and progress and nowait is False:
125126
events_thread_stop.set()
126127
events_thread.join()
127128

@@ -207,6 +208,11 @@ class ClientRunJobArgs(BaseModel):
207208
description="Display progress events",
208209
json_schema_extra={"presence": True},
209210
)
211+
nowait: Optional[StrictBool] = Field(
212+
False,
213+
description="Do not wait for job to complete",
214+
json_schema_extra={"presence": True},
215+
)
210216

211217
@staticmethod
212218
def walk_norfab_files():

norfab/clients/shell_clients/fastapi/fastapi_picle_shell_auth.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ def run(*args, **kwargs):
2727
workers = kwargs.pop("workers", "all")
2828
timeout = kwargs.pop("timeout", 600)
2929
verbose_result = kwargs.pop("verbose_result", False)
30+
nowait = kwargs.pop("nowait", False)
3031

3132
if "token" not in kwargs:
3233
kwargs["token"] = uuid4().hex
@@ -37,8 +38,12 @@ def run(*args, **kwargs):
3738
kwargs=kwargs,
3839
workers=workers,
3940
timeout=timeout,
41+
nowait=nowait,
4042
)
4143

44+
if nowait:
45+
return result, Outputters.outputter_nested
46+
4247
return log_error_or_result(result, verbose_result=verbose_result)
4348

4449
class PicleConfig:
@@ -54,14 +59,20 @@ def run(*args, **kwargs):
5459
workers = kwargs.pop("workers", "all")
5560
timeout = kwargs.pop("timeout", 600)
5661
verbose_result = kwargs.pop("verbose_result", False)
62+
nowait = kwargs.pop("nowait", False)
5763

5864
result = NFCLIENT.run_job(
5965
"fastapi",
6066
"bearer_token_list",
6167
kwargs=kwargs,
6268
workers=workers,
6369
timeout=timeout,
70+
nowait=nowait,
6471
)
72+
73+
if nowait:
74+
return result, Outputters.outputter_nested
75+
6576
result = log_error_or_result(result, verbose_result=verbose_result)
6677
ret = []
6778
for wname, wdata in result.items():
@@ -87,15 +98,20 @@ def run(*args, **kwargs):
8798
workers = kwargs.pop("workers", "all")
8899
timeout = kwargs.pop("timeout", 600)
89100
verbose_result = kwargs.pop("verbose_result", False)
101+
nowait = kwargs.pop("nowait", False)
90102

91103
result = NFCLIENT.run_job(
92104
"fastapi",
93105
"bearer_token_delete",
94106
kwargs=kwargs,
95107
workers=workers,
96108
timeout=timeout,
109+
nowait=nowait,
97110
)
98111

112+
if nowait:
113+
return result, Outputters.outputter_nested
114+
99115
return log_error_or_result(result, verbose_result=verbose_result)
100116

101117
class PicleConfig:
@@ -111,15 +127,20 @@ def run(*args, **kwargs):
111127
workers = kwargs.pop("workers", "all")
112128
timeout = kwargs.pop("timeout", 600)
113129
verbose_result = kwargs.pop("verbose_result", False)
130+
nowait = kwargs.pop("nowait", False)
114131

115132
result = NFCLIENT.run_job(
116133
"fastapi",
117134
"bearer_token_check",
118135
kwargs=kwargs,
119136
workers=workers,
120137
timeout=timeout,
138+
nowait=nowait,
121139
)
122140

141+
if nowait:
142+
return result, Outputters.outputter_nested
143+
123144
return log_error_or_result(result, verbose_result=verbose_result)
124145

125146
class PicleConfig:

norfab/clients/shell_clients/fastapi/fastapi_picle_shell_discover.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def run(uuid, *args, **kwargs):
3030
workers = kwargs.pop("workers", "all")
3131
timeout = kwargs.pop("timeout", 600)
3232
verbose_result = kwargs.pop("verbose_result", False)
33+
nowait = kwargs.pop("nowait", False)
3334

3435
result = NFCLIENT.run_job(
3536
"fastapi",
@@ -38,8 +39,12 @@ def run(uuid, *args, **kwargs):
3839
workers=workers,
3940
timeout=timeout,
4041
uuid=uuid,
42+
nowait=nowait,
4143
)
4244

45+
if nowait:
46+
return result, Outputters.outputter_nested
47+
4348
return log_error_or_result(result, verbose_result=verbose_result)
4449

4550
class PicleConfig:

norfab/clients/shell_clients/fastmcp/fastmcp_picle_shell_discover.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def run(uuid, *args, **kwargs):
3030
workers = kwargs.pop("workers", "all")
3131
timeout = kwargs.pop("timeout", 600)
3232
verbose_result = kwargs.pop("verbose_result", False)
33+
nowait = kwargs.pop("nowait", False)
3334

3435
result = NFCLIENT.run_job(
3536
"fastmcp",
@@ -38,8 +39,12 @@ def run(uuid, *args, **kwargs):
3839
workers=workers,
3940
timeout=timeout,
4041
uuid=uuid,
42+
nowait=nowait,
4143
)
4244

45+
if nowait:
46+
return result, Outputters.outputter_nested
47+
4348
return log_error_or_result(result, verbose_result=verbose_result)
4449

4550
class PicleConfig:

norfab/clients/shell_clients/netbox/netbox_picle_shell_cache.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class CacheList(NetboxClientRunJobArgs):
3939
@staticmethod
4040
def source_workers():
4141
NFCLIENT = builtins.NFCLIENT
42-
reply = NFCLIENT.get(
42+
reply = NFCLIENT.mmi(
4343
"mmi.service.broker", "show_workers", kwargs={"service": "netbox"}
4444
)
4545
reply = reply["results"]
@@ -53,6 +53,7 @@ def run(*args, **kwargs):
5353
details = kwargs.get("details", False)
5454
table = kwargs.pop("table", False)
5555
verbose_result = kwargs.pop("verbose_result", False)
56+
nowait = kwargs.pop("nowait", False)
5657

5758
result = NFCLIENT.run_job(
5859
"netbox",
@@ -61,7 +62,12 @@ def run(*args, **kwargs):
6162
args=args,
6263
kwargs=kwargs,
6364
timeout=timeout,
65+
nowait=nowait,
6466
)
67+
68+
if nowait:
69+
return result, Outputters.outputter_nested
70+
6571
result = log_error_or_result(result, verbose_result=verbose_result)
6672

6773
if details:
@@ -100,7 +106,7 @@ class CacheClear(NetboxClientRunJobArgs):
100106
@staticmethod
101107
def source_workers():
102108
NFCLIENT = builtins.NFCLIENT
103-
reply = NFCLIENT.get(
109+
reply = NFCLIENT.mmi(
104110
"mmi.service.broker", "show_workers", kwargs={"service": "netbox"}
105111
)
106112
reply = reply["results"]
@@ -112,6 +118,7 @@ def run(*args, **kwargs):
112118
workers = kwargs.pop("workers")
113119
timeout = kwargs.pop("timeout", 600)
114120
verbose_result = kwargs.pop("verbose_result", False)
121+
nowait = kwargs.pop("nowait", False)
115122

116123
result = NFCLIENT.run_job(
117124
"netbox",
@@ -120,8 +127,12 @@ def run(*args, **kwargs):
120127
args=args,
121128
kwargs=kwargs,
122129
timeout=timeout,
130+
nowait=nowait,
123131
)
124132

133+
if nowait:
134+
return result, Outputters.outputter_nested
135+
125136
return log_error_or_result(result, verbose_result=verbose_result)
126137

127138
class PicleConfig:
@@ -140,7 +151,7 @@ class CacheGet(NetboxClientRunJobArgs):
140151
@staticmethod
141152
def source_workers():
142153
NFCLIENT = builtins.NFCLIENT
143-
reply = NFCLIENT.get(
154+
reply = NFCLIENT.mmi(
144155
"mmi.service.broker", "show_workers", kwargs={"service": "netbox"}
145156
)
146157
reply = reply["results"]
@@ -152,6 +163,7 @@ def run(*args, **kwargs):
152163
workers = kwargs.pop("workers")
153164
timeout = kwargs.pop("timeout", 600)
154165
verbose_result = kwargs.pop("verbose_result", False)
166+
nowait = kwargs.pop("nowait", False)
155167

156168
result = NFCLIENT.run_job(
157169
"netbox",
@@ -160,8 +172,12 @@ def run(*args, **kwargs):
160172
args=args,
161173
kwargs=kwargs,
162174
timeout=timeout,
175+
nowait=nowait,
163176
)
164177

178+
if nowait:
179+
return result, Outputters.outputter_nested
180+
165181
return log_error_or_result(result, verbose_result=verbose_result)
166182

167183
class PicleConfig:

norfab/clients/shell_clients/netbox/netbox_picle_shell_common.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class NetboxClientRunJobArgs(ClientRunJobArgs):
1616
@staticmethod
1717
def source_workers():
1818
NFCLIENT = builtins.NFCLIENT
19-
reply = NFCLIENT.get("mmi.service.broker", "show_workers")
19+
reply = NFCLIENT.mmi("mmi.service.broker", "show_workers")
2020
reply = reply["results"]
2121
return ["all", "any"] + [
2222
w["name"] for w in reply if w["service"].startswith("netbox")

norfab/clients/shell_clients/netbox/netbox_picle_shell_create_device_interfaces.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ def run(uuid, *args, **kwargs):
1717
workers = kwargs.pop("workers", "any")
1818
timeout = kwargs.pop("timeout", 600)
1919
verbose_result = kwargs.pop("verbose_result", False)
20+
nowait = kwargs.pop("nowait", False)
2021

2122
if isinstance(kwargs.get("tags"), str):
2223
kwargs["tags"] = [kwargs["tags"]]
@@ -32,8 +33,12 @@ def run(uuid, *args, **kwargs):
3233
kwargs=kwargs,
3334
timeout=timeout,
3435
uuid=uuid,
36+
nowait=nowait,
3537
)
3638

39+
if nowait:
40+
return result, Outputters.outputter_nested
41+
3742
return log_error_or_result(result, verbose_result=verbose_result)
3843

3944
class PicleConfig:

norfab/clients/shell_clients/netbox/netbox_picle_shell_create_ip.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def run(uuid, *args, **kwargs):
7373
workers = kwargs.pop("workers", "any")
7474
timeout = kwargs.pop("timeout", 600)
7575
verbose_result = kwargs.pop("verbose_result", False)
76+
nowait = kwargs.pop("nowait", False)
7677

7778
if isinstance(kwargs.get("devices"), str):
7879
kwargs["devices"] = [kwargs["devices"]]
@@ -90,8 +91,12 @@ def run(uuid, *args, **kwargs):
9091
kwargs=kwargs,
9192
timeout=timeout,
9293
uuid=uuid,
94+
nowait=nowait,
9395
)
9496

97+
if nowait:
98+
return result, Outputters.outputter_nested
99+
95100
return log_error_or_result(result, verbose_result=verbose_result)
96101

97102
class PicleConfig:

0 commit comments

Comments
 (0)