communicators#
Module for communicators using a thread pool and MPI.
This module implements both point-to-point and collective communications among multiple GPUs and multiple nodes.
- class tike.communicators.Comm[source]#
Bases:
object
A Ptychography communicator.
Compose the multiprocessing and multithreading communicators to handle synchronization and communication among both GPUs and nodes.
- gpu_count#
The number of GPUs to use per process.
- Type
int
- mpi#
The multi-processing communicator.
- Type
MPIComm or NoMPIComm
- pool#
The multi-threading communicator.
- Type
ThreadPool
- Allreduce(x, s=None, **kwargs)[source]#
ThreadPool allreduce coupled with MPI allreduce.
- Parameters
x (list) – Chunks to be all-reduced in grouped devices and between processes.
s (int) – The size of a device group. e.g. s=4 and num_gpu=8, then x[:4] will perform all-reduce within workers[:4] while x[4:] will perform all-reduce within workers[4:].
- Allreduce_mean(x, axis=0)[source]#
Compute the mean of x across all GPUs and processes.
- Parameters
x (List[cupy.ndarray]) –
axis (Optional[int]) –
- Return type
cupy.ndarray
- Allreduce_reduce_cpu(x)[source]#
ThreadPool reduce followed by MPI Allreduce.
- Parameters
x (List[cupy.ndarray]) –
- Return type
numpy.ndarray
- Allreduce_reduce_gpu(x)[source]#
ThreadPool reduce followed by MPI Allreduce.
- Parameters
x (List[cupy.ndarray]) –
- Return type
List[cupy.ndarray]
- __init__(gpu_count, mpi=<class 'tike.communicators.mpi.NoMPIComm'>, pool=<class 'tike.communicators.pool.ThreadPool'>)[source]#
- Parameters
gpu_count (int) – The number of GPUs to use per process.
mpi (Union[Type[MPIComm], Type[NoMPIComm]]) – The class used for inter-process communication; NoMPIComm disables MPI.
pool (Type[ThreadPool]) – The class used for communication among the GPUs of one process.
- reduce(x, dest, s=1, **kwargs)[source]#
ThreadPool reduce from all GPUs to a GPU or CPU.
- Parameters
x (list) – Chunks to be reduced to a device group or the host.
dest ('gpu' or 'cpu') – The destination of the reduction.
s (int) – The size of the device group. e.g. s=2 and num_gpu=8, then x[::2] will be reduced to workers[0] while x[1::2] will be reduced to workers[1].
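A minimal usage sketch of Comm (an illustration under assumptions, not library documentation; it assumes a host with at least two CUDA devices, that Comm is used as a context manager like its ThreadPool, and that the array shape and values are only illustrative):

import cupy
import tike.communicators

# Comm composes a ThreadPool (intra-node) with an MPI communicator
# (inter-node); with the default NoMPIComm, only the GPUs on this host
# participate.
with tike.communicators.Comm(2) as comm:
    # Copy the same (1, 4) array to every worker GPU.
    chunks = comm.pool.bcast([cupy.ones((1, 4), dtype='f4')])
    # Average the per-GPU copies; the result stays on a GPU.
    averaged = comm.Allreduce_mean(chunks, axis=0)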
- class tike.communicators.MPIComm[source]#
Bases:
MPIio
A thin Python wrapper around MPI.
Many clusters do not support inter-node GPU-GPU communication, so data are first gathered into host memory and then communicated.
- rank#
The identity of this process.
- Type
int
- size#
The total number of MPI processes.
- Type
int
- MPIio_lamino(*args, axis=0)#
Read a different part of the data into each process for laminography.
- MPIio_ptycho(scan, *args)#
Read a different part of the data into each process for ptychography.
- class tike.communicators.ThreadPool[source]#
Bases:
ThreadPoolExecutor
Python thread pool plus scatter-gather methods.
A Pool is a context manager that provides access to, and communication among, its workers.
- workers#
The number of GPUs to use or a tuple of the device numbers of the GPUs to use. If the number of GPUs is less than the requested number, only workers for the available GPUs are allocated.
- Type
int, tuple(int)
- device_count#
The total number of devices on the host as reported by CUDA runtime.
- Type
int
- num_workers#
The number of workers; equal to len(self.workers). Provided for convenience.
- Type
int
- Raises
ValueError – When invalid GPU device ids are provided, or when the current CUDA device does not match the first GPU id in the list of workers.
- __init__(workers, xp=cupy, device_count=None)[source]#
Initialize a ThreadPool for the given GPU workers.
- Parameters
workers (Union[int, Tuple[int, ...]]) – The number of GPUs to use or a tuple of the device numbers of the GPUs to use.
xp – The array module used by the workers; cupy by default.
device_count (Optional[int]) – The total number of devices on the host; defaults to the count reported by the CUDA runtime.
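A minimal construction sketch (an illustration under assumptions, not library documentation; it assumes at least two GPUs are visible and that device 0 is the current CUDA device, per the ValueError conditions above):

import tike.communicators

# Request the first two GPUs by device id; passing an int such as
# ThreadPool(2) would instead request that many GPUs.
with tike.communicators.ThreadPool(workers=(0, 1)) as pool:
    print(pool.num_workers)  # -> 2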
- all_gather(x, axis=0)[source]#
Concatenate x on all workers along the given axis.
- Parameters
axis (Optional[int]) – Concatenate the gathered arrays along this existing axis; a new leading axis is created if axis is None.
x (List[cupy.ndarray]) –
- Return type
List[cupy.ndarray]
- allreduce(x, stride=None)[source]#
All-reduce x by addition within device groups.
allreduce([0, 1, 2, 3, 4, 5, 6], stride=2) -> [1, 1, 5, 5, 9, 9, 6]
- Parameters
x (list) – Chunks to be all-reduced within device groups.
stride (int) – The size of a device group. e.g. stride=4 and num_gpu=8, then x[:4] will perform all-reduce within workers[:4] while x[4:] will perform all-reduce within workers[4:].
- Return type
List[cupy.ndarray]
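The grouping that produces the example above can be checked with plain Python (a hypothetical helper for illustration only; it is not part of tike and needs no GPUs):

# Workers are split into contiguous groups of `stride`; every worker in a
# group receives the sum over that group.
def grouped_sums(x, stride):
    return [sum(x[(i // stride) * stride:(i // stride + 1) * stride])
            for i in range(len(x))]

assert grouped_sums([0, 1, 2, 3, 4, 5, 6], stride=2) == [1, 1, 5, 5, 9, 9, 6]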
- bcast(x, stride=1)[source]#
Send each x to all device groups.
- Parameters
x (list) – A list of data to be broadcast.
stride (int > 0) – The stride of the broadcast. e.g. stride=2 and num_gpu=8, then x[0] will be broadcast to workers[::2] while x[1] will go to workers[1::2].
- Return type
List[cupy.ndarray]
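The placement pattern of bcast can likewise be sketched in plain Python (a hypothetical helper for illustration only; not part of tike):

# Worker i receives x[i % stride], so each element of x is broadcast to
# every stride-th worker.
def bcast_pattern(x, stride, num_gpu):
    return [x[i % stride] for i in range(num_gpu)]

assert bcast_pattern(['a', 'b'], stride=2, num_gpu=8) == ['a', 'b'] * 4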
- gather(x, worker=None, axis=0)[source]#
Concatenate x on a single worker along the given axis.
- Parameters
axis (Optional[int]) – Concatenate the gathered arrays along this existing axis; a new leading axis is created if axis is None.
x (List[cupy.ndarray]) –
worker (Optional[int]) –
- Return type
cupy.ndarray
- gather_host(x, axis=0)[source]#
Concatenate x on host along the given axis.
- Parameters
axis (Optional[int]) – Concatenate the gathered arrays along this existing axis; a new leading axis is created if axis is None.
x (List[cupy.ndarray]) –
- Return type
numpy.ndarray
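A minimal sketch of gathering back to the host (an illustration under assumptions, not library documentation; it assumes at least two GPUs are available):

import cupy
import tike.communicators

with tike.communicators.ThreadPool(2) as pool:
    chunks = pool.bcast([cupy.arange(3)])        # one copy per worker GPU
    combined = pool.gather_host(chunks, axis=0)  # numpy array [0 1 2 0 1 2]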
- map(func, *iterables, workers=None, **kwargs)[source]#
ThreadPoolExecutor.map, but wraps each call in a cuda.Device context.
- Parameters
workers (Optional[Tuple[int, ...]]) –
- Return type
list
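A minimal map sketch (an illustration under assumptions, not library documentation; it assumes at least two GPUs are available):

import cupy
import tike.communicators

# Each call runs in a thread bound to one worker's cuda.Device, so arrays
# created inside the function live on that worker's GPU.
with tike.communicators.ThreadPool(2) as pool:
    squares = pool.map(lambda a: cupy.square(cupy.asarray(a)), [[1, 2], [3, 4]])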
- property num_workers#
- reduce_cpu(x)[source]#
Reduce x by addition from all GPUs to a CPU buffer.
- Parameters
x (List[cupy.ndarray]) –
- Return type
numpy.ndarray
- reduce_gpu(x, stride=1, workers=None)[source]#
Reduce x by addition to a device group from all other devices.
reduce_gpu([0, 1, 2, 3, 4], stride=2) -> [6, 4]
- Parameters
x (list) – Chunks to be reduced to a device group.
stride (int) – The stride of the reduction. e.g. stride=2 and num_gpu=8, then x[0::2] will be reduced to workers[0] while x[1::2] will be reduced to workers[1].
workers (Optional[Tuple[int, ...]]) –
- Return type
List[cupy.ndarray]
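The reduction pattern in the example above can be checked with plain Python (a hypothetical helper for illustration only; not part of tike):

# Chunk i is added into the result held by worker i % stride, so `stride`
# workers end up holding the reduced values.
def reduce_pattern(x, stride):
    return [sum(x[i::stride]) for i in range(stride)]

assert reduce_pattern([0, 1, 2, 3, 4], stride=2) == [6, 4]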
- reduce_mean(x, axis=0, worker=None)[source]#
Reduce x by mean along the given axis to one GPU from all other GPUs.
- Parameters
x (List[cupy.ndarray]) –
axis (Optional[int]) –
worker (Optional[int]) –
- Return type
cupy.ndarray
- scatter(x, stride=1)[source]#
Scatter each x with given stride.
scatter(x=[0, 1], stride=3) -> [0, 0, 0, 1, 1, 1]
Same as scatter_bcast, but with a different communication pattern. In this function, arrays are copied directly from their initial devices to each destination device.
- Parameters
x (list) – Chunks to be sent to other devices.
stride (int) – The size of a device group. e.g. stride=4 and num_gpu=8, then x[0] will be broadcast to workers[:4] while x[1] will go to workers[4:].
- Return type
List[cupy.ndarray]
- scatter_bcast(x, stride=1)[source]#
Scatter each x with the given stride, then broadcast within each device group.
scatter_bcast(x=[0, 1], stride=3) -> [0, 0, 0, 1, 1, 1]
Same as scatter, but with a different communication pattern. In this function, arrays are first copied to one device in each group and then copied locally from that device to the rest of the group.
- Parameters
x (list) – Chunks to be sent and copied.
stride (int) – The stride length of the scatter. e.g. stride=4 and num_gpu=8, then x[0] will be broadcast to workers[:4] while x[1] will go to workers[4:].
- Return type
List[cupy.ndarray]
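Both scatter and scatter_bcast produce the placement shown in their examples; only the copy pattern differs. A plain-Python sketch of that placement (a hypothetical helper for illustration only; not part of tike):

# With stride=3, x[0] ends up on workers 0-2 and x[1] on workers 3-5.
def scatter_pattern(x, stride):
    return [x[i // stride] for i in range(stride * len(x))]

assert scatter_pattern([0, 1], stride=3) == [0, 0, 0, 1, 1, 1]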
- shutdown(wait=True, *, cancel_futures=False)#
Clean-up the resources associated with the Executor.
It is safe to call this method several times; however, no other methods can be called after this one.
- Parameters
wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.
cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.