Source code for tike.communicators.pool

"""Defines a worker Pool for multi-device managerment."""

__author__ = "Daniel Ching"
__copyright__ = "Copyright (c) 2020, UChicago Argonne, LLC."
__docformat__ = 'restructuredtext en'

from concurrent.futures import ThreadPoolExecutor
import os
import warnings

import cupy as cp


class ThreadPool(ThreadPoolExecutor):
    """Python thread pool plus scatter-gather methods.

    A Pool is a context manager which provides access to, and communication
    amongst, workers.

    Attributes
    ----------
    workers : int, tuple(int)
        The number of GPUs to use or a tuple of the device numbers of the
        GPUs to use. If the number of GPUs is less than the requested
        number, only workers for the available GPUs are allocated.

    Raises
    ------
    ValueError
        When invalid GPU device ids are provided, or when the current CUDA
        device does not match the first GPU id in the list of workers.

    """

    def __init__(self, workers):
        self.device_count = cp.cuda.runtime.getDeviceCount()
        if type(workers) is int:
            if workers < 1:
                raise ValueError(f"Provide workers > 0, not {workers}.")
            if workers > self.device_count:
                warnings.warn(
                    "Not enough CUDA devices for workers!"
                    f" Requested {workers} of {self.device_count} devices.")
                workers = min(workers, self.device_count)
            if workers == 1:
                # Respect "with cp.cuda.Device()" blocks for a single thread
                workers = (cp.cuda.Device().id,)
            else:
                workers = tuple(range(workers))
        for w in workers:
            if w < 0 or w >= self.device_count:
                raise ValueError(f"{w} is not a valid GPU device number.")
        if workers[0] != cp.cuda.Device().id:
            raise ValueError(
                "The primary worker must be the current device. "
                f"Use `with cupy.cuda.Device({workers[0]}):` to set the "
                "current device.")
        self.workers = workers
        self.num_workers = len(workers)
        self.xp = cp
        super().__init__(self.num_workers)

    def _copy_to(self, x, worker: int) -> cp.ndarray:
        """Copy x to the device of the given worker."""
        with cp.cuda.Device(worker):
            return self.xp.asarray(x)

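    # Construction sketch (illustrative; not part of the original module).
    # Requesting two workers yields device ids (0, 1), assuming two CUDA
    # devices are visible and device 0 is current:
    #
    #     with ThreadPool(2) as pool:
    #         print(pool.workers)      # (0, 1)
    #         print(pool.num_workers)  # 2
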
    def bcast(self, x: cp.ndarray) -> list:
        """Send a copy of x to all workers."""

        def f(worker):
            return self._copy_to(x, worker)

        return list(self.map(f, self.workers))

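    # Sketch (continuing the two-worker example above): bcast places one
    # copy of the same array on every device, one cp.ndarray per worker.
    #
    #     copies = pool.bcast(cp.arange(4))  # [copy on GPU 0, copy on GPU 1]
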
    def gather(self, x: list, worker=None, axis=0) -> cp.ndarray:
        """Concatenate x on a single worker along the given axis."""
        if self.num_workers == 1:
            return x[0]
        worker = self.workers[0] if worker is None else worker
        with cp.cuda.Device(worker):
            return self.xp.concatenate(
                [self._copy_to(part, worker) for part in x],
                axis,
            )

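    # Sketch: gather concatenates the per-worker chunks onto a single
    # device; by default the result lands on workers[0].
    #
    #     whole = pool.gather(copies, axis=0)  # shape (8,) on workers[0]
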
    def all_gather(self, x: list, axis=0) -> list:
        """Concatenate x on all workers along the given axis."""

        def f(worker):
            return self.gather(x, worker, axis)

        return list(self.map(f, self.workers))

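    # Sketch: all_gather repeats gather on every worker, so each device
    # ends up holding its own copy of the concatenated array.
    #
    #     everywhere = pool.all_gather(copies, axis=0)  # list of (8,) arrays
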
    def scatter(self, x):
        """Split x along the 0th dimension and send one chunk to each worker."""

        def f(worker, chunk):
            return self._copy_to(chunk, worker)

        return self.map(f, self.workers, x)

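    # Sketch: scatter pairs worker i with x[i]; iterating an array yields
    # its axis-0 slices, and slices beyond num_workers are silently dropped.
    #
    #     chunks = pool.scatter(cp.arange(4).reshape(2, 2))  # one row per GPU
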
    def reduce_gpu(self, x: list, worker=None) -> cp.ndarray:
        """Reduce x by addition to one GPU from all other GPUs."""
        if self.num_workers == 1:
            return x[0]
        worker = self.workers[0] if worker is None else worker
        # Index chunks by the worker's position in the pool; a device id is
        # not necessarily a valid index into x.
        i = self.workers.index(worker)
        with cp.cuda.Device(worker):
            for part in x[:i]:
                x[i] += self._copy_to(part, worker)
            for part in x[(i + 1):]:
                x[i] += self._copy_to(part, worker)
            return x[i]

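    # Sketch: reduce_gpu sums the per-worker arrays elementwise onto one
    # device, workers[0] unless another device id is given.
    #
    #     total = pool.reduce_gpu(copies)  # arange(4) * num_workers
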
    def reduce_cpu(self, x, buf=None):
        """Reduce x by addition from all GPUs to a CPU buffer."""
        buf = 0 if buf is None else buf
        buf += sum([self.xp.asnumpy(part) for part in x])
        return buf

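    # Sketch: reduce_cpu moves every chunk to the host and sums there,
    # optionally accumulating into an existing NumPy buffer.
    #
    #     host_total = pool.reduce_cpu(copies)  # numpy.ndarray on the CPU
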
    def reduce_mean(self, x: list, axis, worker=None) -> cp.ndarray:
        """Reduce x by mean to one GPU from all other GPUs."""
        if self.num_workers == 1:
            return x[0]
        worker = self.workers[0] if worker is None else worker
        return cp.mean(
            self.gather(x, worker=worker, axis=axis),
            keepdims=True,
            axis=axis,
        )

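    # Sketch: reduce_mean gathers the chunks onto one device, then takes a
    # mean along the given axis with keepdims=True.
    #
    #     avg = pool.reduce_mean(copies, axis=0)  # shape (1,) on workers[0]
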
    def map(self, func, *iterables, **kwargs):
        """ThreadPoolExecutor.map, but wraps the call in a cuda.Device context."""

        def f(worker, *args):
            with cp.cuda.Device(worker):
                return func(*args, **kwargs)

        return list(super().map(f, self.workers, *iterables))

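
# Minimal end-to-end sketch (illustrative; not part of the original module).
# It assumes at least one visible CUDA device and that device 0 is current;
# with more devices, ThreadPool(2) would spread the same calls across GPUs.
if __name__ == '__main__':
    with ThreadPool(1) as pool:
        x = cp.arange(6).reshape(3, 2)
        copies = pool.bcast(x)                     # one copy of x per worker
        stacked = pool.all_gather(copies, axis=0)  # concatenated on each GPU
        total = pool.reduce_gpu(copies)            # elementwise sum of copies
        assert stacked[0].shape == (3 * pool.num_workers, 2)
        assert total.shape == (3, 2)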