From b76d4fc68834debba0ecbf6c2775eecb9ed8c127 Mon Sep 17 00:00:00 2001 From: Noe Brucy Date: Thu, 28 Nov 2024 11:50:40 +0100 Subject: [PATCH] Update galsec: refector, timeseries, etc. --- galsec.py | 384 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 284 insertions(+), 100 deletions(-) diff --git a/galsec.py b/galsec.py index 66d537c..49acc71 100644 --- a/galsec.py +++ b/galsec.py @@ -4,12 +4,13 @@ NoƩ Brucy 2023 """ import numpy as np -from astropy.table import QTable, hstack +from astropy.table import QTable, hstack, vstack from astropy import units as u from astropy.units.quantity import Quantity from collections import defaultdict from numba import jit, prange +from abc import ABC _atomic_mass = { "H+": 1, @@ -296,15 +297,153 @@ def regroup( return result -class Galsec: +class GalsecAnalysis(ABC): + + def __init__(self): + raise NotImplementedError("This method should be implemented in a subclass") + + def analysis(self, bin_deltas: dict, filter_bounds: dict): + raise NotImplementedError("This method should be implemented in a subclass") + + def sector_analysis( + self, + delta_r: Quantity[u.kpc] = u.kpc, + delta_l: Quantity[u.kpc] = u.kpc, + rmin: Quantity[u.kpc] = 1 * u.kpc, + rmax: Quantity[u.kpc] = 12 * u.kpc, + zmin: Quantity[u.kpc] = -0.5 * u.kpc, + zmax: Quantity[u.kpc] = 0.5 * u.kpc, + **kwargs, + ): + """Compute the aggregation of quantities in sectors bins + + Parameters + ---------- + delta_r : Quantity[u.kpc], optional + spacing between two radial bins, by default u.kpc + delta_l : Quantity[u.kpc], optional + spacing (in spatial units) between two azimuthal bins, by default u.kpc + rmin : Quantity[u.kpc], optional + filter out bin below that radius, by default 1*u.kpc + rmax : Quantity[u.kpc], optional + filter out bin beyond that radius, by default 12*u.kpc + """ + + bin_deltas = {"r": delta_r, "phi": delta_l} + filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]} + self.sectors = self.analysis(bin_deltas, filter_bounds, **kwargs) + return self.sectors + + def cartesian_analysis( + self, + delta_x: Quantity[u.kpc] = u.kpc, + delta_y: Quantity[u.kpc] = u.kpc, + delta_z: Quantity[u.kpc] = u.kpc, + xmin: Quantity[u.kpc] = -30 * u.kpc, + xmax: Quantity[u.kpc] = 30 * u.kpc, + ymin: Quantity[u.kpc] = -30 * u.kpc, + ymax: Quantity[u.kpc] = 30 * u.kpc, + zmin: Quantity[u.kpc] = -0.5 * u.kpc, + zmax: Quantity[u.kpc] = 0.5 * u.kpc, + **kwargs, + ): + """Compute the aggregation of quantities in cartesian bins + + Parameters + ---------- + delta_x : Quantity[u.kpc] + spacing between two x bins + delta_y : Quantity[u.kpc] + spacing between two y bins + """ + + bin_deltas = {"x": delta_x, "y": delta_y, "z": delta_z} + filter_bounds = {"x": [xmin, xmax], "y": [ymin, ymax], "z": [zmin, zmax]} + self.grid = self.analysis(bin_deltas, filter_bounds) + return self.grid + + def ring_analysis( + self, + delta_r: Quantity[u.kpc] = u.kpc, + rmin: Quantity[u.kpc] = 1 * u.kpc, + rmax: Quantity[u.kpc] = 30 * u.kpc, + zmin: Quantity[u.kpc] = -0.5 * u.kpc, + zmax: Quantity[u.kpc] = 0.5 * u.kpc, + **kwargs, + ): + """Compute the aggration of quantities in radial bins + + Parameters + ---------- + delta_r : Quantity[u.kpc], optional + spacing between two radial bins, by default u.kpc + rmin : Quantity[u.kpc], optional + filter out bin below that radius, by default 1*u.kpc + rmax : Quantity[u.kpc], optional + filter out bin beyond that radius, by default 30*u.kpc + """ + + bin_deltas = {"r": delta_r} + filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]} + self.rings = self.analysis(bin_deltas, filter_bounds, **kwargs) + return self.rings + + def vertical_ring_analysis( + self, + delta_r: Quantity[u.kpc] = u.kpc, + delta_z: Quantity[u.kpc] = u.kpc, + rmin: Quantity[u.kpc] = 1 * u.kpc, + rmax: Quantity[u.kpc] = 30 * u.kpc, + zmin: Quantity[u.kpc] = -0.5 * u.kpc, + zmax: Quantity[u.kpc] = 0.5 * u.kpc, + **kwargs, + ): + + bin_deltas = {"r": delta_r, "z": delta_z} + filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]} + return self.analysis(bin_deltas, filter_bounds, **kwargs) + + def scale_analysis( + self, + sub_analysis_method: str = "cartesian_analysis", + scales: dict = { + "delta_x": np.array([0.01, 0.1, 1, 10, 20]) * u.kpc, + "delta_y": np.array([0.01, 0.1, 1, 10, 20]) * u.kpc, + "delta_z": np.array([0.01, 0.1, 1, 10, 20]) * u.kpc, + }, + **kwargs): + + nb_scales = len(list(scales.values())[0]) + + methods = { + "cartesian_analysis" : self.cartesian_analysis, + "sector_analysis" : self.sector_analysis, + "ring_analysis" : self.ring_analysis, + "vertical_ring_analysis" : self.vertical_ring_analysis, + } + + for i in range(nb_scales): + + scale_i = {key: value[i] for key, value in scales.items()} + + print(f"Doing {sub_analysis_method} for scale {i}/{nb_scales}: {scale_i}") + grid = methods[sub_analysis_method](**scale_i, **kwargs) + + if i==0: + all_grids = grid + else: + all_grids = {fluid: vstack([all_grids[fluid], grid[fluid]]) for fluid in self.fluids} + return all_grids + +class Galsec(GalsecAnalysis): """ Galactic sector extractor """ fluids = ["gas", "stars", "dm"] extensive_fields = defaultdict( - lambda: ["mass"], - gas=["mass", "volume"], + lambda: ["mass", "number"], + gas=["mass", "volume", "number"], ) weighted_fields = defaultdict(lambda: ["velr", "velphi", "velx", "vely", "velz"]) @@ -322,8 +461,8 @@ class Galsec: species = {} abundance_He = 0.1 - def __init__(self, data: dict, copy=True) -> None: - """Initiazise a Galaxasec object + def __init__(self, data: dict, copy=True, dense_output=True) -> None: + """Initiazise a Galsec object Parameters ---------- @@ -348,7 +487,7 @@ class Galsec: velocity float array (Ngas, 3) [km/s] mass float array (Ngas) [Msun] copy : bool, default=True - Wheter the data should be copied. + Whether the data should be copied. """ self.data = {} @@ -372,6 +511,8 @@ class Galsec: self.abundance_He = data["header"]["ABHE"] self.compute_derived_values() + + self.dense_output = dense_output def compute_derived_values(self) -> None: """ @@ -381,7 +522,7 @@ class Galsec: dset = self.data[fluid] dset["r"] = np.sqrt( np.sum(dset["position"][:, :2] ** 2, axis=1) - ) # galactic radius%run + ) # galactic radius dset["phi"] = np.angle( dset["position"][:, 0] + dset["position"][:, 1] * 1j ) # galactic longitude @@ -392,6 +533,9 @@ class Galsec: dset["position"], dset["velocity"] ) # azimuthal velocity dset["velr"] = vect_r(dset["position"], dset["velocity"]) # radial velocity + + # Number of cells / resolution element per bin (1 when not binned!) + dset["number"] = np.ones(len(dset["r"])) # Making aliases. No copy is done here. dset["x"] = dset["position"][:, 0] @@ -401,13 +545,26 @@ class Galsec: dset["vely"] = dset["velocity"][:, 1] dset["velz"] = dset["velocity"][:, 2] - def binning(self, bin_deltas): + def binning(self, bins, filter_bounds, binning_mode = "delta", dataset=None): + """ + Bin the data + """ + if dataset is None: + dataset = self.data boxsize = self.box_size keys_binning = [] for fluid in self.fluids: - data = self.data[fluid] # no copy - for name in bin_deltas: - delta = bin_deltas[name] + data = dataset[fluid] # no copy + for name in bins: + + delta = None + if binning_mode == "delta": + delta = bins[name] + elif binning_mode == "number": + bin_min = filter_bounds[name][0].value + bin_max = filter_bounds[name][1].value + delta = (bin_max - bin_min) / bins[name] + if delta is not None: if name in ["x", "y", "z"]: # stay positive @@ -422,19 +579,83 @@ class Galsec: elif name == "phi": delta_phi = (delta / data["r_bin"]) * u.rad phi_bin = (np.trunc(data["phi"] / delta_phi) + 0.5) * delta_phi + phi_bin[phi_bin >= 2 * np.pi * u.rad] = 0.5 * delta_phi[phi_bin >= 2 * np.pi * u.rad] data["phi_bin"] = phi_bin else: print(f"Unsupported binning variable {name}") break if name not in keys_binning: keys_binning.append(name) - return keys_binning - def analysis(self, bin_deltas: dict, filter_bounds: dict): + elif binning_mode == "array": + bin_array = bins[name] + bin = np.digitize(data[name], bin_array, right=False) + bin_array = np.append(bin_array, np.max(data[name])) + + + # Store the middle value + data[f"{name}_bin"] = (bin_array[bin - 1] + bin_array[bin]) / 2 + + if name not in keys_binning: + keys_binning.append(name) + return keys_binning + + + def densify(self, processed_data:dict, bin_deltas:dict, filter_bounds:dict): + """ + Densify the data by adding void bins where no data is present + """ + + bins_array = {} + for bin_name in bin_deltas: + if bin_name == "phi": + if "r" in filter_bounds: + rmax = filter_bounds["r"][1].value + else: + rmax = np.max([np.max(processed_data[fluid]["r"].value) for fluid in processed_data]) + delta_phi = bin_deltas["phi"].value / rmax + bins_array[bin_name] = np.arange(0, 2 * np.pi, delta_phi*0.9) + else: + if bin_name in filter_bounds: + bin_min = filter_bounds[bin_name][0].value + bin_deltas[bin_name].value / 2 + bin_max = filter_bounds[bin_name][1].value + else: + bin_min = np.min([np.min(processed_data[fluid][bin_name].value) for fluid in processed_data]) + bin_max = np.max([np.max(processed_data[fluid][bin_name].value) for fluid in processed_data]) + bins_array[bin_name] = np.arange(bin_min, bin_max, bin_deltas[bin_name].value*0.9) + + grids = np.meshgrid(*list(bins_array.values())) + bins_array = {key: grid.flatten() for key, grid in zip(bins_array.keys(), grids)} + + for fluid in processed_data: + keys = processed_data[fluid].keys() + units = {key: processed_data[fluid][key].unit for key in keys} + void_array = {key: np.zeros_like(list(bins_array.values())[0]) for key in keys} + + for bin_name in bin_deltas: + void_array[bin_name] = bins_array[bin_name] + + void_array = QTable(void_array, names=keys, units=units) + dense_array = vstack([processed_data[fluid], void_array]) + processed_data[fluid] = dense_array + + self.binning(dataset=processed_data, bins=bin_deltas, filter_bounds=filter_bounds) + + for fluid in processed_data: + bin_names_with_bin = [bin_name + "_bin" for bin_name in bin_deltas] + processed_data[fluid] = processed_data[fluid].group_by(bin_names_with_bin).groups.aggregate(np.add) + for bin_name in bin_deltas: + processed_data[fluid].remove_column(bin_name) + processed_data[fluid].rename_column(bin_name + "_bin", bin_name) + + return processed_data + + + def analysis(self, bins: dict, filter_bounds: dict, binning_mode="delta"): result = {} - keys = self.binning(bin_deltas) + keys = self.binning(bins, filter_bounds, binning_mode=binning_mode) keys_bin = [key + "_bin" for key in keys] for fluid in self.fluids: @@ -464,95 +685,58 @@ class Galsec: self.time.value, ) result["stars"]["sfr"] = sfr * u.Msun / u.year + + if self.dense_output and (binning_mode == "delta" or binning_mode == "number"): + result = self.densify(result, bins, filter_bounds) return result - def sector_analysis( - self, - delta_r: Quantity[u.kpc] = u.kpc, - delta_l: Quantity[u.kpc] = u.kpc, - rmin: Quantity[u.kpc] = 1 * u.kpc, - rmax: Quantity[u.kpc] = 12 * u.kpc, - zmin: Quantity[u.kpc] = -0.5 * u.kpc, - zmax: Quantity[u.kpc] = 0.5 * u.kpc, - ): - """Compute the aggregation of quantities in sectors bins + - Parameters - ---------- - delta_r : Quantity[u.kpc], optional - spacing between two radial bins, by default u.kpc - delta_l : Quantity[u.kpc], optional - spacing (in spatial units) between two azimuthal bins, by default u.kpc - rmin : Quantity[u.kpc], optional - filter out bin below that radius, by default 1*u.kpc - rmax : Quantity[u.kpc], optional - filter out bin beyond that radius, by default 12*u.kpc - """ +class GalsecTimeSeries(GalsecAnalysis): - bin_deltas = {"r": delta_r, "phi": delta_l} - filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]} - self.sectors = self.analysis(bin_deltas, filter_bounds) - return self.sectors + def __init__(self, galsecs: list, loader=None): + self.galsecs = galsecs + self.loader = loader + self.loaded = loader is None - def cartesian_analysis( - self, - delta_x: Quantity[u.kpc] = u.kpc, - delta_y: Quantity[u.kpc] = u.kpc, - delta_z: Quantity[u.kpc] = u.kpc, - zmin: Quantity[u.kpc] = -0.5 * u.kpc, - zmax: Quantity[u.kpc] = 0.5 * u.kpc, - ): - """Compute the aggregation of quantities in cartesian bins + + def _analysis_single(self, galsec, bins, filter_bounds, binning_mode="delta"): + if not self.loaded: + galsec = self.loader(galsec) + analysis_result = galsec.analysis(bins, filter_bounds, binning_mode=binning_mode) + for fluid in analysis_result: + analysis_result[fluid]["time"] = galsec.time + return analysis_result - Parameters - ---------- - delta_x : Quantity[u.kpc] - spacing between two x bins - delta_y : Quantity[u.kpc] - spacing between two y bins - """ - - bin_deltas = {"x": delta_x, "y": delta_y, "z": delta_z} - filter_bounds = {"z": [zmin, zmax]} - self.grid = self.analysis(bin_deltas, filter_bounds) - return self.grid - - def ring_analysis( - self, - delta_r: Quantity[u.kpc] = u.kpc, - rmin: Quantity[u.kpc] = 1 * u.kpc, - rmax: Quantity[u.kpc] = 30 * u.kpc, - zmin: Quantity[u.kpc] = -0.5 * u.kpc, - zmax: Quantity[u.kpc] = 0.5 * u.kpc, - ): - """Compute the aggration of quantities in radial bins - - Parameters - ---------- - delta_r : Quantity[u.kpc], optional - spacing between two radial bins, by default u.kpc - rmin : Quantity[u.kpc], optional - filter out bin below that radius, by default 1*u.kpc - rmax : Quantity[u.kpc], optional - filter out bin beyond that radius, by default 30*u.kpc - """ - - bin_deltas = {"r": delta_r} - filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]} - self.rings = self.analysis(bin_deltas, filter_bounds) - return self.rings - - def vertical_ring_analysis( - self, - delta_r: Quantity[u.kpc] = u.kpc, - delta_z: Quantity[u.kpc] = u.kpc, - rmin: Quantity[u.kpc] = 1 * u.kpc, - rmax: Quantity[u.kpc] = 30 * u.kpc, - zmin: Quantity[u.kpc] = -0.5 * u.kpc, - zmax: Quantity[u.kpc] = 0.5 * u.kpc, - ): - - bin_deltas = {"r": delta_r, "z": delta_z} - filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]} - return self.analysis(bin_deltas, filter_bounds) + + def analysis(self, bins: dict, filter_bounds: dict, + binning_mode="delta", + aggregate=True, std=True, percentiles=[], + num_processes=1): + timed_data = [] + if num_processes == 1: + for galsec in self.galsecs: + timed_data.append(self._analysis_single(galsec, bins, filter_bounds, binning_mode)) + else: + from multiprocessing import Pool + with Pool(num_processes) as p: + timed_data = p.starmap( + self._analysis_single, + [(galsec, bins, filter_bounds, binning_mode) for galsec in self.galsecs], + ) + + averaged_data = {} + for fluid in timed_data[0]: + averaged_data[fluid] = vstack([result[fluid] for result in timed_data]) + if aggregate: + grouped_data = averaged_data[fluid].group_by(list(bins.keys())).groups + if std: + averaged_data[fluid + "_std"] = grouped_data.aggregate(lambda x: np.std(x.value)) + for q in percentiles: + averaged_data[fluid + f"_q{q}"] = grouped_data.aggregate(lambda x: np.percentile(x.value, q)) + averaged_data[fluid] = grouped_data.aggregate(np.mean) + + + return averaged_data +