Reworked for Maxmine CSV
This commit is contained in:
@@ -1,39 +0,0 @@
|
|||||||
#!/usr/bin/python3
|
|
||||||
import netaddr
|
|
||||||
import csv
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
|
|
||||||
# inFilePath = 'GeoIPCountryWhois.csv'
|
|
||||||
# inFilePath = 'au.csv'
|
|
||||||
inFilePath = 'firewall.txt'
|
|
||||||
|
|
||||||
os.system('sudo ipset create TEST2 hash:net -exist')
|
|
||||||
os.system('sudo ipset flush TEST2')
|
|
||||||
|
|
||||||
iplist = []
|
|
||||||
|
|
||||||
with open(inFilePath, 'r') as inFile:
|
|
||||||
# fieldnames = ['StartIP', 'EndIP', 'DecIPStart', 'DecIPEnd', 'CountryCode', "Country"]
|
|
||||||
# fieldnames = 'StartIP', 'EndIP','Number','Date']
|
|
||||||
# csvReader = csv.DictReader(inFile, fieldnames=fieldnames)
|
|
||||||
|
|
||||||
# for row in csvReader:
|
|
||||||
# if row['CountryCode'] == 'AU':
|
|
||||||
# iplist.extend(netaddr.iprange_to_cidrs(row['StartIP'], row['EndIP']))
|
|
||||||
|
|
||||||
for row in inFile:
|
|
||||||
print(row)
|
|
||||||
if not row.startswith("#"):
|
|
||||||
iplist.extend(netaddr.IPNetwork(row))
|
|
||||||
|
|
||||||
print("########################################")
|
|
||||||
|
|
||||||
|
|
||||||
summary_subnets = netaddr.cidr_merge(iplist)
|
|
||||||
for subnet in summary_subnets:
|
|
||||||
print(subnet)
|
|
||||||
os.system('sudo ipset add TEST2 ' + str(subnet))
|
|
||||||
|
|
||||||
os.system('sudo ipset swap TEST TEST2')
|
|
||||||
os.system('sudo ipset destroy TEST2')
|
|
||||||
107
geofirewall.py
Executable file
107
geofirewall.py
Executable file
@@ -0,0 +1,107 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
"""
|
||||||
|
Create iptables ipset based on Geo IP database from Maxmind
|
||||||
|
"""
|
||||||
|
from tempfile import TemporaryDirectory
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
import csv
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import zipfile
|
||||||
|
import requests
|
||||||
|
import netaddr
|
||||||
|
|
||||||
|
DBURL = 'https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-Country-CSV&license_key=LGWVg3A9Md9no07J&suffix=zip' # pylint: disable=line-too-long
|
||||||
|
|
||||||
|
COUNTRYCODE = None
|
||||||
|
IPSubnets = []
|
||||||
|
|
||||||
|
def load_DB():
|
||||||
|
"""
|
||||||
|
Download the zip from from Maxmine, unzip, find country code, and load subnets into array
|
||||||
|
"""
|
||||||
|
with TemporaryDirectory() as tempDir:
|
||||||
|
r = requests.get(DBURL, allow_redirects=True)
|
||||||
|
try:
|
||||||
|
with zipfile.ZipFile(BytesIO(r.content)) as zfile:
|
||||||
|
zfile.extractall(path=tempDir)
|
||||||
|
except zipfile.BadZipFile:
|
||||||
|
return 'Bad zip archive'
|
||||||
|
|
||||||
|
for root, dirs, files in os.walk(tempDir): # pylint: disable=unused-variable
|
||||||
|
for archive_dir in dirs:
|
||||||
|
locations_file = root + '/' + archive_dir + '/' + 'GeoLite2-Country-Locations-en.csv'
|
||||||
|
with open(locations_file, newline='') as csv_locations_file:
|
||||||
|
csv_locations_reader = csv.DictReader(csv_locations_file)
|
||||||
|
try:
|
||||||
|
for row in csv_locations_reader:
|
||||||
|
if row['country_iso_code'] == 'AU':
|
||||||
|
COUNTRYCODE = row['geoname_id']
|
||||||
|
break
|
||||||
|
except csv.Error as error:
|
||||||
|
sys.exit('file {}, line {}: {}'.format(csv_locations_file, csv_locations_reader.line_num, error))
|
||||||
|
|
||||||
|
if COUNTRYCODE:
|
||||||
|
country_blocks_file = root + '/' + dir + '/' + 'GeoLite2-Country-Blocks-IPv4.csv'
|
||||||
|
with open(country_blocks_file, newline='') as csv_country_blocks_file:
|
||||||
|
csv_country_blocks_reader = csv.DictReader(csv_country_blocks_file)
|
||||||
|
try:
|
||||||
|
for row in csv_country_blocks_reader:
|
||||||
|
if row['geoname_id'] == COUNTRYCODE:
|
||||||
|
IPSubnets.extend(netaddr.IPNetwork(row['network']))
|
||||||
|
except csv.Error as error:
|
||||||
|
sys.exit('file {}, line {}: {}'.format(csv_country_blocks_file, csv_country_blocks_reader.line_num, error))
|
||||||
|
|
||||||
|
|
||||||
|
def build_ip_set():
|
||||||
|
"""
|
||||||
|
Take the list of subnets, merge them and load the iptables ipset
|
||||||
|
"""
|
||||||
|
summary_subnets = netaddr.cidr_merge(IPSubnets)
|
||||||
|
os.system('sudo ipset create GEO hash:net -exist')
|
||||||
|
os.system('sudo ipset create GEO2 hash:net -exist')
|
||||||
|
os.system('sudo ipset flush GEO2')
|
||||||
|
|
||||||
|
for subnet in summary_subnets:
|
||||||
|
os.system('sudo ipset add GEO2 ' + str(subnet))
|
||||||
|
|
||||||
|
os.system('sudo ipset swap GEO GEO2')
|
||||||
|
os.system('sudo ipset destroy GEO2')
|
||||||
|
|
||||||
|
load_DB()
|
||||||
|
build_ip_set()
|
||||||
|
|
||||||
|
# # inFilePath = 'GeoIPCountryWhois.csv'
|
||||||
|
# # inFilePath = 'au.csv'
|
||||||
|
# inFilePath = 'firewall.txt'
|
||||||
|
|
||||||
|
# os.system('sudo ipset create TEST2 hash:net -exist')
|
||||||
|
# os.system('sudo ipset flush TEST2')
|
||||||
|
|
||||||
|
# iplist = []
|
||||||
|
|
||||||
|
# with open(inFilePath, 'r') as inFile:
|
||||||
|
# # fieldnames = ['StartIP', 'EndIP', 'DecIPStart', 'DecIPEnd', 'CountryCode', "Country"]
|
||||||
|
# # fieldnames = 'StartIP', 'EndIP','Number','Date']
|
||||||
|
# # csvReader = csv.DictReader(inFile, fieldnames=fieldnames)
|
||||||
|
|
||||||
|
# # for row in csvReader:
|
||||||
|
# # if row['CountryCode'] == 'AU':
|
||||||
|
# # iplist.extend(netaddr.iprange_to_cidrs(row['StartIP'], row['EndIP']))
|
||||||
|
|
||||||
|
# for row in inFile:
|
||||||
|
# print(row)
|
||||||
|
# if not row.startswith("#"):
|
||||||
|
# iplist.extend(netaddr.IPNetwork(row))
|
||||||
|
|
||||||
|
# print("########################################")
|
||||||
|
|
||||||
|
|
||||||
|
# summary_subnets = netaddr.cidr_merge(iplist)
|
||||||
|
# for subnet in summary_subnets:
|
||||||
|
# print(subnet)
|
||||||
|
# os.system('sudo ipset add TEST2 ' + str(subnet))
|
||||||
|
|
||||||
|
# os.system('sudo ipset swap TEST TEST2')
|
||||||
|
# os.system('sudo ipset destroy TEST2')
|
||||||
Reference in New Issue
Block a user