Spring Integration - 自动轮询发送手机短信

 

Spring Integration 配置

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring Integration flow that periodically looks for unsent SMS messages and sends them.

    1. A JPA inbound channel adapter is polled every 60000 ms and puts the query result on
       "process.channel".
    2. "process.channel" is a queue channel holding at most one pending message.
    3. A chain polls that channel every 5000 ms, loads the SmsMessage entities whose status is
       null, splits the result and hands each item to SmsSenderService.send.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-jpa="http://www.springframework.org/schema/integration/jpa"
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/context    http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/integration/jpa http://www.springframework.org/schema/integration/jpa/spring-integration-jpa-2.2.xsd
            ">
    <!-- The integration namespace is mapped exactly once above; the original listed it twice
         with two different XSD locations. -->

    <!-- Timer-like trigger: every 60 s the query result is sent to process.channel.
         Presumably only the arrival of the message matters, not its payload.
         NOTE(review): "SELECT sysdate FROM dual" looks like Oracle SQL rather than JPQL;
         confirm whether this should use the native-query attribute instead of jpa-query. -->
    <int-jpa:inbound-channel-adapter
        auto-startup="true" entity-manager="em"
        send-timeout="60000" channel="process.channel"
        expect-single-result="true"
        jpa-query="SELECT sysdate FROM dual">
        <int:poller fixed-delay="60000" />
    </int-jpa:inbound-channel-adapter>

    <!-- Capacity 1: at most one trigger message waits for the chain at any time. -->
    <int:channel id="process.channel">
        <int:queue capacity="1"/>
    </int:channel>

    <int:chain input-channel="process.channel">

        <!-- Loads the pending messages, oldest request first.
             The property was spelled "sp.tatus" originally; corrected to "sp.status".
             NOTE(review): confirm the property name against the SmsMessage entity. -->
        <int-jpa:retrieving-outbound-gateway entity-manager="em" jpa-query="SELECT sp FROM SmsMessage sp Where sp.status is null order by sp.requestOn,sp.id"/>

        <!-- Splits the gateway result so that the service activator receives one item per message. -->
        <int:splitter ref="process.processSplitter" method="split"/>

        <int:service-activator ref="process.smsSenderService"
            method="send" />

        <!-- The chain checks the queue channel every 5 s; receive-timeout is set to -1. -->
        <int:poller fixed-delay="5000" receive-timeout="-1"/>
    </int:chain>

    <bean id="process.smsSenderService" class="com.yd.core.service.SmsSenderService" />

    <bean id="process.processSplitter" class="com.yd.core.service.PaymentProcessSplitter" />
</beans>

Job Worker

import org.springframework.context.ApplicationContext;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;

/**
 * Background worker that takes job requests from {@link JobManagerService} and forwards the
 * payload of each request to the message channel bean named by the request.
 *
 * <p>The loop runs until the worker thread is interrupted. Any other failure while handling a
 * request is logged and the worker continues with the next request.
 */
public class JobWorker implements Runnable {

    /** Pause, in milliseconds, before asking again after {@code takeRequest()} returned null. */
    private static final int DEFAULT_WAIT_TIME = 3000;

    /**
     * Takes requests one at a time and sends each payload to its target channel.
     * Returns only when the executing thread has been interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                LoggerUtil.getJobLogger().info("JobWorker, Ready for take job run request.");

                JobRunnerRequest jobRequest = JobManagerService.getJobManager().takeRequest();
                while (jobRequest == null) {
                    LoggerUtil.getJobLogger().warn("JobWorker, jobRequest is null, will try to get the job requet again.");
                    Thread.sleep(DEFAULT_WAIT_TIME);
                    jobRequest = JobManagerService.getJobManager().takeRequest();
                }

                LoggerUtil.getJobLogger().info("JobWorker, Received a job run request.");

                MessageChannel channel = findChannel(jobRequest.getJobChannelId());
                if (channel != null) {
                    channel.send(MessageBuilder.withPayload(jobRequest.getJobMessagePayload()).build());
                    LoggerUtil.getJobLogger().info("JobWorker, Completed to sned message to job channel");
                }
            }
            catch (InterruptedException ex) {
                // Restore the flag so the loop condition (and any caller) sees the interruption,
                // then stop instead of spinning forever.
                Thread.currentThread().interrupt();
                LoggerUtil.getJobLogger().warn("JobWorker, Interrupted while waiting for a job run request, worker is stopping.", ex);
                return;
            }
            catch (Exception ex) {
                // A single failing request must not kill the worker, but the failure has to be
                // visible: log what went wrong together with the exception.
                LoggerUtil.getJobLogger().warn("JobWorker, Failed to send message to job channel", ex);
            }
        }
    }

    /**
     * Looks up the message channel bean with the given id.
     *
     * @param jobChannelId bean name of the target channel
     * @return the channel, or {@code null} when the application context is not available, no
     *         bean with that name exists, or the bean is not a {@link MessageChannel}
     */
    private MessageChannel findChannel(String jobChannelId) {
        ApplicationContext context = ApplicationContextProvider.getContext();
        if (context == null) {
            LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the application context, to startup job %s", jobChannelId));
            return null;
        }

        // getBean throws for an unknown name, so check first and report it like a wrong-typed bean.
        if (!context.containsBean(jobChannelId)) {
            LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the message bean, to startup job %s", jobChannelId));
            return null;
        }

        Object channel = context.getBean(jobChannelId);
        if (channel instanceof MessageChannel) {
            return (MessageChannel) channel;
        }

        LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the message bean, to startup job %s", jobChannelId));
        return null;
    }
}

