Flume Spool Source: Source Code Walkthrough (read-through only, not executed)
Main classes involved. We start with the runner thread inside SpoolDirectorySource:
@Override
public void run() {
  int backoffInterval = 250;
  try {
    while (!Thread.interrupted()) {
      List<Event> events = reader.readEvents(batchSize);
      if (events.isEmpty()) {
        break;
      }
      sourceCounter.addToEventReceivedCount(events.size());
      sourceCounter.incrementAppendBatchReceivedCount();

      try {
        getChannelProcessor().processEventBatch(events);
        reader.commit();
      } catch (ChannelException ex) {
        logger.warn("The channel is full, and cannot write data now. The " +
            "source will try again after " + String.valueOf(backoffInterval) +
            " milliseconds");
        hitChannelException = true;
        if (backoff) {
          TimeUnit.MILLISECONDS.sleep(backoffInterval);
          backoffInterval = backoffInterval << 1;
          backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
              backoffInterval;
        }
        continue;
      }
      backoffInterval = 250;
      sourceCounter.addToEventAcceptedCount(events.size());
      sourceCounter.incrementAppendBatchAcceptedCount();
    }
    logger.info("Spooling Directory Source runner has shutdown.");
  } catch (Throwable t) {
    logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
        "Uncaught exception in SpoolDirectorySource thread. " +
        "Restart or reconfigure Flume to continue processing.", t);
    hasFatalError = true;
    Throwables.propagate(t);
  }
}
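One detail worth pulling out of the catch block: when the channel is full, the source sleeps and doubles the interval, capped at maxBackoff, and resets it to 250 ms after every successful batch. A minimal standalone sketch of that capped exponential backoff is below; the cap value and the tryWrite() helper are assumptions for illustration, not Flume API.

import java.util.concurrent.TimeUnit;

public class BackoffSketch {
    public static void main(String[] args) throws InterruptedException {
        int maxBackoff = 4000;          // assumed cap for this sketch
        int backoffInterval = 250;      // reset to 250 ms after every successful batch
        while (!tryWrite()) {           // tryWrite() stands in for processEventBatch()
            TimeUnit.MILLISECONDS.sleep(backoffInterval);
            backoffInterval = Math.min(backoffInterval << 1, maxBackoff);
        }
    }

    private static boolean tryWrite() {
        return Math.random() > 0.5;     // placeholder for "the channel accepted the batch"
    }
}

Next, the reader side. ReliableSpoolingFileEventReader.readEvents() is what the runner calls on every loop iteration: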
public List<Event> readEvents(int numEvents) throws IOException {
  if (!committed) {
    if (!currentFile.isPresent()) {
      throw new IllegalStateException("File should not roll when " +
          "commit is outstanding.");
    }
    logger.info("Last read was never committed - resetting mark position.");
    currentFile.get().getDeserializer().reset();
  } else {
    // Check if new files have arrived since last call
    if (!currentFile.isPresent()) {
      currentFile = getNextFile();
    }
    // Return empty list if no new files
    if (!currentFile.isPresent()) {
      return Collections.emptyList();
    }
  }

  EventDeserializer des = currentFile.get().getDeserializer();
  List<Event> events = des.readEvents(numEvents);

  /* It's possible that the last read took us just up to a file boundary.
   * If so, try to roll to the next file, if there is one. */
  if (events.isEmpty()) {
    retireCurrentFile();
    currentFile = getNextFile();
    if (!currentFile.isPresent()) {
      return Collections.emptyList();
    }
    events = currentFile.get().getDeserializer().readEvents(numEvents);
  }

  // set header values, omitted here

  committed = false;
  lastFileRead = currentFile;
  return events;
}
In this method we encounter two key objects.
currentFile: this object is wrapped in Google Guava's Optional (Optional<FileInfo>), which makes absent/present checks explicit instead of relying on null. FileInfo bundles the plain File being read with the EventDeserializer for that file.
currentFile is mainly set by the Optional<FileInfo> openFile(File file) and Optional<FileInfo> getNextFile() methods of ReliableSpoolingFileEventReader.
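As a quick, self-contained reminder of the Guava Optional idiom used for currentFile (using a plain File instead of Flume's FileInfo so the sketch compiles on its own; the spool directory path is made up):

import com.google.common.base.Optional;
import java.io.File;

public class OptionalSketch {
    public static void main(String[] args) {
        Optional<File> currentFile = Optional.absent();          // nothing open yet
        if (!currentFile.isPresent()) {
            currentFile = Optional.fromNullable(findNextFile()); // may stay absent
        }
        if (currentFile.isPresent()) {
            System.out.println("next file: " + currentFile.get().getName());
        }
    }

    private static File findNextFile() {
        File[] candidates = new File("/data/spool").listFiles(); // hypothetical spool dir
        return (candidates == null || candidates.length == 0) ? null : candidates[0];
    }
}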
EventDeserializer: the event deserializer's main job is to define the basic read operations, where mark() records the position up to which the file has been read.
EventDeserializer has many implementations; here we only look at LineDeserializer, which, as the name suggests, reads the file line by line: each line becomes one Event, as sketched below.
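To make "one line becomes one Event" concrete, here is a deliberately simplified sketch of a line-based deserializer built on a plain BufferedReader. It is not the real LineDeserializer, which reads through Flume's ResettableInputStream so that positions can be marked and reset; only Event and EventBuilder below are actual Flume types.

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class SimpleLineReader {
    private final BufferedReader reader;

    public SimpleLineReader(BufferedReader reader) {
        this.reader = reader;
    }

    /** Reads up to numEvents lines, turning each line into one Event. */
    public List<Event> readEvents(int numEvents) throws IOException {
        List<Event> events = new ArrayList<>();
        for (int i = 0; i < numEvents; i++) {
            String line = reader.readLine();
            if (line == null) {
                break;                  // end of file: the caller may roll to the next file
            }
            events.add(EventBuilder.withBody(line, StandardCharsets.UTF_8));
        }
        return events;
    }
}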
Although the EventDeserializer is what deals with lines, it is not the class that actually pulls bytes out of the file. Look at the openFile() method:
String nextPath = file.getPath();
PositionTracker tracker =
    DurablePositionTracker.getInstance(metaFile, nextPath);
if (!tracker.getTarget().equals(nextPath)) {
  tracker.close();
  deleteMetaFile();
  tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
}
// ... (lines omitted) ...
ResettableInputStream in =
    new ResettableFileInputStream(file, tracker,
        ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
        decodeErrorPolicy);
EventDeserializer deserializer = EventDeserializerFactory.getInstance(
    deserializerType, deserializerContext, in);
So the EventDeserializer actually reads its records through a ResettableFileInputStream (the in object above). Constructing a ResettableFileInputStream requires the File plus a DurablePositionTracker, so while the stream reads the file's content, the DurablePositionTracker records how far it has read.
DurablePositionTracker persists that position using Apache Avro:
private final DataFileWriter<TransferStateFileMeta> writer;
private final DataFileReader<TransferStateFileMeta> reader;
This way, every time we read an event through the EventDeserializer, the data comes off the current file's stream, and the position that has been read can be recorded as well.
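Based on the calls shown in openFile() and commit(), the tracker's life cycle can be sketched roughly as follows. Treat this as a usage sketch under assumptions (the file paths are made up; getPosition() is part of the PositionTracker interface as I understand it), not as Flume's own code.

import java.io.File;
import java.io.IOException;

import org.apache.flume.serialization.DurablePositionTracker;
import org.apache.flume.serialization.PositionTracker;

public class TrackerSketch {
    public static void main(String[] args) throws IOException {
        File metaFile = new File("/var/lib/flume/.flumespool-main.meta"); // hypothetical path
        String target = "/data/spool/app.log";                            // file being tracked

        PositionTracker tracker = DurablePositionTracker.getInstance(metaFile, target);
        try {
            long resumeAt = tracker.getPosition();   // 0 for a new file, last stored offset otherwise
            // ... read the target file starting at resumeAt ...
            tracker.storePosition(resumeAt + 1024);  // persist the offset after a committed batch
        } finally {
            tracker.close();
        }
    }
}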
Once a batch of batchSize events has been processed successfully, ReliableSpoolingFileEventReader calls commit() to persist the position:
public void commit() throws IOException {
  if (!committed && currentFile.isPresent()) {
    currentFile.get().getDeserializer().mark();
    committed = true;
  }
}
The mark() call here goes to LineDeserializer:
@Override
public void mark() throws IOException {
  ensureOpen();
  in.mark();
}
That in turn calls mark() on the ResettableFileInputStream (the in object):
@Override
public void mark() throws IOException {
  tracker.storePosition(tell());
}
That then calls storePosition() on the position tracker (here a DurablePositionTracker):
@Override
public synchronized void storePosition(long position) throws IOException {
  metaCache.setOffset(position);
  writer.append(metaCache);
  writer.sync();
  writer.flush();
}
Finally, Avro's DataFileWriter performs the actual write.
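In isolation, the append/sync/flush pattern that DurablePositionTracker relies on looks roughly like the sketch below. The record schema and file name are made up for illustration; only the DataFileWriter calls mirror what storePosition() does.

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroAppendSketch {
    public static void main(String[] args) throws IOException {
        Schema schema = SchemaBuilder.record("Position")
            .fields().requiredLong("offset").endRecord();

        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, new File("position.avro"));

            GenericRecord rec = new GenericData.Record(schema);
            rec.put("offset", 1024L);

            writer.append(rec);   // buffer the record into the current block
            writer.sync();        // close the block and write a sync marker
            writer.flush();       // flush buffered bytes to the underlying stream
        }
    }
}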
Finally, the recovery logic built on the persisted position is easy to guess: after a crash, reading resumes from the first position that was never committed, and so on. That is exactly why the input stream is a ResettableFileInputStream: it can both read the data and persist (and later reset to) the position it has read up to.
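To illustrate that recovery idea in the simplest possible terms, a crash-recovery flow could look like the hypothetical sketch below, using a RandomAccessFile to seek to the last committed offset. In Flume itself this is handled inside ResettableFileInputStream together with its PositionTracker, so this is only an analogy, not the real mechanism.

import java.io.IOException;
import java.io.RandomAccessFile;

public class RecoverySketch {
    /** Resumes reading a log file from the last committed offset after a restart. */
    public static void resume(String path, long committedOffset) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
            raf.seek(committedOffset);          // skip everything already delivered and committed
            String line;
            while ((line = raf.readLine()) != null) {
                // only events that were never committed are re-processed here
                System.out.println("replaying: " + line);
            }
        }
    }
}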