Files
Aviat/aviat_config.py
2019-07-23 01:13:19 +08:00

244 lines
7.2 KiB
Python
Executable File

#!/usr/bin/python3.7
"""Programmatically configure Aviat INUes using their undocumented protocol."""
# import os
import sys
from dataclasses import dataclass
# import socket
import telnetlib
import csv
import threading
import argparse
try:
import queue
except ImportError:
import Queue as queue
PORT = 26000
NEXT = 1
WORKERS = 5
NET_MASK = "255.255.0.0"
GATEWAY = "10.250.0.0.255.255.0.0.0.172.16.0.1"
OLD_SUBNET = "192.168"
NEW_SUBNET = "172.16"
VLAN = 4095
ARG_PARSER = argparse.ArgumentParser(
description='Scriptable configuration of Aviat Eclipse INUes.')
ARG_PARSER.add_argument('csv')
@dataclass
class INUeCommand():
"""Command to be sent to an INUe, including the context and possible value"""
context: str = ""
command: str = ""
position: int = 0
value: int = None
reverse: bool = False
def __post_init__(self):
print("Context %s" % self.context)
if self.reverse:
self.position, self.value = self.value, self.position
def get_context(self):
"""Get the context of the command, ready to send."""
print (self.context)
return ("contextid %s\n" % self.context).encode('ascii')
def get_command(self):
"""Get the command, as either a get or set, based on the value."""
if self.value:
return ('set %s %s %s\n' % (self.command, self.position, self.value)).encode('ascii')
else:
return ('get %s %s\n' % (self.command, self.position)).encode('ascii')
def send_config(telnet, inue_command):
"""Send a config command to an INUe via a supplied telnet connection."""
# telnet.write(("contextid %s\n" % context).encode('ascii'))
telnet.write(inue_command.get_context())
telnet.read_until(b'Command Executed. Status unknown.\n')
# if inue_command.value:
# telnet.write(('set %s %s %s\n' %
# (command, position, value)).encode('ascii'))
# else:
# telnet.write(('get %s %s\n' % (command, position)).encode('ascii'))
telnet.write(inue_command.get_command())
response = telnet.read_until(b'\n').split()
if response[0].decode() == inue_command.command:
return response[-1]
else:
return False
def commit_save(telnet):
"""Save and commit the config changes, via a supplied telnet connection."""
send_config(telnet, INUeCommand("Terminal_I", "configCommitSave", 0, 1))
send_config(telnet, INUeCommand("Terminal_I", "configCommitSwitch", 0, 1))
# send_config(telnet, "Terminal_I", "configCommitSave", 0, 1)
# send_config(telnet, "Terminal_I", "configCommitSwitch", 0, 1)
class Worker(threading.Thread):
"""Class for worker threads which will process INUEs."""
def __init__(self, inues, passed, failed):
self.__inues = inues
self.__passed = passed
self.__failed = failed
threading.Thread.__init__(self)
def run(self):
while True:
inue = self.__inues.get()
if inue is None:
self.__passed.put(None)
self.__failed.put(None)
break
else:
new_ip = inue.replace(OLD_SUBNET, NEW_SUBNET)
rstp_dac = 0
try:
with telnetlib.Telnet(inue, PORT) as telnet:
for slot in range(1, 10):
print("Slot: %s" % slot)
if send_config(telnet,
INUeCommand("Slot%s_I" % slot, "ceConfigBridgeStpMode")):
rstp_dac = slot
if not rstp_dac:
self.__failed.put(inue)
continue
commands = [
INUeCommand("Slot%s_I" % rstp_dac, "ceConfigNmsVid", 0, VLAN),
INUeCommand("Terminal_I", "ipConfigAdEntAddress", rstp_dac + 1, new_ip),
INUeCommand("Terminal_I", "ipConfigAdEntelnetET_MASK", rstp_dac + 1, NET_MASK),
INUeCommand("Terminal_I", "ipConfigAutoroutingOspfEnable", rstp_dac + 1, 2),
INUeCommand("Terminal_I", "ipCidrRouteIfIndex", rstp_dac + 48, GATEWAY, True),
]
fail = False
for command in commands:
if not send_config(telnet, command):
self.__failed.put(inue)
fail = True
continue
if fail:
continue
commit_save(telnet)
except OSError:
self.__failed.put(inue)
continue
try:
with telnetlib.Telnet(new_ip, PORT) as telnet:
self.__passed.put(new_ip)
except OSError:
FAILED_QUEUE.put(inue)
continue
def input_inues():
"""Load list of INUEs from CSV file."""
with open(INFILEPATH, 'r', newline='') as in_file:
csv_reader = csv.DictReader(in_file)
for row in csv_reader:
INUE_QUEUE.put(row['IP Address'])
in_file.close()
#pylint: disable=unused-variable,redefined-outer-name
for worker in range(WORKERS):
INUE_QUEUE.put(None)
def failed_output():
"""Output a list of failed INUEs to standard out."""
count = WORKERS
# with open('/tmp/sc200.csv', 'w', newline='') as outFile:
# csvWriter = csv.DictWriter(outFile, fieldnames=fieldNames,
# delimiter=',', quotechar='"',
# quoting=csv.QUOTE_MINIMAL)
# csvWriter.writeheader()
print('Failed:')
while 1:
inue = FAILED_QUEUE.get()
if inue is None:
if count > 1:
count -= 1
# print(Count)
else:
break
else:
print(inue)
def passed_output():
"""Output a list of successfully modified INUes into a CSV file."""
count = WORKERS
field_names = ['IP Address']
with open('/tmp/aviat.csv', 'w', newline='') as out_file:
csv_writer = csv.DictWriter(out_file, fieldnames=field_names,
delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL)
csv_writer.writeheader()
while 1:
inue = PASSED_QUEUE.get()
if inue is None:
if count > 1:
count -= 1
else:
break
else:
csv_writer.writerow({"IP Address": inue})
out_file.close()
INUE_QUEUE = queue.Queue(0)
PASSED_QUEUE = queue.Queue(0)
FAILED_QUEUE = queue.Queue(0)
# for i in range(WORKERS):
# Worker(INUE_QUEUE, PASSED_QUEUE, FAILED_QUEUE).start()
# for INUE in sys.argv:
# if NEXT:
# NEXT=0
# else:
# INUE_QUEUE.put(INUE)
# for i in range(WORKERS):
# INUE_QUEUE.put(None)
INFILEPATH = sys.argv[1]
input_inues()
for worker in range(WORKERS):
Worker(INUE_QUEUE, PASSED_QUEUE, FAILED_QUEUE).start()
if threading.current_thread() is not threading.main_thread():
sys.exit(0)
passed_output()
failed_output()
sys.exit(0)