Skip to content

Commit 94aac78

Browse files
committed
Added option for coordinator to participate in UserCoordinationMessages (via inlet).
1 parent d353b43 commit 94aac78

5 files changed

Lines changed: 54 additions & 10 deletions

File tree

packages/liblsl_coordinator/example/multi_node_test.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ Future<void> _runNode({
9191
discoveryInterval: Duration(seconds: 5),
9292
nodeTimeout: Duration(seconds: 10),
9393
maxNodes: maxNodes, // This may cause some nodes to be rejected
94+
consumeCoordinationStreamAsCoordinator: false,
9495
);
9596

9697
// Create coordination configuration

packages/liblsl_coordinator/lib/src/network/coordination_session.dart

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,24 @@ class CoordinationSessionConfig implements IConfig {
3434
/// This should be at least twice the [heartbeatInterval].
3535
final Duration nodeTimeout;
3636

37+
/// Determines whether on becoming coordinator, the node should
38+
/// also consume the coordination stream - specifically, this is used for
39+
/// a coordinator to be able to consume UserMessages (UserCoordinationMessage)
40+
/// within the coordination session, this allows for a session coordinator
41+
/// to also act as a participant in the session, and have similar latency
42+
/// characteristics as other nodes.
43+
/// Default is true.
44+
45+
final bool consumeCoordinationStreamAsCoordinator;
46+
3747
CoordinationSessionConfig({
3848
required this.name,
3949
this.maxNodes = 10,
4050
this.minNodes = 1,
4151
this.heartbeatInterval = const Duration(seconds: 5),
4252
this.discoveryInterval = const Duration(seconds: 10),
4353
this.nodeTimeout = const Duration(seconds: 15),
54+
this.consumeCoordinationStreamAsCoordinator = true,
4455
}) {
4556
validate(throwOnError: true);
4657
}
@@ -100,14 +111,18 @@ class CoordinationSessionConfig implements IConfig {
100111
'heartbeatInterval': heartbeatInterval.inMilliseconds,
101112
'discoveryInterval': discoveryInterval.inMilliseconds,
102113
'nodeTimeout': nodeTimeout.inMilliseconds,
114+
'consumeCoordinationStreamAsCoordinator':
115+
consumeCoordinationStreamAsCoordinator,
103116
};
104117
}
105118

106119
@override
107120
String toString() {
108121
return 'NetworkSessionConfig(name: $name, maxNodes: $maxNodes, '
109122
'minNodes: $minNodes, heartbeatInterval: $heartbeatInterval, '
110-
'discoveryInterval: $discoveryInterval, nodeTimeout: $nodeTimeout)';
123+
'discoveryInterval: $discoveryInterval, nodeTimeout: $nodeTimeout, '
124+
'consumeCoordinationStreamAsCoordinator: '
125+
'$consumeCoordinationStreamAsCoordinator)';
111126
}
112127

113128
@override
@@ -118,6 +133,7 @@ class CoordinationSessionConfig implements IConfig {
118133
Duration? heartbeatInterval,
119134
Duration? discoveryInterval,
120135
Duration? nodeTimeout,
136+
bool? consumeCoordinationStreamAsCoordinator,
121137
}) {
122138
return CoordinationSessionConfig(
123139
name: name ?? this.name,
@@ -126,6 +142,9 @@ class CoordinationSessionConfig implements IConfig {
126142
heartbeatInterval: heartbeatInterval ?? this.heartbeatInterval,
127143
discoveryInterval: discoveryInterval ?? this.discoveryInterval,
128144
nodeTimeout: nodeTimeout ?? this.nodeTimeout,
145+
consumeCoordinationStreamAsCoordinator:
146+
consumeCoordinationStreamAsCoordinator ??
147+
this.consumeCoordinationStreamAsCoordinator,
129148
);
130149
}
131150

@@ -137,6 +156,7 @@ class CoordinationSessionConfig implements IConfig {
137156
heartbeatInterval: const Duration(seconds: 5),
138157
discoveryInterval: const Duration(seconds: 10),
139158
nodeTimeout: const Duration(seconds: 15),
159+
consumeCoordinationStreamAsCoordinator: true,
140160
);
141161
}
142162

@@ -151,7 +171,9 @@ class CoordinationSessionConfig implements IConfig {
151171
other.minNodes == minNodes &&
152172
other.heartbeatInterval == heartbeatInterval &&
153173
other.discoveryInterval == discoveryInterval &&
154-
other.nodeTimeout == nodeTimeout;
174+
other.nodeTimeout == nodeTimeout &&
175+
other.consumeCoordinationStreamAsCoordinator ==
176+
consumeCoordinationStreamAsCoordinator;
155177
}
156178

157179
@override
@@ -162,7 +184,8 @@ class CoordinationSessionConfig implements IConfig {
162184
minNodes.hashCode ^
163185
heartbeatInterval.hashCode ^
164186
discoveryInterval.hashCode ^
165-
nodeTimeout.hashCode;
187+
nodeTimeout.hashCode ^
188+
consumeCoordinationStreamAsCoordinator.hashCode;
166189
}
167190
}
168191

packages/liblsl_coordinator/lib/transports/lsl/lsl_coordination_controller.dart

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,14 @@ class CoordinationController {
233233
_state.transitionTo(CoordinationPhase.accepting);
234234

235235
logger.fine('Coordinator ready, accepting nodes');
236+
if (coordinationConfig
237+
.sessionConfig
238+
.consumeCoordinationStreamAsCoordinator) {
239+
// Connect to own coordinator stream as participant
240+
await _connectToCoordinator(thisNode.uId);
241+
} else {
242+
logger.info('Not consuming own coordinator stream as per configuration');
243+
}
236244
}
237245

238246
/// Become a participant
@@ -304,11 +312,23 @@ class CoordinationController {
304312
logger.finer(
305313
'[CONTROLLER-${thisNode.uId}] Found coordinator stream, adding inlet...',
306314
);
307-
// parse streaminfo
308-
final info = LSLStreamInfoHelper.parseSourceId(streamInfos.first.sourceId);
309-
final nodeUId = info[LSLStreamInfoHelper.nodeUIdKey]!;
310-
// set coordinator UId in state
311-
_state.becomeParticipant(nodeUId);
315+
if (thisNode.uId != coordinatorUId) {
316+
// parse streaminfo
317+
final info = LSLStreamInfoHelper.parseSourceId(
318+
streamInfos.first.sourceId,
319+
);
320+
final nodeUId = info[LSLStreamInfoHelper.nodeUIdKey]!;
321+
logger.fine(
322+
'[CONTROLLER-${thisNode.uId}] Connecting to coordinator stream of node $nodeUId as participant',
323+
);
324+
// set coordinator UId in state
325+
_state.becomeParticipant(nodeUId);
326+
} else {
327+
logger.fine(
328+
'[CONTROLLER-${thisNode.uId}] Connected to own coordinator stream (self-coordination)',
329+
);
330+
}
331+
312332
await _coordinationStream.addInlet(streamInfos.first);
313333
logger.info(
314334
'[CONTROLLER-${thisNode.uId}] Connected to coordinator stream successfully',

packages/liblsl_coordinator/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: liblsl_coordinator
22
description: A performance-focused Dart (and Flutter) LSL-based device coordination library.
3-
version: 0.0.5+0
3+
version: 0.0.6+0
44
homepage: https://zeyus.com/
55
repository: https://github.com/NexusDynamic/liblsl.dart
66
issue_tracker: https://github.com/NexusDynamic/liblsl.dart/issues

packages/liblsl_test/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ resolution: workspace
2222
# https://developer.apple.com/library/archive/documentation/General/Reference/InfoPlistKeyReference/Articles/CoreFoundationKeys.html
2323
# In Windows, build-name is used as the major, minor, and patch parts
2424
# of the product and file versions while build-number is used as the build suffix.
25-
version: 1.1.0+1
25+
version: 1.1.1+1
2626

2727
environment:
2828
sdk: ^3.9.0

0 commit comments

Comments
 (0)