Skip to content

Commit bbd97fc

Browse files
authored
Merge pull request #629 from CodeForPhilly/final-touches
updates to platform message sent to Salesforce
2 parents 7ee6230 + 5036d5e commit bbd97fc

File tree

6 files changed

+104
-77
lines changed

6 files changed

+104
-77
lines changed

src/server/api/API_ingest/shelterluv_people.py

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
import requests, os
2-
from models import ShelterluvPeople
3-
from config import engine
4-
from sqlalchemy.orm import sessionmaker
1+
import os
2+
import requests
53
import structlog
4+
import time
5+
from sqlalchemy.orm import sessionmaker
6+
7+
from config import engine
8+
from models import ShelterluvPeople
9+
610
logger = structlog.get_logger()
711

812
try:
@@ -23,6 +27,7 @@
2327

2428
TEST_MODE=os.getenv("TEST_MODE") # if not present, has value None
2529
LIMIT = 100
30+
MAX_RETRIES = 10
2631
#################################
2732
# This script is used to fetch data from shelterluv API.
2833
# Please be mindful of your usage.
@@ -41,20 +46,38 @@ def store_shelterluv_people_all():
4146
offset = 0
4247
has_more = True
4348
Session = sessionmaker(engine)
49+
retries = 0
4450

4551
with Session() as session:
4652
logger.debug("Truncating table shelterluvpeople")
47-
4853
session.execute("TRUNCATE TABLE shelterluvpeople")
49-
5054
logger.debug("Start getting shelterluv contacts from people table")
5155

5256
while has_more:
53-
r = requests.get("http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset),
54-
headers={"x-api-key": SHELTERLUV_SECRET_TOKEN})
55-
response = r.json()
57+
if retries > MAX_RETRIES:
58+
raise Exception("reached max retries for get store_shelterluv_people_all")
59+
60+
try:
61+
r = requests.get("http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset),
62+
headers={"x-api-key": SHELTERLUV_SECRET_TOKEN})
63+
except Exception as e:
64+
logger.error("store_shelterluv_people_all failed with %s, retrying...", e)
65+
retries += 1
66+
continue
67+
68+
if r.status_code != 200:
69+
logger.error("store_shelterluv_people_all %s code, retrying...", r.status_code)
70+
retries += 1
71+
continue
72+
73+
try:
74+
response = r.json()
75+
except Exception as e:
76+
logger.error("store_shelterluv_people_all JSON decode failed with %s", e)
77+
retries += 1
78+
continue
79+
5680
for person in response["people"]:
57-
#todo: Does this need more "null checks"?
5881
session.add(ShelterluvPeople(firstname=person["Firstname"],
5982
lastname=person["Lastname"],
6083
id=person["ID"] if "ID" in person else None,
@@ -69,9 +92,11 @@ def store_shelterluv_people_all():
6992
phone=person["Phone"],
7093
animal_ids=person["Animal_ids"]))
7194
offset += LIMIT
95+
retries = 0
7296
has_more = response["has_more"] if not TEST_MODE else response["has_more"] and offset < 1000
7397
if offset % 1000 == 0:
7498
logger.debug("Reading offset %s", str(offset))
99+
time.sleep(0.2)
75100
session.commit()
76101

77102
logger.debug("Finished getting shelterluv contacts from people table")

src/server/api/API_ingest/sl_animal_events.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import os
33
import posixpath as path
4+
import time
45

56
import structlog
67

@@ -25,6 +26,7 @@
2526

2627
BASE_URL = "http://shelterluv.com/api/"
2728
MAX_COUNT = 100 # Max records the API will return for one call
29+
MAX_RETRY = 10
2830

2931
# Get the API key
3032
try:
@@ -75,8 +77,9 @@
7577

7678
def get_event_count():
7779
"""Test that server is operational and get total event count."""
78-
events = "v1/events&offset=0&limit=1"
80+
events = "v1/events?offset=0&limit=1"
7981
URL = path.join(BASE_URL, events)
82+
logger.info("making call: %s", URL)
8083

8184
try:
8285
response = requests.request("GET", URL, headers=headers)
@@ -85,7 +88,7 @@ def get_event_count():
8588
return -2
8689

8790
if response.status_code != 200:
88-
logger.error("get_event_count ", response.status_code, "code")
91+
logger.error("get_event_count status code: %s", response.status_code)
8992
return -3
9093

9194
try:
@@ -111,30 +114,36 @@ def get_events_bulk():
111114

112115
event_records = []
113116

114-
raw_url = path.join(BASE_URL, "v1/events&offset={0}&limit={1}")
117+
raw_url = path.join(BASE_URL, "v1/events?offset={0}&limit={1}")
115118
offset = 0
116119
limit = MAX_COUNT
117120
more_records = True
121+
retries = 0
118122

119123
while more_records:
120124

125+
if retries > MAX_RETRY:
126+
raise Exception("get_events_bulk failed, max retries reached")
121127
url = raw_url.format(offset, limit)
122128

