Friday, February 12, 2016

Observable

Why you should consider Couchbase as a KV or document database?

Tomek Nurkiewicz gave me access to his new book: "Reactive Programming with RxJava". I think this is my chance to learn "something new" by designing a fully reactive application.

Evolution - from callbacks to RX

As you probably know, Node.js, and JavaScript in general, is single-threaded. You might ask: how is it possible that most of today's single-page applications do so much under the hood? They can send several requests to the backend and still redraw the user interface responsively. Of course it is possible to run multiple IO operations in the background at the same time, but our app should not actively wait and poll the status of those operations.
The key to this "success", or rather the way JavaScript does it, is callbacks. As soon as an operation ends (with success or an error), our app gets notified.
Have you heard about callback hell? It all started in JavaScript. For many years, JS newcomers have written async code that tends to drift to the right - literally, because every nested callback indents the code another 4 spaces.
Now you can't get into modern JS without hearing about solutions for avoiding callback hell (I mean promises and $q).

I was quite familiar with $q and promises in JavaScript when I discovered CompletableFuture in Java 8. I like this new mechanism in Java 8, especially combined with lambdas.
If you don't know much about RxJava, you can think of it as just CompletableFuture on steroids.
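
To make the comparison concrete, here is a minimal sketch of how CompletableFuture and lambdas keep dependent async steps flat instead of nesting callbacks. It uses only the JDK; findUser and countOrders are hypothetical stand-ins for real IO calls, not part of any library.

```java
import java.util.concurrent.CompletableFuture;

class FutureChainSketch {
    // Hypothetical async lookup: pretends to fetch a user record for a login.
    static CompletableFuture<String> findUser(String login) {
        return CompletableFuture.supplyAsync(() -> "user:" + login);
    }

    // Hypothetical second async step that depends on the first result.
    static CompletableFuture<Integer> countOrders(String user) {
        return CompletableFuture.supplyAsync(() -> user.length());
    }

    // Flat pipeline: each step is chained instead of nested in a callback.
    static CompletableFuture<String> describe(String login) {
        return findUser(login)
                .thenCompose(FutureChainSketch::countOrders)
                .thenApply(count -> login + " has " + count + " orders")
                // One error handler covers every step above it.
                .exceptionally(e -> "lookup failed: " + e.getMessage());
    }

    public static void main(String[] args) {
        // join() blocks, which is fine for a demo but not for a request handler.
        System.out.println(describe("tomek").join());
    }
}
```

The chain stays at one indentation level no matter how many steps are added, which is the property promises brought to JavaScript as well.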

I found a chapter named "Couchbase client API" in Tomek's book, but there is no content behind this heading in the current version. I imagine this chapter will be extremely interesting, especially if Tomek compares the RxJava driver with other database drivers - the blocking ones.

What is so special about the relationship between Couchbase and RxJava? Couchbase is the first database I know of that provides a native RxJava driver. What does that mean? You can query your database asynchronously and be notified as soon as data reaches the client side, document by document, until we get the complete result or end with an error. Let's look at how we can write this in code:

    @RequestMapping(value = "/{login}", method = RequestMethod.GET)
    public DeferredResult<User> findByLogin(@PathVariable String login) {
        DeferredResult<User> deferred = new DeferredResult<>();
        userService
                .get(login)
                .subscribe(deferred::setResult,
                        e -> {
                            logger.error("Can't find user by login", e);
                            throw new UserDoesNotExist();
                        });
        return deferred;
    }

There are a few mechanisms at work in this small piece of code:

DeferredResult - this is the Spring way to defer returning a result (available since Spring MVC 3.2). It builds on Servlet 3 and its support for asynchronous request processing. This matters because it is very easy to turn RxJava code into blocking code, and we need this mechanism to avoid blocking any thread.

Observable<User> - this is the RxJava part. The UserService.get(login) method returns Observable<User>. We can subscribe to this observable and react to the events it emits. If we get a healthy user, we return it as the result with deferred::setResult. If we get some nasty error, we log a message and throw a runtime exception. With all these tricks, no thread on our application server waits for the result while the database processes our query. Once the results arrive, our app processes them on other threads.
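
The subscribe-and-react flow described above can be sketched without any external dependency. This is only an analogy: it uses the JDK's own Flow API (Java 9 or later) as a stand-in for RxJava's Observable, and StreamingSketch and its collect method are hypothetical names. It shows a subscriber receiving items one by one, followed by a single completion signal.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class StreamingSketch {
    // Publishes the given documents and records every event the subscriber sees.
    static List<String> collect(List<String> documents) {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // no backpressure in this demo
                }
                @Override public void onNext(String doc) {
                    events.add("next:" + doc); // called once per document
                }
                @Override public void onError(Throwable error) {
                    events.add("error");
                    done.countDown();
                }
                @Override public void onComplete() {
                    events.add("complete"); // called once, after the last document
                    done.countDown();
                }
            });
            documents.forEach(publisher::submit);
        } // close() signals completion after the submitted items are delivered

        try {
            done.await(); // blocking only so the demo can return the recorded events
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("doc-1", "doc-2")));
    }
}
```

The subscriber callbacks run on the publisher's executor threads rather than on the thread that called subscribe, which is the same reason thread tracking matters with RxJava.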

I wonder if I can develop an entire API in this manner - I mean the reactive way. I will come back to this post if I run into any problems with this approach. I think Tomek's book will help me stay on course and keep me from making mistakes that turn RxJava code into blocking code.

2 comments:

  1. Observable.subscribe can be blocking (when it is called on the current thread); in that case this code would work, but it still would not be idiomatic. Subscribe is not a good place to throw exceptions, because they are then thrown asynchronously on a thread you do not necessarily know (usually in some thread pool, where such an exception tells you nothing; Couchbase has its own pool, and if you do not change the scheduler you are working on their threads, which in turn can be bad in very many ways). I strongly recommend tracking threads when playing with RxJava.

    In subscribe's on-error handler you can handle errors or pass them on. For example, you can close the request with a 401 code or call DeferredResult.setErrorResult. Once you start working with Observables, they have an interesting effect on the implementation. It starts with short, single methods, but very quickly they spread through the code: every method returns an observable and there is hardly a subscribe anywhere. It is possible to write an API entirely asynchronously - http://vertx.io/

    Replies
    1. Krzysiek raises a few very interesting points:
      Observable.subscribe can be blocking (if it runs on the same thread as the current execution). I think this is a feature, not a bug: by design, RxJava can be used in a single-threaded model and block on execution. Krzysiek points out that this is a working, but not idiomatic, way of writing RxJava code.
      Couchbase threading is very interesting, and I wrote a blog post about it (Schedulers in couchbase).
      The most important thing, and my mistake: do not throw exceptions; propagate or handle errors the RxJava way.
      In reactive applications we end up with the entire application built on Observables, subscribing in only one place - to connect this world with external frameworks.
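
The point about throwing versus propagating can be illustrated with a small sketch. It is an analogy built on the JDK's CompletableFuture rather than RxJava and Spring, so it runs without dependencies: the deferred future plays the role of DeferredResult, and completeExceptionally plays the role of DeferredResult.setErrorResult. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class ErrorPropagationSketch {
    // Hypothetical lookup that always fails, standing in for a missing user.
    static CompletableFuture<String> failingLookup() {
        CompletableFuture<String> lookup = new CompletableFuture<>();
        lookup.completeExceptionally(new IllegalStateException("no such user"));
        return lookup;
    }

    // Anti-pattern: throwing inside the callback. The exception is swallowed by
    // the callback machinery and the result holder is never completed.
    static CompletableFuture<String> throwingHandler() {
        CompletableFuture<String> deferred = new CompletableFuture<>();
        failingLookup().whenComplete((user, error) -> {
            if (error != null) {
                throw new RuntimeException("thrown in callback");
            }
            deferred.complete(user);
        });
        return deferred;
    }

    // Preferred: hand the error to the result holder so the waiting side sees it.
    static CompletableFuture<String> propagatingHandler() {
        CompletableFuture<String> deferred = new CompletableFuture<>();
        failingLookup().whenComplete((user, error) -> {
            if (error != null) {
                deferred.completeExceptionally(error);
            } else {
                deferred.complete(user);
            }
        });
        return deferred;
    }

    public static void main(String[] args) {
        System.out.println("throwing handler done: " + throwingHandler().isDone());
        System.out.println("propagating handler failed: "
                + propagatingHandler().isCompletedExceptionally());
    }
}
```

In the throwing variant the caller is left holding a result that never completes, which in a web handler would show up as a request that hangs until it times out.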
