Introducing Java Reactive Extentions in to a SpringBoot Micro Service
IntroductionToday we are going to take a look at Reactive Programming using Java8 and RxJava, which is a library for composing asynchronous and event-based programs by using observable sequences. RxJava stands for Reactive Extensions for the JVM. It extends the observer pattern [Gamma95] to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures .
Pre-Requisites: Having a good knowledge about Java8, Functional Programming and spring boot microservices.
What is Reactive programming?In essence, Reactive programming is an asynchronous programming paradigm concerned with data streams. The system time, a list of entries from the DB/API, user clicks, everything can be a stream of data. Data from different streams can easily be combined and transformed, and in the end processed/observed by the subscribers.
In RxJava Iterators and Iterables become Observers and Observables. Iterable and Iterator emulates a pull model whereas RxJava Observable and Observer emulates a push model. The push model is preferred over the imperative pull model. You will subscribe to a stream and you will receive notifications. The source will notify you when it emits new items. RxJava helps us to compose asynchronous programs elegantly.
Why Reactive programming?The same thing can be achieved using callbacks. In fact callbacks have low composition and lead to callback hell if you have many callbacks back to back. The code becomes more and more complex as you go on. On the other hand RxJava gives you rich set of operators to compose your code in more readable manner.
RxJava abstracts you away from the low level complexities related to threading and concurrency.
Concrete exampleNow let us see how to use RxJava components in a spring boot micro service. You may find the sample project in GitHub . All the necessary steps to run the project was given in the readme file, hence I am not going to discuss it again here.
This application implements a Spring Boot microservice using RxJava. It explains you how to bridge the gap between RxJava and Spring Boot while integrating them together. What this application does is when you submit a GET request like this,
it gives you the currency rates for the given currencies compared against a base currency which is EURO in this case. The sample response payload is given below.
The application merely calls external currency conversion API to get the work done. The high level component diagram of the system under consideration is depicted below.
Let’s now check out the code. The entry point to any spring boot application is the Controller or Resource class which defines some basic, presumably CRUD operations related to a resource in REST and associate them with classic HTTP verbs. Here our resource class is CurrencyResource. This class has an instance of the service layer which merely implements business logic of our application. The service class named CurrencyConverter uses RxJava components to call an external currency converter REST API asynchronously. Inside the getCurrencyRatesFromEp method it creates the Observable which is your source of data. The Observable.create() method creates a cold observable, meaning the invocation gets triggered only if a subscription is made to this Observable. That's the default behavior. Moreover it dictates that when a new event is emitted, invoke onNext on this subscription and signal for completion. The sample code is given below.
At the end you subscribe an Observer to your Observable stream. There you describe what you want to do with the final form of the data. The Observer is a simple interface with three methods.
onNext() - method that gets notified when there is a new item emitted in the stream.
onCompleted() - When the source has no elements to emit, it will notify the consumer to complete
onError() - Notifies about exceptions and exceptions are propagated up.
We will receive end calls to onNext, followed either by onCompleted or onError.
The subscription to the stream is made inside the Resource method and here’s how it looks in practice.
By default Rx-Java is synchronous. If you really want to execute things asynchronously, then you need to tell it explicitly using Schedulers. Since the task of calling a downstream endpoint is much more I/O intensive, it is reasonable for us to use Schedulers.io here. Rx-Java provides you a set of built in thread pools such as Schedulers.io, Schedulers.Computation and so forth that you can choose from. However these schedulers should be used judiciously.
There are tons of operators available in RxJava in different categories such as creation, filtering, counting, transforming, combining, retrying and many more to assist developers. A complete treatment of all those operators is beyond the scope of this introductory article. Therefore I thoroughly reckon you to try those operators. They are very robust operators which lets you write simple, readable, composable and efficient code.
ConclusionReactive Programming is not easy, and it definitely comes with a steep learning curve, as you will have to shift the gears from imperative programming and start thinking in a “functional reactive way”, but once you get around it, it will simplify your life a lot.