Skip to content

Commit 9738e96

Browse files
author
Bryan Jen
authored
Merge pull request #45 from davejohnston/FFM-5352
[FFM-5352]: Stream errors cause the SDK to spam ff server requests
2 parents 86832f8 + ed62139 commit 9738e96

File tree

4 files changed

+50
-20
lines changed

4 files changed

+50
-20
lines changed

featureflags/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def run(self):
7575
token=self._auth_token,
7676
config=self._config,
7777
ready=streaming_event,
78+
poller=polling_event,
7879
cluster=self._cluster,
7980
)
8081
self._stream.start()

featureflags/polling.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ def __init__(self, client: AuthenticatedClient, config: Config,
2929
def run(self):
3030
if not self.__running:
3131
if self.__config.pull_interval < 60:
32-
log.warning("Pull Interval must be greater than or equal "
33-
"to 60 seconds, was: " +
32+
log.warning("Pull Interval must be greater than or equal to "
33+
"60 seconds, was: " +
3434
str(self.__config.pull_interval) +
3535
" setting to 60")
3636
self.__config.pull_interval = 60
@@ -46,14 +46,16 @@ def run(self):
4646
t2.start()
4747
t1.join()
4848
t2.join()
49-
if not self.__ready.is_set() is True:
50-
log.info("PollingProcessor initialized ok")
51-
if self.__config.enable_stream and \
52-
not self.__stream_ready.is_set():
53-
log.debug('Poller is in pause mode...')
54-
self.__ready.wait()
55-
else:
56-
self.__ready.set()
49+
50+
if self.__config.enable_stream and \
51+
self.__stream_ready.is_set():
52+
log.debug('Poller will be paused because' +
53+
' streaming mode is active')
54+
# Block until ready.set() is called
55+
self.__ready.wait()
56+
log.debug('Poller resuming ')
57+
else:
58+
self.__ready.set()
5759
except Exception as e:
5860
log.exception(
5961
'Error: Exception encountered when polling flags. %s',
@@ -62,6 +64,9 @@ def run(self):
6264

6365
elapsed = time.time() - start_time
6466
if elapsed < self.__config.pull_interval:
67+
log.info("Poller sleeping for " +
68+
(self.__config.pull_interval - elapsed).__str__())
69+
" seconds"
6570
time.sleep(self.__config.pull_interval - elapsed)
6671

6772
def stop(self):

featureflags/streaming.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
1+
import random
22
import threading
3+
import time
34
from threading import Thread
45
from typing import List, Union
56

67
from featureflags.repository import DataProviderInterface
7-
88
from .api.client import AuthenticatedClient
99
from .api.default.get_feature_config_by_identifier import \
1010
sync as get_feature_config
@@ -14,18 +14,23 @@
1414
from .sse_client import SSEClient
1515
from .util import log
1616

17+
BACK_OFF_IN_SECONDS = 5
18+
1719

1820
class StreamProcessor(Thread):
1921
def __init__(self, repository: DataProviderInterface,
2022
client: AuthenticatedClient,
2123
environment_id: str, api_key: str, token: str,
22-
config: Config, ready: threading.Event,
24+
config: Config,
25+
ready: threading.Event,
26+
poller: threading.Event,
2327
cluster: str):
2428

2529
Thread.__init__(self)
2630
self.daemon = True
2731
self._running = False
2832
self._ready = ready
33+
self.poller = poller
2934
self._client = client
3035
self._environment_id = environment_id
3136
self._api_key = api_key
@@ -39,10 +44,14 @@ def run(self):
3944
log.info("Starting StreamingProcessor connecting to uri: " +
4045
self._stream_url)
4146
self._running = True
42-
47+
retries = 0
4348
while self._running:
4449
try:
4550
messages = self._connect()
51+
self.poller.clear() # were streaming now, so tell any poller
52+
# threads calling wait to wait...
53+
self._ready.set()
54+
retries = 0 # reset the retry counter
4655
for msg in messages:
4756
if not self._running:
4857
break
@@ -53,7 +62,17 @@ def run(self):
5362
if self._ready.is_set() is False:
5463
self._ready.set()
5564
except Exception as e:
56-
log.warning("Unexpected error on stream connection: %s", e)
65+
log.error("Unexpected error on stream connection: %s", e)
66+
# Signal the poller than it should start due to stream error.
67+
if self.poller.is_set() is False:
68+
self.poller.set()
69+
70+
# Calculate back of sleep
71+
sleep = (BACK_OFF_IN_SECONDS * 2 ** retries +
72+
random.uniform(0, 1))
73+
log.info(f"retrying stream connection in {sleep.__str__()}s")
74+
time.sleep(sleep)
75+
retries += 1
5776

5877
def _connect(self) -> SSEClient:
5978
return SSEClient(self._stream_url, headers={

tests/unit/test_evaluator.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
from featureflags.evaluations.auth_target import Target
44
from featureflags.evaluations.clause import Clause
5-
from featureflags.evaluations.constants import EQUAL_OPERATOR, STARTS_WITH_OPERATOR
5+
from featureflags.evaluations.constants import EQUAL_OPERATOR, \
6+
STARTS_WITH_OPERATOR
67
from featureflags.evaluations.distribution import Distribution
78
from featureflags.evaluations.enum import FeatureState
89
from featureflags.evaluations.evaluator import Evaluator
@@ -193,10 +194,14 @@ def test_evaluate_clauses(data_provider, target):
193194
)
194195

195196
testcases = [
196-
{"scenario": "Evaluate clauses with no clauses", "input": [], "expected": False},
197-
{"scenario": "Evaluate clauses with 2 Clauses both will match", "input": [clause1, clause2], "expected": True},
198-
{"scenario": "Evaluate clauses with 2 Clauses only 1 match", "input": [clause1, clause3], "expected": True},
199-
{"scenario": "Evaluate clauses with 2 Clauses but no match", "input": [clause3, clause4], "expected": False}
197+
{"scenario": "Evaluate clauses with no clauses",
198+
"input": [], "expected": False},
199+
{"scenario": "Evaluate clauses with 2 Clauses both will match",
200+
"input": [clause1, clause2], "expected": True},
201+
{"scenario": "Evaluate clauses with 2 Clauses only 1 match",
202+
"input": [clause1, clause3], "expected": True},
203+
{"scenario": "Evaluate clauses with 2 Clauses but no match",
204+
"input": [clause3, clause4], "expected": False}
200205
]
201206

202207
for tc in testcases:

0 commit comments

Comments
 (0)