本文继续对live555源码进行分析。以testOnDemandRTSPServer.cpp为例进行分析。本文只分析传输H264文件的过程。
int main(int argc, char** argv) { // Begin by setting up our usage environment: TaskScheduler* scheduler = BasicTaskScheduler::createNew(); env = BasicUsageEnvironment::createNew(*scheduler); UserAuthenticationDatabase* authDB = NULL; #ifdef ACCESS_CONTROL // To implement client access control to the RTSP server, do the following: authDB = new UserAuthenticationDatabase; authDB->addUserRecord("username1", "password1"); // replace these with real strings // Repeat the above with each <username>, <password> that you wish to allow // access to the server. #endif // Create the RTSP server: RTSPServer* rtspServer = RTSPServer::createNew(*env, 8554, authDB); if (rtspServer == NULL) { *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n"; exit(1); } char const* descriptionString = "Session streamed by \"testOnDemandRTSPServer\""; // A H.264 video elementary stream: { char const* streamName = "h264ESVideoTest"; char const* inputFileName = "test.264"; ServerMediaSession* sms = ServerMediaSession::createNew(*env, streamName, streamName, descriptionString); sms->addSubsession(H264VideoFileServerMediaSubsession ::createNew(*env, inputFileName, reuseFirstSource)); rtspServer->addServerMediaSession(sms); announceStream(rtspServer, sms, streamName, inputFileName); } ... ... env->taskScheduler().doEventLoop(); // does not return
省略掉发送其他格式文件,以及Http相关的代码进行分析。
首先创建了任务调度器和基础运行环境;
然后如果需要认证的话就添加用户名密码;
然后创建一个RTSPServer用来传输建立RTSP通信;
然后创建了一个ServerMediaSession,并在ServerMediaSession中添加了H264VideoFileServerMediaSubsession格式的subsession。
然后就进入了doEventLoop()一直处理任务了。
RTSPServer* RTSPServer::createNew(UsageEnvironment& env, Port ourPort, UserAuthenticationDatabase* authDatabase, unsigned reclamationSeconds) { int ourSocketIPv4 = setUpOurSocket(env, ourPort, AF_INET); int ourSocketIPv6 = setUpOurSocket(env, ourPort, AF_INET6); if (ourSocketIPv4 < 0 && ourSocketIPv6 < 0) return NULL; return new RTSPServer(env, ourSocketIPv4, ourSocketIPv6, ourPort, authDatabase, reclamationSeconds); }
创建RTSPServer,并根据端口创建了IPv4和IPv6的TCP Socket(后面分析就忽略IPv6了)。
这里有个参数reclamationSeconds,表明回收的时间,也就是说,当在这个超过这个时间还没收到客户端的RTSP或RTCP RR包信息,那么就任务这个客户端失联了,并把这个客户端相关的资源全部回收。
RTSPServer::RTSPServer(UsageEnvironment& env, int ourSocketIPv4, int ourSocketIPv6, Port ourPort, UserAuthenticationDatabase* authDatabase, unsigned reclamationSeconds) : GenericMediaServer(env, ourSocketIPv4, ourSocketIPv6, ourPort, reclamationSeconds), fHTTPServerSocketIPv4(-1), fHTTPServerSocketIPv6(-1), fHTTPServerPort(0), fClientConnectionsForHTTPTunneling(NULL), // will get created if needed fTCPStreamingDatabase(HashTable::create(ONE_WORD_HASH_KEYS)), fPendingRegisterOrDeregisterRequests(HashTable::create(ONE_WORD_HASH_KEYS)), fRegisterOrDeregisterRequestCounter(0), fAuthDB(authDatabase), fAllowStreamingRTPOverTCP(True) { }
可以看到RTSPServer继承自GenericMediaServer,在构造RTSPServer时也对父类进行构造。
GenericMediaServer ::GenericMediaServer(UsageEnvironment& env, int ourSocketIPv4, int ourSocketIPv6, Port ourPort, unsigned reclamationSeconds) : Medium(env), fServerSocketIPv4(ourSocketIPv4), fServerSocketIPv6(ourSocketIPv6), fServerPort(ourPort), fReclamationSeconds(reclamationSeconds), fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)), fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)), fClientSessions(HashTable::create(STRING_HASH_KEYS)), fPreviousClientSessionId(0) { ignoreSigPipeOnSocket(fServerSocketIPv4); // so that clients on the same host that are killed don't also kill us ignoreSigPipeOnSocket(fServerSocketIPv6); // ditto // Arrange to handle connections from others: env.taskScheduler().turnOnBackgroundReadHandling(fServerSocketIPv4, incomingConnectionHandlerIPv4, this); env.taskScheduler().turnOnBackgroundReadHandling(fServerSocketIPv6, incomingConnectionHandlerIPv6, this); }
在构造GenericMediaServer时,把刚刚创建的TCP socket加入到后台IO读任务中。当有客户端连接时,就会触发incomingConnectionHandlerIPv4函数。
void GenericMediaServer::incomingConnectionHandlerOnSocket(int serverSocket) { struct sockaddr_storage clientAddr; SOCKLEN_T clientAddrLen = sizeof clientAddr; int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen); if (clientSocket < 0) { int err = envir().getErrno(); if (err != EWOULDBLOCK) { envir().setResultErrMsg("accept() failed: "); } return; } ignoreSigPipeOnSocket(clientSocket); // so that clients on the same host that are killed don't also kill us makeSocketNonBlocking(clientSocket); increaseSendBufferTo(envir(), clientSocket, 50*1024); #ifdef DEBUG envir() << "accept()ed connection from " << AddressString(clientAddr).val() << "\n"; #endif // Create a new object for handling this connection: (void)createNewClientConnection(clientSocket, clientAddr); }
在incomingConnectionHandlerOnSocket函数中,首先accept与客户端建立连接。然后创建了一个客户端连接的对象。
createNewClientConnection是一个纯虚函数,由于实际对象的类型是RTSPServer,所以调用RTSPServer的createNewClientConnection。
GenericMediaServer::ClientConnection* RTSPServer::createNewClientConnection(int clientSocket, struct sockaddr_storage const& clientAddr) { return new RTSPClientConnection(*this, clientSocket, clientAddr); } ... ... RTSPServer::RTSPClientConnection ::RTSPClientConnection(RTSPServer& ourServer, int clientSocket, struct sockaddr_storage const& clientAddr) : GenericMediaServer::ClientConnection(ourServer, clientSocket, clientAddr), fOurRTSPServer(ourServer), fClientInputSocket(fOurSocket), fClientOutputSocket(fOurSocket), fAddressFamily(clientAddr.ss_family), fIsActive(True), fRecursionCount(0), fOurSessionCookie(NULL), fScheduledDelayedTask(0) { resetRequestBuffer(); }
RTSPClientConnection继承自GenericMediaServer::ClientConnection,在构造时也会先构造父类。
GenericMediaServer::ClientConnection ::ClientConnection(GenericMediaServer& ourServer, int clientSocket, struct sockaddr_storage const& clientAddr) : fOurServer(ourServer), fOurSocket(clientSocket), fClientAddr(clientAddr) { // Add ourself to our 'client connections' table: fOurServer.fClientConnections->Add((char const*)this, this); // Arrange to handle incoming requests: resetRequestBuffer(); envir().taskScheduler() .setBackgroundHandling(fOurSocket, SOCKET_READABLE|SOCKET_EXCEPTION, incomingRequestHandler, this); }
构造GenericMediaServer::ClientConnection时,将连接的socket加入到后台IO可读和异常任务队列中。
当有客户端数据到来时,会触发incomingRequestHandler函数。
void GenericMediaServer::ClientConnection::incomingRequestHandler() { struct sockaddr_storage dummy; // 'from' address, meaningless in this case int bytesRead = readSocket(envir(), fOurSocket, &fRequestBuffer[fRequestBytesAlreadySeen], fRequestBufferBytesLeft, dummy); handleRequestBytes(bytesRead); }
incomingRequestHandler函数将数据读取到fRequestBuffer中,并执行handleRequestBytes。
handleRequestBytes也是个虚函数,所以执行的是RTSPClientConnection::handleRequestBytes。
Boolean parseSucceeded = parseRTSPRequestString((char*)fRequestBuffer, fLastCRLF+2 - fRequestBuffer, cmdName, sizeof cmdName, urlPreSuffix, sizeof urlPreSuffix, urlSuffix, sizeof urlSuffix, cseq, sizeof cseq, sessionIdStr, sizeof sessionIdStr, contentLength); ... ... handleCmd_OPTIONS(); ... handleCmd_GET_PARAMETER((char const*)fRequestBuffer); ... handleCmd_SET_PARAMETER((char const*)fRequestBuffer); ... handleCmd_DESCRIBE(urlPreSuffix, urlSuffix, (char const*)fRequestBuffer); ... clientSession->handleCmd_SETUP(this, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer); ... handleCmd_PLAY(ourClientConnection, subsession, fullRequestStr);
在handleRequestBytes中,先是对收到的数据进行解析,解析出本次通信是哪个命令,然后根据命令类型分别进行处理。
因为涉及到客户端,所以只看服务端代码,可能无法分析出RTSP连接的流程。所以我们用WireShark进行抓包分析。并过滤只看RTSP相关的包。
可以看到,
客户端发送
服务端进行回复
客户端发送
服务端回复
客户端发送
服务端回复
客户端发送
服务端回复
知道了这个流程,我们就可以在服务端的源码中按照抓包得到的流程进行分析了。