Files
pipeline/utils/runselector.py
T
2024-03-22 15:06:07 +01:00

529 lines
17 KiB
Python

# -*- mode: python-mode; python-indent-offset: 4 -*-
# coding: utf-8
import glob
import os
from functools import partial
from pymses.sources.ramses.info import read_ramses_info_file
import numpy as np
import logging
import sys
import f90nml
class NamelistRecursive:
def __init__(self, namelist):
self.data = namelist
def get_nml_value(self, nml_key):
res = self.data
for key in nml_key.split("/"):
if key in res:
res = res[key]
elif key == nml_key.split("/")[-1]:
res = None
else:
raise KeyError(key)
return res
def __getitem__(self, key):
return self.get_nml_value(key)
def __repr__(self):
return self.data.__repr__()
def __str__(self):
return self.data.__str__()
class RunSelector:
def __init__(
self,
path_in=".",
in_runs=None,
in_nums="all",
nml_filename="run.nml",
filter_name="*",
filter_nml={},
sort_run_by=None,
time_min=None,
time_max=None,
time=None,
unit_time=None,
allow_nodata=False,
):
"""
Select runs and outputs with several filter options.
By default, all runs and outputs within path_in are considered
Args:
1. Define the set of runs and outputs considered
path_in : str, path to the folder of the runs
2. Filter runs and outputs
in_runs : str or list of str. The name runs to consider. Default: all.
in_nums : int or list of int or str.
The output numbers to consider.
"last" select only the last output.
"all" preselect all outputs (default)
nml_filename : str name of the default namelist (otherwise look into the output files)
filter_name : str, filter runs by name. Default "*"
filter_nml : tuple or list of tupple.
Filter runs by namelist.
tuples are in the following form:
(nml_key, operator, nml_value)
with nml_key a key from the namelist (eg. "cloud_params/dens0")
operator within ("=", "!=", "<", ">", "in")
and nml_value a string, float or int
time_min : float, select output where time >= time_min (in code units)
time_max : float, select output where time <= time_min (in code units)
time : float or list of float. For each value, select the output closer to it.
unit_time : astrophysix.Unit, unit for the time above. None is code unit.
allow_nodata : allow runs whith only postprocessed datas
3. Sort the runs
sort_run_by : str, a key from the namelist used to sort the runs (by ascending order)
"""
# Initialize logger
self.logger = logging.getLogger("run_self")
self.logger.propagate = False
logging_format = "%(levelname)s | %(asctime)s | %(name)s.%(funcName)s:%(lineno)d | %(message)s"
formatter = logging.Formatter(logging_format, datefmt="%H:%M:%S")
if not self.logger.hasHandlers():
stream = logging.StreamHandler(sys.stdout)
stream.setFormatter(formatter)
self.logger.addHandler(stream)
self.path_in = path_in
self.nml_filename = nml_filename
self.allow_nodata = allow_nodata
self.namelist = {}
do_tests = not self.allow_nodata
self.runs = self.get_runs(
in_runs, filter_name, filter_nml, sort_run_by, do_tests=do_tests
)
self.info = {}
for run in self.runs:
self.info[run] = {}
self.nums = {}
if not type(in_nums) == dict:
nums_temp = in_nums
in_nums = {}
for run in self.runs:
in_nums[run] = nums_temp
for i, run in enumerate(self.runs):
self.nums[run] = self.get_nums(
run,
in_nums[run],
time_min,
time_max,
time,
unit_time,
)
i = 0
for run in self.runs.copy():
if len(self.nums[run]) == 0:
self.logger.warning(f"No snapshot found for run {run}")
del self.runs[i]
del self.nums[run]
else:
i += 1
if len(self.runs) == 0:
raise ValueError("No runs found")
def select(
self,
runs=None,
nums="all",
filter_nml={},
filter_name="*",
sort_run_by=None,
time_min=None,
time_max=None,
time=None,
unit_time=None,
):
"""
Sub-select runs and outputs from already selected runs and outputs
Args:
runs : str or list of str. The name runs to consider. Default: all.
nums : int or list of int or str.
The output numbers to consider.
"last" select only the last output.
"all" preselect all outputs (default)
filter_name : str.
glob pattern used to filter run names.
default is "*" (all runs)
filter_nml : tuple or list of tupple.
Filter runs by namelist.
tuples are in the following form:
(nml_key, operator, nml_value)
with nml_key a key from the namelist (eg. "cloud_params/dens0")
operator within ("=", "!=", "<", ">", "in")
and nml_value a string, float or int
time_min : float, select output where time >= time_min (in code units)
time_max : float, select output where time <= time_min (in code units)
time : float or list of float. For each value, select the output closer to it.
unit_time : astrophysix.Unit, unit for the time above. None is code unit.
sort_run_by : str, a key from the namelist used to sort the runs (by ascending order)
Returns:
(selected_runs, selected_nums)
"""
if runs is None:
runs = self.runs
selected_runs = self.get_runs(
runs, filter_name, filter_nml, sort_run_by, do_tests=False
)
if len(selected_runs) == 0:
raise ValueError("No runs found")
if not type(nums) == dict:
nums_temp = nums
nums = {}
for run in selected_runs:
nums[run] = nums_temp
selected_nums = {}
for i, run in enumerate(selected_runs):
selected_nums[run] = self.get_nums(
run, nums[run], time_min, time_max, time, unit_time, do_tests=False
)
return selected_runs, selected_nums
def load_namelist(self, run, path=None):
if path is None:
names = glob.glob(
self.path_in + "/" + run + "/output_[0-9][0-9][0-9][0-9][0-9]"
)
i = 0
path = self.path_in + "/" + run + "/" + self.nml_filename
while not os.path.exists(path) and i < len(names):
path = f"{names[i]}/namelist.txt"
i += 1
return NamelistRecursive(f90nml.read(path))
def get_nml_value(self, nml_key, run):
return self.namelist[run][nml_key]
def nml_select(self, runs, filter_nml):
if type(filter_nml) == tuple:
filter_nml = [filter_nml]
for (nml_key, operator, operand) in filter_nml:
value = {}
for run in runs:
value[run] = self.get_nml_value(nml_key, run)
if operator == "=":
runs = list(filter(lambda r: value[r] == operand, runs))
if operator == "!=":
runs = list(filter(lambda r: not value[r] == operand, runs))
elif operator == ">":
runs = list(filter(lambda r: value[r] > operand, runs))
elif operator == "<":
runs = list(filter(lambda r: value[r] < operand, runs))
elif operator == "in":
runs = list(filter(lambda r: value[r] in operand, runs))
return runs
def get_runs(
self,
in_runs=None,
filter_name="*",
filter_nml={},
sort_run_by=None,
do_tests=True,
):
def try_load_nml(run):
try:
self.namelist[run] = self.load_namelist(run)
success = True
except IOError:
success = False
return success
runs = list(
map(
os.path.basename,
list(
filter(os.path.isdir, glob.glob(self.path_in + "/" + filter_name))
),
)
)
if in_runs is not None:
if isinstance(in_runs, str):
in_runs = [in_runs]
runs = list(filter(lambda n: n in runs, in_runs))
if do_tests:
runs = list(filter(try_load_nml, runs))
# Select runs that match namelist conditions
runs = self.nml_select(runs, filter_nml)
# Sort by the value in the namelist of sort_run_by
if sort_run_by is not None:
if type(sort_run_by) == str:
sort_run_by = [sort_run_by]
for nml_key in reversed(sort_run_by):
if nml_key == "name":
runs.sort()
else:
runs.sort(key=partial(self.get_nml_value, nml_key))
return runs
def load_info(self, run, num):
info_filename_output = f"{self.path_in}/{run}/output_{num:05}/info_{num:05}.txt"
# Path of the filename if ratarmount was used
info_filename_tarmount_output = (
f"{self.path_in}/{run}/output_{num:05}/output_{num:05}/info_{num:05}.txt"
)
info_filename_folder = f"{self.path_in}/{run}/info/info_{num:05}.txt"
if os.path.exists(info_filename_output):
info = read_ramses_info_file(info_filename_output)
elif os.path.exists(info_filename_tarmount_output):
info = read_ramses_info_file(info_filename_tarmount_output)
elif self.allow_nodata:
info = read_ramses_info_file(info_filename_folder)
else:
raise IOError
return info
def get_nums(
self,
run,
in_nums=None,
time_min=None,
time_max=None,
time=None,
unit_time=None,
do_tests=True,
):
"""
Select snapshots from the disk
Args:
in_nums : int or list of int or str.
The output numbers to consider.
"last" select only the last output.
"all" preselect all outputs (default)
time_min : float, select output where time >= time_min (in code units)
time_max : float, select output where time <= time_min (in code units)
time : float or list of float. For each value, select the output closer to it.
unit_time : astrophysix.Unit, unit for the time above. None is code unit.
do_tests : test if the snapshots are actually on disk. Not needed when subselecting snapshots.
"""
# -- Initialize info loader --
if do_tests:
def try_load_info(num):
try:
if num not in self.info[run]:
self.info[run][num] = self.load_info(run, num)
success = True
except (IOError, AttributeError):
success = False
return success
else:
def try_load_info(num):
return True
# -- A function to search a given time using dichotomy
def search(nums, time, position="closest"):
while len(nums) > 0 and not try_load_info(nums[0]):
del nums[0]
while len(nums) > 0 and not try_load_info(nums[-1]):
del nums[-1]
if len(nums) == 0:
return None
ileft, iright = 0, len(nums) - 1
if get_time(nums[ileft]) >= time:
if position in ["closest", "right"]:
return ileft
else:
return None
if get_time(nums[iright]) < time:
if position in ["closest", "left"]:
return iright
else:
return None
while iright - ileft > 1:
imid = (ileft + iright) // 2
while not try_load_info(nums[imid]):
del nums[imid]
iright -= 1
imid = (ileft + iright) // 2
if get_time(nums[imid]) < time:
ileft = imid
else:
iright = imid
if position == "left":
return ileft
elif position == "right":
return iright
else:
dleft = np.abs(get_time(nums[ileft]) - time)
dright = np.abs(get_time(nums[iright]) - time)
if dleft <= dright:
return ileft
else:
return iright
# -- Get the list of seemingly available snapshots on the disk or already selected --
if do_tests:
names = glob.glob(
self.path_in + "/" + run + "/output_[0-9][0-9][0-9][0-9][0-9]"
)
nums = list(map(lambda n: int(n.split("/")[-1].split("_")[1]), names))
else:
nums = self.nums[run]
# -- Filter with the provided in_nums array
if isinstance(in_nums, int):
in_nums = [in_nums]
if isinstance(in_nums, list):
nums = list(filter(lambda n: n in nums, in_nums))
nums.sort()
if len(nums) == 0:
return []
# -- Select either the first or last output from the list, or all the valid ones --
if in_nums == "first":
i = 0
while i < len(nums) and not try_load_info(nums[i]):
i = i + 1
if i < len(nums):
nums = [nums[i]]
else:
return []
elif in_nums == "last":
i = len(nums) - 1
while i >= 0 and not try_load_info(nums[i]):
i = i - 1
if i >= 0:
nums = [nums[i]]
else:
return []
# -- Time getter according to unit_time
if unit_time is None:
def get_time(num):
return self.info[run][num]["time"]
elif isinstance(unit_time, str):
factor = self.get_nml_value(unit_time, run)
def get_time(num):
time_code = self.info[run][num]["time"]
return time_code / factor
else:
def get_time(num):
time_code = self.info[run][num]["time"]
return time_code * self.info[run][num]["unit_time"].express(unit_time)
# -- Select according to time --
if time_min is not None and len(nums) > 0:
imin = search(nums, time_min, "right")
if imin is not None:
nums = nums[imin:]
else:
return []
if time_max is not None and len(nums) > 0:
imax = search(nums, time_max, "left")
if imax is not None:
nums = nums[: imax + 1]
else:
return []
if time is not None and len(nums) > 0:
filtered_nums = []
if not isinstance(time, list):
time = [time]
# For all times provided by the user, select the output closer to it
for t in time:
iclose = search(nums, t)
if iclose is not None:
num = nums[iclose]
# Only add each selected output once
if num not in filtered_nums:
filtered_nums.append(num)
else:
break
nums = filtered_nums
else:
nums = list(filter(try_load_info, nums))
return nums
def write_paths(self, prefix=None, filename="~/list_file"):
"""
Write the paths of the selected runs on a file
Args:
prefix (str, optional): Prefix for the pathscd si. Defaults to path_in.
filename (str, optional): F. Defaults to "~/list_file".
"""
if prefix is None:
prefix = self.path_in
paths = []
for run in self.nums:
for num in self.nums[run]:
if os.path.exists("{prefix}/{run}/output_{num:05}/output_{num:05}\n"):
paths.append(f"{prefix}/{run}/output_{num:05}/output_{num:05}\n")
else:
paths.append(f"{prefix}/{run}/output_{num:05}\n")
f = open(os.path.expanduser(filename), "w")
f.writelines(paths)
f.close()