Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling tweaks #5

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 122 additions & 187 deletions remove_vpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,250 +7,185 @@

"""

import concurrent.futures
import sys

import boto3
from botocore.exceptions import ClientError
import traceback


def delete_igw(ec2, vpc_id):
"""
Detach and delete the internet gateway
"""

args = {
'Filters' : [
{
'Name' : 'attachment.vpc-id',
'Values' : [ vpc_id ]
}
]
}

try:
igw = ec2.describe_internet_gateways(**args)['InternetGateways']
except ClientError as e:
print(e.response['Error']['Message'])

if igw:
igw_id = igw[0]['InternetGatewayId']
"""
Detach and delete the internet gateway
"""

try:
result = ec2.detach_internet_gateway(InternetGatewayId=igw_id, VpcId=vpc_id)
except ClientError as e:
print(e.response['Error']['Message'])
args = {"Filters": [{"Name": "attachment.vpc-id", "Values": [vpc_id]}]}
igws = ec2.describe_internet_gateways(**args)["InternetGateways"]

try:
result = ec2.delete_internet_gateway(InternetGatewayId=igw_id)
except ClientError as e:
print(e.response['Error']['Message'])
for igw in igws:
igw_id = igw["InternetGatewayId"]

return
ec2.detach_internet_gateway(InternetGatewayId=igw_id, VpcId=vpc_id)
ec2.delete_internet_gateway(InternetGatewayId=igw_id)


def delete_subs(ec2, args):
"""
Delete the subnets
"""
"""
Delete the subnets
"""

try:
subs = ec2.describe_subnets(**args)['Subnets']
except ClientError as e:
print(e.response['Error']['Message'])
subs = ec2.describe_subnets(**args)["Subnets"]

if subs:
for sub in subs:
sub_id = sub['SubnetId']
sub_id = sub["SubnetId"]
ec2.delete_subnet(SubnetId=sub_id)

try:
result = ec2.delete_subnet(SubnetId=sub_id)
except ClientError as e:
print(e.response['Error']['Message'])

return
def delete_rtbs(ec2, args):
"""
Delete the route tables
"""

rtbs = ec2.describe_route_tables(**args)["RouteTables"]

def delete_rtbs(ec2, args):
"""
Delete the route tables
"""

try:
rtbs = ec2.describe_route_tables(**args)['RouteTables']
except ClientError as e:
print(e.response['Error']['Message'])

if rtbs:
for rtb in rtbs:
main = 'false'
for assoc in rtb['Associations']:
main = assoc['Main']
if main == True:
continue
rtb_id = rtb['RouteTableId']

try:
result = ec2.delete_route_table(RouteTableId=rtb_id)
except ClientError as e:
print(e.response['Error']['Message'])

return
if rtbs:
for rtb in rtbs:
main = False
for assoc in rtb["Associations"]:
main = assoc["Main"]
if not main:
rtb_id = rtb["RouteTableId"]
ec2.delete_route_table(RouteTableId=rtb_id)


def delete_acls(ec2, args):
"""
Delete the network access lists (NACLs)
"""
"""
Delete the network access lists (NACLs)
"""

try:
acls = ec2.describe_network_acls(**args)['NetworkAcls']
except ClientError as e:
print(e.response['Error']['Message'])
acls = ec2.describe_network_acls(**args)["NetworkAcls"]

if acls:
for acl in acls:
default = acl['IsDefault']
if default == True:
continue
acl_id = acl['NetworkAclId']

try:
result = ec2.delete_network_acl(NetworkAclId=acl_id)
except ClientError as e:
print(e.response['Error']['Message'])

return
is_default = acl["IsDefault"]
if not is_default:
acl_id = acl["NetworkAclId"]
ec2.delete_network_acl(NetworkAclId=acl_id)
break


def delete_sgps(ec2, args):
"""
Delete any security groups
"""
"""
Delete any security groups
"""

try:
sgps = ec2.describe_security_groups(**args)['SecurityGroups']
except ClientError as e:
print(e.response['Error']['Message'])
sgps = ec2.describe_security_groups(**args)["SecurityGroups"]

if sgps:
for sgp in sgps:
default = sgp['GroupName']
if default == 'default':
continue
sg_id = sgp['GroupId']
non_default_sg_ids = [i["GroupId"] for i in sgps if i["GroupName"] != "default"]
tries = 0
max_tries = len(sgps)

try:
result = ec2.delete_security_group(GroupId=sg_id)
except ClientError as e:
print(e.response['Error']['Message'])
while len(non_default_sg_ids) > 1 and tries < max_tries:
for sg_id in non_default_sg_ids:
try:
ec2.delete_security_group(GroupId=sg_id)
except ClientError as exc:
print(exc, file=sys.stderr)

return
tries += 1


def delete_vpc(ec2, vpc_id, region):
"""
Delete the VPC
"""

try:
result = ec2.delete_vpc(VpcId=vpc_id)
except ClientError as e:
print(e.response['Error']['Message'])
"""
Delete the VPC
"""

else:
print('VPC {} has been deleted from the {} region.'.format(vpc_id, region))

return
ec2.delete_vpc(VpcId=vpc_id)
print(f"VPC {vpc_id} has been deleted from the {region} region.")


def get_regions(ec2):
"""
Return all AWS regions
"""

regions = []
"""
Return all AWS regions
"""

try:
aws_regions = ec2.describe_regions()['Regions']
except ClientError as e:
print(e.response['Error']['Message'])
aws_regions = ec2.describe_regions()["Regions"]
return [region["RegionName"] for region in aws_regions]

else:
for region in aws_regions:
regions.append(region['RegionName'])

return regions
def process_region(region):
ec2 = boto3.Session().client("ec2", region_name=region)

try:
attribs = ec2.describe_account_attributes(AttributeNames=["default-vpc"])
except ClientError as e:
# this can happen with e.g. SCPs preventing access to a particular region:
raise RuntimeError(
"Unable to query VPCs in {}: {}".format(
region, e.response["Error"]["Message"]
)
) from e

def main(profile):
"""
Do the work..
assert 1 == len(attribs["AccountAttributes"])
vpc_id = attribs["AccountAttributes"][0]["AttributeValues"][0]["AttributeValue"]

