Skip to content
This repository was archived by the owner on Feb 20, 2024. It is now read-only.

Commit 6744d1d

Browse files
authored
Merge pull request #103 from nginyc/fix/stop_jobs_in_teardown
Stop jobs in teardown script
2 parents 08f44bc + 0572ba0 commit 6744d1d

File tree

19 files changed

+183
-42
lines changed

19 files changed

+183
-42
lines changed

.env.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export POSTGRES_EXT_PORT=5433
1010
export REDIS_EXT_PORT=6380
1111
export DATA_WORKDIR_PATH=$PWD/data # Shares a data folder with containers
1212
export LOGS_WORKDIR_PATH=$PWD/logs # Shares a folder with containers that stores components' logs
13+
export APP_MODE=DEV # DEV or PROD
1314

1415
# Internal credentials for Rafiki's components
1516
export POSTGRES_USER=rafiki

examples/models/image_classification/SkDt.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import base64
66
import numpy as np
77

8-
from rafiki.config import APP_MODE
98
from rafiki.model import BaseModel, InvalidModelParamsException, test_model_class, \
109
IntegerKnob, CategoricalKnob, dataset_utils, logger
1110
from rafiki.constants import TaskType, ModelDependency
@@ -17,7 +16,7 @@ class SkDt(BaseModel):
1716
@staticmethod
def get_knob_config():
    # Knobs exposed by this model: an integer `max_depth` knob built from
    # (2, 4) and a categorical `criterion` knob over 'gini' / 'entropy'
    knob_config = {}
    knob_config['max_depth'] = IntegerKnob(2, 4)
    knob_config['criterion'] = CategoricalKnob(['gini', 'entropy'])
    return knob_config
2322

examples/models/image_classification/SkSvm.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import base64
66
import numpy as np
77

