Flume: one data source feeding multiple channels and multiple sinks
I. Overview
1. There are three machines, Hadoop1, Hadoop2, and Hadoop3, with Hadoop1 acting as the log consolidation point.
2. While consolidating, Hadoop1 also writes the logs out to multiple targets at the same time.
3. The fan-out of one Flume source into multiple channels and multiple sinks is configured in the consolidation-accepter.conf file, as sketched just below.
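Before the full deployment, here is a minimal sketch of the fan-out mechanism itself. The agent and component names (a1, r1, c1, c2, k1, k2) are illustrative, not taken from the deployment below; the point is that a replicating channel selector copies every event from one source into each listed channel, and each sink drains its own channel.

# Minimal fan-out sketch (illustrative names): one source, two channels, two sinks
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# The replicating selector (Flume's default) copies each event into every channel
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# Each sink reads from exactly one channel
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2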
II. Deploying Flume to collect and consolidate logs
1. Run the following on Hadoop1:
flume-ng agent --conf ./ -f consolidation-accepter.conf -n agent1 -Dflume.root.logger=INFO,console
The contents of its configuration file (consolidation-accepter.conf) are as follows:
# Tell agent1 which components to activate
agent1.channels = ch1 ch2
agent1.sources = source1
agent1.sinks = hdfssink1 sink2

# Replicate every event from source1 into both channels;
# ch1 is marked optional, so a failed write to ch1 will not fail the event
agent1.sources.source1.selector.type = replicating
agent1.sources.source1.selector.optional = ch1

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 1000000
agent1.channels.ch1.keep-alive = 10

# Define a second memory channel called ch2 on agent1
agent1.channels.ch2.type = memory
agent1.channels.ch2.capacity = 1000000
agent1.channels.ch2.transactionCapacity = 100000
agent1.channels.ch2.keep-alive = 10

# Define an Avro source called source1 on agent1, listening on port 44444,
# and connect it to channels ch1 and ch2
# ("con" is the bind address used here; use the consolidation host's address,
# or 0.0.0.0 to listen on all interfaces)
agent1.sources.source1.channels = ch1 ch2
agent1.sources.source1.type = avro
agent1.sources.source1.bind = con
agent1.sources.source1.port = 44444
agent1.sources.source1.threads = 5

# HDFS sink: drain ch1 into time-bucketed directories on HDFS
agent1.sinks.hdfssink1.channel = ch1
agent1.sinks.hdfssink1.type = hdfs
agent1.sinks.hdfssink1.hdfs.path = hdfs://mycluster/flume/%Y-%m-%d/%H%M
agent1.sinks.hdfssink1.hdfs.filePrefix = S1PA124-consolidation-accesslog-%H-%M-%S
agent1.sinks.hdfssink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfssink1.hdfs.writeFormat = Text
agent1.sinks.hdfssink1.hdfs.fileType = DataStream
# Roll a new file every 1800 s, or once the current file reaches 5073741824 bytes
agent1.sinks.hdfssink1.hdfs.rollInterval = 1800
agent1.sinks.hdfssink1.hdfs.rollSize = 5073741824
agent1.sinks.hdfssink1.hdfs.batchSize = 10000
agent1.sinks.hdfssink1.hdfs.rollCount = 0
agent1.sinks.hdfssink1.hdfs.round = true
agent1.sinks.hdfssink1.hdfs.roundValue = 60
agent1.sinks.hdfssink1.hdfs.roundUnit = minute

# Rolling-file sink: drain ch2 into files under /root/data/flume-logs/
# (the type must be file_roll for the directory/file settings below to take
# effect; a logger sink would only print events to the console)
agent1.sinks.sink2.type = file_roll
agent1.sinks.sink2.sink.batchSize = 10000
agent1.sinks.sink2.sink.batchTimeout = 600000
agent1.sinks.sink2.sink.rollInterval = 1000
agent1.sinks.sink2.sink.directory = /root/data/flume-logs/
agent1.sinks.sink2.sink.fileName = accesslog
agent1.sinks.sink2.channel = ch2
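To verify that the consolidation agent is accepting events, you can optionally send it a few lines with Flume's built-in avro-client before wiring up the collectors. This is just a smoke test: the host value must match the Avro source's bind address above, and /tmp/test.log stands for any local file of sample lines.

flume-ng avro-client --conf ./ --host con --port 44444 --filename /tmp/test.log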
2. Run the following command on both Hadoop2 and Hadoop3 (the agent name must match the agent2 prefix used in collect-send.conf):
flume-ng agent --conf ./ --conf-file collect-send.conf --name agent2
The contents of the Flume sender configuration file collect-send.conf are as follows:
agent2.sources = source2
agent2.sinks = sink1
agent2.channels = ch2

# Source configuration: tail the local log file
agent2.sources.source2.type = exec
agent2.sources.source2.command = tail -F /root/data/flume.log
agent2.sources.source2.channels = ch2

# Channel configuration
agent2.channels.ch2.type = memory
agent2.channels.ch2.capacity = 10000
agent2.channels.ch2.transactionCapacity = 10000
agent2.channels.ch2.keep-alive = 3

# Sink configuration: forward events to the consolidation agent over Avro
# (replace consolidationIpAddress with Hadoop1's actual address)
agent2.sinks.sink1.type = avro
agent2.sinks.sink1.hostname = consolidationIpAddress
agent2.sinks.sink1.port = 44444
agent2.sinks.sink1.channel = ch2
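For the pipeline to carry data end to end, something must be appending to /root/data/flume.log on each collector host. If you just want to exercise the setup, a throwaway shell loop like the one below (not part of the original deployment) is enough:

# Append one timestamped sample line per second to the tailed file (Ctrl-C to stop)
while true; do
  echo "$(date '+%Y-%m-%d %H:%M:%S') sample access log line" >> /root/data/flume.log
  sleep 1
done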
III. Summary
1. Start the Flume consolidation agent:
flume-ng agent --conf ./ -f consolidation-accepter.conf -n agent1 -Dflume.root.logger=INFO,console
2. Start the Flume collection agents:
flume-ng agent --conf ./ --conf-file collect-send.conf --name agent2
3. Notes on the roll parameters (the two conditions below are OR'ed; whichever one is met first triggers the roll):
(1) Roll to a new HDFS file every half hour (1800 seconds), flushing the accumulated events out through the sink:
agent1.sinks.hdfssink1.hdfs.rollInterval = 1800
(2) Roll to a new HDFS file once the current file reaches 5073741824 bytes (about 4.7 GiB):
agent1.sinks.hdfssink1.hdfs.rollSize = 5073741824
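All three of the HDFS sink's roll triggers compose the same way: whichever enabled trigger fires first rolls the file, and setting a trigger to 0 disables it, which is why the configuration above sets hdfs.rollCount = 0. A commented recap:

# Whichever enabled trigger fires first rolls the file; 0 disables a trigger
# roll after 1800 seconds:
agent1.sinks.hdfssink1.hdfs.rollInterval = 1800
# roll once the file reaches ~4.7 GiB:
agent1.sinks.hdfssink1.hdfs.rollSize = 5073741824
# never roll based on event count:
agent1.sinks.hdfssink1.hdfs.rollCount = 0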