# Source code for astrobase.lcproc.lcbin

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# lcbin.py - Waqas Bhatti (wbhatti@astro.princeton.edu) - Feb 2019

'''
This contains parallelized functions to bin large numbers of light curves in
time.

'''

#############
## LOGGING ##
#############

import logging
from astrobase import log_sub, log_fmt, log_date_fmt

# flip DEBUG to True to get debug-level log output from this module
DEBUG = False
level = logging.DEBUG if DEBUG else logging.INFO

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=level,
                    style=log_sub,
                    format=log_fmt,
                    datefmt=log_date_fmt)

# short module-level aliases for the logger methods used throughout this file
(LOGDEBUG,
 LOGINFO,
 LOGWARNING,
 LOGERROR,
 LOGEXCEPTION) = (LOGGER.debug,
                  LOGGER.info,
                  LOGGER.warning,
                  LOGGER.error,
                  LOGGER.exception)


#############
## IMPORTS ##
#############

import pickle
import os
import os.path
import glob
import multiprocessing as mp

from tornado.escape import squeeze

# to turn a list of keys into a dict address
# from https://stackoverflow.com/a/14692747
from functools import reduce
from operator import getitem


def _dict_get(datadict, keylist):
    '''Return the item found by indexing `datadict` with each key in turn.

    `keylist` is applied left to right, so `['a', 'b']` returns
    `datadict['a']['b']`. An empty `keylist` returns `datadict` itself.

    '''
    node = datadict
    for key in keylist:
        node = node[key]
    return node


############
## CONFIG ##
############

# CPU count reported by multiprocessing; used as the default `nworkers` value
# for the parallel functions in this module
NCPUS = mp.cpu_count()


###################
## LOCAL IMPORTS ##
###################

from astrobase.lcproc import get_lcformat
from astrobase.lcmath import (
    normalize_magseries,
    time_bin_magseries_with_errs,
)


###################################
## FUNCTIONS TO BIN LIGHT CURVES ##
###################################

def timebinlc(lcfile,
              binsizesec,
              outdir=None,
              lcformat='hat-sql',
              lcformatdir=None,
              timecols=None,
              magcols=None,
              errcols=None,
              minbinelems=7):
    '''Bin one light curve file in time and save the result as a pickle.

    Parameters
    ----------

    lcfile : str
        The light curve file to process.

    binsizesec : float
        The size of each time bin in seconds.

    outdir : str or None
        Directory to write the output pickle to. If None, the pickle is
        written next to `lcfile`.

    lcformat : str
        The `formatkey` of the light curve format, as previously passed to
        `lcproc.register_lcformat`. It is used to look up the reader function
        and default columns for this light curve.

    lcformatdir : str or None
        Optional directory containing lcformat description JSONs that are not
        in the locations lcproc normally searches. Used together with
        `lcformat`.

    timecols,magcols,errcols : lists of str
        Keys in the lcdict returned by the reader function that hold the
        times, mags/fluxes, and measurement errors to bin. A key may be a
        dotted path (e.g. `'a.b'`) to address a nested dict. Any of these
        left as None falls back to the default for the light curve format.

    minbinelems : int
        The minimum number of elements a time bin must contain to be kept in
        the binned light curve.

    Returns
    -------

    str or None
        The path of the output pickle. None is returned if the light curve
        format can't be resolved or if the lcdict already has a 'binned' key.

    Notes
    -----

    The pickle contains the lcdict with an added `lcdict['binned'][magcol]`
    dict for each magcol processed. That dict has the keys 'times', 'mags',
    'errs', 'nbins', 'timebins', and 'binsizesec'.

    If the light curve format has no special normalization function, the
    series is passed through `normalize_magseries` before binning.

    '''

    try:

        formatinfo = get_lcformat(lcformat, use_lcformat_dir=lcformatdir)

        if not formatinfo:
            LOGERROR("can't figure out the light curve format")
            return None

        (_, readerfunc,
         default_timecols, default_magcols, default_errcols,
         magsarefluxes, normfunc) = formatinfo

    except Exception:
        LOGEXCEPTION("can't figure out the light curve format")
        return None

    # caller-supplied columns win over the format defaults
    timecols = default_timecols if timecols is None else timecols
    magcols = default_magcols if magcols is None else magcols
    errcols = default_errcols if errcols is None else errcols

    lcdict = readerfunc(lcfile)

    # some readers return a list/tuple; the first element is assumed to be the
    # actual lcdict
    # FIXME: figure out how to not need this assumption
    if isinstance(lcdict, (list, tuple)) and isinstance(lcdict[0], dict):
        lcdict = lcdict[0]

    # don't bin a light curve twice
    if 'binned' in lcdict:
        LOGERROR('this light curve appears to be binned already, skipping...')
        return None

    def _fetch_column(colspec):
        # a dotted column spec addresses a nested key in the lcdict
        keypath = colspec.split('.') if '.' in colspec else [colspec]
        return _dict_get(lcdict, keypath)

    binned_series = {}
    lcdict['binned'] = binned_series

    for timecol, magcol, errcol in zip(timecols, magcols, errcols):

        times = _fetch_column(timecol)
        mags = _fetch_column(magcol)
        errs = _fetch_column(errcol)

        # only normalize here when the format has no normalization of its own
        if normfunc is None:
            times, mags = normalize_magseries(
                times, mags,
                magsarefluxes=magsarefluxes
            )

        binresult = time_bin_magseries_with_errs(times, mags, errs,
                                                 binsize=binsizesec,
                                                 minbinelems=minbinelems)

        binned_series[magcol] = {
            'times': binresult['binnedtimes'],
            'mags': binresult['binnedmags'],
            'errs': binresult['binnederrs'],
            'nbins': binresult['nbins'],
            'timebins': binresult['jdbins'],
            'binsizesec': binsizesec,
        }

    # all magcols are binned; the output is always a pickle
    if outdir is None:
        outdir = os.path.dirname(lcfile)

    objectid = squeeze(lcdict['objectid']).replace(' ', '-')
    outname = '%s-binned%.1fsec-%s.pkl' % (objectid, binsizesec, lcformat)
    outfile = os.path.join(outdir, outname)

    with open(outfile, 'wb') as outfd:
        pickle.dump(lcdict, outfd, protocol=pickle.HIGHEST_PROTOCOL)

    return outfile


