1
1
package com .liuh .learn .sample1 ;
2
2
3
+ import android .os .Environment ;
3
4
import android .support .v7 .app .AppCompatActivity ;
4
5
import android .os .Bundle ;
5
6
import android .util .Log ;
8
9
import org .reactivestreams .Subscriber ;
9
10
import org .reactivestreams .Subscription ;
10
11
12
+ import java .io .BufferedReader ;
13
+ import java .io .FileReader ;
14
+ import java .io .IOException ;
15
+ import java .io .InputStream ;
16
+ import java .io .InputStreamReader ;
17
+
11
18
import butterknife .ButterKnife ;
12
19
import butterknife .OnClick ;
13
20
import io .reactivex .BackpressureStrategy ;
@@ -49,7 +56,7 @@ protected void onCreate(Bundle savedInstanceState) {
49
56
}
50
57
51
58
@ OnClick ({R .id .btn_flowable_test , R .id .btn_flowable_test_synchro , R .id .btn_flowable_test_asynchronous ,
52
- R .id .btn_flowable_test_asynchronous2 })
59
+ R .id .btn_flowable_test_asynchronous2 , R . id . btn_flowable_test_readfile_from_sd })
53
60
void onViewClicked (View view ) {
54
61
switch (view .getId ()) {
55
62
case R .id .btn_flowable_test :
@@ -64,6 +71,9 @@ void onViewClicked(View view) {
64
71
case R .id .btn_flowable_test_asynchronous2 :
65
72
demo_flowable_asynchronous2 ();
66
73
break ;
74
+ case R .id .btn_flowable_test_readfile_from_sd :
75
+ practice1 ();
76
+ break ;
67
77
}
68
78
}
69
79
@@ -262,4 +272,68 @@ public void onComplete() {
262
272
});
263
273
}
264
274
275
+
276
+ public void practice1 () {
277
+ Flowable .create (new FlowableOnSubscribe <String >() {
278
+ @ Override
279
+ public void subscribe (FlowableEmitter <String > emitter ) throws Exception {
280
+
281
+ try {
282
+
283
+ InputStream inputStream = getResources ().openRawResource (R .raw .kangqiao );
284
+ InputStreamReader inputStreamReader = new InputStreamReader (inputStream , "gbk" );
285
+ BufferedReader bufferedReader = new BufferedReader (inputStreamReader );
286
+
287
+ String str ;
288
+
289
+ while ((str = bufferedReader .readLine ()) != null && !emitter .isCancelled ()) {
290
+ while (emitter .requested () == 0 ) {
291
+ if (emitter .isCancelled ()) {
292
+ break ;
293
+ }
294
+ }
295
+ emitter .onNext (str );
296
+ }
297
+ bufferedReader .close ();
298
+ inputStreamReader .close ();
299
+ inputStream .close ();
300
+
301
+ emitter .onComplete ();
302
+ } catch (IOException e ) {
303
+ e .printStackTrace ();
304
+ }
305
+ }
306
+ }, BackpressureStrategy .ERROR )
307
+ .subscribeOn (Schedulers .io ())
308
+ .observeOn (AndroidSchedulers .mainThread ())
309
+ .subscribe (new Subscriber <String >() {
310
+ @ Override
311
+ public void onSubscribe (Subscription s ) {
312
+ mSubscription = s ;
313
+ mSubscription .request (1 );
314
+ }
315
+
316
+ @ Override
317
+ public void onNext (String s ) {
318
+ System .out .println (s );
319
+ try {
320
+ Thread .sleep (2000 );
321
+ mSubscription .request (1 );
322
+ } catch (InterruptedException e ) {
323
+ e .printStackTrace ();
324
+ }
325
+ }
326
+
327
+ @ Override
328
+ public void onError (Throwable t ) {
329
+ System .out .println (t );
330
+ }
331
+
332
+ @ Override
333
+ public void onComplete () {
334
+
335
+ }
336
+ });
337
+ }
338
+
265
339
}
0 commit comments