RxJava的简单使用(一)

news/2024/7/4 1:41:12

一测试订阅

@Test
public void testSubscribe() {
    //观察者/订阅者
    final Subscriber<String> subscriber =
            new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted in tread:" +
                            Thread.currentThread().getName());

                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError in tread:" +
                            Thread.currentThread().getName());
                    e.printStackTrace();
                }

                @Override
                public void onNext(String s) {
                    System.out.println("onNext in tread:" +
                            Thread.currentThread().getName());
                    System.out.println(s);
                }
            };

    //被观察者
    Observable observable = Observable.create(
            new Observable.OnSubscribe<Subscriber>() {
                @Override
                public void call(Subscriber subscriber1) {
                    // 发生事件
                    System.out.println("call in tread:" + Thread.currentThread().getName());
                    subscriber1.onStart();
                    subscriber1.onError(new Exception("error"));
                    subscriber1.onNext("hello world");
                    subscriber1.onCompleted();
                }
            });

    //订阅
    observable.subscribe(subscriber);

}
二、测试线程

@Test
public void testScheduler() {

    //观察者/订阅者
    final Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted in tread:" +
                    Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError in tread:" +
                    Thread.currentThread().getName());
            e.printStackTrace();
        }

        @Override
        public void onNext(String s) {
            System.out.println("onNext in tread:" +
                    Thread.currentThread().getName());
            System.out.println(s);
        }
    };

    //被观察者
    Observable observable = Observable.create(
            new Observable.OnSubscribe<Subscriber>() {
                @Override
                public void call(Subscriber subscriber1) {
                    // 发生事件
                    System.out.println("call in tread:" +
                            Thread.currentThread().getName());
                    subscriber1.onStart();
                    subscriber1.onNext("hello world");
                    subscriber1.onCompleted();

                }
            });

    //订阅
    observable.subscribeOn(Schedulers.io())  // 指定生产事件在当前 线程中进行
            .observeOn(Schedulers.newThread()) //  指定消费事件在新线程中进行
            .subscribe(subscriber);


}
三、测试节点

  @Test
    public void testMap() {

        String name = "yijia";
        Observable.just(name)
                .subscribeOn(Schedulers.newThread()) // 指定下一个生成节点在新线程中处理
                .map(new Func1<String, User>() {
                    @Override
                    public User call(String name) {
                        User user = new User();
                        user.setName(name);
                        System.out.println("process User call in tread:"
                                + Thread.currentThread().getName());
                        return user;
                    }
                })

                .subscribeOn(Schedulers.newThread()) // 指定下一个生产节点在新线程中处理
                .map(new Func1<User, Object>() {
                    @Override
                    public Object call(User user) {
                        // 如果需要,我们在这里还可以对 User 进行加工
                        System.out.println("process User call in tread:"
                                + Thread.currentThread().getName());
                        return user;
                    }
                })

                .observeOn(Schedulers.newThread()) // 指定消费节点在新线程中处理
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object data) {

                        System.out.println("receive User call in tread:"
                                + Thread.currentThread().getName());
                    }
                });
    }
    public static class User {
        String name;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

}

四、rxjava在android中的使用

@RunWith(AndroidJUnit4.class)
public class TestRxJavaInAndroid {
    @Test
    public void testMapInAndroid(){
        Observable.just("yijia")
                .subscribeOn(Schedulers.io())// 指定下一个产生的线程节点在 IO 线程中处理
                .map(new Func1<String, User>() {
                    @Override
                    public User call(String name) {
                        User user = new User();
                        user.setName(name);
                        System.out.println("process User call in tread:" +
                                Thread.currentThread().getName());
                        return user;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object s) {
                        System.out.println("receive User call in tread:"
                                + Thread.currentThread().getName());
                    }
                });
                
    }
    public static class User {
        String name;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }
}


http://www.niftyadmin.cn/n/3649291.html

相关文章

[EntLib]关于SR.Strings的使用办法

编写者&#xff1a;郑昀UltraPower下载附件。安装String Resource Generator 1[1].2.5&#xff0c;运行SRGenerator.msi。然后给自己的工程中添加SR.strings文件&#xff0c;通过VS.NET在现有的.RESX或SR.strings文件设置Custom tool属性为&#xff1a;StringResourceTool或SRC…

android开源项目和框架

特效&#xff1a; http://www.theultimateandroidlibrary.com/ 常用效果&#xff1a; 1. https://github.com/novoda/ImageLoader 异步加载图片&#xff0c;缓存&#xff0c;生成缩略图&#xff0c; 基本上每个应用都会需要这个lib。 android-query框架 2. https://githu…

prisma 风格设置_Prisma中的身份验证-第1部分:设置

prisma 风格设置Unless if you’re using something like Firebase to handle your authentication, it can be a bit tricky to handle it in a way that is both secure and easy to manage. In this three-part series, we’re going to be going over how to setup your Gr…

RxBus-mvp模式下对Rxjav的封装(一)

一、首先定义一个Presenter接口&#xff1a;DataBusSubscriber 用来接受数据 public interface DataBusSubscriber {void onEvent(Object data); }二、定义一个RxBus的封装类 public class RxBus {public static final String TAG "RxBus";private static volatile…

VS 2005 Team Suite 轻松搞定白盒测试

注&#xff1a;此文的Word版本首次发表于&#xff1a; http://bbs.5etesting.com/viewthread.php?tid18&highlight%B0%D7%BA%D0%B2%E2%CA%D4 VS 2005 Team Suite轻松搞定白盒测试 &#xff08;此文已于《测试天地》杂志发表&#xff0c;如需转载&#xff0c;请与作者联系…

[推荐]dotNET中进程间同步/通信的经典框架

推荐一篇关于dotNET中常用的进程间同步或通信的框架文章&#xff1a;A C# Framework for Interprocess Synchronization and CommunicationBy Christoph Ruegg How to share resources and implement a rich message/data passing architecture between threads and processes …

Android 开发之RxJava 详解

我从去年开始使用 RxJava &#xff0c;到现在一年多了。今年加入了 Flipboard 后&#xff0c;看到 Flipboard 的 Android 项目也在使用 RxJava &#xff0c;并且使用的场景越来越多 。而最近这几个月&#xff0c;我也发现国内越来越多的人开始提及 RxJava 。有人说『RxJava 真是…

RxBus-mvp模式下对Rxjav的封装(二)

一、自定义注解&#xff0c;用于标记观察者模式 Target(ElementType.METHOD) Retention(RetentionPolicy.RUNTIME) Documented public interface RegisterBus {} 二、定义一个RxBus的类 public class RxBus {public static final String TAG "RxBus";private stati…