【Flume】Writing a custom Kafka sink, building and packaging the jar, and fixing the "unapproved license" build error


Create a new Java project and edit its pom file; the pom content is as follows (the parent has been removed here):

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <groupId>org.apache.flume.flume-ng-sinks</groupId>
  <artifactId>flume-ng-kafka-sink</artifactId>
  <name>Flume Kafka Sink</name>
  <version>1.0.0</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
    </dependency>
  </dependencies>

</project>
Both the parent and the rat plugin have been removed here, which avoids the common build-time "unapproved license" error described at https://issues.apache.org/jira/browse/FLUME-1372.
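If you would rather keep the Apache parent POM, the RAT check can usually be skipped instead by building with mvn clean install -Drat.skip=true (rat.skip is the standard skip property of the apache-rat-plugin).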

A custom sink needs to extend AbstractSink and implement the Configurable interface, overriding a few methods, as follows:

package com.cmcc.chiwei.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;

public class CmccKafkaSink extends AbstractSink implements Configurable {

	private static final Logger log = LoggerFactory
			.getLogger(CmccKafkaSink.class);

	public static final String KEY_HDR = "key";
	public static final String TOPIC_HDR = "topic";
	private static final String CHARSET = "UTF-8";
	private Properties kafkaProps;
	private Producer<String, byte[]> producer;
	private String topic;
	private int batchSize;// number of events per transaction, committed as a single batch
	private List<KeyedMessage<String, byte[]>> messageList;

	@Override
	public Status process() throws EventDeliveryException {
		Status result = Status.READY;
		Channel channel = getChannel();
		Transaction transaction = null;
		Event event = null;
		String eventTopic = null;
		String eventKey = null;
		try {
			long processedEvent = 0;
			transaction = channel.getTransaction();
			transaction.begin();// 事务开始
			messageList.clear();
			for (; processedEvent < batchSize; processedEvent++) {
				event = channel.take();// take one event from the channel
				if (event == null) {
					break;
				}
				// an Event consists of headers and a body
				Map<String, String> headers = event.getHeaders();
				byte[] eventBody = event.getBody();
				if ((eventTopic = headers.get(TOPIC_HDR)) == null) {// fall back to the configured default topic when the event header sets none
					eventTopic = topic;
				}
				eventKey = headers.get(KEY_HDR);

				if (log.isDebugEnabled()) {
					log.debug("{Event}" + eventTopic + ":" + eventKey + ":"
							+ new String(eventBody, CHARSET));
					log.debug("event #{}", processedEvent);
				}

				KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(
						eventTopic, eventKey, eventBody);
				messageList.add(data);

			}
			if (processedEvent > 0) {
				producer.send(messageList);
			} else {
				// the channel was empty; back off so the sink runner does not spin
				result = Status.BACKOFF;
			}
			transaction.commit();// commit once per batch of up to batchSize events

		} catch (Exception e) {
			String errorMsg = "Failed to publish events !";
			log.error(errorMsg, e);
			result = Status.BACKOFF;
			if (transaction != null) {
				try {
					transaction.rollback();
					log.debug("transaction rollback success !");
				} catch (Exception ex) {
					log.error(errorMsg, ex);
					throw Throwables.propagate(ex);
				}
			}
			throw new EventDeliveryException(errorMsg, e);
		} finally {
			if (transaction != null) {
				transaction.close();
			}
		}
		return result;
	}

	@Override
	public synchronized void start() {
		ProducerConfig config = new ProducerConfig(kafkaProps);
		producer = new Producer<String, byte[]>(config);
		super.start();
	}

	@Override
	public synchronized void stop() {
		if (producer != null) {
			producer.close();
		}
		super.stop();
	}

	@Override
	public void configure(Context context) {
		batchSize = context.getInteger(Constants.BATCH_SIZE,
				Constants.DEFAULT_BATCH_SIZE);
		messageList = new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
		log.debug("Using batch size: {}", batchSize);
		topic = context.getString(Constants.TOPIC, Constants.DEFAULT_TOPIC);
		if (topic.equals(Constants.DEFAULT_TOPIC)) {
			log.warn("The property 'topic' is not set. Using the default topic name ["
					+ Constants.DEFAULT_TOPIC + "]");
		} else {
			log.info("Using the configured topic: [" + topic
					+ "]; this may be overridden by event headers");
		}
		kafkaProps = KafkaUtil.getKafkaConfig(context);
		if (log.isDebugEnabled()) {
			log.debug("Kafka producer properties : " + kafkaProps);
		}

	}

}
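The sink above also references two helper classes, Constants and KafkaUtil, which the post does not show. A minimal sketch of what they could look like is given below; the key names and defaults are assumptions chosen to line up with the example conf further down, not the original author's code:

// Constants.java (hypothetical -- adjust the key names to match your conf)
package com.cmcc.chiwei.kafka;

public class Constants {
	public static final String BATCH_SIZE = "batchSize";
	public static final int DEFAULT_BATCH_SIZE = 100;
	public static final String TOPIC = "custom.topic.name";
	public static final String DEFAULT_TOPIC = "default-flume-topic";
}

// KafkaUtil.java (hypothetical)
package com.cmcc.chiwei.kafka;

import java.util.Map;
import java.util.Properties;

import org.apache.flume.Context;

public class KafkaUtil {
	// Copies every property of the sink's Flume context into the Properties
	// object handed to the Kafka producer (metadata.broker.list,
	// serializer.class, request.required.acks, and so on).
	public static Properties getKafkaConfig(Context context) {
		Properties props = new Properties();
		for (Map.Entry<String, String> entry : context.getParameters().entrySet()) {
			props.put(entry.getKey(), entry.getValue());
		}
		return props;
	}
}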
Then run mvn clean install to compile and package the jar, drop the jar into the lib directory of your Flume installation, and move on to editing the conf file.
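Newer Flume releases (1.4 and up) also support the plugins.d layout: instead of dropping the jar straight into lib, you can place it under $FLUME_HOME/plugins.d/kafka-sink/lib/ and Flume will add it to the classpath at startup.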

Naturally, the property keys in the conf file must match the ones your custom sink reads: whatever key configure() reads is exactly the key you set in the configuration file.

For example:

producer.sinks.r.type = com.cmcc.chiwei.kafka.CmccKafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=kafka.producer.DefaultPartitioner
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=async
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=testTopic
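The snippet above only configures the sink; a runnable agent also needs a source and a channel wired to it. A minimal sketch, assuming an agent named producer with an exec source and a memory channel (both illustrative choices, not from the original post; the batchSize key assumes the Constants sketch above):

producer.sources = s
producer.channels = c
producer.sinks = r

producer.sources.s.type = exec
producer.sources.s.command = tail -F /var/log/test.log
producer.sources.s.channels = c

producer.channels.c.type = memory
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 1000

producer.sinks.r.channel = c
producer.sinks.r.batchSize = 100

Make sure the channel's transactionCapacity is at least as large as the sink's batchSize, otherwise channel.take() can fail mid-transaction. Then start the agent with: bin/flume-ng agent -c conf -f conf/producer.conf -n producer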
