diff --git a/pywps/app/Process.py b/pywps/app/Process.py index dcba6f49e..c156cbd98 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -9,6 +9,7 @@ import traceback import json import shutil +import time import tempfile from pywps import get_ElementMakerForVersion, E, dblog @@ -147,22 +148,26 @@ def _execute_process(self, async, wps_request, wps_response): """ maxparallel = int(config.get_config_value('server', 'parallelprocesses')) - running = dblog.get_running().count() - stored = dblog.get_stored().count() + + running, stored = dblog.get_process_counts() # async if async: # run immedietly + LOGGER.debug("Running processes: {} of {} allowed parallelprocesses".format(running, maxparallel)) + LOGGER.debug("Stored processes: {}".format(stored)) + if running < maxparallel or maxparallel == -1: wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0) + LOGGER.debug("Accepted request {}".format(self.uuid)) self._run_async(wps_request, wps_response) # try to store for later usage else: maxprocesses = int(config.get_config_value('server', 'maxprocesses')) if stored >= maxprocesses: - raise ServerBusy('Maximum number of parallel running processes reached. Please try later.') + raise ServerBusy('Maximum number of processes in queue reached. Please try later.') LOGGER.debug("Store process in job queue, uuid={}".format(self.uuid)) dblog.store_process(self.uuid, wps_request) wps_response._update_status(WPS_STATUS.ACCEPTED, u'PyWPS Process stored in job queue', 0) @@ -184,11 +189,13 @@ def _run_async(self, wps_request, wps_response): process=self, wps_request=wps_request, wps_response=wps_response) + LOGGER.debug("Starting process for request: {}".format(self.uuid)) process.start() # This function may not raise exception and must return a valid wps_response # Failure must be reported as wps_response.status = WPS_STATUS.FAILED def _run_process(self, wps_request, wps_response): + LOGGER.debug("Started processing request: {}".format(self.uuid)) try: self._set_grass(wps_request) # if required set HOME to the current working directory. @@ -243,8 +250,13 @@ def launch_next_process(self): try: LOGGER.debug("Checking for stored requests") - stored_request = dblog.get_first_stored() - if not stored_request: + for _ in range(2): + stored_request = dblog.pop_first_stored() + if stored_request: + break + LOGGER.debug("No stored request found, retrying in 1 second") + time.sleep(1) + else: LOGGER.debug("No stored request found") return @@ -259,10 +271,10 @@ def launch_next_process(self): process._set_uuid(uuid) process.async = True new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid) + new_wps_response.store_status_file = True process._run_async(new_wps_request, new_wps_response) - dblog.remove_stored(uuid) except Exception as e: - LOGGER.error("Could not run stored process. {}".format(e)) + LOGGER.exception("Could not run stored process. {}".format(e)) def clean(self): """Clean the process working dir and other temporary files diff --git a/pywps/dblog.py b/pywps/dblog.py index 610056ff0..869532ee0 100644 --- a/pywps/dblog.py +++ b/pywps/dblog.py @@ -16,6 +16,7 @@ import pickle import json import os +from multiprocessing import Lock import sqlalchemy from sqlalchemy.ext.declarative import declarative_base @@ -25,14 +26,14 @@ LOGGER = logging.getLogger('PYWPS') _SESSION_MAKER = None -_LAST_SESSION = None - _tableprefix = configuration.get_config_value('logging', 'prefix') _schema = configuration.get_config_value('logging', 'schema') Base = declarative_base() +lock = Lock() + class ProcessInstance(Base): __tablename__ = '{}requests'.format(_tableprefix) @@ -78,37 +79,37 @@ def log_request(uuid, request): # NoApplicableCode("Could commit to database: {}".format(e.message)) -def get_running(): - """Returns running processes ids +def get_process_counts(): + """Returns running and stored process counts and """ session = get_session() - running = session.query(ProcessInstance).filter( - ProcessInstance.percent_done < 100).filter( - ProcessInstance.percent_done > -1) - + stored_query = session.query(RequestInstance.uuid) + running_count = ( + session.query(ProcessInstance) + .filter(ProcessInstance.percent_done < 100) + .filter(ProcessInstance.percent_done > -1) + .filter(~ProcessInstance.uuid.in_(stored_query)) + .count() + ) + stored_count = stored_query.count() session.close() - return running + return running_count, stored_count -def get_stored(): - """Returns running processes ids +def pop_first_stored(): + """Gets the first stored process and delete it from the stored_requests table """ - - session = get_session() - stored = session.query(RequestInstance) - - session.close() - return stored - - -def get_first_stored(): - """Returns running processes ids - """ - session = get_session() request = session.query(RequestInstance).first() + if request: + delete_count = session.query(RequestInstance).filter_by(uuid=request.uuid).delete() + if delete_count == 0: + LOGGER.debug("Another thread or process took the same stored request") + request = None + + session.commit() return request @@ -146,43 +147,42 @@ def _get_identifier(request): def get_session(): """Get Connection for database """ - LOGGER.debug('Initializing database connection') global _SESSION_MAKER - global _LAST_SESSION - - if _LAST_SESSION: - _LAST_SESSION.close() if _SESSION_MAKER: - _LAST_SESSION = _SESSION_MAKER() - return _LAST_SESSION - - database = configuration.get_config_value('logging', 'database') - echo = True - level = configuration.get_config_value('logging', 'level') - level_name = logging.getLevelName(level) - if isinstance(level_name, int) and level_name >= logging.INFO: - echo = False - try: - if database.startswith("sqlite") or database.startswith("memory"): - engine = sqlalchemy.create_engine(database, - connect_args={'check_same_thread': False}, - poolclass=StaticPool, - echo=echo) - else: - engine = sqlalchemy.create_engine(database, echo=echo, poolclass=NullPool) - except sqlalchemy.exc.SQLAlchemyError as e: - raise NoApplicableCode("Could not connect to database: {}".format(e.message)) - - Session = sessionmaker(bind=engine) - ProcessInstance.metadata.create_all(engine) - RequestInstance.metadata.create_all(engine) - - _SESSION_MAKER = Session - - _LAST_SESSION = _SESSION_MAKER() - return _LAST_SESSION + return _SESSION_MAKER() + + with lock: + database = configuration.get_config_value('logging', 'database') + echo = True + level = configuration.get_config_value('logging', 'level') + level_name = logging.getLevelName(level) + if isinstance(level_name, int) and level_name >= logging.INFO: + echo = False + try: + if ":memory:" in database: + engine = sqlalchemy.create_engine(database, + echo=echo, + connect_args={'check_same_thread': False}, + poolclass=StaticPool) + elif database.startswith("sqlite"): + engine = sqlalchemy.create_engine(database, + echo=echo, + connect_args={'check_same_thread': False}, + poolclass=NullPool) + else: + engine = sqlalchemy.create_engine(database, echo=echo, poolclass=NullPool) + except sqlalchemy.exc.SQLAlchemyError as e: + raise NoApplicableCode("Could not connect to database: {}".format(e.message)) + + Session = sessionmaker(bind=engine) + ProcessInstance.metadata.create_all(engine) + RequestInstance.metadata.create_all(engine) + + _SESSION_MAKER = Session + + return _SESSION_MAKER() def store_process(uuid, request): @@ -198,14 +198,3 @@ session.add(request) session.commit() session.close() - - -def remove_stored(uuid): - """Remove given request from stored requests - """ - - session = get_session() - request = session.query(RequestInstance).filter_by(uuid=str(uuid)).first() - session.delete(request) - session.commit() - session.close()