Flume 源码阅读

 

Flume架构

主要由3个组件,分别是Source,Channel和Sink,3个组件组成Event在Flume中得数据流向或者说流水线,功能可以由Flume的介绍看出:When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it’s consumed by a Flume sink. 

源码中有对应的3个接口:

public interface Source extends LifecycleAware, NamedComponent {
  /**
   * Specifies which channel processor will handle this source‘s events.
   *
   * @param channelProcessor
   */
  public void setChannelProcessor(ChannelProcessor channelProcessor);

  /**
   * Returns the channel processor that will handle this source‘s events.
   */
  public ChannelProcessor getChannelProcessor();

}

从Source接口看,Source并没有与定义与Event有关的接口,它的接口只是对ChannelProcessor的get和set方法。Source通过获取对应的ChannelProcessor来完成消息的投递。

public interface Sink extends LifecycleAware, NamedComponent {
    public void setChannel(Channel channel);
    public Channel getChannel();
    public Status process() throws EventDeliveryException;
    public static enum Status {
        READY, BACKOFF
    }
}

Sink接口,也有与Channel有关的set和get方法,且是直接对应Channel,而不是Source那样对应ChannelProcessor。ChannelProcessor其实是位于Source和Channel之间的东西,相当于一个Event分发器,实现Source的mapping、replicating,optional机制(需要进一步看看代码)。 Sink之所以直接对应Channel,是因为任何一个Sink只从唯一的Channel中消费数据并发送到目标端。

public class ChannelProcessor implements Configurable {
  private final ChannelSelector selector;
  private final InterceptorChain interceptorChain;
  public ChannelProcessor(ChannelSelector selector) {
    this.selector = selector;
    this.interceptorChain = new InterceptorChain();
  }
  public void initialize() {
    interceptorChain.initialize();
  }
  public void close() {
    interceptorChain.close();
  }
  /**
   * The Context of the associated Source is passed.
   * @param context
   */
  @Override
  public void configure(Context context) {
    configureInterceptors(context);
  }
  public ChannelSelector getSelector() {
    return selector;
  }
  public void processEventBatch(List<Event> events) {
   。。。
  }
  public void processEvent(Event event) {
    。。。。
  }

从ChannelProcessor简略代码看主要就是实现mapping、replicating和optional机制的数据分发,以及拦截机制。

再看下Channel的接口源码:

public interface Channel extends LifecycleAware, NamedComponent {
  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  public Transaction getTransaction();
}

再看看官方描述:

A channel connects a Source to a Sink. The source acts as producer while the sink acts as a consumer of events. The channel itself is the buffer between the two.

A channel exposes a Transaction interface that can be used by its clients to ensure atomic put and take semantics. This is necessary to guarantee single hop reliability between agents. For instance, a source will successfully produce an event if and only if that event can be committed to the source‘s associated channel. Similarly, a sink will consume an event if and only if its respective endpoint can accept the event. The extent of transaction support varies for different channel implementations ranging from strong to best-effort semantics.

Flume uses a transactional approach to guarantee the reliable delivery of the events. The sources and sinks encapsulate in a transaction the storage/retrieval, respectively, of the events placed in or provided by a transaction provided by the channel. This ensures that the set of events are reliably passed from point to point in the flow. In the case of a multi-hop flow, the sink from the previous hop and the source from the next hop both have their transactions running to ensure that the data is safely stored in the channel of the next hop.

 从以上看出,Channel主要是存储Event,Event的存和取操作由Transaction控制。 后面再重点学习下Transaction

 

Transaction

Flume中的事务用来保证消息的可靠传递。

同样也先看下官方注释中关于Transaction的使用例子和介绍:

org.apache.flume.Transaction 
Provides the transaction boundary while accessing a channel
A Transaction instance is used to encompass channel access via the following idiom:

