• Apache commons exec框架的简介说明


    转自:

    Apache commons exec框架的简介说明

    下文笔者讲述Apache commons exec框架的简介说明,如下所示

    Apache commmons exec框架的功能

    Apache commons  exec框架是对
      Process进行封装
      对外提供如下功能:
         为Process的stdin, stdout, stderr重定向流
          而不是File
         并发向Process stdin写入数据、读取Process stdout和stderr的数据
         避免进程阻塞
         超时终止Process
    
    实现思路:
        1.引入相应的Jar包
        2.调用相应的方法,即可运行Process中的方法
    

    例:
    引入相应的jar包

    
        org.apache.commons
        commons-exec
        1.3
    
    

    DefaultExecutor

    DefaultExecutor类的功能用于
        DefaultExecutor是启动子进程的执行类
    

    例:

    // 设置工作目录
    public void setWorkingDirectory(final File dir);
    
    // 设置流处理器
    public void setStreamHandler(final ExecuteStreamHandler streamHandler);
    
    // 执行命令
    public int execute(final CommandLine command);
    public int execute(final CommandLine command, final Map environment);
    public void execute(final CommandLine command, final ExecuteResultHandler handler);
    public void execute(final CommandLine command, final Map environment, 
        final ExecuteResultHandler handler);
    实际的执行方法
    private int executeInternal(final CommandLine command, final Map environment,
            final File dir, final ExecuteStreamHandler streams) throws IOException {
    
        setExceptionCaught(null);
    
        // launch():使用Runtime.exec创建Process
        // Runtime.getRuntime().exec(cmd.toStrings(), EnvironmentUtils.toStrings(env), workingDir);
        final Process process = this.launch(command, environment, dir);
    
        // 将用户提供的流与进程的流连接起来
        try {
            streams.setProcessInputStream(process.getOutputStream());
            streams.setProcessOutputStream(process.getInputStream());
            streams.setProcessErrorStream(process.getErrorStream());
        } catch (final IOException e) {
            process.destroy();
            throw e;
        }
    
        // 启动线程,从输入流复制数据写入输出流
        streams.start();
    
        try {
    
            // add the process to the list of those to destroy if the VM exits
            if (this.getProcessDestroyer() != null) {
              this.getProcessDestroyer().add(process);
            }
    
            // associate the watchdog with the newly created process
            if (watchdog != null) {
                watchdog.start(process);
            }
    
            int exitValue = Executor.INVALID_EXITVALUE;
    
            try {
                exitValue = process.waitFor();
            } catch (final InterruptedException e) {
                process.destroy();
            }
            finally {
                // see http://bugs.sun.com/view_bug.do?bug_id=6420270
                // see https://issues.apache.org/jira/browse/EXEC-46
                // Process.waitFor should clear interrupt status when throwing InterruptedException
                // but we have to do that manually
                Thread.interrupted();
            }            
    
            if (watchdog != null) {
                watchdog.stop();
            }
    
            try {
                streams.stop();
            }
            catch (final IOException e) {
                setExceptionCaught(e);
            }
    
            closeProcessStreams(process);
    
            if (getExceptionCaught() != null) {
                throw getExceptionCaught();
            }
    
            if (watchdog != null) {
                try {
                    watchdog.checkException();
                } catch (final IOException e) {
                    throw e;
                } catch (final Exception e) {
                    throw new IOException(e.getMessage());
                }
            }
    
            if (this.isFailure(exitValue)) {
                throw new ExecuteException("Process exited with an error: " + exitValue, exitValue);
            }
    
            return exitValue;
        } finally {
            // remove the process to the list of those to destroy if the VM exits
            if (this.getProcessDestroyer() != null) {
              this.getProcessDestroyer().remove(process);
            }
        }
    }
    

    CommandLine

    封装命令和参数。
    
    // 设置命令,无参数
    public CommandLine(final String executable);
    // 设置命令文件,无参数
    public CommandLine(final File executable);
    
    // 设置参数
    public CommandLine addArguments(final String[] addArguments);
    // 设置未分拆的参数
    public CommandLine addArguments(final String addArguments);
    // 设置替换值,替换参数中${}的占位变量
    public void setSubstitutionMap(final Map substitutionMap);
    
    // 直接解析命令行:第一个元素是命令,其余是参数
    public static CommandLine parse(final String line);
    public static CommandLine parse(final String line, final Map substitutionMap);
    3. ExecuteStreamHandler
    处理进程的输入流、输出流、错误流。
    
    // os为process.getOutputStream(),连接着进程stdin
    void setProcessInputStream(OutputStream os) throws IOException;
    
    // is为process.getInputStream(),连接着进程stdout
    void setProcessOutputStream(InputStream is) throws IOException;
    
    // is为process.getErrorStream(),连接着进程stderr
    void setProcessErrorStream(InputStream is) throws IOException;
    
    // 启动处理
    void start() throws IOException;
    
    // 停止处理
    void stop() throws IOException;
    4. PumpStreamHandler
    ExecuteStreamHandler的实现类
    启动三个线程:
    从用户提供的输入流input复制数据,到与进程stdin连接的输出流
    从与进程stdout连接的输入流,复制数据到用户提供的输出流out
    从与进程stderr连接的输入流,复制数据到用户提供的输出流err
    
    用户提供的输入流需要手动关闭,提供的输出流若是PipedOutputStream则会被自动关闭。
    
    // 设置进程的stdin、stdout、stderr
    public PumpStreamHandler(final OutputStream out, final OutputStream err, final InputStream input);
    
    // 设置进程的stdout、stderr
    public PumpStreamHandler(final OutputStream out, final OutputStream err);
    
    // 设置进程的stdout、stderr
    public PumpStreamHandler(final OutputStream outAndErr);
    
    // 设置为System.out, System.err
    public PumpStreamHandler();
    

    处理进程stdin

    用户提供一个输入流input,进程提供一个与stdin连接的输出流os
    启动一个线程,不断的从input复制数据到os,这样input就成为了进程的stdin
    input关闭或出错后,复制结束,关闭os
    
    // input是用户提供的输入流
    // os是进程提供的与stdin连接的输出流
    public void setProcessInputStream(final OutputStream os) {
        if (input != null) {
            if (input == System.in) {
                inputThread = createSystemInPump(input, os);
            } else {
                inputThread = createPump(input, os, true);
            }
        } else { // 无需输入流
            try {
                os.close();
            } catch (final IOException e) {
                final String msg = "Got exception while closing output stream";
                DebugUtils.handleException(msg, e);
            }
        }
    }
    创建线程和数据泵,连接输入流is和输出流os,线程启动后将不断从输入流复制数据到输出流。
    
    protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
        final Thread result = new Thread(new StreamPumper(is, os, closeWhenExhausted), "Exec Stream Pumper");
        result.setDaemon(true);
        return result;
    }
    输入流与输出流之间的数据泵StreamPumper
    
    public class StreamPumper implements Runnable {
        // is,输入流;os,输出流;
        // closeWhenExhausted,输入流无数据是是否关闭输出流
        public StreamPumper(final InputStream is, final OutputStream os,
                final boolean closeWhenExhausted);
    
        public void run() {
            synchronized (this) {
                // Just in case this object is reused in the future
                finished = false;
            }
    
            final byte[] buf = new byte[this.size];
    
            int length;
            try {
                // 从输入流复制数据写入输出流
                while ((length = is.read(buf)) > 0) {
                    os.write(buf, 0, length);
                }
            } catch (final Exception e) {
                // nothing to do - happens quite often with watchdog
            } finally {
                if (closeWhenExhausted) {
                    try {
                        os.close();
                    } catch (final IOException e) {
                        final String msg = "Got exception while closing exhausted output stream";
                        DebugUtils.handleException(msg ,e);
                    }
                }
                synchronized (this) {
                    finished = true;
                    notifyAll(); // 通知等待泵结束的线程
                }
            }
        }
    
        // 阻塞等待泵结束
        public synchronized void waitFor() throws InterruptedException;
    }
    

    处理进程stdout

    用户提供一个输出流out,进程提供一个与stdout连接的输入流is
    启动一个线程,不断的从is复制数据到out,这样out就成为了进程的stdout
    is关闭或出错后,复制结束,关闭out
    
    // out是用户提供的输出流
    // is是进程提供的与stdout连接的输出流
    public void setProcessOutputStream(final InputStream is) {
        if (out != null) {
            createProcessOutputPump(is, out);
        }
    }
    
    protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
        outputThread = createPump(is, os);
    }
    
    protected Thread createPump(final InputStream is, final OutputStream os) {
        final boolean closeWhenExhausted = os instanceof PipedOutputStream ? true : false;
        return createPump(is, os, closeWhenExhausted);
    }
    

    处理进程stderr

    用户提供一个输出流err,进程提供一个与stderr连接的输入流is
    启动一个线程,不断的从is复制数据到err,这样err就成为了进程的stderr
    is关闭或出错后,复制结束,关闭err
    
    public void setProcessErrorStream(final InputStream is) {
        if (err != null) {
            createProcessErrorPump(is, err);
        }
    }
    
    protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
        errorThread = createPump(is, os);
    }
    

    启动与停止

    启动线程,开始复制数据
    
    public void start() {
        if (outputThread != null) {
            outputThread.start();
        }
        if (errorThread != null) {
            errorThread.start();
        }
        if (inputThread != null) {
            inputThread.start();
        }
    }
    等待线程结束,flush残留数据
    public void stop() throws IOException {
        // inputStreamPumper是处理System.in的泵
        if (inputStreamPumper != null) {
            inputStreamPumper.stopProcessing();
        }
    
        // 超时等待线程结束
        stopThread(outputThread, stopTimeout);
        stopThread(errorThread, stopTimeout);
        stopThread(inputThread, stopTimeout);
    
        // flush输出流
        if (err != null && err != out) {
            try {
                err.flush();
            } catch (final IOException e) {
                final String msg = "Got exception while flushing the error stream : " + e.getMessage();
                DebugUtils.handleException(msg, e);
            }
        }
    
        if (out != null) {
            try {
                out.flush();
            } catch (final IOException e) {
                final String msg = "Got exception while flushing the output stream";
                DebugUtils.handleException(msg, e);
            }
        }
    
        if (caught != null) {
            throw caught;
        }
    }
    

    超时终止ExecuteWatchdog

    执行器启动时,另起一个线程进行异步计时;
    进程提前执行完毕,则通知计时器终止;
    若计时器先计时结束,则通知执行器超时,执行器终止进程;
    
    创建时指定超时时间
    public ExecuteWatchdog(final long timeout) {
        this.killedProcess = false;
        this.watch = false;
        this.hasWatchdog = timeout != INFINITE_TIMEOUT; // -1表示不超时
        this.processStarted = false;
        if (this.hasWatchdog) {
            this.watchdog = new Watchdog(timeout); // 新建Watchdog进行计时
            this.watchdog.addTimeoutObserver(this); // ExecuteWatchdog作为WatchDog的观察者,计时超时后得到通知
        }
        else {
            this.watchdog = null;
        }
    }
    
    提交给DefaultExecutor
    public void setWatchdog(final ExecuteWatchdog watchDog);
    DefaultExecutor执行时启动ExecuteWatchdog,执行完毕停止:
    
    private int executeInternal(...) {
        ...
        if (watchdog != null) {
            watchdog.start(process);
        }
        ...
        if (watchdog != null) {
            watchdog.stop();
        }
    }
    ExecuteWatchdog启动:
    
    public synchronized void start(final Process processToMonitor) {
        if (processToMonitor == null) {
            throw new NullPointerException("process is null.");
        }
        if (this.process != null) {
            throw new IllegalStateException("Already running.");
        }
        this.caught = null;
        this.killedProcess = false;
        this.watch = true;
        this.process = processToMonitor;
        this.processStarted = true;
        this.notifyAll();
        if (this.hasWatchdog) {
            watchdog.start();  // 启动WatchDog计时
        }
    }
    
    WatchDog启动
    public synchronized void start() {
        stopped = false;
        final Thread t = new Thread(this, "WATCHDOG"); // 启动线程进行异步计时
        t.setDaemon(true);
        t.start();
    }
    
    public void run() {
        final long startTime = System.currentTimeMillis();
        boolean isWaiting; // 未超时
        synchronized (this) {
            long timeLeft = timeout - (System.currentTimeMillis() - startTime);
            isWaiting = timeLeft > 0;
            while (!stopped && isWaiting) { // stopped表示Process是否执行完毕,执行完毕时退出计时循环
                try {
                    wait(timeLeft); // 等待指定超时时间
                } catch (final InterruptedException e) {
                }
                timeLeft = timeout - (System.currentTimeMillis() - startTime);
                isWaiting = timeLeft > 0;
            }
        }
    
        // notify the listeners outside of the synchronized block (see EXEC-60)
        if (!isWaiting) { // 超时
            fireTimeoutOccured();
        }
    }
    
    // 观察者模式,通知观察者超时
    protected final void fireTimeoutOccured() {
        final Enumeration e = observers.elements();
        while (e.hasMoreElements()) {
            e.nextElement().timeoutOccured(this);
        }
    }
    
    ExecuteWatchdog被通知超时
    public synchronized void timeoutOccured(final Watchdog w) {
        try {
            try {
                if (process != null) {
                    process.exitValue(); // 再检查一次是否执行完毕
                }
            } catch (final IllegalThreadStateException itse) {
                // 未执行完毕
                if (watch) {
                    killedProcess = true;
                    process.destroy(); // 终止进程
                }
            }
        } catch (final Exception e) {
            caught = e;
            DebugUtils.handleException("Getting the exit value of the process failed", e);
        } finally {
            cleanUp();
        }
    }
    

    public static void main(String[] args) throws Exception {
        // 命令行
        CommandLine commandLine = CommandLine.parse("ping baidu.com");
    
        // 重定向stdout和stderr到文件
        FileOutputStream fileOutputStream = new FileOutputStream("D:\\Test\\exec.log");
        PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(fileOutputStream);
    
        // 超时终止:1秒
        ExecuteWatchdog executeWatchdog = new ExecuteWatchdog(1000);
    
        // 创建执行器
        DefaultExecutor executor = new DefaultExecutor();
        executor.setStreamHandler(pumpStreamHandler);
        executor.setWatchdog(executeWatchdog);
    
        // 执行,打印退出码
        int exitValue = executor.execute(commandLine);
        System.out.println(exitValue);
    
        // 关闭流
        fileOutputStream.close();
    }
    

    版权声明

    本文仅代表作者观点,不代表本站立场。

  • 相关阅读:
    电源管理芯片直入电子设备心脏,让七大产业推动疫情的来临
    网络安全应急响应操作流程-打好应急响应保卫战
    Python日常办公10大小技巧
    AI先行者第三辑:石油专家正在加速“吸入”AI养分
    Java回顾-IO流的体系结构/File文件类的使用
    three.js学习笔记(十六)——汹涌的海洋
    力扣练习——44 路径总和 III
    Camunda 7.x 系列【54】管理服务 ManagementService
    IIS方式部署项目发布上线
    关于线程的那些事
  • 原文地址:https://blog.csdn.net/qq_25073223/article/details/127420017