@@ -463,8 +463,8 @@ variable takes on all values added to the channel. An empty, closed channel
causes the ``for`` loop to terminate.
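
For instance, a minimal sketch of that pattern (the ``Channel{Int}(32)``
constructor and the values used here are illustrative assumptions, not taken
from the surrounding text)::

    c = Channel{Int}(32)
    for i in 1:5
        put!(c, i)      # add a few values
    end
    close(c)            # mark the channel as closed

    for x in c          # the loop variable takes each value in turn...
        println(x)      # ...and the loop terminates once c is empty and closed
    end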

- Shared Arrays (Experimental)
- -----------------------------------------------
+ Shared Arrays
+ -------------

Shared Arrays use system shared memory to map the same array across
many processes. While there are some similarities to a :class:`DArray`,
@@ -499,15 +499,17 @@ specified, it is called on all the participating workers. You can
arrange it so that each worker runs the ``init`` function on a
distinct portion of the array, thereby parallelizing initialization.

- Here's a brief example::
+ Here's a brief example:
+
+ .. doctest::

    julia> addprocs(3)
-   3-element Array{Any,1}:
+   3-element Array{Int64,1}:
     2
     3
     4

-   julia> S = SharedArray(Int, (3,4), init = S -> S[localindexes(S)] = myid())
+   julia> S = SharedArray(Int, (3,4), init = S -> S[Base.localindexes(S)] = myid())
    3x4 SharedArray{Int64,2}:
     2 2 3 4
     2 3 3 4
@@ -522,9 +524,11 @@ Here's a brief example::
     2 3 3 4
     2 7 4 4

- :func:`localindexes` provides disjoint one-dimensional ranges of indexes,
+ :func:`Base.localindexes` provides disjoint one-dimensional ranges of indexes,
and is sometimes convenient for splitting up tasks among processes.
- You can, of course, divide the work any way you wish::
+ You can, of course, divide the work any way you wish:
+
+ .. doctest::

    julia> S = SharedArray(Int, (3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = myid())
    3x4 SharedArray{Int64,2}:
@@ -548,6 +552,99 @@ would result in undefined behavior: because each process fills the
execute (for any particular element of ``S``) will have its ``pid``
retained.
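
For reference, the kind of conflicting pattern described above might look
roughly like the following sketch (illustrative; it assumes ``S`` is a
``SharedArray`` as in the earlier examples)::

    @sync begin
        for p in procs(S)
            # every worker overwrites the *entire* array with its own pid,
            # so the final value of each element depends on which worker runs last
            @async remotecall_wait(p, fill!, S, p)
        end
    end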

+ As a more extended and complex example, consider running the following
+ "kernel" in parallel::
+
+     q[i,j,t+1] = q[i,j,t] + u[i,j,t]
+
+ In this case, if we try to split up the work using a one-dimensional
+ index, we are likely to run into trouble: if ``q[i,j,t]`` is near the
+ end of the block assigned to one worker and ``q[i,j,t+1]`` is near the
+ beginning of the block assigned to another, it's very likely that
+ ``q[i,j,t]`` will not be ready at the time it's needed for computing
+ ``q[i,j,t+1]``. In such cases, one is better off chunking the array
+ manually. Let's split along the second dimension::
+
+     # This function returns the (irange,jrange) indexes assigned to this worker
+     @everywhere function myrange(q::SharedArray)
+         idx = indexpids(q)
+         if idx == 0
+             # This worker is not assigned a piece
+             return 1:0, 1:0
+         end
+         nchunks = length(procs(q))
+         splits = [round(Int, s) for s in linspace(0,size(q,2),nchunks+1)]
+         1:size(q,1), splits[idx]+1:splits[idx+1]
+     end
+
+     # Here's the kernel
+     @everywhere function advection_chunk!(q, u, irange, jrange, trange)
+         @show (irange, jrange, trange)  # display so we can see what's happening
+         for t in trange, j in jrange, i in irange
+             q[i,j,t+1] = q[i,j,t] + u[i,j,t]
+         end
+         q
+     end
+
+     # Here's a convenience wrapper for a SharedArray implementation
+     @everywhere advection_shared_chunk!(q, u) = advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
+
+ Now let's compare three different versions, one that runs in a single process::
+
+     advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1)
+
+ one that uses ``@parallel``::
+
+     function advection_parallel!(q, u)
+         for t = 1:size(q,3)-1
+             @sync @parallel for j = 1:size(q,2)
+                 for i = 1:size(q,1)
+                     q[i,j,t+1] = q[i,j,t] + u[i,j,t]
+                 end
+             end
+         end
+         q
+     end
+
+ and one that delegates in chunks::
+
+     function advection_shared!(q, u)
+         @sync begin
+             for p in procs(q)
+                 @async remotecall_wait(p, advection_shared_chunk!, q, u)
+             end
+         end
+         q
+     end
+
+ If we create SharedArrays and time these functions, we get the following results (with ``julia -p 4``)::
+
+     q = SharedArray(Float64, (500,500,500))
+     u = SharedArray(Float64, (500,500,500))
+
+     # Run once to JIT-compile
+     advection_serial!(q, u)
+     advection_parallel!(q, u)
+     advection_shared!(q,u)
+
+     # Now the real results:
+     julia> @time advection_serial!(q, u);
+     (irange,jrange,trange) = (1:500,1:500,1:499)
+     830.220 milliseconds (216 allocations: 13820 bytes)
+
+     julia> @time advection_parallel!(q, u);
+     2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
+
+     julia> @time advection_shared!(q,u);
+       From worker 2:  (irange,jrange,trange) = (1:500,1:125,1:499)
+       From worker 4:  (irange,jrange,trange) = (1:500,251:375,1:499)
+       From worker 3:  (irange,jrange,trange) = (1:500,126:250,1:499)
+       From worker 5:  (irange,jrange,trange) = (1:500,376:500,1:499)
+     238.119 milliseconds (2264 allocations: 169 KB)
+
+ The biggest advantage of ``advection_shared!`` is that it minimizes traffic
+ among the workers, allowing each to compute for an extended time on the
+ assigned piece.

.. _man-clustermanagers: