Skip to content

Commit

Permalink
Merge pull request #1502 from GSA/API-1466_Fix_database_inserts
Browse files Browse the repository at this point in the history
API-1466 - Fixing database IntegrityError failures
  • Loading branch information
terrazoon authored Dec 26, 2024
2 parents 0510c4d + 122195d commit 1ff2923
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 34 deletions.
4 changes: 1 addition & 3 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ def check_for_missing_rows_in_completed_jobs():
for row_to_process in missing_rows:
row = recipient_csv[row_to_process.missing_row]
current_app.logger.info(
"Processing missing row: {} for job: {}".format(
row_to_process.missing_row, job.id
)
f"Processing missing row: {row_to_process.missing_row} for job: {job.id}"
)
process_row(row, template, job, job.service, sender_id=sender_id)

Expand Down
57 changes: 29 additions & 28 deletions app/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from app.errors import TotalRequestsError
from app.notifications.process_notifications import (
get_notification,
notification_exists,
persist_notification,
)
from app.notifications.validators import check_service_over_total_message_limit
Expand All @@ -39,9 +40,7 @@ def process_job(job_id, sender_id=None):
start = utc_now()
job = dao_get_job_by_id(job_id)
current_app.logger.info(
"Starting process-job task for job id {} with status: {}".format(
job_id, job.job_status
)
f"Starting process-job task for job id {job_id} with status: {job.job_status}"
)

if job.job_status != JobStatus.PENDING:
Expand All @@ -57,7 +56,7 @@ def process_job(job_id, sender_id=None):
job.job_status = JobStatus.CANCELLED
dao_update_job(job)
current_app.logger.warning(
"Job {} has been cancelled, service {} is inactive".format(
f"Job {job_id} has been cancelled, service {service.id} is inactive".format(
job_id, service.id
)
)
Expand All @@ -71,9 +70,7 @@ def process_job(job_id, sender_id=None):
)

current_app.logger.info(
"Starting job {} processing {} notifications".format(
job_id, job.notification_count
)
f"Starting job {job_id} processing {job.notification_count} notifications"
)

# notify-api-1495 we are going to sleep periodically to give other
Expand Down Expand Up @@ -229,22 +226,29 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
job = dao_get_job_by_id(job_id)
created_by_id = job.created_by_id

saved_notification = persist_notification(
template_id=notification["template"],
template_version=notification["template_version"],
recipient=notification["to"],
service=service,
personalisation=notification.get("personalisation"),
notification_type=NotificationType.SMS,
api_key_id=None,
key_type=KeyType.NORMAL,
created_at=utc_now(),
created_by_id=created_by_id,
job_id=notification.get("job", None),
job_row_number=notification.get("row_number", None),
notification_id=notification_id,
reply_to_text=reply_to_text,
)
try:
saved_notification = persist_notification(
template_id=notification["template"],
template_version=notification["template_version"],
recipient=notification["to"],
service=service,
personalisation=notification.get("personalisation"),
notification_type=NotificationType.SMS,
api_key_id=None,
key_type=KeyType.NORMAL,
created_at=utc_now(),
created_by_id=created_by_id,
job_id=notification.get("job", None),
job_row_number=notification.get("row_number", None),
notification_id=notification_id,
reply_to_text=reply_to_text,
)
except IntegrityError:
if notification_exists(notification_id):
saved_notification = get_notification(notification_id)

else:
raise

# Kick off sns process in provider_tasks.py
sn = saved_notification
Expand All @@ -258,11 +262,8 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
)

current_app.logger.debug(
"SMS {} created at {} for job {}".format(
saved_notification.id,
saved_notification.created_at,
notification.get("job", None),
)
f"SMS {saved_notification.id} created at {saved_notification.created_at} "
f"for job {notification.get('job', None)}"
)

except SQLAlchemyError as e:
Expand Down
10 changes: 7 additions & 3 deletions app/dao/notifications_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ def dao_get_last_date_template_was_used(template_id, service_id):
return last_date


def dao_notification_exists(notification_id) -> bool:
    """Return True if a Notification row with the given id already exists.

    Used as a pre-insert guard so duplicate task deliveries do not trigger
    an IntegrityError on the primary key.
    """
    query = select(Notification).where(Notification.id == notification_id)
    return db.session.execute(query).scalar() is not None


@autocommit
def dao_create_notification(notification):
if not notification.id:
Expand All @@ -86,9 +92,7 @@ def dao_create_notification(notification):
notification.normalised_to = "1"

# notify-api-1454 insert only if it doesn't exist
stmt = select(Notification).where(Notification.id == notification.id)
result = db.session.execute(stmt).scalar()
if result is None:
if not dao_notification_exists(notification.id):
db.session.add(notification)


Expand Down
5 changes: 5 additions & 0 deletions app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from app.dao.notifications_dao import (
dao_create_notification,
dao_delete_notifications_by_id,
dao_notification_exists,
get_notification_by_id,
)
from app.enums import KeyType, NotificationStatus, NotificationType
Expand Down Expand Up @@ -153,6 +154,10 @@ def persist_notification(
return notification


def notification_exists(notification_id):
    """Service-layer facade over the DAO existence check.

    Lets callers (e.g. celery tasks) test for an already-persisted
    notification without importing the DAO module directly.
    """
    return dao_notification_exists(notification_id)


def send_notification_to_queue_detached(
key_type, notification_type, notification_id, queue=None
):
Expand Down

0 comments on commit 1ff2923

Please sign in to comment.