In otherwise async code, I need to execute a sync function whose code is not under my control. It may run for some time, does a lot of CPU-intensive work, and does not support async. I’m currently using ProcessPoolExecutor to do this, which does almost everything I need.
The remaining problem is cancellation and timeouts. The Executor framework doesn’t support stopping work in progress. For ThreadPoolExecutor, I think this is unavoidable, since I don’t think Python provides a way to cancel a thread. But for ProcessPoolExecutor, it is possible to kill the running process to force the execution to stop.
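To illustrate the limitation, here is a minimal sketch. It uses ThreadPoolExecutor only so the demo stays in a single process; Future.cancel() behaves the same way for a ProcessPoolExecutor job that is already running. The function name try_cancel_running_job and the two events are made up for this example.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def try_cancel_running_job() -> tuple[bool, bool]:
    """Try to cancel a job that is already running.

    Returns (cancel_result, job_finished_anyway).
    """
    started = threading.Event()
    release = threading.Event()

    def job() -> str:
        started.set()    # tell the caller the worker has picked the job up
        release.wait(5)  # stand-in for long-running work
        return "done"

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(job)
        started.wait(5)              # make sure the job is really in progress
        cancelled = future.cancel()  # only pending work can be cancelled
        release.set()                # let the job run to completion
        finished = future.result(timeout=5) == "done"
    return cancelled, finished
```

Calling try_cancel_running_job() should show that cancel() reports failure and the job still runs to completion, which is why a timeout on the awaiting side alone does not stop the work.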
I’d like ProcessPoolExecutor to expose an API to do this, since it presumably knows which process is running a particular work item. I considered other approaches, such as using multiprocessing directly, but ProcessPoolExecutor already has a lot of useful integration with async code that I believe I’d have to reproduce if using a different approach.
The following ugly hack does already work today:
import os
import signal
from concurrent.futures import ProcessPoolExecutor


def _restart_pool(pool: ProcessPoolExecutor) -> ProcessPoolExecutor:
    """Restart the pool after timeout or job cancellation.

    This is a horrible, fragile hack, but it appears to be the only
    way to enforce a timeout currently in Python since there is no way
    to abort a job already in progress. Find the processes underlying
    the pool, kill them, and then shut down and recreate the pool.
    """
    # _processes is a private dict keyed by worker PID.
    for pid in pool._processes:  # noqa: SLF001
        os.kill(pid, signal.SIGINT)
    pool.shutdown(wait=True)
    return ProcessPoolExecutor(1)
This is used as follows (heavily simplified):
pool = ProcessPoolExecutor(1)
loop = asyncio.get_running_loop()
try:
    async with asyncio.timeout(timeout.total_seconds()):
        return await loop.run_in_executor(pool, worker, params)
except (asyncio.CancelledError, TimeoutError):
    pool = _restart_pool(pool)
    raise
However, this requires messing with the internals of ProcessPoolExecutor. I could use an initialization function to get the PID, but I’d still be messing with its processes behind its back. Ideally, I’d be able to tell it to kill only the relevant process and restart it, raising some reasonable exception from the watcher task.
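For reference, the initializer variant could look roughly like the sketch below. It avoids reading pool._processes, but it still kills a worker behind the pool's back, so the pool ends up broken and has to be recreated. The names report_pid, new_pool, and kill_worker are hypothetical, and the sketch assumes a single-worker pool whose worker has already been started by a submitted job (workers are only spawned once work is submitted).

```python
import multiprocessing
import os
import signal
from concurrent.futures import ProcessPoolExecutor


def report_pid(pid_queue) -> None:
    """Pool initializer: runs once in each new worker and reports its PID."""
    pid_queue.put(os.getpid())


def new_pool():
    """Create a one-worker pool plus the queue its worker reports its PID on."""
    ctx = multiprocessing.get_context()
    pid_queue = ctx.Queue()  # same context as the pool, so it can be handed to the worker
    pool = ProcessPoolExecutor(
        1,
        mp_context=ctx,
        initializer=report_pid,
        initargs=(pid_queue,),
    )
    return pool, pid_queue


def kill_worker(pool: ProcessPoolExecutor, pid_queue) -> None:
    """Kill the pool's single worker and shut the (now broken) pool down.

    Assumes a job has already been submitted, so the worker exists and
    has reported its PID (or will do so within the timeout).
    """
    pid = pid_queue.get(timeout=5)
    os.kill(pid, signal.SIGTERM)
    pool.shutdown(wait=True)
```

In the timeout handler, this would replace _restart_pool: call kill_worker(pool, pid_queue) and then new_pool() to get a fresh pool and queue. It only swaps one kind of fragility for another, which is why I’d prefer a supported API.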
If I prepared a PR for this, is it something that might be considered?