From RxJava to Kotlin Flow: Error Handling

February 24, 2020


This post is part of a series comparing RxJava to Kotlin Flow. The previous articles covered Threading and Backpressure.

Introduction

Error handling is fundamental in reactive programming. A reactive stream may fail with an exception and propagate it downstream as an event, notifying consumers of the error. As in the previous articles, we’ll compare the error handling mechanisms of RxJava and Kotlin Flow. This should help us migrate existing code from RxJava to Kotlin Flow and also understand how to use Kotlin Flow properly.

RxJava Error Handling

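In RxJava, the general concepts of error handling are simple and straightforward: an exception raised inside a stream is delivered downstream as an error event, which terminates the stream and can be handled in the error callback of the subscriber.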

A basic example of error handling is:

observeChanges()
    .subscribe(
        { value -> println("value: $value") },
        { error -> println("error: $error") }
    )

Kotlin Flow Error Handling

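As Kotlin Flow is essentially based on coroutines, exceptions behave as they do in ordinary suspending code: an exception thrown inside a flow propagates to the collector, where it can be handled with a regular try-catch or with operators such as catch.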
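As a minimal sketch of that idea, a flow can be collected inside an ordinary try-catch, which plays the role of the two lambdas passed to subscribe in the RxJava example. The names observeChangesFlow and collectWithTryCatch are hypothetical and exist only for this illustration; the sketch assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for observeChanges(): emits one value, then fails.
fun observeChangesFlow(): Flow<Int> = flow {
    emit(1)
    throw IllegalStateException("boom")
}

// Collects the flow and records values and the error in one string.
fun collectWithTryCatch(): String = runBlocking {
    val log = StringBuilder()
    try {
        observeChangesFlow().collect { value -> log.append("value: $value, ") }
    } catch (e: IllegalStateException) {
        // The exception thrown inside the flow surfaces here, at the collector.
        log.append("error: ${e.message}")
    }
    log.toString()
}
```

Calling collectWithTryCatch() should return "value: 1, error: boom".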

Comparison

Setup

First of all, for our comparison, we’ll create helper functions that return a Flow and an Observable, each of which emits a value and then fails with an exception.

For Observable:

private fun observable(
    value: Int = 1
): Observable<Int> {
    return Observable.create { emitter ->
        emitter.onNext(value)
        emitter.onError(RuntimeException())
    }
}

And for Kotlin Flow (because of a name clash, we have to give it a “my” prefix):

private fun myFlow(
    value: Int = 1
): Flow<Int> {
    return flow {
        emit(value)
        throw RuntimeException()
    }
}

Next, we’ll set up short tests that are parametrized by an operator. Each test takes our test stream, subscribes to it on the io thread pool, optionally applies an operator, and then prints the values and errors that occur in the chain.

For Observable:

private fun testObservable(
    operator: Observable<Int>.() -> Observable<Int>
) {
    val latch = CountDownLatch(1)
    val result = StringBuffer()

    observable()
        .subscribeOn(Schedulers.io())
        .operator()
        .doOnTerminate { latch.countDown() }
        .subscribe(
            { result.append("next $it, ") },
            { result.append("error $it") }
        )

    latch.await()
    println(result)
}

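For Kotlin Flow (note that here we wrap the whole flow in a try-catch and release the latch in finally, after any error has been recorded):

private fun testFlow(
    operator: Flow<Int>.() -> Flow<Int>
) {
    val latch = CountDownLatch(1)
    val result = StringBuffer()

    CoroutineScope(Job() + Dispatchers.IO).launch {
        try {
            myFlow()
                .operator()
                .collect {
                    result.append("next $it, ")
                }
        } catch (e: Exception) {
            result.append("error $e")
        } finally {
            // Count down only after the error (if any) has been appended.
            // Doing it in onCompletion would release the waiting thread
            // before the catch block runs, so the error could be missed.
            latch.countDown()
        }
    }

    latch.await()
    println(result)
}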

Basic Error Handling

Our first test checks the default behavior without any additional operators. For that, we pass an identity operator:

testObservable { this }
testFlow { this }

In both cases, the log shows:

next 1, error java.lang.RuntimeException

So, as we already discussed, our test streams emit one value and then terminate with an exception.

Catching Errors by Emitting a Default Value

What if we don’t want our stream to fail (an error event terminates the stream)? One option is to emit a default value when we encounter an exception.

In RxJava for that, there is a special operator called onErrorReturn:

testObservable { onErrorReturn { 5 } }

If we run this example, the log shows:

next 1, next 5,

So the first item in the stream (1) was emitted as before, but then instead of an error, we have a default value (5) emitted as well.

In Kotlin Flow for the same use case, there is an operator catch, and the usage is the following:

testFlow { catch { emit(5) } }

The catch operator catches all exceptions from the upstream and can do some work in response, such as emitting a default value. The result in the log is the same as for RxJava.
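To make the “upstream only” part concrete, here is a small sketch, assuming kotlinx-coroutines-core is available. The function names defaultOnError and downstreamFailureIsNotCaught are made up for this illustration. The first function replaces an upstream failure with a default value; the second shows that an exception thrown by the collector, which sits below catch, is not handled by it and reaches the caller.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// An upstream failure is replaced by the default value 5.
fun defaultOnError(): List<Int> = runBlocking {
    flow<Int> {
        emit(1)
        throw RuntimeException()
    }
        .catch { emit(5) }
        .toList()
}

// A failure in the collector is downstream of catch, so catch does not see it.
fun downstreamFailureIsNotCaught(): String = runBlocking {
    try {
        flowOf(1)
            .catch { emit(5) }
            .collect { throw IllegalStateException("collector failed") }
        "completed"
    } catch (e: IllegalStateException) {
        "propagated: ${e.message}"
    }
}
```

Here defaultOnError() should return [1, 5], and downstreamFailureIsNotCaught() should return "propagated: collector failed".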

Catching Errors by Switching to Another Stream

Another way to keep our stream from failing on error is to switch to another stream. In RxJava, there is a special operator for that, onErrorResumeNext:

testObservable { onErrorResumeNext(Observable.just(1, 2, 3)) }

After running the code the result will be:

next 1, next 1, next 2, next 3,

First, the value 1 is emitted (this one comes from the initial stream). Then, after the error happens, we switch to the new stream and all its values are emitted (and no exception is thrown).

NOTE: It is possible to achieve the behavior of onErrorReturn with onErrorResumeNext by using onErrorResumeNext(Observable.just(5)), but onErrorResumeNext has a bigger overhead, so use it only when a new stream is really needed.

In Kotlin Flow for such a case we use that same operator catch:

testFlow { catch { emitAll(flowOf(1, 2, 3)) } }

Instead of emitting a single value on error, we use emitAll to emit all the values of another flow, with the same result as in RxJava.
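Because the lambda passed to catch receives the exception, the fallback can also depend on the kind of failure. The sketch below is only an illustration under that assumption: withFallback, fallbackDemo and unrelatedErrorDemo are hypothetical names, and choosing IOException as the “recoverable” case is arbitrary. It assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Switches to a fallback flow on IOException and rethrows anything else.
fun Flow<Int>.withFallback(): Flow<Int> = this.catch { e ->
    if (e is IOException) emitAll(flowOf(1, 2, 3)) else throw e
}

// The IOException is replaced by the values of the fallback flow.
fun fallbackDemo(): List<Int> = runBlocking {
    flow<Int> {
        emit(1)
        throw IOException("network down")
    }.withFallback().toList()
}

// Any other exception passes through catch and reaches the caller.
fun unrelatedErrorDemo(): String = runBlocking {
    try {
        flow<Int> {
            emit(1)
            throw IllegalStateException("bug")
        }.withFallback().toList()
        "completed"
    } catch (e: IllegalStateException) {
        "rethrown: ${e.message}"
    }
}
```

With these definitions, fallbackDemo() should return [1, 1, 2, 3], and unrelatedErrorDemo() should return "rethrown: bug".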

Intercepting Errors

The next thing we’ll look at is intercepting errors. When we intercept an error, we can perform a side effect, for example logging, without actually handling the error.

In RxJava for this case, we can use the doOnError operator:

testObservable { doOnError { print("INTERCEPTED $it, ") } }

The result will be:

INTERCEPTED java.lang.RuntimeException, next 1, error java.lang.RuntimeException

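The interception message comes first only because it is printed immediately, while the values and the error are collected into a buffer that is printed after the stream terminates. In the stream itself, the order is the same as in the basic case: the first item is emitted, then the error is intercepted and delivered to the subscriber.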

Kotlin Flow has no special operator for that case, but we can write our own:

fun <T> Flow<T>.doOnError(onError: (Throwable) -> Unit): Flow<T> {
    return flow {
        try {
            collect { value ->
                emit(value)
            }
        } catch (e: Exception) {
            onError(e)
            throw e
        }
    }
}

Here we create a new Flow and, inside it, collect all the values of the original one. We wrap everything in try-catch, and if an error happens, we execute the callback and re-throw the exception.

The usage will be the following:

testFlow { doOnError { print("INTERCEPTED $it, ") } }

And as before, results in logs will be the same.
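An alternative sketch of the same idea builds on the catch operator and re-throws the exception from inside it, so that only upstream failures are intercepted. The names doOnErrorViaCatch and interceptDemo are hypothetical and used only for this illustration; the sketch assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Runs the callback for an upstream failure, then passes the failure on.
fun <T> Flow<T>.doOnErrorViaCatch(onError: (Throwable) -> Unit): Flow<T> =
    this.catch { e ->
        onError(e)
        throw e
    }

// Records values, the interception, and the final error in order.
fun interceptDemo(): String = runBlocking {
    val log = StringBuilder()
    try {
        flow<Int> {
            emit(1)
            throw RuntimeException("boom")
        }
            .doOnErrorViaCatch { log.append("intercepted ${it.message}, ") }
            .collect { log.append("next $it, ") }
    } catch (e: RuntimeException) {
        log.append("error ${e.message}")
    }
    log.toString()
}
```

Because everything is written to the same buffer here, interceptDemo() should return "next 1, intercepted boom, error boom", which reflects the actual order of events in the stream.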

That’s all for handling errors in the chain, but there is another topic: handling errors in inner streams.

Error Handling in Inner Streams

As we know, we can have inner streams by using flatMap in RxJava or flatMapMerge in Flow (or other xMap operators in RxJava and flatMapX operators in Kotlin Flow). We can apply what we’ve learned directly to the inner streams, though sometimes error handling needs to be controlled from the perspective of the parent stream.

We’ll modify our test samples a bit to better show that.

For Observable our new example will be:

private fun testInnerObservable(
    operator: Observable<Int>.() -> Observable<Int>
) {
    val latch = CountDownLatch(1)
    val result = StringBuffer()

    Observable.just(10, 11, 12)
        .subscribeOn(Schedulers.io())
        .operator()
        .doOnTerminate { latch.countDown() }
        .subscribe(
            { result.append("next $it, ") },
            { result.append("error $it") }
        )

    latch.await()
    println(result)
}

The difference is that now our parent observable won’t throw an exception, instead it will emit three items and complete. Inside operator we’ll be able to add our inner streams.

For Kotlin Flow we’ll have the following sample:

private fun testInnerFlow(
    operator: Flow<Int>.() -> Flow<Int>
) {
    val latch = CountDownLatch(1)
    val result = StringBuffer()

    CoroutineScope(Job() + Dispatchers.IO).launch {
        try {
            flowOf(10, 11, 12)
                .operator()
                .collect {
                    result.append("next $it, ")
                }
        } catch (e: Exception) {
            result.append("error $e")
        } finally {
            // Count down only after the error (if any) has been appended.
            // Doing it in onCompletion would release the waiting thread
            // before the catch block runs, so the error could be missed.
            latch.countDown()
        }
    }

    latch.await()
    println(result)
}

There are many different operators in RxJava, but we’ll look at only one: concatMapDelayError. To understand how it works, let’s first look at the concatMap and flatMapConcat operators. Both of them subscribe to the inner streams one by one, and the subscription to the next inner stream happens only after the previous one has completed.

For RxJava and Kotlin Flow it will look like:

testInnerObservable { concatMap { observable(it) } }
testInnerFlow { flatMapConcat { myFlow(it) } }

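What happens here: each item of the parent stream (10, 11, 12) is mapped to an inner stream that emits this item and then fails with an exception.

The result of executing this is the same for RxJava and Kotlin Flow: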

next 10, error java.lang.RuntimeException

The first item is emitted, followed by an error, and no more values are emitted. This happens because the first item (10) triggered a subscription to the inner stream, which emitted that value (10) and then failed with an exception. Because of that, the whole stream failed, and the next item from the parent (11) was never mapped to a new inner stream.

What if we want all the values to be emitted and the error to be signaled only at the end (if one happened)? For such a case, RxJava has concatMapDelayError. It delays all the errors until the stream completes and then fails with a CompositeException containing all the exceptions that happened in the stream.

If we run the following code:

testInnerObservable { concatMapDelayError { observable(it) } }

We see the result:

next 10, next 11, next 12, error io.reactivex.exceptions.CompositeException: 3 exceptions occurred.

All three items were emitted, followed by a single composite exception (which has three exceptions inside, one per inner stream).

What about Kotlin Flow? It doesn’t have any operator that delays errors, but we can try to write our own. We’ll start by copy-pasting the sources of the flatMapConcat operator and adding some error handling there:

fun <T, R> Flow<T>.flatMapConcatDelayError(
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenConcatDelayError()

First, we define our flatMapConcatDelayError operator as a combination of map and flatten (yep, map + flatten == flatMap). Then we define flattenConcatDelayError:

fun <T> Flow<Flow<T>>.flattenConcatDelayError(): Flow<T> {
    val list = CopyOnWriteArrayList<Exception>()
    return flow<T> {
        collect { value ->
            try {
                emitAll(value)
            } catch (e: Exception) {
                list.add(e)
            }
        }
    }.onCompletion {
        if (list.isNotEmpty()) {
            throw CompositeException(list)
        }
    }
}

What we do here is wrap our emits in try-catch and save all the exceptions (note that we need to use a thread-safe list). At the end, we throw a composite exception if any exceptions were collected.

Final call for Kotlin Flow will be:

testInnerFlow { flatMapConcatDelayError { myFlow(it) } }
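One caveat of the operator above is that the list of exceptions is created once per call of the operator, so collecting the same resulting flow twice would accumulate errors from both runs. A possible variation, shown here only as a sketch, keeps the list inside the flow builder so that each collection gets its own, and uses catch on each inner flow so that only inner (upstream) failures are recorded. DelayedErrorsException is a hypothetical stand-in for RxJava’s CompositeException, and flattenConcatDelayErrorSketch and delayErrorDemo are made-up names; the sketch assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical replacement for CompositeException, to avoid an RxJava dependency.
class DelayedErrorsException(val errors: List<Throwable>) :
    RuntimeException("${errors.size} exceptions occurred.")

fun <T> Flow<Flow<T>>.flattenConcatDelayErrorSketch(): Flow<T> = flow {
    // Created inside the builder, so every collection starts with an empty list.
    // The builder block runs sequentially, so a plain list should be enough here.
    val errors = mutableListOf<Throwable>()
    collect { inner ->
        // catch records a failure of the inner flow and lets the next one start.
        emitAll(inner.catch { e -> errors.add(e) })
    }
    if (errors.isNotEmpty()) throw DelayedErrorsException(errors)
}

// Three inner flows, each emitting its value and then failing.
fun delayErrorDemo(): String = runBlocking {
    val log = StringBuilder()
    try {
        flowOf(10, 11, 12)
            .map { v ->
                flow<Int> {
                    emit(v)
                    throw RuntimeException("inner $v")
                }
            }
            .flattenConcatDelayErrorSketch()
            .collect { log.append("next $it, ") }
    } catch (e: DelayedErrorsException) {
        log.append("error ${e.message}")
    }
    log.toString()
}
```

With these definitions, delayErrorDemo() should return "next 10, next 11, next 12, error 3 exceptions occurred.". This is one possible design, not a drop-in equivalent of concatMapDelayError.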

RxJava-like subscribe for Kotlin Flow

Do we always need to either put catch at the bottom of each chain or wrap everything in try-catch? I’d say yes, but it could be useful to have an API similar to RxJava’s: a single subscribe with three lambdas.

We can try to implement one:

suspend fun <T> Flow<T>.subscribe(
    onNext: (T) -> Unit,
    onError: (Throwable) -> Unit,
    onComplete: () -> Unit
) {
    this
        .onEach { onNext(it) }
        .onCompletion { error: Throwable? ->
            if (error == null) {
                onComplete()
            }
        }
        .catch { onError(it) }
        .collect()
}

Instead of the terminal collect method, we use our own subscribe, which takes three lambdas. In onEach we receive all the emitted values. In onCompletion we call onComplete (note that we need to check that the flow completed without an error). And in catch we catch all the exceptions.

NOTE: we could move onError inside onCompletion if we wish and leave the catch clause empty. It is a matter of taste.

But did anyone spot the issue?

Let’s run a few tests:

runBlocking {
    flowOf(1, 2, 3)
        .subscribe(
            { print("next $it, ") },
            { print("error $it, ") },
            { print("complete ") }
        )
}

It prints:

next 1, next 2, next 3, complete

So far so good. All three values are emitted, followed by a completion event.

Next test will be with error:

runBlocking {
    flow {
        emit(1)
        throw RuntimeException()
    }
        .subscribe(
            { print("next $it, ") },
            { print("error $it, ") },
            { print("complete ") }
        )
}

It prints:

next 1, error java.lang.RuntimeException,

Also good, we have one value followed by error event. And we don’t have a completion event.

Let’s check the final test — we throw exception in a callback:

runBlocking {
    flow {
        emit(1)
    }
        .subscribe(
            { print("next $it, ") },
            { print("error $it, ") },
            { throw RuntimeException() }
        )
}

The result will be:

next 1, error java.lang.RuntimeException,

And this is not what we expected. If an exception is thrown in a callback, it should propagate to the caller and should not be passed to our error handler. The error handler should catch the errors in the chain, and the callback is not really part of that chain.

Why does this happen? Because catch is the last operator in the chain and it handles everything above it, including whatever happens inside the onEach and onCompletion operators.

This is a difference to keep in mind. If anyone is able to create an identical implementation of the subscribe method, feel free to post it in the comments.
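One possible direction, given here only as a sketch and not as a faithful copy of RxJava’s semantics, is to apply catch directly to the source flow and to invoke the callbacks outside the operator chain. That way catch sees only upstream failures, and an exception thrown by any of the three callbacks propagates to the caller. The names subscribeSketch and subscribeDemo are hypothetical; the sketch assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

suspend fun <T> Flow<T>.subscribeSketch(
    onNext: (T) -> Unit,
    onError: (Throwable) -> Unit,
    onComplete: () -> Unit
) {
    var upstreamError: Throwable? = null
    this
        .catch { e -> upstreamError = e } // remembers upstream failures only
        .collect { onNext(it) }           // a failing onNext reaches the caller
    // Terminal callbacks run outside the chain, so catch cannot swallow them.
    val error = upstreamError
    if (error != null) onError(error) else onComplete()
}

// Runs the sketch against a source and records what happened.
fun subscribeDemo(source: Flow<Int>, failInComplete: Boolean = false): String =
    runBlocking {
        val log = StringBuilder()
        try {
            source.subscribeSketch(
                onNext = { log.append("next $it, ") },
                onError = { log.append("error ${it.message}, ") },
                onComplete = {
                    if (failInComplete) throw IllegalStateException("callback")
                    log.append("complete ")
                }
            )
        } catch (e: IllegalStateException) {
            log.append("thrown ${e.message}")
        }
        log.toString()
    }
```

For a source that fails after emitting 1, the error goes to onError; when onComplete itself throws, the exception surfaces at the call site instead of being routed to onError.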

UPD: retry

As was pointed out in the feedback, this article missed the operators for retry logic. Both RxJava and Kotlin Flow have built-in support for retry. For example, to retry a stream up to two times (if there was an error), one needs to use the “retry(2)” operator.
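A small sketch of how retry behaves in Kotlin Flow, assuming kotlinx-coroutines-core is on the classpath: each retry collects the upstream flow again from the start, so values emitted before a failure are seen again on every attempt. The function retryDemo and its attempt counter are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// The flow fails on the first two attempts and succeeds on the third.
fun retryDemo(): List<Int> = runBlocking {
    var attempt = 0
    flow<Int> {
        attempt++
        emit(attempt)
        if (attempt < 3) throw RuntimeException("attempt $attempt failed")
    }
        .retry(2) // re-collects the upstream up to two more times on failure
        .toList()
}
```

Here retryDemo() should return [1, 2, 3]: one value from the initial attempt and one from each of the two retries.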

Conclusion

Following this article, I hope we now have a better understanding of error handling in RxJava and Kotlin Flow. RxJava has a lot of operators, and many of them are really useful (and are missing in Kotlin Flow right now).

We can write our own implementations, but they tend to be error-prone and platform-specific. Take the flatMapConcatDelayError operator: it uses a platform-specific thread-safe list, which might work incorrectly with coroutines (which can jump between threads), meaning that mixing concepts might lead to weird issues. Also, the whole implementation might not be that refined and might contain races or other problems.

It is possible to do everything we can do in RxJava with Kotlin Flow, and hopefully additional operators will be added to Kotlin Flow as well.

If anyone wants to play with the setup, it can be found in this gist. Feel free to post comments and share your examples or ideas about the important things in error handling in RxJava and Kotlin Flow.

Happy coding!