Skip to content

Commit 7ded7c8

Browse files
committed
Fixed issue where planning with a c.p.Merge of two or more c.p.HashJoins would fail.
Currently unresolved for the Apache Tez planner.
1 parent 70b978a commit 7ded7c8

File tree

4 files changed

+139
-9
lines changed

4 files changed

+139
-9
lines changed

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ Cascading Change Log
22

33
3.3.0 [unreleased]
44

5+
Fixed issue where planning with a c.p.Merge of two or more c.p.HashJoins would fail. Currently unresolved for the
6+
Apache Tez planner.
7+
58
Fixed issue where c.t.h.PartitionTap could not initialize with more than a few thousand input partitions in a
69
reasonable time frame. Fix supports PartitionTap initialization in under 10sec w/ 1 million paths.
710

cascading-core/src/main/java/cascading/flow/planner/iso/transformer/RemoveBranchGraphTransformer.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016 Chris K Wensel <[email protected]>. All Rights Reserved.
2+
* Copyright (c) 2016-2018 Chris K Wensel <[email protected]>. All Rights Reserved.
33
* Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
44
*
55
* Project and contact information: http://www.cascading.org/
@@ -55,18 +55,19 @@ protected boolean transformGraphInPlaceUsing( Transformed<ElementGraph> transfor
5555
if( primary.isEmpty() )
5656
return false;
5757

58-
if( primary.size() != 1 )
59-
throw new IllegalStateException( "too many captured primary elements" );
60-
6158
if( secondary.isEmpty() )
6259
{
63-
boolean found = ElementGraphs.removeBranchContaining( graph, Util.getFirst( primary ) );
60+
for( FlowElement flowElement : primary )
61+
{
62+
boolean found = ElementGraphs.removeBranchContaining( graph, flowElement );
6463

65-
if( !found )
66-
throw new IllegalStateException( "no branch found at: " + Util.getFirst( primary ) );
64+
if( !found )
65+
throw new IllegalStateException( "no branch found at: " + flowElement );
66+
}
6767
}
6868
else
6969
{
70+
// RemoveStreamedBranchTransformer uses a Secondary match to identify a branch but is currently unused
7071
if( secondary.size() != 1 )
7172
throw new IllegalStateException( "too many captured secondary elements" );
7273

cascading-hadoop2-tez/README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,4 +188,7 @@ Some notes and issues with running Cascading on Apache Tez. JIRA issues will be
188188
* There is no Vertex 'parallelization' default in Tez, FlowRuntimeProps must be called per application (see sample
189189
applications above).
190190

191-
* Currently no way to algorithmically set node parallelization during runtime.
191+
* Currently no way to algorithmically set node parallelization during runtime.
192+
193+
* Does not provide a plan supporting `MergePipesPlatformTest#testHashJoinHashJoinMerge` and
194+
`MergePipesPlatformTest#testHashJoinHashJoinHashJoinMergeMerge`.

cascading-platform/src/test/java/cascading/MergePipesPlatformTest.java

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016 Chris K Wensel <[email protected]>. All Rights Reserved.
2+
* Copyright (c) 2016-2018 Chris K Wensel <[email protected]>. All Rights Reserved.
33
* Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
44
*
55
* Project and contact information: http://www.cascading.org/
@@ -894,4 +894,127 @@ public void testSameSourceHashJoinMergeOnAccumulated() throws Exception
894894

895895
validateLength( flow, 10 );
896896
}
897+
898+
/**
899+
* Currently not supported by Tez
900+
*/
901+
@Test
902+
public void testHashJoinHashJoinMerge() throws Exception
903+
{
904+
if( getPlatform().isDAG() )
905+
return;
906+
907+
getPlatform().copyFromLocal( inputFileLower );
908+
getPlatform().copyFromLocal( inputFileUpper );
909+
910+
Tap lhsLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
911+
Tap lhsUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper );
912+
Tap rhsLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
913+
Tap rhsUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper );
914+
915+
Tap sink = getPlatform().getTextFile( getOutputPath(), SinkMode.REPLACE );
916+
917+
Pipe lhsLowerPipe = new Pipe( "lhsLower" );
918+
Pipe lhsUpperPipe = new Pipe( "lhsUpper" );
919+
Pipe rhsLowerPipe = new Pipe( "rhsLower" );
920+
Pipe rhsUpperPipe = new Pipe( "rhsUpper" );
921+
922+
lhsUpperPipe = new Rename( lhsUpperPipe, Fields.ALL, new Fields( "num2", "char2" ) );
923+
924+
Pipe lhs = new HashJoin( lhsLowerPipe, new Fields( "num" ), lhsUpperPipe, new Fields( "num2" ) );
925+
926+
lhs = new Retain( lhs, new Fields( "num", "char" ) );
927+
928+
rhsUpperPipe = new Rename( rhsUpperPipe, Fields.ALL, new Fields( "num2", "char2" ) );
929+
930+
Pipe rhs = new HashJoin( rhsLowerPipe, new Fields( "num" ), rhsUpperPipe, new Fields( "num2" ) );
931+
932+
rhs = new Retain( rhs, new Fields( "num", "char" ) );
933+
934+
Pipe merge = new Merge( "merge", lhs, rhs );
935+
936+
FlowDef flowDef = FlowDef.flowDef()
937+
.addSource( lhsLowerPipe, lhsLower )
938+
.addSource( lhsUpperPipe, lhsUpper )
939+
.addSource( rhsLowerPipe, rhsLower )
940+
.addSource( rhsUpperPipe, rhsUpper )
941+
.addTailSink( merge, sink );
942+
943+
Flow flow = getPlatform().getFlowConnector().connect( flowDef );
944+
945+
flow.complete();
946+
947+
validateLength( flow, 10 );
948+
}
949+
950+
/**
951+
* Tests for https://github.com/cwensel/cascading/issues/61
952+
* Currently not supported by Tez
953+
*/
954+
@Test
955+
public void testHashJoinHashJoinHashJoinMergeMerge() throws Exception
956+
{
957+
if( getPlatform().isDAG() )
958+
return;
959+
960+
getPlatform().copyFromLocal( inputFileLower );
961+
getPlatform().copyFromLocal( inputFileUpper );
962+
getPlatform().copyFromLocal( inputFileLowerOffset );
963+
964+
Tap lhsLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
965+
Tap lhsUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper );
966+
Tap midLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
967+
Tap midUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper );
968+
Tap rhsLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
969+
Tap rhsUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper );
970+
Tap far = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLowerOffset );
971+
972+
Tap sink = getPlatform().getTextFile( getOutputPath(), SinkMode.REPLACE );
973+
974+
Pipe lhsLowerPipe = new Pipe( "lhsLower" );
975+
Pipe lhsUpperPipe = new Pipe( "lhsUpper" );
976+
Pipe midLowerPipe = new Pipe( "midLower" );
977+
Pipe midUpperPipe = new Pipe( "midUpper" );
978+
Pipe rhsLowerPipe = new Pipe( "rhsLower" );
979+
Pipe rhsUpperPipe = new Pipe( "rhsUpper" );
980+
Pipe farPipe = new Pipe( "far" );
981+
982+
lhsUpperPipe = new Rename( lhsUpperPipe, Fields.ALL, new Fields( "num2", "char2" ) );
983+
984+
Pipe lhs = new HashJoin( lhsLowerPipe, new Fields( "num" ), lhsUpperPipe, new Fields( "num2" ) );
985+
986+
lhs = new Retain( lhs, new Fields( "num", "char" ) );
987+
988+
midUpperPipe = new Rename( midUpperPipe, Fields.ALL, new Fields( "num2", "char2" ) );
989+
990+
Pipe mid = new HashJoin( midLowerPipe, new Fields( "num" ), midUpperPipe, new Fields( "num2" ) );
991+
992+
mid = new Retain( mid, new Fields( "num", "char" ) );
993+
994+
rhsUpperPipe = new Rename( rhsUpperPipe, Fields.ALL, new Fields( "num2", "char2" ) );
995+
996+
Pipe rhs = new HashJoin( rhsLowerPipe, new Fields( "num" ), rhsUpperPipe, new Fields( "num2" ) );
997+
998+
rhs = new Retain( rhs, new Fields( "num", "char" ) );
999+
1000+
Pipe merge = new Merge( "merge", lhs, mid, rhs );
1001+
1002+
merge = new Merge( "next merge", merge, farPipe );
1003+
1004+
FlowDef flowDef = FlowDef.flowDef()
1005+
.addSource( lhsLowerPipe, lhsLower )
1006+
.addSource( lhsUpperPipe, lhsUpper )
1007+
.addSource( midLowerPipe, midLower )
1008+
.addSource( midUpperPipe, midUpper )
1009+
.addSource( rhsLowerPipe, rhsLower )
1010+
.addSource( rhsUpperPipe, rhsUpper )
1011+
.addSource( farPipe, far )
1012+
.addTailSink( merge, sink );
1013+
1014+
Flow flow = getPlatform().getFlowConnector().connect( flowDef );
1015+
1016+
flow.complete();
1017+
1018+
validateLength( flow, 19 );
1019+
}
8971020
}

0 commit comments

Comments
 (0)