下面看代码:
webget.cc
// ccm #ifndef SPONGE_LIBSPONGE_BYTE_STREAM_HH #define SPONGE_LIBSPONGE_BYTE_STREAM_HH #include <deque> #include <string> //! \brief An in-order byte stream. //! Bytes are written on the "input" side and read from the "output" //! side. The byte stream is finite: the writer can end the input, //! and then no more bytes can be written. class ByteStream { private: // Your code here -- add private members as necessary. // Hint: This doesn't need to be a sophisticated data structure at // all, but if any of your tests are taking longer than a second, // that's a sign that you probably want to keep exploring // different approaches. size_t _capacity = 0; // define a buffer size std::deque<char> _buffer{};// define buffer for storing data bool _is_eof = false; // define EOF flag size_t _bytes_written = 0; // record the number of bytes written size_t _bytes_read = 0; // record the number of bytes read bool _error{}; //!< Flag indicating that the stream suffered an error. public: //! Construct a stream with room for `capacity` bytes. ByteStream(const size_t capacity); //! \name "Input" interface for the writer //!@{ //! Write a string of bytes into the stream. Write as many //! as will fit, and return how many were written. //! \returns the number of bytes accepted into the stream size_t write(const std::string &data); //! \returns the number of additional bytes that the stream has space for size_t remaining_capacity() const; //! Signal that the byte stream has reached its ending void end_input(); //! Indicate that the stream suffered an error. void set_error() { _error = true; } //!@} //! \name "Output" interface for the reader //!@{ //! Peek at next "len" bytes of the stream //! \returns a string std::string peek_output(const size_t len) const; //! Remove bytes from the buffer void pop_output(const size_t len); //! Read (i.e., copy and then pop) the next "len" bytes of the stream //! \returns a string std::string read(const size_t len); //! \returns `true` if the stream input has ended bool input_ended() const; //! \returns `true` if the stream has suffered an error bool error() const { return _error; } //! \returns the maximum amount that can currently be read from the stream size_t buffer_size() const; //! \returns `true` if the buffer is empty bool buffer_empty() const; //! \returns `true` if the output has reached the ending bool eof() const; //!@} //! \name General accounting //!@{ //! Total number of bytes written size_t bytes_written() const; //! Total number of bytes popped size_t bytes_read() const; //!@} }; #endif // SPONGE_LIBSPONGE_BYTE_STREAM_HH
// ccm #include "byte_stream.hh" // Dummy implementation of a flow-controlled in-memory byte stream. // For Lab 0, please replace with a real implementation that passes the // automated checks run by `make check_lab0`. // You will need to add private members to the class declaration in `byte_stream.hh` template <typename... Targs> void DUMMY_CODE(Targs &&... /* unused */) {} using namespace std; ByteStream::ByteStream(const size_t capacity) : _capacity(capacity) {} size_t ByteStream::write(const string &data) { if( remaining_capacity() == 0 || data.size() == 0 ){ return 0; } size_t write_size = min( remaining_capacity() , data.size() ); for ( size_t i = 0 ;i<write_size ; i++ ){ _buffer.push_back( data[i] ); } _bytes_written += write_size; return write_size; } //! \param[in] len bytes will be copied from the output side of the buffer string ByteStream::peek_output(const size_t len) const { size_t peek_len = min( buffer_size(), len ); string a = string().assign( _buffer.begin(),_buffer.begin()+peek_len ); return a; } //! \param[in] len bytes will be removed from the output side of the buffer void ByteStream::pop_output(const size_t len) { size_t pop_len = min( buffer_size(),len ); for (size_t i = 0; i< pop_len ; i++){ _buffer.pop_front(); } _bytes_read += pop_len; } //! Read (i.e., copy and then pop) the next "len" bytes of the stream //! \param[in] len bytes will be popped and returned //! \returns a string std::string ByteStream::read(const size_t len) { string ans = peek_output(len); pop_output(len); return ans; } void ByteStream::end_input() { _is_eof = true; } bool ByteStream::input_ended() const { return _is_eof; } size_t ByteStream::buffer_size() const { return _buffer.size(); } bool ByteStream::buffer_empty() const { return buffer_size() == 0; } bool ByteStream::eof() const { return input_ended() && buffer_empty(); } size_t ByteStream::bytes_written() const { return _bytes_written; } size_t ByteStream::bytes_read() const { return _bytes_read; } size_t ByteStream::remaining_capacity() const { return _capacity - _buffer.size(); }
// ccm #ifndef SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH #define SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH #include "byte_stream.hh" #include <algorithm> #include <cstdint> #include <iostream> #include <list> #include <set> #include <stdexcept> #include <string> #include <vector> //! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order, //! possibly overlapping) into an in-order byte stream. class StreamReassembler { private: // Your code here -- add private members as necessary. struct block_node { size_t begin = 0; size_t length = 0; std::string data = ""; bool operator<(const block_node t) const { return begin < t.begin; } }; std::set<block_node> _blocks = {}; std::vector<char> _buffer = {}; size_t _unassembled_byte = 0; size_t _head_index = 0; bool _eof_flag = false; ByteStream _output; //!< The reassembled in-order byte stream size_t _capacity; //!< The maximum number of bytes //! merge elm2 to elm1, return merged bytes long merge_block(block_node &elm1, const block_node &elm2); public: //! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes. //! \note This capacity limits both the bytes that have been reassembled, //! and those that have not yet been reassembled. StreamReassembler(const size_t capacity); //! \brief Receives a substring and writes any newly contiguous bytes into the stream. //! //! If accepting all the data would overflow the `capacity` of this //! `StreamReassembler`, then only the part of the data that fits will be //! accepted. If the substring is only partially accepted, then the `eof` //! will be disregarded. //! //! \param data the string being added //! \param index the index of the first byte in `data` //! \param eof whether or not this segment ends with the end of the stream void push_substring(const std::string &data, const uint64_t index, const bool eof); //! \name Access the reassembled byte stream //!@{ const ByteStream &stream_out() const { return _output; } ByteStream &stream_out() { return _output; } //!@} //! The number of bytes in the substrings stored but not yet reassembled //! //! \note If the byte at a particular index has been submitted twice, it //! should only be counted once for the purpose of this function. size_t unassembled_bytes() const; //! \brief Is the internal state empty (other than the output stream)? //! \returns `true` if no substrings are waiting to be assembled bool empty() const; size_t head_index() const { return _head_index; } bool input_ended() const { return _output.input_ended(); } }; #endif // SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
// ccm #include "stream_reassembler.hh" #include <iostream> // Dummy implementation of a stream reassembler. // For Lab 1, please replace with a real implementation that passes the // automated checks run by `make check_lab1`. // You will need to add private members to the class declaration in `stream_reassembler.hh` template <typename... Targs> void DUMMY_CODE(Targs &&... /* unused */) {} using namespace std; StreamReassembler::StreamReassembler(const size_t capacity) : _output(capacity), _capacity(capacity) { _buffer.resize(capacity); } long StreamReassembler::merge_block(block_node &elm1, const block_node &elm2) { block_node x, y; if (elm1.begin > elm2.begin) { x = elm2; y = elm1; } else { x = elm1; y = elm2; } if (x.begin + x.length < y.begin) { // throw runtime_error("StreamReassembler: couldn't merge blocks\n"); return -1; // no intersection, couldn't merge } else if (x.begin + x.length >= y.begin + y.length) { elm1 = x; return y.length; } else { elm1.begin = x.begin; elm1.data = x.data + y.data.substr(x.begin + x.length - y.begin); elm1.length = elm1.data.length(); return x.begin + x.length - y.begin;; // 返回重复部分的长度 } } //! \details This function accepts a substring (aka a segment) of bytes, //! possibly out-of-order, from the logical stream, and assembles any newly //! contiguous substrings and writes them into the output stream in order. void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) { if (index >= _head_index + _capacity) { // capacity over return; } // handle extra substring prefix block_node elm; if (index + data.length() <= _head_index) { // couldn't equal, because there have emtpy substring goto JUDGE_EOF; } else if (index < _head_index) { size_t offset = _head_index - index; elm.data.assign(data.begin() + offset, data.end()); elm.begin = index + offset; elm.length = elm.data.length(); } else { elm.begin = index; elm.length = data.length(); elm.data = data; } _unassembled_byte += elm.length; // merge substring do { // merge next long merged_bytes = 0; auto iter = _blocks.lower_bound(elm); while (iter != _blocks.end() && (merged_bytes = merge_block(elm, *iter)) >= 0) { _unassembled_byte -= merged_bytes; _blocks.erase(iter); iter = _blocks.lower_bound(elm); } // merge prev if (iter == _blocks.begin()) { break; } iter--; while ((merged_bytes = merge_block(elm, *iter)) >= 0) { _unassembled_byte -= merged_bytes; _blocks.erase(iter); iter = _blocks.lower_bound(elm); if (iter == _blocks.begin()) { break; } iter--; break;// 这里只需要执行一次 } } while (false); _blocks.insert(elm); // write to ByteStream if (!_blocks.empty() && _blocks.begin()->begin == _head_index) { const block_node head_block = *_blocks.begin(); // modify _head_index and _unassembled_byte according to successful write to _output size_t write_bytes = _output.write(head_block.data); _head_index += write_bytes; _unassembled_byte -= write_bytes; _blocks.erase(_blocks.begin()); } JUDGE_EOF: if (eof) { _eof_flag = true; } if (_eof_flag && empty()) { _output.end_input(); } } size_t StreamReassembler::unassembled_bytes() const { return _unassembled_byte; } bool StreamReassembler::empty() const { return _unassembled_byte == 0; }
// ccm #include "wrapping_integers.hh" // Dummy implementation of a 32-bit wrapping integer // For Lab 2, please replace with a real implementation that passes the // automated checks run by `make check_lab2`. template <typename... Targs> void DUMMY_CODE(Targs &&... /* unused */) {} using namespace std; //! Transform an "absolute" 64-bit sequence number (zero-indexed) into a WrappingInt32 //! \param n The input absolute 64-bit sequence number //! \param isn The initial sequence number WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) { DUMMY_CODE(n, isn); return WrappingInt32(n+isn.raw_value()); } //! Transform a WrappingInt32 into an "absolute" 64-bit sequence number (zero-indexed) //! \param n The relative sequence number //! \param isn The initial sequence number //! \param checkpoint A recent absolute 64-bit sequence number //! \returns the 64-bit sequence number that wraps to `n` and is closest to `checkpoint` //! //! \note Each of the two streams of the TCP connection has its own ISN. One stream //! runs from the local TCPSender to the remote TCPReceiver and has one ISN, //! and the other stream runs from the remote TCPSender to the local TCPReceiver and //! has a different ISN. uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) { uint32_t offset = n - isn; uint64_t ans = ( checkpoint & 0xffffffff00000000 ) + offset; uint64_t a=ans; if (abs(int64_t(a + (1ul << 32) - checkpoint)) < abs(int64_t(a - checkpoint))) ans = a+ (1ul << 32); else if (a >= (1ul << 32) && abs(int64_t(a - (1ul << 32) - checkpoint)) < abs(int64_t(a - checkpoint))) ans = a - (1ul << 32); return ans; }
// ccm #ifndef SPONGE_LIBSPONGE_TCP_RECEIVER_HH #define SPONGE_LIBSPONGE_TCP_RECEIVER_HH #include "byte_stream.hh" #include "stream_reassembler.hh" #include "tcp_segment.hh" #include "wrapping_integers.hh" #include <optional> //! \brief The "receiver" part of a TCP implementation. //! Receives and reassembles segments into a ByteStream, and computes //! the acknowledgment number and window size to advertise back to the //! remote TCPSender. class TCPReceiver { //! Our data structure for re-assembling bytes. StreamReassembler _reassembler; bool _syn_flag = false; bool _fin_flag = false; size_t _isn = 0; //! The maximum number of bytes we'll store. size_t _capacity; public: //! \brief Construct a TCP receiver //! //! \param capacity the maximum number of bytes that the receiver will //! store in its buffers at any give time. TCPReceiver(const size_t capacity) : _reassembler(capacity), _capacity(capacity) {} //! \name Accessors to provide feedback to the remote TCPSender //!@{ //! \brief The ackno that should be sent to the peer //! \returns empty if no SYN has been received //! //! This is the beginning of the receiver's window, or in other words, the sequence number //! of the first byte in the stream that the receiver hasn't received. std::optional<WrappingInt32> ackno() const; //! \brief The window size that should be sent to the peer //! //! Operationally: the capacity minus the number of bytes that the //! TCPReceiver is holding in its byte stream (those that have been //! reassembled, but not consumed). //! //! Formally: the difference between (a) the sequence number of //! the first byte that falls after the window (and will not be //! accepted by the receiver) and (b) the sequence number of the //! beginning of the window (the ackno). size_t window_size() const; //!@} //! \brief number of bytes stored but not yet reassembled size_t unassembled_bytes() const { return _reassembler.unassembled_bytes(); } //! \brief handle an inbound segment //! \returns `true` if any part of the segment was inside the window bool segment_received(const TCPSegment &seg); //! \name "Output" interface for the reader //!@{ ByteStream &stream_out() { return _reassembler.stream_out(); } const ByteStream &stream_out() const { return _reassembler.stream_out(); } //!@} }; #endif // SPONGE_LIBSPONGE_TCP_RECEIVER_HH
// ccm #include "tcp_receiver.hh" // Dummy implementation of a TCP receiver // For Lab 2, please replace with a real implementation that passes the // automated checks run by `make check_lab2`. template <typename... Targs> void DUMMY_CODE(Targs &&... /* unused */) {} using namespace std; bool TCPReceiver::segment_received(const TCPSegment &seg) { bool ret = false; static size_t abs_seqno = 0; size_t length; if (seg.header().syn) { if (_syn_flag) { // already get a SYN, refuse other SYN. return false; } _syn_flag = true; ret = true; _isn = seg.header().seqno.raw_value(); abs_seqno = 1; length = seg.length_in_sequence_space() - 1; if (length == 0) { // segment's content only have a SYN flag return true; } } else if (!_syn_flag) { // before get a SYN, refuse any segment return false; } else { // not a SYN segment, compute it's abs_seqno abs_seqno = unwrap(WrappingInt32(seg.header().seqno.raw_value()), WrappingInt32(_isn), abs_seqno); length = seg.length_in_sequence_space(); } size_t base = _reassembler.head_index() + _syn_flag + _reassembler.input_ended(); if (seg.header().fin) { if (_fin_flag) { // already get a FIN, refuse other FIN return false; } _fin_flag = true; ret = true; } // not FIN and not one size's SYN, check border else if (seg.length_in_sequence_space() == 0 && abs_seqno == base) { return true; } else if (abs_seqno >=base + window_size() || abs_seqno + length <= base) { if (!ret) return false; } _reassembler.push_substring(seg.payload().copy(), abs_seqno - 1, seg.header().fin); return true; } optional<WrappingInt32> TCPReceiver::ackno() const { if (_syn_flag){ return WrappingInt32(wrap(_reassembler.head_index() + _syn_flag + _reassembler.input_ended(), WrappingInt32(_isn))); } else return std::nullopt; } size_t TCPReceiver::window_size() const { return _reassembler.stream_out().remaining_capacity(); }
// ccm #ifndef SPONGE_LIBSPONGE_TCP_SENDER_HH #define SPONGE_LIBSPONGE_TCP_SENDER_HH #include "byte_stream.hh" #include "tcp_config.hh" #include "tcp_segment.hh" #include "wrapping_integers.hh" #include <functional> #include <queue> //! \brief The "sender" part of a TCP implementation. //! Accepts a ByteStream, divides it up into segments and sends the //! segments, keeps track of which segments are still in-flight, //! maintains the Retransmission Timer, and retransmits in-flight //! segments if the retransmission timer expires. class TCPSender { private: //! our initial sequence number, the number for our SYN. WrappingInt32 _isn; //! outbound queue of segments that the TCPSender wants sent std::queue<TCPSegment> _segments_out{}; //! retransmission timer for the connection unsigned int _initial_retransmission_timeout; //! outgoing stream of bytes that have not yet been sent ByteStream _stream; //! the (absolute) sequence number for the next byte to be sent uint64_t _next_seqno{0}; std::queue<TCPSegment> _segments_outstanding{}; size_t _bytes_in_flight = 0; size_t _recv_ackno = 0; bool _syn_flag = false; bool _fin_flag = false; size_t _window_size = 0; size_t _timer = 0; bool _timer_running = false; size_t _retransmission_timeout = 0; size_t _consecutive_retransmission = 0; void send_segment(TCPSegment &seg); public: //! Initialize a TCPSender TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY, const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT, const std::optional<WrappingInt32> fixed_isn = {}); //! \name "Input" interface for the writer //!@{ ByteStream &stream_in() { return _stream; } const ByteStream &stream_in() const { return _stream; } //!@} //! \name Methods that can cause the TCPSender to send a segment //!@{ //! \brief A new acknowledgment was received bool ack_received(const WrappingInt32 ackno, const uint16_t window_size); //! \brief Generate an empty-payload segment (useful for creating empty ACK segments) void send_empty_segment(); //! \brief create and send segments to fill as much of the window as possible void fill_window(bool send_syn = true); //! \brief Notifies the TCPSender of the passage of time void tick(const size_t ms_since_last_tick); //!@} //! \name Accessors //!@{ //! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged? //! \note count is in "sequence space," i.e. SYN and FIN each count for one byte //! (see TCPSegment::length_in_sequence_space()) size_t bytes_in_flight() const; //! \brief Number of consecutive retransmissions that have occurred in a row unsigned int consecutive_retransmissions() const; //! \brief TCPSegments that the TCPSender has enqueued for transmission. //! \note These must be dequeued and sent by the TCPConnection, //! which will need to fill in the fields that are set by the TCPReceiver //! (ackno and window size) before sending. std::queue<TCPSegment> &segments_out() { return _segments_out; } //!@} //! \name What is the next sequence number? (used for testing) //!@{ //! \brief absolute seqno for the next byte to be sent uint64_t next_seqno_absolute() const { return _next_seqno; } //! \brief relative seqno for the next byte to be sent WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); } //!@} }; #endif // SPONGE_LIBSPONGE_TCP_SENDER_HH
// ccm #include "tcp_sender.hh" #include "tcp_config.hh" #include "wrapping_integers.hh" #include <algorithm> #include <random> #include <string> // Dummy implementation of a TCP sender // For Lab 3, please replace with a real implementation that passes the // automated checks run by `make check_lab3`. template <typename... Targs> void DUMMY_CODE(Targs &&... /* unused */) {} using namespace std; //! \param[in] capacity the capacity of the outgoing byte stream //! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment //! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN) TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn) : _isn(fixed_isn.value_or(WrappingInt32{random_device()()})) , _initial_retransmission_timeout{retx_timeout} , _stream(capacity) , _retransmission_timeout(retx_timeout) {} uint64_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; } void TCPSender::fill_window(bool send_syn) { // sent a SYN before sent other segment if (!_syn_flag) { if (send_syn) { TCPSegment seg; seg.header().syn = true; send_segment(seg); _syn_flag = true; } return; } if( _fin_flag ){ return; } TCPSegment seg; // 到这里 已syn // take window_size as 1 when it equal 0 这里 在收到ack 并把报告窗口大小之前 为1 见讲义 size_t win_size = _window_size > 0 ? _window_size : 1; // size_t win = _window_size > 0 ? _window_size : 1;// syn 占用一个字节 size_t remain; // window's free space 窗口剩余空间 // when window isn't full and never sent FIN while ((remain = win_size - (_next_seqno - _recv_ackno)) != 0 ) {// _next_seqno - _recv_ackno ->_next_seqno 为下一个序列号 或者说下一个窗口的起始值。 表达式最大 等于窗口大小 即remain=0的情况 size_t size = min(TCPConfig::MAX_PAYLOAD_SIZE, remain);// payload 必须小于最大长度 ,即使窗口够大 // FIN stream is empty string str = _stream.read(size); seg.payload() = Buffer(std::move(str));// move 相当于 剪切 if (seg.length_in_sequence_space() < win_size && _stream.eof()) { seg.header().fin = true; _fin_flag = true; send_segment(seg); return; } if (seg.length_in_sequence_space() == 0) { return; } send_segment(seg); } } //! \param ackno The remote receiver's ackno (acknowledgment number) //! \param window_size The remote receiver's advertised window size //! \returns `false` if the ackno appears invalid (acknowledges something the TCPSender hasn't sent yet) bool TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) { size_t abs_ackno = unwrap( ackno , _isn , _recv_ackno ); // out of window, invalid ackno if(abs_ackno > _next_seqno){ return false;// 接收到的ackno 超过了将要发送的下一个sequence 应该是出现了错误 不可以接收 该数据不可信 } _window_size = window_size;// 接收方 发来 接收窗口大小和ackno 更新 窗口大小 // ack has been received if(abs_ackno <= _recv_ackno ){ return true;// 该arc 已被接收过了 但是不可拒绝 } // if ackno is legal, modify _window_size before return _recv_ackno = abs_ackno;// 相当于 更新checkpoint // pop all elment before ackno 弹出 已确认数据 已确认就不用保存副本了 while( !_segments_outstanding.empty() ){ TCPSegment seg = _segments_outstanding.front(); if( unwrap( seg.header().seqno , _isn ,_next_seqno ) + seg.length_in_sequence_space() <= abs_ackno ){// 试了下 这里用_next_seqno作为checkpoint 也是可以的 但是还是建议用_recv_ackno 除非窗口非常非常大 效果是一样的 _bytes_in_flight -= seg.length_in_sequence_space();// 更新 发送但为确认计数 _segments_outstanding.pop(); }else{ break; } } fill_window();// 窗口大小已更新 填充窗口 _retransmission_timeout = _initial_retransmission_timeout; _consecutive_retransmission = 0; // if have other outstanding segment, restart timer if( !_segments_outstanding.empty() ){ _timer_running = true; _timer = 0; } return true; } //! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method void TCPSender::tick(const size_t ms_since_last_tick) { _timer += ms_since_last_tick; if (_segments_outstanding.empty()) { _timer_running = false; } if (_timer >= _retransmission_timeout && !_segments_outstanding.empty()) { _segments_out.push(_segments_outstanding.front()); _consecutive_retransmission++; _retransmission_timeout *= 2; _timer_running = true; _timer = 0; } } unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmission; } void TCPSender::send_empty_segment() { // empty segment doesn't need store to outstanding queue TCPSegment seg; seg.header().seqno = wrap(_next_seqno, _isn); _segments_out.push(seg); } void TCPSender::send_segment(TCPSegment &seg) { seg.header().seqno = wrap(_next_seqno, _isn); _next_seqno += seg.length_in_sequence_space();// 这里注意顺序不可以反了 _bytes_in_flight += seg.length_in_sequence_space(); _segments_outstanding.push(seg); _segments_out.push(seg); if (!_timer_running) { // start timer _timer_running = true; _timer = 0; } }
#ifndef SPONGE_LIBSPONGE_TCP_FACTORED_HH #define SPONGE_LIBSPONGE_TCP_FACTORED_HH #include "tcp_config.hh" #include "tcp_receiver.hh" #include "tcp_sender.hh" #include "tcp_state.hh" //! \brief A complete endpoint of a TCP connection class TCPConnection { private: TCPConfig _cfg; TCPReceiver _receiver{_cfg.recv_capacity}; TCPSender _sender{_cfg.send_capacity, _cfg.rt_timeout, _cfg.fixed_isn}; //! outbound queue of segments that the TCPConnection wants sent std::queue<TCPSegment> _segments_out{}; //! Should the TCPConnection stay active (and keep ACKing) //! for 10 * _cfg.rt_timeout milliseconds after both streams have ended, //! in case the remote TCPConnection doesn't know we've received its whole stream? bool _linger_after_streams_finish{true}; size_t _time_since_last_segment_received = 0; bool _active = true; bool _need_send_rst = false; bool _ack_for_fin_sent = false; bool push_segments_out(bool send_syn = false); void unclean_shutdown(bool send_rst); bool clean_shutdown(); bool in_listen(); bool in_syn_recv(); bool in_syn_sent(); public: //! \name "Input" interface for the writer //!@{ //! \brief Initiate a connection by sending a SYN segment void connect(); //! \brief Write data to the outbound byte stream, and send it over TCP if possible //! \returns the number of bytes from `data` that were actually written. size_t write(const std::string &data); //! \returns the number of `bytes` that can be written right now. size_t remaining_outbound_capacity() const; //! \brief Shut down the outbound byte stream (still allows reading incoming data) void end_input_stream(); //!@} //! \name "Output" interface for the reader //!@{ //! \brief The inbound byte stream received from the peer ByteStream &inbound_stream() { return _receiver.stream_out(); } //!@} //! \name Accessors used for testing //!@{ //! \brief number of bytes sent and not yet acknowledged, counting SYN/FIN each as one byte size_t bytes_in_flight() const; //! \brief number of bytes not yet reassembled size_t unassembled_bytes() const; //! \brief Number of milliseconds since the last segment was received size_t time_since_last_segment_received() const; //!< \brief summarize the state of the sender, receiver, and the connection TCPState state() const { return {_sender, _receiver, active(), _linger_after_streams_finish}; }; //!@} //! \name Methods for the owner or operating system to call //!@{ //! Called when a new segment has been received from the network void segment_received(const TCPSegment &seg); //! Called periodically when time elapses void tick(const size_t ms_since_last_tick); //! \brief TCPSegments that the TCPConnection has enqueued for transmission. //! \note The owner or operating system will dequeue these and //! put each one into the payload of a lower-layer datagram (usually Internet datagrams (IP), //! but could also be user datagrams (UDP) or any other kind). std::queue<TCPSegment> &segments_out() { return _segments_out; } //! \brief Is the connection still alive in any way? //! \returns `true` if either stream is still running or if the TCPConnection is lingering //! after both streams have finished (e.g. to ACK retransmissions from the peer) bool active() const; //!@} //! Construct a new connection from a configuration explicit TCPConnection(const TCPConfig &cfg) : _cfg{cfg} {} //! \name construction and destruction //! moving is allowed; copying is disallowed; default construction not possible //!@{ ~TCPConnection(); //!< destructor sends a RST if the connection is still open TCPConnection() = delete; TCPConnection(TCPConnection &&other) = default; TCPConnection &operator=(TCPConnection &&other) = default; TCPConnection(const TCPConnection &other) = delete; TCPConnection &operator=(const TCPConnection &other) = delete; //!@} }; #endif // SPONGE_LIBSPONGE_TCP_FACTORED_HH
#include "tcp_connection.hh" #include <iostream> // Dummy implementation of a TCP connection // For Lab 4, please replace with a real implementation that passes the // automated checks run by `make check`. template <typename... Targs> void DUMMY_CODE(Targs &&... /* unused */) {} using namespace std; size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); } size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); } size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); } size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received; } void TCPConnection::segment_received(const TCPSegment &seg) { if (!_active)// 连接未启动 return; _time_since_last_segment_received = 0;// 重置时间 // data segments with acceptable ACKs should be ignored in SYN_SENT if (in_syn_sent() && seg.header().ack && seg.payload().size() > 0) {// 对应 receive 收到syn 会回复syn return; } bool send_empty = false; if (_sender.next_seqno_absolute() > 0 && seg.header().ack) {// sender 可接受 且为ack // unacceptable ACKs should produced a segment that existed if (!_sender.ack_received(seg.header().ackno, seg.header().win)) {// 同步信号 send_empty = true; } } bool recv_flag = _receiver.segment_received(seg); if (!recv_flag) {// 数据 send_empty = true; } if (seg.header().syn && _sender.next_seqno_absolute() == 0) {// 连接报文 connect(); return; } if (seg.header().rst) {// 重置报文 // RST segments without ACKs should be ignored in SYN_SENT if (in_syn_sent() && !seg.header().ack) { return; } unclean_shutdown(false); return; } if (seg.length_in_sequence_space() > 0) { send_empty = true; } if (send_empty) { if (_receiver.ackno().has_value() && _sender.segments_out().empty()) { _sender.send_empty_segment(); } } push_segments_out(); } bool TCPConnection::active() const { return _active; } size_t TCPConnection::write(const string &data) { size_t ret = _sender.stream_in().write(data);// 写入缓存 push_segments_out();// 封装成帧 return ret; } //! \param[in] ms_since_last_tick number of milliseconds since the last call to this method void TCPConnection::tick(const size_t ms_since_last_tick) { if (!_active) return; _time_since_last_segment_received += ms_since_last_tick; _sender.tick(ms_since_last_tick);// 同步时间 参见上一次实验讲义 if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) { unclean_shutdown(true); } push_segments_out(); } void TCPConnection::end_input_stream() { _sender.stream_in().end_input(); push_segments_out(); } void TCPConnection::connect() {// 连接 // when connect, must active send a SYN push_segments_out(true);// 参数为 true 会在 push--- -> fillwindow中 发送syn } TCPConnection::~TCPConnection() { try { if (active()) { // Your code here: need to send a RST segment to the peer cerr << "Warning: Unclean shutdown of TCPConnection\n"; unclean_shutdown(true);// 关于处理 非正常关闭的函数 例如 设置 rst } } catch (const exception &e) { std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl; } } bool TCPConnection::push_segments_out(bool send_syn) { // default not send syn before recv a SYN _sender.fill_window( send_syn || in_syn_recv());// send_syn 或者 in_syn_recv 要回应 TCPSegment seg; while (!_sender.segments_out().empty()) { seg = _sender.segments_out().front(); _sender.segments_out().pop(); if (_receiver.ackno().has_value()) {// 可以发送数据了 seg.header().ack = true; seg.header().ackno = _receiver.ackno().value(); seg.header().win = _receiver.window_size();// 携带同步信息 } if (_need_send_rst) {// 重置连接 最重要 _need_send_rst = false; seg.header().rst = true; } _segments_out.push(seg); } clean_shutdown(); return true; } void TCPConnection::unclean_shutdown(bool send_rst) {// 非正常关闭 同时 需要重置连接 rst _receiver.stream_out().set_error(); _sender.stream_in().set_error(); _active = false; if (send_rst) { _need_send_rst = true; if (_sender.segments_out().empty()) { _sender.send_empty_segment(); } push_segments_out(); } } bool TCPConnection::clean_shutdown() { if (_receiver.stream_out().input_ended() && !(_sender.stream_in().eof())) {// 工作未完全结束 具体见讲义 prereq 1-2 _linger_after_streams_finish = false; } if (_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended()) { if (!_linger_after_streams_finish || time_since_last_segment_received() >= 10 * _cfg.rt_timeout) {// prereq1-3 or timeout _active = false; } } return !_active; } bool TCPConnection :: in_listen(){ return !_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0;// 前一个对应 接受端未收到 syn 即处于 listen 后一个对应发送端 处于listen 未发送 syn } bool TCPConnection :: in_syn_recv(){ // 服务端 收到 syn 有了 seq 同时 字节流打开 return _receiver.ackno().has_value() && !_receiver.stream_out().input_ended(); } bool TCPConnection :: in_syn_sent(){ // 请求服务端 发送syn syn 占用一个序号 其余未发送任何数据 return _sender.next_seqno_absolute() == 1 && _sender.bytes_in_flight() ==1; }
#ifndef SPONGE_LIBSPONGE_NETWORK_INTERFACE_HH #define SPONGE_LIBSPONGE_NETWORK_INTERFACE_HH #include "ethernet_frame.hh" #include "tcp_over_ip.hh" #include "tun.hh" #include <optional> #include <queue> #include <map> //! \brief A "network interface" that connects IP (the internet layer, or network layer) //! with Ethernet (the network access layer, or link layer). //! This module is the lowest layer of a TCP/IP stack //! (connecting IP with the lower-layer network protocol, //! e.g. Ethernet). But the same module is also used repeatedly //! as part of a router: a router generally has many network //! interfaces, and the router's job is to route Internet datagrams //! between the different interfaces. //! The network interface translates datagrams (coming from the //! "customer," e.g. a TCP/IP stack or router) into Ethernet //! frames. To fill in the Ethernet destination address, it looks up //! the Ethernet address of the next IP hop of each datagram, making //! requests with the [Address Resolution Protocol](\ref rfc::rfc826). //! In the opposite direction, the network interface accepts Ethernet //! frames, checks if they are intended for it, and if so, processes //! the the payload depending on its type. If it's an IPv4 datagram, //! the network interface passes it up the stack. If it's an ARP //! request or reply, the network interface processes the frame //! and learns or replies as necessary. class NetworkInterface { private: //! Ethernet (known as hardware, network-access-layer, or link-layer) address of the interface EthernetAddress _ethernet_address; //! IP (known as internet-layer or network-layer) address of the interface Address _ip_address; //! outbound queue of Ethernet frames that the NetworkInterface wants sent std::queue<EthernetFrame> _frames_out{}; struct arp_item{ EthernetAddress mac; size_t ttl; }; struct waiting_frame{ EthernetFrame frame; uint32_t ip; }; std::map<uint32_t, arp_item> _table{}; std::queue<waiting_frame> _frames_waiting{}; std::queue<uint32_t> _pending_arp{}; bool _pending_flag=false; size_t _pending_timer=0; size_t _timer=0; bool _ethernet_address_equal(EthernetAddress addr1,EthernetAddress addr2); void _retransmission_arp_frame(); public: //! \brief Construct a network interface with given Ethernet (network-access-layer) and IP (internet-layer) addresses NetworkInterface(const EthernetAddress ðernet_address, const Address &ip_address); //! \brief Access queue of Ethernet frames awaiting transmission std::queue<EthernetFrame> &frames_out() { return _frames_out; } //! \brief Sends an IPv4 datagram, encapsulated in an Ethernet frame (if it knows the Ethernet destination address). //! Will need to use [ARP](\ref rfc::rfc826) to look up the Ethernet destination address for the next hop //! ("Sending" is accomplished by pushing the frame onto the frames_out queue.) void send_datagram(const InternetDatagram &dgram, const Address &next_hop); //! \brief Receives an Ethernet frame and responds appropriately. //! If type is IPv4, returns the datagram. //! If type is ARP request, learn a mapping from the "sender" fields, and send an ARP reply. //! If type is ARP reply, learn a mapping from the "target" fields. std::optional<InternetDatagram> recv_frame(const EthernetFrame &frame); //! \brief Called periodically when time elapses void tick(const size_t ms_since_last_tick); }; #endif // SPONGE_LIBSPONGE_NETWORK_INTERFACE_HH
#include "network_interface.hh" #include "arp_message.hh" #include "ethernet_frame.hh" #include <iostream> // Dummy implementation of a network interface // Translates from {IP datagram, next hop address} to link-layer frame, and from link-layer frame to IP datagram // For Lab 5, please replace with a real implementation that passes the // automated checks run by `make check_lab5`. // You will need to add private members to the class declaration in `network_interface.hh` template <typename... Targs> void DUMMY_CODE(Targs &&... /* unused */) {} using namespace std; //! \param[in] ethernet_address Ethernet (what ARP calls "hardware") address of the interface //! \param[in] ip_address IP (what ARP calls "protocol") address of the interface NetworkInterface::NetworkInterface(const EthernetAddress ðernet_address, const Address &ip_address) : _ethernet_address(ethernet_address), _ip_address(ip_address) { cerr << "DEBUG: Network interface has Ethernet address " << to_string(_ethernet_address) << " and IP address " << ip_address.ip() << "\n"; } //! \param[in] dgram the IPv4 datagram to be sent //! \param[in] next_hop the IP address of the interface to send it to (typically a router or default gateway, but may also be another host if directly connected to the same network as the destination) //! (Note: the Address type can be converted to a uint32_t (raw 32-bit IP address) with the Address::ipv4_numeric() method.) void NetworkInterface::send_datagram(const InternetDatagram &dgram, const Address &next_hop) { // convert IP address of next hop to raw 32-bit representation (used in ARP header) const uint32_t next_hop_ip = next_hop.ipv4_numeric(); EthernetFrame frame; frame.header().type = EthernetHeader::TYPE_IPv4;// ip数据包 frame.header().src = _ethernet_address; frame.payload() = move(dgram.serialize());// 负载为ip数据报 if (_table.count(next_hop_ip) && _timer <= _table[next_hop_ip].ttl) {// 表中有 frame.header().dst = _table[next_hop_ip].mac;// 查找并填充 _frames_out.push(frame); } else { _pending_arp.push(next_hop_ip);// 查询 _retransmission_arp_frame();// 发送 arp请求 _frames_waiting.push({frame, next_hop_ip});// 等待查询结果 } } //! \param[in] frame the incoming Ethernet frame optional<InternetDatagram> NetworkInterface::recv_frame(const EthernetFrame &frame) { if (!_ethernet_address_equal(frame.header().dst, ETHERNET_BROADCAST) && !_ethernet_address_equal(frame.header().dst, _ethernet_address)) { return nullopt;// 不是发给我的 不接收 非广播且目标地址不是我 } else if (frame.header().type == EthernetHeader::TYPE_IPv4) {// ipv4 InternetDatagram dgram; if (dgram.parse(frame.payload()) == ParseResult::NoError) { return dgram;// 无错误 } else { return nullopt; } } else if (frame.header().type == EthernetHeader::TYPE_ARP) {// arp ARPMessage msg; if (msg.parse(frame.payload()) == ParseResult::NoError) { uint32_t ip = msg.sender_ip_address; _table[ip].mac = msg.sender_ethernet_address;// 存储接收到的 映射信息 _table[ip].ttl = _timer + 30 * 1000; // active mappings last 30 seconds if (msg.opcode == ARPMessage::OPCODE_REQUEST && msg.target_ip_address == _ip_address.ipv4_numeric()) {// 请求我的地址信息 ARPMessage reply; reply.opcode = ARPMessage::OPCODE_REPLY; reply.sender_ethernet_address = _ethernet_address; reply.sender_ip_address = _ip_address.ipv4_numeric(); reply.target_ethernet_address = msg.sender_ethernet_address; reply.target_ip_address = msg.sender_ip_address; EthernetFrame arp_frame; arp_frame.header().type = EthernetHeader::TYPE_ARP; arp_frame.header().src = _ethernet_address; arp_frame.header().dst = msg.sender_ethernet_address; arp_frame.payload() = move(reply.serialize()); _frames_out.push(arp_frame); } while (!_pending_arp.empty()){// uint32_t t_ip=_pending_arp.front(); if(_table.count(t_ip) && _timer <= _table[t_ip].ttl){ _pending_arp.pop();// 弹出多余的等待arp请求 _pending_flag=false;// 更新等待标志 队头完成请求 更新挂起标志, 处理下一个 } else { break; } } while (!_frames_waiting.empty()) {// 根据更新的缓存 检查可匹配的等待帧 waiting_frame node = _frames_waiting.front(); if (_table.count(node.ip) && _timer <= _table[node.ip].ttl) { node.frame.header().dst = _table[node.ip].mac; _frames_waiting.pop(); _frames_out.push(move(node.frame)); } else { break; } } } else { return nullopt; } } return nullopt; } //! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method void NetworkInterface::tick(const size_t ms_since_last_tick) { _timer += ms_since_last_tick;// 更新时间 _retransmission_arp_frame(); } bool NetworkInterface::_ethernet_address_equal(EthernetAddress addr1, EthernetAddress addr2) { for (int i = 0; i < 6; i++) { if (addr1[i] != addr2[i]) { return false; } } return true; } void NetworkInterface::_retransmission_arp_frame(){ if(!_pending_arp.empty()){ // pending mappings last five seconds if (!_pending_flag || (_pending_flag && _timer - _pending_timer >= 5000)) {// 非挂起 或 等待超时 uint32_t ip=_pending_arp.front(); ARPMessage msg; msg.opcode = ARPMessage::OPCODE_REQUEST; msg.sender_ethernet_address = _ethernet_address; msg.sender_ip_address = _ip_address.ipv4_numeric(); msg.target_ethernet_address = {0, 0, 0, 0, 0, 0}; msg.target_ip_address = ip; EthernetFrame arp_frame; arp_frame.header().type = EthernetHeader::TYPE_ARP; arp_frame.header().src = _ethernet_address; arp_frame.header().dst = ETHERNET_BROADCAST; arp_frame.payload() = move(msg.serialize()); _frames_out.push(arp_frame); _pending_flag = true; _pending_timer = _timer; } } }
#ifndef SPONGE_LIBSPONGE_ROUTER_HH #define SPONGE_LIBSPONGE_ROUTER_HH #include "network_interface.hh" #include <optional> #include <queue> //! \brief A wrapper for NetworkInterface that makes the host-side //! interface asynchronous: instead of returning received datagrams //! immediately (from the `recv_frame` method), it stores them for //! later retrieval. Otherwise, behaves identically to the underlying //! implementation of NetworkInterface. class AsyncNetworkInterface : public NetworkInterface { std::queue<InternetDatagram> _datagrams_out{}; public: using NetworkInterface::NetworkInterface; //! Construct from a NetworkInterface AsyncNetworkInterface(NetworkInterface &&interface) : NetworkInterface(interface) {} //! \brief Receives and Ethernet frame and responds appropriately. //! - If type is IPv4, pushes to the `datagrams_out` queue for later retrieval by the owner. //! - If type is ARP request, learn a mapping from the "sender" fields, and send an ARP reply. //! - If type is ARP reply, learn a mapping from the "target" fields. //! //! \param[in] frame the incoming Ethernet frame void recv_frame(const EthernetFrame &frame) { auto optional_dgram = NetworkInterface::recv_frame(frame); if (optional_dgram.has_value()) { _datagrams_out.push(std::move(optional_dgram.value())); } }; //! Access queue of Internet datagrams that have been received std::queue<InternetDatagram> &datagrams_out() { return _datagrams_out; } }; //! \brief A router that has multiple network interfaces and //! performs longest-prefix-match routing between them. class Router { //! The router's collection of network interfaces std::vector<AsyncNetworkInterface> _interfaces{}; //! Send a single datagram from the appropriate outbound interface to the next hop, //! as specified by the route with the longest prefix_length that matches the //! datagram's destination address. void route_one_datagram(InternetDatagram &dgram); struct RouteItem {// 路由表项 uint32_t route_prefix = 0; uint8_t prefix_length = 0; std::optional<Address> next_hop = std::nullopt; size_t interface_num = 0; }; std::vector<RouteItem> _route_list{}; bool prefix_equal(uint32_t ip1, uint32_t ip2, uint8_t len); public: //! Add an interface to the router //! \param[in] interface an already-constructed network interface //! \returns The index of the interface after it has been added to the router size_t add_interface(AsyncNetworkInterface &&interface) { _interfaces.push_back(std::move(interface)); return _interfaces.size() - 1; } //! Access an interface by index AsyncNetworkInterface &interface(const size_t N) { return _interfaces.at(N); } //! Add a route (a forwarding rule) void add_route(const uint32_t route_prefix, const uint8_t prefix_length, const std::optional<Address> next_hop, const size_t interface_num); //! Route packets between the interfaces void route(); }; #endif // SPONGE_LIBSPONGE_ROUTER_HH
#include "router.hh" #include <iostream> using namespace std; // Dummy implementation of an IP router // Given an incoming Internet datagram, the router decides // (1) which interface to send it out on, and // (2) what next hop address to send it to. // For Lab 6, please replace with a real implementation that passes the // automated checks run by `make check_lab6`. // You will need to add private members to the class declaration in `router.hh` template <typename... Targs> void DUMMY_CODE(Targs &&... /* unused */) {} //! \param[in] route_prefix The "up-to-32-bit" IPv4 address prefix to match the datagram's destination address against //! \param[in] prefix_length For this route to be applicable, how many high-order (most-significant) bits of the route_prefix will need to match the corresponding bits of the datagram's destination address? //! \param[in] next_hop The IP address of the next hop. Will be empty if the network is directly attached to the router (in which case, the next hop address should be the datagram's final destination). //! \param[in] interface_num The index of the interface to send the datagram out on. void Router::add_route(const uint32_t route_prefix, const uint8_t prefix_length, const optional<Address> next_hop, const size_t interface_num) { _route_list.push_back(RouteItem{route_prefix, prefix_length, next_hop, interface_num}); } //! \param[in] dgram The datagram to be routed void Router::route_one_datagram(InternetDatagram &dgram) { bool route_found = false; RouteItem item; uint32_t dst_ip = dgram.header().dst;// 取出目标地址 for (size_t i = 0; i < _route_list.size(); i++) { if (prefix_equal(dst_ip, _route_list[i].route_prefix, _route_list[i].prefix_length)) {// 路由匹配 if (!route_found || item.prefix_length < _route_list[i].prefix_length) {// 未匹配 或匹配到更长的 item = _route_list[i]; route_found = true; } } } if (!route_found) {// 未匹配 return; } if (dgram.header().ttl <= 1) {// 不可路由 防止成环 return; } dgram.header().ttl--; if (item.next_hop.has_value()) { _interfaces[item.interface_num].send_datagram(dgram, item.next_hop.value()); } else { // if not have next_hop, the next_hop is dgram's destination hop 抵达目的地 _interfaces[item.interface_num].send_datagram(dgram, Address::from_ipv4_numeric(dgram.header().dst)); } } void Router::route() { // Go through all the interfaces, and route every incoming datagram to its proper outgoing interface. for (auto &interface : _interfaces) { auto &queue = interface.datagrams_out(); while (not queue.empty()) { route_one_datagram(queue.front()); queue.pop(); }// 路由所有 数据报 } } bool Router::prefix_equal(uint32_t ip1, uint32_t ip2, uint8_t len) { // special judge right when shift 32 bit uint32_t offset = (len == 0) ? 0 : 0xffffffff << (32 - len); printf("ip cmp: %x %x, offset: %x\n", ip1 & offset, ip2 & offset, offset); return (ip1 & offset) == (ip2 & offset); }