From bee093bf2edc07c127dc7aab279927ef43954aba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Tue, 1 Oct 2019 11:55:09 -0400 Subject: [PATCH 01/14] New decorator to lock tasks based on redis SETNX --- kobo/apps/hook/tasks.py | 8 ++- kobo/settings/base.py | 28 ++++++++-- ...snapshots.py => delete_asset_snapshots.py} | 2 +- .../commands/delete_base_command.py | 1 - kpi/tasks.py | 22 ++++++++ kpi/utils/lock.py | 34 +++++++++++ kpi/utils/redis_helper.py | 56 ++++++++++++++----- 7 files changed, 129 insertions(+), 22 deletions(-) rename kpi/management/commands/{delete_assets_snapshots.py => delete_asset_snapshots.py} (96%) create mode 100644 kpi/utils/lock.py diff --git a/kobo/apps/hook/tasks.py b/kobo/apps/hook/tasks.py index da74f3e883..eb86777360 100644 --- a/kobo/apps/hook/tasks.py +++ b/kobo/apps/hook/tasks.py @@ -3,18 +3,19 @@ import time -from celery import shared_task import constance +from celery import shared_task from django.conf import settings -from django.core.mail import EmailMessage, EmailMultiAlternatives, get_connection +from django.core.mail import EmailMultiAlternatives, get_connection from django.template import Context from django.template.loader import get_template from django.utils import translation, timezone from django_celery_beat.models import PeriodicTask +from kpi.utils.lock import lock +from kpi.utils.log import logging from .constants import HOOK_LOG_FAILED from .models import Hook, HookLog -from kpi.utils.log import logging @shared_task(bind=True) @@ -59,6 +60,7 @@ def retry_all_task(hooklogs_ids): @shared_task +@lock('failure_reports', timeout=600) def failures_reports(): """ Notifies owners' assets by email of hooks failures. diff --git a/kobo/settings/base.py b/kobo/settings/base.py index c704978cbb..c515bb3657 100644 --- a/kobo/settings/base.py +++ b/kobo/settings/base.py @@ -442,9 +442,27 @@ def __init__(self, *args, **kwargs): # 'schedule': timedelta(hours=12) #}, # Schedule every day at midnight UTC. Can be customized in admin section - "send-hooks-failures-reports": { - "task": "kobo.apps.hook.tasks.failures_reports", - "schedule": crontab(hour=0, minute=0), + 'send-hooks-failures-reports': { + 'task': 'kobo.apps.hook.tasks.failures_reports', + 'schedule': crontab(hour=0, minute=0), + 'options': {'queue': 'kpi_queue'} + }, + # Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section + 'clean-orphans': { + 'task': 'kpi.tasks.clean_orphans', + 'schedule': crontab(hour=4, minute=0, day_of_week=6), + 'options': {'queue': 'kpi_queue'} + }, + # Schedule every Friday at 4:00 AM UTC. Can be customized in admin section + 'delete-asset-snapshots': { + 'task': 'kpi.tasks.delete_asset_snapshots', + 'schedule': crontab(hour=4, minute=0, day_of_week=5), + 'options': {'queue': 'kpi_queue'} + }, + # Schedule every Friday at 5:00 AM UTC. Can be customized in admin section + 'delete-import-tasks': { + 'task': 'kpi.tasks.delete_import_tasks', + 'schedule': crontab(hour=5, minute=0, day_of_week=5), 'options': {'queue': 'kpi_queue'} }, } @@ -711,6 +729,8 @@ def __init__(self, *args, **kwargs): SESSION_ENGINE = "redis_sessions.session" -SESSION_REDIS = RedisHelper.config(default="redis://redis_cache:6380/2") +SESSION_REDIS = RedisHelper.session_config(default="redis://redis_cache:6380/2") + +LOCK_REDIS = RedisHelper.lock_config(default="redis://redis_cache:6380/3") TESTING = False diff --git a/kpi/management/commands/delete_assets_snapshots.py b/kpi/management/commands/delete_asset_snapshots.py similarity index 96% rename from kpi/management/commands/delete_assets_snapshots.py rename to kpi/management/commands/delete_asset_snapshots.py index 1d1c51a1c4..e7e6599c29 100644 --- a/kpi/management/commands/delete_assets_snapshots.py +++ b/kpi/management/commands/delete_asset_snapshots.py @@ -12,7 +12,7 @@ class Command(DeleteBaseCommand): - help = "Deletes assets snapshots" + help = "Deletes asset snapshots" def _prepare_delete_queryset(self, **options): days = options["days"] diff --git a/kpi/management/commands/delete_base_command.py b/kpi/management/commands/delete_base_command.py index 2e66f08ca5..b541f9aac4 100644 --- a/kpi/management/commands/delete_base_command.py +++ b/kpi/management/commands/delete_base_command.py @@ -9,7 +9,6 @@ class DeleteBaseCommand(BaseCommand): - def __init__(self, stdout=None, stderr=None, no_color=False): super(DeleteBaseCommand, self).__init__(stdout=stdout, stderr=stderr, no_color=no_color) self._model = None diff --git a/kpi/tasks.py b/kpi/tasks.py index 38797d20ee..4566d21f6c 100644 --- a/kpi/tasks.py +++ b/kpi/tasks.py @@ -5,9 +5,11 @@ from django.core.management import call_command from kpi.models import ImportTask, ExportTask +from kpi.utils.lock import lock @shared_task +@lock(key='update_search_index', timeout=3600) def update_search_index(): call_command('update_index', using=['default',], remove=True) @@ -25,6 +27,7 @@ def export_in_background(export_task_uid): @shared_task +@lock(key='sync_kobocat_xforms', timeout=3600 * 24) def sync_kobocat_xforms(username=None, quiet=True): call_command('sync_kobocat_xforms', username=username, quiet=quiet) @@ -32,3 +35,22 @@ def sync_kobocat_xforms(username=None, quiet=True): @shared_task def import_survey_drafts_from_dkobo(**kwargs): call_command('import_survey_drafts_from_dkobo', **kwargs) + + +@shared_task +@lock(key='clean_orphans', timeout=60) +def clean_orphans(): + #ToDo Create Management command + pass + + +@shared_task +@lock(key='delete_asset_snapshots', timeout=60) +def delete_asset_snapshots(): + call_command('delete_asset_snapshots') + + +@shared_task +@lock(key='delete_import_tasks', timeout=60) +def delete_import_tasks(): + call_command('delete_import_tasks') diff --git a/kpi/utils/lock.py b/kpi/utils/lock.py new file mode 100644 index 0000000000..699c50296a --- /dev/null +++ b/kpi/utils/lock.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import os +from functools import wraps + +import redis +from django.conf import settings + + +REDIS_LOCK_CLIENT = redis.Redis(**settings.LOCK_REDIS) + + +def lock(key='', timeout=None): + + def _lock(func): + @wraps(func) + def wrapper(*args, **kwargs): + ret_value = None + have_lock = False + prefix = os.getenv('REDIS_KPI_LOCK_PREFIX', 'kpi-lock') + key_ = '{}:{}'.format(prefix, key) + lock_ = REDIS_LOCK_CLIENT.lock(key_, timeout=timeout) + try: + have_lock = lock_.acquire(blocking=False) + if have_lock: + ret_value = func(*args, **kwargs) + finally: + if have_lock: + lock_.release() + + return ret_value + return wrapper + return _lock diff --git a/kpi/utils/redis_helper.py b/kpi/utils/redis_helper.py index 8615141e66..d464e16b9a 100644 --- a/kpi/utils/redis_helper.py +++ b/kpi/utils/redis_helper.py @@ -16,26 +16,56 @@ class RedisHelper(object): """ @staticmethod - def config(default=None): + def config(url_variable, default=None): """ - Parses `REDIS_SESSION_URL` environment variable to return a dict with - expected attributes for django redis session. + Parses `url_variable` environment variable to return a dict with + expected attributes for redis clients. :return: dict """ - redis_connection_url = os.getenv("REDIS_SESSION_URL", default) - match = re.match(r"redis://(:(?P[^@]*)@)?(?P[^:]+):(?P\d+)(/(?P\d+))?", - redis_connection_url) + redis_connection_url = os.getenv(url_variable, default) + match = re.match(r'redis://(:(?P[^@]*)@)?(?P[^:]+):(?P\d+)(/(?P\d+))?', + redis_connection_url) if not match: - raise ImproperlyConfigured("Could not parse Redis session URL. Please verify 'REDIS_SESSION_URL' value") + raise ImproperlyConfigured( + "Could not parse `{}`. Please verify your settings.".format( + url_variable) + ) redis_connection_dict = { - "host": match.group("host"), - "port": match.group("port"), - "db": match.group("index") or 0, - "password": match.group("password"), - "prefix": os.getenv("REDIS_SESSION_PREFIX", "session"), - "socket_timeout": os.getenv("REDIS_SESSION_SOCKET_TIMEOUT", 1), + 'host': match.group('host'), + 'port': match.group('port'), + 'db': match.group('index') or 0, + 'password': match.group('password') } return redis_connection_dict + + @classmethod + def session_config(cls, default=None): + """ + Parses `REDIS_SESSION_URL` environment variable to return a dict + for django redis session. + + :return: dict + """ + + redis_connection_dict = cls.config('REDIS_SESSION_URL', default) + redis_connection_dict.update({ + 'prefix': os.getenv('REDIS_SESSION_PREFIX', 'session'), + 'socket_timeout': os.getenv('REDIS_SESSION_SOCKET_TIMEOUT', 1), + }) + return redis_connection_dict + + @classmethod + def lock_config(cls, default=None): + """ + Parses `REDIS_LOCK_URL` environment variable to return a dict with + expected attributes for lock mechanism based on redis. + + :return: dict + """ + + redis_connection_dict = cls.config('REDIS_LOCK_URL', default) + + return redis_connection_dict From e60400ed63f21319cf7d255a673115bab9a75ff3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Wed, 2 Oct 2019 12:46:57 -0400 Subject: [PATCH 02/14] Standardize removal management command names --- kobo/settings/base.py | 8 ++++---- ...et_snapshots.py => remove_asset_snapshots.py} | 6 +++--- ...te_base_command.py => remove_base_command.py} | 6 +++--- ...te_import_tasks.py => remove_import_tasks.py} | 6 +++--- kpi/tasks.py | 16 ++++++++-------- 5 files changed, 21 insertions(+), 21 deletions(-) rename kpi/management/commands/{delete_asset_snapshots.py => remove_asset_snapshots.py} (89%) rename kpi/management/commands/{delete_base_command.py => remove_base_command.py} (95%) rename kpi/management/commands/{delete_import_tasks.py => remove_import_tasks.py} (78%) diff --git a/kobo/settings/base.py b/kobo/settings/base.py index c515bb3657..7a143a6cb2 100644 --- a/kobo/settings/base.py +++ b/kobo/settings/base.py @@ -448,20 +448,20 @@ def __init__(self, *args, **kwargs): 'options': {'queue': 'kpi_queue'} }, # Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section - 'clean-orphans': { - 'task': 'kpi.tasks.clean_orphans', + 'remove-s3-orphans': { + 'task': 'kpi.tasks.remove_s3_orphans', 'schedule': crontab(hour=4, minute=0, day_of_week=6), 'options': {'queue': 'kpi_queue'} }, # Schedule every Friday at 4:00 AM UTC. Can be customized in admin section 'delete-asset-snapshots': { - 'task': 'kpi.tasks.delete_asset_snapshots', + 'task': 'kpi.tasks.remove_asset_snapshots', 'schedule': crontab(hour=4, minute=0, day_of_week=5), 'options': {'queue': 'kpi_queue'} }, # Schedule every Friday at 5:00 AM UTC. Can be customized in admin section 'delete-import-tasks': { - 'task': 'kpi.tasks.delete_import_tasks', + 'task': 'kpi.tasks.remove_import_tasks', 'schedule': crontab(hour=5, minute=0, day_of_week=5), 'options': {'queue': 'kpi_queue'} }, diff --git a/kpi/management/commands/delete_asset_snapshots.py b/kpi/management/commands/remove_asset_snapshots.py similarity index 89% rename from kpi/management/commands/delete_asset_snapshots.py rename to kpi/management/commands/remove_asset_snapshots.py index e7e6599c29..96fe651dc4 100644 --- a/kpi/management/commands/delete_asset_snapshots.py +++ b/kpi/management/commands/remove_asset_snapshots.py @@ -6,13 +6,13 @@ from django.db.models import Max from django.utils import timezone -from .delete_base_command import DeleteBaseCommand +from .remove_base_command import RemoveBaseCommand from kpi.models import AssetSnapshot -class Command(DeleteBaseCommand): +class Command(RemoveBaseCommand): - help = "Deletes asset snapshots" + help = "Removes asset snapshots" def _prepare_delete_queryset(self, **options): days = options["days"] diff --git a/kpi/management/commands/delete_base_command.py b/kpi/management/commands/remove_base_command.py similarity index 95% rename from kpi/management/commands/delete_base_command.py rename to kpi/management/commands/remove_base_command.py index b541f9aac4..f272a5e74a 100644 --- a/kpi/management/commands/delete_base_command.py +++ b/kpi/management/commands/remove_base_command.py @@ -7,14 +7,14 @@ from django.db import transaction, connection -class DeleteBaseCommand(BaseCommand): +class RemoveBaseCommand(BaseCommand): def __init__(self, stdout=None, stderr=None, no_color=False): - super(DeleteBaseCommand, self).__init__(stdout=stdout, stderr=stderr, no_color=no_color) + super(RemoveBaseCommand, self).__init__(stdout=stdout, stderr=stderr, no_color=no_color) self._model = None def add_arguments(self, parser): - super(DeleteBaseCommand, self).add_arguments(parser) + super(RemoveBaseCommand, self).add_arguments(parser) parser.add_argument( "--days", default=90, diff --git a/kpi/management/commands/delete_import_tasks.py b/kpi/management/commands/remove_import_tasks.py similarity index 78% rename from kpi/management/commands/delete_import_tasks.py rename to kpi/management/commands/remove_import_tasks.py index 3fddc172b9..42fdc46c0e 100644 --- a/kpi/management/commands/delete_import_tasks.py +++ b/kpi/management/commands/remove_import_tasks.py @@ -5,13 +5,13 @@ from django.utils import timezone -from .delete_base_command import DeleteBaseCommand +from .remove_base_command import RemoveBaseCommand from kpi.models import ImportTask -class Command(DeleteBaseCommand): +class Command(RemoveBaseCommand): - help = "Deletes import tasks" + help = "Removes import tasks" def _prepare_delete_queryset(self, **options): days = options["days"] diff --git a/kpi/tasks.py b/kpi/tasks.py index 4566d21f6c..07bd1a96bf 100644 --- a/kpi/tasks.py +++ b/kpi/tasks.py @@ -38,19 +38,19 @@ def import_survey_drafts_from_dkobo(**kwargs): @shared_task -@lock(key='clean_orphans', timeout=60) -def clean_orphans(): +@lock(key='remove_s3_orphans', timeout=60) +def remove_s3_orphans(): #ToDo Create Management command pass @shared_task -@lock(key='delete_asset_snapshots', timeout=60) -def delete_asset_snapshots(): - call_command('delete_asset_snapshots') +@lock(key='remove_asset_snapshots', timeout=60) +def remove_asset_snapshots(): + call_command('remove_asset_snapshots') @shared_task -@lock(key='delete_import_tasks', timeout=60) -def delete_import_tasks(): - call_command('delete_import_tasks') +@lock(key='remove_import_tasks', timeout=60) +def remove_import_tasks(): + call_command('remove_import_tasks') From cd50613d27d8f797dca10047f7211bace6129de3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Wed, 2 Oct 2019 15:20:12 -0400 Subject: [PATCH 03/14] Added remove s3 orphans management command --- kpi/management/commands/remove_s3_orphans.py | 154 +++++++++++++++++++ kpi/tasks.py | 3 +- kpi/utils/extended_s3boto_storage.py | 19 +++ 3 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 kpi/management/commands/remove_s3_orphans.py create mode 100644 kpi/utils/extended_s3boto_storage.py diff --git a/kpi/management/commands/remove_s3_orphans.py b/kpi/management/commands/remove_s3_orphans.py new file mode 100644 index 0000000000..5aa2dfbdbc --- /dev/null +++ b/kpi/management/commands/remove_s3_orphans.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python +# vim: ai ts=4 sts=4 et sw=4 coding=utf-8 +# -*- coding: utf-8 -*- +from __future__ import unicode_literals +import codecs +import re +import sys +import time + +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError +from django.core.files.storage import get_storage_class +from django.utils.translation import ugettext as _ + +from kpi.models.import_export_task import ExportTask + + +# S3 Monkey Patch +from boto import handler +from boto.resultset import ResultSet +from boto.s3.bucket import Bucket +import xml.sax +import xml.sax.saxutils + + +def _get_all(self, element_map, initial_query_string='', + headers=None, **params): + query_args = self._get_all_query_args( + params, + initial_query_string=initial_query_string + ) + response = self.connection.make_request('GET', self.name, + headers=headers, + query_args=query_args) + body = response.read() + + if response.status == 200: + rs = ResultSet(element_map) + h = handler.XmlHandler(rs, self) + try: + xml.sax.parseString(fix_bad_characters(body), h) + except Exception as e: + print("XML Parsing Error - {}".format(str(e))) + error_filename = "/srv/logs/s3_body_error-{}.xml".format(str(int(time.time()))) + with open(error_filename, "w") as xmlfile_error: + xmlfile_error.write("{}\n".format(str(e))) + xmlfile_error.write(body) + raise Exception(str(e)) + return rs + else: + raise self.connection.provider.storage_response_error( + response.status, response.reason, body) + + +def fix_bad_characters(str_): + + try: + str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", str_) + except Exception as e: + # Try to force unicode + str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", unicode(str_, "utf-8")) + str_ = str_.encode("utf-8") + return str_ + + +class Command(BaseCommand): + help = _('Removes orphan files in S3') + + def add_arguments(self, parser): + super(Command, self).add_arguments(parser) + + parser.add_argument( + "--dry-run", + action='store_true', + default=False, + help="Do not delete files", + ) + + parser.add_argument( + "--log-files", + action='store_true', + default=True, + help="Save deleted files to a CSV", + ) + + def handle(self, *args, **kwargs): + + Bucket._get_all = _get_all + + dry_run = kwargs['dry_run'] + log_files = kwargs['log_files'] + + self._s3 = get_storage_class('kpi.utils.extended_s3boto_storage.ExtendedS3BotoStorage')() + all_files = self._s3.bucket.list() + size_to_reclaim = 0 + orphans = 0 + + now = time.time() + csv_filepath = '/srv/logs/orphan_files-{}.csv'.format(int(now)) + + print('Bucket name: {}'.format(settings.AWS_STORAGE_BUCKET_NAME)) + if dry_run: + print('Dry run mode activated') + if log_files: + print('CSV: {}'.format(csv_filepath)) + + if log_files: + with open(csv_filepath, "w") as csv: + csv.write("type,filename,filesize\n") + + for f in all_files: + try: + filename = f.name + if filename[-1] != "/": + # KPI Exports + if re.match(r"[^\/]*\/exports\/.+", filename): + if not ExportTask.objects.filter(result=filename).exists(): + filesize = f.size + orphans += 1 + size_to_reclaim += filesize + if log_files: + csv = codecs.open(csv_filepath, "a", "utf-8") + csv.write("{},{},{}\n".format("exports", filename, filesize)) + csv.close() + if not dry_run: + self.delete(f) + + if time.time() - now >= 5 * 60: + print("[{}] Still alive...".format(str(int(time.time())))) + now = time.time() + + except Exception as e: + print("ERROR - {}".format(str(e))) + sys.exit(-1) + + print("Orphans: {}".format(orphans)) + print("Size: {}".format(self.sizeof_fmt(size_to_reclaim))) + + def delete(self, file_object): + try: + print("File {} does not exist in DB".format(file_object.name).encode('utf-8')) + self._s3.delete_all(file_object.name) + except Exception as e: + print("ERROR - Could not delete file {} - Reason {}".format( + file_object.name, + str(e))) + + @staticmethod + def sizeof_fmt(num, suffix='B'): + for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: + if abs(num) < 1024.0: + return "%3.1f%s%s" % (num, unit, suffix) + num /= 1024.0 + return "%.1f%s%s" % (num, 'Yi', suffix) diff --git a/kpi/tasks.py b/kpi/tasks.py index 07bd1a96bf..9a4ae96362 100644 --- a/kpi/tasks.py +++ b/kpi/tasks.py @@ -40,8 +40,7 @@ def import_survey_drafts_from_dkobo(**kwargs): @shared_task @lock(key='remove_s3_orphans', timeout=60) def remove_s3_orphans(): - #ToDo Create Management command - pass + call_command('remove_s3_orphans') @shared_task diff --git a/kpi/utils/extended_s3boto_storage.py b/kpi/utils/extended_s3boto_storage.py new file mode 100644 index 0000000000..0b37ed7e6c --- /dev/null +++ b/kpi/utils/extended_s3boto_storage.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from storages.backends.s3boto import S3BotoStorage + + +class ExtendedS3BotoStorage(S3BotoStorage): + + def delete_all(self, name): + """ + Delete the key object and all its versions + :param name: str. S3 key (i.e. path to the file) + """ + name = self._normalize_name(self._clean_name(name)) + self.bucket.delete_key(self._encode_name(name)) + + # Delete all previous versions + for versioned_key in self.bucket.list_versions(prefix=name): + self.bucket.delete_key(versioned_key.name, version_id=versioned_key.version_id) From 67372c907d01c2d2f3cdfb579147ffa43d6d7952 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Wed, 2 Oct 2019 15:58:13 -0400 Subject: [PATCH 04/14] Disabled periodic task by default --- kobo/settings/base.py | 16 ++++++++++------ kpi/tasks.py | 6 +++--- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/kobo/settings/base.py b/kobo/settings/base.py index 7a143a6cb2..c586760813 100644 --- a/kobo/settings/base.py +++ b/kobo/settings/base.py @@ -445,25 +445,29 @@ def __init__(self, *args, **kwargs): 'send-hooks-failures-reports': { 'task': 'kobo.apps.hook.tasks.failures_reports', 'schedule': crontab(hour=0, minute=0), - 'options': {'queue': 'kpi_queue'} + 'options': {'queue': 'kpi_queue'}, + 'enabled': False, }, # Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section 'remove-s3-orphans': { 'task': 'kpi.tasks.remove_s3_orphans', 'schedule': crontab(hour=4, minute=0, day_of_week=6), - 'options': {'queue': 'kpi_queue'} + 'options': {'queue': 'kpi_queue'}, + 'enabled': False, }, # Schedule every Friday at 4:00 AM UTC. Can be customized in admin section - 'delete-asset-snapshots': { + 'remove-asset-snapshots': { 'task': 'kpi.tasks.remove_asset_snapshots', 'schedule': crontab(hour=4, minute=0, day_of_week=5), - 'options': {'queue': 'kpi_queue'} + 'options': {'queue': 'kpi_queue'}, + 'enabled': False, }, # Schedule every Friday at 5:00 AM UTC. Can be customized in admin section - 'delete-import-tasks': { + 'remove-import-tasks': { 'task': 'kpi.tasks.remove_import_tasks', 'schedule': crontab(hour=5, minute=0, day_of_week=5), - 'options': {'queue': 'kpi_queue'} + 'options': {'queue': 'kpi_queue'}, + 'enabled': False, }, } diff --git a/kpi/tasks.py b/kpi/tasks.py index 9a4ae96362..ce652ced90 100644 --- a/kpi/tasks.py +++ b/kpi/tasks.py @@ -38,18 +38,18 @@ def import_survey_drafts_from_dkobo(**kwargs): @shared_task -@lock(key='remove_s3_orphans', timeout=60) +@lock(key='remove_s3_orphans', timeout=3600) def remove_s3_orphans(): call_command('remove_s3_orphans') @shared_task -@lock(key='remove_asset_snapshots', timeout=60) +@lock(key='remove_asset_snapshots', timeout=3600) def remove_asset_snapshots(): call_command('remove_asset_snapshots') @shared_task -@lock(key='remove_import_tasks', timeout=60) +@lock(key='remove_import_tasks', timeout=3600) def remove_import_tasks(): call_command('remove_import_tasks') From 897fd5e3a13cb8ec3774b3f4559064b66336bd5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Tue, 1 Oct 2019 11:55:09 -0400 Subject: [PATCH 05/14] New decorator to lock tasks based on redis SETNX --- kobo/apps/hook/tasks.py | 2 + kobo/settings/base.py | 28 ++++++++-- ...snapshots.py => delete_asset_snapshots.py} | 2 +- kpi/tasks.py | 22 ++++++++ kpi/utils/lock.py | 34 +++++++++++ kpi/utils/redis_helper.py | 56 ++++++++++++++----- 6 files changed, 126 insertions(+), 18 deletions(-) rename kpi/management/commands/{delete_assets_snapshots.py => delete_asset_snapshots.py} (96%) create mode 100644 kpi/utils/lock.py diff --git a/kobo/apps/hook/tasks.py b/kobo/apps/hook/tasks.py index 4a300e3f33..860b279dc6 100644 --- a/kobo/apps/hook/tasks.py +++ b/kobo/apps/hook/tasks.py @@ -12,6 +12,7 @@ from django.utils import translation, timezone from django_celery_beat.models import PeriodicTask +from kpi.utils.lock import lock from kpi.utils.log import logging from .constants import HOOK_LOG_FAILED from .models import Hook, HookLog @@ -59,6 +60,7 @@ def retry_all_task(hooklogs_ids): @shared_task +@lock('failure_reports', timeout=600) def failures_reports(): """ Notifies owners' assets by email of hooks failures. diff --git a/kobo/settings/base.py b/kobo/settings/base.py index 4a4eb61f7d..626d087cf1 100644 --- a/kobo/settings/base.py +++ b/kobo/settings/base.py @@ -445,9 +445,27 @@ def __init__(self, *args, **kwargs): # 'schedule': timedelta(hours=12) #}, # Schedule every day at midnight UTC. Can be customized in admin section - "send-hooks-failures-reports": { - "task": "kobo.apps.hook.tasks.failures_reports", - "schedule": crontab(hour=0, minute=0), + 'send-hooks-failures-reports': { + 'task': 'kobo.apps.hook.tasks.failures_reports', + 'schedule': crontab(hour=0, minute=0), + 'options': {'queue': 'kpi_queue'} + }, + # Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section + 'clean-orphans': { + 'task': 'kpi.tasks.clean_orphans', + 'schedule': crontab(hour=4, minute=0, day_of_week=6), + 'options': {'queue': 'kpi_queue'} + }, + # Schedule every Friday at 4:00 AM UTC. Can be customized in admin section + 'delete-asset-snapshots': { + 'task': 'kpi.tasks.delete_asset_snapshots', + 'schedule': crontab(hour=4, minute=0, day_of_week=5), + 'options': {'queue': 'kpi_queue'} + }, + # Schedule every Friday at 5:00 AM UTC. Can be customized in admin section + 'delete-import-tasks': { + 'task': 'kpi.tasks.delete_import_tasks', + 'schedule': crontab(hour=5, minute=0, day_of_week=5), 'options': {'queue': 'kpi_queue'} }, } @@ -714,6 +732,8 @@ def __init__(self, *args, **kwargs): SESSION_ENGINE = "redis_sessions.session" -SESSION_REDIS = RedisHelper.config(default="redis://redis_cache:6380/2") +SESSION_REDIS = RedisHelper.session_config(default="redis://redis_cache:6380/2") + +LOCK_REDIS = RedisHelper.lock_config(default="redis://redis_cache:6380/3") TESTING = False diff --git a/kpi/management/commands/delete_assets_snapshots.py b/kpi/management/commands/delete_asset_snapshots.py similarity index 96% rename from kpi/management/commands/delete_assets_snapshots.py rename to kpi/management/commands/delete_asset_snapshots.py index 1d1c51a1c4..e7e6599c29 100644 --- a/kpi/management/commands/delete_assets_snapshots.py +++ b/kpi/management/commands/delete_asset_snapshots.py @@ -12,7 +12,7 @@ class Command(DeleteBaseCommand): - help = "Deletes assets snapshots" + help = "Deletes asset snapshots" def _prepare_delete_queryset(self, **options): days = options["days"] diff --git a/kpi/tasks.py b/kpi/tasks.py index 38797d20ee..4566d21f6c 100644 --- a/kpi/tasks.py +++ b/kpi/tasks.py @@ -5,9 +5,11 @@ from django.core.management import call_command from kpi.models import ImportTask, ExportTask +from kpi.utils.lock import lock @shared_task +@lock(key='update_search_index', timeout=3600) def update_search_index(): call_command('update_index', using=['default',], remove=True) @@ -25,6 +27,7 @@ def export_in_background(export_task_uid): @shared_task +@lock(key='sync_kobocat_xforms', timeout=3600 * 24) def sync_kobocat_xforms(username=None, quiet=True): call_command('sync_kobocat_xforms', username=username, quiet=quiet) @@ -32,3 +35,22 @@ def sync_kobocat_xforms(username=None, quiet=True): @shared_task def import_survey_drafts_from_dkobo(**kwargs): call_command('import_survey_drafts_from_dkobo', **kwargs) + + +@shared_task +@lock(key='clean_orphans', timeout=60) +def clean_orphans(): + #ToDo Create Management command + pass + + +@shared_task +@lock(key='delete_asset_snapshots', timeout=60) +def delete_asset_snapshots(): + call_command('delete_asset_snapshots') + + +@shared_task +@lock(key='delete_import_tasks', timeout=60) +def delete_import_tasks(): + call_command('delete_import_tasks') diff --git a/kpi/utils/lock.py b/kpi/utils/lock.py new file mode 100644 index 0000000000..699c50296a --- /dev/null +++ b/kpi/utils/lock.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import os +from functools import wraps + +import redis +from django.conf import settings + + +REDIS_LOCK_CLIENT = redis.Redis(**settings.LOCK_REDIS) + + +def lock(key='', timeout=None): + + def _lock(func): + @wraps(func) + def wrapper(*args, **kwargs): + ret_value = None + have_lock = False + prefix = os.getenv('REDIS_KPI_LOCK_PREFIX', 'kpi-lock') + key_ = '{}:{}'.format(prefix, key) + lock_ = REDIS_LOCK_CLIENT.lock(key_, timeout=timeout) + try: + have_lock = lock_.acquire(blocking=False) + if have_lock: + ret_value = func(*args, **kwargs) + finally: + if have_lock: + lock_.release() + + return ret_value + return wrapper + return _lock diff --git a/kpi/utils/redis_helper.py b/kpi/utils/redis_helper.py index 8615141e66..d464e16b9a 100644 --- a/kpi/utils/redis_helper.py +++ b/kpi/utils/redis_helper.py @@ -16,26 +16,56 @@ class RedisHelper(object): """ @staticmethod - def config(default=None): + def config(url_variable, default=None): """ - Parses `REDIS_SESSION_URL` environment variable to return a dict with - expected attributes for django redis session. + Parses `url_variable` environment variable to return a dict with + expected attributes for redis clients. :return: dict """ - redis_connection_url = os.getenv("REDIS_SESSION_URL", default) - match = re.match(r"redis://(:(?P[^@]*)@)?(?P[^:]+):(?P\d+)(/(?P\d+))?", - redis_connection_url) + redis_connection_url = os.getenv(url_variable, default) + match = re.match(r'redis://(:(?P[^@]*)@)?(?P[^:]+):(?P\d+)(/(?P\d+))?', + redis_connection_url) if not match: - raise ImproperlyConfigured("Could not parse Redis session URL. Please verify 'REDIS_SESSION_URL' value") + raise ImproperlyConfigured( + "Could not parse `{}`. Please verify your settings.".format( + url_variable) + ) redis_connection_dict = { - "host": match.group("host"), - "port": match.group("port"), - "db": match.group("index") or 0, - "password": match.group("password"), - "prefix": os.getenv("REDIS_SESSION_PREFIX", "session"), - "socket_timeout": os.getenv("REDIS_SESSION_SOCKET_TIMEOUT", 1), + 'host': match.group('host'), + 'port': match.group('port'), + 'db': match.group('index') or 0, + 'password': match.group('password') } return redis_connection_dict + + @classmethod + def session_config(cls, default=None): + """ + Parses `REDIS_SESSION_URL` environment variable to return a dict + for django redis session. + + :return: dict + """ + + redis_connection_dict = cls.config('REDIS_SESSION_URL', default) + redis_connection_dict.update({ + 'prefix': os.getenv('REDIS_SESSION_PREFIX', 'session'), + 'socket_timeout': os.getenv('REDIS_SESSION_SOCKET_TIMEOUT', 1), + }) + return redis_connection_dict + + @classmethod + def lock_config(cls, default=None): + """ + Parses `REDIS_LOCK_URL` environment variable to return a dict with + expected attributes for lock mechanism based on redis. + + :return: dict + """ + + redis_connection_dict = cls.config('REDIS_LOCK_URL', default) + + return redis_connection_dict From 75bfdcd88c443508bf0ecd68a1e2d001701fc336 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Wed, 2 Oct 2019 12:46:57 -0400 Subject: [PATCH 06/14] Standardize removal management command names --- kobo/settings/base.py | 8 ++++---- ...et_snapshots.py => remove_asset_snapshots.py} | 6 +++--- ...te_base_command.py => remove_base_command.py} | 6 +++--- ...te_import_tasks.py => remove_import_tasks.py} | 6 +++--- kpi/tasks.py | 16 ++++++++-------- 5 files changed, 21 insertions(+), 21 deletions(-) rename kpi/management/commands/{delete_asset_snapshots.py => remove_asset_snapshots.py} (89%) rename kpi/management/commands/{delete_base_command.py => remove_base_command.py} (95%) rename kpi/management/commands/{delete_import_tasks.py => remove_import_tasks.py} (78%) diff --git a/kobo/settings/base.py b/kobo/settings/base.py index 626d087cf1..c00cec4ccd 100644 --- a/kobo/settings/base.py +++ b/kobo/settings/base.py @@ -451,20 +451,20 @@ def __init__(self, *args, **kwargs): 'options': {'queue': 'kpi_queue'} }, # Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section - 'clean-orphans': { - 'task': 'kpi.tasks.clean_orphans', + 'remove-s3-orphans': { + 'task': 'kpi.tasks.remove_s3_orphans', 'schedule': crontab(hour=4, minute=0, day_of_week=6), 'options': {'queue': 'kpi_queue'} }, # Schedule every Friday at 4:00 AM UTC. Can be customized in admin section 'delete-asset-snapshots': { - 'task': 'kpi.tasks.delete_asset_snapshots', + 'task': 'kpi.tasks.remove_asset_snapshots', 'schedule': crontab(hour=4, minute=0, day_of_week=5), 'options': {'queue': 'kpi_queue'} }, # Schedule every Friday at 5:00 AM UTC. Can be customized in admin section 'delete-import-tasks': { - 'task': 'kpi.tasks.delete_import_tasks', + 'task': 'kpi.tasks.remove_import_tasks', 'schedule': crontab(hour=5, minute=0, day_of_week=5), 'options': {'queue': 'kpi_queue'} }, diff --git a/kpi/management/commands/delete_asset_snapshots.py b/kpi/management/commands/remove_asset_snapshots.py similarity index 89% rename from kpi/management/commands/delete_asset_snapshots.py rename to kpi/management/commands/remove_asset_snapshots.py index e7e6599c29..96fe651dc4 100644 --- a/kpi/management/commands/delete_asset_snapshots.py +++ b/kpi/management/commands/remove_asset_snapshots.py @@ -6,13 +6,13 @@ from django.db.models import Max from django.utils import timezone -from .delete_base_command import DeleteBaseCommand +from .remove_base_command import RemoveBaseCommand from kpi.models import AssetSnapshot -class Command(DeleteBaseCommand): +class Command(RemoveBaseCommand): - help = "Deletes asset snapshots" + help = "Removes asset snapshots" def _prepare_delete_queryset(self, **options): days = options["days"] diff --git a/kpi/management/commands/delete_base_command.py b/kpi/management/commands/remove_base_command.py similarity index 95% rename from kpi/management/commands/delete_base_command.py rename to kpi/management/commands/remove_base_command.py index b541f9aac4..f272a5e74a 100644 --- a/kpi/management/commands/delete_base_command.py +++ b/kpi/management/commands/remove_base_command.py @@ -7,14 +7,14 @@ from django.db import transaction, connection -class DeleteBaseCommand(BaseCommand): +class RemoveBaseCommand(BaseCommand): def __init__(self, stdout=None, stderr=None, no_color=False): - super(DeleteBaseCommand, self).__init__(stdout=stdout, stderr=stderr, no_color=no_color) + super(RemoveBaseCommand, self).__init__(stdout=stdout, stderr=stderr, no_color=no_color) self._model = None def add_arguments(self, parser): - super(DeleteBaseCommand, self).add_arguments(parser) + super(RemoveBaseCommand, self).add_arguments(parser) parser.add_argument( "--days", default=90, diff --git a/kpi/management/commands/delete_import_tasks.py b/kpi/management/commands/remove_import_tasks.py similarity index 78% rename from kpi/management/commands/delete_import_tasks.py rename to kpi/management/commands/remove_import_tasks.py index 3fddc172b9..42fdc46c0e 100644 --- a/kpi/management/commands/delete_import_tasks.py +++ b/kpi/management/commands/remove_import_tasks.py @@ -5,13 +5,13 @@ from django.utils import timezone -from .delete_base_command import DeleteBaseCommand +from .remove_base_command import RemoveBaseCommand from kpi.models import ImportTask -class Command(DeleteBaseCommand): +class Command(RemoveBaseCommand): - help = "Deletes import tasks" + help = "Removes import tasks" def _prepare_delete_queryset(self, **options): days = options["days"] diff --git a/kpi/tasks.py b/kpi/tasks.py index 4566d21f6c..07bd1a96bf 100644 --- a/kpi/tasks.py +++ b/kpi/tasks.py @@ -38,19 +38,19 @@ def import_survey_drafts_from_dkobo(**kwargs): @shared_task -@lock(key='clean_orphans', timeout=60) -def clean_orphans(): +@lock(key='remove_s3_orphans', timeout=60) +def remove_s3_orphans(): #ToDo Create Management command pass @shared_task -@lock(key='delete_asset_snapshots', timeout=60) -def delete_asset_snapshots(): - call_command('delete_asset_snapshots') +@lock(key='remove_asset_snapshots', timeout=60) +def remove_asset_snapshots(): + call_command('remove_asset_snapshots') @shared_task -@lock(key='delete_import_tasks', timeout=60) -def delete_import_tasks(): - call_command('delete_import_tasks') +@lock(key='remove_import_tasks', timeout=60) +def remove_import_tasks(): + call_command('remove_import_tasks') From 7fd0228c5997e2cec68cff8ed54590a35aa75da2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Wed, 2 Oct 2019 15:20:12 -0400 Subject: [PATCH 07/14] Added remove s3 orphans management command --- kpi/management/commands/remove_s3_orphans.py | 154 +++++++++++++++++++ kpi/tasks.py | 3 +- kpi/utils/extended_s3boto_storage.py | 19 +++ 3 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 kpi/management/commands/remove_s3_orphans.py create mode 100644 kpi/utils/extended_s3boto_storage.py diff --git a/kpi/management/commands/remove_s3_orphans.py b/kpi/management/commands/remove_s3_orphans.py new file mode 100644 index 0000000000..5aa2dfbdbc --- /dev/null +++ b/kpi/management/commands/remove_s3_orphans.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python +# vim: ai ts=4 sts=4 et sw=4 coding=utf-8 +# -*- coding: utf-8 -*- +from __future__ import unicode_literals +import codecs +import re +import sys +import time + +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError +from django.core.files.storage import get_storage_class +from django.utils.translation import ugettext as _ + +from kpi.models.import_export_task import ExportTask + + +# S3 Monkey Patch +from boto import handler +from boto.resultset import ResultSet +from boto.s3.bucket import Bucket +import xml.sax +import xml.sax.saxutils + + +def _get_all(self, element_map, initial_query_string='', + headers=None, **params): + query_args = self._get_all_query_args( + params, + initial_query_string=initial_query_string + ) + response = self.connection.make_request('GET', self.name, + headers=headers, + query_args=query_args) + body = response.read() + + if response.status == 200: + rs = ResultSet(element_map) + h = handler.XmlHandler(rs, self) + try: + xml.sax.parseString(fix_bad_characters(body), h) + except Exception as e: + print("XML Parsing Error - {}".format(str(e))) + error_filename = "/srv/logs/s3_body_error-{}.xml".format(str(int(time.time()))) + with open(error_filename, "w") as xmlfile_error: + xmlfile_error.write("{}\n".format(str(e))) + xmlfile_error.write(body) + raise Exception(str(e)) + return rs + else: + raise self.connection.provider.storage_response_error( + response.status, response.reason, body) + + +def fix_bad_characters(str_): + + try: + str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", str_) + except Exception as e: + # Try to force unicode + str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", unicode(str_, "utf-8")) + str_ = str_.encode("utf-8") + return str_ + + +class Command(BaseCommand): + help = _('Removes orphan files in S3') + + def add_arguments(self, parser): + super(Command, self).add_arguments(parser) + + parser.add_argument( + "--dry-run", + action='store_true', + default=False, + help="Do not delete files", + ) + + parser.add_argument( + "--log-files", + action='store_true', + default=True, + help="Save deleted files to a CSV", + ) + + def handle(self, *args, **kwargs): + + Bucket._get_all = _get_all + + dry_run = kwargs['dry_run'] + log_files = kwargs['log_files'] + + self._s3 = get_storage_class('kpi.utils.extended_s3boto_storage.ExtendedS3BotoStorage')() + all_files = self._s3.bucket.list() + size_to_reclaim = 0 + orphans = 0 + + now = time.time() + csv_filepath = '/srv/logs/orphan_files-{}.csv'.format(int(now)) + + print('Bucket name: {}'.format(settings.AWS_STORAGE_BUCKET_NAME)) + if dry_run: + print('Dry run mode activated') + if log_files: + print('CSV: {}'.format(csv_filepath)) + + if log_files: + with open(csv_filepath, "w") as csv: + csv.write("type,filename,filesize\n") + + for f in all_files: + try: + filename = f.name + if filename[-1] != "/": + # KPI Exports + if re.match(r"[^\/]*\/exports\/.+", filename): + if not ExportTask.objects.filter(result=filename).exists(): + filesize = f.size + orphans += 1 + size_to_reclaim += filesize + if log_files: + csv = codecs.open(csv_filepath, "a", "utf-8") + csv.write("{},{},{}\n".format("exports", filename, filesize)) + csv.close() + if not dry_run: + self.delete(f) + + if time.time() - now >= 5 * 60: + print("[{}] Still alive...".format(str(int(time.time())))) + now = time.time() + + except Exception as e: + print("ERROR - {}".format(str(e))) + sys.exit(-1) + + print("Orphans: {}".format(orphans)) + print("Size: {}".format(self.sizeof_fmt(size_to_reclaim))) + + def delete(self, file_object): + try: + print("File {} does not exist in DB".format(file_object.name).encode('utf-8')) + self._s3.delete_all(file_object.name) + except Exception as e: + print("ERROR - Could not delete file {} - Reason {}".format( + file_object.name, + str(e))) + + @staticmethod + def sizeof_fmt(num, suffix='B'): + for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: + if abs(num) < 1024.0: + return "%3.1f%s%s" % (num, unit, suffix) + num /= 1024.0 + return "%.1f%s%s" % (num, 'Yi', suffix) diff --git a/kpi/tasks.py b/kpi/tasks.py index 07bd1a96bf..9a4ae96362 100644 --- a/kpi/tasks.py +++ b/kpi/tasks.py @@ -40,8 +40,7 @@ def import_survey_drafts_from_dkobo(**kwargs): @shared_task @lock(key='remove_s3_orphans', timeout=60) def remove_s3_orphans(): - #ToDo Create Management command - pass + call_command('remove_s3_orphans') @shared_task diff --git a/kpi/utils/extended_s3boto_storage.py b/kpi/utils/extended_s3boto_storage.py new file mode 100644 index 0000000000..0b37ed7e6c --- /dev/null +++ b/kpi/utils/extended_s3boto_storage.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from storages.backends.s3boto import S3BotoStorage + + +class ExtendedS3BotoStorage(S3BotoStorage): + + def delete_all(self, name): + """ + Delete the key object and all its versions + :param name: str. S3 key (i.e. path to the file) + """ + name = self._normalize_name(self._clean_name(name)) + self.bucket.delete_key(self._encode_name(name)) + + # Delete all previous versions + for versioned_key in self.bucket.list_versions(prefix=name): + self.bucket.delete_key(versioned_key.name, version_id=versioned_key.version_id) From 893664e936890f2621d84f72f33256256b09fd38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Wed, 2 Oct 2019 15:58:13 -0400 Subject: [PATCH 08/14] Disabled periodic task by default --- kobo/settings/base.py | 16 ++++++++++------ kpi/tasks.py | 6 +++--- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/kobo/settings/base.py b/kobo/settings/base.py index c00cec4ccd..e3bd235318 100644 --- a/kobo/settings/base.py +++ b/kobo/settings/base.py @@ -448,25 +448,29 @@ def __init__(self, *args, **kwargs): 'send-hooks-failures-reports': { 'task': 'kobo.apps.hook.tasks.failures_reports', 'schedule': crontab(hour=0, minute=0), - 'options': {'queue': 'kpi_queue'} + 'options': {'queue': 'kpi_queue'}, + 'enabled': False, }, # Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section 'remove-s3-orphans': { 'task': 'kpi.tasks.remove_s3_orphans', 'schedule': crontab(hour=4, minute=0, day_of_week=6), - 'options': {'queue': 'kpi_queue'} + 'options': {'queue': 'kpi_queue'}, + 'enabled': False, }, # Schedule every Friday at 4:00 AM UTC. Can be customized in admin section - 'delete-asset-snapshots': { + 'remove-asset-snapshots': { 'task': 'kpi.tasks.remove_asset_snapshots', 'schedule': crontab(hour=4, minute=0, day_of_week=5), - 'options': {'queue': 'kpi_queue'} + 'options': {'queue': 'kpi_queue'}, + 'enabled': False, }, # Schedule every Friday at 5:00 AM UTC. Can be customized in admin section - 'delete-import-tasks': { + 'remove-import-tasks': { 'task': 'kpi.tasks.remove_import_tasks', 'schedule': crontab(hour=5, minute=0, day_of_week=5), - 'options': {'queue': 'kpi_queue'} + 'options': {'queue': 'kpi_queue'}, + 'enabled': False, }, } diff --git a/kpi/tasks.py b/kpi/tasks.py index 9a4ae96362..ce652ced90 100644 --- a/kpi/tasks.py +++ b/kpi/tasks.py @@ -38,18 +38,18 @@ def import_survey_drafts_from_dkobo(**kwargs): @shared_task -@lock(key='remove_s3_orphans', timeout=60) +@lock(key='remove_s3_orphans', timeout=3600) def remove_s3_orphans(): call_command('remove_s3_orphans') @shared_task -@lock(key='remove_asset_snapshots', timeout=60) +@lock(key='remove_asset_snapshots', timeout=3600) def remove_asset_snapshots(): call_command('remove_asset_snapshots') @shared_task -@lock(key='remove_import_tasks', timeout=60) +@lock(key='remove_import_tasks', timeout=3600) def remove_import_tasks(): call_command('remove_import_tasks') From d192fac088fa019d209e7afe14c31cead1c34064 Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Thu, 24 Oct 2019 12:34:30 -0300 Subject: [PATCH 09/14] Replaced print() with self.stdout.write(), added some comments --- kpi/management/commands/remove_s3_orphans.py | 38 ++++++++++++++------ kpi/utils/lock.py | 7 +++- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/kpi/management/commands/remove_s3_orphans.py b/kpi/management/commands/remove_s3_orphans.py index 5aa2dfbdbc..fa7ad00c57 100644 --- a/kpi/management/commands/remove_s3_orphans.py +++ b/kpi/management/commands/remove_s3_orphans.py @@ -25,6 +25,12 @@ def _get_all(self, element_map, initial_query_string='', headers=None, **params): + """ + The purpose of this method is to be used to monkey-patch + `boto.s3.bucket.Bucket._get_all()`. The original doesn't handle + correctly bad characters and crashes because of `xml.sax.parseString` + which can't parse `body` as valid `XML`. + """ query_args = self._get_all_query_args( params, initial_query_string=initial_query_string @@ -40,7 +46,7 @@ def _get_all(self, element_map, initial_query_string='', try: xml.sax.parseString(fix_bad_characters(body), h) except Exception as e: - print("XML Parsing Error - {}".format(str(e))) + self.stdout.write("XML Parsing Error - {}".format(str(e))) error_filename = "/srv/logs/s3_body_error-{}.xml".format(str(int(time.time()))) with open(error_filename, "w") as xmlfile_error: xmlfile_error.write("{}\n".format(str(e))) @@ -53,7 +59,12 @@ def _get_all(self, element_map, initial_query_string='', def fix_bad_characters(str_): - + """ + Replace unknown/bad characters `&...;` with `&...;`. + Except `'`, `", `<`, `>` and `&` + Example: + `&foo;` becomes `&foo;` but `<` stays `<` + """ try: str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", str_) except Exception as e: @@ -98,11 +109,16 @@ def handle(self, *args, **kwargs): now = time.time() csv_filepath = '/srv/logs/orphan_files-{}.csv'.format(int(now)) - print('Bucket name: {}'.format(settings.AWS_STORAGE_BUCKET_NAME)) + if not settings.AWS_STORAGE_BUCKET_NAME: + self.stdout.write('`AWS_STORAGE_BUCKET_NAME` is not set. ' + 'Please check your settings') + sys.exit(1) + + self.stdout.write('Bucket name: {}'.format(settings.AWS_STORAGE_BUCKET_NAME)) if dry_run: - print('Dry run mode activated') + self.stdout.write('Dry run mode activated') if log_files: - print('CSV: {}'.format(csv_filepath)) + self.stdout.write('CSV: {}'.format(csv_filepath)) if log_files: with open(csv_filepath, "w") as csv: @@ -126,22 +142,22 @@ def handle(self, *args, **kwargs): self.delete(f) if time.time() - now >= 5 * 60: - print("[{}] Still alive...".format(str(int(time.time())))) + self.stdout.write("[{}] Still alive...".format(str(int(time.time())))) now = time.time() except Exception as e: - print("ERROR - {}".format(str(e))) + self.stdout.write("ERROR - {}".format(str(e))) sys.exit(-1) - print("Orphans: {}".format(orphans)) - print("Size: {}".format(self.sizeof_fmt(size_to_reclaim))) + self.stdout.write("Orphans: {}".format(orphans)) + self.stdout.write("Size: {}".format(self.sizeof_fmt(size_to_reclaim))) def delete(self, file_object): try: - print("File {} does not exist in DB".format(file_object.name).encode('utf-8')) + self.stdout.write("File {} does not exist in DB".format(file_object.name).encode('utf-8')) self._s3.delete_all(file_object.name) except Exception as e: - print("ERROR - Could not delete file {} - Reason {}".format( + self.stdout.write("ERROR - Could not delete file {} - Reason {}".format( file_object.name, str(e))) diff --git a/kpi/utils/lock.py b/kpi/utils/lock.py index 699c50296a..e65036fd38 100644 --- a/kpi/utils/lock.py +++ b/kpi/utils/lock.py @@ -11,8 +11,13 @@ REDIS_LOCK_CLIENT = redis.Redis(**settings.LOCK_REDIS) -def lock(key='', timeout=None): +def lock(key, timeout=None): + """ + It tries to acquire a lock to execute the function it decorates. + If the lock is not acquired, the function is silently skipped. + It strongly depends on `redis` because it uses `redis.py:Lock` + """ def _lock(func): @wraps(func) def wrapper(*args, **kwargs): From 154c7a203a3e4437fba52d847e4e0b4363039829 Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Wed, 4 Mar 2020 13:22:40 -0500 Subject: [PATCH 10/14] Removed useless tasks related to Whoosh and dkobo --- kpi/tasks.py | 11 ----------- kpi/utils/lock.py | 5 +---- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/kpi/tasks.py b/kpi/tasks.py index b5668d06b6..69440509c5 100644 --- a/kpi/tasks.py +++ b/kpi/tasks.py @@ -6,12 +6,6 @@ from kpi.utils.lock import lock -@shared_task -@lock(key='update_search_index', timeout=3600) -def update_search_index(): - call_command('update_index', using=['default',], remove=True) - - @shared_task def import_in_background(import_task_uid): import_task = ImportTask.objects.get(uid=import_task_uid) @@ -30,11 +24,6 @@ def sync_kobocat_xforms(username=None, quiet=True): call_command('sync_kobocat_xforms', username=username, quiet=quiet) -@shared_task -def import_survey_drafts_from_dkobo(**kwargs): - call_command('import_survey_drafts_from_dkobo', **kwargs) - - @shared_task @lock(key='remove_s3_orphans', timeout=3600) def remove_s3_orphans(): diff --git a/kpi/utils/lock.py b/kpi/utils/lock.py index e65036fd38..bf8afc8d64 100644 --- a/kpi/utils/lock.py +++ b/kpi/utils/lock.py @@ -1,13 +1,10 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, unicode_literals - +# coding: utf-8 import os from functools import wraps import redis from django.conf import settings - REDIS_LOCK_CLIENT = redis.Redis(**settings.LOCK_REDIS) From 59deab0f4a725a33b8d124951a636560ea99c7e0 Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Wed, 4 Mar 2020 13:24:24 -0500 Subject: [PATCH 11/14] Enable REST failure reports by default --- kobo/settings/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kobo/settings/base.py b/kobo/settings/base.py index 89307ba173..b5d016e6b3 100644 --- a/kobo/settings/base.py +++ b/kobo/settings/base.py @@ -397,7 +397,7 @@ def __init__(self, *args, **kwargs): 'task': 'kobo.apps.hook.tasks.failures_reports', 'schedule': crontab(hour=0, minute=0), 'options': {'queue': 'kpi_queue'}, - 'enabled': False, + 'enabled': True, }, # Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section 'remove-s3-orphans': { From 460502fdc1bbe83f178bfb7535ad7f60f721eb68 Mon Sep 17 00:00:00 2001 From: Alex Dorey Date: Thu, 19 Mar 2020 17:39:17 -0400 Subject: [PATCH 12/14] added a --dry-run option to get info before running the command --- .../commands/remove_base_command.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/kpi/management/commands/remove_base_command.py b/kpi/management/commands/remove_base_command.py index d199bdc918..ec3692aed8 100644 --- a/kpi/management/commands/remove_base_command.py +++ b/kpi/management/commands/remove_base_command.py @@ -2,6 +2,7 @@ import sys from django.core.management.base import BaseCommand +from django.utils import timezone from django.db import transaction, connection @@ -45,6 +46,13 @@ def add_arguments(self, parser): help="Run `VACUUM FULL` instead of `VACUUM`.", ) + parser.add_argument( + "--dry-run", + action='store_true', + default=False, + help="Print out what will be deleted without deleting it", + ) + def handle(self, *args, **options): chunks = options["chunks"] @@ -60,6 +68,23 @@ def handle(self, *args, **options): chunks_counter = 1 total = delete_queryset.count() + if options["dry_run"]: + try: + first = delete_queryset.order_by('date_created').first() + if first: + days_ago = '. Oldest is {} days'.format( + (timezone.now() - first.date_created).days, + ) + else: + days_ago = '' + print("{} items to delete{}".format(total, + days_ago, + )) + except Exception as err: + print("{} items to delete".format(total) + pass + return + for record_id in delete_queryset.values_list("id", flat=True).iterator(): chunked_delete_ids.append(record_id) From 7fa645b84282ed4143c1455c4ec93913ee7204e4 Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Thu, 14 May 2020 17:36:48 -0300 Subject: [PATCH 13/14] =?UTF-8?q?Use=20`self.stdout.write()`=C2=A0instead?= =?UTF-8?q?=20of=20`print()`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kpi/management/commands/remove_base_command.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/kpi/management/commands/remove_base_command.py b/kpi/management/commands/remove_base_command.py index ec3692aed8..1235aa5b54 100644 --- a/kpi/management/commands/remove_base_command.py +++ b/kpi/management/commands/remove_base_command.py @@ -77,12 +77,10 @@ def handle(self, *args, **options): ) else: days_ago = '' - print("{} items to delete{}".format(total, - days_ago, - )) + self.stdout.write("{} items to delete{}".format(total, days_ago)) except Exception as err: - print("{} items to delete".format(total) pass + return for record_id in delete_queryset.values_list("id", flat=True).iterator(): @@ -105,12 +103,12 @@ def handle(self, *args, **options): chunks_counter += 1 # Print new line - print("") + self.stdout.write("") if vacuum is True or vacuum_full is True: self._do_vacuum(vacuum_full) - print("Done!") + self.stdout.write("Done!") def _prepare_delete_queryset(self, **options): raise Exception("Must be implemented in child class") @@ -118,9 +116,9 @@ def _prepare_delete_queryset(self, **options): def _do_vacuum(self, full=False): cursor = connection.cursor() if full: - print("Vacuuming (full) table {}...".format(self._model._meta.db_table)) + self.stdout.write("Vacuuming (full) table {}...".format(self._model._meta.db_table)) cursor.execute("VACUUM FULL {}".format(self._model._meta.db_table)) else: - print("Vacuuming table {}...".format(self._model._meta.db_table)) + self.stdout.write("Vacuuming table {}...".format(self._model._meta.db_table)) cursor.execute("VACUUM {}".format(self._model._meta.db_table)) connection.commit() From e895ea59cd90f3f029f1472ea601d3899a0e03ab Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Thu, 14 May 2020 17:51:40 -0300 Subject: [PATCH 14/14] Do not delete (yet) all versions of files on S3 bucket --- kpi/utils/extended_s3boto_storage.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kpi/utils/extended_s3boto_storage.py b/kpi/utils/extended_s3boto_storage.py index 0b37ed7e6c..445fa5397d 100644 --- a/kpi/utils/extended_s3boto_storage.py +++ b/kpi/utils/extended_s3boto_storage.py @@ -15,5 +15,7 @@ def delete_all(self, name): self.bucket.delete_key(self._encode_name(name)) # Delete all previous versions - for versioned_key in self.bucket.list_versions(prefix=name): - self.bucket.delete_key(versioned_key.name, version_id=versioned_key.version_id) + # ToDo Uncomment lines below few months after first run to + # give users some time to notice whether their files have been deleted + # for versioned_key in self.bucket.list_versions(prefix=name): + # self.bucket.delete_key(versioned_key.name, version_id=versioned_key.version_id)