转自:
下文笔者讲述Apache commons exec框架的简介说明,如下所示
Apache commmons exec框架的功能
Apache commons exec框架是对
Process进行封装
对外提供如下功能:
为Process的stdin, stdout, stderr重定向流
而不是File
并发向Process stdin写入数据、读取Process stdout和stderr的数据
避免进程阻塞
超时终止Process
实现思路: 1.引入相应的Jar包 2.调用相应的方法,即可运行Process中的方法
例:
引入相应的jar包
org.apache.commons commons-exec 1.3
DefaultExecutor
DefaultExecutor类的功能用于 DefaultExecutor是启动子进程的执行类
例:
// 设置工作目录 public void setWorkingDirectory(final File dir); // 设置流处理器 public void setStreamHandler(final ExecuteStreamHandler streamHandler); // 执行命令 public int execute(final CommandLine command); public int execute(final CommandLine command, final Mapenvironment); public void execute(final CommandLine command, final ExecuteResultHandler handler); public void execute(final CommandLine command, final Map environment, final ExecuteResultHandler handler); 实际的执行方法 private int executeInternal(final CommandLine command, final Map environment, final File dir, final ExecuteStreamHandler streams) throws IOException { setExceptionCaught(null); // launch():使用Runtime.exec创建Process // Runtime.getRuntime().exec(cmd.toStrings(), EnvironmentUtils.toStrings(env), workingDir); final Process process = this.launch(command, environment, dir); // 将用户提供的流与进程的流连接起来 try { streams.setProcessInputStream(process.getOutputStream()); streams.setProcessOutputStream(process.getInputStream()); streams.setProcessErrorStream(process.getErrorStream()); } catch (final IOException e) { process.destroy(); throw e; } // 启动线程,从输入流复制数据写入输出流 streams.start(); try { // add the process to the list of those to destroy if the VM exits if (this.getProcessDestroyer() != null) { this.getProcessDestroyer().add(process); } // associate the watchdog with the newly created process if (watchdog != null) { watchdog.start(process); } int exitValue = Executor.INVALID_EXITVALUE; try { exitValue = process.waitFor(); } catch (final InterruptedException e) { process.destroy(); } finally { // see http://bugs.sun.com/view_bug.do?bug_id=6420270 // see https://issues.apache.org/jira/browse/EXEC-46 // Process.waitFor should clear interrupt status when throwing InterruptedException // but we have to do that manually Thread.interrupted(); } if (watchdog != null) { watchdog.stop(); } try { streams.stop(); } catch (final IOException e) { setExceptionCaught(e); } closeProcessStreams(process); if (getExceptionCaught() != null) { throw getExceptionCaught(); } if (watchdog != null) { try { watchdog.checkException(); } catch (final IOException e) { throw e; } catch (final Exception e) { throw new IOException(e.getMessage()); } } if (this.isFailure(exitValue)) { throw new ExecuteException("Process exited with an error: " + exitValue, exitValue); } return exitValue; } finally { // remove the process to the list of those to destroy if the VM exits if (this.getProcessDestroyer() != null) { this.getProcessDestroyer().remove(process); } } }
CommandLine
封装命令和参数。
// 设置命令,无参数 public CommandLine(final String executable); // 设置命令文件,无参数 public CommandLine(final File executable); // 设置参数 public CommandLine addArguments(final String[] addArguments); // 设置未分拆的参数 public CommandLine addArguments(final String addArguments); // 设置替换值,替换参数中${}的占位变量 public void setSubstitutionMap(final MapsubstitutionMap); // 直接解析命令行:第一个元素是命令,其余是参数 public static CommandLine parse(final String line); public static CommandLine parse(final String line, final Map substitutionMap); 3. ExecuteStreamHandler 处理进程的输入流、输出流、错误流。 // os为process.getOutputStream(),连接着进程stdin void setProcessInputStream(OutputStream os) throws IOException; // is为process.getInputStream(),连接着进程stdout void setProcessOutputStream(InputStream is) throws IOException; // is为process.getErrorStream(),连接着进程stderr void setProcessErrorStream(InputStream is) throws IOException; // 启动处理 void start() throws IOException; // 停止处理 void stop() throws IOException; 4. PumpStreamHandler ExecuteStreamHandler的实现类 启动三个线程: 从用户提供的输入流input复制数据,到与进程stdin连接的输出流 从与进程stdout连接的输入流,复制数据到用户提供的输出流out 从与进程stderr连接的输入流,复制数据到用户提供的输出流err 用户提供的输入流需要手动关闭,提供的输出流若是PipedOutputStream则会被自动关闭。 // 设置进程的stdin、stdout、stderr public PumpStreamHandler(final OutputStream out, final OutputStream err, final InputStream input); // 设置进程的stdout、stderr public PumpStreamHandler(final OutputStream out, final OutputStream err); // 设置进程的stdout、stderr public PumpStreamHandler(final OutputStream outAndErr); // 设置为System.out, System.err public PumpStreamHandler();
处理进程stdin
用户提供一个输入流input,进程提供一个与stdin连接的输出流os 启动一个线程,不断的从input复制数据到os,这样input就成为了进程的stdin input关闭或出错后,复制结束,关闭os
// input是用户提供的输入流 // os是进程提供的与stdin连接的输出流 public void setProcessInputStream(final OutputStream os) { if (input != null) { if (input == System.in) { inputThread = createSystemInPump(input, os); } else { inputThread = createPump(input, os, true); } } else { // 无需输入流 try { os.close(); } catch (final IOException e) { final String msg = "Got exception while closing output stream"; DebugUtils.handleException(msg, e); } } } 创建线程和数据泵,连接输入流is和输出流os,线程启动后将不断从输入流复制数据到输出流。 protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) { final Thread result = new Thread(new StreamPumper(is, os, closeWhenExhausted), "Exec Stream Pumper"); result.setDaemon(true); return result; } 输入流与输出流之间的数据泵StreamPumper public class StreamPumper implements Runnable { // is,输入流;os,输出流; // closeWhenExhausted,输入流无数据是是否关闭输出流 public StreamPumper(final InputStream is, final OutputStream os, final boolean closeWhenExhausted); public void run() { synchronized (this) { // Just in case this object is reused in the future finished = false; } final byte[] buf = new byte[this.size]; int length; try { // 从输入流复制数据写入输出流 while ((length = is.read(buf)) > 0) { os.write(buf, 0, length); } } catch (final Exception e) { // nothing to do - happens quite often with watchdog } finally { if (closeWhenExhausted) { try { os.close(); } catch (final IOException e) { final String msg = "Got exception while closing exhausted output stream"; DebugUtils.handleException(msg ,e); } } synchronized (this) { finished = true; notifyAll(); // 通知等待泵结束的线程 } } } // 阻塞等待泵结束 public synchronized void waitFor() throws InterruptedException; }
处理进程stdout
用户提供一个输出流out,进程提供一个与stdout连接的输入流is 启动一个线程,不断的从is复制数据到out,这样out就成为了进程的stdout is关闭或出错后,复制结束,关闭out
// out是用户提供的输出流 // is是进程提供的与stdout连接的输出流 public void setProcessOutputStream(final InputStream is) { if (out != null) { createProcessOutputPump(is, out); } } protected void createProcessOutputPump(final InputStream is, final OutputStream os) { outputThread = createPump(is, os); } protected Thread createPump(final InputStream is, final OutputStream os) { final boolean closeWhenExhausted = os instanceof PipedOutputStream ? true : false; return createPump(is, os, closeWhenExhausted); }
处理进程stderr
用户提供一个输出流err,进程提供一个与stderr连接的输入流is 启动一个线程,不断的从is复制数据到err,这样err就成为了进程的stderr is关闭或出错后,复制结束,关闭err
public void setProcessErrorStream(final InputStream is) { if (err != null) { createProcessErrorPump(is, err); } } protected void createProcessErrorPump(final InputStream is, final OutputStream os) { errorThread = createPump(is, os); }
启动与停止
启动线程,开始复制数据
public void start() { if (outputThread != null) { outputThread.start(); } if (errorThread != null) { errorThread.start(); } if (inputThread != null) { inputThread.start(); } } 等待线程结束,flush残留数据 public void stop() throws IOException { // inputStreamPumper是处理System.in的泵 if (inputStreamPumper != null) { inputStreamPumper.stopProcessing(); } // 超时等待线程结束 stopThread(outputThread, stopTimeout); stopThread(errorThread, stopTimeout); stopThread(inputThread, stopTimeout); // flush输出流 if (err != null && err != out) { try { err.flush(); } catch (final IOException e) { final String msg = "Got exception while flushing the error stream : " + e.getMessage(); DebugUtils.handleException(msg, e); } } if (out != null) { try { out.flush(); } catch (final IOException e) { final String msg = "Got exception while flushing the output stream"; DebugUtils.handleException(msg, e); } } if (caught != null) { throw caught; } }
超时终止ExecuteWatchdog
执行器启动时,另起一个线程进行异步计时; 进程提前执行完毕,则通知计时器终止; 若计时器先计时结束,则通知执行器超时,执行器终止进程;
创建时指定超时时间 public ExecuteWatchdog(final long timeout) { this.killedProcess = false; this.watch = false; this.hasWatchdog = timeout != INFINITE_TIMEOUT; // -1表示不超时 this.processStarted = false; if (this.hasWatchdog) { this.watchdog = new Watchdog(timeout); // 新建Watchdog进行计时 this.watchdog.addTimeoutObserver(this); // ExecuteWatchdog作为WatchDog的观察者,计时超时后得到通知 } else { this.watchdog = null; } } 提交给DefaultExecutor public void setWatchdog(final ExecuteWatchdog watchDog); DefaultExecutor执行时启动ExecuteWatchdog,执行完毕停止: private int executeInternal(...) { ... if (watchdog != null) { watchdog.start(process); } ... if (watchdog != null) { watchdog.stop(); } } ExecuteWatchdog启动: public synchronized void start(final Process processToMonitor) { if (processToMonitor == null) { throw new NullPointerException("process is null."); } if (this.process != null) { throw new IllegalStateException("Already running."); } this.caught = null; this.killedProcess = false; this.watch = true; this.process = processToMonitor; this.processStarted = true; this.notifyAll(); if (this.hasWatchdog) { watchdog.start(); // 启动WatchDog计时 } } WatchDog启动 public synchronized void start() { stopped = false; final Thread t = new Thread(this, "WATCHDOG"); // 启动线程进行异步计时 t.setDaemon(true); t.start(); } public void run() { final long startTime = System.currentTimeMillis(); boolean isWaiting; // 未超时 synchronized (this) { long timeLeft = timeout - (System.currentTimeMillis() - startTime); isWaiting = timeLeft > 0; while (!stopped && isWaiting) { // stopped表示Process是否执行完毕,执行完毕时退出计时循环 try { wait(timeLeft); // 等待指定超时时间 } catch (final InterruptedException e) { } timeLeft = timeout - (System.currentTimeMillis() - startTime); isWaiting = timeLeft > 0; } } // notify the listeners outside of the synchronized block (see EXEC-60) if (!isWaiting) { // 超时 fireTimeoutOccured(); } } // 观察者模式,通知观察者超时 protected final void fireTimeoutOccured() { final Enumeratione = observers.elements(); while (e.hasMoreElements()) { e.nextElement().timeoutOccured(this); } } ExecuteWatchdog被通知超时 public synchronized void timeoutOccured(final Watchdog w) { try { try { if (process != null) { process.exitValue(); // 再检查一次是否执行完毕 } } catch (final IllegalThreadStateException itse) { // 未执行完毕 if (watch) { killedProcess = true; process.destroy(); // 终止进程 } } } catch (final Exception e) { caught = e; DebugUtils.handleException("Getting the exit value of the process failed", e); } finally { cleanUp(); } }
例
public static void main(String[] args) throws Exception { // 命令行 CommandLine commandLine = CommandLine.parse("ping baidu.com"); // 重定向stdout和stderr到文件 FileOutputStream fileOutputStream = new FileOutputStream("D:\\Test\\exec.log"); PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(fileOutputStream); // 超时终止:1秒 ExecuteWatchdog executeWatchdog = new ExecuteWatchdog(1000); // 创建执行器 DefaultExecutor executor = new DefaultExecutor(); executor.setStreamHandler(pumpStreamHandler); executor.setWatchdog(executeWatchdog); // 执行,打印退出码 int exitValue = executor.execute(commandLine); System.out.println(exitValue); // 关闭流 fileOutputStream.close(); }
版权声明
本文仅代表作者观点,不代表本站立场。