ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.
Principles:
- Observable patterns: Observable, Single, Flowable, Completable, Maybe
- Stream subjects: PublishSubject, ReplaySubject, BehaviorSubject, AsyncSubject
- Schedulers: IO, Main thread, COMPUTATION, SINGLE, NEW_THREAD
- Operators: create, defer, just, map, flatMap, filter, first, join, merge, zip, retry…
Stream Pattern
- Observable: a stream without backpressure support. It emits zero or more items, then either completes or signals an error
- Single: always emits exactly one item or an error
- Flowable: like Observable, but with backpressure support. Flowable operators use a default buffer size of 128 elements
- Completable: represents a deferred computation that produces no value, only a completion or error signal
- Maybe: emits one item, completes without an item, or signals an error
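To make the differences between the five types concrete, here is a minimal sketch. It assumes RxJava 3 (the `io.reactivex.rxjava3` packages) is on the classpath, and `streamTypesDemo` is a hypothetical name used only for illustration. It uses the blocking accessors so the results can be read synchronously, which is convenient in a demo but not something to do on a UI thread.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single

// Hypothetical demo function: collects what each stream type delivers.
fun streamTypesDemo(): List<String> {
    val results = mutableListOf<String>()

    // Observable: zero or more items, no backpressure.
    val observed = Observable.just(1, 2, 3).toList().blockingGet()
    results.add("Observable: $observed")

    // Single: exactly one item (or an error).
    results.add("Single: ${Single.just("one").blockingGet()}")

    // Maybe: one item or nothing. blockingGet() returns null when it is empty.
    val maybeValue: String? = Maybe.empty<String>().blockingGet()
    results.add("Maybe: ${maybeValue ?: "nothing"}")

    // Completable: no value, only a completion (or error) signal.
    var ran = false
    Completable.fromAction { ran = true }.blockingAwait()
    results.add("Completable ran: $ran")

    // Flowable: like Observable, but the consumer can apply backpressure.
    val flowed = Flowable.range(1, 5).toList().blockingGet()
    results.add("Flowable: $flowed")

    return results
}

fun main() {
    streamTypesDemo().forEach(::println)
}
```

The default operator buffer size mentioned above can be read at runtime with `Flowable.bufferSize()`.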
Example:
    getObservable()
        .subscribeOn(Schedulers.io()) // run the upstream work on the IO scheduler
        .map { item -> item.mapToOtherModel() } // transforming operator
        .distinct() // filtering operator
        // ... other operators
        .observeOn(AndroidSchedulers.mainThread()) // deliver downstream events on the Android main thread
        .subscribe(subscriber) // start the stream and handle its events
Stream Subject
1. PublishSubject
PublishSubject emits to an observer only the items that arrive after that observer subscribes
2. BehaviorSubject
BehaviorSubject emits the most recent item it has observed, followed by all subsequent items, to each subscribed Observer
3. ReplaySubject
ReplaySubject replays all the items it has observed to every observer, regardless of when that observer subscribes
4. AsyncSubject
AsyncSubject emits only the last value of the Observable and this only happens after the Observable completes
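The four subjects differ mainly in what a late subscriber receives. The following is a small sketch under the assumption that RxJava 3 is on the classpath; `lateSubscriberSees` is a hypothetical helper, not part of the library. It pushes 1 and 2, subscribes, then pushes 3 and completes.

```kotlin
import io.reactivex.rxjava3.subjects.AsyncSubject
import io.reactivex.rxjava3.subjects.BehaviorSubject
import io.reactivex.rxjava3.subjects.PublishSubject
import io.reactivex.rxjava3.subjects.ReplaySubject
import io.reactivex.rxjava3.subjects.Subject

// Hypothetical helper: returns the items seen by an observer
// that subscribes after 1 and 2 have already been pushed.
fun lateSubscriberSees(subject: Subject<Int>): List<Int> {
    val received = mutableListOf<Int>()
    subject.onNext(1)
    subject.onNext(2)
    subject.subscribe { received.add(it) } // late subscription
    subject.onNext(3)
    subject.onComplete()
    return received
}

fun main() {
    println(lateSubscriberSees(PublishSubject.create<Int>()))  // [3]
    println(lateSubscriberSees(BehaviorSubject.create<Int>())) // [2, 3]
    println(lateSubscriberSees(ReplaySubject.create<Int>()))   // [1, 2, 3]
    println(lateSubscriberSees(AsyncSubject.create<Int>()))    // [3]
}
```

PublishSubject and AsyncSubject both print `[3]` here, but for different reasons: the first forwards 3 as soon as it arrives, while the second holds it back until `onComplete()` is called.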
Operator
1. Create
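Creates an Observable from scratch by means of a function
    Observable.create<String> { emitter ->
        // ...
        emitter.onNext("item") // may be called any number of times
        // End the stream with exactly one terminal call:
        emitter.onComplete() // on success, call once
        // emitter.onError(throwable) // on failure, call once instead of onComplete()
    }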
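As a runnable illustration of the emitter contract, here is a sketch that assumes RxJava 3 on the classpath. `countdown` is a hypothetical function invented for this example. It also checks `isDisposed` so that it stops producing once the observer is no longer interested.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical example: emits from, from-1, ..., 1 and then completes.
fun countdown(from: Int): Observable<Int> =
    Observable.create<Int> { emitter ->
        for (i in from downTo 1) {
            if (emitter.isDisposed) return@create // observer is gone, stop early
            emitter.onNext(i)                     // zero or more items
        }
        emitter.onComplete()                      // exactly one terminal signal
    }

fun main() {
    println(countdown(3).toList().blockingGet()) // [3, 2, 1]
}
```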
2. Just
Creates an Observable that emits a particular item
3. Defer
The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function
    Observable.defer {
        Observable.just(getItem()) // getItem() runs again for every new subscriber
    }
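The practical difference between Just and Defer is when the value is computed. Below is a minimal sketch, assuming RxJava 3 on the classpath; `deferVersusJust` and the counter are hypothetical and exist only to make the evaluation time visible.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical demo: the counter shows when each value is actually computed.
fun deferVersusJust(): List<Int> {
    val counter = AtomicInteger(0)

    // just() captures its argument immediately, when the Observable is built.
    val eager = Observable.just(counter.incrementAndGet())

    // defer() runs the factory again for every subscriber.
    val lazy = Observable.defer { Observable.just(counter.incrementAndGet()) }

    return listOf(
        eager.blockingFirst(), // same captured value
        eager.blockingFirst(), // same captured value
        lazy.blockingFirst(),  // fresh value
        lazy.blockingFirst()   // another fresh value
    )
}

fun main() {
    println(deferVersusJust()) // [1, 1, 2, 3]
}
```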
4. From/FromCallable
Creates an Observable that, when subscribed to, calls the given function and emits its return value as a single item.
    Observable.fromCallable<String> {
        // Do something
        return@fromCallable "ReactiveX"
    }
5. Buffer
Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time
    getObservable()
        .buffer(3) // emit lists of up to 3 items instead of single items
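To see what the count-based `buffer(3)` overload produces, here is a short sketch assuming RxJava 3 on the classpath (`bufferInThrees` is a hypothetical name). Note that the last bundle can be smaller than the requested size.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical demo: groups the numbers 1..7 into bundles of three.
fun bufferInThrees(): List<List<Int>> =
    Observable.range(1, 7)
        .buffer(3)      // collect up to 3 items per emitted list
        .toList()
        .blockingGet()

fun main() {
    println(bufferInThrees()) // [[1, 2, 3], [4, 5, 6], [7]]
}
```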
6. FlatMap
Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
    getObservableX()
        .flatMap { getObservableY() }
7. Map
Transform the items emitted by an Observable by applying a function to each item
    getObservable()
        .map { item -> item.toUser() }
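The contrast between FlatMap and Map is easier to see with concrete values. This is a sketch under the assumption that RxJava 3 is on the classpath; `mapDemo` and `flatMapDemo` are hypothetical names. Map turns each item into one new item, while FlatMap turns each item into a whole Observable and flattens the results.

```kotlin
import io.reactivex.rxjava3.core.Observable

// map: one input item becomes exactly one output item.
fun mapDemo(): List<Int> =
    Observable.just("a", "bb", "ccc")
        .map { word -> word.length }
        .toList()
        .blockingGet()

// flatMap: one input item becomes an Observable, and all of them are flattened.
fun flatMapDemo(): List<String> =
    Observable.just("a", "b")
        .flatMap { letter -> Observable.just("${letter}1", "${letter}2") }
        .toList()
        .blockingGet()

fun main() {
    println(mapDemo())     // [1, 2, 3]
    println(flatMapDemo()) // [a1, a2, b1, b2]
}
```

The inner Observables here are synchronous, so the output order is stable. With asynchronous inner sources, flatMap may interleave their items; `concatMap` keeps them in source order.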
8. Debounce
Only emit an item from an Observable if a particular timespan has passed without it emitting another item
    getObservable()
        .debounce(1000, TimeUnit.MILLISECONDS)
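Debounce is hard to observe with real time, so the sketch below drives it with RxJava's `TestScheduler`, a virtual clock. It assumes RxJava 3 on the classpath; `debounceDemo` and the search-box style inputs are hypothetical.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Hypothetical demo: simulates fast typing followed by a pause.
fun debounceDemo(): List<String> {
    val scheduler = TestScheduler() // virtual time, advanced manually
    val input = PublishSubject.create<String>()
    val emitted = mutableListOf<String>()

    input.debounce(1000, TimeUnit.MILLISECONDS, scheduler)
        .subscribe { emitted.add(it) }

    input.onNext("r")
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS)  // too soon, "r" is dropped
    input.onNext("rx")
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS)  // too soon, "rx" is dropped
    input.onNext("rxj")
    scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS) // quiet long enough, "rxj" passes
    input.onNext("rxjava")
    input.onComplete() // completion flushes the pending item immediately

    return emitted
}

fun main() {
    println(debounceDemo()) // [rxj, rxjava]
}
```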
9. Distinct
Suppress duplicate items emitted by an Observable
Example (C#, Rx.NET):
    var subject = new Subject<int>();
    var distinct = subject.Distinct();
    subject.Subscribe(
        i => Console.WriteLine("{0}", i),
        () => Console.WriteLine("subject.OnCompleted()"));
    distinct.Subscribe(
        i => Console.WriteLine("distinct.OnNext({0})", i),
        () => Console.WriteLine("distinct.OnCompleted()"));
    subject.OnNext(1);
    subject.OnNext(2);
    subject.OnNext(3);
    subject.OnNext(1);
    subject.OnNext(1);
    subject.OnNext(4);
    subject.OnCompleted();
Results:
    1
    distinct.OnNext(1)
    2
    distinct.OnNext(2)
    3
    distinct.OnNext(3)
    1
    1
    4
    distinct.OnNext(4)
    subject.OnCompleted()
    distinct.OnCompleted()
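DistinctUntilChanged
Suppresses only consecutive duplicates, so a value may appear again once a different value has been emitted in between
    var distinct = subject.DistinctUntilChanged();
Results:
    1
    distinct.OnNext(1)
    2
    distinct.OnNext(2)
    3
    distinct.OnNext(3)
    1
    distinct.OnNext(1)
    1
    4
    distinct.OnNext(4)
    subject.OnCompleted()
    distinct.OnCompleted()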
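For readers following along in Kotlin, the same comparison can be sketched with RxJava 3 (assumed to be on the classpath). `distinctDemo` is a hypothetical name, and the input sequence matches the one used above.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical demo: same input sequence, two ways of suppressing duplicates.
fun distinctDemo(): Pair<List<Int>, List<Int>> {
    val source = Observable.just(1, 2, 3, 1, 1, 4)

    // distinct(): drops every item that has been seen before.
    val distinct = source.distinct().toList().blockingGet()

    // distinctUntilChanged(): drops an item only if it equals the previous one.
    val untilChanged = source.distinctUntilChanged().toList().blockingGet()

    return Pair(distinct, untilChanged)
}

fun main() {
    val (distinct, untilChanged) = distinctDemo()
    println(distinct)     // [1, 2, 3, 4]
    println(untilChanged) // [1, 2, 3, 1, 4]
}
```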
10. Filter
Emit only those items from an Observable that pass a predicate test
11. First
Emit only the first item (or the first item that meets some condition) emitted by an Observable
12. Last
Emit only the last item (or the last item that meets some condition) emitted by an Observable
13. Take
Emit only the first n items emitted by an Observable
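The filtering operators Filter, First, Last and Take can be compared side by side on one source. This is a minimal sketch assuming RxJava 3 on the classpath; `filteringDemo` is a hypothetical name. RxJava exposes First and Last in several variants, and the sketch uses `firstOrError()` and `lastOrError()`, which signal an error if the source is empty.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical demo: applies each filtering operator to the numbers 1..10.
fun filteringDemo(): List<String> {
    val numbers = Observable.range(1, 10)

    val evens = numbers.filter { it % 2 == 0 }.toList().blockingGet()
    val first = numbers.firstOrError().blockingGet()
    // "First item that meets a condition" is filter followed by first.
    val firstOverFour = numbers.filter { it > 4 }.firstOrError().blockingGet()
    val last = numbers.lastOrError().blockingGet()
    val firstThree = numbers.take(3).toList().blockingGet()

    return listOf(
        "filter=$evens",
        "first=$first",
        "firstOverFour=$firstOverFour",
        "last=$last",
        "take=$firstThree"
    )
}

fun main() {
    filteringDemo().forEach(::println)
}
```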
14. Merge
Combine multiple Observables into one by merging their emissions
15. Concat
Emit the emissions from two or more Observables without interleaving them
16. Zip
Combine the emissions of multiple Observables together via a specified function and emit a single item for each combination based on the results of this function
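Merge, Concat and Zip only differ visibly when the sources emit at different times, so the sketch below uses two timed sources on a `TestScheduler` (virtual time). It assumes RxJava 3 on the classpath; `combiningDemo` and the sources `a` and `b` are hypothetical. Source `a` emits at 100 ms and 200 ms, source `b` at 150 ms and 300 ms after subscription.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical demo: combines the same two timed sources in three ways.
fun combiningDemo(): Map<String, List<String>> {
    val scheduler = TestScheduler() // virtual clock keeps the timing deterministic
    val a = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(2).map { "a$it" }
    val b = Observable.interval(150, TimeUnit.MILLISECONDS, scheduler).take(2).map { "b$it" }

    val merged = mutableListOf<String>()
    val concatenated = mutableListOf<String>()
    val zipped = mutableListOf<String>()

    // merge: subscribes to both at once, items arrive in time order.
    Observable.merge(a, b).subscribe { merged.add(it) }
    // concat: subscribes to b only after a has completed.
    Observable.concat(a, b).subscribe { concatenated.add(it) }
    // zip: pairs the n-th item of a with the n-th item of b.
    Observable.zip(a, b, BiFunction<String, String, String> { x, y -> "$x+$y" })
        .subscribe { zipped.add(it) }

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS) // run all scheduled emissions
    return mapOf("merge" to merged, "concat" to concatenated, "zip" to zipped)
}

fun main() {
    val result = combiningDemo()
    println(result["merge"])  // [a0, b0, a1, b1]
    println(result["concat"]) // [a0, a1, b0, b1]
    println(result["zip"])    // [a0+b0, a1+b1]
}
```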
You can find more operators in the ReactiveX operator documentation.