RxSwift course - combining observables: merge, zip, combineLatest, withLatestFrom
RxSwift provides many ways to combine and merge two or more observables together. Sometimes more than one thing has to go right to get the desired result.
Starter project
We will reuse the starter project available here.
Merge
This operator merges two or more observables of the same element type into a single stream. Consider this code:
import RxSwift
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()
// Merge both subjects into one stream and print every element.
Observable.merge(first, second)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
first.onNext("One")
second.onNext("1")
first.onNext("Two")
first.onNext("Three")
second.onNext("2")
first.onNext("Four")
second.onNext("3")
first.onNext("Five")
second.onNext("4")
first.onCompleted()
second.onNext("5")
I'm sending elements to both subjects, and the first subject completes before the second sends its last element. The output:
One
1
Two
Three
2
Four
3
Five
4
5
merge combines both observables and emits their elements in one stream, in the order they arrive. merge completes only when all merged observables have completed, which is why 5 is still printed after first completes.
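To make the completion rule visible, here is a minimal sketch that records the completed event next to the elements. The function name mergeCompletionDemo and the events array are hypothetical and exist only for this illustration.

```swift
import RxSwift

// Hypothetical helper for illustration; not part of the starter project.
func mergeCompletionDemo() -> [String] {
    let disposeBag = DisposeBag()
    let first = PublishSubject<String>()
    let second = PublishSubject<String>()
    var events: [String] = []

    Observable.merge(first, second)
        .subscribe(
            onNext: { events.append($0) },
            onCompleted: { events.append("completed") }
        )
        .disposed(by: disposeBag)

    first.onNext("One")
    first.onCompleted()   // only one source is done, the merged stream stays open
    second.onNext("1")    // still delivered
    second.onCompleted()  // now every source is done, so merge completes

    return events
}

print(mergeCompletionDemo())
// ["One", "1", "completed"]
```

Subjects deliver events synchronously here, so the array is fully populated by the time the function returns.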
Zip
Zip emits values in groups: it emits only when every zipped observable has produced a new value. It buffers the values from each observable and groups them in the order they arrived, so the first elements go together, then the second elements, and so on. It is easier to explain with an example:
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<Int>()
let third = PublishSubject<Double>()
Observable.zip(first.do(onNext: { print("first \($0)") }),
               second.do(onNext: { print("second \($0)") }),
               third.do(onNext: { print("third \($0)") }))
    .subscribe(onNext: { print("zipped \($0) \($1) \($2)") })
    .disposed(by: disposeBag)
first.onNext("A")
second.onNext(1)
first.onNext("B")
third.onNext(0.1)
third.onNext(0.2)
third.onNext(0.3)
third.onCompleted()
first.onNext("C")
second.onNext(2)
second.onNext(3)
first.onNext("D")
second.onNext(4)
first.onNext("E")
second.onNext(5)
second.onNext(6)
Side effects
For better visibility, I'm using a new operator, .do(...). It executes a block of code on an event without changing the emitted element. This can be useful for debugging, or for analytics when you want to send a log when a certain thing happens. In our case, it prints each new element as it arrives, before it is zipped.
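A small sketch of that idea, assuming a hypothetical log array and a hypothetical function name doSideEffectDemo: the do block observes each element, while map is the operator that actually changes it.

```swift
import RxSwift

// Hypothetical helper for illustration; not part of the starter project.
func doSideEffectDemo() -> [String] {
    let disposeBag = DisposeBag()
    let numbers = PublishSubject<Int>()
    var log: [String] = []

    numbers
        .do(onNext: { log.append("saw \($0)") })  // side effect only, element passes through unchanged
        .map { $0 * 10 }                           // the actual transformation
        .subscribe(onNext: { log.append("got \($0)") })
        .disposed(by: disposeBag)

    numbers.onNext(1)
    numbers.onNext(2)

    return log
}

print(doSideEffectDemo())
// ["saw 1", "got 10", "saw 2", "got 20"]
```

The do block runs before the element reaches the rest of the chain, which is why each "saw" entry precedes its "got" entry.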
This time I'll use a graphic aid, colors:
I matched what zip is doing: every group is shown in a different color. Only when zip has one element available from each observable will it emit them together as a tuple. As you can see, there are five elements emitted by the first observable, six by the second observable, but only three emitted by the last observable, so we have three groups emitted by zip.
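The grouping can also be checked in code. The sketch below feeds the same input into zip without the .do(...) logging and collects each emitted tuple as a string; the function name zipDemo and the groups array are hypothetical names used only for this illustration.

```swift
import RxSwift

// Hypothetical helper for illustration; not part of the starter project.
func zipDemo() -> [String] {
    let disposeBag = DisposeBag()
    let first = PublishSubject<String>()
    let second = PublishSubject<Int>()
    let third = PublishSubject<Double>()
    var groups: [String] = []

    Observable.zip(first, second, third)
        .subscribe(onNext: { letter, number, fraction in
            groups.append("\(letter) \(number) \(fraction)")
        })
        .disposed(by: disposeBag)

    // Same input as in the example above.
    first.onNext("A")
    second.onNext(1)
    first.onNext("B")
    third.onNext(0.1)   // first complete group: A 1 0.1
    third.onNext(0.2)
    third.onNext(0.3)
    third.onCompleted()
    first.onNext("C")
    second.onNext(2)    // second group: B 2 0.2
    second.onNext(3)    // third group: C 3 0.3
    first.onNext("D")   // no more elements from third, so nothing is emitted
    second.onNext(4)
    first.onNext("E")
    second.onNext(5)
    second.onNext(6)

    return groups
}

print(zipDemo())
// ["A 1 0.1", "B 2 0.2", "C 3 0.3"]
```

The elements D, E, 4, 5 and 6 never appear because third only ever produced three values to group them with.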
Combine latest
Combine latest emits a value every time one of the inner observables emits, but the first value comes only after every inner observable has emitted at least once. I will use the same input as for zip to better show the difference.
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<Int>()
let third = PublishSubject<Double>()
Observable.combineLatest(first.do(onNext: { print("first \($0)") }),
                         second.do(onNext: { print("second \($0)") }),
                         third.do(onNext: { print("third \($0)") }))
    .subscribe(onNext: { print("\($0) \($1) \($2)") })
    .disposed(by: disposeBag)
first.onNext("A")
second.onNext(1)
first.onNext("B")
third.onNext(0.1)
third.onNext(0.2)
third.onNext(0.3)
third.onCompleted()
first.onNext("C")
second.onNext(2)
second.onNext(3)
first.onNext("D")
second.onNext(4)
first.onNext("E")
second.onNext(5)
second.onNext(6)
And output:
first A
second 1
first B
third 0.1
B 1 0.1
third 0.2
B 1 0.2
third 0.3
B 1 0.3
first C
C 1 0.3
second 2
C 2 0.3
second 3
C 3 0.3
first D
D 3 0.3
second 4
D 4 0.3
first E
E 4 0.3
second 5
E 5 0.3
second 6
E 6 0.3
After all of the inner observables emit at least one value, combineLatest will start to emit its own. Look at the first value emitted by combineLatest, B 1 0.1: it starts with B because this operator doesn't buffer inner observables, it just remembers the latest element of each. After emitting the first value, it will emit a new one whenever any of the inner observables provides a value.
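As a compact check of the "remembers only the latest" behavior, here is a sketch with two subjects that uses the variant of combineLatest taking a result selector closure. The names combineLatestDemo, letters, numbers and results are hypothetical and used only for this illustration.

```swift
import RxSwift

// Hypothetical helper for illustration; not part of the starter project.
func combineLatestDemo() -> [String] {
    let disposeBag = DisposeBag()
    let letters = PublishSubject<String>()
    let numbers = PublishSubject<Int>()
    var results: [String] = []

    // The trailing closure combines the latest element of each source.
    Observable.combineLatest(letters, numbers) { letter, number in
        "\(letter)\(number)"
    }
    .subscribe(onNext: { results.append($0) })
    .disposed(by: disposeBag)

    letters.onNext("A")  // nothing yet: numbers has not emitted
    letters.onNext("B")  // still nothing, and A is now forgotten
    numbers.onNext(1)    // first output uses the latest letter: B1
    numbers.onNext(2)    // B2
    letters.onNext("C")  // C2

    return results
}

print(combineLatestDemo())
// ["B1", "B2", "C2"]
```

Compare this with zip, which would have paired A with 1 instead of dropping A.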
withLatestFrom
This operator emits the latest value of the second observable every time the source observable emits. Nothing is emitted until the second observable has produced at least one element. In simple words, it means "Give me this value every time I emit an element".
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<Int>()
first.do(onNext: { print("first \($0)") })
    .withLatestFrom(second.do(onNext: { print("second \($0)") }))
    .subscribe(onNext: { print("\($0)") })
    .disposed(by: disposeBag)
first.onNext("A")
second.onNext(1)
first.onNext("B")
first.onNext("C")
second.onNext(2)
second.onNext(3)
first.onNext("D")
second.onNext(4)
first.onNext("E")
second.onNext(5)
second.onNext(6)
And output:
first A
second 1
first B
1
first C
1
second 2
second 3
first D
3
second 4
first E
4
second 5
second 6
From this, we can see that when A is emitted nothing happens. Then 1 is emitted and there is still no output, because we need first to emit an element to get a value from second. Then B is emitted and we get our output, 1, because it is the latest element emitted by second.
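If you also need the triggering element, withLatestFrom has a variant that takes a result selector closure. The sketch below uses the same input as above and keeps both values; the function name withLatestFromDemo and the results array are hypothetical names used only for this illustration.

```swift
import RxSwift

// Hypothetical helper for illustration; not part of the starter project.
func withLatestFromDemo() -> [String] {
    let disposeBag = DisposeBag()
    let first = PublishSubject<String>()
    let second = PublishSubject<Int>()
    var results: [String] = []

    // The closure receives the element from first and the latest one from second.
    first
        .withLatestFrom(second) { letter, number in "\(letter) \(number)" }
        .subscribe(onNext: { results.append($0) })
        .disposed(by: disposeBag)

    first.onNext("A")   // dropped: second has not emitted yet
    second.onNext(1)
    first.onNext("B")   // B 1
    first.onNext("C")   // C 1
    second.onNext(2)
    second.onNext(3)
    first.onNext("D")   // D 3
    second.onNext(4)
    first.onNext("E")   // E 4
    second.onNext(5)    // emissions from second alone never trigger output
    second.onNext(6)

    return results
}

print(withLatestFromDemo())
// ["B 1", "C 1", "D 3", "E 4"]
```

Unlike combineLatest, only first drives the output here; second is just sampled.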
There are a few more variations of these operators, and more options to control and merge observables. If you want to read more about flow control and Rx operators go here.
We will see each other next time! And it will be a big one - async work!
Here you can find all RxSwift course posts: