Introduction to RxJava : Reactive Programming in Java
In our latest RxJava series, we will learn about reactive programming in Java. In this series, you will be introduced to reactive programming concepts. You’ll learn the basics of creating observable sequences using RxJava, also when and how to use RxJava in your project.
1. Setting Up Environment
To Setup the environment, first we will create a Maven Project in eclipse
File -> New -> Maven Project
Next add below maven dependency for RxJava in pom.xml file.
1 2 3 4 5 |
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.2.3</version> </dependency> |
By default Maven Project has Java dependency of JDK 1.5, To use RxJava we need to change it to JDK 1.8
go to Build Path -> JRE System Library -> Edit -> Select JDK 1.8
Next Import necessary dependency for using RxJava into a Java class.
1 |
import io.reactivex.*; |
Introduction to Reactive Programming
Reactive Programming is a programming paradigm that reacts to the changes instead of doing state change. It listens to the event and changes and runs code accordingly. In typical Observer pattern basically it has Producer and Consumer. Producer is often called as Observable/Subject and Consumers are often called as Observer/Subscriber.
Observer always listens to the events or stream of data coming from Observable/Producer it is attached to. The data received can be further transformed using functions using Functional Programming.
When to Use Reactive Programming
RxJava or Reactive Programming in Java helps in reducing the program complexity when program grows gradually, it becomes difficult to manage the application and its state. When we look at the complexity of the program it does not increase linearly with number of features. But as the internal state of the program gets larger and is interdependent with various inputs its gets really harder to maintain and keep track of our code and understand its impact on the entire system at once and possible states.
1. Managing Complexity
a) Many Inputs
b) Many Outputs
c) Complex state
Java Reactive programming manifesto helps in understanding the usage of it. It defines four main components that reactive program must take into consideration. It has to Responsive, Elastic, Resilient and Message Driven.
When we talk about Message Driven it means components of the application that triggers events that are observed rather than adding more variable and increase complexity of state, we just as components that add observable events. When changes are made in the component it triggers and event and the subscribers attached to it will execute the code based on the event. So we don’t have to manage state using variables.
To read more about reactive manifesto click here
Thinking Reactively
To start working with reactive programming we need to think reactively, So instead of thinking about the global state of the system we want to think in modular terms. So that we can decouple our system and start thinking about the flow of data through our system and not the entire system all at once.
We would let our modules and components send a event when it changes its state and put that into the steam. Subscribers listening for the events from the steam will get notified once event is triggered and will do what needs to do.
Types of Observables
- Hot : Always emitting whether or not we are subscribing to it. e.g. mouse click. If no one is listening then we just ignore it.
- Cold : Only emits when its listened to. eg. API polling.
Also when we think of reactive programming we always want to have a fault tolerance system which can be scaled easily. When building large scale system we have to think of decoupling our system components, as it makes thinks more elastic and free from bottlenecks and deadlocks.
Introduction to RxJava
We will write some sample for Observables and Observers to understand further.
1. Creating Observables Using from()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
import java.util.Arrays; import java.util.List; import io.reactivex.Observable; public class RxJavaDemo { public static void main(String[] args) { Integer[] numbers = {1,2,3}; Observable<Integer> numbersObservable = Observable.fromArray(numbers); numbersObservable.subscribe(System.out::println); List<String> list = Arrays.asList("Red","green","blue","brown","yellow","white"); Observable<String> listsObservable = Observable.fromIterable(list); listsObservable.subscribe(s -> System.out.println(s.toLowerCase())); listsObservable.subscribe(s -> System.out.println(s.toUpperCase())); } } |
Here we have declared 2 Observable, first for Integer array data and second for List of Strings. For Integer array first we declare int array and in second line we initialize Observable using fromArray() method which takes array source and converts array into ObservableSource that emits the item in array. In Next line we subscribe consumers to the observable “numbersObservable” object.
1 |
public final Disposable subscribe(Consumer<? super T> onNext) |
The subscribe() method Subscribes to an ObservableSource and provides a callback to handle the items it emits. If the Observable emits an error, it is wrapped into an OnErrorNotImplementedException and routed to the RxJavaPlugins.onError handler.
For the Observer and Subscriber to listen to the data stream emitted by the Observable they need to be subscribed using the subscribe() method as shown below.
1 2 |
myObservable.subscribe(mySubscriber); myObservable.subscribe(myObserver); |
Next we declare List of Strings for which we will create Observable using fromIterable() method. As we have Iterable source we need to use fromIterable() method to initialize the observables. fromIterable() Converts an Iterable sequence into an ObservableSource that emits the items in the sequence.
Next we have subscribed 2 subscriber to the “listObservable” to show we can have multiple subscribers listening for the events emitted by observable. First subscriber is making text as lowercase and Second subscriber is making text as uppercase.
Subscribers listen for the events in asynchronous manner.
Output:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
1 2 3 red green blue brown yellow white RED GREEN BLUE BROWN YELLOW WHITE |
2. Creating Observables Using just()
Just() method helps in creating Observables using a single object. It emits whatever is present inside the just() function without any conversion from iteratable to the individual elements. It can take max upto 10 parameters.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import io.reactivex.Observable; class Point{ int x, y; Point(int x, int y){ this.x = x; this.y = y; } @Override public String toString() { return "("+x+", "+y+")"; } } public class RxJavaObservableJust { public static void main(String[] args) { Observable<Point> points = Observable.just(new Point(1,2), new Point(3,4)); points.subscribe(p -> { System.out.println(p); }); } } |
In above example just() method is taking 2 parameters of Point object with 1,2 and 3,4 as ‘x’ and ‘y’ values respectively. And subscriber who has subscribed to the object immediately emits the values.
Output:
1 2 |
(1, 2) (3, 4) |
3. Functional Reactive Programming
- Functional programming is the process of building software by composing pure functions, avoiding shared state, mutable data, and side-effects.
- Reactive programming is an asynchronous programming paradigm concerned with data streams and generating events regarding change.
Together, functional combined with reactive programming provides an elegant approach to event-driven programming – having pure functions to handle single objective and event based trigger on the stream of data – so that client reacts to the data as it comes in. This makes it Responsive, Elastic, Resilient and Message Driven and helps in fault tolerant and highly scalable system.
Creating Observers:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
Observer<String> sampleObserver = new Observer<String>() { @Override public void onNext(String s) { System.out.println("MyObserver onNext(): "+ s); } @Override public void onCompleted() { System.out.println("Observer complete"); } @Override public void onError(Throwable e) { } }; |
Creating Subscribers:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Subscriber<String> sampleSubscriber = new Subscriber<String>() { @Override public void onNext(String s) { System.out.println(s); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } }; |
- onNext() – The onNext() method is called on our observer each time a new event is published to the attached Observable. This the place where operation is performed on each event.
- onCompleted() – This method is called when events associated with and Observable is complete, indicating no more data left for processing.
- onError() – This is called when an unhandled exception is thrown.
Subscribing to an Observable
1 2 3 4 5 6 7 8 9 |
Observable<String> myObservable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { //sample } }); myObservable.subscribe(sampleSubscriber); myObservable.subscribe(sampleObserver); |
Happy Learning 🙂