Reactive programming is a general programming term for code that reacts to changes, such as new data values or events. A callback is one way of doing reactive programming imperatively. RxJava provides dozens of operators for composing, transforming, scheduling, throttling, error handling, and lifecycle management.
Typical scenarios where you may use reactive programming include handling taps by a user on the screen and listening for the results of asynchronous network calls.
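To make the callback idea concrete, here is a minimal plain-Java sketch of reacting to taps imperatively. It does not use RxJava; `TapSource`, `onTap`, and `tap` are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CallbackDemo {
    // Returns what the callback recorded, so the behaviour is easy to check.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        TapSource source = new TapSource();
        // Register a callback; it runs every time a tap happens.
        source.onTap(target -> seen.add("Tapped: " + target));
        source.tap("login button");
        source.tap("menu");
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}

// Hypothetical event source, not part of any library.
class TapSource {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    // Store the callback for later.
    void onTap(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Simulate a user tap by invoking every registered callback.
    void tap(String target) {
        for (Consumer<String> listener : listeners) {
            listener.accept(target);
        }
    }
}
```

The caller never polls for taps; it hands over a function and the source calls it back when something happens.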
RxJava has an Observable class that represents a stream of data or events. It is lazy rather than eager (it does nothing until it is subscribed to). It is intended for push (reactive) use but can also be used for pull (interactive). It can be used synchronously or asynchronously, and it can represent a single value, many values, or an infinite sequence of values or events over time.
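The difference between pull and push can be sketched with plain JDK types. This is only an analogy and not RxJava code: an `Iterator` stands in for the pull (interactive) style and a callback passed to `forEach` stands in for the push (reactive) style.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class PushPullDemo {
    static List<String> run() {
        List<Integer> values = List.of(1, 2, 3);
        List<String> log = new ArrayList<>();

        // Pull: the consumer decides when to ask for the next value.
        Iterator<Integer> iterator = values.iterator();
        while (iterator.hasNext()) {
            log.add("pulled " + iterator.next());
        }

        // Push: the producer hands each value to the callback it was given.
        values.forEach(value -> log.add("pushed " + value));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```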
An Observable can emit a stream of data and can be subscribed to by an Observer. Upon subscription, the Observer can have three types of events pushed to it:
void onNext(T t): This event carries data values to Observers.
void onComplete(): This event terminates the event sequence with success. The Observable has now completed and won’t emit any other events.
void onError(Throwable t): This event also terminates the event sequence, but with an error, and no other events will be emitted.
Synchronous example of RxJava -
Observable<Integer> observable = Observable.create(emitter -> {
    // Emit 100 numbers.
    for (int i = 0; i < 100; i++) {
        System.out.println("Emitting : " + i);
        // Publish or emit a value.
        emitter.onNext(i);
    }
    // When all values are emitted, call complete.
    emitter.onComplete();
});
observable.subscribe(value -> {
    System.out.println("Received : " + value);
});
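The example above exercises only onNext and onComplete. To illustrate laziness and the error path without depending on the library, here is a plain-Java sketch of the same contract. `MiniObserver` and `MiniObservable` are hypothetical stand-ins written for this article, not RxJava classes, and the real implementation does considerably more.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ContractDemo {
    // Hypothetical stand-in for the three-event Observer contract.
    interface MiniObserver<T> {
        void onNext(T value);
        void onComplete();
        void onError(Throwable error);
    }

    // Hypothetical lazy source: it only stores the emitting logic
    // and runs it when subscribe() is called.
    static final class MiniObservable<T> {
        private final Consumer<MiniObserver<T>> source;

        MiniObservable(Consumer<MiniObserver<T>> source) {
            this.source = source;
        }

        void subscribe(MiniObserver<T> observer) {
            try {
                source.accept(observer);
            } catch (RuntimeException e) {
                // A failure terminates the sequence with onError.
                observer.onError(e);
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> numbers = new MiniObservable<>(observer -> {
            log.add("source started");
            observer.onNext(1);
            observer.onNext(2);
            throw new IllegalStateException("boom");
        });
        // Nothing has run yet: the source is lazy.
        log.add("created");
        numbers.subscribe(new MiniObserver<Integer>() {
            @Override public void onNext(Integer value) { log.add("onNext " + value); }
            @Override public void onComplete() { log.add("onComplete"); }
            @Override public void onError(Throwable error) { log.add("onError " + error.getMessage()); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log records "created" before "source started", which shows the laziness, and it ends with "onError boom" without any onComplete, since a sequence terminates with one or the other.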
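Let’s see how we can make the synchronous example asynchronous. We will use:
subscribeOn(): runs the subscription, and therefore the emitting code inside create(), on a different thread, i.e. other than the main thread.
observeOn(): delivers the events to the Observer on a different thread, i.e. other than the main thread.
Thread.sleep(): used at the end to keep the main thread alive while the background threads do their work.
Let’s see the code -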
// The explicit <Integer> is needed because the chained calls prevent the compiler from inferring the type.
Observable<Integer> observable = Observable.<Integer>create(emitter -> {
    for (int i = 0; i < 100; i++) {
        // We will also print the current thread.
        System.out.println(Thread.currentThread().getName() + ", Emitting : " + i);
        Thread.sleep(10);
        // Emit a value.
        emitter.onNext(i);
    }
    // At the end, we call complete.
    emitter.onComplete();
}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread());
observable.subscribe(value -> {
    System.out.println(Thread.currentThread().getName() + ", Received : " + value);
});
// Keep the main thread alive while the background threads work.
Thread.sleep(5000);
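The idea of emitting on one thread and receiving on another can be sketched with plain JDK executors. This is an analogy under stated assumptions, not how RxJava schedulers are implemented: the thread names and the `ThreadHopDemo` class are hypothetical, and one single-thread executor plays the role of subscribeOn() while the other plays the role of observeOn().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadHopDemo {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Where values are produced (the subscribeOn role).
        ExecutorService emitThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "emit-thread"));
        // Where values are received (the observeOn role).
        ExecutorService receiveThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "receive-thread"));

        emitThread.execute(() -> {
            for (int i = 0; i < 3; i++) {
                final int value = i;
                log.add(Thread.currentThread().getName() + " emits " + value);
                // Hand the value over to the receiving thread.
                receiveThread.execute(() ->
                        log.add(Thread.currentThread().getName() + " receives " + value));
            }
        });

        try {
            // Wait for both threads to finish instead of sleeping for a fixed time.
            emitThread.shutdown();
            emitThread.awaitTermination(5, TimeUnit.SECONDS);
            receiveThread.shutdown();
            receiveThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The two threads interleave, so the exact order of lines can differ between runs, but every "emits" line comes from emit-thread and every "receives" line from receive-thread.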
RxJava offers a large API of operators for manipulating, combining, and transforming data, such as map(), filter(), take(), flatMap(), and groupBy(). Most of these operators are synchronous, meaning that they perform their computation synchronously inside onNext() as the events pass by.
Let’s see what the map() operator does (synchronously) -
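Observable<Integer> observable = Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    // The emitter method is onComplete(), matching the earlier examples.
    emitter.onComplete();
});
// map() transforms each value before it reaches the subscriber.
observable.map(value -> "Number " + value)
    .subscribe(System.out::println);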
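What "synchronously inside onNext()" means can be sketched in plain Java. The `map` helper below is a hypothetical simplification written for this article, not the RxJava operator: it wraps the downstream handler so that the transformation runs on the calling thread before the value is passed on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MapSketch {
    // Hypothetical helper: returns an onNext handler that applies the
    // mapper and immediately forwards the result downstream.
    static <T, R> Consumer<T> map(Function<T, R> mapper, Consumer<R> downstream) {
        return value -> downstream.accept(mapper.apply(value));
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Function<Integer, String> mapper = value -> "Number " + value;
        Consumer<String> subscriber = text -> log.add("received " + text);
        Consumer<Integer> onNext = map(mapper, subscriber);

        for (int i = 1; i <= 3; i++) {
            log.add("emitting " + i);
            // The mapping and the delivery both finish before this call returns.
            onNext.accept(i);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each "received" line appears directly after its "emitting" line, because the whole chain runs within a single call.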
Conclusion - In this article we’ve seen the basics of Observables, Observers, and Operators. In the next one I will try to dive deeper into RxJava and focus more on asynchronous code.
tags: Java - RxJava - Android