# Code-viewer metadata captured with this listing: 110 lines, 4.0 KiB, Python.
import getpass
import os
import re
import sys
import time

import pexpect
from dotenv import load_dotenv

# Load settings from a .env file. override=True lets values from the file
# replace variables that are already set in the process environment.
load_dotenv(override=True)

# --- Firewall rule definitions (CLI command sequences, sent one per line) ---

# Rule slot 3, named "proxy_retour": accept TCP traffic to destination port
# 8080 for the interface pair "LAN_ETH1_CUBE WAN".
# NOTE(review): the pair is presumably "from LAN_ETH1_CUBE to WAN" -- confirm
# against the router's CLI manual. The final "exit" leaves the rule sub-mode.
FIREWALL_RULE_1_CMDS = [
    "firewall 3",
    "action accept",
    "interface LAN_ETH1_CUBE WAN",
    "protocol TCP",
    "mode ip",
    "src-ip all",
    "src-port all",
    "dst-ip all",
    "dst-port single 8080",
    "logging severity 6",
    "name proxy_retour",
    "exit",
]

# Rule slot 4, named "proxy_aller": accept TCP traffic from source port 8080
# for the interface pair "WAN LAN_ETH1_CUBE".
# NOTE(review): the pair is presumably "from WAN to LAN_ETH1_CUBE" -- confirm
# against the router's CLI manual. The final "exit" leaves the rule sub-mode.
FIREWALL_RULE_2_CMDS = [
    "firewall 4",
    "action accept",
    "interface WAN LAN_ETH1_CUBE",
    "protocol TCP",
    "mode ip",
    "src-ip all",
    "src-port single 8080",
    "dst-ip all",
    "dst-port all",
    "logging severity 6",
    "name proxy_aller",
    "exit",
]

def _send_commands(child, commands, prompts, pause=0.2):
    """Send each CLI command in order, waiting for a known prompt after each.

    Args:
        child: The pexpect session to drive.
        commands: CLI command strings; each is sent as its own line.
        prompts: Pattern (or list of patterns) handed to ``child.expect``.
        pause: Seconds to sleep after each acknowledged command.
    """
    for cmd in commands:
        child.sendline(cmd)
        child.expect(prompts)
        time.sleep(pause)


