这道题的来源呢,是公司给我们入职前的自学计划的一个小测试,具体的内容见:Nn0vJH.png。可以说 cat
、grep
、wc
在日常使用 Linux 时经常使用的命令,以及 |
的管道命令。例如我们要查询 Python 启动的进程时,通常会输入:ps aux | grep python
。但是我却从来没有思考过它是如何实现的,那么就先来了解一下管道的工作原理是什么。
我们以 cat tmp.txt | grep a
为例,简单地管道的工作原理。如下图所示:
可以得出:
cat
命令的输入来自标准输入(即 tmp.txt
文件的内容);cat
命令的输出将会通过管道作为 grep
命令的输入;维基百科中是这么解释的:
管道(英语:Pipeline)是一系列将标准输入输出链接起来的进程,其中每一个进程的输出被直接作为下一个进程的输入。 每一个链接都由匿名管道实现,管道中的组成元素也被称作过滤程序。
从上面这段话中可以了解到:整个管道是多个元素组成的(也就是命令,如这里的 cat
),它们运行在各自的进程,上一个元素的进程会通过管道与下一个元素的进程连接,并将输入流和输出流绑定在一起。从维基百科中的参考图可以很直观地看出:
了解了管道的基本原理之后,接下来我们来用 Java 代码来完成管道的功能,这样将会对底层的实现有更深层次的认知。
从上面的分析得知,每个命令的元素都需要运行在单独的线程,并且线程之间需要通过管道来将输出传递到下一个输入。而 Java 不像 Golang,具有通道 channel 可以很简单地实现协程之间的数据传递。不过在 Java 的 java.io
包中内置 PipedInputStream
与 PipedOutputStream
,也可以让两个线程之间具备通信能力。
例如,我们想通过 PipedStream 来实现生产者和消费者模型,需要让 Producer 持有一个 PipedOutputStream
,同时 Consumer 持有一个 PipedInputStream
,并通过 connect
将其连接,这样生产者和消费者线程就可以通过该管道来传递数据了,如下列代码所示:
class Producer extends Thread {
PipedOutputStream out;
@Override
public void run() {
try {
out.write("Hello World".getBytes()); // 将数据写入到输出流中
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class Consumer extends Thread {
PipedInputStream in;
@Override
public void run() {
try {
byte [] buf = new byte[1024];
int len = in.read(buf); // 从输入流读取数据
System.out.println("缓冲区的内容为: " + new String(buf, 0, len));
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class PipedDemo {
public static void main(String[] args) throws IOException {
Producer producer = new Producer();
producer.out = new PipedOutputStream();
Consumer consumer = new Consumer();
consumer.in = new PipedInputStream();
// 将生产者的输出流和消费者的输入流绑定,这样消费者就可以读取到生产者生产的数据了
producer.out.connect(consumer.in);
producer.start();
consumer.start();
}
}
也就是说:
PipedOutputStream
中;PipedInputStream
读出数据。其实,两个命令元素之间的交流模式也类似于生产者-消费者模型。即 cat 命令作为生产者将 cat tmp.txt
的数据传递给 grep a
进行“消费”。 同理 grep a
再将匹配出来关键行作为数据再传递给下一个元素(消费者)。
明白了 Java 的管道使用之后,我们需要对每个命令的元素进行编程抽象。即将每一个命令都抽象为一个类(并以 Handler 为后缀,意为处理器),在类中实现每个命令的具体逻辑。
但是因为这些 Handler 存在部分相同的功能和代码,我们可以先抽象出一个 AbstractPipeHandler
类,完成基本的功能,再定义一个 handler()
抽象方法让子类(各个命令)实现自己的处理逻辑。由于每一个命令元素都运行在各自的线程,所以 AbstractPipeHandler
需要实现 Runnable
接口:
public abstract class AbstractPipeHandler implements Runnable {
private PipedInputStream inputStream;
private PipedOutputStream outputStream;
public AbstractPipeHandler(PipedInputStream inputStream, PipedOutputStream outputStream) {
this.inputStream = inputStream;
this.outputStream = outputStream;
}
// 每一个命令的具体处理逻辑
protected abstract void handle(BufferedReader reader);
// 每个命令所属线程启动执行的逻辑
@Override
public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
handle(reader);
}
}
另外,还要定义 writeToStream
方法,写入到 PipedInputStream
当中,以供下一个命令元素读取数据:
protected void writeToStream(String out) {
if (out.length() == 0) return;
try {
outputStream.write(out.getBytes());
outputStream.write("\n".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
// 在完成所有数据处理之后,关闭 PipedOutputStream 输出流
protected void closeOutStream() {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
完成 AbstractPipeHandler
类的抽象之后,接下来就可以根据不同的命令元素来创建各自的 Handler 类,并完成具体的逻辑实现。
cat
命令的处理逻辑非常简单,只需要将输入流中读取的数据直接转发给输出流即可,不需要做任何的处理。以简单的命令 cat tmp.txt
为例,仅仅只是将标准输入(文件数据)发送到标准输出(控制台)。
public class CatPipeHandler extends AbstractPipeHandler {
@Override
protected void handle(BufferedReader reader) {
String line;
try {
while ((line = reader.readLine()) != null) {
writeToStream(line);
}
} catch (IOException e) {
e.printStackTrace();
}
closeOutStream(); // 在处理完数据之后,一定要调用 closeOutStream 关闭输出流
}
}
wc
命令的功能是统计行数,具体的实现为:在每次调用 readLine()
后将计数器加一,最后将计数器 cnt
写入到管道中:
public class WcPipeHandler extends AbstractPipeHandler {
@Override
protected void handle(BufferedReader reader) {
int cnt = 0; // 计数器
try {
while (reader.readLine() != null) {
cnt++;
}
writeToStream(String.valueOf(cnt));
} catch (IOException e) {
e.printStackTrace();
}
closeOutStream();
}
}
GrepPipeHandler 类需要传入一个 keyword
变量,在每次读取输入行后进行关键字匹配,并将满足匹配的行输出到下一个管道当中:
public class GrepPipeHandler extends AbstractPipeHandler {
private String keyword;
public GrepPipeHandler(PipedInputStream inputStream, PipedOutputStream outputStream,
String keyword) {
super(inputStream, outputStream);
this.keyword = keyword;
}
@Override
protected void handle(BufferedReader reader) {
String line;
try {
while ((line = reader.readLine()) != null) {
if (line.contains(keyword)) {
writeToStream(line);
}
}
} catch (IOException e) {
e.printStackTrace();
}
closeOutStream();
}
}
StdoutPipeHandler 作为管道的最后一个元素,并不是某个命令的处理器。它的功能是将最后一个命令元素的输出打印在控制台上。因此它的 PipedInputStream 为空,并且在处理完输入数据(即最后一个元素的输出)之后无需调用 closeOutStream
。
public class StdoutPipeHandler extends AbstractPipeHandler {
public StdoutPipeHandler(PipedInputStream inputStream) {
super(inputStream, null);
}
@Override
protected void handle(BufferedReader reader) {
StringBuilder sb = new StringBuilder();
String line;
// 把每次读取到的行追加到 StringBuilder,读取完之后将数据打印在控制台上
try {
while ((line = reader.readLine()) != null) {
sb.append(line).append("\n");
}
System.out.println(sb.toString());
} catch (IOException e) {
e.printStackTrace();
}
}
}
完成了各个命令元素的控制器时,现在需要实现如何通过一行字符串命令来处理,并最终得到正确的输出打印在控制台上。在 AbstractPipeHandler 类定义中说明了每一个命令元素对应的 Handler 都有各自的输入和输出流。并且在默认情况下(即没有文件的标准输入时),当前命令的输出流与下一个命令的输入流连接,以传递数据。如下图所示:
我们准备两个数组 OutputStreamList 和 InputStreamList,用来存放每个命令的输入流和输出流对象。tail 结点则是 StdoutPipeHandler 对象,负责将 grep
匹配的结果输出到控制台上。另外还需要考虑在有文件的标准输入的情况下:在 cat tmp.txt
命令元素中,需要启动新的线程读取文件的内容并输出到 cat
命令的输入流中。
编写 ReadFileThread 线程类,遍历所有的文件,将读取到的内容写入到与 cat
的输入流绑定 OutputStream 中:
public class ReadFileThread extends Thread {
private PipedOutputStream outputStream; // 输出流,用于和 cat 命令的输入流绑定
private List<String> files; // 需要读取的文件
@Override
public void run() {
try {
for (String filename : files) {
BufferedReader reader =
new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8));
String line;
while ((line = reader.readLine()) != null) {
outputStream.write(line.getBytes());
outputStream.write('\n');
}
outputStream.close();
reader.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
当遇到有文件的标准输入情况下,原先 cat
命令的输入流将会被重新创建,并与 ReadFileThread 的输出流绑定,这样 cat
命令就能够读取到文件的标准输入并作出相应的逻辑处理。如下图所示:
本篇文章只说了主要的思路和部分的逻辑,具体的 Controller 的详细的代码如下:
如果你对此感兴趣,可以浏览完完整的代码,见 Github。编译后运行 jar 包并测试这条命令:
JavaShell /user/local/java-shell$ cat tmp.txt | grep a
apple
action
ack
username
This is last