Skip to content

Commit 5575b3a

Browse files
committed
Add missing parallel=:threads implementations
These implementations are extremely basic, but they try to follow the patterns in the other parts of the Parallel module. My real motivation is to be able to move the Distributed implementations into an extension, so that Graphs.jl does not depend on Distributed.
1 parent 6130332 commit 5575b3a

File tree

6 files changed

+152
-21
lines changed

6 files changed

+152
-21
lines changed

src/Parallel/distance.jl

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,55 @@
11
# used in shortest path calculations
22

33
function eccentricity(
4+
g::AbstractGraph,
5+
vs=vertices(g),
6+
distmx::AbstractMatrix{T}=weights(g);
7+
parallel::Symbol=:distributed,
8+
) where {T<:Number}
9+
return if parallel === :threads
10+
threaded_eccentricity(g, vs, distmx)
11+
elseif parallel === :distributed
12+
distr_eccentricity(g, vs, distmx)
13+
else
14+
throw(
15+
ArgumentError(
16+
"Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')",
17+
),
18+
)
19+
end
20+
end
21+
22+
function distr_eccentricity(
423
g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g)
524
) where {T<:Number}
625
vlen = length(vs)
726
eccs = SharedVector{T}(vlen)
827
@sync @distributed for i in 1:vlen
9-
eccs[i] = maximum(Graphs.dijkstra_shortest_paths(g, vs[i], distmx).dists)
28+
local d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx)
29+
eccs[i] = maximum(d.dists)
1030
end
1131
d = sdata(eccs)
1232
maximum(d) == typemax(T) && @warn("Infinite path length detected")
1333
return d
1434
end
1535

16-
function eccentricity(g::AbstractGraph, distmx::AbstractMatrix)
17-
return eccentricity(g, vertices(g), distmx)
36+
function threaded_eccentricity(
37+
g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g)
38+
) where {T<:Number}
39+
vlen = length(vs)
40+
eccs = Vector{T}(undef, vlen)
41+
Base.Threads.@threads for i in 1:vlen
42+
d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx)
43+
eccs[i] = maximum(d.dists)
44+
end
45+
maximum(eccs) == typemax(T) && @warn("Infinite path length detected")
46+
return eccs
47+
end
48+
49+
function eccentricity(
50+
g::AbstractGraph, distmx::AbstractMatrix; parallel::Symbol=:distributed
51+
)
52+
return eccentricity(g, vertices(g), distmx; parallel)
1853
end
1954

2055
function diameter(g::AbstractGraph, distmx::AbstractMatrix=weights(g))

src/Parallel/shortestpaths/dijkstra.jl

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,54 @@ struct MultipleDijkstraState{T<:Number,U<:Integer} <: AbstractPathState
99
end
1010

