
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# awsutils.py - Waqas Bhatti (wbhatti@astro.princeton.edu) - Oct 2018
# License: MIT - see the LICENSE file for the full text.

"""
This contains functions that handle various AWS services for use with
lcproc_aws.py.

"""

#############
## LOGGING ##
#############

import logging
from astrobase import log_sub, log_fmt, log_date_fmt

DEBUG = False
if DEBUG:
    level = logging.DEBUG
else:
    level = logging.INFO
LOGGER = logging.getLogger(__name__)
logging.basicConfig(
    level=level,
    style=log_sub,
    format=log_fmt,
    datefmt=log_date_fmt,
)

LOGDEBUG = LOGGER.debug
LOGINFO = LOGGER.info
LOGWARNING = LOGGER.warning
LOGERROR = LOGGER.error
LOGEXCEPTION = LOGGER.exception


#############
## IMPORTS ##
#############

import copy
import os.path
import os
import json
import time
from datetime import datetime, timedelta
import base64

try:

    import boto3
    from botocore.exceptions import ClientError
    import paramiko
    import paramiko.client

except ImportError:
    raise ImportError(
        "This module requires the boto3 and paramiko packages from PyPI. "
        "You'll also need the awscli package to set up the "
        "AWS secret key config for this module."
    )


#############################
## SSHING TO EC2 INSTANCES ##
#############################

def ec2_ssh(ip_address,
            keypem_file,
            username='ec2-user',
            raiseonfail=False):
    """This opens an SSH connection to the EC2 instance at `ip_address`.

    Parameters
    ----------

    ip_address : str
        IP address of the AWS EC2 instance to connect to.

    keypem_file : str
        The path to the keypair PEM file generated by AWS to allow SSH
        connections.

    username : str
        The username to use to login to the EC2 instance.

    raiseonfail : bool
        If True, will re-raise whatever Exception caused the operation to fail
        and break out immediately.

    Returns
    -------

    paramiko.SSHClient
        This has all the usual `paramiko` functionality:

        - Use `SSHClient.exec_command(command, environment=None)` to exec a
          shell command.

        - Use `SSHClient.open_sftp()` to get a `SFTPClient` for the server.
          Then call SFTPClient.get() and .put() to copy files from and to the
          server.

    """

    c = paramiko.client.SSHClient()
    c.load_system_host_keys()
    c.set_missing_host_key_policy(paramiko.client.AutoAddPolicy)

    # load the private key from the AWS keypair pem
    privatekey = paramiko.RSAKey.from_private_key_file(keypem_file)

    # connect to the server
    try:

        c.connect(ip_address,
                  pkey=privatekey,
                  username=username)

        return c

    except Exception:

        LOGEXCEPTION('could not connect to EC2 instance at %s '
                     'using keyfile: %s and user: %s' %
                     (ip_address, keypem_file, username))
        if raiseonfail:
            raise

        return None

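# A minimal usage sketch for `ec2_ssh`, kept commented out since this module
# is a library. The IP address and PEM path below are hypothetical
# placeholders:
#
#     sshc = ec2_ssh('203.0.113.10', '/path/to/my-keypair.pem')
#     if sshc is not None:
#         stdin, stdout, stderr = sshc.exec_command('uname -a')
#         print(stdout.read().decode())
#         sshc.close()
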
########
## S3 ##
########

def s3_get_file(bucket,
                filename,
                local_file,
                altexts=None,
                client=None,
                raiseonfail=False):
    """This gets a file from an S3 bucket.

    Parameters
    ----------

    bucket : str
        The AWS S3 bucket name.

    filename : str
        The full filename of the file to get from the bucket.

    local_file : str
        Path to where the downloaded file will be stored.

    altexts : None or list of str
        If not None, this is a list of alternate extensions to try for the
        file other than the one provided in `filename`. For example, to get
        anything that's an .sqlite where .sqlite.gz is expected, use
        altexts=[''] to strip the .gz.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    raiseonfail : bool
        If True, will re-raise whatever Exception caused the operation to fail
        and break out immediately.

    Returns
    -------

    str
        Path to the downloaded filename or None if the download was
        unsuccessful.

    """

    if not client:
        client = boto3.client('s3')

    try:

        client.download_file(bucket, filename, local_file)
        return local_file

    except Exception:

        if altexts is not None:

            # try each alternate extension in turn before giving up
            for alt_extension in altexts:

                split_ext = os.path.splitext(filename)
                check_file = split_ext[0] + alt_extension

                try:
                    client.download_file(
                        bucket,
                        check_file,
                        local_file.replace(split_ext[-1],
                                           alt_extension)
                    )
                    return local_file.replace(split_ext[-1],
                                              alt_extension)
                except Exception:
                    pass

        else:

            LOGEXCEPTION('could not download s3://%s/%s' %
                         (bucket, filename))

            if raiseonfail:
                raise

        return None

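# A hedged example of the `altexts` fallback (the bucket and key names are
# hypothetical): if 's3://my-lc-bucket/lcs/object-1.sqlite.gz' is missing,
# this also tries the un-gzipped key 'lcs/object-1.sqlite':
#
#     s3c = boto3.client('s3')
#     local = s3_get_file('my-lc-bucket', 'lcs/object-1.sqlite.gz',
#                         '/tmp/object-1.sqlite.gz',
#                         altexts=[''], client=s3c)
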
def s3_get_url(url,
               altexts=None,
               client=None,
               raiseonfail=False):
    """This gets a file from an S3 bucket based on its s3:// URL.

    Parameters
    ----------

    url : str
        S3 URL to download. This should begin with 's3://'.

    altexts : None or list of str
        If not None, this is a list of alternate extensions to try for the
        file other than the one provided in `filename`. For example, to get
        anything that's an .sqlite where .sqlite.gz is expected, use
        altexts=[''] to strip the .gz.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    raiseonfail : bool
        If True, will re-raise whatever Exception caused the operation to fail
        and break out immediately.

    Returns
    -------

    str
        Path to the downloaded filename or None if the download was
        unsuccessful. The file will be downloaded into the current working
        directory and will have a filename == basename of the file on S3.

    """

    bucket_item = url.replace('s3://','')
    bucket_item = bucket_item.split('/')
    bucket = bucket_item[0]
    filekey = '/'.join(bucket_item[1:])

    return s3_get_file(bucket,
                       filekey,
                       bucket_item[-1],
                       altexts=altexts,
                       client=client,
                       raiseonfail=raiseonfail)

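# `s3_get_url` is a thin wrapper over `s3_get_file`; e.g. (hypothetical URL):
#
#     local = s3_get_url('s3://my-lc-bucket/lcs/object-1.sqlite.gz')
#     # -> './object-1.sqlite.gz' in the current working directory
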
def s3_put_file(local_file, bucket, client=None, raiseonfail=False):
    """This uploads a file to S3.

    Parameters
    ----------

    local_file : str
        Path to the file to upload to S3.

    bucket : str
        The AWS S3 bucket to upload the file to.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    raiseonfail : bool
        If True, will re-raise whatever Exception caused the operation to fail
        and break out immediately.

    Returns
    -------

    str or None
        If the file upload is successful, returns the s3:// URL of the
        uploaded file. If it failed, will return None.

    """

    if not client:
        client = boto3.client('s3')

    try:
        client.upload_file(local_file, bucket, os.path.basename(local_file))
        return 's3://%s/%s' % (bucket, os.path.basename(local_file))
    except Exception:
        LOGEXCEPTION('could not upload %s to bucket: %s' %
                     (local_file, bucket))

        if raiseonfail:
            raise

        return None

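# Round-trip sketch (hypothetical bucket): upload a local file and get back
# its s3:// URL, suitable for passing to `s3_get_url` later:
#
#     s3_url = s3_put_file('/tmp/results.pkl', 'my-results-bucket')
#     # -> 's3://my-results-bucket/results.pkl' on success, None on failure
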
def s3_delete_file(bucket, filename, client=None, raiseonfail=False):
    """This deletes a file from S3.

    Parameters
    ----------

    bucket : str
        The AWS S3 bucket to delete the file from.

    filename : str
        The full file name of the file to delete, including any prefixes.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    raiseonfail : bool
        If True, will re-raise whatever Exception caused the operation to fail
        and break out immediately.

    Returns
    -------

    str or None
        If the file was successfully deleted, will return the delete-marker
        (https://docs.aws.amazon.com/AmazonS3/latest/dev/DeleteMarker.html).
        S3 only returns delete-markers for versioned buckets, so for
        un-versioned buckets this will be None even on success.

    """

    if not client:
        client = boto3.client('s3')

    try:

        resp = client.delete_object(Bucket=bucket, Key=filename)

        if not resp:
            LOGERROR('could not delete file %s from bucket %s' %
                     (filename, bucket))
        else:
            # un-versioned buckets return no delete-marker, so use .get()
            # here to avoid a KeyError in that case
            return resp.get('DeleteMarker')

    except Exception:
        LOGEXCEPTION('could not delete file %s from bucket %s' %
                     (filename, bucket))

        if raiseonfail:
            raise

        return None

#########
## SQS ##
#########

def sqs_create_queue(queue_name, options=None, client=None):
    """This creates an SQS queue.

    Parameters
    ----------

    queue_name : str
        The name of the queue to create.

    options : dict or None
        A dict of options indicating extra attributes the queue should have.
        See the SQS docs for details. If None, no custom attributes will be
        attached to the queue.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    Returns
    -------

    dict
        This returns a dict of the form::

            {'url': SQS URL of the queue,
             'name': name of the queue}

    """

    if not client:
        client = boto3.client('sqs')

    try:

        if isinstance(options, dict):
            resp = client.create_queue(QueueName=queue_name,
                                       Attributes=options)
        else:
            resp = client.create_queue(QueueName=queue_name)

        if resp is not None:
            return {'url':resp['QueueUrl'],
                    'name':queue_name}
        else:
            LOGERROR('could not create the specified queue: %s '
                     'with options: %s' % (queue_name, options))
            return None

    except Exception:
        LOGEXCEPTION('could not create the specified queue: %s '
                     'with options: %s' % (queue_name, options))
        return None

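# Example: create a work queue with a longer visibility timeout. The queue
# name and attribute value are hypothetical; note that SQS attribute values
# are passed as strings:
#
#     queue = sqs_create_queue('lcproc_queue_runpf',
#                              options={'VisibilityTimeout':'300'})
#     if queue is not None:
#         queue_url = queue['url']
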
def sqs_delete_queue(queue_url, client=None):
    """This deletes an SQS queue given its URL.

    Parameters
    ----------

    queue_url : str
        The SQS URL of the queue to delete.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    Returns
    -------

    bool
        True if the queue was deleted successfully. False otherwise.

    """

    if not client:
        client = boto3.client('sqs')

    try:
        client.delete_queue(QueueUrl=queue_url)
        return True
    except Exception:
        LOGEXCEPTION('could not delete the specified queue: %s' %
                     (queue_url,))
        return False

def sqs_put_item(queue_url,
                 item,
                 delay_seconds=0,
                 client=None,
                 raiseonfail=False):
    """This pushes a dict serialized to JSON to the specified SQS queue.

    Parameters
    ----------

    queue_url : str
        The SQS URL of the queue to push the object to.

    item : dict
        The dict passed in here will be serialized to JSON.

    delay_seconds : int
        The amount of time in seconds the pushed item will be held before
        going 'live' and being visible to all queue consumers.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    raiseonfail : bool
        If True, will re-raise whatever Exception caused the operation to fail
        and break out immediately.

    Returns
    -------

    boto3.Response or None
        If the item was successfully put on the queue, will return the
        response from the service. If it wasn't, will return None.

    """

    if not client:
        client = boto3.client('sqs')

    try:

        json_msg = json.dumps(item)

        resp = client.send_message(
            QueueUrl=queue_url,
            MessageBody=json_msg,
            DelaySeconds=delay_seconds,
        )

        if not resp:
            LOGERROR('could not send item to queue: %s' % queue_url)
            return None
        else:
            return resp

    except Exception:
        LOGEXCEPTION('could not send item to queue: %s' % queue_url)

        if raiseonfail:
            raise

        return None

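# Example: push a work item of the form that `sqs_get_item` below expects.
# The bucket names are hypothetical, and `queue_url` is assumed to come from
# an earlier `sqs_create_queue` call:
#
#     work_item = {
#         'target': 's3://my-lc-bucket/lcs/object-1.sqlite.gz',
#         'action': 'runpf',
#         'args': (),
#         'kwargs': {},
#         'outbucket': 'my-results-bucket',
#     }
#     sqs_put_item(queue_url, work_item)
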
def sqs_get_item(queue_url,
                 max_items=1,
                 wait_time_seconds=5,
                 client=None,
                 raiseonfail=False):
    """This gets a single item from the SQS queue.

    The `queue_url` is composed of some internal SQS junk plus a
    `queue_name`. For our purposes (`lcproc_aws.py`), the queue name will be
    something like::

        lcproc_queue_<action>

    where action is one of::

        runcp
        runpf

    The item is always a JSON object::

        {'target': S3 bucket address of the file to process,
         'action': the action to perform on the file ('runpf', 'runcp', etc.),
         'args': the action's args as a tuple (not including filename, which
                 is generated randomly as a temporary local file),
         'kwargs': the action's kwargs as a dict,
         'outbucket': S3 bucket to write the result to,
         'outqueue': SQS queue to write the processed item's info to
                     (optional)}

    The action MUST match the <action> in the queue name for this item to be
    processed.

    Parameters
    ----------

    queue_url : str
        The SQS URL of the queue to get messages from.

    max_items : int
        The number of items to pull from the queue in this request.

    wait_time_seconds : int
        This specifies how long the function should block until a message is
        received on the queue. If the timeout expires, an empty list will be
        returned. If the timeout doesn't expire, the function will return a
        list of items received (up to `max_items`).

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    raiseonfail : bool
        If True, will re-raise whatever Exception caused the operation to fail
        and break out immediately.

    Returns
    -------

    list of dicts or None
        For each item pulled from the queue in this request (up to
        `max_items`), a dict will be deserialized from the retrieved JSON,
        containing the message items and various metadata. The most important
        item of the metadata is the `receipt_handle`, which can be used to
        acknowledge receipt of all items in this request (see
        `sqs_delete_item` below).

        If the queue pull fails outright, returns None. If no messages are
        available for this queue pull, returns an empty list.

    """

    if not client:
        client = boto3.client('sqs')

    try:

        resp = client.receive_message(
            QueueUrl=queue_url,
            AttributeNames=['All'],
            MaxNumberOfMessages=max_items,
            WaitTimeSeconds=wait_time_seconds
        )

        if not resp:
            LOGERROR('could not receive messages from queue: %s' %
                     queue_url)

        else:

            messages = []

            for msg in resp.get('Messages',[]):

                try:
                    messages.append({
                        'id':msg['MessageId'],
                        'receipt_handle':msg['ReceiptHandle'],
                        'md5':msg['MD5OfBody'],
                        'attributes':msg['Attributes'],
                        'item':json.loads(msg['Body']),
                    })
                except Exception:
                    LOGEXCEPTION(
                        'could not deserialize message ID: %s, body: %s' %
                        (msg['MessageId'], msg['Body'])
                    )
                    continue

            return messages

    except Exception:
        LOGEXCEPTION('could not get items from queue: %s' % queue_url)

        if raiseonfail:
            raise

        return None

def sqs_delete_item(queue_url,
                    receipt_handle,
                    client=None,
                    raiseonfail=False):
    """This deletes a message from the queue, effectively acknowledging its
    receipt.

    Call this only when all messages retrieved from the queue have been
    processed, since this will prevent redelivery of these messages to other
    queue workers pulling from the same queue channel.

    Parameters
    ----------

    queue_url : str
        The SQS URL of the queue where we got the messages from. This should
        be the same queue used to retrieve the messages in `sqs_get_item`.

    receipt_handle : str
        The receipt handle of the queue message that we're responding to, and
        will acknowledge receipt of. This will be present in each message
        retrieved using `sqs_get_item`.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    raiseonfail : bool
        If True, will re-raise whatever Exception caused the operation to fail
        and break out immediately.

    Returns
    -------

    Nothing.

    """

    if not client:
        client = boto3.client('sqs')

    try:

        client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle
        )

    except Exception:
        LOGEXCEPTION(
            'could not delete message with receipt handle: '
            '%s from queue: %s' % (receipt_handle, queue_url)
        )

        if raiseonfail:
            raise

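# A typical consumer loop, sketched under the assumption that `queue_url`
# refers to an existing queue: pull a message, process it, then acknowledge
# it with `sqs_delete_item` so it isn't redelivered to another worker.
#
#     messages = sqs_get_item(queue_url, max_items=1, wait_time_seconds=5)
#     if messages:  # None on failure, [] if the wait timed out
#         for msg in messages:
#             work = msg['item']
#             # ... process work['target'] according to work['action'] ...
#             sqs_delete_item(queue_url, msg['receipt_handle'])
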
#########
## EC2 ##
#########

SUPPORTED_AMIS = [
    # Debian 9
    'ami-03006931f694ea7eb',
    # Amazon Linux 2
    'ami-04681a1dbd79675a5',
]

def make_ec2_nodes(
        security_groupid,
        subnet_id,
        keypair_name,
        iam_instance_profile_arn,
        launch_instances=1,
        ami='ami-04681a1dbd79675a5',
        instance='t3.micro',
        ebs_optimized=True,
        user_data=None,
        wait_until_up=True,
        client=None,
        raiseonfail=False,
):
    """This makes new EC2 worker nodes.

    This requires a security group ID attached to a VPC config and subnet, a
    keypair generated beforehand, and an IAM role ARN for the instance. See:

    https://docs.aws.amazon.com/cli/latest/userguide/tutorial-ec2-ubuntu.html

    Use `user_data` to launch tasks on instance launch.

    Parameters
    ----------

    security_groupid : str
        The security group ID of the AWS VPC where the instances will be
        launched.

    subnet_id : str
        The subnet ID of the AWS VPC where the instances will be launched.

    keypair_name : str
        The name of the keypair to be used to allow SSH access to all
        instances launched here. This corresponds to an already downloaded AWS
        keypair PEM file.

    iam_instance_profile_arn : str
        The ARN string corresponding to the AWS instance profile that
        describes the permissions the launched instances have to access other
        AWS resources. Set this up in AWS IAM.

    launch_instances : int
        The number of instances to launch in this request.

    ami : str
        The Amazon Machine Image ID that describes the OS the instances will
        use after launch. The default ID is Amazon Linux 2 in the US East
        region.

    instance : str
        The instance type to launch. See the following URL for a list of IDs:

        https://aws.amazon.com/ec2/pricing/on-demand/

    ebs_optimized : bool
        If True, will enable EBS optimization to speed up IO. This is usually
        True for all instances made available in the last couple of years.

    user_data : str or None
        This is either the path to a file on disk that contains a shell-script
        or a string containing a shell-script that will be executed by root
        right after the instance is launched. Use to automatically set up
        workers and queues. If None, will not execute anything at instance
        start up.

    wait_until_up : bool
        If True, will not return from this function until all launched
        instances are verified as running by AWS.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    raiseonfail : bool
        If True, will re-raise whatever Exception caused the operation to fail
        and break out immediately.

    Returns
    -------

    dict
        Returns launched instance info as a dict, keyed by instance ID.

    """

    if not client:
        client = boto3.client('ec2')

    # get the user data from a string or a file
    # note: boto3 will base64 encode this itself
    if isinstance(user_data, str) and os.path.exists(user_data):
        with open(user_data,'r') as infd:
            udata = infd.read()

    elif isinstance(user_data, str):
        udata = user_data

    else:
        udata = (
            '#!/bin/bash\necho "No user data provided. '
            'Launched instance at: %s UTC"' % datetime.utcnow().isoformat()
        )

    # fire the request
    try:
        resp = client.run_instances(
            ImageId=ami,
            InstanceType=instance,
            SecurityGroupIds=[
                security_groupid,
            ],
            SubnetId=subnet_id,
            UserData=udata,
            IamInstanceProfile={'Arn':iam_instance_profile_arn},
            InstanceInitiatedShutdownBehavior='terminate',
            KeyName=keypair_name,
            MaxCount=launch_instances,
            MinCount=launch_instances,
            EbsOptimized=ebs_optimized,
        )

        if not resp:
            LOGERROR('could not launch requested instance')
            return None

        else:

            instance_dict = {}

            instance_list = resp.get('Instances',[])

            if len(instance_list) > 0:

                for instance in instance_list:

                    LOGINFO('launched instance ID: %s of type: %s at: %s. '
                            'current state: %s' %
                            (instance['InstanceId'],
                             instance['InstanceType'],
                             instance['LaunchTime'].isoformat(),
                             instance['State']['Name']))

                    instance_dict[instance['InstanceId']] = {
                        'type':instance['InstanceType'],
                        'launched':instance['LaunchTime'],
                        'state':instance['State']['Name'],
                        'info':instance
                    }

            # if we're waiting until we're up, then do so
            if wait_until_up:

                # use a set so instances seen as 'running' in more than one
                # poll aren't counted twice
                ready_instances = set()

                LOGINFO('waiting until launched instances are up...')

                ntries = 5
                curr_try = 0

                # keep polling while tries remain AND not all instances are
                # running yet; this exits once either condition fails
                while ( (curr_try < ntries) and
                        ( len(ready_instances) <
                          len(list(instance_dict.keys()))) ):

                    resp = client.describe_instances(
                        InstanceIds=list(instance_dict.keys()),
                    )

                    if len(resp['Reservations']) > 0:

                        for resv in resp['Reservations']:

                            if len(resv['Instances']) > 0:

                                for instance in resv['Instances']:

                                    if instance['State']['Name'] == 'running':

                                        ready_instances.add(
                                            instance['InstanceId']
                                        )

                                        instance_dict[
                                            instance['InstanceId']
                                        ]['state'] = 'running'

                                        instance_dict[
                                            instance['InstanceId']
                                        ]['ip'] = instance['PublicIpAddress']

                                        instance_dict[
                                            instance['InstanceId']
                                        ]['info'] = instance

                    # sleep for a bit so we don't hit the API too often
                    curr_try = curr_try + 1
                    time.sleep(5.0)

                if len(ready_instances) == len(list(instance_dict.keys())):
                    LOGINFO('all instances now up.')
                else:
                    LOGWARNING(
                        'reached maximum number of tries for instance '
                        'status, not all instances may be up.'
                    )

            return instance_dict

    except ClientError:

        LOGEXCEPTION('could not launch requested instance')
        if raiseonfail:
            raise

        return None

    except Exception:

        LOGEXCEPTION('could not launch requested instance')
        if raiseonfail:
            raise

        return None

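# Launch sketch for `make_ec2_nodes`. All IDs and the ARN below are
# hypothetical placeholders; get the real values from the AWS VPC, EC2, and
# IAM consoles:
#
#     nodes = make_ec2_nodes(
#         'sg-0123456789abcdef0',        # security_groupid
#         'subnet-0123456789abcdef0',    # subnet_id
#         'my-keypair',                  # keypair_name
#         'arn:aws:iam::123456789012:instance-profile/my-worker-profile',
#         launch_instances=2,
#         user_data='/path/to/worker-setup.sh',
#     )
#     # with the default wait_until_up=True, each value has an 'ip' key
#     if nodes:
#         instance_ips = [x['ip'] for x in nodes.values()]
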
def delete_ec2_nodes(
        instance_id_list,
        client=None
):
    """This terminates the EC2 instances in `instance_id_list`.

    Parameters
    ----------

    instance_id_list : list of str
        A list of EC2 instance IDs to terminate.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    Returns
    -------

    dict
        The response from the EC2 `terminate_instances` call.

    """

    if not client:
        client = boto3.client('ec2')

    resp = client.terminate_instances(
        InstanceIds=instance_id_list
    )

    return resp

#########################
## SPOT FLEET CLUSTERS ##
#########################

SPOT_FLEET_CONFIG = {
    "IamFleetRole": "iam-fleet-role-arn",
    "AllocationStrategy": "lowestPrice",
    "TargetCapacity": 20,
    "SpotPrice": "0.4",
    "TerminateInstancesWithExpiration": True,
    "InstanceInterruptionBehavior": "terminate",
    "LaunchSpecifications": [],
    "Type": "maintain",
    "ReplaceUnhealthyInstances": True,
    "ValidUntil": "datetime-utc"
}

SPOT_INSTANCE_TYPES = [
    "m5.xlarge",
    "m5.2xlarge",
    "c5.xlarge",
    "c5.2xlarge",
    "c5.4xlarge",
]

SPOT_PERINSTANCE_CONFIG = {
    "InstanceType": "instance-type",
    "ImageId": "image-id",
    "SubnetId": "subnet-id",
    "KeyName": "keypair-name",
    "IamInstanceProfile": {
        "Arn": "instance-profile-role-arn"
    },
    "SecurityGroups": [
        {
            "GroupId": "security-group-id"
        }
    ],
    "UserData": "base64-encoded-userdata",
    "EbsOptimized": True,
}

def make_spot_fleet_cluster(
        security_groupid,
        subnet_id,
        keypair_name,
        iam_instance_profile_arn,
        spot_fleet_iam_role,
        target_capacity=20,
        spot_price=0.4,
        expires_days=7,
        allocation_strategy='lowestPrice',
        instance_types=SPOT_INSTANCE_TYPES,
        instance_weights=None,
        instance_ami='ami-04681a1dbd79675a5',
        instance_user_data=None,
        instance_ebs_optimized=True,
        wait_until_up=True,
        client=None,
        raiseonfail=False
):
    """This makes an EC2 spot-fleet cluster.

    This requires a security group ID attached to a VPC config and subnet, a
    keypair generated beforehand, and an IAM role ARN for the instance. See:

    https://docs.aws.amazon.com/cli/latest/userguide/tutorial-ec2-ubuntu.html

    Use `instance_user_data` to launch tasks on instance launch.

    Parameters
    ----------

    security_groupid : str
        The security group ID of the AWS VPC where the instances will be
        launched.

    subnet_id : str
        The subnet ID of the AWS VPC where the instances will be launched.

    keypair_name : str
        The name of the keypair to be used to allow SSH access to all
        instances launched here. This corresponds to an already downloaded AWS
        keypair PEM file.

    iam_instance_profile_arn : str
        The ARN string corresponding to the AWS instance profile that
        describes the permissions the launched instances have to access other
        AWS resources. Set this up in AWS IAM.

    spot_fleet_iam_role : str
        This is the ARN of the IAM role that allows the Spot Fleet Manager to
        scale up and down instances based on demand and instances failing,
        etc. Set this up in IAM.

    target_capacity : int
        The number of instances to target in the fleet request. The fleet
        manager service will attempt to maintain this number over the lifetime
        of the Spot Fleet Request.

    spot_price : float
        The bid price in USD for the instances. This is per hour. Keep this at
        about half the hourly on-demand price of the desired instances to make
        sure your instances aren't taken away by AWS when it needs capacity.

    expires_days : int
        The number of days this request is active for. All instances launched
        by this request will be terminated automatically when the request
        expires.

    allocation_strategy : {'lowestPrice', 'diversified'}
        The allocation strategy used by the fleet manager.

    instance_types : list of str
        List of the instance types to launch. See the following URL for a list
        of IDs:

        https://aws.amazon.com/ec2/pricing/on-demand/

    instance_weights : list of float or None
        If `instance_types` is a list of different instance types, this is the
        relative weight applied towards launching each instance type. This can
        be used to launch a mix of instances in a defined ratio among their
        types. Doing this can make the spot fleet more resilient to AWS taking
        back the instances if it runs out of capacity.

    instance_ami : str
        The Amazon Machine Image ID that describes the OS the instances will
        use after launch. The default ID is Amazon Linux 2 in the US East
        region.

    instance_user_data : str or None
        This is either the path to a file on disk that contains a shell-script
        or a string containing a shell-script that will be executed by root
        right after the instance is launched. Use to automatically set up
        workers and queues. If None, will not execute anything at instance
        start up.

    instance_ebs_optimized : bool
        If True, will enable EBS optimization to speed up IO. This is usually
        True for all instances made available in the last couple of years.

    wait_until_up : bool
        If True, will not return from this function until the spot fleet
        request is acknowledged by AWS.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    raiseonfail : bool
        If True, will re-raise whatever Exception caused the operation to fail
        and break out immediately.

    Returns
    -------

    str or None
        This is the spot fleet request ID if successful. Otherwise, returns
        None.

    """

    fleetconfig = copy.deepcopy(SPOT_FLEET_CONFIG)
    fleetconfig['IamFleetRole'] = spot_fleet_iam_role
    fleetconfig['AllocationStrategy'] = allocation_strategy
    fleetconfig['TargetCapacity'] = target_capacity
    fleetconfig['SpotPrice'] = str(spot_price)
    fleetconfig['ValidUntil'] = (
        datetime.utcnow() + timedelta(days=expires_days)
    ).strftime(
        '%Y-%m-%dT%H:%M:%SZ'
    )

    # get the user data from a string or a file
    # we need to base64 encode it here
    if (isinstance(instance_user_data, str) and
        os.path.exists(instance_user_data)):
        with open(instance_user_data,'rb') as infd:
            udata = base64.b64encode(infd.read()).decode()

    elif isinstance(instance_user_data, str):
        udata = base64.b64encode(instance_user_data.encode()).decode()

    else:
        udata = (
            '#!/bin/bash\necho "No user data provided. '
            'Launched instance at: %s UTC"' % datetime.utcnow().isoformat()
        )
        udata = base64.b64encode(udata.encode()).decode()

    for ind, itype in enumerate(instance_types):

        # deep-copy the template here so the nested IamInstanceProfile dict
        # and SecurityGroups list in the module-level SPOT_PERINSTANCE_CONFIG
        # aren't mutated across launch specifications
        thisinstance = copy.deepcopy(SPOT_PERINSTANCE_CONFIG)
        thisinstance['InstanceType'] = itype
        thisinstance['ImageId'] = instance_ami
        thisinstance['SubnetId'] = subnet_id
        thisinstance['KeyName'] = keypair_name
        thisinstance['IamInstanceProfile']['Arn'] = iam_instance_profile_arn
        thisinstance['SecurityGroups'][0] = {'GroupId':security_groupid}
        thisinstance['UserData'] = udata
        thisinstance['EbsOptimized'] = instance_ebs_optimized

        # get the instance weights
        if isinstance(instance_weights, list):
            thisinstance['WeightedCapacity'] = instance_weights[ind]

        fleetconfig['LaunchSpecifications'].append(thisinstance)

    #
    # launch the fleet
    #
    if not client:
        client = boto3.client('ec2')

    try:

        resp = client.request_spot_fleet(
            SpotFleetRequestConfig=fleetconfig,
        )

        if not resp:

            LOGERROR('spot fleet request failed.')
            return None

        else:

            spot_fleet_reqid = resp['SpotFleetRequestId']
            LOGINFO('spot fleet requested successfully. request ID: %s' %
                    spot_fleet_reqid)

            if not wait_until_up:
                return spot_fleet_reqid

            else:

                ntries = 10
                curr_try = 0

                while curr_try < ntries:

                    resp = client.describe_spot_fleet_requests(
                        SpotFleetRequestIds=[
                            spot_fleet_reqid
                        ]
                    )

                    curr_state = resp.get('SpotFleetRequestConfigs',[])

                    if len(curr_state) > 0:

                        curr_state = curr_state[0]['SpotFleetRequestState']

                        if curr_state == 'active':
                            LOGINFO('spot fleet with reqid: %s is now active'
                                    % spot_fleet_reqid)
                            break

                    LOGINFO(
                        'spot fleet not yet active, waiting 15 seconds. '
                        'try %s/%s' % (curr_try, ntries)
                    )
                    curr_try = curr_try + 1
                    time.sleep(15.0)

            return spot_fleet_reqid

    except ClientError:

        LOGEXCEPTION('could not launch spot fleet')
        if raiseonfail:
            raise

        return None

    except Exception:

        LOGEXCEPTION('could not launch spot fleet')
        if raiseonfail:
            raise

        return None

def delete_spot_fleet_cluster(
        spot_fleet_reqid,
        client=None,
):
    """This deletes a spot-fleet cluster.

    Parameters
    ----------

    spot_fleet_reqid : str
        The fleet request ID returned by `make_spot_fleet_cluster`.

    client : boto3.Client or None
        If None, this function will instantiate a new `boto3.Client` object to
        use in its operations. Alternatively, pass in an existing
        `boto3.Client` instance to re-use it here.

    Returns
    -------

    dict
        The response from the EC2 `cancel_spot_fleet_requests` call.

    """

    if not client:
        client = boto3.client('ec2')

    resp = client.cancel_spot_fleet_requests(
        SpotFleetRequestIds=[spot_fleet_reqid],
        TerminateInstances=True
    )

    return resp

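# End-to-end sketch for a spot-fleet cluster. All IDs and ARNs below are
# hypothetical; `spot_fleet_iam_role` is assumed to be the ARN of a role the
# Spot Fleet service can assume:
#
#     fleet_reqid = make_spot_fleet_cluster(
#         'sg-0123456789abcdef0',        # security_groupid
#         'subnet-0123456789abcdef0',    # subnet_id
#         'my-keypair',                  # keypair_name
#         'arn:aws:iam::123456789012:instance-profile/my-worker-profile',
#         'arn:aws:iam::123456789012:role/my-spot-fleet-role',
#         target_capacity=10,
#         spot_price=0.3,
#     )
#
#     # ... do work, then tear the whole cluster down:
#     if fleet_reqid is not None:
#         delete_spot_fleet_cluster(fleet_reqid)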