package com.baeldung.data.pipeline;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

/**
 * Counts words from a Kafka topic, maintaining cumulative totals across
 * batches with mapWithState, and saves the results to Cassandra.
 */
public class WordCountingAppWithCheckpoint {

    public static JavaSparkContext sparkContext;

    @SuppressWarnings("serial")
    public static void main(String[] args) throws InterruptedException {

        // Silence Spark's and Akka's verbose logging.
        Logger.getLogger("org")
            .setLevel(Level.OFF);
        Logger.getLogger("akka")
            .setLevel(Level.OFF);

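        // Kafka consumer configuration; auto-commit is disabled so that offset
        // handling is left to the streaming job rather than the Kafka client.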
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("messages");

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("WordCountingAppWithCheckpoint");
        sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");

        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));

        sparkContext = streamingContext.sparkContext();

        // Stateful transformations such as mapWithState require a checkpoint
        // directory where Spark can persist the accumulated state.
        streamingContext.checkpoint("./.checkpoint");

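        // Subscribe to the "messages" topic; PreferConsistent spreads the Kafka
        // partitions evenly across the available executors.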
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        JavaPairDStream<String, String> results = messages.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
            @Override
            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                return new Tuple2<>(record.key(), record.value());
            }
        });

        JavaDStream<String> lines = results.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
                // Split each message into individual words on whitespace.
                return Arrays.asList(x.split("\\s+"))
                    .iterator();
            }
        });

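        // Per-batch counts: pair each word with 1, then sum the ones per key.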
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<>(s, 1);
            }
        })
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

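        // State update function: add the batch's count for a word to the total
        // accumulated so far, emit the new total, and store it back in the state.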
        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = (word, one, state) -> {
            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
            state.update(sum);
            return output;
        };

        JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD(sparkContext.emptyRDD());

        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function(mappingFunc)
            .initialState(initialRDD));

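        // Persist each cumulative count to the vocabulary.words table. Writing a
        // one-element RDD per word keeps the example simple, at the cost of efficiency.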
        cumulativeWordCounts.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, Integer>>>() {
            @Override
            public void call(JavaRDD<Tuple2<String, Integer>> javaRdd) throws Exception {
                List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
                for (Tuple2<String, Integer> tuple : wordCountList) {
                    List<Word> wordList = Arrays.asList(new Word(tuple._1, tuple._2));
                    JavaRDD<Word> rdd = sparkContext.parallelize(wordList);
                    javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
                        .saveToCassandra();
                }
            }
        });

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}