1111
"""
12-
Parallel.dijkstra_shortest_paths(g, sources=vertices(g), distmx=weights(g))
12+
Parallel.dijkstra_shortest_paths(g, sources=vertices(g), distmx=weights(g), parallel=:distributed)
1313
1414
Compute the shortest paths between all pairs of vertices in graph `g` by running
1515
[`dijkstra_shortest_paths`] for every vertex and using an optional list of source vertex `sources` and
1616
an optional distance matrix `distmx`. Return a [`Parallel.MultipleDijkstraState`](@ref) with relevant
17-
traversal information.
17+
traversal information. The `parallel` argument can be set to `:threads` and `:distributed` for multi-
18+
threaded or multi-process parallelism, respectively.
1819
"""
1920
function dijkstra_shortest_paths(
21+
g::AbstractGraph{U},
22+
sources=vertices(g),
23+
distmx::AbstractMatrix{T}=weights(g);
24+
parallel::Symbol=:distributed,
25+
) where {T<:Number} where {U}
26+
return if parallel === :threads
27+
threaded_dijkstra_shortest_paths(g, sources, distmx)
28+
elseif parallel === :distributed
29+
distr_dijkstra_shortest_paths(g, sources, distmx)
30+
else
31+
throw(
32+
ArgumentError(
33+
"Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')",
34+
),
35+
)
36+
end
37+
end
38+
39+
function threaded_dijkstra_shortest_paths(
40+
g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g)
41+
) where {T<:Number} where {U}
42+
n_v = nv(g)
43+
r_v = length(sources)
44+
45+
# TODO: remove `Int` once julialang/#23029 / #23032 are resolved
46+
dists = Matrix{T}(undef, Int(r_v), Int(n_v))
47+
parents = Matrix{U}(undef, Int(r_v), Int(n_v))
48+
49+
Base.Threads.@threads for i in 1:r_v
50+
state = Graphs.dijkstra_shortest_paths(g, sources[i], distmx)
51+
dists[i, :] = state.dists
52+
parents[i, :] = state.parents
53+
end
54+
55+
result = MultipleDijkstraState(dists, parents)
56+
return result
57+
end
58+
59+
function distr_dijkstra_shortest_paths(
2060
g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g)
2161
) where {T<:Number} where {U}
2262
n_v = nv(g)
Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,31 @@
1-
function random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer}
1+
function random_greedy_color(
2+
g::AbstractGraph{T}, reps::Integer; parallel::Symbol=:distributed
3+
) where {T<:Integer}
4+
return if parallel === :threads
5+
threaded_random_greedy_color(g, reps)
6+
elseif parallel === :distributed
7+
distr_random_greedy_color(g, reps)
8+
else
9+
throw(
10+
ArgumentError(
11+
"Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')",
12+
),
13+
)
14+
end
15+
end
16+
17+
function threaded_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer}
18+
local_best = Vector{Graphs.Coloring{T}}(undef, reps)
19+
Base.Threads.@threads for i in 1:reps
20+
seq = shuffle(vertices(g))
21+
local_best[i] = Graphs.perm_greedy_color(g, seq)
22+
end
23+
best = reduce(Graphs.best_color, local_best)
24+
25+
return convert(Graphs.Coloring{T}, best)
26+
end
27+
28+
function distr_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer}
229
best = @distributed (Graphs.best_color) for i in 1:reps
330
seq = shuffle(vertices(g))
431
Graphs.perm_greedy_color(g, seq)
@@ -8,11 +35,14 @@ function random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integ
835
end
936

1037
function greedy_color(
11-
g::AbstractGraph{U}; sort_degree::Bool=false, reps::Integer=1
38+
g::AbstractGraph{U};
39+
sort_degree::Bool=false,
40+
reps::Integer=1,
41+
parallel::Symbol=:distributed,
1242
) where {U<:Integer}
1343
return if sort_degree
1444
Graphs.degree_greedy_color(g)
1545
else
16-
Parallel.random_greedy_color(g, reps)
46+
Parallel.random_greedy_color(g, reps; parallel)
1747
end
1848
end

test/parallel/distance.jl

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
@testset "Parallel.Distance" begin
1+
@testset "Parallel.Distance" for parallel in [:threads, :distributed]
22
g4 = path_digraph(5)
33
adjmx1 = [0 1 0; 1 0 1; 0 1 0] # graph
44
adjmx2 = [0 1 0; 1 0 1; 1 1 0] # digraph
@@ -9,7 +9,7 @@
99

1010
for g in testgraphs(a1)
1111
z = @inferred(Graphs.eccentricity(g, distmx1))
12-
y = @inferred(Parallel.eccentricity(g, distmx1))
12+
y = @inferred(Parallel.eccentricity(g, distmx1; parallel))
1313
@test isapprox(y, z)
1414
@test @inferred(Graphs.diameter(y)) ==
1515
@inferred(Parallel.diameter(g, distmx1)) ==
@@ -21,9 +21,15 @@
2121
@test @inferred(Graphs.center(y)) == @inferred(Parallel.center(g, distmx1)) == [2]
2222
end
2323

24+
let g = testgraphs(a1)[1]
25+
# An error should be reported if the parallel mode could not be understood
26+
@test_throws ArgumentError Parallel.eccentricity(g, distmx1; parallel=:thread)
27+
@test_throws ArgumentError Parallel.eccentricity(g, distmx1; parallel=:distriibuted)
28+
end
29+
2430
for g in testdigraphs(a2)
2531
z = @inferred(Graphs.eccentricity(g, distmx2))
26-
y = @inferred(Parallel.eccentricity(g, distmx2))
32+
y = @inferred(Parallel.eccentricity(g, distmx2; parallel))
2733
@test isapprox(y, z)
2834
@test @inferred(Graphs.diameter(y)) ==
2935
@inferred(Parallel.diameter(g, distmx2)) ==

