Skip to content

Commit f917ab1

Browse files
committed
use objc_sync_enter directly for locking, more tests around concat/combinelatest
1 parent 9819ceb commit f917ab1

File tree

5 files changed

+338
-84
lines changed

5 files changed

+338
-84
lines changed

Sources/MiniRxSwift/MiniRxSwift.swift

Lines changed: 116 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -287,14 +287,15 @@ public extension ObservableType {
287287
}
288288

289289
/** Creates an observable which emits items from `source1` and `source2` paired together, using `resultSelector` to process each pair.
290-
Stops after the first sequence stops
291290
# Reference
292291
[CombineLatest](http://reactivex.io/documentation/operators/combinelatest.html) */
293292
static func combineLatest<O1: ObservableType, O2: ObservableType, R>(
294293
_ source1: O1,
295294
_ source2: O2,
296295
resultSelector: @escaping (O1.Element, O2.Element) -> R) -> Observable<R> {
297296
return Observable<R>.create { observer in
297+
var numberOfDone = 0
298+
298299
let group = CompositeDisposable()
299300

300301
var lastA: O1.Element? = nil
@@ -311,7 +312,8 @@ public extension ObservableType {
311312
observer.onError(err)
312313
}, onCompleted: {
313314
if let key = d1 { group.remove(for: key) }
314-
if group.count == 0 {
315+
numberOfDone += 1
316+
if numberOfDone == 2 {
315317
observer.onCompleted()
316318
}
317319
}))
@@ -324,7 +326,8 @@ public extension ObservableType {
324326
observer.onError(err)
325327
}, onCompleted: {
326328
if let key = d2 { group.remove(for: key) }
327-
if group.count == 0 {
329+
numberOfDone += 1
330+
if numberOfDone == 2 {
328331
observer.onCompleted()
329332
}
330333
}))
@@ -341,7 +344,7 @@ public struct Disposables {
341344
func dispose() { }
342345
}
343346

