import pexpect import getpass import sys import time import os from dotenv import load_dotenv load_dotenv(override=True) # --- Firewall Rule Definitions are correct as is --- FIREWALL_RULE_1_CMDS = [ "firewall 3", "action accept", "interface LAN_ETH1_CUBE WAN", "protocol TCP", "mode ip", "src-ip all", "src-port all", "dst-ip all", "dst-port single 8080", "logging severity 6", "name proxy_retour", "exit", ] FIREWALL_RULE_2_CMDS = [ "firewall 4", "action accept", "interface WAN LAN_ETH1_CUBE", "protocol TCP", "mode ip", "src-ip all", "src-port single 8080", "dst-ip all", "dst-port all", "logging severity 6", "name proxy_aller", "exit", ] def configure_moxa_firewall_pexpect(router_ip): ip_address = router_ip username = os.getenv("DEFAULT_EDR_810_USER") password = os.getenv("DEFAULT_EDR_810_PASSWORD") try: command = f"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null {username}@{ip_address}" print(f"--- Starting SSH session...") child = pexpect.spawn(command, encoding='utf-8', timeout=20) child.logfile_read = sys.stdout child.expect("[Pp]assword:") child.sendline(password) child.expect(r"[\#\>\$]\s*$") base_prompt = child.after.strip() print(f"\n>>> SUCCESSFULLY CONNECTED! Base prompt is: '{base_prompt}'") # --- Check for existing rules --- # print("\n--- Checking for existing rules...") # child.sendline("terminal length 0") # child.expect(base_prompt) # child.sendline("show running-config") # child.expect(base_prompt) # running_config = child.before # if "name proxy_retour" in running_config and "name proxy_aller" in running_config: # print("--- Firewall rules already exist. No action needed.") # child.sendline("exit") # child.close() # return # print("--- Rules not found. Proceeding with configuration.") # --- Enter Configuration Mode --- # print("\n>>> Entering configuration mode...") # child.sendline("configure") # config_prompt_re = r"\(config\)#\s*$" # child.expect(config_prompt_re) # --- Apply Rule 1 --- # print("\n>>> Applying Rule 1: proxy_retour") # for cmd in FIREWALL_RULE_1_CMDS: # child.sendline(cmd) # child.expect([r"\(config-firewall\)#\s*$", config_prompt_re]) # time.sleep(0.2) # --- Apply Rule 2 --- # print("\n>>> Applying Rule 2: proxy_aller") # for cmd in FIREWALL_RULE_2_CMDS: # child.sendline(cmd) # child.expect([r"\(config-firewall\)#\s*$", config_prompt_re]) # time.sleep(0.2) # --- THE CORRECTED SAVE LOGIC --- # 1. Exit from configuration mode to return to the base prompt # print("\n>>> Exiting configuration mode to save...") # child.sendline("exit") # child.expect(base_prompt) # 2. Now, from the base prompt, issue the save command print("\n>>> Saving configuration...") child.sendline("save") child.expect(base_prompt) print("\n--- Configuration complete. Closing session.") child.sendline("exit") # Log out child.close() except pexpect.exceptions.TIMEOUT: print("\n!!! CRITICAL: Connection timed out.") print(f"--- Last output seen (child.before): {child.before}") except pexpect.exceptions.EOF: print("\n!!! CRITICAL: Connection closed unexpectedly.") print(f"--- Last output seen (child.before): {child.before}") except Exception as e: print(f"\n!!! An unexpected error occurred: {e}") if __name__ == "__main__": ip_prefix = "10.81.60." ip_suffixes = list(range(194, 215)) for suffix in ip_suffixes: router_ip = ip_prefix + str(suffix) configure_moxa_firewall_pexpect(router_ip)