要实现Java接受RTSP流解码,并推送给前端实现播放实时流,可以使用一些流媒体处理库,比如JavaCV或者FFmpeg等。以下是一个简单的示例代码:
根据视频rtsp流链接打开转换,通过响应写出流到前台使用flvjs播放视频
一个播放器销毁时,将对应转换器线程暂停
- @RestController
- @RequestMapping("flv")
- public class FlvVideoController {
-
- @Autowired
- private IFLVService iflvService;
-
- /**
- * 根据视频rtsp流链接打开转换,通过响应写出流到前台使用flvjs播放视频
- * @param url 视频链接
- * @param httpServletResponse 响应请求
- * @author xufeng
- */
- @RequestMapping(method = RequestMethod.GET, value = "/open/{param}")
- public void open(@PathVariable(value = "param") String url, HttpServletResponse httpServletResponse) {
- try {
- System.out.println("==url="+url);
- if(StringUtils.isBlank(url)) {
- url="";
- }
- BASE64Decoder base64Decoder = new BASE64Decoder();
- //获取当前登录用户主键
- String userId = "1";
- //String userId = UserContext.getCurrentUser().getId();
- //为保持url长度,需要先对前端传来的url进行base64解码,再调用flvService接口
- iflvService.open(new String(base64Decoder.decodeBuffer(url)), userId, httpServletResponse);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- /**
- * 一个播放器销毁时,将对应转换器线程暂停
- * @author xufeng
- * @param videoUrl 视频流链接
- * @return EosDataTransferObject
- */
- @ResponseBody
- @RequestMapping(method = RequestMethod.GET, value = "/closeTransThread")
- public JsonResult closeTransThread(/*@RequestParam(value = "videoUrl") String videoUrl*/) {
- try {
- String videoUrl="rtsp://admin:xxx:554/cam/realmonitor?channel=1&subtype=0";
- //视频流链接为空直接返回
- if (StringUtils.isBlank(videoUrl)) {
- return new JsonResult();
- }
- //获取当前登录用户主键
- String userId = "1";
- //String userId = UserContext.getCurrentUser().getId();
- //使用主键获取当前所有转换器
- ConcurrentHashMap
conMaps = ConverterRegistration.getAllConverters(userId); - //通过视频流链接取对应的转换器
- Converter converter = ConverterRegistration.isExist(videoUrl, conMaps);
- if (null != converter) {
- //暂停转换器线程,1分钟无新线程创建,该线程即被销毁
- converter.exit();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return new JsonResult();
- }
-
- }
- public interface IFLVService {
-
- /**
- * 打开一个流地址
- *
- * @param url rtsp流链接
- * @param userId 用户主键
- * @param response 响应请求
- * @author xufeng
- */
- void open(String url,String userId, Object response);
-
- }
FLV流转换
- @Service("flvService")
- public class FLVService implements IFLVService {
-
- /**
- * 打开一个流地址,写入response
- * @param url 流地址
- * @param userId 用户主键
- * @param object HttpServletResponse
- * @author xufeng
- */
- @Override
- public void open(String url, String userId, Object object) {
- //创建转换器线程并启动
- Converter c = ConverterRegistration.open(url, userId);
- //UUID设置一个key值
- String key = UUID.randomUUID().toString();
- //创建输出字节流
- OutputStreamEntity outEntity = new OutputStreamEntity(new ByteArrayOutputStream(), System.currentTimeMillis(),
- key);
- //添加流输出
- System.out.println("==添加流输出=="+key);
- c.addOutputStreamEntity(key, outEntity);
- try {
- HttpServletResponse response = (HttpServletResponse) object;
- //设置响应头
- response.setContentType("video/x-flv");
- response.setHeader("Connection", "keep-alive");
- response.setStatus(HttpServletResponse.SC_OK);
- //写出缓冲信息,并清空
- response.flushBuffer();
- //循环读取outEntity里的流输出给前台
- System.out.println(c.getConverterState()+"==(response)循环读取outEntity里的流输出给前台==");
- readFlvStream(c, outEntity, response);
- } catch (Exception e) {
- //客户端长连接过程中被异常关闭,关闭该长连接对应的转换器线程
- c.exit();
- e.printStackTrace();
- //c.removeOutputStreamEntity(outEntity.getKey());
- }
- }
-
- /**
- * 递归读取转换好的视频流
- *
- * @param c 转换器
- * @param outEntity 输出流
- * @param response 响应
- * @author xufeng
- * @throws Exception
- */
- public void readFlvStream(Converter c, OutputStreamEntity outEntity, HttpServletResponse response)
- throws Exception {
- //根据转换器状态来决定是继续等待、读取、结束流输出
- switch (c.getConverterState()) {
- case INITIAL:
- Thread.sleep(300);
- readFlvStream(c, outEntity, response);
- break;
- case OPEN:
- Thread.sleep(100);
- //System.out.println("=== OPEN递归读取转换好的视频流=="+c.getUrl());
- readFlvStream(c, outEntity, response);
- break;
- case RUN:
- if (outEntity.getOutput().size() > 0) {
-
- byte[] b = outEntity.getOutput().toByteArray();
- outEntity.getOutput().reset();
- response.getOutputStream().write(b);
- outEntity.setUpdateTime(System.currentTimeMillis());
- }
- System.out.println("=== RUN递归读取转换好的视频流=="+c.getUrl());
- c.setUpdateTime(System.currentTimeMillis());
- Thread.sleep(100);
- readFlvStream(c, outEntity, response);
- break;
- case CLOSE:
- //log.info("close");
- break;
- default:
- break;
- }
- }
-
- }
- public class ConverterRegistration {
-
- /**
- * 转换器集合(根据用户ID分类)
- */
- private static ConcurrentHashMap
> converters = new ConcurrentHashMap<>(); - /**
- * 线程池
- */
- private static ExecutorService executorService = Executors.newCachedThreadPool();
-
- /**
- * 开始一个转换
- * 如果已存在这个流的转换就直接返回已存在的转换器
- * @author xufeng
- * @param url 视频流链接
- * @param userId 用户主键
- * @return converter
- */
- public static Converter open(String url, String userId) {
- System.out.println("===开始一个转换==="+url);
- //判断当前用户是否存在转换器线程集合,没有则新建
- ConcurrentHashMap
concurrentHashMap = converters.get(userId); - if (concurrentHashMap == null) {
- concurrentHashMap = new ConcurrentHashMap<>(16);
- converters.put(userId, concurrentHashMap);
- }
- //判断是否已存在该转换器
- Converter c = isExist(url, concurrentHashMap);
- System.out.println("===判断是否已经存在转换器=="+c);
- try {
- if (null == c) {
- String key = UUID.randomUUID().toString();
- //创建线程
- c = new ConverterFactories(url, UUID.randomUUID().toString(), converters.get(userId));
- //记录到集合
- concurrentHashMap.put(key, c);
- //c.start();
- //用线程池启动
- executorService.execute((Runnable) c);
- }
- }catch (Exception e) {
- e.printStackTrace();
- }
- //如果该线程存在,但处于停止状态,则重新设置状态播放
- if (!c.isRuning()) {
- //设置运行状态
- c.setRuning(true);
- //设置初始化标志
- c.setState(ConverterState.INITIAL);
- //线程池启动
- executorService.execute((Runnable) c);
- }
- return c;
- }
-
- /**
- * 如果流已存在,就共用一个
- * @author xufeng
- * @param url 链接
- * @param concurrentHashMap 转换器集合
- * @return converter
- */
- public static Converter isExist(String url, ConcurrentHashMap
concurrentHashMap) { - //遍历集合,根据url判断是否已存在该流视频
- for (Converter c : concurrentHashMap.values()) {
- if (url.equals(c.getUrl())) {
- return c;
- }
- }
- return null;
- }
-
- /**
- * 返回集合中的所有转换器
- * @author xufeng
- * @param userId 用户主键
- * @return converters
- */
- public static ConcurrentHashMap
getAllConverters(String userId){ - return converters.get(userId);
- }
- }
- public class ConverterFactories extends Thread implements Converter {
-
- /**
- * 运行状态
- */
- public volatile boolean runing = true;
- /**
- * 读流器
- */
- private FFmpegFrameGrabber grabber;
- /**
- * 转码器
- */
- private FFmpegFrameRecorder recorder;
- /**
- * 转FLV格式的头信息
- * 如果有第二个客户端播放首先要返回头信息
- */
- private byte[] headers;
- /**
- * 保存转换好的流
- */
- private ByteArrayOutputStream stream;
- /**
- * 流地址,h264,aac
- */
- private String url;
- /**
- * 流输出
- */
- private Map
outEntitys; -
- /**
- * 当前转换器状态
- */
- private ConverterState state = ConverterState.INITIAL;
- /**
- * key用于表示这个转换器
- */
- private String key;
- /**
- * 上次更新时间
- * 客户端读取是刷新
- * 如果没有客户端读取,会在一分钟后销毁这个转换器
- */
- private long updateTime;
- /**
- * 转换队列
- */
- private Map
factories; -
- public ConverterFactories(String url, String key, Map
factories) { - this.url = url;
- this.key = key;
- this.factories = factories;
- this.updateTime = System.currentTimeMillis();
- }
-
- @Override
- public void run() {
- try {
- //使用ffmpeg抓取流,创建读流器
- grabber = new FFmpegFrameGrabber(url);
- //如果为rtsp流,增加配置
- if ("rtsp".equals(url.substring(0, 4))) {
- //设置打开协议tcp / udp
- grabber.setOption("rtsp_transport", "tcp");
- //设置未响应超时时间 0.5秒
- grabber.setOption("stimeout", "500000");
- //设置缓存大小,提高画质、减少卡顿花屏
- //grabber.setOption("buffer_size", "1024000");
- //设置视频比例
- //grabber.setAspectRatio(1.7777);
- } else {
- grabber.setOption("timeout", "500000");
- }
- grabber.start();
- stream = new ByteArrayOutputStream();
- outEntitys = new ConcurrentHashMap<>();
- //设置转换状态为打开
- state = ConverterState.OPEN;
- //创建转码器
- recorder = new FFmpegFrameRecorder(
- stream, grabber.getImageWidth(),
- grabber.getImageHeight(),
- grabber.getAudioChannels());
-
- //配置转码器
- recorder.setFrameRate(grabber.getFrameRate());
- recorder.setSampleRate(grabber.getSampleRate());
- if (grabber.getAudioChannels() > 0) {
- recorder.setAudioChannels(grabber.getAudioChannels());
- recorder.setAudioBitrate(grabber.getAudioBitrate());
- recorder.setAudioCodec(grabber.getAudioCodec());
- //设置视频比例
- //recorder.setAspectRatio(grabber.getAspectRatio());
- }
- recorder.setFormat("flv");
- recorder.setVideoBitrate(grabber.getVideoBitrate());
- recorder.setVideoCodec(grabber.getVideoCodec());
- recorder.start(grabber.getFormatContext());
- //进入写入运行状态
- state = ConverterState.RUN;
-
- if (headers == null) {
- headers = stream.toByteArray();
- stream.reset();
- for (OutputStreamEntity o : outEntitys.values()) {
-
- o.getOutput().write(headers);
- }
- }
- int errorNum = 0;
- //线程运行时
- while (runing) {
- //FFmpeg读流压缩
- AVPacket k = grabber.grabPacket();
- if (k != null) {
- try {
- //转换器转换
- recorder.recordPacket(k);
- } catch (Exception e) {
- }
- byte[] b = stream.toByteArray();
- stream.reset();
- for (OutputStreamEntity o : outEntitys.values()) {
- if (o.getOutput().size() < (1024 * 1024)) {
- o.getOutput().write(b);
- }
- }
- errorNum = 0;
- } else {
- errorNum++;
- if (errorNum > 500) {
- break;
- }
- }
- }
- } catch (Exception e) {
- //log.error(e.getMessage(), e);
- state = ConverterState.ERROR;
- } finally {
- closeConverter();
- //log.info("exit");
- state = ConverterState.CLOSE;
- factories.remove(this.key);
- }
- }
-
- /**
- * 退出转换
- */
- public void closeConverter() {
- try {
- //停止转码器
- if (null != recorder) {
- recorder.stop();
- }
- //停止、关闭读流器
- grabber.stop();
- grabber.close();
- //关闭转码器
- if (null != recorder) {
- recorder.close();
- }
- //关闭流
- if (null != stream) {
- stream.close();
- }
- if (null != outEntitys) {
- for (OutputStreamEntity o : outEntitys.values()) {
- o.getOutput().close();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- //log.error(e.getMessage(), e);
- }
- }
-
- @Override
- public String getKey() {
- return this.key;
- }
-
- @Override
- public String getUrl() {
- return this.url;
- }
-
- @Override
- public ConverterState getConverterState() {
- return this.state;
- }
-
- @Override
- public void addOutputStreamEntity(String key, OutputStreamEntity entity) {
- try {
- switch (this.state) {
- case INITIAL:
- Thread.sleep(100);
- addOutputStreamEntity(key, entity);
- break;
- case OPEN:
- outEntitys.put(key, entity);
- break;
- case RUN:
- entity.getOutput().write(this.headers);
- outEntitys.put(key, entity);
- break;
- default:
- break;
- }
- } catch (Exception e) {
- //log.error(e.getMessage(), e);
- }
- }
-
- @Override
- public void setUpdateTime(long updateTime) {
- this.updateTime = updateTime;
- }
-
- @Override
- public long getUpdateTime() {
- return this.updateTime;
- }
-
- @Override
- public void exit() {
- //设置线程状态为非运行状态,最后会进入finally块关闭读流器、转码器、流
- this.runing = false;
- try {
- this.join();
- } catch (Exception e) {
- e.printStackTrace();
- //log.error(e.getMessage(), e);
- }
- }
-
- @Override
- public OutputStreamEntity getOutputStream(String key) {
- if (outEntitys.containsKey(key)) {
- return outEntitys.get(key);
- }
- return null;
- }
-
- @Override
- public Map
allOutEntity() { - return this.outEntitys;
- }
-
- @Override
- public void removeOutputStreamEntity(String key) {
- this.outEntitys.remove(key);
- }
-
- @Override
- public boolean isRuning() {
- return runing;
- }
-
- @Override
- public void setRuning(boolean runing) {
- this.runing = runing;
- }
-
- @Override
- public void setState(ConverterState state) {
- this.state = state;
- }
- }
- public interface Converter {
-
- /**
- * 设置线程状态
- * @param state 状态标志
- */
- void setState(ConverterState state);
-
- /**
- * 获取该转换的key
- */
- public String getKey();
-
- /**
- * 获取该转换的url
- *
- * @return
- */
- public String getUrl();
-
- /**
- * 获取转换的状态
- *
- * @return
- */
- public ConverterState getConverterState();
-
- /**
- * 添加一个流输出
- *
- * @param entity
- */
- public void addOutputStreamEntity(String key, OutputStreamEntity entity);
-
- /**
- * 所有流输出
- *
- * @return
- */
- public Map
allOutEntity(); -
- /**
- * 移除一个流输出
- *
- * @param key
- */
- public void removeOutputStreamEntity(String key);
-
- /**
- * 设置修改时间
- *
- * @param updateTime
- */
- public void setUpdateTime(long updateTime);
-
- /**
- * 获取修改时间
- *
- * @return
- */
- public long getUpdateTime();
-
- /**
- * 退出转换
- */
- public void exit();
-
- /**
- * 启动
- */
- public void start();
-
- /**
- * 获取输出的流
- *
- * @param key
- * @return
- */
- public OutputStreamEntity getOutputStream(String key);
-
- /**
- * 判断线程是否在运行
- * @return boolean
- */
- public boolean isRuning();
-
- /**
- * 设置运行状态
- * @param runing 运行标志
- */
- public void setRuning(boolean runing);
- }
- public class OutputStreamEntity {
-
- public OutputStreamEntity(ByteArrayOutputStream output, long updateTime, String key) {
- super();
- this.output = output;
- this.updateTime = updateTime;
- this.key = key;
- }
-
- /**
- * 字节数组输出流
- */
- private ByteArrayOutputStream output;
- /**
- * 更新时间
- */
- private long updateTime;
- /**
- * key标识
- */
- private String key;
-
- public ByteArrayOutputStream getOutput() {
- return output;
- }
-
- public void setOutput(ByteArrayOutputStream output) {
- this.output = output;
- }
-
- public long getUpdateTime() {
- return updateTime;
- }
-
- public void setUpdateTime(long updateTime) {
- this.updateTime = updateTime;
- }
-
- public String getKey() {
- return key;
- }
-
- public void setKey(String key) {
- this.key = key;
- }
-
-
- }
- 转换器状态(初始化、打开、关闭、错误、运行)
- public enum ConverterState {
- INITIAL, OPEN, CLOSE, ERROR, RUN
- }
- public class JsonResult extends HashMap
implements Serializable { -
- private static final long serialVersionUID = 1L;
- public static final int SUCCESS = 200;
-
- public JsonResult() {
- }
-
- /**
- * 返回成功
- */
- public static JsonResult ok() {
- return ok("操作成功");
- }
-
- /**
- * 返回成功
- */
- public static JsonResult okFallBack() {
- return okFallBack("操作成功");
- }
-
- /**
- * 返回成功
- */
- public JsonResult put(Object obj) {
- return this.put("data", obj);
- }
-
- /**
- * 返回成功
- */
- public static JsonResult ok(String message) {
- return result(200, message);
- }
-
- /**
- * 降级函数 - 返回成功
- */
- public static JsonResult okFallBack(String message) {
- return result(205, message);
- }
-
- /**
- * 返回成功
- */
- public static JsonResult result(int code, String message) {
- JsonResult jsonResult = new JsonResult();
- jsonResult.put("timestamp", System.currentTimeMillis());
- jsonResult.put("status", code);
- jsonResult.put("message", message);
- return jsonResult;
- }
-
- /**
- * 返回失败
- */
- public static JsonResult error() {
- return error("操作失败");
- }
-
- /**
- * 返回失败
- */
- public static JsonResult error(String message) {
- return error(500, message);
- }
-
- /**
- * 返回失败
- */
- public static JsonResult error(int code, String message) {
- JsonResult jsonResult = new JsonResult();
- jsonResult.put("timestamp", System.currentTimeMillis());
- jsonResult.put("status", code);
- jsonResult.put("message", message);
- return jsonResult;
- }
-
- /**
- * 设置code
- */
- public JsonResult setCode(int code) {
- super.put("status", code);
- return this;
- }
-
- /**
- * 设置message
- */
- public JsonResult setMessage(String message) {
- super.put("message", message);
- return this;
- }
-
- /**
- * 放入object
- */
- @Override
- public JsonResult put(String key, Object object) {
- super.put(key, object);
- return this;
- }
-
- /**
- * 权限禁止
- */
- public static JsonResult forbidden(String message) {
- JsonResult jsonResult = new JsonResult();
- jsonResult.put("timestamp", System.currentTimeMillis());
- jsonResult.put("status", 401);
- jsonResult.put("message", message);
- return jsonResult;
- }
-
- /*@Override
- public String toString() {
- return JSONObject.toJSONString(this);
- }
- public JSONObject toJSONObject() {
- return JSONObject.parseObject(toString());
- }*/
-
- }
7.前端展现
- "UTF-8">
Insert title here - "video-video-div">
-
-
- "text" id="url" value="rtsp://127.0.0.1/myvideo">
-