再谈消息总线客户端的多线程实现

上次我谈了最近在写的一个基于RabbitMQ的消息总线的客户端在面对并发问题时的一些思考以及最终的实现方案。那是一种简单并且不容易产生并发问题的方案,如果你看过那篇文章,我曾在最终的实现方案之后给出了其利弊分析。

核心的问题是Client建立的跟RabbitMQ Server的connection是共享还是独占。对于这个问题可以举一个通俗一点的例子:如果你想要租间房子,每个人会有不同的想法。比如有人喜欢简单、安静的生活并且在意个人隐私,那么这个时候你最好的选择就是去租个单室套:里面什么都有,并且是独享的,它的缺点是造成了资源浪费(比如你一个人需要占用洗衣机,电冰箱等其实你也没将它们高效得使用),因此你必须为这些资源的独享付出比较昂贵的代价;如果你不想那么浪费资源,有些东西跟别人共享一下也无所谓,并且你希望有较高的性价比,那么这种情况下,最好的选择其实是——合租。这也是自上次那篇文章之后,我对于自我实现的颠覆。这篇文章我将来探讨新的实现方式。

上次那篇文章中声明的那样,它不仅仅是针对这个消息总线的客户端,它是一种对其他通用技术组件都可参考的解决方案。

问题分析

首先来谈谈上次纠结的问题是什么?是connection的生命周期受限于Client主对象(其创建跟销毁都依赖Client主对象),而connection如果可以被同一个JVM进程内的多个client对象共享,那么它生命周期的控制权的归属问题。

想在一个JVM进程内共享一个RabbitMQ connection,我只能选择将其实现为一个singleton。对于这种实现方式,connection的生命周期是如何管理的?是通过Client主对象的open/close这一对方法操作的。第一个对client的open方法的调用会触发对connection的实例化,之后这个connection就一直被open,当调用close的时候,connection也一同被关闭(不知道有没有人会问,这里为什么要在调用close方法的时候关闭connection,这是因为在实现的时候肯定是先面向非多线程的实现,如果是单线程,client对象完成任务了,肯定要调用close方法,而此时也就必须要关闭connection,否则就没有调用的时机了)。但这里却牵扯到close方法调用权以及时机的问题,因为在多线程环境下每个Channel都创建于被共享的connection,如果某个client关闭了该connection那么其他正在使用中的channel将全部抛出异常。这也是,为什么在上次那篇文章中,我的思路转向每个Client独占connection的实现方式。

不过我还是希望找到一种解决方案来实现共享connection的模型。因为RabbitMQ 的channel这种多路复用的设计,已经为我们在多线程上复用一个connection提供了基础,如果每个Client独占connection,无疑是打破了最佳实践(redis的client jedis实现connection pool来独占connection是因为redis在通信时没有channel的概念)。

解决思路

上面那个问题最根本的症结在于connection对象的生命周期管理问题。connection对象是宿主在Client内部的。如果我们让它的创建、初始化与销毁只依赖对象池,不就没有这个问题了吗?与此同时,其实还有两个关键对象:pubsuberManager(用于获取一个配置中心的配置)、configManager(用于订阅配置中心的数据变更以及对配置进行解析),它们也可以一同被提出来在同一个池中共享同一实例。

技术实现

对象池用的是apache common pool提供的基础设施。对于上面几个共享对象的生命周期管理,这里在Pool中定义了两个方法:

-init:用于初始化connection等上述几个关键对象

-destroy:用于销毁init方法中的关键对象

init:

