From 5549bd0fdc50afcc779bd37c518c6254edc2ed25 Mon Sep 17 00:00:00 2001 From: Adrian Woodley Date: Fri, 8 Oct 2021 12:23:37 +0800 Subject: [PATCH] Reworked for Maxmine CSV --- geo-firewall.py | 39 ------------------ geofirewall.py | 107 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 39 deletions(-) delete mode 100755 geo-firewall.py create mode 100755 geofirewall.py diff --git a/geo-firewall.py b/geo-firewall.py deleted file mode 100755 index 00de7ba..0000000 --- a/geo-firewall.py +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/python3 -import netaddr -import csv -import sys -import os - -# inFilePath = 'GeoIPCountryWhois.csv' -# inFilePath = 'au.csv' -inFilePath = 'firewall.txt' - -os.system('sudo ipset create TEST2 hash:net -exist') -os.system('sudo ipset flush TEST2') - -iplist = [] - -with open(inFilePath, 'r') as inFile: - # fieldnames = ['StartIP', 'EndIP', 'DecIPStart', 'DecIPEnd', 'CountryCode', "Country"] - # fieldnames = 'StartIP', 'EndIP','Number','Date'] - # csvReader = csv.DictReader(inFile, fieldnames=fieldnames) - - # for row in csvReader: - # if row['CountryCode'] == 'AU': - # iplist.extend(netaddr.iprange_to_cidrs(row['StartIP'], row['EndIP'])) - - for row in inFile: - print(row) - if not row.startswith("#"): - iplist.extend(netaddr.IPNetwork(row)) - -print("########################################") - - -summary_subnets = netaddr.cidr_merge(iplist) -for subnet in summary_subnets: - print(subnet) - os.system('sudo ipset add TEST2 ' + str(subnet)) - -os.system('sudo ipset swap TEST TEST2') -os.system('sudo ipset destroy TEST2') \ No newline at end of file diff --git a/geofirewall.py b/geofirewall.py new file mode 100755 index 0000000..acd7c1d --- /dev/null +++ b/geofirewall.py @@ -0,0 +1,107 @@ +#!/usr/bin/python3 +""" +Create iptables ipset based on Geo IP database from Maxmind +""" +from tempfile import TemporaryDirectory +from io import BytesIO + +import csv +import sys +import os +import zipfile +import requests +import netaddr + +DBURL = 'https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-Country-CSV&license_key=LGWVg3A9Md9no07J&suffix=zip' # pylint: disable=line-too-long + +COUNTRYCODE = None +IPSubnets = [] + +def load_DB(): + """ + Download the zip from from Maxmine, unzip, find country code, and load subnets into array + """ + with TemporaryDirectory() as tempDir: + r = requests.get(DBURL, allow_redirects=True) + try: + with zipfile.ZipFile(BytesIO(r.content)) as zfile: + zfile.extractall(path=tempDir) + except zipfile.BadZipFile: + return 'Bad zip archive' + + for root, dirs, files in os.walk(tempDir): # pylint: disable=unused-variable + for archive_dir in dirs: + locations_file = root + '/' + archive_dir + '/' + 'GeoLite2-Country-Locations-en.csv' + with open(locations_file, newline='') as csv_locations_file: + csv_locations_reader = csv.DictReader(csv_locations_file) + try: + for row in csv_locations_reader: + if row['country_iso_code'] == 'AU': + COUNTRYCODE = row['geoname_id'] + break + except csv.Error as error: + sys.exit('file {}, line {}: {}'.format(csv_locations_file, csv_locations_reader.line_num, error)) + + if COUNTRYCODE: + country_blocks_file = root + '/' + dir + '/' + 'GeoLite2-Country-Blocks-IPv4.csv' + with open(country_blocks_file, newline='') as csv_country_blocks_file: + csv_country_blocks_reader = csv.DictReader(csv_country_blocks_file) + try: + for row in csv_country_blocks_reader: + if row['geoname_id'] == COUNTRYCODE: + IPSubnets.extend(netaddr.IPNetwork(row['network'])) + except csv.Error as error: + sys.exit('file {}, line {}: {}'.format(csv_country_blocks_file, csv_country_blocks_reader.line_num, error)) + + +def build_ip_set(): + """ + Take the list of subnets, merge them and load the iptables ipset + """ + summary_subnets = netaddr.cidr_merge(IPSubnets) + os.system('sudo ipset create GEO hash:net -exist') + os.system('sudo ipset create GEO2 hash:net -exist') + os.system('sudo ipset flush GEO2') + + for subnet in summary_subnets: + os.system('sudo ipset add GEO2 ' + str(subnet)) + + os.system('sudo ipset swap GEO GEO2') + os.system('sudo ipset destroy GEO2') + +load_DB() +build_ip_set() + +# # inFilePath = 'GeoIPCountryWhois.csv' +# # inFilePath = 'au.csv' +# inFilePath = 'firewall.txt' + +# os.system('sudo ipset create TEST2 hash:net -exist') +# os.system('sudo ipset flush TEST2') + +# iplist = [] + +# with open(inFilePath, 'r') as inFile: +# # fieldnames = ['StartIP', 'EndIP', 'DecIPStart', 'DecIPEnd', 'CountryCode', "Country"] +# # fieldnames = 'StartIP', 'EndIP','Number','Date'] +# # csvReader = csv.DictReader(inFile, fieldnames=fieldnames) + +# # for row in csvReader: +# # if row['CountryCode'] == 'AU': +# # iplist.extend(netaddr.iprange_to_cidrs(row['StartIP'], row['EndIP'])) + +# for row in inFile: +# print(row) +# if not row.startswith("#"): +# iplist.extend(netaddr.IPNetwork(row)) + +# print("########################################") + + +# summary_subnets = netaddr.cidr_merge(iplist) +# for subnet in summary_subnets: +# print(subnet) +# os.system('sudo ipset add TEST2 ' + str(subnet)) + +# os.system('sudo ipset swap TEST TEST2') +# os.system('sudo ipset destroy TEST2')