def timebinlc_worker(task):
    '''
    This is a parallel worker that runs `timebinlc` on a single task.

    Parameters
    ----------

    task : tuple
        This is of the form::

            task[0] = lcfile
            task[1] = binsizesec
            task[2] = {'outdir','lcformat','lcformatdir',
                       'timecols','magcols','errcols','minbinelems'}

        `task[2]` is a dict that is passed to `timebinlc` as keyword
        arguments.

    Returns
    -------

    str or None
        The output pickle file with the binned LC if successful. None if
        `timebinlc` raised an exception or returned None itself.

    '''

    lcfile, binsizesec, kwargs = task

    try:
        binnedlc = timebinlc(lcfile, binsizesec, **kwargs)
    except Exception:
        LOGEXCEPTION('failed to bin %s using binsizesec = %s' %
                     (lcfile, binsizesec))
        return None

    # timebinlc reports some failures by returning None rather than raising;
    # don't log those as a success
    if binnedlc is None:
        LOGERROR('failed to bin %s using binsizesec = %s' %
                 (lcfile, binsizesec))
        return None

    LOGINFO('%s binned using %s sec -> %s OK' %
            (lcfile, binsizesec, binnedlc))
    return binnedlc


def parallel_timebin(lclist,
                     binsizesec,
                     maxobjects=None,
                     outdir=None,
                     lcformat='hat-sql',
                     lcformatdir=None,
                     timecols=None,
                     magcols=None,
                     errcols=None,
                     minbinelems=7,
                     nworkers=NCPUS,
                     maxworkertasks=1000):
    '''This time-bins all the LCs in the list using the specified bin size.

    Parameters
    ----------

    lclist : list of str
        The input LCs to process.

    binsizesec : float
        The time bin size to use in seconds.

    maxobjects : int or None
        If provided, only the first `maxobjects` items of `lclist` are
        processed.

    outdir : str or None
        The directory where output LCs will be written. It is created
        (including any missing parent directories) if it doesn't exist. If
        None, each output is written to the same directory as its input LC.

    lcformat : str
        This is the `formatkey` associated with your light curve format, which
        you previously passed in to the `lcproc.register_lcformat`
        function. This will be used to look up how to find and read the light
        curve file.

    lcformatdir : str or None
        If this is provided, gives the path to a directory when you've stored
        your lcformat description JSONs, other than the usual directories lcproc
        knows to search for them in. Use this along with `lcformat` to specify
        an LC format JSON file that's not currently registered with lcproc.

    timecols,magcols,errcols : lists of str
        The keys in the lcdict produced by your light curve reader function that
        correspond to the times, mags/fluxes, and associated measurement errors
        that will be used as inputs to the binning process. If these are None,
        the default values for `timecols`, `magcols`, and `errcols` for your
        light curve format will be used here.

    minbinelems : int
        The minimum number of time-bin elements required to accept a time-bin as
        valid for the output binned light curve.

    nworkers : int
        Number of parallel workers to launch.

    maxworkertasks : int
        The maximum number of tasks a parallel worker will complete before being
        replaced to guard against memory leaks.

    Returns
    -------

    dict
        The returned dict maps the basename of each input LC to the output
        pickle path returned by the worker, or to None if binning that LC
        failed.

    '''

    # makedirs handles nested output paths; exist_ok covers another process
    # creating the directory between the check and the call
    if outdir and not os.path.exists(outdir):
        os.makedirs(outdir, exist_ok=True)

    if maxobjects is not None:
        lclist = lclist[:maxobjects]

    worker_kwargs = {
        'outdir': outdir,
        'lcformat': lcformat,
        'lcformatdir': lcformatdir,
        'timecols': timecols,
        'magcols': magcols,
        'errcols': errcols,
        'minbinelems': minbinelems,
    }
    tasks = [(lcfile, binsizesec, worker_kwargs) for lcfile in lclist]

    pool = mp.Pool(nworkers, maxtasksperchild=maxworkertasks)
    try:
        results = pool.map(timebinlc_worker, tasks)
    finally:
        # always shut the pool down, even if map raised
        pool.close()
        pool.join()

    return {
        os.path.basename(lcfile): binnedlc
        for lcfile, binnedlc in zip(lclist, results)
    }


