#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# awsrun.py - Waqas Bhatti (wbhatti@astro.princeton.edu) - Oct 2018
# License: MIT - see the LICENSE file for the full text.

"""

This contains lcproc worker loops useful for AWS processing of light curves.

The basic workflow is::

   LCs from S3 -> SQS -> worker loop -> products back to S3 | result JSON to SQS

All functions here assume AWS credentials have been loaded already using awscli
as described at:

https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html
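
To verify that boto3 can see these credentials, a quick check like the
following should work (a minimal sketch; any configured AWS identity will
do)::

    import boto3

    # this raises botocore.exceptions.NoCredentialsError if no credentials
    # are configured
    print(boto3.client('sts').get_caller_identity())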

General recommendations:

- use t3.medium or t3.micro instances for runcp_consumer_loop. Checkplot making
  isn't really a CPU-intensive activity, so these instances are cheaper to run.

- use c5.2xlarge or above instances for runpf_consumer_loop. Period-finders
  require a decent number of fast cores, so a spot fleet of these instances
  should be cost-effective.

- you may want a t3.micro instance running in the same region and VPC as your
  worker node instances to serve as a head node driving the producer_loop
  functions (see the example after this list). This can also be done from a
  machine outside AWS, but you'll incur (probably tiny) charges for network
  egress from the output queues.

- It's best not to download results from S3 as soon as they're produced. Leave
  them on S3 until everything is done, then use rclone (https://rclone.org) to
  sync them back to your machines using --transfers <large number>.
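
For example, the head node can queue up checkplot work and wait for results
like so (a minimal sketch; the queue names, bucket names, and light curve list
are hypothetical)::

    from astrobase.lcproc import awsrun

    # queues each light curve in lclist.txt to the 'runcp-in' SQS queue, then
    # blocks on the 'runcp-out' queue until the workers report all items done
    work_state = awsrun.runcp_producer_loop(
        'lclist.txt',         # one S3 key per line, relative to the bucket
        'runcp-in',           # input SQS queue name
        'my-lc-bucket',       # S3 bucket holding the light curves
        'runcp-out',          # result SQS queue name
        'my-result-bucket',   # S3 bucket that will receive the checkplots
    )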

The user_data and instance_user_data kwargs for the make_ec2_nodes and
make_spot_fleet_cluster functions can be used to start processing loops as soon
as EC2 brings up the VM instance. This is especially useful for spot fleets set
to maintain a target capacity, since worker nodes will be terminated and
automatically replaced. Bringing up the processing loop at instance startup
makes it easy to continue processing light curves exactly where you left off
without manual intervention.
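
If the producer loop is interrupted and was run with its default
`save_state_when_done=True`, it writes its work state to a pickle in the
current directory; a later run can resume from that pickle (a minimal
sketch)::

    from astrobase.lcproc import awsrun

    # re-queues only the light curves not yet acknowledged as done
    awsrun.runcp_producer_loop_savedstate(
        use_saved_state='runcp-queue-producer-loop-state.pkl'
    )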

Example script for user_data bringing up a checkplot-making loop on instance
creation (assuming we're using Amazon Linux 2)::

    #!/bin/bash

    cat << 'EOF' > /home/ec2-user/launch-runcp.sh
    #!/bin/bash
    sudo yum -y install python3-devel gcc-gfortran jq htop emacs-nox git

    # create the virtualenv
    python3 -m venv /home/ec2-user/py3

    # get astrobase
    cd /home/ec2-user
    git clone https://github.com/waqasbhatti/astrobase

    # install it
    cd /home/ec2-user/astrobase
    /home/ec2-user/py3/bin/pip install pip setuptools numpy -U
    /home/ec2-user/py3/bin/pip install -e .[aws]

    # make the work dir
    mkdir /home/ec2-user/work
    cd /home/ec2-user/work

    # wait a bit for the instance info to be populated
    sleep 5

    # set some environ vars for boto3 and the processing loop
    export AWS_DEFAULT_REGION=`curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document/ | jq '.region' | tr -d '"'`
    export NCPUS=`lscpu -J | jq ".lscpu[3].data|tonumber"`

    # launch the processor loops
    for s in `seq $NCPUS`; do nohup /home/ec2-user/py3/bin/python3 -u -c "from astrobase.lcproc import awsrun as lcp; lcp.runcp_consumer_loop('https://queue-url','.','s3://path/to/lclist.pkl')" > runcp-$s-loop.out & done
    EOF

    # run the script we just created as ec2-user
    chown ec2-user /home/ec2-user/launch-runcp.sh
    su ec2-user -c 'bash /home/ec2-user/launch-runcp.sh'

Here's a similar script for a runpf consumer loop. We launch only a single
instance of the loop because runpf will use all CPUs by default for its
parallelized period-finder functions::

    #!/bin/bash

    cat << 'EOF' > /home/ec2-user/launch-runpf.sh
    #!/bin/bash
    sudo yum -y install python3-devel gcc-gfortran jq htop emacs-nox git

    python3 -m venv /home/ec2-user/py3

    cd /home/ec2-user
    git clone https://github.com/waqasbhatti/astrobase

    cd /home/ec2-user/astrobase
    /home/ec2-user/py3/bin/pip install pip setuptools numpy -U
    /home/ec2-user/py3/bin/pip install -e .[aws]

    mkdir /home/ec2-user/work
    cd /home/ec2-user/work

    # wait a bit for the instance info to be populated
    sleep 5

    export AWS_DEFAULT_REGION=`curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document/ | jq '.region' | tr -d '"'`
    export NCPUS=`lscpu -J | jq ".lscpu[3].data|tonumber"`

    # launch the processes
    nohup /home/ec2-user/py3/bin/python3 -u -c "from astrobase.lcproc import awsrun as lcp; lcp.runpf_consumer_loop('https://input-queue-url','.')" > runpf-loop.out &
    EOF

    chown ec2-user /home/ec2-user/launch-runpf.sh
    su ec2-user -c 'bash /home/ec2-user/launch-runpf.sh'
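
The matching head-node invocation that queues up period-finding work for these
consumers might look like this (a minimal sketch; queue, bucket, and list
names are hypothetical)::

    from astrobase.lcproc import awsrun

    # run GLS and BLS on each light curve in the list
    awsrun.runpf_producer_loop(
        'lclist.txt',
        'runpf-in',
        'my-lc-bucket',
        'runpf-out',
        'my-result-bucket',
        pfmethods=('gls','bls'),
        pfkwargs=({}, {}),
    )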

"""

#############
## LOGGING ##
#############

import logging
from astrobase import log_sub, log_fmt, log_date_fmt

DEBUG = False
if DEBUG:
    level = logging.DEBUG
else:
    level = logging.INFO
LOGGER = logging.getLogger(__name__)
logging.basicConfig(
    level=level,
    style=log_sub,
    format=log_fmt,
    datefmt=log_date_fmt,
)

LOGDEBUG = LOGGER.debug
LOGINFO = LOGGER.info
LOGWARNING = LOGGER.warning
LOGERROR = LOGGER.error
LOGEXCEPTION = LOGGER.exception


#############
## IMPORTS ##
#############

import os.path
import os
import pickle
import time
import signal
import subprocess

import requests
from requests.exceptions import HTTPError

try:

    import boto3
    from botocore.exceptions import ClientError

except ImportError:
    raise ImportError(
        "This module requires the boto3 package from PyPI. "
        "You'll also need the awscli package to set up the "
        "AWS secret key config for this module."
    )

from .. import awsutils
from .periodsearch import runpf
from .checkplotgen import runcp


####################################
## WORKER LOOPS UTILITY FUNCTIONS ##
####################################

def kill_handler(sig, frame):
    '''This raises a KeyboardInterrupt when a SIGINT or SIGTERM comes in.

    This is a handler for use with the Python `signal.signal` function.

    '''
    raise KeyboardInterrupt


def cache_clean_handler(min_age_hours=1):
    """This periodically cleans up the ~/.astrobase cache to save us from
    disk-space doom.

    Parameters
    ----------

    min_age_hours : int
        Files older than this number of hours from the current time will be
        deleted.

    Returns
    -------

    Nothing.

    """

    # find the files to delete
    cmd = (
        r"find ~ec2-user/.astrobase -type f -mmin +{mmin} -exec rm -v '{{}}' \;"
    )
    mmin = '%.1f' % (min_age_hours*60.0)
    cmd = cmd.format(mmin=mmin)

    try:
        proc = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE)
        ndeleted = len(proc.stdout.decode().split('\n'))
        LOGWARNING('cache clean: %s files older than %s hours deleted' %
                   (ndeleted, min_age_hours))
    except Exception:
        LOGEXCEPTION('cache clean: could not delete old files')


def shutdown_check_handler():
    """This checks the AWS instance data URL to see if there's a pending
    shutdown for the instance.

    This is useful for AWS spot instances. If there is a pending shutdown
    posted to the instance data URL, we'll use the result of this function to
    break out of the processing loop and shut everything down ASAP before the
    instance dies.

    Returns
    -------

    bool
        - True if the instance is going to die soon.
        - False if the instance is still safe.

    """

    url = 'http://169.254.169.254/latest/meta-data/spot/instance-action'

    try:
        resp = requests.get(url, timeout=1.0)
        resp.raise_for_status()

        stopinfo = resp.json()
        if 'action' in stopinfo and stopinfo['action'] in ('stop',
                                                           'terminate',
                                                           'hibernate'):
            stoptime = stopinfo['time']
            LOGWARNING('instance is going to %s at %s' % (stopinfo['action'],
                                                          stoptime))
            resp.close()
            return True
        else:
            resp.close()
            return False

    except HTTPError:
        # a 404 here means no shutdown is scheduled yet
        resp.close()
        return False

    except Exception:
        # resp may not exist if the request itself failed, so don't touch it
        return False
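

# Example of how the consumer loops below use shutdown_check_handler (a sketch
# of the polling pattern; `shutdown_check_timer_seconds` is a kwarg of those
# loops):
#
#     shutdown_last_time = time.monotonic()
#     while True:
#         if ((time.monotonic() - shutdown_last_time) >
#                 shutdown_check_timer_seconds):
#             if shutdown_check_handler():
#                 break  # spot instance is about to die; stop taking work
#             shutdown_last_time = time.monotonic()
#         ...  # receive and process one work item

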
############################
## CHECKPLOT MAKING LOOPS ##
############################
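
# Each work item sent to the input queue by runcp_producer_loop (and consumed
# by runcp_consumer_loop) is a dict serialized by awsutils.sqs_put_item, of
# the form:
#
#     {'target': 's3://<input bucket>/<light curve key>',
#      'action': 'runcp',
#      'args': (<period-finder result pickle S3 URL or None>,),
#      'kwargs': <extra kwargs for checkplotgen.runcp>,
#      'outbucket': '<result bucket name>',
#      'outqueue': '<result queue URL>'}
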
def runcp_producer_loop(
        lightcurve_list,
        input_queue,
        input_bucket,
        result_queue,
        result_bucket,
        pfresult_list=None,
        runcp_kwargs=None,
        process_list_slice=None,
        purge_queues_when_done=False,
        delete_queues_when_done=False,
        download_when_done=True,
        save_state_when_done=True,
        s3_client=None,
        sqs_client=None
):
    """This sends checkplot making tasks to the input queue and monitors the
    result queue for task completion.

    Parameters
    ----------

    lightcurve_list : str or list of str
        This is either a string pointing to a file containing a list of light
        curve filenames to process or the list itself. The names must
        correspond to the full filenames of files stored on S3, including all
        prefixes, but not include the 's3://<bucket name>/' bit (these will be
        added automatically).

    input_queue : str
        This is the name of the SQS queue which will receive processing tasks
        generated by this function. The queue URL will automatically be
        obtained from AWS.

    input_bucket : str
        The name of the S3 bucket containing the light curve files to process.

    result_queue : str
        This is the name of the SQS queue that this function will listen to for
        messages from the workers as they complete processing on their input
        elements. This function will attempt to match input sent to the
        `input_queue` with results coming into the `result_queue` so it knows
        how many objects have been successfully processed. If this function
        receives task results that aren't in its own input queue, it will
        acknowledge them so they complete successfully, but not download them
        automatically. This handles leftover tasks completing from a previous
        run of this function.

    result_bucket : str
        The name of the S3 bucket which will receive the results from the
        workers.

    pfresult_list : list of str or None
        This is a list of periodfinder result pickle S3 URLs associated with
        each light curve. If provided, this will be used to add in phased light
        curve plots to each checkplot pickle. If this is None, the worker loop
        will produce checkplot pickles that only contain object information,
        neighbor information, and unphased light curves.

    runcp_kwargs : dict
        This is a dict used to pass any extra keyword arguments to the
        `lcproc.checkplotgen.runcp` function that will be run by the worker
        loop.

    process_list_slice : list
        This is used to index into the input light curve list so a subset of
        the full list can be processed in this specific run of this
        function. Use None for a slice index element to emulate single-slice
        spec behavior:

        process_list_slice = [10, None]  -> lightcurve_list[10:]
        process_list_slice = [None, 500] -> lightcurve_list[:500]

    purge_queues_when_done : bool
        If this is True, and this function exits (either when all done, or when
        it is interrupted with a Ctrl+C), all outstanding elements in the
        input/output queues that have not yet been acknowledged by workers or
        by this function will be purged. This effectively cancels all
        outstanding work.

    delete_queues_when_done : bool
        If this is True, and this function exits (either when all done, or when
        it is interrupted with a Ctrl+C), all outstanding work items will be
        purged from the input/output queues and the queues themselves will be
        deleted.

    download_when_done : bool
        If this is True, the generated checkplot pickle for each input work
        item will be downloaded immediately to the current working directory
        when the worker functions report they're done with it.

    save_state_when_done : bool
        If this is True, will save the current state of the work item queue and
        the work items acknowledged as completed to a pickle in the current
        working directory. Call the `runcp_producer_loop_savedstate` function
        below to resume processing from this saved state later.

    s3_client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its S3 download operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    sqs_client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its SQS operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    Returns
    -------

    dict or str
        Returns the current work state as a dict or str path to the generated
        work state pickle depending on if `save_state_when_done` is True.

    """

    if not sqs_client:
        sqs_client = boto3.client('sqs')
    if not s3_client:
        s3_client = boto3.client('s3')

    if isinstance(lightcurve_list, str) and os.path.exists(lightcurve_list):

        # get the LC list
        with open(lightcurve_list, 'r') as infd:
            lclist = infd.readlines()

        lclist = [x.replace('\n','') for x in lclist if len(x) > 0]

        if process_list_slice is not None:
            lclist = lclist[process_list_slice[0]:process_list_slice[1]]

        lclist = [x[1:] for x in lclist if x.startswith('/')]
        lclist = ['s3://%s/%s' % (input_bucket, x) for x in lclist]

    # this handles direct invocation using lists of s3:// urls of light curves
    elif isinstance(lightcurve_list, list):
        lclist = lightcurve_list

    # set up the input and output queues

    # check if the queues by the input and output names given exist already
    # if they do, go ahead and use them
    # if they don't, make new ones.
    try:
        inq = sqs_client.get_queue_url(QueueName=input_queue)
        inq_url = inq['QueueUrl']
        LOGINFO('input queue already exists, skipping creation...')
    except ClientError:
        inq = awsutils.sqs_create_queue(input_queue, client=sqs_client)
        inq_url = inq['url']

    try:
        outq = sqs_client.get_queue_url(QueueName=result_queue)
        outq_url = outq['QueueUrl']
        LOGINFO('result queue already exists, skipping creation...')
    except ClientError:
        outq = awsutils.sqs_create_queue(result_queue, client=sqs_client)
        outq_url = outq['url']

    LOGINFO('input queue: %s' % inq_url)
    LOGINFO('output queue: %s' % outq_url)

    # wait until queues are up
    LOGINFO('waiting for queues to become ready...')
    time.sleep(10.0)

    # for each item in the lightcurve_list, send it to the input queue and wait
    # until it's done to send another one

    if pfresult_list is None:
        pfresult_list = [None for x in lclist]

    for lc, pf in zip(lclist, pfresult_list):

        this_item = {
            'target': lc,
            'action': 'runcp',
            'args': (pf,),
            'kwargs': runcp_kwargs if runcp_kwargs is not None else {},
            'outbucket': result_bucket,
            'outqueue': outq_url
        }

        resp = awsutils.sqs_put_item(inq_url, this_item, client=sqs_client)
        if resp:
            LOGINFO('sent %s to queue: %s' % (lc, inq_url))

    # now block until all objects are done
    done_objects = {}

    LOGINFO('all items queued, waiting for results...')

    # listen to the kill and term signals and raise KeyboardInterrupt when
    # called
    signal.signal(signal.SIGINT, kill_handler)
    signal.signal(signal.SIGTERM, kill_handler)

    while len(list(done_objects.keys())) < len(lclist):

        try:

            result = awsutils.sqs_get_item(outq_url, client=sqs_client)

            if result is not None and len(result) > 0:

                recv = result[0]
                try:
                    processed_object = recv['item']['target']
                except KeyError:
                    LOGWARNING('unknown target in received item: %s' % recv)
                    processed_object = 'unknown-lc'

                cpf = recv['item']['cpf']
                receipt = recv['receipt_handle']

                if processed_object in lclist:

                    if processed_object not in done_objects:
                        done_objects[processed_object] = [cpf]
                    else:
                        done_objects[processed_object].append(cpf)

                    LOGINFO('done with %s -> %s' % (processed_object, cpf))

                    if download_when_done:

                        getobj = awsutils.s3_get_url(
                            cpf,
                            client=s3_client
                        )
                        LOGINFO('downloaded %s -> %s' % (cpf, getobj))

                else:
                    LOGWARNING('processed object returned is not in '
                               'queued target list, probably from an '
                               'earlier run. accepting but not downloading.')

                awsutils.sqs_delete_item(outq_url, receipt)

        except KeyboardInterrupt:

            LOGWARNING('breaking out of producer wait-loop')
            break

    # delete the input and output queues when we're done
    LOGINFO('done with processing.')
    time.sleep(1.0)

    if purge_queues_when_done:
        LOGWARNING('purging queues at exit, please wait 10 seconds...')
        sqs_client.purge_queue(QueueUrl=inq_url)
        sqs_client.purge_queue(QueueUrl=outq_url)
        time.sleep(10.0)

    if delete_queues_when_done:
        LOGWARNING('deleting queues at exit')
        awsutils.sqs_delete_queue(inq_url)
        awsutils.sqs_delete_queue(outq_url)

    work_state = {
        'done': done_objects,
        'in_progress': list(set(lclist) - set(done_objects.keys())),
        'args': ((os.path.abspath(lightcurve_list) if
                  isinstance(lightcurve_list, str) else lightcurve_list),
                 input_queue, input_bucket, result_queue, result_bucket),
        'kwargs': {'pfresult_list': pfresult_list,
                   'runcp_kwargs': runcp_kwargs,
                   'process_list_slice': process_list_slice,
                   'download_when_done': download_when_done,
                   'purge_queues_when_done': purge_queues_when_done,
                   'save_state_when_done': save_state_when_done,
                   'delete_queues_when_done': delete_queues_when_done}
    }

    if save_state_when_done:
        with open('runcp-queue-producer-loop-state.pkl','wb') as outfd:
            pickle.dump(work_state, outfd, pickle.HIGHEST_PROTOCOL)

    # at the end, return the done_objects dict
    # also return the list of unprocessed items if any
    return work_state


def runcp_producer_loop_savedstate(
        use_saved_state=None,
        lightcurve_list=None,
        input_queue=None,
        input_bucket=None,
        result_queue=None,
        result_bucket=None,
        pfresult_list=None,
        runcp_kwargs=None,
        process_list_slice=None,
        download_when_done=True,
        purge_queues_when_done=True,
        save_state_when_done=True,
        delete_queues_when_done=False,
        s3_client=None,
        sqs_client=None
):
    """This wraps the function above to allow for loading previous state from a
    file.

    Parameters
    ----------

    use_saved_state : str or None
        This is the path to the saved state pickle file produced by a previous
        run of `runcp_producer_loop`. Will get all of the arguments to run
        another instance of the loop from that pickle file. If this is None,
        you MUST provide all of the appropriate arguments to that function.

    lightcurve_list : str or list of str or None
        This is either a string pointing to a file containing a list of light
        curve filenames to process or the list itself. The names must
        correspond to the full filenames of files stored on S3, including all
        prefixes, but not include the 's3://<bucket name>/' bit (these will be
        added automatically).

    input_queue : str or None
        This is the name of the SQS queue which will receive processing tasks
        generated by this function. The queue URL will automatically be
        obtained from AWS.

    input_bucket : str or None
        The name of the S3 bucket containing the light curve files to process.

    result_queue : str or None
        This is the name of the SQS queue that this function will listen to for
        messages from the workers as they complete processing on their input
        elements. This function will attempt to match input sent to the
        `input_queue` with results coming into the `result_queue` so it knows
        how many objects have been successfully processed. If this function
        receives task results that aren't in its own input queue, it will
        acknowledge them so they complete successfully, but not download them
        automatically. This handles leftover tasks completing from a previous
        run of this function.

    result_bucket : str or None
        The name of the S3 bucket which will receive the results from the
        workers.

    pfresult_list : list of str or None
        This is a list of periodfinder result pickle S3 URLs associated with
        each light curve. If provided, this will be used to add in phased light
        curve plots to each checkplot pickle. If this is None, the worker loop
        will produce checkplot pickles that only contain object information,
        neighbor information, and unphased light curves.

    runcp_kwargs : dict or None
        This is a dict used to pass any extra keyword arguments to the
        `lcproc.checkplotgen.runcp` function that will be run by the worker
        loop.

    process_list_slice : list or None
        This is used to index into the input light curve list so a subset of
        the full list can be processed in this specific run of this
        function. Use None for a slice index element to emulate single-slice
        spec behavior:

        process_list_slice = [10, None]  -> lightcurve_list[10:]
        process_list_slice = [None, 500] -> lightcurve_list[:500]

    purge_queues_when_done : bool or None
        If this is True, and this function exits (either when all done, or when
        it is interrupted with a Ctrl+C), all outstanding elements in the
        input/output queues that have not yet been acknowledged by workers or
        by this function will be purged. This effectively cancels all
        outstanding work.

    delete_queues_when_done : bool or None
        If this is True, and this function exits (either when all done, or when
        it is interrupted with a Ctrl+C), all outstanding work items will be
        purged from the input/output queues and the queues themselves will be
        deleted.

    download_when_done : bool or None
        If this is True, the generated checkplot pickle for each input work
        item will be downloaded immediately to the current working directory
        when the worker functions report they're done with it.

    save_state_when_done : bool or None
        If this is True, will save the current state of the work item queue and
        the work items acknowledged as completed to a pickle in the current
        working directory. Call this function again with that pickle to resume
        processing from the saved state later.

    s3_client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its S3 download operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    sqs_client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its SQS operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    Returns
    -------

    dict or str
        Returns the current work state as a dict or str path to the generated
        work state pickle depending on if `save_state_when_done` is True.

    """

    if use_saved_state is not None and os.path.exists(use_saved_state):

        with open(use_saved_state,'rb') as infd:
            saved_state = pickle.load(infd)

        # run the producer loop using the saved state's todo list
        return runcp_producer_loop(
            saved_state['in_progress'],
            saved_state['args'][1],
            saved_state['args'][2],
            saved_state['args'][3],
            saved_state['args'][4],
            **saved_state['kwargs']
        )

    else:

        return runcp_producer_loop(
            lightcurve_list,
            input_queue,
            input_bucket,
            result_queue,
            result_bucket,
            pfresult_list=pfresult_list,
            runcp_kwargs=runcp_kwargs,
            process_list_slice=process_list_slice,
            download_when_done=download_when_done,
            purge_queues_when_done=purge_queues_when_done,
            save_state_when_done=save_state_when_done,
            delete_queues_when_done=delete_queues_when_done,
            s3_client=s3_client,
            sqs_client=sqs_client
        )


def runcp_consumer_loop(
        in_queue_url,
        workdir,
        lclist_pkl_s3url,
        lc_altexts=('',),
        wait_time_seconds=5,
        cache_clean_timer_seconds=3600.0,
        shutdown_check_timer_seconds=60.0,
        sqs_client=None,
        s3_client=None
):
    """This runs checkplot pickle making in a loop until interrupted.

    Consumes work task items from an input queue set up by
    `runcp_producer_loop` above. For the moment, we don't generate neighbor
    light curves since this would require a lot more S3 calls.

    Parameters
    ----------

    in_queue_url : str
        The SQS URL of the input queue to listen to for work assignment
        messages. The task orders will include the input and output S3 bucket
        names, as well as the URL of the output queue to where this function
        will report its work-complete or work-failed status.

    workdir : str
        The directory on the local machine where this worker loop will download
        the input light curves and associated period-finder results (if any),
        process them, and produce its output checkplot pickles. These will then
        be uploaded to the specified S3 output bucket and then deleted from the
        workdir when the upload is confirmed to make it safely to S3.

    lclist_pkl_s3url : str
        S3 URL of a catalog pickle generated by `lcproc.catalogs.make_lclist`
        that contains objectids and coordinates, as well as a kdtree for all of
        the objects in the current light curve collection being processed. This
        is used to look up neighbors for each object being processed.

    lc_altexts : sequence of str
        If not None, this is a sequence of alternate extensions to try for the
        input light curve file other than the one provided in the input task
        order. For example, to get anything that's an .sqlite where .sqlite.gz
        is expected, use altexts=[''] to strip the .gz.

    wait_time_seconds : int
        The amount of time to wait in the input SQS queue for an input task
        order. If this timeout expires and no task has been received, this
        function goes back to the top of the work loop.

    cache_clean_timer_seconds : float
        The amount of time in seconds to wait before periodically removing old
        files (such as finder chart FITS, external service result pickles) from
        the astrobase cache directory. These accumulate as the work items are
        processed, and take up significant space, so they must be removed
        periodically.

    shutdown_check_timer_seconds : float
        The amount of time to wait before checking for a pending EC2 shutdown
        message for the instance this worker loop is operating on. If a
        shutdown is noticed, the worker loop is cancelled in preparation for
        instance shutdown.

    sqs_client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its SQS operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    s3_client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its S3 operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    Returns
    -------

    Nothing.

    """

    if not sqs_client:
        sqs_client = boto3.client('sqs')
    if not s3_client:
        s3_client = boto3.client('s3')

    lclist_pklf = lclist_pkl_s3url.split('/')[-1]

    if not os.path.exists(lclist_pklf):

        # get the lclist pickle from S3 to help with neighbor queries
        lclist_pklf = awsutils.s3_get_url(
            lclist_pkl_s3url,
            client=s3_client
        )

    with open(lclist_pklf,'rb') as infd:
        lclistpkl = pickle.load(infd)

    # listen to the kill and term signals and raise KeyboardInterrupt when
    # called
    signal.signal(signal.SIGINT, kill_handler)
    signal.signal(signal.SIGTERM, kill_handler)

    shutdown_last_time = time.monotonic()
    diskspace_last_time = time.monotonic()

    while True:

        curr_time = time.monotonic()

        if (curr_time - shutdown_last_time) > shutdown_check_timer_seconds:
            shutdown_check = shutdown_check_handler()
            if shutdown_check:
                LOGWARNING('instance will die soon, breaking loop')
                break
            shutdown_last_time = time.monotonic()

        if (curr_time - diskspace_last_time) > cache_clean_timer_seconds:
            cache_clean_handler()
            diskspace_last_time = time.monotonic()

        try:

            # receive a single message from the inqueue
            work = awsutils.sqs_get_item(in_queue_url,
                                         client=sqs_client,
                                         raiseonfail=True)

            # JSON deserialize the work item
            if work is not None and len(work) > 0:

                recv = work[0]

                # skip any messages that don't tell us to runcp
                # FIXME: use the MessageAttributes for setting topics instead
                action = recv['item']['action']
                if action != 'runcp':
                    continue

                target = recv['item']['target']
                args = recv['item']['args']
                kwargs = recv['item']['kwargs']
                outbucket = recv['item']['outbucket']

                if 'outqueue' in recv['item']:
                    out_queue_url = recv['item']['outqueue']
                else:
                    out_queue_url = None

                receipt = recv['receipt_handle']

                # download the target from S3 to a file in the work directory
                try:

                    lc_filename = awsutils.s3_get_url(
                        target,
                        altexts=lc_altexts,
                        client=s3_client,
                    )

                    # get the period-finder pickle if present in args
                    if len(args) > 0 and args[0] is not None:

                        pf_pickle = awsutils.s3_get_url(
                            args[0],
                            client=s3_client
                        )

                    else:

                        pf_pickle = None

                    # now runcp
                    cpfs = runcp(
                        pf_pickle,
                        workdir,
                        workdir,
                        lcfname=lc_filename,
                        lclistpkl=lclistpkl,
                        makeneighborlcs=False,
                        **kwargs
                    )

                    if cpfs and all(os.path.exists(x) for x in cpfs):

                        LOGINFO('runcp OK for LC: %s, PF: %s -> %s' %
                                (lc_filename, pf_pickle, cpfs))

                        # check if the file exists already because it's been
                        # processed somewhere else
                        resp = s3_client.list_objects_v2(
                            Bucket=outbucket,
                            MaxKeys=1,
                            Prefix=cpfs[0]
                        )
                        outbucket_list = resp.get('Contents',[])

                        if outbucket_list and len(outbucket_list) > 0:

                            LOGWARNING(
                                'not uploading runcp results for %s because '
                                'they exist in the output bucket already' %
                                target
                            )
                            awsutils.sqs_delete_item(in_queue_url, receipt)
                            continue

                        for cpf in cpfs:

                            put_url = awsutils.s3_put_file(cpf,
                                                           outbucket,
                                                           client=s3_client)

                            if put_url is not None:

                                LOGINFO('result uploaded to %s' % put_url)

                                # put the S3 URL of the output into the output
                                # queue if requested
                                if out_queue_url is not None:

                                    awsutils.sqs_put_item(
                                        out_queue_url,
                                        {'cpf':put_url,
                                         'target': target,
                                         'lc_filename':lc_filename,
                                         'lclistpkl':lclist_pklf,
                                         'kwargs':kwargs},
                                        raiseonfail=True
                                    )

                                # delete the result from the local directory
                                os.remove(cpf)

                            # if the upload fails, don't acknowledge the
                            # message. might be a temporary S3 failure, so
                            # another worker might succeed later.
                            else:
                                LOGERROR('failed to upload %s to S3' % cpf)

                        # delete the input item from the input queue to
                        # acknowledge its receipt and indicate that processing
                        # is done and successful
                        awsutils.sqs_delete_item(in_queue_url, receipt)

                        # delete the light curve file when we're done with it
                        if ( (lc_filename is not None) and
                             (os.path.exists(lc_filename)) ):
                            os.remove(lc_filename)

                    # if runcp failed outright, don't requeue. instead, write a
                    # ('failed-checkplot-%s.pkl' % lc_filename) file to the
                    # output S3 bucket.
                    else:

                        LOGWARNING('runcp failed for LC: %s, PF: %s' %
                                   (lc_filename, pf_pickle))

                        with open('failed-checkplot-%s.pkl' %
                                  lc_filename, 'wb') as outfd:
                            pickle.dump(
                                {'in_queue_url':in_queue_url,
                                 'target':target,
                                 'lc_filename':lc_filename,
                                 'lclistpkl':lclist_pklf,
                                 'kwargs':kwargs,
                                 'outbucket':outbucket,
                                 'out_queue_url':out_queue_url},
                                outfd, pickle.HIGHEST_PROTOCOL
                            )

                        put_url = awsutils.s3_put_file(
                            'failed-checkplot-%s.pkl' % lc_filename,
                            outbucket,
                            client=s3_client
                        )

                        # put the S3 URL of the output into the output queue if
                        # requested
                        if out_queue_url is not None:

                            awsutils.sqs_put_item(
                                out_queue_url,
                                {'cpf':put_url,
                                 'lc_filename':lc_filename,
                                 'lclistpkl':lclist_pklf,
                                 'kwargs':kwargs},
                                raiseonfail=True
                            )

                        # delete the input item from the input queue to
                        # acknowledge its receipt and indicate that processing
                        # is done
                        awsutils.sqs_delete_item(in_queue_url,
                                                 receipt,
                                                 raiseonfail=True)

                        # delete the light curve file when we're done with it
                        if ( (lc_filename is not None) and
                             (os.path.exists(lc_filename)) ):
                            os.remove(lc_filename)

                except ClientError:

                    LOGWARNING('queues have disappeared. stopping worker loop')
                    break

                # if there's any other exception, put a failed response into
                # the output bucket and queue
                except Exception:

                    LOGEXCEPTION('could not process input from queue')

                    if 'lc_filename' in locals():

                        with open('failed-checkplot-%s.pkl' %
                                  lc_filename,'wb') as outfd:
                            pickle.dump(
                                {'in_queue_url':in_queue_url,
                                 'target':target,
                                 'lc_filename':lc_filename,
                                 'lclistpkl':lclist_pklf,
                                 'kwargs':kwargs,
                                 'outbucket':outbucket,
                                 'out_queue_url':out_queue_url},
                                outfd, pickle.HIGHEST_PROTOCOL
                            )

                        put_url = awsutils.s3_put_file(
                            'failed-checkplot-%s.pkl' % lc_filename,
                            outbucket,
                            client=s3_client
                        )

                        # put the S3 URL of the output into the output queue if
                        # requested
                        if out_queue_url is not None:

                            awsutils.sqs_put_item(
                                out_queue_url,
                                {'cpf':put_url,
                                 'lc_filename':lc_filename,
                                 'lclistpkl':lclist_pklf,
                                 'kwargs':kwargs},
                                raiseonfail=True
                            )

                        if ( (lc_filename is not None) and
                             (os.path.exists(lc_filename)) ):
                            os.remove(lc_filename)

                    # delete the input item from the input queue to
                    # acknowledge its receipt and indicate that processing is
                    # done
                    awsutils.sqs_delete_item(in_queue_url,
                                             receipt,
                                             raiseonfail=True)

        # a keyboard interrupt kills the loop
        except KeyboardInterrupt:

            LOGWARNING('breaking out of the processing loop.')
            break

        # if the queues disappear, then the producer loop is done and we should
        # exit
        except ClientError:

            LOGWARNING('queues have disappeared. stopping worker loop')
            break

        # on any other exception, continue the loop: we'll write the output
        # file to the output S3 bucket (and any optional output queue), but add
        # a failed-* prefix to it to indicate that processing failed.
        # FIXME: could use a dead-letter queue for this instead
        except Exception:

            LOGEXCEPTION('could not process input from queue')

            if 'lc_filename' in locals():

                with open('failed-checkplot-%s.pkl' %
                          lc_filename,'wb') as outfd:
                    pickle.dump(
                        {'in_queue_url':in_queue_url,
                         'target':target,
                         'lclistpkl':lclist_pklf,
                         'kwargs':kwargs,
                         'outbucket':outbucket,
                         'out_queue_url':out_queue_url},
                        outfd, pickle.HIGHEST_PROTOCOL
                    )

                put_url = awsutils.s3_put_file(
                    'failed-checkplot-%s.pkl' % lc_filename,
                    outbucket,
                    client=s3_client
                )

                # put the S3 URL of the output into the output queue if
                # requested
                if out_queue_url is not None:

                    awsutils.sqs_put_item(
                        out_queue_url,
                        {'cpf':put_url,
                         'lclistpkl':lclist_pklf,
                         'kwargs':kwargs},
                        raiseonfail=True
                    )

                if ( (lc_filename is not None) and
                     (os.path.exists(lc_filename)) ):
                    os.remove(lc_filename)

            # delete the input item from the input queue to acknowledge its
            # receipt and indicate that processing is done
            awsutils.sqs_delete_item(in_queue_url, receipt, raiseonfail=True)


#########################
## PERIOD-FINDER LOOPS ##
#########################

def runpf_producer_loop(
        lightcurve_list,
        input_queue,
        input_bucket,
        result_queue,
        result_bucket,
        pfmethods=('gls','pdm','mav','bls','win'),
        pfkwargs=({}, {}, {}, {}, {}),
        extra_runpf_kwargs={'getblssnr':True},
        process_list_slice=None,
        purge_queues_when_done=False,
        delete_queues_when_done=False,
        download_when_done=True,
        save_state_when_done=True,
        s3_client=None,
        sqs_client=None
):
    """This queues up work for period-finders using SQS.

    Parameters
    ----------

    lightcurve_list : str or list of str
        This is either a string pointing to a file containing a list of light
        curve filenames to process or the list itself. The names must
        correspond to the full filenames of files stored on S3, including all
        prefixes, but not include the 's3://<bucket name>/' bit (these will be
        added automatically).

    input_queue : str
        This is the name of the SQS queue which will receive processing tasks
        generated by this function. The queue URL will automatically be
        obtained from AWS.

    input_bucket : str
        The name of the S3 bucket containing the light curve files to process.

    result_queue : str
        This is the name of the SQS queue that this function will listen to for
        messages from the workers as they complete processing on their input
        elements. This function will attempt to match input sent to the
        `input_queue` with results coming into the `result_queue` so it knows
        how many objects have been successfully processed. If this function
        receives task results that aren't in its own input queue, it will
        acknowledge them so they complete successfully, but not download them
        automatically. This handles leftover tasks completing from a previous
        run of this function.

    result_bucket : str
        The name of the S3 bucket which will receive the results from the
        workers.

    pfmethods : sequence of str
        This is a list of period-finder method short names as listed in the
        `lcproc.periodfinding.PFMETHODS` dict. This is used to tell the worker
        loop which period-finders to run on the input light curve.

    pfkwargs : sequence of dicts
        This contains optional kwargs as dicts to be supplied to all of the
        period-finder functions listed in `pfmethods`. This should be the same
        length as that sequence.

    extra_runpf_kwargs : dict
        This is a dict of kwargs to be supplied to the `runpf` driver function
        itself.

    process_list_slice : list
        This is used to index into the input light curve list so a subset of
        the full list can be processed in this specific run of this
        function. Use None for a slice index element to emulate single-slice
        spec behavior:

        process_list_slice = [10, None]  -> lightcurve_list[10:]
        process_list_slice = [None, 500] -> lightcurve_list[:500]

    purge_queues_when_done : bool
        If this is True, and this function exits (either when all done, or when
        it is interrupted with a Ctrl+C), all outstanding elements in the
        input/output queues that have not yet been acknowledged by workers or
        by this function will be purged. This effectively cancels all
        outstanding work.

    delete_queues_when_done : bool
        If this is True, and this function exits (either when all done, or when
        it is interrupted with a Ctrl+C), all outstanding work items will be
        purged from the input/output queues and the queues themselves will be
        deleted.

    download_when_done : bool
        If this is True, the generated period-finding result pickle for each
        input work item will be downloaded immediately to the current working
        directory when the worker functions report they're done with it.

    save_state_when_done : bool
        If this is True, will save the current state of the work item queue and
        the work items acknowledged as completed to a pickle in the current
        working directory.

    s3_client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its S3 download operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    sqs_client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its SQS operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    Returns
    -------

    dict or str
        Returns the current work state as a dict or str path to the generated
        work state pickle depending on if `save_state_when_done` is True.

    """

    if not sqs_client:
        sqs_client = boto3.client('sqs')
    if not s3_client:
        s3_client = boto3.client('s3')

    if isinstance(lightcurve_list, str) and os.path.exists(lightcurve_list):

        # get the LC list
        with open(lightcurve_list, 'r') as infd:
            lclist = infd.readlines()

        lclist = [x.replace('\n','') for x in lclist if len(x) > 0]

        if process_list_slice is not None:
            lclist = lclist[process_list_slice[0]:process_list_slice[1]]

        lclist = [x[1:] for x in lclist if x.startswith('/')]
        lclist = ['s3://%s/%s' % (input_bucket, x) for x in lclist]

    # this handles direct invocation using lists of s3:// urls of light curves
    elif isinstance(lightcurve_list, list):
        lclist = lightcurve_list

    # set up the input and output queues

    # check if the queues by the input and output names given exist already
    # if they do, go ahead and use them
    # if they don't, make new ones.
    try:
        inq = sqs_client.get_queue_url(QueueName=input_queue)
        inq_url = inq['QueueUrl']
        LOGINFO('input queue already exists, skipping creation...')
    except ClientError:
        inq = awsutils.sqs_create_queue(input_queue, client=sqs_client)
        inq_url = inq['url']

    try:
        outq = sqs_client.get_queue_url(QueueName=result_queue)
        outq_url = outq['QueueUrl']
        LOGINFO('result queue already exists, skipping creation...')
    except ClientError:
        outq = awsutils.sqs_create_queue(result_queue, client=sqs_client)
        outq_url = outq['url']

    LOGINFO('input queue: %s' % inq_url)
    LOGINFO('output queue: %s' % outq_url)

    # wait until queues are up
    LOGINFO('waiting for queues to become ready...')
    time.sleep(10.0)

    all_runpf_kwargs = {'pfmethods':pfmethods,
                        'pfkwargs':pfkwargs}
    if isinstance(extra_runpf_kwargs, dict):
        all_runpf_kwargs.update(extra_runpf_kwargs)

    # enqueue the work items
    for lc in lclist:

        this_item = {
            'target': lc,
            'action': 'runpf',
            'args': ('.',),
            'kwargs': all_runpf_kwargs,
            'outbucket': result_bucket,
            'outqueue': outq_url
        }

        resp = awsutils.sqs_put_item(inq_url, this_item, client=sqs_client)
        if resp:
            LOGINFO('sent %s to queue: %s' % (lc, inq_url))

    # now block until all objects are done
    done_objects = {}

    LOGINFO('all items queued, waiting for results...')

    # listen to the kill and term signals and raise KeyboardInterrupt when
    # called
    signal.signal(signal.SIGINT, kill_handler)
    signal.signal(signal.SIGTERM, kill_handler)

    while len(list(done_objects.keys())) < len(lclist):

        try:

            result = awsutils.sqs_get_item(outq_url, client=sqs_client)

            if result is not None and len(result) > 0:

                recv = result[0]
                try:
                    processed_object = recv['item']['target']
                except KeyError:
                    LOGWARNING('unknown target in received item: %s' % recv)
                    processed_object = 'unknown-lc'

                pfresult = recv['item']['pfresult']
                receipt = recv['receipt_handle']

                if processed_object in lclist:

                    if processed_object not in done_objects:
                        done_objects[processed_object] = [pfresult]
                    else:
                        done_objects[processed_object].append(pfresult)

                    LOGINFO('done with %s -> %s' % (processed_object,
                                                    pfresult))

                    if download_when_done:

                        getobj = awsutils.s3_get_url(
                            pfresult,
                            client=s3_client
                        )
                        LOGINFO('downloaded %s -> %s' % (pfresult, getobj))

                else:
                    LOGWARNING('processed object returned is not in '
                               'queued target list, probably from an '
                               'earlier run. accepting but not downloading.')

                awsutils.sqs_delete_item(outq_url, receipt)

        except KeyboardInterrupt:

            LOGWARNING('breaking out of runpf producer wait-loop')
            break

    # delete the input and output queues when we're done
    LOGINFO('done with processing.')
    time.sleep(1.0)

    if purge_queues_when_done:
        LOGWARNING('purging queues at exit, please wait 10 seconds...')
        sqs_client.purge_queue(QueueUrl=inq_url)
        sqs_client.purge_queue(QueueUrl=outq_url)
        time.sleep(10.0)

    if delete_queues_when_done:
        LOGWARNING('deleting queues at exit')
        awsutils.sqs_delete_queue(inq_url)
        awsutils.sqs_delete_queue(outq_url)

    work_state = {
        'done': done_objects,
        'in_progress': list(set(lclist) - set(done_objects.keys())),
        'args': ((os.path.abspath(lightcurve_list) if
                  isinstance(lightcurve_list, str) else lightcurve_list),
                 input_queue, input_bucket, result_queue, result_bucket),
        'kwargs': {'pfmethods':pfmethods,
                   'pfkwargs':pfkwargs,
                   'extra_runpf_kwargs':extra_runpf_kwargs,
                   'process_list_slice':process_list_slice,
                   'purge_queues_when_done':purge_queues_when_done,
                   'delete_queues_when_done':delete_queues_when_done,
                   'download_when_done':download_when_done,
                   'save_state_when_done':save_state_when_done}
    }

    if save_state_when_done:
        with open('runpf-queue-producer-loop-state.pkl','wb') as outfd:
            pickle.dump(work_state, outfd, pickle.HIGHEST_PROTOCOL)

    # at the end, return the done_objects dict
    # also return the list of unprocessed items if any
    return work_state


def runpf_consumer_loop(
        in_queue_url,
        workdir,
        lc_altexts=('',),
        wait_time_seconds=5,
        shutdown_check_timer_seconds=60.0,
        sqs_client=None,
        s3_client=None
):
    """This runs period-finding in a loop until interrupted.

    Consumes work task items from an input queue set up by
    `runpf_producer_loop` above.

    Parameters
    ----------

    in_queue_url : str
        The SQS URL of the input queue to listen to for work assignment
        messages. The task orders will include the input and output S3 bucket
        names, as well as the URL of the output queue to where this function
        will report its work-complete or work-failed status.

    workdir : str
        The directory on the local machine where this worker loop will download
        the input light curves, process them, and produce its output
        periodfinding result pickles. These will then be uploaded to the
        specified S3 output bucket, and then deleted from the local disk.

    lc_altexts : sequence of str
        If not None, this is a sequence of alternate extensions to try for the
        input light curve file other than the one provided in the input task
        order. For example, to get anything that's an .sqlite where .sqlite.gz
        is expected, use altexts=[''] to strip the .gz.

    wait_time_seconds : int
        The amount of time to wait in the input SQS queue for an input task
        order. If this timeout expires and no task has been received, this
        function goes back to the top of the work loop.

    shutdown_check_timer_seconds : float
        The amount of time to wait before checking for a pending EC2 shutdown
        message for the instance this worker loop is operating on. If a
        shutdown is noticed, the worker loop is cancelled in preparation for
        instance shutdown.

    sqs_client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its SQS operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    s3_client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its S3 operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    Returns
    -------

    Nothing.

    """

    if not sqs_client:
        sqs_client = boto3.client('sqs')
    if not s3_client:
        s3_client = boto3.client('s3')

    # listen to the kill and term signals and raise KeyboardInterrupt when
    # called
    signal.signal(signal.SIGINT, kill_handler)
    signal.signal(signal.SIGTERM, kill_handler)

    shutdown_last_time = time.monotonic()

    while True:

        curr_time = time.monotonic()

        if (curr_time - shutdown_last_time) > shutdown_check_timer_seconds:
            shutdown_check = shutdown_check_handler()
            if shutdown_check:
                LOGWARNING('instance will die soon, breaking loop')
                break
            shutdown_last_time = time.monotonic()

        try:

            # receive a single message from the inqueue
            work = awsutils.sqs_get_item(in_queue_url,
                                         client=sqs_client,
                                         raiseonfail=True)

            # JSON deserialize the work item
            if work is not None and len(work) > 0:

                recv = work[0]

                # skip any messages that don't tell us to runpf
                action = recv['item']['action']
                if action != 'runpf':
                    continue

                target = recv['item']['target']
                args = recv['item']['args']
                kwargs = recv['item']['kwargs']
                outbucket = recv['item']['outbucket']

                if 'outqueue' in recv['item']:
                    out_queue_url = recv['item']['outqueue']
                else:
                    out_queue_url = None

                receipt = recv['receipt_handle']

                # download the target from S3 to a file in the work directory
                try:

                    lc_filename = awsutils.s3_get_url(
                        target,
                        altexts=lc_altexts,
                        client=s3_client
                    )

                    runpf_args = (lc_filename, args[0])

                    # now runpf
                    pfresult = runpf(
                        *runpf_args,
                        **kwargs
                    )

                    if pfresult and os.path.exists(pfresult):

                        LOGINFO('runpf OK for LC: %s -> %s' %
                                (lc_filename, pfresult))

                        # check if the file exists already because it's been
                        # processed somewhere else
                        resp = s3_client.list_objects_v2(
                            Bucket=outbucket,
                            MaxKeys=1,
                            Prefix=pfresult
                        )
                        outbucket_list = resp.get('Contents',[])

                        if outbucket_list and len(outbucket_list) > 0:

                            LOGWARNING(
                                'not uploading pfresult for %s because '
                                'it exists in the output bucket already' %
                                target
                            )
                            awsutils.sqs_delete_item(in_queue_url, receipt)
                            continue

                        put_url = awsutils.s3_put_file(pfresult,
                                                       outbucket,
                                                       client=s3_client)

                        if put_url is not None:

                            LOGINFO('result uploaded to %s' % put_url)

                            # put the S3 URL of the output into the output
                            # queue if requested
                            if out_queue_url is not None:

                                awsutils.sqs_put_item(
                                    out_queue_url,
                                    {'pfresult':put_url,
                                     'target': target,
                                     'lc_filename':lc_filename,
                                     'kwargs':kwargs},
                                    raiseonfail=True
                                )

                            # delete the result from the local directory
                            os.remove(pfresult)

                        # if the upload fails, don't acknowledge the
                        # message. might be a temporary S3 failure, so
                        # another worker might succeed later.
                        # FIXME: add SNS bits to warn us of failures
                        else:
                            LOGERROR('failed to upload %s to S3' % pfresult)
                            os.remove(pfresult)

                        # delete the input item from the input queue to
                        # acknowledge its receipt and indicate that processing
                        # is done and successful
                        awsutils.sqs_delete_item(in_queue_url, receipt)

                        # delete the light curve file when we're done with it
                        if ( (lc_filename is not None) and
                             (os.path.exists(lc_filename)) ):
                            os.remove(lc_filename)

                    # if runpf failed outright, don't requeue. instead, write a
                    # ('failed-periodfinding-%s.pkl' % lc_filename) file to the
                    # output S3 bucket.
                    else:

                        LOGWARNING('runpf failed for LC: %s' % (lc_filename,))

                        with open('failed-periodfinding-%s.pkl' %
                                  lc_filename, 'wb') as outfd:
                            pickle.dump(
                                {'in_queue_url':in_queue_url,
                                 'target':target,
                                 'lc_filename':lc_filename,
                                 'kwargs':kwargs,
                                 'outbucket':outbucket,
                                 'out_queue_url':out_queue_url},
                                outfd, pickle.HIGHEST_PROTOCOL
                            )

                        put_url = awsutils.s3_put_file(
                            'failed-periodfinding-%s.pkl' % lc_filename,
                            outbucket,
                            client=s3_client
                        )

                        # put the S3 URL of the output into the output queue if
                        # requested
                        if out_queue_url is not None:

                            awsutils.sqs_put_item(
                                out_queue_url,
                                {'pfresult':put_url,
                                 'lc_filename':lc_filename,
                                 'kwargs':kwargs},
                                raiseonfail=True
                            )

                        # delete the input item from the input queue to
                        # acknowledge its receipt and indicate that processing
                        # is done
                        awsutils.sqs_delete_item(in_queue_url,
                                                 receipt,
                                                 raiseonfail=True)

                        # delete the light curve file when we're done with it
                        if ( (lc_filename is not None) and
                             (os.path.exists(lc_filename)) ):
                            os.remove(lc_filename)

                except ClientError:

                    LOGWARNING('queues have disappeared. stopping worker loop')
                    break

                # if there's any other exception, put a failed response into
                # the output bucket and queue
                except Exception:

                    LOGEXCEPTION('could not process input from queue')

                    if 'lc_filename' in locals():

                        with open('failed-periodfinding-%s.pkl' %
                                  lc_filename,'wb') as outfd:
                            pickle.dump(
                                {'in_queue_url':in_queue_url,
                                 'target':target,
                                 'lc_filename':lc_filename,
                                 'kwargs':kwargs,
                                 'outbucket':outbucket,
                                 'out_queue_url':out_queue_url},
                                outfd, pickle.HIGHEST_PROTOCOL
                            )

                        put_url = awsutils.s3_put_file(
                            'failed-periodfinding-%s.pkl' % lc_filename,
                            outbucket,
                            client=s3_client
                        )

                        # put the S3 URL of the output into the output queue if
                        # requested
                        if out_queue_url is not None:

                            awsutils.sqs_put_item(
                                out_queue_url,
                                {'pfresult':put_url,
                                 'lc_filename':lc_filename,
                                 'kwargs':kwargs},
                                raiseonfail=True
                            )

                        # delete the light curve file when we're done with it
                        if ( (lc_filename is not None) and
                             (os.path.exists(lc_filename)) ):
                            os.remove(lc_filename)

                    # delete the input item from the input queue to
                    # acknowledge its receipt and indicate that processing is
                    # done
                    awsutils.sqs_delete_item(in_queue_url,
                                             receipt,
                                             raiseonfail=True)

        # a keyboard interrupt kills the loop
        except KeyboardInterrupt:

            LOGWARNING('breaking out of the processing loop.')
            break

        # if the queues disappear, then the producer loop is done and we should
        # exit
        except ClientError:

            LOGWARNING('queues have disappeared. stopping worker loop')
            break

        # on any other exception, continue the loop: we'll write the output
        # file to the output S3 bucket (and any optional output queue), but add
        # a failed-* prefix to it to indicate that processing failed.
        # FIXME: could use a dead-letter queue for this instead
        except Exception:

            LOGEXCEPTION('could not process input from queue')

            if 'lc_filename' in locals():

                with open('failed-periodfinding-%s.pkl' %
                          lc_filename,'wb') as outfd:
                    pickle.dump(
                        {'in_queue_url':in_queue_url,
                         'target':target,
                         'kwargs':kwargs,
                         'outbucket':outbucket,
                         'out_queue_url':out_queue_url},
                        outfd, pickle.HIGHEST_PROTOCOL
                    )

                put_url = awsutils.s3_put_file(
                    'failed-periodfinding-%s.pkl' % lc_filename,
                    outbucket,
                    client=s3_client
                )

                # put the S3 URL of the output into the output queue if
                # requested
                if out_queue_url is not None:

                    awsutils.sqs_put_item(
                        out_queue_url,
                        {'pfresult':put_url,
                         'kwargs':kwargs},
                        raiseonfail=True
                    )

                if ( (lc_filename is not None) and
                     (os.path.exists(lc_filename)) ):
                    os.remove(lc_filename)

            # delete the input item from the input queue to acknowledge its
            # receipt and indicate that processing is done
            awsutils.sqs_delete_item(in_queue_url, receipt, raiseonfail=True)