@@ -17,6 +17,7 @@ public class WampChannelReconnector : IDisposable
17
17
private IDisposable mDisposable = Disposable . Empty ;
18
18
private bool mStarted = false ;
19
19
private readonly object mLock = new object ( ) ;
20
+ private IDisposable mConnectionBrokenDisposable ;
20
21
21
22
/// <summary>
22
23
/// Initializes a new instance of <see cref="WampChannelReconnector"/>.
@@ -29,20 +30,22 @@ public WampChannelReconnector(IWampChannel channel, Func<Task> connector)
29
30
30
31
var connectionBrokenObservable =
31
32
Observable . FromEventPattern < WampSessionCloseEventArgs >
32
- ( x => monitor . ConnectionBroken += x ,
33
- x => monitor . ConnectionBroken -= x )
34
- . Select ( x => Unit . Default ) ;
33
+ ( x => monitor . ConnectionBroken += x ,
34
+ x => monitor . ConnectionBroken -= x )
35
+ . Select ( x => Unit . Default )
36
+ . Replay ( 1 ) ;
35
37
36
38
var onceAndConnectionBroken =
37
- Observable . Return ( Unit . Default ) . Concat
38
- ( connectionBrokenObservable ) ;
39
+ connectionBrokenObservable . StartWith ( Unit . Default ) ;
39
40
40
41
IObservable < IObservable < Unit > > reconnect =
41
42
from connectionBroke in onceAndConnectionBroken
42
43
let tryReconnect = Observable . FromAsync ( connector )
43
44
. Catch < Unit , Exception > ( x => Observable . Empty < Unit > ( ) )
44
45
select tryReconnect ;
45
46
47
+ mConnectionBrokenDisposable = connectionBrokenObservable . Connect ( ) ;
48
+
46
49
mMerged = reconnect . Concat ( ) ;
47
50
}
48
51
@@ -78,6 +81,7 @@ public void Dispose()
78
81
{
79
82
mMerged = null ;
80
83
mDisposable . Dispose ( ) ;
84
+ mConnectionBrokenDisposable . Dispose ( ) ;
81
85
}
82
86
}
83
87
}
0 commit comments