Flume Custom Source

 Hello everyone.

 

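    My company has a requirement: Flume must pull messages from an MQ and store them in DFS, so I wrote a custom Flume source. I am new to Flume myself, so please bear with me if anything here is wrong.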

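    I read through the Flume-ng source code. Depending on the scenario, a custom source generally extends AbstractSource and implements either EventDrivenSource or PollableSource, plus Configurable. This source drives itself from its own thread, so it is declared as: extends AbstractSource implements EventDrivenSource, Configurable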

 

   The MQSource code is as follows:

  

import java.util.List;

import org.apache.activemq.ActiveMQConnection;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// MQContext, MQConfig, MQFactory, MQReceiver and HandleLineCallBack are my own classes (not shown here).
public class MQSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MQSource.class);

    private long heartbeat;

    private MQReceiver receiver;

    private HandleLineCallBack handle;

    private Thread t;

    @Override
    public void configure(Context context) {

        // Every setting falls back to a default when it is missing from flume.conf.
        String mq_url = context.getString(MQContext.MQ_BROKER_URI, ActiveMQConnection.DEFAULT_BROKER_URL);
        String mq_userName = context.getString(MQContext.MQ_USERNAME, ActiveMQConnection.DEFAULT_USER);
        String mq_password = context.getString(MQContext.MQ_PASSWORD, ActiveMQConnection.DEFAULT_PASSWORD);
        String mq_queueKey = context.getString(MQContext.MQ_QUEUEKEY, "NULL");
        String handleClass = context.getString(MQContext.HANDLECLASS, "NULL");

        long mq_reciveTimeout = context.getLong(MQContext.MQ_RECIVETIMEOUT, 3000L);
        this.heartbeat = context.getLong(MQContext.HEARTBEAT, 3000L);

        if ("NULL".equals(mq_queueKey)) {
            // Without a queue key there is nothing to read; start() refuses to run in this case.
            logger.error("{} : Unable to load MQ_queueKey", getName());
            return;
        }
        if ("NULL".equals(handleClass)) {
            logger.warn("{} : No handleClass configured, using DefaultHandle", getName());
            handleClass = "com.bidlink.handle.DefaultHandle";
        }

        MQConfig mqconfig = new MQConfig(mq_url, mq_userName, mq_password, mq_queueKey, mq_reciveTimeout);
        logger.info("{} MQ Configuration : {}", getName(), mqconfig.toString());
        receiver = MQFactory.MQ.getReceiver(mqconfig);
        logger.info("{} : receiver for key {} is {}", getName(), mqconfig.getQueueKey(), receiver);

        // Load the line handler by class name. Passing the exception as the last
        // argument (with no placeholder for it) makes slf4j log the stack trace.
        try {
            handle = (HandleLineCallBack) Class.forName(handleClass).newInstance();
        } catch (ClassNotFoundException e) {
            logger.error("{} : unable to load class {}", getName(), handleClass, e);
        } catch (InstantiationException e1) {
            logger.error("{} : unable to instantiate class {}", getName(), handleClass, e1);
        } catch (IllegalAccessException e2) {
            logger.error("{} : unable to access class {}", getName(), handleClass, e2);
        }
    }

    @Override
    public synchronized void start() {
        logger.info("MQSource starting...");
        if (receiver == null || handle == null) {
            logger.error("{} : not configured correctly, refusing to start", getName());
            return;
        }
        t = new Thread("MQSource-" + getName()) {
            @Override
            public void run() {
                // Poll until stop() interrupts this thread.
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        List<String> lines = receiver.getText();
                        for (String line : lines) {
                            Event e = new SimpleEvent();
                            String refStr = handle.refactor(line);
                            e.setBody(refStr.getBytes("GBK"));
                            getChannelProcessor().processEvent(e);
                        }
                        Thread.sleep(heartbeat);
                    } catch (InterruptedException ie) {
                        // Restore the flag so the loop condition sees it and exits.
                        Thread.currentThread().interrupt();
                    } catch (Exception e1) {
                        logger.error("{} : error while processing MQ messages", MQSource.this.getName(), e1);
                    }
                }
            }
        };
        t.start();
        // Call AbstractSource.start() here, once. Inside the anonymous Thread,
        // super.start() would mean Thread.start() and fail on the running thread.
        super.start();
    }

    @Override
    public synchronized void stop() {
        logger.info("MQSource stopping...");
        if (t != null && t.isAlive()) {
            // Interrupt first, then join. Joining first would wait forever on the polling loop.
            t.interrupt();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        super.stop();
    }

}
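
The handleClass setting makes the line handler pluggable: configure loads whatever class name is given by reflection. Here is a minimal sketch of that idea using only the JDK. LineHandler, PassThroughHandler, UpperCaseHandler and HandlerLoader are hypothetical stand-ins that I made up for illustration, not the classes from my project. Unlike the source above, this sketch falls back to a pass-through handler when loading fails, which is one possible design choice rather than what MQSource does.

```java
// Hypothetical stand-in for the HandleLineCallBack interface used by MQSource.
interface LineHandler {
    String refactor(String line);
}

// Fallback handler: returns the line unchanged.
class PassThroughHandler implements LineHandler {
    @Override
    public String refactor(String line) {
        return line;
    }
}

// Example custom handler: trims the line and upper-cases it.
class UpperCaseHandler implements LineHandler {
    @Override
    public String refactor(String line) {
        return line.trim().toUpperCase();
    }
}

class HandlerLoader {
    // Loads a handler by class name. Never returns null: any failure
    // (unknown class, wrong type, no usable constructor) yields the fallback.
    static LineHandler load(String className) {
        if (className == null || className.isEmpty()) {
            return new PassThroughHandler();
        }
        try {
            Class<? extends LineHandler> clazz =
                    Class.forName(className).asSubclass(LineHandler.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            System.err.println("Cannot load handler " + className + ": " + e);
            return new PassThroughHandler();
        }
    }

    public static void main(String[] args) {
        LineHandler custom = load(UpperCaseHandler.class.getName());
        System.out.println(custom.refactor("  hello "));
        LineHandler fallback = load("no.such.Handler");
        System.out.println(fallback.refactor("hello"));
    }
}
```

Checking the type with asSubclass up front means a wrong class name in the configuration is reported at load time instead of surfacing later as a ClassCastException in the polling loop.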

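  The key code in the start method builds an event, sets its body, and hands it to the channel processor. super.start() here means AbstractSource.start(); it belongs in start() itself, not inside the worker thread, where super would refer to Thread.

  Event e = new SimpleEvent();
  e.setBody("hello everyone ".getBytes("GBK"));
  getChannelProcessor().processEvent(e);

  super.start();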
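
The start and stop methods of a source like this come down to a polling worker thread that must be interrupted before it is joined. Below is a small sketch of that lifecycle using only the JDK. PollingWorker and its methods are hypothetical names for illustration, and the list of strings stands in for the events that a real source would pass to the channel processor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class PollingWorker {
    private final long heartbeatMillis;
    // Stands in for the events a real source would pass to the channel processor.
    private final List<String> delivered = new CopyOnWriteArrayList<>();
    private Thread thread;

    PollingWorker(long heartbeatMillis) {
        this.heartbeatMillis = heartbeatMillis;
    }

    synchronized void start() {
        thread = new Thread(() -> {
            int n = 0;
            // Loop until stop() interrupts this thread.
            while (!Thread.currentThread().isInterrupted()) {
                delivered.add("message-" + n++);
                try {
                    Thread.sleep(heartbeatMillis);
                } catch (InterruptedException e) {
                    // sleep() clears the flag when it throws; set it again so the loop exits.
                    Thread.currentThread().interrupt();
                }
            }
        }, "polling-worker");
        thread.start();
    }

    synchronized void stop() {
        if (thread == null) {
            return;
        }
        thread.interrupt();      // wake the worker first ...
        try {
            thread.join();       // ... then wait for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        return thread != null && thread.isAlive();
    }

    List<String> delivered() {
        return delivered;
    }

    // Convenience for a quick check: run for a while, then stop.
    static PollingWorker runFor(long heartbeatMillis, long runMillis) {
        PollingWorker worker = new PollingWorker(heartbeatMillis);
        worker.start();
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        worker.stop();
        return worker;
    }

    public static void main(String[] args) {
        PollingWorker worker = runFor(10, 100);
        System.out.println("running after stop: " + worker.isRunning());
        System.out.println("messages delivered: " + worker.delivered().size());
    }
}
```

The order in stop matters: calling join before interrupt would block forever, because nothing else ever asks the loop to end.
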
The context passed to the configure method gives access to any custom configuration. For example, with the following settings in flume.conf:

tier1.sources.testSources.type = org.yunjume.source.MQSource
tier1.sources.testSources.MQ_userName = admin
tier1.sources.testSources.MQ_password = admin123
tier1.sources.testSources.MQ_brokerURL = tcp://localhost:61616
tier1.sources.testSources.MQ_queueKey = FirstQueue
tier1.sources.testSources.MQ_reciveTimeout = 30000
tier1.sources.testSources.heartbeat = 30000
# Class that processes each line read from the MQ queue and returns the new line.
tier1.sources.testSources.handleClass = org.yunjume.handle.DefaultHandle
tier1.sources.testSources.channels = testChannels

 



The code that reads the MQ_userName value is:
String mq_userName = context.getString("MQ_userName", ActiveMQConnection.DEFAULT_USER);
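
To make the lookup less magical: flume.conf is a Java properties file, and as far as I understand Flume hands each source only the keys under its own prefix, with the prefix stripped. The sketch below imitates that with the plain JDK. SourceSettings is a hypothetical stand-in that I wrote for illustration; it is not Flume's Context class, it only mirrors the getString and getLong calls with defaults used in configure.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for the Context that Flume passes to configure().
class SourceSettings {
    private final Map<String, String> values = new HashMap<>();

    // Parses properties text and keeps only the keys under the given prefix,
    // stored without the prefix (so "...testSources.heartbeat" becomes "heartbeat").
    static SourceSettings parse(String confText, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        SourceSettings settings = new SourceSettings();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                String key = name.substring(prefix.length());
                settings.values.put(key, props.getProperty(name).trim());
            }
        }
        return settings;
    }

    // Returns the configured value, or the default when the key is absent.
    String getString(String key, String defaultValue) {
        String value = values.get(key);
        return value != null ? value : defaultValue;
    }

    // Same as getString, but parsed as a long.
    long getLong(String key, long defaultValue) {
        String value = values.get(key);
        return value != null ? Long.parseLong(value) : defaultValue;
    }

    public static void main(String[] args) {
        String conf =
                "tier1.sources.testSources.MQ_userName= admin\n"
              + "tier1.sources.testSources.MQ_brokerURL=tcp://localhost:61616\n"
              + "tier1.sources.testSources.heartbeat=30000\n";
        SourceSettings s = parse(conf, "tier1.sources.testSources.");
        System.out.println(s.getString("MQ_userName", "guest"));
        System.out.println(s.getLong("heartbeat", 3000L));
        System.out.println(s.getLong("MQ_reciveTimeout", 3000L)); // not set, default used
    }
}
```
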

 The stop method just shuts things down: it stops the worker thread and calls super.stop().

 

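 Package the source as a jar and put it in the plugin directory under the Flume home directory. Mine is /usr/lib/flume-ng/plugins.d

 If the jar is named MQSource.jar, create a folder named MQSource in plugins.d and put MQSource.jar under MQSource/lib.

 Put the jars it depends on under MQSource/libext. The directory layout looks like this:

 /usr/lib/flume-ng/plugins.d/MQSource/lib/MQSource.jar

 /usr/lib/flume-ng/plugins.d/MQSource/libext/   dependency jars

 /usr/lib/flume-ng/plugins.d/MQSource/native   native .so or .dll files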
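
If you would rather script the layout than create the folders by hand, the sketch below builds the same three sub-directories with java.nio. PluginLayout is a hypothetical helper for illustration. The demo deliberately works in a temporary directory instead of /usr/lib/flume-ng/plugins.d, so it is safe to run anywhere; for a real install, pass your own plugins.d path to create.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class PluginLayout {
    // Creates <pluginsDir>/<pluginName>/{lib,libext,native} and returns the plugin directory.
    static Path create(Path pluginsDir, String pluginName) {
        Path pluginDir = pluginsDir.resolve(pluginName);
        try {
            Files.createDirectories(pluginDir.resolve("lib"));     // the source jar itself
            Files.createDirectories(pluginDir.resolve("libext"));  // jars it depends on
            Files.createDirectories(pluginDir.resolve("native"));  // .so or .dll files
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return pluginDir;
    }

    // Demo helper: builds the layout under a fresh temporary directory.
    static Path createInTemp(String pluginName) {
        try {
            return create(Files.createTempDirectory("plugins.d"), pluginName);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path pluginDir = createInTemp("MQSource");
        System.out.println("source jar goes to:  " + pluginDir.resolve("lib").resolve("MQSource.jar"));
        System.out.println("dependencies go to:  " + pluginDir.resolve("libext"));
        System.out.println("native libs go to:   " + pluginDir.resolve("native"));
    }
}
```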