344-
private class AnyDisposable : Disposable, Lockable {
347+
private class AnyDisposable : Disposable {
345348
private var _disposeAction: (() -> Void)?
346349

347350
init(disposeAction: @escaping () -> Void) {
@@ -350,12 +353,13 @@ public struct Disposables {
350353

351354
func dispose() {
352355
var action: (() -> Void)? = nil
353-
synchronized {
354-
if let d = _disposeAction {
355-
action = d
356-
_disposeAction = nil
357-
}
356+
objc_sync_enter(self)
357+
if let d = _disposeAction {
358+
action = d
359+
_disposeAction = nil
358360
}
361+
objc_sync_exit(self)
362+
359363
action?()
360364
}
361365
}
@@ -445,7 +449,7 @@ fileprivate enum SubjectState {
445449

446450
/** Represents an Event Source that you can use to publish values:
447451
http://www.introtorx.com/content/v1.0.10621.0/02_KeyTypes.html#Subject */
448-
public class PublishSubject<T> : Observable<T>, ObserverType, Lockable {
452+
public class PublishSubject<T> : Observable<T>, ObserverType {
449453
private var _subscribers = Bag<AnyObserver<T>>()
450454

451455
// note PublishSubject remembers if it is stopped/failed
@@ -456,49 +460,55 @@ public class PublishSubject<T> : Observable<T>, ObserverType, Lockable {
456460
}
457461

458462
public var hasObservers: Bool {
459-
synchronized { _subscribers.count > 0}
463+
objc_sync_enter(self); defer { objc_sync_exit(self) }
464+
return _subscribers.count > 0
460465
}
461466

462467
public override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.Element == T {
463-
let currentState = synchronized { self.state }
468+
objc_sync_enter(self)
469+
let currentState = self.state
470+
objc_sync_exit(self)
471+
464472
guard case SubjectState.running = currentState else { return Disposables.create() }
465473

466474
let wrapper = AnyObserver.from(observer)
467-
let removeKey = synchronized {
468-
_subscribers.insert(wrapper)
469-
}
475+
476+
objc_sync_enter(self)
477+
let removeKey = _subscribers.insert(wrapper)
478+
objc_sync_exit(self)
479+
470480
return Disposables.create(with: {
471-
self.synchronized {
472-
self._subscribers.removeKey(removeKey)
473-
}
474-
() // our block needs to return void rather than the result of removeKey
481+
objc_sync_enter(self)
482+
_ = self._subscribers.removeKey(removeKey)
483+
objc_sync_exit(self)
475484
})
476485
}
477486

478487
public func onNext(_ element: T) {
479-
let subscribers = synchronized { () -> [AnyObserver<T>] in
480-
return _subscribers.toArray()
481-
}
488+
objc_sync_enter(self)
489+
let subscribers = _subscribers.toArray()
490+
objc_sync_exit(self)
491+
482492
for s in subscribers { s.onNext(element) }
483493
}
484494

485495
public func onError(_ error: Error) {
486-
let subscribers = synchronized { () -> [AnyObserver<T>] in
487-
state = .failed(error)
488-
let r = _subscribers.toArray()
489-
_subscribers.removeAll()
490-
return r
491-
}
496+
objc_sync_enter(self)
497+
state = .failed(error)
498+
let subscribers = _subscribers.toArray()
499+
_subscribers.removeAll()
500+
objc_sync_exit(self)
501+
492502
for s in subscribers { s.onError(error) }
493503
}
494504

495505
public func onCompleted() {
496-
let subscribers = synchronized { () -> [AnyObserver<T>] in
497-
state = .completed
498-
let r = _subscribers.toArray()
499-
_subscribers.removeAll()
500-
return r
501-
}
506+
objc_sync_enter(self)
507+
state = .completed
508+
let subscribers = _subscribers.toArray()
509+
_subscribers.removeAll()
510+
objc_sync_exit(self)
511+
502512
for s in subscribers { s.onCompleted() }
503513
}
504514
}
@@ -571,10 +581,11 @@ public extension ObservableType {
571581
}
572582

573583
// type-erased ObserverType
574-
public struct AnyObserver<Element> : ObserverType {
584+
public class AnyObserver<Element> : ObserverType {
575585
private let _onNext: ((Element) -> Void)?
576586
private let _onError: ((Swift.Error) -> Void)?
577587
private let _onCompleted: (() -> Void)?
588+
private var _isStopped = false
578589

579590
/// Creates an AnyObserver from an existing observer, with a fast-path if the incoming observer is already an AnyObserver
580591
public static func from<T>(_ observer: T) -> AnyObserver<Element> where T : ObserverType, Element == T.Element {
@@ -598,17 +609,40 @@ public struct AnyObserver<Element> : ObserverType {
598609
}
599610

600611
public func onNext(_ element: Element) {
612+
do {
613+
objc_sync_enter(self)
614+
defer { objc_sync_exit(self) }
615+
616+
if _isStopped { return }
617+
}
618+
601619
_onNext?(element)
602620
}
621+
603622
public func onCompleted() {
623+
do {
624+
objc_sync_enter(self)
625+
defer { objc_sync_exit(self) }
626+
627+
if _isStopped { return }
628+
_isStopped = true
629+
}
604630
_onCompleted?()
605631
}
632+
606633
public func onError(_ error: Error) {
634+
do {
635+
objc_sync_enter(self)
636+
defer { objc_sync_exit(self) }
637+
638+
if _isStopped { return }
639+
_isStopped = true
640+
}
607641
_onError?(error)
608642
}
609643
}
610644

611-
public class CompositeDisposable : Disposable, Lockable {
645+
public class CompositeDisposable : Disposable {
612646
private var _disposables = Bag<Disposable>()
613647
private var _disposed = false
614648

@@ -628,39 +662,41 @@ public class CompositeDisposable : Disposable, Lockable {
628662
}
629663

630664
public func insert(_ disposable: Disposable) -> DisposeKey? {
631-
synchronized {
632-
if _disposed {
633-
disposable.dispose()
634-
return nil
635-
}
636-
let bagKey = _disposables.insert(disposable)
637-
return DisposeKey(value: bagKey)
665+
objc_sync_enter(self); defer { objc_sync_exit(self) }
666+
if _disposed {
667+
disposable.dispose()
668+
return nil
638669
}
670+
let bagKey = _disposables.insert(disposable)
671+
return DisposeKey(value: bagKey)
639672
}
640673

641674
public var count: Int {
642-
synchronized { _disposables.count }
675+
objc_sync_enter(self); defer { objc_sync_exit(self) }
676+
return _disposables.count
643677
}
644678

645679
// removes and disposes the value identified by disposeKey
646680
public func remove(for disposeKey: DisposeKey) {
647-
synchronized {
648-
_disposables.removeKey(disposeKey.value)
649-
}?.dispose()
681+
objc_sync_enter(self)
682+
let d = _disposables.removeKey(disposeKey.value)
683+
objc_sync_exit(self)
684+
d?.dispose()
650685
}
651686

652687
public func dispose() {
653-
let copy:[Disposable] = synchronized {
688+
let copy:[Disposable]
689+
do {
690+
objc_sync_enter(self); defer { objc_sync_exit(self) }
654691
_disposed = true
655-
let copy = _disposables.toArray()
692+
copy = _disposables.toArray()
656693
_disposables = .init()
657-
return copy
658694
}
659695
for d in copy { d.dispose() }
660696
}
661697
}
662698

663-
public class SerialDisposable : Cancelable, Lockable {
699+
public class SerialDisposable : Cancelable {
664700
private var _disposable: Disposable?
665701
private var _disposed = false
666702

@@ -671,19 +707,20 @@ public class SerialDisposable : Cancelable, Lockable {
671707
}
672708

673709
public var isDisposed: Bool {
674-
synchronized { _disposed }
710+
objc_sync_enter(self); defer { objc_sync_exit(self) }
711+
return _disposed
675712
}
676713

677714
public var disposable:Disposable? {
678715
get { return _disposable }
679716
set {
680-
if let old: Disposable = synchronized({
681-
let x = _disposable
682-
_disposable = newValue
683-
return x
684-
}) {
685-
old.dispose()
686-
}
717+
objc_sync_enter(self)
718+
let old = _disposable
719+
_disposable = newValue
720+
objc_sync_exit(self)
721+
722+
old?.dispose()
723+
687724
// needs to come after the old/swap so dispose() can call this
688725
if _disposed {
689726
newValue?.dispose()
@@ -974,30 +1011,39 @@ public extension ObservableType {
9741011
[ObserveOn](http://reactivex.io/documentation/operators/timeout.html) */
9751012
func timeout(_ dueTime: DispatchTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
9761013
return Observable.create { observer in
977-
let gate = Lock()
1014+
let gate = NSObject()
9781015
var innerDisposable: Disposable? = nil
9791016
let timeoutDisposable = scheduler.scheduleRelative((), dueTime: dueTime) {
980-
gate.synchronized({ () -> Disposable? in
981-
let r = innerDisposable
982-
innerDisposable = nil
983-
return r
984-
})?.dispose()
1017+
objc_sync_enter(gate)
1018+
let r = innerDisposable
1019+
innerDisposable = nil
1020+
objc_sync_exit(gate)
1021+
r?.dispose()
1022+
9851023
observer.onError(MiniRxError.timeout)
9861024
return Disposables.create()
9871025
}
9881026

9891027
innerDisposable = self.subscribe { (value) in
990-
if gate.synchronized({ innerDisposable }) == nil {
991-
return
992-
}
1028+
objc_sync_enter(gate)
1029+
let bailout = innerDisposable == nil
1030+
objc_sync_exit(gate)
1031+
if bailout { return }
1032+
9931033
timeoutDisposable.dispose()
9941034
observer.onNext(value)
9951035
} onError: { (err) in
996-
gate.synchronized { innerDisposable = nil }
1036+
objc_sync_enter(gate)
1037+
innerDisposable = nil
1038+
objc_sync_exit(gate)
1039+
9971040
timeoutDisposable.dispose()
9981041
observer.onError(err)
9991042
} onCompleted: {
1000-
gate.synchronized { innerDisposable = nil }
1043+
objc_sync_enter(gate)
1044+
innerDisposable = nil
1045+
objc_sync_exit(gate)
1046+
10011047
timeoutDisposable.dispose()
10021048
observer.onCompleted()
10031049
}
@@ -1215,17 +1261,3 @@ public class ConcurrentDispatchQueueScheduler : DispatchQueueScheduler {
12151261
target: nil))
12161262
}
12171263
}
1218-
1219-
fileprivate protocol Lockable : AnyObject { }
1220-
1221-
fileprivate extension Lockable {
1222-
@discardableResult // sometimes you just want to lock something and don't care about the return value
1223-
func synchronized<T>(_ block:() throws -> T) rethrows -> T {
1224-
objc_sync_enter(self)
1225-
defer{ objc_sync_exit(self) }
1226-
1227-
return try block()
1228-
}
1229-
}
1230-
1231-
fileprivate class Lock : Lockable { }

0 commit comments

Comments
 (0)