• 使用javacv对摄像头视频转码并实现播放


    要实现Java接受RTSP流解码,并推送给前端实现播放实时流,可以使用一些流媒体处理库,比如JavaCV或者FFmpeg等。以下是一个简单的示例代码:

    1.控制层方面的

    根据视频rtsp流链接打开转换,通过响应写出流到前台使用flvjs播放视频

     一个播放器销毁时,将对应转换器线程暂停

    1. @RestController
    2. @RequestMapping("flv")
    3. public class FlvVideoController {
    4. @Autowired
    5. private IFLVService iflvService;
    6. /**
    7. * 根据视频rtsp流链接打开转换,通过响应写出流到前台使用flvjs播放视频
    8. * @param url 视频链接
    9. * @param httpServletResponse 响应请求
    10. * @author xufeng
    11. */
    12. @RequestMapping(method = RequestMethod.GET, value = "/open/{param}")
    13. public void open(@PathVariable(value = "param") String url, HttpServletResponse httpServletResponse) {
    14. try {
    15. System.out.println("==url="+url);
    16. if(StringUtils.isBlank(url)) {
    17. url="";
    18. }
    19. BASE64Decoder base64Decoder = new BASE64Decoder();
    20. //获取当前登录用户主键
    21. String userId = "1";
    22. //String userId = UserContext.getCurrentUser().getId();
    23. //为保持url长度,需要先对前端传来的url进行base64解码,再调用flvService接口
    24. iflvService.open(new String(base64Decoder.decodeBuffer(url)), userId, httpServletResponse);
    25. } catch (Exception e) {
    26. e.printStackTrace();
    27. }
    28. }
    29. /**
    30. * 一个播放器销毁时,将对应转换器线程暂停
    31. * @author xufeng
    32. * @param videoUrl 视频流链接
    33. * @return EosDataTransferObject
    34. */
    35. @ResponseBody
    36. @RequestMapping(method = RequestMethod.GET, value = "/closeTransThread")
    37. public JsonResult closeTransThread(/*@RequestParam(value = "videoUrl") String videoUrl*/) {
    38. try {
    39. String videoUrl="rtsp://admin:xxx:554/cam/realmonitor?channel=1&subtype=0";
    40. //视频流链接为空直接返回
    41. if (StringUtils.isBlank(videoUrl)) {
    42. return new JsonResult();
    43. }
    44. //获取当前登录用户主键
    45. String userId = "1";
    46. //String userId = UserContext.getCurrentUser().getId();
    47. //使用主键获取当前所有转换器
    48. ConcurrentHashMap conMaps = ConverterRegistration.getAllConverters(userId);
    49. //通过视频流链接取对应的转换器
    50. Converter converter = ConverterRegistration.isExist(videoUrl, conMaps);
    51. if (null != converter) {
    52. //暂停转换器线程,1分钟无新线程创建,该线程即被销毁
    53. converter.exit();
    54. }
    55. } catch (Exception e) {
    56. e.printStackTrace();
    57. }
    58. return new JsonResult();
    59. }
    60. }

    2.视频流转换接口

    1. public interface IFLVService {
    2. /**
    3. * 打开一个流地址
    4. *
    5. * @param url rtsp流链接
    6. * @param userId 用户主键
    7. * @param response 响应请求
    8. * @author xufeng
    9. */
    10. void open(String url,String userId, Object response);
    11. }
    FLV流转换
    1. @Service("flvService")
    2. public class FLVService implements IFLVService {
    3. /**
    4. * 打开一个流地址,写入response
    5. * @param url 流地址
    6. * @param userId 用户主键
    7. * @param object HttpServletResponse
    8. * @author xufeng
    9. */
    10. @Override
    11. public void open(String url, String userId, Object object) {
    12. //创建转换器线程并启动
    13. Converter c = ConverterRegistration.open(url, userId);
    14. //UUID设置一个key值
    15. String key = UUID.randomUUID().toString();
    16. //创建输出字节流
    17. OutputStreamEntity outEntity = new OutputStreamEntity(new ByteArrayOutputStream(), System.currentTimeMillis(),
    18. key);
    19. //添加流输出
    20. System.out.println("==添加流输出=="+key);
    21. c.addOutputStreamEntity(key, outEntity);
    22. try {
    23. HttpServletResponse response = (HttpServletResponse) object;
    24. //设置响应头
    25. response.setContentType("video/x-flv");
    26. response.setHeader("Connection", "keep-alive");
    27. response.setStatus(HttpServletResponse.SC_OK);
    28. //写出缓冲信息,并清空
    29. response.flushBuffer();
    30. //循环读取outEntity里的流输出给前台
    31. System.out.println(c.getConverterState()+"==(response)循环读取outEntity里的流输出给前台==");
    32. readFlvStream(c, outEntity, response);
    33. } catch (Exception e) {
    34. //客户端长连接过程中被异常关闭,关闭该长连接对应的转换器线程
    35. c.exit();
    36. e.printStackTrace();
    37. //c.removeOutputStreamEntity(outEntity.getKey());
    38. }
    39. }
    40. /**
    41. * 递归读取转换好的视频流
    42. *
    43. * @param c 转换器
    44. * @param outEntity 输出流
    45. * @param response 响应
    46. * @author xufeng
    47. * @throws Exception
    48. */
    49. public void readFlvStream(Converter c, OutputStreamEntity outEntity, HttpServletResponse response)
    50. throws Exception {
    51. //根据转换器状态来决定是继续等待、读取、结束流输出
    52. switch (c.getConverterState()) {
    53. case INITIAL:
    54. Thread.sleep(300);
    55. readFlvStream(c, outEntity, response);
    56. break;
    57. case OPEN:
    58. Thread.sleep(100);
    59. //System.out.println("=== OPEN递归读取转换好的视频流=="+c.getUrl());
    60. readFlvStream(c, outEntity, response);
    61. break;
    62. case RUN:
    63. if (outEntity.getOutput().size() > 0) {
    64. byte[] b = outEntity.getOutput().toByteArray();
    65. outEntity.getOutput().reset();
    66. response.getOutputStream().write(b);
    67. outEntity.setUpdateTime(System.currentTimeMillis());
    68. }
    69. System.out.println("=== RUN递归读取转换好的视频流=="+c.getUrl());
    70. c.setUpdateTime(System.currentTimeMillis());
    71. Thread.sleep(100);
    72. readFlvStream(c, outEntity, response);
    73. break;
    74. case CLOSE:
    75. //log.info("close");
    76. break;
    77. default:
    78. break;
    79. }
    80. }
    81. }

    3.转换

    1. public class ConverterRegistration {
    2. /**
    3. * 转换器集合(根据用户ID分类)
    4. */
    5. private static ConcurrentHashMap> converters = new ConcurrentHashMap<>();
    6. /**
    7. * 线程池
    8. */
    9. private static ExecutorService executorService = Executors.newCachedThreadPool();
    10. /**
    11. * 开始一个转换
    12. * 如果已存在这个流的转换就直接返回已存在的转换器
    13. * @author xufeng
    14. * @param url 视频流链接
    15. * @param userId 用户主键
    16. * @return converter
    17. */
    18. public static Converter open(String url, String userId) {
    19. System.out.println("===开始一个转换==="+url);
    20. //判断当前用户是否存在转换器线程集合,没有则新建
    21. ConcurrentHashMap concurrentHashMap = converters.get(userId);
    22. if (concurrentHashMap == null) {
    23. concurrentHashMap = new ConcurrentHashMap<>(16);
    24. converters.put(userId, concurrentHashMap);
    25. }
    26. //判断是否已存在该转换器
    27. Converter c = isExist(url, concurrentHashMap);
    28. System.out.println("===判断是否已经存在转换器=="+c);
    29. try {
    30. if (null == c) {
    31. String key = UUID.randomUUID().toString();
    32. //创建线程
    33. c = new ConverterFactories(url, UUID.randomUUID().toString(), converters.get(userId));
    34. //记录到集合
    35. concurrentHashMap.put(key, c);
    36. //c.start();
    37. //用线程池启动
    38. executorService.execute((Runnable) c);
    39. }
    40. }catch (Exception e) {
    41. e.printStackTrace();
    42. }
    43. //如果该线程存在,但处于停止状态,则重新设置状态播放
    44. if (!c.isRuning()) {
    45. //设置运行状态
    46. c.setRuning(true);
    47. //设置初始化标志
    48. c.setState(ConverterState.INITIAL);
    49. //线程池启动
    50. executorService.execute((Runnable) c);
    51. }
    52. return c;
    53. }
    54. /**
    55. * 如果流已存在,就共用一个
    56. * @author xufeng
    57. * @param url 链接
    58. * @param concurrentHashMap 转换器集合
    59. * @return converter
    60. */
    61. public static Converter isExist(String url, ConcurrentHashMap concurrentHashMap) {
    62. //遍历集合,根据url判断是否已存在该流视频
    63. for (Converter c : concurrentHashMap.values()) {
    64. if (url.equals(c.getUrl())) {
    65. return c;
    66. }
    67. }
    68. return null;
    69. }
    70. /**
    71. * 返回集合中的所有转换器
    72. * @author xufeng
    73. * @param userId 用户主键
    74. * @return converters
    75. */
    76. public static ConcurrentHashMap getAllConverters(String userId){
    77. return converters.get(userId);
    78. }
    79. }

    4.使用javacv

    1. public class ConverterFactories extends Thread implements Converter {
    2. /**
    3. * 运行状态
    4. */
    5. public volatile boolean runing = true;
    6. /**
    7. * 读流器
    8. */
    9. private FFmpegFrameGrabber grabber;
    10. /**
    11. * 转码器
    12. */
    13. private FFmpegFrameRecorder recorder;
    14. /**
    15. * 转FLV格式的头信息
    16. * 如果有第二个客户端播放首先要返回头信息
    17. */
    18. private byte[] headers;
    19. /**
    20. * 保存转换好的流
    21. */
    22. private ByteArrayOutputStream stream;
    23. /**
    24. * 流地址,h264,aac
    25. */
    26. private String url;
    27. /**
    28. * 流输出
    29. */
    30. private Map outEntitys;
    31. /**
    32. * 当前转换器状态
    33. */
    34. private ConverterState state = ConverterState.INITIAL;
    35. /**
    36. * key用于表示这个转换器
    37. */
    38. private String key;
    39. /**
    40. * 上次更新时间
    41. * 客户端读取是刷新
    42. * 如果没有客户端读取,会在一分钟后销毁这个转换器
    43. */
    44. private long updateTime;
    45. /**
    46. * 转换队列
    47. */
    48. private Map factories;
    49. public ConverterFactories(String url, String key, Map factories) {
    50. this.url = url;
    51. this.key = key;
    52. this.factories = factories;
    53. this.updateTime = System.currentTimeMillis();
    54. }
    55. @Override
    56. public void run() {
    57. try {
    58. //使用ffmpeg抓取流,创建读流器
    59. grabber = new FFmpegFrameGrabber(url);
    60. //如果为rtsp流,增加配置
    61. if ("rtsp".equals(url.substring(0, 4))) {
    62. //设置打开协议tcp / udp
    63. grabber.setOption("rtsp_transport", "tcp");
    64. //设置未响应超时时间 0.5秒
    65. grabber.setOption("stimeout", "500000");
    66. //设置缓存大小,提高画质、减少卡顿花屏
    67. //grabber.setOption("buffer_size", "1024000");
    68. //设置视频比例
    69. //grabber.setAspectRatio(1.7777);
    70. } else {
    71. grabber.setOption("timeout", "500000");
    72. }
    73. grabber.start();
    74. stream = new ByteArrayOutputStream();
    75. outEntitys = new ConcurrentHashMap<>();
    76. //设置转换状态为打开
    77. state = ConverterState.OPEN;
    78. //创建转码器
    79. recorder = new FFmpegFrameRecorder(
    80. stream, grabber.getImageWidth(),
    81. grabber.getImageHeight(),
    82. grabber.getAudioChannels());
    83. //配置转码器
    84. recorder.setFrameRate(grabber.getFrameRate());
    85. recorder.setSampleRate(grabber.getSampleRate());
    86. if (grabber.getAudioChannels() > 0) {
    87. recorder.setAudioChannels(grabber.getAudioChannels());
    88. recorder.setAudioBitrate(grabber.getAudioBitrate());
    89. recorder.setAudioCodec(grabber.getAudioCodec());
    90. //设置视频比例
    91. //recorder.setAspectRatio(grabber.getAspectRatio());
    92. }
    93. recorder.setFormat("flv");
    94. recorder.setVideoBitrate(grabber.getVideoBitrate());
    95. recorder.setVideoCodec(grabber.getVideoCodec());
    96. recorder.start(grabber.getFormatContext());
    97. //进入写入运行状态
    98. state = ConverterState.RUN;
    99. if (headers == null) {
    100. headers = stream.toByteArray();
    101. stream.reset();
    102. for (OutputStreamEntity o : outEntitys.values()) {
    103. o.getOutput().write(headers);
    104. }
    105. }
    106. int errorNum = 0;
    107. //线程运行时
    108. while (runing) {
    109. //FFmpeg读流压缩
    110. AVPacket k = grabber.grabPacket();
    111. if (k != null) {
    112. try {
    113. //转换器转换
    114. recorder.recordPacket(k);
    115. } catch (Exception e) {
    116. }
    117. byte[] b = stream.toByteArray();
    118. stream.reset();
    119. for (OutputStreamEntity o : outEntitys.values()) {
    120. if (o.getOutput().size() < (1024 * 1024)) {
    121. o.getOutput().write(b);
    122. }
    123. }
    124. errorNum = 0;
    125. } else {
    126. errorNum++;
    127. if (errorNum > 500) {
    128. break;
    129. }
    130. }
    131. }
    132. } catch (Exception e) {
    133. //log.error(e.getMessage(), e);
    134. state = ConverterState.ERROR;
    135. } finally {
    136. closeConverter();
    137. //log.info("exit");
    138. state = ConverterState.CLOSE;
    139. factories.remove(this.key);
    140. }
    141. }
    142. /**
    143. * 退出转换
    144. */
    145. public void closeConverter() {
    146. try {
    147. //停止转码器
    148. if (null != recorder) {
    149. recorder.stop();
    150. }
    151. //停止、关闭读流器
    152. grabber.stop();
    153. grabber.close();
    154. //关闭转码器
    155. if (null != recorder) {
    156. recorder.close();
    157. }
    158. //关闭流
    159. if (null != stream) {
    160. stream.close();
    161. }
    162. if (null != outEntitys) {
    163. for (OutputStreamEntity o : outEntitys.values()) {
    164. o.getOutput().close();
    165. }
    166. }
    167. } catch (Exception e) {
    168. e.printStackTrace();
    169. //log.error(e.getMessage(), e);
    170. }
    171. }
    172. @Override
    173. public String getKey() {
    174. return this.key;
    175. }
    176. @Override
    177. public String getUrl() {
    178. return this.url;
    179. }
    180. @Override
    181. public ConverterState getConverterState() {
    182. return this.state;
    183. }
    184. @Override
    185. public void addOutputStreamEntity(String key, OutputStreamEntity entity) {
    186. try {
    187. switch (this.state) {
    188. case INITIAL:
    189. Thread.sleep(100);
    190. addOutputStreamEntity(key, entity);
    191. break;
    192. case OPEN:
    193. outEntitys.put(key, entity);
    194. break;
    195. case RUN:
    196. entity.getOutput().write(this.headers);
    197. outEntitys.put(key, entity);
    198. break;
    199. default:
    200. break;
    201. }
    202. } catch (Exception e) {
    203. //log.error(e.getMessage(), e);
    204. }
    205. }
    206. @Override
    207. public void setUpdateTime(long updateTime) {
    208. this.updateTime = updateTime;
    209. }
    210. @Override
    211. public long getUpdateTime() {
    212. return this.updateTime;
    213. }
    214. @Override
    215. public void exit() {
    216. //设置线程状态为非运行状态,最后会进入finally块关闭读流器、转码器、流
    217. this.runing = false;
    218. try {
    219. this.join();
    220. } catch (Exception e) {
    221. e.printStackTrace();
    222. //log.error(e.getMessage(), e);
    223. }
    224. }
    225. @Override
    226. public OutputStreamEntity getOutputStream(String key) {
    227. if (outEntitys.containsKey(key)) {
    228. return outEntitys.get(key);
    229. }
    230. return null;
    231. }
    232. @Override
    233. public Map allOutEntity() {
    234. return this.outEntitys;
    235. }
    236. @Override
    237. public void removeOutputStreamEntity(String key) {
    238. this.outEntitys.remove(key);
    239. }
    240. @Override
    241. public boolean isRuning() {
    242. return runing;
    243. }
    244. @Override
    245. public void setRuning(boolean runing) {
    246. this.runing = runing;
    247. }
    248. @Override
    249. public void setState(ConverterState state) {
    250. this.state = state;
    251. }
    252. }

    rtsp流转换器接口

    1. public interface Converter {
    2. /**
    3. * 设置线程状态
    4. * @param state 状态标志
    5. */
    6. void setState(ConverterState state);
    7. /**
    8. * 获取该转换的key
    9. */
    10. public String getKey();
    11. /**
    12. * 获取该转换的url
    13. *
    14. * @return
    15. */
    16. public String getUrl();
    17. /**
    18. * 获取转换的状态
    19. *
    20. * @return
    21. */
    22. public ConverterState getConverterState();
    23. /**
    24. * 添加一个流输出
    25. *
    26. * @param entity
    27. */
    28. public void addOutputStreamEntity(String key, OutputStreamEntity entity);
    29. /**
    30. * 所有流输出
    31. *
    32. * @return
    33. */
    34. public Map allOutEntity();
    35. /**
    36. * 移除一个流输出
    37. *
    38. * @param key
    39. */
    40. public void removeOutputStreamEntity(String key);
    41. /**
    42. * 设置修改时间
    43. *
    44. * @param updateTime
    45. */
    46. public void setUpdateTime(long updateTime);
    47. /**
    48. * 获取修改时间
    49. *
    50. * @return
    51. */
    52. public long getUpdateTime();
    53. /**
    54. * 退出转换
    55. */
    56. public void exit();
    57. /**
    58. * 启动
    59. */
    60. public void start();
    61. /**
    62. * 获取输出的流
    63. *
    64. * @param key
    65. * @return
    66. */
    67. public OutputStreamEntity getOutputStream(String key);
    68. /**
    69. * 判断线程是否在运行
    70. * @return boolean
    71. */
    72. public boolean isRuning();
    73. /**
    74. * 设置运行状态
    75. * @param runing 运行标志
    76. */
    77. public void setRuning(boolean runing);
    78. }

    6.输出视频流

    1. public class OutputStreamEntity {
    2. public OutputStreamEntity(ByteArrayOutputStream output, long updateTime, String key) {
    3. super();
    4. this.output = output;
    5. this.updateTime = updateTime;
    6. this.key = key;
    7. }
    8. /**
    9. * 字节数组输出流
    10. */
    11. private ByteArrayOutputStream output;
    12. /**
    13. * 更新时间
    14. */
    15. private long updateTime;
    16. /**
    17. * key标识
    18. */
    19. private String key;
    20. public ByteArrayOutputStream getOutput() {
    21. return output;
    22. }
    23. public void setOutput(ByteArrayOutputStream output) {
    24. this.output = output;
    25. }
    26. public long getUpdateTime() {
    27. return updateTime;
    28. }
    29. public void setUpdateTime(long updateTime) {
    30. this.updateTime = updateTime;
    31. }
    32. public String getKey() {
    33. return key;
    34. }
    35. public void setKey(String key) {
    36. this.key = key;
    37. }
    38. }
    1. 转换器状态(初始化、打开、关闭、错误、运行)
    2. public enum ConverterState {
    3. INITIAL, OPEN, CLOSE, ERROR, RUN
    4. }
    1. public class JsonResult extends HashMap implements Serializable {
    2. private static final long serialVersionUID = 1L;
    3. public static final int SUCCESS = 200;
    4. public JsonResult() {
    5. }
    6. /**
    7. * 返回成功
    8. */
    9. public static JsonResult ok() {
    10. return ok("操作成功");
    11. }
    12. /**
    13. * 返回成功
    14. */
    15. public static JsonResult okFallBack() {
    16. return okFallBack("操作成功");
    17. }
    18. /**
    19. * 返回成功
    20. */
    21. public JsonResult put(Object obj) {
    22. return this.put("data", obj);
    23. }
    24. /**
    25. * 返回成功
    26. */
    27. public static JsonResult ok(String message) {
    28. return result(200, message);
    29. }
    30. /**
    31. * 降级函数 - 返回成功
    32. */
    33. public static JsonResult okFallBack(String message) {
    34. return result(205, message);
    35. }
    36. /**
    37. * 返回成功
    38. */
    39. public static JsonResult result(int code, String message) {
    40. JsonResult jsonResult = new JsonResult();
    41. jsonResult.put("timestamp", System.currentTimeMillis());
    42. jsonResult.put("status", code);
    43. jsonResult.put("message", message);
    44. return jsonResult;
    45. }
    46. /**
    47. * 返回失败
    48. */
    49. public static JsonResult error() {
    50. return error("操作失败");
    51. }
    52. /**
    53. * 返回失败
    54. */
    55. public static JsonResult error(String message) {
    56. return error(500, message);
    57. }
    58. /**
    59. * 返回失败
    60. */
    61. public static JsonResult error(int code, String message) {
    62. JsonResult jsonResult = new JsonResult();
    63. jsonResult.put("timestamp", System.currentTimeMillis());
    64. jsonResult.put("status", code);
    65. jsonResult.put("message", message);
    66. return jsonResult;
    67. }
    68. /**
    69. * 设置code
    70. */
    71. public JsonResult setCode(int code) {
    72. super.put("status", code);
    73. return this;
    74. }
    75. /**
    76. * 设置message
    77. */
    78. public JsonResult setMessage(String message) {
    79. super.put("message", message);
    80. return this;
    81. }
    82. /**
    83. * 放入object
    84. */
    85. @Override
    86. public JsonResult put(String key, Object object) {
    87. super.put(key, object);
    88. return this;
    89. }
    90. /**
    91. * 权限禁止
    92. */
    93. public static JsonResult forbidden(String message) {
    94. JsonResult jsonResult = new JsonResult();
    95. jsonResult.put("timestamp", System.currentTimeMillis());
    96. jsonResult.put("status", 401);
    97. jsonResult.put("message", message);
    98. return jsonResult;
    99. }
    100. /*@Override
    101. public String toString() {
    102. return JSONObject.toJSONString(this);
    103. }
    104. public JSONObject toJSONObject() {
    105. return JSONObject.parseObject(toString());
    106. }*/
    107. }

    7.前端展现

    1. "UTF-8">
    2. Insert title here
    3. "video-video-div">
  • "text" id="url" value="rtsp://127.0.0.1/myvideo">
  • 相关阅读:
    Cadence Allegro 通过全局视窗局部放大板卡的方法
    容器化部署,不能识别中文件夹和中文文件
    【SQL】按特定字符分割一行转多行
    623. 在二叉树中增加一行(难度:中等)
    C++ 读MTK代码 综测校准 PSU读开关电源电压或电流 visa
    微服务框架 SpringCloud微服务架构 4 Ribbon 4.1 负载均衡原理
    csapp-Machine-Level Representation of Program-review
    mysql下载和安装,使用
    @Bean注解详解
    Nginx的反向代理、动静分离、负载均衡
  • 原文地址:https://blog.csdn.net/ruiguang21/article/details/139353132