Process logger
ProcessLogger handles non-blocking stdout reading from subprocesses with log context tracking.
ProcessLogger
Reads subprocess stdout in a background thread and emits logs with context metadata.
This solves the problem of multiple threads competing for process.stdout and enables real-time log emission with attached context (log_source, env_name, etc.).
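The background-reader idea described above can be sketched with the standard library alone. This is a minimal illustration, not the wetlands implementation: the helper name `read_stdout_in_background` and the child command are hypothetical and chosen only for the example.

```python
import subprocess
import sys
import threading


def read_stdout_in_background(process, on_line):
    """Start a daemon thread that forwards each stdout line to on_line."""

    def _pump():
        # Iterating over the pipe yields one line at a time until EOF,
        # so a single thread owns process.stdout.
        for raw_line in process.stdout:
            on_line(raw_line.rstrip("\n"))
        process.stdout.close()

    thread = threading.Thread(target=_pump, daemon=True)
    thread.start()
    return thread


# Hypothetical child process that prints two lines and exits.
child = subprocess.Popen(
    [sys.executable, "-c", "print('starting'); print('ready')"],
    stdout=subprocess.PIPE,
    text=True,
)
collected = []
reader = read_stdout_in_background(child, collected.append)
reader.join(timeout=30)  # joined here only so the result can be inspected
child.wait()
```

Because only the reader thread touches `process.stdout`, other threads consume lines through the callback instead of competing for the pipe.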
Usage
process = subprocess.Popen([...], stdout=subprocess.PIPE, text=True)
logger = ProcessLogger(process, log_context={"log_source": "environment", "env_name": "cellpose"})
logger.subscribe(my_callback)
logger.start_reading()
Methods:
| Name | Description |
|---|---|
| __init__ | Initialize ProcessLogger. |
| subscribe | Register a callback to be notified of each log line. |
| update_log_context | Update log context with thread safety. |
| start_reading | Start reading process stdout in a background daemon thread. |
| get_output | Get all accumulated output lines read so far. |
| wait_for_line | Wait for a line matching predicate and return it. |
Source code in wetlands/_internal/process_logger.py
__init__(process, log_context, base_logger)
Initialize ProcessLogger.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| process | Popen | The subprocess.Popen instance to read from | required |
| log_context | dict[str, Any] | Dictionary of context to attach to all logs (log_source, env_name, stage, etc.) | required |
| base_logger | LoggerAdapter | The logger instance to emit logs to | required |
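One way context can travel with each log record is the standard `logging.LoggerAdapter`, which is the annotated type of `base_logger`. The sketch below uses only the standard library and assumes nothing about how ProcessLogger applies the context internally. The logger name `process_logger_demo` and the `ListHandler` class are hypothetical.

```python
import logging


class ListHandler(logging.Handler):
    """Collects records in memory so the attached context can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


demo_logger = logging.getLogger("process_logger_demo")  # hypothetical name
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo output out of the root logger
handler = ListHandler()
demo_logger.addHandler(handler)

# The adapter passes its dict as `extra`, so each key becomes a record attribute.
adapter = logging.LoggerAdapter(
    demo_logger, {"log_source": "environment", "env_name": "cellpose"}
)
adapter.info("Listening port 12345")
record = handler.records[0]
```

A handler or formatter can then read `record.log_source` and `record.env_name` to route or label the line.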
subscribe(callback, include_history=True)
Register a callback to be notified of each log line.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[str, dict], None] | Function with signature callback(line: str, context: dict), called for each log line | required |
| include_history | bool | Whether to run the callback on all lines the process has already produced (True), or only on future lines (False) | True |
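The effect of `include_history` can be illustrated with a small stand-in class. This is a sketch of the replay semantics described in the table, not the actual ProcessLogger code. `LineBroadcaster` and `publish` are hypothetical names.

```python
import threading


class LineBroadcaster:
    """Illustrative stand-in, not the wetlands ProcessLogger class."""

    def __init__(self):
        self._lock = threading.Lock()
        self._history = []    # (line, context) pairs seen so far
        self._callbacks = []

    def subscribe(self, callback, include_history=True):
        with self._lock:
            replay = list(self._history) if include_history else []
            self._callbacks.append(callback)
        # Callbacks run outside the lock so a slow callback cannot block publish.
        for line, context in replay:
            callback(line, context)

    def publish(self, line, context):
        with self._lock:
            self._history.append((line, context))
            callbacks = list(self._callbacks)
        for callback in callbacks:
            callback(line, context)


bus = LineBroadcaster()
ctx = {"log_source": "environment"}
bus.publish("first", ctx)

with_history, future_only = [], []
bus.subscribe(lambda line, context: with_history.append(line))
bus.subscribe(lambda line, context: future_only.append(line), include_history=False)
bus.publish("second", ctx)
```

The first subscriber receives the replayed line and the new one; the second receives only the line published after it subscribed.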
update_log_context(context_update)
Update log context with thread safety.
Useful for dynamically updating context during execution (e.g., changing call_target).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context_update | dict[str, Any] | Dictionary with keys to update in log_context | required |
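A thread-safe context update is commonly done by guarding the dictionary with a lock. The following is a minimal sketch of that pattern, assuming a plain `threading.Lock`. `ContextHolder`, `snapshot`, and the `"segment"` value are hypothetical and not taken from the wetlands source.

```python
import threading


class ContextHolder:
    """Illustrative stand-in showing a lock-guarded context dictionary."""

    def __init__(self, log_context):
        self._lock = threading.Lock()
        self._log_context = dict(log_context)

    def update_log_context(self, context_update):
        # Merge the new keys while holding the lock so a reader thread
        # never sees a partially updated dictionary.
        with self._lock:
            self._log_context.update(context_update)

    def snapshot(self):
        # Return a copy so callers cannot mutate the shared state.
        with self._lock:
            return dict(self._log_context)


holder = ContextHolder({"log_source": "environment", "env_name": "cellpose"})
holder.update_log_context({"call_target": "segment"})  # hypothetical value
snapshot = holder.snapshot()
```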
start_reading()
Start reading process stdout in a background daemon thread.
get_output()
Get all accumulated output lines read so far.
Returns:
| Type | Description |
|---|---|
| list[str] | List of output lines (may be incomplete if process still running) |
wait_for_line(predicate, timeout=None, include_history=True)
Wait for a line matching predicate and return it.
Useful for parsing lines like "Listening port 12345" during env startup.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| predicate | Callable[[str], bool] | Function that takes a line and returns True if it's the line we're waiting for | required |
| timeout | Optional[float] | Maximum seconds to wait (None = wait forever) | None |
| include_history | bool | Whether to also test the predicate against lines the process has already produced (True), or only against future lines (False) | True |
Returns:
| Type | Description |
|---|---|
| Optional[str] | The first line matching predicate, or None if timeout occurs |
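Waiting for a matching line with a timeout can be built on `threading.Condition`. The sketch below mirrors the documented signature but is an assumption about one possible approach, not the wetlands implementation. `LineWaiter` and `add_line` are hypothetical names.

```python
import re
import threading
import time


class LineWaiter:
    """Illustrative stand-in, not the wetlands ProcessLogger class."""

    def __init__(self):
        self._cond = threading.Condition()
        self._lines = []

    def add_line(self, line):
        # Called by the reader thread for every new line.
        with self._cond:
            self._lines.append(line)
            self._cond.notify_all()

    def wait_for_line(self, predicate, timeout=None, include_history=True):
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._cond:
            # Start at the beginning of the history, or skip past it.
            index = 0 if include_history else len(self._lines)
            while True:
                while index < len(self._lines):
                    line = self._lines[index]
                    index += 1
                    if predicate(line):
                        return line
                remaining = None if deadline is None else deadline - time.monotonic()
                if remaining is not None and remaining <= 0:
                    return None
                self._cond.wait(remaining)


waiter = LineWaiter()
# Simulate the process printing its port shortly after startup.
threading.Timer(0.05, waiter.add_line, args=["Listening port 12345"]).start()
matched = waiter.wait_for_line(lambda line: line.startswith("Listening port"), timeout=5.0)
port = int(re.search(r"\d+", matched).group())
missed = waiter.wait_for_line(lambda line: "never printed" in line, timeout=0.05)
```

The deadline is computed once with `time.monotonic()`, so spurious wakeups or non-matching lines do not extend the total wait.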