看流程之前先看理论
pacer理论
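入队调用链：RTPSenderVideo::LogAndSendToNetwork → RTPSender::EnqueuePackets → PacedSender::EnqueuePackets → PacingController::EnqueuePacket → PacingController::EnqueuePacketInternal → RoundRobinPacketQueue::Push（放入队列）。注意：PacingController::SetPacingRates 需要在入队之前调用过，否则 EnqueuePacket 开头对 pacing_bitrate_ 的 RTC_DCHECK 会失败。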
// Hands a batch of RTP packets to the pacing controller, one packet at a
// time, and then calls MaybeWakupProcessThread().
//
// `packets` is taken by value; each element is moved into
// pacing_controller_.EnqueuePacket(), so the vector holds moved-from
// unique_ptrs afterwards.
void PacedSender::EnqueuePackets(
    std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
  {
    // NOTE(review): the `......` below marks code that was omitted from this
    // excerpt; what it does cannot be told from here.
    ......
    // A video frame is split into multiple RTP packets.
    for (auto& packet : packets) {
      // Every packet is expected to carry a non-negative capture time.
      RTC_DCHECK_GE(packet->capture_time_ms(), 0);
      pacing_controller_.EnqueuePacket(std::move(packet));
    }
  }
  // Runs after the inner scope above has been closed.
  MaybeWakupProcessThread();
}
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) { RTC_DCHECK(pacing_bitrate_ > DataRate::Zero()) << "SetPacingRate must be called before InsertPacket."; RTC_CHECK(packet->packet_type()); // Get priority first and store in temporary, to avoid chance of object being // moved before GetPriorityForType() being called. //优先级 const int priority = GetPriorityForType(*packet->packet_type()); EnqueuePacketInternal(std::move(packet), priority); }
priority：数字越小，优先级越高。按 GetPriorityForType 的返回值，顺序为：音频（kFirstPriority + 1）> 重传（+2）> 视频 / FEC（+3）> padding（+4）。
// Maps a packet's media type to its pacer priority.
//
// The result is kFirstPriority plus a per-type offset. A lower number takes
// priority over a higher one. A value of `type` that matches none of the
// cases ends in RTC_CHECK_NOTREACHED().
int GetPriorityForType(RtpPacketMediaType type) {
  // 0 is a sentinel meaning "no case matched"; every real offset is >= 1.
  int offset = 0;

  switch (type) {
    case RtpPacketMediaType::kAudio:
      // Audio is always prioritized over other packet types.
      offset = 1;
      break;

    case RtpPacketMediaType::kRetransmission:
      // Retransmissions are sent before new media.
      offset = 2;
      break;

    case RtpPacketMediaType::kVideo:
    case RtpPacketMediaType::kForwardErrorCorrection:
      // Video has "normal" priority. Redundancy (FEC) is sent concurrently
      // with video: if it is delayed it might have a lower chance of being
      // useful.
      offset = 3;
      break;

    case RtpPacketMediaType::kPadding:
      // Padding packets are in themselves likely useless and are only sent
      // to keep the bandwidth estimate high.
      offset = 4;
      break;
  }

  if (offset == 0) {
    RTC_CHECK_NOTREACHED();
  }
  return kFirstPriority + offset;
}
发送顺序
void PacingController::EnqueuePacketInternal( std::unique_ptr<RtpPacketToSend> packet, int priority) { //统计发送带宽 prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size())); Timestamp now = CurrentTime(); if (mode_ == ProcessMode::kDynamic && packet_queue_.Empty() && NextSendTime() <= now) { TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); UpdateBudgetWithElapsedTime(elapsed_time); } //放入队列 packet_queue_.Push(priority, now, packet_counter_++, std::move(packet)); }
// Adds `packet` to the queue of the stream identified by its SSRC.
//
// Steps, in order:
//   1. Look up the Stream entry for the SSRC, creating one if none exists.
//   2. Make sure the stream has an entry in `stream_priorities_`, replacing
//      the entry when this packet has a lower priority ordinal (i.e. higher
//      priority) than the one currently recorded.
//   3. Update the enqueue-time / size bookkeeping.
//   4. Push the packet onto the stream's own packet queue.
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
  // --- 1. Locate or create the per-SSRC stream entry. ----------------------
  auto stream_it = streams_.find(packet.Ssrc());
  if (stream_it == streams_.end()) {
    stream_it = streams_.emplace(packet.Ssrc(), Stream()).first;
    Stream& created = stream_it->second;
    // A new stream starts out without an entry in `stream_priorities_`.
    created.priority_it = stream_priorities_.end();
    created.ssrc = packet.Ssrc();
  }
  Stream& stream = stream_it->second;

  // --- 2. (Re)schedule the stream if required. -----------------------------
  const bool already_scheduled =
      stream.priority_it != stream_priorities_.end();
  if (!already_scheduled) {
    // The SSRC must not be present in `stream_priorities_` yet.
    RTC_CHECK(!IsSsrcScheduled(stream.ssrc));
  }
  // A new key is needed when the stream is unscheduled, or when this packet
  // raises the stream's priority (lower ordinal == higher priority). The
  // iterator is only dereferenced when the stream is already scheduled.
  const bool needs_new_key =
      !already_scheduled ||
      packet.Priority() < stream.priority_it->first.priority;
  if (needs_new_key) {
    if (already_scheduled) {
      // Drop the outdated key before inserting the one with the new priority.
      stream_priorities_.erase(stream.priority_it);
    }
    stream.priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.Priority(), stream.size), packet.Ssrc());
  }
  RTC_CHECK(stream.priority_it != stream_priorities_.end());

  // --- 3. Enqueue-time and size bookkeeping. -------------------------------
  const bool has_enqueue_time_entry =
      packet.EnqueueTimeIterator() != enqueue_times_.end();
  if (has_enqueue_time_entry) {
    // To measure how long a packet sat in the queue while NOT paused, the
    // pause time accumulated so far is subtracted now, and the pause time
    // accumulated at pop time is subtracted again when the packet is popped.
    // Together this removes exactly the time the packet spent in the queue
    // while paused.
    UpdateQueueTime(packet.EnqueueTime());
    packet.SubtractPauseTime(pause_time_sum_);

    ++size_packets_;
    size_ += PacketSize(packet);
  } else {
    // Promotion from the single-packet queue: only record the enqueue time
    // and remember where it was inserted.
    packet.UpdateEnqueueTimeIterator(
        enqueue_times_.insert(packet.EnqueueTime()));
  }

  // --- 4. Hand the packet to the stream's own queue. -----------------------
  stream.packet_queue.push(packet);
}
发送调用链：PacedSender::Process() → PacingController::ProcessPackets() → PacingController::GetPendingPacket() → PacketRouter::SendPacket() → ModuleRtpRtcpImpl2::TrySendPacket() → RtpSenderEgress::SendPacket() → RtpSenderEgress::SendPacketToNetwork() → LayerFilteringTransport::SendRtp()