|
| 1 | +import boto3 |
| 2 | +import datetime |
| 3 | +import botocore |
| 4 | +import json |
| 5 | + |
# AWS service clients, created once when the module is imported and shared by
# every function below.
s3, ecs, ec2, cloudwatch, sqs = (
    boto3.client(service) for service in ("s3", "ecs", "ec2", "cloudwatch", "sqs")
)

# S3 bucket from which lambda_handler downloads "monitors/<name>.json".
# NOTE(review): "BUCKET_NAME" looks like a placeholder to be filled in at
# deploy time -- confirm.
bucket = "BUCKET_NAME"
| 13 | + |
| 14 | + |
def killdeadAlarms(fleetId, monitorapp, project):
    """Delete the CloudWatch alarms of spot-fleet instances terminated recently.

    Scans the spot fleet request history from the start of yesterday onward,
    collects every instance reported as terminated, and deletes the alarm
    named ``"{project}_{instance_id}"`` for each of them.

    Args:
        fleetId: Spot fleet request id whose history is inspected.
        monitorapp: Unused; kept so existing callers keep working.
        project: Prefix used to build the alarm names.
    """
    # A single query starting at yesterday's date already includes today's
    # events, so querying both dates would only produce duplicates.
    since = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime(
        "%Y-%m-%d"
    )
    request = {"SpotFleetRequestId": fleetId, "StartTime": since}

    # A dict is used as an ordered set: de-duplicates while keeping order.
    terminated = {}
    while True:
        page = ec2.describe_spot_fleet_request_history(**request)
        for record in page["HistoryRecords"]:
            if record["EventType"] != "instanceChange":
                continue
            info = record["EventInformation"]
            if info["EventSubType"] == "terminated":
                terminated[info["InstanceId"]] = None
        # The history is paginated; keep going until no token is returned.
        next_token = page.get("NextToken")
        if not next_token:
            break
        request["NextToken"] = next_token

    alarm_names = [f"{project}_{instance_id}" for instance_id in terminated]
    # DeleteAlarms accepts at most 100 names per call, so send them in
    # batches; an empty list results in no call at all.
    for start in range(0, len(alarm_names), 100):
        cloudwatch.delete_alarms(AlarmNames=alarm_names[start : start + 100])
    print("Old alarms deleted")
| 32 | + |
| 33 | + |
def seeIfLogExportIsDone(logExportId, poll_seconds=30):
    """Block until a CloudWatch Logs export task leaves PENDING/RUNNING.

    Polls the export task and, once its status code is anything other than
    ``"PENDING"`` or ``"RUNNING"``, prints that status code and returns.

    Args:
        logExportId: Id of the export task to watch.
        poll_seconds: Seconds to wait between polls (defaults to 30).
    """
    # Imported locally because the module's top-level imports do not
    # include ``time``.
    import time

    # Export tasks belong to the CloudWatch Logs API ("logs"); the
    # "cloudwatch" metrics client has no describe_export_tasks operation.
    logs = boto3.client("logs")

    while True:
        result = logs.describe_export_tasks(taskId=logExportId)
        status_code = result["exportTasks"][0]["status"]["code"]
        if status_code not in ("PENDING", "RUNNING"):
            print(status_code)
            break
        time.sleep(poll_seconds)
| 42 | + |
| 43 | + |
def downscaleSpotFleet(queue, spotFleetID):
    """Shrink the spot fleet's target capacity to the number of in-flight messages.

    Reads the queue's ``ApproximateNumberOfMessagesNotVisible`` attribute and,
    when it is lower than the number of active fleet instances, lowers the
    fleet's target capacity to that number. The ``noTermination`` policy is
    passed so the request itself does not terminate running instances.

    Args:
        queue: Name of the SQS queue.
        spotFleetID: Spot fleet request id to modify.
    """
    queueUrl = sqs.get_queue_url(QueueName=queue)["QueueUrl"]
    attributes = sqs.get_queue_attributes(
        QueueUrl=queueUrl,
        AttributeNames=["ApproximateNumberOfMessagesNotVisible"],
    )["Attributes"]
    # SQS returns attribute values as strings.
    nonvisible = int(attributes["ApproximateNumberOfMessagesNotVisible"])

    status = ec2.describe_spot_fleet_instances(SpotFleetRequestId=spotFleetID)
    if nonvisible < len(status["ActiveInstances"]):
        ec2.modify_spot_fleet_request(
            ExcessCapacityTerminationPolicy="noTermination",
            # TargetCapacity must be an integer; boto3 rejects a string here.
            TargetCapacity=nonvisible,
            SpotFleetRequestId=spotFleetID,
        )
