Skip to content

Commit a8e84b1

Browse files
committed
16 . Flowable的使用,涉及到背压策略(BackPressure),响应式拉取。
1 parent c58fece commit a8e84b1

File tree

4 files changed

+287
-11
lines changed

4 files changed

+287
-11
lines changed

learn-sample1/build.gradle

+6-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ apply plugin: 'com.android.application'
33
android {
44
compileSdkVersion 28
55

6-
7-
86
defaultConfig {
97
applicationId "com.liuh.learn.sample1"
108
minSdkVersion 17
@@ -33,4 +31,10 @@ dependencies {
3331
testImplementation 'junit:junit:4.12'
3432
androidTestImplementation 'com.android.support.test:runner:1.0.2'
3533
androidTestImplementation 'com.android.support.test.espresso:espresso-core:3.0.2'
34+
35+
implementation 'com.jakewharton:butterknife:8.8.1'
36+
annotationProcessor 'com.jakewharton:butterknife-compiler:8.8.1'
37+
38+
implementation "io.reactivex.rxjava2:rxjava:2.1.6"
39+
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
3640
}

learn-sample1/readme.md

+4
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,9 @@
33
作者用水管来模拟观察者和被观察者,其中上游和下游分别对应着RxJava中的Observable和Observer,它们之间的连接就对应着subscribe()方法。
44

55
![水管图](/assets/images/1.webp)
6+
### 2 . RxJava的线程控制
7+
8+
9+
610

711

learn-sample1/src/main/java/com/liuh/learn/sample1/MainActivity.java

+252
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,264 @@
22

33
import android.support.v7.app.AppCompatActivity;
44
import android.os.Bundle;
5+
import android.util.Log;
6+
import android.view.View;
57

8+
import org.reactivestreams.Subscriber;
9+
import org.reactivestreams.Subscription;
10+
11+
import butterknife.ButterKnife;
12+
import butterknife.OnClick;
13+
import io.reactivex.BackpressureStrategy;
14+
import io.reactivex.Flowable;
15+
import io.reactivex.FlowableEmitter;
16+
import io.reactivex.FlowableOnSubscribe;
17+
import io.reactivex.android.schedulers.AndroidSchedulers;
18+
import io.reactivex.schedulers.Schedulers;
19+
20+
/**
21+
* RxJava 中 Flowable 的使用,涉及到背压策略(BackPressure),响应式拉取.
22+
* <p>
23+
* 在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时, 上游就会先把事件发送到这个水缸中, 因此, <br>
24+
* 下游虽然没有调用request, 但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游.
25+
* <p>
26+
* MissingBackpressureException 异常.
27+
* <p>
28+
* Observable<----------subscribe------->Observer(Consumer)<br>
29+
* <p>
30+
* Flowable<----------subscribe------->Subscriber
31+
* <p>
32+
* BackpressureStrateg有五种策略:MISSING,ERROR,BUFFER,DROP,LATEST。
33+
* <p>
34+
* Flowable在设计的时候采用了一种新的思路也就是 响应式拉取 的方式来更好的解决上下游流速不均衡的问题。
35+
* <p>
36+
* 可以把request当做是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件,
37+
* 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM.
38+
*/
639
public class MainActivity extends AppCompatActivity {
740

41+
private static Subscription mSubscription;
42+
843
@Override
944
protected void onCreate(Bundle savedInstanceState) {
1045
super.onCreate(savedInstanceState);
1146
setContentView(R.layout.activity_main);
47+
ButterKnife.bind(this);
48+
demo_flowable();
1249
}
50+
51+
@OnClick({R.id.btn_flowable_test, R.id.btn_flowable_test_synchro, R.id.btn_flowable_test_asynchronous,
52+
R.id.btn_flowable_test_asynchronous2})
53+
void onViewClicked(View view) {
54+
switch (view.getId()) {
55+
case R.id.btn_flowable_test:
56+
request(96);
57+
break;
58+
case R.id.btn_flowable_test_synchro:
59+
demo_flowable_synchro();
60+
break;
61+
case R.id.btn_flowable_test_asynchronous:
62+
demo_flowable_asynchronous();
63+
break;
64+
case R.id.btn_flowable_test_asynchronous2:
65+
demo_flowable_asynchronous2();
66+
break;
67+
}
68+
}
69+
70+
private static void request(int count) {
71+
mSubscription.request(count);
72+
}
73+
74+
private static void demo_flowable() {
75+
Flowable.create(new FlowableOnSubscribe<Integer>() {
76+
@Override
77+
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
78+
emitter.onNext(1);
79+
emitter.onNext(2);
80+
emitter.onNext(3);
81+
emitter.onNext(4);
82+
emitter.onComplete();
83+
}
84+
}, BackpressureStrategy.ERROR)
85+
.subscribeOn(Schedulers.io())
86+
.observeOn(AndroidSchedulers.mainThread())
87+
.subscribe(new Subscriber<Integer>() {
88+
@Override
89+
public void onSubscribe(Subscription s) {
90+
mSubscription = s;
91+
}
92+
93+
@Override
94+
public void onNext(Integer integer) {
95+
Log.e("---", "onNext: " + integer);
96+
}
97+
98+
@Override
99+
public void onError(Throwable t) {
100+
Log.e("---", "onError: " + t.getLocalizedMessage());
101+
}
102+
103+
@Override
104+
public void onComplete() {
105+
Log.e("---", "onComplete.");
106+
}
107+
});
108+
}
109+
110+
/**
111+
* 同步的,即上游和下游同在主线程中
112+
*/
113+
private static void demo_flowable_synchro() {
114+
Flowable.create(new FlowableOnSubscribe<Integer>() {
115+
@Override
116+
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
117+
Log.e("---", "brfore emit, requested= " + emitter.requested());
118+
119+
Log.e("---", "emit 1");
120+
emitter.onNext(1);
121+
Log.e("---", "after emit 1,request= " + emitter.requested());
122+
123+
Log.e("---", "emit 2");
124+
emitter.onNext(2);
125+
Log.e("---", "after emit 2,request= " + emitter.requested());
126+
127+
Log.e("---", "emit 3");
128+
emitter.onNext(3);
129+
Log.e("---", "after emit 3,request= " + emitter.requested());
130+
131+
Log.e("---", "emit complete");
132+
emitter.onComplete();
133+
134+
Log.e("---", "after emit complete,request= " + emitter.requested());
135+
}
136+
}, BackpressureStrategy.ERROR)
137+
.subscribe(new Subscriber<Integer>() {
138+
@Override
139+
public void onSubscribe(Subscription s) {
140+
mSubscription = s;
141+
s.request(2);
142+
}
143+
144+
@Override
145+
public void onNext(Integer integer) {
146+
Log.e("---", "onNext: " + integer);
147+
}
148+
149+
@Override
150+
public void onError(Throwable t) {
151+
Log.e("---", "onError: " + t);
152+
}
153+
154+
@Override
155+
public void onComplete() {
156+
Log.e("---", "onComplete");
157+
}
158+
});
159+
}
160+
161+
/**
162+
* 异步的
163+
* <p>
164+
* 因为异步的话,上游和下游之间有一个缓冲池,即作者说的水缸,容量为128.
165+
* <p>
166+
* 设置上游requested的值的这个内部调用会在合适的时候自动触发,那到底什么时候是合适的时候呢?
167+
* <p>
168+
* 作者演示的时:当下游消费掉96个事件之后,即我们调用了request(96)后,上游才回继续发送事件。
169+
*/
170+
private static void demo_flowable_asynchronous() {
171+
Flowable.create(new FlowableOnSubscribe<Integer>() {
172+
@Override
173+
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
174+
Log.e("---", "brfore emit, requested= " + emitter.requested());
175+
176+
Log.e("---", "emit 1");
177+
emitter.onNext(1);
178+
Log.e("---", "after emit 1,request= " + emitter.requested());
179+
180+
Log.e("---", "emit 2");
181+
emitter.onNext(2);
182+
Log.e("---", "after emit 2,request= " + emitter.requested());
183+
184+
Log.e("---", "emit 3");
185+
emitter.onNext(3);
186+
Log.e("---", "after emit 3,request= " + emitter.requested());
187+
188+
Log.e("---", "emit complete");
189+
emitter.onComplete();
190+
191+
Log.e("---", "after emit complete,request= " + emitter.requested());
192+
}
193+
}, BackpressureStrategy.ERROR)
194+
.subscribeOn(Schedulers.io())
195+
.observeOn(AndroidSchedulers.mainThread())
196+
.subscribe(new Subscriber<Integer>() {
197+
@Override
198+
public void onSubscribe(Subscription s) {
199+
mSubscription = s;
200+
s.request(2);
201+
}
202+
203+
@Override
204+
public void onNext(Integer integer) {
205+
Log.e("---", "onNext: " + integer);
206+
}
207+
208+
@Override
209+
public void onError(Throwable t) {
210+
Log.e("---", "onError: " + t);
211+
}
212+
213+
@Override
214+
public void onComplete() {
215+
Log.e("---", "onComplete");
216+
}
217+
});
218+
}
219+
220+
private static void demo_flowable_asynchronous2() {
221+
Flowable.create(new FlowableOnSubscribe<Integer>() {
222+
@Override
223+
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
224+
boolean flag;
225+
226+
for (int i = 0; ; i++) {
227+
flag = false;
228+
while (emitter.requested() == 0) {
229+
if (!flag) {
230+
Log.e("---", "我不能再发送事件了");
231+
flag = true;
232+
}
233+
}
234+
emitter.onNext(i);
235+
Log.e("---", "emitter : " + i + ", request= " + emitter.requested());
236+
}
237+
238+
239+
}
240+
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
241+
.observeOn(AndroidSchedulers.mainThread())
242+
.subscribe(new Subscriber<Integer>() {
243+
@Override
244+
public void onSubscribe(Subscription s) {
245+
mSubscription = s;
246+
}
247+
248+
@Override
249+
public void onNext(Integer integer) {
250+
Log.e("---", "onNext: " + integer);
251+
}
252+
253+
@Override
254+
public void onError(Throwable t) {
255+
Log.e("---", "onError: " + t);
256+
}
257+
258+
@Override
259+
public void onComplete() {
260+
Log.e("---", "onComplete");
261+
}
262+
});
263+
}
264+
13265
}
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,34 @@
11
<?xml version="1.0" encoding="utf-8"?>
2-
<android.support.constraint.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
2+
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
33
xmlns:app="http://schemas.android.com/apk/res-auto"
44
xmlns:tools="http://schemas.android.com/tools"
55
android:layout_width="match_parent"
66
android:layout_height="match_parent"
7+
android:orientation="vertical"
78
tools:context=".MainActivity">
89

