Quartz.NET学习系列(十二)--- Quartz.NET集群

        Quartz.NET提供了集群的配置,这个集群并不能达到负载均衡的效果,而仅仅提供了故障转移的功能,主要场景是:两个节点的Quartz.NET任务服务器同时连接到另一个节点的数据库服务器,当一个任务服务器宕机时,另一个任务服务器会启动。

       集群的主要配置如下:

properties["quartz.jobStore.clustered"] = "true";

总的代码如下:
    [PersistJobDataAfterExecution]
    [DisallowConcurrentExecution]
    public class SimpleRecoveryJob : IJob
    {
        private static readonly ILog log = LogManager.GetLogger(typeof (SimpleRecoveryJob));
        public const string Count = "count";


        public virtual void Execute(IJobExecutionContext context)
        {
            JobKey jobKey = context.JobDetail.Key;
            if (context.Recovering)
            {
                log.InfoFormat("SimpleRecoveryJob: {0} 恢复时间 at {1}", jobKey, DateTime.Now.ToString());
            }
            else
            {
                log.InfoFormat("SimpleRecoveryJob: {0} 开始执行 {1}", jobKey, DateTime.Now.ToString());
            }

            int delay = 10*1000;
            Thread.Sleep(delay);

            JobDataMap data = context.JobDetail.JobDataMap;
            int count;
            if (data.ContainsKey(Count))
            {
                count = data.GetInt(Count);
            }
            else
            {
                count = 0;
            }
            count++;
            data.Put(Count, count);

            log.InfoFormat("SimpleRecoveryJob: {0} 完成 {1}\n 执行第 #{2}次", jobKey, DateTime.Now.ToString(), count);
        }
    }


    public class ClusterExample 
    {
        private static ILog log = LogManager.GetLogger(typeof (ClusterExample));

        public static void Run(bool inClearJobs, bool inScheduleJobs)
        {
            NameValueCollection properties = new NameValueCollection();

            properties["quartz.scheduler.instanceName"] = "TestScheduler";
            properties["quartz.scheduler.instanceId"] = "instance_one";
            properties["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz";
            properties["quartz.threadPool.threadCount"] = "5";
            properties["quartz.threadPool.threadPriority"] = "Normal";
            properties["quartz.jobStore.misfireThreshold"] = "60000";
            properties["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz";
            properties["quartz.jobStore.useProperties"] = "false";
            properties["quartz.jobStore.clustered"] = "true";
            // SQLit必须添加如下配置
            // properties["quartz.jobStore.lockHandler.type"] = "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz";
            //存储类型
            properties["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz";
            //表明前缀
            properties["quartz.jobStore.tablePrefix"] = "QRTZ_";
            //驱动类型
            properties["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz";
            //数据源名称
            properties["quartz.jobStore.dataSource"] = "myDS";
            //连接字符串
            properties["quartz.dataSource.myDS.connectionString"] = @"Data Source=(local);Initial Catalog=quartz;User ID=sa;Password=123";
            //sqlserver版本
            properties["quartz.dataSource.myDS.provider"] = "SqlServer-20";

            ISchedulerFactory sf = new StdSchedulerFactory(properties);
            IScheduler sched = sf.GetScheduler();

            if (inClearJobs)
            {
                log.Info("正在删除所有记录");
                sched.Clear();
            }


            if (inScheduleJobs)
            {
                string schedId = sched.SchedulerInstanceId;
                int count = 1;

                IJobDetail job = JobBuilder.Create<SimpleRecoveryJob>()
                    .WithIdentity("job_" + count, schedId) 
                    .RequestRecovery() //故障恢复
                    .Build();
                job.JobDataMap.Put(SimpleRecoveryJob.Count,1);
                ISimpleTrigger trigger = (ISimpleTrigger) TriggerBuilder.Create()
                                                              .WithIdentity("triger_" + count, schedId)
                                                              .StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
                                                              .WithSimpleSchedule(x =>x.RepeatForever().WithInterval(TimeSpan.FromSeconds(5)))
                                                              .Build();

                log.InfoFormat("{0} 开始时间 {1} 重复次数 {2}, 重复间隔 {3}", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval.TotalSeconds);

                sched.ScheduleJob(job, trigger);
            }


            sched.Start();
            log.Info("------- 开始计划 ----------------");
            log.Info("------- 等待1小时 ----------");
            Thread.Sleep(TimeSpan.FromHours(1));
            sched.Shutdown();
            log.Info("------- 结束计划 ----------------");
        }

        public static void Run()
        {
            ClusterExample.Run(false, false);
        }
    }

当一个任务服务器宕机时,另一台任务服务器启动会重新从数据库中找到上次执行失败前的状态重新开始执行。

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。