Skip to content

Commit a3d9e74

Browse files
committed
First attempt on implementation of counting semaphores
1 parent 6af1316 commit a3d9e74

File tree

3 files changed

+441
-0
lines changed

3 files changed

+441
-0
lines changed

lib/concurrent/atomic/semaphore.rb

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
require 'concurrent/atomic/condition'
2+
3+
module Concurrent
4+
5+
class MutexSemaphore
6+
7+
# @!macro [attach] semaphore_method_initialize
8+
#
9+
# Create a new `Semaphore` with the initial `count`.
10+
#
11+
# @param [Fixnum] count the initial count
12+
#
13+
# @raise [ArgumentError] if `count` is not an integer or is less than zero
14+
def initialize(count)
15+
unless count.is_a?(Fixnum) && count >= 0
16+
raise ArgumentError.new('count must be an non-negative integer')
17+
end
18+
@mutex = Mutex.new
19+
@condition = Condition.new
20+
@free = count
21+
end
22+
23+
# @!macro [attach] semaphore_method_acquire
24+
#
25+
# Acquires the given number of permits from this semaphore, blocking until all are available.
26+
#
27+
# @param [Fixnum] permits Number of permits to acquire
28+
#
29+
# @raise [ArgumentError] if `permits` is not an integer or is less than one
30+
#
31+
# @return [True]
32+
def acquire(permits = 1)
33+
unless permits.is_a?(Fixnum) && permits > 0
34+
raise ArgumentError.new('permits must be an integer greater than zero')
35+
end
36+
@mutex.synchronize do
37+
try_acquire_timed(permits, nil)
38+
end
39+
end
40+
41+
# @!macro [attach] semaphore_method_available_permits
42+
#
43+
# Returns the current number of permits available in this semaphore.
44+
#
45+
# @return [Integer]
46+
def available_permits
47+
@mutex.synchronize { @free }
48+
end
49+
50+
# @!macro [attach] semaphore_method_drain_permits
51+
#
52+
# Acquires and returns all permits that are immediately available.
53+
#
54+
# @return [Integer]
55+
def drain_permits
56+
@mutex.synchronize do
57+
@free.tap { |_| @free = 0 }
58+
end
59+
end
60+
61+
# @!macro [attach] semaphore_method_try_acquire
62+
#
63+
# Acquires the given number of permits from this semaphore, only if all are available at the time of invocation.
64+
#
65+
# @param [Fixnum] permits the number of permits to acquire
66+
#
67+
# @param [Fixnum] timeout the number of seconds to wait for the counter or `nil`
68+
# to return immediately
69+
#
70+
# @raise [ArgumentError] if `permits` is not an integer or is less than one
71+
#
72+
# @return [Boolean] `false` if no permits are available, `true` when acquired a permit
73+
def try_acquire(permits = 1, timeout = nil)
74+
unless permits.is_a?(Fixnum) && permits > 0
75+
raise ArgumentError.new('permits must be an integer greater than zero')
76+
end
77+
@mutex.synchronize do
78+
if timeout.nil?
79+
try_acquire_now(permits)
80+
else
81+
try_acquire_timed(permits, timeout)
82+
end
83+
end
84+
end
85+
86+
# @!macro [attach] semaphore_method_release
87+
#
88+
# Releases the given number of permits, returning them to the semaphore.
89+
#
90+
# @param [Fixnum] permits Number of permits to return to the semaphore.
91+
#
92+
# @raise [ArgumentError] if `permits` is not a number or is less than one
93+
#
94+
# @return [True]
95+
def release(permits = 1)
96+
unless permits.is_a?(Fixnum) && permits > 0
97+
raise ArgumentError.new('permits must be an integer greater than zero')
98+
end
99+
@mutex.synchronize do
100+
@free += permits
101+
permits.times { @condition.signal }
102+
end
103+
true
104+
end
105+
106+
# @!macro [attach] semaphore_method_reduce_permits
107+
#
108+
# Shrinks the number of available permits by the indicated reduction.
109+
#
110+
# @param [Fixnum] reduction Number of permits to remove.
111+
#
112+
# @raise [ArgumentError] if `reduction` is not an integer or is negative
113+
#
114+
# @raise [ArgumentError] if `@free` - `@reduction` is less than zero
115+
#
116+
# @return [True]
117+
def reduce_permits(reduction)
118+
unless reduction.is_a?(Fixnum) && reduction >= 0
119+
raise ArgumentError.new('reduction must be an non-negative integer')
120+
end
121+
unless @free - reduction >= 0
122+
raise ArgumentError.new('cannot reduce number of available_permits below zero')
123+
end
124+
@mutex.synchronize do
125+
@free -= reduction
126+
end
127+
true
128+
end
129+
130+
private
131+
132+
def try_acquire_now(permits)
133+
if @free >= permits
134+
@free -= permits
135+
true
136+
else
137+
false
138+
end
139+
end
140+
141+
def try_acquire_timed(permits, timeout)
142+
remaining = Condition::Result.new(timeout)
143+
while !try_acquire_now(permits) && remaining.can_wait?
144+
@condition.signal
145+
remaining = @condition.wait(@mutex, remaining.remaining_time)
146+
end
147+
remaining.can_wait? ? true : false
148+
end
149+
end
150+
151+
if RUBY_PLATFORM == 'java'
152+
153+
# @!macro count_down_latch
154+
class JavaSemaphore
155+
156+
# @!macro count_down_latch_method_initialize
157+
def initialize(count)
158+
unless count.is_a?(Fixnum) && count >= 0
159+
raise ArgumentError.new('count must be in integer greater than or equal zero')
160+
end
161+
@semaphore = java.util.concurrent.Semaphore.new(count)
162+
end
163+
164+
def acquire(permits = 1)
165+
unless permits.is_a?(Fixnum) && permits > 0
166+
raise ArgumentError.new('permits must be an integer greater than zero')
167+
end
168+
@semaphore.acquire(permits);
169+
end
170+
171+
172+
# @!macro [attach] semaphore_method_available_permits
173+
#
174+
# Returns the current number of permits available in this semaphore.
175+
#
176+
# @return [Integer]
177+
def available_permits
178+
@semaphore.availablePermits
179+
end
180+
181+
# @!macro [attach] semaphore_method_drain_permits
182+
#
183+
# Acquires and returns all permits that are immediately available.
184+
#
185+
# @return [Integer]
186+
def drain_permits
187+
@semaphore.drainPermits
188+
end
189+
190+
# @!macro [attach] semaphore_method_try_acquire
191+
#
192+
# Acquires the given number of permits from this semaphore, only if all are available at the time of invocation.
193+
#
194+
# @param [Fixnum] permits the number of permits to acquire
195+
#
196+
# @param [Fixnum] timeout the number of seconds to wait for the counter or `nil`
197+
# to return immediately
198+
#
199+
# @raise [ArgumentError] if `permits` is not an integer or is less than one
200+
#
201+
# @return [Boolean] `false` if no permits are available, `true` when acquired a permit
202+
def try_acquire(permits = 1, timeout = nil)
203+
unless permits.is_a?(Fixnum) && permits > 0
204+
raise ArgumentError.new('permits must be an integer greater than zero')
205+
end
206+
if timeout.nil?
207+
@semaphore.try_acquire(permits)
208+
else
209+
@semaphore.try_acquire(permits, timeout, java.util.concurrent.TimeUnit::SECONDS)
210+
end
211+
end
212+
213+
# @!macro [attach] semaphore_method_release
214+
#
215+
# Releases the given number of permits, returning them to the semaphore.
216+
#
217+
# @param [Fixnum] permits Number of permits to return to the semaphore.
218+
#
219+
# @raise [ArgumentError] if `permits` is not a number or is less than one
220+
#
221+
# @raise [ArgumentError] if `permits` + `@free` is larger than `@count`
222+
#
223+
# @return [True]
224+
def release(permits = 1)
225+
unless permits.is_a?(Fixnum) && permits > 0
226+
raise ArgumentError.new('permits must be an integer greater than zero')
227+
end
228+
@semaphore.release(permits)
229+
true
230+
end
231+
232+
# @!macro [attach] semaphore_method_reduce_permits
233+
#
234+
# Shrinks the number of available permits by the indicated reduction.
235+
#
236+
# @param [Fixnum] reduction Number of permits to remove.
237+
#
238+
# @raise [ArgumentError] if `reduction` is not an integer or is negative
239+
#
240+
# @raise [ArgumentError] if the operation would bring `@free` below zero
241+
#
242+
# @return [True]
243+
def reduce_permits(reduction)
244+
unless reduction.is_a?(Fixnum) && reduction >= 0
245+
raise ArgumentError.new('reduction must be an non-negative integer')
246+
end
247+
unless @free - reduction >= 0
248+
raise ArgumentError.new('cannot reduce number of available_permits below zero')
249+
end
250+
@semaphore.reducePermits(void)
251+
end
252+
253+
254+
end
255+
256+
# @!macro count_down_latch
257+
class Semaphore < JavaSemaphore
258+
end
259+
260+
else
261+
262+
# @!macro count_down_latch
263+
class Semaphore < MutexSemaphore
264+
end
265+
end
266+
end

lib/concurrent/atomics.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@
88
require 'concurrent/atomic/count_down_latch'
99
require 'concurrent/atomic/event'
1010
require 'concurrent/atomic/synchronization'
11+
require 'concurrent/atomic/semaphore'

0 commit comments

Comments
 (0)