Closed
Description
Feature or enhancement
Proposal:
Consider the following use case:
An asyncio event loop where the majority of the handles being run have a very small run time because they are decoding Bluetooth or Zigbee packets.
In this case `_run_once` becomes a bit of a bottleneck. In production, the `min` and `max` calls used to compute the select timeout represent a significant portion of the run time. We can increase the number of packets that can be processed per second by replacing the `min` and `max` calls with simple `>` and `<` comparisons.
import heapq
import timeit
from asyncio import events
from asyncio.base_events import (
_MIN_CANCELLED_TIMER_HANDLES_FRACTION,
_MIN_SCHEDULED_TIMER_HANDLES,
MAXIMUM_SELECT_TIMEOUT,
BaseEventLoop,
_format_handle,
logger,
)
from time import monotonic
class FakeSelector:
    """Stand-in selector for the benchmark that never reports any I/O."""

    def select(self, timeout):
        """Pretend to wait for I/O events and report none.

        ``timeout`` is accepted so the call signature matches what
        ``_run_once`` uses, but it is ignored: this method returns
        immediately.

        Returns an empty, iterable collection of events.  A constant empty
        tuple is used so the call adds no per-iteration allocation to the
        benchmark while still being safe to iterate over.
        """
        return ()
# Baseline: a stock BaseEventLoop wired to the fake selector, with event
# processing stubbed out so that only _run_once itself is measured.
fake_selector = FakeSelector()
original_loop = BaseEventLoop()
original_loop._process_events = lambda event_list: None
original_loop._selector = fake_selector

# Keep one timer pending far in the future so that every iteration takes the
# "compute the desired timeout" branch of _run_once.
timer = events.TimerHandle(
    when=monotonic() + 100000,
    callback=lambda: None,
    args=("any",),
    loop=original_loop,
    context=None,
)
heapq.heappush(original_loop._scheduled, timer)
timer._scheduled = True
class OptimizedBaseEventLoop(BaseEventLoop):
    """BaseEventLoop whose ``_run_once`` clamps the select timeout with plain
    ``>`` / ``<`` comparisons rather than ``min`` / ``max`` calls.
    """

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        sched_count = len(self._scheduled)
        if (
            sched_count > _MIN_SCHEDULED_TIMER_HANDLES
            and self._timer_cancelled_count / sched_count
            > _MIN_CANCELLED_TIMER_HANDLES_FRACTION
        ):
            # Remove delayed calls that were cancelled if their number
            # is too high: rebuild the heap from the live handles only.
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # None means "no timeout" is passed to the selector.
        timeout = None
        if self._ready or self._stopping:
            # There is work to do right away (or we are stopping): just poll.
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout, clamped to
            # [0, MAXIMUM_SELECT_TIMEOUT] with direct comparisons.
            timeout = self._scheduled[0]._when - self.time()
            if timeout > MAXIMUM_SELECT_TIMEOUT:
                timeout = MAXIMUM_SELECT_TIMEOUT
            elif timeout < 0:
                timeout = 0

        event_list = self._selector.select(timeout)
        self._process_events(event_list)
        # Needed to break cycles when an exception occurs.
        event_list = None

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for _ in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                # In debug mode, time each callback and log the slow ones.
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning(
                            "Executing %s took %.3f seconds", _format_handle(handle), dt
                        )
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.
# Candidate: the optimized loop, set up the same way as the baseline loop
# (shared fake selector, stubbed event processing, one far-future timer).
new_loop = OptimizedBaseEventLoop()
new_loop._process_events = lambda event_list: None
new_loop._selector = fake_selector
timer = events.TimerHandle(
    when=monotonic() + 100000,
    callback=lambda: None,
    args=("any",),
    loop=new_loop,
    context=None,
)
heapq.heappush(new_loop._scheduled, timer)
timer._scheduled = True

# Time one million _run_once iterations on each loop and print the totals.
new_time = timeit.Timer("loop._run_once()", globals={"loop": new_loop}).timeit(
    number=1_000_000
)
print("new: %s" % new_time)

original_time = timeit.Timer(
    "loop._run_once()", globals={"loop": original_loop}
).timeit(number=1_000_000)
print("original: %s" % original_time)
new: 0.36033158400096
original: 0.4667800000170246
Has this already been discussed elsewhere?
This is a minor feature, which does not need previous discussion elsewhere
Links to previous discussion of this feature:
No response
Linked PRs
Metadata
Metadata
Assignees
Projects
Status
Done