1
1
from pathlib import Path
2
2
import logging
3
+ from urllib .parse import urlparse
4
+ import subprocess
5
+ import time
3
6
4
7
import redisai
8
+ import redis
5
9
import ml2rt
6
- from mlflow .deployments import BasePlugin
10
+ from mlflow .deployments import BaseDeploymentClient
7
11
from mlflow .exceptions import MlflowException
8
12
from mlflow .tracking .artifact_utils import _download_artifact_from_uri
9
13
from mlflow .models import Model
10
14
from mlflow .protos .databricks_pb2 import INVALID_PARAMETER_VALUE
11
- import mlflow .tensorflow
12
15
13
- from .utils import get_preferred_deployment_flavor , SUPPORTED_DEPLOYMENT_FLAVORS , flavor2backend
16
+ from .utils import (get_preferred_deployment_flavor , validate_deployment_flavor ,
17
+ SUPPORTED_DEPLOYMENT_FLAVORS , flavor2backend , Config )
14
18
15
19
16
20
# Module-level logger shared by the plugin entry points below.
logger = logging .getLogger (__name__ )
17
21
18
22
19
- class RedisAIPlugin (BasePlugin ):
20
- def create (self , model_uri , flavor = None , ** kwargs ):
21
- key = kwargs .pop ('modelkey' )
23
def target_help():
    """Return the help text shown for the ``redisai`` deployment target.

    MLflow calls this for ``mlflow deployments help -t redisai``; the text
    documents the target URI format, the environment-variable fallbacks and
    the supported ``--config`` options.
    """
    sections = (
        "\nmlflow-redisai plugin integrates RedisAI to mlflow deployment pipeline. "
        "For detailed explanation and to see multiple examples, checkout the Readme at "
        "https://github.com/RedisAI/mlflow-redisai/blob/master/README.md\n\n",

        "Connection parameters: You can either use the URI to specify the connection "
        "parameters or specify them as environmental variables. If connection parameters "
        "are present in both URI and environmental variables, parameters from the "
        "environmental variables are ignored completely. The command with formatted "
        "URI would look like\n\n",

        "    mlflow deployments <command> -t redisai:/<username>:<password>@<host>:<port>/<db>\n\n",

        "If you'd like to use the default values for parameters, only specify the "
        "target as given below\n\n",

        "    mlflow deployments <command> -t redisai\n\n",

        "If you are going with environmental variables instead of URI parameters, the "
        "expected keys are\n\n",

        "    * REDIS_HOST\n"
        "    * REDIS_PORT\n"
        "    * REDIS_DB\n"
        "    * REDIS_USERNAME\n"
        "    * REDIS_PASSWORD\n\n",

        "However, if you wish to go with default values, don't set any environmental "
        "variables\n\n"
        "Model configuration: The ``--config`` or ``-C`` option of ``create`` and "
        "``update`` API enables you to pass arguments specific to RedisAI deployment. "
        "The possible config options are\n\n",

        "    * batchsize: Batch size for auto-batching\n"
        "    * tag: Tag a deployment with a version number or a given name\n"
        "    * device: CPU or GPU. if multiple GPUs are available, specify that too\n\n",
    )
    return "".join(sections)
60
+
61
+
62
def run_local(name, model_uri, flavor=None, config=None):
    """
    Launch a local RedisAI docker container and deploy the model into it.

    Starts ``redisai/redisai:latest`` via ``docker run`` (requesting all GPUs
    when the configured device mentions GPU), polls the server with ``PING``
    until it answers, then delegates to ``RedisAIPlugin.create_deployment``.

    :param name: Redis key under which the model will be deployed.
    :param model_uri: MLflow model URI to deploy.
    :param flavor: Optional deployment flavor; auto-detected when ``None``.
    :param config: Optional dict of deployment options (``device``,
        ``batchsize``, ``tag``). ``device`` defaults to CPU.
    :raises RuntimeError: if the docker container exits before the RedisAI
        server becomes reachable.
    """
    # BUGFIX: the original did ``config.get('device')`` with no default and
    # immediately called ``.lower()`` — an AttributeError whenever ``config``
    # was None (its declared default) or lacked a 'device' key. Default to
    # CPU, consistent with ``create_deployment``.
    device = (config or {}).get('device', 'CPU')
    if 'gpu' in device.lower():
        commands = ['docker', 'run', '-p', '6379:6379', '--gpus', 'all', '--rm', 'redisai/redisai:latest']
    else:
        commands = ['docker', 'run', '-p', '6379:6379', '--rm', 'redisai/redisai:latest']
    proc = subprocess.Popen(commands)
    plugin = RedisAIPlugin('redisai:/localhost:6379/0')
    start_time = time.time()
    prev_num_interval = 0
    while True:
        logger.info("Launching RedisAI docker container")
        try:
            if plugin.con.ping():
                break
        except redis.exceptions.ConnectionError:
            # Roughly every 10 seconds, check whether the container process
            # has already exited (communicate() returning means it died).
            num_interval, _ = divmod(time.time() - start_time, 10)
            if num_interval > prev_num_interval:
                prev_num_interval = num_interval
                try:
                    proc.communicate(timeout=0.1)
                except subprocess.TimeoutExpired:
                    pass
                else:
                    raise RuntimeError("Could not start the RedisAI docker container. You can "
                                       "try setting up RedisAI locally by (by following the "
                                       "documentation https://oss.redislabs.com/redisai/quickstart/)"
                                       " and call the ``create`` API with target_uri as redisai as"
                                       "given in the example command below\n\n"
                                       "    mlflow deployments create -t redisai -m <modeluri> ...\n\n")
            time.sleep(0.2)
    plugin.create_deployment(name, model_uri, flavor, config)
