NingG +

Java实现tail命令

开篇闲谈

实时捕获文件的新增内容,如何实现?拍拍脑袋,需要借助3个变量:

具体过程:

我x,上面好复杂,不会这么困难吧。抓紧去学习一下别人的思路。

上述整个过程,都在避免一种情况:在发送一个文件新增内容的时候,文件又有新增内容;而最佳的逻辑是:

BufferedReader

利用BufferedReader下的readLine()方法来实现,示例代码如下:

/**
 * Minimal single-threaded {@code tail -f}: prints every line of a file and then
 * keeps polling for lines appended later.
 *
 * <p>Note: {@link BufferedReader#readLine()} returns whatever text is available at
 * end-of-file even when no line terminator has been written yet, so a line that is
 * still being appended by the writer may be printed in two pieces.
 */
public class JavaTail {

	/**
	 * Follows the hard-coded source file until the thread is interrupted.
	 *
	 * @param args ignored
	 * @throws IOException if the file cannot be opened or read, or the charset is unsupported
	 */
	public static void main(String[] args) throws IOException {
		
		String srcFilename = "E:/1.log";
		String charset = "GBK";
		
		InputStream fileInputStream = new FileInputStream(srcFilename);
		try {
			// Built inside the try so the file handle is released even when the
			// charset name is rejected by InputStreamReader.
			Reader fileReader = new InputStreamReader(fileInputStream, charset);
			BufferedReader bufferedReader = new BufferedReader(fileReader);
			
			String singleLine;
			while (true) {
				singleLine = bufferedReader.readLine();
				if (singleLine != null) {
					System.out.println(singleLine);
					continue;
				}
				
				// End of file reached: wait before polling for appended content.
				try {
					Thread.sleep(1000L);
				} catch (InterruptedException e) {
					// Restore the flag so callers can observe the interruption, then stop.
					Thread.currentThread().interrupt();
					break;
				}
			}
		} finally {
			// Closing the underlying stream releases the only OS resource held;
			// this runs on normal exit and on any IOException from readLine().
			fileInputStream.close();
		}
	}
}

编写成multi-thread方式:进程中,单独启动一个线程来监听文件,示例代码如下:

package com.github.ningg.tail;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Demo driver that runs a {@link SpoolingRunnable} on a single-threaded scheduled
 * executor and toggles its {@code keepReading} flag to pause and resume tailing.
 */
public class JavaTail{

	/**
	 * Tails the hard-coded file for 20s, pauses for 20s, resumes for 20s, then
	 * stops the executor and releases the file.
	 *
	 * @param args ignored
	 * @throws IOException if closing the underlying reader fails
	 * @throws InterruptedException if the main thread is interrupted while waiting
	 */
	public static void main(String[] args) throws IOException, InterruptedException {
		
		String srcFilename = "E:/2.log";
		String charset = "GBK";
		
		Thread.sleep(5000L);
		
		ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
		SpoolingRunnable spool = new SpoolingRunnable(srcFilename, charset, true);
		
		try {
			executor.scheduleWithFixedDelay(spool, 1, 1, TimeUnit.SECONDS);
			Thread.sleep(20000L);
			
			System.out.println("--------------SHUTDOWN EXECUTOR----------------");
			spool.setKeepReading(false);
			Thread.sleep(20000L);

			System.out.println("-------------- RESTART EXECUTOR----------------");
			spool.setKeepReading(true);
			Thread.sleep(20000L);
			
			System.out.println("--------------SHUTDOWN EXECUTOR----------------");
		} finally {
			// Ask the task to leave its read loop, then stop scheduling new runs.
			// Without shutdown() the executor's non-daemon worker thread keeps the
			// JVM alive after main returns.
			spool.setKeepReading(false);
			executor.shutdown();
			try {
				// Wait for an in-flight run to finish so the reader is not closed
				// underneath it; force-cancel if it does not stop in time.
				if (!executor.awaitTermination(5L, TimeUnit.SECONDS)) {
					executor.shutdownNow();
				}
			} finally {
				spool.destroy();
			}
		}
	}
	
}

以及另一个文件:SpoolingRunnable.java

package com.github.ningg.tail;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UnsupportedEncodingException;

/**
 * Runnable that tails a text file: it prints every line currently available and
 * then polls once per second for appended lines while {@code keepReading} is true.
 *
 * <p>The reader is opened lazily on the first {@link #run()} and kept open between
 * runs, so a later run continues from where the previous one stopped. Changing the
 * filename or charset after the reader has been opened has no effect on it.
 *
 * <p>{@code keepReading} and the reader reference are {@code volatile} because the
 * flag is toggled, and {@link #destroy()} is called, from a thread other than the
 * one executing {@link #run()}.
 */
public class SpoolingRunnable implements Runnable{

	private String filename;
	private String charset;
	private volatile boolean keepReading;
	
	private volatile BufferedReader bufferedReader = null;
	
	/**
	 * @param filename path of the file to follow
	 * @param charset name of the charset used to decode the file
	 * @param keepReading initial state of the read loop; {@code false} makes
	 *        {@link #run()} return without reading
	 */
	public SpoolingRunnable( String filename, String charset,
			boolean keepReading) {
		this.filename = filename;
		this.charset = charset;
		this.keepReading = keepReading;
	}

