spring的RabbitTemplate 发送Message源码导读
1,首先业务方法调用RabbitTemplate的convertAndSend方法:(RabbitTemplate继承RabbitAccessor,实现了RabbitOperations和MessageListener接口
@Override public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException { convertAndSend(exchange, routingKey, object, (CorrelationData) null); }
2,convertAndSend调用自己的重载方法:
public void convertAndSend(String exchange, String routingKey, final Object message, final MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException { Message messageToSend = convertMessageIfNecessary(message); messageToSend = messagePostProcessor.postProcessMessage(messageToSend); send(exchange, routingKey, messageToSend, correlationData); }
3,convertAndSend调用send方法:
public void send(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData) throws AmqpException { execute(new ChannelCallback<Object>() { @Override public Object doInRabbit(Channel channel) throws Exception { doSend(channel, exchange, routingKey, message, correlationData); return null; } }); }
4,send调用excute方法进行消息发送:
@Override public <T> T execute(final ChannelCallback<T> action) { if (this.retryTemplate != null) { try { return this.retryTemplate.execute(new RetryCallback<T, Exception>() { @Override public T doWithRetry(RetryContext context) throws Exception { return RabbitTemplate.this.doExecute(action); } }); } catch (Exception e) { if (e instanceof RuntimeException) { throw (RuntimeException) e; } throw RabbitExceptionTranslator.convertRabbitAccessException(e); } } else { return this.doExecute(action); } }
5,execute方法调用doExecute
private <T> T doExecute(ChannelCallback<T> action) { Assert.notNull(action, "Callback object must not be null"); RabbitResourceHolder resourceHolder = getTransactionalResourceHolder(); Channel channel = resourceHolder.getChannel(); if (this.confirmCallback != null || this.returnCallback != null) { addListener(channel); } try { if (logger.isDebugEnabled()) { logger.debug("Executing callback on RabbitMQ Channel: " + channel); } return action.doInRabbit(channel); } catch (Exception ex) { if (isChannelLocallyTransacted(channel)) { resourceHolder.rollbackAll(); } throw convertRabbitAccessException(ex); } finally { ConnectionFactoryUtils.releaseResources(resourceHolder); } }
6,doExcute调用execute方法
public void send(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData) throws AmqpException { execute(new ChannelCallback<Object>() { @Override public Object doInRabbit(Channel channel) throws Exception { doSend(channel, exchange, routingKey, message, correlationData); return null; } }); }
7,execute调用doSend方法
protected void doSend(Channel channel, String exchange, String routingKey, Message message, CorrelationData correlationData) throws Exception { if (logger.isDebugEnabled()) { logger.debug("Publishing message on exchange [" + exchange + "], routingKey = [" + routingKey + "]"); } if (exchange == null) { // try to send to configured exchange exchange = this.exchange; } if (routingKey == null) { // try to send to configured routing key routingKey = this.routingKey; } if (this.confirmCallback != null && channel instanceof PublisherCallbackChannel) { PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel; publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(), new PendingConfirm(correlationData, System.currentTimeMillis())); } boolean mandatory = this.returnCallback != null && this.mandatory; MessageProperties messageProperties = message.getMessageProperties(); if (mandatory) { messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_CORRELATION, this.uuid); } BasicProperties convertedMessageProperties = this.messagePropertiesConverter .fromMessageProperties(messageProperties, encoding); channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody()); // Check if commit needed if (isChannelLocallyTransacted(channel)) { // Transacted channel created by this template -> commit. RabbitUtils.commitIfNecessary(channel); } }
8,调用CachingConnectionFactory的invoke方法
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if (methodName.equals("txSelect") && !this.transactional) { throw new UnsupportedOperationException("Cannot start transaction on non-transactional channel"); } if (methodName.equals("equals")) { // Only consider equal when proxies are identical. return (proxy == args[0]); } else if (methodName.equals("hashCode")) { // Use hashCode of Channel proxy. return System.identityHashCode(proxy); } else if (methodName.equals("toString")) { return "Cached Rabbit Channel: " + this.target; } else if (methodName.equals("close")) { // Handle close method: don't pass the call on. if (active) { synchronized (this.channelList) { if (!RabbitUtils.isPhysicalCloseRequired() && this.channelList.size() < getChannelCacheSize()) { logicalClose((ChannelProxy) proxy); // Remain open in the channel list. return null; } } } // If we get here, we're supposed to shut down. physicalClose(); return null; } else if (methodName.equals("getTargetChannel")) { // Handle getTargetChannel method: return underlying Channel. return this.target; } else if (methodName.equals("isOpen")) { // Handle isOpen method: we are closed if the target is closed return this.target != null && this.target.isOpen(); } try { if (this.target == null || !this.target.isOpen()) { this.target = null; } synchronized (targetMonitor) { if (this.target == null) { this.target = createBareChannel(theConnection, transactional); } return method.invoke(this.target, args); } } catch (InvocationTargetException ex) { if (this.target == null || !this.target.isOpen()) { // Basic re-connection logic... this.target = null; if (logger.isDebugEnabled()) { logger.debug("Detected closed channel on exception. Re-initializing: " + target); } synchronized (targetMonitor) { if (this.target == null) { this.target = createBareChannel(theConnection, transactional); } } } throw ex.getTargetException(); } }
9,调用rabbitmq的ChannelN类的basicPublish方法(继承AMQChannel,实现接口com.rabbitmq.client.Channel)
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException { if (nextPublishSeqNo > 0) { unconfirmedSet.add(getNextPublishSeqNo()); nextPublishSeqNo++; } BasicProperties useProps = props; if (props == null) { useProps = MessageProperties.MINIMAL_BASIC; } transmit(new AMQCommand(new Basic.Publish.Builder() .exchange(exchange) .routingKey(routingKey) .mandatory(mandatory) .immediate(immediate) .build(), useProps, body)); }
10,调用AMQChannel的transmit方法
public void transmit(AMQCommand c) throws IOException { synchronized (_channelMutex) { ensureIsOpen(); quiescingTransmit(c); } }
11,调用quiescingTransmit方法
public void quiescingTransmit(AMQCommand c) throws IOException { synchronized (_channelMutex) { if (c.getMethod().hasContent()) { while (_blockContent) { try { _channelMutex.wait(); } catch (InterruptedException e) {} // This is to catch a situation when the thread wakes up during // shutdown. Currently, no command that has content is allowed // to send anything in a closing state. ensureIsOpen(); } } c.transmit(this); } }
12,调用AMQCommand的transmit方法
public void transmit(AMQChannel channel) throws IOException { int channelNumber = channel.getChannelNumber(); AMQConnection connection = channel.getConnection(); synchronized (assembler) { Method m = this.assembler.getMethod(); connection.writeFrame(m.toFrame(channelNumber)); if (m.hasContent()) { byte[] body = this.assembler.getContentBody(); connection.writeFrame(this.assembler.getContentHeader() .toFrame(channelNumber, body.length)); int frameMax = connection.getFrameMax(); int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax - EMPTY_FRAME_SIZE; for (int offset = 0; offset < body.length; offset += bodyPayloadMax) { int remaining = body.length - offset; int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax; Frame frame = Frame.fromBodyFragment(channelNumber, body, offset, fragmentLength); connection.writeFrame(frame); } } } connection.flush(); }
13,调用AMQConnection的writeFrame方法送数据
public void writeFrame(Frame f) throws IOException { _frameHandler.writeFrame(f); _heartbeatSender.signalActivity(); }
14,调用SocketFrameHandler的方法writeFrame的方法,完成发送
/** * Public API - writes this Frame to the given DataOutputStream */ public void writeTo(DataOutputStream os) throws IOException { os.writeByte(type); os.writeShort(channel); if (accumulator != null) { os.writeInt(accumulator.size()); accumulator.writeTo(os); } else { os.writeInt(payload.length); os.write(payload); } os.write(AMQP.FRAME_END); }
附图:
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。