Source code for astrobase.lcproc.checkplotgen

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# checkplotgen.py - Waqas Bhatti (wbhatti@astro.princeton.edu) - Feb 2019

'''
This contains functions to generate checkplot pickles from a collection of light
curves (optionally including period-finding results).

'''

#############
## LOGGING ##
#############

import logging
from astrobase import log_sub, log_fmt, log_date_fmt

DEBUG = False
if DEBUG:
    level = logging.DEBUG
else:
    level = logging.INFO
LOGGER = logging.getLogger(__name__)
logging.basicConfig(
    level=level,
    style=log_sub,
    format=log_fmt,
    datefmt=log_date_fmt,
)

LOGDEBUG = LOGGER.debug
LOGINFO = LOGGER.info
LOGWARNING = LOGGER.warning
LOGERROR = LOGGER.error
LOGEXCEPTION = LOGGER.exception


#############
## IMPORTS ##
#############

import pickle
import sys
import os
import os.path
import glob
import gzip
import uuid
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

from tornado.escape import squeeze

# to turn a list of keys into a dict address
# from https://stackoverflow.com/a/14692747
from functools import reduce
from operator import getitem


def _dict_get(datadict, keylist):
    return reduce(getitem, keylist, datadict)


import numpy as np


############
## CONFIG ##
############

NCPUS = mp.cpu_count()


###################
## LOCAL IMPORTS ##
###################

from astrobase.lcmath import normalize_magseries, sigclip_magseries

from astrobase.checkplot.pkl_io import _write_checkplot_picklefile
from astrobase.checkplot.pkl_utils import (
    _pkl_magseries_plot,
    _pkl_phased_magseries_plot
)
from astrobase.checkplot.pkl import checkplot_dict

from astrobase.lcproc import get_lcformat
from astrobase.lcproc.periodsearch import PFMETHODS


###################################
## CHECKPLOT NEIGHBOR OPERATIONS ##
###################################

# for the neighbors tab in checkplotserver: show a 5 row per neighbor x 3 col
# panel. Each col will have in order: best phased LC of target, phased LC of
# neighbor with same period and epoch, unphased LC of neighbor

[docs]def update_checkplotdict_nbrlcs( checkplotdict, timecol, magcol, errcol, lcformat='hat-sql', lcformatdir=None, verbose=True, ): '''For all neighbors in a checkplotdict, make LCs and phased LCs. Parameters ---------- checkplotdict : dict This is the checkplot to process. The light curves for the neighbors to the object here will be extracted from the stored file paths, and this function will make plots of these time-series. If the object has 'best' periods and epochs generated by period-finder functions in this checkplotdict, phased light curve plots of each neighbor will be made using these to check the effects of blending. timecol,magcol,errcol : str The timecol, magcol, and errcol keys used to generate this object's checkplot. This is used to extract the correct times-series from the neighbors' light curves. lcformat : str This is the `formatkey` associated with your light curve format, which you previously passed in to the `lcproc.register_lcformat` function. This will be used to look up how to find and read the light curves specified in `basedir` or `use_list_of_filenames`. lcformatdir : str or None If this is provided, gives the path to a directory when you've stored your lcformat description JSONs, other than the usual directories lcproc knows to search for them in. Use this along with `lcformat` to specify an LC format JSON file that's not currently registered with lcproc. Returns ------- dict The input checkplotdict is returned with the neighor light curve plots added in. ''' try: formatinfo = get_lcformat(lcformat, use_lcformat_dir=lcformatdir) if formatinfo: (dfileglob, readerfunc, dtimecols, dmagcols, derrcols, magsarefluxes, normfunc) = formatinfo else: LOGERROR("can't figure out the light curve format") return checkplotdict except Exception: LOGEXCEPTION("can't figure out the light curve format") return checkplotdict if not ('neighbors' in checkplotdict and checkplotdict['neighbors'] and len(checkplotdict['neighbors']) > 0): LOGERROR('no neighbors for %s, not updating...' % (checkplotdict['objectid'])) return checkplotdict # get our object's magkeys to compare to the neighbor objmagkeys = {} # handle diff generations of checkplots if 'available_bands' in checkplotdict['objectinfo']: mclist = checkplotdict['objectinfo']['available_bands'] else: mclist = ('bmag','vmag','rmag','imag','jmag','hmag','kmag', 'sdssu','sdssg','sdssr','sdssi','sdssz') for mc in mclist: if (mc in checkplotdict['objectinfo'] and checkplotdict['objectinfo'][mc] is not None and np.isfinite(checkplotdict['objectinfo'][mc])): objmagkeys[mc] = checkplotdict['objectinfo'][mc] # if there are actually neighbors, go through them in order for nbr in checkplotdict['neighbors']: objectid, lcfpath = (nbr['objectid'], nbr['lcfpath']) # get the light curve if not os.path.exists(lcfpath): LOGERROR('objectid: %s, neighbor: %s, ' 'lightcurve: %s not found, skipping...' % (checkplotdict['objectid'], objectid, lcfpath)) continue lcdict = readerfunc(lcfpath) # this should handle lists/tuples being returned by readerfunc # we assume that the first element is the actual lcdict # FIXME: figure out how to not need this assumption if ( (isinstance(lcdict, (list, tuple))) and (isinstance(lcdict[0], dict)) ): lcdict = lcdict[0] # 0. get this neighbor's magcols and get the magdiff and colordiff # between it and the object nbrmagkeys = {} for mc in objmagkeys: if (('objectinfo' in lcdict) and (isinstance(lcdict['objectinfo'], dict)) and (mc in lcdict['objectinfo']) and (lcdict['objectinfo'][mc] is not None) and (np.isfinite(lcdict['objectinfo'][mc]))): nbrmagkeys[mc] = lcdict['objectinfo'][mc] # now calculate the magdiffs magdiffs = {} for omc in objmagkeys: if omc in nbrmagkeys: magdiffs[omc] = objmagkeys[omc] - nbrmagkeys[omc] # calculate colors and colordiffs colordiffs = {} # generate the list of colors to get # NOTE: here, we don't really bother with new/old gen checkplots # maybe change this later to handle arbitrary colors for ctrio in (['bmag','vmag','bvcolor'], ['vmag','kmag','vkcolor'], ['jmag','kmag','jkcolor'], ['sdssi','jmag','ijcolor'], ['sdssg','kmag','gkcolor'], ['sdssg','sdssr','grcolor']): m1, m2, color = ctrio if (m1 in objmagkeys and m2 in objmagkeys and m1 in nbrmagkeys and m2 in nbrmagkeys): objcolor = objmagkeys[m1] - objmagkeys[m2] nbrcolor = nbrmagkeys[m1] - nbrmagkeys[m2] colordiffs[color] = objcolor - nbrcolor # finally, add all the color and magdiff info to the nbr dict nbr.update({'magdiffs':magdiffs, 'colordiffs':colordiffs}) # # process magcols # # normalize using the special function if specified if normfunc is not None: lcdict = normfunc(lcdict) try: # get the times, mags, and errs # dereference the columns and get them from the lcdict if '.' in timecol: timecolget = timecol.split('.') else: timecolget = [timecol] times = _dict_get(lcdict, timecolget) if '.' in magcol: magcolget = magcol.split('.') else: magcolget = [magcol] mags = _dict_get(lcdict, magcolget) if '.' in errcol: errcolget = errcol.split('.') else: errcolget = [errcol] errs = _dict_get(lcdict, errcolget) except KeyError: LOGERROR('LC for neighbor: %s (target object: %s) does not ' 'have one or more of the required columns: %s, ' 'skipping...' % (objectid, checkplotdict['objectid'], ', '.join([timecol, magcol, errcol]))) continue # filter the input times, mags, errs; do sigclipping and normalization stimes, smags, serrs = sigclip_magseries(times, mags, errs, magsarefluxes=magsarefluxes, sigclip=4.0) # normalize here if not using special normalization if normfunc is None: ntimes, nmags = normalize_magseries( stimes, smags, magsarefluxes=magsarefluxes ) xtimes, xmags, xerrs = ntimes, nmags, serrs else: xtimes, xmags, xerrs = stimes, smags, serrs # check if this neighbor has enough finite points in its LC # fail early if not enough light curve points if ((xtimes is None) or (xmags is None) or (xerrs is None) or (xtimes.size < 49) or (xmags.size < 49) or (xerrs.size < 49)): LOGERROR("one or more of times, mags, errs appear to be None " "after sig-clipping. are the measurements all nan? " "can't make neighbor light curve plots " "for target: %s, neighbor: %s, neighbor LC: %s" % (checkplotdict['objectid'], nbr['objectid'], nbr['lcfpath'])) continue # # now we can start doing stuff if everything checks out # # make an unphased mag-series plot nbrdict = _pkl_magseries_plot(xtimes, xmags, xerrs, magsarefluxes=magsarefluxes) # update the nbr nbr.update(nbrdict) # for each lspmethod in the checkplot, make a corresponding plot for # this neighbor # figure out the period finder methods present if 'pfmethods' in checkplotdict: pfmethods = checkplotdict['pfmethods'] else: pfmethods = [] for cpkey in checkplotdict: for pfkey in PFMETHODS: if pfkey in cpkey: pfmethods.append(pfkey) for lspt in pfmethods: # initialize this lspmethod entry nbr[lspt] = {} # we only care about the best period and its options operiod, oepoch = (checkplotdict[lspt][0]['period'], checkplotdict[lspt][0]['epoch']) (ophasewrap, ophasesort, ophasebin, ominbinelems, oplotxlim) = ( checkplotdict[lspt][0]['phasewrap'], checkplotdict[lspt][0]['phasesort'], checkplotdict[lspt][0]['phasebin'], checkplotdict[lspt][0]['minbinelems'], checkplotdict[lspt][0]['plotxlim'], ) # make the phasedlc plot for this period nbr = _pkl_phased_magseries_plot( nbr, lspt.split('-')[1], # this splits '<pfindex>-<pfmethod>' 0, xtimes, xmags, xerrs, operiod, oepoch, phasewrap=ophasewrap, phasesort=ophasesort, phasebin=ophasebin, minbinelems=ominbinelems, plotxlim=oplotxlim, magsarefluxes=magsarefluxes, verbose=verbose, override_pfmethod=lspt ) # at this point, this neighbor's dict should be up to date with all # info, magseries plot, and all phased LC plots # return the updated checkplotdict return checkplotdict
######################## ## RUNNING CHECKPLOTS ## ########################
[docs]def runcp( pfpickle, outdir, lcbasedir, lcfname=None, cprenorm=False, lclistpkl=None, nbrradiusarcsec=60.0, maxnumneighbors=5, makeneighborlcs=True, fast_mode=False, gaia_max_timeout=60.0, gaia_mirror=None, xmatchinfo=None, xmatchradiusarcsec=3.0, minobservations=99, sigclip=10.0, lcformat='hat-sql', lcformatdir=None, timecols=None, magcols=None, errcols=None, skipdone=False, done_callback=None, done_callback_args=None, done_callback_kwargs=None ): '''This makes a checkplot pickle for the given period-finding result pickle produced by `lcproc.periodfinding.runpf`. Parameters ---------- pfpickle : str or None This is the filename of the period-finding result pickle file created by `lcproc.periodfinding.runpf`. If this is None, the checkplot will be made anyway, but no phased LC information will be collected into the output checkplot pickle. This can be useful for just collecting GAIA and other external information and making LC plots for an object. outdir : str This is the directory to which the output checkplot pickle will be written. lcbasedir : str The base directory where this function will look for the light curve file associated with the object in the input period-finding result pickle file. lcfname : str or None This is usually None because we'll get the path to the light curve associated with this period-finding pickle from the pickle itself. If `pfpickle` is None, however, this function will use `lcfname` to look up the light curve file instead. If both are provided, the value of `lcfname` takes precedence. Providing the light curve file name in this kwarg is useful when you're making checkplots directly from light curve files and not including period-finder results (perhaps because period-finding takes a long time for large collections of LCs). cprenorm : bool Set this to True if the light curves should be renormalized by `checkplot.checkplot_pickle`. This is set to False by default because we do our own normalization in this function using the light curve's registered normalization function and pass the normalized times, mags, errs to the `checkplot.checkplot_pickle` function. lclistpkl : str or dict This is either the filename of a pickle or the actual dict produced by lcproc.make_lclist. This is used to gather neighbor information. nbrradiusarcsec : float The radius in arcseconds to use for a search conducted around the coordinates of this object to look for any potential confusion and blending of variability amplitude caused by their proximity. maxnumneighbors : int The maximum number of neighbors that will have their light curves and magnitudes noted in this checkplot as potential blends with the target object. makeneighborlcs : bool If True, will make light curve and phased light curve plots for all neighbors to the current object found in the catalog passed in using `lclistpkl`. fast_mode : bool or float This runs the external catalog operations in a "fast" mode, with short timeouts and not trying to hit external catalogs that take a long time to respond. If this is set to True, the default settings for the external requests will then become:: skyview_lookup = False skyview_timeout = 10.0 skyview_retry_failed = False dust_timeout = 10.0 gaia_submit_timeout = 7.0 gaia_max_timeout = 10.0 gaia_submit_tries = 2 complete_query_later = False search_simbad = False If this is a float, will run in "fast" mode with the provided timeout value in seconds and the following settings:: skyview_lookup = True skyview_timeout = fast_mode skyview_retry_failed = False dust_timeout = fast_mode gaia_submit_timeout = 0.66*fast_mode gaia_max_timeout = fast_mode gaia_submit_tries = 2 complete_query_later = False search_simbad = False gaia_max_timeout : float Sets the timeout in seconds to use when waiting for the GAIA service to respond to our request for the object's information. Note that if `fast_mode` is set, this is ignored. gaia_mirror : str or None This sets the GAIA mirror to use. This is a key in the `services.gaia.GAIA_URLS` dict which defines the URLs to hit for each mirror. xmatchinfo : str or dict This is either the xmatch dict produced by the function `load_xmatch_external_catalogs` above, or the path to the xmatch info pickle file produced by that function. xmatchradiusarcsec : float This is the cross-matching radius to use in arcseconds. minobservations : int The minimum of observations the input object's mag/flux time-series must have for this function to plot its light curve and phased light curve. If the object has less than this number, no light curves will be plotted, but the checkplotdict will still contain all of the other information. sigclip : float or int or sequence of two floats/ints or None If a single float or int, a symmetric sigma-clip will be performed using the number provided as the sigma-multiplier to cut out from the input time-series. If a list of two ints/floats is provided, the function will perform an 'asymmetric' sigma-clip. The first element in this list is the sigma value to use for fainter flux/mag values; the second element in this list is the sigma value to use for brighter flux/mag values. For example, `sigclip=[10., 3.]`, will sigclip out greater than 10-sigma dimmings and greater than 3-sigma brightenings. Here the meaning of "dimming" and "brightening" is set by *physics* (not the magnitude system), which is why the `magsarefluxes` kwarg must be correctly set. If `sigclip` is None, no sigma-clipping will be performed, and the time-series (with non-finite elems removed) will be passed through to the output. lcformat : str This is the `formatkey` associated with your light curve format, which you previously passed in to the `lcproc.register_lcformat` function. This will be used to look up how to find and read the light curves specified in `basedir` or `use_list_of_filenames`. lcformatdir : str or None If this is provided, gives the path to a directory when you've stored your lcformat description JSONs, other than the usual directories lcproc knows to search for them in. Use this along with `lcformat` to specify an LC format JSON file that's not currently registered with lcproc. timecols : list of str or None The timecol keys to use from the lcdict in generating this checkplot. magcols : list of str or None The magcol keys to use from the lcdict in generating this checkplot. errcols : list of str or None The errcol keys to use from the lcdict in generating this checkplot. skipdone : bool This indicates if this function will skip creating checkplots that already exist corresponding to the current `objectid` and `magcol`. If `skipdone` is set to True, this will be done. done_callback : Python function or None This is used to provide a function to execute after the checkplot pickles are generated. This is useful if you want to stream the results of checkplot making to some other process, e.g. directly running an ingestion into an LCC-Server collection. The function will always get the list of the generated checkplot pickles as its first arg, and all of the kwargs for runcp in the kwargs dict. Additional args and kwargs can be provided by giving a list in the `done_callbacks_args` kwarg and a dict in the `done_callbacks_kwargs` kwarg. NOTE: the function you pass in here should be pickleable by normal Python if you want to use it with the parallel_cp and parallel_cp_lcdir functions below. done_callback_args : tuple or None If not None, contains any args to pass into the `done_callback` function. done_callback_kwargs : dict or None If not None, contains any kwargs to pass into the `done_callback` function. Returns ------- list of str This returns a list of checkplot pickle filenames with one element for each (timecol, magcol, errcol) combination provided in the default lcformat config or in the timecols, magcols, errcols kwargs. ''' try: formatinfo = get_lcformat(lcformat, use_lcformat_dir=lcformatdir) if formatinfo: (fileglob, readerfunc, dtimecols, dmagcols, derrcols, magsarefluxes, normfunc) = formatinfo else: LOGERROR("can't figure out the light curve format") return None except Exception: LOGEXCEPTION("can't figure out the light curve format") return None if pfpickle is not None: if pfpickle.endswith('.gz'): infd = gzip.open(pfpickle,'rb') else: infd = open(pfpickle,'rb') pfresults = pickle.load(infd) infd.close() # override the default timecols, magcols, and errcols # using the ones provided to the function if timecols is None: timecols = dtimecols if magcols is None: magcols = dmagcols if errcols is None: errcols = derrcols if ((lcfname is not None or pfpickle is None) and os.path.exists(lcfname)): lcfpath = lcfname objectid = None else: if pfpickle is not None: objectid = pfresults['objectid'] lcfbasename = pfresults['lcfbasename'] lcfsearchpath = os.path.join(lcbasedir, lcfbasename) if os.path.exists(lcfsearchpath): lcfpath = lcfsearchpath elif lcfname is not None and os.path.exists(lcfname): lcfpath = lcfname else: LOGERROR('could not find light curve for ' 'pfresult %s, objectid %s, ' 'used search path: %s, lcfname kwarg: %s' % (pfpickle, objectid, lcfsearchpath, lcfname)) return None else: LOGERROR("no light curve provided and pfpickle is None, " "can't continue") return None lcdict = readerfunc(lcfpath) # this should handle lists/tuples being returned by readerfunc # we assume that the first element is the actual lcdict # FIXME: figure out how to not need this assumption if ( (isinstance(lcdict, (list, tuple))) and (isinstance(lcdict[0], dict)) ): lcdict = lcdict[0] # get the object ID from the light curve if pfpickle is None or we used # lcfname directly if objectid is None: if 'objectid' in lcdict: objectid = lcdict['objectid'] elif ('objectid' in lcdict['objectinfo'] and lcdict['objectinfo']['objectid']): objectid = lcdict['objectinfo']['objectid'] elif 'hatid' in lcdict['objectinfo'] and lcdict['objectinfo']['hatid']: objectid = lcdict['objectinfo']['hatid'] else: objectid = uuid.uuid4().hex[:5] LOGWARNING('no objectid found for this object, ' 'generated a random one: %s' % objectid) # normalize using the special function if specified if normfunc is not None: lcdict = normfunc(lcdict) cpfs = [] for tcol, mcol, ecol in zip(timecols, magcols, errcols): # dereference the columns and get them from the lcdict if '.' in tcol: tcolget = tcol.split('.') else: tcolget = [tcol] times = _dict_get(lcdict, tcolget) if '.' in mcol: mcolget = mcol.split('.') else: mcolget = [mcol] mags = _dict_get(lcdict, mcolget) if '.' in ecol: ecolget = ecol.split('.') else: ecolget = [ecol] errs = _dict_get(lcdict, ecolget) # get all the period-finder results from this magcol if pfpickle is not None: if 'pfmethods' in pfresults[mcol]: pflist = [ pfresults[mcol][x] for x in pfresults[mcol]['pfmethods'] if len(pfresults[mcol][x].keys()) > 0 ] else: pflist = [] for pfm in PFMETHODS: if (pfm in pfresults[mcol] and len(pfresults[mcol][pfm].keys()) > 0): pflist.append(pfresults[mcol][pfm]) # special case of generating a checkplot with no phased LCs else: pflist = [] # generate the output filename outfile = os.path.join(outdir, 'checkplot-%s-%s.pkl' % ( squeeze(objectid).replace(' ','-'), mcol )) if skipdone and os.path.exists(outfile): LOGWARNING('skipdone = True and ' 'checkplot for this objectid/magcol combination ' 'exists already: %s, skipping...' % outfile) return outfile # make sure the checkplot has a valid objectid if 'objectid' not in lcdict['objectinfo']: lcdict['objectinfo']['objectid'] = objectid # normalize here if not using special normalization if normfunc is None: ntimes, nmags = normalize_magseries( times, mags, magsarefluxes=magsarefluxes ) xtimes, xmags, xerrs = ntimes, nmags, errs else: xtimes, xmags, xerrs = times, mags, errs # generate the checkplotdict cpd = checkplot_dict( pflist, xtimes, xmags, xerrs, objectinfo=lcdict['objectinfo'], gaia_max_timeout=gaia_max_timeout, gaia_mirror=gaia_mirror, lclistpkl=lclistpkl, nbrradiusarcsec=nbrradiusarcsec, maxnumneighbors=maxnumneighbors, xmatchinfo=xmatchinfo, xmatchradiusarcsec=xmatchradiusarcsec, sigclip=sigclip, mindet=minobservations, verbose=False, fast_mode=fast_mode, magsarefluxes=magsarefluxes, normto=cprenorm # we've done the renormalization already, so this # should be False by default. just messes up the # plots otherwise, destroying LPVs in particular ) if makeneighborlcs: # include any neighbor information as well cpdupdated = update_checkplotdict_nbrlcs( cpd, tcol, mcol, ecol, lcformat=lcformat, verbose=False ) else: cpdupdated = cpd # write the update checkplot dict to disk cpf = _write_checkplot_picklefile( cpdupdated, outfile=outfile, protocol=pickle.HIGHEST_PROTOCOL, outgzip=False ) cpfs.append(cpf) # # done with checkplot making # LOGINFO('done with %s -> %s' % (objectid, repr(cpfs))) if done_callback is not None: if (done_callback_args is not None and isinstance(done_callback_args,list)): done_callback_args = tuple([cpfs] + done_callback_args) else: done_callback_args = (cpfs,) if (done_callback_kwargs is not None and isinstance(done_callback_kwargs, dict)): done_callback_kwargs.update(dict( fast_mode=fast_mode, lcfname=lcfname, cprenorm=cprenorm, lclistpkl=lclistpkl, nbrradiusarcsec=nbrradiusarcsec, maxnumneighbors=maxnumneighbors, gaia_max_timeout=gaia_max_timeout, gaia_mirror=gaia_mirror, xmatchinfo=xmatchinfo, xmatchradiusarcsec=xmatchradiusarcsec, minobservations=minobservations, sigclip=sigclip, lcformat=lcformat, fileglob=fileglob, readerfunc=readerfunc, normfunc=normfunc, magsarefluxes=magsarefluxes, timecols=timecols, magcols=magcols, errcols=errcols, skipdone=skipdone, )) else: done_callback_kwargs = dict( fast_mode=fast_mode, lcfname=lcfname, cprenorm=cprenorm, lclistpkl=lclistpkl, nbrradiusarcsec=nbrradiusarcsec, maxnumneighbors=maxnumneighbors, gaia_max_timeout=gaia_max_timeout, gaia_mirror=gaia_mirror, xmatchinfo=xmatchinfo, xmatchradiusarcsec=xmatchradiusarcsec, minobservations=minobservations, sigclip=sigclip, lcformat=lcformat, fileglob=fileglob, readerfunc=readerfunc, normfunc=normfunc, magsarefluxes=magsarefluxes, timecols=timecols, magcols=magcols, errcols=errcols, skipdone=skipdone, ) # fire the callback try: done_callback(*done_callback_args, **done_callback_kwargs) LOGINFO('callback fired successfully for %r' % cpfs) except Exception: LOGEXCEPTION('callback function failed for %r' % cpfs) # at the end, return the list of checkplot files generated return cpfs
[docs]def runcp_worker(task): ''' This is the worker for running checkplots. Parameters ---------- task : tuple This is of the form: (pfpickle, outdir, lcbasedir, kwargs). Returns ------- list of str The list of checkplot pickles returned by the `runcp` function. ''' pfpickle, outdir, lcbasedir, kwargs = task try: return runcp(pfpickle, outdir, lcbasedir, **kwargs) except Exception as e: LOGEXCEPTION(' could not make checkplots for %s: %s' % (pfpickle, e)) return None
[docs]def parallel_cp( pfpicklelist, outdir, lcbasedir, fast_mode=False, lcfnamelist=None, cprenorm=False, lclistpkl=None, gaia_max_timeout=60.0, gaia_mirror=None, nbrradiusarcsec=60.0, maxnumneighbors=5, makeneighborlcs=True, xmatchinfo=None, xmatchradiusarcsec=3.0, sigclip=10.0, minobservations=99, lcformat='hat-sql', lcformatdir=None, timecols=None, magcols=None, errcols=None, skipdone=False, done_callback=None, done_callback_args=None, done_callback_kwargs=None, liststartindex=None, maxobjects=None, nworkers=NCPUS, ): '''This drives the parallel execution of `runcp` for a list of periodfinding result pickles. Parameters ---------- pfpicklelist : list of str or list of Nones This is the list of the filenames of the period-finding result pickles to process. To make checkplots using the light curves directly, set this to a list of Nones with the same length as the list of light curve files that you provide in `lcfnamelist`. outdir : str The directory the checkplot pickles will be written to. lcbasedir : str The base directory that this function will look in to find the light curves pointed to by the period-finding result files. If you're using `lcfnamelist` to provide a list of light curve filenames directly, this arg is ignored. lcfnamelist : list of str or None If this is provided, it must be a list of the input light curve filenames to process. These can either be associated with each input period-finder result pickle, or can be provided standalone to make checkplots without phased LC plots in them. In the second case, you must set `pfpicklelist` to a list of Nones that matches the length of `lcfnamelist`. cprenorm : bool Set this to True if the light curves should be renormalized by `checkplot.checkplot_pickle`. This is set to False by default because we do our own normalization in this function using the light curve's registered normalization function and pass the normalized times, mags, errs to the `checkplot.checkplot_pickle` function. lclistpkl : str or dict This is either the filename of a pickle or the actual dict produced by lcproc.make_lclist. This is used to gather neighbor information. nbrradiusarcsec : float The radius in arcseconds to use for a search conducted around the coordinates of this object to look for any potential confusion and blending of variability amplitude caused by their proximity. maxnumneighbors : int The maximum number of neighbors that will have their light curves and magnitudes noted in this checkplot as potential blends with the target object. makeneighborlcs : bool If True, will make light curve and phased light curve plots for all neighbors found in the object collection for each input object. fast_mode : bool or float This runs the external catalog operations in a "fast" mode, with short timeouts and not trying to hit external catalogs that take a long time to respond. If this is set to True, the default settings for the external requests will then become:: skyview_lookup = False skyview_timeout = 10.0 skyview_retry_failed = False dust_timeout = 10.0 gaia_submit_timeout = 7.0 gaia_max_timeout = 10.0 gaia_submit_tries = 2 complete_query_later = False search_simbad = False If this is a float, will run in "fast" mode with the provided timeout value in seconds and the following settings:: skyview_lookup = True skyview_timeout = fast_mode skyview_retry_failed = False dust_timeout = fast_mode gaia_submit_timeout = 0.66*fast_mode gaia_max_timeout = fast_mode gaia_submit_tries = 2 complete_query_later = False search_simbad = False gaia_max_timeout : float Sets the timeout in seconds to use when waiting for the GAIA service to respond to our request for the object's information. Note that if `fast_mode` is set, this is ignored. gaia_mirror : str or None This sets the GAIA mirror to use. This is a key in the `services.gaia.GAIA_URLS` dict which defines the URLs to hit for each mirror. xmatchinfo : str or dict This is either the xmatch dict produced by the function `load_xmatch_external_catalogs` above, or the path to the xmatch info pickle file produced by that function. xmatchradiusarcsec : float This is the cross-matching radius to use in arcseconds. minobservations : int The minimum of observations the input object's mag/flux time-series must have for this function to plot its light curve and phased light curve. If the object has less than this number, no light curves will be plotted, but the checkplotdict will still contain all of the other information. sigclip : float or int or sequence of two floats/ints or None If a single float or int, a symmetric sigma-clip will be performed using the number provided as the sigma-multiplier to cut out from the input time-series. If a list of two ints/floats is provided, the function will perform an 'asymmetric' sigma-clip. The first element in this list is the sigma value to use for fainter flux/mag values; the second element in this list is the sigma value to use for brighter flux/mag values. For example, `sigclip=[10., 3.]`, will sigclip out greater than 10-sigma dimmings and greater than 3-sigma brightenings. Here the meaning of "dimming" and "brightening" is set by *physics* (not the magnitude system), which is why the `magsarefluxes` kwarg must be correctly set. If `sigclip` is None, no sigma-clipping will be performed, and the time-series (with non-finite elems removed) will be passed through to the output. lcformat : str This is the `formatkey` associated with your light curve format, which you previously passed in to the `lcproc.register_lcformat` function. This will be used to look up how to find and read the light curves specified in `basedir` or `use_list_of_filenames`. lcformatdir : str or None If this is provided, gives the path to a directory when you've stored your lcformat description JSONs, other than the usual directories lcproc knows to search for them in. Use this along with `lcformat` to specify an LC format JSON file that's not currently registered with lcproc. timecols : list of str or None The timecol keys to use from the lcdict in generating this checkplot. magcols : list of str or None The magcol keys to use from the lcdict in generating this checkplot. errcols : list of str or None The errcol keys to use from the lcdict in generating this checkplot. skipdone : bool This indicates if this function will skip creating checkplots that already exist corresponding to the current `objectid` and `magcol`. If `skipdone` is set to True, this will be done. done_callback : Python function or None This is used to provide a function to execute after the checkplot pickles are generated. This is useful if you want to stream the results of checkplot making to some other process, e.g. directly running an ingestion into an LCC-Server collection. The function will always get the list of the generated checkplot pickles as its first arg, and all of the kwargs for runcp in the kwargs dict. Additional args and kwargs can be provided by giving a list in the `done_callbacks_args` kwarg and a dict in the `done_callbacks_kwargs` kwarg. NOTE: the function you pass in here should be pickleable by normal Python if you want to use it with the parallel_cp and parallel_cp_lcdir functions below. done_callback_args : tuple or None If not None, contains any args to pass into the `done_callback` function. done_callback_kwargs : dict or None If not None, contains any kwargs to pass into the `done_callback` function. liststartindex : int The index of the `pfpicklelist` (and `lcfnamelist` if provided) to start working at. maxobjects : int The maximum number of objects to process in this run. Use this with `liststartindex` to effectively distribute working on a large list of input period-finding result pickles (and light curves if `lcfnamelist` is also provided) over several sessions or machines. nworkers : int The number of parallel workers that will work on the checkplot generation process. Returns ------- dict This returns a dict with keys = input period-finding pickles and vals = list of the corresponding checkplot pickles produced. ''' # work around the Darwin segfault after fork if no network activity in # main thread bug: https://bugs.python.org/issue30385#msg293958 if sys.platform == 'darwin': import requests requests.get('http://captive.apple.com/hotspot-detect.html') if not os.path.exists(outdir): os.mkdir(outdir) # handle the start and end indices if (liststartindex is not None) and (maxobjects is None): pfpicklelist = pfpicklelist[liststartindex:] if lcfnamelist is not None: lcfnamelist = lcfnamelist[liststartindex:] elif (liststartindex is None) and (maxobjects is not None): pfpicklelist = pfpicklelist[:maxobjects] if lcfnamelist is not None: lcfnamelist = lcfnamelist[:maxobjects] elif (liststartindex is not None) and (maxobjects is not None): pfpicklelist = ( pfpicklelist[liststartindex:liststartindex+maxobjects] ) if lcfnamelist is not None: lcfnamelist = lcfnamelist[liststartindex:liststartindex+maxobjects] # if the lcfnamelist is not provided, create a dummy if lcfnamelist is None: lcfnamelist = [None]*len(pfpicklelist) tasklist = [(x, outdir, lcbasedir, {'lcformat':lcformat, 'lcformatdir':lcformatdir, 'lcfname':y, 'timecols':timecols, 'magcols':magcols, 'errcols':errcols, 'lclistpkl':lclistpkl, 'gaia_max_timeout':gaia_max_timeout, 'gaia_mirror':gaia_mirror, 'nbrradiusarcsec':nbrradiusarcsec, 'maxnumneighbors':maxnumneighbors, 'makeneighborlcs':makeneighborlcs, 'xmatchinfo':xmatchinfo, 'xmatchradiusarcsec':xmatchradiusarcsec, 'sigclip':sigclip, 'minobservations':minobservations, 'skipdone':skipdone, 'cprenorm':cprenorm, 'fast_mode':fast_mode, 'done_callback':done_callback, 'done_callback_args':done_callback_args, 'done_callback_kwargs':done_callback_kwargs}) for x,y in zip(pfpicklelist, lcfnamelist)] resultfutures = [] results = [] with ProcessPoolExecutor(max_workers=nworkers) as executor: resultfutures = executor.map(runcp_worker, tasklist) results = list(resultfutures) executor.shutdown() return results
[docs]def parallel_cp_pfdir(pfpickledir, outdir, lcbasedir, pfpickleglob='periodfinding-*.pkl*', lclistpkl=None, cprenorm=False, nbrradiusarcsec=60.0, maxnumneighbors=5, makeneighborlcs=True, fast_mode=False, gaia_max_timeout=60.0, gaia_mirror=None, xmatchinfo=None, xmatchradiusarcsec=3.0, minobservations=99, sigclip=10.0, lcformat='hat-sql', lcformatdir=None, timecols=None, magcols=None, errcols=None, skipdone=False, done_callback=None, done_callback_args=None, done_callback_kwargs=None, maxobjects=None, nworkers=32): '''This drives the parallel execution of `runcp` for a directory of periodfinding pickles. Parameters ---------- pfpickledir : str This is the directory containing all of the period-finding pickles to process. outdir : str The directory the checkplot pickles will be written to. lcbasedir : str The base directory that this function will look in to find the light curves pointed to by the period-finding result files. If you're using `lcfnamelist` to provide a list of light curve filenames directly, this arg is ignored. pkpickleglob : str This is a UNIX file glob to select period-finding result pickles in the specified `pfpickledir`. lclistpkl : str or dict This is either the filename of a pickle or the actual dict produced by lcproc.make_lclist. This is used to gather neighbor information. cprenorm : bool Set this to True if the light curves should be renormalized by `checkplot.checkplot_pickle`. This is set to False by default because we do our own normalization in this function using the light curve's registered normalization function and pass the normalized times, mags, errs to the `checkplot.checkplot_pickle` function. nbrradiusarcsec : float The radius in arcseconds to use for a search conducted around the coordinates of this object to look for any potential confusion and blending of variability amplitude caused by their proximity. maxnumneighbors : int The maximum number of neighbors that will have their light curves and magnitudes noted in this checkplot as potential blends with the target object. makeneighborlcs : bool If True, will make light curve and phased light curve plots for all neighbors found in the object collection for each input object. fast_mode : bool or float This runs the external catalog operations in a "fast" mode, with short timeouts and not trying to hit external catalogs that take a long time to respond. If this is set to True, the default settings for the external requests will then become:: skyview_lookup = False skyview_timeout = 10.0 skyview_retry_failed = False dust_timeout = 10.0 gaia_submit_timeout = 7.0 gaia_max_timeout = 10.0 gaia_submit_tries = 2 complete_query_later = False search_simbad = False If this is a float, will run in "fast" mode with the provided timeout value in seconds and the following settings:: skyview_lookup = True skyview_timeout = fast_mode skyview_retry_failed = False dust_timeout = fast_mode gaia_submit_timeout = 0.66*fast_mode gaia_max_timeout = fast_mode gaia_submit_tries = 2 complete_query_later = False search_simbad = False gaia_max_timeout : float Sets the timeout in seconds to use when waiting for the GAIA service to respond to our request for the object's information. Note that if `fast_mode` is set, this is ignored. gaia_mirror : str or None This sets the GAIA mirror to use. This is a key in the `services.gaia.GAIA_URLS` dict which defines the URLs to hit for each mirror. xmatchinfo : str or dict This is either the xmatch dict produced by the function `load_xmatch_external_catalogs` above, or the path to the xmatch info pickle file produced by that function. xmatchradiusarcsec : float This is the cross-matching radius to use in arcseconds. minobservations : int The minimum of observations the input object's mag/flux time-series must have for this function to plot its light curve and phased light curve. If the object has less than this number, no light curves will be plotted, but the checkplotdict will still contain all of the other information. sigclip : float or int or sequence of two floats/ints or None If a single float or int, a symmetric sigma-clip will be performed using the number provided as the sigma-multiplier to cut out from the input time-series. If a list of two ints/floats is provided, the function will perform an 'asymmetric' sigma-clip. The first element in this list is the sigma value to use for fainter flux/mag values; the second element in this list is the sigma value to use for brighter flux/mag values. For example, `sigclip=[10., 3.]`, will sigclip out greater than 10-sigma dimmings and greater than 3-sigma brightenings. Here the meaning of "dimming" and "brightening" is set by *physics* (not the magnitude system), which is why the `magsarefluxes` kwarg must be correctly set. If `sigclip` is None, no sigma-clipping will be performed, and the time-series (with non-finite elems removed) will be passed through to the output. lcformat : str This is the `formatkey` associated with your light curve format, which you previously passed in to the `lcproc.register_lcformat` function. This will be used to look up how to find and read the light curves specified in `basedir` or `use_list_of_filenames`. lcformatdir : str or None If this is provided, gives the path to a directory when you've stored your lcformat description JSONs, other than the usual directories lcproc knows to search for them in. Use this along with `lcformat` to specify an LC format JSON file that's not currently registered with lcproc. timecols : list of str or None The timecol keys to use from the lcdict in generating this checkplot. magcols : list of str or None The magcol keys to use from the lcdict in generating this checkplot. errcols : list of str or None The errcol keys to use from the lcdict in generating this checkplot. skipdone : bool This indicates if this function will skip creating checkplots that already exist corresponding to the current `objectid` and `magcol`. If `skipdone` is set to True, this will be done. done_callback : Python function or None This is used to provide a function to execute after the checkplot pickles are generated. This is useful if you want to stream the results of checkplot making to some other process, e.g. directly running an ingestion into an LCC-Server collection. The function will always get the list of the generated checkplot pickles as its first arg, and all of the kwargs for runcp in the kwargs dict. Additional args and kwargs can be provided by giving a list in the `done_callbacks_args` kwarg and a dict in the `done_callbacks_kwargs` kwarg. NOTE: the function you pass in here should be pickleable by normal Python if you want to use it with the parallel_cp and parallel_cp_lcdir functions below. done_callback_args : tuple or None If not None, contains any args to pass into the `done_callback` function. done_callback_kwargs : dict or None If not None, contains any kwargs to pass into the `done_callback` function. maxobjects : int The maximum number of objects to process in this run. nworkers : int The number of parallel workers that will work on the checkplot generation process. Returns ------- dict This returns a dict with keys = input period-finding pickles and vals = list of the corresponding checkplot pickles produced. ''' pfpicklelist = sorted(glob.glob(os.path.join(pfpickledir, pfpickleglob))) LOGINFO('found %s period-finding pickles, running cp...' % len(pfpicklelist)) return parallel_cp(pfpicklelist, outdir, lcbasedir, fast_mode=fast_mode, lclistpkl=lclistpkl, nbrradiusarcsec=nbrradiusarcsec, gaia_max_timeout=gaia_max_timeout, gaia_mirror=gaia_mirror, maxnumneighbors=maxnumneighbors, makeneighborlcs=makeneighborlcs, xmatchinfo=xmatchinfo, xmatchradiusarcsec=xmatchradiusarcsec, sigclip=sigclip, minobservations=minobservations, cprenorm=cprenorm, maxobjects=maxobjects, lcformat=lcformat, lcformatdir=lcformatdir, timecols=timecols, magcols=magcols, errcols=errcols, skipdone=skipdone, nworkers=nworkers, done_callback=done_callback, done_callback_args=done_callback_args, done_callback_kwargs=done_callback_kwargs)