Source code for astrobase.lcproc.tfa

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# tfa.py - Waqas Bhatti (wbhatti@astro.princeton.edu) - Feb 2019

'''
This contains functions to run the Trend Filtering Algorithm (TFA) in a
parallelized manner on large collections of light curves.

'''

#############
## LOGGING ##
#############

import logging
from astrobase import log_sub, log_fmt, log_date_fmt

DEBUG = False
if DEBUG:
    level = logging.DEBUG
else:
    level = logging.INFO
LOGGER = logging.getLogger(__name__)
logging.basicConfig(
    level=level,
    style=log_sub,
    format=log_fmt,
    datefmt=log_date_fmt,
)

LOGDEBUG = LOGGER.debug
LOGINFO = LOGGER.info
LOGWARNING = LOGGER.warning
LOGERROR = LOGGER.error
LOGEXCEPTION = LOGGER.exception


#############
## IMPORTS ##
#############

import pickle
import os
import os.path
import glob
import multiprocessing as mp
import gzip

from tornado.escape import squeeze

import numpy as np
import numpy.random as npr
npr.seed(0xc0ffee)

import scipy.interpolate as spi
from scipy import linalg as spla

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

# to turn a list of keys into a dict address
# from https://stackoverflow.com/a/14692747
from functools import reduce
from operator import getitem


def _dict_get(datadict, keylist):
    return reduce(getitem, keylist, datadict)


############
## CONFIG ##
############

NCPUS = mp.cpu_count()


###################
## LOCAL IMPORTS ##
###################

from astrobase import coordutils
from astrobase.varclass import starfeatures, varfeatures
from astrobase.lcmath import (
    normalize_magseries,
    sigclip_magseries
)

from astrobase.lcproc import get_lcformat


##################################
## LIGHT CURVE DETRENDING - TFA ##
##################################

def _collect_tfa_stats(task):
    '''
    This is a parallel worker to gather LC stats.

    task[0] = lcfile
    task[1] = lcformat
    task[2] = lcformatdir
    task[3] = timecols
    task[4] = magcols
    task[5] = errcols
    task[6] = custom_bandpasses

    '''

    try:

        (lcfile, lcformat, lcformatdir,
         timecols, magcols, errcols,
         custom_bandpasses) = task

        try:
            formatinfo = get_lcformat(lcformat,
                                      use_lcformat_dir=lcformatdir)
            if formatinfo:
                (dfileglob, readerfunc,
                 dtimecols, dmagcols, derrcols,
                 magsarefluxes, normfunc) = formatinfo
            else:
                LOGERROR("can't figure out the light curve format")
                return None
        except Exception:
            LOGEXCEPTION("can't figure out the light curve format")
            return None

        # override the default timecols, magcols, and errcols
        # using the ones provided to the function
        if timecols is None:
            timecols = dtimecols
        if magcols is None:
            magcols = dmagcols
        if errcols is None:
            errcols = derrcols

        # get the LC into a dict
        lcdict = readerfunc(lcfile)

        # this should handle lists/tuples being returned by readerfunc
        # we assume that the first element is the actual lcdict
        # FIXME: figure out how to not need this assumption
        if ( (isinstance(lcdict, (list, tuple))) and
             (isinstance(lcdict[0], dict)) ):
            lcdict = lcdict[0]

        #
        # collect the necessary stats for this light curve
        #

        # 1. number of observations
        # 2. median mag
        # 3. eta_normal
        # 4. MAD
        # 5. objectid
        # 6. get mags and colors from objectinfo if there's one in lcdict

        if 'objectid' in lcdict:
            objectid = lcdict['objectid']
        elif 'objectinfo' in lcdict and 'objectid' in lcdict['objectinfo']:
            objectid = lcdict['objectinfo']['objectid']
        elif 'objectinfo' in lcdict and 'hatid' in lcdict['objectinfo']:
            objectid = lcdict['objectinfo']['hatid']
        else:
            LOGERROR('no objectid present in lcdict for LC %s, '
                     'using filename prefix as objectid' % lcfile)
            objectid = os.path.splitext(os.path.basename(lcfile))[0]

        if 'objectinfo' in lcdict:

            colorfeat = starfeatures.color_features(
                lcdict['objectinfo'],
                deredden=False,
                custom_bandpasses=custom_bandpasses
            )

        else:
            LOGERROR('no objectinfo dict in lcdict, '
                     'could not get magnitudes for LC %s, '
                     'cannot use for TFA template ensemble' %
                     lcfile)
            return None

        # this is the initial dict
        resultdict = {'objectid':objectid,
                      'ra':lcdict['objectinfo']['ra'],
                      'decl':lcdict['objectinfo']['decl'],
                      'colorfeat':colorfeat,
                      'lcfpath':os.path.abspath(lcfile),
                      'lcformat':lcformat,
                      'lcformatdir':lcformatdir,
                      'timecols':timecols,
                      'magcols':magcols,
                      'errcols':errcols}

        for tcol, mcol, ecol in zip(timecols, magcols, errcols):

            try:

                # dereference the columns and get them from the lcdict
                if '.' in tcol:
                    tcolget = tcol.split('.')
                else:
                    tcolget = [tcol]
                times = _dict_get(lcdict, tcolget)

                if '.' in mcol:
                    mcolget = mcol.split('.')
                else:
                    mcolget = [mcol]
                mags = _dict_get(lcdict, mcolget)

                if '.' in ecol:
                    ecolget = ecol.split('.')
                else:
                    ecolget = [ecol]
                errs = _dict_get(lcdict, ecolget)

                # normalize here if not using special normalization
                if normfunc is None:
                    ntimes, nmags = normalize_magseries(
                        times, mags,
                        magsarefluxes=magsarefluxes
                    )

                    times, mags, errs = ntimes, nmags, errs

                # get the variability features for this object
                varfeat = varfeatures.all_nonperiodic_features(
                    times, mags, errs
                )

                resultdict[mcol] = varfeat

            except Exception:

                LOGEXCEPTION('%s, magcol: %s, probably ran into all-nans' %
                             (lcfile, mcol))
                resultdict[mcol] = {'ndet':0,
                                    'mad':np.nan,
                                    'eta_normal':np.nan}

        return resultdict

    except Exception:

        LOGEXCEPTION('could not execute get_tfa_stats for task: %s' %
                     repr(task))
        return None


