多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php開源 > 綜合技術 > RxJava Observer與Subscriber的關系

RxJava Observer與Subscriber的關系

來源:程序員人生   發布時間:2016-06-21 08:56:32 閱讀次數:2526次

在說Observer與Subscriber的關系之前,我們下重溫下相干概念。

RxJava 的視察者模式

RxJava 有4個基本概念:Observable (可視察者,即被視察者)、 Observer (視察者)、 subscribe (定閱)、事件。Observable 和 Observer 通過 subscribe() 方法實現定閱關系,從而 Observable 可以在需要的時候發失事件來通知 Observer。

與傳統視察者模式不同, RxJava 的事件回調方法除普通事件 onNext() (相當于 onClick() / onEvent())以外,還定義了兩個特殊的事件:onCompleted() 和 onError()。

  • onCompleted(): 事件隊列完結。RxJava 不但把每一個事件單獨處理,還會把它們看作1個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標志。
  • onError(): 事件隊列異常。在事件處理進程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。
  • 在1個正確運行的事件序列中, onCompleted() 和 onError() 有且只有1個,并且是事件序列中的最后1個。需要注意的是,onCompleted() 和 onError() 2者也是互斥的,即在隊列中調用了其中1個,就不應當再調用另外一個。

RxJava 的視察者模式大致以下圖:
這里寫圖片描述

RxJava的實現

基于以上的概念, RxJava 的基本實現主要有3點:

1) 創建 Observer

Observer 即視察者,它決定事件觸發的時候將有怎樣的行動。 RxJava 中的 Observer 接口的實現方式:

Observer<Apps> observer = new Observer<Apps>() { @Override public void onCompleted() { listView.onRefreshComplete(); } @Override public void onError(Throwable e) { listView.onRefreshComplete(); } @Override public void onNext(Apps appsList) { listView.onRefreshComplete(); appLists.addAll(appsList.apps); adapter.notifyDataSetChanged(); } };

除 Observer 接口以外,RxJava 還內置了1個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了1些擴大,但他們的基本使用方式是完全1樣的:

Subscriber subscriber = new Subscriber<Apps>() { @Override public void onCompleted() { listView.onRefreshComplete(); } @Override public void onError(Throwable e) { listView.onRefreshComplete(); } @Override public void onNext(Apps appsList) { listView.onRefreshComplete(); appLists.addAll(appsList.apps); adapter.notifyDataSetChanged(); } };

不但基本使用方式1樣,實質上,在 RxJava 的 subscribe 進程中,Observer 也總是會先被轉換成1個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全1樣的。

Subscriber是Observer的實現類

public abstract class Subscriber<T> implements Observer<T>, Subscription

而onStart()方法是Subscriber中的1個方法。它也屬于回調級別的。

subscribe(Subscriber)方法中有以下代碼:

// if not already wrapped 包裹1層 if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); }

他將subscriber包裝起來,這個具體甚么意思有待研究,繼續下看。

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber);

hook是甚么呢?

private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

RxJavaObservableExecutionHook.java源碼:

