This repository has been archived by the owner on Jul 2, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ip2asn.py
142 lines (110 loc) · 4.44 KB
/
ip2asn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
Mastodon infrastructure analysis tool. See README for usage.
Copyright 2020 Dominik Pataky <[email protected]>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import csv
import gzip
import ipaddress
import json
import os.path
import hashlib
from tqdm import tqdm
# Module-level cache of slices whose entries were already converted by
# ipaddressify_ip_networks, keyed by the slice start address string.
# It is filled and read by get_asn_of_ip.
ipaddressified_ip_networks = {}
def ipaddressify_ip_networks(ip_networks: list) -> list:
    """Return copies of the entries with ``start``/``end`` as ipaddress objects.

    The input list and its entry dicts are left untouched, so the parsed data
    keeps its plain string form and stays JSON-serializable.

    :param ip_networks: list of entry dicts; each must have the keys ``start``
        and ``end`` holding values accepted by ``ipaddress.ip_address``.
    :return: new list of new dicts; all other keys are carried over unchanged.
    :raises ValueError: if a ``start`` or ``end`` value is not a valid address.
    """
    return [
        # dict(entry, ...) makes a shallow copy, overriding only the two bounds
        dict(
            entry,
            start=ipaddress.ip_address(entry["start"]),
            end=ipaddress.ip_address(entry["end"]),
        )
        for entry in ip_networks
    ]
def asnfile_init(filename: str) -> dict:
    """Parse a tab-separated IP-to-AS file into slices of network entries.

    Rows are grouped into slices of up to 1000 entries, each slice keyed by
    the ``start`` value of its first row, so that lookups do not have to
    iterate over all entries. The parsed result is persisted as gzipped JSON
    in a file named after the SHA-1 of the input content (relative to the
    current working directory) and reused on later calls with the same input.

    :param filename: path to the TSV file; names ending in ``.gz`` are read
        through gzip.
    :return: dict mapping slice start (str) to a list of entry dicts with the
        keys ``start``, ``end``, ``asn`` (int), ``country`` and ``name``.
    :raises FileNotFoundError: if ``filename`` does not exist.
    """
    if not os.path.exists(filename):
        raise FileNotFoundError("AS file not found: {}".format(filename))

    # Hash the file content so a changed input never hits a stale cache
    filehash = hashlib.sha1()
    with open(filename, "rb") as fh:
        for chunk in iter(lambda: fh.read(65536), b""):  # read in 64kb chunks
            filehash.update(chunk)

    # construct file name from hash
    cachefile = ".asnfile_cached_{}.gz".format(filehash.hexdigest())
    if os.path.exists(cachefile):
        with gzip.open(cachefile, "rt") as fh:
            return json.load(fh)["ip_networks"]

    ip_networks = {}
    opener = gzip.open if filename.endswith(".gz") else open

    # Using slices to chop up >400.000 entries which would later need to be iterated in whole
    current_slice = None
    current_slice_size = 0
    # The context manager closes the input even if a malformed row raises
    with opener(filename, "rt") as fh:
        tsv = csv.reader(fh, delimiter="\t")
        for row in tqdm(tsv, desc="Parsing entries in AS file {}".format(filename)):
            if not row:
                # csv.reader yields [] for blank lines; nothing to parse
                continue

            # Start a new slice on the very first row and after every 1000 rows
            if current_slice is None or current_slice_size == 1000:
                current_slice = row[0]
                current_slice_size = 0

            ip_networks.setdefault(current_slice, []).append({
                "start": row[0],
                "end": row[1],
                "asn": int(row[2]),
                "country": row[3],
                "name": row[4]
            })
            current_slice_size += 1

    # Write to a temporary file first and move it into place afterwards, so an
    # interrupted run never leaves a truncated cache file behind
    tmpfile = "{}.{}.tmp".format(cachefile, os.getpid())
    with gzip.open(tmpfile, "wt") as fh:
        print("Persisting cache file for AS parsing {}".format(filename))
        json.dump({"ip_networks": ip_networks}, fh)
    os.replace(tmpfile, cachefile)

    return ip_networks
def get_asn_of_ip(ip: [str, ipaddress.IPv4Address, ipaddress.IPv6Address], ip_networks: dict) -> list:
    """Return all network entries whose address range contains ``ip``.

    ``ip_networks`` is expected in the shape produced by ``asnfile_init``:
    a dict of slices keyed by slice start address, iterated in ascending
    order per address family. Ranges are treated as inclusive, so the first
    and last address of a range match as well. Entries with ``asn`` 0 are
    never returned.

    Converted slices are stored in the module-level
    ``ipaddressified_ip_networks`` cache.
    NOTE: that cache is keyed by slice start only, so it assumes a single
    ``ip_networks`` data set per process.

    :param ip: address to look up, as string or ipaddress object.
    :param ip_networks: sliced network entries.
    :return: list of matching entry dicts (``start``/``end`` as ipaddress
        objects); empty if nothing matches.
    :raises ValueError: if ``ip`` is a string that is not a valid address.
    """
    if not isinstance(ip, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
        ip = ipaddress.ip_address(ip)

    # Iterate over slices to find the right network slice
    match = None
    family_seen = False
    for slice_start in ip_networks:
        slice_start_ip = ipaddress.ip_address(slice_start)

        if slice_start_ip.version != ip.version:
            # IPv4 and IPv6 addresses cannot be ordered against each other
            if family_seen:
                # All slices of the searched address family are behind us
                break
            # A slice of the other family right before ours may still hold
            # the first entries of our family, keep it as possible match
            match = slice_start
            continue

        family_seen = True
        if ip < slice_start_ip:
            # Searched IP is in previous slice
            break

        # Set slice as possible match, meaning it can be accessed as "previous" slice if the next iteration breaks
        match = slice_start
        if ip == slice_start_ip:
            # we exactly matched the beginning IP of a slice, no need to look further
            break

    if match is None:
        return []

    # Cache processed network conversions
    if match in ipaddressified_ip_networks:
        converted_network = ipaddressified_ip_networks[match]
    else:
        converted_network = ipaddressify_ip_networks(ip_networks[match])
        ipaddressified_ip_networks[match] = converted_network

    # The IP that is searched for is in the last matched slice. The version
    # check comes first because a slice can contain entries of both families.
    return [
        network for network in converted_network
        if network["asn"] != 0
        and network["start"].version == ip.version
        and network["start"] <= ip <= network["end"]
    ]