RxJava. ConcatMap for dependent Observables

Today I’m gonna tell how concatMap helps to transform an existing sequential dependent synchronous chain of methods into a reactive and multithreaded one.
Suppose we get a boolean value from a presenter, and perform some UI event:

  ...
  boolean state = mPresenter.getBooleanState(context)
  if(state) {
      showViewA();
    } else {
      showViewB();
  }

The class, that provides data, Presenter in our case, has a dependent method structure, the second depends on the result of the first:

public class MyPresenter<MyView> {

  ...

  private int getIntValue(Context context) {
      int retValue = someCalculationMethod(context);
      return retValue;
  }
  public boolean getBooleanState(Context context) {
      int intValue = getIntValue(context);
      return performSomeCalculationWith(intValue);
  }
}

When someCalculationMethod executes immediately, it might be run on the main thread. If it for example, server request, and we may expect some time delay, then it should be executed in a background thread.
With Rx we can easily achieve it, preliminary transformed methods getIntValue and getBooleanState into Observable:

  private Observable<Integer> getIntValue(Context context) {
      return Observable.fromCallable(() -> {
         int retValue = someCalculationMethod(context);
         return retValue;
      });
  }

  private Observable<Boolean> getBooleanState(Context context, Integer intValue) {
     return Observable.fromCallable(() -> {
        boolean retValue = performSomeCalculationWith(intValue);
        return retValue;
     });
  }

So, how do we get Observable from the getBooleanState without having getIntValue performed? All magic on merging these two Observables is made by concatMap operator:

  private Observable<Boolean> rxGetBooleanState(Context context) {
      Observable integerObservable = getIntValue(context)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

      Observable retObservable = integerObservable
        .concatMap(intValue -> getBooleanState(context, intValue));
      return retObservable;
  }

ConcatMap works similarly to flatMap. Important distinctive feature is it keeps elements order.

Let’s check docs:

Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then emitting the items that result from concatenating those resulting Observables.

And look at the concatMap operator scheme:

concatMap

concatMap scheme

concatMap applies a function (Observable) that you supply to each item emitted by the original Observable, and then merges the results of that function applied to every item emitted by the original Observable, thus creating a new Observable saving original order from first Observable.

And now we’re only need to call our method from appropriate context:

Disposable disposable = mPresenter.rxGetBooleanState(this)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { state ->
            if (required) {
                showViewA();
            } else {
               showViewB();
            }
        }

Update
Actually, the easiest way to execute dependant observables is to use .flatMap operator, as described here: https://github.com/ReactiveX/RxJava/issues/442


getIntValue(context).flatMap( intValue -> {
   return getBooleanState(context, intValue)
})



No Comments


You can leave the first : )



Leave a Reply

Your email address will not be published. Required fields are marked *