Как выполнить flatMap в фоновом потоке

Я использую Retrofit и RxJava для выполнения некоторых фоновых задач. Код выглядит следующим образом:

public class MyLoader{ public Observable<MyData> getMyData(){ return setupHelper().flatMap(new Func1<MyHelper, Observable<MyData>>() { @Override public Observable<MyData> call(MyHelper myHelper) { return queryData(myHelper); } }); } private Observable<MyData> queryData(MyHelper myHelper){ ... } private Observable<MyHelper> setupHelper(){ return Observable.create(new Observable.OnSubscribe<MyHelper>() { @Override public void call(final Subscriber<? super MyHelper> subscriber) { try{ MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data subscriber.onNext(helper); subscriber.onCompleted(); }catch(RetrofitError e){ subscriber.onError(e) } } } } } 

Это не выполняется с RetrofitError из-за исключения NetworkOnMainThread в этой строке:

  MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data 

Подписывание моего наблюдаемого:

 myLoader.getMyData() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<MyData>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(MyData inventory) { } }); 

Согласно документации Rx flatMap не работает ни на одном фоне. Мой вопрос заключается в том, как обеспечить, чтобы весь getMyData() в фоновом режиме.

Solutions Collecting From Web of "Как выполнить flatMap в фоновом потоке"

Я просто добавляю observeOn(Schedulers.newThread()) до flatMap и он работает!

Это перемещает только один шаг в конвейере в фоновый поток:

 Observable<Integer> vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val)); 

Первоначально ответил здесь: https://stackoverflow.com/a/35429084/2908525

Есть хороший шанс, когда вы создаете объект MyLoader в основном потоке, который должен выполняться Observable.create (или, возможно, где-то еще в вашем коде (?)). Если это так, то .subscribeOn(Schedulers.io()) не повлияет на изменение потока.

Вы можете попробовать обернуть .create () с помощью .defer() чтобы убедиться, что Observable создан только тогда, когда он подписан.

Например defer(() -> create(....))