9-
<TextView
10-
android:layout_width="wrap_content"
10+
<Button
11+
android:id="@+id/btn_flowable_test"
12+
android:layout_width="match_parent"
1113
android:layout_height="wrap_content"
12-
android:text="Hello World!"
13-
app:layout_constraintBottom_toBottomOf="parent"
14-
app:layout_constraintLeft_toLeftOf="parent"
15-
app:layout_constraintRight_toRightOf="parent"
16-
app:layout_constraintTop_toTopOf="parent" />
14+
android:text="request_flowable" />
1715

18-
</android.support.constraint.ConstraintLayout>
16+
<Button
17+
android:id="@+id/btn_flowable_test_synchro"
18+
android:layout_width="match_parent"
19+
android:layout_height="wrap_content"
20+
android:text="request_flowable_同步的" />
21+
22+
<Button
23+
android:id="@+id/btn_flowable_test_asynchronous"
24+
android:layout_width="match_parent"
25+
android:layout_height="wrap_content"
26+
android:text="request_flowable_异步的" />
27+
28+
<Button
29+
android:id="@+id/btn_flowable_test_asynchronous2"
30+
android:layout_width="match_parent"
31+
android:layout_height="wrap_content"
32+
android:text="request_flowable_异步的2" />
33+
34+
</LinearLayout>

0 commit comments

Comments
 (0)