Throwing multiple errors in Reactor 3

    Flux.create(sink -> {
        sink.error(new IOException());
        sink.error(new IOException());
    }).subscribe(System.out::println, System.out::println, System.out::println);

I thought (wrongly) that after the first error the flux would be disposed and would not listen to any more signals, but the above code throws an exception:
Exception in thread "main" reactor.core.Exceptions$BubblingException:
    at reactor.core.Exceptions.bubble(
    at reactor.core.publisher.Operators.onErrorDropped(
    at reactor.core.publisher.FluxCreate$SerializedSink.error(
    at main.App.lambda$f1$0(
    at reactor.core.publisher.FluxCreate.subscribe(
    at reactor.core.publisher.Flux.subscribe(
    at reactor.core.publisher.Flux.subscribeWith(
    at reactor.core.publisher.Flux.subscribe(
    at reactor.core.publisher.Flux.subscribe(
    at main.App.f1(
    at main.App.main(
Caused by:
    ... 8 more

Why do I get this exception?

1 answer

  • answered 2018-01-11 19:44 Simon Baslé

    As per the Reactive Streams specification, it is illegal to signal onError() twice, so when the create operator receives the second exception, it drops it. There is a hook (Hooks.onErrorDropped) that can be used to capture such cases and do something meaningful with them, but in your case the default implementation kicks in, which simply wraps the extraneous exception and throws it, or "bubbles" it up, to the caller.
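
To make the mechanism concrete, here is a minimal plain-Java sketch of the behaviour described above. It is a toy model, not Reactor's actual code: `GuardedSink`, `TerminalGuardDemo` and their methods are hypothetical names invented for illustration. The sink delivers the first error to the subscriber, then routes any later error to a "dropped" hook. In Reactor itself, the equivalent hook would be registered with `Hooks.onErrorDropped(...)`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: GuardedSink is a hypothetical class, not Reactor's FluxCreate sink.
final class GuardedSink {
    private final Consumer<Throwable> onError;   // the subscriber's error callback
    private final Consumer<Throwable> onDropped; // hook for errors that arrive after termination
    private boolean terminated;

    GuardedSink(Consumer<Throwable> onError, Consumer<Throwable> onDropped) {
        this.onError = onError;
        this.onDropped = onDropped;
    }

    void error(Throwable t) {
        if (terminated) {
            // A second terminal signal must not reach the subscriber, so hand it to the hook.
            onDropped.accept(t);
            return;
        }
        terminated = true;
        onError.accept(t);
    }
}

final class TerminalGuardDemo {
    // With a custom hook, the extra error is recorded instead of thrown.
    static List<String> runWithLoggingHook() {
        List<String> log = new ArrayList<>();
        GuardedSink sink = new GuardedSink(
                t -> log.add("error: " + t.getMessage()),
                t -> log.add("dropped: " + t.getMessage()));
        sink.error(new IOException("first"));
        sink.error(new IOException("second"));
        return log;
    }

    // With a hook that wraps and rethrows (modelled on the default described above),
    // the second error() call throws back to its caller.
    static boolean wrappingHookThrows() {
        GuardedSink sink = new GuardedSink(
                t -> { },
                t -> { throw new IllegalStateException("dropped error", t); });
        sink.error(new IOException("first"));
        try {
            sink.error(new IOException("second"));
            return false;
        } catch (IllegalStateException bubbled) {
            return "second".equals(bubbled.getCause().getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithLoggingHook());
        System.out.println(wrappingHookThrows());
    }
}
```

The second method mirrors what happens in the question: no custom hook is installed, so the extra exception is wrapped and thrown out of the second `sink.error(...)` call, inside the `create` lambda.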