From a96ad306453489e4ee14379f2bc0599c5c93f7fb Mon Sep 17 00:00:00 2001 From: Kostya Date: Thu, 20 Jul 2017 11:04:59 +0200 Subject: [PATCH] GapDetection and resend request logic --- src/fix_engine.ml | 21 ++++++----- src/fix_engine_transitions.ml | 66 ++++++++++++++++++++++++++--------- src/fix_engine_utils.ml | 21 +++++++++-- 3 files changed, 79 insertions(+), 29 deletions(-) diff --git a/src/fix_engine.ml b/src/fix_engine.ml index 763b1ab3..f76624f6 100644 --- a/src/fix_engine.ml +++ b/src/fix_engine.ml @@ -177,7 +177,6 @@ let proc_incoming_fix_msg ( m, engine : full_top_level_msg * fix_engine_state) = | NoActiveSession -> run_no_active_session ( msg, engine ) | LogonInitiated -> run_logon_sequence ( msg, engine ) | ActiveSession -> run_active_session ( msg, engine ) - | GapDetected -> run_recovery ( msg, engine ) | Recovery -> run_recovery ( msg, engine ) | ShutdownInitiated -> run_shutdown ( msg, engine ) | Error -> noop ( msg, engine ) @@ -205,18 +204,17 @@ let is_int_message_valid ( engine : fix_engine_state ) = ;; (** The main transition function. *) -let one_step ( engine : fix_engine_state ) = +let one_step ( engine : fix_engine_state ) = + match engine.fe_curr_mode with (** Check if we're in the middle of replaying our cache. *) - if engine.fe_curr_mode = CacheReplay then - run_cache_replay (engine) - + | CacheReplay -> run_cache_replay (engine) + (** If gap is detected -- we'll send resend request and move to recovery mode. *) + | GapDetected -> run_gap_detected (engine) (** If we still need to retransmit our messages out to the receiving engine. *) - else if engine.fe_curr_mode = Retransmit then - run_retransmit (engine) - - else - (** Now we look to process internal (coming from our application) and external (coming from - another FIX engine) messages. *) + | Retransmit -> run_retransmit (engine) + (** Now we look to process internal (coming from our application) and external (coming from + another FIX engine) messages. *) + | _ -> begin match engine.incoming_int_msg with | Some i -> proc_incoming_int_msg (i, { engine with incoming_int_msg = None } ) | None -> @@ -225,4 +223,5 @@ let one_step ( engine : fix_engine_state ) = | Some m -> proc_incoming_fix_msg (m, { engine with incoming_fix_msg = None } ) | None -> engine end + end ;; diff --git a/src/fix_engine_transitions.ml b/src/fix_engine_transitions.ml index d6d8421b..f77805f8 100644 --- a/src/fix_engine_transitions.ml +++ b/src/fix_engine_transitions.ml @@ -69,17 +69,26 @@ let run_no_active_session ( m, engine : full_valid_fix_msg * fix_engine_state ) fe_curr_status = MaxNumLogonMsgsViolated; } else begin - let engine' = { engine with fe_encrypt_method = d.ln_encrypt_method } in - let logon_msg = create_logon_msg ( engine' ) in { - engine' with - fe_initiator = Some false; - outgoing_fix_msg = Some (ValidMsg ( logon_msg )); - outgoing_seq_num = engine.outgoing_seq_num + 1; - fe_target_comp_id = m.full_msg_header.h_sender_comp_id; - fe_curr_mode = ActiveSession; - fe_last_time_data_sent = engine.fe_curr_time; - fe_num_logons_sent = engine.fe_num_logons_sent + 1; - fe_history = add_msg_to_history ( engine.fe_history, logon_msg ); + let engine' = { engine with fe_encrypt_method = d.ln_encrypt_method } in + let logon_msg = create_logon_msg ( engine' ) in + let engine'' = { engine' with + fe_initiator = Some false; + (* TODO -- check if we really have to accept all incoming senders *) + outgoing_fix_msg = Some (ValidMsg ( logon_msg )); + outgoing_seq_num = engine.outgoing_seq_num + 1; + fe_target_comp_id = m.full_msg_header.h_sender_comp_id; + fe_last_time_data_sent = engine.fe_curr_time; + fe_num_logons_sent = engine.fe_num_logons_sent + 1; + fe_history = add_msg_to_history ( engine.fe_history, logon_msg ); + } in + if msg_consistent ( engine, m.full_msg_header ) + then { engine'' with + fe_curr_mode = ActiveSession; + incoming_seq_num = m.full_msg_header.h_msg_seq_num; + fe_history = add_msg_to_history ( engine.fe_history, logon_msg ); + } else { engine'' with + incoming_seq_num = engine.incoming_seq_num + 1; + fe_curr_mode = GapDetected; } end end @@ -141,12 +150,11 @@ let run_active_session ( m, engine : full_valid_fix_msg * fix_engine_state ) = if not ( msg_consistent ( engine, m.full_msg_header ) ) then { (** We've detected an out-of sequence message. We therefore need to - transition into Recovery mode and initialize engine state with - the message. *) + transition into GapDetected mode. We place the message into the cahce. *) engine with - fe_curr_mode = Recovery; + fe_curr_mode = GapDetected; + incoming_seq_num = engine.incoming_seq_num + 1; fe_cache = [ m ]; - } else match m.full_msg_data with | Full_FIX_Admin_Msg adm_msg -> @@ -205,6 +213,21 @@ let run_active_session ( m, engine : full_valid_fix_msg * fix_engine_state ) = end ;; + +let run_gap_detected ( engine : fix_engine_state ) = + let resend_msg = create_resend_request_msg (engine) in + { engine with + fe_curr_mode = Recovery; + fe_last_time_data_sent = engine.fe_curr_time; + outgoing_fix_msg = Some ( ValidMsg ( resend_msg ) ); + fe_history = add_msg_to_history ( engine.fe_history, resend_msg ); + outgoing_seq_num = engine.outgoing_seq_num + 1; + } +;; + + + + (** Here we can only handle a subset of the FIX messages. *) let replay_single_msg ( m, engine : full_valid_fix_msg * fix_engine_state ) = match m.full_msg_data with @@ -272,9 +295,20 @@ let rec add_to_cache ( m, cache : full_valid_fix_msg * full_valid_fix_msg list ) m::x::xs else ( x :: ( add_to_cache (m, xs) ) ) ;; -(** We're in recovery mode. We should add any received messages to our cache. +(** We're in recovery mode. We should add any (except logoff) received messages to our cache. Check to see whether next message is complete. *) let run_recovery ( m, engine : full_valid_fix_msg * fix_engine_state ) = + match m.full_msg_data with + | Full_FIX_Admin_Msg (Full_Msg_Logoff data) -> + let logoff_msg = create_logoff_msg ( engine ) in { + engine with + fe_last_time_data_sent = engine.fe_curr_time; + fe_curr_mode = ShutdownInitiated; + outgoing_fix_msg = Some ( ValidMsg ( logoff_msg )); + outgoing_seq_num = engine.outgoing_seq_num + 1; + fe_history = add_msg_to_history ( engine.fe_history, logoff_msg ); + } + | _ -> let new_cache = add_to_cache (m, engine.fe_cache) in if is_cache_complete (new_cache, engine.incoming_seq_num) then { engine with diff --git a/src/fix_engine_utils.ml b/src/fix_engine_utils.ml index 8087c704..f3876c3c 100644 --- a/src/fix_engine_utils.ml +++ b/src/fix_engine_utils.ml @@ -118,7 +118,7 @@ let create_outbound_fix_msg ( osn, target_comp_id, our_comp_id, curr_time, msg, h_begin_string = default_session_details.constant_begin_string; h_body_length = 0; h_msg_seq_num = osn + 1; - h_poss_dup_flag = Some is_duplicate; + h_poss_dup_flag = if is_duplicate then Some is_duplicate else None; h_target_comp_id = target_comp_id; h_sender_comp_id = our_comp_id; h_sending_time = curr_time; @@ -163,7 +163,7 @@ let create_logon_msg ( engine : fix_engine_state ) = ln_heartbeat_interval = engine.fe_heartbeat_interval; ln_raw_data_length = None; ln_raw_data = None; - ln_reset_seq_num_flag = Some true; + ln_reset_seq_num_flag = None; (* Some true; *) ln_next_expected_msg_seq_num = None; ln_max_message_size = None; @@ -223,6 +223,23 @@ let create_test_request_msg ( engine : fix_engine_state ) = ) ;; +(** Create Resend Request message. *) +let create_resend_request_msg ( engine : fix_engine_state ) = + let msg_data = Full_FIX_Admin_Msg ( + Full_Msg_Resend_Request { + rr_begin_seq_num = engine.incoming_seq_num; + rr_end_seq_num = 0 (* (represents infinity) *) + } + ) in + create_outbound_fix_msg ( + engine.outgoing_seq_num, engine.fe_target_comp_id, + engine.fe_comp_id, engine.fe_curr_time, + msg_data, false + ) +;; + + + (** Create session-rejection message. *) let create_session_reject_msg ( outbound_seq_num, target_comp_id, comp_id, curr_time, reject_info : int * fix_string * fix_string * fix_utctimestamp * session_rejected_msg_data ) =