Source code for galaxy.remnant

# standard Python imports
from pathlib import Path

# scientific package imports
import numpy as np
from numpy.linalg import norm, eigh
from scipy.optimize import curve_fit
import astropy.units as u

from galaxy.galaxy import Galaxy
from galaxy.utilities import is_iterable


[docs]class Remnant(Galaxy): """ A class to work with the MW-M31 post-merger remant. Args: snap (int): Snap number, equivalent to time elapsed. Defaults to the last timepoint. datadir (str): Directory to search first for the required file. Optional, and a default list of locations will be searched. usesql (bool): If True, data will be taken from a PostgreSQL database instead of text files. stride (int): Optional. For stride=n, get every nth row in the table. Only valid with usesql=True. ptype (int or iterable of int): Particle type: 1, 2, 3 or a combination of these Class attributes: data (np.ndarray): type, mass, position_xyz, velocity_xyz for each particle """ def __init__(self, snap=801, datadir=None, usesql=False, stride=1, ptype=(2,3)): "Initial setup. Currently it calls read_file(), but this may change." self.snap = snap self.ptype = ptype if usesql: self.read_db(stride) else: raise NotImplementedError
[docs] def read_db(self, stride): """ Get relevant data from a PostgreSQL database and format it to be identical to that read from test files. Ex-disk and ex-bulge particles are included, not DM particles. Args: stride (int): Optional. For stride=n, get every nth row in the table. Changes: `self.time`, `self.particle_count` and `self.data` are set. Returns: nothing """ from galaxy.db import DB db = DB() cur = db.get_cursor() # set the elapsed time sql_t = f"SELECT time FROM simdata WHERE galname in ('MW', 'M31')" sql_t += f" and snap={self.snap} LIMIT 1" cur.execute(sql_t) time = cur.fetchone() try: self.time = time[0] * u.Myr except TypeError: print(self.name, self.snap, ptype) # set the bulk of the data colheads = ','.join(['galname','type','m','x','y','z','vx','vy','vz']) if stride > 1: sql_d = f"SELECT {colheads}, ROW_NUMBER() OVER () as rn from simdata" else: sql_d = f"SELECT {colheads} from simdata" sql_d += f" where galname in ('MW', 'M31') and snap={self.snap}" if is_iterable(self.ptype): sql_d += f" and type in {self.ptype}" else: sql_d += f" and type={self.ptype}" sql_d += " ORDER BY galname, pnum" if stride > 1: sql_d = f"SELECT {colheads} from ( {sql_d} ) as t where rn % {stride} = 0" dtype=[('galname', 'U3'), ('type', 'uint8'), ('m', '<f4'), ('x', '<f4'), ('y', '<f4'), ('z', '<f4'), ('vx', '<f4'), ('vy', '<f4'), ('vz', '<f4')] cur.execute(sql_d) self.data = np.array(cur.fetchall(), dtype=dtype) self.particle_count = len(self.data)
[docs] def xyz(self): """ Convenience method to get positions as a np.array of shape (3,N) """ return np.array([self.data[xi] for xi in ('x','y','z')])
[docs] def vxyz(self): """ Convenience method to get velocities as a np.array of shape (3,N) """ return np.array([self.data[vxi] for vxi in ('vx','vy','vz')])
# # Methods to extimate ellipsoid principal axes #
[docs] def I_tensor(self, m, x, y, z): """ Args: m, x, y, z: 1-D arrays with mass and coordinates (no units) Returns: 3x3 array representing the moment of inertia tensor """ # 3 moments of inertia for the diagonal Ixx = np.sum(m*(y**2 + z**2)) Iyy = np.sum(m*(z**2 + x**2)) Izz = np.sum(m*(x**2 + y**2)) # 3 products of inertia for symmetric off-diagonals Ixy = Iyx = np.sum(m*x*y) Ixz = Izx = np.sum(m*x*z) Iyz = Izy = np.sum(m*y*z) # assemble the tensor and return it I = np.array([[Ixx, Ixy, Ixz], [Iyx, Iyy, Iyz], [Izx, Izy, Izz]]) return I
[docs] def ellipsoid_axes(self, m, x, y, z, r_lim=None): """ Args: m, x, y, z: 1-D arrays with mass and coordinates (no units) r_lim : float Radius to include in calculation (implicit kpc, no units) Returns: Two 3-tuples: relative semimajor axes and principal axis vectors """ if r_lim is not None: r = np.sqrt(x**2 + y**2 + z**2) central = np.where(r < r_lim) x = x[central] y = y[central] z = z[central] m = m[central] # get moment-of-inertia tensor and eigenvalues/vectors I = self.I_tensor(m,x,y,z) w, v = eigh(I) # moments of intertia around principal axes: A, B, C = w / np.max(w) # principal axis units vectors are in columns of eigenvalues `v` vA, vB, vC = v.T # solve for semi-major axes of ellipsoid and normalize a = np.sqrt((B-A+C)/2) b = np.sqrt((C-B+A)/2) c = np.sqrt((A-C+B)/2) a, b, c = (a, b, c)/a return (a, b, c), (vA, vB, vC)
# # Methods work with mass profiles, given CoM-centered coordinates #
[docs] def sub_mass_enclosed(self, radii, m, xyz): """ Calculate the mass within a given radius of the origin. Based on code in MassProfile, but this version assumes CoM-centric coordinates are supplied. Args: radii (array of distances): spheres to integrate over m (array of masses): shape (N,) xyz (array of Cartesian coordinates): shape (3,N) Returns: array of masses, in units of M_sun """ # distances from CoM: dist = norm(xyz, axis=0) * u.kpc within_r = lambda r: np.sum(m[dist < r]) masses = np.array([within_r(ri) for ri in radii]) * 1e10 * u.M_sun return masses
[docs] def hernquist_mass(self, r, a, M_halo): """ Calculate the mass enclosed for a theoretical profile Args: r (Quantity, units of kpc): distance from center a (Quantity, units of kpc): scale radius M_halo (Quantity, units of M_sun): total DM mass Returns: Total DM mass enclosed within r (M_sun) """ return np.round(M_halo * r**2 / (a + r)**2, 2)
[docs] def fit_hernquist_a(self, m, xyz, r_inner=1, r_outer=100): """ Get `scipy.optimize` to do a non-linear least squares fit to find the best scale radius `a` for the Hernquist profile. Args: r_inner (numeric): Optional. Minimum radius to consider (implicit kpc). Avoid values < 1 as they cause numeric problems. r_outer (numeric): Optional. Maximum radius to consider (implicit kpc). """ # A function suitable for curve_fit. Units must be removed. def hq(r, a): masses = (self.hernquist_mass(r*u.kpc, a*u.kpc, M_halo)).value return np.log(masses) # The fitting has problems inside 1 kpc, so use a more restricted # range of radii than for the plots radii_outer = np.linspace(r_inner, r_outer) * u.kpc M_halo = np.sum(m) * 1e10 # y values to fit to halo = np.log(self.sub_mass_enclosed(radii_outer, m, xyz).value) # run the fit and store the optimum a value popt, pcov = curve_fit(hq, radii_outer, halo, (60,)) fitted_a = np.round(popt[0], 3)*u.kpc perr = np.round(np.sqrt(np.diag(pcov))[0], 3)*u.kpc return fitted_a, perr
[docs] def sersic(self, R, Re, n, Mtot): """ Function that returns Sersic Profile for an Elliptical System (See in-class lab 6) Input R: radius (kpc) Re: half mass radius (kpc) n: sersic index Mtot: total stellar mass Returns Surface Brightness profile in Lsun/kpc^2 """ # We are assuming M/L = 1, so the luminosity is: L = Mtot # the effective surface brightness is # Ie = L/7.2/pi/Re**2 Ie = L / (7.2 * np.pi * Re**2) return Ie * np.exp(-7.67 * ((R/Re)**(1.0/n) - 1.0))
[docs] def Re(self, R, m, xyz): """ Find the radius enclosing half the mass. Args: R (array of Quantity): Radii to consider (kpc) m (array of float): masses (no units) xyz (array of float with shape (3,N)): coordinates (implicit kpc) Returns: sub_Re (Quantity) : Radius enclosing half light/mass (kpc) sub_total (numeric): Mass of entire system (M_sun, no units) subI (array of Quantity): Surface brightness at radii R (kpc^-2), assuming M/L=1 """ sub_mass = self.sub_mass_enclosed(R, m, xyz).value subI = sub_mass / (4 * np.pi * R**2) sub_total = np.max(sub_mass) Blow = sub_total / 2.0 index = np.where((sub_mass > Blow)) sub_Re = R[index][0] return sub_Re, sub_total, subI
[docs] def fit_sersic_n(self, R, sub_Re, sub_total, subI): """ Get `scipy.optimize` to do a non-linear least squares fit to find the best value of `n` for a Sersic profile. Args: R (array of quantity): Radii at which to calculate fit (kpc) Re (Quantity) : Radius enclosing half light/mass (kpc) bulge_total (numeric): Mass of entire bulge (M_sun, no units) bulgeI (array of Quantity): Surface brightness at radii R (kpc^-2) Returns: best `n` value and error estimate """ # A function suitable for curve_fit. Units must be removed. def ser(r, n): logI = self.sersic(r, sub_Re.value, n, sub_total) return np.log(logI) log_subI = np.log(subI.value) popt, pcov = curve_fit(ser, R, log_subI, (60,)) return popt[0], pcov[0][0]