Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions pywps/app/Process.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import traceback
import json
import shutil
import time
import tempfile

from pywps import get_ElementMakerForVersion, E, dblog
Expand Down Expand Up @@ -147,22 +148,26 @@ def _execute_process(self, async, wps_request, wps_response):
"""

maxparallel = int(config.get_config_value('server', 'parallelprocesses'))
running = dblog.get_running().count()
stored = dblog.get_stored().count()

running, stored = dblog.get_process_counts()

# async
if async:

# run immedietly
LOGGER.debug("Running processes: {} of {} allowed parallelprocesses".format(running, maxparallel))
LOGGER.debug("Stored processes: {}".format(stored))

if running < maxparallel or maxparallel == -1:
wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0)
LOGGER.debug("Accepted request {}".format(self.uuid))
self._run_async(wps_request, wps_response)

# try to store for later usage
else:
maxprocesses = int(config.get_config_value('server', 'maxprocesses'))
if stored >= maxprocesses:
raise ServerBusy('Maximum number of parallel running processes reached. Please try later.')
raise ServerBusy('Maximum number of processes in queue reached. Please try later.')
LOGGER.debug("Store process in job queue, uuid={}".format(self.uuid))
dblog.store_process(self.uuid, wps_request)
wps_response._update_status(WPS_STATUS.ACCEPTED, u'PyWPS Process stored in job queue', 0)
Expand All @@ -184,11 +189,13 @@ def _run_async(self, wps_request, wps_response):
process=self,
wps_request=wps_request,
wps_response=wps_response)
LOGGER.debug("Starting process for request: {}".format(self.uuid))
process.start()

# This function may not raise exception and must return a valid wps_response
# Failure must be reported as wps_response.status = WPS_STATUS.FAILED
def _run_process(self, wps_request, wps_response):
LOGGER.debug("Started processing request: {}".format(self.uuid))
try:
self._set_grass(wps_request)
# if required set HOME to the current working directory.
Expand Down Expand Up @@ -243,8 +250,13 @@ def launch_next_process(self):
try:
LOGGER.debug("Checking for stored requests")

stored_request = dblog.get_first_stored()
if not stored_request:
for _ in range(2):
stored_request = dblog.pop_first_stored()
if stored_request:
break
LOGGER.debug("No stored request found, retrying in 1 second")
time.sleep(1)
else:
LOGGER.debug("No stored request found")
return

Expand All @@ -259,10 +271,10 @@ def launch_next_process(self):
process._set_uuid(uuid)
process.async = True
new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid)
new_wps_response.store_status_file = True
process._run_async(new_wps_request, new_wps_response)
dblog.remove_stored(uuid)
except Exception as e:
LOGGER.error("Could not run stored process. {}".format(e))
LOGGER.exception("Could not run stored process. {}".format(e))

def clean(self):
"""Clean the process working dir and other temporary files
Expand Down
123 changes: 56 additions & 67 deletions pywps/dblog.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pickle
import json
import os
from multiprocessing import Lock

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
Expand All @@ -25,14 +26,14 @@

LOGGER = logging.getLogger('PYWPS')
_SESSION_MAKER = None
_LAST_SESSION = None


_tableprefix = configuration.get_config_value('logging', 'prefix')
_schema = configuration.get_config_value('logging', 'schema')

Base = declarative_base()

lock = Lock()


class ProcessInstance(Base):
__tablename__ = '{}requests'.format(_tableprefix)
Expand Down Expand Up @@ -78,37 +79,37 @@ def log_request(uuid, request):
# NoApplicableCode("Could commit to database: {}".format(e.message))


def get_running():
"""Returns running processes ids
def get_process_counts():
"""Returns running and stored process counts and
"""

session = get_session()
running = session.query(ProcessInstance).filter(
ProcessInstance.percent_done < 100).filter(
ProcessInstance.percent_done > -1)

stored_query = session.query(RequestInstance.uuid)
running_count = (
session.query(ProcessInstance)
.filter(ProcessInstance.percent_done < 100)
.filter(ProcessInstance.percent_done > -1)
.filter(~ProcessInstance.uuid.in_(stored_query))
.count()
)
stored_count = stored_query.count()
session.close()
return running
return running_count, stored_count


def get_stored():
"""Returns running processes ids
def pop_first_stored():
"""Gets the first stored process and delete it from the stored_requests table
"""

session = get_session()
stored = session.query(RequestInstance)

session.close()
return stored


def get_first_stored():
"""Returns running processes ids
"""

session = get_session()
request = session.query(RequestInstance).first()

if request:
delete_count = session.query(RequestInstance).filter_by(uuid=request.uuid).delete()
if delete_count == 0:
LOGGER.debug("Another thread or process took the same stored request")
request = None

session.commit()
return request


Expand Down Expand Up @@ -146,43 +147,42 @@ def _get_identifier(request):
def get_session():
"""Get Connection for database
"""

LOGGER.debug('Initializing database connection')
global _SESSION_MAKER
global _LAST_SESSION

if _LAST_SESSION:
_LAST_SESSION.close()

if _SESSION_MAKER:
_LAST_SESSION = _SESSION_MAKER()
return _LAST_SESSION

database = configuration.get_config_value('logging', 'database')
echo = True
level = configuration.get_config_value('logging', 'level')
level_name = logging.getLevelName(level)
if isinstance(level_name, int) and level_name >= logging.INFO:
echo = False
try:
if database.startswith("sqlite") or database.startswith("memory"):
engine = sqlalchemy.create_engine(database,
connect_args={'check_same_thread': False},
poolclass=StaticPool,
echo=echo)
else:
engine = sqlalchemy.create_engine(database, echo=echo, poolclass=NullPool)
except sqlalchemy.exc.SQLAlchemyError as e:
raise NoApplicableCode("Could not connect to database: {}".format(e.message))

Session = sessionmaker(bind=engine)
ProcessInstance.metadata.create_all(engine)
RequestInstance.metadata.create_all(engine)

_SESSION_MAKER = Session

_LAST_SESSION = _SESSION_MAKER()
return _LAST_SESSION
return _SESSION_MAKER()

with lock:
database = configuration.get_config_value('logging', 'database')
echo = True
level = configuration.get_config_value('logging', 'level')
level_name = logging.getLevelName(level)
if isinstance(level_name, int) and level_name >= logging.INFO:
echo = False
try:
if ":memory:" in database:
engine = sqlalchemy.create_engine(database,
echo=echo,
connect_args={'check_same_thread': False},
poolclass=StaticPool)
elif database.startswith("sqlite"):
engine = sqlalchemy.create_engine(database,
echo=echo,
connect_args={'check_same_thread': False},
poolclass=NullPool)
else:
engine = sqlalchemy.create_engine(database, echo=echo, poolclass=NullPool)
except sqlalchemy.exc.SQLAlchemyError as e:
raise NoApplicableCode("Could not connect to database: {}".format(e.message))

Session = sessionmaker(bind=engine)
ProcessInstance.metadata.create_all(engine)
RequestInstance.metadata.create_all(engine)

_SESSION_MAKER = Session

return _SESSION_MAKER()


def store_process(uuid, request):
Expand All @@ -198,14 +198,3 @@ def store_process(uuid, request):
session.add(request)
session.commit()
session.close()


def remove_stored(uuid):
"""Remove given request from stored requests
"""

session = get_session()
request = session.query(RequestInstance).filter_by(uuid=str(uuid)).first()
session.delete(request)
session.commit()
session.close()