Source code for galaxy.galaxy

# standard Python imports
from pathlib import Path

# scientific package imports
import numpy as np
from numpy.linalg import norm

import astropy.units as u
from astropy.table import QTable

import pandas as pd

from galaxy.utilities import is_iterable

[docs]class Galaxy(): """ A class to find, read and manipulate files for a single galaxy. Args: name (str): short name used in filename of type 'name_000.txt', eg 'MW', 'M31'. snap (int): Snap number, equivalent to time elapsed. Zero is starting conditions. datadir (str): Directory to search first for the required file. Optional, and a default list of locations will be searched. usesql (bool): If True, data will be taken from a PostgreSQL database instead of text files. ptype (int or list): Optional. Restrict data to this particle type, for speed. Only valid with usesql=True. stride (int): Optional. For stride=n, get every nth row in the table. Only valid with usesql=True. Class attributes: filepath (`pathlib.Path` object): directory containing the data file filename (str): in `name_snap.txt` format, something like 'MW_000.txt' data (np.ndarray): type, mass, position_xyz, velocity_xyz for each particle """ def __init__(self, name, snap=0, datadir=None, usesql=False, ptype=None, stride=1): "Initial setup. Currently it calls read_file(), but this may change." self.name = name self.snap = snap self.ptype = ptype if usesql: self.read_db(ptype, stride) else: # We can probably make some assumptions about the filename: self.filename = f"{name}_{snap:03}.txt" self.filepath = self.get_filepath(datadir) self.read_file()
[docs] def read_db(self, ptype, stride): """ Get relevant data from a PostgreSQL database and format it to be identical to that read from test files. Args: ptype (int): Optional. Restrict data to this particle type. stride (int): Optional. For stride=n, get every nth row in the table. Changes: `self.time`, `self.particle_count` and `self.data` are set. Returns: nothing """ from galaxy.db import DB db = DB() cur = db.get_cursor() # set the elapsed time sql_t = f"SELECT time FROM simdata WHERE galname='{self.name}'" sql_t += f" and snap={self.snap} LIMIT 1" cur.execute(sql_t) time = cur.fetchone() try: self.time = time[0] * u.Myr except TypeError: print(time) print(self.name, self.snap, ptype) # set the bulk of the data colheads = ','.join(['type','m','x','y','z','vx','vy','vz']) if stride > 1: sql_d = f"SELECT {colheads}, ROW_NUMBER() OVER () as rn from simdata" else: sql_d = f"SELECT {colheads} from simdata" sql_d += f" where galname='{self.name}' and snap={self.snap}" if ptype: # sql_d += f" and type={ptype} ORDER BY pnum" if is_iterable(ptype): sql_d += f" and type in {ptype}" else: sql_d += f" and type={ptype}" sql_d += " ORDER BY pnum" if stride > 1: sql_d = f"SELECT {colheads} from ( {sql_d} ) as t where rn % {stride} = 0" dtype=[('type', 'uint8'), ('m', '<f4'), ('x', '<f4'), ('y', '<f4'), ('z', '<f4'), ('vx', '<f4'), ('vy', '<f4'), ('vz', '<f4')] cur.execute(sql_d) self.data = np.array(cur.fetchall(), dtype=dtype) self.particle_count = len(self.data)
[docs] def get_filepath(self, datadir): """ Args: datadir (str): path to search first for the required file Returns: `pathlib.Path` object. A directory containing the file. Raises: FileNotFoundError Pretty boring housekeeping code, but may make things more resilient. """ # helper function, returns (file found at this location?) True/False def has_file(dirname): if dirname is not None and \ Path(dirname).exists() and \ Path(dirname).is_dir(): if (Path(dirname) / self.filename).exists(): return True return False if has_file(datadir): return Path(datadir) # whoever called this knew the layout # Now we aren't sure where the file is so may need to search for it # it may be different on different systems, e.g. nimoy vs laptop # try the search path: # [datadir, '.', '..', '../..', '~', '~/galaxydata'] pwd = Path.cwd() home = Path.home() searchpath = [pwd, ] searchpath += [pwd.parents[n] for n in range(4)] searchpath += [home, home / 'galaxydata'] for p in searchpath: if has_file(p): return p # happy result, we got a valid path # Raise an error if not found pathstrings = f'{datadir}, ' if datadir is not None else '' pathstrings += ', '.join([sp.as_posix() for sp in searchpath]) msg = f'{self.filename}: Not found in {pathstrings}' raise FileNotFoundError(msg)
[docs] def read_file(self): """ Read in a datafile in np.ndarray format, store in `self.data`. Requires: `self.path` and `self.filename` are already set. Changes: `self.time`, `self.particle_count` and `self.data` are set. Returns: nothing """ fullname = self.filepath / self.filename # get header data with open(fullname) as file: _, value = file.readline().split() self.time = float(value) * u.Myr # corrected previous unit error label, value = file.readline().split() self.particle_count = int(value) # get the big array of values self.data = np.genfromtxt( fullname, dtype=None, names=True, skip_header=3)
# -------------------------------------------------------------------- # A couple of small utility functions to interconvert particle type # --------------------------------------------------------------------
[docs] def type2name(self, particle_type): """ Args: particle_type (int): valid values are 1, 2, or 3 Returns: typename (str): 'DM', 'disk' or 'bulge' """ typenames = {1: 'DM', 2: 'disk', 3: 'bulge'} return typenames[particle_type]
[docs] def name2type(self, typename): """ Args: typename (str): valid values are 'DM', 'disk' or 'bulge' Returns: particle_type (int): 1, 2, or 3 as used in data files """ types = {'DM': 1, 'disk': 2, 'bulge': 3} return types[typename]
# -------------------------------------------------------------------- # Subset the data by type # --------------------------------------------------------------------
[docs] def filter_by_type(self, particle_type, dataset=None): """ Subsets the data to a single particle type. Args: particle_type (int): for particles, 1=DM, 2=disk, 3=bulge dataset (array including a type column): defaults to self.data Kwargs: dataset (np.ndarray): optionally, a starting dataset other than self.data Returns: np.ndarray: subset data """ if dataset is None: dataset = self.data index = np.where(dataset['type'] == particle_type) return dataset[index]
# -------------------------------------------------------------------- # Methods to calculate properties of an existing dataset # --------------------------------------------------------------------
[docs] def single_particle_properties(self, particle_type=None, particle_num=0): """ Calculates distance from the origin and magnitude of the velocity. Kwargs: particle_type (int): a subset of the data filtered by 1=DM, 2=disk, 3=bulge particle_num (int): zero-based index to an array of particles returns: 3-tuple of Euclidean distance from origin (kpc), Euclidean velocity magnitude (km/s), particle mass (M_sun) """ # The next bit will throw IndexError if particle_num invalid # Be ready to catch this if particle_type is None: # all types accepted particle = self.data[particle_num] else: particle = self.filter_by_type(particle_type)[particle_num] # mass: m = particle['m'] * 1e10 * u.Msun # Euclidean distance from origin: pos_xyz = np.array([particle['x'], particle['y'], particle['z']]) pos_mag = norm(pos_xyz) * u.kpc # velocity: v_xyz = np.array([particle['vx'], particle['vy'], particle['vz']]) v_mag = norm(v_xyz) * u.km / u.s return np.around(pos_mag, 3), np.around(v_mag, 3), m
[docs] def all_particle_properties(self, particle_type=None, as_table=True): """ Calculates distances from the origin and magnitude of the velocities for all particles (default) or a specied particle type. Kwargs: particle_type (int): A subset of the data filtered by 1=DM, 2=disk, 3=bulge as_table (boolean): Return type. If True, an astropy QTable with units. If False, np.ndarrays for position and velocity Returns: QTable: The full list, optionally with units, optionally filtered by type. """ if particle_type is None: # all types accepted dataset = self.data else: dataset = self.filter_by_type(particle_type) # mass: m = dataset['m'] * 1e10 * u.Msun # Pythagorean distance from origin: pos_xyz = np.array([dataset['x'], dataset['y'], dataset['z']]) pos_mag = norm(pos_xyz, axis=0) * u.kpc # velocity: v_xyz = np.array([dataset['vx'], dataset['vy'], dataset['vz']]) v_mag = norm(v_xyz, axis=0) * u.km / u.s if as_table: # construct and return a QTable with units t = QTable() t['type'] = dataset['type'] t['m'] = np.around(m) t['pos'] = np.around(pos_mag, 3) t['v'] = np.around(v_mag, 3) return t else: return dataset['type'], dataset['m'], pos_mag, v_mag
[docs] def component_count(self, particle_type=None): """ Kwargs: particle_type (int): a subset of the data filtered by 1=DM, 2=disk, 3=bulge Returns: Quantity: The number of particles in the galaxy of this type """ if particle_type is None: # all types accepted dataset = self.data else: dataset = self.filter_by_type(particle_type) return len(dataset['m'])
[docs] def all_component_counts(self): """ Returns: list: The number of particles of each type in the galaxy Ordered as [halo, disk, bulge] """ return [self.component_count(ptype) for ptype in (1, 2, 3)]
[docs] def component_mass(self, particle_type=None): """ Kwargs: particle_type (int): a subset of the data filtered by 1=DM, 2=disk, 3=bulge Returns: Quantity: The aggregate mass of all particles in the galaxy of this type """ if particle_type is None: # all types accepted dataset = self.data else: dataset = self.filter_by_type(particle_type) return np.sum(dataset['m']) * 1e10 * u.Msun
[docs] def all_component_masses(self): """ Returns: list: The aggregate masses of particles of each type in the galaxy """ return [self.component_mass(ptype) for ptype in (1, 2, 3)]
# -------------------------------------------------------------------- # Define some getters which may turn out to be useful, perhaps # --------------------------------------------------------------------
[docs] def get_array(self): """ Returns: all particle data in `np.ndarray` format Pretty superfluous in Python (which has no private class members) """ return self.data
[docs] def get_df(self): """ Returns: data as pandas dataframe """ return pd.DataFrame(self.data)
[docs] def get_qtable(self): """ Returns: data as astropy QTable, with units """ t = QTable(self.data) # add appropriate units t['m'] = np.around(t['m'] * 1e10) * u.Msun for col in ('x', 'y', 'z'): t[col] *= u.kpc for col in ('vx', 'vy', 'vz'): t[col] *= (u.km / u.s) return t
[docs] def xyz(self): """ Returns position as a (3,N) array """ return np.array([self.data[xi] for xi in ('x','y','z')])
[docs] def vxyz(self): """ Returns velocity as a (3,N) array """ return np.array([self.data[vxi] for vxi in ('vx','vy','vz')])
[docs] def m(self): """ Conventience method to return array of masses """ return self.data['m']