[Flume] Starting from the entry point Application: how Flume's source and sink interact with the channel
Source: 程序員人生 (Programmer's Life)  Published: 2015-03-01 00:54:22
The command used to start Flume already reveals its entry point:
[root@com21 apache-flume-1.5.2-bin]# sh bin/flume-ng agent -c conf -f conf/server.conf -n a1
Info: Sourcing environment configuration script /home/flume/apache-flume-1.5.2-bin/conf/flume-env.sh
+ exec /home/flume/jdk1.7.0_71/bin/java -server -Xms2048m -Xmx2048m -Xss256K -XX:PermSize=32M -XX:MaxPermSize=512M -XX:+UseConcMarkSweepGC -XX:+DisableExplicitGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+UseCompressedOops -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError -XX:SurvivorRatio=8 -cp '/home/flume/apache-flume-1.5.2-bin/conf:/home/flume/apache-flume-1.5.2-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/server.conf -n a1
This shows that Flume's entry point is org.apache.flume.node.Application.
Next, let's see how this entry class runs, starting from its main method.
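Note: every time a configuration is applied, Flume first stops the components left over from the previously loaded configuration, in the order source, sink, channel.
It then starts all components of the current configuration, in the order channel, sink, source.
This ordering shows that Flume takes data consistency into account: a channel is started before, and stopped after, the sources and sinks that use it, so no source writes to, and no sink reads from, a channel that is not running.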
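A minimal sketch of the ordering in the note above. RestartOrder and Kind are hypothetical names, not Flume classes; the sketch only records which stop and start actions run, and in what order, when a configuration is applied:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the restart ordering; these are not Flume classes.
class RestartOrder {
  enum Kind { SOURCE, SINK, CHANNEL }

  // Stop producers and consumers before the channels they use.
  static final Kind[] STOP_ORDER = { Kind.SOURCE, Kind.SINK, Kind.CHANNEL };
  // Start channels first so that sinks and sources always find them running.
  static final Kind[] START_ORDER = { Kind.CHANNEL, Kind.SINK, Kind.SOURCE };

  /** Returns the actions performed when a new configuration replaces an old one. */
  static List<String> applyConfiguration(boolean hadPreviousConfig) {
    List<String> actions = new ArrayList<>();
    if (hadPreviousConfig) {
      for (Kind k : STOP_ORDER) {
        actions.add("stop " + k);
      }
    }
    for (Kind k : START_ORDER) {
      actions.add("start " + k);
    }
    return actions;
  }

  public static void main(String[] args) {
    System.out.println(applyConfiguration(true));
  }
}
```

On a reload every stop happens before any start, so the old channels are gone only after nothing can touch them any more.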
if (reload) {
  EventBus eventBus = new EventBus(agentName + "-event-bus");
  PollingPropertiesFileConfigurationProvider configurationProvider =
      new PollingPropertiesFileConfigurationProvider(agentName,
          configurationFile, eventBus, 30);
  components.add(configurationProvider);
  application = new Application(components);
  eventBus.register(application);
} else {
  PropertiesFileConfigurationProvider configurationProvider =
      new PropertiesFileConfigurationProvider(agentName,
          configurationFile);
  application = new Application();
  application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
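This if decides whether the configuration file is re-read every 30 seconds (the last constructor argument) to check for updates.
Let's focus on how the configuration content is processed. Although the two branches look different, they follow the same logic: in the reload branch, the provider posts each newly loaded configuration to the EventBus, which delivers it to the registered Application's handleConfigurationEvent method; the else branch calls that method directly.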
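To illustrate why both branches end up in the same place, here is a minimal sketch. PollingProvider stands in for PollingPropertiesFileConfigurationProvider and a plain list of Consumer callbacks stands in for Guava's EventBus; both, and the timestamp check, are simplifying assumptions rather than Flume's code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: PollingProvider and App are stand-ins, not Flume's API.
class ReloadSketch {
  static class PollingProvider {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private long lastModified = -1;

    void register(Consumer<String> subscriber) {
      subscribers.add(subscriber);
    }

    /** One polling tick: publish a configuration only if the file's timestamp changed. */
    void pollOnce(long fileLastModified, String fileContents) {
      if (fileLastModified != lastModified) {
        lastModified = fileLastModified;
        for (Consumer<String> s : subscribers) {
          s.accept(fileContents);
        }
      }
    }
  }

  static class App {
    final List<String> applied = new ArrayList<>();

    // Both branches end up here: via the "event bus" (reload) or directly (no reload).
    void handleConfigurationEvent(String conf) {
      applied.add(conf);
    }
  }

  /** Reload branch: three ticks, the file changes once after the first load. */
  static List<String> simulateReload() {
    App app = new App();
    PollingProvider provider = new PollingProvider();
    provider.register(app::handleConfigurationEvent);
    provider.pollOnce(1000L, "v1");
    provider.pollOnce(1000L, "v1"); // unchanged: nothing is published
    provider.pollOnce(2000L, "v2");
    return app.applied;
  }

  /** No-reload branch: read once and hand the result straight to the application. */
  static List<String> simulateNoReload() {
    App app = new App();
    app.handleConfigurationEvent("v1");
    return app.applied;
  }
}
```

Each pollOnce call represents one 30-second tick; only ticks where the file changed reach handleConfigurationEvent.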
Let's follow the code in the else branch.
Look at configurationProvider.getConfiguration():
public MaterializedConfiguration getConfiguration() {
  MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
  FlumeConfiguration fconfig = getFlumeConfiguration();
  AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
  if (agentConf != null) {
    Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
    Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
    Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
    try {
      loadChannels(agentConf, channelComponentMap);
      loadSources(agentConf, channelComponentMap, sourceRunnerMap);
      loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
      Set<String> channelNames =
          new HashSet<String>(channelComponentMap.keySet());
      for (String channelName : channelNames) {
        ChannelComponent channelComponent = channelComponentMap.get(channelName);
        if (channelComponent.components.isEmpty()) {
          LOGGER.warn(String.format("Channel %s has no components connected" +
              " and has been removed.", channelName));
          channelComponentMap.remove(channelName);
          Map<String, Channel> nameChannelMap =
              channelCache.get(channelComponent.channel.getClass());
          if (nameChannelMap != null) {
            nameChannelMap.remove(channelName);
          }
        } else {
          LOGGER.info(String.format("Channel %s connected to %s",
              channelName, channelComponent.components.toString()));
          conf.addChannel(channelName, channelComponent.channel);
        }
      }
      for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
        conf.addSourceRunner(entry.getKey(), entry.getValue());
      }
      for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
        conf.addSinkRunner(entry.getKey(), entry.getValue());
      }
    } catch (InstantiationException ex) {
      LOGGER.error("Failed to instantiate component", ex);
    } finally {
      channelComponentMap.clear();
      sourceRunnerMap.clear();
      sinkRunnerMap.clear();
    }
  } else {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
  }
  return conf;
}
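The channel-pruning loop in getConfiguration() can be modelled in isolation. This is a hypothetical sketch in which ChannelPruning and its map of channel name to connected component names stand in for ChannelComponent; it keeps only the channels that at least one source or sink uses:

```java
import java.util.*;

// Hypothetical sketch of the pruning loop in getConfiguration(); not Flume's API.
class ChannelPruning {
  /**
   * @param connections channel name -> names of the sources/sinks wired to it (mutable)
   * @return names of the channels that would be added to the materialized configuration
   */
  static Set<String> materializedChannels(Map<String, List<String>> connections) {
    Set<String> kept = new TreeSet<>();
    // Iterate over a copy of the keys, as the original code does, so removal is safe.
    for (String name : new HashSet<>(connections.keySet())) {
      if (connections.get(name).isEmpty()) {
        System.out.printf("Channel %s has no components connected and has been removed.%n", name);
        connections.remove(name);
      } else {
        kept.add(name);
      }
    }
    return kept;
  }
}
```

Copying the key set first is what lets the loop remove entries from the map without a ConcurrentModificationException.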
Now look at the loadXXX methods (loadChannels, loadSources, loadSinks).
When loading the source components, loadSources calls SourceRunner.forSource(source):
public static SourceRunner forSource(Source source) {
  SourceRunner runner = null;
  if (source instanceof PollableSource) {
    runner = new PollableSourceRunner();
    ((PollableSourceRunner) runner).setSource((PollableSource) source);
  } else if (source instanceof EventDrivenSource) {
    runner = new EventDrivenSourceRunner();
    ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
  } else {
    throw new IllegalArgumentException("No known runner type for source "
        + source);
  }
  return runner;
}
This method checks the type of the source to choose which SourceRunner to use,
and assigns the source to the runner through its setSource setter.
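A self-contained mirror of the dispatch in forSource(), under the assumption that simplified stand-in types are enough to show the idea. RunnerDispatch and all of its nested types are hypothetical, not Flume's classes:

```java
// Hypothetical stand-ins for Flume's Source, PollableSource, EventDrivenSource and runners.
class RunnerDispatch {
  interface Source {}
  interface PollableSource extends Source { String process(); }
  interface EventDrivenSource extends Source {}

  abstract static class SourceRunner { abstract String kind(); }

  static class PollableSourceRunner extends SourceRunner {
    PollableSource source;
    String kind() { return "polling"; }
  }

  static class EventDrivenSourceRunner extends SourceRunner {
    EventDrivenSource source;
    String kind() { return "event-driven"; }
  }

  /** Picks a runner by the source's type and hands the source to it. */
  static SourceRunner forSource(Source source) {
    if (source instanceof PollableSource) {
      PollableSourceRunner runner = new PollableSourceRunner();
      runner.source = (PollableSource) source; // the "setter" step
      return runner;
    } else if (source instanceof EventDrivenSource) {
      EventDrivenSourceRunner runner = new EventDrivenSourceRunner();
      runner.source = (EventDrivenSource) source;
      return runner;
    }
    throw new IllegalArgumentException("No known runner type for source " + source);
  }
}
```

A source that implements neither marker interface is rejected up front rather than failing later at start time.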
Let's take a concrete example, AvroSource. It is an event-driven source, so forSource wraps it in an EventDrivenSourceRunner, whose start method is:
public void start() {
  Source source = getSource();
  ChannelProcessor cp = source.getChannelProcessor();
  cp.initialize();
  source.start();
  lifecycleState = LifecycleState.START;
}
This method:
1. Gets the source object assigned earlier.
2. Gets the source's channel processor. So where was the channel processor set?
In the loadSources method:
source.setChannelProcessor(channelProcessor);
This channelProcessor object is created two lines earlier by
ChannelProcessor channelProcessor = new ChannelProcessor(selector);
and the selector object used to create it
ChannelSelector selector = ChannelSelectorFactory.create(
    sourceChannels, selectorConfig);
is instantiated by this line. Going further up leads into reading the configuration, which you can explore on your own.
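The wiring in loadSources() can be summarized as channels → selector → channel processor → source. The sketch below assumes a replicating selector (Flume's default selector type, which sends every event to all of the source's channels); SourceWiring and its nested classes are hypothetical stand-ins:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the wiring in loadSources(); not Flume's classes.
class SourceWiring {
  interface ChannelSelector { List<String> getRequiredChannels(String event); }

  // Replicating: every event is required on every configured channel.
  static class ReplicatingSelector implements ChannelSelector {
    private final List<String> channels;
    ReplicatingSelector(List<String> channels) { this.channels = new ArrayList<>(channels); }
    public List<String> getRequiredChannels(String event) { return channels; }
  }

  static class ChannelProcessor {
    final ChannelSelector selector;
    ChannelProcessor(ChannelSelector selector) { this.selector = selector; }
  }

  static class Source {
    private ChannelProcessor processor;
    void setChannelProcessor(ChannelProcessor p) { this.processor = p; }
    ChannelProcessor getChannelProcessor() { return processor; }
  }

  static Source wire(List<String> sourceChannels) {
    ChannelSelector selector = new ReplicatingSelector(sourceChannels); // role of ChannelSelectorFactory.create(...)
    ChannelProcessor channelProcessor = new ChannelProcessor(selector);
    Source source = new Source();
    source.setChannelProcessor(channelProcessor);
    return source;
  }
}
```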
3. Next, the third line, cp.initialize(), performs initialization. Let's step into it:
public void initialize() {
  interceptorChain.initialize();
}
4. One level deeper, in InterceptorChain.initialize():
public void initialize() {
  Iterator<Interceptor> iter = interceptors.iterator();
  while (iter.hasNext()) {
    Interceptor interceptor = iter.next();
    interceptor.initialize();
  }
}
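This tells us where interceptors come in: they are a stage between the source and the channel. initialize() only prepares each interceptor; the interceptors are actually applied later, when the source passes events to ChannelProcessor.processEvent or processEventBatch, before the events are put into the channels.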
After that comes source.start(), which starts the source component by calling the start method of the concrete source.
Finally, the runner's lifecycle state is set to START.
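Interceptors only prepare themselves in initialize(); they act on events later, when the source hands events to its channel processor. A simplified model, assuming events are plain strings, interceptors are UnaryOperator&lt;String&gt;, and channels are in-memory deques (InterceptorFlow is a hypothetical class, not Flume's ChannelProcessor):

```java
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model of interceptors running between a source and its channels.
class InterceptorFlow {
  private final List<UnaryOperator<String>> interceptors;
  private final Map<String, Deque<String>> channels;

  InterceptorFlow(List<UnaryOperator<String>> interceptors, Map<String, Deque<String>> channels) {
    this.interceptors = interceptors;
    this.channels = channels;
  }

  /** Runs the chain in order; a null result drops the event. */
  private String intercept(String event) {
    for (UnaryOperator<String> i : interceptors) {
      event = i.apply(event);
      if (event == null) {
        return null;
      }
    }
    return event;
  }

  /** What a source would call for each incoming event. */
  void processEvent(String event) {
    String e = intercept(event);
    if (e == null) {
      return;
    }
    // Replicating behaviour: put the intercepted event into every channel.
    for (Deque<String> ch : channels.values()) {
      ch.addLast(e);
    }
  }
}
```

Because interception happens before the channel put, a dropped event never reaches any channel.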
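The start method of EventDrivenSourceRunner shown above is not called directly: SourceRunner implements the LifecycleAware interface, and LifecycleSupervisor calls start() through that interface.
So when is it called? Once it is called, the source can begin handing events to the channel: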
switch (supervisoree.status.desiredState) {
  case START:
    try {
      lifecycleAware.start();
The code above appears in the run method of MonitorRunnable, a static nested class of LifecycleSupervisor. So who runs this Runnable?
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
    monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
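This code is in LifecycleSupervisor's supervise method: each supervised component gets its own MonitorRunnable, scheduled with an initial delay of 0 and a fixed delay of 3 seconds between runs.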
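The scheduling in supervise() together with the state comparison in run() can be reproduced with a standard ScheduledExecutorService. In this hypothetical SupervisorSketch the 3-second delay is shortened to 20 ms; it shows the monitor task running many times while start() is called only once, because after the first call the actual state already matches the desired one:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of supervise() + MonitorRunnable.run(); not Flume's LifecycleSupervisor.
class SupervisorSketch {
  enum State { IDLE, START, STOP }

  static class Component {
    volatile State state = State.IDLE;
    final AtomicInteger startCalls = new AtomicInteger();
    void start() { startCalls.incrementAndGet(); state = State.START; }
  }

  /** Returns {checks, startCalls} after letting the monitor run for about runMillis. */
  static int[] runMonitor(long runMillis) {
    Component component = new Component();
    State desired = State.START;
    AtomicInteger checks = new AtomicInteger();
    ScheduledExecutorService monitorService = Executors.newSingleThreadScheduledExecutor();
    Runnable monitorRunnable = () -> {
      checks.incrementAndGet();
      synchronized (component) {
        // Act only when the actual state differs from the desired one.
        if (component.state != desired && desired == State.START) {
          component.start();
        }
      }
    };
    ScheduledFuture<?> future =
        monitorService.scheduleWithFixedDelay(monitorRunnable, 0, 20, TimeUnit.MILLISECONDS);
    try {
      Thread.sleep(runMillis);
      future.cancel(false);
      monitorService.shutdown();
      monitorService.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      monitorService.shutdownNow();
    }
    return new int[] { checks.get(), component.startCalls.get() };
  }

  public static void main(String[] args) {
    int[] r = runMonitor(200);
    System.out.println("checks=" + r[0] + " startCalls=" + r[1]);
  }
}
```

The periodic task is a reconciliation loop: it repairs state, it does not carry data.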
To see what monitorRunnable does, we only need to read its run method, since it is a Runnable executed by the scheduled executor:
public void run() {
  logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
      supervisoree);
  long now = System.currentTimeMillis();
  try {
    if (supervisoree.status.firstSeen == null) {
      logger.debug("first time seeing {}", lifecycleAware);
      supervisoree.status.firstSeen = now;
    }
    supervisoree.status.lastSeen = now;
    synchronized (lifecycleAware) {
      if (supervisoree.status.discard) {
        // Unsupervise has already been called on this.
        logger.info("Component has already been stopped {}", lifecycleAware);
        return;
      } else if (supervisoree.status.error) {
        logger.info("Component {} is in error state, and Flume will not"
            + "attempt to change its state", lifecycleAware);
        return;
      }
      supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
      if (!lifecycleAware.getLifecycleState().equals(
          supervisoree.status.desiredState)) {
        logger.debug("Want to transition {} from {} to {} (failures:{})",
            new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                supervisoree.status.desiredState,
                supervisoree.status.failures });
        switch (supervisoree.status.desiredState) {
          case START:
            try {
              lifecycleAware.start();
            } catch (Throwable e) {
              logger.error("Unable to start " + lifecycleAware
                  + " - Exception follows.", e);
              if (e instanceof Error) {
                // This component can never recover, shut it down.
                supervisoree.status.desiredState = LifecycleState.STOP;
                try {
                  lifecycleAware.stop();
                  logger.warn("Component {} stopped, since it could not be"
                      + "successfully started due to missing dependencies",
                      lifecycleAware);
                } catch (Throwable e1) {
                  logger.error("Unsuccessful attempt to "
                      + "shutdown component: {} due to missing dependencies."
                      + " Please shutdown the agent"
                      + "or disable this component, or the agent will be"
                      + "in an undefined state.", e1);
                  supervisoree.status.error = true;
                  if (e1 instanceof Error) {
                    throw (Error) e1;
                  }
                  // Set the state to stop, so that the conf poller can
                  // proceed.
                }
              }
              supervisoree.status.failures++;
            }
            break;
          case STOP:
            try {
              lifecycleAware.stop();
            } catch (Throwable e) {
              logger.error("Unable to stop " + lifecycleAware
                  + " - Exception follows.", e);
              if (e instanceof Error) {
                throw (Error) e;
              }
              supervisoree.status.failures++;
            }
            break;
          default:
            logger.warn("I refuse to acknowledge {} as a desired state",
                supervisoree.status.desiredState);
        }
        if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
          logger.error(
              "Policy {} of {} has been violated - supervisor should exit!",
              supervisoree.policy, lifecycleAware);
        }
      }
    }
  } catch (Throwable t) {
    logger.error("Unexpected error", t);
  }
  logger.debug("Status check complete");
}
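At its core, run() compares the component's current lifecycle state with its desired state. Only if they differ does it act: when the desired state is START, it calls lifecycleAware.start(), which for a source runner ends up in the start method of the concrete Flume component. Again taking AvroSource as an example: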
server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
    socketChannelFactory, pipelineFactory, null);
connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
server.start();
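This code shows that AvroSource's start method launches a NettyServer. server.start() does not block: Netty's I/O threads handle connections in the background, and whenever a client sends data, AvroSource's append or appendBatch callback passes the events to the channel processor right away. That is what "event-driven" means here.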
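To make "event-driven" concrete, here is a sketch in which an executor thread plays the role of Netty's I/O threads and calls the source's callback, which writes to a BlockingQueue standing in for the channel. EventDrivenDelivery and its Source class are hypothetical; the point is that delivery does not wait for any 3-second timer:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of event-driven delivery; names are illustrative, not Flume's.
class EventDrivenDelivery {
  static class Source {
    final BlockingQueue<String> channel;
    Source(BlockingQueue<String> channel) { this.channel = channel; }
    // Plays the role of AvroSource.append(): hand the event to the channel right away.
    void append(String event) { channel.offer(event); }
  }

  /** Milliseconds between a "client" sending an event and the event reaching the channel. */
  static long deliveryMillis() {
    BlockingQueue<String> channel = new LinkedBlockingQueue<>();
    Source source = new Source(channel);
    ExecutorService ioThread = Executors.newSingleThreadExecutor();
    long begin = System.nanoTime();
    ioThread.submit(() -> source.append("event-1"));
    try {
      String received = channel.poll(5, TimeUnit.SECONDS);
      if (received == null) {
        throw new IllegalStateException("event not delivered");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      ioThread.shutdown();
    }
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
  }
}
```

On an ordinary machine the measured delay is a few milliseconds at most, far below the supervisor's 3-second interval.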
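This is the crux, but it is easy to misread: the 3-second interval is how often the supervisor checks each component's lifecycle state, not how often the source talks to the channel. Once a component has reached START, later checks only update timestamps and change nothing.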
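The monitor task is created in the supervise method,
and supervise is called from the entry class Application: its start method supervises the components passed to its constructor (such as the polling configuration provider), and when a configuration is applied, each channel, sink runner and source runner is supervised in the same way. Here is start: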
public synchronized void start() {
  for (LifecycleAware component : components) {
    supervisor.supervise(component,
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  }
}
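supervise() keeps one scheduled monitor task per component and stores its future so that the component can later be unsupervised. A simplified, hypothetical model of that bookkeeping, in which components are identified by name strings and the monitor body is left empty (SuperviseRegistry is not a Flume class):

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical model of supervise()/unsupervise() bookkeeping; not Flume's LifecycleSupervisor.
class SuperviseRegistry {
  // Daemon threads so a forgotten shutdown() cannot keep the JVM alive.
  private final ScheduledExecutorService monitorService =
      Executors.newScheduledThreadPool(2, r -> {
        Thread t = new Thread(r, "monitor");
        t.setDaemon(true);
        return t;
      });
  private final Map<String, ScheduledFuture<?>> monitorFutures = new HashMap<>();

  synchronized void supervise(String component, Runnable monitorRunnable, long delayMillis) {
    if (monitorFutures.containsKey(component)) {
      throw new IllegalStateException(component + " is already supervised");
    }
    monitorFutures.put(component, monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, delayMillis, TimeUnit.MILLISECONDS));
  }

  synchronized void unsupervise(String component) {
    ScheduledFuture<?> f = monitorFutures.remove(component);
    if (f != null) {
      f.cancel(false);
    }
  }

  synchronized Set<String> supervised() { return new TreeSet<>(monitorFutures.keySet()); }

  void shutdown() { monitorService.shutdownNow(); }

  /** Mirrors Application.start(): supervise every component it was given. */
  static SuperviseRegistry startAll(List<String> components) {
    SuperviseRegistry supervisor = new SuperviseRegistry();
    for (String c : components) {
      supervisor.supervise(c, () -> { /* compare actual vs desired state here */ }, 3000);
    }
    return supervisor;
  }
}
```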
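With that, the whole chain is connected: Application → LifecycleSupervisor.supervise → MonitorRunnable.run → SourceRunner.start → Source.start.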
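So the 3 seconds is the supervisor's check interval. How often a source writes to the channel is decided by the source itself: immediately on arrival for event-driven sources such as AvroSource, or by the runner's polling loop for pollable sources.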
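Having covered how the source interacts with the channel, let's look at how the sink does.
The sink is simpler to follow, because all three kinds of Flume component (and the source and sink runners) implement the LifecycleAware interface.
So, starting from the entry class Application, start always ends up in LifecycleSupervisor's supervise method, which does the same thing: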
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
    monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
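This logic is the same for every source and sink: the lifecycle check runs every 3 seconds. For a sink, however, data moves on a separate thread: SinkRunner.start() launches a polling thread that calls the sink processor's process() in a loop, taking events from the channel, and sleeps only when process() returns BACKOFF.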
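The sink side can be sketched as a loop that calls process() back to back and sleeps only after BACKOFF, with a growing but capped delay. This hypothetical SinkPollingSketch computes the sleep that would follow each result; the 1000 ms increment and 5000 ms cap are illustrative assumptions, not values taken from the article:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a sink runner's backoff policy; constants are assumptions.
class SinkPollingSketch {
  enum Status { READY, BACKOFF }

  static final long BACKOFF_INCREMENT_MS = 1000;
  static final long MAX_BACKOFF_MS = 5000;

  /** Returns the sleep (ms) the polling loop would take after each process() result. */
  static List<Long> sleepsFor(List<Status> results) {
    List<Long> sleeps = new ArrayList<>();
    long consecutiveBackoffs = 0;
    for (Status s : results) {
      if (s == Status.BACKOFF) {
        consecutiveBackoffs++;
        sleeps.add(Math.min(consecutiveBackoffs * BACKOFF_INCREMENT_MS, MAX_BACKOFF_MS));
      } else {
        consecutiveBackoffs = 0;
        sleeps.add(0L); // READY: call process() again immediately
      }
    }
    return sleeps;
  }
}
```

While the channel has data the loop never sleeps, so a sink drains the channel as fast as process() can run.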
That covers how Flume's three main components interact and how often their lifecycle is checked. Comments and corrections are welcome!