Update galsec: refector, timeseries, etc.

This commit is contained in:
Noe Brucy
2024-11-28 11:50:40 +01:00
parent 62b0ced60d
commit b76d4fc688

370
galsec.py
View File

@@ -4,12 +4,13 @@
Noé Brucy 2023 Noé Brucy 2023
""" """
import numpy as np import numpy as np
from astropy.table import QTable, hstack from astropy.table import QTable, hstack, vstack
from astropy import units as u from astropy import units as u
from astropy.units.quantity import Quantity from astropy.units.quantity import Quantity
from collections import defaultdict from collections import defaultdict
from numba import jit, prange from numba import jit, prange
from abc import ABC
_atomic_mass = { _atomic_mass = {
"H+": 1, "H+": 1,
@@ -296,15 +297,153 @@ def regroup(
return result return result
class Galsec: class GalsecAnalysis(ABC):
def __init__(self):
raise NotImplementedError("This method should be implemented in a subclass")
def analysis(self, bin_deltas: dict, filter_bounds: dict):
raise NotImplementedError("This method should be implemented in a subclass")
def sector_analysis(
self,
delta_r: Quantity[u.kpc] = u.kpc,
delta_l: Quantity[u.kpc] = u.kpc,
rmin: Quantity[u.kpc] = 1 * u.kpc,
rmax: Quantity[u.kpc] = 12 * u.kpc,
zmin: Quantity[u.kpc] = -0.5 * u.kpc,
zmax: Quantity[u.kpc] = 0.5 * u.kpc,
**kwargs,
):
"""Compute the aggregation of quantities in sectors bins
Parameters
----------
delta_r : Quantity[u.kpc], optional
spacing between two radial bins, by default u.kpc
delta_l : Quantity[u.kpc], optional
spacing (in spatial units) between two azimuthal bins, by default u.kpc
rmin : Quantity[u.kpc], optional
filter out bin below that radius, by default 1*u.kpc
rmax : Quantity[u.kpc], optional
filter out bin beyond that radius, by default 12*u.kpc
"""
bin_deltas = {"r": delta_r, "phi": delta_l}
filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
self.sectors = self.analysis(bin_deltas, filter_bounds, **kwargs)
return self.sectors
def cartesian_analysis(
self,
delta_x: Quantity[u.kpc] = u.kpc,
delta_y: Quantity[u.kpc] = u.kpc,
delta_z: Quantity[u.kpc] = u.kpc,
xmin: Quantity[u.kpc] = -30 * u.kpc,
xmax: Quantity[u.kpc] = 30 * u.kpc,
ymin: Quantity[u.kpc] = -30 * u.kpc,
ymax: Quantity[u.kpc] = 30 * u.kpc,
zmin: Quantity[u.kpc] = -0.5 * u.kpc,
zmax: Quantity[u.kpc] = 0.5 * u.kpc,
**kwargs,
):
"""Compute the aggregation of quantities in cartesian bins
Parameters
----------
delta_x : Quantity[u.kpc]
spacing between two x bins
delta_y : Quantity[u.kpc]
spacing between two y bins
"""
bin_deltas = {"x": delta_x, "y": delta_y, "z": delta_z}
filter_bounds = {"x": [xmin, xmax], "y": [ymin, ymax], "z": [zmin, zmax]}
self.grid = self.analysis(bin_deltas, filter_bounds)
return self.grid
def ring_analysis(
self,
delta_r: Quantity[u.kpc] = u.kpc,
rmin: Quantity[u.kpc] = 1 * u.kpc,
rmax: Quantity[u.kpc] = 30 * u.kpc,
zmin: Quantity[u.kpc] = -0.5 * u.kpc,
zmax: Quantity[u.kpc] = 0.5 * u.kpc,
**kwargs,
):
"""Compute the aggration of quantities in radial bins
Parameters
----------
delta_r : Quantity[u.kpc], optional
spacing between two radial bins, by default u.kpc
rmin : Quantity[u.kpc], optional
filter out bin below that radius, by default 1*u.kpc
rmax : Quantity[u.kpc], optional
filter out bin beyond that radius, by default 30*u.kpc
"""
bin_deltas = {"r": delta_r}
filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
self.rings = self.analysis(bin_deltas, filter_bounds, **kwargs)
return self.rings
def vertical_ring_analysis(
self,
delta_r: Quantity[u.kpc] = u.kpc,
delta_z: Quantity[u.kpc] = u.kpc,
rmin: Quantity[u.kpc] = 1 * u.kpc,
rmax: Quantity[u.kpc] = 30 * u.kpc,
zmin: Quantity[u.kpc] = -0.5 * u.kpc,
zmax: Quantity[u.kpc] = 0.5 * u.kpc,
**kwargs,
):
bin_deltas = {"r": delta_r, "z": delta_z}
filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
return self.analysis(bin_deltas, filter_bounds, **kwargs)
def scale_analysis(
self,
sub_analysis_method: str = "cartesian_analysis",
scales: dict = {
"delta_x": np.array([0.01, 0.1, 1, 10, 20]) * u.kpc,
"delta_y": np.array([0.01, 0.1, 1, 10, 20]) * u.kpc,
"delta_z": np.array([0.01, 0.1, 1, 10, 20]) * u.kpc,
},
**kwargs):
nb_scales = len(list(scales.values())[0])
methods = {
"cartesian_analysis" : self.cartesian_analysis,
"sector_analysis" : self.sector_analysis,
"ring_analysis" : self.ring_analysis,
"vertical_ring_analysis" : self.vertical_ring_analysis,
}
for i in range(nb_scales):
scale_i = {key: value[i] for key, value in scales.items()}
print(f"Doing {sub_analysis_method} for scale {i}/{nb_scales}: {scale_i}")
grid = methods[sub_analysis_method](**scale_i, **kwargs)
if i==0:
all_grids = grid
else:
all_grids = {fluid: vstack([all_grids[fluid], grid[fluid]]) for fluid in self.fluids}
return all_grids
class Galsec(GalsecAnalysis):
""" """
Galactic sector extractor Galactic sector extractor
""" """
fluids = ["gas", "stars", "dm"] fluids = ["gas", "stars", "dm"]
extensive_fields = defaultdict( extensive_fields = defaultdict(
lambda: ["mass"], lambda: ["mass", "number"],
gas=["mass", "volume"], gas=["mass", "volume", "number"],
) )
weighted_fields = defaultdict(lambda: ["velr", "velphi", "velx", "vely", "velz"]) weighted_fields = defaultdict(lambda: ["velr", "velphi", "velx", "vely", "velz"])
@@ -322,8 +461,8 @@ class Galsec:
species = {} species = {}
abundance_He = 0.1 abundance_He = 0.1
def __init__(self, data: dict, copy=True) -> None: def __init__(self, data: dict, copy=True, dense_output=True) -> None:
"""Initiazise a Galaxasec object """Initiazise a Galsec object
Parameters Parameters
---------- ----------
@@ -348,7 +487,7 @@ class Galsec:
velocity float array (Ngas, 3) [km/s] velocity float array (Ngas, 3) [km/s]
mass float array (Ngas) [Msun] mass float array (Ngas) [Msun]
copy : bool, default=True copy : bool, default=True
Wheter the data should be copied. Whether the data should be copied.
""" """
self.data = {} self.data = {}
@@ -373,6 +512,8 @@ class Galsec:
self.compute_derived_values() self.compute_derived_values()
self.dense_output = dense_output
def compute_derived_values(self) -> None: def compute_derived_values(self) -> None:
""" """
Helper function to computed values derivated from input data Helper function to computed values derivated from input data
@@ -381,7 +522,7 @@ class Galsec:
dset = self.data[fluid] dset = self.data[fluid]
dset["r"] = np.sqrt( dset["r"] = np.sqrt(
np.sum(dset["position"][:, :2] ** 2, axis=1) np.sum(dset["position"][:, :2] ** 2, axis=1)
) # galactic radius%run ) # galactic radius
dset["phi"] = np.angle( dset["phi"] = np.angle(
dset["position"][:, 0] + dset["position"][:, 1] * 1j dset["position"][:, 0] + dset["position"][:, 1] * 1j
) # galactic longitude ) # galactic longitude
@@ -393,6 +534,9 @@ class Galsec:
) # azimuthal velocity ) # azimuthal velocity
dset["velr"] = vect_r(dset["position"], dset["velocity"]) # radial velocity dset["velr"] = vect_r(dset["position"], dset["velocity"]) # radial velocity
# Number of cells / resolution element per bin (1 when not binned!)
dset["number"] = np.ones(len(dset["r"]))
# Making aliases. No copy is done here. # Making aliases. No copy is done here.
dset["x"] = dset["position"][:, 0] dset["x"] = dset["position"][:, 0]
dset["y"] = dset["position"][:, 1] dset["y"] = dset["position"][:, 1]
@@ -401,13 +545,26 @@ class Galsec:
dset["vely"] = dset["velocity"][:, 1] dset["vely"] = dset["velocity"][:, 1]
dset["velz"] = dset["velocity"][:, 2] dset["velz"] = dset["velocity"][:, 2]
def binning(self, bin_deltas): def binning(self, bins, filter_bounds, binning_mode = "delta", dataset=None):
"""
Bin the data
"""
if dataset is None:
dataset = self.data
boxsize = self.box_size boxsize = self.box_size
keys_binning = [] keys_binning = []
for fluid in self.fluids: for fluid in self.fluids:
data = self.data[fluid] # no copy data = dataset[fluid] # no copy
for name in bin_deltas: for name in bins:
delta = bin_deltas[name]
delta = None
if binning_mode == "delta":
delta = bins[name]
elif binning_mode == "number":
bin_min = filter_bounds[name][0].value
bin_max = filter_bounds[name][1].value
delta = (bin_max - bin_min) / bins[name]
if delta is not None: if delta is not None:
if name in ["x", "y", "z"]: if name in ["x", "y", "z"]:
# stay positive # stay positive
@@ -422,19 +579,83 @@ class Galsec:
elif name == "phi": elif name == "phi":
delta_phi = (delta / data["r_bin"]) * u.rad delta_phi = (delta / data["r_bin"]) * u.rad
phi_bin = (np.trunc(data["phi"] / delta_phi) + 0.5) * delta_phi phi_bin = (np.trunc(data["phi"] / delta_phi) + 0.5) * delta_phi
phi_bin[phi_bin >= 2 * np.pi * u.rad] = 0.5 * delta_phi[phi_bin >= 2 * np.pi * u.rad]
data["phi_bin"] = phi_bin data["phi_bin"] = phi_bin
else: else:
print(f"Unsupported binning variable {name}") print(f"Unsupported binning variable {name}")
break break
if name not in keys_binning: if name not in keys_binning:
keys_binning.append(name) keys_binning.append(name)
elif binning_mode == "array":
bin_array = bins[name]
bin = np.digitize(data[name], bin_array, right=False)
bin_array = np.append(bin_array, np.max(data[name]))
# Store the middle value
data[f"{name}_bin"] = (bin_array[bin - 1] + bin_array[bin]) / 2
if name not in keys_binning:
keys_binning.append(name)
return keys_binning return keys_binning
def analysis(self, bin_deltas: dict, filter_bounds: dict):
def densify(self, processed_data:dict, bin_deltas:dict, filter_bounds:dict):
"""
Densify the data by adding void bins where no data is present
"""
bins_array = {}
for bin_name in bin_deltas:
if bin_name == "phi":
if "r" in filter_bounds:
rmax = filter_bounds["r"][1].value
else:
rmax = np.max([np.max(processed_data[fluid]["r"].value) for fluid in processed_data])
delta_phi = bin_deltas["phi"].value / rmax
bins_array[bin_name] = np.arange(0, 2 * np.pi, delta_phi*0.9)
else:
if bin_name in filter_bounds:
bin_min = filter_bounds[bin_name][0].value + bin_deltas[bin_name].value / 2
bin_max = filter_bounds[bin_name][1].value
else:
bin_min = np.min([np.min(processed_data[fluid][bin_name].value) for fluid in processed_data])
bin_max = np.max([np.max(processed_data[fluid][bin_name].value) for fluid in processed_data])
bins_array[bin_name] = np.arange(bin_min, bin_max, bin_deltas[bin_name].value*0.9)
grids = np.meshgrid(*list(bins_array.values()))
bins_array = {key: grid.flatten() for key, grid in zip(bins_array.keys(), grids)}
for fluid in processed_data:
keys = processed_data[fluid].keys()
units = {key: processed_data[fluid][key].unit for key in keys}
void_array = {key: np.zeros_like(list(bins_array.values())[0]) for key in keys}
for bin_name in bin_deltas:
void_array[bin_name] = bins_array[bin_name]
void_array = QTable(void_array, names=keys, units=units)
dense_array = vstack([processed_data[fluid], void_array])
processed_data[fluid] = dense_array
self.binning(dataset=processed_data, bins=bin_deltas, filter_bounds=filter_bounds)
for fluid in processed_data:
bin_names_with_bin = [bin_name + "_bin" for bin_name in bin_deltas]
processed_data[fluid] = processed_data[fluid].group_by(bin_names_with_bin).groups.aggregate(np.add)
for bin_name in bin_deltas:
processed_data[fluid].remove_column(bin_name)
processed_data[fluid].rename_column(bin_name + "_bin", bin_name)
return processed_data
def analysis(self, bins: dict, filter_bounds: dict, binning_mode="delta"):
result = {} result = {}
keys = self.binning(bin_deltas) keys = self.binning(bins, filter_bounds, binning_mode=binning_mode)
keys_bin = [key + "_bin" for key in keys] keys_bin = [key + "_bin" for key in keys]
for fluid in self.fluids: for fluid in self.fluids:
@@ -465,94 +686,57 @@ class Galsec:
) )
result["stars"]["sfr"] = sfr * u.Msun / u.year result["stars"]["sfr"] = sfr * u.Msun / u.year
if self.dense_output and (binning_mode == "delta" or binning_mode == "number"):
result = self.densify(result, bins, filter_bounds)
return result return result
def sector_analysis(
self,
delta_r: Quantity[u.kpc] = u.kpc,
delta_l: Quantity[u.kpc] = u.kpc,
rmin: Quantity[u.kpc] = 1 * u.kpc,
rmax: Quantity[u.kpc] = 12 * u.kpc,
zmin: Quantity[u.kpc] = -0.5 * u.kpc,
zmax: Quantity[u.kpc] = 0.5 * u.kpc,
):
"""Compute the aggregation of quantities in sectors bins
Parameters
----------
delta_r : Quantity[u.kpc], optional
spacing between two radial bins, by default u.kpc
delta_l : Quantity[u.kpc], optional
spacing (in spatial units) between two azimuthal bins, by default u.kpc
rmin : Quantity[u.kpc], optional
filter out bin below that radius, by default 1*u.kpc
rmax : Quantity[u.kpc], optional
filter out bin beyond that radius, by default 12*u.kpc
"""
bin_deltas = {"r": delta_r, "phi": delta_l} class GalsecTimeSeries(GalsecAnalysis):
filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
self.sectors = self.analysis(bin_deltas, filter_bounds)
return self.sectors
def cartesian_analysis( def __init__(self, galsecs: list, loader=None):
self, self.galsecs = galsecs
delta_x: Quantity[u.kpc] = u.kpc, self.loader = loader
delta_y: Quantity[u.kpc] = u.kpc, self.loaded = loader is None
delta_z: Quantity[u.kpc] = u.kpc,
zmin: Quantity[u.kpc] = -0.5 * u.kpc,
zmax: Quantity[u.kpc] = 0.5 * u.kpc,
):
"""Compute the aggregation of quantities in cartesian bins
Parameters
----------
delta_x : Quantity[u.kpc]
spacing between two x bins
delta_y : Quantity[u.kpc]
spacing between two y bins
"""
bin_deltas = {"x": delta_x, "y": delta_y, "z": delta_z} def _analysis_single(self, galsec, bins, filter_bounds, binning_mode="delta"):
filter_bounds = {"z": [zmin, zmax]} if not self.loaded:
self.grid = self.analysis(bin_deltas, filter_bounds) galsec = self.loader(galsec)
return self.grid analysis_result = galsec.analysis(bins, filter_bounds, binning_mode=binning_mode)
for fluid in analysis_result:
analysis_result[fluid]["time"] = galsec.time
return analysis_result
def ring_analysis(
self,
delta_r: Quantity[u.kpc] = u.kpc,
rmin: Quantity[u.kpc] = 1 * u.kpc,
rmax: Quantity[u.kpc] = 30 * u.kpc,
zmin: Quantity[u.kpc] = -0.5 * u.kpc,
zmax: Quantity[u.kpc] = 0.5 * u.kpc,
):
"""Compute the aggration of quantities in radial bins
Parameters def analysis(self, bins: dict, filter_bounds: dict,
---------- binning_mode="delta",
delta_r : Quantity[u.kpc], optional aggregate=True, std=True, percentiles=[],
spacing between two radial bins, by default u.kpc num_processes=1):
rmin : Quantity[u.kpc], optional timed_data = []
filter out bin below that radius, by default 1*u.kpc if num_processes == 1:
rmax : Quantity[u.kpc], optional for galsec in self.galsecs:
filter out bin beyond that radius, by default 30*u.kpc timed_data.append(self._analysis_single(galsec, bins, filter_bounds, binning_mode))
""" else:
from multiprocessing import Pool
with Pool(num_processes) as p:
timed_data = p.starmap(
self._analysis_single,
[(galsec, bins, filter_bounds, binning_mode) for galsec in self.galsecs],
)
bin_deltas = {"r": delta_r} averaged_data = {}
filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]} for fluid in timed_data[0]:
self.rings = self.analysis(bin_deltas, filter_bounds) averaged_data[fluid] = vstack([result[fluid] for result in timed_data])
return self.rings if aggregate:
grouped_data = averaged_data[fluid].group_by(list(bins.keys())).groups
if std:
averaged_data[fluid + "_std"] = grouped_data.aggregate(lambda x: np.std(x.value))
for q in percentiles:
averaged_data[fluid + f"_q{q}"] = grouped_data.aggregate(lambda x: np.percentile(x.value, q))
averaged_data[fluid] = grouped_data.aggregate(np.mean)
def vertical_ring_analysis(
self,
delta_r: Quantity[u.kpc] = u.kpc,
delta_z: Quantity[u.kpc] = u.kpc,
rmin: Quantity[u.kpc] = 1 * u.kpc,
rmax: Quantity[u.kpc] = 30 * u.kpc,
zmin: Quantity[u.kpc] = -0.5 * u.kpc,
zmax: Quantity[u.kpc] = 0.5 * u.kpc,
):
bin_deltas = {"r": delta_r, "z": delta_z} return averaged_data
filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
return self.analysis(bin_deltas, filter_bounds)