Files
galaxy_ninja/galsec.py
2024-02-27 17:24:00 +01:00

559 lines
18 KiB
Python

# coding: utf-8
"""
Galaxy kiloparsec extractor
Noé Brucy 2023
"""
import numpy as np
from astropy.table import QTable, hstack
from astropy import units as u
from astropy.units.quantity import Quantity
from collections import defaultdict
from numba import jit, prange
# Mean molecular/atomic mass of each tracked chemical species, in units of
# the hydrogen atom mass.  Used by `aggregate` to convert the per-cell
# chemical abundances into per-species masses.
_atomic_mass = {
    "H+": 1,
    "H2": 2,
    "CO": 28,
}
def vect_r(position: np.array, vector: np.array) -> np.array:
    """Radial component of a vector.

    Projects `vector` onto the in-plane radial unit vector of each
    `position` (the z components are ignored).

    Parameters
    ----------
    position : np.array (N, 3)
        (only reads x and y)
    vector : np.array (N, 3)
        (only reads x and y components)

    Returns
    -------
    np.array (N)
        Radial component
    """
    xy = position[:, :2]
    # Radial unit vector in the galactic plane.
    norm = np.sqrt(np.sum(xy**2, axis=1))
    unit_r = xy / norm[:, None]
    # Row-wise dot product with the in-plane part of the vector.
    return np.sum(vector[:, :2] * unit_r, axis=1)
def vect_phi(position: np.array, vector: np.array):
    """Azimuthal component of a vector.

    Projects `vector` onto the in-plane azimuthal unit vector of each
    `position`, i.e. the radial unit vector rotated by +90 degrees
    (the z components are ignored).

    Parameters
    ----------
    position : np.array (N, 3)
        (only reads x and y)
    vector : np.array (N, 3)
        (only reads x and y components)

    Returns
    -------
    np.array (N)
        Azimuthal component
    """
    xy = position[:, :2]
    norm = np.sqrt(np.sum(xy**2, axis=1))
    # Rotating (x, y) by +90 degrees gives (-y, x); normalize to a unit vector.
    unit_phi = np.stack((-xy[:, 1], xy[:, 0]), axis=1) / norm[:, None]
    # Row-wise dot product with the in-plane part of the vector.
    return np.sum(vector[:, :2] * unit_phi, axis=1)
def filter_data(table, bounds):
    """Keep only the rows of `table` lying strictly inside `bounds`.

    `bounds` maps a field name to a (min, max) pair; either end may be
    None to leave that side unbounded.  Comparisons are strict.
    """
    keep = np.full(len(table), True)
    for field, (lower, upper) in bounds.items():
        if lower is not None:
            keep &= table[field] > lower
        if upper is not None:
            keep &= table[field] < upper
    return table[keep]
@jit(nopython=True, parallel=True)
def get_sfr(
    mass: np.ndarray,
    birth_time: np.ndarray,
    group_indices: np.ndarray,
    time: float,
    average_on: float = 30,
):
    """Compute SFR in each group.

    The SFR of a group is the stellar mass born within the last
    `average_on` Myr divided by the averaging window (the window is
    shrunk to `time` when the simulation is younger than `average_on`).

    Parameters
    ----------
    mass : np.ndarray
        mass in Msun, dim Ncell, sorted by group
    birth_time : np.ndarray
        birth time in Myr, dim Ncell, sorted by group
    group_indices : np.ndarray
        array of the indices of each bin, dim Ngroup + 1
    time : float
        time in Myr
    average_on : float, optional
        time sfr is averaged on in Myr, by default 30

    Returns
    -------
    np.ndarray
        sfr for each group in Msun/year, dim Ngroup
    """
    n_groups = len(group_indices) - 1
    sfr = np.zeros(n_groups)
    window = min(average_on, time)
    if window > 0:
        # Stars older than this are excluded from the average.
        oldest = max(time - average_on, 0)
        for g in prange(n_groups):
            members = slice(group_indices[g], group_indices[g + 1])
            recent = (birth_time[members] > oldest) & (
                birth_time[members] < time
            )
            # Factor 1e6 converts Msun/Myr into Msun/yr.
            sfr[g] = np.sum(mass[members][recent]) / (1e6 * window)
    return sfr
