Skip to content

Commit 9db56e4

Browse files
committed
Add AtExit hook manager
executors' auto_termination feature is migrated
1 parent 5495f9d commit 9db56e4

File tree

5 files changed

+124
-61
lines changed

5 files changed

+124
-61
lines changed

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'concurrent/version'
22

33
require 'concurrent/synchronization'
4+
require 'concurrent/at_exit'
45

56
require 'concurrent/configuration'
67

lib/concurrent/at_exit.rb

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
require 'concurrent/logging'
2+
require 'concurrent/synchronization'
3+
4+
module Concurrent
5+
6+
# Provides ability to add and remove hooks to be run at `Kernel#at_exit`, order is undefined.
7+
# Each hook is executed at most once.
8+
class AtExitImplementation < Synchronization::Object
9+
include Logging
10+
11+
def initialize(enabled = true)
12+
super()
13+
synchronize do
14+
@hooks = {}
15+
@enabled = enabled
16+
end
17+
end
18+
19+
# Add a hook to be run at `Kernel#at_exit`
20+
# @yield the hook
21+
# @return id of the hook
22+
def add(&hook)
23+
synchronize { @hooks[hook.object_id] = hook }
24+
self
25+
end
26+
27+
# Delete a hook by hook_id
28+
# @return [true, false]
29+
def delete(hook_id)
30+
!!synchronize { @hooks.delete hook_id }
31+
end
32+
33+
# Is hook with hook_id rpesent?
34+
# @return [true, false]
35+
def hook?(hook_id)
36+
synchronize { @hooks.key? hook_id }
37+
end
38+
39+
# @return copy of the hooks
40+
def hooks
41+
synchronize { @hooks }.clone
42+
end
43+
44+
# install `Kernel#at_exit` callback to execute added hooks
45+
def install
46+
synchronize do
47+
@installed ||= begin
48+
at_exit { runner }
49+
true
50+
end
51+
self
52+
end
53+
end
54+
55+
# Will it run during `Kernel#at_exit`
56+
def enabled?
57+
synchronize { @enabled }
58+
end
59+
60+
# Configure if it runs during `Kernel#at_exit`
61+
def enabled=(value)
62+
synchronize { @enabled = value }
63+
end
64+
65+
# run the hooks manually
66+
# @return ids of the hooks
67+
def run
68+
hooks, _ = synchronize { hooks, @hooks = @hooks, {} }
69+
hooks.each do |_, hook|
70+
begin
71+
hook.call
72+
rescue => error
73+
log ERROR, error
74+
end
75+
end
76+
hooks.keys
77+
end
78+
79+
private
80+
81+
def runner
82+
run if synchronize { @enabled }
83+
end
84+
end
85+
86+
private_constant :AtExitImplementation
87+
88+
# @see AtExitImplementation
89+
AtExit = AtExitImplementation.new.install
90+
end

lib/concurrent/configuration.rb

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'thread'
22
require 'concurrent/atomics'
33
require 'concurrent/errors'
4+
require 'concurrent/at_exit'
45
require 'concurrent/executors'
56
require 'concurrent/utility/processor_count'
67

@@ -39,11 +40,11 @@ def self.global_logger=(value)
3940
GLOBAL_LOGGER.value = value
4041
end
4142

42-
# Disables pool auto-termination which is called on `at_exit` callback.
43+
# Disables AtExit hooks including pool auto-termination hooks.
4344
# When disabled it will be the application
44-
# programmer's responsibility to ensure that the thread pools
45+
# programmer's responsibility to ensure that the hooks
4546
# are shutdown properly prior to application exit
46-
# by calling {.terminate_pools!} method.
47+
# by calling {AtExit.run} method.
4748
#
4849
# @note this option should be needed only because of `at_exit` ordering
4950
# issues which may arise when running some of the testing frameworks.
@@ -53,20 +54,28 @@ def self.global_logger=(value)
5354
# @note This method should *never* be called
5455
# from within a gem. It should *only* be used from within the main
5556
# application and even then it should be used only when necessary.
57+
# @see AtExit
58+
def self.disable_at_exit_hooks!
59+
AtExit.enabled = false
60+
end
61+
5662
def self.disable_executor_auto_termination!
57-
Executor::AT_EXIT_AUTO_TERMINATION.enabled = false
63+
warn '[DEPRECATED] Use Concurrent.disable_at_exit_hooks! instead'
64+
disable_at_exit_hooks!
5865
end
5966

