Skip to content

Commit 8ef62bb

Browse files
committed
Merge pull request #217 from rkday/timer_set_fixes
Timer set fixes
2 parents 4e7c5b8 + 960af94 commit 8ef62bb

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ def post(intended_time, *args, &task)
5555
@queue.push(Task.new(time, args, task))
5656
@timer_executor.post(&method(:process_tasks))
5757
end
58-
59-
true
6058
end
6159

60+
@condition.signal
61+
true
6262
end
6363

6464
# For a timer, #kill is like an orderly shutdown, except we need to manually
@@ -129,8 +129,20 @@ def process_tasks
129129
interval = task.time - Time.now.to_f
130130

131131
if interval <= 0
132+
# We need to remove the task from the queue before passing
133+
# it to the executor, to avoid race conditions where we pass
134+
# the peek'ed task to the executor and then pop a different
135+
# one that's been added in the meantime.
136+
#
137+
# Note that there's no race condition between the peek and
138+
# this pop - this pop could retrieve a different task from
139+
# the peek, but that task would be due to fire now anyway
140+
# (because @queue is a priority queue, and this thread is
141+
# the only reader, so whatever timer is at the head of the
142+
# queue now must have the same pop time, or a closer one, as
143+
# when we peeked).
144+
task = mutex.synchronize { @queue.pop }
132145
@task_executor.post(*task.args, &task.op)
133-
mutex.synchronize { @queue.pop }
134146
else
135147
mutex.synchronize do
136148
@condition.wait(mutex, [interval, 60].min)

spec/concurrent/executor/timer_set_spec.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ module Concurrent
3535
expect(latch.wait(0.2)).to be_truthy
3636
end
3737

38+
it 'returns true when posting a task' do
39+
expect(subject.post(0.1) { nil }).to be true
40+
end
41+
42+
it 'executes a given task when given an interval in seconds, even if longer tasks have been scheduled' do
43+
latch = CountDownLatch.new(1)
44+
subject.post(0.5){ nil }
45+
sleep 0.1
46+
subject.post(0.1){ latch.count_down }
47+
expect(latch.wait(0.2)).to be_truthy
48+
end
49+
3850
it 'passes all arguments to the task on execution' do
3951
expected = nil
4052
latch = CountDownLatch.new(1)

0 commit comments

Comments
 (0)