JobManagerService

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Process-wide singleton that queues {@code JobRunnerRequest}s for a single background
 * {@code JobWorker} thread, which is started when the singleton is first created.
 */
public final class JobManagerService {
    /** Pending requests; producers call {@link #addRequest}, the worker calls {@link #takeRequest}. */
    private final BlockingQueue<JobRunnerRequest> jobRequestQueue = new LinkedBlockingQueue<JobRunnerRequest>();
    private static volatile JobManagerService jobManagerInstnce;
    private static final Object objSyncLocker = new Object();

    private JobManagerService() {
    }

    /** Starts the worker thread that consumes the queue. */
    private void startupWorker() {
        // A named thread is easier to find in thread dumps and logs.
        new Thread(new JobWorker(), "JobWorker").start();
    }

    /**
     * Returns the singleton, creating it and starting its worker on first use
     * (double-checked locking on a volatile field).
     *
     * @return the shared instance, never {@code null}
     */
    public static JobManagerService getJobManager() {
        if (jobManagerInstnce == null) {
            synchronized (objSyncLocker) {
                if (jobManagerInstnce == null) {
                    // Assign before starting the worker: the worker calls getJobManager() itself.
                    jobManagerInstnce = new JobManagerService();
                    jobManagerInstnce.startupWorker();
                }
            }
        }
        return jobManagerInstnce;
    }

    /**
     * Queues a request for the worker.
     *
     * <p>If the calling thread is interrupted, the request is not queued, a warning is logged
     * and the thread's interrupt flag is restored.
     *
     * @param request the request to queue
     */
    public void addRequest(JobRunnerRequest request) {
        try {
            jobRequestQueue.put(request);
        }
        catch (InterruptedException e) {
            // Do not lose the interruption: callers further up may need to react to it.
            Thread.currentThread().interrupt();
            LoggerUtil.getJobLogger().warn(e.getMessage(), e);
        }
    }

    /**
     * Blocks until a request is available and removes it from the queue.
     *
     * @return the next request, or {@code null} if the calling thread was interrupted while
     *         waiting (the interrupt flag is restored in that case)
     */
    public JobRunnerRequest takeRequest() {
        try {
            return jobRequestQueue.take();
        }
        catch (InterruptedException e) {
            // Do not lose the interruption: the caller can check the flag to stop its loop.
            Thread.currentThread().interrupt();
            LoggerUtil.getJobLogger().warn(e.getMessage(), e);
            return null;
        }
    }
}

 

ApplicationContextProvider

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
 * Keeps the {@link ApplicationContext} handed over through {@link ApplicationContextAware} in a
 * static field, so that it can be obtained through the static {@link #getContext()} accessor.
 */
public class ApplicationContextProvider implements ApplicationContextAware {

    /** Most recently supplied context; {@code null} until one has been set. */
    private static volatile ApplicationContext sharedContext;

    /**
     * Stores the supplied context in the static field.
     *
     * @param applicationContext the context to remember
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        // Same monitor a static synchronized method would use.
        synchronized (ApplicationContextProvider.class) {
            sharedContext = applicationContext;
        }
    }

    /**
     * @return the stored context, or {@code null} if none has been set yet
     */
    public static ApplicationContext getContext() {
        return sharedContext;
    }
}

 

Spring Integration - 自动轮询发送手机短信,,5-wow.com

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。