rxjava (4)


RxJava. How to handle errors from multiple Observables?

Imagine you need to perform several parallel network request and you have Retrofit and Rx as instruments.
You may end up with zip or combineLatest operators:

    val firstObservable = ServerApi().firstRequest( requestParams1 )
    val secondObservable = ServerApi().secondRequest( requestParams2 )
    val thirdObservable = ServerApi().thirdRequest( requestParams3 )

    // combine observables into one
    Observable.combineLatest(
        firstObservable,
        secondObservable,
        thirdObservable,
        Function3<FirstResponse, SecondResponse, ThirdResponse, CombinedResult> { firstResponse, secondResponse, thirdResponse ->
            // combine responses and create result for return function Function3
            CombinedResult( firstResponse, secondResponse, thirdResponse )
        }
    )
    .flatMap { combinedResult ->
        // perform some intermediate logic with combined result if needed
        Observable.just(combinedResult) // return original observable unchanged
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ combinedResult ->
        // end of request
        // show result
    }, {
       it.printStackTrace()
    })

There is a problem with this chain. When one of observables throw an exception, for example, HttpException, then the whole chain will be terminated and we would not be able to show combinedResult.
How to avoid this situation? We just need to handle errors correctly for every Observable in the combinable list.
There is a handy operator for this: onErrorReturn() Call it on the combineLatest parameters as follows:

    Observable.combineLatest(
        firstObservable
            .onErrorReturn { responseFromException(it) },
        secondObservable
            .onErrorReturn { responseFromException(it) },
    ....

onErrorReturn

onErrorReturn is a handy operator which emits new created object instead of terminating chain.

Use Faсtory Method pattern or just write a function called responseFromException(throwable: Throwable)) which will create an empty Response object with error for you:

    ...
    // FirstResponse, SecondResponse and ThirdResponse should inherit from BaseResponse
    fun <T : Any> responseFromException(throwable: Throwable): BaseResponse<T> {
        var errorResponse: BaseResponse<T> = BaseResponse()
        if (throwable is HttpException) {
            errorResponse.code = it.code()
            errorResponse.message = it.message()
        }
      return errorResponse
     }

That’s all. The rest code stay the same. With only one line added for each observable we now may expect the initial chain to work as expected: perform all request without terminal state.




Flowable from Room database

Hi! Today I’ll tell you how to get data from Room database in reactive way.

Before starting make sure you have following in your build.gradel:

// Room components
implementation "android.arch.persistence.room:runtime:$rootProject.roomVersion"
implementation "android.arch.persistence.room:rxjava2:$rootProject.roomVersion"
annotationProcessor "android.arch.persistence.room:compiler:$rootProject.roomVersion"
androidTestImplementation "android.arch.persistence.room:testing:$rootProject.roomVersion"

// Lifecycle components
implementation "android.arch.lifecycle:extensions:$rootProject.archLifecycleVersion"
implementation "android.arch.lifecycle:reactivestreams:$rootProject.archLifecycleVersion"
annotationProcessor "android.arch.lifecycle:compiler:$rootProject.archLifecycleVersion"

// Rx
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.1'

First we need to get database from assets and define Dao class for data.

@Database(entities = {Verse.class}, version = 1, exportSchema = false)
public abstract class MyRoomDatabase extends RoomDatabase {
    public abstract MyDataDao myDataDao();

    private static MyRoomDatabase INSTANCE;

    static MyRoomDatabase getDatabase(final Context context) {
        if (INSTANCE == null) {
            synchronized (MyRoomDatabase.class) {
                if (INSTANCE == null) {
                    INSTANCE = Room.databaseBuilder(context.getApplicationContext(),
                            MyRoomDatabase.class, "data.sqlite3") // get db from assets
                            .openHelperFactory(new AssetSQLiteOpenHelperFactory())
                            .build();

                }
            }
        }
        return INSTANCE;
    }

    public void destroyInstance() {
        synchronized (MyRoomDatabase.class) {
            INSTANCE = null;
        }
    }
}

@Dao
public interface MyDataDao {

    @Query("SELECT * from verses ORDER BY RANDOM() LIMIT 1")
    LiveData<Verse> getRandomVerse();

    @Query("SELECT * from verses ORDER BY RANDOM() LIMIT 1")
        // same request
    Flowable<Verse> getRxRandomVerse();
}

Above code represents raw request for random Verse objects from database table verses
What I like about Room is it allows us to get Rx Flowable as well as LiveData objects out of the box!
In our case we will use Flowable.
Next, define a repository class:

class MyRepository(context: Context?) {

    private val mDataDao: MyDataDao
    private val db: MyRoomDatabase? = MyRoomDatabase.getDatabase(context)

    init {
        mDataDao = db!!.myDataDao()
    }

    fun getRxRandomVerse(): Flowable<Verse> {
        return mDataDao.getRxRandomVerse
    }

    fun getDb(context: Context?): MyRoomDatabase? {
        return db ?: MyRoomDatabase.getDatabase(context)
    }

    fun closeDb() {
        db?.close()
    }
}

Finally, let’s extract data onto presentation layer:

class MainActivityPresenter {

    private var mRepository: MyRepository? = null

    fun attach(mainActivityView: MainActivityView, context: Context) {
        this.mainActivityView = mainActivityView
        this.mRepository = MyRepository(context)
    }

    fun getRandomVerse() {
        mainActivityView!!.showProgressBar()
        mRepository.getRxRandomVerse()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ verse ->
                    mainActivityView!!.hideProgressBar()
                    updateUI(verse?.toString())
                }, { error ->
                    mainActivityView!!.hideProgressBar()
                    error.printStackTrace()
                })
    }
}