| 63 | + |
| 64 | + |
def _pick_queue_url(queue_urls, queue_name):
    """Return the URL whose last path segment equals queue_name, else the sole URL, else None."""
    for url in queue_urls:
        if url.split("/")[-1] == queue_name:
            return url
    # A prefix search that yields exactly one queue is taken as that queue.
    if len(queue_urls) == 1:
        return queue_urls[0]
    return None


def lambda_handler(event, lambda_context):
    """Handle an SNS-delivered alarm about the job queue and tidy up resources.

    The alarm message is parsed to find the queue name, then the matching
    monitor file is downloaded from ``bucket`` to learn the ECS cluster, app
    name, spot fleet id and log group.

    * If the message mentions ``ApproximateNumberOfMessagesVisible``, alarms
      of dead instances are removed and the spot fleet is downscaled.
    * If the message mentions ``ApproximateNumberOfMessagesNotVisible``, the
      run is torn down: the ECS service is scaled to zero and deleted,
      instance alarms are deleted, the spot fleet is cancelled, the queue is
      deleted, the app's task definitions are deregistered, the cluster is
      removed when unused, the triggering alarms are deleted, and optionally
      the dashboards are deleted.

    Args:
        event: Lambda event; ``event["Records"][0]["Sns"]["Message"]`` must be
            a JSON string with ``Trigger.Dimensions[0].value``.
        lambda_context: Unused.

    Returns:
        None. Returns early if the monitor file cannot be downloaded.
    """
    # Triggered any time SQS queue ApproximateNumberOfMessagesVisible = 0
    # OR ApproximateNumberOfMessagesNotVisible = 0
    messagestring = event["Records"][0]["Sns"]["Message"]
    messagedict = json.loads(messagestring)
    queueId = messagedict["Trigger"]["Dimensions"][0]["value"]
    project = queueId.rsplit("_", 1)[0]

    # Download monitor file
    monitor_file_name = f"{queueId.split('Queue')[0]}SpotFleetRequestId.json"
    monitor_local_name = f"/tmp/{monitor_file_name}"
    monitor_on_bucket_name = f"monitors/{monitor_file_name}"

    try:
        with open(monitor_local_name, "wb") as monitor_file:
            s3.download_fileobj(bucket, monitor_on_bucket_name, monitor_file)
    except botocore.exceptions.ClientError as error:
        print("Error retrieving monitor file.")
        print(error)
        return
    with open(monitor_local_name, "r") as monitor_file:
        monitorInfo = json.load(monitor_file)

    monitorcluster = monitorInfo["MONITOR_ECS_CLUSTER"]
    monitorapp = monitorInfo["MONITOR_APP_NAME"]
    fleetId = monitorInfo["MONITOR_FLEET_ID"]
    loggroupId = monitorInfo["MONITOR_LOG_GROUP_NAME"]
    CLEAN_DASHBOARD = monitorInfo["CLEAN_DASHBOARD"]
    print(f"Monitor triggered for {monitorcluster} {monitorapp} {fleetId} {loggroupId}")

    # If no visible messages, downscale machines
    if "ApproximateNumberOfMessagesVisible" in messagestring:
        print("No visible messages. Tidying as we go.")
        killdeadAlarms(fleetId, monitorapp, project)
        downscaleSpotFleet(queueId, fleetId)

    # If no messages in progress, cleanup
    if "ApproximateNumberOfMessagesNotVisible" in messagestring:
        print("No messages in progress. Cleaning up.")
        ECS_TASK_NAME = monitorapp + "Task"
        ECS_SERVICE_NAME = monitorapp + "Service"

        ecs.update_service(
            cluster=monitorcluster,
            service=ECS_SERVICE_NAME,
            desiredCount=0,
        )
        print("Service has been downscaled")

        # Delete the alarms from active machines and machines that have died.
        active_dictionary = ec2.describe_spot_fleet_instances(
            SpotFleetRequestId=fleetId
        )
        # NOTE(review): these names are raw instance ids, whereas
        # killdeadAlarms deletes "{project}_{instance_id}". One of the two
        # naming schemes is probably wrong -- confirm against the code that
        # creates the alarms.
        active_instances = [
            instance["InstanceId"] for instance in active_dictionary["ActiveInstances"]
        ]
        # DeleteAlarms accepts at most 100 names per call.
        for start in range(0, len(active_instances), 100):
            cloudwatch.delete_alarms(AlarmNames=active_instances[start : start + 100])
        killdeadAlarms(fleetId, monitorapp, project)

        # Read spot fleet id and terminate all EC2 instances
        ec2.cancel_spot_fleet_requests(
            SpotFleetRequestIds=[fleetId], TerminateInstances=True
        )
        print("Fleet shut down.")

        # Remove SQS queue, ECS Task Definition, ECS Service
        print("Deleting existing queue.")
        queueoutput = sqs.list_queues(QueueNamePrefix=queueId)
        # "QueueUrls" is absent from the response when nothing matches.
        # The prefix search may also return e.g. "AnalysisQueue1" alongside
        # "AnalysisQueue"; prefer the exact name.
        queueUrl = _pick_queue_url(queueoutput.get("QueueUrls", []), queueId)
        if queueUrl is None:
            print("Can't find queue to delete.")
        else:
            sqs.delete_queue(QueueUrl=queueUrl)

        print("Deleting service")
        try:
            ecs.delete_service(cluster=monitorcluster, service=ECS_SERVICE_NAME)
        except Exception as error:  # best effort: keep cleaning up regardless
            print("Couldn't delete service.")
            print(error)

        print("De-registering task")
        # Restrict to this app's task family; an unfiltered listing would
        # deregister every task definition in the account/region.
        paginator = ecs.get_paginator("list_task_definitions")
        for page in paginator.paginate(familyPrefix=ECS_TASK_NAME):
            for task_arn in page["taskDefinitionArns"]:
                fulltaskname = task_arn.split("/")[-1]  # "family:revision"
                # familyPrefix is a prefix match; skip other families that
                # merely start with the same text.
                if fulltaskname.rsplit(":", 1)[0] != ECS_TASK_NAME:
                    continue
                ecs.deregister_task_definition(taskDefinition=fulltaskname)

        print("Removing cluster if it's not the default and not otherwise in use")
        if monitorcluster != "default":
            result = ecs.describe_clusters(clusters=[monitorcluster])
            # The list is empty when the cluster no longer exists.
            if result["clusters"]:
                cluster_info = result["clusters"][0]
                in_use = (
                    cluster_info["pendingTasksCount"]
                    + cluster_info["runningTasksCount"]
                    + cluster_info["activeServicesCount"]
                )
                if in_use == 0:
                    ecs.delete_cluster(cluster=monitorcluster)

        # Remove alarms that triggered monitor
        print("Removing alarms that triggered Monitor")
        cloudwatch.delete_alarms(
            AlarmNames=[
                f"ApproximateNumberOfMessagesVisibleisZero_{monitorapp}",
                f"ApproximateNumberOfMessagesNotVisibleisZero_{monitorapp}",
            ]
        )

        # Remove Cloudwatch dashboard if created and cleanup desired
        # str() lets the flag be either the string "True" or a JSON boolean.
        if str(CLEAN_DASHBOARD).lower() == "true":
            dashboard_list = cloudwatch.list_dashboards()
            for entry in dashboard_list["DashboardEntries"]:
                if monitorapp in entry["DashboardName"]:
                    cloudwatch.delete_dashboards(
                        DashboardNames=[entry["DashboardName"]]
                    )
0 commit comments