[Flume] Rolling HDFS sink files by day at midnight in Flume NG, implemented by modifying the source

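The HDFS sink has a property, hdfs.rollInterval. With hdfs.rollInterval=86400 the file rolls once every 24 hours, and it really does wait a full 24 hours before rolling. What we usually need, though, is for the file to roll at midnight (00:00), because offline jobs are generally scheduled to run at night.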


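If Flume is started at 9 a.m., the HDFS file is not closed until 9 a.m. the next day. Should the job really wait until after 9 a.m. to run? Clearly not, so the source is modified here to make the file roll at midnight.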
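To make the timing problem concrete, here is a minimal sketch of the arithmetic. It is not Flume code: the class and method names are made up for illustration, and the 9 a.m. start date is a hypothetical value. It only compares "open time plus rollInterval" with "the next midnight after open time".

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Illustration only: these names are hypothetical and not part of Flume.
final class RollIntervalDemo {

    /** When a purely interval-based timer closes a file opened at openTime. */
    static LocalDateTime intervalRollTime(LocalDateTime openTime, long rollIntervalSeconds) {
        return openTime.plusSeconds(rollIntervalSeconds);
    }

    /** First midnight strictly after openTime, which is when the nightly jobs want the file closed. */
    static LocalDateTime nextMidnight(LocalDateTime openTime) {
        return openTime.toLocalDate().plusDays(1).atStartOfDay();
    }

    public static void main(String[] args) {
        // Hypothetical agent start: 9 a.m. on some day.
        LocalDateTime open = LocalDateTime.of(2015, 1, 1, 9, 0);
        LocalDateTime byInterval = intervalRollTime(open, 86400);
        LocalDateTime byCalendar = nextMidnight(open);

        System.out.println("interval roll: " + byInterval);
        System.out.println("midnight roll: " + byCalendar);
        System.out.println("extra wait:    " + Duration.between(byCalendar, byInterval));
    }
}
```

With a 9 a.m. start, the interval-based roll lands nine hours after the midnight at which the nightly job would want the file closed.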

First, add a property that can be set to day, hour, or min:

 private String timeRollerFlag;

  timeRollerFlag = context.getString("hdfs.timeroller.flag", Constants.defaultTimeRollerFlagDay);

public class Constants {

	public static final String defaultTimeRollerFlagDay = "day";

	public static final String timeRollerFlagHour = "hour";

	public static final String timeRollerFlagMin = "min";
}
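One thing the patch leaves open is what happens when the property holds a value other than day, hour, or min: the later if/else chain simply never rolls. The following is a sketch of one possible way to validate the flag up front. The enum `RollUnit` and the method `parse` are hypothetical helpers, not part of Flume or of the patch above.

```java
import java.util.Locale;

// Illustration only: RollUnit and parse() are hypothetical, not Flume APIs.
final class TimeRollerFlags {

    enum RollUnit { DAY, HOUR, MIN }

    /**
     * Maps the configured flag to a RollUnit.
     * A missing value falls back to DAY, mirroring the default used in the patch;
     * an unrecognised value fails fast instead of silently disabling rolling.
     */
    static RollUnit parse(String flag) {
        if (flag == null) {
            return RollUnit.DAY;
        }
        switch (flag.trim().toLowerCase(Locale.ROOT)) {
            case "day":
                return RollUnit.DAY;
            case "hour":
                return RollUnit.HOUR;
            case "min":
                return RollUnit.MIN;
            default:
                throw new IllegalArgumentException(
                        "hdfs.timeroller.flag must be day, hour or min, but was: " + flag);
        }
    }
}
```

Given how the property is read from the sink's context, it would presumably be set in the agent configuration as `<agent>.sinks.<sink>.hdfs.timeroller.flag = day`, where the agent and sink names are placeholders.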

When the HDFS sink creates a new BucketWriter, the parameter has to be passed through:

BucketWriter bucketWriter = new BucketWriter(rollInterval,
      rollSize, rollCount,
      batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
      suffix, codeC, compType, hdfsWriter, timedRollerPool,
      proxyTicket, sinkCounter, idleTimeout, closeCallback,
      lookupPath, callTimeout, callTimeoutPool, retryInterval,
      tryCount,timeRollerFlag);

The changes above are all in org.apache.flume.sink.hdfs.HDFSEventSink.

Next, the changes to org.apache.flume.sink.hdfs.BucketWriter:

private final String timeRollerFlag;

	private Calendar calendar = Calendar.getInstance();
	private int lastDayOfYear;
	private int lastYear;
	private int lastHour;
	private int lastMin;
	private int nowDayOfYear;
	private int nowYear;
	private int nowHour;
	private int nowMin;

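// Not static: each BucketWriter tracks the open time of its own file.
// A static field would be shared (and overwritten) across all buckets.
private Date fileOpenTime = null;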
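
In open(), record the open time once the in-use file has been opened (excerpt):

				// the file has just been opened in HDFS with inUseSuffix;
				// record the time it was opened
				fileOpenTime = new Date();
			} catch (Exception ex) {

Then, at the point where the writer checks whether the file is open before writing an event:
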
if (!isOpen) {
			if (closed) {
				throw new BucketClosedException("This bucket writer was closed and "
						+ "this handle is thus no longer valid");
			}
			open();
		} else {
			LOG.debug("##############the file is opened");
			calendar.setTime(fileOpenTime);
			lastDayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
			lastYear = calendar.get(Calendar.YEAR);
			lastHour = calendar.get(Calendar.HOUR_OF_DAY);
			lastMin = calendar.get(Calendar.MINUTE);
			Date now = new Date();
			calendar.setTime(now);
			nowDayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
			nowYear = calendar.get(Calendar.YEAR);
			nowHour = calendar.get(Calendar.HOUR_OF_DAY);
			nowMin = calendar.get(Calendar.MINUTE);
			LOG.debug("fileOpenTime = {},nowTime = {}", JodaTimeUtil.parseToString(fileOpenTime,
					JodaTimeUtil.FORMAT_FULL_DATE_TIME_WITH_SYMBOL), JodaTimeUtil.parseToString(now,
					JodaTimeUtil.FORMAT_FULL_DATE_TIME_WITH_SYMBOL));

			// same year and day-of-year advanced by 1,
			// or year advanced by 1 and today is day 1 of the year
			boolean condition1 = (lastYear == nowYear && (nowDayOfYear == (lastDayOfYear + 1)))
					|| (nowYear == (lastYear + 1) && nowDayOfYear == 1);
			// same day and hour advanced by 1, or different day and hour == 0
			boolean condition2 = (lastDayOfYear == nowDayOfYear && nowHour == (lastHour + 1))
					|| (lastDayOfYear != nowDayOfYear && nowHour == 0);
			// same hour and minute advanced by 1, or different hour and minute == 0
			boolean condition3 = (lastHour == nowHour && nowMin == (lastMin + 1))
					|| (lastHour != nowHour && nowMin == 0);
			// decide whether to roll, depending on the configured flag
			if (timeRollerFlag.equals(Constants.defaultTimeRollerFlagDay)) {
				if (condition1) {
					LOG.debug("rollflag = {},rolling", Constants.defaultTimeRollerFlagDay);
					close();
					open();
				}
			} else if (timeRollerFlag.equals(Constants.timeRollerFlagHour)) {
				if (condition2) {
					LOG.debug("rollflag = {},rolling", Constants.timeRollerFlagHour);
					close();
					open();
				}
			} else if (timeRollerFlag.equals(Constants.timeRollerFlagMin)) {
				if (condition3) {
					LOG.debug("rollflag = {},rolling", Constants.timeRollerFlagMin);
					close();
					open();
				}
			}
		}

The else branch above is the modified part.
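One caveat with the three conditions: each one looks for an advance of exactly one unit (or a wrap to 0 or 1). If no event arrives for longer than that, for example the file is opened on day 10 and the next event only arrives on day 12 of the same year, condition1 is false and the day-based roll is skipped. A possible alternative, sketched below under the assumption that Java 8's java.time is available, is to truncate both timestamps to the chosen unit and roll whenever the current one is later. `BoundaryCheck` and `crossedBoundary` are hypothetical names, not part of Flume.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

// Illustration only: a hypothetical helper, not part of Flume's BucketWriter.
final class BoundaryCheck {

    /**
     * Returns true when 'now' falls in a later day/hour/minute (per 'flag')
     * than 'openedAt', evaluated in the given time zone.
     */
    static boolean crossedBoundary(Instant openedAt, Instant now, String flag, ZoneId zone) {
        ChronoUnit unit;
        switch (flag) {
            case "day":
                unit = ChronoUnit.DAYS;
                break;
            case "hour":
                unit = ChronoUnit.HOURS;
                break;
            case "min":
                unit = ChronoUnit.MINUTES;
                break;
            default:
                // Unknown flag: never roll, matching the original if/else chain.
                return false;
        }
        // Truncating drops everything below the unit, so two timestamps in the
        // same day (or hour, or minute) become equal and later ones compare greater.
        ZonedDateTime opened = openedAt.atZone(zone).truncatedTo(unit);
        ZonedDateTime current = now.atZone(zone).truncatedTo(unit);
        return current.isAfter(opened);
    }
}
```

In the else branch this could stand in for condition1, condition2, and condition3, with `close(); open();` called when it returns true. Either way, the check only runs when an event is written, so the file rolls on the first event after the boundary rather than at the boundary itself.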


Comments and corrections are welcome!
