Commit 1d0581c

Merge branch 'master' of https://github.com/eugenp/tutorials
2 parents 17cf6fc + ae287cb

15 files changed: 471 additions, 4 deletions

core-java-collections/src/test/java/com/baeldung/collection/StreamOperateAndRemoveUnitTest.java

Lines changed: 4 additions & 2 deletions
@@ -2,6 +2,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.junit.Assert;
@@ -35,8 +36,9 @@ public void givenAListOf10Items_whenFilteredForQualifiedItems_thenFilteredListCo
     @Test
     public void givenAListOf10Items_whenOperateAndRemoveQualifiedItemsUsingRemoveIf_thenListContains5Items() {
 
-        itemList.stream().filter(item -> item.isQualified()).forEach(item -> item.operate());
-        itemList.removeIf(item -> item.isQualified());
+        final Predicate<Item> isQualified = item -> item.isQualified();
+        itemList.stream().filter(isQualified).forEach(item -> item.operate());
+        itemList.removeIf(isQualified);
 
         Assert.assertEquals(5, itemList.size());
     }
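
For reference, the refactoring reads against an element type shaped roughly like the hypothetical sketch below; the actual Item class lives elsewhere in core-java-collections and is untouched by this commit.

// Hypothetical sketch of the Item type the test assumes; not part of this commit.
public class Item {

    private final boolean qualified;

    public Item(boolean qualified) {
        this.qualified = qualified;
    }

    public boolean isQualified() {
        return qualified;
    }

    public void operate() {
        // placeholder for the side effect applied to qualified items
    }
}

Extracting the lambda into a named Predicate lets the same condition drive both the stream filter and removeIf, so the two calls cannot drift apart.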

libraries-data/README.md

Lines changed: 2 additions & 1 deletion
@@ -9,4 +9,5 @@
 - [Introduction to JCache](http://www.baeldung.com/jcache)
 - [A Guide to Apache Ignite](http://www.baeldung.com/apache-ignite)
 - [Apache Ignite with Spring Data](http://www.baeldung.com/apache-ignite-spring-data)
-- [Guide to JMapper](https://github.com/eugenp/tutorials/tree/master/libraries-data)
+- [Guide to JMapper](https://www.baeldung.com/jmapper)
+- [A Guide to Apache Crunch](https://www.baeldung.com/crunch)

libraries-data/pom.xml

Lines changed: 66 additions & 1 deletion
@@ -147,6 +147,44 @@
             <artifactId>jmapper-core</artifactId>
             <version>${jmapper.version}</version>
         </dependency>
+
+        <!-- crunch project -->
+        <dependency>
+            <groupId>org.apache.crunch</groupId>
+            <artifactId>crunch-core</artifactId>
+            <version>${org.apache.crunch.crunch-core.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${org.apache.hadoop.hadoop-client}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+            <version>3.0.1</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
     </dependencies>
 
@@ -252,6 +290,31 @@
             </executions>
         </plugin>
 
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <version>2.3</version>
+            <configuration>
+                <descriptors>
+                    <descriptor>src/main/assembly/hadoop-job.xml</descriptor>
+                </descriptors>
+                <archive>
+                    <manifest>
+                        <mainClass>com.baeldung.crunch.WordCount</mainClass>
+                    </manifest>
+                </archive>
+            </configuration>
+            <executions>
+                <execution>
+                    <id>make-assembly</id>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>single</goal>
+                    </goals>
+                </execution>
+            </executions>
+        </plugin>
+
     </plugins>
 </build>
 
@@ -282,7 +345,9 @@
         <datanucleus-maven-plugin.version>5.0.2</datanucleus-maven-plugin.version>
         <datanucleus-xml.version>5.0.0-release</datanucleus-xml.version>
         <datanucleus-jdo-query.version>5.0.4</datanucleus-jdo-query.version>
-        <jmapper.version>1.6.0.1</jmapper.version>
+        <jmapper.version>1.6.0.1</jmapper.version>
+        <org.apache.crunch.crunch-core.version>0.15.0</org.apache.crunch.crunch-core.version>
+        <org.apache.hadoop.hadoop-client>2.2.0</org.apache.hadoop.hadoop-client>
     </properties>
 
 </project>
libraries-data/src/main/assembly/hadoop-job.xml (new file)

Lines changed: 28 additions & 0 deletions

<?xml version="1.0" encoding="UTF-8"?>

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">

    <id>job</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>false</unpack>
            <scope>runtime</scope>
            <outputDirectory>lib</outputDirectory>
            <excludes>
                <exclude>${groupId}:${artifactId}</exclude>
            </excludes>
        </dependencySet>
        <dependencySet>
            <unpack>true</unpack>
            <includes>
                <include>${groupId}:${artifactId}</include>
            </includes>
        </dependencySet>
    </dependencySets>
</assembly>
libraries-data/src/main/java/com/baeldung/crunch/StopWordFilter.java (new file)

Lines changed: 25 additions & 0 deletions

package com.baeldung.crunch;

import java.util.Set;

import org.apache.crunch.FilterFn;

import com.google.common.collect.ImmutableSet;

/**
 * A filter that removes known stop words.
 */
public class StopWordFilter extends FilterFn<String> {

    // English stop words, borrowed from Lucene.
    private static final Set<String> STOP_WORDS = ImmutableSet
        .copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
            "for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
            "or", "s", "such", "t", "that", "the", "their", "then", "there",
            "these", "they", "this", "to", "was", "will", "with" });

    @Override
    public boolean accept(String word) {
        return !STOP_WORDS.contains(word);
    }
}
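
A FilterFn plugs into a pipeline through PCollection.filter. A minimal sketch, assuming crunch-core on the classpath; MemPipeline.collectionOf and materialize are the same in-memory helpers the unit tests below rely on.

import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;

public class StopWordFilterExample {

    public static void main(String[] args) {
        PCollection<String> words = MemPipeline.collectionOf("the", "quick", "brown", "fox");

        // accept() is called once per element; only words returning true survive.
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        // Expected to print the surviving words: quick, brown, fox.
        System.out.println(noStopWords.materialize());
    }
}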
libraries-data/src/main/java/com/baeldung/crunch/ToUpperCaseFn.java (new file)

Lines changed: 11 additions & 0 deletions

package com.baeldung.crunch;

import org.apache.crunch.MapFn;

public class ToUpperCaseFn extends MapFn<String, String> {

    @Override
    public String map(String input) {
        return input != null ? input.toUpperCase() : input;
    }
}
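
A MapFn is a strictly one-to-one transformation, attached with parallelDo together with a PType that tells Crunch how to serialize the output. A rough sketch, again using the in-memory pipeline:

import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class ToUpperCaseFnExample {

    public static void main(String[] args) {
        PCollection<String> input = MemPipeline.collectionOf("Hello", "Apache", "Crunch");

        // Writables.strings() supplies the serialization format for the output elements.
        PCollection<String> upperCased = input.parallelDo(new ToUpperCaseFn(), Writables.strings());

        // Expected to print HELLO, APACHE, CRUNCH.
        System.out.println(upperCased.materialize());
    }
}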
libraries-data/src/main/java/com/baeldung/crunch/ToUpperCaseWithCounterFn.java (new file)

Lines changed: 20 additions & 0 deletions

package com.baeldung.crunch;

import org.apache.crunch.MapFn;

@SuppressWarnings("serial")
public class ToUpperCaseWithCounterFn extends MapFn<String, String> {

    @Override
    public String map(String input) {
        if (input == null) {
            return input;
        } else {
            String output = input.toUpperCase();
            if (!input.equals(output)) {
                increment("UpperCase", "modified");
            }
            return output;
        }
    }
}
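
The increment call bumps a Hadoop counter in group "UpperCase" named "modified" only when an element actually changes. With the in-memory pipeline the counter should be readable via MemPipeline.getCounters(); a sketch under that assumption (on a real MRPipeline the counters would come back through the PipelineResult instead):

import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class ToUpperCaseWithCounterFnExample {

    public static void main(String[] args) {
        PCollection<String> input = MemPipeline.collectionOf("Hello", "WORLD");

        PCollection<String> output = input.parallelDo(new ToUpperCaseWithCounterFn(), Writables.strings());
        output.materialize(); // make sure the function has been applied

        // "Hello" is modified, "WORLD" is already upper case, so we expect 1.
        long modified = MemPipeline.getCounters()
            .findCounter("UpperCase", "modified")
            .getValue();
        System.out.println(modified);
    }
}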
libraries-data/src/main/java/com/baeldung/crunch/Tokenizer.java (new file)

Lines changed: 23 additions & 0 deletions

package com.baeldung.crunch;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

import com.google.common.base.Splitter;

/**
 * Splits a line of text into individual words.
 */
public class Tokenizer extends DoFn<String, String> {

    private static final Splitter SPLITTER = Splitter
        .onPattern("\\s+")
        .omitEmptyStrings();

    @Override
    public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
            emitter.emit(word);
        }
    }
}
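
Unlike a MapFn, a DoFn may emit zero or more outputs per input through its Emitter, so a single line fans out into many words. A minimal in-memory sketch:

import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class TokenizerExample {

    public static void main(String[] args) {
        // One input element: a single line of text.
        PCollection<String> lines = MemPipeline.collectionOf("Hello   Apache Crunch");

        // The DoFn emits one output element per word; runs of whitespace
        // produce no empty strings thanks to omitEmptyStrings().
        PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

        // Expected to print Hello, Apache, Crunch as three separate elements.
        System.out.println(words.materialize());
    }
}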
libraries-data/src/main/java/com/baeldung/crunch/WordCount.java (new file)

Lines changed: 62 additions & 0 deletions

package com.baeldung.crunch;

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A word count example for Apache Crunch, based on Crunch's example projects.
 */
public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar crunch-1.0.0-SNAPSHOT-job.jar" + " [generic options] input output");
            System.err.println();
            GenericOptionsParser.printGenericCommandUsage(System.err);
            return 1;
        }

        String inputPath = args[0];
        String outputPath = args[1];

        // Create an object to coordinate pipeline creation and execution.
        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

        // Reference a given text file as a collection of Strings.
        PCollection<String> lines = pipeline.readTextFile(inputPath);

        // Define a function that splits each line in a PCollection of Strings into
        // a PCollection made up of the individual words in the file.
        // The second argument sets the serialization format.
        PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

        // Take the collection of words and remove known stop words.
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        // The count method applies a series of Crunch primitives and returns
        // a map of the unique words in the input PCollection to their counts.
        PTable<String, Long> counts = noStopWords.count();

        // Instruct the pipeline to write the resulting counts to a text file.
        pipeline.writeTextFile(counts, outputPath);

        // Execute the pipeline as a MapReduce.
        PipelineResult result = pipeline.done();

        return result.succeeded() ? 0 : 1;
    }
}
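
The same chain can be exercised without a Hadoop cluster by swapping the MRPipeline for the in-memory implementation; a rough sketch:

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class WordCountInMemoryExample {

    public static void main(String[] args) {
        PCollection<String> lines = MemPipeline.collectionOf(
            "An Apache Crunch pipeline", "runs as a MapReduce pipeline");

        PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        // count() groups identical words and tallies their occurrences:
        // "pipeline" counts twice, while the stop words "as" and "a" are dropped.
        PTable<String, Long> counts = noStopWords.count();
        System.out.println(counts.materialize());
    }
}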
libraries-data/src/test/java/com/baeldung/crunch/MemPipelineUnitTest.java (new file)

Lines changed: 89 additions & 0 deletions

package com.baeldung.crunch;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Calendar;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Source;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.junit.Ignore;
import org.junit.Test;

public class MemPipelineUnitTest {

    private static final String INPUT_FILE_PATH = "src/test/resources/crunch/input.txt";

    @Test
    public void givenPipeLineAndSource_whenSourceRead_thenExpectedNumberOfRecordsRead() {
        Pipeline pipeline = MemPipeline.getInstance();
        Source<String> source = From.textFile(INPUT_FILE_PATH);

        PCollection<String> lines = pipeline.read(source);

        assertEquals(21, lines.asCollection()
            .getValue()
            .size());
    }

    @Test
    public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
        Pipeline pipeline = MemPipeline.getInstance();

        PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);

        assertEquals(21, lines.asCollection()
            .getValue()
            .size());
    }

    private String createOutputPath() throws IOException {
        Path path = Files.createTempDirectory("test");
        final String outputFilePath = path.toString() + File.separatorChar + "output.text";
        return outputFilePath;
    }

    @Test
    @Ignore("Requires Hadoop binaries")
    public void givenCollection_whenWriteCalled_fileWrittenSuccessfully() throws IOException {
        PCollection<String> inputStrings = MemPipeline.collectionOf("Hello",
            "Apache", "Crunch", Calendar.getInstance()
                .toString());
        final String outputFilePath = createOutputPath();
        Target target = To.textFile(outputFilePath);

        inputStrings.write(target);

        Pipeline pipeline = MemPipeline.getInstance();
        PCollection<String> lines = pipeline.readTextFile(outputFilePath);
        assertIterableEquals(inputStrings.materialize(), lines.materialize());
    }

    @Test
    @Ignore("Requires Hadoop binaries")
    public void givenPipeLine_whenWriteTextFileCalled_fileWrittenSuccessfully() throws IOException {
        Pipeline pipeline = MemPipeline.getInstance();
        PCollection<String> inputStrings = MemPipeline.collectionOf("Hello",
            "Apache", "Crunch", Calendar.getInstance()
                .toString());
        final String outputFilePath = createOutputPath();

        pipeline.writeTextFile(inputStrings, outputFilePath);

        PCollection<String> lines = pipeline.readTextFile(outputFilePath);
        assertIterableEquals(inputStrings.materialize(), lines.materialize());
    }

}
