The BEST ReactiveX Tricks

Long Do
5 min readOct 16, 2020

--

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

Principles:

  • Observable Patterns: Observable, Single, Flowable, Completable, Maybe
  • Stream Subjects: PublishSubject, ReplaySubject, BehaviorSubject, AsyncSubject
  • Schedulers: IO, Main thread, COMPUTATION, SINGLE, NEW_THREAD
  • Operators: create, defer, just, map, flatMap, filter, first, join, merge, zip, retry…

Stream Pattern

  • Observable: The Observable class is the non-backpressure. It might emit multi or no item or exception
  • Single: Always emit only one item or exception
  • Flowable: equal observable plus backpressure. The Flowable hosts the default buffer size of 128 elements for operators
  • Completable: represents a deferred computation without any value but an only indication for completion or exception
  • Maybe: Might emit one item, nothing or exception

Example:

getObservable()
.subscribeOn(Schedulers.io) // lister to IO thread
.map { item -> item.mapToOtherModel() } // transform operator
.distinct() // operator
... // other operator
.observeOn(AndroidSchedulers.mainThread()) // thread execute
.subscribe(subscriber) // do action

Stream Subject

1. PublishSubject

PublishSubject emits all the items at the point of subscription

2. BehaviorSubject

The subject that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.

3. ReplaySubject

ReplaySubject emits all the items of the Observable, regardless of when the subscriber subscribes

4. AsyncSubject

AsyncSubject emits only the last value of the Observable and this only happens after the Observable completes

Operator

1. Create

create an Observable from scratch by means of a function

Observable.create { emitter ->
....
emitter.onNext() // call multiple time
emitter.onComplete() // call only one time
emitter.onError() // call only one time
}

2. Just

create an Observable that emits a particular item

3.Defer

The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function

Observable.defer {
Observable.just(getItem()
}

4. From/FromCallable

Create an Observable from scratch but can emit only one item means fromCallable return an item.

Observable.fromCallable<String> {
// Do something
return@fromCallable "ReactiveX"
}

5. Buffer

Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time

getObservable()
.buffer(3)

6. FlatMap

Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable

getObservableX()
.flatMap{ getObervableY() }

7. Map

Transform the items emitted by an Observable by applying a function to each item

getObservable()
.map { item -> item.toUser() }

8. Debounce

only emit an item from an Observable if a particular timespan has passed without it emitting another item

getObservable()
.debounce(1000, TimeUnit.MILLISECOND)

9. Distinct

Suppress duplicate items emitted by an Observable

var subject = new Subject<int>()
var distinct = subject.distinct()
subject.subscribe(
i => Console.WriteLine("{0}", i),
() => Console.WriteLine("subject.OnCompleted()")
)
distinct.subscribe(
i => Console.WriteLine("distinct.OnNext({0})", i),
() => Console.WriteLine("distinct.OnCompleted()")
)
subject.OnNext(1)
subject.OnNext(2)
subject.OnNext(3)
subject.OnNext(1)
subject.OnNext(1)
subject.OnNext(4)
subject.OnCompleted()
Results:
1
distinct.OnNext(1)
2
distinct.OnNext(2)
3
distinct.OnNext(3)
1
1
4
distinct.OnNext(4)
subject.OnCompleted()
distinct.OnCompleted()

DistinctUntilChanged

var distinct = subject.DistinctUntilChanged()Results:
1
distinct.OnNext(1)
2
distinct.OnNext(2)
3
distinct.OnNext(3)
1
distinct.OnNext(1)
1
4
distinct.OnNext(4)
subject.OnCompleted()
distinct.OnCompleted()

10. Filter

Emit only those items from an Observable that pass a predicate test

11. First

Emit only the first item (or the first item that meets some condition) emitted by an Observable

12. Last

Emit only the last item (or the last item that meets some condition) emitted by an Observable

13. Take

Emit only the first n items emitted by an Observable

14. Merge

combine multiple Observables into one by merging their emissions

15. Concat

emit the emissions from two or more Observables without interleaving them

16. Zip

combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

You can see more operator here

--

--

Long Do
Long Do

Written by Long Do

Better code, better world

No responses yet