本实验基于搭建好的开发环境和硬件环境,通过编写简单的通信实验,验证开发环境,掌握多媒体开发编程基础,包括SOCKET编程、多线程编程和线程同步知识。
基于套接字、多线程、同步锁机制实现多媒体文件的收发;
发送端Ubuntu的PC机读取文件,每1024个字节组成一个包通过TCP报文发送到接收端;接收SE5上启动2个线程,线程1接收报文并将报文存入缓存;线程2通过缓存读取报文存入文件中;要求线程1和线程2之间通过同步锁进行线程同步。
开发主机:Ubuntu 22.04 LTS
硬件:算能SE5
本地如果有SE5硬件,则可以PC机作为客户端,SE5作为服务器端。本地如果没有SE5硬件,只有云空间,则可以直接将客户端和服务器端都通过云空间实现,机在云空间的SE5模拟环境中实现。
开发主机 + 云平台(或SE5硬件)
硬件部署环境如下图所示:

如上图所示,可以利用PC作为客户端,SE5作为服务器端,将PC机的文件传送至SE5中。如果是云平台开发,可以直接将客户端和服务器端都放在云平台的模拟器中。此时,即在一台机器内既实现客户端也实现服务器端,设置服务器端的通信地址为回环地址(127.0.0.1)。
客户端程序采用TCP协议进行文件收发。客户端程序采用单线程处理,在和服务器端建立连接后,循环读取流媒体文件,并进行套接字发送。客户端运行流程包含了:

图3-1 客户端操作流程图
接收端作为服务端采用多线程进行编程。主线程用于接收连接后接收客户端发送的报文存入缓存。另起一个线程用于从缓存中读取数据包并存入文件中。服务器端的运行流程包含如下关键步骤:

图3-2 服务端操作流程图
由于需要用到套接字进行编程,因此在头文件上需要包含一些必要的头文件:
- #include
- #include
- #include
- #include
- #include
创建套接字,可以直接利用操作系统的SOCKET接口实现,关键函数如下:
- int sockfd;
- struct sockaddr_in servaddr;
-
- if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) //创建套接字并判断是否成功
- {
- printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
- return 0;
- }
-
- memset(&servaddr, 0, sizeof(servaddr)); //初始化结构体
- servaddr.sin_family = AF_INET; //设置地址家族
- servaddr.sin_port = htons(atoi(argv[3])); //设置端口
-
- //发出连接请求判断是否连接成功
- if (connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0)
- {
- printf("connect error: %s(errno: %d)\n", strerror(errno), errno);
- return 0;
- }
至此,客户端主动向服务器发送链接。
在发送端可通过fopen打开文件,通过fread函数读取流媒体文件:
- if ((fq = fopen(argv[1], "rb")) == NULL)
- {
- /*判断文件是否打开*/
- close(serverFd);
- return -1;
- }
- ......
- /*循环读取文件并发送*/
- size_t readLen = fread(buffer, 1, sizeof(buffer), fq);
发送端启动TCP发送,这里的write函数中调用的sockfd是套接字的句柄:
- while (!feof(fq))
- {
- /*循环读取文件并发送*/
- size_t readLen = fread(buffer, 1, sizeof(buffer), fq);
- if (readLen != write(serverFd, buffer, readLen))
- {
- printf("write error.\n");
- break;
- }
- }
服务器端由于涉及到多线程,因此需要包含多线程头文件。并且服务器端还涉及到缓冲区,本实例可以通过队列方法设计缓冲区,因此可以包含队列头文件。还有涉及到同步锁机制,因此还需要包含同步锁头文件,具体如下:
- #include
- #include
- #include
- #include
- #include
- #include
- #include
服务器端首先也需要创建套接字,并等待客户端发起连接,服务器端的关键代码如下:
- int main(int argc, char **argv)
- {
- int listenFd, clientFd;
- struct sockaddr_in servaddr;
-
- if ((listenFd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
- {
- /*创建套接字*/
- printf("create socket error\n");
- return -1;
- }
-
- memset(&servaddr, 0, sizeof(servaddr)); //初始化结构体
- servaddr.sin_family = AF_INET; //设置地址族协议
- servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //设置地址
- servaddr.sin_port = htons(6666); //设置默认端口
-
- if (bind(listenFd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
- {
- /*绑定套接字地址和端口*/
- printf("bind socket error\n");
- return -1;
- }
-
- if (listen(listenFd, 10) < 0)
- {
- /*开启监听*/
- printf("listen socket error\n");
- return -1;
- }
-
- struct sockaddr_in client_addr;
- socklen_t size = sizeof(client_addr);
-
- if ((clientFd = accept(listenFd, (struct sockaddr *)&client_addr, &size)) < 0)
- {
- /*建立连接*/
- printf("accept socket error\n");
- return -1;
-
- }
-
- std::thread write_thread(writeThread);
- size_t readLen = 0;
-
- while (true)
- {
- /*循环读取客户端消息*/
- char buff[MAXBUFF] = {0};
- readLen = read(clientFd, buff, MAXBUFF);
-
- if (readLen <= 0)
- break;
- std::string data(buff, readLen);
- g_mx.lock(); //上锁
-
- g_dataQue.push(data);
- g_mx.unlock(); //解锁
-
- }
-
- write_thread.join();
- close(clientFd);
- close(listenFd);
- return 0;
- }
注意,在上述函数中定义了写文件线程:
std::thread write_thread(writeThread);
并且在主线程中启动了写文件线程:
write_thread.join();
接收线程执行函数:
- std::queue
g_dataQue; //全局队列 - std::mutex g_mx; //互斥锁
-
- void writeThread()
- {
- /*写线程*/
- FILE *out_put = fopen("recv_data.mp4", "w+");
- sleep(1); //休眠一秒,确保队列中有数据
-
- while (true)
- {
- /*从队列中读取数据并存储*/
- if (g_dataQue.size() == 0)
- break;
- g_mx.lock();
- std::string data = g_dataQue.front();
- g_dataQue.pop();
- g_mx.unlock();
- fwrite((void *)data.data(), 1, data.size(), out_put);
- }
-
- fclose(out_put);
- }
如上所示,同步锁用于进行缓冲区的读写同步。上述实例中通过std::mutex实现同步。
- g_lock.lock(); //上锁
- g_lock.unlock(); //解锁