#!/usr/bin/python3.7 """Programmatically configure Aviat INUes using their undocumented protocol.""" # import os import sys from dataclasses import dataclass # import socket import telnetlib import csv import threading import argparse try: import queue except ImportError: import Queue as queue PORT = 26000 NEXT = 1 WORKERS = 5 NET_MASK = "255.255.0.0" GATEWAY = "10.250.0.0.255.255.0.0.0.172.16.0.1" OLD_SUBNET = "192.168" NEW_SUBNET = "172.16" VLAN = 4095 ARG_PARSER = argparse.ArgumentParser( description='Scriptable configuration of Aviat Eclipse INUes.') ARG_PARSER.add_argument('csv') @dataclass class INUeCommand(): """Command to be sent to an INUe, including the context and possible value""" context: str command: str position: int = 0 value: int = None reverse: bool = False def __post_init__(self): if self.reverse: self.position, self.value = self.value, self.position def get_context(self): """Get the context of the command, ready to send.""" print ("contextid %s\n" % self.context).encode('ascii') return ("contextid %s\n" % self.context).encode('ascii') def get_command(self): """Get the command, as either a get or set, based on the value.""" if self.value: return ('set %s %s %s\n' % (self.command, self.position, self.value)).encode('ascii') else: return ('get %s %s\n' % (self.command, self.position)).encode('ascii') def send_config(telnet, inue_command): """Send a config command to an INUe via a supplied telnet connection.""" # telnet.write(("contextid %s\n" % context).encode('ascii')) telnet.write(inue_command.get_context()) telnet.read_until(b'Command Executed. Status unknown.\n') # if inue_command.value: # telnet.write(('set %s %s %s\n' % # (command, position, value)).encode('ascii')) # else: # telnet.write(('get %s %s\n' % (command, position)).encode('ascii')) telnet.write(inue_command.get_command()) response = telnet.read_until(b'\n').split() if response[0].decode() == inue_command.command: return response[-1] else: return False def commit_save(telnet): """Save and commit the config changes, via a supplied telnet connection.""" send_config(telnet, INUeCommand("Terminal_I", "configCommitSave", 0, 1)) send_config(telnet, INUeCommand("Terminal_I", "configCommitSwitch", 0, 1)) # send_config(telnet, "Terminal_I", "configCommitSave", 0, 1) # send_config(telnet, "Terminal_I", "configCommitSwitch", 0, 1) class Worker(threading.Thread): """Class for worker threads which will process INUEs.""" def __init__(self, inues, passed, failed): self.__inues = inues self.__passed = passed self.__failed = failed threading.Thread.__init__(self) def run(self): while True: inue = self.__inues.get() if inue is None: print('Here') self.__passed.put(None) self.__failed.put(None) break new_ip = inue.replace(OLD_SUBNET, NEW_SUBNET) rstp_dac = 0 try: with telnetlib.Telnet(inue, PORT) as telnet: for slot in range(1, 9): if send_config(telnet, INUeCommand("Slot%s_I" % slot, "ceConfigBridgeStpMode")): rstp_dac = slot if not rstp_dac: self.__failed.put(inue) continue commands = [ INUeCommand("Slot%s_I" % slot, "ceConfigNmsVid", 0, VLAN), INUeCommand("Terminal_I", "ipConfigAdEntAddress", slot + 1, new_ip), INUeCommand("Terminal_I", "ipConfigAdEntelnetET_MASK", slot + 1, NET_MASK), INUeCommand("Terminal_I", "ipConfigAutoroutingOspfEnable", slot + 1, 2), INUeCommand("Terminal_I", "ipCidrRouteIfIndex", slot + 48, GATEWAY, True), ] fail = False for command in commands: if not send_config(telnet, command): self.__failed.put(inue) fail = True continue if fail: continue commit_save(telnet) except OSError: self.__failed.put(inue) continue try: with telnetlib.Telnet(new_ip, PORT) as telnet: self.__passed.put(new_ip) except OSError: FAILED_QUEUE.put(inue) continue def input_inues(): """Load list of INUEs from CSV file.""" with open(INFILEPATH, 'r', newline='') as in_file: csv_reader = csv.DictReader(in_file) for row in csv_reader: INUE_QUEUE.put(row['IP Address']) in_file.close() #pylint: disable=unused-variable,redefined-outer-name for worker in range(WORKERS): INUE_QUEUE.put(None) def failed_output(): """Output a list of failed INUEs to standard out.""" count = WORKERS # with open('/tmp/sc200.csv', 'w', newline='') as outFile: # csvWriter = csv.DictWriter(outFile, fieldnames=fieldNames, # delimiter=',', quotechar='"', # quoting=csv.QUOTE_MINIMAL) # csvWriter.writeheader() print('Failed:') while 1: inue = FAILED_QUEUE.get() if inue is None: if count > 1: count -= 1 # print(Count) else: break else: print(inue) def passed_output(): """Output a list of successfully modified INUes into a CSV file.""" count = WORKERS field_names = ['IP Address'] with open('/tmp/aviat.csv', 'w', newline='') as out_file: csv_writer = csv.DictWriter(out_file, fieldnames=field_names, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) csv_writer.writeheader() while 1: inue = PASSED_QUEUE.get() if inue is None: if count > 1: count -= 1 else: break else: csv_writer.writerow({"IP Address": inue}) out_file.close() INUE_QUEUE = queue.Queue(0) PASSED_QUEUE = queue.Queue(0) FAILED_QUEUE = queue.Queue(0) # for i in range(WORKERS): # Worker(INUE_QUEUE, PASSED_QUEUE, FAILED_QUEUE).start() # for INUE in sys.argv: # if NEXT: # NEXT=0 # else: # INUE_QUEUE.put(INUE) # for i in range(WORKERS): # INUE_QUEUE.put(None) INFILEPATH = sys.argv[1] input_inues() for worker in range(WORKERS): Worker(INUE_QUEUE, PASSED_QUEUE, FAILED_QUEUE).start() if threading.current_thread() is not threading.main_thread(): sys.exit(0) passed_output() failed_output() sys.exit(0)