@jit(nopython=True, parallel=True)
def mul_grouped(array: np.ndarray, group_indices: np.ndarray, to_mul: np.ndarray):
    """Scale every group of `array` in place by its own factor.

    Parameters
    ----------
    array : np.ndarray
        array to be multiplied (modified in place), dim Ncell, sorted by group
    group_indices : np.ndarray
        dim Ngroup + 1
    to_mul : np.ndarray
        one multiplier per group, dim Ngroup

    Returns
    -------
    np.ndarray
        the same `array`, scaled group by group
    """
    n_groups = len(group_indices) - 1
    for g in prange(n_groups):
        factor = to_mul[g]
        for j in range(group_indices[g], group_indices[g + 1]):
            array[j] *= factor
    return array
def aggregate(
    grouped_data: QTable,
    weight_field: str = "mass",
    extensive_fields: list = None,
    weighted_fields: list = None,
    compute_dispersions=True,
    species: list = None,
    abundance_He=0.1,
) -> QTable:
    """Aggregate wisely from grouped data.

    Extensive fields are summed over each group; weighted fields are
    averaged with `weight_field` as the weight.  Optionally the weighted
    dispersion around the weighted mean is computed as well, and the
    whole procedure is repeated for each chemical species.

    NOTE: `grouped_data` is modified in place (weighted fields are
    replaced by their weighted form and per-species columns are added).

    Parameters
    ----------
    grouped_data : QTable
        should already have group information
    weight_field : str, optional
        the field used for the weighting of the non extensive quantities,
        by default "mass"
    extensive_fields : list, optional
        these will be summed; `weight_field` is always included,
        by default []
    weighted_fields : list, optional
        the fields that will be weighted,
        by default ["velr", "velphi", "velx", "vely", "velz"]
    compute_dispersions : bool, optional
        whether to also compute the weighted dispersion ("sigma_*"
        columns) of each weighted field, by default True
    species : list, optional
        chemical species (keys of `_atomic_mass`) to aggregate
        separately, by default []
    abundance_He : float, optional
        helium abundance used to convert abundances into species masses,
        by default 0.1

    Returns
    -------
    QTable
        a table with the aggregated value for each group

    Raises
    ------
    FloatingPointError
        if a non-finite value appears while computing dispersions
    """
    # None sentinels + defensive copy: the previous signature used mutable
    # default lists, and appending `weight_field` below both leaked into
    # the caller's list and polluted the shared default object.
    extensive_fields = [] if extensive_fields is None else list(extensive_fields)
    if weighted_fields is None:
        weighted_fields = ["velr", "velphi", "velx", "vely", "velz"]
    if species is None:
        species = []
    if weight_field not in extensive_fields:
        extensive_fields.append(weight_field)
    weight_fields_species = []
    weighted_fields_species = []
    for i, spec in enumerate(species):
        abundance = grouped_data["chemical_abundances"][:, i]
        atomic_mass = _atomic_mass[spec]
        # Species mass: m * X_spec * A_spec / (1 + 4 * X_He)
        grouped_data[f"{weight_field}_{spec}"] = (
            grouped_data[weight_field]
            * abundance
            * atomic_mass
            / (1 + 4 * abundance_He)
        )
        weight_fields_species.append(f"{weight_field}_{spec}")
    for field in weighted_fields:
        # Multiply weighted field by the weight v*m
        for spec in species:
            grouped_data[f"{field}_{spec}"] = (
                grouped_data[field] * grouped_data[f"{weight_field}_{spec}"]
            )
            weighted_fields_species.append(f"{field}_{spec}")
        grouped_data[field] *= grouped_data[weight_field]
    to_sum = (
        extensive_fields
        + weighted_fields
        + weight_fields_species
        + weighted_fields_species
    )
    # Compute the sum of all fields
    binned_data = grouped_data[to_sum].groups.aggregate(np.add)
    for field in weighted_fields:
        for spec in ["all"] + species:
            if spec == "all":
                weight_field_here = weight_field
                field_here = field
            else:
                weight_field_here = f"{weight_field}_{spec}"
                field_here = f"{field}_{spec}"
            # For weighted field, divided by the total mass
            # We obtain the weighted mean vmean = 𝚺 (m*v) / 𝚺 m
            # First we remove 0s to avoid dividing by 0
            binned_data[weight_field_here][binned_data[weight_field_here] == 0] = (
                1.0 * binned_data[weight_field_here].unit
            )
            binned_data[field_here] /= binned_data[weight_field_here]
            if compute_dispersions:
                # We also compute the weighted dispersion around the weighted mean
                # First we get m * vmean
                weight_mean = grouped_data[weight_field_here].value.copy()
                mul_grouped(
                    weight_mean,
                    grouped_data.groups.indices,
                    binned_data[field_here].value,
                )
                # Then we compute (m*v - m*vmean)
                grouped_data[field_here] -= weight_mean * grouped_data[field_here].unit
                # Compute m * (v - vmean)^2 = (m*v - m*vmean) * (m*v - m*vmean) / m
                # Since we don't want to divide by zero, we first clean them out
                grouped_data[weight_field_here][
                    grouped_data[weight_field_here] == 0
                ] = (1.0 * grouped_data[weight_field_here].unit)
                grouped_data[field_here] *= (
                    grouped_data[field_here] / grouped_data[weight_field_here]
                )
                if not np.all(np.isfinite(grouped_data[field_here])):
                    raise FloatingPointError
                # Compute 𝚺 m * (v - vmean)^2
                binned_data[f"sigma_{field_here}"] = grouped_data[
                    field_here,
                ].groups.aggregate(np.add)[field_here]
                # Compute sigma = 𝚺 (m * (v - vmean)^2) / 𝚺 m
                binned_data[f"sigma_{field_here}"] = np.sqrt(
                    binned_data[f"sigma_{field_here}"] / binned_data[weight_field_here]
                )
    return binned_data
