# -*- mode: python-mode; python-indent-offset: 4 -*-
# coding: utf-8

import glob
import os
from functools import partial

import numpy as np
import yaml
import f90nml

from pp_params import default_params


# Comparison applied for each operator accepted in a namelist filter.
# "!=" is deliberately spelled ``not ==`` so that only ``__eq__`` is used.
_NML_COMPARISONS = {
    "=": lambda value, operand: value == operand,
    "!=": lambda value, operand: not value == operand,
    ">": lambda value, operand: value > operand,
    "<": lambda value, operand: value < operand,
    "in": lambda value, operand: value in operand,
}


class NamelistRecursive:
    """Wrap a namelist so nested values can be read with "/"-separated keys."""

    def __init__(self, namelist):
        self.data = namelist

    def get_nml_value(self, nml_key):
        """
        Return the value stored under a "/"-separated key path.

        Parameters
        ---------
        nml_key : str, path in the namelist (eg. "cloud_params/dens0")

        Returns
        -------
        The value found, or None when only the last component of the path
        is missing.

        Raises
        ------
        KeyError if a component other than the last one is missing.
        """
        keys = nml_key.split("/")
        last_depth = len(keys) - 1
        res = self.data
        for depth, key in enumerate(keys):
            if key in res:
                res = res[key]
            elif depth == last_depth:
                # Only a missing *final* component maps to None; the test is
                # on the position, not on the name, so a path that repeats a
                # name (eg. "a/b/a") is handled correctly.
                res = None
            else:
                raise KeyError(key)
        return res

    def __getitem__(self, key):
        return self.get_nml_value(key)

    def __repr__(self):
        return self.data.__repr__()

    def __str__(self):
        return self.data.__str__()


