add galsec
This commit is contained in:
355
galsec.py
Normal file
355
galsec.py
Normal file
@@ -0,0 +1,355 @@
|
||||
# coding: utf-8
|
||||
"""
|
||||
Galaxy kiloparsec extractor
|
||||
|
||||
"""
|
||||
import numpy as np
|
||||
from astropy.table import QTable, hstack
|
||||
from astropy import units as u
|
||||
from astropy.units.quantity import Quantity
|
||||
|
||||
|
||||
def vect_r(position: np.array, vector: np.array) -> np.array:
|
||||
"""Radial component of a vector
|
||||
|
||||
Parameters
|
||||
----------
|
||||
position : np.array (N, 3)
|
||||
(only reads x and y)
|
||||
vector : np.array (N, 3)
|
||||
(only reads x and y components)
|
||||
|
||||
Returns
|
||||
-------
|
||||
np.array (N)
|
||||
Radial component
|
||||
"""
|
||||
r = position[:, :2]
|
||||
ur = np.transpose((np.transpose(r) / np.sqrt(np.sum(r**2, axis=1))))
|
||||
return np.einsum("ij, ij -> i", vector[:, :2], ur)
|
||||
|
||||
|
||||
def vect_phi(position: np.array, vector: np.array):
|
||||
"""Azimuthal component of a vecto
|
||||
|
||||
Parameters
|
||||
----------
|
||||
position : np.array (N, 3)
|
||||
(only reads x and y)
|
||||
vector : np.array (N, 3)
|
||||
(only reads x and y components)
|
||||
|
||||
Returns
|
||||
-------
|
||||
np.array (N)
|
||||
Azimuthal component
|
||||
"""
|
||||
r = position[:, :2]
|
||||
r_norm = np.sqrt(np.sum(r**2, axis=1))
|
||||
rot = np.array([[0, -1], [1, 0]])
|
||||
uphi = np.transpose(np.einsum("ij, kj -> ik", rot, r) / r_norm)
|
||||
return np.einsum("ij,ij -> i", vector[:, :2], uphi)
|
||||
|
||||
|
||||
def get_sfr(
|
||||
stars: QTable, time: Quantity[u.Myr], average_on: Quantity[u.Myr] = 30 * u.Myr
|
||||
) -> Quantity[u.Msun / u.year]:
|
||||
"""_summary_
|
||||
|
||||
Parameters
|
||||
----------
|
||||
stars : QTable
|
||||
_description_
|
||||
time : Quantity[u.Myr],
|
||||
time at wich the SFR should be computed
|
||||
average_on : Quantity, optional
|
||||
compute the sfr on the last `average_on` years, by default 30*u.Myr
|
||||
|
||||
Returns
|
||||
-------
|
||||
float
|
||||
SFR in Msun/yr
|
||||
"""
|
||||
try:
|
||||
mask = (stars["birth_time"] > max(time - average_on, 0 * u.Myr)) & (
|
||||
stars["birth_time"] < time
|
||||
)
|
||||
dtime = min(average_on, time).to(u.year)
|
||||
if dtime == 0:
|
||||
sfr = 0.0 * u.Msun / u.year
|
||||
else:
|
||||
sfr = np.sum(stars["mass"][mask]) / dtime
|
||||
except KeyError:
|
||||
sfr = 0.0 * u.Msun / u.year
|
||||
return sfr
|
||||
|
||||
|
||||
def aggregate(
|
||||
grouped_data: QTable,
|
||||
weight_field: str = "mass",
|
||||
extensive_fields: str = ["mass", "ek"],
|
||||
weighted_fields: list = ["velr", "velphi", "velz"],
|
||||
) -> QTable:
|
||||
"""Aggregate wisely from grouped data
|
||||
|
||||
Parameters
|
||||
----------
|
||||
grouped_data : QTable
|
||||
should already have group information
|
||||
weight_field : str, optional
|
||||
the field used for the weighting of the non extensive quantities, by default "mass"
|
||||
extensive_fields : str, optional
|
||||
these will be summed. Should include weight_field, by default ["mass", "ek"]
|
||||
weighted_fields : list, optional
|
||||
the field that will be weighted, by default ["velr", "velphi", "velz"]
|
||||
|
||||
Returns
|
||||
-------
|
||||
QTable
|
||||
a tabble with the aggregated value for each group
|
||||
"""
|
||||
assert weight_field in extensive_fields
|
||||
|
||||
for field in weighted_fields:
|
||||
grouped_data[field] *= grouped_data[weight_field]
|
||||
|
||||
binned_data = grouped_data[extensive_fields + weighted_fields].groups.aggregate(
|
||||
np.add
|
||||
)
|
||||
|
||||
for field in weighted_fields:
|
||||
binned_data[field] /= binned_data[weight_field] # weighted mean
|
||||
|
||||
# Veocity dispersion
|
||||
grouped_data[field] /= grouped_data[weight_field]
|
||||
for i in range(len(grouped_data.groups)):
|
||||
slice = grouped_data.groups.indices[i], grouped_data.groups.indices[i + 1]
|
||||
grouped_data[slice[0] : slice[1]][field] -= binned_data[i][field]
|
||||
grouped_data[field] = grouped_data[weight_field] * grouped_data[field] ** 2
|
||||
binned_data[f"sigma_{field}"] = grouped_data[field,].groups.aggregate(
|
||||
np.add
|
||||
)[field]
|
||||
binned_data[f"sigma_{field}"] = np.sqrt(
|
||||
binned_data[f"sigma_{field}"] / binned_data[weight_field]
|
||||
)
|
||||
|
||||
return binned_data
|
||||
|
||||
|
||||
class Galsec:
|
||||
"""
|
||||
Galactic sector extractor
|
||||
"""
|
||||
|
||||
fluids = ["gas", "stars", "dm"]
|
||||
units = {
|
||||
"gas": {
|
||||
"position": u.kpc,
|
||||
"volume": u.kpc**3,
|
||||
"velocity": u.km / u.s,
|
||||
"mass": u.Msun,
|
||||
},
|
||||
"stars": {
|
||||
"position": u.kpc,
|
||||
"velocity": u.km / u.s,
|
||||
"mass": u.Msun,
|
||||
"birth_time": u.Myr,
|
||||
},
|
||||
"dm": {"position": u.kpc, "velocity": u.km / u.s, "mass": u.Msun},
|
||||
}
|
||||
|
||||
def __init__(self, data: dict, copy=True) -> None:
|
||||
"""Initiazise a Galaxasec object
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cell_data : dict
|
||||
A dataset of cells in the following format:
|
||||
header:
|
||||
time float [Myr] time since the start of the simulation
|
||||
gas:
|
||||
position float array (Ngas, 3) [kpc], centered
|
||||
volume float array (Ngas) [pc^3]
|
||||
velocity float array (Ngas, 3) [km/s]
|
||||
mass float array (Ngas) [Msun]
|
||||
stars:
|
||||
position float array (Nstar, 3) [kpc], centered
|
||||
velocity float array (Nstar, 3) [km/s]
|
||||
mass float array (Nstar) [Msun]
|
||||
birth_time float array (Nstar) [Myr]
|
||||
dm:
|
||||
position float array (Ngas, 3) [kpc], centered
|
||||
velocity float array (Ngas, 3) [km/s]
|
||||
mass float array (Ngas) [Msun]
|
||||
maps:
|
||||
extent float list [xmin, xmax, ymin, ymax] Coordinates of the edges of the map, centered
|
||||
gas_coldens float array (Nx, Ny) [Msun/pc^2] Map of column density
|
||||
copy : bool, default=True
|
||||
Wheter the data should be copied.
|
||||
"""
|
||||
|
||||
self.data = {}
|
||||
|
||||
for fluid in self.fluids:
|
||||
self.data[fluid] = QTable(data[fluid], units=self.units[fluid], copy=copy)
|
||||
|
||||
self.time = data["header"]["time"] * u.Myr
|
||||
|
||||
self.compute_derived_values()
|
||||
|
||||
def compute_derived_values(self) -> None:
|
||||
"""
|
||||
Helper function to computed values derivated from input data
|
||||
"""
|
||||
for fluid in self.fluids:
|
||||
dset = self.data[fluid]
|
||||
dset["r"] = np.sqrt(
|
||||
np.sum(dset["position"][:, :2] ** 2, axis=1)
|
||||
) # galactic radius
|
||||
dset["phi"] = np.angle(
|
||||
dset["position"][:, 0] + dset["position"][:, 1] * 1j
|
||||
) # galactic longitude
|
||||
dset["phi"][dset["phi"] < 0] += (
|
||||
2 * np.pi * u.rad
|
||||
) # rescaling to get only positive values
|
||||
dset["velphi"] = vect_phi(
|
||||
dset["position"], dset["velocity"]
|
||||
) # azimuthal velocity
|
||||
dset["velr"] = vect_r(dset["position"], dset["velocity"]) # radial velocity
|
||||
dset["velz"] = dset["velocity"][:, 2] # vertical velocity
|
||||
dset["ek"] = (dset["mass"] * np.sum(dset["velocity"] ** 2, axis=1)).to(
|
||||
u.erg
|
||||
) # kinetic energy
|
||||
|
||||
def ring_binning(self, delta_r: Quantity[u.kpc]):
|
||||
"""Add radial bin informations to the dataset
|
||||
|
||||
Parameters
|
||||
----------
|
||||
delta_r : Quantity[u.kpc]
|
||||
spacing between two radial bins
|
||||
"""
|
||||
for fluid in self.fluids:
|
||||
r_bin = np.trunc(self.data[fluid]["r"] / delta_r)
|
||||
r_bin = (r_bin + 0.5) * delta_r # Store the middle value
|
||||
self.data[fluid]["r_bin"] = r_bin
|
||||
|
||||
def sector_binning(self, delta_r: Quantity[u.kpc], delta_l: Quantity[u.kpc]):
|
||||
"""Add sector bin information to the dataset
|
||||
|
||||
Parameters
|
||||
----------
|
||||
delta_r : Quantity[u.kpc]
|
||||
spacing between two radial bins
|
||||
delta_l : Quantity[u.kpc]
|
||||
spacing (in spatial units) between two azimuthal bins
|
||||
"""
|
||||
|
||||
self.ring_binning(delta_r)
|
||||
|
||||
for fluid in self.fluids:
|
||||
delta_phi = (delta_l / self.data[fluid]["r_bin"]) * u.rad
|
||||
phi_bin = (np.trunc(self.data[fluid]["phi"] / delta_phi) + 0.5) * delta_phi
|
||||
self.data[fluid]["phi_bin"] = phi_bin
|
||||
|
||||
def sector_analysis(
|
||||
self,
|
||||
delta_r: Quantity[u.kpc] = u.kpc,
|
||||
delta_l: Quantity[u.kpc] = u.kpc,
|
||||
rmin: Quantity[u.kpc] = 1 * u.kpc,
|
||||
rmax: Quantity[u.kpc] = 12 * u.kpc,
|
||||
):
|
||||
"""Compute the aggration of quantities in sectors bins
|
||||
|
||||
Parameters
|
||||
----------
|
||||
delta_r : Quantity[u.kpc], optional
|
||||
spacing between two radial bins, by default u.kpc
|
||||
delta_l : Quantity[u.kpc], optional
|
||||
spacing (in spatial units) between two azimuthal bins, by default u.kpc
|
||||
rmin : Quantity[u.kpc], optional
|
||||
filter out bin below that radius, by default 1*u.kpc
|
||||
rmax : Quantity[u.kpc], optional
|
||||
filter out bin beyond that radius, by default 12*u.kpc
|
||||
"""
|
||||
|
||||
self.sector_binning(delta_r, delta_l)
|
||||
grouped_data = {}
|
||||
self.sectors = {}
|
||||
|
||||
for fluid in self.fluids:
|
||||
if fluid == "gas":
|
||||
extensive_fields = ["mass", "ek", "volume"]
|
||||
else:
|
||||
extensive_fields = ["mass", "ek"]
|
||||
filtered_data = self.data[fluid][
|
||||
np.logical_and(
|
||||
self.data[fluid]["r"] > rmin, self.data[fluid]["r"] < rmax
|
||||
)
|
||||
]
|
||||
grouped_data[fluid] = filtered_data.group_by(["r_bin", "phi_bin"])
|
||||
self.sectors[fluid] = hstack(
|
||||
[
|
||||
grouped_data[fluid]["r_bin", "phi_bin"].groups.aggregate(np.fmin),
|
||||
aggregate(grouped_data[fluid], extensive_fields=extensive_fields),
|
||||
]
|
||||
)
|
||||
self.sectors[fluid].rename_column("r_bin", "r")
|
||||
self.sectors[fluid].rename_column("phi_bin", "phi")
|
||||
|
||||
self.sectors["stars"]["sfr"] = (
|
||||
np.zeros(len(self.sectors["stars"]["mass"])) * u.Msun / u.year
|
||||
)
|
||||
for i, group in enumerate(grouped_data["stars"].groups):
|
||||
self.sectors["stars"]["sfr"][i] = get_sfr(group, self.time)
|
||||
|
||||
self.sectors["stars"]["sfr"][i] = get_sfr(group, self.time)
|
||||
|
||||
def ring_analysis(
|
||||
self,
|
||||
delta_r: Quantity[u.kpc] = u.kpc,
|
||||
rmin: Quantity[u.kpc] = 1 * u.kpc,
|
||||
rmax: Quantity[u.kpc] = 12 * u.kpc,
|
||||
):
|
||||
"""Compute the aggration of quantities in radial bins
|
||||
|
||||
Parameters
|
||||
----------
|
||||
delta_r : Quantity[u.kpc], optional
|
||||
spacing between two radial bins, by default u.kpc
|
||||
rmin : Quantity[u.kpc], optional
|
||||
filter out bin below that radius, by default 1*u.kpc
|
||||
rmax : Quantity[u.kpc], optional
|
||||
filter out bin beyond that radius, by default 12*u.kpc
|
||||
"""
|
||||
|
||||
self.ring_binning(delta_r)
|
||||
grouped_data = {}
|
||||
self.rings = {}
|
||||
|
||||
for fluid in self.fluids:
|
||||
if fluid == "gas":
|
||||
extensive_fields = ["mass", "ek", "volume"]
|
||||
else:
|
||||
extensive_fields = ["mass", "ek"]
|
||||
filtered_data = self.data[fluid][
|
||||
np.logical_and(
|
||||
self.data[fluid]["r"] > rmin, self.data[fluid]["r"] < rmax
|
||||
)
|
||||
]
|
||||
grouped_data[fluid] = filtered_data.group_by(["r_bin"])
|
||||
self.rings[fluid] = hstack(
|
||||
[
|
||||
grouped_data[fluid][
|
||||
"r_bin",
|
||||
].groups.aggregate(np.fmin),
|
||||
aggregate(grouped_data[fluid], extensive_fields=extensive_fields),
|
||||
]
|
||||
)
|
||||
self.rings[fluid].rename_column("r_bin", "r")
|
||||
|
||||
self.rings["stars"]["sfr"] = (
|
||||
np.zeros(len(self.rings["stars"]["mass"])) * u.Msun / u.year
|
||||
)
|
||||
for i, group in enumerate(grouped_data["stars"].groups):
|
||||
self.rings["stars"]["sfr"][i] = get_sfr(group, self.time)
|
||||
Reference in New Issue
Block a user