

[Flume] Starting from the Application entry point: how Flume's source and sink interact with the channel

Source: 程序員人生   Published: 2015-03-01 00:54:22   Views: 7919

The command you type to start Flume already shows where its entry point is:

[root@com21 apache-flume-1.5.2-bin]# sh bin/flume-ng agent -c conf -f conf/server.conf -n a1
Info: Sourcing environment configuration script /home/flume/apache-flume-1.5.2-bin/conf/flume-env.sh
+ exec /home/flume/jdk1.7.0_71/bin/java -server -Xms2048m -Xmx2048m -Xss256K -XX:PermSize=32M -XX:MaxPermSize=512M -XX:+UseConcMarkSweepGC -XX:+DisableExplicitGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+UseCompressedOops -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError -XX:SurvivorRatio=8 -cp '/home/flume/apache-flume-1.5.2-bin/conf:/home/flume/apache-flume-1.5.2-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/server.conf -n a1

This tells us that Flume's entry point is org.apache.flume.node.Application.

Let's see how this entry class runs.

Start by finding its main method.

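Note: every time Flume starts (or reloads its configuration), it first stops any source, sink, and channel components that are already running from a previously loaded configuration, in the order source, sink, channel.

It then starts all components of the current configuration, in the order channel, sink, source.

This ordering is how Flume protects data consistency: sources stop first, so no new events enter a channel that is about to go away, and channels start first, so a sink or source never runs against a channel that is not ready yet.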
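The ordering above can be sketched in a few lines of plain Java. This is a minimal illustration, not Flume code: DemoComponent and ConfigSwitchDemo are hypothetical names, and each component only records when it is stopped or started.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Flume component (not a Flume class).
interface DemoComponent {
    void start();
    void stop();
}

class ConfigSwitchDemo {
    // Records the order in which lifecycle calls happen.
    final List<String> log = new ArrayList<>();

    DemoComponent named(String name) {
        return new DemoComponent() {
            @Override public void start() { log.add("start " + name); }
            @Override public void stop() { log.add("stop " + name); }
        };
    }

    // Tear down the old configuration: sources first, so nothing new enters
    // the channels, then sinks, then the channels themselves.
    void stopAll(DemoComponent source, DemoComponent sink, DemoComponent channel) {
        source.stop();
        sink.stop();
        channel.stop();
    }

    // Bring up the new configuration: channels first, so sinks and sources
    // never run against a channel that is not ready yet.
    void startAll(DemoComponent source, DemoComponent sink, DemoComponent channel) {
        channel.start();
        sink.start();
        source.start();
    }
}
```

Calling stopAll and then startAll on the same three components produces the log stop source, stop sink, stop channel, start channel, start sink, start source.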

if (reload) {
  EventBus eventBus = new EventBus(agentName + "-event-bus");
  PollingPropertiesFileConfigurationProvider configurationProvider =
      new PollingPropertiesFileConfigurationProvider(agentName,
          configurationFile, eventBus, 30);
  components.add(configurationProvider);
  application = new Application(components);
  eventBus.register(application);
} else {
  PropertiesFileConfigurationProvider configurationProvider =
      new PropertiesFileConfigurationProvider(agentName,
          configurationFile);
  application = new Application();
  application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
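This if statement decides whether Flume re-reads the configuration every 30 seconds (the last argument to the PollingPropertiesFileConfigurationProvider constructor) to check whether it has changed.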

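What matters here is how the configuration content is processed. The two branches look different, but the logic is the same: in the reload branch the polling provider publishes the configuration on the EventBus, and the registered Application receives it in handleConfigurationEvent(); the else branch loads the configuration once and calls handleConfigurationEvent() directly.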
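Both branches end in the same handler; only the delivery differs. A minimal sketch of that idea in plain Java, assuming a configuration can be represented as a version number plus its parsed content: PollingConfigSource and ConfigHandler are hypothetical names, and a plain callback stands in for Guava's EventBus.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for Application: records each configuration it receives.
class ConfigHandler {
    final List<String> applied = new ArrayList<>();

    // Plays the role of Application.handleConfigurationEvent(...).
    synchronized void handleConfigurationEvent(String conf) {
        applied.add(conf);
    }
}

// Hypothetical stand-in for the polling provider plus the event bus.
class PollingConfigSource {
    private final Supplier<Long> versionReader; // assumption: e.g. a file's last-modified time
    private final Supplier<String> loader;      // reads and parses the configuration
    private final Consumer<String> subscriber;  // whoever is "registered" on the bus
    private Long lastVersion;                   // null until the first poll

    PollingConfigSource(Supplier<Long> versionReader, Supplier<String> loader,
                        Consumer<String> subscriber) {
        this.versionReader = versionReader;
        this.loader = loader;
        this.subscriber = subscriber;
    }

    // One polling round: publish the configuration only if it changed.
    synchronized void poll() {
        Long version = versionReader.get();
        if (!version.equals(lastVersion)) {
            lastVersion = version;
            subscriber.accept(loader.get());
        }
    }

    // Reload mode: poll on a fixed delay (the Flume code above passes 30 seconds).
    // The caller must shut the returned executor down when done.
    ScheduledExecutorService startPolling(long intervalSeconds) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleWithFixedDelay(this::poll, 0, intervalSeconds, TimeUnit.SECONDS);
        return ses;
    }
}
```

In the non-reload case you would skip the poller entirely and call handler.handleConfigurationEvent(...) once with the loaded configuration, just as the else branch calls application.handleConfigurationEvent(...) directly.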

Let's follow the else branch:

Look at configurationProvider.getConfiguration():

public MaterializedConfiguration getConfiguration() {
  MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
  FlumeConfiguration fconfig = getFlumeConfiguration();
  AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
  if (agentConf != null) {
    Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
    Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
    Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
    try {
      loadChannels(agentConf, channelComponentMap);
      loadSources(agentConf, channelComponentMap, sourceRunnerMap);
      loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
      Set<String> channelNames =
          new HashSet<String>(channelComponentMap.keySet());
      for (String channelName : channelNames) {
        ChannelComponent channelComponent = channelComponentMap.
            get(channelName);
        if (channelComponent.components.isEmpty()) {
          LOGGER.warn(String.format("Channel %s has no components connected" +
              " and has been removed.", channelName));
          channelComponentMap.remove(channelName);
          Map<String, Channel> nameChannelMap = channelCache.
              get(channelComponent.channel.getClass());
          if (nameChannelMap != null) {
            nameChannelMap.remove(channelName);
          }
        } else {
          LOGGER.info(String.format("Channel %s connected to %s",
              channelName, channelComponent.components.toString()));
          conf.addChannel(channelName, channelComponent.channel);
        }
      }
      for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
        conf.addSourceRunner(entry.getKey(), entry.getValue());
      }
      for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
        conf.addSinkRunner(entry.getKey(), entry.getValue());
      }
    } catch (InstantiationException ex) {
      LOGGER.error("Failed to instantiate component", ex);
    } finally {
      channelComponentMap.clear();
      sourceRunnerMap.clear();
      sinkRunnerMap.clear();
    }
  } else {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
  }
  return conf;
}
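The loop in the middle of getConfiguration() drops every channel that no source or sink is connected to. Here is a stripped-down sketch of that filtering step, assuming a channel's connections can be modeled as a map from channel name to the names of the components wired to it (ChannelPruner and that map shape are hypothetical, not Flume's ChannelComponent):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ChannelPruner {
    // Keeps only channels that have at least one source or sink attached,
    // mirroring the "has no components connected and has been removed" branch.
    static Map<String, List<String>> prune(Map<String, List<String>> channelComponents) {
        Map<String, List<String>> kept = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : channelComponents.entrySet()) {
            String channelName = entry.getKey();
            List<String> components = entry.getValue();
            if (components.isEmpty()) {
                System.out.println("Channel " + channelName
                        + " has no components connected and has been removed.");
            } else {
                System.out.println("Channel " + channelName + " connected to " + components);
                kept.put(channelName, components);
            }
        }
        return kept;
    }
}
```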

Next, look at the loadXXX methods.

While loading source components, loadSources calls SourceRunner.forSource(source):

public static SourceRunner forSource(Source source) {
  SourceRunner runner = null;

  if (source instanceof PollableSource) {
    runner = new PollableSourceRunner();
    ((PollableSourceRunner) runner).setSource((PollableSource) source);
  } else if (source instanceof EventDrivenSource) {
    runner = new EventDrivenSourceRunner();
    ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
  } else {
    throw new IllegalArgumentException("No known runner type for source "
        + source);
  }

  return runner;
}
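This method chooses which kind of SourceRunner to use based on the source's type, and hands the source to the runner through its setSource() setter. A PollableSource is driven by a runner thread that repeatedly asks it for data, while an EventDrivenSource produces data on its own as it arrives.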
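The same type-based dispatch can be sketched with plain interfaces. All names here (DemoSource, PollableDemoSource, EventDrivenDemoSource, DemoRunner, DemoRunners) are hypothetical; the point is only that the runner is chosen by which marker interface the source implements, and anything else is rejected.

```java
// Hypothetical marker interfaces mirroring Source / PollableSource / EventDrivenSource.
interface DemoSource {}
interface PollableDemoSource extends DemoSource {}
interface EventDrivenDemoSource extends DemoSource {}

abstract class DemoRunner {
    abstract String kind();
}

// Would own a thread that keeps asking the source for data.
class PollableDemoRunner extends DemoRunner {
    final PollableDemoSource source;
    PollableDemoRunner(PollableDemoSource source) { this.source = source; }
    @Override String kind() { return "pollable"; }
}

// Only starts the source; the source pushes data on its own.
class EventDrivenDemoRunner extends DemoRunner {
    final EventDrivenDemoSource source;
    EventDrivenDemoRunner(EventDrivenDemoSource source) { this.source = source; }
    @Override String kind() { return "event-driven"; }
}

class DemoRunners {
    static DemoRunner forSource(DemoSource source) {
        if (source instanceof PollableDemoSource) {
            return new PollableDemoRunner((PollableDemoSource) source);
        } else if (source instanceof EventDrivenDemoSource) {
            return new EventDrivenDemoRunner((EventDrivenDemoSource) source);
        }
        throw new IllegalArgumentException("No known runner type for source " + source);
    }
}
```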

Take a concrete example: AvroSource. It is an event-driven source, so it is wrapped in an EventDrivenSourceRunner, whose start() method is:

public void start() {
  Source source = getSource();
  ChannelProcessor cp = source.getChannelProcessor();
  cp.initialize();
  source.start();
  lifecycleState = LifecycleState.START;
}

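This method does the following:

1. Gets the source object that was set earlier.

2. Gets the source's channel processor. So where was the channel processor set?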

In loadSources:

source.setChannelProcessor(channelProcessor);

The channelProcessor object is created two lines earlier:

ChannelProcessor channelProcessor = new ChannelProcessor(selector);

and the selector it is built from is instantiated by:

ChannelSelector selector = ChannelSelectorFactory.create(
    sourceChannels, selectorConfig);

Going further up leads into reading the configuration, which you can follow on your own.
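To make the selector's role concrete: the channel processor asks its selector which channels an event belongs to and puts the event into each of them. The sketch below assumes a replicating policy (every channel gets a copy) and uses plain Deques instead of real Flume channels, with no transactions; DemoSelector, ReplicatingDemoSelector and DemoChannelProcessor are hypothetical names.

```java
import java.util.Deque;
import java.util.List;

// Hypothetical selector: decides which channels receive an event.
interface DemoSelector {
    List<Deque<String>> channelsFor(String event);
}

// Replicating policy: every configured channel gets a copy of every event.
class ReplicatingDemoSelector implements DemoSelector {
    private final List<Deque<String>> channels;
    ReplicatingDemoSelector(List<Deque<String>> channels) { this.channels = channels; }
    @Override public List<Deque<String>> channelsFor(String event) { return channels; }
}

// Hypothetical stand-in for ChannelProcessor: built from a selector, as in loadSources.
class DemoChannelProcessor {
    private final DemoSelector selector;
    DemoChannelProcessor(DemoSelector selector) { this.selector = selector; }

    void processEvent(String event) {
        for (Deque<String> channel : selector.channelsFor(event)) {
            channel.addLast(event); // a real channel would use a put transaction here
        }
    }
}
```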

3. Now the third line, cp.initialize(), which performs initialization. Step into it:

public void initialize() {
  interceptorChain.initialize();
}

4. Step in once more:

public void initialize() {
  Iterator<Interceptor> iter = interceptors.iterator();
  while (iter.hasNext()) {
    Interceptor interceptor = iter.next();
    interceptor.initialize();
  }
}
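This is where interceptors come into play: they are initialized here, and afterwards the ChannelProcessor runs events through the interceptor chain before handing them to the channel. Interceptors therefore sit between the source and the channel.

Next comes source.start(), which starts the source component by calling the concrete source's own start method.

Finally, the component's lifecycle state is set to START.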
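Initialization is only half of the interceptor story; at runtime each batch of events passes through the interceptors in order before it reaches the channel. A minimal sketch of such a chain, assuming events are plain strings; DemoInterceptor and DemoInterceptorChain are hypothetical names, loosely modeled on the initialize/intercept split described above.

```java
import java.util.List;

// Hypothetical interceptor: may rewrite or drop events in a batch.
interface DemoInterceptor {
    default void initialize() {}  // called once, like interceptor.initialize()
    List<String> intercept(List<String> events);
}

class DemoInterceptorChain {
    private final List<DemoInterceptor> interceptors;

    DemoInterceptorChain(List<DemoInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    // Mirrors the initialize() loop quoted above.
    void initialize() {
        for (DemoInterceptor interceptor : interceptors) {
            interceptor.initialize();
        }
    }

    // Each interceptor sees the output of the previous one; whatever
    // survives the whole chain is what would be put into the channel.
    List<String> intercept(List<String> events) {
        for (DemoInterceptor interceptor : interceptors) {
            events = interceptor.intercept(events);
        }
        return events;
    }
}
```

With a filter that drops "debug" events followed by an upper-casing interceptor, the batch ["debug x", "info y"] comes out of the chain as ["INFO Y"].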


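This start() method is declared by the LifecycleAware interface and is called by Flume's LifecycleSupervisor.

So when is it called? As soon as it is, the source begins exchanging data with the channel.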

switch (supervisoree.status.desiredState) {
  case START:
    try {
      lifecycleAware.start();
      ...

This code is in the run method of MonitorRunnable, a static nested class of LifecycleSupervisor. So who runs this Runnable?

MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;

supervisedProcesses.put(lifecycleAware, process);

ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
    monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);

This is in the supervise method of LifecycleSupervisor. The MonitorRunnable runs immediately and then again 3 seconds after each run finishes.

To see what monitorRunnable actually does, we only need to read its run method, since it is a Runnable:

public void run() {
  logger.debug("checking process:{} supervisoree:{}",
      lifecycleAware, supervisoree);

  long now = System.currentTimeMillis();

  try {
    if (supervisoree.status.firstSeen == null) {
      logger.debug("first time seeing {}", lifecycleAware);
      supervisoree.status.firstSeen = now;
    }

    supervisoree.status.lastSeen = now;
    synchronized (lifecycleAware) {
      if (supervisoree.status.discard) {
        // Unsupervise has already been called on this.
        logger.info("Component has already been stopped {}", lifecycleAware);
        return;
      } else if (supervisoree.status.error) {
        logger.info("Component {} is in error state, and Flume will not"
            + "attempt to change its state", lifecycleAware);
        return;
      }

      supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

      if (!lifecycleAware.getLifecycleState().equals(
          supervisoree.status.desiredState)) {

        logger.debug("Want to transition {} from {} to {} (failures:{})",
            new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                supervisoree.status.desiredState,
                supervisoree.status.failures });

        switch (supervisoree.status.desiredState) {
          case START:
            try {
              lifecycleAware.start();
            } catch (Throwable e) {
              logger.error("Unable to start " + lifecycleAware
                  + " - Exception follows.", e);
              if (e instanceof Error) {
                // This component can never recover, shut it down.
                supervisoree.status.desiredState = LifecycleState.STOP;
                try {
                  lifecycleAware.stop();
                  logger.warn("Component {} stopped, since it could not be"
                      + "successfully started due to missing dependencies",
                      lifecycleAware);
                } catch (Throwable e1) {
                  logger.error("Unsuccessful attempt to "
                      + "shutdown component: {} due to missing dependencies."
                      + " Please shutdown the agent"
                      + "or disable this component, or the agent will be"
                      + "in an undefined state.", e1);
                  supervisoree.status.error = true;
                  if (e1 instanceof Error) {
                    throw (Error) e1;
                  }
                  // Set the state to stop, so that the conf poller can
                  // proceed.
                }
              }
              supervisoree.status.failures++;
            }
            break;
          case STOP:
            try {
              lifecycleAware.stop();
            } catch (Throwable e) {
              logger.error("Unable to stop " + lifecycleAware
                  + " - Exception follows.", e);
              if (e instanceof Error) {
                throw (Error) e;
              }
              supervisoree.status.failures++;
            }
            break;
          default:
            logger.warn("I refuse to acknowledge {} as a desired state",
                supervisoree.status.desiredState);
        }

        if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
          logger.error(
              "Policy {} of {} has been violated - supervisor should exit!",
              supervisoree.policy, lifecycleAware);
        }
      }
    }
  } catch (Throwable t) {
    logger.error("Unexpected error", t);
  }

  logger.debug("Status check complete");
}
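At its core, run() compares the component's current lifecycle state with its desired state, and acts only when the two differ: if the desired state is START, it calls lifecycleAware.start(). That start method is the start method of the concrete Flume component (for a source, the runner's start shown earlier, which in turn calls the source's own start). Again taking AvroSource as the example: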

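server = new NettyServer(responder,
    new InetSocketAddress(bindAddress, port),
    socketChannelFactory, pipelineFactory, null);

connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
server.start();

So AvroSource's start method starts a NettyServer. Once the server is up, it keeps listening in the background; whenever a client sends data, the data is handled as soon as it arrives and passed through the channel processor into the channel. That is what "event-driven" means here.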
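The push model can be sketched without Netty. In this minimal sketch, one BlockingQueue stands in for the network connection and another for the channel (both assumptions), and a single daemon thread plays the part of the server's I/O threads; DemoEventDrivenReceiver is a hypothetical name. start() returns immediately, and each event reaches the channel as soon as it arrives, without waiting for any timer.

```java
import java.util.concurrent.BlockingQueue;

class DemoEventDrivenReceiver {
    private final BlockingQueue<String> network; // stand-in for client connections
    private final BlockingQueue<String> channel; // stand-in for a Flume channel
    private final Thread worker;

    DemoEventDrivenReceiver(BlockingQueue<String> network, BlockingQueue<String> channel) {
        this.network = network;
        this.channel = channel;
        this.worker = new Thread(this::receiveLoop, "demo-receiver");
        this.worker.setDaemon(true); // do not keep the JVM alive
    }

    // Like source.start(): returns at once; the work continues on another thread.
    void start() { worker.start(); }

    void stop() { worker.interrupt(); }

    private void receiveLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String event = network.take(); // wait until a client sends something
                channel.put(event);            // hand it to the channel right away
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop() was called
        }
    }
}
```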

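This also shows what the 3-second schedule really means. It is not the rate at which the source talks to the channel; it is how often the supervisor checks that each component is in its desired state. On the first check the component has not been started yet, so start() is called. On later checks the state already equals START and run() does nothing, unless the component is still not in START (for example because an earlier start() threw an exception), in which case it tries again. The data itself moves whenever the source produces it; for AvroSource, that is whenever a client sends events.

The monitor runnable is scheduled by the supervise method, and supervise is in turn called from the start method of the Application entry point:

public synchronized void start() {
  for (LifecycleAware component : components) {
    supervisor.supervise(component,
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  }
}

That connects the whole chain. Strictly speaking, the components passed in here are the ones collected in main() (the polling configuration provider in reload mode); the channels, sink runners, and source runners are handed to supervise() in the same way from Application's startAllComponents method, which handleConfigurationEvent() calls, in the order channel, sink, source.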
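The difference between "check every 3 seconds" and "move data every 3 seconds" is easy to see in a toy version of the supervision loop. In this sketch (hypothetical names throughout: DemoLifecycleState, DemoManaged, DemoSupervisor), checkOnce plays the role of MonitorRunnable.run(): it calls start() only when the current state differs from the desired one, so repeated checks on a healthy component do nothing.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

enum DemoLifecycleState { IDLE, START, STOP }

// Hypothetical managed component that counts how often it is started.
class DemoManaged {
    volatile DemoLifecycleState state = DemoLifecycleState.IDLE;
    int starts = 0;

    void start() { starts++; state = DemoLifecycleState.START; }
    void stop() { state = DemoLifecycleState.STOP; }
}

class DemoSupervisor {
    // One supervision round: act only if the component is not in the desired state.
    static void checkOnce(DemoManaged component, DemoLifecycleState desired) {
        synchronized (component) {
            if (component.state == desired) {
                return; // already where it should be: nothing to do
            }
            switch (desired) {
                case START: component.start(); break;
                case STOP: component.stop(); break;
                default: break;
            }
        }
    }

    // Same scheduling shape as supervise(): run now, then 3 s after each run ends.
    static ScheduledFuture<?> supervise(ScheduledExecutorService service,
                                        DemoManaged component, DemoLifecycleState desired) {
        return service.scheduleWithFixedDelay(
                () -> checkOnce(component, desired), 0, 3, TimeUnit.SECONDS);
    }
}
```

Only the first round, and any round that finds the component out of the START state, calls start(); the events themselves flow on the source's own threads or the runner's thread.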


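Having seen how the source interacts with the channel, let's look at the sink.

The sink is now easy, because all three kinds of Flume components (along with the SourceRunner and SinkRunner wrappers) implement the LifecycleAware interface.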

So, starting from the Application entry point, every start path ends in LifecycleSupervisor.supervise, which schedules the same 3-second MonitorRunnable for sinks as for sources, exactly as in the supervise code shown earlier. As with sources, that task only checks the component and, if needed, (re)starts it. The sink side is then driven by the SinkRunner: its start method launches a thread that repeatedly calls the sink processor's process() method, which takes events from the channel, and backs off briefly whenever process() returns BACKOFF, for example because the channel is empty.
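A minimal sketch of that sink-side loop, assuming a sink reports READY after delivering data and BACKOFF when there was nothing to take. DemoSinkStatus, DemoSink and DemoSinkLoop are hypothetical names, and the backoff increment and cap are parameters chosen by the caller, not values taken from Flume.

```java
// Hypothetical status values, modeled on READY / BACKOFF.
enum DemoSinkStatus { READY, BACKOFF }

interface DemoSink {
    DemoSinkStatus process(); // take some events from the channel and deliver them
}

class DemoSinkLoop implements Runnable {
    private final DemoSink sink;
    private final long backoffIncrementMs;
    private final long maxBackoffMs;
    private volatile boolean stopRequested = false;

    DemoSinkLoop(DemoSink sink, long backoffIncrementMs, long maxBackoffMs) {
        this.sink = sink;
        this.backoffIncrementMs = backoffIncrementMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    void requestStop() { stopRequested = true; }

    @Override
    public void run() {
        long consecutiveBackoffs = 0;
        while (!stopRequested) {
            if (sink.process() == DemoSinkStatus.BACKOFF) {
                // Nothing to do: sleep a little longer each time, up to the cap.
                consecutiveBackoffs++;
                long sleepMs = Math.min(consecutiveBackoffs * backoffIncrementMs, maxBackoffMs);
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            } else {
                consecutiveBackoffs = 0; // data flowed, so poll again immediately
            }
        }
    }
}
```

In a real runner the loop would run on its own thread (for example new Thread(loop).start()), and requestStop() would be called from the component's stop method.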


That covers how Flume's three main components interact with each other. Comments and corrections are very welcome!




