Skip to content

Reduce overhead to run one iteration of the asyncio event loop #110733

Closed
@bdraco

Description

@bdraco

Feature or enhancement

Proposal:

Consider the following use case:

An asyncio event loop in which the majority of the handles being run have very short run times, because they are decoding Bluetooth or Zigbee packets.

In this case `_run_once` becomes something of a bottleneck. In production, the `min` and `max` calls used to compute the select timeout account for a significant portion of its run time. We can increase the number of packets processed per second by replacing the `min` and `max` calls with simple `>` and `<` comparisons.

import heapq
import timeit
from asyncio import events
from asyncio.base_events import (
    _MIN_CANCELLED_TIMER_HANDLES_FRACTION,
    _MIN_SCHEDULED_TIMER_HANDLES,
    MAXIMUM_SELECT_TIMEOUT,
    BaseEventLoop,
    _format_handle,
    logger,
)
from time import monotonic


class FakeSelector:
    """Stand-in selector that never waits and never reports events.

    Its ``select`` ignores the timeout it is given and returns ``None`` at
    once, so a call to ``_run_once`` never blocks on I/O.
    """

    def select(self, timeout):
        """Ignore *timeout* and return ``None`` immediately (no I/O events)."""
        return None


fake_selector = FakeSelector()

# Baseline loop: the stock BaseEventLoop with a no-op selector and no-op
# event processing, so each _run_once call only does its own bookkeeping.
original_loop = BaseEventLoop()
original_loop._process_events = lambda event_list: None
original_loop._selector = fake_selector

# One timer 100000 seconds in the future keeps _scheduled non-empty, so the
# timeout-computation branch runs every iteration without the timer firing.
timer = events.TimerHandle(
    when=monotonic() + 100000,
    callback=lambda: None,
    args=("any",),
    loop=original_loop,
    context=None,
)
timer._scheduled = True
heapq.heappush(original_loop._scheduled, timer)


class OptimizedBaseEventLoop(BaseEventLoop):
    """BaseEventLoop whose ``_run_once`` clamps the select timeout by comparison.

    The select timeout is limited to ``[0, MAXIMUM_SELECT_TIMEOUT]`` using
    plain ``>``/``<`` checks rather than ``min()``/``max()`` calls.

    NOTE(review): the rest of ``_run_once`` is intended to mirror the
    inherited implementation so the benchmark isolates that one change --
    confirm against the CPython version being measured.
    """

    def _run_once(self) -> None:
        """Run one full iteration of the event loop.

        In order, this purges cancelled timers, polls the selector (without
        blocking if callbacks are already ready or the loop is stopping),
        moves timers that are due onto the ready queue, and then runs the
        callbacks that were ready at that point.
        """
        # Choose a purge strategy for cancelled timers: rebuild the whole heap
        # when it is large enough and the cancelled fraction is high enough;
        # otherwise only drop cancelled handles sitting at the head.
        sched_count = len(self._scheduled)
        if (
            sched_count > _MIN_SCHEDULED_TIMER_HANDLES
            and self._timer_cancelled_count / sched_count
            > _MIN_CANCELLED_TIMER_HANDLES_FRACTION
        ):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # None reaches select() only when nothing is ready, the loop is not
        # stopping, and no timer is scheduled.
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            # Clamp to [0, MAXIMUM_SELECT_TIMEOUT] with two comparisons; this
            # is the change being benchmarked against min()/max() calls.
            timeout = self._scheduled[0]._when - self.time()
            if timeout > MAXIMUM_SELECT_TIMEOUT:
                timeout = MAXIMUM_SELECT_TIMEOUT
            elif timeout < 0:
                timeout = 0

        event_list = self._selector.select(timeout)
        self._process_events(event_list)
        # Needed to break cycles when an exception occurs.
        event_list = None

        # Handle 'later' callbacks that are ready.
        # Timers due within one clock-resolution tick of now count as ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                # Debug mode: time each callback and warn when it takes at
                # least slow_callback_duration seconds.
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning(
                            "Executing %s took %.3f seconds", _format_handle(handle), dt
                        )
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.


# Candidate loop: identical setup to the baseline loop (no-op selector, no-op
# event processing, one timer 100000 seconds out), but it runs the optimized
# _run_once.
new_loop = OptimizedBaseEventLoop()
new_loop._process_events = lambda event_list: None
new_loop._selector = fake_selector

timer = events.TimerHandle(
    when=monotonic() + 100000,
    callback=lambda: None,
    args=("any",),
    loop=new_loop,
    context=None,
)
timer._scheduled = True
heapq.heappush(new_loop._scheduled, timer)


def time_run_once(loop, number=1000000):
    """Return the seconds taken to call ``loop._run_once()`` *number* times.

    Args:
        loop: Any object exposing a zero-argument ``_run_once`` method; in this
            script, one of the two prepared event loops.
        number: How many times ``_run_once`` is called.

    Returns:
        Total elapsed time in seconds, as reported by ``timeit.timeit``.
    """
    return timeit.timeit(
        "loop._run_once()",
        number=number,
        globals={"loop": loop},
    )


if __name__ == "__main__":
    # Guarded so that merely importing this module does not run the
    # two-million-iteration benchmark.
    new_time = time_run_once(new_loop)
    print("new: %s" % new_time)

    original_time = time_run_once(original_loop)
    print("original: %s" % original_time)

    # Close both loops explicitly once timing is done rather than leaving
    # it to garbage collection.
    for finished_loop in (new_loop, original_loop):
        finished_loop.close()
Output:
new: 0.36033158400096
original: 0.4667800000170246

Has this already been discussed elsewhere?

This is a minor feature, which does not need previous discussion elsewhere

Links to previous discussion of this feature:

No response

Linked PRs

Metadata

Metadata

Assignees

No one assigned

    Labels

    Projects

    Status

    Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions