Skip to content

Commit

Permalink
add test for scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
binux committed Mar 6, 2014
1 parent 0f25a36 commit f5b23b3
Show file tree
Hide file tree
Showing 10 changed files with 354 additions and 86 deletions.
1 change: 1 addition & 0 deletions data/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.db
2 changes: 1 addition & 1 deletion database/sqlite/projectdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ def get(self, name, fields=None):

def check_update(self, timestamp, fields=None):
what = ','.join(('`%s`' % x for x in fields)) if fields else '*'
where = "updatetime >= %d" % timestamp
where = "updatetime >= %f" % timestamp
return self._select2dic(self.__tablename__, what=what, where=where)
13 changes: 8 additions & 5 deletions database/sqlite/taskdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ def _create_project(self, project):

def _parse(self, data):
for each in ('schedule', 'fetch', 'process', 'track'):
if each in data and data[each]:
data[each] = json.loads(data[each])
else:
data[each] = {}
if each in data:
if data[each]:
data[each] = json.loads(data[each])
else:
data[each] = {}
return data

def _stringify(self, data):
Expand All @@ -58,7 +59,7 @@ def _stringify(self, data):
return data

def load_tasks(self, status, project=None, fields=None):
if project not in self.projects:
if project and project not in self.projects:
raise StopIteration
what = ','.join(fields) if fields else '*'
where = "status = %d" % status
Expand All @@ -73,6 +74,8 @@ def load_tasks(self, status, project=None, fields=None):
yield self._parse(each)

def get_task(self, project, taskid, fields=None):
if project not in self.projects:
self._list_project()
if project not in self.projects:
return None
what = ','.join(fields) if fields else '*'
Expand Down
4 changes: 2 additions & 2 deletions fetcher/tornado_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ def quit(self):
self._quit = True
tornado.ioloop.IOLoop.instance().stop()

def xmlrpc_run(self, port=24444, bind='127.0.0.1'):
def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
from SimpleXMLRPCServer import SimpleXMLRPCServer

server = SimpleXMLRPCServer((bind, port), allow_none=True)
server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
server.register_introspection_functions()
server.register_multicall_functions()

Expand Down
14 changes: 14 additions & 0 deletions libs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,17 @@ def hide_me(tb, g=globals()):
if not tb:
tb = base_tb
return tb

def run_in_thread(func, *args, **kwargs):
from threading import Thread
thread = Thread(target=func, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
return thread

def run_in_subprocess(func, *args, **kwargs):
from multiprocessing import Process
thread = Process(target=func, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
return thread
15 changes: 1 addition & 14 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,10 @@
import logging.config
from multiprocessing import Queue
from database.sqlite import taskdb, projectdb
from libs.utils import run_in_thread, run_in_subprocess

logging.config.fileConfig("logging.conf")

def run_in_thread(func, *args, **kwargs):
from threading import Thread
thread = Thread(target=func, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
return thread

def run_in_subprocess(func, *args, **kwargs):
from multiprocessing import Process
thread = Process(target=func, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
return thread

def get_taskdb():
return taskdb.TaskDB('./data/task.db')

Expand Down
28 changes: 15 additions & 13 deletions scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Scheduler(object):
'itag': None,
}
LOOP_LIMIT = 1000
LOOP_INTERVAL = 0.1

def __init__(self, taskdb, projectdb, newtask_queue, status_queue, out_queue):
self.taskdb = taskdb
Expand All @@ -47,9 +48,9 @@ def __init__(self, taskdb, projectdb, newtask_queue, status_queue, out_queue):
"all": counter.CounterManager(
lambda : counter.TotalCounter()),
}
self._cnt['1h'].load('.scheduler.1h')
self._cnt['1d'].load('.scheduler.1d')
self._cnt['all'].load('.scheduler.all')
self._cnt['1h'].load('./data/scheduler.1h')
self._cnt['1d'].load('./data/scheduler.1d')
self._cnt['all'].load('./data/scheduler.all')
self._last_dump_cnt = 0

def _load_projects(self):
Expand All @@ -62,8 +63,7 @@ def _update_projects(self):
now = time.time()
if self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now:
return
self._last_update_project = now
for project in self.projectdb.check_update(now):
for project in self.projectdb.check_update(self._last_update_project):
logger.debug("project: %s updated." % project['name'])
self.projects[project['name']] = project
if project['name'] not in self.task_queue:
Expand All @@ -74,6 +74,7 @@ def _update_projects(self):
else:
self.task_queue[project['name']].rate = 0
self.task_queue[project['name']].burst = 0
self._last_update_project = now

scheduler_task_fields = ['taskid', 'project', 'schedule', ]
def _load_tasks(self, project):
Expand All @@ -94,7 +95,7 @@ def _load_tasks(self, project):

def task_verify(self, task):
for each in ('taskid', 'project', 'url', ):
if each not in task:
if each not in task or not task[each]:
logger.error('%s not in task: %s' % (each, unicode(task)[:200]))
return False
if task['project'] not in self.task_queue:
Expand Down Expand Up @@ -144,7 +145,7 @@ def _check_request(self):
logger.info('ignore newtask %(project)s:%(taskid)s %(url)s' % task)
continue
oldtask = self.taskdb.get_task(task['project'], task['taskid'],
self.merge_task_fields)
fields=self.merge_task_fields)
if oldtask:
task = self.on_old_request(task, oldtask)
else:
Expand All @@ -170,9 +171,9 @@ def _check_select(self):
return cnt_dict

def _dump_cnt(self):
self._cnt['1h'].dump('.scheduler.1h')
self._cnt['1d'].dump('.scheduler.1d')
self._cnt['all'].dump('.scheduler.all')
self._cnt['1h'].dump('./data/scheduler.1h')
self._cnt['1d'].dump('./data/scheduler.1d')
self._cnt['all'].dump('./data/scheduler.all')

def _try_dump_cnt(self):
now = time.time()
Expand All @@ -199,14 +200,15 @@ def run(self):
self._check_task_done()
self._check_request()
self._check_select()
time.sleep(0.1)
time.sleep(self.LOOP_INTERVAL)

logger.info("scheduler exiting...")
self._dump_cnt()

def xmlrpc_run(self, port=23333, bind='127.0.0.1'):
def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
from SimpleXMLRPCServer import SimpleXMLRPCServer

server = SimpleXMLRPCServer((bind, port), allow_none=True)
server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
server.register_introspection_functions()
server.register_multicall_functions()

Expand Down
5 changes: 1 addition & 4 deletions test/test_database_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ class TestTaskDB(unittest.TestCase):
'updatetime': time.time(),
}

def setUp(self):
pass

def test_create_project(self):
taskdb = TaskDB(':memory:')
with self.assertRaises(AssertionError):
Expand Down Expand Up @@ -158,9 +155,9 @@ def test_all(self):
self.assertNotIn('gourp', project)

# update
projectdb.update('not found', status='RUNNING')
time.sleep(0.1)
now = time.time()
projectdb.update('not found', status='RUNNING')
projectdb.update('abc', status='RUNNING')

# check update
Expand Down
34 changes: 16 additions & 18 deletions test/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,57 +59,55 @@ def setUp(self):
'name': self.project,
'status': 'DEBUG',
}

def test_build_module(self):
module = project_module.ProjectModule(self.project, self.script, self.env)
self.module = module = project_module.ProjectModule(self.project, self.script, self.env)
module.rethrow()
_class = module.get()
instance = _class()._init(self.project_info)
self.instance = _class()._init(self.project_info)

# hello
def test_2_hello(self):
self.base_task['process']['callback'] = 'hello'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(ret.result, "hello world!")

# echo
def test_3_echo(self):
self.base_task['process']['callback'] = 'echo'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(ret.result, "test data")

# saved
def test_4_saved(self):
self.base_task['process']['callback'] = 'saved'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(ret.result, self.base_task['process']['save'])

# echo task
def test_5_echo_task(self):
self.base_task['process']['callback'] = 'echo_task'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(ret.result, self.project)

# catch_status_code
def test_6_catch_status_code(self):
self.fetch_result['status_code'] = 403
self.base_task['process']['callback'] = 'catch_status_code'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(ret.result, 403)
self.fetch_result['status_code'] = 200

# raise_exception
def test_7_raise_exception(self):
self.base_task['process']['callback'] = 'raise_exception'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNotNone(ret.exception)
logstr = ret.logstr()
self.assertIn('info', logstr)
self.assertIn('warning', logstr)
self.assertIn('error', logstr)

# add_task
def test_8_add_task(self):
self.base_task['process']['callback'] = 'add_task'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(len(ret.follows), 1)
self.assertEqual(len(ret.messages), 1)
Loading

0 comments on commit f5b23b3

Please sign in to comment.