communicators#

Module for communicators using threadpool and MPI.

This module implements both point-to-point and collective communications among multiple GPUs and multiple nodes.

class tike.communicators.Comm[source]#

Bases: object

A ptychography communicator.

Compose the multiprocessing and multithreading communicators to handle synchronization and communication among both GPUs and nodes.

gpu_count#

The number of GPUs to use per process.

Type

int

mpi#

The multi-processing communicator.

Type

class

pool#

The multi-threading communicator.

Type

class

Allreduce(x, s=None, **kwargs)[source]#

ThreadPool allreduce coupled with MPI allreduce.

Parameters
  • x (list) – Chunks to be all-reduced within device groups and between processes.

  • s (int) – The size of a device group; e.g. if s=4 and num_gpu=8, then x[:4] will perform all-reduce within workers[:4] while x[4:] will perform all-reduce within workers[4:].

Allreduce_mean(x, axis=0)[source]#

Multi-process, multi-GPU mean.

Parameters
  • x (List[cupy.ndarray]) –

  • axis (Optional[int]) –

Return type

cupy.ndarray

Allreduce_reduce_cpu(x)[source]#

ThreadPool reduce to the CPU followed by MPI Allreduce.

Parameters

x (List[cupy.ndarray]) –

Return type

numpy.ndarray

Allreduce_reduce_gpu(x)[source]#

ThreadPool reduce to a GPU followed by MPI Allreduce.

Parameters

x (List[cupy.ndarray]) –

Return type

List[cupy.ndarray]

__init__(gpu_count, mpi=<class 'tike.communicators.mpi.NoMPIComm'>, pool=<class 'tike.communicators.pool.ThreadPool'>)[source]#

Parameters
  • gpu_count (int) – The number of GPUs to use per process.

  • mpi (class) – The multi-processing communicator; defaults to NoMPIComm.

  • pool (class) – The multi-threading communicator; defaults to ThreadPool.
reduce(x, dest, s=1, **kwargs)[source]#

ThreadPool reduce from all GPUs to a GPU or CPU.

Parameters
  • x (list) – Chunks to be reduced to a device group or the host.

  • s (int) – The stride of the reduction; e.g. if s=2 and num_gpu=8, then x[::2] will be reduced to workers[0] while x[1::2] will be reduced to workers[1].
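
A minimal usage sketch of how these pieces compose (the GPU count and array contents are illustrative; the default NoMPIComm is assumed, i.e. no MPI launch is required):

import cupy
import tike.communicators

# Two GPUs in this process; the default NoMPIComm handles the (trivial)
# inter-process part when the script is not launched with MPI.
comm = tike.communicators.Comm(gpu_count=2)

# Replicate one chunk onto every GPU managed by the thread pool ...
x = comm.pool.bcast([cupy.ones(8)])

# ... then reduce the per-GPU copies by addition and allreduce across
# processes, returning a numpy array on the host.
total = comm.Allreduce_reduce_cpu(x)

comm.pool.shutdown()  # release the worker threads when done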

class tike.communicators.MPIComm[source]#

Bases: MPIio

A Python wrapper class for MPI.

Many clusters do not support inter-node GPU-to-GPU communication, so we first gather the data into main memory and then communicate it.

rank#

The identity of this process.

Type

int

size#

The total number of MPI processes.

Type

int

Allgather(sendbuf, *args, **kwargs)[source]#
Allreduce(sendbuf, *args, **kwargs)[source]#
Bcast(sendbuf, *args, **kwargs)[source]#
Gather(sendbuf, *args, **kwargs)[source]#
MPIio_lamino(*args, axis=0)#

Read parts of the data into different processes for laminography.

MPIio_ptycho(scan, *args)#

Read parts of the data into different processes for ptychography.

__init__()[source]#
bcast(sendobj, root=0)[source]#

Send a Python object from a root to all processes.

Parameters
  • sendobj (Any) –

  • root (int) –

Return type

Any
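
A hedged usage sketch for the object-level broadcast (assumes mpi4py is installed and the script is launched under MPI, e.g. mpiexec -n 2 python script.py):

from tike.communicators import MPIComm

mpi = MPIComm()
# Only the root rank defines the object; bcast returns it on every rank.
settings = {'niter': 100} if mpi.rank == 0 else None
settings = mpi.bcast(settings, root=0)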

class tike.communicators.ThreadPool[source]#

Bases: ThreadPoolExecutor

Python thread pool plus scatter gather methods.

A Pool is a context manager which provides access to and communications amongst workers.

workers#

The number of GPUs to use or a tuple of the device numbers of the GPUs to use. If the number of GPUs is less than the requested number, only workers for the available GPUs are allocated.

Type

int, tuple(int)

device_count#

The total number of devices on the host as reported by CUDA runtime.

Type

int

num_workers#

Returns len(self.workers). For convenience.

Type

int

Raises

ValueError – When invalid GPU device ids are provided, or when the current CUDA device does not match the first GPU id in the list of workers.

__init__(workers, xp=cupy, device_count=None)[source]#

Initializes a new ThreadPool instance.

Parameters
  • workers (Union[int, Tuple[int, ...]]) – The number of GPUs to use or a tuple of the device numbers of the GPUs to use.

  • xp – The array backend module for the workers; defaults to cupy.

  • device_count (Optional[int]) – The total number of devices on the host.
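
A minimal construction sketch (assumes at least two CUDA devices are visible; with fewer, only workers for the available GPUs are allocated):

import cupy
from tike.communicators import ThreadPool

# A ThreadPool is a context manager; workers may also be a tuple of device ids.
with ThreadPool(workers=2) as pool:
    # Broadcast one chunk to every worker (stride=1), one copy per worker.
    x = pool.bcast([cupy.arange(4)])
    # All-reduce the copies by addition within the default device grouping.
    totals = pool.allreduce(x)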

all_gather(x, axis=0)[source]#

Concatenate x on all workers along the given axis.

Parameters
  • axis (Optional[int]) – Concatenate the gathered arrays along this existing axis; a new leading axis is created if axis is None.

  • x (List[cupy.ndarray]) –

Return type

List[cupy.ndarray]

allreduce(x, stride=None)[source]#

All-reduce x by addition within device groups.

allreduce([0, 1, 2, 3, 4, 5, 6], stride=2) -> [1, 1, 5, 5, 9, 9, 6]

Parameters
  • x (list) – Chunks to be all-reduced within device groups.

  • stride (int) – The size of a device group; e.g. if stride=4 and num_gpu=8, then x[:4] will perform all-reduce within workers[:4] while x[4:] will perform all-reduce within workers[4:].

Return type

List[cupy.ndarray]

bcast(x, stride=1)[source]#

Send each x to all device groups.

Parameters
  • x (list) – A list of data to be broadcast.

  • stride (int > 0) – The stride of the broadcast; e.g. if stride=2 and num_gpu=8, then x[0] will be broadcast to workers[::2] while x[1] will go to workers[1::2].

Return type

List[cupy.ndarray]

gather(x, worker=None, axis=0)[source]#

Concatenate x on a single worker along the given axis.

Parameters
  • axis (Optional[int]) – Concatenate the gathered arrays along this existing axis; a new leading axis is created if axis is None.

  • x (List[cupy.ndarray]) –

  • worker (Optional[int]) –

Return type

cupy.ndarray

gather_host(x, axis=0)[source]#

Concatenate x on host along the given axis.

Parameters
  • axis (Optional[int]) – Concatenate the gathered arrays along this existing axis; a new leading axis is created if axis is None.

  • x (List[cupy.ndarray]) –

Return type

numpy.ndarray
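
A brief sketch of collecting per-worker chunks onto the host (the shapes and worker count are illustrative):

import cupy
from tike.communicators import ThreadPool

with ThreadPool(workers=2) as pool:
    # Stand-in for chunks that each worker would normally compute itself.
    chunks = pool.bcast([cupy.zeros((2, 3))])
    # Concatenate along axis 0 into one numpy array of shape (2 * num_workers, 3).
    combined = pool.gather_host(chunks, axis=0)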

map(func, *iterables, workers=None, **kwargs)[source]#

ThreadPoolExecutor.map, but wraps call in a cuda.Device context.

Parameters

workers (Optional[Tuple[int, ...]]) –

Return type

list
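
For example, a per-worker function can be dispatched with map so that each call runs inside the corresponding worker's cupy.cuda.Device context (a hedged sketch; the function and data are illustrative):

import cupy
from tike.communicators import ThreadPool

def norm(a):
    # Runs on the device that owns the array `a`.
    return cupy.linalg.norm(a)

with ThreadPool(workers=2) as pool:
    x = pool.bcast([cupy.arange(4.0)])
    norms = pool.map(norm, x)  # one result per worker, returned as a list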

property num_workers#
reduce_cpu(x)[source]#

Reduce x by addition from all GPUs to a CPU buffer.

Parameters

x (List[cupy.ndarray]) –

Return type

numpy.ndarray

reduce_gpu(x, stride=1, workers=None)[source]#

Reduce x by addition to a device group from all other devices.

reduce_gpu([0, 1, 2, 3, 4], stride=2) -> [6, 4]

Parameters
  • x (list) – Chunks to be reduced to a device group.

  • stride (int) – The stride of the reduction; e.g. if stride=2 and num_gpu=8, then x[0::2] will be reduced to workers[0] while x[1::2] will be reduced to workers[1].

  • workers (Optional[Tuple[int, ...]]) –

Return type

List[cupy.ndarray]

reduce_mean(x, axis=0, worker=None)[source]#

Reduce x by averaging to one GPU from all other GPUs.

Parameters
  • x (List[cupy.ndarray]) –

  • axis (Optional[int]) –

  • worker (Optional[int]) –

Return type

cupy.ndarray

scatter(x, stride=1)[source]#

Scatter each x with the given stride.

scatter(x=[0, 1], stride=3) -> [0, 0, 0, 1, 1, 1]

Same as scatter_bcast, but with a different communication pattern. In this function, arrays are copied directly from their initial devices.

Parameters
  • x (list) – Chunks to be sent to other devices.

  • stride (int) – The size of a device group; e.g. if stride=4 and num_gpu=8, then x[0] will be broadcast to workers[:4] while x[1] will go to workers[4:].

Return type

List[cupy.ndarray]

scatter_bcast(x, stride=1)[source]#

Scatter each x with the given stride and then broadcast within each device group.

scatter_bcast(x=[0, 1], stride=3) -> [0, 0, 0, 1, 1, 1]

Same as scatter, but with a different communication pattern. In this function, arrays are first copied to a device in each group, then copied from that device locally.

Parameters
  • x (list) – Chunks to be sent and copied.

  • stride (int) – The stride length of the scatter; e.g. if stride=4 and num_gpu=8, then x[0] will be broadcast to workers[:4] while x[1] will go to workers[4:].

Return type

List[cupy.ndarray]
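
Both methods produce the same final placement and differ only in where each copy is read from; a sketch assuming a pool of six workers so the stride-3 grouping matches the examples above (contents illustrative):

import cupy
from tike.communicators import ThreadPool

with ThreadPool(workers=6) as pool:
    chunks = [cupy.zeros(3), cupy.ones(3)]
    a = pool.scatter(chunks, stride=3)        # copies made directly from the source devices
    b = pool.scatter_bcast(chunks, stride=3)  # one copy per group, then re-broadcast locally
    # Both lists lay out as [chunks[0]] * 3 + [chunks[1]] * 3 across the workers.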

shutdown(wait=True, *, cancel_futures=False)#

Clean-up the resources associated with the Executor.

It is safe to call this method several times; however, no other methods can be called after it has been called.

Parameters
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, /, *args, **kwargs)#

Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.

Returns

A Future representing the given call.