def regroup(
    galex,
    dataset_key="sectors",
    bin_key="r",
    weighted_fields=None,
    compute_dispersions=True,
):
    """Re-aggregate an already-binned dataset of `galex` along one key.

    Groups the tables stored under `galex.<dataset_key>` by `bin_key`
    and aggregates them again (total and per chemical species) with
    `aggregate`.

    Parameters
    ----------
    galex : Galsec
        object holding the binned dataset and per-fluid configuration
    dataset_key : str, optional
        name of the instance attribute holding the dataset, by default
        "sectors"
    bin_key : str, optional
        column to group by, by default "r"
    weighted_fields : list or None, optional
        fields to weight; when None, `galex.weighted_fields[fluid]` is
        used for each fluid
    compute_dispersions : bool, optional
        forwarded to `aggregate`, by default True

    Returns
    -------
    dict
        fluid -> aggregated QTable
    """
    dataset = vars(galex)[dataset_key]
    result = {}
    for fluid in galex.fluids:
        fields_fluid = (
            galex.weighted_fields[fluid]
            if weighted_fields is None
            else weighted_fields
        )
        grouped = dataset[fluid].group_by(bin_key)
        # One aggregation per chemical species, weighted by the species mass.
        agg_spec = {
            spec: aggregate(
                grouped,
                weight_field=f"mass_{spec}",
                extensive_fields=[],
                weighted_fields=[f"{field}_{spec}" for field in fields_fluid],
                compute_dispersions=compute_dispersions,
            )
            for spec in galex.species[fluid]
        }
        extensive_fields = ["sfr"] if "sfr" in grouped.keys() else []
        # Keep the smallest bin value as the bin label.
        bin_value = grouped[
            bin_key,
        ].groups.aggregate(np.fmin)
        all_values = aggregate(
            grouped,
            extensive_fields=extensive_fields,
            weighted_fields=fields_fluid,
            compute_dispersions=compute_dispersions,
        )
        result[fluid] = hstack([bin_value, all_values] + list(agg_spec.values()))
    return result
