【Flume】【*】深入flume-ng的三大组件——source,channel,sink
概览
1、Source
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source extends LifecycleAware, NamedComponent {
/**
* Specifies which channel processor will handle this source‘s events.
*
* @param channelProcessor
*/
public void setChannelProcessor(ChannelProcessor channelProcessor);
/**
* Returns the channel processor that will handle this source‘s events.
*/
public ChannelProcessor getChannelProcessor();
}
|
该source继承了两个类
1、NamedComponent
2、LifecycleAware
Source接口的直接实现类是AbstractSource抽象类
public enum LifecycleState {
IDLE, START, STOP, ERROR;
public static final LifecycleState[] START_OR_ERROR = new LifecycleState[] {
START, ERROR };
public static final LifecycleState[] STOP_OR_ERROR = new LifecycleState[] {
STOP, ERROR };
}
|
public void start() {
logger.info("Exec source starting with command:{}", command);
executor = Executors.newSingleThreadExecutor();
runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
// FIXME: Use a callback-like executor / future to signal us upon failure.
runnerFuture = executor.submit(runner);
/*
* NB: This comes at the end rather than the beginning of the method because
* it sets our state to running. We want to make sure the executor is alive
* and well first.
*/
sourceCounter.start();
super.start();
logger.debug("Exec source started");
}
|
2、channel
Channel接口定义如下:
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Channel extends LifecycleAware, NamedComponent {
/**
* <p>Puts the given event into the channel.</p>
* <p><strong>Note</strong>: This method must be invoked within an active
* {@link Transaction} boundary. Failure to do so can lead to unpredictable
* results.</p>
* @param event the event to transport.
* @throws ChannelException in case this operation fails.
* @see org.apache.flume.Transaction#begin()
*/
public void put(Event event) throws ChannelException;
/**
* <p>Returns the next event from the channel if available. If the channel
* does not have any events available, this method must return {@code null}.
* </p>
* <p><strong>Note</strong>: This method must be invoked within an active
* {@link Transaction} boundary. Failure to do so can lead to unpredictable
* results.</p>
* @return the next available event or {@code null} if no events are
* available.
* @throws ChannelException in case this operation fails.
* @see org.apache.flume.Transaction#begin()
*/
public Event take() throws ChannelException;
/**
* @return the transaction instance associated with this channel.
*/
public Transaction getTransaction();
}
|
public interface Transaction {
public enum TransactionState {Started, Committed, RolledBack, Closed };
/**
* <p>Starts a transaction boundary for the current channel operation. If a
* transaction is already in progress, this method will join that transaction
* using reference counting.</p>
* <p><strong>Note</strong>: For every invocation of this method there must
* be a corresponding invocation of {@linkplain #close()} method. Failure
* to ensure this can lead to dangling transactions and unpredictable results.
* </p>
*/
public void begin();
/**
* Indicates that the transaction can be successfully committed. It is
* required that a transaction be in progress when this method is invoked.
*/
public void commit();
/**
* Indicates that the transaction can must be aborted. It is
* required that a transaction be in progress when this method is invoked.
*/
public void rollback();
/**
* <p>Ends a transaction boundary for the current channel operation. If a
* transaction is already in progress, this method will join that transaction
* using reference counting. The transaction is completed only if there
* are no more references left for this transaction.</p>
* <p><strong>Note</strong>: For every invocation of this method there must
* be a corresponding invocation of {@linkplain #begin()} method. Failure
* to ensure this can lead to dangling transactions and unpredictable results.
* </p>
*/
public void close();
}
|
基本事务语义抽象类是对它的实现BasicTransactionSemantics
protected void doBegin() throws InterruptedException {}
protected abstract void doPut(Event event) throws InterruptedException;
protected abstract Event doTake() throws InterruptedException;
protected abstract void doCommit() throws InterruptedException;
protected abstract void doRollback() throws InterruptedException;
protected void doClose() {}
|
1、doBegin
2、doPut
private final LinkedBlockingDeque<FlumeEventPointer> takeList;
private final LinkedBlockingDeque<FlumeEventPointer> putList;
|
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
if(putList.remainingCapacity() == 0) {
throw new ChannelException("Put queue for FileBackedTransaction " +
"of capacity " + putList.size() + " full, consider " +
"committing more frequently, increasing capacity or " +
"increasing thread count. " + channelNameDescriptor);
}
// this does not need to be in the critical section as it does not
// modify the structure of the log or queue.
if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
throw new ChannelFullException("The channel has reached it‘s capacity. "
+ "This might be the result of a sink on the channel having too "
+ "low of batch size, a downstream system running slower than "
+ "normal, or that the channel capacity is just too low. "
+ channelNameDescriptor);
}
boolean success = false;
log.lockShared();
try {
FlumeEventPointer ptr = log.put(transactionID, event);
Preconditions.checkState(putList.offer(ptr), "putList offer failed "
+ channelNameDescriptor);
queue.addWithoutCommit(ptr, transactionID);
success = true;
} catch (IOException e) {
throw new ChannelException("Put failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();
if(!success) {
// release slot obtained in the case
// the put fails for any reason
queueRemaining.release();
}
}
}
|
1、检查队列的剩余空间
2、keepAlive秒时间内获取一个共享信号量,说明put的过程是互斥的
3、FlumeEventPointer是用来做检查点机制的,因为这是文件通道,会用日志记录的
4、释放共享信号量
3、doTake
protected Event doTake() throws InterruptedException {
channelCounter.incrementEventTakeAttemptCount();
if(takeList.remainingCapacity() == 0) {
throw new ChannelException("Take list for FileBackedTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
"increasing capacity, or increasing thread count. "
+ channelNameDescriptor);
}
log.lockShared();
/*
* 1. Take an event which is in the queue.
* 2. If getting that event does not throw NoopRecordException,
* then return it.
* 3. Else try to retrieve the next event from the queue
* 4. Repeat 2 and 3 until queue is empty or an event is returned.
*/
try {
while (true) {
FlumeEventPointer ptr = queue.removeHead(transactionID);
if (ptr == null) {
return null;
} else {
try {
// first add to takeList so that if write to disk
// fails rollback actually does it‘s work
Preconditions.checkState(takeList.offer(ptr),
"takeList offer failed "
+ channelNameDescriptor);
log.take(transactionID, ptr); // write take to disk
Event event = log.get(ptr);
return event;
} catch (IOException e) {
throw new ChannelException("Take failed due to IO error "
+ channelNameDescriptor, e);
} catch (NoopRecordException e) {
LOG.warn("Corrupt record replaced by File Channel Integrity " +
"tool found. Will retrieve next event", e);
takeList.remove(ptr);
} catch (CorruptEventException ex) {
if (fsyncPerTransaction) {
throw new ChannelException(ex);
}
LOG.warn("Corrupt record found. Event will be " +
"skipped, and next event will be read.", ex);
takeList.remove(ptr);
}
}
}
} finally {
log.unlockShared();
}
}
|
1、剩余容量检查
2、检查点机制,日志记录操作,从头部取event
3、从takeList中删除该event
4、doCommit
protected void doCommit() throws InterruptedException {
int puts = putList.size();
int takes = takeList.size();
if(puts > 0) {
Preconditions.checkState(takes == 0, "nonzero puts and takes "
+ channelNameDescriptor);
log.lockShared();
try {
log.commitPut(transactionID);
channelCounter.addToEventPutSuccessCount(puts);
synchronized (queue) {
while(!putList.isEmpty()) {
if(!queue.addTail(putList.removeFirst())) {
StringBuilder msg = new StringBuilder();
msg.append("Queue add failed, this shouldn‘t be able to ");
msg.append("happen. A portion of the transaction has been ");
msg.append("added to the queue but the remaining portion ");
msg.append("cannot be added. Those messages will be consumed ");
msg.append("despite this transaction failing. Please report.");
msg.append(channelNameDescriptor);
LOG.error(msg.toString());
Preconditions.checkState(false, msg.toString());
}
}
queue.completeTransaction(transactionID);
}
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();
}
} else if (takes > 0) {
log.lockShared();
try {
log.commitTake(transactionID);
queue.completeTransaction(transactionID);
channelCounter.addToEventTakeSuccessCount(takes);
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();
}
queueRemaining.release(takes);
}
putList.clear();
takeList.clear();
channelCounter.setChannelSize(queue.getSize());
}
|
1、如果putList不为空,提交的是放入通道的事件数量
2、如果takeList不为空,提交的是从通道拿走的事件数量
5、doRollback
protected void doRollback() throws InterruptedException {
int puts = putList.size();
int takes = takeList.size();
log.lockShared();
try {
if(takes > 0) {
Preconditions.checkState(puts == 0, "nonzero puts and takes "
+ channelNameDescriptor);
synchronized (queue) {
while (!takeList.isEmpty()) {
Preconditions.checkState(queue.addHead(takeList.removeLast()),
"Queue add failed, this shouldn‘t be able to happen "
+ channelNameDescriptor);
}
}
}
putList.clear();
takeList.clear();
queue.completeTransaction(transactionID);
channelCounter.setChannelSize(queue.getSize());
log.rollback(transactionID);
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();
// since rollback is being called, puts will never make it on
// to the queue and we need to be sure to release the resources
queueRemaining.release(puts);
}
}
|
3、sink
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink extends LifecycleAware, NamedComponent {
/**
* <p>Sets the channel the sink will consume from</p>
* @param channel The channel to be polled
*/
public void setChannel(Channel channel);
/**
* @return the channel associated with this sink
*/
public Channel getChannel();
/**
* <p>Requests the sink to attempt to consume data from attached channel</p>
* <p><strong>Note</strong>: This method should be consuming from the channel
* within the bounds of a Transaction. On successful delivery, the transaction
* should be committed, and on failure it should be rolled back.
* @return READY if 1 or more Events were successfully delivered, BACKOFF if
* no data could be retrieved from the channel feeding this sink
* @throws EventDeliveryException In case of any kind of failure to
* deliver data to the next hop destination.
*/
public Status process() throws EventDeliveryException;
public static enum Status {
READY, BACKOFF
}
}
|
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
List<BucketWriter> writers = Lists.newArrayList();
transaction.begin();
…………………………
transaction.commit();
if (txnEventCount < 1) {
return Status.BACKOFF;
} else {
sinkCounter.addToEventDrainSuccessCount(txnEventCount);
return Status.READY;
}
} catch (IOException eIO) {
transaction.rollback();
LOG.warn("HDFS IO error", eIO);
return Status.BACKOFF;
} catch (Throwable th) {
transaction.rollback();
LOG.error("process failed", th);
if (th instanceof Error) {
throw (Error) th;
} else {
throw new EventDeliveryException(th);
}
} finally {
transaction.close();
}
}
|
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。