def configure_moxa_firewall_pexpect(router_ip):
    """Add the proxy_retour / proxy_aller firewall rules to one router via SSH.

    Credentials are read from the ``DEFAULT_EDR_810_USER`` and
    ``DEFAULT_EDR_810_PASSWORD`` environment variables. After logging in, the
    running configuration is searched for both rule names; when both are
    present nothing is changed. Otherwise the function enters configuration
    mode, sends ``FIREWALL_RULE_1_CMDS`` and ``FIREWALL_RULE_2_CMDS``, leaves
    configuration mode and issues ``save``.

    Progress and failures are printed. pexpect timeouts, unexpected EOF and
    any other exception are reported on stdout instead of being raised, and
    the SSH child process is closed on every path.

    Args:
        router_ip: Address of the router to connect to.

    Returns:
        None.
    """
    username = os.getenv("DEFAULT_EDR_810_USER")
    password = os.getenv("DEFAULT_EDR_810_PASSWORD")

    # Fail early with a clear message instead of attempting "ssh None@host"
    # and later crashing on sendline(None).
    missing = [
        name
        for name, value in (
            ("DEFAULT_EDR_810_USER", username),
            ("DEFAULT_EDR_810_PASSWORD", password),
        )
        if value is None
    ]
    if missing:
        print(f"\n!!! CRITICAL: Missing environment variable(s): {', '.join(missing)}")
        return

    child = None
    try:
        # NOTE(security): host-key verification is disabled here, so the
        # identity of the remote device is not checked.
        command = (
            "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null "
            f"{username}@{router_ip}"
        )
        print("--- Starting SSH session...")
        child = pexpect.spawn(command, encoding='utf-8', timeout=20)

        # Echo everything received from the device to stdout.
        child.logfile_read = sys.stdout

        child.expect("[Pp]assword:")
        child.sendline(password)
        child.expect(r"[\#\>\$]\s*$")
        base_prompt = child.after.strip()
        print(f"\n>>> SUCCESSFULLY CONNECTED! Base prompt is: '{base_prompt}'")

        # child.after holds only the matched prompt terminator ('#', '>' or
        # '$'). Escape it (a bare '$' would otherwise be a regex anchor) and
        # anchor it to the end of the received data, like the login and
        # config-mode patterns, so the same character inside command output
        # is not mistaken for the prompt.
        base_prompt_re = re.escape(base_prompt) + r"\s*$"

        # --- Check for existing rules ---
        print("\n--- Checking for existing rules...")
        child.sendline("terminal length 0")
        child.expect(base_prompt_re)
        child.sendline("show running-config")
        child.expect(base_prompt_re)
        running_config = child.before

        if "name proxy_retour" in running_config and "name proxy_aller" in running_config:
            print("--- Firewall rules already exist. No action needed.")
            child.sendline("exit")
            return  # the finally clause closes the session

        print("--- Rules not found. Proceeding with configuration.")

        # --- Enter configuration mode ---
        print("\n>>> Entering configuration mode...")
        child.sendline("configure")
        config_prompt_re = r"\(config\)#\s*$"
        child.expect(config_prompt_re)

        # While a rule is being edited the prompt is "(config-firewall)#";
        # the trailing "exit" of each rule returns to "(config)#".
        rule_prompts = [r"\(config-firewall\)#\s*$", config_prompt_re]

        print("\n>>> Applying Rule 1: proxy_retour")
        _send_commands(child, FIREWALL_RULE_1_CMDS, rule_prompts)

        print("\n>>> Applying Rule 2: proxy_aller")
        _send_commands(child, FIREWALL_RULE_2_CMDS, rule_prompts)

        # --- Save: leave configuration mode first, then save from the base prompt ---
        print("\n>>> Exiting configuration mode to save...")
        child.sendline("exit")
        child.expect(base_prompt_re)

        print("\n>>> Saving configuration...")
        child.sendline("save")
        child.expect(base_prompt_re)

        print("\n--- Configuration complete. Closing session.")
        child.sendline("exit")  # Log out

    except pexpect.exceptions.TIMEOUT:
        print("\n!!! CRITICAL: Connection timed out.")
        print(f"--- Last output seen (child.before): {child.before if child is not None else None}")
    except pexpect.exceptions.EOF:
        print("\n!!! CRITICAL: Connection closed unexpectedly.")
        print(f"--- Last output seen (child.before): {child.before if child is not None else None}")
    except Exception as e:
        # Top-level boundary for this device: report and let the caller move on.
        print(f"\n!!! An unexpected error occurred: {e}")
    finally:
        # Always release the SSH child process, including on error paths.
        if child is not None:
            try:
                child.close()
            except Exception as close_error:
                print(f"\n!!! Could not close the SSH session cleanly: {close_error}")

if __name__ == "__main__":
    # Script entry point: configure each listed router in turn, pausing for
    # operator confirmation between devices.
    ip_prefix = "10.81.35."
    # Last octets of the routers to configure: a contiguous range plus extras.
    ip_suffixes = [*range(74, 75), 85]
    router_ips = [f"{ip_prefix}{suffix}" for suffix in ip_suffixes]

    for position, router_ip in enumerate(router_ips, start=1):
        print(f"Configuration of firewall {router_ip}...")
        configure_moxa_firewall_pexpect(router_ip)
        # NOTE: printed after every attempt; the function reports its own
        # errors and returns None either way.
        print("Done!")

        # Nothing left to continue with after the last router.
        if position == len(router_ips):
            break

        # Honor the operator's reply: any answer starting with "n" stops the
        # run; anything else (including just Enter) moves on to the next router.
        answer = input("Continue?")
        if answer.strip().lower().startswith("n"):
            print("Stopping at operator request.")
            break