// path: root/chromium/components/webrtc/thread_wrapper.h
// blob: df5572fb746488f8ab39dab83b54d062f98048e4 (plain)
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef COMPONENTS_WEBRTC_THREAD_WRAPPER_H_
#define COMPONENTS_WEBRTC_THREAD_WRAPPER_H_

#include <stdint.h>

#include <list>
#include <map>
#include <memory>

#include "base/callback_forward.h"
#include "base/compiler_specific.h"
#include "base/feature_list.h"
#include "base/memory/weak_ptr.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/current_thread.h"
#include "base/task/single_thread_task_runner.h"
#include "base/time/time.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
#include "third_party/webrtc/rtc_base/thread.h"
#include "third_party/webrtc_overrides/coalesced_tasks.h"

namespace webrtc {

// Feature flag: whether ThreadWrapper should schedule low-precision tasks on
// the metronome. Default: disabled.
// This is a declaration only (extern); the definition is expected to live in
// the corresponding source file.
extern const base::Feature kThreadWrapperUsesMetronome;

// ThreadWrapper implements the rtc::Thread interface on top of Chromium's
// SingleThreadTaskRunner interface. Currently only the bare minimum that is
// used by the P2P part of libjingle is implemented. There are two ways to
// create this object:
//
// - Call EnsureForCurrentMessageLoop(). This approach works only on threads
//   that have a MessageLoop. In this case ThreadWrapper deletes itself
//   automatically when the MessageLoop is destroyed.
// - Call WrapTaskRunner() (the constructor itself is private). In this case
//   the creating code must pass a valid task runner for the current thread
//   and owns the returned wrapper, i.e. it is responsible for destroying it
//   later.
class ThreadWrapper : public base::CurrentThread::DestructionObserver,
                      public rtc::Thread {
 public:
  // A repeating callback whose TimeDelta argument carries one duration sample.
  // What the duration represents depends on the context in which the callback
  // is installed.
  using SampledDurationCallback =
      base::RepeatingCallback<void(base::TimeDelta)>;

  // Creates a ThreadWrapper for the current thread if one hasn't been created
  // yet. The thread wrapper is destroyed automatically when the current
  // MessageLoop is destroyed.
  static void EnsureForCurrentMessageLoop();

  // Creates a ThreadWrapper for |task_runner|, which must run tasks on the
  // current thread. Ownership of the wrapper is returned to the caller.
  static std::unique_ptr<ThreadWrapper> WrapTaskRunner(
      scoped_refptr<base::SingleThreadTaskRunner> task_runner);

  // Returns the thread wrapper for the current thread, or nullptr if it
  // doesn't exist.
  static ThreadWrapper* current();

  // Sets task latency & duration sample callbacks intended to gather UMA
  // statistics. Samples are acquired periodically every several seconds by
  // ThreadWrapper. In this context,
  // * task latency is defined as the duration between the moment a task is
  //   scheduled from ThreadWrapper's task runner, and the moment it begins
  //   running.
  // * task duration is defined as the duration between the moment the
  //   ThreadWrapper begins running a task and the moment it finishes
  //   executing it. It only measures durations of tasks posted to rtc::Thread.
  // The passed callbacks are called in the ThreadWrapper's task runner
  // context.
  void SetLatencyAndTaskDurationCallbacks(
      SampledDurationCallback task_latency_callback,
      SampledDurationCallback task_duration_callback);

  // Not copyable or assignable.
  ThreadWrapper(const ThreadWrapper&) = delete;
  ThreadWrapper& operator=(const ThreadWrapper&) = delete;

  ~ThreadWrapper() override;

  // Sets whether the thread can be used to send messages synchronously to
  // another thread using the Send() method. Set to false by default to avoid
  // potential jankiness when Send() is used on the renderer thread. It should
  // be set explicitly for threads that need to call Send() for other threads.
  void set_send_allowed(bool allowed) { send_allowed_ = allowed; }

  // CurrentThread::DestructionObserver implementation.
  void WillDestroyCurrentMessageLoop() override;

  // rtc::MessageQueue overrides.
  void Post(const rtc::Location& posted_from,
            rtc::MessageHandler* phandler,
            uint32_t id,
            rtc::MessageData* pdata,
            bool time_sensitive) override;
  void PostDelayed(const rtc::Location& posted_from,
                   int delay_ms,
                   rtc::MessageHandler* handler,
                   uint32_t id,
                   rtc::MessageData* data) override;
  void Clear(rtc::MessageHandler* handler,
             uint32_t id,
             rtc::MessageList* removed) override;
  void Dispatch(rtc::Message* message) override;
  void Send(const rtc::Location& posted_from,
            rtc::MessageHandler* handler,
            uint32_t id,
            rtc::MessageData* data) override;

  // Quitting is not supported (see below); this method performs
  // NOTIMPLEMENTED_LOG_ONCE() and returns false.
  // TODO(https://crbug.com/webrtc/10364): When rtc::MessageQueue::Post()
  // returns a bool, !IsQuitting() will not be needed to infer success and we
  // may implement this as NOTREACHED() like the rest of the methods.
  bool IsQuitting() override;
  // The following methods are not supported. They are overridden just to
  // ensure that they are not called (each of them contains NOTREACHED in its
  // body). Some of these methods can be implemented if it becomes necessary
  // to use libjingle code that calls them.
  void Quit() override;
  void Restart() override;
  bool Get(rtc::Message* message, int delay_ms, bool process_io) override;
  bool Peek(rtc::Message* message, int delay_ms) override;
  int GetDelay() override;

  // rtc::Thread overrides.
  void Stop() override;
  void Run() override;

 private:
  // Messages keyed by an int; presumably the task id that is passed to
  // RunTask()/RunTaskInternal() -- confirm against the implementation.
  using MessagesQueue = std::map<int, rtc::Message>;
  struct PendingSend;
  class PostTaskLatencySampler;

  // Private: instances are created through EnsureForCurrentMessageLoop() or
  // WrapTaskRunner().
  explicit ThreadWrapper(
      scoped_refptr<base::SingleThreadTaskRunner> task_runner);

  // NOTE(review): looks like the shared implementation behind Post() and
  // PostDelayed() -- confirm against the source file.
  void PostTaskInternal(const rtc::Location& posted_from,
                        int delay_ms,
                        rtc::MessageHandler* handler,
                        uint32_t message_id,
                        rtc::MessageData* data);
  void RunTask(int task_id);
  void RunTaskInternal(int task_id);
  void ProcessPendingSends();

  // TaskQueueBase overrides.
  void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override;
  void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
                       uint32_t milliseconds) override;
  void PostDelayedHighPrecisionTask(std::unique_ptr<webrtc::QueuedTask> task,
                                    uint32_t milliseconds) override;

