Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds spacing between lines and fixes constant backoffs duration #7

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions tap_hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,36 @@ def get_params_and_headers(params):
return params, headers


def log_rate_limit(resp):
"""
Prints out the content for the rate limits headers in the response.
"""
if resp.status_code == 429:
for header in [
"X-HubSpot-RateLimit-Daily",
"X-HubSpot-RateLimit-Daily-Remaining",
"X-HubSpot-RateLimit-Interval-Milliseconds",
"X-HubSpot-RateLimit-Max",
"X-HubSpot-RateLimit-Remaining"
]:
LOGGER.info("Header: {}, value: {}".format(
header,
resp.headers.get(header)
))
LOGGER.info("429 response from path: {} - {}".format(
resp.url,
resp.content
))


@backoff.on_exception(backoff.constant,
(requests.exceptions.RequestException,
requests.exceptions.HTTPError),
max_tries=5,
max_tries=10,
jitter=None,
giveup=giveup,
on_giveup=on_giveup,
interval=10)
interval=20)
def request(url, params=None):

params, headers = get_params_and_headers(params)
Expand All @@ -306,6 +328,7 @@ def request(url, params=None):
LOGGER.info("GET %s", req.url)
with metrics.http_request_timer(parse_source_from_url(url)) as timer:
resp = SESSION.send(req)
log_rate_limit(resp)
timer.tags[metrics.Tag.http_status_code] = resp.status_code
if resp.status_code == 403:
raise SourceUnavailableException(resp.content)
Expand Down Expand Up @@ -336,11 +359,11 @@ def lift_properties_and_versions(record):
@backoff.on_exception(backoff.constant,
(requests.exceptions.RequestException,
requests.exceptions.HTTPError),
max_tries=5,
max_tries=10,
jitter=None,
giveup=giveup,
on_giveup=on_giveup,
interval=10)
interval=20)
def post_search_endpoint(url, data, params=None):

params, headers = get_params_and_headers(params)
Expand All @@ -353,6 +376,7 @@ def post_search_endpoint(url, data, params=None):
params=params,
headers=headers
)
log_rate_limit(resp)

resp.raise_for_status()

Expand Down Expand Up @@ -442,7 +466,7 @@ def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys,
def _sync_contact_vids(catalog, vids, schema, bumble_bee):
if len(vids) == 0:
return

data = request(get_url("contacts_detail"), params={'vid': vids, 'showListMemberships' : True, "formSubmissionMode" : "all"}).json()
time_extracted = utils.now()
mdata = metadata.to_map(catalog.get('metadata'))
Expand All @@ -460,7 +484,7 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee):
def _sync_subscription_types(catalog, subscribers_emails, schema, bumble_bee):
if len(subscribers_emails) == 0:
return

for subscriber_email in subscribers_emails:
record = request(get_url("subscription_types", subscriber_email=subscriber_email)).json()
time_extracted = utils.now()
Expand Down Expand Up @@ -497,7 +521,7 @@ def sync_contacts(STATE, ctx):
while not break_loop:
data = request(get_url("list_contacts_recent", list_id=list_id), params=params).json()
for record in data.get("contacts", []):

if record["addedAt"] >= int(start.timestamp()*1000):
if record['vid'] not in newly_added:
newly_added.append(record["vid"])
Expand All @@ -509,7 +533,7 @@ def sync_contacts(STATE, ctx):
if len(vids) == 100:
_sync_contact_vids(catalog, vids, schema, bumble_bee)
vids = []

if data["has-more"] and not break_loop:
params["vidOffset"] = data["vid-offset"]
params["timeOffset"] = data["time-offset"]
Expand Down