上一篇文章简单介绍了java如何调用ffmpeg的命令:http://blog.csdn.net/eguid_1/article/details/51777716
上上一篇介绍了nginx-rtmp服务器的搭建:http://blog.csdn.net/eguid_1/article/details/51749830
这一篇将进一步深挖java对ffmepg命令的控制并最终实现服务接口化
本篇文章源码:http://download.csdn.net/detail/eguid_1/9563637
通知:由于很多同学反映本章代码的命令封装设计的不是很好,所以对本章代码重新进行了实现,新版本推翻了本章原有代码内部实现,接口设计更加利于注入自己的实现,并增加可执行原生ffmpeg命令功能
新版本请到这里查看:java封装FFmpeg命令,支持原生ffmpeg全部命令,实现FFmpeg多进程处理与多线程输出控制(开启、关闭、查询),rtsp/rtmp推流、拉流
该服务接口可实现rtsp协议转换为rtmp协议且可以实现rtmp直播流发布到nginx流媒体服务器,其中最为重要的是如何实现通过参数生成ffmpeg命令并执行,且可以通过接口进行控制ffmpeg命令的停止
push端接口化管理
1、采用多线程方式,每次调用push端口开启一个主进程及两个输出线程
2、可以对每个push端(线程)进行开启和关闭的控制
3、统一接口参数,对ffmpeg的命令做到参数可控制
PushManager提供push(开启一个push处理器),closePush(关闭push处理器),viewAppName接口(查看当前已经开启的应用)
1.1、应用名和push处理器的关系
一个处理器对应一个应用名
1.2、push处理器
一个处理器对应一个push主进程和两个输出线程
2.1、主进程开启
服务接口调用push处理器开启push主进程,主进程会自动开启两个输出线程用于消息输出,
开启后会将主进程Process和两个输出线程OutHandler通过map返回给服务接口。
2.2、主进程关闭
主进程可通过Process的destroy方法进行安全关闭
3.1、输出线程开启
输出线程从主进程获取到输出流进行输出
3.2、输出线程关闭
输出线程重写了destory方法,用于安全的关闭输出线程
持久层分为两个:
1、appName(应用名)-pushId(push处理器的ID)对应关系
用于维护应用名和push处理器ID的对应关系,pushId为随机生成id
2、pushId-主进程-输出线程对应关系
主要用于存放主进程(Process)和两个输出线程,建立两者对应关系,方便服务接口管理
/** * 实现push管理器的push,delete,view服务 * * @author eguid * @see PushMangerImpl * @since jdk1.7 */ public class PushManagerImpl implements PushManager { /** * 引用push处理器 */ private PushHandler pusher = new PushHandlerImpl(); /** * 管理应用名和push处理器之间的关系 */ private PushId_AppRelshipDao pard=new PushId_AppRelshipDaoImpl(); /** * 管理处理器的主进程Process及两个输出线程的关系 */ private HandlerDao hd = new HandlerDaoImpl(); public void setPusher(PushHandler pusher) { this.pusher = pusher; } public void setPard(PushId_AppRelshipDao pard) { this.pard = pard; } public void setHd(HandlerDao hd) { this.hd = hd; } @Override public String push(Map<String, Object> map) { if(map==null||map.isEmpty()||!map.containsKey("appName")) { return null; } String appName=null; ConcurrentMap<String, Object> resultMap = null; try { appName=(String)map.get("appName"); if(appName!=null&&"".equals(appName.trim())) { return null; } resultMap = pusher.push(map); // 生成一个标识该命令行线程集的key String pushId = UUID.randomUUID().toString(); hd.set(pushId, resultMap); pard.set(appName, pushId); } catch (IOException e) { // 暂时先写这样,后期加日志 System.err.println("发生一个异常" + e.getMessage()); } return appName; } @Override public void closePush(String appName) { String pushId=null; if(pard.isHave(appName)) { pushId= pard.getPushId(appName); } if (pushId!=null&&hd.isHave(pushId)) { ConcurrentMap<String, Object> map = hd.get(pushId); //关闭两个线程 ((OutHandler)map.get("error")).destroy(); ((OutHandler)map.get("info")).destroy(); //暂时先这样写,后期加日志 System.out.println("停止命令-----end commond"); //关闭命令主进程 ((Process)map.get("process")).destroy(); //删除处理器与线程对应关系表 hd.delete(pushId); //删除应用名对应关系表 pard.delete(appName); } } @Override public List<String> viewAppName() { return pard.getAll(); }
/** * 提供解析参数生成ffmpeg命令并处理push操作 * @see PushHandlerImpl * @since jdk1.7 */ public class PushHandlerImpl implements PushHandler { /* * "ffmpeg -i "+ "rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0 "+" -f flv -r 25 -s 640x360 -an" + " rtmp://192.168.30.21/live/test" * 推送流格式: name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率;disableAudio:是否开启音频 */ @Override public ConcurrentMap<String, Object> push(Map<String, Object> paramMap) throws IOException { // 从map里面取数据,组装成命令 String comm = getComm4Map(paramMap); ConcurrentMap<String, Object> resultMap = null; // 执行命令行 final Process proc = Runtime.getRuntime().exec(comm); System.out.println("执行命令----start commond"); OutHandler errorGobbler = new OutHandler(proc.getErrorStream(), "Error"); OutHandler outputGobbler = new OutHandler(proc.getInputStream(), "Info"); errorGobbler.start(); outputGobbler.start(); // 返回参数 resultMap = new ConcurrentHashMap<String, Object>(); resultMap.put("info", outputGobbler); resultMap.put("error", errorGobbler); resultMap.put("process", proc); return resultMap; } /** * 通过解析参数生成可执行的命令行字符串; * name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率;disableAudio:是否开启音频 * * @param paramMap * @return 命令行字符串 */ protected String getComm4Map(Map<String, Object> paramMap) { // -i:输入流地址或者文件绝对地址 StringBuilder comm = new StringBuilder("ffmpeg -i "); // 是否有必输项:输入地址,输出地址,应用名 if (paramMap.containsKey("input") && paramMap.containsKey("output") && paramMap.containsKey("appName")) { comm.append(paramMap.get("input")).append(" "); // -f :转换格式,默认flv comm.append(" -f ").append(paramMap.containsKey("fmt") ? paramMap.get("fmt") : "flv").append(" "); // -r :帧率,默认25 comm.append("-r ").append(paramMap.containsKey("fps") ? paramMap.get("fps") : "30").append(" "); // -s 分辨率 默认是原分辨率 comm.append("-s ").append(paramMap.containsKey("rs") ? paramMap.get("rs") : "").append(" "); // -an 禁用音频 comm.append("-an ").append(paramMap.containsKey("disableAudio") && ((Boolean)paramMap.get("disableAudio")) ? "-an" : "").append(" "); // 输出地址 comm.append(paramMap.get("output")); //发布的应用名 comm.append(paramMap.get("appName")); //一个视频源,可以有多个输出,第二个输出为拷贝源视频输出,不改变视频的各项参数并且命名为应用名+HD comm.append(" ").append(" -vcodec copy -f flv -an ").append(paramMap.get("output")).append(paramMap.get("appName")).append("HD"); System.out.println(comm.toString()); return comm.toString(); } else { throw new RuntimeException("输入流地址不能为空!"); } } }
** * 用于输出命令行主进程的消息线程(必须开启,否则命令行主进程无法正常执行) 重要:该类重写了destroy方法,用于安全的关闭该线程 * * @author eguid * @see OutHandler * @since jdk1.7 */ public class OutHandler extends Thread { // 控制状态 volatile boolean status = true; BufferedReader br = null; String type = null; public OutHandler(InputStream is, String type) { br = new BufferedReader(new InputStreamReader(is)); this.type = type; } /** * 重写线程销毁方法,安全的关闭线程 */ @Override public void destroy() { status = false; } /** * 执行输出线程 */ @Override public void run() { String msg = null; try { while (status) { if ((msg = br.readLine()) != null) { System.out.println(type + "消息:" + msg); } } } catch (IOException e) { e.printStackTrace(); } } }
/** * 命令行执行处理器缓存,方便管理处理器的开启和关闭 * @author eguid * @see HandlerDao * @since jdk1.7 */ public interface HandlerDao { /** * 获取某个处理器 * @param pushId * @return */ public ConcurrentMap get(String pushId); /** * 存放一个处理器 * @param handlerMap */ public void set(String key, ConcurrentMap<String, Object> resultMap); /** * 获取全部处理器的id * @return */ public ConcurrentMap getAll(); /** * 删除某个处理器 * @param pushId */ public void delete(String pushId); /** * 是否存在key */ public boolean isHave(String pushId); }
/** * 用于维护管理应用名与pushId的关系对应 * @author eguid * @see PushId_AppRelshipDao * @since jdk1.7 */ public interface PushId_AppRelshipDao { /** * 获取应用名对应的pushId * @param appName * @return pushId */ public String getPushId(String appName); /** * 插入一个应用名和pushId对应 * @param appName * @param pushId */ public void set(String appName,String pushId); /** * 通过应用名删除对应关系 * @param appName */ public void delete(String appName); /** * 获取全部应用 */ public List<String> getAll(); /** * 是否存在应用名 * @param appName * @return true:存在;false:不存在 */ public boolean isHave(String appName); }
下一篇将介绍一些支持rtmp直播的播放器