communicators

Module for communicators using threadpool and MPI.

This module implements both point-to-point and collective communications among multiple GPUs and across multiple nodes.

class tike.communicators.Comm(gpu_count, mpi=<class 'tike.communicators.mpi.MPIComm'>, pool=<class 'tike.communicators.pool.ThreadPool'>, **kwargs)[source]

Bases: object

A Ptychography communicator.

Compose the multiprocessing and multithreading communicators to handle synchronization and communication among both GPUs and nodes.

gpu_count

The number of GPUs to use per process.

Type: int

mpi

The multi-processing communicator.

Type: class

pool

The multi-threading communicator.

Type: class

Allreduce_reduce(x, dest, **kwargs)[source]

ThreadPool reduce coupled with MPI allreduce.

reduce(x, dest, **kwargs)[source]

ThreadPool reduce from all GPUs to a GPU or CPU.
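A minimal usage sketch of the composed communicator follows. It assumes two GPUs per process, that Comm can be used as a context manager like its ThreadPool member, and that dest='gpu' selects a GPU destination as the reduce() docstring suggests; none of these details are guaranteed by the signatures above.

    import cupy as cp

    import tike.communicators

    # Hedged sketch: each GPU holds a partial result, which is summed
    # first across this process's GPUs (ThreadPool reduce) and then
    # across MPI processes (MPI allreduce).
    # Assumptions: Comm works as a context manager, and dest='gpu'
    # selects a GPU destination.
    with tike.communicators.Comm(gpu_count=2) as comm:
        # One copy of the partial result per GPU managed by this process.
        partials = comm.pool.bcast(cp.ones((4, 4), dtype='float32'))
        total = comm.Allreduce_reduce(partials, dest='gpu')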

class tike.communicators.MPIComm[source]

Bases: object

A Python wrapper class for MPI (via mpi4py).

Many clusters do not support inter-node GPU-GPU communication, so we first gather the data into main memory and then communicate it from there.
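The host-staging pattern described above can be sketched directly with cupy and mpi4py; the sketch below is illustrative and bypasses MPIComm itself.

    import cupy as cp
    import numpy as np
    from mpi4py import MPI

    # Host-staging sketch: copy the GPU array into main memory, reduce
    # it over MPI there, then copy the result back to the GPU. This is
    # the pattern used when direct GPU-GPU transport is unavailable.
    gpu_data = cp.arange(8, dtype='float32')
    host_data = cp.asnumpy(gpu_data)           # device -> main memory
    result = np.empty_like(host_data)
    MPI.COMM_WORLD.Allreduce(host_data, result, op=MPI.SUM)
    gpu_result = cp.asarray(result)            # main memory -> device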

rank

The identity of this process.

Type: int

size

The total number of MPI processes.

Type: int

Allreduce(sendbuf, op=mpi4py.MPI.SUM)[source]

Combines data from all processes and distributes the result back to all processes.

Bcast(data, root: int = 0)[source]

Send data from a root to all processes.

Gather(sendbuf, dest: int = 0)[source]

Gather data from all processes into one destination process.

MPIio(scan, data)[source]

Read different parts of the data into different processes.

Scatter(sendbuf, src: int = 0)[source]

Spread data from a source to all processes.

p2p(sendbuf, src=0, dest=1, tg=0, **kwargs)[source]

Send data from a source to a designated destination.
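A hedged end-to-end sketch of this wrapper, to be launched under mpirun or srun. That Bcast and Allreduce return the resulting arrays, rather than filling caller-provided buffers, is an assumption suggested by their signatures.

    import numpy as np

    from tike.communicators import MPIComm

    # Hedged sketch: broadcast an array from rank 0, then sum it across
    # all ranks. Assumption: Bcast and Allreduce return the resulting
    # array, as their signatures (no recvbuf argument) suggest.
    mpi = MPIComm()
    if mpi.rank == 0:
        data = np.arange(4, dtype='float32')
    else:
        data = np.zeros(4, dtype='float32')
    data = mpi.Bcast(data, root=0)
    total = mpi.Allreduce(data)  # op defaults to mpi4py.MPI.SUM
    print(mpi.rank, mpi.size, total)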

class tike.communicators.ThreadPool(workers)[source]

Bases: concurrent.futures.thread.ThreadPoolExecutor

A Python thread pool with scatter/gather methods.

A Pool is a context manager that provides access to, and communication among, its workers.

workers

The number of GPUs to use or a tuple of the device numbers of the GPUs to use. If the number of GPUs is less than the requested number, only workers for the available GPUs are allocated.

Type: int, tuple(int)

Raises: ValueError – When invalid GPU device ids are provided, or when the current CUDA device does not match the first GPU id in the list of workers.

all_gather(x: list, axis=0) → list[source]

Concatenate x on all workers along the given axis.

bcast(x: cupy.array) → list[source]

Send a copy of x to all workers.

gather(x: list, worker=None, axis=0) → cupy.array[source]

Concatenate x on a single worker along the given axis.

map(func, *iterables, **kwargs)[source]

ThreadPoolExecutor.map, but wraps each call in a cuda.Device context.
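A hedged sketch of the per-device map: because each call runs inside its worker's cuda.Device context, the function operates on the copy resident on that worker's GPU. The two-GPU pool is an assumption; map() is wrapped in list() since the base executor returns an iterator.

    import cupy as cp

    from tike.communicators import ThreadPool

    def double(chunk):
        # Runs inside this worker's cuda.Device context, so the
        # arithmetic happens on the GPU that owns this chunk.
        return 2 * chunk

    # Assumption: two GPUs are visible to this process.
    with ThreadPool(2) as pool:
        copies = pool.bcast(cp.arange(4, dtype='float32'))
        doubled = list(pool.map(double, copies))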

reduce_cpu(x, buf=None)[source]

Reduce x by addition from all GPUs to a CPU buffer.

reduce_gpu(x: list, worker=None) → cupy.array[source]

Reduce x by addition to one GPU from all other GPUs.

reduce_mean(x: list, axis, worker=None) → cupy.array[source]

Reduce x by averaging along the given axis to one GPU from all other GPUs.

scatter(x)[source]

Split x along the 0th dimension and send chunks to workers.
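A hedged round-trip sketch combining scatter and gather: split an array across the pool's GPUs, then reassemble it on one worker. That scatter() returns one chunk per worker is an assumption based on its description above.

    import cupy as cp

    from tike.communicators import ThreadPool

    # Hedged round trip: split along the 0th dimension, then
    # concatenate back on worker 0. Assumes two visible GPUs and that
    # scatter() returns a list with one chunk per worker.
    with ThreadPool(2) as pool:
        x = cp.arange(8, dtype='float32')
        chunks = pool.scatter(x)
        whole = pool.gather(chunks, worker=0, axis=0)
        assert whole.shape == x.shape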

shutdown(wait=True)

Clean-up the resources associated with the Executor.

It is safe to call this method several times, but no other methods can be called after it.

Parameters: wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

submit(fn, *args, **kwargs)

Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.

Returns: A Future representing the given call.