123129
try:
124130
response = requests.request("GET", url, headers=headers)
125131
except Exception as e:
126-
logger.error("get_events failed with ", e)
127-
return -2
132+
logger.error("get_events_buk failed with %s, retrying...", e)
133+
retries += 1
134+
continue
128135

129136
if response.status_code != 200:
130-
logger.error("get_event_count ", response.status_code, "code")
131-
return -3
137+
logger.error("get_events_bulk %s code, retrying...", response.status_code)
138+
retries += 1
139+
continue
132140

133141
try:
134142
decoded = json.loads(response.text)
135143
except json.decoder.JSONDecodeError as e:
136-
logger.error("get_event_count JSON decode failed with", e)
137-
return -4
144+
logger.error("get_events_bulk JSON decode failed with %s", e)
145+
retries += 1
146+
continue
138147

139148
if decoded["success"]:
140149
for evrec in decoded["events"]:
@@ -143,13 +152,14 @@ def get_events_bulk():
143152

144153
more_records = decoded["has_more"] # if so, we'll make another pass
145154
offset += limit
155+
retries = 0
146156
if offset % 1000 == 0:
147157
logger.debug("Reading offset %s", str(offset))
148158
if TEST_MODE and offset > 1000:
149-
more_records=False # Break out early
150-
159+
more_records=False # Break out early
151160
else:
152161
return -5 # AFAICT, this means URL was bad
162+
time.sleep(0.2)
153163

154164
return event_records
155165

src/server/api/API_ingest/updated_data.py

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,56 +13,47 @@ def get_updated_contact_data():
1313
qry = """ -- Collect latest foster/volunteer dates
1414
select json_agg (upd) as "cd"
1515
from (
16-
select
17-
sf.source_id as "Id" , -- long salesforce string
18-
array_agg(sl.source_id) filter (where sl.source_id is not null) as "Person_Id__c", -- short PAWS-local shelterluv id
16+
select
17+
salesforce.source_id as "contactId",
18+
shelterluv.person_ids as "personIds",
1919
case
20-
when
21-
(extract(epoch from now())::bigint - max(foster_out) < 365*86400) -- foster out in last year
22-
or (extract(epoch from now())::bigint - max(foster_return) < 365*86400) -- foster return
23-
then 'Active'
24-
else 'Inactive'
25-
end as "Foster_Activity__c",
26-
max(foster_out) as "Foster_Start_Date__c",
27-
max(foster_return) as "Foster_End_Date__c",
28-
min(vol.first_date) "First_volunteer_date__c",
29-
max(vol.last_date) "Last_volunteer_date__c",
30-
sum(vol.hours) as "Total_volunteer_hours__c",
31-
array_agg(vc.source_id::integer) filter(where vc.source_id is not null) as "Volgistics_Id__c"
20+
when volgistics.last_shift_date > now() - interval '1 year' then 'Active' else 'InActive'
21+
end as "volunteerStatus",
22+
shelterluv.foster_start as "fosterStartDate",
23+
null as "fosterEndDate",
24+
shelterluv.latest_foster_event as "latestFosterEvent",
25+
volgistics.first_volunteer_date as "firstVolunteerDate",
26+
volgistics.last_shift_date as "lastShiftDate",
27+
volgistics.total_hours as "totalVolunteerHours",
28+
volgistics.volg_ids as "volgisticIds"
3229
from (
33-
select source_id, matching_id from pdp_contacts sf
34-
where sf.source_type = 'salesforcecontacts'
35-
) sf
36-
left join pdp_contacts sl on sl.matching_id = sf.matching_id and sl.source_type = 'shelterluvpeople'
30+
select * from pdp_contacts pc where source_type = 'salesforcecontacts'
31+
) salesforce
3732
left join (
38-
select
39-
person_id,
40-
max(case when event_type=1 then time else null end) * 1000 adopt,
41-
max(case when event_type=2 then time else null end) * 1000 foster_out,
42-
-- max(case when event_type=3 then time else null end) rto,
43-
max(case when event_type=5 then time else null end) * 1000 foster_return
44-
from sl_animal_events
45-
group by person_id
46-
) sle on sle.person_id::text = sl.source_id
47-
left join pdp_contacts vc on vc.matching_id = sf.matching_id and vc.source_type = 'volgistics'
33+
select matching_id, array_agg(distinct v."number"::int) volg_ids, sum(hours) total_hours,
34+
min(from_date) first_volunteer_date, max(from_date) last_shift_date
35+
from volgistics v
36+
left join volgisticsshifts v2 on v2.volg_id::varchar = v.number
37+
inner join pdp_contacts pc on pc.source_id = v.number::varchar and pc.source_type = 'volgistics'
38+
group by matching_id
39+
) volgistics on volgistics.matching_id = salesforce.matching_id
4840
left join (
4941
select
50-
volg_id,
51-
sum(hours) as hours,
52-
extract(epoch from min(from_date)) * 1000 as first_date,
53-
extract(epoch from max(from_date)) * 1000 as last_date
54-
from volgisticsshifts
55-
group by volg_id
56-
) vol on vol.volg_id::text = vc.source_id
57-
where sl.matching_id is not null or vc.matching_id is not null
58-
group by sf.source_id
42+
matching_id, array_agg(distinct p.internal_id) as person_ids,
43+
min(case when event_type in (2,5) then to_timestamp(time) else null end) foster_start,
44+
max(case when event_type in (2,5) then to_timestamp(time) else null end) latest_foster_event
45+
from shelterluvpeople p
46+
left join sl_animal_events sae on sae.person_id::varchar = p.internal_id
47+
inner join pdp_contacts pc on pc.source_id = p.internal_id
48+
group by matching_id
49+
) shelterluv on shelterluv.matching_id = salesforce.matching_id
50+
where volgistics.matching_id is not null or shelterluv.matching_id is not null
5951
) upd;
6052
"""
6153

