Flume Spool Source 源码过程分析(未运行)

主要涉及到的类:

 

SpoolDirectorySource 读取用户配置,并按照batchSize去读取这么多量的Event从用户指定的Spooling Dir中。SpoolDirectorySource 不会去读取某一个具体的文件,而是通过内部的reader去读取。文件切换等操作,都是reader去实现
内部类:SpoolDirectoryRunnable是一个线程,其中的run方法,完成从Spooling Dir读取Event(使用reader去读取)
 
 1 @Override
 2     public void run() {
 3       int backoffInterval = 250;
 4       try {
 5         while (!Thread.interrupted()) {
 6           List<Event> events = reader.readEvents(batchSize);
 7           if (events.isEmpty()) {
 8             break;
 9           }
10           sourceCounter.addToEventReceivedCount(events.size());
11           sourceCounter.incrementAppendBatchReceivedCount();
12 
13           try {
14             getChannelProcessor().processEventBatch(events);
15             reader.commit();
16           } catch (ChannelException ex) {
17             logger.warn("The channel is full, and cannot write data now. The " +
18               "source will try again after " + String.valueOf(backoffInterval) +
19               " milliseconds");
20             hitChannelException = true;
21             if (backoff) {
22               TimeUnit.MILLISECONDS.sleep(backoffInterval);
23               backoffInterval = backoffInterval << 1;
24               backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
25                                 backoffInterval;
26             }
27             continue;
28           }
29           backoffInterval = 250;
30           sourceCounter.addToEventAcceptedCount(events.size());
31           sourceCounter.incrementAppendBatchAcceptedCount();
32         }
33         logger.info("Spooling Directory Source runner has shutdown.");
34       } catch (Throwable t) {
35         logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
36             "Uncaught exception in SpoolDirectorySource thread. " +
37             "Restart or reconfigure Flume to continue processing.", t);
38         hasFatalError = true;
39         Throwables.propagate(t);
40       }
41     }

 

 
ReliableSpoolingFileEventReader 定义在SpoolDirectorySource中的reader。看这个名字就知道碉堡了,reliable的,怎么实现reliable的??
reader的readEvent方法,会根据batchSize大小读取指定的Event
 
该方法的大致意思:
如果没有提交,如果当前文件空,错,否则获取EventDeserializer
如果已经提交,如果当前文件空,则获得下一个文件,之后,如果文件还是空,则返回空Event列表。
 
之后,调用EventDeserializer的readEvents。
 
 1  public List<Event> readEvents(int numEvents) throws IOException {
 2     if (!committed) {
 3       if (!currentFile.isPresent()) {
 4         throw new IllegalStateException("File should not roll when " +
 5             "commit is outstanding.");
 6       }
 7       logger.info("Last read was never committed - resetting mark position.");
 8       currentFile.get().getDeserializer().reset();
 9     } else {
10       // Check if new files have arrived since last call
11       if (!currentFile.isPresent()) {
12         currentFile = getNextFile();
13       }
14       // Return empty list if no new files
15       if (!currentFile.isPresent()) {
16         return Collections.emptyList();
17       }
18     }
19 
20     EventDeserializer des = currentFile.get().getDeserializer();
21     List<Event> events = des.readEvents(numEvents);
22 
23     /* It‘s possible that the last read took us just up to a file boundary.
24      * If so, try to roll to the next file, if there is one. */
25     if (events.isEmpty()) {
26       retireCurrentFile();
27       currentFile = getNextFile();
28       if (!currentFile.isPresent()) {
29         return Collections.emptyList();
30       }
31       events = currentFile.get().getDeserializer().readEvents(numEvents);
32     }
33 
34     //写入header值,略47 
48     committed = false;
49     lastFileRead = currentFile;
50     return events;
51   }

 在这个方法中,我们看到了

currentFile:该对象采用了谷歌的Optional进行封装,更加容易判断空指针等等。Optional<FileInfo>,该FileInfo封装了普通的File对象和针对该file对象的EventDeserializer(事件序列器)

该currentFile主要在ReliableSpoolingFileEventReader 类中的Optional<FileInfo> openFile(File file),Optional<FileInfo> getNextFile() 方法中调用。

 

EventDeserializer:事件序列器的主要作用在于定义一些读取的基本操作

技术分享

其中mark是读取的行position进行标记

 

EventDeserializer的实现子类,很多,这里只讲LineDeserializer,顾名思义,按照行去读取,一行就是一个Event

虽然EventDeserializer已经涉及到读取行了,但是真正读取记录的还不是他。

 

我们看openfile函数中

 1 String nextPath = file.getPath();
 2       PositionTracker tracker =
 3           DurablePositionTracker.getInstance(metaFile, nextPath);
 4       if (!tracker.getTarget().equals(nextPath)) {
 5         tracker.close();
 6         deleteMetaFile();
 7         tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
 8       }
15       ResettableInputStream in =
16           new ResettableFileInputStream(file, tracker,
17               ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
18               decodeErrorPolicy);
19       EventDeserializer deserializer = EventDeserializerFactory.getInstance
20           (deserializerType, deserializerContext, in);

 

 

因此可以看出EventDeserializer读取记录是靠 ResettableFileInputStream(in对象),ResettableFileInputStream的初始化需要File类和一个DurablePositionTracker,

因此,ResettableFileInputStream在读取File内容同时,使用DurablePositionTracker去记录position的信息。

DurablePositionTracker使用了apache avro来进行持久化

private final DataFileWriter<TransferStateFileMeta> writer;
private final DataFileReader<TransferStateFileMeta> reader;

 

这样,当我们使用EventDeserializer读取一个event的时候,就会从当前文件流中获取信息,同时也能够记录读取的位置信息。

当读取batchsize数量的event都正确处理后,ReliableSpoolingFileEventReader 会commit(),持久化位置信息

 public void commit() throws IOException {
    if (!committed && currentFile.isPresent()) {
      currentFile.get().getDeserializer().mark();
      committed = true;
    }
  }

 

这里的mark方法,调用

LineDeserializer的

 @Override
  public void mark() throws IOException {
    ensureOpen();
    in.mark();
  }

 

在调用ResettableFileInputStream(in)的mark方法

 @Override
  public void mark() throws IOException {
    tracker.storePosition(tell());
  }

 

之后调用位置tracker的storePostition方法(DurablePositionTracker)

 @Override
  public synchronized void storePosition(long position) throws IOException {
    metaCache.setOffset(position);
    writer.append(metaCache);
    writer.sync();
    writer.flush();
  }

 

之后,调用avro的DataFileWriter,完成写入操作。

 

最后,至于postition位置的持久化逻辑判断,基本也能猜到,当出现trash时候,从未读取的地方开始读取,等等,所以说,是ResettableFileInputStream的输入流,因为他能够读取信息,也能持久化读取的信息位置。

 

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。