8-
from rafiki.config import APP_MODE
98
from rafiki.model import BaseModel, InvalidModelParamsException, test_model_class, \
109
IntegerKnob, CategoricalKnob, FloatKnob, dataset_utils
1110
from rafiki.constants import TaskType, ModelDependency
@@ -17,7 +16,7 @@ class SkSvm(BaseModel):
1716
@staticmethod
1817
def get_knob_config():
1918
return {
20-
'max_iter': IntegerKnob(10, 40 if APP_MODE != 'DEV' else 10),
19+
'max_iter': IntegerKnob(10, 20),
2120
'kernel': CategoricalKnob(['rbf', 'linear']),
2221
'gamma': CategoricalKnob(['scale', 'auto']),
2322
'C': FloatKnob(1e-2, 1e2, is_exp=True)

examples/models/image_classification/TfFeedForward.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import numpy as np
88
import base64
99

10-
from rafiki.config import APP_MODE
1110
from rafiki.model import BaseModel, InvalidModelParamsException, test_model_class, \
1211
IntegerKnob, CategoricalKnob, FloatKnob, FixedKnob, dataset_utils, logger
1312
from rafiki.constants import TaskType, ModelDependency
@@ -20,8 +19,8 @@ class TfFeedForward(BaseModel):
2019
@staticmethod
2120
def get_knob_config():
2221
return {
23-
'epochs': IntegerKnob(3, 10 if APP_MODE != 'DEV' else 3),
24-
'hidden_layer_count': IntegerKnob(1, 8 if APP_MODE != 'DEV' else 2),
22+
'epochs': FixedKnob(3),
23+
'hidden_layer_count': IntegerKnob(1, 2),
2524
'hidden_layer_units': IntegerKnob(2, 128),
2625
'learning_rate': FloatKnob(1e-5, 1e-1, is_exp=True),
2726
'batch_size': CategoricalKnob([16, 32, 64, 128]),

examples/models/image_classification/TfVgg16.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from rafiki.model import BaseModel, InvalidModelParamsException, test_model_class, \
1212
IntegerKnob, FloatKnob, CategoricalKnob, dataset_utils
1313
from rafiki.constants import TaskType, ModelDependency
14-
from rafiki.config import APP_MODE
1514

1615
class TfVgg16(BaseModel):
1716
'''
@@ -20,7 +19,7 @@ class TfVgg16(BaseModel):
2019
@staticmethod
def get_knob_config():
    '''
    Returns this model's knob configuration as a dict keyed by knob name:
    a fixed `epochs` value of 1, a `learning_rate` float knob built from
    (1e-5, 1e-1, is_exp=True) and a categorical `batch_size` knob.
    '''
    # `FixedKnob` is not part of this module's `rafiki.model` import list,
    # so it is imported here to avoid a NameError when the config is built
    from rafiki.model import FixedKnob

    return {
        'epochs': FixedKnob(1),
        'learning_rate': FloatKnob(1e-5, 1e-1, is_exp=True),
        'batch_size': CategoricalKnob([16, 32, 64, 128]),
    }

examples/models/pos_tagging/PyBiLstm.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from rafiki.model import BaseModel, InvalidModelParamsException, test_model_class, \
1616
IntegerKnob, FloatKnob, CategoricalKnob, logger, dataset_utils
1717
from rafiki.constants import TaskType, ModelDependency
18-
from rafiki.config import APP_MODE
1918

2019
class PyBiLstm(BaseModel):
2120
'''
@@ -24,7 +23,7 @@ class PyBiLstm(BaseModel):
2423
@staticmethod
2524
def get_knob_config():
2625
return {
27-
'epochs': IntegerKnob(10, 50 if APP_MODE != 'DEV' else 10),
26+
'epochs': FixedKnob(10),
2827
'word_embed_dims': IntegerKnob(16, 128),
2928
'word_rnn_hidden_size': IntegerKnob(16, 128),
3029
'word_dropout': FloatKnob(1e-3, 2e-1, is_exp=True),

rafiki/admin/admin.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
import csv
77

88
from rafiki.db import Database
9-
from rafiki.constants import ServiceStatus, UserType, ServiceType, TrainJobStatus, ModelAccessRight, BudgetType
10-
from rafiki.config import MIN_SERVICE_PORT, MAX_SERVICE_PORT, SUPERADMIN_EMAIL, SUPERADMIN_PASSWORD
9+
from rafiki.constants import ServiceStatus, UserType, ServiceType, InferenceJobStatus, \
10+
TrainJobStatus, ModelAccessRight, BudgetType
11+
from rafiki.config import SUPERADMIN_EMAIL, SUPERADMIN_PASSWORD
1112
from rafiki.model import ModelLogger
1213
from rafiki.container import DockerSwarmContainerManager
1314

@@ -288,6 +289,18 @@ def stop_train_job_worker(self, service_id):
288289
'sub_train_job_id': worker.sub_train_job_id
289290
}
290291

292+
def stop_all_train_jobs(self):
    '''
    Stops the train services of every train job whose status is RUNNING.

    Returns a list with one ``{'id': <train job id>}`` dict per such job.
    '''
    running_jobs = self._db.get_train_jobs_by_status(TrainJobStatus.RUNNING)

    # First pass: ask the services manager to stop each job's services
    for job in running_jobs:
        self._services_manager.stop_train_services(job.id)

    # Second pass: summarise the jobs that were handled
    summaries = []
    for job in running_jobs:
        summaries.append({'id': job.id})

    return summaries
303+
291304
####################################
292305
# Trials
293306
####################################
@@ -467,6 +480,18 @@ def get_inference_jobs_by_user(self, user_id):
467480
for (inference_job, train_job, predictor_host) in zip(inference_jobs, train_jobs, predictor_hosts)
468481
]
469482

483+
def stop_all_inference_jobs(self):
    '''
    Stops the inference services of every inference job whose status is RUNNING.

    Returns a list with one ``{'id': <inference job id>}`` dict per such job.
    '''
    running_jobs = self._db.get_inference_jobs_by_status(InferenceJobStatus.RUNNING)

    # First pass: ask the services manager to stop each job's services
    for job in running_jobs:
        self._services_manager.stop_inference_services(job.id)

    # Second pass: summarise the jobs that were handled
    summaries = []
    for job in running_jobs:
        summaries.append({'id': job.id})

    return summaries
494+
470495
####################################
471496
# Models
472497
####################################

rafiki/admin/app.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,23 @@ def get_models(auth):
298298
with admin:
299299
return jsonify(admin.get_models(auth['user_id'], **params))
300300

301+
####################################
302+
# Administrative Actions
303+
####################################
304+
305+
@app.route('/actions/stop_all_jobs', methods=['POST'])
@auth([UserType.ADMIN])
def stop_all_jobs(auth):
    # Stops all train jobs, then all inference jobs, and responds with a JSON
    # object listing both sets of stopped jobs
    admin = get_admin()

    with admin:
        stopped = {
            'train_jobs': admin.stop_all_train_jobs(),
            'inference_jobs': admin.stop_all_inference_jobs()
        }
        return jsonify(stopped)
317+
301318
# Handle uncaught exceptions with a server error & the error's stack trace (for development)
302319
@app.errorhandler(Exception)
303320
def handle_error(error):

rafiki/admin/services_manager.py

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
import logging
33
import traceback
44
import time
5+
import socket
6+
from contextlib import closing
57

68
from rafiki.db import Database
79
from rafiki.constants import ServiceStatus, UserType, ServiceType, BudgetType
8-
from rafiki.config import MIN_SERVICE_PORT, MAX_SERVICE_PORT, \
9-
TRAIN_WORKER_REPLICAS_PER_SUB_TRAIN_JOB, INFERENCE_WORKER_REPLICAS_PER_TRIAL, \
10+
from rafiki.config import TRAIN_WORKER_REPLICAS_PER_SUB_TRAIN_JOB, INFERENCE_WORKER_REPLICAS_PER_TRIAL, \
1011
INFERENCE_MAX_BEST_TRIALS, SERVICE_STATUS_WAIT
1112
from rafiki.container import DockerSwarmContainerManager, ServiceRequirement, InvalidServiceRequest
1213
from rafiki.model import parse_model_install_command
@@ -122,9 +123,11 @@ def stop_train_services(self, train_job_id):
122123
train_job = self._db.get_train_job(train_job_id)
123124

124125
# Stop all workers for train job
125-
workers = self._db.get_workers_of_train_job(train_job_id)
126-
for worker in workers:
127-
self._stop_train_job_worker(worker)
126+
sub_train_jobs = self._db.get_sub_train_jobs_of_train_job(train_job_id)
127+
for sub_train_job in sub_train_jobs:
128+
workers = self._db.get_workers_of_sub_train_job(sub_train_job.id)
129+
for worker in workers:
130+
self._stop_train_job_worker(worker)
128131

129132
return train_job
130133

@@ -345,19 +348,13 @@ def _create_service(self, service_type, docker_image,
345348

346349
return service
347350

348-
# Compute next available external port
349351
def _get_available_ext_port(self):
350-
services = self._db.get_services(status=ServiceStatus.RUNNING)
351-
used_ports = [int(x.ext_port) for x in services if x.ext_port is not None]
352-
port = MIN_SERVICE_PORT
353-
while port <= MAX_SERVICE_PORT:
354-
if port not in used_ports:
355-
return port
356-
357-
port += 1
358-
359-
return port
360-
352+
# Credits to https://stackoverflow.com/questions/1365265/on-localhost-how-do-i-pick-a-free-port-number
353+
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
354+
s.bind(('', 0))
355+
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
356+
return s.getsockname()[1]
357+
361358
def _get_best_trials_for_inference(self, inference_job):
362359
best_trials = self._db.get_best_trials_of_train_job(inference_job.train_job_id)
363360
return best_trials

rafiki/client/client.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,19 @@ def delete_advisor(self, advisor_id):
582582
data = self._delete('/advisors/{}'.format(advisor_id), target='advisor')
583583
return data
584584

585+
####################################
586+
# Administrative Actions
587+
####################################
588+
589+
def stop_all_jobs(self):
    '''
    Stops every train job and inference job on Rafiki by sending a POST
    request to ``/actions/stop_all_jobs``, and returns the response data.

    Only admins can call this.
    '''
    return self._post('/actions/stop_all_jobs')
597+
585598
####################################
586599
# Private
587600
####################################

0 commit comments

Comments
 (0)