Cleanup code
Cleanup code tears down all resources created by an objective’s steps. It runs after testing to leave the sandbox account in a clean state.
Each objective has exactly one cleanup file, located at cleanup/boto3/{objective_id}.py.
Function signature

    async def run(session, context, logger):

Per-objective scope
A single cleanup file handles all resources from all steps in that objective. If objective 01 has three steps that create a VPC, subnets, and a route table, the 01.py cleanup file deletes all three.
Reverse dependency order
Delete resources in reverse order of creation. Resources created later typically depend on resources created earlier, so you need to remove the dependents first.
Common dependency chains to respect:
- Delete listeners before load balancers
- Delete load balancers (and wait for full deletion) before target groups
- Detach internet gateways before deleting them
- Disassociate route tables before deleting them
- Deregister targets before terminating instances
- Wait for NAT gateway deletion (2–5 minutes) before releasing Elastic IPs
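One way to keep this ordering honest is to register teardown steps in creation order and run them newest-first. The sketch below is a minimal illustration under stated assumptions: `CleanupPlan`, `add`, and `run_reversed` are hypothetical names invented for this example, not part of the framework, and the lambdas stand in for real boto3 delete calls.

```python
from typing import Callable, List, Tuple


class CleanupPlan:
    """Hypothetical helper: records teardown steps in creation order."""

    def __init__(self) -> None:
        self._steps: List[Tuple[str, Callable[[], None]]] = []

    def add(self, label: str, delete: Callable[[], None]) -> None:
        # Register steps in the same order the resources were created.
        self._steps.append((label, delete))

    def run_reversed(self) -> List[str]:
        """Run teardown newest-first and return 'label: reason' error strings."""
        errors: List[str] = []
        for label, delete in reversed(self._steps):
            try:
                delete()
            except Exception as exc:
                # Collect the failure and keep tearing down the rest.
                errors.append(f"{label}: {exc}")
        return errors


# Stand-ins for real delete calls; each one records when it ran.
deleted: List[str] = []
plan = CleanupPlan()
plan.add("vpc", lambda: deleted.append("vpc"))
plan.add("subnet", lambda: deleted.append("subnet"))
plan.add("route_table", lambda: deleted.append("route_table"))
plan_errors = plan.run_reversed()
```

Here the route table is removed first and the VPC last, mirroring the VPC example from the per-objective scope section.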
Check-before-delete pattern
Don’t maintain a list of “not found” error codes. Instead, query the resource to check if it exists before attempting deletion. Each service has its own describe/get/head API — use the appropriate one.
The pattern for each resource:
- Check — call the service’s describe/get/head API
- Delete — if it exists, call the delete API
- Wait — if a waiter is available, use it to confirm deletion completed
- Skip — if the check shows the resource is already gone, log and move on
This approach works across all AWS services without maintaining error code lists. A resource that doesn’t exist is the desired end state — it’s not a failure.
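The four steps above can be sketched as a small service-agnostic helper. This is an illustration, not the framework's own code: `delete_if_exists` and its parameters are hypothetical, and the `exists` callable is where a real cleanup file would wrap the service's describe/get/head call. An in-memory set stands in for an AWS service so the sketch runs without credentials.

```python
import logging
from typing import Callable, Optional

log = logging.getLogger("cleanup")


def delete_if_exists(
    label: str,
    exists: Callable[[], bool],
    delete: Callable[[], None],
    wait: Optional[Callable[[], None]] = None,
) -> str:
    """Check, delete, wait. Returns "skipped" if the resource was already gone."""
    if not exists():
        # Already absent: this is the desired end state, not a failure.
        log.info("%s already gone, skipping", label)
        return "skipped"
    delete()
    if wait is not None:
        # Only wait when the caller supplied a waiter for this resource.
        wait()
    log.info("%s deleted", label)
    return "deleted"


# In-memory stand-in for a service that holds one bucket.
buckets = {"my-bucket"}


def bucket_exists() -> bool:
    return "my-bucket" in buckets


def bucket_delete() -> None:
    buckets.discard("my-bucket")


first = delete_if_exists("bucket(my-bucket)", bucket_exists, bucket_delete)
second = delete_if_exists("bucket(my-bucket)", bucket_exists, bucket_delete)
```

Running the helper twice shows the intended idempotence: the first call deletes, the second call skips.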
Waiters and async deletions
Many AWS deletions are asynchronous. Use boto3 waiters where available to confirm the resource is fully deleted before moving to the next one:

    ec2.get_waiter("instance_terminated").wait(InstanceIds=[instance_id])
    elbv2.get_waiter("load_balancers_deleted").wait(LoadBalancerArns=[alb_arn])
    s3.get_waiter("bucket_not_exists").wait(Bucket=bucket_name)

When no waiter exists, use a poll loop:

    import asyncio

    for _ in range(40):
        response = ec2.describe_nat_gateways(NatGatewayIds=[nat_gw_id])
        state = response["NatGateways"][0]["State"]
        if state == "deleted":
            break
        await asyncio.sleep(15)

Polling with asyncio.sleep is acceptable in cleanup code when no waiter exists. Don’t use it in solution or evaluation code.
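A poll loop like this can be wrapped in a reusable coroutine that also reports whether the target state was reached, so a timeout can be recorded as an error instead of passing silently. The following is a minimal sketch: `poll_until` and its parameters are hypothetical names, and a canned sequence of states replaces the real describe call.

```python
import asyncio
from typing import Callable


async def poll_until(
    get_state: Callable[[], str],
    target: str,
    attempts: int = 40,
    delay: float = 15.0,
) -> bool:
    """Poll get_state() until it returns target. Returns False on timeout."""
    for _ in range(attempts):
        if get_state() == target:
            return True
        await asyncio.sleep(delay)
    return False


# Canned states standing in for successive describe responses.
states = iter(["deleting", "deleting", "deleted"])
reached = asyncio.run(
    poll_until(lambda: next(states), "deleted", attempts=5, delay=0)
)
```

With the defaults (40 attempts, 15 seconds apart) the loop gives up after roughly ten minutes; a caller can append an entry to its error list when the function returns False.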
Return format
- {"cleaned": True}: all resources deleted or already gone
- {"cleaned": False, "errors": ["bucket(my-bucket): BucketNotEmpty", ...]}: one or more deletions failed
Each entry in errors describes what failed and why. Resources that don’t exist are not errors.
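As a small illustration of the two shapes, the sketch below builds the return value from a collected error list. `format_error` and `build_result` are hypothetical helper names; the `kind(id): reason` string layout is taken from the example entry above.

```python
from typing import Dict, List, Union

CleanupResult = Dict[str, Union[bool, List[str]]]


def format_error(kind: str, resource_id: str, reason: str) -> str:
    """Format one entry as 'kind(id): reason', matching the example above."""
    return f"{kind}({resource_id}): {reason}"


def build_result(errors: List[str]) -> CleanupResult:
    """Return the success shape when nothing failed, else the failure shape."""
    if errors:
        return {"cleaned": False, "errors": errors}
    return {"cleaned": True}


ok_result = build_result([])
failed_result = build_result(
    [format_error("bucket", "my-bucket", "BucketNotEmpty")]
)
```

Note that the success shape omits the errors key entirely rather than returning an empty list.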
Discovering resources without resolved handles
If a resource ID isn’t in context["resolved"], check context["events"] to discover it. Iterate the events list and match by type — don’t assume position.
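The lookup can be sketched as a single function that tries resolved handles first and falls back to scanning events by type. This is an assumption-laden sketch: `find_resource_id` is a hypothetical name, and the field names (`name` and `id` on resolved entries, `type` and `id` on events) follow the worked example on this page; the real schema may carry more fields.

```python
from typing import Any, Dict, Optional


def find_resource_id(
    context: Dict[str, Any], handle_name: str, event_type: str
) -> Optional[str]:
    """Return a resource ID from resolved handles, else from events."""
    # Prefer the resolved handle when it is present and has an ID.
    for handle in context.get("resolved", []):
        if handle.get("name") == handle_name and handle.get("id"):
            return handle["id"]
    # Fall back to events: match on type, never on list position.
    for event in context.get("events", []):
        if event.get("type") == event_type and event.get("id"):
            return event["id"]
    return None


# Sample context with no resolved handles; the bucket is the second event.
sample_context = {
    "resolved": [],
    "events": [
        {"type": "AWS::EC2::VPC", "id": "vpc-123"},
        {"type": "AWS::S3::Bucket", "id": "my-bucket"},
    ],
}
found = find_resource_id(sample_context, "01_01_bucket", "AWS::S3::Bucket")
missing = find_resource_id(sample_context, "01_01_queue", "AWS::SQS::Queue")
```

A return value of None means there is nothing to clean up for that resource, which a cleanup file can treat as already gone.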
Worked example
Objective 01 has two steps: step 01 creates an S3 bucket, step 02 adds a bucket policy.

    from botocore.exceptions import ClientError

    async def run(session, context, logger):
        s3 = session.client("s3")
        resolved = {r["name"]: r for r in context.get("resolved", [])}
        errors = []

        # Discover bucket from resolved handles or events
        bucket_name = resolved.get("01_01_bucket", {}).get("id")
        if not bucket_name:
            for event in context.get("events", []):
                if event["type"] == "AWS::S3::Bucket" and event.get("id"):
                    bucket_name = event["id"]
                    break

        if bucket_name:
            # Delete policy first (reverse dependency order)
            try:
                s3.get_bucket_policy(Bucket=bucket_name)
                s3.delete_bucket_policy(Bucket=bucket_name)
                logger.info(f"Deleted bucket policy for {bucket_name}")
            except ClientError as e:
                code = e.response["Error"]["Code"]
                # A missing bucket also means the policy is gone, so it is not an error
                if code in ("NoSuchBucketPolicy", "NoSuchBucket"):
                    logger.info(f"Bucket policy already gone for {bucket_name}")
                else:
                    logger.warning(f"Failed to delete bucket policy: {e}")
                    errors.append(f"bucket_policy({bucket_name}): {code}")

            # Then delete the bucket
            try:
                s3.head_bucket(Bucket=bucket_name)
                s3.delete_bucket(Bucket=bucket_name)
                s3.get_waiter("bucket_not_exists").wait(
                    Bucket=bucket_name,
                    WaiterConfig={"Delay": 2, "MaxAttempts": 30}
                )
                logger.info(f"Deleted bucket: {bucket_name}")
            except ClientError as e:
                code = e.response["Error"]["Code"]
                if code in ("404", "NoSuchBucket"):
                    logger.info(f"Bucket already gone: {bucket_name}")
                else:
                    logger.warning(f"Failed to delete bucket: {e}")
                    errors.append(f"bucket({bucket_name}): {code}")

        if errors:
            return {"cleaned": False, "errors": errors}
        return {"cleaned": True}

Notice the structure: discover the resource, delete dependents first (policy), then the resource itself (bucket), wait for confirmation, and collect errors rather than raising exceptions.