class Galsec:
    """
    Galactic sector extractor.

    Wraps one snapshot of galaxy simulation data (gas, stars, dark
    matter) and aggregates physical quantities over spatial bins:
    sectors (r, phi), rings (r), cartesian grids (x, y, z) or
    vertical rings (r, z).
    """

    # Default fluids; overridden by data["header"]["fluids"] when present.
    fluids = ["gas", "stars", "dm"]
    # Fields summed in each bin, per fluid.
    extensive_fields = defaultdict(
        lambda: ["mass"],
        gas=["mass", "volume"],
    )
    # Fields mass-averaged in each bin, per fluid.
    weighted_fields = defaultdict(lambda: ["velr", "velphi", "velx", "vely", "velz"])
    # Units attached to the raw input columns when building the QTables.
    units = {
        "position": u.kpc,
        "volume": u.kpc**3,
        "mass": u.Msun,
        "density": u.g / u.cm**3,
        "velocity": u.km / u.s,
        "birth_time": u.Myr,
        "internal_energy": u.km**2 / u.s**2,
    }
    # Chemical species per fluid. Class-level default only: a fresh
    # per-instance dict is created in __init__ (see note there).
    species = {}
    # Default helium abundance, overridden by header["ABHE"] when species
    # are provided.
    abundance_He = 0.1

    def __init__(self, data: dict, copy=True) -> None:
        """Initialize a Galsec object.

        Parameters
        ----------
        data : dict
            A dataset of cells in the following format:
            header:
                time float [Myr] time since the start of the simulation
                box_size float [kpc] size of the box
                fluids str list list of fluids included
            gas:
                position float array (Ngas, 3) [kpc], centered
                volume float array (Ngas) [pc^3]
                velocity float array (Ngas, 3) [km/s]
                mass float array (Ngas) [Msun]
            stars:
                position float array (Nstar, 3) [kpc], centered
                velocity float array (Nstar, 3) [km/s]
                mass float array (Nstar) [Msun]
                birth_time float array (Nstar) [Myr]
            dm:
                position float array (Ndm, 3) [kpc], centered
                velocity float array (Ndm, 3) [km/s]
                mass float array (Ndm) [Msun]
        copy : bool, default=True
            Whether the data should be copied.
        """
        self.data = {}
        # Use a per-instance dict: the previous code assigned into the
        # class-level `species` dict, leaking state between instances.
        self.species = {}
        if "fluids" in data["header"]:
            self.fluids = data["header"]["fluids"]
        for fluid in self.fluids:
            # Attach a unit only to the columns we know about.
            units = {}
            for key in data[fluid]:
                if key in self.units:
                    units[key] = self.units[key]
            self.data[fluid] = QTable(data[fluid], units=units, copy=copy)
            self.species[fluid] = []
        self.time = data["header"]["time"] * u.Myr
        self.box_size = data["header"]["box_size"] * u.kpc
        if "species" in data["header"]:
            assert "chemical_abundances" in data["gas"]
            self.species["gas"] = data["header"]["species"]
            self.abundance_He = data["header"]["ABHE"]
        self.compute_derived_values()

    def compute_derived_values(self) -> None:
        """
        Helper function to compute values derived from the input data:
        cylindrical coordinates, cylindrical velocity components and
        per-axis position/velocity aliases.
        """
        for fluid in self.fluids:
            dset = self.data[fluid]
            dset["r"] = np.sqrt(
                np.sum(dset["position"][:, :2] ** 2, axis=1)
            )  # galactic radius
            dset["phi"] = np.angle(
                dset["position"][:, 0] + dset["position"][:, 1] * 1j
            )  # galactic longitude
            dset["phi"][dset["phi"] < 0] += (
                2 * np.pi * u.rad
            )  # rescaling to get only positive values
            dset["velphi"] = vect_phi(
                dset["position"], dset["velocity"]
            )  # azimuthal velocity
            dset["velr"] = vect_r(dset["position"], dset["velocity"])  # radial velocity
            # Making aliases. No copy is done here.
            dset["x"] = dset["position"][:, 0]
            dset["y"] = dset["position"][:, 1]
            dset["z"] = dset["position"][:, 2]
            dset["velx"] = dset["velocity"][:, 0]
            dset["vely"] = dset["velocity"][:, 1]
            dset["velz"] = dset["velocity"][:, 2]

    def binning(self, bin_deltas):
        """Assign every element of every fluid to a spatial bin.

        For each supported binning variable a "<name>_bin" column holding
        the center of the bin is added to the fluid tables.

        Parameters
        ----------
        bin_deltas : dict
            maps a binning variable ("x", "y", "z", "r" or "phi") to the
            bin width (a length Quantity), or to None to skip it.
            "phi" bins are sized from "r_bin" (the width is an arc
            length), so "r" must appear before "phi" in the dict.

        Returns
        -------
        list
            the names of the variables that were actually binned
        """
        boxsize = self.box_size
        keys_binning = []
        for fluid in self.fluids:
            data = self.data[fluid]  # no copy
            for name in bin_deltas:
                delta = bin_deltas[name]
                if delta is not None:
                    if name in ["x", "y", "z"]:
                        # shift by half a box so the coordinate stays positive
                        pos = data[name] + 0.5 * boxsize
                        # renamed from `bin` to avoid shadowing the builtin
                        bin_index = np.floor(pos / delta)
                        # Store the middle value
                        data[f"{name}_bin"] = (bin_index + 0.5) * delta - 0.5 * boxsize
                    elif name == "r":
                        r_bin = np.trunc(data["r"] / delta)
                        r_bin = (r_bin + 0.5) * delta  # Store the middle value
                        data["r_bin"] = r_bin
                    elif name == "phi":
                        # angular width so that the arc length at this
                        # radius equals `delta`
                        delta_phi = (delta / data["r_bin"]) * u.rad
                        phi_bin = (np.trunc(data["phi"] / delta_phi) + 0.5) * delta_phi
                        data["phi_bin"] = phi_bin
                    else:
                        print(f"Unsupported binning variable {name}")
                        break
                    if name not in keys_binning:
                        keys_binning.append(name)
        return keys_binning

    def analysis(self, bin_deltas: dict, filter_bounds: dict):
        """Bin, filter and aggregate every fluid.

        Parameters
        ----------
        bin_deltas : dict
            binning variable -> bin width, see `binning`
        filter_bounds : dict
            field -> (min, max) bounds, see `filter_data`

        Returns
        -------
        dict
            fluid -> QTable of aggregated quantities, one row per bin.
            For stars with a "birth_time" column, an "sfr" column is
            added as well.
        """
        result = {}
        keys = self.binning(bin_deltas)
        keys_bin = [key + "_bin" for key in keys]
        for fluid in self.fluids:
            grouped_data = filter_data(self.data[fluid], filter_bounds).group_by(
                keys_bin
            )
            result[fluid] = hstack(
                [
                    grouped_data[keys_bin].groups.aggregate(np.fmin),
                    aggregate(
                        grouped_data,
                        extensive_fields=self.extensive_fields[fluid],
                        weighted_fields=self.weighted_fields[fluid],
                        species=self.species[fluid],
                        abundance_He=self.abundance_He,
                    ),
                ]
            )
            for key in keys:
                result[fluid].rename_column(key + "_bin", key)
            if fluid == "stars" and "birth_time" in self.data["stars"].keys():
                sfr = get_sfr(
                    grouped_data["mass"].value,
                    grouped_data["birth_time"].value,
                    grouped_data.groups.indices,
                    self.time.value,
                )
                result["stars"]["sfr"] = sfr * u.Msun / u.year
        return result

    def sector_analysis(
        self,
        delta_r: Quantity[u.kpc] = u.kpc,
        delta_l: Quantity[u.kpc] = u.kpc,
        rmin: Quantity[u.kpc] = 1 * u.kpc,
        rmax: Quantity[u.kpc] = 12 * u.kpc,
        zmin: Quantity[u.kpc] = -0.5 * u.kpc,
        zmax: Quantity[u.kpc] = 0.5 * u.kpc,
    ):
        """Compute the aggregation of quantities in sector bins (r, phi).

        The result is also stored as `self.sectors`.

        Parameters
        ----------
        delta_r : Quantity[u.kpc], optional
            spacing between two radial bins, by default u.kpc
        delta_l : Quantity[u.kpc], optional
            spacing (in spatial units) between two azimuthal bins, by default u.kpc
        rmin : Quantity[u.kpc], optional
            filter out bin below that radius, by default 1*u.kpc
        rmax : Quantity[u.kpc], optional
            filter out bin beyond that radius, by default 12*u.kpc
        zmin : Quantity[u.kpc], optional
            filter out cells below that height, by default -0.5*u.kpc
        zmax : Quantity[u.kpc], optional
            filter out cells above that height, by default 0.5*u.kpc
        """
        bin_deltas = {"r": delta_r, "phi": delta_l}
        filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
        self.sectors = self.analysis(bin_deltas, filter_bounds)
        return self.sectors

    def cartesian_analysis(
        self,
        delta_x: Quantity[u.kpc] = u.kpc,
        delta_y: Quantity[u.kpc] = u.kpc,
        delta_z: Quantity[u.kpc] = u.kpc,
        zmin: Quantity[u.kpc] = -0.5 * u.kpc,
        zmax: Quantity[u.kpc] = 0.5 * u.kpc,
    ):
        """Compute the aggregation of quantities in cartesian bins.

        The result is also stored as `self.grid`.

        Parameters
        ----------
        delta_x : Quantity[u.kpc]
            spacing between two x bins
        delta_y : Quantity[u.kpc]
            spacing between two y bins
        delta_z : Quantity[u.kpc]
            spacing between two z bins
        zmin : Quantity[u.kpc], optional
            filter out cells below that height, by default -0.5*u.kpc
        zmax : Quantity[u.kpc], optional
            filter out cells above that height, by default 0.5*u.kpc
        """
        bin_deltas = {"x": delta_x, "y": delta_y, "z": delta_z}
        filter_bounds = {"z": [zmin, zmax]}
        self.grid = self.analysis(bin_deltas, filter_bounds)
        return self.grid

    def ring_analysis(
        self,
        delta_r: Quantity[u.kpc] = u.kpc,
        rmin: Quantity[u.kpc] = 1 * u.kpc,
        rmax: Quantity[u.kpc] = 30 * u.kpc,
        zmin: Quantity[u.kpc] = -0.5 * u.kpc,
        zmax: Quantity[u.kpc] = 0.5 * u.kpc,
    ):
        """Compute the aggregation of quantities in radial bins.

        The result is also stored as `self.rings`.

        Parameters
        ----------
        delta_r : Quantity[u.kpc], optional
            spacing between two radial bins, by default u.kpc
        rmin : Quantity[u.kpc], optional
            filter out bin below that radius, by default 1*u.kpc
        rmax : Quantity[u.kpc], optional
            filter out bin beyond that radius, by default 30*u.kpc
        zmin : Quantity[u.kpc], optional
            filter out cells below that height, by default -0.5*u.kpc
        zmax : Quantity[u.kpc], optional
            filter out cells above that height, by default 0.5*u.kpc
        """
        bin_deltas = {"r": delta_r}
        filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
        self.rings = self.analysis(bin_deltas, filter_bounds)
        return self.rings

    def vertical_ring_analysis(
        self,
        delta_r: Quantity[u.kpc] = u.kpc,
        delta_z: Quantity[u.kpc] = u.kpc,
        rmin: Quantity[u.kpc] = 1 * u.kpc,
        rmax: Quantity[u.kpc] = 30 * u.kpc,
        zmin: Quantity[u.kpc] = -0.5 * u.kpc,
        zmax: Quantity[u.kpc] = 0.5 * u.kpc,
    ):
        """Compute the aggregation of quantities in (r, z) bins.

        Unlike the other analyses, the result is returned but not
        stored on the instance.

        Parameters
        ----------
        delta_r : Quantity[u.kpc], optional
            spacing between two radial bins, by default u.kpc
        delta_z : Quantity[u.kpc], optional
            spacing between two vertical bins, by default u.kpc
        rmin, rmax : Quantity[u.kpc], optional
            radial filter bounds, by default 1 and 30 kpc
        zmin, zmax : Quantity[u.kpc], optional
            vertical filter bounds, by default -0.5 and 0.5 kpc
        """
        bin_deltas = {"r": delta_r, "z": delta_z}
        filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
        return self.analysis(bin_deltas, filter_bounds)