Buffer with openning and clossing boundaries in rxjava

I am trying to work with a buffer with openning and clossing boundaries in rxjava, but I am having trouble getting it to work. What I would like to do is assuming an observable emits, 0, 1, 2, 3, 0 , 1, 3, 0, 3 I would like to end up with {0, 1, 2, 3}, {0, 1, 3}, {0, 3}.

This is the code I have so far:

PublishSubject openning = PublishSubject.create();
        openning.doOnNext(new Consumer() {
            @Override
            public void accept(@NonNull Object o) throws Exception {
                if(o.equals("0"))
                    openning.onNext(o);
            }
        });

        Observable<String> observableA = Observable.interval(1, TimeUnit.SECONDS).map(value -> String.valueOf(value  % 10));

        observableA.subscribe(openning);
        // TODO: Buffer by boundary
        observableA = observableA.buffer(openning, new Function<String, Observable<List<String>>>() {
            @Override
            public Observable<List<String>> apply(@NonNull String o) throws Exception {
                list.add(o);
                if (o.equals("0")) {
                    return Observable.just(list);
                } else {
                    list.add(o);
                    sb.append(o);
                    return Observable.never();
                }
            }
        }, new Callable() {
            @Override
            public Object call() throws Exception {
                return list;
            }
        });

Any help will be appreciated

1 answer

  • answered 2017-08-12 22:52 akarnokd

    If you mean that the buffer should restart after encountering 3, look at the bufferUntil operator in the extensions project of RxJava 2.

    compile "com.github.akarnokd:rxjava2-extensions:0.17.5"
    
    Flowable.just("1", "2", "#", "3", "#", "4", "#")
    .compose(FlowableTransformers.bufferUntil(v -> "#".equals(v)))
    .test()
    .assertResult(
        Arrays.asList("1", "2", "#"),
        Arrays.asList("3", "#"),
        Arrays.asList("4", "#")
    );
    

    For 1.x, there is the rxjava-extras with Transformers.toListUntil().