RxJava 2 Tutorial.

1.1. What is RxJava and reactive programming

In reactive programming the consumer reacts to the data as it comes in. This is the reason why asynchronous programming is also called reactive programming. Reactive programming allows to propagates event changes to registered observers.
Reactivex is a project which provides implementations for this concept for different programming languages. It describes itself as:
The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.
RxJava is the Java implementation of this concept. RxJava is published under the Apache 2.0 license. RxJava provides Java API for asynchronous programming with observable streams.

2. Build blocks for RxJava

The build blocks for RxJava code are the following:
  • observables representing sources of data
  • subscribers (or observers) listening to the observables
  • a set of methods for modifying and composing the data
An observable emits items; a subscriber consumes those items.

2.1. Observables

Observables are the sources for the data. Usually they start providing data once a subscriber starts listening. An observable may emit any number of items (including zero items). It can terminate either successfully or with an error. Sources may never terminate, for example, an observable for a button click can potentially produce an infinite stream of events.

2.2. Subscribers

A observable can have any number of subscribers. If a new item is emitted from the observable, the onNext() method is called on each subscriber. If the observable finishes its data flow successful, the onComplete() method is called on each subscriber. Similar, if the observable finishes its data flow with an error, the onError() method is called on each subscriber.

3. RxJava example

A very simple example written as JUnit4 test is the following:
package com.vogella.android.rxjava.simple;

import org.junit.Test;

import io.reactivex.Observable;

import static junit.framework.Assert.assertTrue;


public class RxJavaUnitTest {
    String result="";

    // Simple subscription to a fix value
    @Test
    public void returnAValue(){
        result = "";
        Observable<String> observer = Observable.just("Hello"); // provides datea
        observer.subscribe(s -> result=s); // Callable as subscriber
        assertTrue(result.equals("Hello"));
    }
}

3.1. Why doing asynchronous programming

Reactive programming provides a simple way of asynchronous programming. This allows to simplify the asynchronously processing of potential long running operations. It also provides a defined way of handling multiple events, errors and termination of the event stream. Reactive programming provides also a simplified way of running different tasks in different threads. For example, widgets in SWT and Android have to be updated from the UI thread and reactive programming provides ways to run observables and subscribers in different threads.
It is also possible to convert the stream before its received by the observers. And you can chain operations, e.g., if a API call depends on the call of another API Last but not least, reactive programming reduces the need for state variables, which can be the source of errors.

3.2. Adding RxJava 2 to a Java project

As of this writing the version 2.1.1 is currently the released one. Replace the version with your desired version.
To use RxJava in a Gradle build, add the following as dependency.
compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.1.1'
For Maven, you can add RxJava via the following snippet.
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.0.4</version>
</dependency>
For OSGi environments, e.g., Eclipse RCP development, https://dl.bintray.com/simon-scholz/RxJava-OSGi/ can be used as p2 update site.

4. Creating Observables, subscribing to them and disposing them

4.1. Creating observables

You can create different types of observables.
Table 1. Obervable types
TypeDescription
Flowable<T>
Emits 0 or n items and terminates with an success or an error event. Supports backpressure, which allows to control how fast a source emits items.
Observable<T>
Emits 0 or n items and terminates with an success or an error event.
Single<T>
Emits either a single item or an error event. The reactive version of a method call.
Maybe<T>
Succeeds with an item, or no item, or errors. The reactive version of an Optional.
Completable
Either completes with an success or with an error event. It never emits items. The reactive version of a Runnable.
An example for the usage of Flowable, is when you process touch events. You cannot control the user who is doing these touch events, but you can tell the source to emit the events on a slower rate in case you cannot processes them at the rate the user produces them.
The following shows an example for the creation of an observable.
 Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
            @Override
            public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
                try {
                    List<Todo> todos = RxJavaUnitTest.this.getTodos();
                    for (Todo todo : todos) {
                        emitter.onNext(todo);
                    }
                    emitter.onComplete();
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        });
Using lambdas, the same statement can be expressed as:
Observable<Todo> todoObservable = Observable.create(emitter -> {
    try {
        List<Todo> todos = getTodos();
        for (Todo todo : todos) {
            emitter.onNext(todo);
        }
        emitter.onComplete();
    } catch (Exception e) {
        emitter.onError(e);
    }
});
The following is an example for a Maybe.
Maybe<List<Todo>> todoMaybe = Maybe.create(emitter -> {
    try {
        List<Todo> todos = getTodos();
        if(todos != null && !todos.isEmpty()) {
            emitter.onSuccess(todos); 
        } else {
            emitter.onComplete(); 
        }
    } catch (Exception e) {
        emitter.onError(e); 
    }
});
java.util.Optional has a value
java.util.Optional contains no value → null
An error occurred