class MainActivity : MainActivityPresenter.MainActivityView {

    ...

    override fun updateUI(verse: String) {
        textViewVerse.text = verse
    }

    ...
}

That’s all in general. Pretty simple and handy. I’ve successfully implemented this code in a new random quote app

If you have any questions, leave in comments below.




RxJava. ConcatMap for dependent Observables

Today I’m gonna tell how concatMap helps to transform an existing sequential dependent synchronous chain of methods into a reactive and multithreaded one.
Suppose we get a boolean value from a presenter, and perform some UI event:

  ...
  boolean state = mPresenter.getBooleanState(context)
  if(state) {
      showViewA();
    } else {
      showViewB();
  }

The class, that provides data, Presenter in our case, has a dependent method structure, the second depends on the result of the first:

public class MyPresenter<MyView> {

  ...

  private int getIntValue(Context context) {
      int retValue = someCalculationMethod(context);
      return retValue;
  }
  public boolean getBooleanState(Context context) {
      int intValue = getIntValue(context);
      return performSomeCalculationWith(intValue);
  }
}

When someCalculationMethod executes immediately, it might be run on the main thread. If it for example, server request, and we may expect some time delay, then it should be executed in a background thread.
With Rx we can easily achieve it, preliminary transformed methods getIntValue and getBooleanState into Observable:

  private Observable<Integer> getIntValue(Context context) {
      return Observable.fromCallable(() -> {
         int retValue = someCalculationMethod(context);
         return retValue;
      });
  }

  private Observable<Boolean> getBooleanState(Context context, Integer intValue) {
     return Observable.fromCallable(() -> {
        boolean retValue = performSomeCalculationWith(intValue);
        return retValue;
     });
  }

So, how do we get Observable from the getBooleanState without having getIntValue performed? All magic on merging these two Observables is made by concatMap operator:

  private Observable<Boolean> rxGetBooleanState(Context context) {
      Observable integerObservable = getIntValue(context)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

      Observable retObservable = integerObservable
        .concatMap(intValue -> getBooleanState(context, intValue));
      return retObservable;
  }

ConcatMap works similarly to flatMap. Important distinctive feature is it keeps elements order.

Let’s check docs:

Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then emitting the items that result from concatenating those resulting Observables.

And look at the concatMap operator scheme:

concatMap

concatMap scheme

concatMap applies a function (Observable) that you supply to each item emitted by the original Observable, and then merges the results of that function applied to every item emitted by the original Observable, thus creating a new Observable saving original order from first Observable.

And now we’re only need to call our method from appropriate context:

Disposable disposable = mPresenter.rxGetBooleanState(this)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { state ->
            if (required) {
                showViewA();
            } else {
               showViewB();
            }
        }

Update
Actually, the easiest way to execute dependant observables is to use .flatMap operator, as described here: https://github.com/ReactiveX/RxJava/issues/442


getIntValue(context).flatMap( intValue -> {
   return getBooleanState(context, intValue)
})



RxJava. share operator with RxBinding

Today I’ll tell you about a convenient reactive approach to handle EditText input. As an example, we will take a search string in a SearchView.

The task

Search usually takes a while. No matter if we requesting data from the server, or read the local database. User may input letters in the textfield much faster than the requests would work off. Therefore we need to settle some time threshold not making any requests more frequent than necessary. Besides, we may need to update some UI changes in real-time, without those time gaps.

Solution

Without Rx in Android we can use Runnable and Handler, which is not handy. But thanks to the RxBinding we may use the debounce operator and get an elegant solution:

RxSearchView.queryTextChanges(searchView) 
 .debounce(500, TimeUnit.MILLISECONDS) // delay for 500 ms
 .subscribe(query -> mPresenter.searchRequest(query));

Now, regardless of input speed, requests to the server will be performed no more often than twice a second.
But, what should we do, if we need to update UI on every letter input? For example, hint needs to be changed. Using the code above we would have debounced UI updates with 500 ms, what is not quite user-friendly.

Fortunately, in Rx there is the possibility to broadcast to multiple subscribers. In order to use it we need to call .share() on our Observable and voila!

Observable<String> sharedTextChanges = RxSearchView.queryTextChages(searchViw).share()
 
sharedTextChanges 
 .debounce(500, TimeUnit.MILLISECONDS) // use debounce 
 .subscribe(query -> mPresenter.searchRequest(query)); 
 
sharedTextChanges 
 .subscribe(query -> mPresenter.updateUI(query));

We subscribed to Observable twice: with and without debounce. Now UI refreshes on every letter input, whereas server request being performed no more frequent than 500 ms.

What’s under the hood?

Operator .share() is an wrapper for .publish().refcount(). They allow to “share” items emitted by a stream. Let’s look deeper.

Operator.publish( ) — transforms Observable into ConnectableObservable.

rxjava publish operator scheme

“ConnectedObservable” is an Observable, which is not emitting data until someone wouldn’t call .connect() operator on it.

Operator .refcount() controls multiple subscribers. According to the docs,

Returns an Observable that stays connected to this ConnectableObservable as long as there is at least one subscription to this ConnectableObservable.

RxJava operator refcount

refcount() operator knows how many subscribers are subscribed to the Observable and doesn’t disconnect from the source ConnectedObservable until all Observables are unsubscribed.

That’s all. Very effective and elegant, as always with Rx. Hope this post was useful for you!