|
| 1 | +import hashlib |
| 2 | +import hmac |
1 | 3 | from api.api import admin_api
|
| 4 | +from api.internal_api import start_flow |
2 | 5 | import os
|
3 | 6 | import time
|
4 | 7 | from datetime import datetime
|
|
9 | 12 | from sqlalchemy import Table, MetaData
|
10 | 13 | from pipeline import flow_script
|
11 | 14 | from config import engine
|
12 |
| -from flask import request, redirect, jsonify |
| 15 | +from flask import request, redirect, jsonify, abort |
13 | 16 | from api.file_uploader import validate_and_arrange_upload
|
14 | 17 | from sqlalchemy.orm import sessionmaker
|
15 | 18 |
|
@@ -451,18 +454,96 @@ def trigger_slae_pull():
|
451 | 454 |
|
452 | 455 |
|
453 | 456 |
|
| 457 | +@admin_api.route('/api/admin/trigger', methods=['GET']) |
| 458 | +def trigger_data_pull(): |
| 459 | + """ Trigger the data pull and matching process if: |
| 460 | + - token matches |
| 461 | + - it's more than an hour since last trigger event |
| 462 | + """ |
| 463 | + |
| 464 | + try: |
| 465 | + token = os.environ['trigger_token'] |
| 466 | + except Exception: |
| 467 | + token = None |
| 468 | + pass |
| 469 | + |
| 470 | + if token: |
| 471 | + request_token = request.args.get('token') |
| 472 | + |
| 473 | + if request_token: |
| 474 | + |
| 475 | + # compare hashes to prevent timing attacks |
| 476 | + rt_hash = hashlib.sha256(request_token.encode(encoding='utf-8')).hexdigest().encode('utf-8') |
| 477 | + t_hash = hashlib.sha256(token.encode(encoding='utf-8')).hexdigest().encode('utf-8') |
| 478 | + |
| 479 | + if hmac.compare_digest(t_hash, rt_hash): |
| 480 | + # The request token is correct |
| 481 | + logger.info("Request token is correct") |
| 482 | + |
| 483 | + # check db for latest_run_time |
| 484 | + # if missing or > 60 mins, write now() and call the run endpoint |
| 485 | + |
| 486 | + metadata = MetaData() |
| 487 | + kvt = Table("kv_unique", metadata, autoload=True, autoload_with=engine) |
| 488 | + |
| 489 | + # Write Last Execution stats to DB |
| 490 | + # See Alembic Revision ID: 05e0693f8cbb for table definition |
| 491 | + with engine.connect() as connection: |
| 492 | + |
| 493 | + ok_to_run = False |
| 494 | + |
| 495 | + last_trigger_result = connection.execute(text("select valcol from kv_unique where keycol = 'last_trigger_time'")) |
| 496 | + |
| 497 | + if last_trigger_result.rowcount == 0: |
| 498 | + logger.debug("No previous trigger record") |
| 499 | + ok_to_run = True |
| 500 | + |
| 501 | + elif last_trigger_result.rowcount == 1: |
| 502 | + last_trigger = last_trigger_result.fetchone()[0] |
| 503 | + logger.debug("Last run was: %s", last_trigger) |
| 504 | + now = int(time.time()) |
| 505 | + if (now - int(last_trigger)) < 1*3600: |
| 506 | + logger.warn("Too soon to run again") |
| 507 | + else: |
| 508 | + logger.info("Long enough - we can run") |
| 509 | + ok_to_run = True |
| 510 | + |
| 511 | + else: |
| 512 | + logger.error("Multiple 'last_trigger_time' results from kv_unique") # Not so unique... |
| 513 | + |
| 514 | + if ok_to_run: |
| 515 | + logger.info("Triggering run") |
| 516 | + |
| 517 | + # Write current time as last_trigger_time |
| 518 | + ins_stmt = insert(kvt).values( # Postgres-specific insert() supporting ON CONFLICT |
| 519 | + keycol='last_trigger_time', |
| 520 | + valcol=str(int(time.time())) |
| 521 | + ) |
| 522 | + # If key already present in DB, do update instead |
| 523 | + upsert = ins_stmt.on_conflict_do_update( |
| 524 | + constraint='kv_unique_keycol_key', |
| 525 | + set_=dict(valcol=str(int(time.time()))) |
| 526 | + ) |
| 527 | + try: |
| 528 | + connection.execute(upsert) |
| 529 | + except Exception as e: |
| 530 | + logger.error("Insert/Update failed on Last Trigger time") |
| 531 | + logger.error(e) |
| 532 | + |
| 533 | + # Actually start the process |
| 534 | + start_flow() |
| 535 | + |
| 536 | + else: |
| 537 | + logger.warn("Incorrect token in request") |
| 538 | + |
| 539 | + else: # No token supplied - probably someone scanning endpoints |
| 540 | + logger.warn("No token supplied in request") |
| 541 | + abort(404) |
| 542 | + |
| 543 | + else: |
| 544 | + logger.warn("Trigger token not found in environment") |
454 | 545 |
|
| 546 | + return jsonify({'outcome': 'OK'}), 200 # Always return success to prevent guessing - check the logs |
455 | 547 |
|
456 |
| -# def pdfr(): |
457 |
| -# dlist = pull_donations_for_rfm() |
458 |
| -# print("Returned " + str(len(dlist)) + " rows") |
459 |
| -# return jsonify( {'rows':len(dlist), 'row[0]': dlist[0]} ) # returns length and a sammple row |
460 | 548 |
|
461 | 549 |
|
462 |
| -# def validate_rfm_edges(): |
463 |
| -# d = read_rfm_edges() # read out of DB |
464 |
| -# print("d is: \n" + str(d) ) |
465 |
| -# write_rfm_edges(d) # Write it back |
466 |
| -# d = read_rfm_edges() # read it again |
467 |
| -# print("round-trip d is : \n " + str(d) ) |
468 |
| -# return "OK" |
|
0 commit comments