How to chain multiple observables with RxJava?

I am trying to perform some operatations when a activity onDestroy is called. I want start a Observable with a ID, then retrieve some data from Realm and execute a HTTP request to a backend based on the retrieved data and afterwards store the retrieved data to the row given with the starting id.

Summary:

  1. Retrieve data from database with id
  2. Use data to perform a request to backend
  3. Store retrieved data to row with the id from step 1

Graphical:

expected flow

Code: what I ended up with and got stuck

Observable.just(id)
        .observeOn(Schedulers.io())
        .map(new Function<String, Person>() {
            @Override
            public Person apply(@NonNull String id) throws Exception {
                Realm realm = Realm.getDefaultInstance();

                Person person = realm.copyFromRealm(realm.where(Person.class).equalTo("id", id).findFirst());

                realm.close();

                return person;
            }
        })
        .switchMap(new Function<Person, Observable<Directions>>() {
            @Override
            public Observable<Directions> apply(@NonNull Person person) throws Exception {
                return Utils.getRemoteService().getDirections(person.getAddress()); // retrofit
            }
        })
        .map(new Function<Directions, Object>() {
            @Override
            public Object apply(@NonNull Directions directions) throws Exception {

                // how do I get the id here to store the data to the correct person

                return null;
            }
        })
        .subscribe();

Note:

  • POJO's are fictive
  • It's my first time using RxJava

1 answer

  • answered 2017-06-17 18:05 MatBos

    The information has to be passed down the stream, it can be done like below. It would be much more readable when you wrap it in a class rather than Pair.

     Observable.just(id)
                .observeOn(Schedulers.io())
                .map(new Function<String, Person>() {
                    @Override
                    public Person apply(@NonNull String id) throws Exception {
                        Realm realm = Realm.getDefaultInstance();
    
                        Person person = realm.copyFromRealm(realm.where(Person.class).equalTo("id", id).findFirst());
    
                        realm.close();
    
                        return person;
                    }
                })
                .switchMap(new Function<Person, Observable<Directions>>() {
                    @Override
                    public Observable<Directions> apply(@NonNull Pair<String, Person> pair) throws Exception {
                        // assuming that id is available by getId
                        return Pair(person.getId(), Utils.getRemoteService().getDirections(person.getAddress())); // retrofit
                    }
                })
                .map(new Function<Pair<String, Directions>, Object>() {
                    @Override
                    public Object apply(@NonNull Pair<String, Directions> pair) throws Exception {
    
                        // how do I get the id here to store the data to the correct person
                        // pair.first contains the id
                        // pair.second contains the Directions
                        return null;
                    }
                })
                .subscribe();