Skip to content

Module executor

This script launches a server inside a specified conda environment. It listens on a dynamically assigned local port for incoming execution commands sent via a multiprocessing connection.

Clients can send instructions to:

- Dynamically import a Python module from a specified path and execute a function
- Run a Python script via runpy.run_path()
- Receive the result or any errors from the execution

Designed to be run within isolated environments for sandboxed execution of Python code modules.

Functions:

| Name | Description |
| --- | --- |
| sendMessage | Thread-safe sending of messages. |
| handleExecutionError | Common error handling for any execution type. |
| executeFunction | Import a module and execute one of its functions. |
| runScript | Run a Python script via runpy.run_path(), simulating 'python script.py args...'. |
| executionWorker | Worker function handling both 'execute' and 'run' actions. |
| launchListener | Launches a listener on a random available port on localhost. |

sendMessage(lock, connection, message)

Thread-safe sending of messages.

Source code in wetlands/_internal/module_executor.py
def sendMessage(lock: threading.Lock, connection: Connection, message: dict):
    """Send ``message`` over ``connection`` while holding ``lock``.

    The lock is held only for the duration of the ``send`` call and is
    released even when ``send`` raises, so senders sharing the same lock
    never write to the connection at the same time.
    """
    lock.acquire()
    try:
        connection.send(message)
    finally:
        lock.release()

handleExecutionError(lock, connection, e)

Common error handling for any execution type.

Source code in wetlands/_internal/module_executor.py
def handleExecutionError(lock: threading.Lock, connection: Connection, e: Exception):
    """Log an exception with its traceback and report it to the client.

    The exception text and each formatted traceback frame are written to the
    logger, stderr is flushed, and an ``"error"`` message carrying the same
    text and frames is sent over ``connection`` under ``lock``.
    """
    description = str(e)
    logger.error(description)
    logger.error("Traceback:")

    frames = traceback.format_tb(e.__traceback__)
    for frame in frames:
        logger.error(frame)
    sys.stderr.flush()

    payload = {
        "action": "error",
        "exception": description,
        "traceback": frames,
    }
    sendMessage(lock, connection, payload)
    logger.debug("Error sent")

executeFunction(message)

Import a module and execute one of its functions.

Source code in wetlands/_internal/module_executor.py
def executeFunction(message: dict):
    """Import a module and execute one of its functions.

    Args:
        message: Request dict. Reads ``modulePath`` (path of the module
            file), ``function`` (name of the callable to run) and the
            optional ``args`` (default ``[]``) and ``kwargs`` (default
            ``{}``).

    Returns:
        Whatever the called function returns.

    Raises:
        Exception: If the module exposes no callable with the requested
            name, or if the called function raises ``SystemExit``.
    """
    modulePath = Path(message["modulePath"])
    logger.debug(f"Import module {modulePath}")
    # The module is imported by name, so its directory must be on sys.path.
    # Only add it once: this function runs for every request, and an
    # unconditional append would grow sys.path with duplicates.
    moduleDirectory = str(modulePath.parent)
    if moduleDirectory not in sys.path:
        sys.path.append(moduleDirectory)
    # NOTE: import_module() resolves by module name, so a module with the
    # same stem that was already imported is reused from sys.modules.
    module = import_module(modulePath.stem)
    # Reject non-callable attributes too, so the client gets the explicit
    # "no function" error rather than a TypeError from the call below.
    function = getattr(module, message["function"], None)
    if not callable(function):
        raise Exception(f"Module {modulePath} has no function {message['function']}.")
    args = message.get("args", [])
    kwargs = message.get("kwargs", {})
    logger.info(f"Execute {message['modulePath']}:{message['function']}({args})")
    try:
        result = function(*args, **kwargs)
    except SystemExit as se:
        # SystemExit is not an Exception subclass; convert it so the usual
        # error reporting applies, keeping the original as the cause.
        raise Exception(f"Function raised SystemExit: {se}\n\n") from se
    logger.info("Executed")
    return result

runScript(message)

Run a Python script via runpy.run_path(), simulating 'python script.py args...'.

