Parallel & Distributed Computing For Data Enthusiasts

Discuss parallel and distributed computing fundamentals behind data applications

Visual Guide to Distribution Patterns for Arrays in MPI, NCCL

When multiple processes are involved in a parallel computation, they must communicate periodically to synchronize the data. Many libraries and APIs are available to program such operations. Message Passing Interface (MPI) provides a standard interface for such operations.

MPI implementations provide distributed operations so that we can write a program as a set of parallel processes. The standard offers APIs for point-to-point communication and array-based distributed operations called collectives. Here are some popular implementations of the MPI standard.

  1. MPICH: One of the earliest MPI implementations. Actively developed and used in many supercomputers.
  2. Open MPI: Actively developed and widely used MPI implementation. The project started by merging three MPI implementations: FT-MPI from the University of Tennessee, LA-MPI from Los Alamos National Laboratory, and LAM/MPI from Indiana University.
  3. MVAPICH: Also called MVAPICH2, developed by Ohio State University.

NCCL and UCC are other libraries that provide such operations. They don’t adhere to the same API standard as MPI but provide similar operations. In high-performance computing, these operations are termed collective operations. They provide the foundation for scaling HPC applications to thousands of nodes and are vital to the distributed training of deep learning models.

In the following sections, let's examine some of these operations visually and try to understand how they are implemented to transfer data.

Broadcast

A broadcast operation sends a copy of data from a single process to multiple processes. Broadcast is easy to understand and can be found in all major systems designed to process data in parallel. The figure below shows five parallel processes with IDs assigned from 0 to 4. The 0th process has an array that must be broadcast to the rest; at the end of the operation, all the processes will have a copy of this array.

Broadcast operation

Broadcast does not consider the structure of the array, meaning it can be anything, such as a byte array created from a serialized object.

Reduce and AllReduce

Reduce is a popular operation for computations on arrays. A reduction is associated with a function, and some famous examples are:

  • Sum
  • Multiply
  • Max
  • Min
  • Logical or
  • Logical and

Commutativity is a crucial factor when considering reductions. All those listed above are commutative functions. An example of a noncommutative function is division. A binary operator $\oplus$ is commutative if it satisfies the following condition for all values $a$ and $b$.

$a \oplus b = b \oplus a$

If a reduction function is commutative, the order in which it is applied to data does not matter. But if it is noncommutative, the order of the reduction is essential.
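
The effect of ordering can be checked with a small sketch. It applies an operator left to right over every ordering of a short list and collects the distinct results. The helper name `results_over_all_orders` and the sample values are illustrative assumptions, not part of any library.

```python
from functools import reduce
from itertools import permutations
import operator

values = [8, 4, 2]

def results_over_all_orders(op, data):
    """Apply op left to right over every ordering of data; return the distinct results."""
    return {reduce(op, order) for order in permutations(data)}

# Sum gives one answer regardless of order.
sum_results = results_over_all_orders(operator.add, values)

# Division gives a different answer depending on which value comes first.
div_results = results_over_all_orders(operator.truediv, values)

print(sum_results)
print(sorted(div_results))
```

A single result for sum and several for division is what makes the ordering of a noncommutative reduction part of its definition.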

Now, let us look at the semantics of reductions on arrays. Each distributed process has the same size array. The values in every array position are reduced individually with the corresponding values from other arrays. The result is an array of the same size and data type. Here are four arrays in the processes numbered from 0 to 3.

0 = [10, 12]

1 = [20, 2]

2 = [40, 6]

3 = [10, 10]

The Reduce result for a sum function of these arrays is another array, R = [80, 30]. This can be seen in the figure below, where the result is placed in the process with ID 1.

Reduce Operation
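
The element-wise semantics can be sketched in plain Python, with a list of lists standing in for the arrays held by each process. This is a single-process simulation, not MPI code; the function name `reduce_arrays` and the choice of process 1 as the receiver are assumptions made for illustration.

```python
import operator

def reduce_arrays(arrays, op, root):
    """Reduce equal-length arrays position by position; only `root` holds the result."""
    result = list(arrays[0])
    for arr in arrays[1:]:
        result = [op(a, b) for a, b in zip(result, arr)]
    # Every process other than the root ends up with nothing.
    return [result if rank == root else None for rank in range(len(arrays))]

# The four arrays from the example, indexed by process ID.
arrays = [[10, 12], [20, 2], [40, 6], [10, 10]]

sum_out = reduce_arrays(arrays, operator.add, root=1)
max_out = reduce_arrays(arrays, max, root=1)

print(sum_out[1])
print(max_out[1])
```

The same function works for any binary operator, which mirrors how a reduction is parameterized by its function.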

AllReduce is semantically equivalent to Reduce followed by Broadcast, meaning reduced values are transferred to all the targets of the operation. This raises the question: why have a separate operation called AllReduce if Reduce and Broadcast already exist in the first place?

First off, this is a heavily used operation in parallel programs. As such, it deserves its place, since it would be unreasonable to expect users to combine two operations for one of the most popular capabilities parallel programs require. Second, we can implement AllReduce more efficiently than a Reduce followed by a Broadcast. The figure below shows the same example with AllReduce instead of Reduce. Now, the reduced value is distributed to all the processes.

AllReduce Operation
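
As a rough sketch of the semantics only (not of an efficient implementation), AllReduce can be written as an element-wise reduction whose result is copied to every process. The function name `allreduce` is hypothetical, and the processes are simulated as entries of a Python list.

```python
import operator

def allreduce(arrays, op):
    """Element-wise reduce, then hand every process its own copy of the result."""
    reduced = list(arrays[0])
    for arr in arrays[1:]:
        reduced = [op(a, b) for a, b in zip(reduced, arr)]
    # The "broadcast" half: one independent copy per process.
    return [list(reduced) for _ in arrays]

arrays = [[10, 12], [20, 2], [40, 6], [10, 10]]
allreduce_out = allreduce(arrays, operator.add)
print(allreduce_out)
```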

Gather and AllGather

As the name suggests, the Gather operation takes values from multiple locations and groups them to form a large set. Depending on preference, implementations can preserve the source that produced the data when creating the set. In classic parallel computing, the values are ordered according to the process identification number. A gather operation is displayed in the figure below.

Gather Operation

Like AllReduce, the AllGather operation is equivalent to a Gather followed by a Broadcast. It is implemented separately due to its importance and possible implementation optimizations. The figure below shows an AllGather example.

AllGather Operation
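
A minimal single-process sketch of both operations follows, assuming the classic convention that values are concatenated in process ID order. The names `gather` and `allgather` and the sample data are illustrative only.

```python
def gather(arrays, root):
    """Concatenate all arrays in process ID order; only `root` holds the result."""
    gathered = [value for arr in arrays for value in arr]
    return [gathered if rank == root else None for rank in range(len(arrays))]

def allgather(arrays):
    """Same concatenation, but every process receives its own copy."""
    gathered = [value for arr in arrays for value in arr]
    return [list(gathered) for _ in arrays]

arrays = [[1, 2], [3, 4], [5, 6], [7, 8]]
gather_out = gather(arrays, root=0)
allgather_out = allgather(arrays)

print(gather_out[0])
print(allgather_out[3])
```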

Scatter

Scatter is the opposite of the Gather operation, as it distributes the values in a single process to many processes according to a criterion specified by the user. A Scatter operation from process 0 to four other processes is shown in the following figure.

Scatter Operation

In the preceding Scatter operation, the first two consecutive values are sent to the first target, while the second set of consecutive values goes to the second target.
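
The consecutive-chunk rule can be sketched as simple slicing. This assumes the array divides evenly among the targets; the function name `scatter` and the eight sample values are made up for the example.

```python
def scatter(data, num_targets):
    """Split data into equal consecutive chunks, one per target process."""
    if len(data) % num_targets != 0:
        raise ValueError("data length must be divisible by the number of targets")
    chunk = len(data) // num_targets
    return [data[i * chunk:(i + 1) * chunk] for i in range(num_targets)]

source_array = [1, 2, 3, 4, 5, 6, 7, 8]
scatter_out = scatter(source_array, 4)
print(scatter_out)
```

Other criteria, such as uneven chunk sizes, are possible; equal chunks are simply the easiest case to show.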

AllToAll

AllToAll can be described as every process performing a scatter operation to all the other processes simultaneously. The figure below shows four arrays in four processes, each with four elements. The 0th element of these arrays goes to the 0th process, and the 1st element goes to the 1st process. Once we rearrange the arrays like this, four new arrays are created in the four processes.

AllToAll Operation
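
Seen as a whole, this rearrangement is a transpose of the process-by-element grid. The sketch below simulates it in one Python process with one element per destination; the function name `alltoall` and the sample values are assumptions for illustration.

```python
def alltoall(arrays):
    """Element j of process i's array ends up as element i of process j's array."""
    n = len(arrays)
    return [[arrays[src][dst] for src in range(n)] for dst in range(n)]

# Process i holds values i*10 + j, so the origin of each value is easy to read.
arrays = [
    [0, 1, 2, 3],
    [10, 11, 12, 13],
    [20, 21, 22, 23],
    [30, 31, 32, 33],
]
alltoall_out = alltoall(arrays)
print(alltoall_out[0])
print(alltoall_out[1])
```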

Optimized Operations

Messaging for parallel applications offers many challenges, especially when working in large distributed environments. Much research has been devoted to improving these operations, with billions of dollars of research funding going toward enhancing the messaging libraries and network hardware for large-scale computations.

Let us take the previous example of summing up a set of values distributed over several processes. The approach we described before was to send the values from all processes to a single process. This is effective for small messages with a correspondingly small number of parallel processes. Now imagine we need to send 1MB of data from each of 1000 parallel processes. The single receiving process needs to take in about 1GB of data, which takes roughly 0.8 seconds even at the theoretical peak throughput of a 10Gbps Ethernet connection. After receiving this 1GB of data, the process must go through all of it and add the values.
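
The transfer time in this example is a short back-of-the-envelope calculation. The sketch assumes decimal units (1MB = 1,000,000 bytes) and ignores protocol overhead and congestion, so it gives a lower bound rather than a measurement.

```python
num_processes = 1000
message_bytes = 1_000_000        # 1MB per process (decimal units, an assumption)
link_bits_per_second = 10e9      # 10Gbps Ethernet at theoretical peak

total_bytes = num_processes * message_bytes
total_gigabytes = total_bytes / 1e9

# Convert bytes to bits, then divide by the link rate.
transfer_seconds = total_bytes * 8 / link_bits_per_second

print(f"{total_gigabytes} GB arrives at one process")
print(f"at least {transfer_seconds} seconds on the wire")
```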

The network also becomes congested when a thousand processes try to send 1MB messages to a single process. It slows down drastically, so it can take several seconds for all the messages to be delivered. The result is both network congestion and a large sequential computation on a single process. In the 1980s, computer scientists realized they could use different routing algorithms to send messages and avoid these problems. Such algorithms are now called collective algorithms and are widely used in optimizing these distributed operations.

Broadcast

The most straightforward approach to implementing Broadcast is to create N connections to the target processes and send the value through each. In this model, if it takes time $t$ to put the broadcast value onto the network link, it will take roughly $t \times N$ time to send the value to all N processes. This approach is illustrated in the figure below. It is termed a ‘flat tree’ as it forms a tree with the source at the root and all the other workers at the leaves. This method works well when N is small and takes much longer when either N or the message size increases.

Broadcast with a flat tree

Some methods can perform much better for many processes. One such option is to arrange the processes in a binary tree, as shown below, and send the values through the tree. The source sends the value to two targets; these, in turn, simultaneously send the value to four targets, which transfer the value to eight targets, and so on, until it reaches all targets. The parallelism of the operation increases exponentially, and as it expands, it uses more of the total available bandwidth of the network. Theoretically, it takes about $\log_2 N$ steps to broadcast the values to all the nodes. When N is large, this can significantly reduce the time required.

Broadcast with a binary tree
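
To compare the two shapes, the sketch below counts sequential sends for a flat tree and parallel rounds for a binary tree. It assumes a heap-style layout in which process i forwards to processes 2i + 1 and 2i + 2, and it treats the two sends of a node as one round; both are simplifying assumptions, and the function names are hypothetical.

```python
def flat_tree_sends(n_processes):
    """The root sends to every other process, one message after another."""
    return n_processes - 1

def binary_tree_rounds(n_processes):
    """Count rounds until every process in a heap-shaped binary tree has the value."""
    has_value = [False] * n_processes
    has_value[0] = True
    frontier = [0]          # processes that received the value in the last round
    rounds = 0
    while frontier:
        next_frontier = []
        for node in frontier:
            for child in (2 * node + 1, 2 * node + 2):
                if child < n_processes:
                    has_value[child] = True
                    next_frontier.append(child)
        if next_frontier:
            rounds += 1
        frontier = next_frontier
    assert all(has_value)
    return rounds

for n in (7, 15, 1000):
    print(n, flat_tree_sends(n), binary_tree_rounds(n))
```

For 1000 processes the flat tree needs 999 sequential sends, while the tree finishes in 9 rounds under these assumptions.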

The tree approach only works well for smaller values that only partially utilize the network. In the case of large messages, there is still a bottleneck in this approach since every node other than the leaves tries to send the value to two processes simultaneously. Chaining and double trees can further optimize the operation for larger data. It is important to note that some computer networks have built-in capabilities to broadcast values, and implementations sometimes exploit these features.

Reduce

Depending on the message sizes, different routing algorithms can be utilized to optimize the performance of Reduce. For smaller values, routing based on tree structures works best. The tree-based routing algorithms arrange the participating processes in an inverted tree. The tree's root is the target that receives the final reduced value, and the remaining nodes are the sources producing the data to be reduced. As in the broadcast case, the reduction can be done in about $\log_2 N$ steps for N parallel processes.
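
One possible tree shape is sketched below: in each round, processes pair up at a doubling stride and the higher-ranked partner sends its partial result to the lower-ranked one, until process 0 holds the final value. The pairing scheme (a binomial-style tree) and the function name `tree_reduce` are assumptions chosen for brevity, and the processes are simulated as list entries.

```python
import math
import operator

def tree_reduce(arrays, op):
    """Pairwise reduction toward process 0; returns (result, number of rounds)."""
    bufs = [list(a) for a in arrays]
    n = len(bufs)
    stride = 1
    rounds = 0
    while stride < n:
        # All pairs in a round are independent, so they could run in parallel.
        for receiver in range(0, n, 2 * stride):
            sender = receiver + stride
            if sender < n:
                bufs[receiver] = [op(a, b) for a, b in zip(bufs[receiver], bufs[sender])]
        stride *= 2
        rounds += 1
    return bufs[0], rounds

arrays = [[10, 12], [20, 2], [40, 6], [10, 10]]
tree_result, tree_rounds = tree_reduce(arrays, operator.add)
print(tree_result, tree_rounds, math.ceil(math.log2(len(arrays))))
```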

AllReduce

The simplest form of AllReduce implementation is a Reduce followed by a Broadcast. This works well for small messages. However, the network is not fully utilized when the message size is large. A ring-based routing algorithm or recursive doubling algorithm can be used to optimize the network bandwidth utilization.

Ring algorithm: In this version, data from each process is sent around a virtual ring created by process $i$ connecting to process $(i + 1) \bmod N$, where $N$ is the number of processes. In the first step, each process sends its data to its connected process. In each subsequent step, each process forwards the data it received in the previous step to its connected process. This is done in $N - 1$ steps. The algorithm fully utilizes the available network bandwidth by activating all the network links equally at every step. It is best suited for reductions over large data. The figure below shows a set of values being reduced across four processes in three steps.

AllReduce with Ring
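
The forwarding pattern can be simulated in a single Python process. The sketch assumes process p sends to process (p + 1) mod N, forwards whatever it received in the previous step, and folds each received array into a running result. Bandwidth-optimized variants that split the array into chunks are not shown, and the function name `ring_allreduce` is hypothetical.

```python
import operator

def ring_allreduce(arrays, op):
    """Simulate a forwarding ring; after N - 1 steps every process holds the full result."""
    n = len(arrays)
    acc = [list(a) for a in arrays]        # running reduced value at each process
    in_flight = [list(a) for a in arrays]  # what each process will send next
    for _ in range(n - 1):
        # Process p receives what its ring predecessor (p - 1) mod n sent.
        received = [in_flight[(p - 1) % n] for p in range(n)]
        for p in range(n):
            acc[p] = [op(a, b) for a, b in zip(acc[p], received[p])]
        # Next step, each process forwards what it just received.
        in_flight = received
    return acc

arrays = [[10, 12], [20, 2], [40, 6], [10, 10]]
ring_out = ring_allreduce(arrays, operator.add)
print(ring_out)
```

With four processes the loop runs three times, and every link of the ring carries one message in each step.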

Recursive doubling: Each process starts by exchanging data with its peer at distance 1. At each subsequent step the distance doubles; in other words, at step $i$ (counting from $i = 0$) it is $2^i$, until all steps are completed. This algorithm takes $\log_2 N$ steps to communicate the data between the processes. The figure below demonstrates how this algorithm works for four processes. The algorithm has better latency characteristics than the ring algorithm but does not utilize the network as efficiently.

AllReduce with recursive doubling
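
A single-process sketch of the exchange pattern follows. It assumes the number of processes is a power of two and picks each peer by flipping one bit of the rank, which is one common way to realize "a peer at distance 1, 2, 4, ...". The function name `recursive_doubling_allreduce` is illustrative.

```python
import operator

def recursive_doubling_allreduce(arrays, op):
    """Pairwise exchange at doubling distances; returns (per-process results, steps)."""
    n = len(arrays)
    if n < 1 or n & (n - 1):
        raise ValueError("this sketch assumes a power-of-two number of processes")
    bufs = [list(a) for a in arrays]
    distance = 1
    steps = 0
    while distance < n:
        new_bufs = []
        for rank in range(n):
            peer = rank ^ distance  # partner at the current distance
            # Both partners combine the same two buffers, so they end up equal.
            new_bufs.append([op(a, b) for a, b in zip(bufs[rank], bufs[peer])])
        bufs = new_bufs
        distance *= 2
        steps += 1
    return bufs, steps

arrays = [[10, 12], [20, 2], [40, 6], [10, 10]]
rd_out, rd_steps = recursive_doubling_allreduce(arrays, operator.add)
print(rd_out, rd_steps)
```

Four processes finish in two steps here, compared with three steps for a ring of the same size.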

Gather and AllGather Collective Algorithms

Gather collective algorithms are inverted versions of the broadcast algorithms. For small message sizes, we can use flat trees or binary trees. Gather produces a large dataset when the data size in individual processes or the number of parallel processes increases. Chaining algorithms are available for larger messages.

For AllGather operations, Gather followed by Broadcast is a viable option for small messages with a small number of processes. AllGather can end up transferring larger messages, as it needs to distribute the gathered value to all the other processes. For these cases, one can use the ring or the recursive doubling algorithm described in AllReduce.

Scatter and AllToAll Collective Algorithms

Scatter utilizes collective algorithms found in broadcast to send the values present in a single process to multiple processes. AllToAll can involve many communications due to sending data from each process to all the other processes. It can be optimized using a chaining algorithm. This algorithm works by arranging the processes in a dynamic chain configuration. At the $k$th step, process $i$ sends data to process $(i + k) \bmod N$, where $N$ is the number of processes. We will discuss this algorithm later, along with the shuffle.
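
A rough sketch of such a chained schedule is shown below. It assumes that at step k process i sends to process (i + k) mod N, so that every process sends exactly one message and receives exactly one message per step. The function name `chained_alltoall` is hypothetical and the processes are simulated as list entries.

```python
def chained_alltoall(arrays):
    """AllToAll in N - 1 steps; at step k, process i sends to process (i + k) mod N."""
    n = len(arrays)
    result = [[None] * n for _ in range(n)]
    # The element a process keeps for itself needs no network transfer.
    for i in range(n):
        result[i][i] = arrays[i][i]
    for k in range(1, n):
        for i in range(n):
            dest = (i + k) % n
            # Process i sends the element meant for dest; dest stores it in slot i.
            result[dest][i] = arrays[i][dest]
    return result

arrays = [
    [0, 1, 2, 3],
    [10, 11, 12, 13],
    [20, 21, 22, 23],
    [30, 31, 32, 33],
]
chained_out = chained_alltoall(arrays)
print(chained_out)
```

Because the destinations at each step form a shifted copy of the senders, no process is the target of two messages at once.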

Summary

In this article, we examined the high-level semantics of collective operations on arrays and some algorithms for efficiently implementing them.

Published in Parallel & Distributed Computing For Data Enthusiasts

Written by Supun Kamburugamuve

Co-Author of "Foundations of Data Intensive Applications"
