astrobase.lcproc.awsrun module

This contains lcproc worker loops useful for AWS processing of light curves.

The basic workflow is:

LCs from S3 -> SQS -> worker loop -> products back to S3 | result JSON to SQS

All functions here assume AWS credentials have been loaded already using awscli as described at:

https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html

General recommendations:

  • use t3.medium or t3.micro instances for runcp_consumer_loop. Checkplot making isn’t really a CPU intensive activity, so using these will be cheaper.
  • use c5.2xlarge or above instances for runpf_consumer_loop. Period-finders require a decent number of fast cores, so a spot fleet of these instances should be cost-effective.
  • you may want a t3.micro instance running in the same region and VPC as your worker node instances to serve as a head node driving the producer_loop functions. This can be done from a machine outside AWS, but you’ll incur (probably tiny) charges for network egress from the output queues.
  • It’s best not to download results from S3 as soon as they’re produced. Leave them on S3 until everything is done, then use rclone (https://rclone.org) to sync them back to your machines using --transfers <large number>.

The user_data and instance_user_data kwargs for the make_ec2_nodes and make_spot_fleet_cluster functions can be used to start processing loops as soon as EC2 brings up the VM instance. This is especially useful for spot fleets set to maintain a target capacity, since worker nodes will be terminated and automatically replaced. Bringing up the processing loop at instance startup makes it easy to continue processing light curves exactly where you left off without manual intervention.
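
For example, here is a minimal sketch of reading a startup script (like the one shown below) and passing it via the user_data kwarg. All of the other options are hypothetical placeholders; check the make_ec2_nodes and make_spot_fleet_cluster functions in astrobase.awsutils for the arguments your setup actually needs:

from astrobase import awsutils

# read the instance startup script written out below
with open('launch-runcp-userdata.sh', 'r') as infd:
    startup_script = infd.read()

# the remaining arguments (AMI ID, instance types, key pair, security groups,
# target capacity, etc.) depend on your AWS account; the names commented out
# here are placeholders only -- fill them in from the awsutils docs
cluster_options = {
    # 'ami': 'ami-xxxxxxxxxxxxxxxxx',
    # 'instance_types': ['t3.medium'],
    # 'target_capacity': 8,
}

spot_fleet = awsutils.make_spot_fleet_cluster(
    user_data=startup_script,
    **cluster_options,
)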

Example script for user_data bringing up a checkplot-making loop on instance creation (assuming we’re using Amazon Linux 2):

#!/bin/bash

cat << 'EOF' > /home/ec2-user/launch-runcp.sh
#!/bin/bash
sudo yum -y install python3-devel gcc-gfortran jq htop emacs-nox git

# create the virtualenv
python3 -m venv /home/ec2-user/py3

# get astrobase
cd /home/ec2-user
git clone https://github.com/waqasbhatti/astrobase

# install it
cd /home/ec2-user/astrobase
/home/ec2-user/py3/bin/pip install pip setuptools numpy -U
/home/ec2-user/py3/bin/pip install -e .[aws]

# make the work dir
mkdir /home/ec2-user/work
cd /home/ec2-user/work

# wait a bit for the instance info to be populated
sleep 5

# set some environ vars for boto3 and the processing loop
export AWS_DEFAULT_REGION=`curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document/ | jq '.region' | tr -d '"'`
export NCPUS=`lscpu -J | jq ".lscpu[3].data|tonumber"`

# launch the processor loops
for s in `seq $NCPUS`; do nohup /home/ec2-user/py3/bin/python3 -u -c "from astrobase.lcproc import awsrun as lcp; lcp.runcp_consumer_loop('https://queue-url','.','s3://path/to/lclist.pkl')" > runcp-$s-loop.out & done
EOF

# run the script we just created as ec2-user
chown ec2-user /home/ec2-user/launch-runcp.sh
su ec2-user -c 'bash /home/ec2-user/launch-runcp.sh'

Here’s a similar script for a runpf consumer loop. We launch only a single instance of the loop because runpf will, by default, use all CPUs for its parallelized period-finder functions:

#!/bin/bash

cat << 'EOF' > /home/ec2-user/launch-runpf.sh
#!/bin/bash
sudo yum -y install python3-devel gcc-gfortran jq htop emacs-nox git

python3 -m venv /home/ec2-user/py3

cd /home/ec2-user
git clone https://github.com/waqasbhatti/astrobase

cd /home/ec2-user/astrobase
/home/ec2-user/py3/bin/pip install pip setuptools numpy -U
/home/ec2-user/py3/bin/pip install -e .[aws]

mkdir /home/ec2-user/work
cd /home/ec2-user/work

# wait a bit for the instance info to be populated
sleep 5

export AWS_DEFAULT_REGION=`curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document/ | jq '.region' | tr -d '"'`
export NCPUS=`lscpu -J | jq ".lscpu[3].data|tonumber"`

# launch the processes
nohup /home/ec2-user/py3/bin/python3 -u -c "from astrobase.lcproc import awsrun as lcp; lcp.runpf_consumer_loop('https://input-queue-url','.')" > runpf-loop.out &
EOF

chown ec2-user /home/ec2-user/launch-runpf.sh
su ec2-user -c 'bash /home/ec2-user/launch-runpf.sh'
astrobase.lcproc.awsrun.kill_handler(sig, frame)[source]

This raises a KeyboardInterrupt when a termination signal comes in (note that SIGKILL itself cannot be caught by a signal handler).

This is a handler for use with the Python signal.signal function.
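
For example, a minimal sketch of registering this handler in a driver script (which signals you choose to trap is up to you):

import signal
from astrobase.lcproc import awsrun

# turn SIGTERM and SIGINT into KeyboardInterrupt so the processing loops
# can clean up and save state before the instance goes away
signal.signal(signal.SIGTERM, awsrun.kill_handler)
signal.signal(signal.SIGINT, awsrun.kill_handler)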

astrobase.lcproc.awsrun.cache_clean_handler(min_age_hours=1)[source]

This periodically cleans up the ~/.astrobase cache to save us from disk-space doom.

Parameters:
  • min_age_hours (int) – Files older than this number of hours from the current time will be deleted.
Returns:

Return type:

Nothing.
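
For example, a minimal usage sketch:

from astrobase.lcproc import awsrun

# delete cached finder charts and external service results older than 2 hours
awsrun.cache_clean_handler(min_age_hours=2)
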
astrobase.lcproc.awsrun.shutdown_check_handler()[source]

This checks the AWS instance data URL to see if there’s a pending shutdown for the instance.

This is useful for AWS spot instances. If there is a pending shutdown posted to the instance data URL, we’ll use the result of this function to break out of the processing loop and shut everything down ASAP before the instance dies.

Returns:
  • True if the instance is going to die soon.
  • False if the instance is still safe.
Return type: bool
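
A minimal sketch of using this inside a worker loop:

import time
from astrobase.lcproc import awsrun

while True:
    # bail out if EC2 has scheduled this spot instance for termination
    if awsrun.shutdown_check_handler():
        print('instance shutdown pending, exiting the work loop')
        break
    # ... process the next work item here, then wait before checking again ...
    time.sleep(60.0)
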
astrobase.lcproc.awsrun.runcp_producer_loop(lightcurve_list, input_queue, input_bucket, result_queue, result_bucket, pfresult_list=None, runcp_kwargs=None, process_list_slice=None, purge_queues_when_done=False, delete_queues_when_done=False, download_when_done=True, save_state_when_done=True, s3_client=None, sqs_client=None)[source]

This sends checkplot making tasks to the input queue and monitors the result queue for task completion.

Parameters:
  • lightcurve_list (str or list of str) – This is either a string pointing to a file containing a list of light curve filenames to process or the list itself. The names must correspond to the full filenames of the files stored on S3, including all prefixes, but should not include the ‘s3://<bucket name>/’ part (this will be added automatically).
  • input_queue (str) – This is the name of the SQS queue which will receive processing tasks generated by this function. The queue URL will automatically be obtained from AWS.
  • input_bucket (str) – The name of the S3 bucket containing the light curve files to process.
  • result_queue (str) – This is the name of the SQS queue that this function will listen to for messages from the workers as they complete processing on their input elements. This function will attempt to match input sent to the input_queue with results coming into the result_queue so it knows how many objects have been successfully processed. If this function receives task results that aren’t in its own input queue, it will acknowledge them so they complete successfully, but not download them automatically. This handles leftover tasks completing from a previous run of this function.
  • result_bucket (str) – The name of the S3 bucket which will receive the results from the workers.
  • pfresult_list (list of str or None) – This is a list of periodfinder result pickle S3 URLs associated with each light curve. If provided, this will be used to add in phased light curve plots to each checkplot pickle. If this is None, the worker loop will produce checkplot pickles that only contain object information, neighbor information, and unphased light curves.
  • runcp_kwargs (dict) – This is a dict used to pass any extra keyword arguments to the lcproc.checkplotgen.runcp function that will be run by the worker loop.
  • process_list_slice (list) –

    This is used to index into the input light curve list so a subset of the full list can be processed in this specific run of this function.

    Use None for a slice index element to emulate the usual single-sided slice behavior:

    process_list_slice = [10, None] -> lightcurve_list[10:]
    process_list_slice = [None, 500] -> lightcurve_list[:500]

  • purge_queues_when_done (bool) – If this is True, and this function exits (either when all done, or when it is interrupted with a Ctrl+C), all outstanding elements in the input/output queues that have not yet been acknowledged by workers or by this function will be purged. This effectively cancels all outstanding work.
  • delete_queues_when_done (bool) – If this is True, and this function exits (either when all done, or when it is interrupted with a Ctrl+C), all outstanding work items will be purged from the input/output queues and the queues themselves will be deleted.
  • download_when_done (bool) – If this is True, the generated checkplot pickle for each input work item will be downloaded immediately to the current working directory when the worker functions report they’re done with it.
  • save_state_when_done (bool) – If this is True, will save the current state of the work item queue and the work items acknowledged as completed to a pickle in the current working directory. Call the runcp_producer_loop_savedstate function below to resume processing from this saved state later.
  • s3_client (boto3.Client or None) – If None, this function will instantiate a new boto3.Client object to use in its S3 download operations. Alternatively, pass in an existing boto3.Client instance to re-use it here.
  • sqs_client (boto3.Client or None) – If None, this function will instantiate a new boto3.Client object to use in its SQS operations. Alternatively, pass in an existing boto3.Client instance to re-use it here.
Returns:

Returns the current work state as a dict, or the str path to the generated work state pickle if save_state_when_done is True.

Return type:

dict or str
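
A minimal usage sketch (the queue, bucket, and list-file names below are placeholders for your own resources):

from astrobase.lcproc import awsrun

# queue up checkplot-making for the light curves listed in lc-list.txt and
# wait for the workers to report their results on the result queue
work_state = awsrun.runcp_producer_loop(
    'lc-list.txt',                    # file listing S3 keys of light curves
    'runcp-input-queue',              # SQS input queue name
    'my-lightcurve-bucket',           # S3 bucket holding the light curves
    'runcp-result-queue',             # SQS result queue name
    'my-checkplot-bucket',            # S3 bucket receiving the checkplots
    process_list_slice=[None, 1000],  # only process the first 1000 LCs
)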

astrobase.lcproc.awsrun.runcp_producer_loop_savedstate(use_saved_state=None, lightcurve_list=None, input_queue=None, input_bucket=None, result_queue=None, result_bucket=None, pfresult_list=None, runcp_kwargs=None, process_list_slice=None, download_when_done=True, purge_queues_when_done=True, save_state_when_done=True, delete_queues_when_done=False, s3_client=None, sqs_client=None)[source]

This wraps the function above to allow for loading previous state from a file.

Parameters:
  • use_saved_state (str or None) – This is the path to the saved state pickle file produced by a previous run of runcp_producer_loop. Will get all of the arguments to run another instance of the loop from that pickle file. If this is None, you MUST provide all of the appropriate arguments to that function.
  • lightcurve_list (str or list of str or None) – This is either a string pointing to a file containing a list of light curve filenames to process or the list itself. The names must correspond to the full filenames of the files stored on S3, including all prefixes, but should not include the ‘s3://<bucket name>/’ part (this will be added automatically).
  • input_queue (str or None) – This is the name of the SQS queue which will receive processing tasks generated by this function. The queue URL will automatically be obtained from AWS.
  • input_bucket (str or None) – The name of the S3 bucket containing the light curve files to process.
  • result_queue (str or None) – This is the name of the SQS queue that this function will listen to for messages from the workers as they complete processing on their input elements. This function will attempt to match input sent to the input_queue with results coming into the result_queue so it knows how many objects have been successfully processed. If this function receives task results that aren’t in its own input queue, it will acknowledge them so they complete successfully, but not download them automatically. This handles leftover tasks completing from a previous run of this function.
  • result_bucket (str or None) – The name of the S3 bucket which will receive the results from the workers.
  • pfresult_list (list of str or None) – This is a list of periodfinder result pickle S3 URLs associated with each light curve. If provided, this will be used to add in phased light curve plots to each checkplot pickle. If this is None, the worker loop will produce checkplot pickles that only contain object information, neighbor information, and unphased light curves.
  • runcp_kwargs (dict or None) – This is a dict used to pass any extra keyword arguments to the lcproc.checkplotgen.runcp function that will be run by the worker loop.
  • process_list_slice (list or None) –

    This is used to index into the input light curve list so a subset of the full list can be processed in this specific run of this function.

    Use None for a slice index element to emulate the usual single-sided slice behavior:

    process_list_slice = [10, None] -> lightcurve_list[10:]
    process_list_slice = [None, 500] -> lightcurve_list[:500]

  • purge_queues_when_done (bool or None) – If this is True, and this function exits (either when all done, or when it is interrupted with a Ctrl+C), all outstanding elements in the input/output queues that have not yet been acknowledged by workers or by this function will be purged. This effectively cancels all outstanding work.
  • delete_queues_when_done (bool or None) – If this is True, and this function exits (either when all done, or when it is interrupted with a Ctrl+C), all outstanding work items will be purged from the input/output queues and the queues themselves will be deleted.
  • download_when_done (bool or None) – If this is True, the generated checkplot pickle for each input work item will be downloaded immediately to the current working directory when the worker functions report they’re done with it.
  • save_state_when_done (bool or None) – If this is True, will save the current state of the work item queue and the work items acknowledged as completed to a pickle in the current working directory. Call the runcp_producer_loop_savedstate function to resume processing from this saved state later.
  • s3_client (boto3.Client or None) – If None, this function will instantiate a new boto3.Client object to use in its S3 download operations. Alternatively, pass in an existing boto3.Client instance to re-use it here.
  • sqs_client (boto3.Client or None) – If None, this function will instantiate a new boto3.Client object to use in its SQS operations. Alternatively, pass in an existing boto3.Client instance to re-use it here.
Returns:

Returns the current work state as a dict, or the str path to the generated work state pickle if save_state_when_done is True.

Return type:

dict or str
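
A minimal usage sketch, assuming a saved-state pickle produced by an earlier runcp_producer_loop run (the filename below is a placeholder):

from astrobase.lcproc import awsrun

# resume a previously interrupted producer loop from its saved state
work_state = awsrun.runcp_producer_loop_savedstate(
    use_saved_state='runcp-producer-loop-state.pkl'
)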

astrobase.lcproc.awsrun.runcp_consumer_loop(in_queue_url, workdir, lclist_pkl_s3url, lc_altexts=('', ), wait_time_seconds=5, cache_clean_timer_seconds=3600.0, shutdown_check_timer_seconds=60.0, sqs_client=None, s3_client=None)[source]

This runs checkplot pickle making in a loop until interrupted.

Consumes work task items from an input queue set up by runcp_producer_loop above. For the moment, we don’t generate neighbor light curves since this would require a lot more S3 calls.

Parameters:
  • in_queue_url (str) – The SQS URL of the input queue to listen to for work assignment messages. The task orders will include the input and output S3 bucket names, as well as the URL of the output queue to where this function will report its work-complete or work-failed status.
  • workdir (str) – The directory on the local machine where this worker loop will download the input light curves and associated period-finder results (if any), process them, and produce its output checkplot pickles. These will then be uploaded to the specified S3 output bucket and then deleted from the workdir when the upload is confirmed to make it safely to S3.
  • lclist_pkl_s3url (str) – S3 URL of a catalog pickle generated by lcproc.catalogs.make_lclist that contains objectids and coordinates, as well as a kdtree for all of the objects in the current light curve collection being processed. This is used to look up neighbors for each object being processed.
  • lc_altexts (sequence of str) – If not None, this is a sequence of alternate extensions to try for the input light curve file other than the one provided in the input task order. For example, to pick up a .sqlite file where a .sqlite.gz is expected, use lc_altexts=[''] to strip the .gz.
  • wait_time_seconds (int) – The amount of time to wait in the input SQS queue for an input task order. If this timeout expires and no task has been received, this function goes back to the top of the work loop.
  • cache_clean_timer_seconds (float) – The amount of time in seconds to wait before periodically removing old files (such as finder chart FITS, external service result pickles) from the astrobase cache directory. These accumulate as the work items are processed, and take up significant space, so must be removed periodically.
  • shutdown_check_timer_seconds (float) – The amount of time to wait before checking for a pending EC2 shutdown message for the instance this worker loop is operating on. If a shutdown is noticed, the worker loop is cancelled in preparation for instance shutdown.
  • sqs_client (boto3.Client or None) – If None, this function will instantiate a new boto3.Client object to use in its SQS operations. Alternatively, pass in an existing boto3.Client instance to re-use it here.
  • s3_client (boto3.Client or None) – If None, this function will instantiate a new boto3.Client object to use in its S3 operations. Alternatively, pass in an existing boto3.Client instance to re-use it here.
Returns:

Return type:

Nothing.
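
A minimal usage sketch (the queue URL, work directory, and catalog pickle URL below are placeholders):

from astrobase.lcproc import awsrun

# listen on the input queue and make checkplot pickles until interrupted
awsrun.runcp_consumer_loop(
    'https://sqs.us-east-1.amazonaws.com/XXXXXXXXXXXX/runcp-input-queue',
    '/home/ec2-user/work',
    's3://my-lightcurve-bucket/lclist.pkl',
)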

astrobase.lcproc.awsrun.runpf_producer_loop(lightcurve_list, input_queue, input_bucket, result_queue, result_bucket, pfmethods=('gls', 'pdm', 'mav', 'bls', 'win'), pfkwargs=({}, {}, {}, {}, {}), extra_runpf_kwargs={'getblssnr': True}, process_list_slice=None, purge_queues_when_done=False, delete_queues_when_done=False, download_when_done=True, save_state_when_done=True, s3_client=None, sqs_client=None)[source]

This queues up work for period-finders using SQS.

Parameters:
  • lightcurve_list (str or list of str) – This is either a string pointing to a file containing a list of light curve filenames to process or the list itself. The names must correspond to the full filenames of the files stored on S3, including all prefixes, but should not include the ‘s3://<bucket name>/’ part (this will be added automatically).
  • input_queue (str) – This is the name of the SQS queue which will receive processing tasks generated by this function. The queue URL will automatically be obtained from AWS.
  • input_bucket (str) – The name of the S3 bucket containing the light curve files to process.
  • result_queue (str) – This is the name of the SQS queue that this function will listen to for messages from the workers as they complete processing on their input elements. This function will attempt to match input sent to the input_queue with results coming into the result_queue so it knows how many objects have been successfully processed. If this function receives task results that aren’t in its own input queue, it will acknowledge them so they complete successfully, but not download them automatically. This handles leftover tasks completing from a previous run of this function.
  • result_bucket (str) – The name of the S3 bucket which will receive the results from the workers.
  • pfmethods (sequence of str) – This is a list of period-finder method short names as listed in the lcproc.periodfinding.PFMETHODS dict. This is used to tell the worker loop which period-finders to run on the input light curve.
  • pfkwargs (sequence of dicts) – This contains optional kwargs as dicts to be supplied to all of the period-finder functions listed in pfmethods. This should be the same length as that sequence.
  • extra_runpf_kwargs (dict) – This is a dict of kwargs to be supplied to the runpf driver function itself.
  • process_list_slice (list) –

    This is used to index into the input light curve list so a subset of the full list can be processed in this specific run of this function.

    Use None for a slice index element to emulate the usual single-sided slice behavior:

    process_list_slice = [10, None] -> lightcurve_list[10:]
    process_list_slice = [None, 500] -> lightcurve_list[:500]

  • purge_queues_when_done (bool) – If this is True, and this function exits (either when all done, or when it is interrupted with a Ctrl+C), all outstanding elements in the input/output queues that have not yet been acknowledged by workers or by this function will be purged. This effectively cancels all outstanding work.
  • delete_queues_when_done (bool) – If this is True, and this function exits (either when all done, or when it is interrupted with a Ctrl+C), all outstanding work items will be purged from the input/output queues and the queues themselves will be deleted.
  • download_when_done (bool) – If this is True, the generated periodfinding result pickle for each input work item will be downloaded immediately to the current working directory when the worker functions report they’re done with it.
  • save_state_when_done (bool) – If this is True, will save the current state of the work item queue and the work items acknowledged as completed to a pickle in the current working directory. Call the runcp_producer_loop_savedstate function to resume processing from this saved state later.
  • s3_client (boto3.Client or None) – If None, this function will instantiate a new boto3.Client object to use in its S3 download operations. Alternatively, pass in an existing boto3.Client instance to re-use it here.
  • sqs_client (boto3.Client or None) – If None, this function will instantiate a new boto3.Client object to use in its SQS operations. Alternatively, pass in an existing boto3.Client instance to re-use it here.
Returns:

Returns the current work state as a dict, or the str path to the generated work state pickle if save_state_when_done is True.

Return type:

dict or str
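
A minimal usage sketch (the queue, bucket, and list-file names below are placeholders):

from astrobase.lcproc import awsrun

# queue up period-finding with GLS and BLS only for the light curves
# listed in lc-list.txt
work_state = awsrun.runpf_producer_loop(
    'lc-list.txt',              # file listing S3 keys of light curves
    'runpf-input-queue',        # SQS input queue name
    'my-lightcurve-bucket',     # S3 bucket holding the light curves
    'runpf-result-queue',       # SQS result queue name
    'my-pfresult-bucket',       # S3 bucket receiving period-finder results
    pfmethods=('gls', 'bls'),
    pfkwargs=({}, {}),
    extra_runpf_kwargs={'getblssnr': True},
)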

astrobase.lcproc.awsrun.runpf_consumer_loop(in_queue_url, workdir, lc_altexts=('', ), wait_time_seconds=5, shutdown_check_timer_seconds=60.0, sqs_client=None, s3_client=None)[source]

This runs period-finding in a loop until interrupted.

Consumes work task items from an input queue set up by runpf_producer_loop above.

Parameters:
  • in_queue_url (str) – The SQS URL of the input queue to listen to for work assignment messages. The task orders will include the input and output S3 bucket names, as well as the URL of the output queue to where this function will report its work-complete or work-failed status.
  • workdir (str) – The directory on the local machine where this worker loop will download the input light curves, process them, and produce its output periodfinding result pickles. These will then be uploaded to the specified S3 output bucket, and then deleted from the local disk.
  • lc_altexts (sequence of str) – If not None, this is a sequence of alternate extensions to try for the input light curve file other than the one provided in the input task order. For example, to pick up a .sqlite file where a .sqlite.gz is expected, use lc_altexts=[''] to strip the .gz.
  • wait_time_seconds (int) – The amount of time to wait in the input SQS queue for an input task order. If this timeout expires and no task has been received, this function goes back to the top of the work loop.
  • shutdown_check_timer_seconds (float) – The amount of time to wait before checking for a pending EC2 shutdown message for the instance this worker loop is operating on. If a shutdown is noticed, the worker loop is cancelled in preparation for instance shutdown.
  • sqs_client (boto3.Client or None) – If None, this function will instantiate a new boto3.Client object to use in its SQS operations. Alternatively, pass in an existing boto3.Client instance to re-use it here.
  • s3_client (boto3.Client or None) – If None, this function will instantiate a new boto3.Client object to use in its S3 operations. Alternatively, pass in an existing boto3.Client instance to re-use it here.
Returns:

Return type:

Nothing.
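
A minimal usage sketch (the queue URL and work directory below are placeholders):

from astrobase.lcproc import awsrun

# run period-finders on work items from the input queue until interrupted
awsrun.runpf_consumer_loop(
    'https://sqs.us-east-1.amazonaws.com/XXXXXXXXXXXX/runpf-input-queue',
    '/home/ec2-user/work',
)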