#!/usr/bin/env python
# vim: set expandtab ts=4 sw=4:
# Copyright (C) 2009 University of Victoria
# You may distribute under the terms of either the GNU General Public
# License or the Apache v2 License, as specified in the README file.
## Auth: Duncan Penfold-Brown. 6/15/2009
## CLOUD SCHEDULER
##
## The main body for the cloud scheduler, that encapsulates and organizes
## all cloud scheduler functionality.
##
## Using optparse for command line options (http://docs.python.org/library/optparse.html)
##
## Imports
import os
import sys
import time
import string
import getopt
import logging
import threading
import ConfigParser
import logging.handlers
from optparse import OptionParser
import cloudscheduler.config as config
import cloudscheduler.utilities as utilities
import cloudscheduler.__version__ as version
import cloudscheduler.info_server as info_server
import cloudscheduler.cloud_management as cloud_management
import cloudscheduler.job_management as job_management
## GLOBAL VARIABLES
usage_str = "cloud_scheduler [-f FILE | --config-file FILE] [-c FILE | --cloud-config FILE] [-m SERVER | --MDS SERVER]"
version_str = "Cloud Scheduler " + version.version
log = logging.getLogger("cloudscheduler")
null_handler = utilities.NullHandler()
log.addHandler(null_handler)
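# Note: the NullHandler attached above keeps the 'cloudscheduler' logger quiet
# (no "no handlers could be found" warnings) until main() installs real handlers.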
## Thread Bodies
# Polling:
# The resource & running vm polling thread. Inherits from Thread class.
# Polling thread will iterate through the resource pool, updating resource
# status based on vm_poll calls to each vm in a cluster's 'vms' list (of
# running vms)
# Constructed with argument 'resource_pool'
class PollingTh(threading.Thread):

    def __init__(self, resource_pool):
        threading.Thread.__init__(self)
        self.resource_pool = resource_pool

    def run(self):
        # TODO: Implement polling thread functionality here.
        log.info("Starting polling thread...")
# Scheduling:
# Scheduling thread will match jobs to available resources and start vms
# (for now, will contain test create/destroy functions)
class SchedulingTh(threading.Thread):

    def __init__(self, resource_pool, job_pool):
        threading.Thread.__init__(self)
        self.resource_pool = resource_pool
        self.job_pool = job_pool
        self.quit = False

    def stop(self):
        log.debug("Waiting for scheduling loop to end")
        self.quit = True

    def run(self):
        log.info("Starting scheduling thread...")

        ########################################################################
        ## Full scheduler loop
        ########################################################################
        while (not self.quit):
            log_with_line("Scheduler Cycle:")

            ## Query the job pool to get new unscheduled jobs
            # Populates the 'jobs' and 'scheduled_jobs' lists appropriately
            log_with_line("Querying Condor job pool (via SOAP)")
            condor_jobs = self.job_pool.job_querySOAP()
            if (condor_jobs == None):
                log.error("Failed to contact Condor job scheduler. Continuing with VM management.")
            else:
                self.job_pool.update_jobs(condor_jobs)

            ## Schedule user jobs
            # TODO: This should be less explicit in its reference to JobPool class parts.
            # Something like 'for job in self.job_pool.get_unsched_jobs()'
            log_with_line("Attempt to schedule user jobs")
            for user in self.job_pool.new_jobs.keys():

                # Attempt to schedule jobs in order of their appearance in the user's job list
                # (currently sorted by Job priority)
                for job in self.job_pool.new_jobs[user]:

                    # Find resources that match the job's requirements
                    (pri_rsrc, sec_rsrc) = self.resource_pool.get_resourceBF(job.req_network,
                        job.req_cpuarch, job.req_memory, job.req_cpucores, job.req_storage)

                    # If no resource fits, continue to the next job in the user's list
                    if pri_rsrc == None:
                        log.info("No resource to match job: %s" % job.id)
                        log.info("Leaving job unscheduled, moving to %s's next job" % user)
                        continue

                    # Print details of the resource selected
                    log.info("Open resource selected for job %s:" % job.id)
                    pri_rsrc.log()

                    # Start a VM with the job's requirements on the selected resource
                    log.info("Creating VM for job %s on primary resource" % job.id)
                    create_ret = pri_rsrc.vm_create(job.req_image,
                        job.req_vmtype, job.req_network, job.req_cpuarch,
                        job.req_imageloc, job.req_memory, job.req_cpucores,
                        job.req_storage)

                    # If the VM create fails, try again on the secondary resource
                    if (create_ret != 0):
                        log.info("Creating VM for job %s failed on cluster %s." % (job.id, pri_rsrc.name))
                        log.info("Attempting VM creation on secondary resource.")

                        # If VM creation fails for the user's job on both resources (or if there is no
                        # secondary resource), move to the next user
                        if sec_rsrc == None:
                            log.info("No secondary resource available for job %s" % job.id)
                            log.info("Leaving %s's jobs unscheduled, moving on to next user" % user)
                            break

                        # Print details of the secondary resource
                        log.info("Secondary resource selected for job %s:" % job.id)
                        sec_rsrc.log()

                        # Start a VM on the secondary resource
                        log.info("Creating VM for job %s on secondary resource" % job.id)
                        create_ret = sec_rsrc.vm_create(job.req_image,
                            job.req_vmtype, job.req_network, job.req_cpuarch,
                            job.req_imageloc, job.req_memory, job.req_cpucores,
                            job.req_storage)

                        # If the secondary create fails, continue to the next user
                        if (create_ret != 0):
                            log.info("Creating VM for job %s failed on cluster %s." % (job.id, sec_rsrc.name))
                            log.info("Leaving user's jobs unscheduled. Moving to next user.")
                            break
                    # ENDIF - if VM create fails on primary resource

                    # Mark job as scheduled
                    self.job_pool.schedule(job)
                # ENDFOR - for jobs in user's unscheduled job set
            # ENDFOR - Attempt to schedule each user's jobs

            ## Wait for a number of seconds
            #log_with_line("Waiting")
            log.debug("Scheduler - Waiting...")
            time.sleep(1)

            ## Clear all un-needed VMs from the system
            log_with_line("Clearing all un-needed VMs from the system")

            # TODO: Add counting code here. Count the number of jobs that require a certain VM type,
            # and then destroy only the EXCESS VMs of that type (leaving a few to spare).
            log.debug("Gathering required VM types.")
            required_vmtypes = self.job_pool.get_required_vmtypes()

            # Remove all VMs with a type not required by current system jobs
            log.debug("Removing unneeded VMs from the system.")
            for cluster in self.resource_pool.resources:
                # Iterate backwards by index to avoid removing items from a list
                # we are iterating over
                i = len(cluster.vms)
                while (i != 0):
                    i = i - 1
                    # If the VM type is not required, destroy the VM
                    if (cluster.vms[i].vmtype not in required_vmtypes):
                        log.info("VM type %s not required. Destroying VM:" % cluster.vms[i].vmtype)
                        cluster.vms[i].log()
                        destroy_ret = cluster.vm_destroy(cluster.vms[i])
                        if (destroy_ret != 0):
                            log.error("Destroying VM failed in attempt to clear an un-needed VM. Leaving VM.")
                            continue
                # ENDWHILE - Check all VMs on a cluster
            # ENDFOR - Clear all unneeded VMs from the system

            ## Poll all remaining system VMs
            log_with_line("Polling all running VMs...")
            for cluster in self.resource_pool.resources:
                for vm in cluster.vms:
                    ret_state = cluster.vm_poll(vm)

                    # Print the polled VM's state and details
                    log.debug("Polled VM: ")
                    vm.log_dbg()

                    ## If the VM is in an error state, manually recreate it
                    # We use a manual re-create here because calling the Cluster recreate method
                    # would simply try to recreate the VM on the same cluster. To avoid repetitious
                    # errors (trying to recreate again and again on a bad cluster), we find another
                    # resource and attempt to create on that resource instead.
                    if ret_state == "Error":
                        log.info("VM %s in error state. Recreating..." % vm.name)

                        # Store all VM fields
                        vm_name = vm.name
                        vm_type = vm.vmtype
                        vm_network = vm.network
                        vm_cpuarch = vm.cpuarch
                        vm_imageloc = vm.imagelocation
                        vm_mem = vm.memory
                        vm_cores = vm.cpucores
                        vm_storage = vm.storage

                        # Destroy the VM
                        destroy_ret = cluster.vm_destroy(vm)
                        if (destroy_ret != 0):
                            log.error("Destroying VM failed. Leaving VM in error state.")
                            continue

                        # Find an available, balanced resource to recreate on
                        (pri_rsrc, sec_rsrc) = self.resource_pool.get_resourceBF(vm_network, vm_cpuarch, vm_mem, vm_cores, vm_storage)
                        if (pri_rsrc == None):
                            log.info("No resource found for recreation. Aborting recreate.")
                            continue

                        # Recreate the VM on the new resource
                        log.info("Open resource selected for recreate:")
                        pri_rsrc.log()
                        create_ret = pri_rsrc.vm_create(vm_name, vm_type, vm_network, vm_cpuarch, vm_imageloc, vm_mem, vm_cores, vm_storage)
                        if (create_ret != 0):
                            log.error("Recreating VM failed. Leaving VM in error state.")
                            # TODO: Could add code here to retry on the secondary resource
                            # (skipped for now as too time consuming).
                            continue
                    # ENDIF - if VM in error state
                # ENDFOR - for each VM in the cluster's vms list
            # ENDFOR - for each cluster in the resource pool
        # ENDWHILE - End of the main scheduler loop

        # Exit the scheduling thread - clean up VMs and exit
        log.info("Exiting scheduler thread")

        # Destroy all VMs and finish
        remaining_vms = []
        log_with_line("Destroying all remaining VMs and exiting :-(")
        for cluster in self.resource_pool.resources:
            i = len(cluster.vms)
            while (i != 0):
                i = i - 1
                log.info("Destroying VM:")
                cluster.vms[i].log()
                destroy_ret = cluster.vm_destroy(cluster.vms[i])
                if destroy_ret != 0:
                    log.error("Destroying VM failed. Continuing anyway... check VM logs")
                    remaining_vms.append(cluster.vms[i])
        # ENDFOR - Attempt to destroy each remaining VM in the system

        # Print the list of VMs the cloud scheduler failed to destroy before exit.
        if (remaining_vms != []):
            log.error("The following VMs could not be destroyed properly:")
            for vm in remaining_vms:
                log.error("VM: %s, ID: %s" % (vm.name, vm.id))
##
## Functions
##
def main():

    # Log entry message (for timestamp in log)
    log.info("Cloud Scheduler system starting...")

    # Create a parser and process command-line arguments
    parser = OptionParser(usage=usage_str, version=version_str)
    set_options(parser)
    (cli_options, args) = parser.parse_args()

    # Look for a global configuration file, and initialize config
    if (cli_options.config_file):
        config.setup(path=cli_options.config_file)
    else:
        config.setup()

    # Set up logging
    log.setLevel(utilities.LEVELS[config.log_level])

    # TODO: The %(funcName)s formatter field requires Python 2.5+
    #log_formatter = logging.Formatter("%(asctime)s - %(levelname)s " \
    #                                  "- %(funcName)s: %(message)s")
    log_formatter = logging.Formatter("%(asctime)s - %(levelname)s " \
                                      "- %(lineno)4d - %(message)s")
    if config.log_stdout:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(log_formatter)
        log.addHandler(stream_handler)

    if config.log_location:
        file_handler = None
        if config.log_max_size:
            file_handler = logging.handlers.RotatingFileHandler(
                                config.log_location,
                                maxBytes=config.log_max_size)
        else:
            file_handler = logging.handlers.RotatingFileHandler(
                                config.log_location)
        file_handler.setFormatter(log_formatter)
        log.addHandler(file_handler)

    if not config.log_location and not config.log_stdout:
        null_handler = utilities.NullHandler()
        log.addHandler(null_handler)

    # Command-line options take precedence, so replace the config file
    # option with the command-line option
    if cli_options.cloud_conffile:
        config.cloud_resource_config = cli_options.cloud_conffile

    # If neither the cloud conffile nor the MDS server is given as a source of
    # initial cluster information, print usage and exit the system.
    if (not config.cloud_resource_config) and (not cli_options.mds_server):
        print "ERROR - main - No cloud or cluster information sources provided"
        parser.print_help()
        sys.exit(1)

    # Create a job pool
    job_pool = job_management.JobPool("Job Pool")

    # Create a resource pool
    cloud_resources = cloud_management.ResourcePool("Resource Pool")
    cloud_resources.setup(config.cloud_resource_config)
    # TODO: Add code to query an MDS to get initial cluster/cloud information

    # Log the resource pool
    cloud_resources.log_pool()

    # TODO: Resolve issue of atomicity / reliability when 2 threads are working
    # on the same resource pool data. Does it matter (best effort!)?

    # Start the cloud scheduler info server for RPCs
    info_serv = info_server.CloudSchedulerInfoServer(cloud_resources)
    info_serv.daemon = True
    info_serv.start()
    log.info("Started Cloud Scheduler info server...")

    # Create the Polling thread (pass resource pool)
    poller = PollingTh(cloud_resources)
    poller.start()

    # Create the Scheduling thread (pass resource pool and job pool)
    scheduler = SchedulingTh(cloud_resources, job_pool)
    scheduler.start()

    # Wait for keyboard input to exit the cloud scheduler
    try:
        while scheduler.isAlive():
            time.sleep(2)
    except (SystemExit, KeyboardInterrupt):
        log.info("Exiting normally due to KeyboardInterrupt or SystemExit")

    # Clean up our threads (shuts down all running VMs)
    log.info("System exiting gracefully")
    scheduler.stop()
    info_serv.stop()
    scheduler.join()
    info_serv.join()
    sys.exit()
# Sets the command-line options on a passed-in OptionParser object (via optparse)
def set_options(parser):

    # Option attributes: action, type, dest, help. See the optparse documentation.
    # Defaults: action=store, type=string, dest=[name of the option], help=none
    parser.add_option("-f", "--config-file", dest="config_file",
                      metavar="FILE",
                      help="Designate a config file for Cloud Scheduler")
    parser.add_option("-c", "--cloud-config", dest="cloud_conffile",
                      metavar="FILE",
                      help="Designate a config file from which cloud cluster "
                           "information is obtained")
    parser.add_option("-m", "--MDS", dest="mds_server", metavar="SERVER",
                      help="Designate an MDS server from which cloud cluster "
                           "information is obtained")
# Logs a message padded with dashes so it stands out as a full-width divider line
def log_with_line(msg):
    msg_len = len(msg)
    fill = "-" * (120 - msg_len)
    log.debug("-----" + msg + fill)
##
## Main Functionality
##
main()