def parallel_timebin_lcdir(lcdir,
                           binsizesec,
                           maxobjects=None,
                           outdir=None,
                           lcformat='hat-sql',
                           lcformatdir=None,
                           timecols=None,
                           magcols=None,
                           errcols=None,
                           minbinelems=7,
                           nworkers=NCPUS,
                           maxworkertasks=1000):
    '''
    This time bins all the light curves in the specified directory.

    Parameters
    ----------

    lcdir : str
        Directory containing the input LCs to process. Files are found by
        applying the file glob of the light curve format to this directory,
        and are processed in sorted order.

    binsizesec : float
        The time bin size to use in seconds.

    maxobjects : int or None
        If provided, only the first `maxobjects` light curves found (in sorted
        order) are processed.

    outdir : str or None
        The directory where output LCs will be written. If None, will write to
        the same directory as the input LCs.

    lcformat : str
        This is the `formatkey` associated with your light curve format, which
        you previously passed in to the `lcproc.register_lcformat`
        function. This will be used to look up how to find and read the light
        curve file.

    lcformatdir : str or None
        If this is provided, gives the path to a directory when you've stored
        your lcformat description JSONs, other than the usual directories lcproc
        knows to search for them in. Use this along with `lcformat` to specify
        an LC format JSON file that's not currently registered with lcproc.

    timecols,magcols,errcols : lists of str
        The keys in the lcdict produced by your light curve reader function that
        correspond to the times, mags/fluxes, and associated measurement errors
        that will be used as inputs to the binning process. If these are None,
        the default values for `timecols`, `magcols`, and `errcols` for your
        light curve format will be used here.

    minbinelems : int
        The minimum number of time-bin elements required to accept a time-bin as
        valid for the output binned light curve.

    nworkers : int
        Number of parallel workers to launch.

    maxworkertasks : int
        The maximum number of tasks a parallel worker will complete before being
        replaced to guard against memory leaks.

    Returns
    -------

    dict or None
        The dict returned by `parallel_timebin` for the light curves found in
        `lcdir`. None is returned if the light curve format can't be resolved.

    '''

    try:

        formatinfo = get_lcformat(lcformat, use_lcformat_dir=lcformatdir)

        if formatinfo:
            # only the file glob is needed here; the workers look up the
            # rest of the format info themselves
            (fileglob, _, _, _, _, _, _) = formatinfo
        else:
            LOGERROR("can't figure out the light curve format")
            return None

    except Exception:
        LOGEXCEPTION("can't figure out the light curve format")
        return None

    lclist = sorted(glob.glob(os.path.join(lcdir, fileglob)))

    # lcformatdir must be forwarded, otherwise the workers can't resolve a
    # format that is only described in that directory
    return parallel_timebin(lclist,
                            binsizesec,
                            maxobjects=maxobjects,
                            outdir=outdir,
                            lcformat=lcformat,
                            lcformatdir=lcformatdir,
                            timecols=timecols,
                            magcols=magcols,
                            errcols=errcols,
                            minbinelems=minbinelems,
                            nworkers=nworkers,
                            maxworkertasks=maxworkertasks)