RxJava. How to handle errors from multiple Observables?

Imagine you need to perform several parallel network requests, and Retrofit and Rx are your tools.
You may end up with the zip or combineLatest operator:

    val firstObservable = ServerApi().firstRequest( requestParams1 )
    val secondObservable = ServerApi().secondRequest( requestParams2 )
    val thirdObservable = ServerApi().thirdRequest( requestParams3 )

    // combine observables into one
    Observable.combineLatest(
        firstObservable,
        secondObservable,
        thirdObservable,
        Function3<FirstResponse, SecondResponse, ThirdResponse, CombinedResult> { firstResponse, secondResponse, thirdResponse ->
            // combine responses and create result for return function Function3
            CombinedResult( firstResponse, secondResponse, thirdResponse )
        }
    )
    .flatMap { combinedResult ->
        // perform some intermediate logic with combined result if needed
        Observable.just(combinedResult) // return original observable unchanged
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ combinedResult ->
        // end of request
        // show result
    }, { error ->
        // an error from any of the requests ends up here
        error.printStackTrace()
    })

There is a problem with this chain. When one of the observables throws an exception, for example an HttpException, the whole chain terminates and we cannot show combinedResult.
How do we avoid this? We just need to handle errors for every Observable passed to combineLatest.
There is a handy operator for this: onErrorReturn(). Call it on each combineLatest parameter as follows:

    Observable.combineLatest(
        firstObservable
            .onErrorReturn { responseFromException(it) },
        secondObservable
            .onErrorReturn { responseFromException(it) },
    ....

onErrorReturn is an operator that, when its source fails, emits a newly created object and then completes normally instead of passing the error down the chain.
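
To see the effect in isolation, here is a minimal sketch that assumes RxJava 2 (the io.reactivex packages) and uses plain String values instead of real Retrofit responses. The function name combineWithFallback and the sample strings are made up for illustration only.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction

// Hypothetical helper: combines one successful source with one failing source.
fun combineWithFallback(): String {
    val first: Observable<String> = Observable.just("first OK")
    // Simulates a request that fails
    val second: Observable<String> = Observable.error<String>(RuntimeException("boom"))

    return Observable.combineLatest(
        first,
        // Replace the error with a fallback item so combineLatest still gets a value
        second.onErrorReturn { error -> "fallback: ${error.message}" },
        BiFunction<String, String, String> { a, b -> "$a | $b" }
    ).blockingFirst() // block only for the sake of the demo
}

fun main() {
    println(combineWithFallback()) // prints "first OK | fallback: boom"
}
```

Without the onErrorReturn call, blockingFirst() would rethrow the RuntimeException and no combined value would be produced.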

Use the Factory Method pattern, or just write a function called responseFromException(throwable: Throwable), which creates an empty response object carrying the error details:

    ...
    // FirstResponse, SecondResponse and ThirdResponse should inherit from BaseResponse
    fun <T : Any> responseFromException(throwable: Throwable): BaseResponse<T> {
        val errorResponse: BaseResponse<T> = BaseResponse()
        if (throwable is HttpException) {
            // throwable is smart-cast to HttpException inside this branch
            errorResponse.code = throwable.code()
            errorResponse.message = throwable.message()
        }
        return errorResponse
    }
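
Since the fallback object now travels down the chain like a normal response, the code that consumes the result needs a way to tell the two apart. The following is only a sketch under assumptions: the BaseResponse shape (the body field and the isSuccessful property in particular) is hypothetical, and FakeHttpException stands in for Retrofit's HttpException so the example runs without Retrofit.

```kotlin
// Hypothetical wrapper type; code and message follow the snippet above,
// body and isSuccessful are assumptions made for this sketch.
class BaseResponse<T : Any>(
    var body: T? = null,
    var code: Int? = null,
    var message: String? = null
) {
    // An error placeholder carries no payload
    val isSuccessful: Boolean get() = body != null
}

// Stand-in for retrofit2.HttpException (assumption, keeps the sketch dependency-free)
class FakeHttpException(val statusCode: Int, message: String) : RuntimeException(message)

fun <T : Any> responseFromException(throwable: Throwable): BaseResponse<T> {
    val errorResponse = BaseResponse<T>()
    if (throwable is FakeHttpException) {
        errorResponse.code = throwable.statusCode
        errorResponse.message = throwable.message
    }
    return errorResponse
}

// Hypothetical consumer: decides what to show for a single response
fun describe(response: BaseResponse<String>): String =
    if (response.isSuccessful) "ok: ${response.body}"
    else "failed: ${response.code} ${response.message}"

fun main() {
    val ok = BaseResponse(body = "payload")
    val failed = responseFromException<String>(FakeHttpException(404, "Not Found"))
    println(describe(ok))     // prints "ok: payload"
    println(describe(failed)) // prints "failed: 404 Not Found"
}
```

Note that onErrorReturn must return the same type the observable emits, so a sketch like this fits best when each request emits a BaseResponse wrapper.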

That’s all. The rest of the code stays the same. With one line added per observable, the chain now behaves as intended: all requests run, and a failure in one of them no longer terminates the whole chain.