Description
Bug report
I believe I've found a bug in how the multiprocessing package passes the `Connection`s that `Pipe` creates down to the child worker process, but only on macOS. The following minimal example demonstrates the problem:
```python
def _mp_job(nth, child):
    print("Nth is", nth)

if __name__ == "__main__":
    from multiprocessing import Pool, Pipe, set_start_method, log_to_stderr
    import logging, time

    set_start_method("spawn")
    logger = log_to_stderr()
    logger.setLevel(logging.DEBUG)
    with Pool(processes=10) as mp_pool:
        jobs = []
        for i in range(20):
            parent, child = Pipe()
            # child = None
            r = mp_pool.apply_async(_mp_job, args=(i, child))
            jobs.append(r)
        while jobs:
            new_jobs = []
            for job in jobs:
                if not job.ready():
                    new_jobs.append(job)
            jobs = new_jobs
            print("%d jobs remaining" % len(jobs))
            time.sleep(1)
```
On Linux, this script prints `Nth is 0`, etc., 20 times and exits. On macOS, it does the same if the `child = None` line is uncommented. If that line stays commented out, i.e., if the child `Connection` is passed in the `args` of `apply_async()`, not all of the jobs get done, and the script will frequently (if not always) loop forever, reporting some number of jobs remaining.
The logging shows approximately what's happening: the output contains a number of lines of this form:

```
[DEBUG/SpawnPoolWorker-10] worker got EOFError or OSError -- exiting
```

and the number of log records of that type is exactly the number of jobs reported remaining. This debug message comes from the `worker()` function in `multiprocessing/pool.py`, as it dequeues a task:
```python
try:
    task = get()
except (EOFError, OSError):
    util.debug('worker got EOFError or OSError -- exiting')
    break
```
When I insert a traceback printout before the debug statement, I find that it's reporting `ConnectionRefusedError`, presumably as the worker attempts to unpickle the `Connection` object:
```
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 112, in worker
    task = get()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 354, in get
    return _ForkingPickler.loads(res)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 961, in rebuild_connection
    fd = df.detach()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/resource_sharer.py", line 57, in detach
    with _resource_sharer.get_connection(self._id) as conn:
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/resource_sharer.py", line 87, in get_connection
    c = Client(address, authkey=process.current_process().authkey)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 620, in SocketClient
    s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused
```
The error is caught and the worker exits, but it's already dequeued the task, so the task never gets done.
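For context, the call chain in the traceback is the ordinary `Listener`/`Client` handshake from `multiprocessing.connection`: `resource_sharer` serves each duplicated fd over a `Listener`, and the worker unpickling a `Connection` calls `Client(address)` to fetch it; the `ConnectionRefusedError` means that `connect()` step failed. A minimal sketch of just that mechanism (with a string standing in for the shared fd, since the real `resource_sharer` protocol involves authentication and fd passing) looks like this:

```python
import threading
from multiprocessing.connection import Client, Listener

received = []

# Listener() binds and listens in its constructor, so a Client can connect
# even before accept() is called.
with Listener() as listener:
    def serve():
        # Accept one client and send it a payload (standing in for the fd
        # that resource_sharer would transfer).
        with listener.accept() as conn:
            conn.send("fd-payload")

    t = threading.Thread(target=serve)
    t.start()
    # This is the step that raises ConnectionRefusedError in the worker.
    with Client(listener.address) as client:
        received.append(client.recv())
    t.join()

print(received[0])
```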
Note that this has to be due to the `Connection` object being passed; if I uncomment `child = None`, the code works fine. Note also that it has nothing to do with anything sent through the `Pipe`, since the code sends nothing through it. Nor is it caused by the connection objects being garbage collected for lack of a reference in the parent process; if I save them in a global list, I get the same error.
I don't understand how this could happen; the `Pipe` is created with `socket.socketpair()`, and I was under the impression that sockets created that way don't require any other initialization to communicate. I do know that it's a race condition: if I insert a short sleep, say 0.1 second, after creating the `Pipe`, the code works fine. I've also observed that the problem is much more likely with large numbers of workers; with only 2 workers I almost never see it.
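As a sanity check on that impression, a pair from `socket.socketpair()` is indeed connected at creation time and needs no bind/listen/accept handshake before the two ends can talk:

```python
import socket

# A socketpair is already connected when it is returned; data can flow
# immediately in either direction with no further setup.
a, b = socket.socketpair()
a.sendall(b"ping")
msg = b.recv(4)
print(msg)  # b'ping'
a.close()
b.close()
```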
Your environment
Breaks:
- CPython versions tested on: 3.7.9, 3.11
- Operating system and architecture: macOS 12.6.2, Intel 6-core i7
- CPython versions tested on: 3.8.2
- Operating system and architecture: macOS 10.15.7, Intel quad core i7
Works:
- CPython versions tested on: 3.8.10
- Operating system and architecture: Linux
Linked PRs
- gh-101225: Fix hang when passing Pipe instances to child in multiprocessing #113567
- [3.12] gh-101225: Increase the socket backlog when creating a multiprocessing.connection.Listener (GH-113567) #114018
- [3.11] gh-101225: Increase the socket backlog when creating a multiprocessing.connection.Listener (GH-113567) #114019