import pexpect import getpass import sys import time import os from dotenv import load_dotenv load_dotenv(override=True) # --- Firewall Rule Definitions are correct as is --- FIREWALL_RULE_1_CMDS = [ "firewall 3", "action accept", "interface LAN_ETH1_CUBE WAN", "protocol TCP", "mode ip", "src-ip all", "src-port all", "dst-ip all", "dst-port single 8080", "logging severity 6", "name proxy_retour", "exit", ] FIREWALL_RULE_2_CMDS = [ "firewall 4", "action accept", "interface WAN LAN_ETH1_CUBE", "protocol TCP", "mode ip", "src-ip all", "src-port single 8080", "dst-ip all", "dst-port all", "logging severity 6", "name proxy_aller", "exit", ] def configure_moxa_firewall_pexpect(router_ip): ip_address = router_ip username = os.getenv("DEFAULT_EDR_810_USER") password = os.getenv("DEFAULT_EDR_810_PASSWORD") try: command = f"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null {username}@{ip_address}" print(f"--- Starting SSH session...") child = pexpect.spawn(command, encoding='utf-8', timeout=20) child.logfile_read = sys.stdout child.expect("[Pp]assword:") child.sendline(password) child.expect(r"[\#\>\$]\s*$") base_prompt = child.after.strip() print(f"\n>>> SUCCESSFULLY CONNECTED! Base prompt is: '{base_prompt}'") # --- Check for existing rules --- print("\n--- Checking for existing rules...") child.sendline("terminal length 0") child.expect(base_prompt) child.sendline("show running-config") child.expect(base_prompt) running_config = child.before if "name proxy_retour" in running_config and "name proxy_aller" in running_config: print("--- Firewall rules already exist. No action needed.") child.sendline("exit") child.close() return print("--- Rules not found. Proceeding with configuration.") # --- Enter Configuration Mode --- print("\n>>> Entering configuration mode...") child.sendline("configure") config_prompt_re = r"\(config\)#\s*$" child.expect(config_prompt_re) # --- Apply Rule 1 --- print("\n>>> Applying Rule 1: proxy_retour") for cmd in FIREWALL_RULE_1_CMDS: child.sendline(cmd) child.expect([r"\(config-firewall\)#\s*$", config_prompt_re]) time.sleep(0.2) # --- Apply Rule 2 --- print("\n>>> Applying Rule 2: proxy_aller") for cmd in FIREWALL_RULE_2_CMDS: child.sendline(cmd) child.expect([r"\(config-firewall\)#\s*$", config_prompt_re]) time.sleep(0.2) # --- THE CORRECTED SAVE LOGIC --- # 1. Exit from configuration mode to return to the base prompt print("\n>>> Exiting configuration mode to save...") child.sendline("exit") child.expect(base_prompt) # 2. Now, from the base prompt, issue the save command print("\n>>> Saving configuration...") child.sendline("save") child.expect(base_prompt) print("\n--- Configuration complete. Closing session.") child.sendline("exit") # Log out child.close() except pexpect.exceptions.TIMEOUT: print("\n!!! CRITICAL: Connection timed out.") print(f"--- Last output seen (child.before): {child.before}") except pexpect.exceptions.EOF: print("\n!!! CRITICAL: Connection closed unexpectedly.") print(f"--- Last output seen (child.before): {child.before}") except Exception as e: print(f"\n!!! An unexpected error occurred: {e}") if __name__ == "__main__": ip_prefix = "10.81.35." ip_suffixes = list(range(74, 75)) ip_suffixes.append(85) for suffix in ip_suffixes: router_ip = ip_prefix + str(suffix) print(f"Configuration of firewall {router_ip}...") configure_moxa_firewall_pexpect(router_ip) print(f"Done!") answer = input(f"Continue?")