Getting started
Executing Code in an Environment
Wetlands provides two ways to execute functions in an isolated environment:
- env.submit() returns a Task[T] immediately, letting you monitor progress, cancel execution, or wait for the result at your convenience.
- env.execute() and env.import_module() are blocking shortcuts that wait for the result before returning, which is convenient when you don't need cancellation.
To demonstrate, we will create an environment, install cellpose, and run a segmentation function defined in a separate file (example_module.py) within that isolated environment.
Let's see the main script getting_started.py step by step.
We will segment the image img02.png (available at https://www.cellpose.org/static/images/img02.png).
from pathlib import Path
image_path = Path("img02.png")
segmentation_path = image_path.parent / f"{image_path.stem}_segmentation.png"
1. Initialize the Environment Manager
We start by initializing the EnvironmentManager. We provide:
- A wetlands_instance_path where Wetlands stores logs and debug information (defaults to "wetlands/").
- Optionally, a conda_path where Wetlands should look for an existing Pixi (or Micromamba) installation, or where it should download and set up a new one. If not provided, it defaults to wetlands_instance_path / "pixi".
from wetlands.environment_manager import EnvironmentManager
environment_manager = EnvironmentManager()
# Or with explicit paths:
# environment_manager = EnvironmentManager(
#     wetlands_instance_path="wetlands_state",
#     conda_path="path/to/pixi/"
# )
Note
EnvironmentManager also accepts a main_conda_environment_path argument, useful if Wetlands is used in a conda environment (e.g. environment_manager = EnvironmentManager(main_conda_environment_path="/path/to/project/environment/")). Wetlands will activate this main environment and check whether its installed packages satisfy the requirements when creating new environments. If the required dependencies are already installed in the main environment, EnvironmentManager.create() will return the main environment instead of creating a new one. In that case the modules are called directly, bypassing the Wetlands communication server.
Warning
On Windows, spaces are not allowed in the conda_path argument of EnvironmentManager().
2. Create (or get) an Environment and Install Dependencies
Next, we define and create the Conda environment. We give it a name ("cellpose_env") and specify its dependencies using a dictionary. Here, we require cellpose version 3.1.0, to be installed via Conda. If an environment with this name already exists, Wetlands will use it (and ignore the dependencies); otherwise, it will create it and install the specified dependencies. The create method returns an Environment object.
Note
If a main_conda_environment_path was provided when instantiating the EnvironmentManager, Wetlands will check whether cellpose==3.1.0 is already installed in the main environment and return that environment if so. If main_conda_environment_path is not provided but the required dependencies are all pip packages, Wetlands will check whether they are installed in the current Python environment and return that environment if so.
Reusing existing environments with use_existing=True
You can pass use_existing=True to create() to search for and reuse an existing environment that satisfies the dependencies. This includes the main environment. If no environment satisfies the requirements, a new one will be created. By default, use_existing=False, which always creates a new environment.
Specifying dependencies
See the dependencies page to learn more on specifying dependencies.
Wetlands supports PEP 440 version specifiers, so you can use flexible version constraints like "numpy>=1.20,<2.0", "scipy~=1.5", or "pandas!=1.0.0".
You can also use EnvironmentManager.create_from_config() and provide a requirements.txt, environment.yml, pyproject.toml or pixi.toml file for your dependencies.
Load an existing environment
You can also load an existing environment with environment_manager.load("env_name", Path("Path/to/existing/environment/pyproject.toml")). See EnvironmentManager.load().
3. Launch the Environment's Worker Processes
For Wetlands to execute code within the isolated environment, we need to launch its worker processes. Each worker runs inside the cellpose_env and listens for commands from our main script.
Note
By default, launch() starts a single worker process. For parallel execution, you can start multiple workers sharing the same Conda environment:
You can also assign specific environment variables per worker (e.g. for GPU assignment):
To detect hung workers that stop responding, set an inactivity timeout (in seconds):
Wetlands runs a background health monitor that checks all workers periodically. If a worker process crashes or exceeds the inactivity timeout, the monitor fails the active task, removes the dead worker, and launches a replacement automatically. See Worker health monitoring for details.
4. Execute Code in the Environment
Non-blocking execution with submit()
env.submit() sends a function call to a worker and returns a Task object immediately. You can then monitor progress, attach listeners, or wait for the result:
task = env.submit("example_module.py", "segment",
                  args=(str(image_path), str(segmentation_path)))
# Do other work while segmentation runs...
print(f"Task status: {task.status}")
# Block until the result is ready
task.wait_for()
diameters = task.result
print(f"Segmentation complete. Found diameters of {diameters} pixels.")
Tasks support progress reporting, cooperative cancellation, event listeners, context managers, concurrent.futures.Future interop, and async/await. See Tasks and parallel execution for the full API.
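As a rough analogy only, the submit-then-wait pattern resembles the standard library's concurrent.futures: submit() hands back a handle right away, and the result is collected later. The sketch below does not use the Wetlands API; slow_segment and its return value are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def slow_segment(image_name: str) -> float:
    """Hypothetical stand-in for a long-running segmentation call."""
    time.sleep(0.1)
    return 30.0


def run_non_blocking(image_name: str) -> float:
    """Submit the work, stay free to do other things, then collect the result."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_segment, image_name)  # returns immediately
        # ... other work could happen here while slow_segment runs ...
        return future.result()  # blocks until the value is available


print(run_non_blocking("img02.png"))
```

The blocking shortcuts described next correspond to calling the function and waiting in a single step.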
Blocking shortcuts: import_module() and execute()
For simple cases where you just need the result, Wetlands provides blocking shortcuts.
env.import_module() returns a proxy object. When you call a method on this proxy (like example_module.segment(...)), Wetlands sends the function name and arguments to the worker running in the cellpose_env, executes the function there, and returns the result:
example_module = env.import_module("example_module.py")
diameters = example_module.segment(str(image_path), str(segmentation_path))
print(f"Segmentation complete. Found diameters of {diameters} pixels.")
env.execute() calls a function directly without a proxy object:
args = (str(image_path), str(segmentation_path))
diameters = env.execute("example_module.py", "segment", args)
print(f"Segmentation complete. Found diameters of {diameters} pixels.")
Tracking progress with blocking calls
Even when using the blocking API, you can track what happens inside the environment through Wetlands' logging system. The environment's stdout is captured in real time and routed through Python's logging module, so print() statements in remote code appear as log messages that you can filter, redirect, or display in a GUI. See Logging for details.
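A minimal sketch of collecting such log messages with a custom handler, using only Python's logging module. The logger name "wetlands" is the one used in getting_started.py; ListHandler is a hypothetical helper, and the message below is emitted by hand to simulate what a real run might produce.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted log messages in a list (e.g. to feed a GUI widget)."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(self.format(record))


logger = logging.getLogger("wetlands")  # logger name used in getting_started.py
logger.setLevel(logging.INFO)
handler = ListHandler()
logger.addHandler(handler)

# Simulated message: in a real run, such records would come from Wetlands itself.
logger.info("[[1/4]] Load libraries and model 'cyto'")
print(handler.messages)
```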
Function arguments must be serializable
The arguments passed to the segment function are sent to the worker process via multiprocessing.connection.Connection.send(), so they must be picklable.
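One way to check an argument before sending it is to try pickling it locally. The helper is_picklable below is a hypothetical convenience, not part of Wetlands.

```python
import pickle
from pathlib import Path


def is_picklable(obj) -> bool:
    """Return True if obj survives pickle.dumps(), False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


args = (str(Path("img02.png")), str(Path("img02_segmentation.png")))
print(is_picklable(args))         # plain strings pickle fine
print(is_picklable(lambda x: x))  # lambdas cannot be pickled
```

Passing paths as strings, as the examples on this page do, keeps the arguments trivially serializable.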
5. Clean Up
Finally, we call env.exit() to tell Wetlands to shut down the communication server and clean up the resources associated with the launched environment.
getting_started.py source code
from pathlib import Path
from wetlands.environment_manager import EnvironmentManager
import urllib.request
import logging
def initialize(pip_deps=[]):
    # Initialize the environment manager
    # Wetlands will store logs and state in the wetlands_instance_path (defaults to "wetlands/")
    # Pixi/Micromamba will be installed in wetlands_instance_path/pixi by default
    logging.getLogger("wetlands").addHandler(logging.StreamHandler())
    environment_manager = EnvironmentManager()
    # Create and launch an isolated Conda environment named "cellpose"
    env = environment_manager.create("cellpose", {"conda": ["cellpose==3.1.0"], "pip": pip_deps})
    env.launch()
    # Download example image from cellpose
    image_path = Path("cellpose_img02.png")
    image_url = "https://www.cellpose.org/static/images/img02.png"
    with urllib.request.urlopen(image_url) as response:
        image_data = response.read()
    with open(image_path, "wb") as handler:
        handler.write(image_data)
    return image_path, env
if __name__ == "__main__":
    # Initialize: create the environment manager, the Cellpose conda environment, and download the image to segment
    image_path, env = initialize()
    # Import example_module in the environment
    example_module = env.import_module("example_module.py")
    # example_module is a proxy to example_module.py in the environment;
    # calling example_module.function_name(args) will run env.execute(module_name, function_name, args)
    diameters = example_module.segment(str(image_path))
    # Or use env.execute() directly to call a function in a module
    # diameters = env.execute("example_module.py", "segment", (str(image_path),))
    # Alternatively, use env.run_script() to run an entire Python script
    # env.run_script("script.py", args=(str(image_path),))
    print(f"Found diameters of {diameters} pixels.")
    # Clean up and exit the environment
    env.exit()
Now, let's look at the example_module.py file. This code contains the actual segmentation logic and is executed inside the isolated cellpose_env when called via the proxy object.
Define the Segmentation Function
The module defines a segment function that takes input/output paths and other parameters. It uses a global variable model to potentially cache the loaded Cellpose model between calls within the same environment process lifetime.
# example_module.py
from pathlib import Path
from typing import Any, cast
model = None
def segment(
    input_image: Path | str,
    segmentation: Path | str,
    model_type="cyto",
    use_gpu=False,
    channels=[0, 0],
    auto_diameter=True,
    diameter=30,
):
    """Performs cell segmentation using Cellpose."""
    global model
    input_image = Path(input_image)
    if not input_image.exists():
        raise FileNotFoundError(f"Error: input image {input_image} does not exist.")
Import Dependencies (Inside the Environment)
Crucially, the necessary libraries (cellpose, numpy) are imported inside the function, so they are resolved from the packages installed in the isolated cellpose_env, not from the main script's environment. This lets the main script import example_module.py without raising a ModuleNotFoundError and see the functions it defines. This is only necessary when using the proxy object (env.import_module("example_module.py"), then example_module.function(args)); it is not required when calling env.execute("example_module.py", "function", (args)) directly.
    print(f"[[1/4]] Load libraries and model '{model_type}'")
    import cellpose.models
    import cellpose.io
    import numpy as np
Using try/except to prevent ModuleNotFoundError
A better approach is to use a try statement at the beginning of example_module.py to fail silently when importing modules which are not accessible in the main environment, like so:
try:
    import cellpose.models
    import cellpose.io
    import numpy as np
except ModuleNotFoundError:
    pass
...
This makes it possible to:
- access the function definitions from the main environment (even though they cannot be executed there),
- import the modules once, for all functions defined in example_module.py, when running in the cellpose_env.
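The effect of the guarded import can be sketched in a self-contained way. The package name below is hypothetical and assumed to be missing from the main environment; unlike the snippet above, this sketch binds the name to None instead of using pass, so that the function can fail with an explicit message.

```python
import inspect

try:
    # Hypothetical package name, assumed to be missing from the main environment.
    import package_only_installed_in_worker_env as heavy
except ModuleNotFoundError:
    heavy = None  # the snippet above simply uses `pass`


def segment(input_image, model_type="cyto"):
    """Would use `heavy` when called inside the isolated environment."""
    if heavy is None:
        raise RuntimeError("Dependency missing: call this inside the environment.")
    return heavy.run(input_image, model_type)


# The definition is importable and inspectable even though the dependency is absent.
print(list(inspect.signature(segment).parameters))
```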
Load Model and Process Image
The code proceeds to load the Cellpose model (if not already cached) and the input image. All this happens within the context of the cellpose_env.
    if model is None or model.cp.model_type != model_type:
        print("Loading model...")
        gpu_flag = str(use_gpu).lower() == 'true'
        model = cellpose.models.Cellpose(gpu=gpu_flag, model_type=model_type)
    print(f"[[2/4]] Load image {input_image}")
    image = cast(np.ndarray, cellpose.io.imread(str(input_image)))
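The caching check on the module-level model variable can be illustrated in isolation. This is a sketch with a fake model: FakeModel, get_model and load_count are hypothetical names, and no Cellpose code is involved.

```python
load_count = 0  # counts how many times the (fake) model is loaded
model = None


class FakeModel:
    """Hypothetical stand-in for an expensive-to-load model."""

    def __init__(self, model_type: str) -> None:
        self.model_type = model_type


def get_model(model_type: str = "cyto") -> FakeModel:
    """Reload only when no model is cached or a different type is requested."""
    global model, load_count
    if model is None or model.model_type != model_type:
        load_count += 1
        model = FakeModel(model_type)
    return model


get_model("cyto")
get_model("cyto")    # served from the cache
get_model("nuclei")  # different type: triggers a reload
print(load_count)
```

Because the variable lives at module level, the cache only lasts as long as the worker process that imported the module.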
Perform Segmentation
The core segmentation task is performed using the loaded model and image. Any exceptions raised here will be captured by Wetlands and re-raised in the main script.
    print(f"[[3/4]] Compute segmentation for image shape {image.shape}")
    try:
        kwargs: Any = dict(diameter=int(diameter)) if auto_diameter else {}
        # Keep flows and styles: they are needed when saving or returning the results
        masks, flows, styles, diams = model.eval(image, channels=channels, **kwargs)
    except Exception as e:
        print(f"Error during segmentation: {e}")
        raise e
    print("Segmentation finished (inside environment).")
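To give an idea of how an exception can travel from a worker back to the caller, here is a minimal sketch using a multiprocessing Pipe within a single process. It is not Wetlands' actual implementation; worker_side, main_side and failing_segment are hypothetical names, and the ("ok" / "error", payload) message format is an assumption made for this example.

```python
from multiprocessing import Pipe


def worker_side(conn, func, *args):
    """Run func and send either ("ok", value) or ("error", exception)."""
    try:
        conn.send(("ok", func(*args)))
    except Exception as exc:
        conn.send(("error", exc))


def main_side(conn):
    """Receive the worker's reply; re-raise if it carries an exception."""
    status, payload = conn.recv()
    if status == "error":
        raise payload
    return payload


def failing_segment(path):
    raise FileNotFoundError(f"input image {path} does not exist.")


main_conn, worker_conn = Pipe()
worker_side(worker_conn, failing_segment, "missing.png")
try:
    main_side(main_conn)
except FileNotFoundError as exc:
    print(f"Re-raised in main script: {exc}")
```

The exception object itself is pickled and sent, which is one more reason for remote code to raise standard, picklable exception types.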
Save Results and Return Value
The segmentation results (masks) are saved to disk, potentially renaming the output file. The function then returns the calculated cell diameters (diams). This return value is serialized by Wetlands and sent back to the main script.
    if segmentation is None:  # If segmentation is None: return all results
        return masks, flows, styles, diams
    segmentation_path = Path(segmentation)
    print(f"[[4/4]] Save segmentation to {segmentation_path}")
    cellpose.io.save_masks(image, masks, flows, str(input_image), png=True)
    default_output = input_image.parent / f"{input_image.stem}_cp_masks.png"
    if default_output.exists():
        if segmentation_path.exists():
            segmentation_path.unlink()
        default_output.rename(segmentation_path)
        print(f"Saved mask: {segmentation_path}")
    else:
        print("Warning: Segmentation mask file was not generated by cellpose.")
    return diams
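The renaming step can be tried on its own with temporary files. In this sketch, move_output is a hypothetical helper that mirrors the logic above; the file contents are fake.

```python
import tempfile
from pathlib import Path


def move_output(default_output: Path, target: Path) -> bool:
    """Move default_output to target, overwriting target; False if nothing to move."""
    if not default_output.exists():
        return False
    if target.exists():
        target.unlink()
    default_output.rename(target)
    return True


with tempfile.TemporaryDirectory() as tmp:
    produced = Path(tmp) / "img02_cp_masks.png"
    wanted = Path(tmp) / "img02_segmentation.png"
    produced.write_bytes(b"fake mask data")
    print(move_output(produced, wanted), wanted.exists(), produced.exists())
```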
example_module.py source code
from pathlib import Path
from typing import TYPE_CHECKING, Any
model = None
if TYPE_CHECKING:
    import numpy
def segment_image(
    image: "numpy.ndarray",
    model_type="cyto",
    use_gpu=False,
    channels=[0, 0],
    auto_diameter=True,
    diameter=30,
) -> Any:
    global model
    print("Loading libraries...")
    import cellpose.models  # type: ignore
    if model is None or model.cp.model_type != model_type:
        print(f"Loading model {model_type}...")
        # Accept both booleans and "True"/"true" strings for use_gpu
        gpu_flag = str(use_gpu).lower() == "true"
        model = cellpose.models.Cellpose(gpu=gpu_flag, model_type=model_type)
    print("Compute segmentation...")
    try:
        kwargs: Any = dict(diameter=int(diameter)) if auto_diameter else {}
        masks, flows, styles, diams = model.eval(image, channels=channels, **kwargs)
    except Exception as e:
        print(e)
        raise e
    print("Segmentation finished.")
    return masks, flows, styles, diams
def segment(
    input_image: Path | str,
    model_type="cyto",
    use_gpu=False,
    channels=[0, 0],
    auto_diameter=True,
    diameter=30,
    return_segmentation=False,  # return segmentation or save it in a file
) -> Any:
    global model
    import cellpose.io  # type: ignore
    input_image = Path(input_image)
    image = cellpose.io.imread(input_image)
    masks, flows, styles, diams = segment_image(image, model_type, use_gpu, channels, auto_diameter, diameter)
    # If return_segmentation: return masks, otherwise save them and return diams
    if return_segmentation:
        return masks
    # save results as png
    cellpose.io.save_masks(image, masks, flows, str(input_image), png=True)
    return diams
Summary of Example 1 Flow:
The main script uses EnvironmentManager to prepare an isolated environment. env.launch() starts one or more worker processes inside that environment. env.submit() dispatches work and returns a Task for non-blocking control, while env.import_module() and env.execute() provide blocking shortcuts. env.exit() cleans up all worker processes.
Next Steps
- See Tasks and parallel execution for the full task API: progress, cancellation, events, parallel workers
- See Shared memory to share memory between environments
- See Manual communication for low-level control over environment processes
- See Wetlands logging system
- See Debugging Guide to understand how to debug within environments