/** * Copyright 2014 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package rx.plugins; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observable.Operator; import rx.Subscriber; import rx.Subscription; import rx.functions.Func1; /** * Abstract ExecutionHook with invocations at different lifecycle points of {@link Observable} execution with a * default no-op implementation. * <p> * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: * <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>. * <p> * <b>Note on thread-safety and performance:</b> * <p> * A single implementation of this class will be used globally so methods on this class will be invoked * concurrently from multiple threads so all functionality must be thread-safe. * <p> * Methods are also invoked synchronously and will add to execution time of the observable so all behavior * should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate * worker threads. * */ public abstract class RxJavaObservableExecutionHook { /** * Invoked during the construction by {@link Observable#create(OnSubscribe)} * <p> * This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra * logging, metrics and other such things and pass-thru the function. * * @param f * original {@link OnSubscribe}<{@code T}> to be executed * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just * returned as a pass-thru */ public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) { return f; } /** * Invoked before {@link Observable#subscribe(rx.Subscriber)} is about to be executed. * <p> * This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra * logging, metrics and other such things and pass-thru the function. * * @param onSubscribe * original {@link OnSubscribe}<{@code T}> to be executed * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just * returned as a pass-thru */ public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) { // pass-thru by default return onSubscribe; } /** * Invoked after successful execution of {@link Observable#subscribe(rx.Subscriber)} with returned * {@link Subscription}. * <p> * This can be used to decorate or replace the {@link Subscription} instance or just perform extra logging, * metrics and other such things and pass-thru the subscription. * * @param subscription * original {@link Subscription} * @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a * pass-thru */ public <T> Subscription onSubscribeReturn(Subscription subscription) { // pass-thru by default return subscription; } /** * Invoked after failed execution of {@link Observable#subscribe(Subscriber)} with thrown Throwable. * <p> * This is <em>not</em> errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when * attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code <T>}, {@link Subscription}>. * * @param e * Throwable thrown by {@link Observable#subscribe(Subscriber)} * @return Throwable that can be decorated, replaced or just returned as a pass-thru */ public <T> Throwable onSubscribeError(Throwable e) { // pass-thru by default return e; } /** * Invoked just as the operator functions is called to bind two operations together into a new * {@link Observable} and the return value is used as the lifted function * <p> * This can be used to decorate or replace the {@link Operator} instance or just perform extra * logging, metrics and other such things and pass-thru the onSubscribe. * * @param lift * original {@link Operator}{@code <R, T>} * @return {@link Operator}{@code <R, T>} function that can be modified, decorated, replaced or just * returned as a pass-thru */ public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift) { return lift; } }

RxJavaObservableExecutionHook類的作用很特殊,仿佛沒有甚么太大的作用,傳進去甚么(類型)參數,返回甚么(類型)參數。

以下代碼所示:

public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) { return f; } public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) { // pass-thru by default return onSubscribe; }

至于最后關鍵的返回結果:

public <T> Subscription onSubscribeReturn(Subscription subscription) { // pass-thru by default return subscription; }

說白了,就是返回定閱的Observer對象。


Observer與Subscriber的區分

它們的區分對使用者來講主要有兩點:

  1. onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用,可以用于做1些準備工作,例如數據的清零或重置。這是1個可選方法,默許情況下它的實現為空。需要注意的是,如果對準備工作的線程有要求(例如彈出1個顯示進度的對話框,這必須在主線程履行), onStart() 就不適用了,由于它總是在 subscribe 所產生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可使用 doOnSubscribe() 方法,具體可以在后面的文中看到。
  2. unsubscribe(): 這是 Subscriber 所實現的另外一個接口 Subscription 的方法,用于取消定閱。在這個方法被調用后,Subscriber 將不再接收事件。1般在這個方法調用前,可使用 isUnsubscribed() 先判斷1下狀態。 unsubscribe() 這個方法很重要,由于在 subscribe() 以后, Observable 會持有 Subscriber 的援用,這個援用如果不能及時被釋放,將有內存泄漏的風險。所以最好保持1個原則:要在不再使用的時候盡快在適合的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來消除援用關系,以免內存泄漏的產生。

2) 創建 Observable

Observable 即被視察者,它決定甚么時候觸發事件和觸發怎樣的事件。 RxJava 使用 create() 方法來創建1個 Observable ,并為它定義事件觸發規則:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("John"); subscriber.onCompleted(); } });

可以看到,這里傳入了1個 OnSubscribe 對象作為參數。OnSubscribe 會被存儲在返回的 Observable 對象中,它的作用相當于1個計劃表,當 Observable 被定閱的時候,OnSubscribe 的 call() 方法會自動被調用,事件序列就會依照設定順次觸發(對上面的代碼,就是視察者Subscriber 將會被調用兩次 onNext() 和1次 onCompleted())。這樣,由被視察者調用了視察者的回調方法,就實現了由被視察者向視察者的事件傳遞,即視察者模式。

create() 是 RxJava 最基本的創造事件序列的操作符。基于這個操作符, RxJava 還提供了1些方法用來快捷創建事件隊列,詳見RxJava操作符系列文章:http://blog.csdn.net/jdsjlzx/article/details/51485861

3) Subscribe (定閱)
創建了 Observable 和 Observer 以后,再用 subscribe() 方法將它們聯結起來,整條鏈子就能夠工作了。代碼情勢很簡單:

observable.subscribe(observer); // 或: observable.subscribe(subscriber);

Observable.subscribe(Subscriber) 的內部實現是這樣的(僅核心代碼):

