1. 前言

这道题的来源呢,是公司给我们入职前的自学计划的一个小测试,具体的内容见:Nn0vJH.png。可以说 catgrepwc 在日常使用 Linux 时经常使用的命令,以及 | 的管道命令。例如我们要查询 Python 启动的进程时,通常会输入:ps aux | grep python。但是我却从来没有思考过它是如何实现的,那么就先来了解一下管道的工作原理是什么。

我们以 cat tmp.txt | grep a 为例,简单地管道的工作原理。如下图所示:

Nn6yQJ.png

可以得出:

  • cat 命令的输入来自标准输入(即 tmp.txt 文件的内容);
  • cat 命令的输出将会通过管道作为 grep 命令的输入;
  • 最后一个命令的输出将会作为标准输出,打印在控制台上。

维基百科中是这么解释的:

管道(英语:Pipeline)是一系列将标准输入输出链接起来的进程,其中每一个进程的输出被直接作为下一个进程的输入。 每一个链接都由匿名管道实现,管道中的组成元素也被称作过滤程序。

从上面这段话中可以了解到:整个管道是多个元素组成的(也就是命令,如这里的 cat),它们运行在各自的进程,上一个元素的进程会通过管道与下一个元素的进程连接,并将输入流和输出流绑定在一起。从维基百科中的参考图可以很直观地看出:

NnREkt.png

了解了管道的基本原理之后,接下来我们来用 Java 代码来完成管道的功能,这样将会对底层的实现有更深层次的认知。

2. 实现

2.1 PipedStream

从上面的分析得知,每个命令的元素都需要运行在单独的线程,并且线程之间需要通过管道来将输出传递到下一个输入。而 Java 不像 Golang,具有通道 channel 可以很简单地实现协程之间的数据传递。不过在 Java 的 java.io 包中内置 PipedInputStreamPipedOutputStream ,也可以让两个线程之间具备通信能力。

例如,我们想通过 PipedStream 来实现生产者和消费者模型,需要让 Producer 持有一个 PipedOutputStream,同时 Consumer 持有一个 PipedInputStream ,并通过 connect 将其连接,这样生产者和消费者线程就可以通过该管道来传递数据了,如下列代码所示:

class Producer extends Thread {
    PipedOutputStream out;

