RxJava: the share operator with RxBinding

Today I’ll tell you about a convenient reactive approach to handling EditText input. As an example, we will take the query text of a SearchView.

The task

Search usually takes a while, no matter whether we request data from a server or read a local database. The user may type letters into the text field much faster than the requests can complete. Therefore we need to set a time threshold so that requests are not made more often than necessary. Besides, we may need to update some parts of the UI in real time, without those delays.

Solution

Without Rx, on Android we could use a Runnable and a Handler, which is not very handy. But thanks to RxBinding we can use the debounce operator and get an elegant solution:

RxSearchView.queryTextChanges(searchView) 
 .debounce(500, TimeUnit.MILLISECONDS) // emit only after 500 ms without a new change
 .subscribe(query -> mPresenter.searchRequest(query));

Now, regardless of typing speed, a request goes out only after the input has been quiet for 500 ms, so requests to the server will be performed no more often than twice a second.
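
For intuition, the timing behaviour that debounce gives us can be sketched in plain Java without Rx. This is only an illustration under assumptions: DebounceSketch, onQueryChanged and demo are hypothetical names that are not part of RxJava or RxBinding, and the real operator is implemented differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper for illustration only; not RxJava's debounce.
class DebounceSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long delayMs;
    private final Consumer<String> downstream;
    private ScheduledFuture<?> pending; // guarded by "this"

    DebounceSketch(long delayMs, Consumer<String> downstream) {
        this.delayMs = delayMs;
        this.downstream = downstream;
    }

    // Called on every text change: drop the pending delivery and restart the timer.
    synchronized void onQueryChanged(String query) {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = scheduler.schedule(
                () -> downstream.accept(query), delayMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    // "Types" three queries without pausing, waits, and returns what was delivered.
    static List<String> demo() {
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        DebounceSketch debouncer = new DebounceSketch(100, delivered::add);
        debouncer.onQueryChanged("a");
        debouncer.onQueryChanged("ab");
        debouncer.onQueryChanged("abc");
        try {
            Thread.sleep(500); // well past the 100 ms quiet period
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        debouncer.shutdown();
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // only the last query survives: [abc]
    }
}
```

Every new change cancels the previously scheduled delivery, so only the value that is followed by a quiet period reaches the consumer. That is the same bookkeeping we would otherwise write by hand with a Handler and a Runnable.
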
But what should we do if we need to update the UI on every typed letter, for example to change a hint? With the debounced subscription, UI updates would also lag by 500 ms, which is not very user-friendly.

Fortunately, Rx makes it possible to broadcast to multiple subscribers. To use this we need to call .share() on our Observable, and voila!

Observable<String> sharedTextChanges = RxSearchView.queryTextChanges(searchView)
 .map(CharSequence::toString) // queryTextChanges emits CharSequence
 .share(); // one upstream listener, many subscribers

sharedTextChanges
 .debounce(500, TimeUnit.MILLISECONDS) // use debounce
 .subscribe(query -> mPresenter.searchRequest(query));

sharedTextChanges
 .subscribe(query -> mPresenter.updateUI(query));

We subscribed to the Observable twice: with and without debounce. Now the UI refreshes on every typed letter, whereas server requests are performed no more often than once every 500 ms.

What’s under the hood?

The .share() operator is a wrapper for .publish().refCount(). Together they allow several subscribers to “share” the items emitted by one stream. Let’s look deeper.

The .publish() operator transforms an Observable into a ConnectableObservable.

[Figure: diagram of the RxJava publish operator]

A ConnectableObservable is an Observable that does not start emitting items until .connect() is called on it.

The .refCount() operator keeps track of the subscribers. According to the docs:

Returns an Observable that stays connected to this ConnectableObservable as long as there is at least one subscription to this ConnectableObservable.

[Figure: diagram of the RxJava refCount operator]

The refCount() operator knows how many subscribers are subscribed to the Observable and doesn’t disconnect from the source ConnectableObservable until all of them have unsubscribed.
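
The reference counting described above can be sketched in a few lines of plain Java. This is a simplified illustration, not RxJava's actual implementation: RefCountSketch, subscribe, emit and connectCount are hypothetical names, and the sketch is single-threaded and ignores errors and completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of the publish().refCount() idea; not thread-safe.
class RefCountSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private boolean connected = false; // are we attached to the source?
    int connectCount = 0;              // how many times we attached to the source

    // Adds a subscriber and returns a Runnable that unsubscribes it.
    Runnable subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            // First subscriber: connect to the source exactly once.
            connected = true;
            connectCount++;
        }
        return () -> {
            subscribers.remove(subscriber);
            if (subscribers.isEmpty()) {
                // Last subscriber left: disconnect from the source.
                connected = false;
            }
        };
    }

    // Simulates the source emitting an item; it is broadcast to all subscribers.
    void emit(String item) {
        if (!connected) {
            return; // nobody is listening, so the item is dropped
        }
        for (Consumer<String> s : new ArrayList<>(subscribers)) {
            s.accept(item);
        }
    }

    boolean isConnected() {
        return connected;
    }

    public static void main(String[] args) {
        RefCountSketch shared = new RefCountSketch();
        List<String> search = new ArrayList<>();
        List<String> ui = new ArrayList<>();

        Runnable stopSearch = shared.subscribe(search::add);
        Runnable stopUi = shared.subscribe(ui::add);
        shared.emit("r");   // both subscribers receive it
        stopSearch.run();
        shared.emit("rx");  // only the UI subscriber is left
        stopUi.run();       // last one gone, so we disconnect
        shared.emit("rxj"); // dropped

        System.out.println(search + " " + ui + " connected=" + shared.isConnected());
    }
}
```

The point to notice is that the second subscriber does not trigger a second connection to the source. In the search example this means that the SearchView gets a single listener, however many subscriptions we attach to sharedTextChanges.
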

That’s all. Very effective and elegant, as always with Rx. Hope this post was useful for you!



