Commit 20e8886

kcacademic authored and pivovarit committed

Kafka spark cassandra (eugenp#6078)

* Adding files for the tutorial BAEL-2301
* Incorporating review comments on the article.
1 parent 361bc4b commit 20e8886

2 files changed: +32 -98 lines changed


apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java

Lines changed: 15 additions & 45 deletions
@@ -6,7 +6,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

@@ -35,7 +34,6 @@
 
 public class WordCountingApp {
 
-    @SuppressWarnings("serial")
     public static void main(String[] args) throws InterruptedException {
         Logger.getLogger("org")
             .setLevel(Level.OFF);
@@ -61,52 +59,24 @@ public static void main(String[] args) throws InterruptedException {
 
         JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
 
-        JavaPairDStream<String, String> results = messages.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
-            @Override
-            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
-                return new Tuple2<>(record.key(), record.value());
-            }
-        });
+        JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
 
-        JavaDStream<String> lines = results.map(new Function<Tuple2<String, String>, String>() {
-            @Override
-            public String call(Tuple2<String, String> tuple2) {
-                return tuple2._2();
-            }
-        });
+        JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
 
-        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-            @Override
-            public Iterator<String> call(String x) {
-                return Arrays.asList(x.split("\\s+"))
-                    .iterator();
-            }
-        });
+        JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
+            .iterator());
 
-        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
-            @Override
-            public Tuple2<String, Integer> call(String s) {
-                return new Tuple2<>(s, 1);
-            }
-        })
-            .reduceByKey(new Function2<Integer, Integer, Integer>() {
-                @Override
-                public Integer call(Integer i1, Integer i2) {
-                    return i1 + i2;
-                }
-            });
-
-        wordCounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
-            @Override
-            public void call(JavaPairRDD<String, Integer> javaRdd) throws Exception {
-                Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
-                for (String key : wordCountMap.keySet()) {
-                    List<Word> words = Arrays.asList(new Word(key, wordCountMap.get(key)));
-                    JavaRDD<Word> rdd = streamingContext.sparkContext()
-                        .parallelize(words);
-                    javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
-                        .saveToCassandra();
-                }
+        JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
+            .reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
+
+        wordCounts.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>) javaRdd -> {
+            Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
+            for (String key : wordCountMap.keySet()) {
+                List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));
+                JavaRDD<Word> rdd = streamingContext.sparkContext()
+                    .parallelize(wordList);
+                javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
+                    .saveToCassandra();
             }
         });

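Note on the change above: each anonymous implementation of Spark's Java function interfaces (PairFunction, Function, FlatMapFunction, Function2, VoidFunction) is replaced with a cast lambda. Those interfaces are single-method and extend java.io.Serializable, which is presumably why the anonymous versions carried @SuppressWarnings("serial") and the lambdas no longer need it. A minimal sketch of that pattern outside Spark, assuming a hypothetical PairFn interface standing in for Spark's PairFunction (not a Spark type):

import java.io.Serializable;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class LambdaRefactorSketch {

    // Hypothetical stand-in for Spark's PairFunction: a single-method
    // interface extending Serializable, like Spark's Java function types.
    interface PairFn<T, K, V> extends Serializable {
        Map.Entry<K, V> call(T t) throws Exception;
    }

    public static void main(String[] args) throws Exception {
        // Before: an anonymous inner class. Being Serializable without a
        // serialVersionUID, it can trigger the "serial" lint warning that
        // @SuppressWarnings("serial") silenced in the original code.
        PairFn<String, String, Integer> before = new PairFn<String, String, Integer>() {
            @Override
            public Map.Entry<String, Integer> call(String s) {
                return new SimpleEntry<>(s, 1);
            }
        };

        // After: a lambda targeting the same interface. Lambdas declare no
        // serialVersionUID field, so the suppression becomes unnecessary.
        PairFn<String, String, Integer> after = s -> new SimpleEntry<>(s, 1);

        System.out.println(before.call("word")); // word=1
        System.out.println(after.call("word"));  // word=1
    }
}
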
apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java

Lines changed: 17 additions & 53 deletions
@@ -6,7 +6,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

@@ -15,7 +14,6 @@
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.Optional;
@@ -43,7 +41,6 @@ public class WordCountingAppWithCheckpoint {
 
     public static JavaSparkContext sparkContext;
 
-    @SuppressWarnings("serial")
     public static void main(String[] args) throws InterruptedException {
 
         Logger.getLogger("org")
@@ -74,63 +71,30 @@ public static void main(String[] args) throws InterruptedException {
 
         JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
 
-        JavaPairDStream<String, String> results = messages.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
-            @Override
-            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
-                return new Tuple2<>(record.key(), record.value());
-            }
-        });
+        JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
 
-        JavaDStream<String> lines = results.map(new Function<Tuple2<String, String>, String>() {
-            @Override
-            public String call(Tuple2<String, String> tuple2) {
-                return tuple2._2();
-            }
-        });
+        JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
 
-        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-            @Override
-            public Iterator<String> call(String x) {
-                return Arrays.asList(x.split("\\s+"))
-                    .iterator();
-            }
-        });
+        JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
+            .iterator());
 
-        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
-            @Override
-            public Tuple2<String, Integer> call(String s) {
-                return new Tuple2<>(s, 1);
-            }
-        })
-            .reduceByKey(new Function2<Integer, Integer, Integer>() {
-                @Override
-                public Integer call(Integer i1, Integer i2) {
-                    return i1 + i2;
-                }
-            });
-
-        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = (word, one, state) -> {
+        JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
+            .reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
+
+        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>) (word, one, state) -> {
             int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
             Tuple2<String, Integer> output = new Tuple2<>(word, sum);
             state.update(sum);
             return output;
-        };
-
-        JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD(sparkContext.emptyRDD());
-
-        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function(mappingFunc)
-            .initialState(initialRDD));
-
-        cumulativeWordCounts.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, Integer>>>() {
-            @Override
-            public void call(JavaRDD<Tuple2<String, Integer>> javaRdd) throws Exception {
-                List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
-                for (Tuple2<String, Integer> tuple : wordCountList) {
-                    List<Word> words = Arrays.asList(new Word(tuple._1, tuple._2));
-                    JavaRDD<Word> rdd = sparkContext.parallelize(words);
-                    javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
-                        .saveToCassandra();
-                }
+        }));
+
+        cumulativeWordCounts.foreachRDD((VoidFunction<JavaRDD<Tuple2<String, Integer>>>) javaRdd -> {
+            List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
+            for (Tuple2<String, Integer> tuple : wordCountList) {
+                List<Word> wordList = Arrays.asList(new Word(tuple._1, tuple._2));
+                JavaRDD<Word> rdd = sparkContext.parallelize(wordList);
+                javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
+                    .saveToCassandra();
             }
         });
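
Note on the change above: besides the lambda conversion, this file inlines the former mappingFunc directly into StateSpec.function(...) and drops the empty initialRDD, which is why the JavaPairRDD import is removed; mapWithState starts from empty state by default, so an explicit empty initial RDD appears redundant. A minimal sketch of the per-word running sum that the Function3 computes, using plain collections instead of Spark's State (the batches and data are made up for illustration):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class StatefulSumSketch {

    public static void main(String[] args) {
        // Stand-in for Spark's per-key State<Integer>: the cumulative
        // count of each word across all batches seen so far.
        Map<String, Integer> state = new HashMap<>();

        // Two made-up micro-batches of words.
        String[][] batches = { { "kafka", "spark" }, { "kafka" } };

        for (String[] batch : batches) {
            for (String word : batch) {
                // Mirrors the mapping function in the diff:
                // int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                Optional<Integer> one = Optional.of(1);
                int sum = one.orElse(0) + state.getOrDefault(word, 0);
                state.put(word, sum); // state.update(sum)
            }
        }

        System.out.println(state); // e.g. {spark=1, kafka=2}
    }
}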
