• 基于pion生态的SFU实时音视频发布服务(二)


          上篇文章说了风头正健的pion生态之livekit,现在轮到pion生态第一个sfu ion,这个由国内大佬鱼大等主持开发两年多开源项目,为国人乃至开源社区普及pion起了至关重要的作用,得到了Sean-Der的大力支持,也汇集了众多高手加盟,livekit的风格也深受其影响,下面是发布ion的习作,望各位大佬指正

    1. package livekitclient
    2. import (
    3. "fmt"
    4. "strings"
    5. "time"
    6. // "github.com/livekit/server-sdk-go/pkg/media/ivfwriter"
    7. "github.com/livekit/server-sdk-go/pkg/samplebuilder"
    8. ionsdk "github.com/pion/ion-sdk-go"
    9. "github.com/pion/rtp"
    10. "github.com/pion/webrtc/v3"
    11. "github.com/pion/webrtc/v3/pkg/media"
    12. "github.com/pion/webrtc/v3/pkg/media/h264writer"
    13. "github.com/pion/webrtc/v3/pkg/media/ivfwriter"
    14. "github.com/pion/webrtc/v3/pkg/media/oggwriter"
    15. "github.com/xiangxud/rtmp_webrtc_server/identity"
    16. "github.com/xiangxud/rtmp_webrtc_server/log"
    17. // "github.com/livekit/server-sdk-go/pkg/samplebuilder"
    18. )
    19. const (
    20. // sid = "ion"
    21. // uid = ionsdk.RandomKey(6)
    22. )
    23. func (t *LocalTrackPublication) saveToDisk(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
    24. codec := track.Codec()
    25. var fileWriter media.Writer
    26. var err error
    27. if strings.EqualFold(codec.MimeType, webrtc.MimeTypeOpus) {
    28. log.Infof("Got Opus track, saving to disk as ogg (48 kHz, 2 channels)")
    29. fileWriter, err = oggwriter.New(fmt.Sprintf("%d_%d.ogg", codec.PayloadType, track.SSRC()), 48000, 2)
    30. } else if strings.EqualFold(codec.MimeType, webrtc.MimeTypeVP8) {
    31. log.Infof("Got VP8 track, saving to disk as ivf")
    32. fileWriter, err = ivfwriter.New(fmt.Sprintf("%d_%d.ivf", codec.PayloadType, track.SSRC()))
    33. } else if strings.EqualFold(codec.MimeType, webrtc.MimeTypeH264) {
    34. log.Infof("Got H264 track, saving to disk as h264")
    35. fileWriter, err = h264writer.New(fmt.Sprintf("%d_%d.h264", codec.PayloadType, track.SSRC()))
    36. }
    37. if err != nil {
    38. log.Errorf("error: %v", err)
    39. fileWriter.Close()
    40. return
    41. }
    42. for {
    43. rtpPacket, _, err := track.ReadRTP()
    44. if err != nil {
    45. log.Warnf("track.ReadRTP error: %v", err)
    46. break
    47. }
    48. if err := fileWriter.WriteRTP(rtpPacket); err != nil {
    49. log.Warnf("fileWriter.WriteRTP error: %v", err)
    50. break
    51. }
    52. }
    53. }
    54. func (t *LocalTrackPublication) INORoomRTCJoin(r *Room, streamname, identify string) (*ionsdk.RTC, error) {
    55. // join room
    56. uid := streamname + ":" + identify
    57. err := r.IONRoom.Join(
    58. ionsdk.JoinInfo{
    59. Sid: identify,
    60. Uid: uid,
    61. DisplayName: uid,
    62. Role: ionsdk.Role_Host,
    63. Protocol: ionsdk.Protocol_WebRTC,
    64. Direction: ionsdk.Peer_BILATERAL,
    65. },
    66. )
    67. if err != nil {
    68. log.Errorf("Join error: %v", err)
    69. return nil, err
    70. }
    71. // new sdk engine
    72. config := ionsdk.RTCConfig{
    73. WebRTC: ionsdk.WebRTCTransportConfig{
    74. VideoMime: ionsdk.MimeTypeH264,
    75. },
    76. }
    77. joinedch := make(chan struct{})
    78. r.IONRoom.OnJoin = func(success bool, info ionsdk.RoomInfo, err error) {
    79. // THIS IS ROOM SINGAL API
    80. // ===============================
    81. rtc, err1 := ionsdk.NewRTC(r.IONConnector, config)
    82. if err1 != nil {
    83. log.Error(err1)
    84. return
    85. }
    86. // user define receiving rtp
    87. rtc.OnTrack = t.saveToDisk
    88. rtc.GetPubTransport().GetPeerConnection().OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
    89. log.Infof("Connection state changed: %s", state)
    90. })
    91. rtc.OnDataChannel = func(dc *webrtc.DataChannel) {
    92. log.Infof("dc: %v", dc.Label())
    93. }
    94. rtc.OnError = func(err error) {
    95. log.Errorf("err: %v", err)
    96. }
    97. configjoin := ionsdk.JoinConfig{}
    98. configjoin.SetNoPublish()
    99. configjoin.SetNoSubscribe()
    100. configjoin.SetNoAutoSubscribe()
    101. uid = ionsdk.RandomKey(6)
    102. err1 = rtc.Join(identify, uid, &configjoin)
    103. if err1 != nil {
    104. log.Errorf("error: %v", err1)
    105. return
    106. }
    107. log.Infof("rtc.Join ok sid=%v username =%v", identify, uid)
    108. // err = rtc.Join(session, ionsdk.RandomKey(4))
    109. // if err != nil {
    110. // log.Errorf("error: %v", err)
    111. // return
    112. // }
    113. t.IONRtc = rtc
    114. joinedch <- struct{}{}
    115. }
    116. <-joinedch
    117. return t.IONRtc, nil
    118. }
    119. func (r *Room) CreateIonRoom(addr, session string) (*ionsdk.Room, error) {
    120. log.Debug("CreateIonRoom: ", addr, " session: ", session)
    121. r.HostIon = addr
    122. connector := ionsdk.NewConnector(addr)
    123. // uid := ionsdk.RandomKey(6)
    124. room := ionsdk.NewRoom(connector)
    125. peers := room.GetPeers(session)
    126. if len(peers) != 0 {
    127. log.Debug("room is exit peers :", peers)
    128. // err := room.Join(
    129. // ionsdk.JoinInfo{
    130. // Sid: session,
    131. // Uid: uid,
    132. // DisplayName: uid,
    133. // Role: ionsdk.Role_Host,
    134. // Protocol: ionsdk.Protocol_WebRTC,
    135. // Direction: ionsdk.Peer_BILATERAL,
    136. // },
    137. // )
    138. // if err != nil {
    139. // log.Errorf("Join error: %v", err)
    140. // return nil, err
    141. // }
    142. r.IONRoom = room
    143. r.IONConnector = connector
    144. return room, nil
    145. }
    146. // THIS IS ROOM MANAGEMENT API
    147. // ==========================
    148. // create room
    149. err := room.CreateRoom(ionsdk.RoomInfo{Sid: session})
    150. if err != nil {
    151. log.Errorf("error:%v", err)
    152. return nil, err
    153. }
    154. // // new sdk engine
    155. // config := ionsdk.RTCConfig{
    156. // WebRTC: ionsdk.WebRTCTransportConfig{
    157. // VideoMime: ionsdk.MimeTypeH264,
    158. // },
    159. // }
    160. // // THIS IS ROOM SINGAL API
    161. // // ===============================
    162. // rtc, err := ionsdk.NewRTC(connector, config)
    163. // if err != nil {
    164. // log.Error(err)
    165. // // return err
    166. // }
    167. // // user define receiving rtp
    168. // rtc.OnTrack = r.saveToDisk
    169. // rtc.OnDataChannel = func(dc *webrtc.DataChannel) {
    170. // log.Infof("dc: %v", dc.Label())
    171. // }
    172. // rtc.OnError = func(err error) {
    173. // log.Errorf("err: %v", err)
    174. // }
    175. // err = rtc.Join(session, ionsdk.RandomKey(4))
    176. // if err != nil {
    177. // log.Errorf("error: %v", err)
    178. // return nil, err
    179. // }
    180. // log.Infof("rtc.Join ok sid=%v", session)
    181. // // err = rtc.Join(session, ionsdk.RandomKey(4))
    182. // // if err != nil {
    183. // // log.Errorf("error: %v", err)
    184. // // return
    185. // // }
    186. // r.IONRtc = rtc
    187. room.OnJoin = func(success bool, info ionsdk.RoomInfo, err error) {
    188. log.Infof("OnJoin success = %v, info = %v, err = %v", success, info, err)
    189. }
    190. room.OnLeave = func(success bool, err error) {
    191. log.Infof("OnLeave success = %v err = %v", success, err)
    192. }
    193. room.OnPeerEvent = func(state ionsdk.PeerState, peer ionsdk.PeerInfo) {
    194. log.Infof("OnPeerEvent state = %v, peer = %v", state, peer)
    195. }
    196. room.OnMessage = func(from string, to string, data map[string]interface{}) {
    197. log.Infof("OnMessage from = %v, to = %v, data = %v", from, to, data)
    198. }
    199. room.OnDisconnect = func(sid, reason string) {
    200. log.Infof("OnDisconnect sid = %v, reason = %v", sid, reason)
    201. }
    202. room.OnRoomInfo = func(info ionsdk.RoomInfo) {
    203. log.Infof("OnRoomInfo info=%v", info)
    204. }
    205. // join room
    206. // err = room.Join(
    207. // ionsdk.JoinInfo{
    208. // Sid: session,
    209. // Uid: uid,
    210. // DisplayName: uid,
    211. // Role: ionsdk.Role_Host,
    212. // Protocol: ionsdk.Protocol_WebRTC,
    213. // Direction: ionsdk.Peer_BILATERAL,
    214. // },
    215. // )
    216. // if err != nil {
    217. // log.Errorf("Join error: %v", err)
    218. // return nil, err
    219. // }
    220. r.IONRoom = room
    221. r.IONConnector = connector
    222. return room, nil
    223. }
    224. // func (t *LocalTrackPublication) ConnectRoomIon(host, identity string) error {
    225. // // host := ""
    226. // // apiKey := "api-key"
    227. // // apiSecret := "api-secret"
    228. // // roomName := "myroom"
    229. // // identity := "botuser"
    230. // room, err := lksdk.ConnectToRoom(host, lksdk.ConnectInfo{
    231. // APIKey: apikey,
    232. // APISecret: apisecret,
    233. // RoomName: roomname,
    234. // ParticipantIdentity: identity,
    235. // })
    236. // if err != nil {
    237. // log.Debug(err)
    238. // return err
    239. // }
    240. // t.LiveKitRoomConnect = room
    241. // room.Callback.OnTrackSubscribed = t.TrackSubscribed
    242. // return nil
    243. // // room.Disconnect()
    244. // }
    245. func (r *Room) TrackPublished_to_ION(streamname string) error {
    246. // - `in` implements io.ReadCloser, such as buffer or file
    247. // - `mime` has to be one of webrtc.MimeType...
    248. // videoTrack, err := lksdk.NewLocalSampleTrack(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264})
    249. r.ionlock.Lock()
    250. defer r.ionlock.Unlock()
    251. if r.Ctx != nil && r.IONRoom == nil {
    252. var err error
    253. sn, _ := identity.GetSN()
    254. r.IONRoom, err = r.CreateIonRoom(r.HostIon, sn)
    255. if err != nil {
    256. log.Debug("room->", sn, "create room ok", r)
    257. return err
    258. }
    259. }
    260. t := r.Localtracks[streamname]
    261. if t == nil {
    262. t = &LocalTrackPublication{Streamname: streamname}
    263. t.INORoomRTCJoin(r, streamname, r.Identity)
    264. log.Debug("ion track->", streamname, "<-is nil ,Connect room", t, r)
    265. } else {
    266. if !t.IONRtc.Connected() {
    267. t.INORoomRTCJoin(r, streamname, r.Identity)
    268. log.Debug("ion track->", streamname, "<-is nil ,re Connect room", t, r)
    269. }
    270. }
    271. if t.IONSfuTrack.VideoTrack == nil && t.Videopub == nil && t.IONSfuTrack.AudioTrack == nil && t.Audiopub == nil {
    272. videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, streamname+"-video", streamname)
    273. if err != nil {
    274. panic(err)
    275. }
    276. // r.RoomClient.MutePublishedTrack(r.Ctx,)
    277. // var local_video *lksdk.LocalTrackPublication
    278. if _, err = t.IONRtc.Publish(videoTrack); err != nil {
    279. log.Debug("Error publishing video track->", err)
    280. return err
    281. }
    282. t.IONSfuTrack.VideoTrack = videoTrack
    283. // r.Localtracks[streamname] = &LocalTrackPublication{p: local_video, Track: videoTrack, Trackname: streamname + "-video"}
    284. log.Debug("[TrackPublished_to_ION]", "published video track -> ", streamname)
    285. if t.IONSfuTrack.AudioTrack == nil {
    286. audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, streamname+"-audio", streamname)
    287. //audioTrack, err := lksdk.NewLocalSampleTrack(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus})
    288. if err != nil {
    289. panic(err)
    290. }
    291. // var local_audio *lksdk.LocalTrackPublication
    292. if _, err = t.IONRtc.Publish(audioTrack); err != nil {
    293. log.Debug("Error publishing audio track->", err)
    294. return err
    295. }
    296. t.IONSfuTrack.AudioTrack = audioTrack
    297. log.Debug("[TrackPublished_to_ION]", "published audio track -> ", streamname)
    298. }
    299. r.Localtracks[streamname] = t
    300. } else {
    301. log.Debug(streamname, "is exit publish")
    302. }
    303. return nil
    304. }
    305. func (r *Room) RTPTrackPublished_to_ION(trackRemote []*webrtc.TrackRemote, streamname string) error {
    306. // - `in` implements io.ReadCloser, such as buffer or file
    307. // - `mime` has to be one of webrtc.MimeType...
    308. // videoTrack, err := lksdk.NewLocalSampleTrack(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264})
    309. // r.mux.lock()
    310. r.ionlock.Lock()
    311. defer r.ionlock.Unlock()
    312. if r.Ctx != nil && r.IONRoom == nil {
    313. var err error
    314. sn, _ := identity.GetSN()
    315. r.IONRoom, err = r.CreateIonRoom(r.HostIon, sn)
    316. if err != nil {
    317. log.Debug("room->", sn, "create room ok", r)
    318. return err
    319. }
    320. }
    321. t := r.Localtracks[streamname]
    322. if t == nil {
    323. t = &LocalTrackPublication{Streamname: streamname}
    324. t.INORoomRTCJoin(r, streamname, r.Identity)
    325. log.Debug("track->", streamname, "<-is nil ,Connect room", t, r)
    326. } else {
    327. if !t.IONRtc.Connected() {
    328. t.INORoomRTCJoin(r, streamname, r.Identity)
    329. log.Debug("ion track->", streamname, "<-is nil ,re Connect room", t, r)
    330. }
    331. }
    332. for _, v := range trackRemote {
    333. if v.Kind().String() == "video" {
    334. if t.IONSfuTrack.VideoRTPTrack == nil {
    335. if strings.Contains(v.Codec().MimeType, "video") {
    336. videoRTPTrack, err := webrtc.NewTrackLocalStaticRTP(v.Codec().RTPCodecCapability, streamname+"-video", streamname)
    337. if err != nil {
    338. panic(err)
    339. }
    340. var pub []*webrtc.RTPSender
    341. log.Debug("ion track publish", videoRTPTrack)
    342. if pub, err = t.IONRtc.Publish(videoRTPTrack); err != nil {
    343. log.Debug("Error publishing video RTP track->", err)
    344. return err
    345. }
    346. t.IONVideopub = pub[0]
    347. t.IONSfuTrack.VideoRTPTrack = videoRTPTrack
    348. r.Localtracks[streamname] = t
    349. log.Debug("[RTPTrackPublished_to_ION]", "published video track -> ", streamname)
    350. }
    351. }
    352. } else {
    353. if v.Kind().String() == "audio" {
    354. if t.IONSfuTrack.AudioRTPTrack == nil {
    355. if strings.Contains(v.Codec().MimeType, "audio") {
    356. audioRTPTrack, err := webrtc.NewTrackLocalStaticRTP(v.Codec().RTPCodecCapability, streamname+"-audio", streamname)
    357. if err != nil {
    358. panic(err)
    359. }
    360. // var local_audio *lksdk.LocalTrackPublication
    361. var pub []*webrtc.RTPSender
    362. log.Debug("ion track publish", audioRTPTrack)
    363. if pub, err = t.IONRtc.Publish(audioRTPTrack); err != nil {
    364. log.Debug("Error publishing audio track", err)
    365. return err
    366. }
    367. t.IONAudiopub = pub[0]
    368. t.IONSfuTrack.AudioRTPTrack = audioRTPTrack
    369. r.Localtracks[streamname] = t
    370. log.Debug("[RTPTrackPublished_to_ION]", "published audio track -> ", streamname)
    371. }
    372. }
    373. }
    374. }
    375. }
    376. // r.Localtracks[streamname] = t
    377. // } else {
    378. // log.Debug(streamname, "is exit publish")
    379. // }
    380. return nil
    381. }
    382. func (r *Room) TrackSendIonRtpPackets(trackname, kind string, data []byte) (n int, err error) {
    383. if trackname == "" {
    384. log.Debug("Track name is null")
    385. return 0, fmt.Errorf("input trackname is null")
    386. }
    387. // var t *webrtc.TrackLocalStaticSample
    388. var t *webrtc.TrackLocalStaticRTP
    389. track := r.Localtracks[trackname]
    390. if track == nil {
    391. log.Debug("TrackSendIonRtpPackets:", "Track is nil ->", trackname, "<- no to publish")
    392. return 0, fmt.Errorf(" track is null,no to publish")
    393. }
    394. if kind == "video" {
    395. t = track.IONSfuTrack.VideoRTPTrack
    396. } else if kind == "audio" {
    397. t = track.IONSfuTrack.AudioRTPTrack
    398. }
    399. if t == nil {
    400. log.Debug("TrackSendIonRtpPackets:", "t is nil ->", trackname, "<- no to publish")
    401. return 0, fmt.Errorf(" track is null,no to publish")
    402. }
    403. var sb *samplebuilder.SampleBuilder
    404. packets := &rtp.Packet{}
    405. if err := packets.Unmarshal(data); err != nil {
    406. return 0, err
    407. }
    408. sb.Push(packets)
    409. for _, p := range sb.PopPackets() {
    410. err = t.WriteRTP(p)
    411. if err != nil {
    412. log.Debug("[TrackSendIonRtpPackets] error", err)
    413. return 0, err
    414. }
    415. }
    416. //n, err = t.Write(data)
    417. return len(data), nil
    418. }
    419. func (r *Room) TrackSendIonData(trackname, kind string, data []byte, duration time.Duration) error {
    420. if trackname == "" {
    421. log.Debug("Track name is null")
    422. return fmt.Errorf("input trackname is null")
    423. }
    424. var t *webrtc.TrackLocalStaticSample
    425. track := r.Localtracks[trackname]
    426. if track == nil {
    427. log.Debug("Track is nil ->", trackname, "<- no to publish")
    428. return fmt.Errorf(" track is null,no to publish")
    429. }
    430. if kind == "video" {
    431. t = track.IONSfuTrack.VideoTrack
    432. } else if kind == "audio" {
    433. t = track.IONSfuTrack.AudioTrack
    434. }
    435. if t == nil {
    436. log.Debug("Track is nil ->", trackname, "<- no to publish")
    437. return fmt.Errorf(" track is null,no to publish")
    438. }
    439. if videoErr := t.WriteSample(media.Sample{
    440. Data: data,
    441. Duration: duration,
    442. }); videoErr != nil {
    443. log.Debug("WriteSample err", videoErr)
    444. // r.ConnectRoom()
    445. return nil //fmt.Errorf("WriteSample err %s", vedioErr)
    446. }
    447. return nil
    448. }
    449. func (r *Room) TrackCloseION(streamname string) error {
    450. if t := r.Localtracks[streamname]; t != nil {
    451. var pub []*webrtc.RTPSender
    452. if r.Localtracks[streamname].IONVideopub != nil {
    453. pub = append(pub, r.Localtracks[streamname].IONVideopub)
    454. }
    455. if r.Localtracks[streamname].IONAudiopub != nil {
    456. pub = append(pub, r.Localtracks[streamname].IONAudiopub)
    457. }
    458. if pub != nil {
    459. t.IONRtc.UnPublish(pub...)
    460. t.LiveKitRoomConnect.Disconnect()
    461. }
    462. r.Localtracks[streamname] = nil
    463. log.Debug("track ", streamname, "lost ,now removed", r)
    464. //r.RoomConnect.LocalParticipant.UnpublishTrack(r.RoomConnect.LocalParticipant.SID())
    465. // r.Localtracks[streamname+"-video"]
    466. }
    467. return nil
    468. }

     

     

  • 相关阅读:
    layui的一些问题
    2024-6-19(沉默springboot)
    ELK+kafka+filebeat企业内部日志分析系统
    从一次数据库误操作开始了解MySQL日志【bin log、redo log、undo log】
    Python接口自动化实战案例
    Android音频——音量调节
    (Hexagon_V65_Programmers_Reference_Manual(13)
    南大通用GBase8s 常用SQL语句(241)
    java JVM原理与常识知识点
    分块指北
  • 原文地址:https://blog.csdn.net/superxxd/article/details/126297780