Enhance DArray Distribution with Processor Assignment #611

Open · wants to merge 6 commits into `master`
Empty file added: `.gitmodules`
1 change: 1 addition & 0 deletions docs/make.jl
@@ -20,6 +20,7 @@ makedocs(;
"Parallel Nested Loops" => "use-cases/parallel-nested-loops.md",
],
"Task Spawning" => "task-spawning.md",
"Task Affinity" => "task-affinity.md",
"Data Management" => "data-management.md",
"Distributed Arrays" => "darray.md",
"Streaming Tasks" => "streaming.md",
266 changes: 265 additions & 1 deletion docs/src/darray.md
@@ -211,6 +211,270 @@ across the workers in the Julia cluster in a relatively even distribution;
future operations on a `DArray` may produce a different distribution from the
one chosen by previous calls.


### Explicit Processor Mapping of DArray Blocks

This feature allows you to control how `DArray` blocks (chunks) are assigned to specific processors within the cluster. Controlling data locality is crucial for optimizing the performance of distributed algorithms.

You can specify the mapping via the optional `assignment` positional argument of the constructors (`DArray`, `DVector`, and `DMatrix`) and of the `distribute` function, or via the optional `assignment` keyword argument of constructor-like functions such as `rand`, `randn`, `sprand`, `ones`, and `zeros`.

The `assignment` argument accepts the following values:

* `:arbitrary` **(Default)**:

    * If `assignment` is not provided or is set to `:arbitrary`, Dagger's scheduler assigns blocks to processors automatically. This is the default behavior.

* `:blockrow`:

    * Divides the block grid row-wise (top to bottom). Each processor gets a contiguous chunk of row-blocks.

* `:blockcol`:

    * Divides the block grid column-wise (left to right). Each processor gets a contiguous chunk of column-blocks.

* `:cyclicrow`:
* Assigns row-blocks to processors in a round-robin fashion. Blocks are distributed one row-block at a time. Useful for parallel row-wise tasks.

* `:cycliccol`:
* Assigns column-blocks to processors in a round-robin fashion. Blocks are distributed one column-block at a time. Useful for parallel column-wise tasks.

* Any other symbol used for `assignment` results in an error.

* `AbstractArray{<:Int, N}`:

* Provide an integer **N**-dimensional array of worker IDs. The dimension **N** must match the number of dimensions of the `DArray`.
    * Dagger maps blocks to worker IDs in a block-cyclic manner according to this array. The block at index `(i,j,...)` is assigned to the first thread of the worker with ID `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`. This pattern repeats block-cyclically across all dimensions.

* `AbstractArray{<:Processor, N}`:

    * Provide an **N**-dimensional array of `Processor` objects. The dimension **N** must match the number of dimensions of the `DArray`.
    * Blocks are mapped in a block-cyclic manner according to the `Processor` objects in the assignment array: the block at index `(i,j,...)` is assigned to the processor at `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`, repeating block-cyclically across all dimensions (see the sketch below).
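
The same `mod1`-based rule underlies both array forms. As a rough illustration in plain Julia (independent of Dagger's internals), the grid of owners for a 2D block grid could be computed like this:

```julia
# Illustration only (not Dagger's actual code): tile a 2×2 assignment
# over a 4×6 block grid using the mod1 rule described above.
assignment = [2 1; 4 3]        # worker IDs
nrows, ncols = 4, 6            # dimensions of the block grid

owners = [assignment[mod1(i, size(assignment, 1)),
                     mod1(j, size(assignment, 2))]
          for i in 1:nrows, j in 1:ncols]
# owners[1, :] == [2, 1, 2, 1, 2, 1]; owners[2, :] == [4, 3, 4, 3, 4, 3]; ...
```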

#### Examples and Usage

The `assignment` argument works similarly for `DArray`, `DVector`, and `DMatrix`, as well as the `distribute` function. The key difference lies in the dimensionality of the resulting distributed array. For functions like `rand`, `randn`, `sprand`, `ones`, and `zeros`, `assignment` is a keyword argument.

* `DArray`: For N-dimensional distributed arrays.

* `DVector`: Specifically for 1-dimensional distributed arrays.

* `DMatrix`: Specifically for 2-dimensional distributed arrays.

* `distribute`: General function to distribute arrays.

* `rand`, `randn`, `sprand`, `ones`, `zeros`: Functions to create DArrays with initial values, also supporting `assignment`.

Here are some examples using a setup with one master process and three worker processes (Julia process IDs 1–4).
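
If you want to reproduce this setup, a minimal sketch might look like the following (the worker count and package loading are assumptions about your environment):

```julia
using Distributed
addprocs(3)                # master (ID 1) plus three workers (IDs 2–4)
@everywhere using Dagger
```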

First, let's create some sample arrays for `distribute` (and constructor functions):

```julia
A = rand(7, 11) # 2D array
v = ones(15) # 1D array
M = zeros(5, 5, 5) # 3D array
```

1. **Arbitrary Assignment:**

```julia
Ad = distribute(A, Blocks(2, 2), :arbitrary)
# DMatrix(A, Blocks(2, 2), :arbitrary)

vd = distribute(v, Blocks(3), :arbitrary)
# DVector(v, Blocks(3), :arbitrary)

Md = distribute(M, Blocks(2, 2, 2), :arbitrary)
# DArray(M, Blocks(2,2,2), :arbitrary)

Rd = rand(Blocks(2, 2), 7, 11; assignment=:arbitrary)
# distribute(rand(7, 11), Blocks(2, 2), :arbitrary)
```

This creates distributed arrays with the specified block sizes and assigns the blocks to processors arbitrarily. For example, the assignment for `Ad` might look like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(2, 1) ThreadProc(4, 1) ThreadProc(3, 1)
ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(4, 1)
ThreadProc(2, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(2, 1) ThreadProc(3, 1)
```
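
Grids like the one above can be recovered from the array itself. One possible way relies on Dagger internals (the `chunks` field of a `DArray` and the `processor` field of each `Chunk`), which may change between versions:

```julia
# Map each block (chunk) of Ad to the processor it was assigned to.
# Internal fields; treat as a debugging aid, not a stable API.
proc_grid = map(chunk -> chunk.processor, Ad.chunks)
```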

2. **Structured Assignments:**

* **`:blockrow` Assignment:** 

```julia
Ad = distribute(A, Blocks(1, 2), :blockrow)
# DMatrix(A, Blocks(1, 2), :blockrow)

vd = distribute(v, Blocks(3), :blockrow)
# DVector(v, Blocks(3), :blockrow)

Md = distribute(M, Blocks(2, 2, 2), :blockrow)
# DArray(M, Blocks(2,2,2), :blockrow)

Od = ones(Blocks(1, 2), 7, 11; assignment=:blockrow)
# distribute(ones(7, 11), Blocks(1, 2), :blockrow)
```

This creates distributed arrays with the specified block sizes and assigns contiguous row-blocks to processors evenly. For example, the assignment for `Ad` (and `Od`) will look like this:

```julia
7×6 Matrix{Dagger.ThreadProc}:
 ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
 ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
 ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
 ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
 ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
 ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
 ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1)
```
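
The contiguous split itself is simple arithmetic: 7 row-blocks over 4 processors are handed out in runs of `cld(7, 4) = 2` until they run out. A plain-Julia sketch of that bookkeeping (an illustration of the behavior shown above, not Dagger's actual code):

```julia
nblocks, nprocs = 7, 4
run = cld(nblocks, nprocs)        # 2 row-blocks per processor
owner(i) = cld(i, run)            # row-block i -> processor index
[owner(i) for i in 1:nblocks]     # [1, 1, 2, 2, 3, 3, 4]
```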

* **`:blockcol` Assignment:** 

```julia
Ad = distribute(A, Blocks(2, 2), :blockcol)
# DMatrix(A, Blocks(2, 2), :blockcol)

vd = distribute(v, Blocks(3), :blockcol)
# DVector(v, Blocks(3), :blockcol)

Md = distribute(M, Blocks(2, 2, 2), :blockcol)
# DArray(M, Blocks(2,2,2), :blockcol)

Rd = randn(Blocks(2, 2), 7, 11; assignment=:blockcol)
# distribute(randn(7, 11), Blocks(2, 2), :blockcol)
```

This creates distributed arrays with the specified block sizes and assigns contiguous column-blocks to processors evenly. For example, the assignment for `Ad` (and `Rd`) will look like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
```

* **`:cyclicrow` Assignment:** 

```julia
Ad = distribute(A, Blocks(1, 2), :cyclicrow)
# DMatrix(A, Blocks(1, 2), :cyclicrow)

vd = distribute(v, Blocks(3), :cyclicrow)
# DVector(v, Blocks(3), :cyclicrow)

Md = distribute(M, Blocks(2, 2, 2), :cyclicrow)
# DArray(M, Blocks(2,2,2), :cyclicrow)

Zd = zeros(Blocks(1, 2), 7, 11; assignment=:cyclicrow)
# distribute(zeros(7, 11), Blocks(1, 2), :cyclicrow)
```

This creates distributed arrays with the specified block sizes and assigns row-blocks to processors in a round-robin fashion. For example, the assignment for `Ad` (and `Zd`) will look like this:

```julia
7×6 Matrix{Dagger.ThreadProc}:
 ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
 ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
 ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
 ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1)
 ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
 ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
 ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
```

* **`:cycliccol` Assignment:** 

```julia
Ad = distribute(A, Blocks(2, 2), :cycliccol)
# DMatrix(A, Blocks(2, 2), :cycliccol)

vd = distribute(v, Blocks(3), :cycliccol)
# DVector(v, Blocks(3), :cycliccol)

Md = distribute(M, Blocks(2, 2, 2), :cycliccol)
# DArray(M, Blocks(2,2,2), :cycliccol)

Od = ones(Blocks(2, 2), 7, 11; assignment=:cycliccol)
# distribute(ones(7, 11), Blocks(2, 2), :cycliccol)
```

This creates distributed arrays with the specified block sizes and assigns column-blocks to processors in a round-robin fashion. For example, the assignment for `Ad` (and `Od`) will look like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
```
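
Both cyclic modes reduce to `mod1` over a single dimension of the block grid. A one-line sketch of the round-robin rule behind the examples above (illustrative only, not Dagger's actual code):

```julia
nprocs = 4
owner(k) = mod1(k, nprocs)     # k-th row- or column-block -> processor index
[owner(j) for j in 1:6]        # the :cycliccol columns -> [1, 2, 3, 4, 1, 2]
```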

3. **Block-Cyclic Assignment with Integer Array:**

```julia
assignment_2d = [2 1; 4 3]
Ad = distribute(A, Blocks(2, 2), assignment_2d)
# DMatrix(A, Blocks(2, 2), [2 1; 4 3])

assignment_1d = [2,3,1,4]
vd = distribute(v, Blocks(3), assignment_1d)
# DVector(v, Blocks(3), [2,3,1,4])

assignment_3d = cat([1 2; 3 4], [4 3; 2 1], dims=3)
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
# DArray(M, Blocks(2, 2, 2), cat([1 2; 3 4], [4 3; 2 1], dims=3))

Rd = sprand(Blocks(2, 2), 7, 11, 0.2; assignment=assignment_2d)
# distribute(sprand(7, 11, 0.2), Blocks(2, 2), assignment_2d)
```

The assignment is an integer matrix of worker IDs; blocks are assigned in a block-cyclic manner to the first thread of each corresponding worker. The assignment for `Ad` (and `Rd`) would be:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1)
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1)
ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1)
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1)
```

4. **Block-Cyclic Assignment with Processor Array:**

```julia
assignment_2d = [Dagger.ThreadProc(3, 2) Dagger.ThreadProc(1, 1);
Dagger.ThreadProc(4, 3) Dagger.ThreadProc(2, 2)]
Ad = distribute(A, Blocks(2, 2), assignment_2d)
# DMatrix(A, Blocks(2, 2), assignment_2d)

assignment_1d = [Dagger.ThreadProc(2,1), Dagger.ThreadProc(3,1), Dagger.ThreadProc(1,1), Dagger.ThreadProc(4,1)]
vd = distribute(v, Blocks(3), assignment_1d)
# DVector(v, Blocks(3), assignment_1d)

assignment_3d = cat([Dagger.ThreadProc(1,1) Dagger.ThreadProc(2,1); Dagger.ThreadProc(3,1) Dagger.ThreadProc(4,1)],
[Dagger.ThreadProc(4,1) Dagger.ThreadProc(3,1); Dagger.ThreadProc(2,1) Dagger.ThreadProc(1,1)], dims=3)
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
# DArray(M, Blocks(2, 2, 2), assignment_3d)

Rd = rand(Blocks(2, 2), 7, 11; assignment=assignment_2d)
# distribute(rand(7,11), Blocks(2, 2), assignment_2d)
```

The assignment is a matrix of `Processor` objects; blocks are assigned to these processors in a block-cyclic manner. The assignment for `Ad` (and `Rd`) would be:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1)
ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2)
ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1)
ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2)
```
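
Processor arrays are handy when blocks should be pinned to specific threads rather than whole workers. As a usage sketch (assuming a `Distributed` cluster is already running and the `assignment` argument from this PR is available), the following spreads a vector's blocks round-robin over the first thread of every worker:

```julia
using Distributed
procs_1d = [Dagger.ThreadProc(w, 1) for w in workers()]
vd = distribute(v, Blocks(3), procs_1d)
```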


## Broadcasting

As the `DArray` is a subtype of `AbstractArray` and generally satisfies Julia's
@@ -446,4 +710,4 @@ From `LinearAlgebra`:
- `*` (Out-of-place Matrix-(Matrix/Vector) multiply)
- `mul!` (In-place Matrix-Matrix multiply)
- `cholesky`/`cholesky!` (In-place/Out-of-place Cholesky factorization)
- `lu`/`lu!` (In-place/Out-of-place LU factorization (`NoPivot` only))