#!/usr/bin/python3 """Programmatically configure Aviat INUes using their undocumented protocol.""" # import os import sys import logging from dataclasses import dataclass # import socket import telnetlib import re import csv import threading import time import argparse try: import queue except ImportError: import Queue as queue PORT = 26000 NEXT = 1 WORKERS = 1 NET_MASK = "255.255.0.0" GATEWAY = "10.250.0.0.255.255.0.0.0.172.16.0.1" OLD_SUBNET = "192.168" NEW_SUBNET = "172.16" VLAN = 4095 ARG_PARSER = argparse.ArgumentParser( description='Scriptable configuration of Aviat Eclipse INUes.') ARG_PARSER.add_argument('-i', action="store", dest="in_file", required=True) ARG_PARSER.add_argument('-o', action="store", dest="out_file", default="passed.csv") ARG_PARSER.add_argument('--area', action="store", dest="ospf_area", default=None) ARG_PARSER.add_argument('--cleanup', action="store_true") ARG_PARSER.add_argument('--test', action="store_true") ARG_PARSER.add_argument('--debug', action="store_true") arguments = ARG_PARSER.parse_args() logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') INFILEPATH = arguments.in_file OUTFILEPATH = arguments.out_file OSPF_AREA = arguments.ospf_area CLEANUP = arguments.cleanup DEBUG = arguments.debug TEST = arguments.test def debug(message): if DEBUG: logging.debug(message) @dataclass class INUeCommand(): """Command to be sent to an INUe, including the context and possible value""" context: str command: str position: int = 0 value: int = None reverse: bool = False def __post_init__(self): if self.reverse: self.position, self.value = self.value, self.position def get_context(self): """Get the context of the command, ready to send.""" debug ("contextid %s\n" % self.context) return ("contextid %s\n" % self.context).encode('ascii') def get_command(self): """Get the command, as either a get or set, based on the value.""" if self.value is None: debug ('get %s %s\n' % (self.command, self.position)) return ('get %s %s\n' % (self.command, self.position)).encode('ascii') else: debug ('set %s %s %s\n' % (self.command, self.position, self.value)) return ('set %s %s %s\n' % (self.command, self.position, self.value)).encode('ascii') def send_config(telnet, inue_command): """Send a config command to an INUe via a supplied telnet connection.""" # telnet.write(("contextid %s\n" % context).encode('ascii')) telnet.write(inue_command.get_context()) telnet.read_until(b'Command Executed. Status unknown.\n') telnet.write(inue_command.get_command()) response = telnet.read_until(b'\n').decode() if inue_command.command in response: debug("Response: %s" % response.split(" = ",1)[-1]) return response.split(" = ",1)[-1] else: return False def commit_save(telnet): """Save and commit the config changes, via a supplied telnet connection.""" send_config(telnet, INUeCommand("Terminal_I", "configCommitSave", 0, 1)) send_config(telnet, INUeCommand("Terminal_I", "configCommitSwitch", 0, 1)) # send_config(telnet, "Terminal_I", "configCommitSave", 0, 1) # send_config(telnet, "Terminal_I", "configCommitSwitch", 0, 1) def get_district(site_name): """Determine the segment district from the site name.""" district = False debug("Site Name: %s" % site_name) if site_name: district=re.split(r'[\d]+', site_name)[0] if district.endswith('S') and "-" not in site_name: district = district[:-1] return district class Worker(threading.Thread): """Class for worker threads which will process INUEs.""" def __init__(self, inues, passed, failed): self.__inues = inues self.__passed = passed self.__failed = failed threading.Thread.__init__(self) def run(self): while True: inue = self.__inues.get() if inue is None: self.__passed.put(None) self.__failed.put(None) break else: new_ip = inue.replace(OLD_SUBNET, NEW_SUBNET) rstp_dac = 0 try: with telnetlib.Telnet(inue, PORT) as telnet: district = get_district(send_config(telnet, INUeCommand("Terminal_I", "sysDetailsInfoSiteName"))) debug("District: %s" % district) for slot in range(1, 10): if (send_config(telnet, INUeCommand( "Slot%s_I" % slot, "ceConfigBridgeStpMode")) or send_config(telnet, INUeCommand( "Slot%s_I" % slot, "ceConfigPortMacLearning", 1)) == 2): dac_name = send_config(telnet, INUeCommand( "Terminal_I", "slotConfigName", slot)) debug("DAC: %s" % dac_name) if district in dac_name: rstp_dac = slot break if not rstp_dac: self.__failed.put(inue) continue if OSPF_AREA: commands = [ INUeCommand("Terminal_I", "ipConfigOspfAreaId", 1, OSPF_AREA) ] elif CLEANUP: debug('cleanup') commands = [ INUeCommand("Terminal_I", "ipConfigAdEntAddress", rstp_dac + 1, '0.0.0.0'), INUeCommand("Terminal_I", "ipConfigAdEntNetMask", rstp_dac + 1, '255.255.255.255'), INUeCommand("Slot%s_I" % rstp_dac, "ceConfigNmsVid", 0, 0), # INUeCommand("Terminal_I", "ipConfigAutoroutingOspfEnable", rstp_dac + 1, 2), # INUeCommand("Terminal_I", "ipCidrRouteIfIndex", rstp_dac + 48, GATEWAY, True), ] elif TEST: debug('test') self.__passed.put({'IP Address': inue, 'RSTP DAC': rstp_dac}) continue else: commands = [ INUeCommand("Slot%s_I" % rstp_dac, "ceConfigNmsVid", 0, VLAN), INUeCommand("Terminal_I", "ipConfigAdEntAddress", rstp_dac + 1, new_ip), INUeCommand("Terminal_I", "ipConfigAdEntNetMask", rstp_dac + 1, NET_MASK), INUeCommand("Terminal_I", "ipConfigAutoroutingOspfEnable", rstp_dac + 1, 2), INUeCommand("Terminal_I", "ipCidrRouteIfIndex", rstp_dac + 48, GATEWAY, True), ] fail = False for command in commands: if not send_config(telnet, command): self.__failed.put(inue) fail = True break if fail: continue commit_save(telnet) except OSError: self.__failed.put(inue) continue if CLEANUP: new_ip = inue time.sleep(5) try: with telnetlib.Telnet(new_ip, PORT) as telnet: self.__passed.put(new_ip) except OSError: self.__failed.put(inue) continue def input_inues(): """Load list of INUEs from CSV file.""" with open(INFILEPATH, 'r', newline='') as in_file: csv_reader = csv.DictReader(in_file) for row in csv_reader: INUE_QUEUE.put(row['IP Address']) in_file.close() #pylint: disable=unused-variable,redefined-outer-name for worker in range(WORKERS): INUE_QUEUE.put(None) def failed_output(): """Output a list of failed INUEs to standard out.""" count = WORKERS # with open('/tmp/sc200.csv', 'w', newline='') as outFile: # csvWriter = csv.DictWriter(outFile, fieldnames=fieldNames, # delimiter=',', quotechar='"', # quoting=csv.QUOTE_MINIMAL) # csvWriter.writeheader() print('Failed:') while 1: inue = FAILED_QUEUE.get() if inue is None: if count > 1: count -= 1 # print(Count) else: break else: print(inue) def passed_output(): """Output a list of successfully modified INUes into a CSV file.""" count = WORKERS field_names = ['IP Address'] if TEST: field_names = ['IP Address', 'RSTP DAC'] with open(OUTFILEPATH, 'w', newline='') as out_file: csv_writer = csv.DictWriter(out_file, fieldnames=field_names, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) csv_writer.writeheader() while 1: inue = PASSED_QUEUE.get() if inue is None: if count > 1: count -= 1 else: break else: if TEST: csv_writer.writerow(inue) else: csv_writer.writerow({"IP Address": inue}) out_file.close() INUE_QUEUE = queue.Queue(0) PASSED_QUEUE = queue.Queue(0) FAILED_QUEUE = queue.Queue(0) input_inues() for worker in range(WORKERS): Worker(INUE_QUEUE, PASSED_QUEUE, FAILED_QUEUE).start() if threading.current_thread() is not threading.main_thread(): sys.exit(0) passed_output() failed_output() sys.exit(0)