Source code in wetlands/_internal/module_executor.py
def runScript(message: dict):
    """Run a Python script via runpy.run_path(), simulating 'python script.py args...'.

    Args:
        message: Request dict. Reads ``scriptPath`` and the optional
            ``args`` (default ``[]``) and ``run_name`` (default
            ``"__main__"``).

    Returns:
        None, always.

    Raises:
        Exception: If the script terminates through ``SystemExit`` with a
            status other than ``None`` or ``0``.
    """
    scriptPath = message["scriptPath"]
    args = message.get("args", [])
    run_name = message.get("run_name", "__main__")

    # NOTE: sys.argv is process-wide state and is left as set here.
    sys.argv = [scriptPath] + list(args)
    logger.info(f"Running script {scriptPath} with args {args} and run_name={run_name}")
    try:
        runpy.run_path(scriptPath, run_name=run_name)
    except SystemExit as se:
        # SystemExit is not an Exception subclass, so it would slip past
        # `except Exception` handlers and end a worker thread without any
        # reply being sent. A status of None/0 is a normal script ending;
        # anything else is reported as a failure.
        if se.code not in (None, 0):
            raise Exception(f"Script raised SystemExit: {se}\n\n") from se
    logger.info("Script executed")
    return None

executionWorker(lock, connection, message)

Worker function handling both 'execute' and 'run' actions.

Source code in wetlands/_internal/module_executor.py
def executionWorker(lock: threading.Lock, connection: Connection, message: dict):
    """
    Carry out one 'execute' or 'run' request and reply on ``connection``.

    On success an "execution finished" message carrying the result is sent.
    Any ``Exception`` raised along the way (including an unknown or missing
    action) is reported through ``handleExecutionError`` instead.
    """
    try:
        action = message["action"]

        # Pick the handler first, then call it once.
        if action == "execute":
            handler = executeFunction
        elif action == "run":
            handler = runScript
        else:
            raise Exception(f"Unknown action: {action}")
        result = handler(message)

        reply = {
            "action": "execution finished",
            "message": f"{action} completed",
            "result": result,
        }
        sendMessage(lock, connection, reply)
    except Exception as error:
        handleExecutionError(lock, connection, error)

launchListener()

Launches a listener on a random available port on localhost. Waits for client connections and handles 'execute', 'run', or 'exit' messages.

Source code in wetlands/_internal/module_executor.py
def launchListener():
    """
    Launches a listener on localhost and serves client connections.

    The listener binds to ``("localhost", port)``, where ``port`` is defined
    outside this function (presumably 0 so the OS picks a free port -- TODO
    confirm). The bound port is printed to stdout before each ``accept``.

    For every accepted connection, messages are read with ``getMessage``
    until it returns a falsy value:

    - 'execute' / 'run': handled by ``executionWorker`` in a new thread; all
      threads share one lock for writing to the connection.
    - 'exit': replies with an "exited" message, closes the listener and
      returns.
    - anything else: logged and ignored.

    An ``Exception`` raised while reading or dispatching messages is reported
    to the client through ``handleExecutionError``; the loop then waits for
    the next connection.
    """
    lock = threading.Lock()
    with Listener(("localhost", port)) as listener:
        while True:
            # Flush explicitly: when stdout is a pipe it is block-buffered,
            # and whoever reads the port from this line (presumably the
            # launching process) would otherwise not see it in time.
            print(f"Listening port {listener.address[1]}", flush=True)
            with listener.accept() as connection:
                logger.debug(f"Connection accepted {listener.address}")
                try:
                    while message := getMessage(connection):
                        logger.debug(f"Got message: {message}")
                        action = message["action"]

                        if action in ("execute", "run"):
                            logger.debug(f"Launch thread for action {action}")
                            thread = threading.Thread(
                                target=executionWorker,
                                args=(lock, connection, message),
                            )
                            thread.start()

                        elif action == "exit":
                            logger.info("exit")
                            sendMessage(lock, connection, dict(action="exited"))
                            listener.close()
                            return

                        else:
                            # Leave a trace instead of dropping it silently.
                            logger.warning(f"Ignoring message with unknown action: {action}")
                except Exception as e:
                    handleExecutionError(lock, connection, e)