RxSwift course - combining observables: merge, zip, combineLatest, withLatestFrom

RxSwift provides many ways to combine two or more observables. Sometimes more than one thing has to go right to get the desired result.

Starter project

We will reuse the starter project available here.

Merge

This operator merges two or more observables of the same element type into a single stream. Consider this code:

let disposeBag = DisposeBag()

let first = PublishSubject<String>()
let second = PublishSubject<String>()

Observable.merge(first, second)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

first.onNext("One")
second.onNext("1")
first.onNext("Two")
first.onNext("Three")
second.onNext("2")
first.onNext("Four")
second.onNext("3")
first.onNext("Five")
second.onNext("4")
first.onCompleted()
second.onNext("5")

I'm sending elements to both subjects, and the first subject completes before the second sends its last element. The output:

One
1
Two
Three
2
Four
3
Five
4
5

merge combines both observables and emits their elements in one stream, in the order they arrive. merge completes only when all merged observables have completed, which is why 5 is still delivered after first completes.
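To make the completion rule visible, here is a minimal sketch that also listens for the completed event. The events array is only a helper I introduce for this illustration; it is not part of the starter project.

```swift
import RxSwift

let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()

// Helper (assumption for this sketch): collects everything the merged stream delivers.
var events: [String] = []

Observable.merge(first, second)
    .subscribe(
        onNext: { events.append($0) },
        onCompleted: { events.append("completed") }
    )
    .disposed(by: disposeBag)

first.onNext("One")
first.onCompleted()    // only one source is done, so merge keeps going
second.onNext("1")
second.onCompleted()   // now every source is done, so merge completes

print(events)          // ["One", "1", "completed"]
```

The "completed" entry shows up only after the second subject finishes, not when the first one does.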

Zip

Zip always emits a group of new values: it emits only when every zipped observable has emitted a value. It buffers the values emitted by each zipped observable and groups them in the order they arrived. It is easier to explain with an example:

let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<Int>()
let third = PublishSubject<Double>()

Observable.zip(first.do(onNext: { print("first \($0)")}),
               second.do(onNext: { print("second \($0)")}),
               third.do(onNext: { print("third \($0)")}))
    .subscribe(onNext: { print("zipped \($0) \($1) \($2)") })
    .disposed(by: disposeBag)


first.onNext("A")
second.onNext(1)
first.onNext("B")
third.onNext(0.1)
third.onNext(0.2)
third.onNext(0.3)
third.onCompleted()
first.onNext("C")
second.onNext(2)
second.onNext(3)
first.onNext("D")
second.onNext(4)
first.onNext("E")
second.onNext(5)
second.onNext(6)

Side effects

For better visibility, I'm using a new operator, .do(...). It executes a block of code when an event occurs, without changing the emitted element. This can be useful when debugging, or for analytics when you want to send a log when a certain thing happens. In our case, it prints each new element when it arrives, before it is zipped.
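A small sketch of the "without changing the element" part, assuming a hypothetical numbers subject and a log array that exist only for this illustration:

```swift
import RxSwift

let disposeBag = DisposeBag()
let numbers = PublishSubject<Int>()

// Helper (assumption for this sketch): records what happens and in which order.
var log: [String] = []

numbers
    .do(onNext: { log.append("saw \($0)") })   // side effect only, the element passes through untouched
    .map { $0 * 10 }                           // this is the operator that actually changes the element
    .subscribe(onNext: { log.append("received \($0)") })
    .disposed(by: disposeBag)

numbers.onNext(1)
numbers.onNext(2)

print(log)   // ["saw 1", "received 10", "saw 2", "received 20"]
```

The side effect runs first for every element, and the value seen by the rest of the chain is the same one that entered .do(...).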

This time I'll use a graphic aid - colors:

I marked what zip is doing: every group is shown in a different color. Only when zip has one element available from each observable does it emit them as a tuple. As you can see, the first observable emitted five elements and the second emitted six, but the last observable emitted only three, so zip emitted three groups.
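The same buffering can be seen in a smaller sketch with two subjects. The names letters, numbers and pairs are my own for this illustration, and I use the result-selector variant of zip to join each group into a string:

```swift
import RxSwift

let disposeBag = DisposeBag()
let letters = PublishSubject<String>()
let numbers = PublishSubject<Int>()

// Helper (assumption for this sketch): collects the zipped groups.
var pairs: [String] = []

Observable.zip(letters, numbers) { letter, number in "\(letter)\(number)" }
    .subscribe(onNext: { pairs.append($0) })
    .disposed(by: disposeBag)

letters.onNext("A")
letters.onNext("B")
letters.onNext("C")   // buffered, waiting for a partner
numbers.onNext(1)     // pairs with the oldest buffered letter: "A1"
numbers.onNext(2)     // pairs with the next one: "B2"

print(pairs)          // ["A1", "B2"]
```

"C" stays in the buffer because numbers never emits a third element, just like the extra elements of first and second in the bigger example.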

Combine latest

Combine latest emits a value every time one of the inner observables emits a value, but the first value comes only after all of the inner observables have emitted at least one value. I will use the same input as for zip to better show the difference.

let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<Int>()
let third = PublishSubject<Double>()

Observable.combineLatest(first.do(onNext: { print("first \($0)")}),
                         second.do(onNext: { print("second \($0)")}),
                         third.do(onNext: { print("third \($0)")}))
    .subscribe(onNext: { print("\($0) \($1) \($2)") })
    .disposed(by: disposeBag)


first.onNext("A")
second.onNext(1)
first.onNext("B")
third.onNext(0.1)
third.onNext(0.2)
third.onNext(0.3)
third.onCompleted()
first.onNext("C")
second.onNext(2)
second.onNext(3)
first.onNext("D")
second.onNext(4)
first.onNext("E")
second.onNext(5)
second.onNext(6)

And output:

first A
second 1
first B
third 0.1
B 1 0.1
third 0.2
B 1 0.2
third 0.3
B 1 0.3
first C
C 1 0.3
second 2
C 2 0.3
second 3
C 3 0.3
first D
D 3 0.3
second 4
D 4 0.3
first E
E 4 0.3
second 5
E 5 0.3
second 6
E 6 0.3

After all of the inner observables emit at least one value, combineLatest will start to emit its own. Look at the first value emitted by combineLatest - B 1 0.1 - it starts with B, because this operator doesn't buffer values from the inner observables; it only remembers the latest value of each. After emitting the first value, it emits a new one whenever any of the inner observables provides a value.
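For a direct comparison with zip, here is a minimal sketch with two subjects. The names letters, numbers and combined are assumptions made for this illustration, and the result selector just joins the latest values into a string:

```swift
import RxSwift

let disposeBag = DisposeBag()
let letters = PublishSubject<String>()
let numbers = PublishSubject<Int>()

// Helper (assumption for this sketch): collects the combined values.
var combined: [String] = []

Observable.combineLatest(letters, numbers) { letter, number in "\(letter)\(number)" }
    .subscribe(onNext: { combined.append($0) })
    .disposed(by: disposeBag)

letters.onNext("A")   // nothing yet, numbers has no value
letters.onNext("B")   // replaces "A" as the latest letter
numbers.onNext(1)     // both have a value now: "B1"
letters.onNext("C")   // "C1"
numbers.onNext(2)     // "C2"

print(combined)       // ["B1", "C1", "C2"]
```

"A" never appears in the output, because it was replaced by "B" before numbers emitted anything.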

withLatestFrom

This operator emits the latest value of the second observable every time the source observable itself emits a value. Nothing is emitted until the second observable has emitted at least one element. In simple words, it means "Give me this value every time I emit an element".

let disposeBag = DisposeBag()

let first = PublishSubject<String>()
let second = PublishSubject<Int>()

first.do(onNext: { print("first \($0)")})
    .withLatestFrom(second.do(onNext: { print("second \($0)")}))
    .subscribe(onNext: { print("\($0)") })
    .disposed(by: disposeBag)


first.onNext("A")
second.onNext(1)
first.onNext("B")
first.onNext("C")
second.onNext(2)
second.onNext(3)
first.onNext("D")
second.onNext(4)
first.onNext("E")
second.onNext(5)
second.onNext(6)

And output:

first A
second 1
first B
1
first C
1
second 2
second 3
first D
3
second 4
first E
4
second 5
second 6

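From this, we can see that when A is emitted, nothing happens, because second has not emitted anything yet. Then 1 is emitted and there is still no output: first has to emit an element to get the value from second. Then B is emitted and we get our output, 1, because it is the latest element emitted by second. Note that 2 never appears in the output: it was replaced by 3 before first emitted again.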
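If you need the element from first as well, withLatestFrom also has a variant that takes a result selector. A minimal sketch, where the results array is a helper I add only for this illustration:

```swift
import RxSwift

let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<Int>()

// Helper (assumption for this sketch): collects the combined values.
var results: [String] = []

first
    .withLatestFrom(second) { letter, number in "\(letter)\(number)" }
    .subscribe(onNext: { results.append($0) })
    .disposed(by: disposeBag)

first.onNext("A")    // dropped, second has no value yet
second.onNext(1)     // remembered, but does not trigger an emission
first.onNext("B")    // "B1"
second.onNext(2)
second.onNext(3)     // replaces 2 as the latest value
first.onNext("C")    // "C3"

print(results)       // ["B1", "C3"]
```

Only first triggers emissions here; second just supplies its most recent value.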

There are a few more variations of these operators and more options to control and merge observables. If you want to read more about flow control and Rx operators, go here.

We will see each other next time! And it will be a big one - async work!

Here you can find all RxSwift course posts:

RxSwift course