11
22using System ;
33using System . Collections . Generic ;
4- using System . Globalization ;
54using System . IO ;
6- using System . IO . Compression ;
75using System . Text ;
8- using System . Threading . Tasks ;
96using Halibut . Queue . MessageStreamWrapping ;
107using Halibut . Transport . Protocol ;
118using Halibut . Util ;
@@ -37,38 +34,49 @@ public QueueMessageSerializer(Func<StreamCapturingJsonSerializer> createStreamCa
3734
3835 using var ms = new MemoryStream ( ) ;
3936 Stream stream = ms ;
40- using var disposables = new DisposableCollection ( ) ;
41- foreach ( var streamer in messageStreamWrappers . Wrappers )
37+ using ( var wrappedStreamDisposables = new DisposableCollection ( ) )
4238 {
43- stream = streamer . WrapMessageSerialisationStream ( stream ) ;
44- disposables . Add ( stream ) ;
45- }
46- using ( var sw = new StreamWriter ( stream , Encoding . UTF8
39+ stream = WrapInMessageSerialisationStreams ( messageStreamWrappers , stream , wrappedStreamDisposables ) ;
40+
41+ using ( var sw = new StreamWriter ( stream , Encoding . UTF8
4742#if NET8_0_OR_GREATER
48- , leaveOpen : true
43+ , leaveOpen : true
4944#endif
50- ) ) {
51- using ( var jsonTextWriter = new JsonTextWriter ( sw ) { CloseOutput = false } )
45+ ) )
5246 {
53- var streamCapturingSerializer = createStreamCapturingSerializer ( ) ;
54- streamCapturingSerializer . Serializer . Serialize ( jsonTextWriter , new MessageEnvelope < T > ( message ) ) ;
55- dataStreams = streamCapturingSerializer . DataStreams ;
47+ using ( var jsonTextWriter = new JsonTextWriter ( sw ) { CloseOutput = false } )
48+ {
49+ var streamCapturingSerializer = createStreamCapturingSerializer ( ) ;
50+ streamCapturingSerializer . Serializer . Serialize ( jsonTextWriter , new MessageEnvelope < T > ( message ) ) ;
51+ dataStreams = streamCapturingSerializer . DataStreams ;
52+ }
5653 }
5754 }
5855
5956 return ( ms . ToArray ( ) , dataStreams ) ;
6057 }
6158
59+ public static Stream WrapInMessageSerialisationStreams ( MessageStreamWrappers messageStreamWrappers , Stream stream , DisposableCollection disposables )
60+ {
61+ foreach ( var streamer in messageStreamWrappers . Wrappers )
62+ {
63+ var wrappedStream = streamer . WrapMessageSerialisationStream ( stream ) ;
64+ if ( ! ReferenceEquals ( wrappedStream , stream ) )
65+ {
66+ stream = wrappedStream ;
67+ disposables . Add ( stream ) ;
68+ }
69+ }
70+
71+ return stream ;
72+ }
73+
6274 public ( T Message , IReadOnlyList < DataStream > DataStreams ) ReadMessage < T > ( byte [ ] json )
6375 {
6476 using var ms = new MemoryStream ( json ) ;
6577 Stream stream = ms ;
6678 using var disposables = new DisposableCollection ( ) ;
67- foreach ( var streamer in messageStreamWrappers . Wrappers )
68- {
69- stream = streamer . WrapMessageDeserialisationStream ( stream ) ;
70- disposables . Add ( stream ) ;
71- }
79+ stream = WrapStreamInMessageDeserialisationStreams ( messageStreamWrappers , stream , disposables ) ;
7280 using var sr = new StreamReader ( stream , Encoding . UTF8
7381#if NET8_0_OR_GREATER
7482 , leaveOpen : true
@@ -85,7 +93,22 @@ public QueueMessageSerializer(Func<StreamCapturingJsonSerializer> createStreamCa
8593
8694 return ( result . Message , streamCapturingSerializer . DataStreams ) ;
8795 }
88-
96+
97+ public static Stream WrapStreamInMessageDeserialisationStreams ( MessageStreamWrappers messageStreamWrappers , Stream stream , DisposableCollection disposables )
98+ {
99+ foreach ( var streamer in messageStreamWrappers . Wrappers )
100+ {
101+ var wrappedStream = streamer . WrapMessageDeserialisationStream ( stream ) ;
102+ if ( ! ReferenceEquals ( wrappedStream , stream ) )
103+ {
104+ stream = wrappedStream ;
105+ disposables . Add ( stream ) ;
106+ }
107+ }
108+
109+ return stream ;
110+ }
111+
89112 // By making this a generic type, each message specifies the exact type it sends/expects
90113 // And it is impossible to deserialize the wrong type - any mismatched type will refuse to deserialize
91114 class MessageEnvelope < T >
0 commit comments