Skip to content

Commit

Permalink
GapDetection and resend request logic
Browse files Browse the repository at this point in the history
  • Loading branch information
KKostya committed Jul 20, 2017
1 parent d383119 commit a96ad30
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 29 deletions.
21 changes: 10 additions & 11 deletions src/fix_engine.ml
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ let proc_incoming_fix_msg ( m, engine : full_top_level_msg * fix_engine_state) =
| NoActiveSession -> run_no_active_session ( msg, engine )
| LogonInitiated -> run_logon_sequence ( msg, engine )
| ActiveSession -> run_active_session ( msg, engine )
| GapDetected -> run_recovery ( msg, engine )
| Recovery -> run_recovery ( msg, engine )
| ShutdownInitiated -> run_shutdown ( msg, engine )
| Error -> noop ( msg, engine )
Expand Down Expand Up @@ -205,18 +204,17 @@ let is_int_message_valid ( engine : fix_engine_state ) =
;;

(** The main transition function. *)
let one_step ( engine : fix_engine_state ) =
let one_step ( engine : fix_engine_state ) =
match engine.fe_curr_mode with
(** Check if we're in the middle of replaying our cache. *)
if engine.fe_curr_mode = CacheReplay then
run_cache_replay (engine)

| CacheReplay -> run_cache_replay (engine)
(** If gap is detected -- we'll send resend request and move to recovery mode. *)
| GapDetected -> run_gap_detected (engine)
(** If we still need to retransmit our messages out to the receiving engine. *)
else if engine.fe_curr_mode = Retransmit then
run_retransmit (engine)

else
(** Now we look to process internal (coming from our application) and external (coming from
another FIX engine) messages. *)
| Retransmit -> run_retransmit (engine)
(** Now we look to process internal (coming from our application) and external (coming from
another FIX engine) messages. *)
| _ -> begin
match engine.incoming_int_msg with
| Some i -> proc_incoming_int_msg (i, { engine with incoming_int_msg = None } )
| None ->
Expand All @@ -225,4 +223,5 @@ let one_step ( engine : fix_engine_state ) =
| Some m -> proc_incoming_fix_msg (m, { engine with incoming_fix_msg = None } )
| None -> engine
end
end
;;
66 changes: 50 additions & 16 deletions src/fix_engine_transitions.ml
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,26 @@ let run_no_active_session ( m, engine : full_valid_fix_msg * fix_engine_state )
fe_curr_status = MaxNumLogonMsgsViolated;
} else
begin
let engine' = { engine with fe_encrypt_method = d.ln_encrypt_method } in
let logon_msg = create_logon_msg ( engine' ) in {
engine' with
fe_initiator = Some false;
outgoing_fix_msg = Some (ValidMsg ( logon_msg ));
outgoing_seq_num = engine.outgoing_seq_num + 1;
fe_target_comp_id = m.full_msg_header.h_sender_comp_id;
fe_curr_mode = ActiveSession;
fe_last_time_data_sent = engine.fe_curr_time;
fe_num_logons_sent = engine.fe_num_logons_sent + 1;
fe_history = add_msg_to_history ( engine.fe_history, logon_msg );
let engine' = { engine with fe_encrypt_method = d.ln_encrypt_method } in
let logon_msg = create_logon_msg ( engine' ) in
let engine'' = { engine' with
fe_initiator = Some false;
(* TODO -- check if we really have to accept all incoming senders *)
outgoing_fix_msg = Some (ValidMsg ( logon_msg ));
outgoing_seq_num = engine.outgoing_seq_num + 1;
fe_target_comp_id = m.full_msg_header.h_sender_comp_id;
fe_last_time_data_sent = engine.fe_curr_time;
fe_num_logons_sent = engine.fe_num_logons_sent + 1;
fe_history = add_msg_to_history ( engine.fe_history, logon_msg );
} in
if msg_consistent ( engine, m.full_msg_header )
then { engine'' with
fe_curr_mode = ActiveSession;
incoming_seq_num = m.full_msg_header.h_msg_seq_num;
fe_history = add_msg_to_history ( engine.fe_history, logon_msg );
} else { engine'' with
incoming_seq_num = engine.incoming_seq_num + 1;
fe_curr_mode = GapDetected;
}
end
end
Expand Down Expand Up @@ -141,12 +150,11 @@ let run_active_session ( m, engine : full_valid_fix_msg * fix_engine_state ) =

if not ( msg_consistent ( engine, m.full_msg_header ) ) then {
(** We've detected an out-of sequence message. We therefore need to
transition into Recovery mode and initialize engine state with
the message. *)
transition into GapDetected mode. We place the message into the cahce. *)
engine with
fe_curr_mode = Recovery;
fe_curr_mode = GapDetected;
incoming_seq_num = engine.incoming_seq_num + 1;
fe_cache = [ m ];

} else
match m.full_msg_data with
| Full_FIX_Admin_Msg adm_msg ->
Expand Down Expand Up @@ -205,6 +213,21 @@ let run_active_session ( m, engine : full_valid_fix_msg * fix_engine_state ) =
end
;;


let run_gap_detected ( engine : fix_engine_state ) =
let resend_msg = create_resend_request_msg (engine) in
{ engine with
fe_curr_mode = Recovery;
fe_last_time_data_sent = engine.fe_curr_time;
outgoing_fix_msg = Some ( ValidMsg ( resend_msg ) );
fe_history = add_msg_to_history ( engine.fe_history, resend_msg );
outgoing_seq_num = engine.outgoing_seq_num + 1;
}
;;




(** Here we can only handle a subset of the FIX messages. *)
let replay_single_msg ( m, engine : full_valid_fix_msg * fix_engine_state ) =
match m.full_msg_data with
Expand Down Expand Up @@ -272,9 +295,20 @@ let rec add_to_cache ( m, cache : full_valid_fix_msg * full_valid_fix_msg list )
m::x::xs else ( x :: ( add_to_cache (m, xs) ) )
;;

(** We're in recovery mode. We should add any received messages to our cache.
(** We're in recovery mode. We should add any (except logoff) received messages to our cache.
Check to see whether next message is complete. *)
let run_recovery ( m, engine : full_valid_fix_msg * fix_engine_state ) =
match m.full_msg_data with
| Full_FIX_Admin_Msg (Full_Msg_Logoff data) ->
let logoff_msg = create_logoff_msg ( engine ) in {
engine with
fe_last_time_data_sent = engine.fe_curr_time;
fe_curr_mode = ShutdownInitiated;
outgoing_fix_msg = Some ( ValidMsg ( logoff_msg ));
outgoing_seq_num = engine.outgoing_seq_num + 1;
fe_history = add_msg_to_history ( engine.fe_history, logoff_msg );
}
| _ ->
let new_cache = add_to_cache (m, engine.fe_cache) in
if is_cache_complete (new_cache, engine.incoming_seq_num) then {
engine with
Expand Down
21 changes: 19 additions & 2 deletions src/fix_engine_utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ let create_outbound_fix_msg ( osn, target_comp_id, our_comp_id, curr_time, msg,
h_begin_string = default_session_details.constant_begin_string;
h_body_length = 0;
h_msg_seq_num = osn + 1;
h_poss_dup_flag = Some is_duplicate;
h_poss_dup_flag = if is_duplicate then Some is_duplicate else None;
h_target_comp_id = target_comp_id;
h_sender_comp_id = our_comp_id;
h_sending_time = curr_time;
Expand Down Expand Up @@ -163,7 +163,7 @@ let create_logon_msg ( engine : fix_engine_state ) =
ln_heartbeat_interval = engine.fe_heartbeat_interval;
ln_raw_data_length = None;
ln_raw_data = None;
ln_reset_seq_num_flag = Some true;
ln_reset_seq_num_flag = None; (* Some true; *)
ln_next_expected_msg_seq_num = None;
ln_max_message_size = None;

Expand Down Expand Up @@ -223,6 +223,23 @@ let create_test_request_msg ( engine : fix_engine_state ) =
)
;;

(** Create Resend Request message. *)
let create_resend_request_msg ( engine : fix_engine_state ) =
let msg_data = Full_FIX_Admin_Msg (
Full_Msg_Resend_Request {
rr_begin_seq_num = engine.incoming_seq_num;
rr_end_seq_num = 0 (* (represents infinity) *)
}
) in
create_outbound_fix_msg (
engine.outgoing_seq_num, engine.fe_target_comp_id,
engine.fe_comp_id, engine.fe_curr_time,
msg_data, false
)
;;



(** Create session-rejection message. *)
let create_session_reject_msg ( outbound_seq_num, target_comp_id, comp_id, curr_time, reject_info :
int * fix_string * fix_string * fix_utctimestamp * session_rejected_msg_data ) =
Expand Down

0 comments on commit a96ad30

Please sign in to comment.