Skip to content

Add missing parallel=:threads implementations #429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions src/Parallel/distance.jl
Original file line number Diff line number Diff line change
@@ -1,20 +1,55 @@
# used in shortest path calculations

function eccentricity(
g::AbstractGraph,
vs=vertices(g),
distmx::AbstractMatrix{T}=weights(g);
parallel::Symbol=:distributed,
) where {T<:Number}
return if parallel === :threads
threaded_eccentricity(g, vs, distmx)
elseif parallel === :distributed
distr_eccentricity(g, vs, distmx)
else
throw(
ArgumentError(
"Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')",
),
)
end
end

function distr_eccentricity(
g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g)
) where {T<:Number}
vlen = length(vs)
eccs = SharedVector{T}(vlen)
@sync @distributed for i in 1:vlen
eccs[i] = maximum(Graphs.dijkstra_shortest_paths(g, vs[i], distmx).dists)
local d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx)
eccs[i] = maximum(d.dists)
end
d = sdata(eccs)
maximum(d) == typemax(T) && @warn("Infinite path length detected")
return d
end

function eccentricity(g::AbstractGraph, distmx::AbstractMatrix)
return eccentricity(g, vertices(g), distmx)
function threaded_eccentricity(
g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g)
) where {T<:Number}
vlen = length(vs)
eccs = Vector{T}(undef, vlen)
Base.Threads.@threads for i in 1:vlen
d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx)
eccs[i] = maximum(d.dists)
end

Check warning on line 44 in src/Parallel/distance.jl

View check run for this annotation

Codecov / codecov/patch

src/Parallel/distance.jl#L44

Added line #L44 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit weird that the end is not covered by the tests - the code inside the loop is, right? If it is indeed then we might just merge it anyways.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That has been a common issue for me with macro-transformed begin-end blocks. I have never bothered delving into why it is happening though. Macro-heavy libraries like ResumableFunctions.jl and ConcurrentSim.jl suffer from that (and from other more severe macro-covereage issues).

maximum(eccs) == typemax(T) && @warn("Infinite path length detected")
return eccs
end

function eccentricity(
g::AbstractGraph, distmx::AbstractMatrix; parallel::Symbol=:distributed
)
return eccentricity(g, vertices(g), distmx; parallel)
end

function diameter(g::AbstractGraph, distmx::AbstractMatrix=weights(g))
Expand Down
44 changes: 42 additions & 2 deletions src/Parallel/shortestpaths/dijkstra.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,54 @@
end

"""
Parallel.dijkstra_shortest_paths(g, sources=vertices(g), distmx=weights(g))
Parallel.dijkstra_shortest_paths(g, sources=vertices(g), distmx=weights(g), parallel=:distributed)

Compute the shortest paths between all pairs of vertices in graph `g` by running
[`dijkstra_shortest_paths`] for every vertex and using an optional list of source vertex `sources` and
an optional distance matrix `distmx`. Return a [`Parallel.MultipleDijkstraState`](@ref) with relevant
traversal information.
traversal information. The `parallel` argument can be set to `:threads` or `:distributed` for multi-
threaded or multi-process parallelism, respectively.
"""
function dijkstra_shortest_paths(
g::AbstractGraph{U},
sources=vertices(g),
distmx::AbstractMatrix{T}=weights(g);
parallel::Symbol=:distributed,
) where {T<:Number} where {U}
return if parallel === :threads
threaded_dijkstra_shortest_paths(g, sources, distmx)
elseif parallel === :distributed
distr_dijkstra_shortest_paths(g, sources, distmx)
else
throw(
ArgumentError(
"Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')",
),
)
end
end

function threaded_dijkstra_shortest_paths(
g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g)
) where {T<:Number} where {U}
n_v = nv(g)
r_v = length(sources)

# TODO: remove `Int` once julialang/#23029 / #23032 are resolved
dists = Matrix{T}(undef, Int(r_v), Int(n_v))
parents = Matrix{U}(undef, Int(r_v), Int(n_v))

