311 lines
10 KiB
Python
Executable File
311 lines
10 KiB
Python
Executable File
#!/usr/bin/python3
|
|
"""Programmatically configure Aviat INUes using their undocumented protocol."""
|
|
|
|
# import os
|
|
import sys
|
|
import logging
|
|
from dataclasses import dataclass
|
|
# import socket
|
|
import telnetlib
|
|
import re
|
|
import csv
|
|
import threading
|
|
import time
|
|
import argparse
|
|
try:
|
|
import queue
|
|
except ImportError:
|
|
import Queue as queue
|
|
|
|
PORT = 26000
|
|
NEXT = 1
|
|
WORKERS = 1
|
|
|
|
Section = "CDRN / TRN WAN"
|
|
|
|
fields = [
|
|
'Section',
|
|
'Subnet',
|
|
'Mask',
|
|
'Description'
|
|
]
|
|
|
|
subnets = []
|
|
|
|
ARG_PARSER = argparse.ArgumentParser(
|
|
description='Subnet discovery of Aviat Eclipse INUes.')
|
|
ARG_PARSER.add_argument('-s', sction="store", dest="subnet", default="10.250.0.0/16")
|
|
ARG_PARSER.add_argument('-o', action="store", dest="out_file", default="import.csv")
|
|
ARG_PARSER.add_argument('--debug', action="store_true")
|
|
arguments = ARG_PARSER.parse_args()
|
|
|
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
OUTFILEPATH = arguments.out_file
|
|
SUBNET = arguments.subnet
|
|
DEBUG = arguments.debug
|
|
|
|
def debug(message):
|
|
if DEBUG:
|
|
logging.debug(message)
|
|
|
|
@dataclass
|
|
class INUeCommand():
|
|
"""Command to be sent to an INUe, including the context and possible value"""
|
|
context: str
|
|
command: str
|
|
position: int = 0
|
|
value: int = None
|
|
reverse: bool = False
|
|
|
|
def __post_init__(self):
|
|
if self.reverse:
|
|
self.position, self.value = self.value, self.position
|
|
|
|
def get_context(self):
|
|
"""Get the context of the command, ready to send."""
|
|
debug ("contextid %s\n" % self.context)
|
|
return ("contextid %s\n" % self.context).encode('ascii')
|
|
|
|
def get_command(self):
|
|
"""Get the command, as either a get or set, based on the value."""
|
|
if self.value is None:
|
|
debug ('get %s %s\n' % (self.command, self.position))
|
|
return ('get %s %s\n' % (self.command, self.position)).encode('ascii')
|
|
else:
|
|
debug ('set %s %s %s\n' % (self.command, self.position, self.value))
|
|
return ('set %s %s %s\n' % (self.command, self.position, self.value)).encode('ascii')
|
|
|
|
|
|
def send_config(telnet, inue_command):
|
|
"""Send a config command to an INUe via a supplied telnet connection."""
|
|
# telnet.write(("contextid %s\n" % context).encode('ascii'))
|
|
telnet.write(inue_command.get_context())
|
|
telnet.read_until(b'Command Executed. Status unknown.\n')
|
|
|
|
telnet.write(inue_command.get_command())
|
|
response = telnet.read_until(b'\n').decode()
|
|
if inue_command.command in response:
|
|
debug("Response: %s" % response.split(" = ",1)[-1])
|
|
return response.split(" = ",1)[-1]
|
|
else:
|
|
return False
|
|
|
|
|
|
# def commit_save(telnet):
|
|
# """Save and commit the config changes, via a supplied telnet connection."""
|
|
# send_config(telnet, INUeCommand("Terminal_I", "configCommitSave", 0, 1))
|
|
# send_config(telnet, INUeCommand("Terminal_I", "configCommitSwitch", 0, 1))
|
|
# # send_config(telnet, "Terminal_I", "configCommitSave", 0, 1)
|
|
# # send_config(telnet, "Terminal_I", "configCommitSwitch", 0, 1)
|
|
|
|
|
|
def get_district(site_name):
|
|
"""Determine the segment district from the site name."""
|
|
district = False
|
|
debug("Site Name: %s" % site_name)
|
|
if site_name:
|
|
district = re.search(r"(([a-zA-Z]+S\d)|(C[a-zA-Z]+\d+)|(T[a-zA-Z]+)|([a-zA-Z]+(-Core)?))([\- ]+\d )?.*", site_name).group(1)
|
|
# district=re.split(r'[\d]+', site_name)[0]
|
|
# if district.endswith('S') and "-" not in site_name:
|
|
# district = district[:-1]
|
|
|
|
return district
|
|
|
|
def get_site(site_name):
|
|
"""Determine the site ID from the site name."""
|
|
site = False
|
|
debug("Site Name: %s" % site_name)
|
|
if site_name:
|
|
site = re.search(r"([\w\-]*).*", site_name).group(1)
|
|
|
|
return site
|
|
|
|
class Worker(threading.Thread):
|
|
"""Class for worker threads which will process INUEs."""
|
|
|
|
def __init__(self, inues, passed, failed):
|
|
self.__inues = inues
|
|
self.__passed = passed
|
|
self.__failed = failed
|
|
threading.Thread.__init__(self)
|
|
|
|
def run(self):
|
|
while True:
|
|
inue = self.__inues.get()
|
|
if inue is None:
|
|
self.__passed.put(None)
|
|
self.__failed.put(None)
|
|
break
|
|
else:
|
|
new_ip = inue.replace(OLD_SUBNET, NEW_SUBNET)
|
|
|
|
rstp_dac = 0
|
|
|
|
try:
|
|
with telnetlib.Telnet(inue, PORT) as telnet:
|
|
district = get_district(send_config(telnet,
|
|
INUeCommand("Terminal_I", "sysDetailsInfoSiteName")))
|
|
debug("District: %s" % district)
|
|
for slot in range(1, 10):
|
|
if (send_config(telnet, INUeCommand(
|
|
"Slot%s_I" % slot, "ceConfigBridgeStpMode")) or
|
|
send_config(telnet, INUeCommand(
|
|
"Slot%s_I" % slot, "ceConfigPortMacLearning", 1)) == 2):
|
|
dac_name = send_config(telnet, INUeCommand(
|
|
"Terminal_I", "slotConfigName", slot))
|
|
debug("DAC: %s" % dac_name)
|
|
if district in dac_name:
|
|
rstp_dac = slot
|
|
break
|
|
|
|
if not rstp_dac:
|
|
self.__failed.put(inue)
|
|
continue
|
|
|
|
if OSPF_AREA:
|
|
commands = [
|
|
INUeCommand("Terminal_I", "ipConfigOspfAreaId", 1, OSPF_AREA)
|
|
]
|
|
|
|
elif CLEANUP:
|
|
debug('cleanup')
|
|
commands = [
|
|
INUeCommand("Terminal_I", "ipConfigAdEntAddress", rstp_dac + 1, '0.0.0.0'),
|
|
INUeCommand("Terminal_I", "ipConfigAdEntNetMask", rstp_dac + 1, '255.255.255.255'),
|
|
INUeCommand("Slot%s_I" % rstp_dac, "ceConfigNmsVid", 0, 0),
|
|
# INUeCommand("Terminal_I", "ipConfigAutoroutingOspfEnable", rstp_dac + 1, 2),
|
|
# INUeCommand("Terminal_I", "ipCidrRouteIfIndex", rstp_dac + 48, GATEWAY, True),
|
|
]
|
|
|
|
elif TEST:
|
|
debug('test')
|
|
self.__passed.put({'IP Address': inue, 'RSTP DAC': rstp_dac})
|
|
continue
|
|
|
|
else:
|
|
commands = [
|
|
INUeCommand("Slot%s_I" % rstp_dac, "ceConfigNmsVid", 0, VLAN),
|
|
INUeCommand("Terminal_I", "ipConfigAdEntAddress", rstp_dac + 1, new_ip),
|
|
INUeCommand("Terminal_I", "ipConfigAdEntNetMask", rstp_dac + 1, NET_MASK),
|
|
INUeCommand("Terminal_I", "ipConfigAutoroutingOspfEnable", rstp_dac + 1, 2),
|
|
INUeCommand("Terminal_I", "ipCidrRouteIfIndex", rstp_dac + 48, GATEWAY, True),
|
|
]
|
|
|
|
fail = False
|
|
|
|
for command in commands:
|
|
if not send_config(telnet, command):
|
|
self.__failed.put(inue)
|
|
fail = True
|
|
break
|
|
|
|
if fail:
|
|
continue
|
|
|
|
commit_save(telnet)
|
|
|
|
except OSError:
|
|
self.__failed.put(inue)
|
|
continue
|
|
|
|
if CLEANUP:
|
|
new_ip = inue
|
|
|
|
time.sleep(5)
|
|
|
|
|
|
try:
|
|
with telnetlib.Telnet(new_ip, PORT) as telnet:
|
|
self.__passed.put(new_ip)
|
|
except OSError:
|
|
self.__failed.put(inue)
|
|
continue
|
|
|
|
|
|
def input_inues():
|
|
"""Load list of INUEs from CSV file."""
|
|
with open(INFILEPATH, 'r', newline='') as in_file:
|
|
csv_reader = csv.DictReader(in_file)
|
|
|
|
for row in csv_reader:
|
|
INUE_QUEUE.put(row['IP Address'])
|
|
in_file.close()
|
|
|
|
#pylint: disable=unused-variable,redefined-outer-name
|
|
for worker in range(WORKERS):
|
|
INUE_QUEUE.put(None)
|
|
|
|
|
|
def failed_output():
|
|
"""Output a list of failed INUEs to standard out."""
|
|
count = WORKERS
|
|
|
|
# with open('/tmp/sc200.csv', 'w', newline='') as outFile:
|
|
# csvWriter = csv.DictWriter(outFile, fieldnames=fieldNames,
|
|
# delimiter=',', quotechar='"',
|
|
# quoting=csv.QUOTE_MINIMAL)
|
|
|
|
# csvWriter.writeheader()
|
|
|
|
print('Failed:')
|
|
|
|
while 1:
|
|
inue = FAILED_QUEUE.get()
|
|
if inue is None:
|
|
if count > 1:
|
|
count -= 1
|
|
# print(Count)
|
|
else:
|
|
break
|
|
else:
|
|
print(inue)
|
|
|
|
|
|
def passed_output():
|
|
"""Output a list of successfully modified INUes into a CSV file."""
|
|
count = WORKERS
|
|
|
|
field_names = ['IP Address']
|
|
if TEST:
|
|
field_names = ['IP Address', 'RSTP DAC']
|
|
with open(OUTFILEPATH, 'w', newline='') as out_file:
|
|
csv_writer = csv.DictWriter(out_file, fieldnames=field_names,
|
|
delimiter=',', quotechar='"',
|
|
quoting=csv.QUOTE_MINIMAL)
|
|
|
|
csv_writer.writeheader()
|
|
|
|
while 1:
|
|
inue = PASSED_QUEUE.get()
|
|
if inue is None:
|
|
if count > 1:
|
|
count -= 1
|
|
else:
|
|
break
|
|
else:
|
|
if TEST:
|
|
csv_writer.writerow(inue)
|
|
else:
|
|
csv_writer.writerow({"IP Address": inue})
|
|
|
|
out_file.close()
|
|
|
|
|
|
INUE_QUEUE = queue.Queue(0)
|
|
PASSED_QUEUE = queue.Queue(0)
|
|
FAILED_QUEUE = queue.Queue(0)
|
|
|
|
input_inues()
|
|
|
|
for worker in range(WORKERS):
|
|
Worker(INUE_QUEUE, PASSED_QUEUE, FAILED_QUEUE).start()
|
|
|
|
if threading.current_thread() is not threading.main_thread():
|
|
sys.exit(0)
|
|
|
|
passed_output()
|
|
failed_output()
|
|
|
|
sys.exit(0)
|