WebRTC Thread Management

Why use threading as the entry point into the WebRTC source? Anyone with some exposure to WebRTC knows that it has its own thread management mechanism. With it, WebRTC achieves thread-safe multi-threaded code with very little effort, and each thread is assigned its own responsibilities, which makes the code easier to maintain and to read (WebRTC's thread management closely resembles Chromium's and Flutter's). Without understanding this mechanism, reading the WebRTC code is bewildering; and since thread management involves no specialized media knowledge, it makes an ideal starting point for digging into the WebRTC source.

WebRTC's code logic is managed primarily by three threads (codec threads are not covered here):

  • network_thread: the network thread; all time-consuming network operations are handled on this thread
  • worker_thread: the worker thread; it mainly carries the processing logic, such as initialization code, and data received on the network thread is handed to the worker thread for processing before being passed on to the decoder threads
  • signal_thread: the signaling thread; it generally operates at the PeerConnection layer, meaning the vast majority of the APIs we call, such as AddCandidate and CreateOffer, must run on the signaling thread. To guarantee this, WebRTC adds a dedicated Proxy layer that forces API calls onto the signaling thread (the Proxy layer's implementation is analyzed at the end of this article)

(Figure: WebRTC thread management overview)

Task Posting Between WebRTC Threads

Tasks (here a task essentially means a function) are posted between WebRTC threads in two main ways:

  • The synchronous Invoke mechanism: it runs a task on a designated thread, and the thread calling the Invoke API blocks until the task has finished executing
  • The asynchronous Post mechanism: it also runs a task on a designated thread, but the thread calling the PostTask API does not wait

The Invoke mechanism, in code:

// Suppose NeedsIceRestart is called on the worker thread: network_thread()->Invoke
// dispatches the lambda from the worker thread to the network thread and waits for it to finish
bool PeerConnection::NeedsIceRestart(const std::string& content_name) const {
  return network_thread()->Invoke<bool>(RTC_FROM_HERE, [this, &content_name] {
    RTC_DCHECK_RUN_ON(network_thread());
    return transport_controller_->NeedsIceRestart(content_name);
  });
}

The PostTask mechanism, in code:

// Unlike the Invoke mechanism, the caller does not wait for the task to finish after calling PostTask
void EmulatedNetworkManager::EnableEndpoint(EmulatedEndpointImpl* endpoint) {
  network_thread_->PostTask(RTC_FROM_HERE, [this, endpoint]() {
    endpoint->Enable();
    UpdateNetworksOnce();
  });
}

WebRTC Thread Implementation Details – Thread

Note: source version M92

Thread Startup Flow

Let's start with the creation of WebRTC's signaling, worker, and network threads.

file://src/pc_connection_context.cc:81

ConnectionContext::ConnectionContext(
    PeerConnectionFactoryDependencies* dependencies)
    : network_thread_(MaybeStartThread(dependencies->network_thread,
                                       "pc_network_thread",
                                       true,
                                       owned_network_thread_)),
      worker_thread_(MaybeStartThread(dependencies->worker_thread,
                                      "pc_worker_thread",
                                      false,
                                      owned_worker_thread_)),
      signaling_thread_(MaybeWrapThread(dependencies->signaling_thread,
                                        wraps_current_thread_)) {
}

MaybeStartThread initializes the worker and network threads. The signaling thread is a bit special: it can directly wrap the process's main thread (more precisely, the current calling thread), so it is created through MaybeWrapThread instead.

MaybeStartThread

file://src/pc_connection_context.cc:27

rtc::Thread* MaybeStartThread(rtc::Thread* old_thread,
                              const std::string& thread_name,
                              bool with_socket_server,
                              std::unique_ptr<rtc::Thread>& thread_holder) {
  if (old_thread) {
    return old_thread;
  }
  if (with_socket_server) {
    thread_holder = rtc::Thread::CreateWithSocketServer();
  } else {
    thread_holder = rtc::Thread::Create();
  }
  thread_holder->SetName(thread_name, nullptr);
  thread_holder->Start();
  return thread_holder.get();
}

Ignore with_socket_server for now; CreateWithSocketServer is explained later. The overall flow of MaybeStartThread:

  1. If old_thread is non-null, return it directly. All three WebRTC threads can be supplied externally, so if a custom thread was passed in, no new thread is created (see the sketch after this list)
  2. Call rtc::Thread::Create
  3. Call rtc::Thread::SetName
  4. Call rtc::Thread::Start
  5. The thread is up and running
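
As a concrete illustration of step 1, here is a minimal sketch of how an embedder might create and own these threads itself and hand them to WebRTC via PeerConnectionFactoryDependencies (field names as of M92; treat this as an assumption to verify against your checkout):

#include "api/peer_connection_interface.h"
#include "rtc_base/thread.h"

// The embedder creates and starts its own threads...
std::unique_ptr<rtc::Thread> network = rtc::Thread::CreateWithSocketServer();
network->SetName("my_network_thread", nullptr);
network->Start();

std::unique_ptr<rtc::Thread> worker = rtc::Thread::Create();
worker->SetName("my_worker_thread", nullptr);
worker->Start();

// ...and passes them in; MaybeStartThread then returns them unchanged.
webrtc::PeerConnectionFactoryDependencies deps;
deps.network_thread = network.get();
deps.worker_thread = worker.get();
// deps.signaling_thread left null: the calling thread will be wrapped.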

MaybeWrapThread

file://src/pc_connection_context.cc:44

rtc::Thread* MaybeWrapThread(rtc::Thread* signaling_thread,
                             bool& wraps_current_thread) {
  wraps_current_thread = false;
  if (signaling_thread) {
    return signaling_thread;
  }
  auto this_thread = rtc::Thread::Current();
  if (!this_thread) {
    // If this thread isn't already wrapped by an rtc::Thread, create a
    // wrapper and own it in this class.
    this_thread = rtc::ThreadManager::Instance()->WrapCurrentThread();
    wraps_current_thread = true;
  }
  return this_thread;
}

If no signaling_thread is passed in, the current thread is used as the signaling thread.

The rtc::Thread::Start Flow

  1. Call ThreadManager::Instance() to initialize the ThreadManager object
  2. Create the thread: CreateThread on Windows, pthread_create on Linux
  3. Enter the thread entry function Thread::PreRun
  4. PreRun calls Thread::Run
  5. Thread::Run calls ProcessMessages

ProcessMessages

file://src/rtc_base/thread.cc:1132

bool Thread::ProcessMessages(int cmsLoop) {
  //...
  int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
  int cmsNext = cmsLoop;
  while (true) {
#if defined(WEBRTC_MAC)
    ScopedAutoReleasePool pool;
#endif
    Message msg;
    if (!Get(&msg, cmsNext))
      return !IsQuitting();
    Dispatch(&msg);
    if (cmsLoop != kForever) {
      cmsNext = static_cast<int>(TimeUntil(msEnd));
      if (cmsNext < 0)
        return true;
    }
  }
}

The main logic: the function processes messages in a while loop. Each iteration fetches an available Message via Get, then dispatches it via Dispatch; Get and Dispatch are the two key functions. That wraps up the initialization and startup flow of a WebRTC thread.
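
As an aside: when the signaling thread wraps the main thread (see MaybeWrapThread above), something has to pump this loop. A minimal sketch of doing so by hand, where app_is_running is a hypothetical stand-in for your main-loop condition:

// Wrap the current (main) thread so it has a message queue, then pump it.
rtc::Thread* main_thread = rtc::ThreadManager::Instance()->WrapCurrentThread();
while (app_is_running) {
  // Process queued messages for up to 100 ms, then return control to the
  // rest of the main loop (UI, etc.).
  main_thread->ProcessMessages(/*cmsLoop=*/100);
}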

Message Retrieval, Dispatch, and Posting

ProcessMessages above can be thought of as a message loop; every pass through the loop fetches a message via the Get function.

Get (Message Retrieval)

file://src/rtc_base/thread.cc:472

bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
   // ......
  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
  int64_t cmsTotal = cmsWait;
  int64_t cmsElapsed = 0;
  int64_t msStart = TimeMillis();
  int64_t msCurrent = msStart;
  while (true) {
    // Check for posted events
    int64_t cmsDelayNext = kForever;
    bool first_pass = true;
    while (true) {
      // All queue operations need to be locked, but nothing else in this loop
      // (specifically handling disposed message) can happen inside the crit.
      // Otherwise, disposed MessageHandlers will cause deadlocks.
      {
        CritScope cs(&crit_);
        // On the first pass, check for delayed messages that have been
        // triggered and calculate the next trigger time.
        if (first_pass) {
          first_pass = false;
          while (!delayed_messages_.empty()) {
            if (msCurrent < delayed_messages_.top().run_time_ms_) {
              cmsDelayNext =
                  TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
              break;
            }
            messages_.push_back(delayed_messages_.top().msg_);
            delayed_messages_.pop();
          }
        }
        // Pull a message off the message queue, if available.
        if (messages_.empty()) {
          break;
        } else {
          *pmsg = messages_.front();
          messages_.pop_front();
        }
      }  // crit_ is released here.
      // If this was a dispose message, delete it and skip it.
      if (MQID_DISPOSE == pmsg->message_id) {
        RTC_DCHECK(nullptr == pmsg->phandler);
        delete pmsg->pdata;
        *pmsg = Message();
        continue;
      }
      return true;
    }
    if (IsQuitting())
      break;
    // Which is shorter, the delay wait or the asked wait?
    int64_t cmsNext;
    if (cmsWait == kForever) {
      cmsNext = cmsDelayNext;
    } else {
      cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
        cmsNext = cmsDelayNext;
    }
    {
      // Wait and multiplex in the meantime
      if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
        return false;
    }
    // If the specified timeout expired, return
    msCurrent = TimeMillis();
    cmsElapsed = TimeDiff(msCurrent, msStart);
    if (cmsWait != kForever) {
      if (cmsElapsed >= cmsWait)
        return false;
    }
  }
  return false;
}

The core is a loop that fetches one valid message. The loop ends when Get succeeds, fails, or the thread is stopped by an external call to Stop.

The message retrieval mechanism

  • First, try delayed messages. They live in a priority queue; every delayed message whose run time has arrived is popped from the delayed-message priority queue and appended to the runnable message queue
  • Next, check whether the runnable message queue holds a message; if so, take one from the head of the queue and return it to the caller
  • If the runnable message queue is empty, Wait until an incoming message triggers WakeUp. Wait and WakeUp here come from the SocketServer object; how they work is analyzed separately later

At first reading, the delayed-message handling may raise a question: why check only the run time of the top element of the delayed-message queue? Couldn't a message further back in the queue have an earlier run time than the top one?

while (!delayed_messages_.empty()) {
    if (msCurrent < delayed_messages_.top().run_time_ms_) {
        cmsDelayNext =
            TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
        break;
    }
    messages_.push_back(delayed_messages_.top().msg_);
    delayed_messages_.pop();
}

A closer look at the declaration of delayed_messages_, PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);, clears this up:

  // DelayedMessage goes into a priority queue, sorted by trigger time. Messages
  // with the same trigger time are processed in num_ (FIFO) order.
  class DelayedMessage {
   public:
    DelayedMessage(int64_t delay,
                   int64_t run_time_ms,
                   uint32_t num,
                   const Message& msg)
        : delay_ms_(delay),
          run_time_ms_(run_time_ms),
          message_number_(num),
          msg_(msg) {}
    bool operator<(const DelayedMessage& dmsg) const {
      return (dmsg.run_time_ms_ < run_time_ms_) ||
             ((dmsg.run_time_ms_ == run_time_ms_) &&
              (dmsg.message_number_ < message_number_));
    }
    int64_t delay_ms_;  // for debugging
    int64_t run_time_ms_;
    // Monotonicaly incrementing number used for ordering of messages
    // targeted to execute at the same time.
    uint32_t message_number_;
    Message msg_;
  };
  class PriorityQueue : public std::priority_queue<DelayedMessage> {
   public:
    container_type& container() { return c; }
    void reheap() { make_heap(c.begin(), c.end(), comp); }
  };

The delayed-message queue is a priority queue backed by a max-heap, i.e. it sorts in descending order. DelayedMessage comparison is based on run_time_ms_: the smaller run_time_ms_ is, the "greater" the DelayedMessage; when run_time_ms_ ties, message_number_ breaks the tie. Put plainly, the earlier the run time, the closer the message sits to the front of the queue, so checking the top element is sufficient.
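
A standalone sketch of the same trick, showing that inverting operator< makes std::priority_queue (a max-heap) surface the smallest run time at top():

#include <cstdint>
#include <iostream>
#include <queue>

struct Delayed {
  int64_t run_time_ms;
  uint32_t num;  // FIFO tie-breaker, like message_number_
  bool operator<(const Delayed& other) const {
    // Inverted: an earlier run time compares as "greater".
    return (other.run_time_ms < run_time_ms) ||
           (other.run_time_ms == run_time_ms && other.num < num);
  }
};

int main() {
  std::priority_queue<Delayed> q;
  q.push({300, 0});
  q.push({100, 1});
  q.push({200, 2});
  while (!q.empty()) {
    std::cout << q.top().run_time_ms << "\n";  // Prints 100, 200, 300.
    q.pop();
  }
}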

The Message struct

Before we get to message dispatch, we need a clear picture of Message.

file://src/rtc_base/thread_message.h

struct Message {
  Message() : phandler(nullptr), message_id(0), pdata(nullptr) {}
  inline bool Match(MessageHandler* handler, uint32_t id) const {
    return (handler == nullptr || handler == phandler) &&
           (id == MQID_ANY || id == message_id);
  }
  Location posted_from;
  MessageHandler* phandler;
  uint32_t message_id;
  MessageData* pdata;
};

The two members to focus on are phandler and pdata, whose classes are:

class RTC_EXPORT MessageHandler {
 public:
  virtual ~MessageHandler() {}
  virtual void OnMessage(Message* msg) = 0;
};
class MessageData {
 public:
  MessageData() {}
  virtual ~MessageData() {}
};

Both are abstract base classes: MessageData stores a message's content and MessageHandler processes the message. Users can define their own MessageHandler and MessageData. For instance, a custom MessageData (a small non-template base with a virtual Run is added here so a handler can run the task without knowing the functor's concrete type, mirroring the pattern WebRTC itself uses internally):

// A non-template base exposing Run, so a MessageHandler can invoke the task
// through a MessageData* without knowing the functor type
class MyTaskBase : public MessageData {
 public:
  virtual void Run() = 0;
};

// Our own MyMessageTask: it stores a functor and runs it via Run()
template <class FunctorT>
class MyMessageTask final : public MyTaskBase {
 public:
  explicit MyMessageTask(FunctorT&& functor)
      : functor_(std::forward<FunctorT>(functor)) {}
  void Run() override { functor_(); }

 private:
  typename std::remove_reference<FunctorT>::type functor_;
};

Then define a MessageHandler of our own to process the message:

// OnMessage is invoked when a message is dispatched. msg->pdata holds the
// MessageData object we posted (our MyMessageTask); cast it back, call the
// Run method we just wrote, then free it.
class MyMessageHandlerWithTask : public MessageHandler {
 public:
  void OnMessage(Message* msg) override {
    static_cast<MyTaskBase*>(msg->pdata)->Run();
    delete msg->pdata;
  }
};

With a handler and data defined for processing dispatched messages, here is how to use them (given some rtc::Thread* thread to post to):

// The Thread::Post prototype
virtual void Post(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id = 0,
                  MessageData* pdata = nullptr,
                  bool time_sensitive = false);

// Post takes a MessageHandler and a MessageData; we simply pass in our
// custom ones
auto functor = []() { int sum = 1 + 2; (void)sum; };
static MyMessageHandlerWithTask* myhandler = new MyMessageHandlerWithTask;
auto* mytask = new MyMessageTask<decltype(functor)>(std::move(functor));
thread->Post(RTC_FROM_HERE, myhandler, 0, mytask);

Once this Post executes, the lambda stored in MyMessageTask will run on the target thread.

Dispatch (Message Dispatch)

With Message covered, let's see how Dispatch hands a message over to its MessageHandler for processing.

file://src/rtc_base/thread.cc

void Thread::Dispatch(Message* pmsg) {
  TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
               pmsg->posted_from.file_name(), "src_func",
               pmsg->posted_from.function_name());
  RTC_DCHECK_RUN_ON(this);
  int64_t start_time = TimeMillis();
  pmsg->phandler->OnMessage(pmsg);
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= dispatch_warning_ms_) {
    RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
                     << "ms to dispatch. Posted from: "
                     << pmsg->posted_from.ToString();
    // To avoid log spew, move the warning limit to only give warning
    // for delays that are larger than the one observed.
    dispatch_warning_ms_ = diff + 1;
  }
}

Dispatch is very simple: the crux is the call to pmsg->phandler->OnMessage, which hands the message over to its MessageHandler for processing.

Message Posting

We saw in the message retrieval logic that, with no message available, the thread calls Wait. Where there is a Wait there must be something that triggers WakeUp, and indeed that happens when a message is posted from outside. As introduced in "Task Posting Between WebRTC Threads", there are two ways: the synchronous Invoke and the asynchronous Post.

file://src/rtc_base/thread.h:449

  template <class FunctorT>
  void PostTask(const Location& posted_from, FunctorT&& functor) {
    Post(posted_from, GetPostTaskMessageHandler(), /*id=*/0,
         new rtc_thread_internal::MessageWithFunctor<FunctorT>(
             std::forward<FunctorT>(functor)));
  }

At its core, PostTask still calls the Post function, passing in its own MessageData and MessageHandler.

file://src/rtc_base/thread.cc:563

void Thread::Post(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata,
                  bool time_sensitive) {
  RTC_DCHECK(!time_sensitive);
  if (IsQuitting()) {
    delete pdata;
    return;
  }
  // Keep thread safe
  // Add the message to the end of the queue
  // Signal for the multiplexer to return
  {
    CritScope cs(&crit_);
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    messages_.push_back(msg);
  }
  WakeUpSocketServer();
}
void Thread::WakeUpSocketServer() {
  ss_->WakeUp();
}

Post's implementation is simple and clear: build a Message, append it to the queue, then call ss_->WakeUp() to wake up Wait. ss_ is a SocketServer object, analyzed in detail later; the sketch below illustrates the Wait/WakeUp contract it provides.
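
This toy class is not WebRTC's SocketServer, only a hypothetical sketch of the contract Post relies on: Wait blocks until a timeout elapses or another thread calls WakeUp (WebRTC's NullSocketServer does essentially this with an rtc::Event):

#include <chrono>
#include <condition_variable>
#include <mutex>

class ToySocketServer {
 public:
  // Block until WakeUp is called or cms milliseconds pass (cms < 0 = forever).
  bool Wait(int cms, bool /*process_io*/) {
    std::unique_lock<std::mutex> lock(mu_);
    if (cms < 0) {
      cv_.wait(lock, [this] { return woken_; });
    } else {
      cv_.wait_for(lock, std::chrono::milliseconds(cms),
                   [this] { return woken_; });
    }
    woken_ = false;
    return true;
  }
  // Called by Post from any thread to release a pending Wait.
  void WakeUp() {
    std::lock_guard<std::mutex> lock(mu_);
    woken_ = true;
    cv_.notify_one();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool woken_ = false;
};

Now for the synchronous Invoke: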

file://src/rtc_base/thread.h:388

  template <
      class ReturnT,
      typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
  ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
    ReturnT result;
    InvokeInternal(posted_from, [functor, &result] { result = functor(); });
    return result;
  }
  template <
      class ReturnT,
      typename = typename std::enable_if<std::is_void<ReturnT>::value>::type>
  void Invoke(const Location& posted_from, FunctionView<void()> functor) {
    InvokeInternal(posted_from, functor);
  }

These two overloads, one with a return value and one without, are both implemented through InvokeInternal, which in turn calls the Send function.

file://src/rtc_base/thread.cc:914

void Thread::Send(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata) {
  RTC_DCHECK(!IsQuitting());
  if (IsQuitting())
    return;
  // Sent messages are sent to the MessageHandler directly, in the context
  // of "thread", like Win32 SendMessage. If in the right context,
  // call the handler directly.
  Message msg;
  msg.posted_from = posted_from;
  msg.phandler = phandler;
  msg.message_id = id;
  msg.pdata = pdata;
  if (IsCurrent()) {
#if RTC_DCHECK_IS_ON
    RTC_DCHECK_RUN_ON(this);
    could_be_blocking_call_count_++;
#endif
    msg.phandler->OnMessage(&msg);
    return;
  }
  AssertBlockingIsAllowedOnCurrentThread();
  Thread* current_thread = Thread::Current();
#if RTC_DCHECK_IS_ON
  if (current_thread) {
    RTC_DCHECK_RUN_ON(current_thread);
    current_thread->blocking_call_count_++;
    RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
    ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
                                                             this);
  }
#endif
  // Perhaps down the line we can get rid of this workaround and always require
  // current_thread to be valid when Send() is called.
  std::unique_ptr<rtc::Event> done_event;
  if (!current_thread)
    done_event.reset(new rtc::Event());
  bool ready = false;
  PostTask(webrtc::ToQueuedTask(
      [&msg]() mutable { msg.phandler->OnMessage(&msg); },
      [this, &ready, current_thread, done = done_event.get()] {
        if (current_thread) {
          CritScope cs(&crit_);
          ready = true;
          current_thread->socketserver()->WakeUp();
        } else {
          done->Set();
        }
      }));
  if (current_thread) {
    bool waited = false;
    crit_.Enter();
    while (!ready) {
      crit_.Leave();
      current_thread->socketserver()->Wait(kForever, false);
      waited = true;
      crit_.Enter();
    }
    crit_.Leave();
    // Our Wait loop above may have consumed some WakeUp events for this
    // Thread, that weren't relevant to this Send.  Losing these WakeUps can
    // cause problems for some SocketServers.
    //
    // Concrete example:
    // Win32SocketServer on thread A calls Send on thread B.  While processing
    // the message, thread B Posts a message to A.  We consume the wakeup for
    // that Post while waiting for the Send to complete, which means that when
    // we exit this loop, we need to issue another WakeUp, or else the Posted
    // message won't be processed in a timely manner.
    if (waited) {
      current_thread->socketserver()->WakeUp();
    }
  } else {
    done_event->Wait(rtc::Event::kForever);
  }
}

Send is a fair amount of code, but the overall idea stays clear (a small usage sketch follows this list):

  • If the thread calling Send is the very thread Send belongs to, run the message's OnMessage directly; no task dispatch is needed
  • If not on the same thread, call PostTask to deliver the message to the target thread. A reader might wonder which thread the task inside this PostTask gets dispatched to: if you have a Thread object workerThread and you call workerThread.PostTask from the main thread, the task is delivered to the thread managed by that Thread object, i.e. workerThread
  • Once the task has been posted to the target thread, there are two cases, depending on whether the calling thread itself is managed by an rtc::Thread (Thread::Current() may return null)
  • If it is not, Send simply waits on an Event that signals when the functor has finished
  • If it is, Send waits for the message to finish executing and then calls WakeUp once more. The comment explains in detail why this extra WakeUp is needed: the while (!ready) { ... current_thread->socketserver()->Wait() } loop may consume WakeUp events triggered externally, and without a final WakeUp, newly posted messages might not be consumed promptly
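
A small usage sketch of the two paths described above (hypothetical threads; the nested call illustrates the IsCurrent() fast path):

std::unique_ptr<rtc::Thread> worker = rtc::Thread::Create();
worker->SetName("worker", nullptr);
worker->Start();

// Cross-thread: the lambda is posted to `worker` and the caller blocks
// until it has run and the result is marshalled back.
int answer = worker->Invoke<int>(RTC_FROM_HERE, [] { return 42; });

// Same-thread: inside the outer lambda, worker->IsCurrent() is true, so the
// nested Invoke runs OnMessage directly with no posting or waiting.
worker->Invoke<void>(RTC_FROM_HERE, [&worker] {
  worker->Invoke<void>(RTC_FROM_HERE, [] { /* runs inline */ });
});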

State Transitions for Message Posting, Dispatch, and Retrieval

(Figure: message posting, dispatch, and retrieval state transition diagram)

To make WebRTC's message posting, dispatch, and retrieval mechanisms clearer, I define four states of my own as an aid to understanding:

  • Idle: Start has been called but Get has not yet been called
  • Wait: calling Get moves the thread from Idle to Wait
  • Ready: a Post triggers WakeUp, moving the thread from Wait to Ready
  • Running: Dispatch is called to process the message, moving the thread to Running

How Current() Works

A question to start with: if I want to obtain the current thread's Thread object from anywhere in the code, how do I do it? A singleton?

Look at the prototype of WebRTC's Thread::Current:

class Thread {
 public:
  // ......
  static Thread* Current();
};

Calling Thread::Current on thread A returns thread A's Thread object; calling it on thread B returns thread B's Thread object. The implementation:

// static
Thread* Thread::Current() {
  ThreadManager* manager = ThreadManager::Instance();
  Thread* thread = manager->CurrentThread();
#ifndef NO_MAIN_THREAD_WRAPPING
  // Only autowrap the thread which instantiated the ThreadManager.
  if (!thread && manager->IsMainThread()) {
    thread = new Thread(CreateDefaultSocketServer());
    thread->WrapCurrentWithThreadManager(manager, true);
  }
#endif
  return thread;
}

The core of the implementation lives in ThreadManager, a management class for WebRTC Threads that keeps track of every externally created Thread.

Thread* ThreadManager::CurrentThread() {
  return static_cast<Thread*>(TlsGetValue(key_));
}

ThreadManager::CurrentThread is straightforward: it reads the thread-private value for key_ via TlsGetValue. That key_ must have a corresponding Set somewhere, and indeed it is set in Thread's constructor: Thread() -> DoInit() -> ThreadManager::SetCurrentThread -> ThreadManager::SetCurrentThreadInternal

void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
  TlsSetValue(key_, thread);
}

So what are TlsSetValue and TlsGetValue? This brings in a concept: TLS.

About TLS

TLS stands for Thread Local Storage: thread-local, or thread-private, variables. "Private" means every thread owns its own independent copy of the variable.

  • On Windows, TlsAlloc obtains an unused TLS slot index in the process; TlsSetValue sets the value and TlsGetValue reads it
  • On Linux, TLS is manipulated through pthread_key_create, pthread_getspecific, and pthread_setspecific
  • C++11 provides thread_local

Further reading: www.notion.so/TLS-78870a0…

Back to the Current implementation: it leverages TLS to store a per-thread private variable (that variable being a Thread*), so calling Current on a given thread returns that thread's own Thread*.
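
A minimal sketch of the same idea using C++11 thread_local rather than the OS-level APIs WebRTC uses, just to demonstrate per-thread storage:

#include <iostream>
#include <thread>

// Each thread gets its own copy of tls_value, which is exactly how
// ThreadManager hands each OS thread its own Thread* (WebRTC uses
// TlsGetValue/TlsSetValue on Windows and pthread_setspecific on POSIX).
thread_local int tls_value = 0;

int main() {
  std::thread t([] {
    tls_value = 42;                                    // worker's own copy
    std::cout << "worker sees " << tls_value << "\n";  // 42
  });
  tls_value = 7;                                       // main's own copy
  t.join();
  std::cout << "main sees " << tls_value << "\n";      // 7 (unaffected)
}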

The WebRTC Proxy Mechanism

As mentioned earlier, the APIs WebRTC exposes, such as PeerConnectionInterface, are wrapped in a proxy layer internally to guarantee that every API call runs on the correct thread. Start with PeerConnectionProxy:

file://src/api/peer_connection_proxy.h

BEGIN_PROXY_MAP(PeerConnection)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams)
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, remote_streams)
PROXY_METHOD1(bool, AddStream, MediaStreamInterface*)
PROXY_METHOD1(void, RemoveStream, MediaStreamInterface*)
PROXY_METHOD2(RTCErrorOr<rtc::scoped_refptr<RtpSenderInterface>>,
              AddTrack,
              rtc::scoped_refptr<MediaStreamTrackInterface>,
              const std::vector<std::string>&)
// ......
// This method will be invoked on the network thread. See
// PeerConnectionFactory::CreatePeerConnectionOrError for more details.
PROXY_SECONDARY_METHOD1(rtc::scoped_refptr<DtlsTransportInterface>,
                        LookupDtlsTransportByMid,
                        const std::string&)
// This method will be invoked on the network thread. See
// PeerConnectionFactory::CreatePeerConnectionOrError for more details.
PROXY_SECONDARY_CONSTMETHOD0(rtc::scoped_refptr<SctpTransportInterface>,
                             GetSctpTransport)   

The pile of macros above generates a PeerConnectionProxyWithInternal class. We will look at three macros in particular: BEGIN_PROXY_MAP, PROXY_METHOD0, and PROXY_SECONDARY_METHOD1.

BEGIN_PROXY_MAP

#define BEGIN_PROXY_MAP(c)                                                   \
  PROXY_MAP_BOILERPLATE(c)                                                   \
  SECONDARY_PROXY_MAP_BOILERPLATE(c)                                         \
  REFCOUNTED_PROXY_MAP_BOILERPLATE(c)                                        \
 public:                                                                     \
  static rtc::scoped_refptr<c##ProxyWithInternal> Create(                    \
      rtc::Thread* primary_thread, rtc::Thread* secondary_thread,            \
      INTERNAL_CLASS* c) {                                                   \
    return rtc::make_ref_counted<c##ProxyWithInternal>(primary_thread,       \
                                                       secondary_thread, c); \
  }
// Helper macros to reduce code duplication.
#define PROXY_MAP_BOILERPLATE(c)                          \
  template <class INTERNAL_CLASS>                         \
  class c##ProxyWithInternal;                             \
  typedef c##ProxyWithInternal<c##Interface> c##Proxy;    \
  template <class INTERNAL_CLASS>                         \
  class c##ProxyWithInternal : public c##Interface {      \
   protected:                                             \
    typedef c##Interface C;                               \
                                                          \
   public:                                                \
    const INTERNAL_CLASS* internal() const { return c_; } \
    INTERNAL_CLASS* internal() { return c_; }

The key points. First, typedef c##ProxyWithInternal<c##Interface> c##Proxy;: the class name used externally is PeerConnectionProxy. And c##ProxyWithInternal : public c##Interface means it inherits from PeerConnectionInterface, so the PeerConnectionInterface pointer we hold on the outside is actually a PeerConnectionProxyWithInternal object. Second, the Create function: when is it called, and which threads do primary_thread and secondary_thread correspond to? See the code below:

RTCErrorOr<rtc::scoped_refptr<PeerConnectionInterface>>
PeerConnectionFactory::CreatePeerConnectionOrError(
    const PeerConnectionInterface::RTCConfiguration& configuration,
    PeerConnectionDependencies dependencies) {
  // ... (the PeerConnection itself is created into `result` here)
  rtc::scoped_refptr<PeerConnectionInterface> result_proxy =
      PeerConnectionProxy::Create(signaling_thread(), network_thread(),
                                  result.MoveValue());
  return result_proxy;
}

This code confirms that, in PeerConnectionProxy, primary_thread is the signaling_thread and secondary_thread is the network_thread.

PROXY_METHOD0

#define PROXY_METHOD0(r, method)                         \
  r method() override {                                  \
    MethodCall<C, r> call(c_, &C::method);               \
    return call.Marshal(RTC_FROM_HERE, primary_thread_); \
  }

It creates a MethodCall object and calls Marshal. Note the argument passed to Marshal: primary_thread_, which for PeerConnectionProxy means the signaling_thread.
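
For concreteness, here is roughly what PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams) expands to inside PeerConnectionProxyWithInternal, based on the macro definitions above:

rtc::scoped_refptr<StreamCollectionInterface> local_streams() override {
  // C is the typedef for PeerConnectionInterface; c_ is the wrapped object.
  MethodCall<PeerConnectionInterface,
             rtc::scoped_refptr<StreamCollectionInterface>>
      call(c_, &PeerConnectionInterface::local_streams);
  // Marshal forwards the call to the signaling thread and blocks for the
  // result.
  return call.Marshal(RTC_FROM_HERE, primary_thread_);
}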

PROXY_SECONDARY_METHOD1

#define PROXY_SECONDARY_METHOD1(r, method, t1)                \
  r method(t1 a1) override {                                  \
    MethodCall<C, r, t1> call(c_, &C::method, std::move(a1)); \
    return call.Marshal(RTC_FROM_HERE, secondary_thread_);    \
  }

Unlike PROXY_METHOD0, it passes secondary_thread_ to Marshal, which in PeerConnectionProxy means the network_thread.

MethodCall

template <typename C, typename R, typename... Args>
class MethodCall : public QueuedTask {
 public:
  typedef R (C::*Method)(Args...);
  MethodCall(C* c, Method m, Args&&... args)
      : c_(c),
        m_(m),
        args_(std::forward_as_tuple(std::forward<Args>(args)...)) {}
  R Marshal(const rtc::Location& posted_from, rtc::Thread* t) {
    if (t->IsCurrent()) {
      Invoke(std::index_sequence_for<Args...>());
    } else {
      t->PostTask(std::unique_ptr<QueuedTask>(this));
      event_.Wait(rtc::Event::kForever);
    }
    return r_.moved_result();
  }
 private:
  bool Run() override {
    Invoke(std::index_sequence_for<Args...>());
    event_.Set();
    return false;
  }
  template <size_t... Is>
  void Invoke(std::index_sequence<Is...>) {
    r_.Invoke(c_, m_, std::move(std::get<Is>(args_))...);
  }
  C* c_;
  Method m_;
  ReturnType<R> r_;
  std::tuple<Args&&...> args_;
  rtc::Event event_;
};

Focus on the Marshal function: if already on the target thread, it calls Invoke directly; otherwise it posts the task to the designated thread via PostTask and waits for it to complete. For std::tuple usage, see the standard library documentation; the code above leans on std::index_sequence_for (a C++14 facility) together with std::get to unpack the stored argument tuple.
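
To close, a self-contained illustration of that tuple-unpacking trick, the same pattern MethodCall::Invoke uses (the generic names here are mine, not WebRTC's):

#include <iostream>
#include <tuple>
#include <utility>

// std::index_sequence_for generates the compile-time indices 0..N-1, and
// std::get pulls each stored argument back out of the tuple.
template <typename F, typename Tuple, size_t... Is>
decltype(auto) ApplyImpl(F&& f, Tuple&& t, std::index_sequence<Is...>) {
  return std::forward<F>(f)(std::get<Is>(std::forward<Tuple>(t))...);
}

template <typename F, typename... Args>
decltype(auto) Apply(F&& f, std::tuple<Args...>& t) {
  return ApplyImpl(std::forward<F>(f), t, std::index_sequence_for<Args...>{});
}

int main() {
  auto args = std::make_tuple(2, 3);
  // Prints 5: the lambda is invoked with the tuple's elements as arguments.
  std::cout << Apply([](int a, int b) { return a + b; }, args) << "\n";
}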