Skip to content
This repository has been archived by the owner on Jan 24, 2023. It is now read-only.

Commit

Permalink
Merge pull request #153 from UrbanCompass/luis/update-combine-latest
Browse files Browse the repository at this point in the history
Fix combineLatest to only finish when all are done
  • Loading branch information
Luis Padron authored Mar 2, 2021
2 parents 93d05f4 + ba0a61b commit ce56707
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 101 deletions.
98 changes: 61 additions & 37 deletions Snail/Observable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -297,33 +297,40 @@ public class Observable<T>: ObservableType {
public static func combineLatest<U>(_ input1: Observable<T>, _ input2: Observable<U>) -> Observable<(T, U)> {
let combined = Observable<(T, U)>()

var value1: T?
var value2: U?
var input1Result: (value: T?, isComplete: Bool) = (nil, false)
var input2Result: (value: U?, isComplete: Bool) = (nil, false)

func triggerIfNeeded() {
guard let value1 = value1,
let value2 = value2 else {
return
guard let value1 = input1Result.value,
let value2 = input2Result.value else {
return
}
combined.on(.next((value1, value2)))
}

func finishIfNeeded() {
guard input1Result.isComplete, input2Result.isComplete else { return }
combined.on(.done)
}

input1.subscribe(onNext: {
value1 = $0
input1Result.value = $0
triggerIfNeeded()
}, onError: {
combined.on(.error($0))
}, onDone: {
combined.on(.done)
input1Result.isComplete = true
finishIfNeeded()
})

input2.subscribe(onNext: {
value2 = $0
input2Result.value = $0
triggerIfNeeded()
}, onError: {
combined.on(.error($0))
}, onDone: {
combined.on(.done)
input2Result.isComplete = true
finishIfNeeded()
})

return combined
Expand All @@ -334,44 +341,52 @@ public class Observable<T>: ObservableType {
_ input3: Observable<V>) -> Observable<(T, U, V)> {
let combined = Observable<(T, U, V)>()

var value1: T?
var value2: U?
var value3: V?
var input1Result: (value: T?, isComplete: Bool) = (nil, false)
var input2Result: (value: U?, isComplete: Bool) = (nil, false)
var input3Result: (value: V?, isComplete: Bool) = (nil, false)

func triggerIfNeeded() {
guard let value1 = value1,
let value2 = value2,
let value3 = value3 else {
guard let value1 = input1Result.value,
let value2 = input2Result.value,
let value3 = input3Result.value else {
return
}
combined.on(.next((value1, value2, value3)))
}

func finishIfNeeded() {
guard input1Result.isComplete, input2Result.isComplete, input3Result.isComplete else { return }
combined.on(.done)
}

input1.subscribe(onNext: {
value1 = $0
input1Result.value = $0
triggerIfNeeded()
}, onError: {
combined.on(.error($0))
}, onDone: {
combined.on(.done)
input1Result.isComplete = true
finishIfNeeded()
})

input2.subscribe(onNext: {
value2 = $0
input2Result.value = $0
triggerIfNeeded()
}, onError: {
combined.on(.error($0))
}, onDone: {
combined.on(.done)
input2Result.isComplete = true
finishIfNeeded()
})

input3.subscribe(onNext: {
value3 = $0
input3Result.value = $0
triggerIfNeeded()
}, onError: {
combined.on(.error($0))
}, onDone: {
combined.on(.done)
input3Result.isComplete = true
finishIfNeeded()
})

return combined
Expand All @@ -383,55 +398,64 @@ public class Observable<T>: ObservableType {
_ input4: Observable<K>) -> Observable<(T, U, V, K)> {
let combined = Observable<(T, U, V, K)>()

var value1: T?
var value2: U?
var value3: V?
var value4: K?
var input1Result: (value: T?, isComplete: Bool) = (nil, false)
var input2Result: (value: U?, isComplete: Bool) = (nil, false)
var input3Result: (value: V?, isComplete: Bool) = (nil, false)
var input4Result: (value: K?, isComplete: Bool) = (nil, false)

func triggerIfNeeded() {
guard let value1 = value1,
let value2 = value2,
let value3 = value3,
let value4 = value4 else {
guard let value1 = input1Result.value,
let value2 = input2Result.value,
let value3 = input3Result.value,
let value4 = input4Result.value else {
return
}
combined.on(.next((value1, value2, value3, value4)))
}

func finishIfNeeded() {
guard input1Result.isComplete, input2Result.isComplete, input3Result.isComplete, input4Result.isComplete else { return }
combined.on(.done)
}

input1.subscribe(onNext: {
value1 = $0
input1Result.value = $0
triggerIfNeeded()
}, onError: {
combined.on(.error($0))
}, onDone: {
combined.on(.done)
input1Result.isComplete = true
finishIfNeeded()
})

input2.subscribe(onNext: {
value2 = $0
input2Result.value = $0
triggerIfNeeded()
}, onError: {
combined.on(.error($0))
}, onDone: {
combined.on(.done)
input2Result.isComplete = true
finishIfNeeded()
})

input3.subscribe(onNext: {
value3 = $0
input3Result.value = $0
triggerIfNeeded()
}, onError: {
combined.on(.error($0))
}, onDone: {
combined.on(.done)
input3Result.isComplete = true
finishIfNeeded()
})

input4.subscribe(onNext: {
value4 = $0
input4Result.value = $0
triggerIfNeeded()
}, onError: {
combined.on(.error($0))
}, onDone: {
combined.on(.done)
input4Result.isComplete = true
finishIfNeeded()
})

return combined
Expand Down
106 changes: 42 additions & 64 deletions SnailTests/ObservableTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,20 @@ class ObservableTests: XCTestCase {
XCTAssertEqual(received.first, "ERROR")
}

func testCombineLatestDone_whenAllDone() {
let obs1 = Observable<String>()
let obs2 = Observable<Int>()

var isDone = false
Observable.combineLatest(obs1, obs2).subscribe(onDone: { isDone = true })

obs1.on(.done)
XCTAssertFalse(isDone)

obs2.on(.done)
XCTAssertTrue(isDone)
}

func testCombineLatest3() {
let one = Observable<String>()
let two = Observable<Int>()
Expand Down Expand Up @@ -615,28 +629,22 @@ class ObservableTests: XCTestCase {
waitForExpectations(timeout: 1)
}

func testCombineLatest3Done_fromSecondMember() {
let one = Observable<String>()
let two = Observable<Int>()
let three = Observable<Double>()
let subject = Observable.combineLatest(one, two, three)
func testCombineLatest3Done_whenAllDone() {
let obs1 = Observable<String>()
let obs2 = Observable<Int>()
let obs3 = Observable<Double>()

let exp = expectation(description: "combineLatest3 forwards done from observable")
subject.subscribe(onDone: { exp.fulfill() })
two.on(.done)
waitForExpectations(timeout: 1)
}
var isDone = false
Observable.combineLatest(obs1, obs2, obs3).subscribe(onDone: { isDone = true })

func testCombineLatest3Done_fromThirdMember() {
let one = Observable<String>()
let two = Observable<Int>()
let three = Observable<Double>()
let subject = Observable.combineLatest(one, two, three)
obs1.on(.done)
XCTAssertFalse(isDone)

let exp = expectation(description: "combineLatest3 forwards done from observable")
subject.subscribe(onDone: { exp.fulfill() })
three.on(.done)
waitForExpectations(timeout: 1)
obs2.on(.done)
XCTAssertFalse(isDone)

obs3.on(.done)
XCTAssertTrue(isDone)
}

func testCombineLatest3Optional() {
Expand Down Expand Up @@ -744,56 +752,26 @@ class ObservableTests: XCTestCase {
waitForExpectations(timeout: 1)
}

func testCombineLatest4Done_fromFirstMember() {
let one = Observable<String>()
let two = Observable<Int>()
let three = Observable<Double>()
let four = Observable<String>()
let subject = Observable.combineLatest(one, two, three, four)

let exp = expectation(description: "combineLatest4 forwards done from observable")
subject.subscribe(onDone: { exp.fulfill() })
one.on(.done)
waitForExpectations(timeout: 1)
}

func testCombineLatest4Done_fromSecondMember() {
let one = Observable<String>()
let two = Observable<Int>()
let three = Observable<Double>()
let four = Observable<String>()
let subject = Observable.combineLatest(one, two, three, four)
func testCombineLatest4Done_whenAllDone() {
let obs1 = Observable<String>()
let obs2 = Observable<Int>()
let obs3 = Observable<Double>()
let obs4 = Observable<Float>()

let exp = expectation(description: "combineLatest4 forwards done from observable")
subject.subscribe(onDone: { exp.fulfill() })
two.on(.done)
waitForExpectations(timeout: 1)
}
var isDone = false
Observable.combineLatest(obs1, obs2, obs3, obs4).subscribe(onDone: { isDone = true })

func testCombineLatest4Done_fromThirdMember() {
let one = Observable<String>()
let two = Observable<Int>()
let three = Observable<Double>()
let four = Observable<String>()
let subject = Observable.combineLatest(one, two, three, four)
obs1.on(.done)
XCTAssertFalse(isDone)

let exp = expectation(description: "combineLatest4 forwards done from observable")
subject.subscribe(onDone: { exp.fulfill() })
three.on(.done)
waitForExpectations(timeout: 1)
}
obs2.on(.done)
XCTAssertFalse(isDone)

func testCombineLatest4Done_fromFourthMember() {
let one = Observable<String>()
let two = Observable<Int>()
let three = Observable<Double>()
let four = Observable<String>()
let subject = Observable.combineLatest(one, two, three, four)
obs3.on(.done)
XCTAssertFalse(isDone)

let exp = expectation(description: "combineLatest4 forwards done from observable")
subject.subscribe(onDone: { exp.fulfill() })
four.on(.done)
waitForExpectations(timeout: 1)
obs4.on(.done)
XCTAssertTrue(isDone)
}

func testCombineLatest4Optional() {
Expand Down

0 comments on commit ce56707

Please sign in to comment.