Skip to content

Commit 22cbe7b

Browse files
#2368 - ChannelBroker updates timeout_at upon contact
- Add `current_delay` into Respondent's sessions, to track the minutes we're currently waiting until the next timeout. - Fixed a bug in which SMS replies during the last contact attempt refreshed the timeout_at with the fallback_delay instead of the last delay used. - Improved the test suite by better synchronising the test code with the other processes. - Deleted best_timeout_option helper and integrated it into consume_retry were it belongs - Remove ChannelBroker.has_delivery_confirmation? and Runtime.Channel.has_delivery_confirmation? since they could be pattern-matched instead of messaging processes. - Fixed a bug in the RetriesHistogram that made the last SMS contact appear as the fallback_delay instead of the last delay used. Fixes #2368
1 parent 7d93bc5 commit 22cbe7b

19 files changed

Lines changed: 2472 additions & 1978 deletions

lib/ask.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ defmodule Ask do
5757
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
5858
# for other strategies and supported options
5959
opts = [strategy: :one_for_one, name: Ask.Supervisor]
60-
{:ok, _} = Logger.add_backend(Sentry.LoggerBackend)
60+
if Mix.env() != :test do
61+
{:ok, _} = Logger.add_backend(Sentry.LoggerBackend)
62+
end
6163

6264
Supervisor.start_link(children, opts)
6365
end

lib/ask/channel.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ defmodule Ask.Channel do
119119
end
120120
end
121121

122+
# Returns whether the channel sends delivery confirmations callbacks or not.
123+
def has_delivery_confirmation?(%{type: "ivr"}), do: false
124+
def has_delivery_confirmation?(%{type: "sms"}), do: true
125+
122126
defp validate_patterns(changeset) do
123127
changeset
124128
|> validate_patterns_not_empty

lib/ask/retries_histogram.ex

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,12 @@ defmodule Ask.RetriesHistogram do
228228

229229
defp end_flow(survey, flow) do
230230
fallback_delay = survey |> Survey.fallback_delay() |> minutes_to_hours()
231-
[%{type: type}] = Enum.take(flow, -1)
231+
[%{type: type, delay: last_delay}] = Enum.take(flow, -1)
232232

233-
end_flow_delay(type, fallback_delay)
233+
# if last SMS delay is 0h, there was no retry - we default to the fallback_delay
234+
# See Session.current_timeout/2 for reference
235+
delay = if last_delay == 0, do: fallback_delay, else: last_delay
236+
end_flow_delay(type, delay)
234237
end
235238

236239
defp end_flow_delay("ivr", _), do: []

lib/ask/runtime/channel.ex

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@ defprotocol Ask.Runtime.Channel do
33

44
@type t :: map()
55

6-
# Returns whether the channel sends delivery confirmations callbacks or not.
7-
@spec has_delivery_confirmation?(t()) :: boolean
8-
def has_delivery_confirmation?(channel)
9-
106
# Configure the channel to start communicating with Surveda. For example setup
117
# the callback URL on the remote service.
128
@spec prepare(t()) :: any()

lib/ask/runtime/channel_broker.ex

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
# NOTE: channels without channel_id (used in some unit tests) share a single process (channel_id: 0)
22
defmodule Ask.Runtime.ChannelBroker do
3-
alias Ask.Runtime.{ChannelBrokerSupervisor, SurveyLogger}
3+
alias Ask.Runtime.{ChannelBrokerSupervisor, Session, SurveyLogger}
44
alias Ask.Runtime.ChannelBrokerAgent, as: Agent
55
alias Ask.Runtime.ChannelBrokerState, as: State
6-
alias Ask.{Channel, Logger, Respondent, Repo, Stats}
6+
alias Ask.{Channel, Logger, Respondent, Repo, Stats, SystemTime}
77
import Ecto.Query
88
use GenServer
99

@@ -32,10 +32,6 @@ defmodule Ask.Runtime.ChannelBroker do
3232
cast(channel_id, {:setup, channel_type, respondent, token, not_before, not_after})
3333
end
3434

35-
def has_delivery_confirmation?(channel_id) do
36-
call(channel_id, {:has_delivery_confirmation?})
37-
end
38-
3935
def ask(channel_id, channel_type, respondent, token, reply, not_before \\ nil, not_after \\ nil) do
4036
cast(channel_id, {:ask, channel_type, respondent, token, reply, not_before, not_after})
4137
end
@@ -286,14 +282,6 @@ defmodule Ask.Runtime.ChannelBroker do
286282
{:reply, reply, new_state, State.process_timeout(new_state)}
287283
end
288284

