Calling an external NoSQL database asynchronously from within a Spring Boot microservice using RxJava

Well, it seems RxJava keeps giving me material for blog posts, thanks to its immense capabilities and rich set of operators. Today I am going to discuss accessing a NoSQL database asynchronously from within a Spring Boot microservice using RxJava.

Prerequisites - A basic understanding of Java programming, Java 8, microservices, Spring Boot and NoSQL databases.

If you need an introduction to RxJava, please refer to my previous blog post [1], where I cover the basics. If you want to go deeper into RxJava, I recommend reading this book [2].








What are RxJava and Reactive Programming?
In reactive programming the consumer reacts to the data as it comes in. This is why reactive programming is so closely associated with asynchronous programming. Reactive programming allows event changes to be propagated to registered observers. [3]

RxJava is Netflix's port of the Reactive Extensions (Rx) to Java. It was open sourced in 2014. The Rx project is hosted at http://reactivex.io/ and the Java implementation at https://github.com/ReactiveX/RxJava. RxJava is published under the Apache 2.0 license. [3]

RxJava describes itself as an API for asynchronous programming with observable streams.

Why Asynchronous Programming
Programming in an imperative, single-threaded way often leads to blocked, unresponsive UIs and therefore a bad user experience.

This can be avoided by handling operations of unpredictable duration asynchronously. For example, actively waiting for a database query or a web/REST service call can cause an application to freeze if the network is not responsive.

To avoid this, a query or web/REST service call that takes an unpredictable amount of time should run on a different thread and notify the main thread when the result comes in. [3]
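To make the "run it on a different thread and notify when the result comes in" idea concrete before we bring in RxJava, here is a minimal sketch that uses only the JDK's CompletableFuture. The slowLookup method and its return value are made up for illustration and simply stand in for a slow database or REST call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AsyncCallSketch {

    // Hypothetical stand-in for a slow database query or REST call.
    static String slowLookup(String name) {
        try {
            TimeUnit.MILLISECONDS.sleep(200); // simulate network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "student:" + name;
    }

    // Starts the lookup on a worker thread and returns immediately.
    static CompletableFuture<String> lookupAsync(String name) {
        return CompletableFuture.supplyAsync(() -> slowLookup(name));
    }

    public static void main(String[] args) {
        // Register a callback instead of waiting for the result.
        CompletableFuture<Void> done = lookupAsync("John")
                .thenAccept(result -> System.out.println("Result arrived: " + result));

        System.out.println("Main thread is free to do other work");

        // Only here so that the demo JVM does not exit before the callback runs.
        done.join();
    }
}
```

The calling thread never waits on the lookup itself. It only registers what should happen once the value is there, which is the same shape that RxJava generalises to streams of values.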


Observables, Observers and Subscriptions
RxJava provides Observables and Observers. Observables emit values, and Observers watch Observables by subscribing to them. An Observer is notified when the Observable emits a value, when the Observable signals that an error has occurred, and when the Observable signals that it has no more values to emit. The corresponding methods on the Observer interface are onNext(), onError() and onCompleted(). An instance of Subscription represents the connection between an Observer and an Observable. Calling unsubscribe() on this instance removes the connection. [3]
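The notification contract described above can be tried out without any extra dependency by using the JDK's own java.util.concurrent.Flow API (Java 9 or later). This is not RxJava, only a sketch of the same observer pattern, and the names differ slightly: Flow uses onComplete() where RxJava 1.x uses onCompleted(), and Subscription.cancel() where RxJava 1.x uses unsubscribe(). The RecordingSubscriber class and the emitAndRecord helper are hypothetical names of my own.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class ObserverContractSketch {

    // Records every notification it receives so the call order becomes visible.
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // The subscription is the link to the publisher; ask for all items.
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String item) {
            events.add("onNext:" + item);
        }

        @Override
        public void onError(Throwable error) {
            events.add("onError");
            finished.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            finished.countDown();
        }
    }

    // Publishes the given names and returns the notifications the subscriber saw.
    static List<String> emitAndRecord(String... names) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String name : names) {
                publisher.submit(name);
            }
        } // closing the publisher signals onComplete to the subscriber

        try {
            subscriber.finished.await(); // wait for the terminal notification
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(emitAndRecord("John", "Jane"));
    }
}
```

Each emitted value arrives through onNext, and exactly one terminal notification (completion or error) follows at the end. RxJava Observers follow the same ordering.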


Workshop
Now that we have a basic understanding of RxJava, it's time to get hands-on. Today I am going to call an external NoSQL database to create a student object/document and to fetch a student object/document by name. We'll do all these external interactions asynchronously using RxJava. The high-level component diagram of our use case is depicted below. I am going to use MongoDB as our NoSQL database.



Figure 1: High-level view of the scenario


I assume you have MongoDB installed on your local machine and that it is up and running. I am using the default connection settings here. If the MongoDB instance you are going to connect to is configured differently, locate the application.properties file under the $PROJECT_ROOT/src/main/resources directory and change the following properties according to your setup.



All the steps needed to start up the microservice and test the functionality are given in the readme file distributed with the project. [4] I will just go through the code [4] and explain it to you here. First of all, we are using the MongoDB RxJava Driver for our purpose.

Let's first focus on the use case of fetching student data by name. The sample URL is given below. You have to send a GET request to the URL, passing the name as a query parameter.


A sample response that you get for the above request is given below.




The source code of the findByName method in the StudentDAO class, which connects to the MongoDB instance with the given configuration, is listed below. This is where the so-called RxJava magic happens.


Here we call MongoDB asynchronously, and the call returns an RxJava Observable to which we can subscribe later. When the item is emitted, we receive a Document with the student information, which is then converted into a StudentDTO instance using the powerful RxJava operator called map. [5] This ultimately returns an Observable that emits the results of these transformations.


Map

Transform the items emitted by an Observable by applying a function to each item. 

 

The Map operator applies a function of your choosing to each item emitted by the source Observable, and returns an Observable that emits the results of these function applications.
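To show the shape of this transformation in runnable form, here is a dependency-free sketch in which CompletableFuture.thenApply plays the role of map for a single asynchronous value. This is not the project's findByName code and not the RxJava API. The StudentDTO fields, the findDocumentByName helper and the sample values are all assumptions made for illustration, and a plain Map stands in for the driver's Document.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class MapSketch {

    // Hypothetical DTO; the fields are assumptions, not the project's real StudentDTO.
    static final class StudentDTO {
        final String name;
        final int age;

        StudentDTO(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Stand-in for the driver call: asynchronously yields a raw document.
    static CompletableFuture<Map<String, Object>> findDocumentByName(String name) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Object> document = new HashMap<>();
            document.put("name", name);
            document.put("age", 21); // made-up sample value
            return document;
        });
    }

    // The mapping step: apply a function to the emitted document and expose
    // the converted result through a new asynchronous handle.
    static CompletableFuture<StudentDTO> findByName(String name) {
        return findDocumentByName(name)
                .thenApply(doc -> new StudentDTO((String) doc.get("name"), (Integer) doc.get("age")));
    }

    public static void main(String[] args) {
        StudentDTO student = findByName("John").join();
        System.out.println(student.name + " (" + student.age + ")");
    }
}
```

As with map on an Observable, the caller of findByName never sees the raw document. It receives a new asynchronous source that emits the already converted object.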

Finally, let's come back to the StudentResource class, where we hook the two together using a subscription. The code is given below.




Here we merely create a subscriber to consume the data. When data becomes available in the stream, the Observable notifies its subscribers so that they can consume it.

Creating a new student entity in the system by POSTing a JSON payload works similarly and I will leave it for you to try. If you have any questions regarding that flow don’t hesitate to post them here.

There is no blocking code in either the driver or the application code. The solution is completely reactive and asynchronous.


Conclusion
Well, we covered a lot of ground in the RxJava landscape today. That does it for another blog post. I'll see you in my next blog post. Happy coding!



References