class RunSelector:
    """Select runs, and outputs of these runs, found under a folder."""

    def __init__(
        self,
        path_in,
        in_runs=None,
        in_nums="all",
        pp_params=None,
        filter_name="*",
        filter_nml=None,
        sort_run_by=None,
        time_min=None,
        time_max=None,
        time=None,
    ):
        """
        Select runs and outputs with several filter options.
        By default, all runs and outputs within path_in are considered

        Parameters
        ---------

        1. Define the set of runs and outputs considered

        path_in : str, path to the folder of the runs
        pp_params : parameters object providing input.nml_filename.
                    Default: default_params(), created for each selector.

        2. Filter runs and outputs

        in_runs : str or list of str. The name runs to consider. Default: all.
        in_nums : int or list of int or str, or a dict giving one of these
                  for each run. The output numbers to consider.
                  "first" select only the first output.
                  "last" select only the last output.
                  "all" preselect all outputs (default)
        filter_name : str, filter runs by name. Default "*"
        filter_nml : tuple or list of tuple. Filter runs by namelist.
                     tuples are in the following form:
                     (nml_key, operator, nml_value)
                     with nml_key a key from the namelist
                     (eg. "cloud_params/dens0")
                     operator within ("=", "!=", "<", ">", "in")
                     and nml_value a string, float or int
        time_min : float, select output where time >= time_min (in code units)
        time_max : float, select output where time <= time_max (in code units)
        time : float or list of float. For each value, select the output
               closer to it.

        3. Sort the runs

        sort_run_by : str or list of str, key(s) from the namelist used to
                      sort the runs (by ascending order)

        Raises
        ------
        ValueError if no run matches the selection.
        """
        self.path_in = path_in
        # Built lazily so that the default is not a single object created at
        # import time and shared by every selector.
        self.pp_params = default_params() if pp_params is None else pp_params

        self.namelist = {}
        self.runs = self.get_runs(in_runs, filter_name, filter_nml, sort_run_by)
        if len(self.runs) == 0:
            raise ValueError("No runs found")

        self.info = {run: {} for run in self.runs}

        # A single specification applies to every run
        if not isinstance(in_nums, dict):
            in_nums = {run: in_nums for run in self.runs}

        self.nums = {}
        for run in self.runs:
            self.nums[run] = self.get_nums(
                run, in_nums[run], time_min, time_max, time
            )

    def select(
        self,
        runs=None,
        nums="all",
        filter_nml=None,
        sort_run_by=None,
        time_min=None,
        time_max=None,
        time=None,
    ):
        """
        Sub-select runs and outputs from already selected runs and outputs.
        The selection stored in the selector is left untouched.

        Parameters
        ---------
        runs : str or list of str. The name runs to consider. Default: all.
        nums : int or list of int or str, or a dict giving one of these
               for each run. The output numbers to consider.
               "first" select only the first output.
               "last" select only the last output.
               "all" preselect all outputs (default)
        filter_nml : tuple or list of tuple. Filter runs by namelist.
                     tuples are in the following form:
                     (nml_key, operator, nml_value)
                     with nml_key a key from the namelist
                     (eg. "cloud_params/dens0")
                     operator within ("=", "!=", "<", ">", "in")
                     and nml_value a string, float or int
        time_min : float, select output where time >= time_min (in code units)
        time_max : float, select output where time <= time_max (in code units)
        time : float or list of float. For each value, select the output
               closer to it.
        sort_run_by : str or list of str, key(s) from the namelist used to
                      sort the runs (by ascending order)

        Returns
        -------
        (selected_runs, selected_nums)

        Raises
        ------
        ValueError if no run matches the selection.
        """
        selected_runs = self.get_runs(
            runs, "*", filter_nml, sort_run_by, do_tests=False
        )
        if len(selected_runs) == 0:
            raise ValueError("No runs found")

        # A single specification applies to every run
        if not isinstance(nums, dict):
            nums = {run: nums for run in selected_runs}

        selected_nums = {
            run: self.get_nums(
                run, nums[run], time_min, time_max, time, do_tests=False
            )
            for run in selected_runs
        }
        return selected_runs, selected_nums

    def load_namelist(self, run):
        """Read the namelist file of a run and wrap it for nested access."""
        path_nml = os.path.join(
            self.path_in, run, self.pp_params.input.nml_filename
        )
        return NamelistRecursive(f90nml.read(path_nml))

    def get_nml_value(self, nml_key, run):
        """Return the value of nml_key in the already loaded namelist of run."""
        return self.namelist[run][nml_key]

    def nml_select(self, runs, filter_nml):
        """
        Keep the runs whose namelist satisfies every condition of filter_nml.

        Parameters
        ---------
        runs : list of str, names of runs whose namelist is loaded
        filter_nml : tuple (nml_key, operator, nml_value) or list of such
                     tuples. None or empty: no filtering.

        Returns
        -------
        A new list with the runs matching all the conditions.

        Raises
        ------
        ValueError if an operator is not one of "=", "!=", "<", ">", "in".
        """
        runs = list(runs)
        if not filter_nml:
            return runs
        if isinstance(filter_nml, tuple):
            # A lone condition is given as a bare tuple
            filter_nml = [filter_nml]

        for (nml_key, op, operand) in filter_nml:
            try:
                compare = _NML_COMPARISONS[op]
            except KeyError:
                # An unknown operator used to be ignored silently, returning
                # unfiltered runs without any warning.
                raise ValueError(
                    "Unknown namelist filter operator: {!r}".format(op)
                )
            runs = [
                run
                for run in runs
                if compare(self.get_nml_value(nml_key, run), operand)
            ]
        return runs

    def get_runs(
        self,
        in_runs=None,
        filter_name="*",
        filter_nml=None,
        sort_run_by=None,
        do_tests=True,
    ):
        """
        Build the list of runs matching the given filters.

        Parameters
        ---------
        in_runs : str or list of str. The name runs to consider. Default: all.
        filter_name : str, glob pattern on the run name (used when do_tests)
        filter_nml : tuple or list of tuple, see nml_select
        sort_run_by : str or list of str, namelist key(s) used to sort
        do_tests : bool. If True, look for run folders under path_in and
                   load their namelist (runs without a readable namelist are
                   dropped). If False, start from the runs already selected.

        Returns
        -------
        A new list of run names.
        """

        def try_load_nml(run):
            try:
                self.namelist[run] = self.load_namelist(run)
            except IOError:
                return False
            return True

        if do_tests:
            candidates = glob.glob(os.path.join(self.path_in, filter_name))
            runs = [
                os.path.basename(path)
                for path in candidates
                if os.path.isdir(path)
            ]
        else:
            # Work on a copy: the sort below is in place and must not
            # reorder the selection stored in the selector.
            runs = list(self.runs)

        if in_runs is not None:
            if isinstance(in_runs, str):
                # A bare name would otherwise be iterated char by char
                in_runs = [in_runs]
            available = set(runs)
            runs = [name for name in in_runs if name in available]

        if do_tests:
            runs = [run for run in runs if try_load_nml(run)]

        # Select runs that match namelist conditions
        runs = self.nml_select(runs, filter_nml)

        # Sort by the value in the namelist of sort_run_by
        if sort_run_by is not None:
            if isinstance(sort_run_by, str):
                sort_run_by = [sort_run_by]
            # Stable sorts applied from the least to the most significant key
            for nml_key in reversed(sort_run_by):
                runs.sort(key=partial(self.get_nml_value, nml_key))

        return runs

    def load_info(self, run, num):
        """
        Parse the info file of output num of a run.

        Each line is read as a "key = value" pair (through YAML, after
        replacing "=" by ":"); lines that do not parse as a mapping are
        skipped.

        Returns
        -------
        dict of the key/value pairs found in the file.
        """
        suffix = str(num).zfill(5)
        path_info = os.path.join(
            self.path_in, run, "output_" + suffix, "info_" + suffix + ".txt"
        )
        info = {}
        # The context manager closes the file even if parsing fails
        with open(path_info, "r") as info_file:
            for line in info_file:
                parsed = yaml.safe_load(line.replace("=", ":"))
                if isinstance(parsed, dict):
                    info.update(parsed)
        return info

    def get_nums(
        self, run, in_nums=None, time_min=None, time_max=None, time=None, do_tests=True
    ):
        """
        Build the sorted list of output numbers of a run matching the filters.

        Parameters
        ---------
        run : str, name of the run
        in_nums : int or list of int or str. The output numbers to consider.
                  "first" / "last" select only the first / last output whose
                  info file can be loaded. Anything else: all outputs.
        time_min : float, select output where time >= time_min (in code units)
        time_max : float, select output where time <= time_max (in code units)
        time : float or list of float. For each value, select the output
               closer to it.
        do_tests : bool. If True, look for output folders on disk and load
                   their info file (outputs without a readable info file are
                   dropped). If False, start from the outputs already selected.

        Returns
        -------
        list of int (possibly empty).
        """

        def try_load_info(num):
            if not do_tests:
                return True
            try:
                self.info[run][num] = self.load_info(run, num)
            except IOError:
                return False
            return True

        # NumPy integers are accepted so that a number read from a NumPy
        # array can be used as a single output number.
        if isinstance(in_nums, (int, np.integer)):
            in_nums = [in_nums]

        if do_tests:
            pattern = os.path.join(self.path_in, run, "output_" + "[0-9]" * 5)
            nums = [
                int(os.path.basename(name).split("_")[1])
                for name in glob.glob(pattern)
            ]
        else:
            nums = self.nums[run]

        if isinstance(in_nums, (list, tuple)):
            nums = [n for n in in_nums if n in nums]

        # Plain ints, whatever the path taken to select them
        nums = sorted(int(n) for n in nums)

        if in_nums == "first":
            nums = next(([n] for n in nums if try_load_info(n)), [])
        elif in_nums == "last":
            nums = next(([n] for n in reversed(nums) if try_load_info(n)), [])
        else:
            nums = [n for n in nums if try_load_info(n)]

        if time_min is not None:
            nums = [n for n in nums if self.info[run][n]["time"] >= time_min]

        if time_max is not None:
            nums = [n for n in nums if self.info[run][n]["time"] <= time_max]

        # Nothing to pick from when no output is left (argmin would fail)
        if time is not None and nums:
            if not isinstance(time, (list, tuple, np.ndarray)):
                time = [time]

            # Get time for all already selected nums
            time_all = np.asarray(
                [self.info[run][n]["time"] for n in nums], dtype=float
            )

            # For all times provided by the user, select the output closer to it
            filtered_nums = []
            for t in time:
                num = nums[int(np.abs(time_all - t).argmin())]
                # Only add each selected output once
                if num not in filtered_nums:
                    filtered_nums.append(num)
            nums = filtered_nums

        return nums