public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("observer can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren't we throwing the hook's return value. throw r; } return Subscriptions.unsubscribed(); } }

可以看到,subscriber() 做了3件事:

  1. 調用 Subscriber.onStart() 。這個方法在前面已介紹過,是1個可選的準備方法。
  2. 調用 Observable 中的 OnSubscribe.call(Subscriber) 。在這里,事件發送的邏輯開始運行。從這也能夠看出,在 RxJava 中, Observable 其實不是在創建的時候就立即開始發送事件,而是在它被定閱的時候,即當 subscribe() 方法履行的時候。
  3. 將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().

全部進程中對象間的關系以下圖:
這里寫圖片描述

除 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完全定義的回調,RxJava 會自動根據定義創建出 Subscriber 。情勢以下:

Action1<String> onNextAction = new Action1<String>() { // onNext() @Override public void call(String s) { Log.d(tag, s); } }; Action1<Throwable> onErrorAction = new Action1<Throwable>() { // onError() @Override public void call(Throwable throwable) { // Error handling } }; Action0 onCompletedAction = new Action0() { // onCompleted() @Override public void call() { Log.d(tag, "completed"); } }; // 自動創建 Subscriber ,并使用 onNextAction 來定義 onNext() observable.subscribe(onNextAction); // 自動創建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError() observable.subscribe(onNextAction, onErrorAction); // 自動創建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted() observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 進程中終究會被轉換成 Subscriber 對象,因此,從某種程度上來講用 Subscriber 來代替 Observer ,這樣會更加嚴謹。

根據目前的經驗來看,Observer與Subscriber的主要區分在于onNext方法履行終了后是不是取消了定閱。

首先,我們分別定義mSubscriber 和 mObserver 。

以下代碼:

protected Subscriber<D> mSubscriber = new Subscriber<D>() { @Override public void onCompleted() { executeOnLoadFinish(); } @Override public void onError(Throwable e) { TLog.error("onError " + e.toString()); executeOnLoadDataError(null); } @Override public void onNext(D d) { TLog.log("onNext " ); List<T> list = d; TLog.log("entity " + list.size()); executeOnLoadDataSuccess(list); TLog.log("onSuccess totalPage " + totalPage); } }; protected Observer<D> mObserver = new Observer<D>() { @Override public void onCompleted() { executeOnLoadFinish(); } @Override public void onError(Throwable e) { TLog.error("onError " + e.toString()); executeOnLoadDataError(null); } @Override public void onNext(D d) { TLog.log("onNext " ); List<T> list = d; TLog.log("entity " + list.size()); executeOnLoadDataSuccess(list); TLog.log("onSuccess totalPage " + totalPage); } };
observable.subscribeOn(Schedulers.io()) .map(new Func1<Response<D>,D>() { @Override public D call(Response<D> response) { if(response == null){ throw new ApiException(100); } totalPage = response.total; return response.result; } }) .observeOn(AndroidSchedulers.mainThread()) //.subscribe(mObserver); .subscribe(mSubscriber);

subscribe(mObserver)和subscribe(mSubscriber)履行結果就會有區分:

  • subscribe(mSubscriber)這類定閱方式在第2次要求數據時就不會履行了,緣由就是onCompleted后自動取消了定閱
    (詳見文章:http://blog.csdn.net/jdsjlzx/article/details/51542003);
  • subscribe(mObserver)則不出現此問題。

提示:個人以為subscribe(mObserver)這個方式更合適分頁加載。

請注意,如果你每次都使用subscribe(new Subscriber< T>() {})方式實現定閱,就不會出現上面的問題。

以下代碼:

private void toSubscribe(Observable<Response<D>> observable) { observable.subscribeOn(Schedulers.io()) .map(new Func1<Response<D>,D>() { @Override public D call(Response<D> response) { if(response == null){ throw new ApiException(100); } totalPage = response.total; return response.result; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<D>() { @Override public void onCompleted() { executeOnLoadFinish(); } @Override public void onError(Throwable e) { TLog.error("onError " + e.toString()); executeOnLoadDataError(null); } @Override public void onNext(D d) { TLog.log("onNext " ); List<T> list = d; TLog.log("entity " + list.size()); executeOnLoadDataSuccess(list); TLog.log("onSuccess totalPage " + totalPage); } }); }

固然,這個方式實現分頁加載也是可以的。至于哪一個更好,還需要再驗證。

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 久久久久久久久久久大尺度免费视频 | 亚洲精品亚洲人成人网 | yellow中文字幕久久网 | 欧美成人一区亚洲一区 | 亚洲视频黄 | 亚洲第一精品夜夜躁人人爽 | 中文字幕成人 | 另类二区 | 国产最新精品视频 | 456亚洲人成影院在线观 | 国产一区二区免费不卡在线播放 | 精品视频一区二区三区四区五区 | 91九色最新地址 | 国产精品区一区二区三 | 最近高清中文国语视频 | 国产精品久久久精品三级 | 视频一区二区不卡 | 国产高清av在线播放 | 国产精品一国产精品免费 | 91精品国产91热久久p | 日本动漫片b站免费观看 | 中文精品视频一区二区在线观看 | 欧美xart系列高清在线视频 | 波多野结衣视频在线播放 | 中文字幕在线观看 | 免费在线看h | h视频在线观看网站 | 国产福利一区二区 | 国产xxxxx| 亚洲成年网站 | 欧美一级片手机在线观看 | 亚拍精品一区二区三区 | 欧美俄罗斯一级毛片 | 伊人网站 | 中文字幕一二三区乱码老 | 色费女人18毛片a级视频在线 | 老外一级毛片免费看 | 中文字幕日韩欧美一区二区三区 | 一级毛片在线 | 国产精品亚洲一区二区三区 | 欧美一区二区三区不卡免费观看 |