• 资料库的webrtc文件传输


    一、一个看似简单的事情往往不简单

    一个简单的事情往往会倾注你的心血,也许你看到很简单往往其实没那么简单;其实想想今年业余时间的大把代码,真正能成品的好像并不多。

    马上年底了,写下这篇文章。每一行程序就像写小说一样,不管好不好;代码都倾注了我的心血。

    真正把一个东西做好;做到好用,用的人多,具有很好的利他性,并不是一个容易的事情,需要不断的量变。

    webrtc演示网址:

    文件下载测试

    二、为什么放弃百度网盘

    把数据存储到百度网盘上大该有半年时间,后来发现了一些致命的问题

    1、发现分享外链到了10几万条之后就不能分享了

    2、分享的百度外链经常会被屏蔽掉

    3、分享速度不能过快,1小时只能分享100多条外链,如果分享的过快整个网盘的分享功能都会

    4、百度账号需要实名认证,很容易被人认出来,作为一个技术人员我认为应该藏在幕后,尽量不要让别人认出来,这回避免很多不必要的问题

    三、为什么不用云存储

    4个T的存储,每月就要400元,一年就是5600元,而且不算流量费用一年就是5600,这个费用是不高的,最关键的是使用了云存储很

    1、可能被黑客共计,如果别人使用curl去恶意刷流量现有的安全策略很可能被人把费用都刷去

    2、每年存储费用是递增的

    3、需要实名认证,一旦绑定域名,绑定着手机号,很可能被人找到

    四、我的WebRtc架构设计

    拜托传统的中继服务器,使用p2p打洞,调研了一些方案,本来打算使用 coturn + janus 的架构设计,但是发现janus c 写的,没有文件传输的插件,如果自己再用c去做一个插件真的是很累,于是就打算用golang 实现一个文件传输的网关。

    大体步骤

    1、启动网关

    2、存储节点 连接到网关

    3、用户浏览器请求网关,请求和存储节点交换信令

    4、信令交换完成,使用dataChannel进行通信

    五、网关设计

    网关主要是一个websocket server 采用golang编写

    网关设计主要分成了几大模块

    1、room

    room是一个公共领域,主要是做client和node的鉴权操作,如果鉴权成功,那么会进入Manager进行调度

    2、抽象client和node

    client和node 有一个共性是他们都具有连接属性,所以应该设计一个公共接口,把他们的共性抽象出来

    1. type Lifecycle interface {
    2. Stop()
    3. Start()
    4. }
    5. type Describable interface {
    6. Name() string
    7. Type() ConnType
    8. }
    9. type Component interface {
    10. Lifecycle
    11. Describable
    12. AddCallback(ctx EventCallback)
    13. GetConnNumber() uint64
    14. SetContext(interface{})
    15. GetContext() interface{}
    16. Product(ctx Context)
    17. }

    通过添加一个AddCallback回调函数,确保不同模块的业务处理完全玻璃开,node的逻辑就在node里处理,client的逻辑只在client里处理,不能把不同模块的代码交叉处理,至于上下问的传输,统一抽象一个Context,里面抽象存储着我们需要的上下文信息,供给不同的回调函数以及协成之间传输使用。

    1. type Context interface {
    2. GetData() []byte
    3. Error() error
    4. MessageType() int
    5. GetContext() interface{}
    6. SetContext(interface{})
    7. Name() string
    8. }
    9. type EventCallback interface {
    10. OnReceive(Context, *sync.WaitGroup)
    11. OnError(Context, *sync.WaitGroup)
    12. OnClose(Context, *sync.WaitGroup)
    13. OnWriteComplete(Context, *sync.WaitGroup)
    14. }
    15. type NodeCallback interface {
    16. OnReceive(Context, *sync.WaitGroup)
    17. OnError(Context, *sync.WaitGroup)
    18. OnClose(Context, *sync.WaitGroup)
    19. OnWriteComplete(Context, *sync.WaitGroup)
    20. }

    4、manager

    负责整体调度,比如client进入manager后,查找当前可以用的node存储节点,找到后进行信令交换

    节点调度,遍历查找拥有client最少的node节点,然后进行通信

    1. // 选择最优线路
    2. func (m *Manager) selectNode() *node.NodeClient {
    3. if len(m.nodeTree) == 0 {
    4. return nil
    5. }
    6. // 找一个挂载链接最少的节点
    7. var usableNode *node.NodeClient
    8. var weight uint64
    9. for _, conn := range m.nodeTree {
    10. if uint64(conn.GetConnNumber()) <= weight {
    11. usableNode = conn.(*node.NodeClient)
    12. }
    13. }
    14. return usableNode
    15. }

    六、node节点收到信令后进行应答

    1、创建RTCConnection

    1. blockSize := 16384
    2. //前端页面会对sdp进行base64的encode
    3. b, err := base64.StdEncoding.DecodeString(sdp)
    4. if err != nil {
    5. log.Error("error:%s", err)
    6. return nil
    7. }
    8. str, err := url.QueryUnescape(string(b))
    9. if err != nil {
    10. log.Error("error:%s", err)
    11. return nil
    12. }
    13. sdpDes := webrtc.SessionDescription{}
    14. fmt.Println(str)
    15. err = json.Unmarshal([]byte(str), &sdpDes)
    16. if err != nil {
    17. log.Error("json.Unmarshal err:%s", err)
    18. return nil
    19. }
    20. //创建pc, 并且指定stun服务器
    21. pc, err := webrtc.NewPeerConnection(webrtc.Configuration{
    22. ICEServers: []webrtc.ICEServer{
    23. {
    24. URLs: []string{"stun:"},
    25. },
    26. },
    27. })
    28. stat, err := os.Stat("/home/zhanglei/Downloads/《实践论》(原文)毛泽东.pdf")
    29. if err != nil {
    30. log.Error("os.Stat %s error:%s", path, err)
    31. return nil
    32. }
    33. if offset > stat.Size() {
    34. log.Error("offset(%d) > stat.Size(%d)", offset, stat.Size())
    35. return nil
    36. }
    37. chunkSize := int(math.Ceil(float64(stat.Size() / int64(blockSize))))
    38. currentChunkSize := int(math.Ceil(float64(offset / int64(blockSize))))
    39. if err != nil {
    40. log.Error("%s", err)
    41. return nil
    42. }
    43. pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
    44. fmt.Println("OnConnectionStateChange")
    45. fmt.Printf("Peer Connection State has changed: %s (answerer)\n", s.String())
    46. if s == webrtc.PeerConnectionStateFailed {
    47. fmt.Println("webrtc.PeerConnectionStateFailed")
    48. }
    49. })
    50. // Register data channel creation handling
    51. pc.OnDataChannel(func(d *webrtc.DataChannel) {
    52. fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID())
    53. // Register channel opening handling
    54. d.OnOpen(func() {
    55. fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label(), d.ID())
    56. stat, err := os.Stat("/home/zhanglei/Downloads/《实践论》(原文)毛泽东.pdf")
    57. chunkSize = int(math.Ceil(float64(stat.Size() / int64(blockSize))))
    58. // 握手
    59. var chunk ChunkMessage
    60. chunk.Class = HANDSHAKE
    61. chunk.ChunkSize = uint64(chunkSize)
    62. handShakeBytes := serialize(&chunk)
    63. err = d.Send(handShakeBytes.Bytes())
    64. if err != nil {
    65. log.Error("%s", err)
    66. return
    67. }
    68. })
    69. // Register text message handling
    70. d.OnMessage(func(msg webrtc.DataChannelMessage) {
    71. fmt.Printf("Message from DataChannel '%s': '%s'\n", d.Label(), string(msg.Data))
    72. data, err := unSerialize(msg.Data)
    73. if err != nil {
    74. log.Error("os.Open is : %s", err)
    75. return
    76. }
    77. if data.Class == ACK {
    78. handle, err := os.Open("/home/zhanglei/Downloads/《实践论》(原文)毛泽东.pdf")
    79. if err != nil {
    80. log.Error("os.Open is : %s", err)
    81. return
    82. }
    83. defer handle.Close()
    84. bufferBytes := make([]byte, blockSize)
    85. read, err := handle.Read(bufferBytes)
    86. if err != nil {
    87. log.Error("handle.Read is : %s", err)
    88. return
    89. }
    90. if read < blockSize {
    91. bufferBytes = bufferBytes[:read]
    92. }
    93. var chunk ChunkMessage
    94. chunk.Class = SEND
    95. chunk.ChunkSize = uint64(chunkSize)
    96. chunk.CurrentChunk = uint64(currentChunkSize)
    97. chunk.Data = bufferBytes
    98. // 打包发送
    99. err = d.Send(serialize(&chunk).Bytes())
    100. if err != nil {
    101. log.Error("%s", err)
    102. return
    103. }
    104. return
    105. }
    106. if data.Class == RECEIVE {
    107. handle, err := os.Open("/home/zhanglei/Downloads/《实践论》(原文)毛泽东.pdf")
    108. if err != nil {
    109. log.Error("os.Open is : %s", err)
    110. return
    111. }
    112. if data.CurrentChunk == uint64(chunkSize) {
    113. log.Info("data transfer finish")
    114. return
    115. }
    116. nextChunk := data.CurrentChunk + 1
    117. bytes := make([]byte, blockSize)
    118. read, err := handle.ReadAt(bytes, int64(nextChunk)*int64(blockSize))
    119. if err != nil {
    120. if !errors.Is(err, io.EOF) {
    121. log.Error("handle.Read is : %s", err)
    122. return
    123. }
    124. }
    125. if read < blockSize {
    126. bytes = bytes[:read]
    127. }
    128. var sendData ChunkMessage
    129. sendData.Class = SEND
    130. sendData.CurrentChunk = nextChunk
    131. sendData.Data = bytes
    132. sendData.ChunkSize = uint64(chunkSize)
    133. sendDataBytes := serialize(&sendData)
    134. log.Info(" read %d", nextChunk)
    135. err = d.Send(sendDataBytes.Bytes())
    136. if err != nil {
    137. log.Error("%s", err)
    138. return
    139. }
    140. // 最后一块
    141. if nextChunk == uint64(chunkSize) {
    142. d.Close()
    143. pc.Close()
    144. }
    145. return
    146. }
    147. })
    148. })
    149. _, err = pc.CreateDataChannel("sendDataChannel", nil)
    150. if err != nil {
    151. log.Error("error:%s", err)
    152. return nil
    153. }
    154. //channel.OnOpen(func() {
    155. //
    156. //})
    157. //设置远端的sdp
    158. if err = pc.SetRemoteDescription(sdpDes); err != nil {
    159. log.Error("error:%s", err)
    160. return nil
    161. }
    162. //创建协商结果
    163. answer, err := pc.CreateAnswer(nil)
    164. if err != nil {
    165. log.Error("error:%s", err)
    166. return nil
    167. }
    168. pc.OnICECandidate(func(i *webrtc.ICECandidate) {
    169. fmt.Println("OnICECandidate")
    170. fmt.Println(i)
    171. })
    172. err = pc.SetLocalDescription(answer)
    173. if err != nil {
    174. log.Error("error:%s", err)
    175. return nil
    176. }
    177. //等待ice结束
    178. gatherCmp := webrtc.GatheringCompletePromise(pc)
    179. <-gatherCmp
    180. //将协商并且收集完candidate的answer,输出到控制台
    181. answerBytes, err := json.Marshal(*pc.LocalDescription())
    182. if err != nil {
    183. log.Error("error:%s", err)
    184. return nil
    185. }
    186. pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
    187. fmt.Println("OnICECandidate")
    188. fmt.Println(candidate)
    189. })
    190. t := &Transfer{
    191. sdp: sdp,
    192. pc: pc,
    193. offset: uint64(offset),
    194. currentChunkSize: currentChunkSize,
    195. chunkSize: chunkSize,
    196. answerSdp: answerBytes,
    197. blockSize: blockSize,
    198. }
    199. return t

    2、打包传输

    传输没有采用protobuf,自己写了个二进制传输

    1. func serialize(data *ChunkMessage) *bytes.Buffer {
    2. data.Version = CodeVersion
    3. writeBuffer := bytes.NewBuffer(nil)
    4. writeBuffer.Write([]byte{data.Version})
    5. writeBuffer.Write([]byte{data.Class})
    6. // ChunkSize
    7. binary.Write(writeBuffer, binary.BigEndian, data.ChunkSize)
    8. // CurrentChunk
    9. binary.Write(writeBuffer, binary.BigEndian, data.CurrentChunk)
    10. // DataLen
    11. data.DataLen = uint64(len(data.Data))
    12. binary.Write(writeBuffer, binary.BigEndian, data.DataLen)
    13. if len(data.Data) > 0 {
    14. // 添加body
    15. writeBuffer.Write(data.Data)
    16. }
    17. return writeBuffer
    18. }
    19. //判断我们系统中的字节序类型
    20. func systemEdian() binary.ByteOrder {
    21. var i int = 0x1
    22. bs := (*[int(unsafe.Sizeof(0))]byte)(unsafe.Pointer(&i))
    23. if bs[0] == 0 {
    24. return binary.LittleEndian
    25. } else {
    26. return binary.BigEndian
    27. }
    28. }
    29. func unSerialize(data []byte) (*ChunkMessage, error) {
    30. buf := bytes.NewBuffer(data)
    31. fmt.Println(buf)
    32. var chunk ChunkMessage
    33. binary.Read(buf, systemEdian(), &chunk.Version)
    34. binary.Read(buf, systemEdian(), &chunk.Class)
    35. binary.Read(buf, systemEdian(), &chunk.ChunkSize)
    36. binary.Read(buf, systemEdian(), &chunk.CurrentChunk)
    37. binary.Read(buf, systemEdian(), &chunk.DataLen)
    38. //chunkSize := uint64(unsafe.Pointer(&buf.Bytes()))
    39. //chunk.ChunkSize = chunkSize
    40. return &chunk, nil
    41. }

    7、js前端webrtc提供报价

    创建webrtc连接

    1. var pcConfig = {
    2. 'iceServers': [{
    3. 'urls': 'stun:',
    4. }]
    5. };
    6. localConnection = new RTCPeerConnection(pcConfig);
    7. receiveDataChannel = localConnection.createDataChannel("receiveDataChannel")
    8. receiveDataChannel.binaryType = "arraybuffer"
    9. receiveDataChannel.addEventListener('open', dataChannel.onopen);
    10. receiveDataChannel.addEventListener('close', dataChannel.onclose);
    11. receiveDataChannel.addEventListener('message', dataChannel.onmessage);
    12. receiveDataChannel.addEventListener('error', dataChannel.onError);
    13. try {
    14. this.offer = await localConnection.createOffer();
    15. } catch (e) {
    16. console.log('Failed to create session description: ', e);
    17. return
    18. }
    19. try {
    20. await localConnection.setLocalDescription(this.offer)
    21. } catch (e) {
    22. console.log('Failed to create session description: ', e);
    23. return
    24. }
    25. //eyJ1c2VyX3V1aWQiOiI1ZmZkNDE0N2JkMTMyNWNmMjYwNDAyMWYwODA5OWUyMyIsImxvZ2luX3RpbWUiOjE2Njg0NzgzOTEsIm5vd190aW1lIjoxNjY5OTgzNzkwLCJyYW5fc3RyIjoiZDA3MTczNzI3NjFjMzY0MGU2NmRlYWI5YmYyODZhNzYiLCJzaWduIjoiZjc3NzI0YjZmMTc3MzczNmVhZWFkMTM2NzllNTE0NTcifQ==
    26. transfer.ws.send((JSON.stringify(downloadRequest)));

    前端对收到的数据进行序列化和反序列化

    1. function serialize(data) {
    2. var bufLen = protoColMinSize;
    3. if (!data.Data) {
    4. bufLen += 0;
    5. } else {
    6. bufLen += data.Data.length;
    7. }
    8. data.Version = 1;
    9. var protocolBuf = new ArrayBuffer(bufLen);
    10. const bufView = new DataView(protocolBuf);
    11. bufView.setUint8(0, data.Version);
    12. bufView.setUint8(1, data.Class);
    13. if (!data.ChunkSize) {
    14. data.ChunkSize = 0
    15. }
    16. bufView.setBigUint64(2, BigInt(data.ChunkSize));
    17. if (!data.CurrentChunk) {
    18. data.CurrentChunk = 0
    19. }
    20. bufView.setBigUint64(10, BigInt(data.CurrentChunk));
    21. if (data.Data && data.Data.length > 0) {
    22. bufView.setBigUint64(18, BigInt(data.Data.length));
    23. } else {
    24. bufView.setBigUint64(18, BigInt(0));
    25. }
    26. console.log(protocolBuf)
    27. return protocolBuf;
    28. }
    29. function unSerialize(bytes) {
    30. var versionView = new DataView(bytes).getUint8(0);
    31. // 最小长度
    32. var classByteView = new DataView(bytes).getUint8(1);
    33. // chunk 长度
    34. var chunkSizeView = parseInt(new DataView(bytes).getBigUint64(2));
    35. var currentChunkView = parseInt(new DataView(bytes).getBigUint64(10));
    36. var bodyLenView = parseInt(new DataView(bytes).getBigUint64(18));
    37. var returnData = {
    38. Version: versionView,
    39. Class: classByteView,
    40. ChunkSize: (chunkSizeView),
    41. CurrentChunk: currentChunkView,
    42. PayloadLength: bodyLenView,
    43. Payload: [],
    44. };
    45. if (bodyLenView > 0) {
    46. returnData.Payload = new Uint8Array(bytes, protoColMinSize, bodyLenView)
    47. }
    48. return returnData;
    49. }

    八、webrtc文件传输的优缺

    1)优点

    1、点对点传输,不经过中继服务器

    2、民用带宽比较便宜,最差的情况下是带宽打满,不会出现很高的流量费用

    3、可以自建存储,存储节点可以使用群辉,可以买自己服务器

    4、不需要固定的ip地址

    2)缺点

    1、宽带线民用的不知道能申请几根

    2、存储维护硬件也是一个麻烦的事情,硬盘很可能出现故障,运维也是一个头痛的事情

  • 相关阅读:
    Patroni的执行流
    Chapter 2 Gradient Descent
    HTML5期末考核大作业——学生网页设计作业源码HTML+CSS+JavaScript 中华美德6页面带音乐文化
    【Java 进阶篇】数据定义语言(DDL)详解
    如何使用现有工具三分钟之内从无到有设计一款滤波器?
    JAVA 7-3 统计投票
    java中的并发工具类
    JAVA8时间工具类
    iOS 16适配屏幕旋转强制转屏切换大总结
    Lua篇笔记
  • 原文地址:https://blog.csdn.net/qq_32783703/article/details/128168660