【Flume】自定义sink kafka,并编译打包jar,unapproval license的问题解决
如图所示,新建一个JAVA工程,编辑pom文件,pom文件内容如下【这里取出了parent】:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-ng-kafka-sink</artifactId> <name>Flume Kafka Sink</name> <version>1.0.0</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-sdk</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-configuration</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1.1</version> </dependency> </dependencies> </project>这里取出了parent,也取出了rat plugin,这样就避免了编译时出现的常见错误https://issues.apache.org/jira/browse/FLUME-1372
自定义sink实现需要继承AbstractSink和实现接口Configurable,并重写部分方法,如下:
package com.cmcc.chiwei.kafka; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; public class CmccKafkaSink extends AbstractSink implements Configurable { private static final Logger log = LoggerFactory .getLogger(CmccKafkaSink.class); public static final String KEY_HDR = "key"; public static final String TOPIC_HDR = "topic"; private static final String CHARSET = "UTF-8"; private Properties kafkaProps; private Producer<String, byte[]> producer; private String topic; private int batchSize;// 一次事务的event数量,整体提交 private List<KeyedMessage<String, byte[]>> messageList; @Override public Status process() throws EventDeliveryException { // TODO Auto-generated method stub Status result = Status.READY; Channel channel = getChannel(); Transaction transaction = null; Event event = null; String eventTopic = null; String eventKey = null; try { long processedEvent = 0; transaction = channel.getTransaction(); transaction.begin();// 事务开始 messageList.clear(); for (; processedEvent < batchSize; processedEvent++) { event = channel.take();// 从channel取出一个事件 if (event == null) { break; } // Event对象有头和体之分 Map<String, String> headers = event.getHeaders(); byte[] eventBody = event.getBody(); if ((eventTopic = headers.get(TOPIC_HDR)) == null) {// 判断event头部中的topic是否为null eventTopic = topic; } eventKey = headers.get(KEY_HDR); if (log.isDebugEnabled()) { log.debug("{Event}" + eventTopic + ":" + eventKey + ":" + new String(eventBody, CHARSET)); log.debug("event #{}", processedEvent); } KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>( eventTopic, eventKey, eventBody); messageList.add(data); } if (processedEvent > 0) { producer.send(messageList); } transaction.commit();// batchSize个事件处理完成,一次事务提交 } catch (Exception e) { String errorMsg = "Failed to publish events !"; log.error(errorMsg, e); result = Status.BACKOFF; if (transaction != null) { try { transaction.rollback(); log.debug("transaction rollback success !"); } catch (Exception ex) { log.error(errorMsg, ex); throw Throwables.propagate(ex); } } throw new EventDeliveryException(errorMsg, e); } finally { if (transaction != null) { transaction.close(); } } return result; } @Override public synchronized void start() { // TODO Auto-generated method stub ProducerConfig config = new ProducerConfig(kafkaProps); producer = new Producer<String, byte[]>(config); super.start(); } @Override public synchronized void stop() { // TODO Auto-generated method stub producer.close(); super.stop(); } @Override public void configure(Context context) { // TODO Auto-generated method stub batchSize = context.getInteger(Constants.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE); messageList = new ArrayList<KeyedMessage<String, byte[]>>(batchSize); log.debug("Using batch size: {}", batchSize); topic = context.getString(Constants.TOPIC, Constants.DEFAULT_TOPIC); if (topic.equals(Constants.DEFAULT_TOPIC)) { log.warn("The property 'topic' is not set . Using the default topic name [" + Constants.DEFAULT_TOPIC + "]"); } else { log.info("Using the configured topic:[" + topic + "] this may be over-ridden by event headers"); } kafkaProps = KafkaUtil.getKafkaConfig(context); if (log.isDebugEnabled()) { log.debug("Kafka producer properties : " + kafkaProps); } } }然后mvn clean install编译打包jar,将此jar包丢到flume安装目录的lib下就可以了,下面就是编辑conf文件了
当然conf文件中具体属性的key跟你自定义sink中的属性是一致的,自定义中读的key就是你配置文件中的key
如:
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink producer.sinks.r.metadata.broker.list=127.0.0.1:9092 producer.sinks.r.partition.key=0 producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=0 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=async producer.sinks.r.custom.encoding=UTF-8 producer.sinks.r.custom.topic.name=testToptic
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。