@@ -12,7 +12,7 @@ module Concurrent
12
12
# @see Concurrent::MVar
13
13
# @see Concurrent::Dereferenceable
14
14
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Exchanger.html java.util.concurrent.Exchanger
15
- class Exchanger
15
+ class MVarExchanger
16
16
17
17
EMPTY = Object . new
18
18
@@ -34,23 +34,104 @@ def initialize(opts = {})
34
34
# thread. nil (default value) means no timeout
35
35
# @return [Object] the value exchanged by the other thread; nil if timed out
36
36
def exchange ( value , timeout = nil )
37
- first = @first . take ( timeout )
38
- if first == MVar ::TIMEOUT
39
- nil
40
- elsif first == EMPTY
41
- @first . put value
42
- second = @second . take timeout
43
- if second == MVar ::TIMEOUT
44
- nil
37
+
38
+ # Both threads modify the first variable
39
+ first_result = @first . modify ( timeout ) do |first |
40
+ # Does it currently contain the special empty value?
41
+ if first == EMPTY
42
+ # If so, modify it to contain our value
43
+ value
45
44
else
46
- second
45
+ # Otherwise, modify it back to the empty state
46
+ EMPTY
47
47
end
48
+ end
49
+
50
+ # If that timed out, the whole operation timed out
51
+ return nil if first_result == MVar ::TIMEOUT
52
+
53
+ # What was in @first before we modified it?
54
+ if first_result == EMPTY
55
+ # We stored our object - someone else will turn up with the second
56
+ # object at some point in the future
57
+
58
+ # Wait for the second object to appear
59
+ second_result = @second . take ( timeout )
60
+
61
+ # If that timed out, the whole operation timed out
62
+ return nil if second_result == MVar ::TIMEOUT
63
+
64
+ # BUT HOW DO WE CANCEL OUR RESULT BEING AVAILABLE IN @first?
65
+
66
+ # Return that second object
67
+ second_result
48
68
else
49
- @first . put EMPTY
50
- @second . put value
51
- first
69
+ # We reset @first to be empty again - so the other value is in
70
+ # first_result and we need to tell the other thread about our value
71
+
72
+ # Tell the other thread about our object
73
+ second_result = @second . put ( value , timeout )
74
+
75
+ # If that timed out, the whole operation timed out
76
+ return nil if second_result == MVar ::TIMEOUT
77
+
78
+ # We already have its object
79
+ first_result
52
80
end
53
81
end
82
+ end
83
+
84
+ class SlotExchanger
85
+
86
+ def initialize
87
+ @mutex = Mutex . new
88
+ @condition = Condition . new
89
+ @slot = new_slot
90
+ end
91
+
92
+ def exchange ( value , timeout = nil )
93
+ @mutex . synchronize do
94
+
95
+ replace_slot_if_fulfilled
54
96
97
+ slot = @slot
98
+
99
+ if slot . state == :empty
100
+ slot . value_1 = value
101
+ slot . state = :waiting
102
+ wait_for_value ( slot , timeout )
103
+ slot . value_2
104
+ else
105
+ slot . value_2 = value
106
+ slot . state = :fulfilled
107
+ @condition . broadcast
108
+ slot . value_1
109
+ end
110
+ end
111
+ end
112
+
113
+ Slot = Struct . new ( :value_1 , :value_2 , :state )
114
+
115
+ private_constant :Slot
116
+
117
+ private
118
+
119
+ def replace_slot_if_fulfilled
120
+ @slot = new_slot if @slot . state == :fulfilled
121
+ end
122
+
123
+ def wait_for_value ( slot , timeout )
124
+ remaining = Condition ::Result . new ( timeout )
125
+ while slot . state == :waiting && remaining . can_wait?
126
+ remaining = @condition . wait ( @mutex , remaining . remaining_time )
127
+ end
128
+ end
129
+
130
+ def new_slot
131
+ Slot . new ( nil , nil , :empty )
132
+ end
55
133
end
134
+
135
+ Exchanger = SlotExchanger
136
+
56
137
end
0 commit comments