94
+
95
+
96
class RedisAIPlugin(BaseDeploymentClient):
    """MLflow deployment plugin client backed by a RedisAI server."""

    def __init__(self, uri):
        super().__init__(uri)
        # Environment-variable based fallback configuration (REDIS_HOST etc.).
        server_config = Config()
        target_path = urlparse(uri).path
        if target_path:
            # A target like "redisai:/host:port/db" carries the connection
            # details in its path; rebuild it as a redis:// URL.
            self.con = redisai.Client.from_url(f"redis:/{target_path}")
        else:
            # Bare "redisai" target: fall back to the env-var configuration.
            self.con = redisai.Client(**server_config)
106
+
107
+ def create_deployment (self , name , model_uri , flavor = None , config = None ):
108
+ device = config .get ('device' , 'CPU' )
109
+ autobatch_size = config .get ('batchsize' )
110
+ tag = config .get ('tag' )
26
111
path = Path (_download_artifact_from_uri (model_uri ))
27
112
model_config = path / 'MLmodel'
28
113
if not model_config .exists ():
@@ -36,11 +121,10 @@ def create(self, model_uri, flavor=None, **kwargs):
36
121
if flavor is None :
37
122
flavor = get_preferred_deployment_flavor (model_config )
38
123
else :
39
- self . _validate_deployment_flavor (model_config , flavor , SUPPORTED_DEPLOYMENT_FLAVORS )
124
+ validate_deployment_flavor (model_config , flavor )
40
125
logger .info ("Using the {} flavor for deployment!" .format (flavor ))
41
126
42
- con = redisai .Client (** kwargs )
43
- if flavor == mlflow .tensorflow .FLAVOR_NAME :
127
+ if flavor == 'tensorflow' :
44
128
# TODO: test this for tf1.x and tf2.x
45
129
tags = model_config .flavors [flavor ]['meta_graph_tags' ]
46
130
signaturedef = model_config .flavors [flavor ]['signature_def_key' ]
@@ -56,39 +140,37 @@ def create(self, model_uri, flavor=None, **kwargs):
56
140
model = ml2rt .load_model (model_path )
57
141
inputs = outputs = None
58
142
backend = flavor2backend [flavor ]
59
- con .modelset (key , backend , device , model , inputs = inputs , outputs = outputs )
60
- return {'deployment_id ' : key , 'flavor' : flavor }
143
+ self . con .modelset (name , backend , device , model , inputs = inputs , outputs = outputs , batch = autobatch_size , tag = tag )
144
+ return {'name ' : name , 'flavor' : flavor }
61
145
62
- def delete (self , deployment_id , ** kwargs ):
146
+ def delete_deployment (self , name ):
63
147
"""
64
148
Delete a RedisAI model key and value.
65
149
66
- :param deployment_id : Redis Key on which we deploy the model
150
+ :param name : Redis Key on which we deploy the model
67
151
"""
68
- con = redisai .Client (** kwargs )
69
- con .modeldel (deployment_id )
70
- logger .info ("Deleted model with key: {}" .format (deployment_id ))
152
+ self .con .modeldel (name )
153
+ logger .info ("Deleted model with key: {}" .format (name ))
71
154
72
- def update (self , deployment_id , model_uri = None , flavor = False , ** kwargs ):
155
+ def update_deployment (self , name , model_uri = None , flavor = None , config = None ):
73
156
try :
74
- device = kwargs .pop ('device' )
75
- except KeyError :
76
- device = 'CPU'
77
- con = redisai .Client (** kwargs )
78
- try :
79
- con .modelget (deployment_id , meta_only = True )
80
- except Exception : # TODO: check specificially for KeyError and raise MLFlowException with proper error code
157
+ self .con .modelget (name , meta_only = True )
158
+ except redis .exceptions .ConnectionError :
81
159
raise MlflowException ("Model doesn't exist. If you trying to create new "
82
160
"deployment, use ``create_deployment``" )
83
161
else :
84
- ret = self .create ( model_uri , flavor , modelkey = deployment_id , device = device , ** kwargs )
162
+ ret = self .create_deployment ( name , model_uri , flavor , config = config )
85
163
return {'flavor' : ret ['flavor' ]}
86
164
87
- def list (self , ** kwargs ):
88
- # TODO: May be support RedisAI SCRIPT, eventually
89
- con = redisai .Client (** kwargs )
90
- return con .modelscan ()
165
+ def list_deployments (self , ** kwargs ):
166
+ return self .con .modelscan ()
167
+
168
+ def get_deployment (self , name ):
169
+ return self .con .modelget (name , meta_only = True )
91
170
92
- def get (self , deployment_id , ** kwargs ):
93
- con = redisai .Client (** kwargs )
94
- return con .modelget (deployment_id , meta_only = True )
171
+ def predict (self , deployment_name , df ):
172
+ nparray = df .to_numpy ()
173
+ self .con .tensorset ('array' , nparray )
174
+ # TODO: manage multiple inputs and multiple outputs
175
+ self .con .modelrun (deployment_name , inputs = ['array' ], outputs = ['output' ])
176
+ return self .con .tensorget ('output' )
0 commit comments