def _reform_templatelc_for_tfa(task):
    '''
    This is a parallel worker that reforms light curves for TFA.

    task[0] = lcfile
    task[1] = lcformat
    task[2] = lcformatdir
    task[3] = timecol
    task[4] = magcol
    task[5] = errcol
    task[6] = timebase
    task[7] = interpolate_type
    task[8] = sigclip

    '''

    try:

        (lcfile, lcformat, lcformatdir,
         tcol, mcol, ecol,
         timebase, interpolate_type, sigclip) = task

        try:
            formatinfo = get_lcformat(lcformat,
                                      use_lcformat_dir=lcformatdir)
            if formatinfo:
                (dfileglob, readerfunc,
                 dtimecols, dmagcols, derrcols,
                 magsarefluxes, normfunc) = formatinfo
            else:
                LOGERROR("can't figure out the light curve format")
                return None
        except Exception:
            LOGEXCEPTION("can't figure out the light curve format")
            return None

        # get the LC into a dict
        lcdict = readerfunc(lcfile)

        # this should handle lists/tuples being returned by readerfunc
        # we assume that the first element is the actual lcdict
        # FIXME: figure out how to not need this assumption
        if ( (isinstance(lcdict, (list, tuple))) and
             (isinstance(lcdict[0], dict)) ):
            lcdict = lcdict[0]

        outdict = {}

        # dereference the columns and get them from the lcdict
        if '.' in tcol:
            tcolget = tcol.split('.')
        else:
            tcolget = [tcol]
        times = _dict_get(lcdict, tcolget)

        if '.' in mcol:
            mcolget = mcol.split('.')
        else:
            mcolget = [mcol]
        mags = _dict_get(lcdict, mcolget)

        if '.' in ecol:
            ecolget = ecol.split('.')
        else:
            ecolget = [ecol]
        errs = _dict_get(lcdict, ecolget)

        # normalize here if not using special normalization
        if normfunc is None:
            ntimes, nmags = normalize_magseries(
                times, mags,
                magsarefluxes=magsarefluxes
            )

        times, mags, errs = ntimes, nmags, errs

        #
        # now we'll do: 1. sigclip, 2. reform to timebase, 3. renorm to zero
        #

        # 1. sigclip as requested
        stimes, smags, serrs = sigclip_magseries(times,
                                                 mags,
                                                 errs,
                                                 sigclip=sigclip)

        # 2. now, we'll renorm to the timebase
        mags_interpolator = spi.interp1d(stimes, smags,
                                         kind=interpolate_type,
                                         fill_value='extrapolate')
        errs_interpolator = spi.interp1d(stimes, serrs,
                                         kind=interpolate_type,
                                         fill_value='extrapolate')

        interpolated_mags = mags_interpolator(timebase)
        interpolated_errs = errs_interpolator(timebase)

        # 3. renorm to zero
        magmedian = np.median(interpolated_mags)

        renormed_mags = interpolated_mags - magmedian

        # update the dict
        outdict = {'mags':renormed_mags,
                   'errs':interpolated_errs,
                   'origmags':interpolated_mags}

        #
        # done with this magcol
        #
        return outdict

    except Exception:

        LOGEXCEPTION('reform LC task failed: %s' % repr(task))
        return None


