RxJava Observable.cache недействителен

Я пытаюсь изучить rxjava в среде Android. Допустим, у меня есть наблюдаемое, которое испускает результат сетевого вызова. Если я правильно понял, широко распространенный подход к решению изменений конфигурации заключается в следующем:

Сделав это, мы не потеряем результат наблюдаемого, который будет повторно наблюдаться после того, как будет выполнена новая конфигурация.

Теперь, мой вопрос:

Есть ли способ заставить наблюдаемого испускать новое значение (и аннулировать кешированный)? Нужно ли создавать новые наблюдаемые каждый раз, когда мне нужны свежие данные из сети (что не похоже на плохую практику в мире Android, потому что это сделает gc дополнительной работой)?

Большое спасибо,

Federico

Solutions Collecting From Web of "RxJava Observable.cache недействителен"

Создайте пользовательскую реализацию OnSubscribe которая делает то, что вы хотите:

 public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> { private final AtomicBoolean refresh = new AtomicBoolean(true); private final Observable<T> source; private volatile Observable<T> current; public OnSubscribeRefreshingCache(Observable<T> source) { this.source = source; this.current = source; } public void reset() { refresh.set(true); } @Override public void call(Subscriber<? super T> subscriber) { if (refresh.compareAndSet(true, false)) { current = source.cache(); } current.unsafeSubscribe(subscriber); } } 

Этот бит кода демонстрирует использование и показывает, что кеш по существу сбрасывается:

 Observable<Integer> o = Observable.just(1) .doOnCompleted(() -> System.out.println("completed")); OnSubscribeRefreshingCache<Integer> cacher = new OnSubscribeRefreshingCache<Integer>(o); Observable<Integer> o2 = Observable.create(cacher); o2.subscribe(System.out::println); o2.subscribe(System.out::println); cacher.reset(); o2.subscribe(System.out::println); 

Вывод:

 completed 1 1 completed 1 

Кстати, вы можете заметить, что .cache не испускает до завершения. Это ошибка, которая должна быть исправлена ​​rxjava 1.0.14.

Что касается ваших проблем с давлением в ГК, каждый оператор, применяемый к Наблюдаемому, создает новый Observable, как правило, посредством lift или create . Состояние базового элемента, связанное с созданием нового Observable, является ссылкой на функцию onSubscribe . cache отличается от большинства тем, что он поддерживает состояние через подписки, и это имеет потенциал для давления в ГК, если он содержит много состояний и часто отбрасывается. Даже если вы использовали одну и ту же измененную структуру данных для хранения состояния при сбрасывании, GC все равно придется обрабатывать содержимое структуры данных при ее очистке, чтобы вы не могли получить много.

Оператор cache RxJava построен для нескольких параллельных подписей. Вероятно, вы можете представить себе, что функция сброса может оказаться проблематичной для реализации. Во что бы то ни стало поднимайте вопрос о Github RxJava, если вы хотите исследовать его дальше.