protected void init() {
        this.exchangeManager = new ExchangerManager(this.pubsuberHost, this.pubsuberPort);

        if (!this.exchangeManager.isPubsuberAlive())
            throw new RuntimeException("can not connect to pubsub server.");

        this.configManager = new ConfigManager();
        this.configManager.setExchangeManager(this.exchangeManager);
        this.poolId = RandomHelper.randomNumberAndCharacter(12);
        this.exchangeManager.registerWithMultiChannels(poolId, this.configManager, new String[]{
            Constants.PUBSUB_ROUTER_CHANNEL,
            Constants.PUBSUB_CONFIG_CHANNEL,
            Constants.PUBSUB_EVENT_CHANNEL,
            Constants.PUBSUB_SINK_CHANNEL,
            Constants.PUBSUB_CHANNEL_CHANNEL,
        });

        try {
            this.configManager.parseRealTimeData();

            String host = this.configManager.getClientConfigMap().get("messagebus.client.host").getValue();

            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(host);

            this.connection = connectionFactory.newConnection();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

destroy:

public void destroy() {
        this.innerPool.destroy();

        //release resource
        if (exchangeManager != null)
            exchangeManager.removeRegister(this.poolId);

        if (configManager != null)
            configManager.destroy();


        if (this.connection != null && this.connection.isOpen()) {
            try {
                this.connection.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

可以看到在上面destroy方法中,首先destroy真实的对象池之后,在最后才关闭了connection。这样, connection对象的生命周期跟pool的生命周期关联起来,而跟放入pool中的client对象没有关系,它们只需要获得打开后的connection对象来创建用于通信的Channel即可(现在只有Channel跟Client对象是一对一的关系)。

虽然我们已经将Client依赖的几个关键对象放到Pool中来构建,但我们还需要将他们传递给Client对象才行。两种常见的注入依赖对象的方式:构造器注入、setter方法注入在这里都不可行,因为我们不期望外部对象了解Client的细节。因此这里选择了不对外提供公共的注入点,而是将它们定义为client内部的私有实例字段,然后通过反射打开访问权限,注入后再关闭访问权限。

私有实例字段:

//inject by reflector
    private ExchangerManager exchangeManager;
    private ConfigManager    configManager;
    private Connection       connection;

反射注入实例的引用:

public PooledObject<Messagebus> makeObject() throws Exception {
        Constructor<Messagebus> privateCtor = Messagebus.class.getDeclaredConstructor();
        privateCtor.setAccessible(true);
        Messagebus client = privateCtor.newInstance();
        privateCtor.setAccessible(false);

        Class<?> superClient = Messagebus.class.getSuperclass();

        //set private field
        Field exchangeManagerField = superClient.getDeclaredField("exchangeManager");
        exchangeManagerField.setAccessible(true);
        exchangeManagerField.set(client, this.exchangeManager);
        exchangeManagerField.setAccessible(false);

        Field configManagerField = superClient.getDeclaredField("configManager");
        configManagerField.setAccessible(true);
        configManagerField.set(client, this.configManager);
        configManagerField.setAccessible(false);

        Field connectionField = superClient.getDeclaredField("connection");
        connectionField.setAccessible(true);
        connectionField.set(client, this.connection);
        connectionField.setAccessible(false);

多线程与单线程一致的模型

Pool的构建通常位于主线程上,它构建于所有Client之前,销毁于所有Client完成任务之后,因此connection的生命周期会长于任何一个Client的生命周期,而这些Client使用该connection创建的channel自然也都可以在Client关闭时被顺便关闭。无论是多线程环境,还是单线程环境, Client主对象的实现机制最好只有一套,如果机制不同,内部的实现也会截然不同,这样无论是从设计的优雅程度以及工作量上来看都不是好的设计方案。因此既然将connection的实例化提到Client主对象的外面,在单线程模型下也必须做出相应的改变。最终的方案是——单线程环境下也用对象池,只不过是容量为1的对象池,它的实现方式是:继承一般的对象池,然后配置池的maxTotal(对象的最大数目)为1,并且让调用者从外部无法修改池的参数配置。

技术分享

内置maxTotal为1的对象池配置:

public MessagebusSinglePool(String pubsuberHost, int pubsuberPort) {
        super(pubsuberHost,
              pubsuberPort,
              new DefaultMessagebusPoolConfig());
    }

    /**
     * inner class : single messagebus instance config
     */
    private static class DefaultMessagebusPoolConfig extends GenericObjectPoolConfig {

        public DefaultMessagebusPoolConfig() {
            setTestWhileIdle(false);
            setMaxTotal(1);
            setMinEvictableIdleTimeMillis(60000);
            setTimeBetweenEvictionRunsMillis(30000);
            setNumTestsPerEvictionRun(-1);
        }

    }

私有化构造器:

private Messagebus() {
        super();

        producer = new GenericProducer();
        consumer = new GenericConsumer();
        publisher = new GenericPublisher();
        subscriber = new GenericSubscriber();
        requester = new GenericRequester();
        responser = new GenericResponser();
        broadcaster = new GenericBroadcaster();
    }

单这样还不行,我们必须阻止用户能够创建client的所有途经,使得他们不能不从这个对象池中去获取对象,所以我们还需要将Client主对象的构造器设置为私有,然后在对象池中创建池化对象的方法中通过反射来创建对象:

public PooledObject<Messagebus> makeObject() throws Exception {
        Constructor<Messagebus> privateCtor = Messagebus.class.getDeclaredConstructor();
        privateCtor.setAccessible(true);
        Messagebus client = privateCtor.newInstance();
        privateCtor.setAccessible(false);

问题到这里还没有结束,在原来的方案中connection等几个关键对象的创建、销毁其实是依赖Client的两个关键API:open、close。现在这种模式下,这几个关键对象的生命周期,也不能再由这两个API来控制(因为被connection共享了),这两个API唯一可以控制的是跟每个Client主对象关联的Channel对象,但其实对象池中的对象应该是无状态或状态一致的,并且Apache pool自身也提供了对对象状态的管控方法:make/destroy/active等。因此为了让Client主对象状态一致(不至于出现某些Client对象的channel开着,某些被关闭了),我们有必要收回这两个API的对外可见性(访问标识符设置为:private)。然后被Pool统一调用。而对于这两个方法的调用,我们还是通过反射来调用:

获得方法的反射实例(Method对象的实例):

try {
            openMethod = Messagebus.class.getSuperclass().getDeclaredMethod("open");
            openMethod.setAccessible(true);
            closeMethod = Messagebus.class.getSuperclass().getDeclaredMethod("close");
            closeMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException(e);
        }

在线程安全以及一致的环境下统一改变状态(Pool内部触发以下方法):

public void destroyObject(PooledObject<Messagebus> pooledObject) throws Exception {
        Messagebus client = pooledObject.getObject();
        if (client != null) {
            if (client.isOpen()) {
                closeMethod.invoke(client);
            }
        }
    }

public void activateObject(PooledObject<Messagebus> pooledObject) throws Exception {
        Messagebus client = pooledObject.getObject();
        if (client != null) {
            if (!client.isOpen()) {
                openMethod.invoke(client);
            }
        }
    }

public void passivateObject(PooledObject<Messagebus> pooledObject) throws Exception {
        Messagebus client = pooledObject.getObject();
        if (client != null) {
            if (client.isOpen()) {
                closeMethod.invoke(client);
            }
        }
    }

现在,无论我们在单线程或多线程下使用Client,都是从对象池中获取,而Client现在只负责通过Channel跟RabbitMQ Server通信,其他的事情它都不需要操心了。

以上,三个使用反射的动机说明:

  • 动态注入Client主对象依赖的connection对象,对外不暴露访问接口;
  • 动态创建Client主对象,对外不暴露构造器;
  • 动态调用Client的open/close,对外部暴露方法;

模型优势

共享connection的最大优势是顺应了httpBridge的实现。消息总线除了提供了java Client的调用方式,还提供了一个跨平台的通信方案——HttpBridge。这种模式原理也很简单,就是利用http的平台无关性传递请求参数到后台,后台是servlet 容器,可以再次将环境转换为java,然后再次依靠java client进行调用,你可以将其视为一个client的代理。现在Client主对象都被池化后,就可以通过配置池中对象的大小来控制httpBridge部署节点的并发数。而且对RabbitMQ Server资源的占用(connection所消耗的TCP连接)比以前少得多,这种模式下一个部署实例才需要一个connection对象,而之前的那种模型在httpBridge是,有多少并发正在处理的请求就需要多少个connection。现在这种模型可以非常方便水平扩容。

完整实现代码请移步:banyan

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。