  // Executes WebRTC queued tasks from the TaskQueueBase overrides on
  // |task_runner_|.
  void RunTaskQueueTask(std::unique_ptr<webrtc::QueuedTask> task);
  void RunCoalescedTaskQueueTasks(base::TimeTicks scheduled_time);

  // Called before a task runs. Returns an opaque optional timestamp which
  // should be passed into FinalizeRunTask().
  absl::optional<base::TimeTicks> PrepareRunTask();
  // Called after a task has run. Pass the return value of PrepareRunTask() as
  // |task_start_timestamp|.
  void FinalizeRunTask(absl::optional<base::TimeTicks> task_start_timestamp);

  // Task runner used to execute messages posted on this thread.
  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;

  // Whether Send() may be used on this thread; see set_send_allowed().
  // Initialized in-class to the documented default so the flag can never be
  // read uninitialized; a constructor mem-initializer still takes precedence.
  bool send_allowed_ = false;

  // NOTE(review): presumably caches the state of
  // |kThreadWrapperUsesMetronome| at construction -- confirm in the
  // constructor.
  const bool use_metronome_;
  // |lock_| must be locked when accessing |messages_|.
  base::Lock lock_;
  int last_task_id_;
  MessagesQueue messages_;
  std::list<PendingSend*> pending_send_messages_;
  base::WaitableEvent pending_send_event_;
  std::unique_ptr<PostTaskLatencySampler> latency_sampler_;
  // Callbacks installed via SetLatencyAndTaskDurationCallbacks().
  SampledDurationCallback task_latency_callback_;
  SampledDurationCallback task_duration_callback_;
  // If |kThreadWrapperUsesMetronome| is enabled, low precision tasks are
  // coalesced onto metronome ticks and stored in |coalesced_tasks_| until they
  // are ready to run.
  blink::CoalescedTasks coalesced_tasks_;

  base::WeakPtr<ThreadWrapper> weak_ptr_;
  // Keep this as the last data member: members are destroyed in reverse
  // declaration order, so the factory is torn down before the members above.
  base::WeakPtrFactory<ThreadWrapper> weak_ptr_factory_{this};
};

}  // namespace webrtc

#endif  // COMPONENTS_WEBRTC_THREAD_WRAPPER_H_