Order of operation:
if vpc_id == "none":
raise RuntimeError(f"Default VPC was not found in the {region} region.")

1.) Delete the internet gateway
2.) Delete subnets
3.) Delete route tables
4.) Delete network access lists
5.) Delete security groups
6.) Delete the VPC
"""
# Are there any existing resources? Since most resources attach an ENI, let's check..

# AWS Credentials
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
args = {"Filters": [{"Name": "vpc-id", "Values": [vpc_id]}]}
eni = ec2.describe_network_interfaces(**args)["NetworkInterfaces"]

session = boto3.Session(profile_name=profile)
ec2 = session.client('ec2', region_name='us-east-1')
if eni:
print(f"VPC {vpc_id} has existing resources in the {region} region.")
else:
print(f"Deleting unused VPC {vpc_id} in the {region} region.")
delete_igw(ec2, vpc_id)
delete_subs(ec2, args)
delete_rtbs(ec2, args)
delete_acls(ec2, args)
delete_sgps(ec2, args)
delete_vpc(ec2, vpc_id, region)

regions = get_regions(ec2)

for region in regions:
def main():
"""
Do the work..

ec2 = session.client('ec2', region_name=region)
Order of operation:

try:
attribs = ec2.describe_account_attributes(AttributeNames=[ 'default-vpc' ])['AccountAttributes']
except ClientError as e:
print(e.response['Error']['Message'])
return
1.) Confirm that the VPC has no allocated network interfaces
2.) Delete the internet gateway
3.) Delete subnets
4.) Delete route tables
5.) Delete network access lists
6.) Delete security groups
7.) Delete the VPC
"""

else:
vpc_id = attribs[0]['AttributeValues'][0]['AttributeValue']
# AWS Credentials
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html

if vpc_id == 'none':
print('VPC (default) was not found in the {} region.'.format(region))
continue
session = boto3.Session()
ec2 = session.client("ec2", region_name="us-east-1")

# Are there any existing resources? Since most resources attach an ENI, let's check..
regions = get_regions(ec2)

args = {
'Filters' : [
{
'Name' : 'vpc-id',
'Values' : [ vpc_id ]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(process_region, region): region for region in regions
}
]
}

try:
eni = ec2.describe_network_interfaces(**args)['NetworkInterfaces']
except ClientError as e:
print(e.response['Error']['Message'])
return

if eni:
print('VPC {} has existing resources in the {} region.'.format(vpc_id, region))
continue
for future in concurrent.futures.as_completed(futures):
region = futures[future]

result = delete_igw(ec2, vpc_id)
result = delete_subs(ec2, args)
result = delete_rtbs(ec2, args)
result = delete_acls(ec2, args)
result = delete_sgps(ec2, args)
result = delete_vpc(ec2, vpc_id, region)

return
try:
future.result()
except Exception as exc:
print(f"Error processing {region}: {exc}", file=sys.stderr)
if not isinstance(exc, (ClientError, RuntimeError)):
traceback.print_exc(file=sys.stderr)


if __name__ == "__main__":

main(profile = '<YOUR_PROFILE>')

main()