289-
@impl true
290-
def handle_call({:has_delivery_confirmation?}, _from, state) do
291-
debug("handle_call[has_delivery_confirmation?]", channel_id: state.channel_id)
292-
new_state = refresh_runtime_channel(state)
293-
reply = Ask.Runtime.Channel.has_delivery_confirmation?(new_state.runtime_channel)
294-
{:reply, reply, new_state, State.process_timeout(new_state)}
295-
end
296-
297285
if Mix.env() == :test do
298286
@impl true
299287
def handle_call({:has_queued_message?, respondent_id}, _from, state) do
@@ -458,7 +446,7 @@ defmodule Ask.Runtime.ChannelBroker do
458446
end
459447

460448
defp log_contact(status, respondent) do
461-
session = respondent.session |> Ask.Runtime.Session.load()
449+
session = respondent.session |> Session.load()
462450
SurveyLogger.log(
463451
respondent.survey_id,
464452
session.flow.mode,
@@ -478,6 +466,8 @@ defmodule Ask.Runtime.ChannelBroker do
478466
state.runtime_channel
479467
|> Ask.Runtime.Channel.setup(respondent, token, not_before, not_after)
480468

469+
update_respondent_timeout(respondent)
470+
481471
case response do
482472
{:ok, %{verboice_call_id: verboice_call_id}} ->
483473
channel_state = %{"verboice_call_id" => verboice_call_id}
@@ -501,6 +491,8 @@ defmodule Ask.Runtime.ChannelBroker do
501491
state.runtime_channel
502492
|> Ask.Runtime.Channel.ask(respondent, token, reply, state.channel_id)
503493

494+
update_respondent_timeout(respondent)
495+
504496
case result do
505497
{:ok, %{nuntium_token: nuntium_token}} ->
506498
channel_state = %{"nuntium_token" => nuntium_token}
@@ -513,6 +505,16 @@ defmodule Ask.Runtime.ChannelBroker do
513505
end
514506
end
515507

508+
if Mix.env() == :test do
509+
defp update_respondent_timeout(%{session: nil} = respondent), do: respondent
510+
end
511+
512+
defp update_respondent_timeout(respondent) do
513+
session = respondent.session |> Session.load()
514+
timeout_at = Respondent.next_actual_timeout(respondent, session.current_delay, SystemTime.time().now)
515+
Respondent.update(respondent, %{timeout_at: timeout_at}, true)
516+
end
517+
516518
# Don't schedule automatic GC runs in tests.
517519
if Mix.env() == :test do
518520
defp schedule_GC(_), do: nil

lib/ask/runtime/nuntium_channel.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,6 @@ defmodule Ask.Runtime.NuntiumChannel do
469469

470470
def message_inactive?(_, _), do: false
471471

472-
def has_delivery_confirmation?(_), do: true
473472
def has_queued_message?(_, _), do: false
474473
def message_expired?(_, _), do: false
475474
def cancel_message(_, _), do: :ok

lib/ask/runtime/retries_histogram.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ defmodule Ask.Runtime.RetriesHistogram do
4141
%Session{} = session = Session.load(respondent.session)
4242

4343
session =
44-
reallocate_respondent(session, respondent, false, Session.current_timeout(session))
44+
reallocate_respondent(session, respondent, false, session.current_delay)
4545

4646
session.respondent
4747
end
@@ -56,7 +56,7 @@ defmodule Ask.Runtime.RetriesHistogram do
5656
def retry(session) do
5757
callback = fn ->
5858
%Session{respondent: %Respondent{} = respondent} = session
59-
reallocate_respondent(session, respondent, ivr?(session), Session.current_timeout(session))
59+
reallocate_respondent(session, respondent, ivr?(session), session.current_delay)
6060
end
6161

6262
run_safe(%{
@@ -146,7 +146,7 @@ defmodule Ask.Runtime.RetriesHistogram do
146146
) do
147147
# sms -> transition to active RetryStat
148148
if respondent.retry_stat_id,
149-
do: reallocate_respondent(session, respondent, false, Session.current_timeout(session))
149+
do: reallocate_respondent(session, respondent, false, session.current_delay)
150150
end
151151

152152
defp do_next_step(_respondent, _session, {:reply, _reply, _}) do

lib/ask/runtime/session.ex

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ defmodule Ask.Runtime.Session do
33
import Ecto
44

55
alias Ask.{
6+
Channel,
67
Repo,
78
QuotaBucket,
89
Respondent,
@@ -42,6 +43,7 @@ defmodule Ask.Runtime.Session do
4243
:flow,
4344
:respondent,
4445
:token,
46+
:current_delay,
4547
:fallback_delay,
4648
:count_partial_results,
4749
:schedule
@@ -69,16 +71,16 @@ defmodule Ask.Runtime.Session do
6971
) do
7072
flow = Flow.start(questionnaire, mode)
7173

74+
session_fallback_delay = fallback_delay || Survey.default_fallback_delay()
7275
session = %Session{
7376
current_mode: SessionModeProvider.new(mode, channel, retries),
7477
fallback_mode: SessionModeProvider.new(fallback_mode, fallback_channel, fallback_retries),
7578
flow: flow,
7679
respondent: update_section_order(respondent, flow.section_order, persist),
77-
fallback_delay: fallback_delay || Survey.default_fallback_delay(),
80+
fallback_delay: session_fallback_delay,
7881
count_partial_results: count_partial_results,
7982
schedule: schedule
8083
}
81-
8284
run_flow(session, persist)
8385
end
8486

@@ -105,25 +107,23 @@ defmodule Ask.Runtime.Session do
105107
{:ok, session, %Reply{}, base_timeout(session) + current_timeout(session)}
106108

107109
true ->
108-
timeout(session, nil)
110+
do_timeout(session)
109111
end
110112
end
111113

112-
def timeout(%{current_mode: %{retries: []}, fallback_mode: nil} = session, _) do
114+
defp do_timeout(%{current_mode: %{retries: []}, fallback_mode: nil} = session) do
113115
session = %{session | respondent: RetriesHistogram.remove_respondent(session.respondent)}
114116
terminate(session)
115117
end
116118

117-
def timeout(%{current_mode: %{retries: []}} = session, _) do
119+
defp do_timeout(%{current_mode: %{retries: []}} = session) do
118120
switch_to_fallback_mode(session)
119121
end
120122

121-
def timeout(%Session{} = session, _) do
122-
best_timeout_option = best_timeout_option(session)
123+
defp do_timeout(%Session{} = session) do
123124
session = retry(session)
124125

125-
# The new session will timeout as defined by hd(retries)
126-
{:ok, session, %Reply{}, best_timeout_option || current_timeout(session)}
126+
{:ok, session, %Reply{}, session.current_delay}
127127
end
128128

129129
@doc """
@@ -215,11 +215,11 @@ defmodule Ask.Runtime.Session do
215215
Respondent.update(respondent, %{section_order: section_order}, persist)
216216
end
217217

218-
def current_timeout(%Session{current_mode: %{retries: []}, fallback_delay: fallback_delay}) do
218+
defp current_timeout(%Session{current_mode: %{retries: []}, fallback_delay: fallback_delay}) do
219219
fallback_delay
220220
end
221221

222-
def current_timeout(%Session{current_mode: %{retries: [next_retry | _]}}) do
222+
defp current_timeout(%Session{current_mode: %{retries: [next_retry | _]}}) do
223223
next_retry
224224
end
225225

@@ -335,6 +335,7 @@ defmodule Ask.Runtime.Session do
335335
flow: session.flow |> Flow.dump(),
336336
respondent_id: session.respondent.id,
337337
token: session.token,
338+
current_delay: session.current_delay,
338339
fallback_delay: session.fallback_delay,
339340
count_partial_results: session.count_partial_results,
340341
schedule: session.schedule |> Schedule.dump!()
@@ -348,6 +349,7 @@ defmodule Ask.Runtime.Session do
348349
flow: Flow.load(state["flow"]),
349350
respondent: Repo.get(Ask.Respondent, state["respondent_id"]),
350351
token: state["token"],
352+
current_delay: state["current_delay"],
351353
fallback_delay: state["fallback_delay"],
352354
count_partial_results: state["count_partial_results"],
353355
schedule: state["schedule"] |> Schedule.load!()
@@ -601,6 +603,14 @@ defmodule Ask.Runtime.Session do
601603
|> add_mode_attempt.()
602604

603605
mode_start(session)
606+
|> update_current_delay
607+
end
608+
609+
defp update_current_delay({:ok, session, reply, timeout}) do
610+
{:ok, %{session | current_delay: timeout}, reply, timeout}
611+
end
612+
defp update_current_delay({:end, _, _} = flow_result) do
613+
flow_result
604614
end
605615

606616
defp apply_patterns_if_match(patterns, respondent, persist) do
@@ -628,7 +638,7 @@ defmodule Ask.Runtime.Session do
628638
defp log_prompts(reply, channel, mode, respondent, force \\ false, persist \\ true) do
629639
if persist do
630640
if force ||
631-
!ChannelBroker.has_delivery_confirmation?(channel.id) do
641+
!Channel.has_delivery_confirmation?(channel) do
632642
disposition = Reply.disposition(reply) || respondent.disposition
633643

634644
Enum.each(Reply.steps(reply), fn step ->
@@ -712,12 +722,6 @@ defmodule Ask.Runtime.Session do
712722
%{session | token: nil}
713723
end
714724

715-
defp best_timeout_option(%{current_mode: %{retries: retries}, fallback_mode: nil})
716-
when length(retries) == 1,
717-
do: hd(retries)
718-
719-
defp best_timeout_option(_), do: nil
720-
721725
defp terminate(%{current_mode: %SMSMode{}, respondent: respondent}) do
722726
{:failed, respondent}
723727
end
@@ -757,12 +761,19 @@ defmodule Ask.Runtime.Session do
757761
result
758762
end
759763

764+
defp consume_retry(%{current_mode: %{retries: [retry]}, fallback_mode: nil} = session) do
765+
%{session | current_mode: %{session.current_mode | retries: []}, current_delay: retry}
766+
end
767+
760768
defp consume_retry(%{current_mode: %{retries: [_ | retries]}} = session) do
761-
%{session | current_mode: %{session.current_mode | retries: retries}}
769+
session = %{session | current_mode: %{session.current_mode | retries: retries}}
770+
current_delay = current_timeout(session)
771+
%{session | current_delay: current_delay}
762772
end
763773

764774
defp consume_retry(%{current_mode: %{retries: []}} = session) do
765-
session
775+
current_delay = current_timeout(session)
776+
%{session | current_delay: current_delay}
766777
end
767778

768779
defp add_session_mode_attempt!(%Session{} = session),
@@ -831,7 +842,7 @@ defmodule Ask.Runtime.Session do
831842
persist
832843
)
833844

834-
{:ok, %{session | flow: flow}, reply, current_timeout(session)}
845+
{:ok, %{session | flow: flow}, reply, session.current_delay}
835846
end
836847
end
837848

@@ -841,7 +852,7 @@ defmodule Ask.Runtime.Session do
841852
{:failed, session.respondent}
842853

843854
_ ->
844-
{:hangup, %{session | flow: flow}, reply, current_timeout(session), session.respondent}
855+
{:hangup, %{session | flow: flow}, reply, session.current_delay, session.respondent}
845856
end
846857
end
847858

lib/ask/runtime/survey_broker.ex

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -223,22 +223,6 @@ defmodule Ask.Runtime.SurveyBroker do
223223
)
224224
end
225225

226-
def recontact_queued_respondents(respondent_ids) do
227-
# We're recontacting respondants that were queued
228-
# in channel broker after it fails
229-
Repo.all(
230-
from r in Respondent,
231-
select: r.id,
232-
where: r.id in ^respondent_ids
233-
)
234-
|> Enum.each(fn respondent_id ->
235-
respondent = Respondent |> Repo.get(respondent_id)
236-
session = respondent.session |> Session.load()
237-
# We have loaded everything neccesary, now contact them without consuming a retry
238-
Ask.Runtime.Session.contact_respondent(session)
239-
end)
240-
end
241-
242226
defp retry_respondents(now) do
243227
# Select projects that have respondents to retry, then retry them respecting the project's batch limit
244228
Repo.all(

lib/ask/runtime/verboice_channel.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,6 @@ defmodule Ask.Runtime.VerboiceChannel do
479479
def check_status(_), do: :up
480480

481481
defimpl Ask.Runtime.Channel, for: Ask.Runtime.VerboiceChannel do
482-
def has_delivery_confirmation?(_), do: false
483482
def ask(_, _, _, _, _), do: throw(:not_implemented)
484483
def prepare(_), do: :ok
485484
def messages_count(_, _, _, _, _), do: 1

0 commit comments

Comments
 (0)