
multi

multifutures.multiprocess

multiprocess(
    func: collections.abc.Callable[..., typing.Any],
    func_kwargs: collections.abc.Collection[dict[str, typing.Any]],
    *,
    max_workers: int | None = None,
    executor: multifutures._common.ExecutorProtocol | None = None,
    check: bool = False,
    include_kwargs: bool = True,
    progress_bar: multifutures._common.ProgressBarProtocol | bool | None = True
) -> list[multifutures._multi.FutureResult]

Call func over all the items of func_kwargs using a multiprocessing Pool and return a list of FutureResult objects.

The pool by default is an instance of loky.ProcessPoolExecutor, or, if loky is not installed, an instance of concurrent.futures.ProcessPoolExecutor using the spawn multiprocessing context.

The pool size can be limited with max_workers. If more control is needed, e.g. to use a different multiprocessing context, then the caller should directly pass a ProcessPoolExecutor instance to executor. For example:

# Customize executor
mp_context = multiprocessing.get_context("forkserver")
executor = concurrent.futures.ProcessPoolExecutor(max_workers=4, mp_context=mp_context)
multiprocess(func, func_kwargs, executor=executor)
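
If loky is installed, its reusable executor (which the default behaviour relies on) can also be configured and passed in explicitly. A minimal sketch, assuming loky's get_reusable_executor:

# Reuse loky's executor (sketch; assumes loky is installed)
executor = loky.get_reusable_executor(max_workers=4)
multiprocess(func, func_kwargs, executor=executor)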

The progress of the pool's workers can be monitored with a tqdm-based progress bar, which is displayed by default. Pass progress_bar=False to disable it. To customize it further, create a tqdm.tqdm instance and pass it as the progress_bar argument. For example:

# Run without a progress bar
results = multiprocess(func, func_kwargs, progress_bar=False)

# Run with a rich-based progress bar
rich_based_progress_bar = tqdm.rich.tqdm(func_kwargs)
results = multiprocess(func, func_kwargs, progress_bar=rich_based_progress_bar)
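
Putting it together, a minimal end-to-end sketch; the double helper and its inputs are hypothetical, and with the standard library's spawn-based pool the function must be importable (defined at module level), while loky can typically also handle lambdas:

# End-to-end sketch with a hypothetical, module-level helper
import multifutures

def double(x: int) -> int:
    return 2 * x

if __name__ == "__main__":
    func_kwargs = [{"x": i} for i in range(10)]
    results = multifutures.multiprocess(double, func_kwargs)
    outputs = [r.result for r in results if r.exception is None]
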
PARAMETER DESCRIPTION
func

The function that will be scheduled for execution via multiprocessing.

TYPE: collections.abc.Callable[..., typing.Any]

func_kwargs

A Collection of dictionaries that will be used for the calls of func.

TYPE: collections.abc.Collection[dict[str, typing.Any]]

executor

A multiprocessing Executor instance. Defaults to loky.get_reusable_executor(), which uses the loky multiprocessing context (more info). If loky is not installed, it defaults to the standard library's ProcessPoolExecutor with the spawn multiprocessing start method (more info).

TYPE: multifutures._common.ExecutorProtocol | None DEFAULT: None

check

If True and any of the func calls raised an exception, an ExceptionGroup containing all the exceptions will be raised.

TYPE: bool DEFAULT: False

include_kwargs

A boolean flag indicating whether the keyword arguments of the function calls will be present in the FutureResult objects. Setting this to False is useful for keeping memory usage down when the inputs are "heavy" objects like xarray.Dataset etc.

TYPE: bool DEFAULT: True

progress_bar

An instance of a progress bar implementing the tqdm API. Defaults to tqdm.auto.tqdm.

TYPE: multifutures._common.ProgressBarProtocol | bool | None DEFAULT: True

RETURNS DESCRIPTION
list[multifutures._multi.FutureResult]

A list of FutureResult objects. Each object will contain the output of func(**func_kwargs[i]) or an Exception.

RAISES DESCRIPTION
exceptiongroup.ExceptionGroup

If any of the function calls raises an exception and check is True.
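
A sketch of catching that group; the parse helper and its inputs are hypothetical:

# Sketch: with check=True, all failures surface as a single ExceptionGroup
import exceptiongroup
import multifutures

def parse(text: str) -> int:
    return int(text)

if __name__ == "__main__":
    func_kwargs = [{"text": "1"}, {"text": "not-a-number"}]
    try:
        multifutures.multiprocess(parse, func_kwargs, check=True)
    except exceptiongroup.ExceptionGroup as eg:
        for exc in eg.exceptions:
            print(type(exc).__name__, exc)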

Source code in multifutures/_multi.py
def multiprocess(
    func: abc.Callable[..., T.Any],
    func_kwargs: abc.Collection[dict[str, T.Any]],
    *,
    max_workers: int | None = None,
    executor: ExecutorProtocol | None = None,
    check: bool = False,
    include_kwargs: bool = True,
    progress_bar: ProgressBarProtocol | bool | None = True,
) -> list[FutureResult]:
    """
    Call `func` over all the items of `func_kwargs` using a multiprocessing Pool
    and return a list of [FutureResult][multifutures.FutureResult] objects.

    The pool by default is an instance of [loky.ProcessPoolExecutor](https://loky.readthedocs.io/en/stable/),
    or, if `loky` is not installed, an instance of
    [concurrent.futures.ProcessPoolExecutor][concurrent.futures.ProcessPoolExecutor]
    using the `spawn` multiprocessing context.

    The pool size can be limited with `max_workers`. If more control is needed, e.g. to use a different
    multiprocessing context, then the caller should directly pass a `ProcessPoolExecutor`
    instance to `executor`. For example:

    ``` python
    # Customize executor
    mp_context = multiprocessing.get_context("forkserver")
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=4, mp_context=mp_context)
    multiprocess(func, func_kwargs, executor=executor)
    ```

    The progress of the pool's workers can be monitored by using a
    [tqdm](https://tqdm.github.io/) based progress bar.
    The progress bar is displayed by default.
    Disabling the progress bar is possible by passing `progress_bar=False`.
    The progress bar can be further customized by creating a `tqdm.tqdm`
    instance and passing it as an argument to `progress_bar`. For example:

    ``` python
    # Run without a progress bar
    results = multiprocess(func, func_kwargs, progress_bar=False)

    # Run with a rich based progress bar
    rich_based_progress_bar = tqdm.rich.tqdm(func_kwargs)
    results = multiprocess(func, func_kwargs, progress_bar=rich_based_progress_bar)
    ```

    Arguments:
        func: The function that will be scheduled for execution via multiprocessing.
        func_kwargs: A Collection of dictionaries that will be used for the calls of `func`.
        executor: A multiprocessing [Executor][concurrent.futures.Executor] instance.
            Defaults to [loky.get_reusable_executor()][loky.get_reusable_executor],
            which uses the `loky` multiprocessing context
            ([more info](https://loky.readthedocs.io/en/stable/API.html#processes-start-methods-in-loky)).
            If [loky][loky] is not installed, it defaults to Standard Library's
            [ProcessPoolExecutor][concurrent.futures.ProcessPoolExecutor] with the `spawn` multiprocessing
            start method ([more info](https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods))
        check: If `True` and any of the `func` calls raised an exception, an
            [ExceptionGroup][ExceptionGroup] containing all the exceptions will be raised.
        include_kwargs: A boolean flag indicating whether the keyword arguments of the
            function calls will be present in the [FutureResult][multifutures.FutureResult]
            objects. Setting this to `False` is useful for keeping memory usage down when
            the inputs are "heavy" objects like [xarray.Dataset][xarray.Dataset] etc.
        progress_bar: An instance of a progress bar implementing the `tqdm` API. Defaults to
            `tqdm.auto.tqdm`.

    Returns:
        A list of [FutureResult][multifutures.FutureResult] objects. Each object
            will contain the output of `func(**func_kwargs[i])` or an `Exception`.

    Raises:
        exceptiongroup.ExceptionGroup: If any of the function calls raises an exception and `check` is True.

    """
    progress_bar = _resolve_progress_bar(progress_bar, func_kwargs)
    executor = _resolve_multiprocess_executor(executor=executor, max_workers=max_workers)
    results = _multi(
        executor=executor,
        func=func,
        func_kwargs=func_kwargs,
        include_kwargs=include_kwargs,
        progress_bar=progress_bar,
        check=check,
    )
    return results

multifutures.multithread

multithread(
    func: collections.abc.Callable[..., typing.Any],
    func_kwargs: collections.abc.Collection[dict[str, typing.Any]],
    *,
    max_workers: int | None = None,
    executor: multifutures._common.ExecutorProtocol | None = None,
    check: bool = False,
    include_kwargs: bool = True,
    progress_bar: multifutures._common.ProgressBarProtocol | bool | None = True
) -> list[multifutures._multi.FutureResult]

Call func over all the items of func_kwargs using a multithreading Pool and return a list of FutureResult objects.

The pool by default is an instance of ThreadPoolExecutor. The pool size can be limited with max_workers. If more control is needed, then the caller should directly pass a ThreadPoolExecutor instance to executor. For example:

# Customize executor
executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=4, thread_name_prefix="AAA"
)
multithread(func, func_kwargs, executor=executor)

The progress of the pool's workers can be monitored with a tqdm-based progress bar, which is displayed by default. Pass progress_bar=False to disable it. To customize it further, create a tqdm.tqdm instance and pass it as the progress_bar argument. For example:

# Run without a progress bar
results = multithread(func, func_kwargs, progress_bar=False)

# Run with a rich based progress bar
rich_based_progress_bar = tqdm.rich.tqdm(func_kwargs)
results = multithread(func, func_kwargs, progress_bar=rich_based_progress_bar)
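
A minimal sketch of a typical I/O-bound use; the fetch helper and the URLs are hypothetical:

# Sketch: thread pool for I/O-bound work (hypothetical helper and URLs)
import urllib.request

import multifutures

def fetch(url: str) -> bytes:
    with urllib.request.urlopen(url, timeout=10) as response:
        return response.read()

func_kwargs = [{"url": f"https://example.com/page/{i}"} for i in range(20)]
results = multifutures.multithread(fetch, func_kwargs, max_workers=8)
pages = [r.result for r in results if r.exception is None]
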
PARAMETER DESCRIPTION
func

The function that will be scheduled for execution via multithreading.

TYPE: collections.abc.Callable[..., typing.Any]

func_kwargs

A Collection of dictionaries that will be used for the calls of func.

TYPE: collections.abc.Collection[dict[str, typing.Any]]

executor

The multithreading Executor instance to use. Defaults to the standard library's ThreadPoolExecutor.

TYPE: multifutures._common.ExecutorProtocol | None DEFAULT: None

check

If True and any of the func calls raised an exception, an ExceptionGroup containing all the exceptions will be raised.

TYPE: bool DEFAULT: False

include_kwargs

A boolean flag indicating whether the keyword arguments of the function calls will be present in the FutureResult objects. Setting this to False is useful for keeping memory usage down when the inputs are "heavy" objects like xarray.Dataset etc.

TYPE: bool DEFAULT: True

progress_bar

An instance of a progress bar implementing the tqdm API. Defaults to tqdm.auto.tqdm.

TYPE: multifutures._common.ProgressBarProtocol | bool | None DEFAULT: True

RETURNS DESCRIPTION
list[multifutures._multi.FutureResult]

A list of FutureResult objects. Each object will contain the output of func(**func_kwargs[i]) or an Exception.

RAISES DESCRIPTION
exceptiongroup.ExceptionGroup

If any of the function calls raises an exception and check is True.
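
When the per-call keyword arguments are large, a sketch like the following (with a hypothetical summarize helper) keeps them out of the returned FutureResult objects:

# Sketch: skip storing heavy kwargs in the results to save memory
import multifutures

def summarize(data: list[float]) -> float:
    return sum(data) / len(data)

func_kwargs = [{"data": [float(j) for j in range(100_000)]} for _ in range(8)]
results = multifutures.multithread(summarize, func_kwargs, include_kwargs=False)
averages = [r.result for r in results if r.exception is None]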

Source code in multifutures/_multi.py
def multithread(
    func: abc.Callable[..., T.Any],
    func_kwargs: abc.Collection[dict[str, T.Any]],
    *,
    max_workers: int | None = None,
    executor: ExecutorProtocol | None = None,
    check: bool = False,
    include_kwargs: bool = True,
    progress_bar: ProgressBarProtocol | bool | None = True,
) -> list[FutureResult]:
    """
    Call `func` over all the items of `func_kwargs` using a multithreading Pool
    and return a list of [FutureResult][multifutures.FutureResult] objects.

    The pool by default is an instance of [ThreadPoolExecutor][concurrent.futures.ThreadPoolExecutor].
    The pool size can be limited with `max_workers`. If more control is needed,
    then the caller should directly pass a [ThreadPoolExecutor][concurrent.futures.ThreadPoolExecutor]
    instance to `executor`. For example:

    ``` python
    # Customize executor
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=4, thread_name_prefix="AAA"
    )
    multithread(func, func_kwargs, executor=executor)
    ```

    The progress of the pool's workers can be monitored by using a
    [tqdm](https://tqdm.github.io/) based progress bar.
    The progress bar is displayed by default.
    Disabling the progress bar is possible by passing `progress_bar=False`.
    The progress bar can be further customized by creating a `tqdm.tqdm`
    instance and passing it as an argument to `progress_bar`. For example:

    ``` python
    # Run without a progress bar
    results = multithread(func, func_kwargs, progress_bar=False)

    # Run with a rich based progress bar
    rich_based_progress_bar = tqdm.rich.tqdm(func_kwargs)
    results = multithread(func, func_kwargs, progress_bar=rich_based_progress_bar)
    ```

    Arguments:
        func: The function that will be scheduled for execution via multithreading.
        func_kwargs: A Collection of dictionaries that will be used for the calls of `func`.
        executor: The multithreading [Executor][concurrent.futures.Executor] instance to use.
            Defaults to the standard library's [ThreadPoolExecutor][concurrent.futures.ThreadPoolExecutor].
        check: If `True` and any of the `func` calls raised an exception, an
            [ExceptionGroup][ExceptionGroup] containing all the exceptions will be raised.
        include_kwargs: A boolean flag indicating whether the keyword arguments of the
            function calls will be present in the [FutureResult][multifutures.FutureResult]
            objects. Setting this to `False` is useful for keeping memory usage down when
            the inputs are "heavy" objects like [xarray.Dataset][xarray.Dataset] etc.
        progress_bar: An instance of a progress bar implementing the `tqdm` API. Defaults to
            `tqdm.auto.tqdm`.

    Returns:
        A list of [FutureResult][multifutures.FutureResult] objects. Each object
            will contain the output of `func(**func_kwargs[i])` or an `Exception`.

    Raises:
        exceptiongroup.ExceptionGroup: If any of the function calls raises an exception and `check` is True.

    """
    progress_bar = _resolve_progress_bar(progress_bar, func_kwargs)
    executor = _resolve_multithreading_executor(executor=executor, max_workers=max_workers)
    results = _multi(
        executor=executor,
        func=func,
        func_kwargs=func_kwargs,
        include_kwargs=include_kwargs,
        progress_bar=progress_bar,
        check=check,
    )
    return results

multifutures.FutureResult dataclass

A dataclass that holds the results of a function executed via multiprocess or multithread.

Note

Not meant to be instantiated directly.

For example in the following snippet, results is going to be a list of FutureResult objects

>>> func = lambda iterable: sum(iterable)
>>> func_kwargs = [dict(iterable=range(i)) for i in range(5)]
>>> results = multiprocess(func, func_kwargs)
>>> results
[FutureResult(exception=None, kwargs={'iterable': range(0, 0)}, result=0),
 FutureResult(exception=None, kwargs={'iterable': range(0, 1)}, result=0),
 FutureResult(exception=None, kwargs={'iterable': range(0, 2)}, result=1),
 FutureResult(exception=None, kwargs={'iterable': range(0, 3)}, result=3),
 FutureResult(exception=None, kwargs={'iterable': range(0, 4)}, result=6)]
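
Since every call maps to one FutureResult, splitting successes from failures is a simple pair of comprehensions; a sketch that reuses the results list from above:

# Sketch: separate successful outputs from failed calls
failures = [r for r in results if r.exception is not None]
outputs = [r.result for r in results if r.exception is None]
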
PARAMETER DESCRIPTION
exception

Will be None unless an Exception has been raised during the function execution in which case the attribute will contain the exception object.

TYPE: Exception | None

kwargs

May contain the keyword arguments that were used in the function call. If the objects passed as kwargs are too big (RAM-wise), e.g. big dataframes, you can omit them by passing include_kwargs=False in multiprocess/multithread.

TYPE: dict[str, typing.Any] | None

result

Will contain the function's output. If an Exception has been raised, then it will be None.

TYPE: typing.Any

Source code in multifutures/_multi.py
@dataclasses.dataclass
class FutureResult:
    """
    A [dataclass][dataclasses.dataclass] that holds the results of a function executed via
    [multiprocess][multifutures.multiprocess] or [multithread][multifutures.multithread].

    Note:
        Not meant to be instantiated directly.

    For example in the following snippet, `results` is going to be a list of `FutureResult` objects

        >>> func = lambda iterable: sum(iterable)
        >>> func_kwargs = [dict(iterable=range(i)) for i in range(5)]
        >>> results = multiprocess(func, func_kwargs)
        >>> results
        [FutureResult(exception=None, kwargs={'iterable': range(0, 0)}, result=0),
         FutureResult(exception=None, kwargs={'iterable': range(0, 1)}, result=0),
         FutureResult(exception=None, kwargs={'iterable': range(0, 2)}, result=1),
         FutureResult(exception=None, kwargs={'iterable': range(0, 3)}, result=3),
         FutureResult(exception=None, kwargs={'iterable': range(0, 4)}, result=6)]

    Parameters:
        exception:
            Will be `None` unless an `Exception` has been raised during the function execution
            in which case the attribute will contain the exception object.
        kwargs:
            May contain the keyword arguments that were used in the function call.
            If the objects passed as `kwargs` are too big (RAM-wise), e.g. big dataframes,
            you can omit them by passing `include_kwargs=False` in
            [multiprocess][multifutures.multiprocess]/[multithread][multifutures.multithread].
        result:
            Will contain the function's output. If an `Exception` has been raised, then it will be `None`.
    """

    exception: Exception | None
    kwargs: dict[str, T.Any] | None
    result: T.Any

multifutures.check_results

check_results(results: list[multifutures._multi.FutureResult]) -> None

Raises an ExceptionGroup if any of the results contains an exception.

RAISES DESCRIPTION
ExceptionGroup

An exception group containing all the individual exceptions. Support for Python < 3.11 is provided via the exceptiongroup package. See https://github.com/agronholm/exceptiongroup#catching-exceptions for more info.
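
A sketch of deferring error handling until after a run has completed, assuming func and func_kwargs as in the sections above:

# Sketch: collect all the results first, then fail loudly if anything went wrong
import exceptiongroup
import multifutures

results = multifutures.multithread(func, func_kwargs)  # check defaults to False
try:
    multifutures.check_results(results)
except exceptiongroup.ExceptionGroup as eg:
    for exc in eg.exceptions:
        print(type(exc).__name__, exc)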

Source code in multifutures/_multi.py
def check_results(results: list[FutureResult]) -> None:
    """
    Raises an [ExceptionGroup][ExceptionGroup] if any of the `results` contains an exception.

    Raises:
        ExceptionGroup: An exception group containing all the individual exceptions.
            Support for Python < 3.11 is provided via the [exceptiongroup](https://pypi.org/project/exceptiongroup/) package.
            Check [here](https://github.com/agronholm/exceptiongroup#catching-exceptions) for more info.
    """

    exceptions = [r.exception for r in results if r.exception is not None]
    if exceptions:
        exception_group = exceptiongroup.ExceptionGroup("There were exceptions", exceptions)
        raise exception_group