#!/usr/bin/python3 """ Create iptables ipset based on Geo IP database from Maxmind """ from tempfile import TemporaryDirectory from io import BytesIO import csv import sys import os import zipfile import requests import netaddr DBURL = 'https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-Country-CSV&license_key=LGWVg3A9Md9no07J&suffix=zip' # pylint: disable=line-too-long COUNTRYCODE = None IPSubnets = [] def load_DB(): """ Download the zip from from Maxmine, unzip, find country code, and load subnets into array """ with TemporaryDirectory() as tempDir: r = requests.get(DBURL, allow_redirects=True) try: with zipfile.ZipFile(BytesIO(r.content)) as zfile: zfile.extractall(path=tempDir) except zipfile.BadZipFile: return 'Bad zip archive' for root, dirs, files in os.walk(tempDir): # pylint: disable=unused-variable for archive_dir in dirs: locations_file = root + '/' + archive_dir + '/' + 'GeoLite2-Country-Locations-en.csv' with open(locations_file, newline='') as csv_locations_file: csv_locations_reader = csv.DictReader(csv_locations_file) try: for row in csv_locations_reader: if row['country_iso_code'] == 'AU': COUNTRYCODE = row['geoname_id'] break except csv.Error as error: sys.exit('file {}, line {}: {}'.format(csv_locations_file, csv_locations_reader.line_num, error)) if COUNTRYCODE: country_blocks_file = root + '/' + archive_dir + '/' + 'GeoLite2-Country-Blocks-IPv4.csv' with open(country_blocks_file, newline='') as csv_country_blocks_file: csv_country_blocks_reader = csv.DictReader(csv_country_blocks_file) try: for row in csv_country_blocks_reader: if row['geoname_id'] == COUNTRYCODE: IPSubnets.extend(netaddr.IPNetwork(row['network'])) except csv.Error as error: sys.exit('file {}, line {}: {}'.format(csv_country_blocks_file, csv_country_blocks_reader.line_num, error)) def build_ip_set(): """ Take the list of subnets, merge them and load the iptables ipset """ # summary_subnets = netaddr.cidr_merge(IPSubnets) os.system('sudo ipset create GEO hash:net -exist') os.system('sudo ipset create GEO2 hash:net -exist') os.system('sudo ipset flush GEO2') for subnet in IPSubnets: os.system('sudo ipset add GEO2 ' + subnet) os.system('sudo ipset swap GEO GEO2') os.system('sudo ipset destroy GEO2') load_DB() build_ip_set() # # inFilePath = 'GeoIPCountryWhois.csv' # # inFilePath = 'au.csv' # inFilePath = 'firewall.txt' # os.system('sudo ipset create TEST2 hash:net -exist') # os.system('sudo ipset flush TEST2') # iplist = [] # with open(inFilePath, 'r') as inFile: # # fieldnames = ['StartIP', 'EndIP', 'DecIPStart', 'DecIPEnd', 'CountryCode', "Country"] # # fieldnames = 'StartIP', 'EndIP','Number','Date'] # # csvReader = csv.DictReader(inFile, fieldnames=fieldnames) # # for row in csvReader: # # if row['CountryCode'] == 'AU': # # iplist.extend(netaddr.iprange_to_cidrs(row['StartIP'], row['EndIP'])) # for row in inFile: # print(row) # if not row.startswith("#"): # iplist.extend(netaddr.IPNetwork(row)) # print("########################################") # summary_subnets = netaddr.cidr_merge(iplist) # for subnet in summary_subnets: # print(subnet) # os.system('sudo ipset add TEST2 ' + str(subnet)) # os.system('sudo ipset swap TEST TEST2') # os.system('sudo ipset destroy TEST2')