	/**
	 * Prints lines until {@code keepReading} becomes false, the thread is
	 * interrupted, or an I/O error occurs. I/O errors are reported on stderr and
	 * end this run; they are not propagated, so a scheduled executor keeps
	 * re-running the task.
	 */
	@Override
	public void run() {
		try {
			BufferedReader reader = bufferedReader;
			if (reader == null) {
				reader = openReader();
				bufferedReader = reader;
			}
			
			String singleLine;
			while (keepReading) {
				singleLine = reader.readLine();
				if (singleLine != null) {
					System.out.println(singleLine);
					continue;
				}
				
				// End of file reached: wait before polling for appended content.
				Thread.sleep(1000L);
			}
			
			System.out.println("-----[stop: keep reading]-----");
			
		} catch (IOException e) {
			// Also covers FileNotFoundException and UnsupportedEncodingException.
			e.printStackTrace();
		} catch (InterruptedException e) {
			// Restore the flag so the owning thread/executor can see the interruption.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Opens the configured file with the configured charset.
	 *
	 * @return a buffered reader positioned at the start of the file
	 * @throws IOException if the file is missing or the charset name is unsupported
	 */
	private BufferedReader openReader() throws IOException {
		InputStream is = new FileInputStream(filename);
		try {
			Reader reader = new InputStreamReader(is, charset);
			return new BufferedReader(reader);
		} catch (UnsupportedEncodingException e) {
			// Do not leak the file handle when the charset is rejected.
			is.close();
			throw e;
		}
	}

	/**
	 * Closes the underlying reader. Does nothing when the reader was never opened,
	 * e.g. because {@link #run()} has not executed yet or the file was not found.
	 *
	 * @throws IOException if closing the reader fails
	 */
	public void destroy() throws IOException{
		BufferedReader reader = bufferedReader;
		if (reader != null) {
			reader.close();
		}
	}
	
	public String getFilename() {
		return filename;
	}

	public SpoolingRunnable setFilename(String filename) {
		this.filename = filename;
		return this;
	}

	public String getCharset() {
		return charset;
	}

	public SpoolingRunnable setCharset(String charset) {
		this.charset = charset;
		return this;
	}

	public boolean isKeepReading() {
		return keepReading;
	}

	/**
	 * Starts or stops the read loop; a running loop notices {@code false} at its
	 * next iteration.
	 *
	 * @return this instance, for chaining
	 */
	public SpoolingRunnable setKeepReading(boolean keepReading) {
		this.keepReading = keepReading;
		return this;
	}
	
	
}

特别说明:整理Java实现tail的最初目的,是需要在Flume的agent上用Java捕获文件的增量内容——Java编写的代码只要有JRE就可以跨平台运行;最近事情又有了新的进展:

利用Flume自带API,实现的java tail功能,源文件TestLineDeserializer.java

package com.github.ningg.flume.source;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.DecodeErrorPolicy;
import org.apache.flume.serialization.DurablePositionTracker;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.EventDeserializerFactory;
import org.apache.flume.serialization.PositionTracker;
import org.apache.flume.serialization.ResettableFileInputStream;
import org.apache.flume.serialization.ResettableInputStream;

/**
 * Tails a file using Flume's serialization API: a {@link ResettableFileInputStream}
 * backed by a {@link DurablePositionTracker}, read line by line through the
 * {@code LINE} {@link EventDeserializer}.
 */
public class TestLineDeserializer {

	/**
	 * Prints every line of the source file and then polls once per second for
	 * appended lines. The loop only ends when an exception is thrown.
	 *
	 * @param args ignored
	 * @throws IOException if the source or position file cannot be opened or read
	 * @throws InterruptedException if the thread is interrupted while waiting
	 */
	public static void main(String[] args) throws IOException, InterruptedException {
		
		String srcFilename = "E:/2.log";
		String metaFilename = "E:/meta.log";
		String charset = "GBK";
		String decodeErrorPolicy = "IGNORE";
		String deserializerType = "LINE";
		Context context = new Context();
		
		
		File srcFile = new File(srcFilename);
		File metaFile = new File(metaFilename);
		
		PositionTracker tracker = DurablePositionTracker.getInstance(metaFile, srcFile.getPath());
		ResettableInputStream in = new ResettableFileInputStream(srcFile, tracker,
										ResettableFileInputStream.DEFAULT_BUF_SIZE, 
										Charset.forName(charset), DecodeErrorPolicy.valueOf(decodeErrorPolicy));
		
		EventDeserializer eventDeserializer = EventDeserializerFactory.getInstance(deserializerType, context, in);
		
		// The line deserializer re-encodes each line with its "outputCharset"
		// setting, which defaults to UTF-8 for the empty Context used here. Decode
		// with that charset instead of the platform default, otherwise non-ASCII
		// text is garbled on platforms whose default is not UTF-8.
		// NOTE(review): keep in sync if "outputCharset" is ever set on the Context.
		Charset bodyCharset = Charset.forName("UTF-8");
		
		try {
			while (true) {
				Event event = eventDeserializer.readEvent();
				if (event != null) {
					String singleLine = new String(event.getBody(), bodyCharset);
					System.out.println(singleLine);
					continue;
				}
				
				// No complete event available: wait before polling again.
				Thread.sleep(1000L);
			}
		} finally {
			// Release the deserializer on the only exit path (an exception).
			eventDeserializer.close();
		}
		
	}
	
}

参考来源

Top