108 lines
3.9 KiB
Python
Executable File
108 lines
3.9 KiB
Python
Executable File
#!/usr/bin/python3
|
|
"""
|
|
Create iptables ipset based on Geo IP database from Maxmind
|
|
"""
|
|
from tempfile import TemporaryDirectory
|
|
from io import BytesIO
|
|
|
|
import csv
|
|
import sys
|
|
import os
|
|
import zipfile
|
|
import requests
|
|
import netaddr
|
|
|
|
DBURL = 'https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-Country-CSV&license_key=LGWVg3A9Md9no07J&suffix=zip' # pylint: disable=line-too-long
|
|
|
|
COUNTRYCODE = None
|
|
IPSubnets = []
|
|
|
|
def load_DB():
|
|
"""
|
|
Download the zip from from Maxmine, unzip, find country code, and load subnets into array
|
|
"""
|
|
with TemporaryDirectory() as tempDir:
|
|
r = requests.get(DBURL, allow_redirects=True)
|
|
try:
|
|
with zipfile.ZipFile(BytesIO(r.content)) as zfile:
|
|
zfile.extractall(path=tempDir)
|
|
except zipfile.BadZipFile:
|
|
return 'Bad zip archive'
|
|
|
|
for root, dirs, files in os.walk(tempDir): # pylint: disable=unused-variable
|
|
for archive_dir in dirs:
|
|
locations_file = root + '/' + archive_dir + '/' + 'GeoLite2-Country-Locations-en.csv'
|
|
with open(locations_file, newline='') as csv_locations_file:
|
|
csv_locations_reader = csv.DictReader(csv_locations_file)
|
|
try:
|
|
for row in csv_locations_reader:
|
|
if row['country_iso_code'] == 'AU':
|
|
COUNTRYCODE = row['geoname_id']
|
|
break
|
|
except csv.Error as error:
|
|
sys.exit('file {}, line {}: {}'.format(csv_locations_file, csv_locations_reader.line_num, error))
|
|
|
|
if COUNTRYCODE:
|
|
country_blocks_file = root + '/' + archive_dir + '/' + 'GeoLite2-Country-Blocks-IPv4.csv'
|
|
with open(country_blocks_file, newline='') as csv_country_blocks_file:
|
|
csv_country_blocks_reader = csv.DictReader(csv_country_blocks_file)
|
|
try:
|
|
for row in csv_country_blocks_reader:
|
|
if row['geoname_id'] == COUNTRYCODE:
|
|
IPSubnets.extend(netaddr.IPNetwork(row['network']))
|
|
except csv.Error as error:
|
|
sys.exit('file {}, line {}: {}'.format(csv_country_blocks_file, csv_country_blocks_reader.line_num, error))
|
|
|
|
|
|
def build_ip_set():
|
|
"""
|
|
Take the list of subnets, merge them and load the iptables ipset
|
|
"""
|
|
# summary_subnets = netaddr.cidr_merge(IPSubnets)
|
|
os.system('sudo ipset create GEO hash:net -exist')
|
|
os.system('sudo ipset create GEO2 hash:net -exist')
|
|
os.system('sudo ipset flush GEO2')
|
|
|
|
for subnet in IPSubnets:
|
|
os.system('sudo ipset add GEO2 ' + subnet)
|
|
|
|
os.system('sudo ipset swap GEO GEO2')
|
|
os.system('sudo ipset destroy GEO2')
|
|
|
|
load_DB()
|
|
build_ip_set()
|
|
|
|
# # inFilePath = 'GeoIPCountryWhois.csv'
|
|
# # inFilePath = 'au.csv'
|
|
# inFilePath = 'firewall.txt'
|
|
|
|
# os.system('sudo ipset create TEST2 hash:net -exist')
|
|
# os.system('sudo ipset flush TEST2')
|
|
|
|
# iplist = []
|
|
|
|
# with open(inFilePath, 'r') as inFile:
|
|
# # fieldnames = ['StartIP', 'EndIP', 'DecIPStart', 'DecIPEnd', 'CountryCode', "Country"]
|
|
# # fieldnames = 'StartIP', 'EndIP','Number','Date']
|
|
# # csvReader = csv.DictReader(inFile, fieldnames=fieldnames)
|
|
|
|
# # for row in csvReader:
|
|
# # if row['CountryCode'] == 'AU':
|
|
# # iplist.extend(netaddr.iprange_to_cidrs(row['StartIP'], row['EndIP']))
|
|
|
|
# for row in inFile:
|
|
# print(row)
|
|
# if not row.startswith("#"):
|
|
# iplist.extend(netaddr.IPNetwork(row))
|
|
|
|
# print("########################################")
|
|
|
|
|
|
# summary_subnets = netaddr.cidr_merge(iplist)
|
|
# for subnet in summary_subnets:
|
|
# print(subnet)
|
|
# os.system('sudo ipset add TEST2 ' + str(subnet))
|
|
|
|
# os.system('sudo ipset swap TEST TEST2')
|
|
# os.system('sudo ipset destroy TEST2')
|