    @Override
    public void run() {
        try {
            out.write("Hello World".getBytes());  // 将数据写入到输出流中
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

class Consumer extends Thread {
    PipedInputStream in;

    @Override
    public void run() {
        try {
            byte [] buf = new byte[1024];
            int len = in.read(buf);  // 从输入流读取数据
            System.out.println("缓冲区的内容为: " + new String(buf, 0, len));
            in.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

public class PipedDemo {
    public static void main(String[] args) throws IOException {
        Producer producer = new Producer();
        producer.out = new PipedOutputStream();
        Consumer consumer = new Consumer();
        consumer.in = new PipedInputStream();
        // 将生产者的输出流和消费者的输入流绑定,这样消费者就可以读取到生产者生产的数据了
        producer.out.connect(consumer.in);
        producer.start();
        consumer.start();
    }
}

也就是说:

  • Producer 将数据写入到 PipedOutputStream中;
  • Consumer 从 PipedInputStream 读出数据。

其实,两个命令元素之间的交流模式也类似于生产者-消费者模型。即 cat 命令作为生产者将 cat tmp.txt 的数据传递给 grep a 进行“消费”。 同理 grep a 再将匹配出来关键行作为数据再传递给下一个元素(消费者)。

2.2 Handler

明白了 Java 的管道使用之后,我们需要对每个命令的元素进行编程抽象。即将每一个命令都抽象为一个类(并以 Handler 为后缀,意为处理器),在类中实现每个命令的具体逻辑。

但是因为这些 Handler 存在部分相同的功能和代码,我们可以先抽象出一个 AbstractPipeHandler 类,完成基本的功能,再定义一个 handler() 抽象方法让子类(各个命令)实现自己的处理逻辑。由于每一个命令元素都运行在各自的线程,所以 AbstractPipeHandler 需要实现 Runnable 接口:

public abstract class AbstractPipeHandler implements Runnable {

    private PipedInputStream inputStream;

    private PipedOutputStream outputStream;

    public AbstractPipeHandler(PipedInputStream inputStream, PipedOutputStream outputStream) {
        this.inputStream = inputStream;
        this.outputStream = outputStream;
    }
    
    // 每一个命令的具体处理逻辑
    protected abstract void handle(BufferedReader reader);
    
    // 每个命令所属线程启动执行的逻辑
    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
        handle(reader);
    }
}

另外,还要定义 writeToStream 方法,写入到 PipedInputStream 当中,以供下一个命令元素读取数据:

protected void writeToStream(String out) {
    if (out.length() == 0) return;
    try {
        outputStream.write(out.getBytes());
        outputStream.write("\n".getBytes());
    } catch (IOException e) {
        e.printStackTrace();
    }
}

// 在完成所有数据处理之后,关闭 PipedOutputStream 输出流
protected void closeOutStream() {
    try {
        outputStream.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

完成 AbstractPipeHandler 类的抽象之后,接下来就可以根据不同的命令元素来创建各自的 Handler 类,并完成具体的逻辑实现。

CatPipeHandler

cat 命令的处理逻辑非常简单,只需要将输入流中读取的数据直接转发给输出流即可,不需要做任何的处理。以简单的命令 cat tmp.txt 为例,仅仅只是将标准输入(文件数据)发送到标准输出(控制台)。

public class CatPipeHandler extends AbstractPipeHandler {

    @Override
    protected void handle(BufferedReader reader) {
        String line;
        try {
            while ((line = reader.readLine()) != null) {
                writeToStream(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        closeOutStream();  // 在处理完数据之后,一定要调用 closeOutStream 关闭输出流
    }
}

WcPipeHandler

wc 命令的功能是统计行数,具体的实现为:在每次调用 readLine() 后将计数器加一,最后将计数器 cnt 写入到管道中:

public class WcPipeHandler extends AbstractPipeHandler {

    @Override
    protected void handle(BufferedReader reader) {
        int cnt = 0; // 计数器
        try {
            while (reader.readLine() != null) {
                cnt++;
            }
            writeToStream(String.valueOf(cnt));
        } catch (IOException e) {
            e.printStackTrace();
        }
        closeOutStream();
    }
}

GrepPipeHandler

GrepPipeHandler 类需要传入一个 keyword 变量,在每次读取输入行后进行关键字匹配,并将满足匹配的行输出到下一个管道当中:

public class GrepPipeHandler extends AbstractPipeHandler {

    private String keyword;

    public GrepPipeHandler(PipedInputStream inputStream, PipedOutputStream outputStream,
                           String keyword) {
        super(inputStream, outputStream);
        this.keyword = keyword;
    }

    @Override
    protected void handle(BufferedReader reader) {
        String line;
        try {
            while ((line = reader.readLine()) != null) {
                if (line.contains(keyword)) {
                    writeToStream(line);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        closeOutStream();
    }
}

StdoutPipeHandler

StdoutPipeHandler 作为管道的最后一个元素,并不是某个命令的处理器。它的功能是将最后一个命令元素的输出打印在控制台上。因此它的 PipedInputStream 为空,并且在处理完输入数据(即最后一个元素的输出)之后无需调用 closeOutStream

public class StdoutPipeHandler extends AbstractPipeHandler {
    
    public StdoutPipeHandler(PipedInputStream inputStream) {
        super(inputStream, null);
    }

    @Override
    protected void handle(BufferedReader reader) {
        StringBuilder sb = new StringBuilder();
        String line;
        // 把每次读取到的行追加到 StringBuilder,读取完之后将数据打印在控制台上
        try {
            while ((line = reader.readLine()) != null) {
                sb.append(line).append("\n");
            }
            System.out.println(sb.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2.3 Controller

完成了各个命令元素的控制器时,现在需要实现如何通过一行字符串命令来处理,并最终得到正确的输出打印在控制台上。在 AbstractPipeHandler 类定义中说明了每一个命令元素对应的 Handler 都有各自的输入和输出流。并且在默认情况下(即没有文件的标准输入时),当前命令的输出流与下一个命令的输入流连接,以传递数据。如下图所示:

NMgiOe.png

我们准备两个数组 OutputStreamList 和 InputStreamList,用来存放每个命令的输入流和输出流对象。tail 结点则是 StdoutPipeHandler 对象,负责将 grep 匹配的结果输出到控制台上。另外还需要考虑在有文件的标准输入的情况下:在 cat tmp.txt 命令元素中,需要启动新的线程读取文件的内容并输出到 cat 命令的输入流中。

编写 ReadFileThread 线程类,遍历所有的文件,将读取到的内容写入到与 cat 的输入流绑定 OutputStream 中:

public class ReadFileThread extends Thread {
 
    private PipedOutputStream outputStream;  // 输出流,用于和 cat 命令的输入流绑定
    private List<String> files;              // 需要读取的文件

    @Override
    public void run() {
        try {
            for (String filename : files) {
                BufferedReader reader =
                        new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8));
                String line;
                while ((line = reader.readLine()) != null) {
                    outputStream.write(line.getBytes());
                    outputStream.write('\n');
                }
                outputStream.close();
                reader.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

当遇到有文件的标准输入情况下,原先 cat 命令的输入流将会被重新创建,并与 ReadFileThread 的输出流绑定,这样 cat 命令就能够读取到文件的标准输入并作出相应的逻辑处理。如下图所示:

NM2l36.png

本篇文章只说了主要的思路和部分的逻辑,具体的 Controller 的详细的代码如下:

JavaShellController.java

如果你对此感兴趣,可以浏览完完整的代码,见 Github。编译后运行 jar 包并测试这条命令:

JavaShell /user/local/java-shell$ cat tmp.txt | grep a
apple
action
ack
username
This is last