[docs]def tfa_templates_lclist( lclist, outfile, lcinfo_pkl=None, target_template_frac=0.1, max_target_frac_obs=0.25, min_template_number=10, max_template_number=1000, max_rms=0.15, max_mult_above_magmad=1.5, max_mult_above_mageta=1.5, mag_bandpass='sdssr', custom_bandpasses=None, mag_bright_limit=10.0, mag_faint_limit=12.0, process_template_lcs=True, template_sigclip=5.0, template_interpolate='nearest', lcformat='hat-sql', lcformatdir=None, timecols=None, magcols=None, errcols=None, nworkers=NCPUS, maxworkertasks=1000, ): '''This selects template objects for TFA. Selection criteria for TFA template ensemble objects: - not variable: use a poly fit to the mag-MAD relation and eta-normal variability index to get nonvar objects - not more than 10% of the total number of objects in the field or `max_tfa_templates` at most and no more than `max_target_frac_obs x template_ndet` objects. - allow shuffling of the templates if the target ends up in them - nothing with less than the median number of observations in the field - sigma-clip the input time series observations - TODO: select randomly in xi-eta space. This doesn't seem to make a huge difference at the moment, so removed those bits for now. This function makes plots of xi-eta for the selected template objects so the distributions can be visualized. This also determines the effective cadence that all TFA LCs will be binned to as the template LC with the largest number of non-nan observations will be used. All template LCs will be renormed to zero. Parameters ---------- lclist : list of str This is a list of light curves to use as input to generate the template set. outfile : str This is the pickle filename to which the TFA template list will be written to. lcinfo_pkl : str or None If provided, is a file path to a pickle file created by this function on a previous run containing the LC information. This will be loaded directly instead of having to re-run LC info collection. If None, will be placed in the same directory as outfile. target_template_frac : float This is the fraction of total objects in lclist to use for the number of templates. max_target_frac_obs : float This sets the number of templates to generate if the number of observations for the light curves is smaller than the number of objects in the collection. The number of templates will be set to this fraction of the number of observations if this is the case. min_template_number : int This is the minimum number of templates to generate. max_template_number : int This is the maximum number of templates to generate. If `target_template_frac` times the number of objects is greater than `max_template_number`, only `max_template_number` templates will be used. max_rms : float This is the maximum light curve RMS for an object to consider it as a possible template ensemble member. max_mult_above_magmad : float This is the maximum multiplier above the mag-RMS fit to consider an object as variable and thus not part of the template ensemble. max_mult_above_mageta : float This is the maximum multiplier above the mag-eta (variable index) fit to consider an object as variable and thus not part of the template ensemble. mag_bandpass : str This sets the key in the light curve dict's objectinfo dict to use as the canonical magnitude for the object and apply any magnitude limits to. custom_bandpasses : dict or None This can be used to provide any custom band name keys to the star feature collection function. mag_bright_limit : float or list of floats This sets the brightest mag (in the `mag_bandpass` filter) for a potential member of the TFA template ensemble. If this is a single float, the value will be used for all magcols. If this is a list of floats with len = len(magcols), the specific bright limits will be used for each magcol individually. mag_faint_limit : float or list of floats This sets the faintest mag (in the `mag_bandpass` filter) for a potential member of the TFA template ensemble. If this is a single float, the value will be used for all magcols. If this is a list of floats with len = len(magcols), the specific faint limits will be used for each magcol individually. process_template_lcs : bool If True, will reform the template light curves to the chosen time-base. If False, will only select light curves for templates but not process them. This is useful for initial exploration of how the template LC are selected. template_sigclip : float or sequence of floats or None This sets the sigma-clip to be applied to the template light curves. template_interpolate : str This sets the kwarg to pass to `scipy.interpolate.interp1d` to set the kind of interpolation to use when reforming light curves to the TFA template timebase. lcformat : str This is the `formatkey` associated with your light curve format, which you previously passed in to the `lcproc.register_lcformat` function. This will be used to look up how to find and read the light curves specified in `basedir` or `use_list_of_filenames`. lcformatdir : str or None If this is provided, gives the path to a directory when you've stored your lcformat description JSONs, other than the usual directories lcproc knows to search for them in. Use this along with `lcformat` to specify an LC format JSON file that's not currently registered with lcproc. timecols : list of str or None The timecol keys to use from the lcdict in calculating the features. magcols : list of str or None The magcol keys to use from the lcdict in calculating the features. errcols : list of str or None The errcol keys to use from the lcdict in calculating the features. nworkers : int The number of parallel workers to launch. maxworkertasks : int The maximum number of tasks to run per worker before it is replaced by a fresh one. Returns ------- dict This function returns a dict that can be passed directly to `apply_tfa_magseries` below. It can optionally produce a pickle with the same dict, which can also be passed to that function. ''' try: formatinfo = get_lcformat(lcformat, use_lcformat_dir=lcformatdir) if formatinfo: (dfileglob, readerfunc, dtimecols, dmagcols, derrcols, magsarefluxes, normfunc) = formatinfo else: LOGERROR("can't figure out the light curve format") return None except Exception: LOGEXCEPTION("can't figure out the light curve format") return None # override the default timecols, magcols, and errcols # using the ones provided to the function if timecols is None: timecols = dtimecols if magcols is None: magcols = dmagcols if errcols is None: errcols = derrcols LOGINFO('collecting light curve information for %s objects in list...' % len(lclist)) # # check if we have cached results for this run # check_lcinfo_path = os.path.join( os.path.dirname(outfile), 'tfa-collected-lcinfo-%s.pkl' % lcformat ) # case where we provide a cache info pkl directly if lcinfo_pkl and os.path.exists(lcinfo_pkl): with open(lcinfo_pkl,'rb') as infd: results = pickle.load(infd) # if we don't provide an lcinfo pkl elif not lcinfo_pkl and os.path.exists(check_lcinfo_path): with open(check_lcinfo_path, 'rb') as infd: results = pickle.load(infd) # otherwise, we have to redo the LC info collection else: # first, we'll collect the light curve info tasks = [(x, lcformat, lcformat, timecols, magcols, errcols, custom_bandpasses) for x in lclist] pool = mp.Pool(nworkers, maxtasksperchild=maxworkertasks) results = pool.map(_collect_tfa_stats, tasks) pool.close() pool.join() # save these results so we don't have to redo if something breaks here if lcinfo_pkl: with open(lcinfo_pkl,'wb') as outfd: pickle.dump(results, outfd, pickle.HIGHEST_PROTOCOL) else: with open(check_lcinfo_path,'wb') as outfd: pickle.dump(results, outfd, pickle.HIGHEST_PROTCOL) # # now, go through the light curve information # # find the center RA and center DEC -> median of all LC RAs and DECs all_ras = np.array([res['ra'] for res in results]) all_decls = np.array([res['decl'] for res in results]) center_ra = np.nanmedian(all_ras) center_decl = np.nanmedian(all_decls) outdict = { 'timecols':[], 'magcols':[], 'errcols':[], 'center_ra':center_ra, 'center_decl':center_decl, } # for each magcol, we'll generate a separate template list for tcol, mcol, ecol in zip(timecols, magcols, errcols): if '.' in tcol: tcolget = tcol.split('.') else: tcolget = [tcol] # these are the containers for possible template collection LC info (lcmag, lcmad, lceta, lcndet, lcobj, lcfpaths, lcra, lcdecl) = [], [], [], [], [], [], [], [] outdict['timecols'].append(tcol) outdict['magcols'].append(mcol) outdict['errcols'].append(ecol) # add to the collection of all light curves outdict[mcol] = {'collection':{'mag':[], 'mad':[], 'eta':[], 'ndet':[], 'obj':[], 'lcf':[], 'ra':[], 'decl':[]}} LOGINFO('magcol: %s, collecting prospective template LC info...' % mcol) # collect the template LCs for this magcol for result in results: # we'll only append objects that have all of these elements try: thismag = result['colorfeat'][mag_bandpass] thismad = result[mcol]['mad'] thiseta = result[mcol]['eta_normal'] thisndet = result[mcol]['ndet'] thisobj = result['objectid'] thislcf = result['lcfpath'] thisra = result['ra'] thisdecl = result['decl'] outdict[mcol]['collection']['mag'].append(thismag) outdict[mcol]['collection']['mad'].append(thismad) outdict[mcol]['collection']['eta'].append(thiseta) outdict[mcol]['collection']['ndet'].append(thisndet) outdict[mcol]['collection']['obj'].append(thisobj) outdict[mcol]['collection']['lcf'].append(thislcf) outdict[mcol]['collection']['ra'].append(thisra) outdict[mcol]['collection']['decl'].append(thisdecl) # check if we have more than one bright or faint limit elem if isinstance(mag_bright_limit, (list, tuple)): use_bright_maglim = mag_bright_limit[ magcols.index(mcol) ] else: use_bright_maglim = mag_bright_limit if isinstance(mag_faint_limit, (list, tuple)): use_faint_maglim = mag_faint_limit[ magcols.index(mcol) ] else: use_faint_maglim = mag_faint_limit # make sure the object lies in the mag limits and RMS limits we # set before to try to accept it into the TFA ensemble if ((use_bright_maglim < thismag < use_faint_maglim) and (1.4826*thismad < max_rms)): lcmag.append(thismag) lcmad.append(thismad) lceta.append(thiseta) lcndet.append(thisndet) lcobj.append(thisobj) lcfpaths.append(thislcf) lcra.append(thisra) lcdecl.append(thisdecl) except Exception: pass # make sure we have enough LCs to work on if len(lcobj) >= min_template_number: LOGINFO('magcol: %s, %s objects eligible for ' 'template selection after filtering on mag ' 'limits (%s, %s) and max RMS (%s)' % (mcol, len(lcobj), mag_bright_limit, mag_faint_limit, max_rms)) lcmag = np.array(lcmag) lcmad = np.array(lcmad) lceta = np.array(lceta) lcndet = np.array(lcndet) lcobj = np.array(lcobj) lcfpaths = np.array(lcfpaths) lcra = np.array(lcra) lcdecl = np.array(lcdecl) sortind = np.argsort(lcmag) lcmag = lcmag[sortind] lcmad = lcmad[sortind] lceta = lceta[sortind] lcndet = lcndet[sortind] lcobj = lcobj[sortind] lcfpaths = lcfpaths[sortind] lcra = lcra[sortind] lcdecl = lcdecl[sortind] # 1. get the mag-MAD relation # this is needed for spline fitting # should take care of the pesky 'x must be strictly increasing' bit splfit_ind = np.diff(lcmag) > 0.0 splfit_ind = np.concatenate((np.array([True]), splfit_ind)) fit_lcmag = lcmag[splfit_ind] fit_lcmad = lcmad[splfit_ind] fit_lceta = lceta[splfit_ind] magmadfit = np.poly1d(np.polyfit( fit_lcmag, fit_lcmad, 2 )) magmadind = lcmad/magmadfit(lcmag) < max_mult_above_magmad # 2. get the mag-eta relation magetafit = np.poly1d(np.polyfit( fit_lcmag, fit_lceta, 2 )) magetaind = magetafit(lcmag)/lceta < max_mult_above_mageta # 3. get the median ndet median_ndet = np.median(lcndet) ndetind = lcndet >= median_ndet # form the final template ensemble templateind = magmadind & magetaind & ndetind # check again if we have enough LCs in the template if templateind.sum() >= min_template_number: LOGINFO('magcol: %s, %s objects selectable for TFA templates' % (mcol, templateind.sum())) templatemag = lcmag[templateind] templatemad = lcmad[templateind] templateeta = lceta[templateind] templatendet = lcndet[templateind] templateobj = lcobj[templateind] templatelcf = lcfpaths[templateind] templatera = lcra[templateind] templatedecl = lcdecl[templateind] # now, check if we have no more than the required fraction of # TFA templates target_number_templates = int(target_template_frac*len(results)) if target_number_templates > max_template_number: target_number_templates = max_template_number LOGINFO('magcol: %s, selecting %s TFA templates randomly' % (mcol, target_number_templates)) # get the xi-eta template_cxi, template_ceta = coordutils.xieta_from_radecl( templatera, templatedecl, center_ra, center_decl ) # select random uniform objects from the template candidates targetind = npr.choice(templateobj.size, target_number_templates, replace=False) templatemag = templatemag[targetind] templatemad = templatemad[targetind] templateeta = templateeta[targetind] templatendet = templatendet[targetind] templateobj = templateobj[targetind] templatelcf = templatelcf[targetind] templatera = templatera[targetind] templatedecl = templatedecl[targetind] template_cxi = template_cxi[targetind] template_ceta = template_ceta[targetind] # get the max ndet so far to use that LC as the timebase maxndetind = templatendet == templatendet.max() timebaselcf = templatelcf[maxndetind][0] timebasendet = templatendet[maxndetind][0] LOGINFO('magcol: %s, selected %s as template time ' 'base LC with %s observations' % (mcol, timebaselcf, timebasendet)) if process_template_lcs: timebaselcdict = readerfunc(timebaselcf) if ( (isinstance(timebaselcdict, (list, tuple))) and (isinstance(timebaselcdict[0], dict)) ): timebaselcdict = timebaselcdict[0] # this is the timebase to use for all of the templates timebase = _dict_get(timebaselcdict, tcolget) else: timebase = None # also check if the number of templates is longer than the # actual timebase of the observations. this will cause issues # with overcorrections and will probably break TFA if target_number_templates > timebasendet: LOGWARNING('The number of TFA templates (%s) is ' 'larger than the number of observations ' 'of the time base (%s). This will likely ' 'overcorrect all light curves to a ' 'constant level. ' 'Will use up to %s x timebase ndet ' 'templates instead' % (target_number_templates, timebasendet, max_target_frac_obs)) # regen the templates based on the new number newmaxtemplates = int(max_target_frac_obs*timebasendet) # choose this number out of the already chosen templates # randomly LOGWARNING('magcol: %s, re-selecting %s TFA ' 'templates randomly' % (mcol, newmaxtemplates)) # select random uniform objects from the template candidates targetind = npr.choice(templateobj.size, newmaxtemplates, replace=False) templatemag = templatemag[targetind] templatemad = templatemad[targetind] templateeta = templateeta[targetind] templatendet = templatendet[targetind] templateobj = templateobj[targetind] templatelcf = templatelcf[targetind] templatera = templatera[targetind] templatedecl = templatedecl[targetind] template_cxi = template_cxi[targetind] template_ceta = template_ceta[targetind] plt.plot(template_cxi, template_ceta, marker='o', linestyle='none', ms=1.0) plt.xlabel('image plane-projected coordinate xi') plt.ylabel('image plane-projected coordinate eta') plt.title( 'image plane-projected coords - selected template objs' ) plt.savefig( os.path.join(os.path.dirname(outfile), 'template-cxi-ceta-%s.png' % mcol), bbox_inches='tight' ) plt.close('all') # get the max ndet so far to use that LC as the timebase maxndetind = templatendet == templatendet.max() timebaselcf = templatelcf[maxndetind][0] timebasendet = templatendet[maxndetind][0] LOGWARNING('magcol: %s, re-selected %s as template time ' 'base LC with %s observations' % (mcol, timebaselcf, timebasendet)) if process_template_lcs: timebaselcdict = readerfunc(timebaselcf) if ( (isinstance(timebaselcdict, (list, tuple))) and (isinstance(timebaselcdict[0], dict)) ): timebaselcdict = timebaselcdict[0] # this is the timebase to use for all of the templates timebase = _dict_get(timebaselcdict, tcolget) else: timebase = None # # end of check for ntemplates > timebase ndet # if process_template_lcs: LOGINFO('magcol: %s, reforming TFA template LCs to ' ' chosen timebase...' % mcol) # reform all template LCs to this time base, normalize to # zero, and sigclip as requested. this is a parallel op # first, we'll collect the light curve info tasks = [(x, lcformat, lcformatdir, tcol, mcol, ecol, timebase, template_interpolate, template_sigclip) for x in templatelcf] pool = mp.Pool(nworkers, maxtasksperchild=maxworkertasks) reform_results = pool.map(_reform_templatelc_for_tfa, tasks) pool.close() pool.join() # generate a 2D array for the template magseries with # dimensions = (n_objects, n_lcpoints) template_magseries = np.array([x['mags'] for x in reform_results]) template_errseries = np.array([x['errs'] for x in reform_results]) else: template_magseries = None template_errseries = None # put everything into a templateinfo dict for this magcol outdict[mcol].update({ 'timebaselcf':timebaselcf, 'timebase':timebase, 'trendfits':{'mag-mad':magmadfit, 'mag-eta':magetafit}, 'template_objects':templateobj, 'template_ra':templatera, 'template_decl':templatedecl, 'template_cxi':template_cxi, 'template_ceta':template_ceta, 'template_mag':templatemag, 'template_mad':templatemad, 'template_eta':templateeta, 'template_ndet':templatendet, 'template_magseries':template_magseries, 'template_errseries':template_errseries }) # make a KDTree on the template coordinates outdict[mcol]['template_radecl_kdtree'] = ( coordutils.make_kdtree( templatera, templatedecl ) ) # if we don't have enough, return nothing for this magcol else: LOGERROR('not enough objects meeting requested ' 'MAD, eta, ndet conditions to ' 'select templates for magcol: %s' % mcol) continue else: LOGERROR('nobjects: %s, not enough in requested mag range to ' 'select templates for magcol: %s' % (len(lcobj),mcol)) continue # make the plots for mag-MAD/mag-eta relation and fits used plt.plot(lcmag, lcmad, marker='o', linestyle='none', ms=1.0) modelmags = np.linspace(lcmag.min(), lcmag.max(), num=1000) plt.plot(modelmags, outdict[mcol]['trendfits']['mag-mad'](modelmags)) plt.yscale('log') plt.xlabel('catalog magnitude') plt.ylabel('light curve MAD') plt.title('catalog mag vs. light curve MAD and fit') plt.savefig( os.path.join(os.path.dirname(outfile), 'catmag-%s-lcmad-fit.png' % mcol), bbox_inches='tight' ) plt.close('all') plt.plot(lcmag, lceta, marker='o', linestyle='none', ms=1.0) modelmags = np.linspace(lcmag.min(), lcmag.max(), num=1000) plt.plot(modelmags, outdict[mcol]['trendfits']['mag-eta'](modelmags)) plt.yscale('log') plt.xlabel('catalog magnitude') plt.ylabel('light curve eta variable index') plt.title('catalog mag vs. light curve eta and fit') plt.savefig( os.path.join(os.path.dirname(outfile), 'catmag-%s-lceta-fit.png' % mcol), bbox_inches='tight' ) plt.close('all') # # end of operating on each magcol # if outfile.endswith('.gz'): outfd = gzip.open(outfile,'wb') else: outfd = open(outfile,'wb') with outfd: pickle.dump(outdict, outfd, protocol=pickle.HIGHEST_PROTOCOL) # return the templateinfo dict return outdict
[docs]def apply_tfa_magseries(lcfile, timecol, magcol, errcol, templateinfo, mintemplatedist_arcmin=10.0, lcformat='hat-sql', lcformatdir=None, interp='nearest', sigclip=5.0): '''This applies the TFA correction to an LC given TFA template information. Parameters ---------- lcfile : str This is the light curve file to apply the TFA correction to. timecol,magcol,errcol : str These are the column keys in the lcdict for the LC file to apply the TFA correction to. templateinfo : dict or str This is either the dict produced by `tfa_templates_lclist` or the pickle produced by the same function. mintemplatedist_arcmin : float This sets the minimum distance required from the target object for objects in the TFA template ensemble. Objects closer than this distance will be removed from the ensemble. lcformat : str This is the `formatkey` associated with your light curve format, which you previously passed in to the `lcproc.register_lcformat` function. This will be used to look up how to find and read the light curves specified in `basedir` or `use_list_of_filenames`. lcformatdir : str or None If this is provided, gives the path to a directory when you've stored your lcformat description JSONs, other than the usual directories lcproc knows to search for them in. Use this along with `lcformat` to specify an LC format JSON file that's not currently registered with lcproc. interp : str This is passed to scipy.interpolate.interp1d as the kind of interpolation to use when reforming this light curve to the timebase of the TFA templates. sigclip : float or sequence of two floats or None This is the sigma clip to apply to this light curve before running TFA on it. Returns ------- str This returns the filename of the light curve file generated after TFA applications. This is a pickle (that can be read by `lcproc.read_pklc`) in the same directory as `lcfile`. The `magcol` will be encoded in the filename, so each `magcol` in `lcfile` gets its own output file. ''' try: formatinfo = get_lcformat(lcformat, use_lcformat_dir=lcformatdir) if formatinfo: (dfileglob, readerfunc, dtimecols, dmagcols, derrcols, magsarefluxes, normfunc) = formatinfo else: LOGERROR("can't figure out the light curve format") return None except Exception: LOGEXCEPTION("can't figure out the light curve format") return None # get the templateinfo from a pickle if necessary if isinstance(templateinfo,str) and os.path.exists(templateinfo): with open(templateinfo,'rb') as infd: templateinfo = pickle.load(infd) lcdict = readerfunc(lcfile) if ((isinstance(lcdict, (tuple, list))) and isinstance(lcdict[0], dict)): lcdict = lcdict[0] objectid = lcdict['objectid'] # this is the initial template array tmagseries = templateinfo[magcol][ 'template_magseries' ][::] # if the object itself is in the template ensemble, remove it if objectid in templateinfo[magcol]['template_objects']: LOGWARNING('object %s found in the TFA template ensemble, removing...' % objectid) templateind = templateinfo[magcol]['template_objects'] == objectid # get the objects in the tmagseries not corresponding to the current # object's index tmagseries = tmagseries[~templateind,:] # check if there are close matches to the current object in the templates object_matches = coordutils.conesearch_kdtree( templateinfo[magcol]['template_radecl_kdtree'], lcdict['objectinfo']['ra'], lcdict['objectinfo']['decl'], mintemplatedist_arcmin/60.0 ) if len(object_matches) > 0: LOGWARNING( "object %s is within %.1f arcminutes of %s " "template objects. Will remove these objects " "from the template applied to this object." % (objectid, mintemplatedist_arcmin, len(object_matches)) ) removalind = np.full( templateinfo[magcol]['template_objects'].size, False, dtype=np.bool ) removalind[np.array(object_matches)] = True tmagseries = tmagseries[~removalind,:] # # finally, proceed to TFA # # this is the normal matrix normal_matrix = np.dot(tmagseries, tmagseries.T) # get the inverse of the matrix normal_matrix_inverse = spla.pinv2(normal_matrix) # get the timebase from the template timebase = templateinfo[magcol]['timebase'] # use this to reform the target lc in the same manner as that for a TFA # template LC reformed_targetlc = _reform_templatelc_for_tfa(( lcfile, lcformat, lcformatdir, timecol, magcol, errcol, timebase, interp, sigclip )) # calculate the scalar products of the target and template magseries scalar_products = np.dot(tmagseries, reformed_targetlc['mags']) # calculate the corrections corrections = np.dot(normal_matrix_inverse, scalar_products) # finally, get the corrected time series for the target object corrected_magseries = ( reformed_targetlc['origmags'] - np.dot(tmagseries.T, corrections) ) outdict = { 'times':timebase, 'mags':corrected_magseries, 'errs':reformed_targetlc['errs'], 'mags_median':np.median(corrected_magseries), 'mags_mad': np.median(np.abs(corrected_magseries - np.median(corrected_magseries))), 'work':{'tmagseries':tmagseries, 'normal_matrix':normal_matrix, 'normal_matrix_inverse':normal_matrix_inverse, 'scalar_products':scalar_products, 'corrections':corrections, 'reformed_targetlc':reformed_targetlc}, } # we'll write back the tfa times and mags to the lcdict lcdict['tfa'] = outdict outfile = os.path.join( os.path.dirname(lcfile), '%s-tfa-%s-pklc.pkl' % ( squeeze(objectid).replace(' ','-'), magcol ) ) with open(outfile,'wb') as outfd: pickle.dump(lcdict, outfd, pickle.HIGHEST_PROTOCOL) return outfile
def _parallel_tfa_worker(task): ''' This is a parallel worker for the function below. task[0] = lcfile task[1] = timecol task[2] = magcol task[3] = errcol task[4] = templateinfo task[5] = lcformat task[6] = lcformatdir task[6] = interp task[7] = sigclip task[8] = mintemplatedist_arcmin ''' (lcfile, timecol, magcol, errcol, templateinfo, lcformat, lcformatdir, interp, sigclip, mintemplatedist_arcmin) = task try: res = apply_tfa_magseries( lcfile, timecol, magcol, errcol, templateinfo, lcformat=lcformat, lcformatdir=lcformatdir, interp=interp, sigclip=sigclip, mintemplatedist_arcmin=mintemplatedist_arcmin ) if res: LOGINFO('%s -> %s TFA OK' % (lcfile, res)) return res except Exception: LOGEXCEPTION('TFA failed for %s' % lcfile) return None
[docs]def parallel_tfa_lclist(lclist, templateinfo, timecols=None, magcols=None, errcols=None, lcformat='hat-sql', lcformatdir=None, interp='nearest', sigclip=5.0, mintemplatedist_arcmin=10.0, nworkers=NCPUS, maxworkertasks=1000): '''This applies TFA in parallel to all LCs in the given list of file names. Parameters ---------- lclist : str This is a list of light curve files to apply TFA correction to. templateinfo : dict or str This is either the dict produced by `tfa_templates_lclist` or the pickle produced by the same function. timecols : list of str or None The timecol keys to use from the lcdict in applying TFA corrections. magcols : list of str or None The magcol keys to use from the lcdict in applying TFA corrections. errcols : list of str or None The errcol keys to use from the lcdict in applying TFA corrections. lcformat : str This is the `formatkey` associated with your light curve format, which you previously passed in to the `lcproc.register_lcformat` function. This will be used to look up how to find and read the light curves specified in `basedir` or `use_list_of_filenames`. lcformatdir : str or None If this is provided, gives the path to a directory when you've stored your lcformat description JSONs, other than the usual directories lcproc knows to search for them in. Use this along with `lcformat` to specify an LC format JSON file that's not currently registered with lcproc. interp : str This is passed to scipy.interpolate.interp1d as the kind of interpolation to use when reforming the light curves to the timebase of the TFA templates. sigclip : float or sequence of two floats or None This is the sigma clip to apply to the light curves before running TFA on it. mintemplatedist_arcmin : float This sets the minimum distance required from the target object for objects in the TFA template ensemble. Objects closer than this distance will be removed from the ensemble. nworkers : int The number of parallel workers to launch maxworkertasks : int The maximum number of tasks per worker allowed before it's replaced by a fresh one. Returns ------- dict Contains the input file names and output TFA light curve filenames per input file organized by each `magcol` in `magcols`. ''' # open the templateinfo first if isinstance(templateinfo,str) and os.path.exists(templateinfo): with open(templateinfo,'rb') as infd: templateinfo = pickle.load(infd) try: formatinfo = get_lcformat(lcformat, use_lcformat_dir=lcformatdir) if formatinfo: (dfileglob, readerfunc, dtimecols, dmagcols, derrcols, magsarefluxes, normfunc) = formatinfo else: LOGERROR("can't figure out the light curve format") return None except Exception: LOGEXCEPTION("can't figure out the light curve format") return None # override the default timecols, magcols, and errcols # using the ones provided to the function # we'll get the defaults from the templateinfo object if timecols is None: timecols = templateinfo['timecols'] if magcols is None: magcols = templateinfo['magcols'] if errcols is None: errcols = templateinfo['errcols'] outdict = {} # run by magcol for t, m, e in zip(timecols, magcols, errcols): tasks = [(x, t, m, e, templateinfo, lcformat, lcformatdir, interp, sigclip, mintemplatedist_arcmin) for x in lclist] pool = mp.Pool(nworkers, maxtasksperchild=maxworkertasks) results = pool.map(_parallel_tfa_worker, tasks) pool.close() pool.join() outdict[m] = results return outdict
[docs]def parallel_tfa_lcdir(lcdir, templateinfo, lcfileglob=None, timecols=None, magcols=None, errcols=None, lcformat='hat-sql', lcformatdir=None, interp='nearest', sigclip=5.0, mintemplatedist_arcmin=10.0, nworkers=NCPUS, maxworkertasks=1000): '''This applies TFA in parallel to all LCs in a directory. Parameters ---------- lcdir : str This is the directory containing the light curve files to process.. templateinfo : dict or str This is either the dict produced by `tfa_templates_lclist` or the pickle produced by the same function. lcfileglob : str or None The UNIX file glob to use when searching for light curve files in `lcdir`. If None, the default file glob associated with registered LC format provided is used. timecols : list of str or None The timecol keys to use from the lcdict in applying TFA corrections. magcols : list of str or None The magcol keys to use from the lcdict in applying TFA corrections. errcols : list of str or None The errcol keys to use from the lcdict in applying TFA corrections. lcformat : str This is the `formatkey` associated with your light curve format, which you previously passed in to the `lcproc.register_lcformat` function. This will be used to look up how to find and read the light curves specified in `basedir` or `use_list_of_filenames`. lcformatdir : str or None If this is provided, gives the path to a directory when you've stored your lcformat description JSONs, other than the usual directories lcproc knows to search for them in. Use this along with `lcformat` to specify an LC format JSON file that's not currently registered with lcproc. interp : str This is passed to scipy.interpolate.interp1d as the kind of interpolation to use when reforming the light curves to the timebase of the TFA templates. sigclip : float or sequence of two floats or None This is the sigma clip to apply to the light curves before running TFA on it. mintemplatedist_arcmin : float This sets the minimum distance required from the target object for objects in the TFA template ensemble. Objects closer than this distance will be removed from the ensemble. nworkers : int The number of parallel workers to launch maxworkertasks : int The maximum number of tasks per worker allowed before it's replaced by a fresh one. Returns ------- dict Contains the input file names and output TFA light curve filenames per input file organized by each `magcol` in `magcols`. ''' # open the templateinfo first if isinstance(templateinfo,str) and os.path.exists(templateinfo): with open(templateinfo,'rb') as infd: templateinfo = pickle.load(infd) try: formatinfo = get_lcformat(lcformat, use_lcformat_dir=lcformatdir) if formatinfo: (dfileglob, readerfunc, dtimecols, dmagcols, derrcols, magsarefluxes, normfunc) = formatinfo else: LOGERROR("can't figure out the light curve format") return None except Exception: LOGEXCEPTION("can't figure out the light curve format") return None # find all the files matching the lcglob in lcdir if lcfileglob is None: lcfileglob = dfileglob lclist = sorted(glob.glob(os.path.join(lcdir, lcfileglob))) return parallel_tfa_lclist( lclist, templateinfo, timecols=timecols, magcols=magcols, errcols=errcols, lcformat=lcformat, lcformatdir=None, interp=interp, sigclip=sigclip, mintemplatedist_arcmin=mintemplatedist_arcmin, nworkers=nworkers, maxworkertasks=maxworkertasks )