RxJava2学习之二:RxJava2 基本使用

RxJava简介

RxJava 是一个在Java虚拟机上实现的响应式扩展库:提供了基于Observable序列实现的异步调用及基于事件编程。它扩展了观察者模式,支持数据、事件序列并允许你合并序列,无需关心底层的线程处理、同步、线程安全、并发数据结构和非阻塞I/O处理。

RxJava 是一个响应式编程框架,采用观察者设计模式,自然包含Observable和Subscriber。

简言之,RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

RxAndroid是RxJava的一个针对Android平台的扩展。它包含了一些能够简化Android开发的工具 AndroidObservable。

  • RxJava

RxJava:https://github.com/ReactiveX/RxJava
RxAndroid:https://github.com/ReactiveX/RxAndroid

  • RxJava2

RxJava2:https://github.com/ReactiveX/RxJava/tree/2.x
RxAndroid2:https://github.com/ReactiveX/RxAndroid/tree/2.x

使用RxJava和RxAndroid

  • RxJava
compile "io.reactivex.rxjava2:rxjava:2.x.y"
  • RxAndroid
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'io.reactivex.rxjava2:rxjava:2.x.y'

基本使用3部曲

创建Observable

private Observable<String> getObservable() {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("工作");
            e.onNext("开会");
            e.onComplete();
            //e.onError(new Throwable);
        }
    });
}

创建Observer

private Observer<String> getObserver() {
    return new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            //d.dispose();  //移除订阅关系
            //d.isDisposed();  //判断是否发生订阅关系
            Log.i(TAG, "onSubscribe: " + d.toString());
        }

        @Override
        public void onNext(String value) {
            Log.i(TAG, "onNext: " + value);
        }

        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError: " + e.getLocalizedMessage());
        }

        @Override
        public void onComplete() {
            Log.i(TAG, "onComplete: ");
        }
    };
}

订阅

public void hello(View view) {
    //1. 创建Observable
    Observable<String> observable = getObservable();
    //2. 创建Observer
    Observer<String> observer = getObserver();
    //3. 订阅
    observable.subscribe(observer);
}

简洁写法

Observable创建的简洁写法

使用Observable.just创建Observable

//使用Observable.just创建Observable
private Observable<String> getObservableWithJust() {
    return Observable.just("工作", "开会", "大保健");
}

使用Observable.fromArray创建Observable

//使用Observable.fromArray创建Observable
private Observable<String> getObservableWithFromArray() {
    return Observable.fromArray("工作", "开会", "大保健");
}

使用Observable.fromCallable创建Observable

//使用Observable.fromCallable创建Observable
private Observable<String> getObservableWithFromCallable() {
    return Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "大保健";
        }
    });
}

Observable订阅及Observer创建的简洁写法

observable.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, "accept: " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {

    }
}, new Action() {
    @Override
    public void run() throws Exception {

    }
});

Disposable

Disposable 相当于RxJava1.x中的Subscription,用于解除订阅

private Observer<String> getDisposableObserver() {
    return new Observer<String>() {
        private Disposable d;

        @Override
        public void onSubscribe(Disposable d) {
            this.d = d;
            Log.i(TAG, "onSubscribe: " + d.toString());
        }

        @Override
        public void onNext(String value) {
            Log.i(TAG, "onNext: " + value);
            if ("开会".equals(value)) {
                d.dispose();  //移除订阅关系
            }
        }

        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError: " + e.getLocalizedMessage());
        }

        @Override
        public void onComplete() {
            Log.i(TAG, "onComplete: ");
        }
    };
}

版权声明:
作者:Joe.Ye
链接:https://www.appblog.cn/index.php/2023/02/25/rxjava2-learning-2-basic-usage-of-rxjava2/
来源:APP全栈技术分享
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
打赏
海报
RxJava2学习之二:RxJava2 基本使用
RxJava简介 RxJava 是一个在Java虚拟机上实现的响应式扩展库:提供了基于Observable序列实现的异步调用及基于事件编程。它扩展了观察者模式,支持数据、事件序……
<<上一篇
下一篇>>
文章目录
关闭
目 录