Files
pipeline/run_selector.py
T
2021-06-24 10:47:53 +02:00

349 lines
11 KiB
Python

# -*- mode: python-mode; python-indent-offset: 4 -*-
# coding: utf-8
import glob
import os
from functools import partial
from pymses.sources.ramses.info import read_ramses_info_file
import numpy as np
import f90nml
class NamelistRecursive:
    """Wrapper around a nested namelist allowing access by "/"-separated path.

    Example: ``nml["cloud_params/dens0"]`` looks up ``"dens0"`` inside the
    ``"cloud_params"`` group of the wrapped namelist.
    """

    def __init__(self, namelist):
        """Wrap `namelist`, a (possibly nested) mapping such as the result of f90nml.read."""
        self.data = namelist

    def get_nml_value(self, nml_key):
        """
        Return the value stored under a "/"-separated key path.

        Parameters
        ----------
        nml_key : str, path of keys separated by "/" (eg. "cloud_params/dens0")

        Returns
        -------
        The value found, or None when only the last key of the path is missing.

        Raises
        ------
        KeyError : when a key other than the last one of the path is missing.
        """
        keys = nml_key.split("/")
        last_depth = len(keys) - 1
        res = self.data
        for depth, key in enumerate(keys):
            if key in res:
                res = res[key]
            elif depth == last_depth:
                # Only the final key is allowed to be absent. The test is on the
                # position in the path, not on the key name, so that a missing
                # intermediate group named like the last key is still an error.
                res = None
            else:
                raise KeyError(key)
        return res

    def __getitem__(self, key):
        """Shorthand for get_nml_value(key)."""
        return self.get_nml_value(key)

    def __repr__(self):
        return self.data.__repr__()

    def __str__(self):
        return self.data.__str__()
