diff --git a/examples/1_hello_world/hello_client.py b/examples/1_hello_world/hello_client.py index 4c0d231..0b22a24 100644 --- a/examples/1_hello_world/hello_client.py +++ b/examples/1_hello_world/hello_client.py @@ -2,12 +2,14 @@ from intersect_sdk import ( INTERSECT_JSON_VALUE, + ControlPlaneConfig, IntersectClient, IntersectClientCallback, IntersectClientConfig, IntersectDirectMessageParams, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) @@ -42,16 +44,18 @@ def simple_client_callback( In most cases, everything under from_config_file should come from a configuration file, command line arguments, or environment variables. """ - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] """ step two: construct the initial messages you want to send. In this case we will only send a single starting message. @@ -73,7 +77,7 @@ def simple_client_callback( ] config = IntersectClientConfig( initial_message_event_config=IntersectClientCallback(messages_to_send=initial_messages), - **from_config_file, + brokers=brokers, ) """ diff --git a/examples/1_hello_world/hello_service.py b/examples/1_hello_world/hello_service.py index 1b91819..e43f3cf 100644 --- a/examples/1_hello_world/hello_service.py +++ b/examples/1_hello_world/hello_service.py @@ -1,6 +1,7 @@ import logging from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectService, @@ -9,6 +10,7 @@ intersect_message, intersect_status, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -44,16 +46,19 @@ def say_hello_to_name(self, name: str) -> str: In most cases, everything under from_config_file should come from a configuration file, command line arguments, or environment variables. """ - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] + config = IntersectServiceConfig( hierarchy=HierarchyConfig( organization='hello-organization', @@ -62,7 +67,7 @@ def say_hello_to_name(self, name: str) -> str: subsystem='hello-subsystem', service='hello-service', ), - **from_config_file, + brokers=brokers, ) """ diff --git a/examples/1_hello_world_amqp/hello_client.py b/examples/1_hello_world_amqp/hello_client.py index 688a19b..d036a29 100644 --- a/examples/1_hello_world_amqp/hello_client.py +++ b/examples/1_hello_world_amqp/hello_client.py @@ -2,12 +2,14 @@ from intersect_sdk import ( INTERSECT_JSON_VALUE, + ControlPlaneConfig, IntersectClient, IntersectClientCallback, IntersectClientConfig, IntersectDirectMessageParams, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logging.getLogger('pika').setLevel(logging.WARNING) @@ -43,16 +45,18 @@ def simple_client_callback( In most cases, everything under from_config_file should come from a configuration file, command line arguments, or environment variables. """ - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 5672, - 'protocol': 'amqp0.9.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 5672}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='amqp0.9.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] """ step two: construct the initial messages you want to send. In this case we will only send a single starting message. @@ -75,7 +79,7 @@ def simple_client_callback( ] config = IntersectClientConfig( initial_message_event_config=IntersectClientCallback(messages_to_send=initial_messages), - **from_config_file, + brokers=brokers, ) """ diff --git a/examples/1_hello_world_amqp/hello_service.py b/examples/1_hello_world_amqp/hello_service.py index 6b4369c..96c98a9 100644 --- a/examples/1_hello_world_amqp/hello_service.py +++ b/examples/1_hello_world_amqp/hello_service.py @@ -1,6 +1,7 @@ import logging from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectService, @@ -9,6 +10,7 @@ intersect_message, intersect_status, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logging.getLogger('pika').setLevel(logging.WARNING) @@ -45,16 +47,19 @@ def say_hello_to_name(self, name: str) -> str: In most cases, everything under from_config_file should come from a configuration file, command line arguments, or environment variables. """ - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 5672, - 'protocol': 'amqp0.9.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 5672}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='amqp0.9.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] + config = IntersectServiceConfig( hierarchy=HierarchyConfig( organization='hello-organization', @@ -63,7 +68,7 @@ def say_hello_to_name(self, name: str) -> str: subsystem='hello-subsystem', service='hello-service', ), - **from_config_file, + brokers=brokers, ) """ diff --git a/examples/1_hello_world_events/hello_client.py b/examples/1_hello_world_events/hello_client.py index 3b1081b..d872d79 100644 --- a/examples/1_hello_world_events/hello_client.py +++ b/examples/1_hello_world_events/hello_client.py @@ -2,12 +2,14 @@ from intersect_sdk import ( INTERSECT_JSON_VALUE, + ControlPlaneConfig, IntersectClient, IntersectClientCallback, IntersectClientConfig, IntersectDirectMessageParams, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) @@ -63,16 +65,18 @@ def simple_event_callback( In most cases, everything under from_config_file should come from a configuration file, command line arguments, or environment variables. """ - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] """ step two: construct the initial messages you want to send. In this case we will only send a single starting message. @@ -99,7 +103,7 @@ def simple_event_callback( 'hello-organization.hello-facility.hello-system.hello-subsystem.hello-service' ], ), - **from_config_file, + brokers=brokers, ) """ diff --git a/examples/1_hello_world_events/hello_service.py b/examples/1_hello_world_events/hello_service.py index 90f3beb..10204b8 100644 --- a/examples/1_hello_world_events/hello_service.py +++ b/examples/1_hello_world_events/hello_service.py @@ -1,6 +1,7 @@ import logging from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectEventDefinition, @@ -10,6 +11,7 @@ intersect_message, intersect_status, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -48,16 +50,18 @@ def say_hello_to_name(self, name: str) -> str: In most cases, everything under from_config_file should come from a configuration file, command line arguments, or environment variables. """ - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] config = IntersectServiceConfig( hierarchy=HierarchyConfig( organization='hello-organization', @@ -66,7 +70,7 @@ def say_hello_to_name(self, name: str) -> str: subsystem='hello-subsystem', service='hello-service', ), - **from_config_file, + brokers=brokers, ) """ diff --git a/examples/1_hello_world_minio/hello_client.py b/examples/1_hello_world_minio/hello_client.py index a78d1fd..9b3ef37 100644 --- a/examples/1_hello_world_minio/hello_client.py +++ b/examples/1_hello_world_minio/hello_client.py @@ -2,6 +2,7 @@ from intersect_sdk import ( INTERSECT_JSON_VALUE, + ControlPlaneConfig, IntersectClient, IntersectClientCallback, IntersectClientConfig, @@ -9,6 +10,7 @@ IntersectDirectMessageParams, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) @@ -43,6 +45,20 @@ def simple_client_callback( In most cases, everything under from_config_file should come from a configuration file, command line arguments, or environment variables. """ + + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] + from_config_file = { # NOTE: for this example, you will need a MINIO instance configured at this stage. 'data_stores': { @@ -53,15 +69,7 @@ def simple_client_callback( 'port': 9000, }, ], - }, - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], + } } """ step two: construct the initial messages you want to send. In this case we will only send a single starting message. @@ -86,6 +94,7 @@ def simple_client_callback( ] config = IntersectClientConfig( initial_message_event_config=IntersectClientCallback(messages_to_send=initial_messages), + brokers=brokers, **from_config_file, ) diff --git a/examples/1_hello_world_minio/hello_service.py b/examples/1_hello_world_minio/hello_service.py index d0b034a..e204ca5 100644 --- a/examples/1_hello_world_minio/hello_service.py +++ b/examples/1_hello_world_minio/hello_service.py @@ -1,6 +1,7 @@ import logging from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectDataHandler, @@ -10,6 +11,7 @@ intersect_message, intersect_status, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -57,15 +59,21 @@ def say_hello_to_name(self, name: str) -> str: }, ], }, - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], } + + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] + config = IntersectServiceConfig( hierarchy=HierarchyConfig( organization='hello-organization', @@ -74,6 +82,7 @@ def say_hello_to_name(self, name: str) -> str: subsystem='hello-subsystem', service='hello-service', ), + brokers=brokers, **from_config_file, ) diff --git a/examples/2_counting/counting_client.py b/examples/2_counting/counting_client.py index fe23cc5..98fa98c 100644 --- a/examples/2_counting/counting_client.py +++ b/examples/2_counting/counting_client.py @@ -4,12 +4,14 @@ from intersect_sdk import ( INTERSECT_JSON_VALUE, + ControlPlaneConfig, IntersectClient, IntersectClientCallback, IntersectClientConfig, IntersectDirectMessageParams, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) @@ -135,16 +137,18 @@ def client_callback( if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] # The counter will start after the initial message. # If the service is already active and counting, this may do nothing. @@ -157,7 +161,7 @@ def client_callback( ] config = IntersectClientConfig( initial_message_event_config=IntersectClientCallback(messages_to_send=initial_messages), - **from_config_file, + brokers=brokers, ) orchestrator = SampleOrchestrator() client = IntersectClient( diff --git a/examples/2_counting/counting_service.py b/examples/2_counting/counting_service.py index a02f4b9..8d272e5 100644 --- a/examples/2_counting/counting_service.py +++ b/examples/2_counting/counting_service.py @@ -8,6 +8,7 @@ from typing_extensions import Annotated from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectService, @@ -16,6 +17,7 @@ intersect_message, intersect_status, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -163,16 +165,18 @@ def _run_count(self) -> None: if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] config = IntersectServiceConfig( hierarchy=HierarchyConfig( organization='counting-organization', @@ -182,7 +186,7 @@ def _run_count(self) -> None: service='counting-service', ), status_interval=30.0, - **from_config_file, + brokers=brokers, ) capability = CountingServiceCapabilityImplementation() service = IntersectService([capability], config) diff --git a/examples/2_counting_events/counting_client.py b/examples/2_counting_events/counting_client.py index f04194a..a89e730 100644 --- a/examples/2_counting_events/counting_client.py +++ b/examples/2_counting_events/counting_client.py @@ -2,11 +2,13 @@ from intersect_sdk import ( INTERSECT_JSON_VALUE, + ControlPlaneConfig, IntersectClient, IntersectClientCallback, IntersectClientConfig, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) @@ -53,16 +55,18 @@ def event_callback( if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] # start listening to events from the counting service config = IntersectClientConfig( @@ -71,7 +75,7 @@ def event_callback( 'counting-organization.counting-facility.counting-system.counting-subsystem.counting-service', ] ), - **from_config_file, + brokers=brokers, ) orchestrator = SampleOrchestrator() client = IntersectClient( diff --git a/examples/2_counting_events/counting_service.py b/examples/2_counting_events/counting_service.py index 2d6d06d..7f5fc0c 100644 --- a/examples/2_counting_events/counting_service.py +++ b/examples/2_counting_events/counting_service.py @@ -3,6 +3,7 @@ import time from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectEventDefinition, @@ -11,6 +12,7 @@ default_intersect_lifecycle_loop, intersect_event, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -54,16 +56,18 @@ def increment_counter_function(self) -> None: if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] config = IntersectServiceConfig( hierarchy=HierarchyConfig( organization='counting-organization', @@ -73,7 +77,7 @@ def increment_counter_function(self) -> None: service='counting-service', ), status_interval=30.0, - **from_config_file, + brokers=brokers, ) capability = CountingServiceCapabilityImplementation() service = IntersectService([capability], config) diff --git a/examples/3_ping_pong_events/ping_pong_client.py b/examples/3_ping_pong_events/ping_pong_client.py index 2f2b476..fb533bf 100644 --- a/examples/3_ping_pong_events/ping_pong_client.py +++ b/examples/3_ping_pong_events/ping_pong_client.py @@ -2,11 +2,13 @@ from intersect_sdk import ( INTERSECT_JSON_VALUE, + ControlPlaneConfig, IntersectClient, IntersectClientCallback, IntersectClientConfig, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) @@ -56,16 +58,20 @@ def event_callback( if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + from intersect_sdk.config.shared import BrokerConfig + + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] # we initially only listen for ping service events, config = IntersectClientConfig( @@ -74,7 +80,7 @@ def event_callback( PING_SERVICE, ] ), - **from_config_file, + brokers=brokers, ) orchestrator = SampleOrchestrator() client = IntersectClient( diff --git a/examples/3_ping_pong_events/service_runner.py b/examples/3_ping_pong_events/service_runner.py index d3738e0..ae1ba4c 100644 --- a/examples/3_ping_pong_events/service_runner.py +++ b/examples/3_ping_pong_events/service_runner.py @@ -4,12 +4,14 @@ from abc import ABC, abstractmethod from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectService, IntersectServiceConfig, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logger = logging.getLogger(__name__) @@ -27,16 +29,18 @@ def run_service(capability: P_ngBaseCapabilityImplementation) -> None: The interesting configuration mostly happens in the Client, look at that one for details. """ - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] service_name = capability.__class__.__name__[:4].lower() config = IntersectServiceConfig( hierarchy=HierarchyConfig( @@ -46,8 +50,8 @@ def run_service(capability: P_ngBaseCapabilityImplementation) -> None: subsystem='p-ng-subsystem', service=f'{service_name}-service', ), + brokers=brokers, status_interval=30.0, - **from_config_file, ) service = IntersectService([capability], config) logger.info('Starting %s_service, use Ctrl+C to exit.', service_name) diff --git a/examples/3_ping_pong_events_amqp/ping_pong_client.py b/examples/3_ping_pong_events_amqp/ping_pong_client.py index e055086..71be318 100644 --- a/examples/3_ping_pong_events_amqp/ping_pong_client.py +++ b/examples/3_ping_pong_events_amqp/ping_pong_client.py @@ -2,11 +2,13 @@ from intersect_sdk import ( INTERSECT_JSON_VALUE, + ControlPlaneConfig, IntersectClient, IntersectClientCallback, IntersectClientConfig, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logging.getLogger('pika').setLevel(logging.WARNING) @@ -60,16 +62,18 @@ def event_callback( if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 5672, - 'protocol': 'amqp0.9.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] # we initially only listen for ping service events, config = IntersectClientConfig( @@ -78,7 +82,7 @@ def event_callback( PING_SERVICE, ] ), - **from_config_file, + brokers=brokers, ) orchestrator = SampleOrchestrator() client = IntersectClient( diff --git a/examples/3_ping_pong_events_amqp/service_runner.py b/examples/3_ping_pong_events_amqp/service_runner.py index 4c9ff16..ae1ba4c 100644 --- a/examples/3_ping_pong_events_amqp/service_runner.py +++ b/examples/3_ping_pong_events_amqp/service_runner.py @@ -4,12 +4,14 @@ from abc import ABC, abstractmethod from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectService, IntersectServiceConfig, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logger = logging.getLogger(__name__) @@ -27,16 +29,18 @@ def run_service(capability: P_ngBaseCapabilityImplementation) -> None: The interesting configuration mostly happens in the Client, look at that one for details. """ - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 5672, - 'protocol': 'amqp0.9.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] service_name = capability.__class__.__name__[:4].lower() config = IntersectServiceConfig( hierarchy=HierarchyConfig( @@ -46,8 +50,8 @@ def run_service(capability: P_ngBaseCapabilityImplementation) -> None: subsystem='p-ng-subsystem', service=f'{service_name}-service', ), + brokers=brokers, status_interval=30.0, - **from_config_file, ) service = IntersectService([capability], config) logger.info('Starting %s_service, use Ctrl+C to exit.', service_name) diff --git a/examples/4_service_to_service/example_1_service.py b/examples/4_service_to_service/example_1_service.py index 63e365a..69ff754 100644 --- a/examples/4_service_to_service/example_1_service.py +++ b/examples/4_service_to_service/example_1_service.py @@ -3,6 +3,7 @@ import logging from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectDirectMessageParams, @@ -14,6 +15,7 @@ intersect_message, intersect_status, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -65,16 +67,18 @@ def additional_service_handler( if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] config = IntersectServiceConfig( hierarchy=HierarchyConfig( organization='example-organization', @@ -84,7 +88,7 @@ def additional_service_handler( service='service-one', ), status_interval=30.0, - **from_config_file, + brokers=brokers, ) capability = ExampleServiceOneCapabilityImplementation() service = IntersectService([capability], config) diff --git a/examples/4_service_to_service/example_2_service.py b/examples/4_service_to_service/example_2_service.py index 18ea230..c45734a 100644 --- a/examples/4_service_to_service/example_2_service.py +++ b/examples/4_service_to_service/example_2_service.py @@ -3,6 +3,7 @@ import logging from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectService, @@ -11,6 +12,7 @@ intersect_message, intersect_status, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -33,16 +35,18 @@ def test_service(self, text: str) -> str: if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] config = IntersectServiceConfig( hierarchy=HierarchyConfig( organization='example-organization', @@ -52,7 +56,7 @@ def test_service(self, text: str) -> str: service='service-two', ), status_interval=30.0, - **from_config_file, + brokers=brokers, ) capability = ExampleServiceTwoCapabilityImplementation() service = IntersectService([capability], config) diff --git a/examples/4_service_to_service/example_client.py b/examples/4_service_to_service/example_client.py index 685e765..e7dd092 100644 --- a/examples/4_service_to_service/example_client.py +++ b/examples/4_service_to_service/example_client.py @@ -9,12 +9,14 @@ from intersect_sdk import ( INTERSECT_JSON_VALUE, + ControlPlaneConfig, IntersectClient, IntersectClientCallback, IntersectClientConfig, IntersectDirectMessageParams, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -47,16 +49,18 @@ def event_callback( if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 1883, - 'protocol': 'mqtt3.1.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] # The counter will start after the initial message. # If the service is already active and counting, this may do nothing. @@ -74,7 +78,7 @@ def event_callback( 'example-organization.example-facility.example-system.example-subsystem.service-one' ], ), - **from_config_file, + brokers=brokers, ) orchestrator = SampleOrchestrator() client = IntersectClient( diff --git a/examples/4_service_to_service_events/example_client.py b/examples/4_service_to_service_events/example_client.py index 1f8198f..652b840 100644 --- a/examples/4_service_to_service_events/example_client.py +++ b/examples/4_service_to_service_events/example_client.py @@ -8,11 +8,13 @@ from intersect_sdk import ( INTERSECT_JSON_VALUE, + ControlPlaneConfig, IntersectClient, IntersectClientCallback, IntersectClientConfig, default_intersect_lifecycle_loop, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -48,16 +50,18 @@ def event_callback( if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 5672, - 'protocol': 'amqp0.9.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] # Listen for an event on the exposed service config = IntersectClientConfig( @@ -66,7 +70,7 @@ def event_callback( 'example-organization.example-facility.example-system.example-subsystem.exposed-service' ], ), - **from_config_file, + brokers=brokers, ) orchestrator = SampleOrchestrator() client = IntersectClient( diff --git a/examples/4_service_to_service_events/exposed_service.py b/examples/4_service_to_service_events/exposed_service.py index 73a46e4..04c4d62 100644 --- a/examples/4_service_to_service_events/exposed_service.py +++ b/examples/4_service_to_service_events/exposed_service.py @@ -7,6 +7,7 @@ import logging from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectEventDefinition, @@ -15,6 +16,7 @@ default_intersect_lifecycle_loop, intersect_event, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -60,16 +62,18 @@ def on_internal_service_event( if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 5672, - 'protocol': 'amqp0.9.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] config = IntersectServiceConfig( hierarchy=HierarchyConfig( organization='example-organization', @@ -79,7 +83,7 @@ def on_internal_service_event( service='exposed-service', ), status_interval=30.0, - **from_config_file, + brokers=brokers, ) capability = ExposedServiceCapabilityImplementation() service = IntersectService([capability], config) diff --git a/examples/4_service_to_service_events/internal_service.py b/examples/4_service_to_service_events/internal_service.py index fdfc9c4..4217a71 100644 --- a/examples/4_service_to_service_events/internal_service.py +++ b/examples/4_service_to_service_events/internal_service.py @@ -9,6 +9,7 @@ import time from intersect_sdk import ( + ControlPlaneConfig, HierarchyConfig, IntersectBaseCapabilityImplementation, IntersectEventDefinition, @@ -17,6 +18,7 @@ default_intersect_lifecycle_loop, intersect_event, ) +from intersect_sdk.config.shared import BrokerConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -43,16 +45,18 @@ def internal_service_event_generator(self) -> str: if __name__ == '__main__': - from_config_file = { - 'brokers': [ - { - 'username': 'intersect_username', - 'password': 'intersect_password', - 'port': 5672, - 'protocol': 'amqp0.9.1', - }, - ], - } + broker_configs = [ + {'host': 'localhost', 'port': 1883}, + ] + + brokers = [ + ControlPlaneConfig( + protocol='mqtt3.1.1', + username='intersect_username', + password='intersect_password', + brokers=[BrokerConfig(**broker) for broker in broker_configs], + ) + ] config = IntersectServiceConfig( hierarchy=HierarchyConfig( organization='example-organization', @@ -62,7 +66,7 @@ def internal_service_event_generator(self) -> str: service='internal-service', ), status_interval=30.0, - **from_config_file, + brokers=brokers, ) capability = InternalServiceCapabilityImplementation() service = IntersectService([capability], config) diff --git a/src/intersect_sdk/__init__.py b/src/intersect_sdk/__init__.py index 9eb584d..7aabe22 100644 --- a/src/intersect_sdk/__init__.py +++ b/src/intersect_sdk/__init__.py @@ -47,6 +47,7 @@ 'INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE', 'INTERSECT_JSON_VALUE', 'INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE', + 'BrokerConfig', 'ControlPlaneConfig', 'ControlProvider', 'DataStoreConfig', diff --git a/src/intersect_sdk/_internal/control_plane/brokers/amqp_client.py b/src/intersect_sdk/_internal/control_plane/brokers/amqp_client.py index ad34bc1..92641cb 100644 --- a/src/intersect_sdk/_internal/control_plane/brokers/amqp_client.py +++ b/src/intersect_sdk/_internal/control_plane/brokers/amqp_client.py @@ -10,7 +10,9 @@ from __future__ import annotations import functools +import random import threading +import time from hashlib import sha384 from typing import TYPE_CHECKING, Callable @@ -28,11 +30,73 @@ from pika.frame import Frame from pika.spec import Basic, BasicProperties + from ....config.shared import BrokerConfig from ..topic_handler import TopicHandler _AMQP_MAX_RETRIES = 10 + +class ClusterConnectionParameters: + """Configuration for an AMQP cluster. + + Attributes: + brokers: List of broker configurations for AMQP cluster nodes + username: AMQP broker username + password: AMQP broker password + connection_attempts: Number of connection attempts per node + connection_retry_delay: Delay between connection attempts in seconds + """ + + def __init__( + self, + brokers: list[BrokerConfig], + username: str, + password: str, + connection_attempts: int = 3, + connection_retry_delay: float = 2.0, + ) -> None: + """Initialize cluster connection parameters. + + Args: + brokers: List of broker configurations for AMQP cluster nodes + username: AMQP broker username + password: AMQP broker password + connection_attempts: Number of connection attempts per node + connection_retry_delay: Delay between connection attempts in seconds + """ + self.brokers = brokers + self.username = username + self.password = password + self.connection_attempts = connection_attempts + self.connection_retry_delay = connection_retry_delay + + def get_random_node(self) -> BrokerConfig: + """Get a random node from the cluster. + + Returns: + A BrokerConfig randomly selected from the available cluster nodes + """ + index = random.randint(0, len(self.brokers) - 1) # noqa: S311 + return self.brokers[index] + + def get_next_node(self, previous_index: int | None = None) -> tuple[int, BrokerConfig]: + """Get the next node in a round-robin fashion. + + Args: + previous_index: Index of the previously used node + + Returns: + A tuple containing (index, BrokerConfig) of the next node to try + """ + if previous_index is None or previous_index >= len(self.brokers) - 1: + next_index = 0 + else: + next_index = previous_index + 1 + + return next_index, self.brokers[next_index] + + # Note that we deliberately do NOT want this configurable at runtime. Any two INTERSECT services/clients could potentially exchange messages between one another. _INTERSECT_MESSAGE_EXCHANGE = 'intersect-messages' """All INTERSECT messages get published to one exchange on the broker.""" @@ -64,14 +128,33 @@ def _amqp_2_hierarchy(amqp_routing_key: str) -> str: return amqp_routing_key.replace('.', '/') +def _check_broker_connectivity(host: str, port: int, timeout: int = 2.0) -> bool: + """Check if we can connect to a broker host:port. + + Returns True if reachable, False otherwise. + """ + try: + import socket + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + result = sock.connect_ex((host, port)) + sock.close() + return result == 0 # noqa: TRY300 annoyingly, lint complains about this if I put it in an if/else also. It's circular in suggestions. + except socket.gaierror: + # Handle name resolution errors specially + return False + return True + + class AMQPClient(BrokerClient): """Client for performing broker actions backed by a AMQP broker. NOTE: Currently, thread safety has been attempted, but may not be guaranteed Attributes: - id: A string representation of the client's UUID. - _connection_params: connection information to the AMQP broker (includes credentials) + username: Username for broker authentication. + password: Password for broker authentication. _publish_connection: AMQP connection dedicated to publishing messages _consume_connection: AMQP connection dedicated to consuming messages _topics_to_handlers: Dictionary of string topic names to lists of @@ -80,28 +163,19 @@ class AMQPClient(BrokerClient): def __init__( self, - host: str, - port: int, - username: str, - password: str, topics_to_handlers: Callable[[], dict[str, TopicHandler]], + cluster_params: ClusterConnectionParameters, ) -> None: """The default constructor. Args: - host: String for hostname of AMQP broker - port: port number of AMQP broker username: username credentials for AMQP broker password: password credentials for AMQP broker topics_to_handlers: callback function which gets the topic to handler map from the channel manager + cluster_params: Optional cluster configuration parameters. If provided, brokers/username/password are ignored. """ - self._connection_params = pika.ConnectionParameters( - host=host, - port=port, - virtual_host='/', - credentials=pika.PlainCredentials(username, password), - connection_attempts=3, - ) + self._cluster_params: ClusterConnectionParameters = cluster_params + self._current_broker_index: int = None # The pika connection to the broker self._connection: pika.adapters.SelectConnection = None @@ -109,6 +183,7 @@ def __init__( self._channel_out: Channel = None self._thread: threading.Thread | None = None + self._node_reconnect_thread: threading.Thread | None = None # Callback to the topics_to_handler list inside of self._topics_to_handlers = topics_to_handlers @@ -120,6 +195,7 @@ def __init__( self._unrecoverable = False # tracking both channels is the best way to handle continuations self._channel_flags = MultiFlagThreadEvent(2) + self._cluster_connection_attempts = 0 def connect(self) -> None: """Connect to the defined broker. @@ -127,13 +203,25 @@ def connect(self) -> None: Try to connect to the broker, performing exponential backoff if connection fails. """ self._should_disconnect = False + self._unrecoverable = False + self._connection_retries = 0 self._channel_flags.unset_all() - if not self._thread: - self._thread = threading.Thread(target=self._init_connection, daemon=True) + self._current_broker_index = 0 + logger.info(f'Starting with broker index {self._current_broker_index} (first broker)') + + if not self._thread or not self._thread.is_alive(): + self._thread = threading.Thread( + target=self._init_connection, daemon=True, name='amqp_init_connection' + ) self._thread.start() + timeout = 30.0 + start_time = time.time() while not self._channel_flags.is_set(): + if time.time() - start_time > timeout: + logger.error(f'Timeout waiting for AMQP connection after {timeout} seconds') + break self._channel_flags.wait(1.0) def disconnect(self) -> None: @@ -156,11 +244,16 @@ def is_connected(self) -> bool: Returns: A boolean. True if there is a connection, False if not. """ - # We are connected to the broker if either the publish or consume connections is open - return self._connection is not None and ( - not self._connection.is_closed or not self._connection.is_closing + # We need to check both the connection and at least one channel + is_conn_ok = self._connection is not None and not self._connection.is_closed + has_channel = (self._channel_in is not None and self._channel_in.is_open) or ( + self._channel_out is not None and self._channel_out.is_open ) + # Only consider ourselves connected if we have both a connection and at least one channel + logger.debug(f'Connection status: conn={is_conn_ok}, channel={has_channel}') + return is_conn_ok and has_channel + def considered_unrecoverable(self) -> bool: return self._unrecoverable @@ -233,62 +326,297 @@ def _cancel_consumer_tag_cb(self, _frame: pika.frame.Frame, topic: str) -> None: # BEGIN CALLBACKS + THREADSAFE FUNCTIONS # def _init_connection(self) -> None: - """Open the consuming connection and start its io loop. - - NOTE: ANY functions which are not eventually called from this function - should be called via self._connection.ioloop.add_callback_threadsafe(cb) - """ - while not self._should_disconnect: - self._connection = pika.adapters.SelectConnection( - parameters=self._connection_params, - on_close_callback=self._on_connection_closed, - on_open_error_callback=self._on_connection_open_error, - on_open_callback=self._on_connection_open, - ) + """Open the consuming connection and start its io loop.""" + # Start with a clean state + self._connection = None + self._channel_in = None + self._channel_out = None - # Loops forever until ioloop.stop is called WHEN self._should_disconnect is True - self._connection.ioloop.start() + # Main connection loop - run until shutdown or all brokers fail + while not self._should_disconnect and not self._unrecoverable: + try: + # Initialize broker index if needed + if self._current_broker_index is None: + self._current_broker_index = 0 + + # Get current broker info + broker = self._cluster_params.brokers[self._current_broker_index] + logger.warning( + f'====== TRYING BROKER {self._current_broker_index}: {broker.host}:{broker.port} ======' + ) + + # Check server availability with TCP connection test first + try: + import socket + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1.0) + + # Try to connect directly with TCP + result = sock.connect_ex((broker.host, broker.port)) + sock.close() + + # If we can't connect at TCP level, try next broker right away + if result != 0: + logger.warning( + f'✗ TCP connectivity test failed for {broker.host}:{broker.port} (error: {result})' + ) + + # Move to next broker if we have multiple + if len(self._cluster_params.brokers) > 1: + self._current_broker_index = (self._current_broker_index + 1) % len( + self._cluster_params.brokers + ) + logger.warning( + f'Switching to next broker (index {self._current_broker_index})' + ) + time.sleep(0.5) # Brief delay to avoid hammering servers + continue + # No other brokers to try, pause longer before retry + logger.warning( + 'No alternative brokers available, retrying same one in a moment...' + ) + time.sleep(2.0) + continue + except Exception as e: # noqa: BLE001 I think I can clean this up later but for now catching it all. + # Handle DNS resolution or other socket-related errors gracefully + logger.warning( + f'✗ Socket connection error for {broker.host}:{broker.port}: {e!s}' + ) + + # If we already have an active connection to another broker, don't disrupt it + if self.is_connected(): + logger.info( + f'Already connected to another broker, will retry {broker.host} later' + ) + time.sleep(5.0) # Longer delay when already connected + continue + + # Otherwise, try the next broker + if len(self._cluster_params.brokers) > 1: + self._current_broker_index = (self._current_broker_index + 1) % len( + self._cluster_params.brokers + ) + logger.warning( + f'Socket error, switching to broker {self._current_broker_index}' + ) + time.sleep(0.5) + continue + logger.warning('Socket error with only broker, waiting to retry...') + time.sleep(2.0) + continue + + # TCP connection successful, now try AMQP connection + logger.info( + f'✓ TCP connection succeeded for {broker.host}:{broker.port}, now trying AMQP connection' + ) + + self._connection = pika.adapters.SelectConnection( + parameters=pika.ConnectionParameters( + host=broker.host, + port=broker.port, + virtual_host='/', + credentials=pika.PlainCredentials( + self._cluster_params.username, self._cluster_params.password + ), + connection_attempts=1, # Only try once + heartbeat=10, + retry_delay=0.5, + ), + on_close_callback=self._on_connection_closed, + on_open_error_callback=self._on_connection_open_error, + on_open_callback=self._on_connection_open, + ) + + # Start IO loop - blocks until connection closes + self._connection.ioloop.start() + + # If we get here, the IO loop has stopped - check why + if self._should_disconnect: + logger.info('Connection loop exited due to shutdown request') + break + + # If not shutting down, we must have had a connection failure + logger.warning('Connection IO loop exited, will retry...') + + # Brief pause before trying again + time.sleep(1.0) + + except Exception as e: # noqa: BLE001 I think I can clean this up later but for now catching it all. + # Log any unexpected errors + logger.error(f'⚠️ Unexpected error in connection attempt: {e!s}') + self._connection = None # Ensure connection is cleared + + # Increment broker index to try next one + if len(self._cluster_params.brokers) > 1: + self._current_broker_index = (self._current_broker_index + 1) % len( + self._cluster_params.brokers + ) + + # Brief pause before retry + time.sleep(1.0) + + def _get_current_connection_params(self) -> pika.ConnectionParameters: + """Get connection parameters for the current broker index.""" + broker = self._cluster_params.brokers[self._current_broker_index] + + # Log current broker info + logger.info(f'Setting up connection parameters for broker: {broker.host}:{broker.port}') + + # IMPORTANT: Force connection_attempts=1 to ensure we don't retry using pika's + # internal retry mechanism, which can get stuck when DNS issues occur + # We handle retries ourselves at the AMQP client level + return pika.ConnectionParameters( + host=broker.host, + port=broker.port, + virtual_host='/', + credentials=pika.PlainCredentials( + self._cluster_params.username, self._cluster_params.password + ), + connection_attempts=1, # CRITICAL: Only try once per broker + heartbeat=10, # Faster heartbeat for quicker failure detection + blocked_connection_timeout=5.0, # Detect blocked connections faster + retry_delay=0.5, # Minimal delay between retries + ) def _on_connection_closed(self, connection: pika.SelectConnection, reason: Exception) -> None: """This method is called if the connection to RabbitMQ closes.""" + logger.warning( + f'Connection closed on broker index {self._current_broker_index}, reason: {reason}' + ) + + # Store the broker that failed for later reference + failed_broker_index = self._current_broker_index + failed_broker = self._cluster_params.brokers[failed_broker_index] + + # Clean up connection state for this specific connection only + # We might still have other connections active, so don't clear everything + if connection == self._connection: + self._channel_flags.unset_all() + self._channel_out = None + self._channel_in = None + self._connection = None + + # Stop the IO loop if requested if self._should_disconnect: connection.ioloop.stop() - else: - logger.warning('Connection closed, reopening in 5 seconds: %s', reason) - connection.ioloop.call_later(5, connection.ioloop.stop) - self._channel_flags.unset_all() - self._channel_out = None - self._channel_in = None + return + + # We need to switch to the next broker if we have multiple + if len(self._cluster_params.brokers) > 1: + # Increment broker index to try the next one + self._current_broker_index = (self._current_broker_index + 1) % len( + self._cluster_params.brokers + ) + next_broker = self._cluster_params.brokers[self._current_broker_index] + logger.info( + f'Connection closed on {failed_broker.host}, SWITCHING to BROKER {next_broker.host}' + ) + + # Try a quick connectivity test to the next broker + import socket + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1.0) + result = sock.connect_ex((next_broker.host, next_broker.port)) + sock.close() + + if result == 0: + logger.info( + f'Successfully verified TCP connectivity to {next_broker.host}:{next_broker.port}' + ) + else: + logger.warning( + f'TCP connection test to {next_broker.host}:{next_broker.port} failed (error: {result})' + ) + + # If this is our second broker and we know the first is down, continue with this one + # Otherwise, switch back if we still have a connection to the original broker + if self.is_connected() and self._current_broker_index != failed_broker_index: + logger.info('Already connected to a working broker, keeping that connection') + self._current_broker_index = failed_broker_index + + # Always stop the IO loop - this is what triggers reconnection + connection.ioloop.stop() def _on_connection_open_error( self, connection: pika.SelectConnection, err: pika.exceptions.AMQPConnectionError ) -> None: - """This gets called if the connection to RabbitMQ can't be established. + """This gets called if the connection to RabbitMQ can't be established.""" + current_broker = self._cluster_params.brokers[self._current_broker_index] + logger.error(f'CONNECTION ERROR to broker {current_broker.host}: {err!s}') - This function usually implies a misconfiguration in the application config. - """ + # Increment retry count self._connection_retries += 1 - logger.error( - f'On connect error received (probable broker config error), have tried {self._connection_retries} times' - ) - logger.error(err) - if self._connection_retries >= _AMQP_MAX_RETRIES: - # This will allow us to break out of the while loop - # where we establish the connection, as ioloop.stop - # will now stop the thread for good - logger.error('Giving up AMQP reconnection attempt') - self._should_disconnect = True - self._unrecoverable = True + + # Print error details to help debugging + connection_params = self._get_current_connection_params() + logger.info(f'Connection parameters: {connection_params}') + + # Always try next node in cluster + if len(self._cluster_params.brokers) > 1: + # Move to the next broker + old_index = self._current_broker_index + self._current_broker_index = (self._current_broker_index + 1) % len( + self._cluster_params.brokers + ) + next_broker = self._cluster_params.brokers[self._current_broker_index] + + # Reset connection retries when switching to a new broker + self._connection_retries = 0 + + # Check if we're already connected before trying to switch + if self.is_connected(): + logger.info( + 'Already connected to a working broker, staying with current connection' + ) + # If we're connected, don't try to switch brokers + return + + logger.warning( + f'SWITCHING from broker {old_index} to {self._current_broker_index} ({next_broker.host})' + ) + + # Try to immediately connect to verify the next broker + import socket + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1.0) + result = sock.connect_ex((next_broker.host, next_broker.port)) + sock.close() + + if result == 0: + logger.info( + f'✓ Successfully verified TCP connectivity to {next_broker.host}:{next_broker.port}' + ) + else: + logger.warning( + f'✗ TCP connection test to {next_broker.host}:{next_broker.port} failed (error: {result})' + ) + elif self._connection_retries >= _AMQP_MAX_RETRIES: + # Give up if we've tried too many times + logger.error(f'Giving up AMQP connection after {self._connection_retries} retries') self._channel_flags.set_all() - connection.ioloop.stop() - else: - logger.error('Reopening in 5 seconds') - connection.ioloop.call_later(5, connection.ioloop.stop) + self._unrecoverable = True + + # Clean up and force reconnection + self._connection = None + connection.ioloop.stop() def _on_connection_open(self, connection: pika.SelectConnection) -> None: - logger.info('AMQP connection open') + """Called when connection to RabbitMQ is established.""" + current_broker = self._cluster_params.brokers[self._current_broker_index] + logger.info(f'AMQP connection open to broker {current_broker.host}:{current_broker.port}') + + # Reset retries and clear state on successful connection self._connection_retries = 0 self._topics_to_consumer_tags.clear() + + # Check connectivity to both brokers for diagnosis + for i, broker in enumerate(self._cluster_params.brokers): + can_connect = _check_broker_connectivity(broker.host, broker.port) + logger.info(f'Broker {i} ({broker.host}:{broker.port}) reachable: {can_connect}') + # Open the channels connection.channel(on_open_callback=self._on_input_channel_open) connection.channel(on_open_callback=self._on_output_channel_open) @@ -322,6 +650,10 @@ def _on_output_channel_open(self, channel: Channel) -> None: callback=lambda _frame: self._channel_flags.set_nth_flag(channel_num), ) logger.info('AMQP: output channel ready') + # Resubscribe to all topics to ensure queues and bindings are re-established + for topic, _ in self._topics_to_handlers().items(): + logger.info(f'Resubscribing to topic: {topic}') + self.subscribe(topic, True) # CONSUMER # def _on_input_channel_open(self, channel: Channel) -> None: @@ -335,6 +667,10 @@ def _on_input_channel_open(self, channel: Channel) -> None: channel.exchange_declare( exchange=_INTERSECT_MESSAGE_EXCHANGE, exchange_type='topic', durable=True, callback=cb_2 ) + # Resubscribe to all topics to ensure queues and bindings are re-established + for topic, _ in self._topics_to_handlers().items(): + logger.info(f'Resubscribing to topic: {topic}') + self.subscribe(topic, True) def _on_exchange_declareok(self, _unused_frame: Frame, channel: Channel) -> None: """Create a queue on the broker (called from AMQP). diff --git a/src/intersect_sdk/_internal/control_plane/brokers/mqtt_client.py b/src/intersect_sdk/_internal/control_plane/brokers/mqtt_client.py index 2ae9dc3..249b4f8 100644 --- a/src/intersect_sdk/_internal/control_plane/brokers/mqtt_client.py +++ b/src/intersect_sdk/_internal/control_plane/brokers/mqtt_client.py @@ -1,20 +1,82 @@ from __future__ import annotations +import random import threading +import time import uuid from typing import TYPE_CHECKING, Any, Callable import paho.mqtt.client as paho_client -from retrying import retry from ...logger import logger from .broker_client import BrokerClient if TYPE_CHECKING: + from ....config.shared import BrokerConfig from ..topic_handler import TopicHandler - _MQTT_MAX_RETRIES = 10 +_MQTT_RECONNECT_DELAY = 5 # Seconds to wait before reconnection attempts + + +class ClusterConnectionParameters: + """Configuration for an MQTT cluster. + + Attributes: + brokers: List of broker configurations for MQTT cluster nodes + username: MQTT broker username + password: MQTT broker password + connection_attempts: Number of connection attempts per node + connection_retry_delay: Delay between connection attempts in seconds + """ + + def __init__( + self, + brokers: list[BrokerConfig], + username: str, + password: str, + connection_attempts: int = 3, + connection_retry_delay: float = 2.0, + ) -> None: + """Initialize cluster connection parameters. + + Args: + brokers: List of broker configurations for MQTT cluster nodes + username: MQTT broker username + password: MQTT broker password + connection_attempts: Number of connection attempts per node + connection_retry_delay: Delay between connection attempts in seconds + """ + self.brokers = brokers + self.username = username + self.password = password + self.connection_attempts = connection_attempts + self.connection_retry_delay = connection_retry_delay + + def get_random_node(self) -> BrokerConfig: + """Get a random node from the cluster. + + Returns: + A BrokerConfig randomly selected from the available cluster nodes + """ + index = random.randint(0, len(self.brokers) - 1) # noqa: S311 + return self.brokers[index] + + def get_next_node(self, previous_index: int | None = None) -> tuple[int, BrokerConfig]: + """Get the next node in a round-robin fashion. + + Args: + previous_index: Index of the previously used node + + Returns: + A tuple containing (index, BrokerConfig) of the next node to try + """ + if previous_index is None or previous_index >= len(self.brokers) - 1: + next_index = 0 + else: + next_index = previous_index + 1 + + return next_index, self.brokers[next_index] class MQTTClient(BrokerClient): @@ -24,8 +86,8 @@ class MQTTClient(BrokerClient): Attributes: uid: String defining this client's unique ID in the broker - host: hostname of MQTT broker - port: port of MQTT broker + _cluster_params: Connection information to the MQTT broker cluster + _current_node_index: Index of the current node being used _connection: Paho Client used to interact with the broker _connected: current state of whether or not we're connected to the broker (boolean) _topics_to_handlers: Dictionary of string topic names to lists of @@ -34,32 +96,27 @@ class MQTTClient(BrokerClient): def __init__( self, - host: str, - port: int, - username: str, - password: str, - topics_to_handlers: Callable[[], dict[str, TopicHandler]], + cluster_params: ClusterConnectionParameters, + topics_to_handlers: Callable[[], dict[str, TopicHandler]] | None = None, uid: str | None = None, ) -> None: """The default constructor. Args: - host: String for hostname of MQTT broker - port: port number of MQTT broker - username: username credentials for MQTT broker - password: password credentials for MQTT broker + username: username credentials for MQTT broker (ignored if cluster_params is provided) + password: password credentials for MQTT broker (ignored if cluster_params is provided) topics_to_handlers: callback function which gets the topic to handler map from the channel manager + cluster_params: Optional cluster configuration parameters. If provided, host/port/username/password are ignored. uid: A string representing the unique id to identify the client. """ # Unique id for the MQTT broker to associate this client with self.uid = uid if uid else str(uuid.uuid4()) - self.host = host - self.port = port - # Create a client to connect to RabbitMQ - # TODO clean_session param is ONLY for MQTT v3 here - self._connection = paho_client.Client(client_id=self.uid, clean_session=False) - self._connection.username_pw_set(username=username, password=password) + # If cluster_params is provided, use it. Otherwise, create a single-node configuration. + self._cluster_params: ClusterConnectionParameters = cluster_params + self._current_node_index: int | None = None + self._connection: paho_client.Client | None = None + self._initialize_connection() # Whether the connection is currently active self._connected = False @@ -67,31 +124,144 @@ def __init__( self._unrecoverable = False self._connection_retries = 0 self._connected_flag = threading.Event() + self._cluster_connection_attempts = 0 + self._node_reconnect_thread: threading.Thread | None = None # ConnectionManager callable state self._topics_to_handlers = topics_to_handlers + def _initialize_connection(self) -> None: + """Initialize the MQTT connection with the current broker.""" + if self._connection is not None: + self._connection.disconnect() + self._connection.loop_stop() + + # Select a node to connect to if we don't have one + if self._current_node_index is None: + self._current_node_index, broker = self._cluster_params.get_next_node() + else: + # Use the current node + broker = self._cluster_params.brokers[self._current_node_index] + + logger.info(f'Initializing MQTT connection to {broker.host}:{broker.port}') + + # Create a client to connect to RabbitMQ + # TODO clean_session param is ONLY for MQTT v3 here + self._connection = paho_client.Client(client_id=self.uid, clean_session=False) + self._connection.username_pw_set( + username=self._cluster_params.username, password=self._cluster_params.password + ) + # MQTT callback functions self._connection.on_connect = self._handle_connect self._connection.on_disconnect = self._handle_disconnect self._connection.on_message = self._on_message - @retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000, wait_exponential_max=60000) def connect(self) -> None: """Connect to the defined broker.""" - # Create a client to connect to RabbitMQ - # TODO MQTT v5 implementations should set clean_start to NEVER here + self._should_disconnect = False self._connected_flag.clear() - self._connection.connect(self.host, self.port, 60) + self._try_connect_to_current_node() + + def _try_connect_to_current_node(self) -> None: + """Attempt to connect to the currently selected node.""" + if self._current_node_index is None: + self._current_node_index = self._cluster_params.get_next_node()[0] + host = self._cluster_params.brokers[self._current_node_index].host + port = self._cluster_params.brokers[self._current_node_index].port + + logger.info(f'Attempting to connect to MQTT node: {host}:{port}') + + # Reset connection if needed + if not hasattr(self._connection, 'is_connected'): + self._initialize_connection() + + # Connect and start the loop + self._connection.connect(host, port, 60) self._connection.loop_start() + + # Wait for connection to be established + timeout = 10 # shorter timeout for faster failover + start_time = time.time() while not self.is_connected() and not self._connected_flag.is_set(): + if time.time() - start_time > timeout: + break self._connected_flag.wait(1.0) + if self.is_connected() and not self._connected_flag.is_set(): + logger.error(f'Error connecting to MQTT node {host}:{port}') + self._connection_retries += 1 + + # Try next node if we have multiple nodes + if len(self._cluster_params.brokers) > 1 and not self._should_disconnect: + logger.info( + f'Will try next node in cluster (retries so far: {self._connection_retries})' + ) + self._try_next_node() + elif self._connection_retries >= _MQTT_MAX_RETRIES: + # Only give up after we've tried all nodes multiple times + total_attempts = _MQTT_MAX_RETRIES * len(self._cluster_params.brokers) + if self._connection_retries >= total_attempts: + logger.error( + f'Giving up MQTT reconnection attempt after {self._connection_retries} attempts across all nodes' + ) + self._connected_flag.set() + self._unrecoverable = True + else: + # If we're not yet at total attempts but we're out of nodes, + # start over with the first node + logger.info('Cycling back to first node in cluster') + self._current_node_index = None + self._try_next_node() + + def _try_next_node(self) -> None: + """Attempt to connect to the next node in the cluster.""" + if self._node_reconnect_thread and self._node_reconnect_thread.is_alive(): + logger.info('Already attempting to connect to another node') + return + + # Create a new thread to handle the node switch + self._node_reconnect_thread = threading.Thread( + target=self._switch_to_next_node, daemon=True, name='mqtt_node_reconnect' + ) + self._node_reconnect_thread.start() + + def _switch_to_next_node(self) -> None: + """Switch to the next node in the cluster and attempt to connect.""" + if self._should_disconnect: + return + + # Get the next node + prev_index = self._current_node_index + self._current_node_index, broker = self._cluster_params.get_next_node( + self._current_node_index + ) + + logger.info( + f'Switching from node {prev_index} to node {self._current_node_index} ({broker.host}:{broker.port})' + ) + + # Clean up the old connection + if self._connection: + self._connection.disconnect() + self._connection.loop_stop() + + # Quick failover - don't wait too long + time.sleep(1.0) + + # Initialize a fresh connection with the new node + self._initialize_connection() + + # Try to connect to the new node + logger.info(f'Attempting connection to alternative node: {broker.host}:{broker.port}') + self._try_connect_to_current_node() + def disconnect(self) -> None: """Disconnect from the broker.""" self._should_disconnect = True - self._connection.disconnect() - self._connection.loop_stop() + if self._connection: + self._connection.disconnect() + self._connection.loop_stop() def is_connected(self) -> bool: """Check if there is an active connection to the broker. @@ -116,7 +286,43 @@ def publish(self, topic: str, payload: bytes, persist: bool) -> None: if it should be removed immediately (False) """ # NOTE: RabbitMQ only works with QOS of 1 and 0, and seems to convert QOS2 to QOS1 - self._connection.publish(topic, payload, qos=2 if persist else 0) + if self._connection and self.is_connected(): + try: + logger.debug(f'Publishing message to topic: {topic}') + result = self._connection.publish(topic, payload, qos=2 if persist else 0) + + # Check if the publish was successful (rc=0 means success) + if result.rc != 0: + logger.error(f'Failed to publish message (rc={result.rc}): {result}') + + # If we fail to publish due to connection issues, try reconnecting + if not self._should_disconnect: + self._connected = False + self._try_next_node() + # Try publishing again after reconnection + if self.is_connected(): + logger.info('Retrying publish after reconnection') + self._connection.publish(topic, payload, qos=2 if persist else 0) + except Exception as e: # noqa: BLE001 + logger.error(f'Error publishing message to topic {topic}: {e}') + # If we encounter an exception, the connection might be bad + if not self._should_disconnect: + self._connected = False + self._try_next_node() + else: + logger.error('Cannot publish message: not connected to MQTT broker') + + # If we're not connected but should be, try reconnecting + if not self._should_disconnect: + logger.info('Attempting reconnection before publishing') + if self._current_node_index is None: + self._current_node_index = 0 + self._try_connect_to_current_node() + + # Try again after reconnection + if self.is_connected(): + logger.info('Publishing after reconnection') + self._connection.publish(topic, payload, qos=2 if persist else 0) def subscribe(self, topic: str, persist: bool) -> None: """Subscribe to a topic over the pre-existing connection (via connect()). @@ -126,7 +332,16 @@ def subscribe(self, topic: str, persist: bool) -> None: persist: Determine if the associated message queue of the topic is long-lived (True) or not (False) """ # NOTE: RabbitMQ only works with QOS of 1 and 0, and seems to convert QOS2 to QOS1 - self._connection.subscribe(topic, qos=2 if persist else 0) + if self._connection and self.is_connected(): + try: + logger.info(f'Subscribing to topic: {topic}') + self._connection.subscribe(topic, qos=2 if persist else 0) + except Exception as e: # noqa: BLE001 + logger.error(f'Error subscribing to topic {topic}: {e}') + # If we fail to subscribe, mark the connection as bad and try to reconnect + if not self._should_disconnect: + self._connected = False + self._try_next_node() def unsubscribe(self, topic: str) -> None: """Unsubscribe from a topic over the pre-existing connection. @@ -134,7 +349,8 @@ def unsubscribe(self, topic: str) -> None: Args: topic: Topic to unsubscribe from. """ - self._connection.unsubscribe(topic) + if self._connection and self.is_connected(): + self._connection.unsubscribe(topic) def _on_message( self, _client: paho_client.Client, _userdata: Any, message: paho_client.MQTTMessage @@ -151,7 +367,7 @@ def _on_message( for cb in topic_handler.callbacks: cb(message.payload) - def _handle_disconnect(self, client: paho_client.Client, _userdata: Any, _rc: int) -> None: + def _handle_disconnect(self, client: paho_client.Client, _userdata: Any, rc: int) -> None: """Handle a disconnection from the MQTT server. This callback usually implies a temporary connection fault, so we'll try to handle it. @@ -161,12 +377,39 @@ def _handle_disconnect(self, client: paho_client.Client, _userdata: Any, _rc: in _userdata: MQTT user data. rc: MQTT return code as an integer. """ + # Mark as disconnected self._connected = False + logger.warning(f'MQTT disconnected with code {rc}') + + # If this was an unexpected disconnect, try to reconnect if not self._should_disconnect: - client.reconnect() + logger.info('Attempting to reconnect to MQTT broker') + + # If we have multiple nodes in the cluster, try the next one + if len(self._cluster_params.brokers) > 1: + # Wait a moment to prevent rapid reconnection attempts + time.sleep(1.0) + self._try_next_node() + # Return here since _try_next_node will handle the reconnection + return + # With a single node, just try to reconnect to the same node + try: + # Don't immediately reconnect - wait a bit to avoid rapid reconnects + time.sleep(1.0) + logger.info('Attempting to reconnect to the same MQTT node') + client.reconnect() + except Exception as e: # noqa: BLE001 + logger.error(f'Failed to reconnect to MQTT broker: {e!s}') + self._connection_retries += 1 + + # If we've tried too many times, give up + if self._connection_retries >= _MQTT_MAX_RETRIES: + logger.error('Giving up MQTT reconnection attempt') + self._connected_flag.set() + self._unrecoverable = True def _handle_connect( - self, _client: paho_client.Client, userdata: Any, flags: dict[str, Any], rc: int + self, client: paho_client.Client, userdata: Any, flags: dict[str, Any], rc: int ) -> None: """Set the connection status in response to the result of a Paho connection attempt. @@ -181,22 +424,56 @@ def _handle_connect( """ # Return code 0 means connection was successful if rc == 0: + host = self._cluster_params.brokers[self._current_node_index].host + port = self._cluster_params.brokers[self._current_node_index].port + logger.info(f'Successfully connected to MQTT node {host}:{port}') + self._connected = True self._connection_retries = 0 self._should_disconnect = False self._connected_flag.set() - for topic, topic_handler in self._topics_to_handlers().items(): - self.subscribe(topic, topic_handler.topic_persist) + + # Make sure the client is valid and loop is running + if not hasattr(client, 'is_connected') or client != self._connection: + logger.warning('Client mismatch in connect callback - recreating connection') + self._initialize_connection() + client = self._connection + client.connect(host, port, 60) + client.loop_start() + + # Delay slightly before subscribing to ensure connection is fully established + time.sleep(0.5) + + # Resubscribe to all topics + logger.info('Resubscribing to topics after connection established') + topics = self._topics_to_handlers() + if topics: + logger.info(f'Resubscribing to {len(topics)} topics') + for topic, topic_handler in topics.items(): + logger.info(f'Resubscribing to topic: {topic}') + client.subscribe(topic, qos=2 if topic_handler.topic_persist else 0) + else: + logger.warning('No topics to resubscribe to!') else: # This will generally suggest a misconfiguration self._connected = False self._connection_retries += 1 + + host = self._cluster_params.brokers[self._current_node_index].host + port = self._cluster_params.brokers[self._current_node_index].port + logger.error( - f'On connect error received (probable broker config error), have tried {self._connection_retries} times' + f'MQTT connect error (rc={rc}) to {host}:{port}, attempt {self._connection_retries}' ) logger.error(f'Connection error userdata: {userdata}') logger.error(f'Connection error flags: {flags}') - if self._connection_retries >= _MQTT_MAX_RETRIES: - logger.error('Giving up MQTT reconnection attempt') + + # If we've tried all nodes multiple times, give up + cluster_max_retries = _MQTT_MAX_RETRIES * len(self._cluster_params.brokers) + if self._connection_retries >= cluster_max_retries: + logger.error('Giving up MQTT reconnection attempt after trying all nodes') self._connected_flag.set() self._unrecoverable = True + elif len(self._cluster_params.brokers) > 1: + # Try the next node + self._try_next_node() diff --git a/src/intersect_sdk/_internal/control_plane/control_plane_manager.py b/src/intersect_sdk/_internal/control_plane/control_plane_manager.py index e7a3628..de9987f 100644 --- a/src/intersect_sdk/_internal/control_plane/control_plane_manager.py +++ b/src/intersect_sdk/_internal/control_plane/control_plane_manager.py @@ -32,23 +32,35 @@ def create_control_provider( # only try to import the AMQP client if the user is using an AMQP broker try: from .brokers.amqp_client import AMQPClient + from .brokers.amqp_client import ClusterConnectionParameters as AMQPClusterParams + + # Create AMQPClusterParameters + cluster_params = AMQPClusterParams( + brokers=config.brokers, username=config.username, password=config.password + ) return AMQPClient( - host=config.host, - port=config.port or 5672, - username=config.username, - password=config.password, + # The AMQPClient constructor expects both regular parameters and cluster_params topics_to_handlers=topic_handler_callback, + cluster_params=cluster_params, ) except ImportError as e: msg = "Configuration includes AMQP broker, but AMQP dependencies were not installed. Install intersect with the 'amqp' optional dependency to use this backend. (i.e. `pip install intersect_sdk[amqp]`)" raise IntersectInvalidBrokerError(msg) from e # MQTT - return MQTTClient( - host=config.host, - port=config.port or 1883, + from .brokers.mqtt_client import ClusterConnectionParameters + + # Extract hosts and ports from broker config objects + + # Create ClusterConnectionParameters + cluster_params = ClusterConnectionParameters( + brokers=config.brokers, username=config.username, password=config.password, + ) + + return MQTTClient( + cluster_params=cluster_params, topics_to_handlers=topic_handler_callback, ) diff --git a/src/intersect_sdk/config/__init__.py b/src/intersect_sdk/config/__init__.py index faf5d00..394de01 100644 --- a/src/intersect_sdk/config/__init__.py +++ b/src/intersect_sdk/config/__init__.py @@ -12,6 +12,7 @@ from .service import IntersectServiceConfig from .shared import ( HIERARCHY_REGEX, + BrokerConfig, ControlPlaneConfig, ControlProvider, DataStoreConfig, diff --git a/src/intersect_sdk/config/shared.py b/src/intersect_sdk/config/shared.py index a23b348..9de6743 100644 --- a/src/intersect_sdk/config/shared.py +++ b/src/intersect_sdk/config/shared.py @@ -1,7 +1,9 @@ """Configuration types shared across both Clients and Services.""" +from __future__ import annotations + from dataclasses import dataclass, field -from typing import List, Literal, Optional, Set +from typing import Literal from pydantic import BaseModel, ConfigDict, Field, PositiveInt from typing_extensions import Annotated @@ -35,7 +37,7 @@ class HierarchyConfig(BaseModel): The name of this application - should be unique within an INTERSECT system """ - subsystem: Optional[str] = Field(default=None, pattern=HIERARCHY_REGEX) # noqa: FA100 (Pydantic uses runtime annotations) + subsystem: str | None = Field(default=None, pattern=HIERARCHY_REGEX) """ An associated subsystem / service-grouping of the system (should be unique within an INTERSECT system) """ @@ -80,9 +82,35 @@ def hierarchy_string(self, join_str: str = '') -> str: model_config = ConfigDict(regex_engine='python-re') +@dataclass +class BrokerConfig: + """Configuration for a single broker instance.""" + + host: Annotated[str, Field(min_length=1)] + """Broker hostname.""" + + port: PositiveInt + """ + Broker port. List of common ports: + + - 1883 (MQTT) + - 4222 (NATS default port) + - 5222 (XMPP) + - 5223 (XMPP over TLS) + - 5671 (AMQP over TLS) + - 5672 (AMQP) + - 7400 (DDS Discovery) + - 7401 (DDS User traffic) + - 8883 (MQTT over TLS) + - 61613 (RabbitMQ STOMP - WARNING: ephemeral port) + + NOTE: INTERSECT currently only supports AMQP and MQTT. + """ + + @dataclass class ControlPlaneConfig: - """Configuration for interacting with a broker.""" + """Configuration for interacting with a broker or broker cluster.""" protocol: ControlProvider """ @@ -100,27 +128,10 @@ class ControlPlaneConfig: Password credentials for broker connection. """ - host: Annotated[str, Field(min_length=1)] = '127.0.0.1' - """ - Broker hostname (default: 127.0.0.1) - """ - - port: Optional[PositiveInt] = None # noqa: FA100 (Pydantic uses runtime annotations) + brokers: list[BrokerConfig] """ - Broker port. List of common ports: - - - 1883 (MQTT) - - 4222 (NATS default port) - - 5222 (XMPP) - - 5223 (XMPP over TLS) - - 5671 (AMQP over TLS) - - 5672 (AMQP) - - 7400 (DDS Discovery) - - 7401 (DDS User traffic) - - 8883 (MQTT over TLS) - - 61613 (RabbitMQ STOMP - WARNING: ephemeral port) - - NOTE: INTERSECT currently only supports AMQP and MQTT. + List of broker configurations to connect to. The client will attempt to connect + to these brokers sequentially in the order provided. """ @@ -143,7 +154,7 @@ class DataStoreConfig: Data store hostname (default: 127.0.0.1) """ - port: Optional[PositiveInt] = None # noqa: FA100 (Pydantic uses runtime annotations) + port: PositiveInt | None = None """ Data store port """ @@ -153,12 +164,12 @@ class DataStoreConfig: class DataStoreConfigMap: """Configurations for any data stores the application should talk to.""" - minio: List[DataStoreConfig] = field(default_factory=list) # noqa: FA100 (Pydantic uses runtime annotations) + minio: list[DataStoreConfig] = field(default_factory=list) """ minio configurations """ - def get_missing_data_store_types(self) -> Set[IntersectDataHandler]: # noqa: FA100 (not technically a runtime annotation) + def get_missing_data_store_types(self) -> set[IntersectDataHandler]: """Return a set of IntersectDataHandlers which will not be permitted, due to a configuration type missing. If all data configurations exist, returns an empty set diff --git a/tests/integration/test_return_type_mismatch.py b/tests/integration/test_return_type_mismatch.py index 2ef15e5..18089fe 100644 --- a/tests/integration/test_return_type_mismatch.py +++ b/tests/integration/test_return_type_mismatch.py @@ -28,6 +28,7 @@ create_userspace_message, deserialize_and_validate_userspace_message, ) +from intersect_sdk.config.shared import BrokerConfig from tests.fixtures.example_schema import FAKE_HIERARCHY_CONFIG # FIXTURE ############################# @@ -63,8 +64,8 @@ def make_intersect_service() -> IntersectService: ControlPlaneConfig( username='intersect_username', password='intersect_password', - port=1883, protocol='mqtt3.1.1', + brokers=[BrokerConfig(host='localhost', port=1883)], ), ], status_interval=30.0, @@ -78,8 +79,8 @@ def make_message_interceptor() -> ControlPlaneManager: ControlPlaneConfig( username='intersect_username', password='intersect_password', - port=1883, protocol='mqtt3.1.1', + brokers=[BrokerConfig(host='localhost', port=1883)], ) ], ) diff --git a/tests/integration/test_service.py b/tests/integration/test_service.py index 190ed70..6e0a039 100644 --- a/tests/integration/test_service.py +++ b/tests/integration/test_service.py @@ -31,6 +31,7 @@ create_userspace_message, deserialize_and_validate_userspace_message, ) +from intersect_sdk.config.shared import BrokerConfig from tests.fixtures.example_schema import FAKE_HIERARCHY_CONFIG, DummyCapabilityImplementation # HELPERS ############################# @@ -54,8 +55,8 @@ def make_intersect_service() -> IntersectService: ControlPlaneConfig( username='intersect_username', password='intersect_password', - port=1883, protocol='mqtt3.1.1', + brokers=[BrokerConfig(host='localhost', port=1883)], ), ], status_interval=30.0, @@ -69,8 +70,8 @@ def make_message_interceptor() -> ControlPlaneManager: ControlPlaneConfig( username='intersect_username', password='intersect_password', - port=1883, protocol='mqtt3.1.1', + brokers=[BrokerConfig(host='localhost', port=1883)], ) ], ) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 0990473..c4a2061 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -10,6 +10,7 @@ IntersectClientConfig, IntersectServiceConfig, ) +from intersect_sdk.config.shared import BrokerConfig # TESTS ##################### @@ -54,29 +55,27 @@ def test_missing_control_plane_config(): with pytest.raises(ValidationError) as ex: TypeAdapter(ControlPlaneConfig).validate_python({}) errors = [{'type': e['type'], 'loc': e['loc']} for e in ex.value.errors()] - assert len(errors) == 3 + assert len(errors) == 4 assert {'type': 'missing', 'loc': ('username',)} in errors assert {'type': 'missing', 'loc': ('password',)} in errors assert {'type': 'missing', 'loc': ('protocol',)} in errors + assert {'type': 'missing', 'loc': ('brokers',)} in errors def test_invalid_control_plane_config(): with pytest.raises(ValidationError) as ex: TypeAdapter(ControlPlaneConfig).validate_python( ControlPlaneConfig( - host='', + brokers=[BrokerConfig(host='', port=0)], username='', password='', - port=0, protocol='mqtt', ).__dict__ ) errors = [{'type': e['type'], 'loc': e['loc']} for e in ex.value.errors()] - assert len(errors) == 5 + assert len(errors) == 3 assert {'type': 'string_too_short', 'loc': ('username',)} in errors assert {'type': 'string_too_short', 'loc': ('password',)} in errors - assert {'type': 'string_too_short', 'loc': ('host',)} in errors - assert {'type': 'greater_than', 'loc': ('port',)} in errors assert {'type': 'literal_error', 'loc': ('protocol',)} in errors @@ -165,15 +164,13 @@ def test_valid_service_config(): ControlPlaneConfig( username='user', password='secret', - host='http://hardknock.life', - port='1883', + brokers=[BrokerConfig(host='http://hardknock.life', port=1883)], protocol='mqtt3.1.1', ), ControlPlaneConfig( username='fine', password='fine', - host='www.nowhere.gov', - port='5672', + brokers=[BrokerConfig(host='www.nowhere.gov', port=5672)], protocol='amqp0.9.1', ), ], @@ -190,5 +187,5 @@ def test_valid_service_config(): assert config.hierarchy.subsystem is None assert config.hierarchy.hierarchy_string('/') == 'org/this-works/ello-14/-/serv' # make sure string values can be coerced into integers when specified - assert all(isinstance(b.port, int) for b in config.brokers) + assert all(isinstance(b.brokers[0].port, int) for b in config.brokers) assert all(isinstance(d.port, int) for d in config.data_stores.minio) diff --git a/tests/unit/test_invalid_schema_runtime.py b/tests/unit/test_invalid_schema_runtime.py index 4d80db0..0c8f7d4 100644 --- a/tests/unit/test_invalid_schema_runtime.py +++ b/tests/unit/test_invalid_schema_runtime.py @@ -15,6 +15,7 @@ IntersectServiceConfig, intersect_message, ) +from intersect_sdk.config.shared import BrokerConfig from ..fixtures.example_schema import FAKE_HIERARCHY_CONFIG @@ -35,8 +36,8 @@ def test_minio_not_allowed_without_config(caplog: pytest.LogCaptureFixture): ControlPlaneConfig( username='intersect_username', password='intersect_password', - port=1883, protocol='mqtt3.1.1', + brokers=[BrokerConfig(host='localhost', port=1883)], ), ], )