test/parallel/shortestpaths/dijkstra.jl

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
@testset "Parallel.Dijkstra" begin
1+
@testset "Parallel.Dijkstra" for parallel in [:threads, :distributed]
22
g4 = path_digraph(5)
33
d1 = float([0 1 2 3 4; 5 0 6 7 8; 9 10 0 11 12; 13 14 15 0 16; 17 18 19 20 0])
44
d2 = sparse(float([0 1 2 3 4; 5 0 6 7 8; 9 10 0 11 12; 13 14 15 0 16; 17 18 19 20 0]))
@@ -8,7 +8,7 @@
88

99
for g in testgraphs(g3)
1010
z = floyd_warshall_shortest_paths(g, d)
11-
zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d))
11+
zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d; parallel))
1212
@test all(isapprox(z.dists, zp.dists))
1313

1414
for i in 1:5
@@ -21,7 +21,7 @@
2121
end
2222

2323
z = floyd_warshall_shortest_paths(g)
24-
zp = @inferred(Parallel.dijkstra_shortest_paths(g))
24+
zp = @inferred(Parallel.dijkstra_shortest_paths(g; parallel))
2525
@test all(isapprox(z.dists, zp.dists))
2626

2727
for i in 1:5
@@ -34,7 +34,7 @@
3434
end
3535

3636
z = floyd_warshall_shortest_paths(g)
37-
zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2]))
37+
zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2]; parallel))
3838
@test all(isapprox(z.dists[1:2, :], zp.dists))
3939

4040
for i in 1:2
@@ -51,9 +51,17 @@
5151
g3 = path_digraph(5)
5252
d = float([0 1 2 3 4; 5 0 6 7 8; 9 10 0 11 12; 13 14 15 0 16; 17 18 19 20 0])
5353

54+
# An error should be reported if the parallel mode could not be understood
55+
@test_throws ArgumentError Parallel.dijkstra_shortest_paths(
56+
testdigraphs(g3)[1], collect(1:5), d; parallel=:thread
57+
)
58+
@test_throws ArgumentError Parallel.dijkstra_shortest_paths(
59+
testdigraphs(g3)[1], collect(1:5), d; parallel=:distriibuted
60+
)
61+
5462
for g in testdigraphs(g3)
5563
z = floyd_warshall_shortest_paths(g, d)
56-
zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d))
64+
zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d; parallel))
5765
@test all(isapprox(z.dists, zp.dists))
5866

5967
for i in 1:5
@@ -66,7 +74,7 @@
6674
end
6775

6876
z = floyd_warshall_shortest_paths(g)
69-
zp = @inferred(Parallel.dijkstra_shortest_paths(g))
77+
zp = @inferred(Parallel.dijkstra_shortest_paths(g; parallel))
7078
@test all(isapprox(z.dists, zp.dists))
7179

7280
for i in 1:5
@@ -79,7 +87,7 @@
7987
end
8088

8189
z = floyd_warshall_shortest_paths(g)
82-
zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2]))
90+
zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2]; parallel))
8391
@test all(isapprox(z.dists[1:2, :], zp.dists))
8492

8593
for i in 1:2

test/parallel/traversals/greedy_color.jl

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
1-
@testset "Parallel.Greedy Coloring" begin
1+
@testset "Parallel.Greedy Coloring" for parallel in [:threads, :distributed]
22
g3 = star_graph(10)
33
for g in testgraphs(g3)
44
for op_sort in (true, false)
5-
C = @inferred(Parallel.greedy_color(g, reps=5, sort_degree=op_sort))
5+
C = @inferred(Parallel.greedy_color(g; reps=5, sort_degree=op_sort, parallel))
66
@test C.num_colors == 2
77
end
88
end
99

1010
g4 = path_graph(20)
1111
g5 = complete_graph(20)
1212

13+
let g = testgraphs(g4)[1]
14+
# An error should be reported if the parallel mode could not be understood
15+
@test_throws ArgumentError Parallel.greedy_color(
16+
g; reps=5, sort_degree=false, parallel=:thread
17+
)
18+
@test_throws ArgumentError Parallel.greedy_color(
19+
g; reps=5, sort_degree=false, parallel=:distriibuted
20+
)
21+
end
22+
1323
for graph in [g4, g5]
1424
for g in testgraphs(graph)
1525
for op_sort in (true, false)
16-
C = @inferred(Parallel.greedy_color(g, reps=5, sort_degree=op_sort))
26+
C = @inferred(
27+
Parallel.greedy_color(g; reps=5, sort_degree=op_sort, parallel)
28+
)
1729

1830
@test C.num_colors <= maximum(degree(g)) + 1
1931
correct = true

0 commit comments

Comments
 (0)