6254
with Session() as session:
6355
result = session.execute(qry)
6456
sfdata = result.fetchone()[0]
6557
if sfdata:
66-
logger.debug(sfdata)
6758
logger.debug("Query for Salesforce update returned %d records", len(sfdata))
6859
return sfdata

src/server/api/internal_api.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,9 @@ def user_test2():
3333

3434
@internal_api.route("/api/internal/ingestRawData", methods=["GET"])
3535
def ingest_raw_data():
36-
try:
37-
ingest_sources_from_api.start()
38-
except Exception as e:
39-
logger.error(e)
40-
36+
ingest_sources_from_api.start()
4137
return jsonify({'outcome': 'OK'}), 200
4238

43-
4439
@internal_api.route("/api/internal/get_updated_data", methods=["GET"])
4540
def get_contact_data():
4641
logger.debug("Calling get_updated_contact_data()")
@@ -49,7 +44,11 @@ def get_contact_data():
4944
logger.debug("Returning %d contact records", len(contact_json))
5045
else:
5146
logger.debug("No contact records found")
52-
return jsonify({'outcome': 'OK'}), 200
47+
return jsonify({
48+
'outcome': 'OK',
49+
'data': contact_json,
50+
'length': len(contact_json) if contact_json else 0
51+
}), 200
5352

5453

5554
@internal_api.route("/api/internal/start_flow", methods=["GET"])

src/server/config.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
1+
import logging
12
import os
23
import sys
3-
import sqlalchemy as db
4-
import models
5-
from constants import IS_LOCAL, BASE_PATH, RAW_DATA_PATH, OUTPUT_PATH, LOGS_PATH, REPORT_PATH, ZIPPED_FILES
64

7-
import logging
5+
import sqlalchemy as db
86
import structlog
97
from structlog.processors import CallsiteParameter
108

9+
from constants import IS_LOCAL, BASE_PATH, RAW_DATA_PATH, OUTPUT_PATH, LOGS_PATH, REPORT_PATH, ZIPPED_FILES
1110

1211
# structlog setup for complete app
1312

@@ -17,7 +16,7 @@
1716
structlog.processors.add_log_level,
1817
structlog.processors.StackInfoRenderer(),
1918
structlog.dev.set_exc_info,
20-
structlog.processors.TimeStamper(fmt=None, utc=True ),
19+
structlog.processors.TimeStamper(fmt="iso", utc=True),
2120
structlog.processors.CallsiteParameterAdder(
2221
[
2322
CallsiteParameter.FILENAME,
@@ -67,7 +66,7 @@
6766
+ POSTGRES_DATABASE
6867
)
6968

70-
engine = db.create_engine(DB)
69+
engine = db.create_engine(DB, pool_pre_ping=True)
7170

7271
# Run Alembic to create managed tables
7372
# from alembic.config import Config

src/server/pub_sub/salesforce_message_publisher.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ def send_pipeline_update_messages(contacts_list):
6060
schema_id = stub.GetTopic(pb2.TopicRequest(topic_name=UPDATE_TOPIC), metadata=auth_meta_data).schema_id
6161
schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schema_id), metadata=auth_meta_data).schema_json
6262

63-
payloads = []
63+
64+
batches = 0
6465
while len(contacts_list) > 0:
6566
if len(contacts_list) > BATCH_SIZE:
6667
current_batch = contacts_list[:BATCH_SIZE]
@@ -85,9 +86,11 @@ def send_pipeline_update_messages(contacts_list):
8586
"schema_id": schema_id,
8687
"payload": buf.getvalue()
8788
}
88-
payloads.append(payload)
8989

90-
stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=payloads), metadata=auth_meta_data)
90+
stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]), metadata=auth_meta_data)
91+
logger.info('Sent %s contacts in message', len(current_batch))
92+
batches = batches + 1
93+
9194

92-
logger.info("%s total pipeline update messages sent", len(payloads))
95+
logger.info('completed sending platform messages, %s messages sent', batches)
9396

Comments (0)