flume学习安装
最近项目组有需求点击流日志需要自己收集,学习了一下flume并且安装成功了。相关信息记录一下。
1)下载flume1.5版本
wget http://archive.apache.org/dist/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gz
(注意:closer.cgi 地址返回的是镜像选择页面(HTML),不能直接用 wget 下载安装包,应使用上面这样的直接下载地址。)
2) 解压flume1.5
tar -zxvf apache-flume-1.5.0.1-bin.tar.gz
3) 配置环境变量
jdk已装
export FLUME_HOME=/XXX/XX/apache-flume-1.5.0.1-bin
export PATH=$FLUME_HOME/bin:$PATH
4) 配置conf相关文件
4.1) 配置flume-env.sh 主要设置一下JAVA_HOME
4.2) 配置log4j.properties
如果是测试环境,注释掉 flume.root.logger=INFO,LOGFILE,改用 flume.root.logger=DEBUG,console,把日志打印到控制台。
4.3) 配置flume-conf.properties 这个文件名可以随便改 运行命令时指定你自己创建的属性文件即可
#set agent 名字为a1 sources名字为r1 sinks名字为k1 channels名字为c1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources组件类型为exec 执行linux命令
a1.sources.r1.type = exec
# 注意:大写的 tail -F 与小写的 tail -f 有很大区别(-F 按文件名跟踪,文件被轮转或重建后会自动重新打开),这解决了我们一个大问题
a1.sources.r1.command = tail -F /home/hadoop/flume/flume/conf/source.txt
# sinks组件类型为logger
a1.sinks.k1.type = logger
# channels组件类型为内存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 把sources、sinks与管道连通起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5) 在flume目录下运行命令
bin/flume-ng agent -n a1 -f test/source-tail-sink-logger.properties --conf conf
初步的例子完成了。目前我们的生产环境有两个节点往metaq里面生产数据,为此需要为metaq自定义一个sink(自定义sink的代码见后面)。
记得把metaq相关jar放到flume/lib下 gecko-1.1.4.jar metamorphosis-client-1.4.6.2.jar metamorphosis-commons-1.4.6.2.jar zkclient-0.3.jar zookeeper-3.4.3.jar
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/flume/flume/conf/source.txt
a1.sinks.k1.type = com.XX.flume.sink.MetaQSink
a1.sinks.k1.sink.zkConnect = 0.0.0.0:2181,0.0.0.0:2181,0.0.0.0:2181
# zkRoot 此目录必须写死为 /meta
a1.sinks.k1.sink.zkRoot = /meta
a1.sinks.k1.sink.topic = XXXX
a1.sinks.k1.sink.batchSize = 20000
#a1.channels.c1.type = memory
#a1.channels.c1.capacity = 1000000
#a1.channels.c1.transactionCapacity = 100000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop/flume/flume/channel/checkpoint
a1.channels.c1.dataDirs = /home/hadoop/flume/flume/channel/data
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
自定义sink代码
package com.jd.flume.sink;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Flume sink that forwards the events collected by Flume to a MetaQ message queue.
 *
 * <p>Configuration keys read in {@link #configure(Context)}:
 * <ul>
 *   <li>{@code sink.zkConnect} - ZooKeeper connect string (required)</li>
 *   <li>{@code sink.zkRoot} - ZooKeeper root path handed to the MetaQ client</li>
 *   <li>{@code sink.topic} - MetaQ topic the events are published to (required)</li>
 *   <li>{@code sink.batchSize} - maximum events taken per worker per call, default 10000</li>
 *   <li>{@code sink.threadNum} - number of worker tasks started per call, default 50</li>
 * </ul>
 *
 * <p>----------------------------
 * Name: 毛祥溢
 * Email: [email protected]
 * Website: www.maoxiangyi.cn
 */
public class MetaQSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MetaQSink.class);

    private MessageSessionFactory sessionFactory;
    private MessageProducer producer;
    private String zkConnect;
    private String zkRoot;
    private String topic;
    private int batchSize;
    private int threadNum;
    private ExecutorService executor;

    public MetaQSink() {
    }

    /**
     * Reads the sink settings, then creates the MetaQ session factory, the producer
     * and the worker thread pool.
     *
     * @param context Flume context carrying the {@code sink.*} properties
     * @throws IllegalArgumentException if a required setting is missing or a size is not positive
     * @throws RuntimeException if the MetaQ session factory cannot be created (cause preserved)
     */
    @Override
    public void configure(Context context) {
        this.zkConnect = context.getString("sink.zkConnect");
        this.zkRoot = context.getString("sink.zkRoot");
        this.topic = context.getString("sink.topic");
        this.batchSize = context.getInteger("sink.batchSize", 10000);
        this.threadNum = context.getInteger("sink.threadNum", 50);

        // Fail fast on unusable settings instead of failing later inside a worker thread.
        if (zkConnect == null || zkConnect.trim().isEmpty()) {
            throw new IllegalArgumentException("sink.zkConnect must be configured");
        }
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("sink.topic must be configured");
        }
        if (batchSize < 1 || threadNum < 1) {
            throw new IllegalArgumentException(
                    "sink.batchSize and sink.threadNum must be positive, got batchSize="
                            + batchSize + ", threadNum=" + threadNum);
        }

        executor = Executors.newCachedThreadPool();

        MetaClientConfig metaClientConfig = new MetaClientConfig();
        ZkUtils.ZKConfig zkConfig = new ZkUtils.ZKConfig();
        zkConfig.zkConnect = zkConnect;
        zkConfig.zkRoot = zkRoot;
        metaClientConfig.setZkConfig(zkConfig);

        try {
            sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
        } catch (MetaClientException e) {
            logger.error("Failed to create MetaQ session factory", e);
            // Keep the original exception as the cause so the root failure is not lost.
            throw new RuntimeException("init error", e);
        }
        producer = sessionFactory.createProducer();
        logger.info("zkConnect:" + zkConnect + ", zkRoot:" + zkRoot + ", topic:" + topic);
    }

    /**
     * Starts {@code threadNum} worker tasks, each of which moves up to {@code batchSize}
     * events from the channel to MetaQ inside its own channel transaction, and waits
     * for all of them to finish.
     *
     * @return {@link Status#BACKOFF} when no event was delivered by any worker,
     *         {@link Status#READY} otherwise
     * @throws EventDeliveryException declared by the sink contract; not thrown here
     */
    @Override
    public Status process() throws EventDeliveryException {
        long start = System.currentTimeMillis();
        producer.publish(topic);

        final Channel channel = getChannel();
        final AtomicInteger delivered = new AtomicInteger(0);
        final CountDownLatch done = new CountDownLatch(threadNum);

        for (int t = 0; t < threadNum; t++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        delivered.addAndGet(deliverBatch(channel));
                    } finally {
                        // Always release the latch, even if the batch failed unexpectedly.
                        done.countDown();
                    }
                }
            });
        }

        try {
            done.await();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for MetaQ sink workers", e);
        }

        int processed = delivered.get();
        // The old format string had a third "queue_size:{}" placeholder with no argument.
        logger.info("metaqSink_new,process:{},time:{}", processed,
                System.currentTimeMillis() - start);
        return processed == 0 ? Status.BACKOFF : Status.READY;
    }

    /**
     * Releases the worker pool, the producer and the session factory.
     * NOTE(review): assumes the sink is not restarted after stop() without configure()
     * being called again - confirm against how the agent manages this sink.
     */
    @Override
    public synchronized void stop() {
        if (executor != null) {
            executor.shutdown();
        }
        try {
            if (producer != null) {
                producer.shutdown();
            }
            if (sessionFactory != null) {
                sessionFactory.shutdown();
            }
        } catch (Exception e) {
            // Best effort: a failure while closing must not prevent the sink from stopping.
            logger.warn("Error while shutting down MetaQ client", e);
        }
        super.stop();
    }

    /**
     * Takes up to {@code batchSize} events from the channel in one channel transaction
     * and sends them to MetaQ in one producer transaction.
     *
     * @param channel channel to take events from
     * @return number of events delivered and committed; 0 if the batch was rolled back
     */
    private int deliverBatch(Channel channel) {
        Transaction transaction = null;
        boolean producerTxOpen = false;
        try {
            transaction = channel.getTransaction();
            transaction.begin();

            int taken = 0;
            while (taken < batchSize) {
                Event event = channel.take();
                if (event == null) {
                    // Channel is empty; commit whatever has been taken so far.
                    break;
                }
                if (!producerTxOpen) {
                    // Open the producer transaction lazily, only once there is something to send.
                    producer.beginTransaction();
                    producerTxOpen = true;
                }
                SendResult sendResult = producer.sendMessage(new Message(topic, event.getBody()));
                if (!sendResult.isSuccess()) {
                    logger.error("Send message failed,error message:"
                            + sendResult.getErrorMessage());
                    throw new RuntimeException("Send message failed,error message:"
                            + sendResult.getErrorMessage());
                }
                logger.debug("Send message successfully,sent to " + sendResult.getPartition());
                taken++;
            }

            if (producerTxOpen) {
                producer.commit();
                // Committed: a later failure must not try to roll this transaction back.
                producerTxOpen = false;
            }
            transaction.commit();
            return taken;
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            logger.error("Failed to deliver batch to MetaQ, rolling back", ex);
            if (producerTxOpen) {
                try {
                    producer.rollback();
                } catch (Exception e) {
                    logger.error("Failed to roll back MetaQ producer transaction", e);
                }
            }
            if (transaction != null) {
                try {
                    transaction.rollback();
                } catch (Exception e) {
                    logger.error("Failed to roll back channel transaction", e);
                }
            }
            return 0;
        } finally {
            // transaction stays null when channel.getTransaction() itself failed.
            if (transaction != null) {
                transaction.close();
            }
        }
    }
}
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。