Base.Threads.@threads for i in 1:r_v
state = Graphs.dijkstra_shortest_paths(g, sources[i], distmx)
dists[i, :] = state.dists
parents[i, :] = state.parents
end

Check warning on line 53 in src/Parallel/shortestpaths/dijkstra.jl

View check run for this annotation

Codecov / codecov/patch

src/Parallel/shortestpaths/dijkstra.jl#L53

Added line #L53 was not covered by tests

result = MultipleDijkstraState(dists, parents)
return result
end

function distr_dijkstra_shortest_paths(
g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g)
) where {T<:Number} where {U}
n_v = nv(g)
Expand Down
36 changes: 33 additions & 3 deletions src/Parallel/traversals/greedy_color.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,31 @@
function random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer}
function random_greedy_color(
g::AbstractGraph{T}, reps::Integer; parallel::Symbol=:distributed
) where {T<:Integer}
return if parallel === :threads
threaded_random_greedy_color(g, reps)
elseif parallel === :distributed
distr_random_greedy_color(g, reps)
else
throw(
ArgumentError(
"Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')",
),
)
end
end

function threaded_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer}
local_best = Vector{Graphs.Coloring{T}}(undef, reps)
Base.Threads.@threads for i in 1:reps
seq = shuffle(vertices(g))
local_best[i] = Graphs.perm_greedy_color(g, seq)
end

Check warning on line 22 in src/Parallel/traversals/greedy_color.jl

View check run for this annotation

Codecov / codecov/patch

src/Parallel/traversals/greedy_color.jl#L22

Added line #L22 was not covered by tests
best = reduce(Graphs.best_color, local_best)

return convert(Graphs.Coloring{T}, best)
end

function distr_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer}
best = @distributed (Graphs.best_color) for i in 1:reps
seq = shuffle(vertices(g))
Graphs.perm_greedy_color(g, seq)
Expand All @@ -8,11 +35,14 @@
end

function greedy_color(
g::AbstractGraph{U}; sort_degree::Bool=false, reps::Integer=1
g::AbstractGraph{U};
sort_degree::Bool=false,
reps::Integer=1,
parallel::Symbol=:distributed,
) where {U<:Integer}
return if sort_degree
Graphs.degree_greedy_color(g)
else
Parallel.random_greedy_color(g, reps)
Parallel.random_greedy_color(g, reps; parallel)
end
end
12 changes: 9 additions & 3 deletions test/parallel/distance.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@testset "Parallel.Distance" begin
@testset "Parallel.Distance" for parallel in [:threads, :distributed]
g4 = path_digraph(5)
adjmx1 = [0 1 0; 1 0 1; 0 1 0] # graph
adjmx2 = [0 1 0; 1 0 1; 1 1 0] # digraph
Expand All @@ -9,7 +9,7 @@

for g in testgraphs(a1)
z = @inferred(Graphs.eccentricity(g, distmx1))
y = @inferred(Parallel.eccentricity(g, distmx1))
y = @inferred(Parallel.eccentricity(g, distmx1; parallel))
@test isapprox(y, z)
@test @inferred(Graphs.diameter(y)) ==
@inferred(Parallel.diameter(g, distmx1)) ==
Expand All @@ -21,9 +21,15 @@
@test @inferred(Graphs.center(y)) == @inferred(Parallel.center(g, distmx1)) == [2]
end

let g = testgraphs(a1)[1]
# An error should be reported if the parallel mode could not be understood
@test_throws ArgumentError Parallel.eccentricity(g, distmx1; parallel=:thread)
@test_throws ArgumentError Parallel.eccentricity(g, distmx1; parallel=:distriibuted)
end

