-
Notifications
You must be signed in to change notification settings - Fork 0
/
map.py
executable file
·2997 lines (2874 loc) · 188 KB
/
map.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import boto3, botocore.exceptions, requests, sys, datetime, json, os, argparse, pathlib, datetime, platform
from rich import print as rprint
from docx import Document
from dcnet_msofficetools.docx_extensions import build_table, replace_placeholder_with_table
from copy import deepcopy
import word_table_models
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
# SET COMMANDLINE ARGUMENT PARAMETERS
parser = argparse.ArgumentParser()
parser.add_argument(
'-t',
action='store_true',
default=False,
dest='skip_topology',
help='Skip bulding topology file from AWS API (use existing JSON topology files in working directory)'
)
args = parser.parse_args()
# GLOBAL VARIABLES
output_verbosity = 0 # 0 (Default) or 1 (Verbose)
topology_folder = "topologies"
word_template = "template.docx"
table_header_color = "506279" # Dark Blue
green_spacer = "8FD400" # CC Green/Lime
red_spacer = "F12938" # CC Red
orange_spacer = "FF7900" # CC Orange
alternating_row_color = "D5DCE4" # Light Blue
region_list = [] # Leave blank to auto-pull and check all regions
aws_protocol_map = { # Maps AWS protocol numbers to user-friendly names
"-1": "All Traffic",
"6": "TCP",
"17": "UDP",
"1": "ICMPv4",
"58": "ICMPv6"
}
# HELPER FUNCTIONS
def datetime_converter(obj):
# Converts datetime objects to a string timestamp. Needed to render JSON
if isinstance(obj, datetime.datetime):
return obj.__str__()
def create_word_obj_from_template(template_file):
# Attempt to read a Word document from filesystem and load it as a docx object
try:
return Document(template_file)
except:
rprint(f"\n\n:x: [red]Could not open [blue]{template_file}[red]. Please make sure it exists and is a valid Microsoft Word document. Exiting...")
sys.exit(1)
def extract_name_from_aws_tags(obj):
# Extract the name from a given object by searching the object's list of AWS tags
try:
name = [tag['Value'] for tag in obj['Tags'] if tag['Key'] == "Name"][0]
except KeyError:
# Object has no name
name = "<unnamed>"
except IndexError:
# Object has no name
name = "<unnamed>"
return name
def get_subnet_name_by_id(source_subnet_id, source_vpc=None):
# Return a subnet's name tag (if available) by looking up the name in the topology VPC structure
if source_vpc:
for subnet in source_vpc['subnets']:
if subnet['SubnetId'] == source_subnet_id:
subnet_name = extract_name_from_aws_tags(subnet)
break
else:
subnet_name = None
for vpcs in region_vpcs.values():
for vpc in vpcs:
for subnet in vpc['subnets']:
if subnet['SubnetId'] == source_subnet_id:
subnet_name = extract_name_from_aws_tags(subnet)
break
if subnet_name:
break
subnet_name = "" if not subnet_name else subnet_name
return subnet_name
# MAIN FUNCTIONS
def get_regions():
if region_list:
return region_list
else:
response = ec2.describe_regions()
discovered_regions = [x['RegionName'] for x in response['Regions']]
return discovered_regions
def add_regions_to_topology():
for region in available_regions:
topology['regions'][region] = {
"vpcs": [],
"prefix_lists": [],
"customer_gateways": [],
"vpn_tgw_connections": [],
"endpoint_services": [],
"transit_gateways": [],
"instances": []
}
def fingerprint_vpc(region, vpc, ec2):
'''
Fingerprint/interrogate a VPC to see if it is a default & unused VPC. These will be filtered from the results.
FINGERPRINT OF A DEFAULT UNUSED VPC:
1- IsDefault = true
2- CIDR Block = 172.31.0.0/16
3- Three (3) Subnets = 172.31.0.0/20, 172.31.16.0/20, & 172.31.32.0/20
4- One (1) Route Table with no tags and 2 routes with DestinationCidrBlock = 172.31.0.0/16 & 0.0.0.0/0
5- One (1) Internet Gateway with no tags
6- Zero (0) NAT Gateways
7- One (1) Security Group with description = "default VPC security group"
8- Zero (0) EC2 Instances
'''
fingerprint_checks = []
if output_verbosity == 1:
rprint(f"\n[white]Running VPC fingerprint on: {region}\{vpc['VpcId']}...")
# TEST 1
status = "pass" if vpc['IsDefault'] else "fail"
fingerprint_checks.append(status)
if output_verbosity == 1:
msg = " [white]IsDefault = true Check: [green]PASS" if status == "pass" else " [white]IsDefault = true Check: [red]FAIL"
rprint(msg)
# TEST 2
status = "pass" if vpc['CidrBlock'] == "172.31.0.0/16" else "fail"
fingerprint_checks.append(status)
if output_verbosity == 1:
msg = " [white]CIDR = 172.31.0.0/16 Check: [green]PASS" if status == "pass" else " [white]CIDR = 172.31.0.0/16 Check: [red]FAIL"
rprint(msg)
# TEST 3
subnets = [subnet['CidrBlock'] for subnet in ec2.describe_subnets()['Subnets'] if subnet['VpcId'] == vpc['VpcId']]
if len(subnets) == 3 and "172.31.0.0/20" in subnets and "172.31.16.0/20" in subnets and "172.31.32.0/20" in subnets:
status = "pass"
elif len(subnets) == 4 and "172.31.0.0/20" in subnets and "172.31.16.0/20" in subnets and "172.31.32.0/20" in subnets and "172.31.48.0/20" in subnets:
status = "pass"
else:
status = "fail"
fingerprint_checks.append(status)
if output_verbosity == 1:
msg = " [white]3 Default Subnets Check: [green]PASS" if status == "pass" else " [white]3 Default Subnets Check: [red]FAIL"
rprint(msg)
# TEST 4
route_tables = [rt for rt in ec2.describe_route_tables()['RouteTables'] if rt['VpcId'] == vpc['VpcId']]
if len(route_tables) == 1 and not route_tables[0]['Tags']: # Only 1 route table and it has no tags
routes = [route['DestinationCidrBlock'] for route in route_tables[0]['Routes']]
if len(routes) == 2 and "0.0.0.0/0" in routes and "172.31.0.0/16" in routes: # Exactly 2 specific routes
status = "pass"
else:
status = "fail"
else:
status = "fail"
fingerprint_checks.append(status)
if output_verbosity == 1:
msg = " [white]Route Table Check: [green]PASS" if status == "pass" else " [white]Route Table Check: [red]FAIL"
rprint(msg)
# TEST 5
igws = [igw for igw in ec2.describe_internet_gateways()['InternetGateways'] if igw['Attachments'][0]['VpcId'] == vpc['VpcId']]
if len(igws) == 1 and not igws[0]['Tags']: # Exactly 1 IGW with no Tags
status = "pass"
else:
status = "fail"
fingerprint_checks.append(status)
if output_verbosity == 1:
msg = " [white]Internet Gateway Check: [green]PASS" if status == "pass" else " [white]Internet Gateway Check: [red]FAIL"
rprint(msg)
# TEST 6
natgws = [natgw for natgw in ec2.describe_nat_gateways()['NatGateways'] if natgw['VpcId'] == vpc['VpcId']]
status = "pass" if len(natgws) == 0 else "fail"
fingerprint_checks.append(status)
if output_verbosity == 1:
msg = " [white]NAT Gateway Check: [green]PASS" if status == "pass" else " [white]NAT Gateway Check: [red]FAIL"
rprint(msg)
# TEST 7
sec_grps = [sg for sg in ec2.describe_security_groups()['SecurityGroups'] if sg['VpcId'] == vpc['VpcId']]
if len(sec_grps) == 1 and sec_grps[0]['Description'].lower() == "default vpc security group":
status = "pass"
else:
status = "fail"
fingerprint_checks.append(status)
if output_verbosity == 1:
msg = " [white]Security Group Check: [green]PASS" if status == "pass" else " [white]Security Group Check: [red]FAIL"
rprint(msg)
# TEST 8
reservations = ec2.describe_instances()['Reservations']
instances = sum([inst['Instances'] for inst in reservations], [])
this_vpc_instances = [inst for inst in instances if "VpcId" in inst.keys() and inst['VpcId'] == vpc['VpcId']]
status = "pass" if len(this_vpc_instances) == 0 else "fail"
fingerprint_checks.append(status)
if output_verbosity == 1:
msg = " [white]EC2 Instance Check: [green]PASS" if status == "pass" else " [white]EC2 Instance Check: [red]FAIL"
rprint(msg)
return False if "fail" in fingerprint_checks else True
def add_vpcs_to_topology():
for region in topology['regions']:
rprint(f" [yellow]Interrogating Region {region} for VPCs...")
ec2 = boto3.client('ec2',region_name=region,verify=False)
elb = boto3.client('elbv2',region_name=region,verify=False)
try:
response = ec2.describe_vpcs()['Vpcs']
topology['regions'][region]["non_vpc_lb_target_groups"] = [tg for tg in elb.describe_target_groups()['TargetGroups'] if not "VpcId" in tg.keys()]
for tg in topology['regions'][region]["non_vpc_lb_target_groups"]:
tg['HealthChecks'] = elb.describe_target_health(TargetGroupArn=tg['TargetGroupArn'])['TargetHealthDescriptions']
for vpc in response:
is_empty_default_vpc = fingerprint_vpc(region, vpc, ec2)
if not is_empty_default_vpc:
topology['regions'][region]['vpcs'].append(vpc)
except botocore.exceptions.ClientError:
rprint(f":x: [red]Client Error reported for region {region}. Most likely no VPCs exist, continuing...")
def add_network_elements_to_vpcs():
for k, v in topology['regions'].items():
ec2 = boto3.client('ec2',region_name=k,verify=False)
elb = boto3.client('elbv2',region_name=k,verify=False)
for vpc in v['vpcs']:
rprint(f" [yellow]Discovering network elements (subnets, route tables, etc.) for {k}/{vpc['VpcId']}...")
subnets = [subnet for subnet in ec2.describe_subnets()['Subnets'] if subnet['VpcId'] == vpc['VpcId']]
vpc['subnets'] = subnets
route_tables = [rt for rt in ec2.describe_route_tables()['RouteTables'] if rt['VpcId'] == vpc['VpcId']]
vpc['route_tables'] = route_tables
igws = [igw for igw in ec2.describe_internet_gateways()['InternetGateways'] if igw['Attachments'][0]['VpcId'] == vpc['VpcId']]
vpc['internet_gateways'] = igws
natgws = [natgw for natgw in ec2.describe_nat_gateways()['NatGateways'] if natgw['VpcId'] == vpc['VpcId']]
vpc['nat_gateways'] = natgws
eigws = [eigw for eigw in ec2.describe_egress_only_internet_gateways()['EgressOnlyInternetGateways'] if eigw['Attachments'][0]['VpcId'] == vpc['VpcId']]
vpc['egress_only_internet_gateways'] = eigws
sec_grps = [sg for sg in ec2.describe_security_groups()['SecurityGroups'] if sg['VpcId'] == vpc['VpcId']]
vpc['security_groups'] = sec_grps
net_acls = [acl for acl in ec2.describe_network_acls()['NetworkAcls'] if acl['VpcId'] == vpc['VpcId']]
vpc['network_acls'] = net_acls
vpn_gateways = [gw for gw in ec2.describe_vpn_gateways()['VpnGateways'] for attch in gw['VpcAttachments'] if attch['VpcId'] == vpc['VpcId']]
vpc['vpn_gateways'] = vpn_gateways
for gw in vpn_gateways: # Add VPN connections to owner VPN gateways
gw['connections'] = [conn for conn in ec2.describe_vpn_connections()['VpnConnections'] if conn['VpnGatewayId'] == gw['VpnGatewayId']]
cgw_ids = [cgw['CustomerGatewayId'] for cgw in gw['connections']]
gw['customer_gateways'] = [cgw for cgw in ec2.describe_customer_gateways()['CustomerGateways'] if cgw['CustomerGatewayId'] in cgw_ids]
ec2_instances = [inst for each in ec2.describe_instances()['Reservations'] for inst in each['Instances'] if "VpcId" in inst.keys() and inst['VpcId'] == vpc['VpcId']]
ec2_groups = [grp for each in ec2.describe_instances()['Reservations'] for grp in each['Groups']]
vpc['ec2_instances'] = ec2_instances
vpc['ec2_groups'] = ec2_groups
vpc['endpoints'] = [ep for ep in ec2.describe_vpc_endpoints()['VpcEndpoints'] if ep['VpcId'] == vpc['VpcId']]
vpc['load_balancers'] = [lb for lb in elb.describe_load_balancers()['LoadBalancers'] if lb['VpcId'] == vpc['VpcId']]
for lb in vpc['load_balancers']:
lb['Listeners'] = elb.describe_listeners(LoadBalancerArn=lb['LoadBalancerArn'])['Listeners']
vpc['lb_target_groups'] = [tg for tg in elb.describe_target_groups()['TargetGroups'] if "VpcId" in tg.keys() and tg['VpcId'] == vpc['VpcId']]
for tg in vpc['lb_target_groups']:
tg['HealthChecks'] = elb.describe_target_health(TargetGroupArn=tg['TargetGroupArn'])['TargetHealthDescriptions']
def add_prefix_lists_to_topology():
for region in topology['regions']:
rprint(f" [yellow]Interrogating Region {region} for Prefix Lists...")
ec2 = boto3.client('ec2',region_name=region,verify=False)
try:
pls = [pl for pl in ec2.describe_prefix_lists()['PrefixLists']]
topology['regions'][region]['prefix_lists'] = pls
except botocore.exceptions.ClientError:
rprint(f":x: [red]Client Error reported for region {region}. Skipping...")
def add_vpn_customer_gateways_to_topology():
for region in topology['regions']:
rprint(f" [yellow]Interrogating Region {region} for Customer Gateways...")
ec2 = boto3.client('ec2',region_name=region,verify=False)
try:
cgws = [cgw for cgw in ec2.describe_customer_gateways()['CustomerGateways']]
topology['regions'][region]['customer_gateways'] = cgws
except botocore.exceptions.ClientError:
rprint(f":x: [red]Client Error reported for region {region}. Skipping...")
def add_vpn_tgw_connections_to_topology():
for region in topology['regions']:
rprint(f" [yellow]Interrogating Region {region} for VPN Connections Attached to Transit Gateways...")
ec2 = boto3.client('ec2',region_name=region,verify=False)
try:
tgw_vpns = [conn for conn in ec2.describe_vpn_connections()['VpnConnections'] if "TransitGatewayId" in conn.keys()]
topology['regions'][region]['vpn_tgw_connections'] = tgw_vpns
except botocore.exceptions.ClientError:
rprint(f":x: [red]Client Error reported for region {region}. Skipping...")
def add_endpoint_services_to_topology():
for region in topology['regions']:
rprint(f" [yellow]Interrogating Region {region} for Endpoint Services...")
ec2 = boto3.client('ec2',region_name=region,verify=False)
try:
ep_svcs = [svc for svc in ec2.describe_vpc_endpoint_services()['ServiceDetails'] if not svc['Owner'] == "amazon"]
topology['regions'][region]['endpoint_services'] = ep_svcs
except botocore.exceptions.ClientError:
rprint(f":x: [red]Client Error reported for region {region}. Skipping...")
def add_vpc_peering_connections_to_topology():
pcx = [conn for conn in ec2.describe_vpc_peering_connections()['VpcPeeringConnections']]
topology['vpc_peering_connections'] = pcx
def add_direct_connect_to_topology():
dx = boto3.client('directconnect', verify=False)
dcgws = [dcgw for dcgw in dx.describe_direct_connect_gateways()['directConnectGateways']]
topology['direct_connect_gateways'] = dcgws
for dcgw in topology['direct_connect_gateways']:
dcgw['Attachments'] = [attch for attch in dx.describe_direct_connect_gateway_attachments(directConnectGatewayId=dcgw['directConnectGatewayId'])['directConnectGatewayAttachments']]
dcgw['Associations'] = [assoc for assoc in dx.describe_direct_connect_gateway_associations(directConnectGatewayId=dcgw['directConnectGatewayId'])['directConnectGatewayAssociations']]
virtual_interfaces = [vif for vif in dx.describe_virtual_interfaces()['virtualInterfaces'] if vif['directConnectGatewayId'] == dcgw['directConnectGatewayId']]
dcgw['Connections'] = [conn for conn in dx.describe_connections()['connections'] if conn['connectionId'] in [vif['connectionId'] for vif in virtual_interfaces]]
for conn in dcgw['Connections']:
conn['VirtualInterfaces'] = [vif for vif in virtual_interfaces if vif['connectionId'] == conn['connectionId']]
def add_transit_gateways_to_topology():
for region in topology['regions']:
rprint(f" [yellow]Interrogating Region {region} for Transit Gateways...")
ec2 = boto3.client('ec2',region_name=region,verify=False)
try:
tgws = [tgw for tgw in ec2.describe_transit_gateways()['TransitGateways']]
for tgw in tgws:
attachments = [attachment for attachment in ec2.describe_transit_gateway_attachments()['TransitGatewayAttachments'] if attachment['TransitGatewayId'] == tgw['TransitGatewayId']]
for attachment in attachments: # Loop through VPC attachments and set ApplianceModeSupport option
appliance_mode_support = "disable"
if attachment['ResourceType'] == "vpc":
attachment_options = ec2.describe_transit_gateway_vpc_attachments(Filters=[{'Name':'transit-gateway-attachment-id','Values':[attachment['TransitGatewayAttachmentId']]}])['TransitGatewayVpcAttachments'][0]
appliance_mode_support = attachment_options['Options']['ApplianceModeSupport']
attachment['SubnetIds'] = attachment_options['SubnetIds']
else:
attachment['SubnetIds'] = ["<NA>"]
attachment['ApplianceModeSupport'] = appliance_mode_support
tgw['attachments'] = attachments
rts = [rt for rt in ec2.describe_transit_gateway_route_tables()['TransitGatewayRouteTables'] if rt['TransitGatewayId'] == tgw['TransitGatewayId']]
tgw['route_tables'] = rts
topology['regions'][region]['transit_gateways'] = tgws
except botocore.exceptions.ClientError as e:
if "(UnauthorizedOperation)" in str(e):
rprint(f"[red]Unauthorized Operation reported while pulling Transit Gateways from {region}. Skipping...")
else:
print(e)
def add_transit_gateway_routes_to_topology():
for region in topology['regions']:
rprint(f" [yellow]Interrogating Region {region} for Transit Gateway Routes...")
ec2 = boto3.client('ec2',region_name=region,verify=False)
tgw_routes = []
try:
tgw_rts = [rt for rt in ec2.describe_transit_gateway_route_tables()['TransitGatewayRouteTables']]
for rt in tgw_rts:
routes = [route for route in ec2.search_transit_gateway_routes(
TransitGatewayRouteTableId = rt['TransitGatewayRouteTableId'],
Filters = [
{
"Name": "state",
"Values": ["active","blackhole"]
}
]
)['Routes']]
tgw_routes.append({
"TransitGatewayRouteTableId": rt['TransitGatewayRouteTableId'],
"TransitGatewayRouteTableName": extract_name_from_aws_tags(rt),
"Routes":routes
})
except botocore.exceptions.ClientError as e:
if "(UnauthorizedOperation)" in str(e):
rprint(f"[red]Unauthorized Operation reported while pulling Transit Gateway Route Tables from {region}. Skipping...")
else:
print(e)
topology['regions'][region]['transit_gateway_routes'] = tgw_routes
# BEST PRACTICE CHECK FUNCTIONS
def add_transit_gateway_best_practice_analysis_to_word_doc(doc_obj):
def run_tgw_quantity_check(tgws):
test_description = "Because Transit Gateways are highly available by design, multiple gateways per region are not required for high availability. https://docs.aws.amazon.com/vpc/latest/tgw/tgw-best-design-practices.html"
regions_with_multiple_tgws = [f"Region {region} has {len(gateways)} Transit Gateways." for tgw in tgws for region, gateways in tgw.items() if len(gateways) > 1]
if regions_with_multiple_tgws:
test_status = "warning"
regions_with_multiple_tgws.append("Confirm any specific use cases exist that call for multiple Transit Gateways in a region.")
test_results = regions_with_multiple_tgws
else:
test_status = "pass"
test_results = "All regions have no more than one Transit Gateway."
return {
"description": test_description,
"status": test_status,
"results": test_results
}
def run_unique_bgp_asn_check(tgws):
test_description = "For deployments with multiple transit gateways, a unique Autonomous System Number (ASN) for each transit gateway is recommended. https://docs.aws.amazon.com/vpc/latest/tgw/tgw-best-design-practices.html"
if len(tgws) == 1:
test_status = "not-applicable"
test_results = "Single Transit Gateway detected. This test is not applicable."
else:
tgw_bgp_asns = [gateway['Options']['AmazonSideAsn'] for tgw in tgws for gateways in tgw.values() for gateway in gateways]
if len(tgw_bgp_asns) == len(set(tgw_bgp_asns)):
test_status = "pass"
test_results = "All Transit Gateway ASNs are unique."
else:
# Get the duplicates
duplicates = set()
for i, asn1 in enumerate(tgw_bgp_asns):
for asn2 in tgw_bgp_asns[i+1:]:
if asn1 == asn2:
duplicates.add(asn1)
test_status = "fail"
test_results = f"Detected re-use of ASNs: {list(duplicates)}"
return {
"description": test_description,
"status": test_status,
"results": test_results
}
def run_one_net_acl_check(tgws):
test_description = "Create one network ACL and associate it with all of the subnets that are associated with the transit gateway. https://docs.aws.amazon.com/vpc/latest/tgw/tgw-best-design-practices.html"
# Build a list of Transit Gateway ID / Subnet, key / value pairs
subnets = [{gateway['TransitGatewayId']:subnet} for tgw in tgws for gateways in tgw.values() for gateway in gateways for attachment in gateway['attachments'] if "SubnetIds" in attachment.keys() for subnet in attachment['SubnetIds']]
# Build a list of Subnet IDs for each Transit Gateway
tgw_subnets = {}
for each in subnets:
for tgw, subnet in each.items():
if tgw in tgw_subnets:
tgw_subnets[tgw].append(subnet)
else:
tgw_subnets[tgw] = [subnet]
# Loop through the Transit Gateways and pull the Network ACL assigned to each subnet. Update fail_list if any VPCs have attachments in different NACLS
fail_list = []
account_net_acls_in_use = {}
for tgw, subnets in tgw_subnets.items():
network_acls_in_use = {}
for subnet in subnets:
# Loop through all VPC Network ACLs and pull the Network ACL ID where this subnet is associated
found = False
for vpcs in region_vpcs.values():
for vpc in vpcs:
for netacl in vpc['network_acls']:
for association in netacl['Associations']:
if association['SubnetId'] == subnet:
if not vpc['VpcId'] in network_acls_in_use.keys(): # VPC Not yet in dictionary
network_acls_in_use[vpc['VpcId']] = [netacl['NetworkAclId']]
elif not netacl['NetworkAclId'] in network_acls_in_use[vpc['VpcId']]: # VPC in dictionary but NetACL not in list yet
network_acls_in_use[vpc['VpcId']].append(netacl['NetworkAclId'])
found = True
break
if found:
break
if found:
break
account_net_acls_in_use[tgw] = network_acls_in_use
# Check to see if any VPCs have multiple Net ACLs, if so add it to the failed list
for vpc, nacl_list in network_acls_in_use.items():
if len(nacl_list) > 1: # Best Practice Check Failure
fail_list.append({
"TransitGatewayId": tgw,
"vpcs": [
{
"VpcId": vpc,
"NetworkAcls": nacl_list
}
]
})
# Create Best Practice Check Return Status
if not fail_list:
test_status = "pass"
test_results = "All Transit Gateway VPC Attachment Subnets are in a single Network ACL."
else:
test_status = "fail"
test_results = [
"The following Transit Gateways have Attachment Subnets in multiple Network ACLs:",
] + [tgw['TransitGatewayId'] + '/' + vpc['VpcId'] + '/NetACLs: ' + str(vpc['NetworkAcls']) for tgw in fail_list for vpc in tgw['vpcs']]
return {
"description": test_description,
"status": test_status,
"results": test_results,
"net_acls": account_net_acls_in_use
}
def run_net_acl_open_check(test_results):
test_description = "Create one network ACL and associate it with all of the subnets that are associated with the transit gateway. Keep the network ACL open in both the inbound and outbound directions. https://docs.aws.amazon.com/vpc/latest/tgw/tgw-best-design-practices.html"
fail_list = []
for tgw, vpc_dict in test_results['net_acls'].items():
for vpc_id, nacl_list in vpc_dict.items():
vpc_dict = [{"region":region,"vpc":this_vpc} for region, vpcs in region_vpcs.items() for this_vpc in vpcs if this_vpc['VpcId'] == vpc_id][0]
for nacl in nacl_list:
ingress_entries = [entry for acl in vpc_dict['vpc']['network_acls'] if acl['NetworkAclId'] == nacl for entry in sorted(acl['Entries'], key = lambda d : d['RuleNumber']) if not entry['Egress']]
egress_entries = [entry for acl in vpc_dict['vpc']['network_acls'] if acl['NetworkAclId'] == nacl for entry in sorted(acl['Entries'], key = lambda d : d['RuleNumber']) if entry['Egress']]
ingress_cidr_block = ingress_entries[0]['CidrBlock']
ingress_protocol = ingress_entries[0]['Protocol']
ingress_action = ingress_entries[0]['RuleAction']
if not ingress_cidr_block == "0.0.0.0/0" or not ingress_protocol == "-1" or not ingress_action == "allow": # Test Failed, add to fail_list
fail_list.append({
"TransitGatewayId": tgw,
"Region": vpc_dict['region'],
"VpcId": vpc_id,
"NetworkAclId": nacl,
"Direction": "Ingress",
"EntryRuleNumber": ingress_entries[0]['RuleNumber']
})
egress_cidr_block = egress_entries[0]['CidrBlock']
egress_protocol = egress_entries[0]['Protocol']
egress_action = egress_entries[0]['RuleAction']
if not egress_cidr_block == "0.0.0.0/0" or not egress_protocol == "-1" or not egress_action == "allow": # Test Failed, add to fail_list
fail_list.append({
"TransitGatewayId": tgw,
"Region": vpc_dict['region'],
"VpcId": vpc_id,
"NetworkAclId": nacl,
"Direction": "Egress",
"EntryRuleNumber": ingress_entries[0]['RuleNumber']
})
# Create Best Practice Check Return Status
if not fail_list:
test_status = "pass"
test_results = "All Transit Gateway VPC Attachment Subnet Network ACLs are open."
else:
test_status = "fail"
test_results = [
"The following Transit Gateways VPC Attachment Subnet ACLs don't appear to be open. Please review their configuration to confirm:",
] + [nacl['VpcId']+ '/' + nacl['NetworkAclId'] + '(' + nacl['Direction'] + ')' for nacl in fail_list]
return {
"description": test_description,
"status": test_status,
"results": test_results
}
def run_vpn_attachment_bgp_check(tgws):
test_description = "Use Border Gateway Protocol (BGP) Site-to-Site VPN connections. https://docs.aws.amazon.com/vpc/latest/tgw/tgw-best-design-practices.html"
fail_list = []
vpn_attachments = [{"TransitGatewayId":gw['TransitGatewayId'],"VpnId":attachment['ResourceId']} for each in tgws for gws in each.values() for gw in gws for attachment in gw['attachments'] if attachment['ResourceType'] == "vpn"]
for each in vpn_attachments:
each['vpn_connections'] = [conn for region, attributes in topology['regions'].items() if "vpn_tgw_connections" in attributes for conn in attributes['vpn_tgw_connections'] if conn['VpnConnectionId'] == each['VpnId']]
for attachment in vpn_attachments:
for vpn in attachment['vpn_connections']:
# Check if attachments have static routes (infer BGP disabled if so)
if vpn['Options']['StaticRoutesOnly']:
fail_list.append({
"TransitGatewayId": vpn['TransitGatewayId'],
"VpnId": vpn['VpnConnectionId'],
"Routing": "Static",
"Tunnels": vpn['VgwTelemetry']
})
elif sum([tunnel['AcceptedRouteCount'] for tunnel in vpn['VgwTelemetry']]) == 0: # No learned routes, infer BGP not enabled or not functioning
fail_list.append({
"TransitGatewayId": vpn['TransitGatewayId'],
"VpnId": vpn['VpnConnectionId'],
"Routing": "Dynamic",
"Tunnels": vpn['VgwTelemetry']
})
# Create Best Practice Check Return Status
if not fail_list:
test_status = "pass"
test_results = "All Transit Gateway VPN connections are learning routes dynamically (BGP enabled)."
else:
test_status = "fail"
test_results = [
"The following Transit Gateway VPNs are not enabled for dynamic routing or are not learning routes:",
] + [vpn['TransitGatewayId'] + '/' + vpn['VpnId'] for vpn in fail_list]
return {
"description": test_description,
"status": test_status,
"results": test_results
}
tgws = [{region:attributes['transit_gateways']} for region, attributes in topology['regions'].items() if attributes['transit_gateways']]
# Create the parent table model
parent_model = deepcopy(word_table_models.parent_tbl)
tgw_results = {
"passed": 0,
"failed": 0
}
if tgws:
# Run best practice checks
tgw_quantity_check = run_tgw_quantity_check(tgws)
if tgw_quantity_check['status'] == "pass":
tgw_results['passed'] += 1
else:
tgw_results['failed'] += 1
unique_bgp_asn_check = run_unique_bgp_asn_check(tgws)
if unique_bgp_asn_check['status'] in ["pass","not-applicable"]:
tgw_results['passed'] += 1
else:
tgw_results['failed'] += 1
one_net_acl_check = run_one_net_acl_check(tgws)
if one_net_acl_check['status'] == "pass":
tgw_results['passed'] += 1
else:
tgw_results['failed'] += 1
net_acl_open_check = run_net_acl_open_check(one_net_acl_check)
if net_acl_open_check['status'] == "pass":
tgw_results['passed'] += 1
else:
tgw_results['failed'] += 1
vpn_attachment_bgp_check = run_vpn_attachment_bgp_check(tgws)
if vpn_attachment_bgp_check['status'] == "pass":
tgw_results['passed'] += 1
else:
tgw_results['failed'] += 1
# Create the tgw_quantity_check child table model
child_model = deepcopy(word_table_models.best_practices_tbl)
# Populate the child model with test data
header_color = green_spacer if tgw_quantity_check['status'] == "pass" else orange_spacer
child_model['table']['rows'][0]['cells'][0]['background'] = header_color
child_model['table']['rows'][0]['cells'][0]['paragraphs'][0]['text'] = f"TGW PER-REGION QUANTITY CHECK: {tgw_quantity_check['status'].upper()}"
child_model['table']['rows'][1]['cells'][1]['paragraphs'][0]['text'] = tgw_quantity_check['description']
child_model['table']['rows'][2]['cells'][1]['paragraphs'][0]['text'] = tgw_quantity_check['results']
# Inject child model into parent model
parent_model['table']['rows'].append({"cells":[child_model]})
# Create the unique_bgp_asn_check child table model
child_model = deepcopy(word_table_models.best_practices_tbl)
# Inject a space between child tables
parent_model['table']['rows'].append({"cells":[]})
# Populate the child model with test data
header_color = green_spacer if unique_bgp_asn_check['status'] in ["pass","not-applicable"] else red_spacer
child_model['table']['rows'][0]['cells'][0]['background'] = header_color
child_model['table']['rows'][0]['cells'][0]['paragraphs'][0]['text'] = f"UNIQUE BGP ASN CHECK: {unique_bgp_asn_check['status'].upper()}"
child_model['table']['rows'][1]['cells'][1]['paragraphs'][0]['text'] = unique_bgp_asn_check['description']
child_model['table']['rows'][2]['cells'][1]['paragraphs'][0]['text'] = unique_bgp_asn_check['results']
# Inject child model into parent model
parent_model['table']['rows'].append({"cells":[child_model]})
# Create the one_net_acl_check child table model
child_model = deepcopy(word_table_models.best_practices_tbl)
# Inject a space between child tables
parent_model['table']['rows'].append({"cells":[]})
# Populate the child model with test data
header_color = green_spacer if one_net_acl_check['status'] == "pass" else red_spacer
child_model['table']['rows'][0]['cells'][0]['background'] = header_color
child_model['table']['rows'][0]['cells'][0]['paragraphs'][0]['text'] = f"ONE NET ACL FOR ATTACHMENT SUBNETS CHECK: {one_net_acl_check['status'].upper()}"
child_model['table']['rows'][1]['cells'][1]['paragraphs'][0]['text'] = one_net_acl_check['description']
child_model['table']['rows'][2]['cells'][1]['paragraphs'][0]['text'] = one_net_acl_check['results']
# Inject child model into parent model
parent_model['table']['rows'].append({"cells":[child_model]})
# Create the net_acl_open_check child table model
child_model = deepcopy(word_table_models.best_practices_tbl)
# Inject a space between child tables
parent_model['table']['rows'].append({"cells":[]})
# Populate the child model with test data
header_color = green_spacer if net_acl_open_check['status'] == "pass" else red_spacer
child_model['table']['rows'][0]['cells'][0]['background'] = header_color
child_model['table']['rows'][0]['cells'][0]['paragraphs'][0]['text'] = f"NET ACL OPEN CHECK: {net_acl_open_check['status'].upper()}"
child_model['table']['rows'][1]['cells'][1]['paragraphs'][0]['text'] = net_acl_open_check['description']
child_model['table']['rows'][2]['cells'][1]['paragraphs'][0]['text'] = net_acl_open_check['results']
# Inject child model into parent model
parent_model['table']['rows'].append({"cells":[child_model]})
# Create the vpn_attachment_bgp_check child table model
child_model = deepcopy(word_table_models.best_practices_tbl)
# Inject a space between child tables
parent_model['table']['rows'].append({"cells":[]})
# Populate the child model with test data
header_color = green_spacer if vpn_attachment_bgp_check['status'] == "pass" else red_spacer
child_model['table']['rows'][0]['cells'][0]['background'] = header_color
child_model['table']['rows'][0]['cells'][0]['paragraphs'][0]['text'] = f"VPN ATTACHMENT BGP CHECK: {vpn_attachment_bgp_check['status'].upper()}"
child_model['table']['rows'][1]['cells'][1]['paragraphs'][0]['text'] = vpn_attachment_bgp_check['description']
child_model['table']['rows'][2]['cells'][1]['paragraphs'][0]['text'] = vpn_attachment_bgp_check['results']
# Inject child model into parent model
parent_model['table']['rows'].append({"cells":[child_model]})
# Write parent table to Word
if not parent_model['table']['rows']: # Completely Empty Table (no Transit Gateways)
parent_model['table']['rows'].append({"cells":[{"paragraphs": [{"style": "No Spacing", "text": "No Transit Gateways Present"}]}]})
table = build_table(doc_obj, parent_model)
replace_placeholder_with_table(doc_obj, "{{py_tgw_best_practices}}", table)
return tgw_results
def add_vpn_best_practice_analysis_to_word_doc(doc_obj):
def run_vpn_tunnel_status_check():
test_description = "Report any VPN tunnel connections in the DOWN state."
fail_list = []
for vpn in vpns:
for tunnel in vpn['VgwTelemetry']:
if tunnel['Status'] == "DOWN":
fail_list.append({
"VpnId": vpn['VpnConnectionId'],
"TunnelIp": tunnel['OutsideIpAddress']
})
# Create Best Practice Check Return Status
if not fail_list:
test_status = "pass"
test_results = "All VPN Tunnel Connections are in Status 'UP' State."
else:
test_status = "fail"
test_results = [
"The following VPN Tunnel Connections are in Status 'DOWN' State:",
] + [vpn['VpnId'] + '/' + vpn['TunnelIp'] for vpn in fail_list]
return {
"description": test_description,
"status": test_status,
"results": test_results
}
# Get VPN connections in vpn_tgw_connections dictionary key
vpns = [vpn for attributes in topology['regions'].values() if "vpn_tgw_connections" in attributes for vpn in attributes['vpn_tgw_connections']]
# Add VPN connections in VPC VPN Gateways
vpns += [vpn for vpcs in region_vpcs.values() for vpc in vpcs for vgw in vpc['vpn_gateways'] for vpn in vgw['connections']]
# Create the parent table model
parent_model = deepcopy(word_table_models.parent_tbl)
vpn_results = {
"passed": 0,
"failed": 0
}
if vpns:
# Run best practice checks
vpn_tunnel_status_check = run_vpn_tunnel_status_check()
if vpn_tunnel_status_check['status'] == "pass":
vpn_results['passed'] += 1
else:
vpn_results['failed'] += 1
# Create the _vpn_tunnel_status_check child table model
child_model = deepcopy(word_table_models.best_practices_tbl)
# Populate the child model with test data
header_color = green_spacer if vpn_tunnel_status_check['status'] == "pass" else red_spacer
child_model['table']['rows'][0]['cells'][0]['background'] = header_color
child_model['table']['rows'][0]['cells'][0]['paragraphs'][0]['text'] = f"VPN TUNNEL STATUS CHECK: {vpn_tunnel_status_check['status'].upper()}"
child_model['table']['rows'][1]['cells'][1]['paragraphs'][0]['text'] = vpn_tunnel_status_check['description']
child_model['table']['rows'][2]['cells'][1]['paragraphs'][0]['text'] = vpn_tunnel_status_check['results']
# Inject child model into parent model
parent_model['table']['rows'].append({"cells":[child_model]})
# Write parent table to Word
if not parent_model['table']['rows']: # Completely Empty Table (no VPNs)
parent_model['table']['rows'].append({"cells":[{"paragraphs": [{"style": "No Spacing", "text": "No VPNs Present"}]}]})
table = build_table(doc_obj, parent_model)
replace_placeholder_with_table(doc_obj, "{{py_vpn_health}}", table)
return vpn_results
def add_vpc_best_practice_analysis_to_word_doc(doc_obj):
def run_empty_vpc_check():
test_description = "Report any VPCs with no EC2 instances."
fail_list = []
for region, vpcs in region_vpcs.items():
for vpc in vpcs:
if not vpc['ec2_instances']:
fail_list.append({
"region": region,
"vpc_id": vpc['VpcId'],
"vpc_name": extract_name_from_aws_tags(vpc)
})
# Create Best Practice Check Return Status
if not fail_list:
test_status = "pass"
test_results = "All VPCs have at least one EC2 instance."
else:
test_status = "warn"
test_results = [
"The following VPCs have no EC2 instances. Review VPC use case to confirm this is intentional:",
] + [vpc['region'] + '/' + vpc['vpc_id'] + '(' + vpc['vpc_name'] + ')' for vpc in fail_list]
return {
"description": test_description,
"status": test_status,
"results": test_results
}
def run_multi_az_check():
test_description = "When you add subnets to your VPC to host your application, create them in multiple Availability Zones. https://docs.aws.amazon.com/vpc/latest/userguide/vpc-security-best-practices.html"
fail_list = []
for region, vpcs in region_vpcs.items():
for vpc in vpcs:
availability_zones = list(set([subnet['AvailabilityZone'] for subnet in vpc['subnets']]))
if len(availability_zones) == 1:
fail_list.append({
"region": region,
"vpc_id": vpc['VpcId'],
"vpc_name": extract_name_from_aws_tags(vpc)
})
# Create Best Practice Check Return Status
if not fail_list:
test_status = "pass"
test_results = "All VPCs have subnets in two or more Availability Zones."
else:
test_status = "fail"
test_results = [
"The following VPCs have subnets in only one Availability Zone:",
] + [vpc['region'] + '/' + vpc['vpc_id'] + '(' + vpc['vpc_name'] + ')' for vpc in fail_list]
return {
"description": test_description,
"status": test_status,
"results": test_results
}
# Create the parent table model
parent_model = deepcopy(word_table_models.parent_tbl)
vpc_results = {
"passed": 0,
"failed": 0
}
if len([vpc['VpcId'] for region, vpcs in region_vpcs.items() for vpc in vpcs]) > 0:
# Run best practice checks
empty_vpc_check = run_empty_vpc_check()
if empty_vpc_check['status'] == "pass":
vpc_results['passed'] += 1
else:
vpc_results['failed'] += 1
multi_az_check = run_multi_az_check()
if multi_az_check['status'] == "pass":
vpc_results['passed'] += 1
else:
vpc_results['failed'] += 1
# Create the empty_vpc_check child table model
child_model = deepcopy(word_table_models.best_practices_tbl)
# Populate the child model with test data
header_color = green_spacer if empty_vpc_check['status'] == "pass" else orange_spacer
child_model['table']['rows'][0]['cells'][0]['background'] = header_color
child_model['table']['rows'][0]['cells'][0]['paragraphs'][0]['text'] = f"EMPTY VPC CHECK: {empty_vpc_check['status'].upper()}"
child_model['table']['rows'][1]['cells'][1]['paragraphs'][0]['text'] = empty_vpc_check['description']
child_model['table']['rows'][2]['cells'][1]['paragraphs'][0]['text'] = empty_vpc_check['results']
# Inject child model into parent model
parent_model['table']['rows'].append({"cells":[child_model]})
# Create the multi_az_check child table model
child_model = deepcopy(word_table_models.best_practices_tbl)
# Inject a space between child tables
parent_model['table']['rows'].append({"cells":[]})
# Populate the child model with test data
header_color = green_spacer if multi_az_check['status'] == "pass" else red_spacer
child_model['table']['rows'][0]['cells'][0]['background'] = header_color
child_model['table']['rows'][0]['cells'][0]['paragraphs'][0]['text'] = f"MULTI-AZ VPC CHECK: {multi_az_check['status'].upper()}"
child_model['table']['rows'][1]['cells'][1]['paragraphs'][0]['text'] = multi_az_check['description']
child_model['table']['rows'][2]['cells'][1]['paragraphs'][0]['text'] = multi_az_check['results']
# Inject child model into parent model
parent_model['table']['rows'].append({"cells":[child_model]})
# Write parent table to Word
if not parent_model['table']['rows']: # Completely Empty Table (no VPNs)
parent_model['table']['rows'].append({"cells":[{"paragraphs": [{"style": "No Spacing", "text": "No VPNs Present"}]}]})
table = build_table(doc_obj, parent_model)
replace_placeholder_with_table(doc_obj, "{{py_vpc_health}}", table)
return vpc_results
def add_lb_best_practice_analysis_to_word_doc(doc_obj):
def run_lb_target_health_check():
test_description = "Report any Load Balancers with Targets in Unhealthy State."
fail_list = []
for lbtg in lbtgs:
for target in lbtg['lbtg']['HealthChecks']:
if not target['TargetHealth']['State'] == "healthy":
# Get VPC Name
for vpcs in region_vpcs.values():
for vpc in vpcs:
if vpc['VpcId'] == lbtg['lbtg']['VpcId']:
vpc_name = extract_name_from_aws_tags(vpc)
fail_list.append({
"region": lbtg['region'],
"vpc_id": lbtg['lbtg']['VpcId'],
"vpc_name": vpc_name,
"target_id": target['Target']['Id']
})
# Create Best Practice Check Return Status
if not fail_list:
test_status = "pass"
test_results = "All load balancer targets are in a healthy state."
else:
test_status = "fail"
test_results = [
"The following Load Balancer Targets are in an unhealthy state:",
] + [target['region'] + '/' + target['vpc_id'] + '(' + target['vpc_name'] + ')/' + target['target_id'] for target in fail_list]
return {
"description": test_description,
"status": test_status,
"results": test_results
}
lbtgs = [{"region":region,"lbtg":lbtg} for region, vpcs in region_vpcs.items() for vpc in vpcs for lbtg in vpc['lb_target_groups']]
# Create the parent table model
parent_model = deepcopy(word_table_models.parent_tbl)
lb_results = {
"passed": 0,
"failed": 0
}
if len([vpc['VpcId'] for region, vpcs in region_vpcs.items() for vpc in vpcs]) > 0:
# Run best practice checks
lb_target_health_check = run_lb_target_health_check()
if lb_target_health_check['status'] == "pass":
lb_results['passed'] += 1
else:
lb_results['failed'] += 1
# Create the lb_target_health_check child table model
child_model = deepcopy(word_table_models.best_practices_tbl)
# Populate the child model with test data
header_color = green_spacer if lb_target_health_check['status'] == "pass" else red_spacer
child_model['table']['rows'][0]['cells'][0]['background'] = header_color
child_model['table']['rows'][0]['cells'][0]['paragraphs'][0]['text'] = f"LB TARGET HEALTH CHECK: {lb_target_health_check['status'].upper()}"
child_model['table']['rows'][1]['cells'][1]['paragraphs'][0]['text'] = lb_target_health_check['description']
child_model['table']['rows'][2]['cells'][1]['paragraphs'][0]['text'] = lb_target_health_check['results']
# Inject child model into parent model
parent_model['table']['rows'].append({"cells":[child_model]})
# Write parent table to Word
if not parent_model['table']['rows']: # Completely Empty Table (no VPNs)
parent_model['table']['rows'].append({"cells":[{"paragraphs": [{"style": "No Spacing", "text": "No Load Balancer Targets Present"}]}]})
table = build_table(doc_obj, parent_model)
replace_placeholder_with_table(doc_obj, "{{py_lb_health}}", table)
return lb_results
def add_ec2_best_practice_analysis_to_word_doc(doc_obj):
def run_ec2_ena_enabled_check():
test_description = "Amazon EC2 provides enhanced networking capabilities through the Elastic Network Adapter (ENA). To use enhanced networking, you must install the required ENA module and enable ENA support. https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/enhanced-networking-ena.html"
fail_list = []
for instance in instances:
if not "EnaSupport" in instance['instance'] or not instance['instance']['EnaSupport']:
fail_list.append({
"region": instance['region'],
"vpc_id": instance['vpc_id'],
"vpc_name": instance['vpc_name'],
"instance_id": instance['instance']['InstanceId'],
"instance_name": extract_name_from_aws_tags(instance['instance'])
})
# Create Best Practice Check Return Status
if not fail_list:
test_status = "pass"
test_results = "All EC2 instances have ENA Support enabled."
else:
test_status = "fail"
test_results = [
"The following EC2 Instances do not have ENA Support enabled:",
] + [instance['region'] + '/' + instance['vpc_id'] + '/' + instance['instance_id'] + '(' + instance['instance_name'] + ')' for instance in fail_list]
return {
"description": test_description,
"status": test_status,
"results": test_results
}
instances = [{"region":region,"vpc_id":vpc['VpcId'],"vpc_name":extract_name_from_aws_tags(vpc),"instance":instance} for region, vpcs in region_vpcs.items() for vpc in vpcs for instance in vpc['ec2_instances']]
# Create the parent table model
parent_model = deepcopy(word_table_models.parent_tbl)
ec2_results = {
"passed": 0,
"failed": 0
}
if instances:
# Run best practice checks
ec2_ena_enabled_check = run_ec2_ena_enabled_check()
if ec2_ena_enabled_check['status'] == "pass":
ec2_results['passed'] += 1
else:
ec2_results['failed'] += 1
# Create the ec2_ena_enabled_check child table model
child_model = deepcopy(word_table_models.best_practices_tbl)
# Populate the child model with test data
header_color = green_spacer if ec2_ena_enabled_check['status'] == "pass" else red_spacer
child_model['table']['rows'][0]['cells'][0]['background'] = header_color
child_model['table']['rows'][0]['cells'][0]['paragraphs'][0]['text'] = f"ENA SUPPORT ENABLED CHECK: {ec2_ena_enabled_check['status'].upper()}"
child_model['table']['rows'][1]['cells'][1]['paragraphs'][0]['text'] = ec2_ena_enabled_check['description']
child_model['table']['rows'][2]['cells'][1]['paragraphs'][0]['text'] = ec2_ena_enabled_check['results']
# Inject child model into parent model
parent_model['table']['rows'].append({"cells":[child_model]})
# Write parent table to Word
if not parent_model['table']['rows']: # Completely Empty Table (no VPNs)
parent_model['table']['rows'].append({"cells":[{"paragraphs": [{"style": "No Spacing", "text": "No EC2 Instances Present"}]}]})
table = build_table(doc_obj, parent_model)
replace_placeholder_with_table(doc_obj, "{{py_inst_health}}", table)
return ec2_results
# BUILD WORD TABLE FUNCTIONS
def add_vpcs_to_word_doc(doc_obj):
# Create the base table model
vpc_model = deepcopy(word_table_models.vpc_tbl)
# Populate the table model with data
vpcs = [{"region":region,"vpc":vpc} for region, children in topology['regions'].items() if "vpcs" in children and children['vpcs'] for vpc in children['vpcs']]
for rownum, vpc in enumerate(sorted(vpcs, key = lambda d : d['region']), start=1):
this_rows_cells = []
# Shade every other row for readability
if not (rownum % 2) == 0:
row_color = alternating_row_color
else:
row_color = None
vpc_name = extract_name_from_aws_tags(vpc['vpc'])
# Get number of instances in this VPC
inst_qty = str(len(vpc['vpc']['ec2_instances']))
this_rows_cells.append({"background":row_color,"paragraphs":[{"style":"No Spacing","text":vpc['region']}]})
this_rows_cells.append({"background":row_color,"paragraphs":[{"style":"No Spacing","text":vpc_name}]})
this_rows_cells.append({"background":row_color,"paragraphs":[{"style":"No Spacing","text":vpc['vpc']['CidrBlock']}]})