Bug in multiprocessing + Pipes on macOS #101225

Closed
@slbayer

Description

Bug report

I believe I've found a bug in how the multiprocessing package passes the Connections that Pipe creates down to the child worker process, but only on macOS.

The following minimal example demonstrates the problem:

def _mp_job(nth, child):
    # The worker never uses the Connection; merely receiving it triggers the bug.
    print("Nth is", nth)

if __name__ == "__main__":
    from multiprocessing import Pool, Pipe, set_start_method, log_to_stderr
    import logging, time

    set_start_method("spawn")
    logger = log_to_stderr()

    logger.setLevel(logging.DEBUG)

    with Pool(processes = 10) as mp_pool:

        jobs = []
        for i in range(20):
            parent, child = Pipe()
            # child = None    # uncommenting this line makes the problem disappear
            r = mp_pool.apply_async(_mp_job, args = (i, child))
            jobs.append(r)

        # Poll until every job reports ready().
        while jobs:
            new_jobs = []
            for job in jobs:
                if not job.ready():
                    new_jobs.append(job)
            jobs = new_jobs
            print("%d jobs remaining" % len(jobs))
            time.sleep(1)

On Linux, this script prints Nth is 0, etc., 20 times and exits. On macOS it does the same if the child = None line is uncommented, so that no Connection is passed. With that line left commented out, as shown - i.e., with the child Connection passed in the args of apply_async() - not all of the jobs complete, and the script will frequently (if not always) loop forever, reporting some number of jobs remaining.

The logging shows approximately what's happening: the output will have a number of lines of this form:

[DEBUG/SpawnPoolWorker-10] worker got EOFError or OSError -- exiting

and the number of log records of that type exactly matches the number of jobs reported remaining. The debug message comes from the worker() function in multiprocessing/pool.py, where it tries to dequeue a task:

        try:
            task = get()
        except (EOFError, OSError): 
            util.debug('worker got EOFError or OSError -- exiting')
            break
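
To see the underlying exception, I temporarily added a traceback printout to that except block in my local copy of pool.py, roughly as below (the print_exc() call is my instrumentation, not part of CPython):

        try:
            task = get()
        except (EOFError, OSError):
            import traceback
            traceback.print_exc()   # temporary instrumentation
            util.debug('worker got EOFError or OSError -- exiting')
            break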

With that instrumentation in place, the exception turns out to be ConnectionRefusedError, presumably raised as the worker attempts to unpickle the Connection object:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 112, in worker
    task = get()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 354, in get
    return _ForkingPickler.loads(res)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 961, in rebuild_connection
    fd = df.detach()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/resource_sharer.py", line 57, in detach
    with _resource_sharer.get_connection(self._id) as conn:
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/resource_sharer.py", line 87, in get_connection
    c = Client(address, authkey=process.current_process().authkey)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 620, in SocketClient
    s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused

The error is caught and the worker exits, but it's already dequeued the task, so the task never gets done.

Note that this has to be due to the Connection object being passed: if I uncomment child = None, the code works fine. It has nothing to do with anything sent through the Pipe, since the code sends nothing through it. It also isn't caused by the Connection objects being garbage collected for lack of a reference in the parent process; if I keep them alive in a list in the parent (sketched below), I get the same error.
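
For reference, the keep-alive check was just this variation of the submission loop, holding every Connection in the parent (keep_alive is a name I made up for illustration):

        keep_alive = []
        for i in range(20):
            parent, child = Pipe()
            keep_alive.append((parent, child))   # keep a reference alive in the parent
            r = mp_pool.apply_async(_mp_job, args = (i, child))
            jobs.append(r)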

I don't understand how this could possibly happen: the Pipe is created with socket.socketpair(), and I was under the impression that sockets created that way don't require any further setup to communicate. I do know that it's a race condition; if I insert a short sleep (say, 0.1 seconds) after creating the Pipe, the code works fine (see below). I've also observed that the problem is much more likely with a large number of workers; with only 2 workers I almost never see it.
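
For completeness, the delay that hides the problem is just a short sleep right after each Pipe() call:

            parent, child = Pipe()
            time.sleep(0.1)   # with this delay in place, all 20 jobs complete
            r = mp_pool.apply_async(_mp_job, args = (i, child))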

Your environment

Breaks:

  • CPython 3.7.9 and 3.11 on macOS 12.6.2 (Intel 6-core i7)
  • CPython 3.8.2 on macOS 10.15.7 (Intel quad-core i7)

Works:

  • CPython 3.8.10 on Linux
