Combining the emissions of multiple Observables using the RxJava Zip operator in a Spring Boot microservice

Introduction
Today we are going to discuss an advanced RxJava operator called Zip. The Zip operator can improve the performance of your application when it lets IO-bound tasks, such as external API calls over the network, run asynchronously and concurrently instead of one after another. Such calls are very common in today's applications.

Prerequisites: Good knowledge of RxJava, Spring Boot, microservices, Java 8
User Level: Advanced




Since this is an advanced article, I am not going to walk you through the basics of RxJava or Spring Boot microservices. If you need a basic understanding of RxJava, I recommend going through my previous blog posts [1] [2] or any other good introductory article about RxJava. If you need to learn more about Spring Boot microservices, please go through the Spring tutorials, which are comprehensive.


RxJava Zip Operator
Combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function.





The Zip method returns an Observable that applies a function of your choosing to the combination of items emitted, in sequence, by two (or more) other Observables, with the results of this function becoming the items emitted by the returned Observable. It applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by Observable #1 and the first item emitted by Observable #2; the second item emitted by the new zip-Observable will be the result of the function applied to the second item emitted by Observable #1 and the second item emitted by Observable #2; and so forth. It will only emit as many items as the number of items emitted by the source Observable that emits the fewest items. [3]
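To make the pairing rule concrete, here is a minimal plain-Java sketch of the same semantics. It is not RxJava code: it works on in-memory lists rather than asynchronous streams, and the class and method names (ZipSemanticsSketch, zip) are made up for this illustration. It only shows that items are combined position by position and that the result is as long as the shorter source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

/** Plain-Java illustration of zip semantics on lists; hypothetical names, not the RxJava API. */
class ZipSemanticsSketch {

    // Combines the i-th item of each source with the given function and
    // stops as soon as the shorter source runs out of items.
    static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                 BiFunction<? super A, ? super B, ? extends R> combiner) {
        int size = Math.min(first.size(), second.size());
        List<R> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            result.add(combiner.apply(first.get(i), second.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        List<String> letters = Arrays.asList("A", "B", "C");

        // The items 4 and 5 have no partner, so they are dropped.
        System.out.println(zip(numbers, letters, (n, s) -> n + s)); // prints [1A, 2B, 3C]
    }
}
```

In RxJava the same pairing happens as items arrive on each stream, so the combined item is emitted only once both sources have produced their next value.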


Workshop
Let's move on to the scenario we are going to consider today. The illustration below depicts the big picture of the system.



Figure 1: The Big Picture


The use case we use to demonstrate the RxJava Zip operator is an eCommerce system, such as Amazon, where people purchase things by placing orders online and have them delivered to their doorstep.

Suppose we have a simple REST API which returns the product information associated with our order. Another REST API computes the shipping cost and returns all the necessary shipping information associated with this order. Our microservice calls both of these services asynchronously, merges their results and returns the order details to the end user. The product API [4] returns the price of the item, while the shipping API [5] returns the shipping cost associated with it. Finally, we sum them up to calculate the total price of the item and return it to the user along with all the other information about the order.

The response of the Product API [4] is given below.


The response of the Shipping API [5] looks something like this.


 
When you send a GET request to the Order API, which we are planning to expose to the clients, you may get the following response.



All the necessary steps to run the sample microservice application [6] are given in the readme file distributed with the project itself, so I am not going to repeat them here. Let’s now walk through the code and get into the implementation details.


Here you may see the use of the RxJava Zip operator. We are calling two REST services asynchronously. One is the product service [4], which returns the product details associated with this order, and the other is the shipping information service [5], which returns the shipping cost associated with this order. I have used mock endpoints for demonstration purposes. We then want to combine these two pieces of information asynchronously. First we prepare the call to the getProductDetails method. Then we prepare the call to the getShippingInformation method. Finally, we zip the two results as they arrive in each Observable stream. The Zip operator combines the two sources of data as pairs, and we wrap each pair in an object called OrderDetails. The transformation function that builds this wrapper is passed as an argument to the Zip operator, and that function is what does the magic.
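The shape of this fork-and-join step can be sketched with the JDK alone. The sketch below deliberately uses CompletableFuture.thenCombine as a stand-in for the Zip operator, so it is an analogue and not the code from the sample project [6]. The class OrderZipSketch, the fetch methods, the fields of OrderDetails and the prices are all assumptions made up for this illustration, and the two "remote calls" simply return fixed values.

```java
import java.math.BigDecimal;
import java.util.concurrent.CompletableFuture;

/** JDK-only analogue of zipping two single-result asynchronous calls; all names are hypothetical. */
class OrderZipSketch {

    // Hypothetical wrapper for the merged result of both calls.
    static final class OrderDetails {
        final BigDecimal price;
        final BigDecimal shippingCost;

        OrderDetails(BigDecimal price, BigDecimal shippingCost) {
            this.price = price;
            this.shippingCost = shippingCost;
        }

        // Total price of the item: product price plus shipping cost.
        BigDecimal total() {
            return price.add(shippingCost);
        }
    }

    // Stand-in for the product API call; the value is made up.
    static CompletableFuture<BigDecimal> fetchProductPrice() {
        return CompletableFuture.supplyAsync(() -> new BigDecimal("100.00"));
    }

    // Stand-in for the shipping API call; the value is made up.
    static CompletableFuture<BigDecimal> fetchShippingCost() {
        return CompletableFuture.supplyAsync(() -> new BigDecimal("12.50"));
    }

    // Starts both calls, then combines the two results once both are available.
    static CompletableFuture<OrderDetails> fetchOrderDetails() {
        CompletableFuture<BigDecimal> price = fetchProductPrice();
        CompletableFuture<BigDecimal> shipping = fetchShippingCost();
        return price.thenCombine(shipping, OrderDetails::new);
    }

    public static void main(String[] args) {
        // join() blocks here only to keep the demo short.
        OrderDetails order = fetchOrderDetails().join();
        System.out.println("Total: " + order.total());
    }
}
```

The point of the design is that both calls are already in flight before anything waits on them, so the combined result is ready after roughly the slower of the two calls rather than after their sum.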

Finally, let’s come back to the OrderResource class, where we hook everything together using a subscription. I have given the code below. Here we merely create a subscriber to consume the data. When data becomes available in the stream, the Observable notifies its subscribers so that they can consume it.



Exercise
Look carefully for places in your own code and use cases where the RxJava Zip operator would fit. When you refactor your code using RxJava, apply the Zip operator there and see how much complexity it removes. Also consider the performance you gain by running these IO-bound tasks asynchronously, which is usually well worth the effort.



Conclusion
We have covered a lot of ground in the RxJava operator landscape today. I introduced an advanced RxJava operator called Zip, explained it with a sample use case and a reference implementation, and shared the code with you [6]. That does it for another article. I’ll see you in my next blog post. Happy coding!



References
[1] http://ravindraranwala.blogspot.com/2016/12/introducing-java-reactive-extentions-in.html
[2] http://ravindraranwala.blogspot.com/2017/01/calling-exterenal-nosql-database.html
[3] http://reactivex.io/documentation/operators/zip.html
[6] https://github.com/ravindraranwala/SpringBootRxJava

 
