【Flume】flume中攔截器的源碼分析,以TimestampInterceptor為例
來源:程序員人生 發布時間:2015-03-05 08:39:23 閱讀次數:4635次
本文將以TimestampInterceptor為例來分析1下flume中攔截器的工作原理
首先來看下改攔截器的實現結構
1、實現了Interceptor接口
該接口的方法定義以下:
public void initialize();
public Event intercept(Event event);
public List<Event> intercept(List<Event> events);
public void close();
/** Builder implementations MUST have a no-arg constructor */
public interface Builder extends Configurable {
public Interceptor build();
}
2、接口中定義了1個內部接口Builder
該接口又繼承自Configurable接口【接口只能繼承接口,不能實現接口】
該接口的方法定義以下:
public void configure(Context context);
該方法很容易理解,就是用來讀取flume的配置文件內容的。
下面來看TimestampInterceptor的具體實現
public static class Builder implements Interceptor.Builder {
private boolean preserveExisting = PRESERVE_DFLT;
@Override
public Interceptor build() {
return new TimestampInterceptor(preserveExisting);
}
@Override
public void configure(Context context) {
preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
}
}
該內部類實現了Interceptor的接口Builder,必須得有1個無參的構造方法,通過該構造方法就實例化了1個攔截器對象
并且在configure方法中讀取了preserveExisting屬性,默許值為false
該配置的作用表明
This interceptor can preserve an existing timestamp if it is already present in the configuration.
再來看intercept方法
批量的
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
簡單的循環調用了intercept對event逐1處理
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
if (preserveExisting && headers.containsKey(TIMESTAMP)) {
// we must preserve the existing timestamp
} else {
long now = System.currentTimeMillis();
headers.put(TIMESTAMP, Long.toString(now));
}
return event;
}
該方法即攔截器的核心內容
1、如果拿到的event的header中本身包括timestamp這個key并且預留保存屬性為true,我們就直接返回該event就好了
2、否則的話,我們生成1個時間戳,并將這個時間戳放到event的header中,作為1個屬性保存,再返回給event
綜上所述:
Flume中攔截器的作用就是對event中header的部份可以按需塞入1些屬性,固然你如果想要處理event的body內容,也是可以的,但是event的body內容是系統下游階段真正處理的內容,如果讓Flume來修飾body的內容的話,那就是強耦合了,這就背背了當初使用Flume來解耦的初衷了。
做法可以有,但是合不適合是另外一回事了!!!!balance1直是1個世界性困難!!!
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