 Channel ch = ...
 Transaction tx = ch.getTransaction();
 try {
   tx.begin();
   ...
   // ch.put(event) or ch.take()
   ...
   tx.commit();
 } catch (ChannelException ex) {
   tx.rollback();
   ...
 } finally {
   tx.close();
 }
 
Depending upon the implementation of the channel, the transaction semantics may be strong, or best-effort only.
Transactions must be thread safe. To provide a guarantee of thread safe access to Transactions, see BasicChannelSemantics and BasicTransactionSemantics.

使用继承来自BasicChannelSemantics的Channel时,Flume强制在操作Channel时采用特定的程序结构,并且强制channel实现特定的方法以使得Channel本身可以应对存入或取出失败的情况,并且使得channel的使用者有可能根据操作是否成功采取适当的方法。

Channel好比是数据库,而事务类就好比为数据库事务,于是Flume通过事务来确保Source和Sink采用特定的方式访问Channel,以保证Channel状态的一致性。例如当一个事务中需要把一个batch的event全放入Channel时,需要确保这个操作是原子的,要不全放进去,要不一个不放。

MemoryChannel实现的关系图:

技术分享

AbstractChannel主要实现了NamedComponent、LifecycleAware和Configurable这几个基本的接口,还并没有与事务有关。

BasicChannelSemantics实现在在local-thread中保存一个BasicTransactionSemantics对象的功能。它对Channel接口中take和put方法的实现为:确保当前的线程中有Transaction的一个可用的实例,然后把take和put代理给本线程transaction对象的同名方法。

BasicTransactionSemantics确保了事务相关的操作只有按正确的顺序执行才可以。即tx.begin =》 channel.take/put =》 tx.commit =》  tx.close。它只保证了对Channel操作的顺序,由子类实现doBegin, doTake, doPut, doCommit, doRollback, doClose等方法。

因此BasicChannelSemantic类和BasicTransactionSemantics类一起保证了操作Channel的逻辑。提供了所有Channel的父类。事务中的各个操作的语义,则由BasicTransactionSemantics的子类去实现,即它的子类来说明事务开始时干嘛,事务回滚时干嘛,取出消息时干嘛、放入消息时干嘛等等。

 

BasicTransactionSemantic类

BasicChannelSemantics实现了基础的Channel语法,包括了Transaction的thread-local语法。每个线程都包括了一个唯一的Transaction对象,保证了事务的隔离性:

private ThreadLocal<BasicTransactionSemantics> currentTransaction
      = new ThreadLocal<BasicTransactionSemantics>();

在线程中获取当前线程中的事务通过getTransaction方法,它会调用BasicChannelSemantics中定义的的抽象方法createTransaction()来获取BasicTransactionSemantics的实例。

/**
   * <p>
   * Initializes the channel if it is not already, then checks to see
   * if there is an open transaction for this thread, creating a new
   * one via <code>createTransaction</code> if not.
   * @return the current <code>Transaction</code> object for the
   *     calling thread
   * </p>
   */
  @Override
  public Transaction getTransaction() {

    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }
    BasicTransactionSemantics transaction = currentTransaction.get();
    if (transaction == null || transaction.getState().equals(
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();
      currentTransaction.set(transaction);
    }
    return transaction;
  }
@Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }

  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    return transaction.take();
  }

真正的存取Event,是由其代理的BaseTransactionSemantics对象完成。

 

BaseTransactionSemantics类

也先看官方描述:

org.apache.flume.channel.BasicTransactionSemantics

An implementation of basic Transaction semantics designed to work in concert with BasicChannelSemantics to simplify creation of robust Channel implementations. This class ensures that each transaction implementation method is called only while the transaction is in the correct state for that method, and only by the thread that created the transaction. Nested calls to begin()and close() are supported as long as they are balanced.

Subclasses need only implement doPutdoTakedoCommit, and doRollback, and the developer can rest assured that those methods are called only after transaction state preconditions have been properly met. doBegin and doClose may also be implemented if there is work to be done at those points.

All InterruptedException exceptions thrown from the implementations of the doXXX methods are automatically wrapped to become ChannelExceptions, but only after restoring the interrupted status of the thread so that any subsequent blocking method calls will themselves throw InterruptedException rather than blocking. The exception to this rule is doTake, which simply returns null instead of wrapping and propagating the InterruptedException, though it still first restores the interrupted status of the thread.

主要成员函数有:begin,commit,close, doBegin,doCommit,doClose,doPut,doTake,doRollback,getState,toString,put,take,rollback

其中put take begin close commit rollback 的结构都很相似。主要结构都是确保这些操作时Transaction在正确的对应状态,然后调用doXXX方法。如果当前的线程不拥有当前的事务或者事务的状态不对,就抛出异常。如果doXXX方法抛出InterruptedException就通过Thread.currentThread.interrupt()方法恢复当前线程的interrupted状态,然后将捕获的InterruptedException包装成一个ChannelException抛出:

 @Override
  public void rollback() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "rollback() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "rollback() called when transaction is %s!", state);

    state = State.COMPLETED;
    try {
      doRollback();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
  }

之前说过BasicChannelSemantic类对Channel接口定义的put和take的实现是代理给了自己线程中的BasicTransactionSemantic的put和get方法。而BasicTransactionSemantic中这两个方法,可以看看take实现:

  protected Event take() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "take() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "take() called when transaction is %s!", state);

    try {
      return doTake();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

从代码中显然看到需要子类要实现的doTake和doPut方法。因此一个Channel的具体实现,其消息的处理逻辑是由自己的getTransaction()方法返回的Trasaction对象来实现的。具体可以再阅读MemoryChannel的内部类MemoryTransaction代码。

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。