# -*- mode: python-mode; python-indent-offset: 4 -*-
# coding: utf-8
"""Select simulation runs and their outputs from a directory of runs."""

import glob
import os
from functools import partial

import f90nml
import numpy as np
from pymses.sources.ramses.info import read_ramses_info_file


class NamelistRecursive:
    """Wrapper around a nested namelist giving access by "group/key" paths."""

    def __init__(self, namelist):
        self.data = namelist

    def get_nml_value(self, nml_key):
        """
        Return the value stored under a slash-separated key path.

        Args:
            nml_key : str, path in the namelist (eg. "cloud_params/dens0")

        Returns:
            The stored value, or None when only the last component of the
            path is missing.

        Raises:
            KeyError: when an intermediate component of the path is missing.
        """
        keys = nml_key.split("/")
        last_depth = len(keys) - 1
        res = self.data
        for depth, key in enumerate(keys):
            if key in res:
                res = res[key]
            elif depth == last_depth:
                # Decide on the position, not on the name: an intermediate
                # group may be named like the final key.
                return None
            else:
                raise KeyError(key)
        return res

    def __getitem__(self, key):
        return self.get_nml_value(key)

    def __repr__(self):
        return self.data.__repr__()

    def __str__(self):
        return self.data.__str__()


class RunSelector:
    """Select runs found in a folder, and the outputs of each run."""

    def __init__(
        self,
        path_in,
        in_runs=None,
        in_nums="all",
        nml_filename="run.nml",
        filter_name="*",
        filter_nml=None,
        sort_run_by=None,
        time_min=None,
        time_max=None,
        time=None,
        unit_time=None,
        allow_nodata=False,
    ):
        """
        Select runs and outputs with several filter options.
        By default, all runs and outputs within path_in are considered

        Args:

        1. Define the set of runs and outputs considered

            path_in : str, path to the folder of the runs

        2. Filter runs and outputs

            in_runs : str or list of str. The name of the runs to consider.
                      Default: all.
            in_nums : int, sequence of int, str, or dict. The output numbers
                      to consider.
                      "first" / "last" select only the first / last readable
                      output. "all" preselects all outputs (default).
                      A dict is indexed by run name and must have an entry
                      for every selected run.
            nml_filename : str, name of the namelist file
                      (should be the same for all runs)
            filter_name : str, glob pattern used to filter runs by name.
                      Default "*"
            filter_nml : tuple or list of tuples. Filter runs by namelist.
                      Tuples are in the following form:
                      (nml_key, operator, nml_value)
                      with nml_key a key from the namelist
                      (eg. "cloud_params/dens0"),
                      operator within ("=", "!=", "<", ">", "in")
                      and nml_value a string, float or int.
                      Default: no filter.
            time_min : float, select outputs where time >= time_min
            time_max : float, select outputs where time < time_max
            time : float or sequence of float. For each value, select the
                      output closest to it.
            unit_time : unit of the times above. None is code unit. A str is
                      a namelist key whose value divides the code time; any
                      other object is passed to the `express` method of the
                      "unit_time" entry of the output info.
            allow_nodata : bool, also read the info file from the "info"
                      folder of the run when the output folder has none

        3. Sort the runs

            sort_run_by : str or list of str, keys from the namelist used to
                      sort the runs (by ascending order). "name" sorts by
                      run name.

        Raises:
            ValueError: when no run with at least one output is left.
        """
        self.path_in = path_in
        self.nml_filename = nml_filename
        self.namelist = {}
        self.runs = self.get_runs(in_runs, filter_name, filter_nml, sort_run_by)
        self.allow_nodata = allow_nodata

        # Cache of the info of each output, indexed by run then output number
        self.info = {run: {} for run in self.runs}

        # A single specification applies to every run
        if not isinstance(in_nums, dict):
            in_nums = {run: in_nums for run in self.runs}

        self.nums = {}
        for run in self.runs:
            self.nums[run] = self.get_nums(
                run,
                in_nums[run],
                time_min,
                time_max,
                time,
                unit_time,
            )

        # Drop the runs that ended up without any output
        kept_runs = []
        for run in self.runs:
            if self.nums.get(run):
                kept_runs.append(run)
            else:
                print(f"[WARNING] No snapshot found for run {run}")
                self.nums.pop(run, None)
        self.runs = kept_runs

        if len(self.runs) == 0:
            raise ValueError("No runs found")

    def select(
        self,
        runs=None,
        nums="all",
        filter_nml=None,
        filter_name="*",
        sort_run_by=None,
        time_min=None,
        time_max=None,
        time=None,
        unit_time=None,
    ):
        """
        Sub-select runs and outputs from already selected runs and outputs

        Args:
            runs : str or list of str. The name of the runs to consider.
                   Default: all selected runs.
            nums : int, sequence of int, str, or dict indexed by run name.
                   The output numbers to consider.
                   "first" / "last" select only the first / last output.
                   "all" preselects all outputs (default)
            filter_name : str, glob pattern used to filter run names.
                   Default is "*" (all runs)
            filter_nml : tuple or list of tuples. Filter runs by namelist.
                   Tuples are in the following form:
                   (nml_key, operator, nml_value)
                   with nml_key a key from the namelist
                   (eg. "cloud_params/dens0"),
                   operator within ("=", "!=", "<", ">", "in")
                   and nml_value a string, float or int
            time_min : float, select outputs where time >= time_min
            time_max : float, select outputs where time < time_max
            time : float or sequence of float. For each value, select the
                   output closest to it.
            unit_time : unit of the times above. None is code unit.
            sort_run_by : str or list of str, keys from the namelist used to
                   sort the runs (by ascending order)

        Returns:
            (selected_runs, selected_nums), a list of run names and a dict
            of output numbers indexed by run name

        Raises:
            ValueError: when no run matches.
        """
        if runs is None:
            runs = self.runs

        selected_runs = self.get_runs(
            runs, filter_name, filter_nml, sort_run_by, do_tests=False
        )
        if len(selected_runs) == 0:
            raise ValueError("No runs found")

        # A single specification applies to every run
        if not isinstance(nums, dict):
            nums = {run: nums for run in selected_runs}

        selected_nums = {}
        for run in selected_runs:
            selected_nums[run] = self.get_nums(
                run, nums[run], time_min, time_max, time, unit_time, do_tests=False
            )
        return selected_runs, selected_nums

    def load_namelist(self, run):
        """Read the namelist file of a run and wrap it for "a/b" access."""
        path_nml = f"{self.path_in}/{run}/{self.nml_filename}"
        return NamelistRecursive(f90nml.read(path_nml))

    def get_nml_value(self, nml_key, run):
        """Return the value of nml_key in the already loaded namelist of run."""
        return self.namelist[run][nml_key]

    def nml_select(self, runs, filter_nml):
        """
        Keep the runs whose namelist satisfies every condition.

        Args:
            runs : list of str, names of runs whose namelist is loaded
            filter_nml : a (nml_key, operator, operand) tuple, a list of such
                   tuples, or None / empty for no filtering.
                   operator is one of "=", "!=", "<", ">", "in".

        Returns:
            A new list with the runs that passed all the conditions.

        Raises:
            ValueError: when an operator is not one of the above.
        """
        runs = list(runs)
        if not filter_nml:
            return runs

        # A lone condition may be given without the enclosing list
        is_single_condition = (
            isinstance(filter_nml, tuple)
            and len(filter_nml) == 3
            and isinstance(filter_nml[0], str)
        )
        if is_single_condition:
            filter_nml = [filter_nml]

        for nml_key, op, operand in filter_nml:
            if op == "=":

                def keep(value, operand=operand):
                    return value == operand

            elif op == "!=":

                def keep(value, operand=operand):
                    return not value == operand

            elif op == ">":

                def keep(value, operand=operand):
                    # A run without the key (None) cannot satisfy an ordering
                    return value is not None and value > operand

            elif op == "<":

                def keep(value, operand=operand):
                    return value is not None and value < operand

            elif op == "in":

                def keep(value, operand=operand):
                    return value in operand

            else:
                # Ignoring an unknown operator would silently disable the filter
                raise ValueError(
                    f"Unknown operator {op!r} in namelist filter on {nml_key!r}"
                )

            runs = [run for run in runs if keep(self.get_nml_value(nml_key, run))]
        return runs

    def get_runs(
        self,
        in_runs=None,
        filter_name="*",
        filter_nml=None,
        sort_run_by=None,
        do_tests=True,
    ):
        """
        List the runs of path_in that match the given filters.

        Args:
            in_runs : str or list of str, restrict to these run names.
                   Default: every folder matching filter_name.
            filter_name : str, glob pattern applied to the run folder names
            filter_nml : namelist conditions, see nml_select
            sort_run_by : str or list of str, namelist keys used to sort the
                   runs (by ascending order). "name" sorts by run name.
            do_tests : bool, load the namelist of each run and drop the runs
                   whose namelist cannot be read. When False, the namelists
                   must already be loaded.

        Returns:
            The list of selected run names.
        """

        def try_load_nml(run):
            try:
                self.namelist[run] = self.load_namelist(run)
            except IOError:
                return False
            return True

        candidates = glob.glob(self.path_in + "/" + filter_name)
        runs = [os.path.basename(path) for path in candidates if os.path.isdir(path)]

        if in_runs is not None:
            if isinstance(in_runs, str):
                in_runs = [in_runs]
            # Keep the order requested by the caller
            on_disk = set(runs)
            runs = [name for name in in_runs if name in on_disk]

        if do_tests:
            runs = [run for run in runs if try_load_nml(run)]

        # Select runs that match namelist conditions
        runs = self.nml_select(runs, filter_nml)

        # Sort by the value in the namelist of sort_run_by
        if sort_run_by is not None:
            if isinstance(sort_run_by, str):
                sort_run_by = [sort_run_by]
            # Sorts are stable: applying the keys in reverse order makes the
            # first key the primary one.
            for nml_key in reversed(sort_run_by):
                if nml_key == "name":
                    runs.sort()
                else:
                    runs.sort(key=partial(self.get_nml_value, nml_key))

        return runs

    def load_info(self, run, num):
        """
        Read the info file of output num of a run.

        The file is looked for in the output folder, then in the nested
        output folder (layout obtained when ratarmount was used), and, only
        when allow_nodata is set, in the "info" folder of the run.

        Raises:
            IOError: when no info file can be used.
        """
        info_filename_output = f"{self.path_in}/{run}/output_{num:05}/info_{num:05}.txt"
        # Path of the filename if ratarmount was used
        info_filename_tarmount_output = f"{self.path_in}/{run}/output_{num:05}/output_{num:05}/info_{num:05}.txt"
        info_filename_folder = f"{self.path_in}/{run}/info/info_{num:05}.txt"

        if os.path.exists(info_filename_output):
            return read_ramses_info_file(info_filename_output)
        if os.path.exists(info_filename_tarmount_output):
            return read_ramses_info_file(info_filename_tarmount_output)
        if self.allow_nodata:
            return read_ramses_info_file(info_filename_folder)
        # FileNotFoundError is an IOError, which is what callers catch
        raise FileNotFoundError(
            f"No info file found for output {num} of run {run} "
            f"(tried {info_filename_output} and {info_filename_tarmount_output})"
        )

    def get_nums(
        self,
        run,
        in_nums=None,
        time_min=None,
        time_max=None,
        time=None,
        unit_time=None,
        do_tests=True,
    ):
        """
        Select snapshots from the disk

        The time selection bisects the sorted output numbers, so the time is
        assumed to increase with the output number.

        Args:
            run : str, name of the run
            in_nums : int, sequence of int, or str. The output numbers to
                      consider.
                      "first" / "last" select only the first / last readable
                      output. "all" or None preselect all outputs (default)
            time_min : float, select outputs where time >= time_min
            time_max : float, select outputs where time < time_max
            time : float or sequence of float. For each value, select the
                      output closest to it.
            unit_time : unit of the times above. None is code unit. A str is
                      a namelist key whose value divides the code time; any
                      other object is passed to the `express` method of the
                      "unit_time" entry of the output info.
            do_tests : test if the snapshots are actually on disk. Not needed
                      when subselecting snapshots.

        Returns:
            The list of selected output numbers.
        """

        # -- Initialize info loader --
        if do_tests:
            info_cache = self.info.setdefault(run, {})

            def try_load_info(num):
                try:
                    if num not in info_cache:
                        info_cache[num] = self.load_info(run, num)
                except (IOError, AttributeError):
                    return False
                return True

        else:

            def try_load_info(num):
                return True

        # -- Time getter according to unit_time
        if unit_time is None:

            def get_time(num):
                return self.info[run][num]["time"]

        elif isinstance(unit_time, str):
            factor = self.get_nml_value(unit_time, run)

            def get_time(num):
                time_code = self.info[run][num]["time"]
                return time_code / factor

        else:

            def get_time(num):
                time_code = self.info[run][num]["time"]
                return time_code * self.info[run][num]["unit_time"].express(unit_time)

        # -- A function to search a given time using dichotomy
        def search(nums, time, position="closest"):
            # Returns an index in nums, or None. Unreadable outputs met on the
            # way are removed from nums in place.
            while len(nums) > 0 and not try_load_info(nums[0]):
                del nums[0]
            while len(nums) > 0 and not try_load_info(nums[-1]):
                del nums[-1]
            if len(nums) == 0:
                return None

            ileft, iright = 0, len(nums) - 1
            if get_time(nums[ileft]) >= time:
                if position in ["closest", "right"]:
                    return ileft
                else:
                    return None
            if get_time(nums[iright]) < time:
                if position in ["closest", "left"]:
                    return iright
                else:
                    return None

            # Invariant: time(ileft) < time <= time(iright), both readable
            while iright - ileft > 1:
                imid = (ileft + iright) // 2
                while not try_load_info(nums[imid]):
                    del nums[imid]
                    iright -= 1
                    imid = (ileft + iright) // 2
                if get_time(nums[imid]) < time:
                    ileft = imid
                else:
                    iright = imid

            if position == "left":
                return ileft
            elif position == "right":
                return iright
            else:
                dleft = np.abs(get_time(nums[ileft]) - time)
                dright = np.abs(get_time(nums[iright]) - time)
                if dleft <= dright:
                    return ileft
                else:
                    return iright

        # -- Get the list of seemingly available snapshots on the disk or already selected --
        if do_tests:
            pattern = self.path_in + "/" + run + "/output_[0-9][0-9][0-9][0-9][0-9]"
            nums = [
                int(os.path.basename(name).split("_")[1])
                for name in glob.glob(pattern)
            ]
        else:
            # Work on a copy: the list is sorted and trimmed in place below
            nums = list(self.nums[run])

        # -- Filter with the provided in_nums array
        mode = in_nums if isinstance(in_nums, str) else None
        if isinstance(in_nums, (int, np.integer)):
            in_nums = [in_nums]
        if isinstance(in_nums, (list, tuple, range, np.ndarray)):
            available = set(nums)
            nums = [num for num in in_nums if num in available]
        nums.sort()

        # -- Select either the first or last output from the list, or all the valid ones --
        if mode == "first":
            i = 0
            while i < len(nums) and not try_load_info(nums[i]):
                i = i + 1
            if i < len(nums):
                nums = [nums[i]]
            else:
                nums = []
        elif mode == "last":
            i = len(nums) - 1
            while i >= 0 and not try_load_info(nums[i]):
                i = i - 1
            if i >= 0:
                nums = [nums[i]]
            else:
                nums = []

        # -- Select according to time --
        if time_min is not None and len(nums) > 0:
            imin = search(nums, time_min, "right")
            if imin is not None:
                nums = nums[imin:]
            else:
                nums = []

        if time_max is not None and len(nums) > 0:
            imax = search(nums, time_max, "left")
            if imax is not None:
                nums = nums[: imax + 1]
            else:
                # No output before time_max: nothing must be selected
                nums = []

        if time is not None and len(nums) > 0:
            if isinstance(time, np.ndarray):
                times = time.ravel().tolist()
            elif isinstance(time, (list, tuple)):
                times = list(time)
            else:
                times = [time]

            filtered_nums = []
            # For all times provided by the user, select the output closer to it
            for t in times:
                iclose = search(nums, t)
                if iclose is None:
                    # No readable output is left
                    break
                num = nums[iclose]
                # Only add each selected output once
                if num not in filtered_nums:
                    filtered_nums.append(num)
            nums = filtered_nums
        else:
            nums = list(filter(try_load_info, nums))

        return nums

    def write_paths(self, prefix=None, filename="~/list_file"):
        """
        Write the paths of the selected outputs to a file, one per line

        For each output, the nested folder "output_N/output_N" is written
        when it exists under prefix, and "output_N" otherwise.

        Args:
            prefix (str, optional): Prefix for the paths. Defaults to path_in.
            filename (str, optional): File to write, "~" is expanded.
                Defaults to "~/list_file".
        """
        if prefix is None:
            prefix = self.path_in

        paths = []
        for run in self.nums:
            for num in self.nums[run]:
                output_dir = f"{prefix}/{run}/output_{num:05}"
                nested_dir = f"{output_dir}/output_{num:05}"
                # The newline belongs to the written line, not to the tested path
                if os.path.exists(nested_dir):
                    paths.append(nested_dir + "\n")
                else:
                    paths.append(output_dir + "\n")

        with open(os.path.expanduser(filename), "w") as f:
            f.writelines(paths)