Цепь двух модифицированных наблюдаемых ж / RxJava

Я хочу выполнить два сетевых вызова один за другим. Оба сетевых вызова возвращаются Observable. Второй вызов использует данные из успешного результата первого вызова, метод в успешном результате второго вызова использует данные как из успешного результата первого, так и для второго вызова. Также я должен иметь возможность обрабатывать как onError «события» по-разному. Как я могу достичь этого, избегая обратного ад, как в примере ниже:

API().auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<AuthResponse>() { @Override public void call(final AuthResponse authResponse) { API().getUser(authResponse.getAccessToken()) .subscribe(new Action1<List<User>>() { @Override public void call(List<User> users) { doSomething(authResponse, users); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { onErrorGetUser(); } }); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { onErrorAuth(); } }); 

Я знаю про zip, но я хочу избежать создания класса Combiner.

Обновление 1. Пытался выполнить ответ akarnokd:

  API() .auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(authResponse -> API() .getUser(authResponse.getAccessToken()) .doOnError(throwable -> { getView().setError(processFail(throwable)); }), ((authResponse, users) -> { // Ensure returned user is the which was authenticated if (authResponse.getUserId().equals(users.get(0).getId())) { SessionManager.getInstance().initSession(email, password, authResponse.getAccessToken(), users.get(0)); getView().toNews(); } else { getView().setError(R.string.something_went_wrong); } })); 

Однако внутри flatMap метода flatMap говорится, что он не может разрешать методы authResponse и пользователей ( authResponse.getAccessToken() , users.get(0) т. Д.). Im новое для программирования rx и lambdas – пожалуйста, скажите мне, в чем проблема. В любом случае код выглядит намного чище.

Обновление 2.

 API() .auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .doOnError(throwable -> getView().setError(processFail(throwable))) .flatMap((AuthResponse authResponse) -> API() .getUser(authResponse.getAccessToken()) .doOnError(throwable -> getView().setError(processFail(throwable))), ((AuthResponse authResponse, List<User> users) -> { // Ensure returned user is the which was authenticated if (authResponse.getUserId().equals(users.get(0).getId())) { SessionManager.getInstance().initSession(email, password, authResponse.getAccessToken(), users.get(0)); getView().toNews(); } return Observable.just(this); })); 

Сделали это так, но теперь мои сетевые вызовы вообще не выполняются.

Solutions Collecting From Web of "Цепь двух модифицированных наблюдаемых ж / RxJava"

В дополнение к ответу Энтони Р., существует перегрузка flatMap, которая принимает Func2 и соединяет ваши первичные и сплющенные значения для вас. Кроме того, посмотрите на операторы onErrorXXX и onExceptionXXX для обработки ошибок и соедините их с помощью ваших первых и вторых наблюдаемых

 first.onErrorReturn(1) .flatMap(v -> service(v).onErrorReturn(2), (a, b) -> a + b); 

Вы заглянули в flatMap ()? Если ваше отвращение к нему (или zip ()) – это необходимость сделать ненужный класс просто для хранения двух объектов, android.util.Pair может быть ответом. Я не уверен, как получить именно ту обработку ошибок, которую вы ищете.

  API().auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(new Func1<AuthResponse, Observable<List<User>>>() { @Override public Observable<List<User>> call(AuthResponse authResponse) { return API().getUser(authResponse.getAccessToken()); } }, new Func2<AuthResponse, List<User>, Pair<AuthResponse, List<User>>>() { @Override public Pair<AuthResponse, List<User>> call(AuthResponse authResponse, List<User> users) { return new Pair<>(authResponse, users); } }).subscribe(new Action1<Pair<AuthResponse, List<User>>>() { @Override public void call(Pair<AuthResponse, List<User>> pair) { doSomething(pair.first, pair.second); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { // not sure how to tell which one threw the error } });