一个简单的事情往往会倾注你的心血,也许你看到很简单往往其实没那么简单;其实想想今年业余时间的大把代码,真正能成品的好像并不多。
马上年底了,写下这篇文章。每一行程序就像写小说一样,不管好不好;代码都倾注了我的心血。
真正把一个东西做好;做到好用,用的人多,具有很好的利他性,并不是一个容易的事情,需要不断的量变。
webrtc演示网址:
把数据存储到百度网盘上大该有半年时间,后来发现了一些致命的问题
1、发现分享外链到了10几万条之后就不能分享了
2、分享的百度外链经常会被屏蔽掉
3、分享速度不能过快,1小时只能分享100多条外链,如果分享的过快整个网盘的分享功能都会
4、百度账号需要实名认证,很容易被人认出来,作为一个技术人员我认为应该藏在幕后,尽量不要让别人认出来,这回避免很多不必要的问题
4个T的存储,每月就要400元,一年就是5600元,而且不算流量费用一年就是5600,这个费用是不高的,最关键的是使用了云存储很
1、可能被黑客共计,如果别人使用curl去恶意刷流量现有的安全策略很可能被人把费用都刷去
2、每年存储费用是递增的
3、需要实名认证,一旦绑定域名,绑定着手机号,很可能被人找到
拜托传统的中继服务器,使用p2p打洞,调研了一些方案,本来打算使用 coturn + janus 的架构设计,但是发现janus c 写的,没有文件传输的插件,如果自己再用c去做一个插件真的是很累,于是就打算用golang 实现一个文件传输的网关。
大体步骤
1、启动网关
2、存储节点 连接到网关
3、用户浏览器请求网关,请求和存储节点交换信令
4、信令交换完成,使用dataChannel进行通信
网关主要是一个websocket server 采用golang编写
网关设计主要分成了几大模块
room是一个公共领域,主要是做client和node的鉴权操作,如果鉴权成功,那么会进入Manager进行调度
client和node 有一个共性是他们都具有连接属性,所以应该设计一个公共接口,把他们的共性抽象出来
- type Lifecycle interface {
- Stop()
- Start()
- }
-
- type Describable interface {
- Name() string
- Type() ConnType
- }
-
-
- type Component interface {
- Lifecycle
- Describable
- AddCallback(ctx EventCallback)
- GetConnNumber() uint64
- SetContext(interface{})
- GetContext() interface{}
- Product(ctx Context)
- }
通过添加一个AddCallback回调函数,确保不同模块的业务处理完全玻璃开,node的逻辑就在node里处理,client的逻辑只在client里处理,不能把不同模块的代码交叉处理,至于上下问的传输,统一抽象一个Context,里面抽象存储着我们需要的上下文信息,供给不同的回调函数以及协成之间传输使用。
- type Context interface {
- GetData() []byte
- Error() error
- MessageType() int
- GetContext() interface{}
- SetContext(interface{})
- Name() string
- }
-
- type EventCallback interface {
- OnReceive(Context, *sync.WaitGroup)
- OnError(Context, *sync.WaitGroup)
- OnClose(Context, *sync.WaitGroup)
- OnWriteComplete(Context, *sync.WaitGroup)
- }
-
- type NodeCallback interface {
- OnReceive(Context, *sync.WaitGroup)
- OnError(Context, *sync.WaitGroup)
- OnClose(Context, *sync.WaitGroup)
- OnWriteComplete(Context, *sync.WaitGroup)
- }
负责整体调度,比如client进入manager后,查找当前可以用的node存储节点,找到后进行信令交换
节点调度,遍历查找拥有client最少的node节点,然后进行通信
- // 选择最优线路
- func (m *Manager) selectNode() *node.NodeClient {
- if len(m.nodeTree) == 0 {
- return nil
- }
-
- // 找一个挂载链接最少的节点
- var usableNode *node.NodeClient
- var weight uint64
- for _, conn := range m.nodeTree {
- if uint64(conn.GetConnNumber()) <= weight {
- usableNode = conn.(*node.NodeClient)
- }
- }
- return usableNode
- }
- blockSize := 16384
- //前端页面会对sdp进行base64的encode
- b, err := base64.StdEncoding.DecodeString(sdp)
- if err != nil {
- log.Error("error:%s", err)
- return nil
- }
-
- str, err := url.QueryUnescape(string(b))
- if err != nil {
- log.Error("error:%s", err)
- return nil
- }
-
- sdpDes := webrtc.SessionDescription{}
- fmt.Println(str)
- err = json.Unmarshal([]byte(str), &sdpDes)
- if err != nil {
- log.Error("json.Unmarshal err:%s", err)
- return nil
- }
-
- //创建pc, 并且指定stun服务器
- pc, err := webrtc.NewPeerConnection(webrtc.Configuration{
- ICEServers: []webrtc.ICEServer{
- {
- URLs: []string{"stun:"},
- },
- },
- })
-
- stat, err := os.Stat("/home/zhanglei/Downloads/《实践论》(原文)毛泽东.pdf")
- if err != nil {
- log.Error("os.Stat %s error:%s", path, err)
- return nil
- }
-
- if offset > stat.Size() {
- log.Error("offset(%d) > stat.Size(%d)", offset, stat.Size())
- return nil
- }
-
- chunkSize := int(math.Ceil(float64(stat.Size() / int64(blockSize))))
- currentChunkSize := int(math.Ceil(float64(offset / int64(blockSize))))
-
- if err != nil {
- log.Error("%s", err)
- return nil
- }
-
- pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
- fmt.Println("OnConnectionStateChange")
- fmt.Printf("Peer Connection State has changed: %s (answerer)\n", s.String())
- if s == webrtc.PeerConnectionStateFailed {
- fmt.Println("webrtc.PeerConnectionStateFailed")
- }
- })
-
- // Register data channel creation handling
- pc.OnDataChannel(func(d *webrtc.DataChannel) {
- fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID())
-
- // Register channel opening handling
- d.OnOpen(func() {
- fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label(), d.ID())
-
- stat, err := os.Stat("/home/zhanglei/Downloads/《实践论》(原文)毛泽东.pdf")
- chunkSize = int(math.Ceil(float64(stat.Size() / int64(blockSize))))
-
- // 握手
- var chunk ChunkMessage
- chunk.Class = HANDSHAKE
- chunk.ChunkSize = uint64(chunkSize)
- handShakeBytes := serialize(&chunk)
-
- err = d.Send(handShakeBytes.Bytes())
- if err != nil {
- log.Error("%s", err)
- return
- }
- })
-
- // Register text message handling
- d.OnMessage(func(msg webrtc.DataChannelMessage) {
- fmt.Printf("Message from DataChannel '%s': '%s'\n", d.Label(), string(msg.Data))
- data, err := unSerialize(msg.Data)
- if err != nil {
- log.Error("os.Open is : %s", err)
- return
- }
-
- if data.Class == ACK {
- handle, err := os.Open("/home/zhanglei/Downloads/《实践论》(原文)毛泽东.pdf")
- if err != nil {
- log.Error("os.Open is : %s", err)
- return
- }
-
- defer handle.Close()
-
- bufferBytes := make([]byte, blockSize)
- read, err := handle.Read(bufferBytes)
- if err != nil {
- log.Error("handle.Read is : %s", err)
- return
- }
-
- if read < blockSize {
- bufferBytes = bufferBytes[:read]
- }
-
- var chunk ChunkMessage
- chunk.Class = SEND
- chunk.ChunkSize = uint64(chunkSize)
- chunk.CurrentChunk = uint64(currentChunkSize)
- chunk.Data = bufferBytes
-
- // 打包发送
- err = d.Send(serialize(&chunk).Bytes())
- if err != nil {
- log.Error("%s", err)
- return
- }
- return
- }
-
- if data.Class == RECEIVE {
- handle, err := os.Open("/home/zhanglei/Downloads/《实践论》(原文)毛泽东.pdf")
- if err != nil {
- log.Error("os.Open is : %s", err)
- return
- }
-
- if data.CurrentChunk == uint64(chunkSize) {
- log.Info("data transfer finish")
- return
- }
-
- nextChunk := data.CurrentChunk + 1
-
- bytes := make([]byte, blockSize)
- read, err := handle.ReadAt(bytes, int64(nextChunk)*int64(blockSize))
-
- if err != nil {
- if !errors.Is(err, io.EOF) {
- log.Error("handle.Read is : %s", err)
- return
- }
-
- }
-
- if read < blockSize {
- bytes = bytes[:read]
- }
-
- var sendData ChunkMessage
- sendData.Class = SEND
- sendData.CurrentChunk = nextChunk
- sendData.Data = bytes
- sendData.ChunkSize = uint64(chunkSize)
- sendDataBytes := serialize(&sendData)
- log.Info(" read %d", nextChunk)
-
- err = d.Send(sendDataBytes.Bytes())
- if err != nil {
- log.Error("%s", err)
- return
- }
-
- // 最后一块
- if nextChunk == uint64(chunkSize) {
- d.Close()
- pc.Close()
- }
- return
- }
-
- })
- })
-
- _, err = pc.CreateDataChannel("sendDataChannel", nil)
- if err != nil {
- log.Error("error:%s", err)
- return nil
- }
-
- //channel.OnOpen(func() {
- //
- //})
-
- //设置远端的sdp
- if err = pc.SetRemoteDescription(sdpDes); err != nil {
- log.Error("error:%s", err)
- return nil
- }
-
- //创建协商结果
- answer, err := pc.CreateAnswer(nil)
- if err != nil {
- log.Error("error:%s", err)
- return nil
- }
-
- pc.OnICECandidate(func(i *webrtc.ICECandidate) {
- fmt.Println("OnICECandidate")
- fmt.Println(i)
-
- })
-
- err = pc.SetLocalDescription(answer)
- if err != nil {
- log.Error("error:%s", err)
- return nil
- }
-
- //等待ice结束
- gatherCmp := webrtc.GatheringCompletePromise(pc)
- <-gatherCmp
-
- //将协商并且收集完candidate的answer,输出到控制台
- answerBytes, err := json.Marshal(*pc.LocalDescription())
- if err != nil {
- log.Error("error:%s", err)
- return nil
- }
-
- pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
- fmt.Println("OnICECandidate")
- fmt.Println(candidate)
- })
-
- t := &Transfer{
- sdp: sdp,
- pc: pc,
- offset: uint64(offset),
- currentChunkSize: currentChunkSize,
- chunkSize: chunkSize,
- answerSdp: answerBytes,
- blockSize: blockSize,
- }
- return t
传输没有采用protobuf,自己写了个二进制传输
- func serialize(data *ChunkMessage) *bytes.Buffer {
- data.Version = CodeVersion
- writeBuffer := bytes.NewBuffer(nil)
- writeBuffer.Write([]byte{data.Version})
- writeBuffer.Write([]byte{data.Class})
-
- // ChunkSize
- binary.Write(writeBuffer, binary.BigEndian, data.ChunkSize)
-
- // CurrentChunk
- binary.Write(writeBuffer, binary.BigEndian, data.CurrentChunk)
-
- // DataLen
- data.DataLen = uint64(len(data.Data))
- binary.Write(writeBuffer, binary.BigEndian, data.DataLen)
-
- if len(data.Data) > 0 {
- // 添加body
- writeBuffer.Write(data.Data)
- }
-
- return writeBuffer
- }
-
- //判断我们系统中的字节序类型
- func systemEdian() binary.ByteOrder {
- var i int = 0x1
- bs := (*[int(unsafe.Sizeof(0))]byte)(unsafe.Pointer(&i))
- if bs[0] == 0 {
- return binary.LittleEndian
- } else {
- return binary.BigEndian
- }
- }
-
- func unSerialize(data []byte) (*ChunkMessage, error) {
- buf := bytes.NewBuffer(data)
- fmt.Println(buf)
- var chunk ChunkMessage
- binary.Read(buf, systemEdian(), &chunk.Version)
- binary.Read(buf, systemEdian(), &chunk.Class)
- binary.Read(buf, systemEdian(), &chunk.ChunkSize)
- binary.Read(buf, systemEdian(), &chunk.CurrentChunk)
- binary.Read(buf, systemEdian(), &chunk.DataLen)
- //chunkSize := uint64(unsafe.Pointer(&buf.Bytes()))
- //chunk.ChunkSize = chunkSize
- return &chunk, nil
- }
创建webrtc连接
- var pcConfig = {
- 'iceServers': [{
- 'urls': 'stun:',
- }]
- };
- localConnection = new RTCPeerConnection(pcConfig);
-
- receiveDataChannel = localConnection.createDataChannel("receiveDataChannel")
-
- receiveDataChannel.binaryType = "arraybuffer"
-
- receiveDataChannel.addEventListener('open', dataChannel.onopen);
- receiveDataChannel.addEventListener('close', dataChannel.onclose);
- receiveDataChannel.addEventListener('message', dataChannel.onmessage);
- receiveDataChannel.addEventListener('error', dataChannel.onError);
-
- try {
- this.offer = await localConnection.createOffer();
- } catch (e) {
- console.log('Failed to create session description: ', e);
- return
- }
-
- try {
- await localConnection.setLocalDescription(this.offer)
- } catch (e) {
- console.log('Failed to create session description: ', e);
- return
- }
-
- //eyJ1c2VyX3V1aWQiOiI1ZmZkNDE0N2JkMTMyNWNmMjYwNDAyMWYwODA5OWUyMyIsImxvZ2luX3RpbWUiOjE2Njg0NzgzOTEsIm5vd190aW1lIjoxNjY5OTgzNzkwLCJyYW5fc3RyIjoiZDA3MTczNzI3NjFjMzY0MGU2NmRlYWI5YmYyODZhNzYiLCJzaWduIjoiZjc3NzI0YjZmMTc3MzczNmVhZWFkMTM2NzllNTE0NTcifQ==
-
- transfer.ws.send((JSON.stringify(downloadRequest)));
前端对收到的数据进行序列化和反序列化
- function serialize(data) {
- var bufLen = protoColMinSize;
- if (!data.Data) {
- bufLen += 0;
- } else {
- bufLen += data.Data.length;
- }
- data.Version = 1;
- var protocolBuf = new ArrayBuffer(bufLen);
- const bufView = new DataView(protocolBuf);
- bufView.setUint8(0, data.Version);
-
- bufView.setUint8(1, data.Class);
-
- if (!data.ChunkSize) {
- data.ChunkSize = 0
- }
- bufView.setBigUint64(2, BigInt(data.ChunkSize));
-
-
- if (!data.CurrentChunk) {
- data.CurrentChunk = 0
- }
- bufView.setBigUint64(10, BigInt(data.CurrentChunk));
-
- if (data.Data && data.Data.length > 0) {
- bufView.setBigUint64(18, BigInt(data.Data.length));
- } else {
- bufView.setBigUint64(18, BigInt(0));
- }
-
- console.log(protocolBuf)
- return protocolBuf;
- }
-
- function unSerialize(bytes) {
-
- var versionView = new DataView(bytes).getUint8(0);
- // 最小长度
- var classByteView = new DataView(bytes).getUint8(1);
- // chunk 长度
- var chunkSizeView = parseInt(new DataView(bytes).getBigUint64(2));
- var currentChunkView = parseInt(new DataView(bytes).getBigUint64(10));
- var bodyLenView = parseInt(new DataView(bytes).getBigUint64(18));
- var returnData = {
- Version: versionView,
- Class: classByteView,
- ChunkSize: (chunkSizeView),
- CurrentChunk: currentChunkView,
- PayloadLength: bodyLenView,
- Payload: [],
- };
-
- if (bodyLenView > 0) {
- returnData.Payload = new Uint8Array(bytes, protoColMinSize, bodyLenView)
- }
-
- return returnData;
- }
1、点对点传输,不经过中继服务器
2、民用带宽比较便宜,最差的情况下是带宽打满,不会出现很高的流量费用
3、可以自建存储,存储节点可以使用群辉,可以买自己服务器
4、不需要固定的ip地址
1、宽带线民用的不知道能申请几根
2、存储维护硬件也是一个麻烦的事情,硬盘很可能出现故障,运维也是一个头痛的事情