class RunSelector:
    """
    Select runs (sub-folders of `path_in`) and their outputs (`output_NNNNN` folders).

    Attributes
    ----------
    path_in : str, folder containing the runs
    nml_filename : str, name of the namelist file read in each run folder
    namelist : dict, run name -> namelist loaded for this run
    runs : list of str, names of the selected runs
    nums : dict, run name -> list of selected output numbers
    info : dict, run name -> {output number -> info read from the info file}
    """

    # Operators accepted in the namelist filters
    _NML_OPERATORS = ("=", "!=", "<", ">", "in")

    def __init__(
        self,
        path_in,
        in_runs=None,
        in_nums="all",
        nml_filename="run.nml",
        filter_name="*",
        filter_nml=None,
        sort_run_by=None,
        time_min=None,
        time_max=None,
        time=None,
    ):
        """
        Select runs and outputs with several filter options.
        By default, all runs and outputs within path_in are considered

        Parameters
        ---------
        1. Define the set of runs and outputs considered
        path_in : str, path to the folder of the runs

        2. Filter runs and outputs
        in_runs : str or list of str. The names of the runs to consider. Default: all.
        in_nums : int or list of int or str, or a dict giving one of those for each run.
                  The output numbers to consider.
                  "first" selects only the first output.
                  "last" selects only the last output.
                  "all" preselects all outputs (default)
        nml_filename : str name of the namelist (should be the same for all outputs)
        filter_name : str, glob pattern used to filter runs by name. Default "*"
        filter_nml : tuple or list of tuples.
                     Filter runs by namelist.
                     tuples are in the following form:
                        (nml_key, operator, nml_value)
                     with nml_key a key from the namelist (eg. "cloud_params/dens0")
                          operator within ("=", "!=", "<", ">", "in")
                          and nml_value a string, float or int
        time_min : float, select outputs where time >= time_min (in code units)
        time_max : float, select outputs where time <= time_max (in code units)
        time : float or list of float. For each value, select the output closest to it.

        3. Sort the runs
        sort_run_by : str or list of str, keys from the namelist used to sort the
                      runs (by ascending order). "name" sorts by run name.

        Raises
        ------
        ValueError : if no run matches the selection
        """
        self.path_in = path_in
        self.nml_filename = nml_filename
        self.namelist = {}

        self.runs = self.get_runs(in_runs, filter_name, filter_nml, sort_run_by)
        if not self.runs:
            raise ValueError("No runs found")

        self.info = {run: {} for run in self.runs}
        self.nums = {}

        # A single specification applies to every run
        if not isinstance(in_nums, dict):
            in_nums = {run: in_nums for run in self.runs}

        for run in self.runs:
            self.nums[run] = self.get_nums(
                run,
                in_nums[run],
                time_min,
                time_max,
                time,
            )

        self._drop_runs_without_outputs()

    def _drop_runs_without_outputs(self):
        """Remove from self.runs and self.nums the runs with no selected output."""
        # Build a new list rather than deleting while iterating, which would
        # skip the run following each removed one.
        kept = []
        for run in self.runs:
            if len(self.nums[run]) == 0:
                print(f"[WARNING] No snapshot found for run {run}")
                del self.nums[run]
            else:
                kept.append(run)
        self.runs = kept

    def select(
        self,
        runs=None,
        nums="all",
        filter_nml=None,
        filter_name="*",
        sort_run_by=None,
        time_min=None,
        time_max=None,
        time=None,
    ):
        """
        Sub-select runs and outputs from already selected runs and outputs

        Parameters
        ---------
        runs : str or list of str. The names of the runs to consider. Default: all.
               Runs that are not part of the current selection are ignored.
        nums : int or list of int or str, or a dict giving one of those for each run.
               The output numbers to consider.
               "first" selects only the first output.
               "last" selects only the last output.
               "all" preselects all outputs (default)
        filter_name : str.
                      glob pattern used to filter run names.
                      default is "*" (all runs)
        filter_nml : tuple or list of tuples.
                     Filter runs by namelist.
                     tuples are in the following form:
                        (nml_key, operator, nml_value)
                     with nml_key a key from the namelist (eg. "cloud_params/dens0")
                          operator within ("=", "!=", "<", ">", "in")
                          and nml_value a string, float or int
        time_min : float, select outputs where time >= time_min (in code units)
        time_max : float, select outputs where time <= time_max (in code units)
        time : float or list of float. For each value, select the output closest to it.
        sort_run_by : str or list of str, keys from the namelist used to sort the
                      runs (by ascending order). "name" sorts by run name.

        Returns
        -------
        (selected_runs, selected_nums)

        Raises
        ------
        ValueError : if no run matches the selection
        """
        if runs is None:
            runs = self.runs

        selected_runs = self.get_runs(
            runs, filter_name, filter_nml, sort_run_by, do_tests=False
        )
        # Only runs kept by the constructor have outputs registered in self.nums
        selected_runs = [run for run in selected_runs if run in self.nums]
        if not selected_runs:
            raise ValueError("No runs found")

        # A single specification applies to every run
        if not isinstance(nums, dict):
            nums = {run: nums for run in selected_runs}

        selected_nums = {
            run: self.get_nums(
                run, nums[run], time_min, time_max, time, do_tests=False
            )
            for run in selected_runs
        }
        return selected_runs, selected_nums

    def load_namelist(self, run):
        """Read the namelist file of `run` and return it as a NamelistRecursive."""
        path_nml = f"{self.path_in}/{run}/{self.nml_filename}"
        return NamelistRecursive(f90nml.read(path_nml))

    def get_nml_value(self, nml_key, run):
        """Return the value of `nml_key` in the already loaded namelist of `run`."""
        return self.namelist[run][nml_key]

    def nml_select(self, runs, filter_nml):
        """
        Keep only the runs whose namelist satisfies every condition.

        Parameters
        ----------
        runs : list of str, names of runs whose namelist is already loaded
        filter_nml : tuple or list of tuples (nml_key, operator, nml_value),
                     with operator within ("=", "!=", "<", ">", "in").
                     None or an empty collection means no filtering.

        Returns
        -------
        list of str, the runs matching all the conditions (input order kept)

        Raises
        ------
        ValueError : if an operator is not one of the supported ones
        """
        if not filter_nml:
            return runs
        if isinstance(filter_nml, tuple):
            filter_nml = [filter_nml]

        for nml_key, operator, operand in filter_nml:
            # Fail loudly instead of silently returning unfiltered runs
            if operator not in self._NML_OPERATORS:
                raise ValueError(
                    f"Unknown operator {operator!r} in namelist filter, "
                    f"expected one of {self._NML_OPERATORS}"
                )
            runs = [
                run
                for run in runs
                if self._nml_match(self.get_nml_value(nml_key, run), operator, operand)
            ]
        return runs

    @staticmethod
    def _nml_match(value, operator, operand):
        """Tell whether `value operator operand` holds, for a supported operator."""
        if operator == "=":
            return value == operand
        if operator == "!=":
            return not value == operand
        if operator == "in":
            return value in operand
        # Ordering operators: a None value (key missing from the namelist)
        # cannot be ordered, hence never matches.
        if value is None:
            return False
        if operator == ">":
            return value > operand
        return value < operand

    def get_runs(
        self,
        in_runs=None,
        filter_name="*",
        filter_nml=None,
        sort_run_by=None,
        do_tests=True,
    ):
        """
        List the runs found in path_in that match the given filters.

        Parameters
        ----------
        in_runs : str or list of str, names of the runs to consider. Default: all.
                  The order of in_runs is kept when sort_run_by is None.
        filter_name : str, glob pattern applied on the run folder names
        filter_nml : tuple or list of tuples, see nml_select
        sort_run_by : str or list of str, namelist keys used to sort the runs
                      (first key is the primary one). "name" sorts by run name.
        do_tests : bool, if True (re)load the namelist of each run and discard
                   the runs whose namelist cannot be read. If False, the
                   namelists already stored in self.namelist are used.

        Returns
        -------
        list of str, the names of the selected runs
        """

        def try_load_nml(run):
            try:
                self.namelist[run] = self.load_namelist(run)
            except IOError:
                return False
            return True

        candidates = glob.glob(self.path_in + "/" + filter_name)
        runs = [os.path.basename(path) for path in candidates if os.path.isdir(path)]

        if in_runs is not None:
            # A lone name must not be iterated character by character
            if isinstance(in_runs, str):
                in_runs = [in_runs]
            available = set(runs)
            runs = [name for name in in_runs if name in available]

        if do_tests:
            runs = [run for run in runs if try_load_nml(run)]

        # Select runs that match namelist conditions
        runs = self.nml_select(runs, filter_nml)

        # Sort by the value in the namelist of sort_run_by
        if sort_run_by is not None:
            if isinstance(sort_run_by, str):
                sort_run_by = [sort_run_by]
            # Sorting is stable: applying the keys from last to first makes the
            # first key the primary one.
            for nml_key in reversed(sort_run_by):
                if nml_key == "name":
                    runs.sort()
                else:
                    runs.sort(key=partial(self.get_nml_value, nml_key))
        return runs

    def load_info(self, run, num):
        """Read and return the info file of output `num` of `run`."""
        info_filename = f"{self.path_in}/{run}/output_{num:05}/info_{num:05}.txt"
        info = read_ramses_info_file(info_filename)
        return info

    def get_nums(
        self, run, in_nums=None, time_min=None, time_max=None, time=None, do_tests=True
    ):
        """
        List the output numbers of `run` that match the given filters.

        Parameters
        ----------
        run : str, name of the run
        in_nums : int or sequence of int or str.
                  "first" / "last" select the first / last readable output.
                  Any other str, or None, preselects all outputs.
        time_min : float, select outputs where time >= time_min
        time_max : float, select outputs where time <= time_max
        time : float or sequence of float. For each value, select the output
               closest to it (each output is returned at most once, in the
               order of the requested times).
        do_tests : bool, if True look for the output folders on disk and keep
                   only the outputs whose info file can be read (stored in
                   self.info). If False, start from self.nums[run] and rely on
                   the info already stored in self.info.

        Returns
        -------
        list of int, the selected output numbers (ascending order unless
        `time` is given)
        """

        def try_load_info(num):
            if not do_tests:
                return True
            try:
                self.info[run][num] = self.load_info(run, num)
            except (IOError, AttributeError):
                return False
            return True

        # Normalise in_nums to a list of int, unless it is a keyword or None
        if isinstance(in_nums, (int, np.integer)):
            in_nums = [int(in_nums)]
        elif isinstance(in_nums, (tuple, np.ndarray)):
            in_nums = [int(n) for n in np.ravel(in_nums)]

        if do_tests:
            names = glob.glob(
                self.path_in + "/" + run + "/output_[0-9][0-9][0-9][0-9][0-9]"
            )
            nums = [int(os.path.basename(name).split("_")[1]) for name in names]
        else:
            nums = self.nums[run]

        if isinstance(in_nums, list):
            available = set(nums)
            nums = [n for n in in_nums if n in available]

        # Plain ints so that callers always get the same element type
        nums = sorted(int(n) for n in nums)

        if in_nums == "first":
            # First output (in ascending order) whose info can be loaded
            nums = next(([n] for n in nums if try_load_info(n)), [])
        elif in_nums == "last":
            # Last output whose info can be loaded
            nums = next(([n] for n in reversed(nums) if try_load_info(n)), [])
        else:
            nums = [n for n in nums if try_load_info(n)]

        if time_min is not None:
            nums = [n for n in nums if self.info[run][n]["time"] >= time_min]

        if time_max is not None:
            nums = [n for n in nums if self.info[run][n]["time"] <= time_max]

        # Nothing to pick from when no output is left: keep the empty list
        if time is not None and nums:
            # Time of all the already selected outputs
            times = np.array([self.info[run][n]["time"] for n in nums], dtype=float)
            filtered_nums = []
            # For each time provided by the user, select the output closest to it
            for t in np.ravel(time):
                num = nums[int(np.abs(times - t).argmin())]
                # Only add each selected output once
                if num not in filtered_nums:
                    filtered_nums.append(num)
            nums = filtered_nums

        return nums