Cancel running work in ProcessPoolExecutor

In otherwise async code, I need to execute a sync function whose code is not under my control. This may run for some time and do a lot of CPU-intensive work and does not support async. I’m currently using ProcessPoolExecutor to do this, which almost does everything I need.

The remaining problem is cancellation and timeouts. The Executor framework doesn’t support stopping work in progress. For ThreadPoolExecutor, I think this is unavoidable, since I don’t think Python provides a way to cancel the thread. But for ProcessPoolExecutor, it is possible to kill the running process to force the execution to stop.

I’d like ProcessPoolExecutor to expose an API to do this, since it presumably knows which process is running a particular work item. I considered other approaches, such as using multiprocessing directly, but ProcessPoolExecutor already has a lot of useful integration with async code that I believe I’d have to reproduce if using a different approach.

The following ugly hack does already work today:

def _restart_pool(pool: ProcessPoolExecutor) -> ProcessPoolExecutor:
    """Restart the pool after timeout or job cancellation.

    This is a horrible, fragile hack, but it appears to be the only
    way to enforce a timeout currently in Python since there is no way
    to abort a job already in progress. Find the processes underlying
    the pool, kill them, and then shut down and recreate the pool.
    """
    for pid in pool._processes:  # noqa: SLF001
        os.kill(pid, signal.SIGINT)
    pool.shutdown(wait=True)
    return ProcessPoolExecutor(1)

This is used as follows (heavily simplified):

pool = ProcessPoolExecutor(1)
loop = asyncio.get_running_loop()
try:
    async with asyncio.timeout(timeout.total_seconds()):
        return await loop.run_in_executor(pool, worker, params)
except (asyncio.CancelledError, TimeoutError):
    pool = _restart_pool(pool)
    raise

However, this requires messing with the internals of ProcessPoolExecutor. I could use an initialization function to get the PID, but I’m still messing with its processes behind its back. Ideally, I’d be able to tell it to kill only the relevant process and restart it, raising some reasonable exception from the watcher task.

If I prepared a PR for this, is this something that might be considered?

7 Likes

This sounds like a useful feature request to me. I’m curious what your implementation would look like, feel free to open a feature request issue for this if one does not already exist and to loop me in on draft PR if you come up with an implementation.

This would very much be a concurrent.futures.ProcessPoolExecutor exclusive feature; it is not possible to do this safely and reliably with threads.

1 Like

I wonder if a more generic get_worker_object() method on the future object might be a bit better.

Then the API could allow you to get the thread objects if needed for thread pools while process pools could give the process object. (Then kill() could be called to stop the process worker). If the worker is not running, it would give None.

Similarly the executor could allow public access to its workers.

1 Like

Per future ability to get the worker is an interesting idea. But my mind races ahead to see race conditions with that approach.

The original post is looking for a way to terminate the entire pool early. Not terminate the worker processing one specific item.

If we aim at a specific future, even if the proposed get_worker_object() were to return something, who’s to say that the worker is still executing work for that future by the time we tried to .kill() it vs already executing something else from the pool?

The desire to immediately stop work of a specific future is far different than immediately stopping an entire pool. The former requires synchronization, the latter is brute force.

1 Like

Good points. I wonder if a proxy object that only allows access while running the task would work. The after the task finishes it could be given a value to invalidate or something.