flume介绍
flume被设计为一个灵活的分布式系统,可以很容易的扩展,而且是高度可定制化的,一个配置正确的Flume Agent和由互相连接的Agent创建的Agent管道,保证不会丢失数据,提供持久的channel。Flume部署的最小单元是Flume Agent ,一个Flume Agent可以连接一个或者多个其他的Agent。一个Agent也可以从一个或者多个Agent接收数据。通过互相连接的多个Flume Agent,一个流作业被建立,这个Flume Agent链条可以将数据从一个位置移动到另一个位置-----特别是,从生产数据的应用程序到HDFS、HBase等。
Flume Agent内部原理
每个Flume Agent有三个组件:source,channel和sink。source负责获取事件到Flume Agent,而sink负责从Agent移走事件并转发他们到拓扑结构中的下一个agent,或者到HDFS、HBase、Solr等。channel是一个存储source已经接收到的数据的缓冲区,直到sink已经将数据成功写入到下一阶段或者最终目的地。
source是从一些其他产生数据的应用中接收数据的活跃组件。有自己产生数据的source,不过source通常用于测试目的,source可以监听一个或者多个网络端口,用于接收或者可以从本地文件系统读取数据。每个source必须至少连接一个channel。基于一些标准,一个source可以写入几个channel,复制事件到所有或者某些channel。
channel是被动组件,缓冲agent已经接收,但尚未写出到另一个agent或者存储系统的数据。channel就像一个队列,source写入到他们,sink从他们中读取,多个source可以安全的写入到相同的channel,并且多个sink可以从相同的channel进行读取。可以一个sink只能从一个channel读取。如果多个sink从相同的channel读取,它可以保证只有一个sink将会从channel读取一个特定的事件。
sink连续轮询各自的channel来读取和删除事件。sink将事件推送到下一阶段或者最终目的地。一旦在下一阶段或者其目的地中数据是安全的,sink通过事务通知channel,可以从channel中删除这些事件。
数据处理过程
flume本身不限制agent中的source、channel和sink的数量。因此source可以接收事件,并可以通过配置将事件复制到多个目的地。这使得source通过channel处理器、拦截器和channel选择器,最后写入数据到channel。
每个source都有自己的channel处理器,每次source将数据写入channel,他是通过委派该任务到其channel处理器来完成的,然后channel处理器将这些事件传到一个或多个source配置的拦截器中。
拦截器是一段代码,可以基于某些他完成的处理来读取、修改和删除事件。基于某些标准,如正则表达式,拦截器可以来删除事件,为事件添加新报头或者移除现有的报头等。每个source可以配置成使用多个拦截器,按照配置中定义的顺序被调用,将拦截器处理后的结果传递给下一个单元。这也是责任链的设计模式,一旦拦截器处理完事件,拦截器返回的事件列表传递到channel列表,即通过channel选择器为每个事件选择的channel。
配置Flume Agent
Flume Agent使用纯文本的配置文件来配置。Flume配置使用属性文件格式,仅仅是用换行符分隔的键值对的纯文本文件。属性文件的实例如下所示:
key1 = value1
key2 = value2
通过该格式,Flume可以很容易的将配置传递给Agent和它的各种组件。在配置文件中,Flume使用分层结构,每个Flume
Agent都有一个名称,在Flume Agent使用flume-ng命令启动的时候设置。配置文件可以包含若干个FLume Agent的配置,但实际只加载flume-ng命令中指定名称的Agent配置。
在Flume Agent有一些组件可以有若干个实例,像source、sink、channel等。为了能够识别出这些组件的每一个配置,需要对这些组件进行命名。对一个Agent,配置文件必须使用下面的格式列出 source、channel、sink的名称,该列表称为活跃列表:
agent.sources = source1 source2
agent.sinks = sink1 sink2 sink3 sink4
agent.sinkgroup = sg1 sg2
agent.channels = channel1 channel2
上面的配置片段表示名为agent的Flume Agent,带有两个source,两个sink组、两个channel、四个sink。即使某些部件罗列出配置参数,如果他们不是在活跃列表中,则他们不创建、配置或启动。其他组件,例如拦截器和channel选择器,不需要存在于活跃列表中。当和他们有关的组件(sink、source)是活跃的,他们会自动创建并激活。对于需要配置的每个组件,组件的配置使用下面的格式前缀传递:
<agent-name>.<component-type>.<component-name>.<configuration-parameter> = value
用于source的<component-type>部分是sources,sink是sinks,channel是channnels,sink组是sinkgroup。
Flume Agent配置实例
agent.sources = httpSrc
agent.channels = memory1 memory2
agent.sinks = hdfsSink hbaseSink
# source的配置参数
agent.sources.httpSrc.type = http
agent.sources.httpSrc.channels = memory1 memory2
agent.sources.httpSrc.bind = 0.0.0.0
agent.sources.httpSrc.port = 4353
agent.sources.httpSrc.ssl = true
agent.sources.httpSrc.keystore = /tmp/keystore
agent.sources.httpSrc.keystore.password = UsingFlume
agent.sources.httpSrc.handler = usingflume.ch03.HTTPSourceXMLHandler
agent.sources.httpSrc.insertTimetamp = true
agent.sources.httpSrc.interceptors = hostInterceptor
agent.sources.httpSrc.interceptors.hostInterceptor.type = host
# channel 的配置
agent.channels.memory1.type = memory
agent.channels.memory2.type = memory
# hdfsSink配置
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memory1
agent.sinks.hdfsSink.path = /data/usingFlume/%{topic}/%Y%m/%d/%H/%M
agent.sinks.hdfsSink.filePrefix = UsingFlume
# hbaseSink配置
agent.sinks.hbaseSink.type = asynchbase
agent.sinks.hbaseSink.channel = memory2
agent.sinks.hbaseSink.serializer = usingflume.ch05.AsyncHBaseDirectSerializer
agent.sinks.hbaseSink.table = usingFlumeTable
上面展示了具有多个组件的Flume Agent的一个例子,其中一些组件拥有子组件,在Agent中,有一个source,两个channel和两个sink,source是一个http source,命名为httpSrc。该source写入两个内存channel,memory和memory-----这是由配置系统在channel处理器中设置,source实现不需要设置channel,多个参数bind、port、ssl、keystore、keystore-password/handler和handler.insertTimestamp 他们的值在传递给context实例(源码中传递参数的对象)中是可以用的,对于此配置文件,配置系统还创建了一个拦截器。项目启动时加载该配置,agent就会按照配置中的那样进行工作,第一次写博客,简要介绍了flume的工作原理和内部组件,有什么不对的地方欢迎指正。
登录 | 立即注册