4.2. Convenience methods to create observables

RxJava provides several convenience methods to create observables
  • Observable.just("Hello") - Allows to create an observable as wrapper around other data types
  • Observable.fromIterable() - takes an java.lang.Iterable<T> and emits their values in their order in the data structure
  • Observable.fromArray() - takes an array and emits their values in their order in the data structure
  • Observable.fromCallable() - Allows to create an observable for a java.util.concurrent.Callable<V>
  • Observable.fromFuture() - Allows to create an observable for a java.util.concurrent.Future
  • Observable.interval() - An observable that emits Long objects in a given interval
Similar methods exists for the other data types, e.g., *Flowable.just()Maybe.just() and Single.just.

4.3. Subscribing in RxJava

To receive the data emitted from an observable you need to subscribe to it. observables offer a large variety of subscribe methods.
Observable<Todo> todoObservable = Observable.create(emitter -> { ... });

// Simply subscribe with a io.reactivex.functions.Consumer<T>, which will be informed onNext()
Disposable disposable = todoObservable.subscribe(t -> System.out.print(t));

// Dispose the subscription when not interested in the emitted data any more
disposable.dispose();

// Also handle the error case with a second io.reactivex.functions.Consumer<T>
Disposable subscribe = todoObservable.subscribe(t -> System.out.print(t), e -> e.printStackTrace());
There is also a subscribeWith method on observable instances, which can be used like this:
DisposableObserver<Todo> disposableObserver = todoObservable.subscribeWith(new  DisposableObserver<Todo>() {

    @Override
    public void onNext(Todo t) {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
});

4.4. Disposing subscriptions and using CompositeDisposable

When listers or subscribers are attached they usually are not supposed to listen eternally.
So it could happen that due to some state change the event being emitted by an observable might be not interesting any more.
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;

Single<List<Todo>> todosSingle = getTodos();

Disposable disposable = todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

    @Override
    public void onSuccess(List<Todo> todos) {
        // work with the resulting todos
    }

    @Override
    public void onError(Throwable e) {
        // handle the error case
    }
});

// continue working and dispose when value of the Single is not interesting any more
disposable.dispose();
The Single class and other observable classes offer different subscribe methods, which return a Disposable object.
When working with multiple subscriptions, which may become obsolete due to the same state change using a CompositeDisposable is pretty handy to dispose a collection of subscriptions.
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.disposables.CompositeDisposable;

CompositeDisposable compositeDisposable = new CompositeDisposable();

Single<List<Todo>> todosSingle = getTodos();

Single<Happiness> happiness = getHappiness();

compositeDisposable.add(todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

    @Override
    public void onSuccess(List<Todo> todos) {
        // work with the resulting todos
    }

    @Override
    public void onError(Throwable e) {
        // handle the error case
    }
}));

compositeDisposable.add(happiness.subscribeWith(new DisposableSingleObserver<Happiness>() {

    @Override
    public void onSuccess(Happiness happiness) {
        // celebrate the happiness :-D
    }

    @Override
    public void onError(Throwable e) {
        System.err.println("Don't worry, be happy! :-P");
    }
}));

// continue working and dispose all subscriptions when the values from the Single objects are not interesting any more
compositeDisposable.dispose();

Reference:
Source: http://www.vogella.com/tutorials/RxJava/article.html

Comments

Popular Posts