Data Collection with Apache Flume(二)

      今天继续讨论几个agent的配置。

      第一个agent是从终端捕获特定命令执行的输出结果,并将文件输出到特定目录。先看一下配置的代码:      

agent2.sources = execsource      //指定为从命令获取输出的source
agent2.sinks = filesink          //输出到文件的sink
agent2.channels = filechannel     //输出到文件的channel

agent2.sources.execsource.type = exec  //类型
agent2.sources.execsource.command = cat /home/leung/message   //指定命令

agent2.sinks.filesink.type = FILE_ROLL
agent2.sinks.filesink.sink.directory = /home/leung/flume/files     //输出目录
agent2.sinks.filesink.sink.rollInterval = 0

agent2.channels.filechannel.type = file
agent2.channels.filechannel.checkpointDir = /home/leung/flume/fc/checkpoint //检查点
agent2.channels.filechannel.dataDirs = /home/leung/flume/fc/data  //channel的数据目录

agent2.sources.execsource.channels = filechannel
agent2.sinks.filesink.channel = filechannel

   OK,启动agent2,然后查看结果。

      技术分享

      结果如下图。可以看到,执行 cat /home/leung/message命令之后,输出的结果与files目录中的文件内容是一致的,证明已经成功写入文件。

      技术分享

      下一个agent是从网络端口 获取数据然后写到Hadoop集群的HDFS中。先看看配置代码:

agent4.sources = netsource 
agent4.sinks = hdfssink //HDFS sink
agent4.channels = memorychannel

agent4.sources.netsource.type = netcat
agent4.sources.netsource.bind = localhost
agent4.sources.netsource.port = 3000


agent4.sinks.hdfssink.type = hdfs
agent4.sinks.hdfssink.hdfs.path = /flume  //写出到HDFS上的文件目录,不需要提前创建
agent4.sinks.hdfssink.hdfs.filePrefix = log  //指定写出文件的文件名前缀
agent4.sinks.hdfssink.hdfs.rollInterval = 0
agent4.sinks.hdfssink.hdfs.rollCount = 3
agent4.sinks.hdfssink.hdfs.fileType = DataStream

agent4.channels.memorychannel.type = memory
agent4.channels.memorychannel.capacity = 1000
agent4.channels.memorychannel.transactionCapacity = 100

agent4.sources.netsource.channels = memorychannel
agent4.sinks.hdfssink.channel = memorychannel

    下面启动agent4以及查看一下结果。 

      技术分享

       下面查看一下结果。发现在HDFS中已经新建了一个flume文件夹,并且已经写入了指定的内容。

      技术分享

      接着我们为文件夹名加一个时间戳。详细看如下配置代码。

agent5.sources = netsource
agent5.sinks = hdfssink
agent5.channels = memorychannel

agent5.sources.netsource.type = netcat
agent5.sources.netsource.bind = localhost
agent5.sources.netsource.port = 3000
agent5.sources.netsource.interceptors = ts
agent5.sources.netsource.interceptors.ts.type = org.apache.flume.interceptor.TimestampInterceptor$Builder //引用这个类方法添加时间戳


agent5.sinks.hdfssink.type = hdfs
agent5.sinks.hdfssink.hdfs.path = /flume-%Y-%m-%d //定义文件夹名格式
agent5.sinks.hdfssink.hdfs.filePrefix = log-
agent5.sinks.hdfssink.hdfs.rollInterval = 0
agent5.sinks.hdfssink.hdfs.rollCount = 3
agent5.sinks.hdfssink.hdfs.fileType = DataStream

agent5.channels.memorychannel.type = memory
agent5.channels.memorychannel.capacity = 1000
agent5.channels.memorychannel.transactionCapacity = 100

agent5.sources.netsource.channels = memorychannel
agent5.sinks.hdfssink.channel = memorychannel

  OK,下面启动agent5。

     技术分享

     下面查看一下结果。可以看到文件夹的名字被如期地加上了日期。

     技术分享

     技术分享

     OK,先到这里,还有两个稍微复杂一点点的agent下次再讨论。本人水平有限,请各位不吝指正!谢谢!

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。