Создание наблюдаемого без использования Observable.create

Я использую RxJava в своем приложении для Android, и я хочу загрузить данные из базы данных.

Таким образом, я создаю новый Observable, используя Observable.create() который возвращает список EventLog

 public Observable<List<EventLog>> loadEventLogs() { return Observable.create(new Observable.OnSubscribe<List<EventLog>>() { @Override public void call(Subscriber<? super List<EventLog>> subscriber) { List<DBEventLog> logs = new Select().from(DBEventLog.class).execute(); List<EventLog> eventLogs = new ArrayList<>(logs.size()); for (int i = 0; i < logs.size(); i++) { eventLogs.add(new EventLog(logs.get(i))); } subscriber.onNext(eventLogs); } }); } 

Хотя он работает правильно, я читал, что использование Observable.create() самом деле не является лучшей практикой для Rx Java (см. Здесь ).

Таким образом, я изменил этот метод таким образом.

 public Observable<List<EventLog>> loadEventLogs() { return Observable.fromCallable(new Func0<List<EventLog>>() { @Override public List<EventLog> call() { List<DBEventLog> logs = new Select().from(DBEventLog.class).execute(); List<EventLog> eventLogs = new ArrayList<>(logs.size()); for (int i = 0; i < logs.size(); i++) { eventLogs.add(new EventLog(logs.get(i))); } return eventLogs; } }); } 

Это лучший подход с использованием Rx Java? Зачем? В чем разница между этими двумя методами?

Более того, поскольку база данных загружает список элементов, имеет смысл испускать весь список сразу? Или я должен испускать один элемент за раз?

Solutions Collecting From Web of "Создание наблюдаемого без использования Observable.create"

Эти два метода могут выглядеть похожими и вести себя одинаково, но fromCallable имеет дело с трудностями противодавления для вас, тогда как в версии для create нет. Работа с противодавлением внутри реализации OnSubscribe варьируется от простого до прямого разума; Однако, если они опущены, вы можете получить MissingBackpressureException с асинхронными границами (например, observeOn ) или даже на границах продолжения (например, concat ).

RxJava пытается обеспечить надлежащую поддержку противодавления как можно большему количеству фабрик и операторов, однако есть немало заводов и операторов, которые не могут его поддерживать.

Вторая проблема с ручной реализацией OnSubscribe – отсутствие поддержки отмены, особенно если вы генерируете много вызовов onNext . Многие из них могут быть заменены стандартными заводскими методами (такими как) или вспомогательными классами (такими как SyncOnSubscribe ), которые SyncOnSubscribe со всей сложностью для вас.

Вы можете найти множество примеров и примеров, которые (по-прежнему) используют create по двум причинам.

  1. Гораздо проще внедрить push-based потоки данных, показывая, как толчок событий работает императивно. На мой взгляд, такие источники тратят слишком много времени, create пропорционально, вместо того, чтобы говорить о стандартных заводских методах и показывая, как можно безопасно достичь определенных общих задач (таких как ваши).
  2. Многие из этих примеров были созданы за время, когда RxJava не требовал поддержки противодавления или даже надлежащей поддержки синхронной отмены или просто был перенесен из примеров Rx.NET (который до настоящего времени не поддерживает противодавление и синхронное отключение, как-то, любезно предоставлено C # I Думаю.) Генерация значений путем вызова onNext была беззаботной. Однако такое использование приводит к раздуванию буфера и чрезмерному использованию памяти, поэтому команда Netflix придумала способ ограничить использование памяти, потребовав от наблюдателей указать, сколько элементов они готовы продолжить. Это стало известно как противодавление.

Во втором вопросе, а именно, следует ли создавать список или последовательность значений, это зависит от вашего источника. Если ваш источник поддерживает какую-либо итерацию или потоки отдельных элементов данных (например, JDBC), вы можете просто подключиться к нему и испускать один за другим (см. SyncOnSubscribe ). Если он не поддерживает его, или вам это нужно в форме списка, то сохраните его как есть. Вы можете всегда конвертировать между двумя формами через toList и flatMapIterable если необходимо.

Как объясняется в ответе, который вы связали, с Observable.create вам может потребоваться нарушить расширенные требования RxJava.

Например, вам нужно будет реализовать противодавление или как отказаться от подписки.

В этом случае вы хотите испустить элемент, не имея дело с противодавлением или подпиской. Поэтому Observable.fromCallable – хороший вызов. RxJava будет иметь дело с остальными.