一个spark receiver 或多个spark receiver 接收 多个flume agent
- 利用一个spark receiver 接收多个flume agent
String host = args[0];
int port = Integer.parseInt(args[1]);
String host1 = args[2];
int port1 = Integer.parseInt(args[3]);
InetSocketAddress address1 = new InetSocketAddress(host,port);
InetSocketAddress address2 = new InetSocketAddress(host1,port1);
InetSocketAddress[] InetSocketAddressArray = {address1,address2};
JavaStreamingContext jssc = new JavaStreamingContext(new SparkConf().setAppName("JavaFlumeEventHandle_1"), Durations.seconds(2));
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(jssc, InetSocketAddressArray, StorageLevel.MEMORY_AND_DISK_SER_2());
- 2个spark receiver接收多个flume agent
String host = args[0];
int port = Integer.parseInt(args[1]);
String host1 = args[2];
int port1 = Integer.parseInt(args[3]);
InetSocketAddress address1 = new InetSocketAddress(host,port);
InetSocketAddress address2 = new InetSocketAddress(host1,port1);
InetSocketAddress[] InetSocketAddressArray = {address1,address2};
JavaStreamingContext jssc = new JavaStreamingContext(new SparkConf().setAppName("JavaFlumeEventHandle_1"), Durations.seconds(2));
// JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(jssc, InetSocketAddressArray, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(jssc,host,port);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream1 = FlumeUtils.createPollingStream(jssc, host1, port1);
JavaDStream<SparkFlumeEvent> union = flumeStream.union(flumeStream1);
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。