6067
# @return [true,false]
6168
# @see .disable_executor_auto_termination!
6269
def self.disable_executor_auto_termination?
63-
Executor::AT_EXIT_AUTO_TERMINATION.enabled?
70+
warn '[DEPRECATED] Use Concurrent::AtExit.enabled? instead'
71+
AtExit.enabled?
6472
end
6573

6674
# terminates all pools and blocks until they are terminated
6775
# @see .disable_executor_auto_termination!
6876
def self.terminate_pools!
69-
Executor::AT_EXIT_AUTO_TERMINATION.terminate
77+
warn '[DEPRECATED] Use Concurrent::AtExit.run instead'
78+
AtExit.run
7079
end
7180

7281
# Global thread pool optimized for short, fast *operations*.

lib/concurrent/executor/executor.rb

Lines changed: 16 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,11 @@
11
require 'concurrent/errors'
22
require 'concurrent/logging'
3+
require 'concurrent/at_exit'
34
require 'concurrent/atomic/event'
45

56
module Concurrent
67

78
module Executor
8-
class AtExitAutoTermination
9-
def initialize(enabled = true)
10-
@mutex = Mutex.new
11-
@executors = Set.new
12-
@enabled = enabled
13-
@installed = false
14-
end
15-
16-
def add(executor)
17-
@mutex.synchronize { @executors.add executor }
18-
end
19-
20-
def remove(executor)
21-
@mutex.synchronize { @executors.delete executor }
22-
end
23-
24-
def enabled?
25-
@mutex.synchronize { @enabled }
26-
end
27-
28-
def enabled=(value)
29-
@mutex.synchronize { @enabled = value }
30-
end
31-
32-
def terminate
33-
executors = @mutex.synchronize { @executors.to_a if @enabled }
34-
if executors
35-
executors.each(&:kill) # TODO be gentle first
36-
executors.each(&:wait_for_termination)
37-
executors.clear
38-
true
39-
else
40-
false
41-
end
42-
end
43-
44-
def install
45-
@installed ||= begin
46-
at_exit { terminate }
47-
true
48-
end
49-
self
50-
end
51-
52-
def executors
53-
@mutex.synchronize { @executors.to_a }
54-
end
55-
end
56-
57-
AT_EXIT_AUTO_TERMINATION = AtExitAutoTermination.new.install
58-
599
# The policy defining how rejected tasks (tasks received once the
6010
# queue size reaches the configured `max_queue`, or after the
6111
# executor has shut down) are handled. Must be one of the values
@@ -126,13 +76,24 @@ def serialized?
12676
private
12777

12878
def ns_auto_terminate?
129-
@auto_terminate
79+
!!@auto_terminate
13080
end
13181

13282
def ns_auto_terminate=(value)
133-
AT_EXIT_AUTO_TERMINATION.add self if value == true
134-
AT_EXIT_AUTO_TERMINATION.remove self if value == false
135-
@auto_terminate = value
83+
case value
84+
when true
85+
@auto_terminate = AtExit.add { terminate_at_exit }
86+
when false
87+
AtExit.delete(@auto_terminate)
88+
@auto_terminate = nil
89+
else
90+
raise ArgumentError
91+
end
92+
end
93+
94+
def terminate_at_exit
95+
kill # TODO be gentle first
96+
wait_for_termination
13697
end
13798
end
13899

lib/concurrent/executor/serialized_execution.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ def work(job)
9898
job = @stash.shift || (@being_executed = false)
9999
end
100100

101+
# TODO maybe be able to tell caching pool to just enqueue this job, because the current one end at the end
102+
# of this block
101103
call_job job if job
102104
end
103105
end

0 commit comments

Comments
 (0)