# Source code for wannierberri.parallel

import os
import warnings


class Parallel:
    """
    a class to store parameters of parallel evaluation

    Parameters
    -----------
    num_cpus : int
        number of parallel processes. If `None` - automatically chosen by `Ray` (1 per CPU),
        unless `ray_init` contains a ``'num_cpus'`` entry, which is then used.
    npar_k : int
        additional parallelisation over k-points inside the FFT grid
    cluster : bool
        set to `True` to use a multi-node ray cluster (see also the `wannierberri.cluster` module)
    ray_init : dict
        parameters to be passed to `ray.init()`. Use only if you know what you are doing.
    progress_step_percent : int or float
        progress (and estimated time to end) will be printed after each percent is completed

    Attributes set by the constructor: ``method`` (``"ray"``), ``num_cpus`` (CPUs reported by
    ``ray.available_resources()``), ``ray`` (the imported ``ray`` module), ``npar_k`` and
    ``npar_K`` (``round(num_cpus / npar_k)``, but never less than 1).
    """

    def __init__(
            self,
            num_cpus=None,
            npar_k=0,
            ray_init=None,  # add extra parameters for ray.init()
            cluster=False,  # add parameters for ray.init() for the slurm cluster
            progress_step_percent=1,
    ):
        self.method = "ray"
        self.progress_step_percent = progress_step_percent

        ray_init_loc = self._make_ray_init_kwargs(num_cpus, ray_init, cluster)
        import ray
        print("initializing ray with ", ray_init_loc)
        ray.init(**ray_init_loc)
        self.num_cpus = int(round(ray.available_resources()['CPU']))
        self.ray = ray

        probe_map, self.npar_k = pool(npar_k)
        # `pool` hands back `Pool(n).imap` (a bound method) when it managed to create a
        # multiprocessing pool. Here it is only a probe for the usable npar_k, so shut the
        # pool down explicitly instead of leaving its worker processes to the garbage collector.
        from multiprocessing.pool import Pool as ProcessPool
        probe_pool = getattr(probe_map, "__self__", None)
        if isinstance(probe_pool, ProcessPool):
            probe_pool.terminate()

        # If npar_k is large compared to num_cpus the rounded ratio would be 0,
        # which is not a usable degree of parallelism.
        self.npar_K = max(1, int(round(self.num_cpus / self.npar_k)))

    @staticmethod
    def _make_ray_init_kwargs(num_cpus, ray_init, cluster):
        """Assemble the keyword arguments for ``ray.init()``.

        Entries of ``ray_init`` (which is not modified) override the cluster defaults.
        An explicit ``num_cpus`` overrides ``ray_init['num_cpus']``; ``num_cpus=None``
        keeps the value from ``ray_init`` if there is one.
        """
        if ray_init is None:
            ray_init = {}
        kwargs = {}
        if cluster:
            # The following is done for testing, when __init__ is called with `cluster = True`,
            # but no actual ray cluster was initialized (and hence the needed environmental
            # variables are not set): defaults are computed lazily and only for options
            # that are missing from ray_init.
            def set_opt(opt, def_val):
                if opt not in ray_init:
                    kwargs[opt] = def_val()
                else:
                    warnings.warn(f"the ray cluster will use '{ray_init[opt]}' provided in ray_init")

            set_opt('address', lambda: 'auto')
            set_opt('_node_ip_address', lambda: os.environ["ip_head"].split(":")[0])
            set_opt('_redis_password', lambda: os.environ["redis_password"])
        kwargs.update(ray_init)
        # An explicit num_cpus wins, but the default None must not discard a value given in ray_init.
        if num_cpus is not None or 'num_cpus' not in kwargs:
            kwargs['num_cpus'] = num_cpus
        return kwargs

    def progress_step(self, n_tasks, npar):
        """Return how many tasks to complete between progress reports.

        This is ``progress_step_percent`` percent of ``n_tasks`` (rounded), but at least
        ``npar`` and at least 1.
        """
        return max(1, npar, int(round(n_tasks * self.progress_step_percent / 100)))

    def shutdown(self):
        """Shut down the ray runtime started in ``__init__``."""
        self.ray.shutdown()
class Serial(Parallel):
    """
    a class defining the serial execution (although `npar_k` is allowed)

    Parameters
    -----------
    npar_k : int
        additional parallelisation over k-points inside the FFT grid.
        If `None`, the worker count returned by ``pool(0)`` is used.
    progress_step_percent : int or float
        progress (and estimated time to end) will be printed after each percent is completed
    """

    def __init__(self, npar_k=None, progress_step_percent=1):
        # Deliberately does not call Parallel.__init__, which would start ray.
        self.method = "serial"
        self.num_cpus = 1
        self.npar_K = 1
        self.progress_step_percent = progress_step_percent
        self.npar_k = pool(0)[1] if npar_k is None else npar_k

    def shutdown(self):
        """Nothing to release in serial mode; only report that."""
        print("No need to shutdown Serial()")
def pool(npar):
    """Return a ``(map_function, n_workers)`` pair for applying a function to a list.

    For ``npar > 1`` a ``multiprocessing.Pool`` with ``npar`` workers is created and its
    ``imap`` is returned together with ``npar``. The pool is not closed here; the caller holds
    it through the returned bound method. If ``npar`` is not greater than 1, or the pool cannot
    be created, a serial ``map_function(fun, lst) -> list`` is returned together with 1.
    """
    def serial_map(fun, lst):
        return [fun(item) for item in lst]

    if not npar > 1:
        return serial_map, 1
    try:
        from multiprocessing import Pool
        parallel_map = Pool(npar).imap
        print(f'created a pool of {npar} workers')
        return parallel_map, npar
    except Exception as err:
        print(f'failed to create a pool of {npar} workers : {err}\n doing in serial')
    return serial_map, 1