for g in testdigraphs(a2)
z = @inferred(Graphs.eccentricity(g, distmx2))
y = @inferred(Parallel.eccentricity(g, distmx2))
y = @inferred(Parallel.eccentricity(g, distmx2; parallel))
@test isapprox(y, z)
@test @inferred(Graphs.diameter(y)) ==
@inferred(Parallel.diameter(g, distmx2)) ==
Expand Down
22 changes: 15 additions & 7 deletions test/parallel/shortestpaths/dijkstra.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@testset "Parallel.Dijkstra" begin
@testset "Parallel.Dijkstra" for parallel in [:threads, :distributed]
g4 = path_digraph(5)
d1 = float([0 1 2 3 4; 5 0 6 7 8; 9 10 0 11 12; 13 14 15 0 16; 17 18 19 20 0])
d2 = sparse(float([0 1 2 3 4; 5 0 6 7 8; 9 10 0 11 12; 13 14 15 0 16; 17 18 19 20 0]))
Expand All @@ -8,7 +8,7 @@

for g in testgraphs(g3)
z = floyd_warshall_shortest_paths(g, d)
zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d))
zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d; parallel))
@test all(isapprox(z.dists, zp.dists))

for i in 1:5
Expand All @@ -21,7 +21,7 @@
end

z = floyd_warshall_shortest_paths(g)
zp = @inferred(Parallel.dijkstra_shortest_paths(g))
zp = @inferred(Parallel.dijkstra_shortest_paths(g; parallel))
@test all(isapprox(z.dists, zp.dists))

for i in 1:5
Expand All @@ -34,7 +34,7 @@
end

z = floyd_warshall_shortest_paths(g)
zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2]))
zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2]; parallel))
@test all(isapprox(z.dists[1:2, :], zp.dists))

for i in 1:2
Expand All @@ -51,9 +51,17 @@
g3 = path_digraph(5)
d = float([0 1 2 3 4; 5 0 6 7 8; 9 10 0 11 12; 13 14 15 0 16; 17 18 19 20 0])

# An error should be reported if the parallel mode could not be understood
@test_throws ArgumentError Parallel.dijkstra_shortest_paths(
testdigraphs(g3)[1], collect(1:5), d; parallel=:thread
)
@test_throws ArgumentError Parallel.dijkstra_shortest_paths(
testdigraphs(g3)[1], collect(1:5), d; parallel=:distriibuted
)

for g in testdigraphs(g3)
z = floyd_warshall_shortest_paths(g, d)
zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d))
zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d; parallel))
@test all(isapprox(z.dists, zp.dists))

for i in 1:5
Expand All @@ -66,7 +74,7 @@
end

z = floyd_warshall_shortest_paths(g)
zp = @inferred(Parallel.dijkstra_shortest_paths(g))
zp = @inferred(Parallel.dijkstra_shortest_paths(g; parallel))
@test all(isapprox(z.dists, zp.dists))

for i in 1:5
Expand All @@ -79,7 +87,7 @@
end

z = floyd_warshall_shortest_paths(g)
zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2]))
zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2]; parallel))
@test all(isapprox(z.dists[1:2, :], zp.dists))

for i in 1:2
Expand Down
18 changes: 15 additions & 3 deletions test/parallel/traversals/greedy_color.jl
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
@testset "Parallel.Greedy Coloring" begin
@testset "Parallel.Greedy Coloring" for parallel in [:threads, :distributed]
g3 = star_graph(10)
for g in testgraphs(g3)
for op_sort in (true, false)
C = @inferred(Parallel.greedy_color(g, reps=5, sort_degree=op_sort))
C = @inferred(Parallel.greedy_color(g; reps=5, sort_degree=op_sort, parallel))
@test C.num_colors == 2
end
end

g4 = path_graph(20)
g5 = complete_graph(20)

let g = testgraphs(g4)[1]
# An error should be reported if the parallel mode could not be understood
@test_throws ArgumentError Parallel.greedy_color(
g; reps=5, sort_degree=false, parallel=:thread
)
@test_throws ArgumentError Parallel.greedy_color(
g; reps=5, sort_degree=false, parallel=:distriibuted
)
end

for graph in [g4, g5]
for g in testgraphs(graph)
for op_sort in (true, false)
C = @inferred(Parallel.greedy_color(g, reps=5, sort_degree=op_sort))
C = @inferred(
Parallel.greedy_color(g; reps=5, sort_degree=op_sort, parallel)
)

@test C.num_colors <= maximum(degree(g)) + 1
correct = true
Expand Down
Loading