Tasks and Parallel Execution
Wetlands provides a task-based API for non-blocking execution, progress reporting, cancellation, and parallel processing across multiple worker processes.
Every call to env.submit() or env.submit_script() returns a Task[T] object that you can monitor, cancel, or wait on.
Task lifecycle
```
PENDING ──(start)──> RUNNING ──(success)──> COMPLETED
                        │
                        ├──(error)────────> FAILED
                        │
                        └──(cancel)───────> CANCELED
```
By default, submit() starts the task immediately (start=True). With start=False, the task stays PENDING until task.start() is called — useful for attaching listeners before execution begins.
You can check whether a task has reached a terminal state with task.status.is_finished().
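The lifecycle can be modeled as a small state machine in which only RUNNING branches and the three right-hand states are terminal. The sketch below is a hypothetical mirror written for illustration. Status, TRANSITIONS, and advance() are not Wetlands names, and the real TaskStatus may differ in detail.

```python
from enum import Enum, auto


class Status(Enum):
    """Hypothetical mirror of the task lifecycle states (not Wetlands' TaskStatus)."""
    PENDING = auto()
    RUNNING = auto()
    COMPLETED = auto()
    FAILED = auto()
    CANCELED = auto()

    def is_finished(self) -> bool:
        # Terminal states: no further transitions are possible.
        return self in {Status.COMPLETED, Status.FAILED, Status.CANCELED}


# Allowed transitions, following the diagram: only RUNNING branches.
TRANSITIONS = {
    Status.PENDING: {Status.RUNNING},
    Status.RUNNING: {Status.COMPLETED, Status.FAILED, Status.CANCELED},
}


def advance(current: Status, target: Status) -> Status:
    """Return target if the transition is allowed, otherwise raise ValueError."""
    if target not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target


state = advance(Status.PENDING, Status.RUNNING)
state = advance(state, Status.COMPLETED)
print(state.name, state.is_finished())  # COMPLETED True
```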
Basic usage
```python
# Submit a function for non-blocking execution
task = env.submit("compute.py", "heavy_computation", args=(data,))

# Do other work while the task runs...
print(f"Status: {task.status}")

# Block for the result when ready
task.wait_for()
print(f"Result: {task.result}")
```
You can also submit scripts with env.submit_script(), which likewise returns a Task[T].
Task properties
Once a task is created, you can inspect its state through these read-only properties:
| Property | Type | Description |
|---|---|---|
| id | str | Unique identifier (UUID4) |
| status | TaskStatus | Current lifecycle state (PENDING, RUNNING, COMPLETED, FAILED, CANCELED) |
| result | T | Return value of the remote function. Raises InvalidStateError if the task is not COMPLETED. |
| error | str \| None | Error message string when FAILED, otherwise None |
| exception | ExecutionException \| None | Exception wrapping the error message and traceback. None unless the task has failed. |
| traceback | list[str] \| None | Traceback lines when FAILED, otherwise None |
| message | str \| None | Latest progress message from update() |
| current | int \| None | Current progress counter from update() |
| maximum | int \| None | Maximum progress counter from update() |
| progress | float \| None | Computed as current / maximum (a float in [0, 1]). None if either value is missing or maximum is 0. |
| outputs | dict[str, Any] | Accumulated named intermediate outputs from set_output() |
| future | Future[T] | Standard concurrent.futures.Future; see concurrent.futures Interop below |
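The progress rule in the table amounts to a short pure function. This is a sketch of the documented formula, not Wetlands' source; compute_progress() is a hypothetical name.

```python
from typing import Optional


def compute_progress(current: Optional[int], maximum: Optional[int]) -> Optional[float]:
    """Hypothetical helper: current / maximum, or None if either is missing or maximum is 0."""
    if current is None or maximum is None or maximum == 0:
        return None
    return current / maximum


print(compute_progress(25, 100))  # 0.25
print(compute_progress(3, 0))     # None
```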
result does not block
Unlike future.result(), accessing task.result never blocks. It returns the value immediately if the task is completed, or raises InvalidStateError otherwise. Use task.wait_for() or await task to wait first.
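The same non-blocking accessor can be built on a plain concurrent.futures.Future by checking done() before touching result(). peek_result() is a hypothetical helper that mirrors the documented semantics, raising concurrent.futures.InvalidStateError (available since Python 3.8) for anything other than a successfully completed future.

```python
import concurrent.futures
from concurrent.futures import Future


def peek_result(future: Future):
    """Hypothetical helper: return the result without blocking, like task.result."""
    # Order matters: exception() must not be called on a cancelled future.
    if not future.done() or future.cancelled() or future.exception() is not None:
        raise concurrent.futures.InvalidStateError("task is not COMPLETED")
    # The future is done and succeeded, so result() returns immediately.
    return future.result()


fut: Future = Future()
try:
    peek_result(fut)
except concurrent.futures.InvalidStateError as exc:
    print(f"not ready: {exc}")

fut.set_result(42)
print(peek_result(fut))  # 42
```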
Progress Reporting
Remote code can report progress by declaring a task parameter in the function signature. Wetlands detects it via inspect.signature() and injects a RemoteTaskHandle automatically.
```python
# remote_module.py: runs inside the isolated environment
def long_computation(data, *, task=None):
    results = []
    for i, item in enumerate(data):
        if task and task.cancel_requested:
            task.cancel()  # acknowledge cancellation
            return None
        if task:
            task.update(f"Processing item {i}", current=i, maximum=len(data))
        results.append(expensive_operation(item))
    return results
```
The RemoteTaskHandle provides:
- task.update(message, current=, maximum=): report progress
- task.set_output(key, value): publish named intermediate outputs (available via task.outputs[key] on the caller side)
- task.cancel_requested: check whether cancellation was requested
- task.cancel(): acknowledge cancellation (transitions the task to CANCELED)
- task.log(message, level=): send log messages to the caller's logging system
Functions without a task parameter work exactly as before
The task parameter is optional. Functions that don't declare it receive no injection and behave identically to a plain execute() call.
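Because injection depends only on the function signature, remote functions can be unit-tested locally by passing a stand-in handle. FakeTaskHandle below is a hypothetical test double that records calls using the method names listed above; it is not part of Wetlands. accepts_task() sketches the inspect.signature() check described earlier, and the squaring step stands in for expensive_operation().

```python
import inspect
from typing import Any, Optional


class FakeTaskHandle:
    """Hypothetical stand-in for RemoteTaskHandle, for local unit tests."""

    def __init__(self, cancel_after: Optional[int] = None):
        self.updates: list = []
        self.outputs: dict[str, Any] = {}
        self.canceled = False
        self._cancel_after = cancel_after

    @property
    def cancel_requested(self) -> bool:
        # Simulate a cancel request arriving after N progress updates.
        return self._cancel_after is not None and len(self.updates) >= self._cancel_after

    def update(self, message, current=None, maximum=None):
        self.updates.append((message, current, maximum))

    def set_output(self, key, value):
        self.outputs[key] = value

    def cancel(self):
        self.canceled = True

    def log(self, message, level=None):
        print(f"[{level}] {message}")


def accepts_task(fn) -> bool:
    """Hypothetical check: does fn declare a parameter named task?"""
    return "task" in inspect.signature(fn).parameters


def long_computation(data, *, task=None):
    results = []
    for i, item in enumerate(data):
        if task and task.cancel_requested:
            task.cancel()  # acknowledge cancellation
            return None
        if task:
            task.update(f"Processing item {i}", current=i, maximum=len(data))
        results.append(item * item)  # stand-in for expensive_operation(item)
    return results


handle = FakeTaskHandle()
kwargs = {"task": handle} if accepts_task(long_computation) else {}
result = long_computation([1, 2, 3], **kwargs)
print(result)             # [1, 4, 9]
print(len(handle.updates))  # 3
```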
Event Listeners
On the caller side, you can observe task events by registering a listener:
```python
from wetlands.task import TaskEventType

def on_event(event):
    match event.type:
        case TaskEventType.UPDATE:
            t = event.task
            # progress is None until both current and maximum are reported
            print(f"[{(t.progress or 0.0):.0%}] {t.message}")
        case TaskEventType.COMPLETION:
            print(f"Done: {event.task.result}")
        case TaskEventType.FAILURE:
            print(f"Failed: {event.task.error}")

# start=False so the listener is registered before dispatch
task = env.submit("remote_module.py", "long_computation",
                  args=(dataset,), start=False)
task.listen(on_event).start()
task.wait_for()
```
Terminal events (COMPLETION, FAILURE, CANCELATION) are replayed to late listeners, so attaching a listener after the task finishes still delivers the final outcome. Progress updates are transient and not replayed.
A listener can be removed with task.remove_listener(callback).
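The replay rule (terminal events reach late listeners, progress updates do not) can be sketched with a small event registry. EventHub and its string event names are hypothetical simplifications; Wetlands' own implementation may differ, for example in how it locks and which thread runs callbacks.

```python
import threading
from typing import Callable, Optional

TERMINAL = {"COMPLETION", "FAILURE", "CANCELATION"}


class EventHub:
    """Hypothetical listener registry that replays only the terminal event."""

    def __init__(self):
        self._listeners: list = []
        self._terminal: Optional[str] = None
        self._lock = threading.Lock()

    def listen(self, callback: Callable[[str], None]) -> "EventHub":
        with self._lock:
            self._listeners.append(callback)
            terminal = self._terminal
        if terminal is not None:
            callback(terminal)  # late listener: replay the final outcome only
        return self  # allow chaining, similar to task.listen(...).start()

    def remove_listener(self, callback: Callable[[str], None]) -> None:
        with self._lock:
            self._listeners.remove(callback)

    def emit(self, event_type: str) -> None:
        with self._lock:
            if event_type in TERMINAL:
                self._terminal = event_type  # remembered for replay
            listeners = list(self._listeners)  # snapshot taken under the lock
        for callback in listeners:
            callback(event_type)


early, late = [], []
hub = EventHub().listen(early.append)
hub.emit("UPDATE")      # transient: delivered now, never replayed
hub.emit("COMPLETION")  # terminal: stored for late listeners
hub.listen(late.append)
print(early)  # ['UPDATE', 'COMPLETION']
print(late)   # ['COMPLETION']
```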
Each TaskEvent has two fields:
| Field | Type | Description |
|---|---|---|
| task | Task | The task that emitted the event |
| type | TaskEventType | The kind of event |
Event types:
| Event | Meaning |
|---|---|
| STARTED | Task has been dispatched to a worker |
| UPDATE | Progress or intermediate output from the remote side |
| COMPLETION | Task finished successfully |
| FAILURE | Task raised an exception |
| CANCELATION | Task was canceled cooperatively |
Cancellation
Cancellation is cooperative. Calling task.cancel() on the caller side only sets a flag; the remote code sees it through task.cancel_requested and must acknowledge it by calling cancel() on its RemoteTaskHandle.
```python
task = env.submit("simulation.py", "run_simulation", args=(params,))
# ... later ...
task.cancel()
task.wait_for()
print(f"Final status: {task.status.name}")  # COMPLETED or CANCELED
```
If the remote function returns normally after a cancel request without acknowledging, the result is delivered as a normal COMPLETION.
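Cooperative cancellation follows the same pattern as a shared flag polled by a worker thread. The sketch below uses threading.Event locally as an analogy only; Wetlands crosses a process boundary, and run_simulation() and its string results are hypothetical.

```python
import threading
import time


def run_simulation(steps: int, cancel_requested: threading.Event) -> str:
    """Hypothetical worker loop that polls the flag between steps."""
    for _ in range(steps):
        if cancel_requested.is_set():
            return "CANCELED"  # flag observed: acknowledge and stop early
        time.sleep(0.01)  # one unit of simulated work
    return "COMPLETED"  # flag never observed: normal completion


flag = threading.Event()
outcome: list = []
worker = threading.Thread(target=lambda: outcome.append(run_simulation(1000, flag)))
worker.start()
time.sleep(0.05)
flag.set()  # request cancellation; the worker decides when to stop
worker.join()
print(outcome[0])  # CANCELED
```

A worker that never checks the flag simply runs to the end and returns "COMPLETED", which matches the unacknowledged case described above.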
Waiting and Timeouts
task.wait_for() blocks until the task reaches a terminal state. If the optional timeout (in seconds) elapses first, it raises TimeoutError but does not cancel the task:
```python
try:
    task.wait_for(timeout=30)
except TimeoutError:
    print("Still running, deciding whether to cancel...")
    task.cancel()
    task.wait_for()
```
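A plain concurrent.futures.Future behaves the same way and makes a convenient stdlib analogy: result(timeout=...) raises a timeout error, but the submitted work keeps running. This sketch uses ThreadPoolExecutor only and does not involve Wetlands; slow_job() is hypothetical.

```python
import concurrent.futures
import time


def slow_job() -> str:
    time.sleep(0.3)  # hypothetical long-running work
    return "done"


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_job)
    try:
        future.result(timeout=0.05)
    except concurrent.futures.TimeoutError:
        # Only the wait gave up; the job itself is untouched.
        print("timed out, still running:", future.running())
    print(future.result())  # waits for the job to finish: done
```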
Error Handling
When a task fails, you can inspect the error in several ways:
```python
task.wait_for()
if task.status == TaskStatus.FAILED:
    print(task.error)      # error message string
    print(task.traceback)  # list of traceback lines
    print(task.exception)  # ExecutionException wrapping both
    # Or via the underlying Future:
    print(task.future.exception())  # same ExecutionException
```
The exception property returns an ExecutionException that carries both the error message (.exception) and the traceback lines (.traceback).
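A remote failure can only cross the process boundary as data, which is why it is exposed both as a message string and as traceback lines. The sketch below captures both with the standard traceback module and rebuilds an exception on the caller side. RemoteError and capture_failure() are hypothetical names chosen to mirror the .exception and .traceback attributes; this is not Wetlands' ExecutionException.

```python
import traceback


class RemoteError(Exception):
    """Hypothetical carrier for an error message plus traceback lines."""

    def __init__(self, exception: str, traceback_lines: list):
        super().__init__(exception)
        self.exception = exception
        self.traceback = traceback_lines


def capture_failure(fn, *args):
    """Hypothetical worker-side helper: run fn, return (message, lines) on failure."""
    try:
        fn(*args)
    except Exception as exc:
        lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
        return str(exc), lines
    return None


def divide(a, b):
    return a / b


message, lines = capture_failure(divide, 1, 0)
err = RemoteError(message, lines)  # rebuilt on the caller side
print(err.exception)
print(err.traceback[-1], end="")
```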
Context Managers
Tasks can be used as context managers for automatic cancellation on early exit:
```python
with env.submit("training.py", "train_model", args=(config,)) as task:
    task.wait_for(timeout=300)
    # If we exit the block early (exception, timeout, etc.),
    # the task is automatically canceled and awaited.
```
Entering the context auto-starts a PENDING task. Exiting cancels and waits if the task is still running.
Async context managers are also supported: inside a coroutine, use async with in place of with.
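The enter and exit rules can be summarized as a small context manager. ManagedJob is a hypothetical stand-in that records calls instead of talking to a worker, and it assumes cancellation is acknowledged immediately; it only illustrates the documented behavior.

```python
class ManagedJob:
    """Hypothetical task stand-in that mirrors the documented with-block rules."""

    def __init__(self):
        self.status = "PENDING"
        self.calls: list = []

    def start(self):
        self.calls.append("start")
        self.status = "RUNNING"

    def cancel(self):
        self.calls.append("cancel")
        self.status = "CANCELED"  # assumption: the worker acknowledges at once

    def wait_for(self):
        self.calls.append("wait_for")

    def __enter__(self):
        if self.status == "PENDING":
            self.start()  # entering auto-starts a pending task
        return self

    def __exit__(self, exc_type, exc, tb):
        if self.status not in {"COMPLETED", "FAILED", "CANCELED"}:
            self.cancel()    # still running on exit: cancel...
            self.wait_for()  # ...and wait for the terminal state
        return False  # never swallow the exception that ended the block


job = ManagedJob()
try:
    with job:
        raise TimeoutError("gave up waiting")
except TimeoutError:
    pass
print(job.calls)  # ['start', 'cancel', 'wait_for']
```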
concurrent.futures Interop
Each task wraps a standard concurrent.futures.Future[T], making it easy to integrate with existing concurrent code:
```python
import concurrent.futures

t1 = env_a.submit("segment.py", "segment", args=(image,))
t2 = env_b.submit("segment.py", "segment", args=(image,))
concurrent.futures.wait([t1.future, t2.future])
print(t1.result, t2.result)
```
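For a one-liner, block on the underlying Future directly: env.submit("compute.py", "fibonacci", args=(50,)).future.result() returns the result, or raises the ExecutionException if the task failed.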
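Since task.future is an ordinary concurrent.futures.Future, the module's other helpers apply too, such as as_completed() for handling results in finishing order. This runnable sketch uses ThreadPoolExecutor futures as stand-ins for t1.future and t2.future; segment() here is a hypothetical placeholder.

```python
import concurrent.futures
import time


def segment(name: str, delay: float) -> str:
    time.sleep(delay)  # hypothetical stand-in for remote work of varying length
    return f"{name}: segmented"


with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {
        pool.submit(segment, "env_a", 0.2): "env_a",
        pool.submit(segment, "env_b", 0.05): "env_b",
    }
    finished = []
    for fut in concurrent.futures.as_completed(futures):
        finished.append(futures[fut])
        print(fut.result())

print(finished)  # finishing order, not submission order
```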
Async/Await
Tasks are natively awaitable:
```python
import asyncio

async def main():
    result = await env.submit("compute.py", "fibonacci", args=(50,))
    print(f"Result: {result}")

asyncio.run(main())
```
You can also iterate over events asynchronously:
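```python
import asyncio
from wetlands.task import TaskEventType

async def monitor(task):
    async for event in task.events():
        match event.type:
            case TaskEventType.UPDATE:
                print(f"Progress: {(event.task.progress or 0.0):.0%}")
            case TaskEventType.COMPLETION:
                print(f"Result: {event.task.result}")

async def main():
    # await is only valid inside a coroutine
    task = env.submit("compute.py", "long_work", args=(data,))
    await monitor(task)

asyncio.run(main())
```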
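One plausible way for a Future-backed object to become awaitable is to bridge it with asyncio.wrap_future() inside __await__. The sketch assumes that design purely for illustration: AwaitableJob is hypothetical and not taken from Wetlands' source, and the future comes from a ThreadPoolExecutor rather than a worker process.

```python
import asyncio
import concurrent.futures
from concurrent.futures import Future


class AwaitableJob:
    """Hypothetical wrapper: awaiting it yields the wrapped Future's result."""

    def __init__(self, future: Future):
        self.future = future

    def __await__(self):
        # wrap_future adapts a concurrent.futures.Future to the running event loop.
        return asyncio.wrap_future(self.future).__await__()


def fibonacci(n: int) -> int:
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


async def main(pool: concurrent.futures.Executor) -> int:
    job = AwaitableJob(pool.submit(fibonacci, 50))
    return await job


with concurrent.futures.ThreadPoolExecutor() as pool:
    result = asyncio.run(main(pool))
print(result)  # 12586269025
```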
Parallel Execution
When max_workers > 1 is passed to launch(), Wetlands starts multiple worker processes all sharing the same Conda environment on disk. This provides true process-level parallelism with no environment duplication.
```
                                ┌─ worker 0 (pid 1001) ─ port 5001
env.launch(max_workers=4) ──────├─ worker 1 (pid 1002) ─ port 5002
(one conda env on disk)         ├─ worker 2 (pid 1003) ─ port 5003
                                └─ worker 3 (pid 1004) ─ port 5004
```
Tasks are dispatched to idle workers automatically. When all workers are busy, tasks queue internally and are dispatched as workers become available. The user never sees or manages individual workers.
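The dispatch behavior described above is the classic shared work-queue pattern: pending items wait in one queue, and each worker takes the next one as soon as it is idle. The sketch below shows the pattern with threads and queue.Queue; Wetlands uses worker processes and its own dispatcher, and run_pool() is a hypothetical name.

```python
import queue
import threading


def run_pool(items, work, max_workers: int = 4) -> dict:
    """Hypothetical dispatcher: idle workers pull items from one shared queue."""
    jobs: queue.Queue = queue.Queue()
    for index, item in enumerate(items):
        jobs.put((index, item))  # queued until some worker is free

    results: dict = {}
    lock = threading.Lock()

    def worker(worker_id: int) -> None:
        while True:
            try:
                index, item = jobs.get_nowait()  # take the next pending item
            except queue.Empty:
                return  # queue drained: this worker exits
            value = work(item)
            with lock:
                results[index] = (worker_id, value)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(max_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


results = run_pool(range(10), lambda x: x * x, max_workers=4)
print(sorted(value for _, value in results.values()))
```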
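You can assign specific environment variables per worker with worker_env, a callable that receives the worker index and returns a dict. For example, worker_env=lambda i: {"CUDA_VISIBLE_DEVICES": str(i)} gives each worker its own GPU.
To detect hung workers that stop responding, set an inactivity timeout in seconds with worker_timeout (for example, worker_timeout=600). Worker Health Monitoring below describes what happens when it expires.
You can check how many workers are currently alive with the worker_count property.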
map() — batch execution
env.map() distributes work across workers and yields results, similar to concurrent.futures.Executor.map():
```python
env.launch(max_workers=4)
images = load_images("data/")
results = list(env.map("segment.py", "segment", images))
print(f"Segmented {len(results)} images")
```
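Pass ordered=False to yield results as they complete rather than in input order, which is faster when items have varying processing times; for example, list(env.map("segment.py", "segment", images, ordered=False)).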
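The ordered/unordered trade-off mirrors the standard library: Executor.map() yields in input order, while as_completed() yields in finishing order, so one slow item cannot hold back the rest. This sketch uses ThreadPoolExecutor purely to illustrate the difference; process() and its delays are hypothetical.

```python
import concurrent.futures
import time


def process(delay: float) -> float:
    time.sleep(delay)  # hypothetical items that take different amounts of time
    return delay


delays = [0.3, 0.1, 0.2]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
    # Ordered: results follow the input order, even though 0.3 finishes last.
    ordered = list(pool.map(process, delays))

    # Unordered: each result is yielded as soon as its item finishes.
    futures = [pool.submit(process, d) for d in delays]
    unordered = [f.result() for f in concurrent.futures.as_completed(futures)]

print(ordered)    # [0.3, 0.1, 0.2]
print(unordered)  # completion order rather than input order
```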
map_tasks() — batch execution with full Task control
env.map_tasks() returns a list of Task objects for when you need progress reporting or cancellation on individual items:
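```python
from wetlands.task import TaskEventType

def show_progress(event):
    if event.type == TaskEventType.UPDATE:  # progress may be None until reported
        print(f"[{(event.task.progress or 0.0):.0%}] {event.task.message}")

tasks = env.map_tasks("segment.py", "segment", images)
for task in tasks:
    task.listen(show_progress)
for task in tasks:
    task.wait_for()
```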
Worker Health Monitoring
Wetlands runs a background health monitor thread for each launched environment. Every few seconds, the monitor checks each worker that has an active task:
- Dead worker detection: If a worker process has exited unexpectedly (segfault, OOM kill, etc.), the monitor fails the active task, removes the worker from the pool, and launches a replacement.
- Inactivity timeout: If worker_timeout is set and a worker has not sent any IPC message (result, progress update, log) within that duration, it is treated as hung. The monitor fails the active task, kills and removes the worker, and launches a replacement.
Replacement workers are started transparently with the same configuration (environment variables, activation commands) as the original. Queued tasks waiting for a worker are dispatched to the replacement as soon as it is ready.
```python
env.launch(
    max_workers=4,
    worker_env=lambda i: {"CUDA_VISIBLE_DEVICES": str(i)},
    worker_timeout=600,  # 10-minute inactivity timeout
)
# Workers that crash or hang are replaced automatically.
# The failed task receives a FAILED status with a descriptive error message.
results = list(env.map("process.py", "run", items))
```
The health monitor stops when env.exit() is called. Any tasks still queued at shutdown are failed with a descriptive error message.
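The inactivity rule reduces to comparing each busy worker's last-message time with worker_timeout. find_hung_workers() below is a hypothetical sketch of that check alone (no process management), assuming the caller supplies a map of last-message timestamps and the set of workers with an active task.

```python
import time
from typing import Optional


def find_hung_workers(
    last_message_at: dict,
    busy: set,
    worker_timeout: Optional[float],
    now: Optional[float] = None,
) -> list:
    """Hypothetical check: busy workers silent for longer than worker_timeout seconds."""
    if worker_timeout is None:
        return []  # inactivity detection is disabled unless a timeout is set
    now = time.monotonic() if now is None else now
    # Only workers with an active task are checked; idle workers are skipped.
    return sorted(wid for wid in busy if now - last_message_at[wid] > worker_timeout)


# Worker 0 spoke 5 s ago, worker 1 has been silent for 700 s, worker 2 is idle.
seen = {0: 995.0, 1: 300.0, 2: 0.0}
print(find_hung_workers(seen, busy={0, 1}, worker_timeout=600, now=1000.0))  # [1]
```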
GUI Integration
Tasks integrate naturally with GUI frameworks. Since task events are delivered from background threads, use thread-safe mechanisms to update the UI:
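```python
from PyQt6.QtCore import pyqtSignal, QObject
from wetlands.task import TaskEventType

class TaskBridge(QObject):
    # Emitted from a background thread; receivers that live in the GUI
    # thread get these signals through Qt's queued connections.
    progress = pyqtSignal(int)  # percentage (0-100) for QProgressBar
    message = pyqtSignal(str)
    completed = pyqtSignal(object)
    failed = pyqtSignal(str)

bridge = TaskBridge()  # create in the GUI thread
bridge.progress.connect(progress_bar.setValue)  # setValue() expects an int
bridge.message.connect(progress_bar.setFormat)  # show the latest message
bridge.completed.connect(on_result)

def on_event(event):
    match event.type:
        case TaskEventType.UPDATE:
            bridge.progress.emit(round((event.task.progress or 0.0) * 100))
            bridge.message.emit(event.task.message or "")
        case TaskEventType.COMPLETION:
            bridge.completed.emit(event.task.result)
        case TaskEventType.FAILURE:
            bridge.failed.emit(event.task.error or "Unknown error")

task = env.submit("segment.py", "segment_image", args=(image,), start=False)
task.listen(on_event).start()
```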