参考文档:概览 :: ElasticJob(https://shardingsphere.apache.org/elasticjob/current/cn/overview/)

ElasticJob 可以把同一个定时任务拆分成多个分片,由多个任务实例分别处理各自的分片,从而提高处理效率。

1.pom引入:

<!--  elastic-job dependency -->
   <dependency>
     <groupId>com.dangdang</groupId>
     <artifactId>elastic-job-lite-core</artifactId>
     <version>${elastic-job.version}</version>
   </dependency>
   <dependency>
     <groupId>com.dangdang</groupId>
     <artifactId>elastic-job-lite-spring</artifactId>
     <version>${elastic-job.version}</version>
   </dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Maven build descriptor for the es-job demo: a Spring Boot application that pulls in Elastic-Job Lite. -->
	<modelVersion>4.0.0</modelVersion>
	<!-- Inherits from the Spring Boot parent POM. The Spring Boot starters and the MySQL driver below
	     declare no version of their own; presumably they are managed by this parent (confirm). -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.2.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.bfxy</groupId>
	<artifactId>es-job</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
	<name>es-job</name>
	<description>es-job</description>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<!-- Single place to set the Elastic-Job version; referenced by both elastic-job-lite artifacts below. -->
		<elastic-job.version>2.1.4</elastic-job.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		
		<!--  elastic-job dependency -->
		<!-- The core and spring artifacts share the version defined by the elastic-job.version property. -->
	    <dependency>
	      <groupId>com.dangdang</groupId>
	      <artifactId>elastic-job-lite-core</artifactId>
	      <version>${elastic-job.version}</version>
	    </dependency>
	    <dependency>
	      <groupId>com.dangdang</groupId>
	      <artifactId>elastic-job-lite-spring</artifactId>
	      <version>${elastic-job.version}</version>
	    </dependency>
	
	    <!-- spring boot dependency -->
	
	    <dependency>
	      	<groupId>org.springframework.boot</groupId>
	      	<artifactId>spring-boot-starter-web</artifactId>
	    </dependency>
	    <dependency>
	      	<groupId>org.springframework.boot</groupId>
	      	<artifactId>spring-boot-starter-actuator</artifactId>
	    </dependency>
	    <dependency>
	      	<groupId>org.springframework.boot</groupId>
	      	<artifactId>spring-boot-configuration-processor</artifactId>
	      	<optional>true</optional>
	    </dependency>

	    <!-- Persistence stack: JDBC starter, MySQL driver and Spring Data JPA. -->
	    <dependency>
	      	<groupId>org.springframework.boot</groupId>
	      	<artifactId>spring-boot-starter-jdbc</artifactId>
	    </dependency>
	    <dependency>
	      	<groupId>mysql</groupId>
	      	<artifactId>mysql-connector-java</artifactId>
	    </dependency>
	    <dependency>
	      	<groupId>org.springframework.boot</groupId>
	      	<artifactId>spring-boot-starter-data-jpa</artifactId>
	    </dependency>

	</dependencies>

	<build>
		<!-- Fixes the artifact file name to es-job regardless of the project version. -->
		<finalName>es-job</finalName>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

2.配置文件:

# HTTP port of the embedded web server.
server.port=8881

# ZooKeeper registry center settings, injected into RegistryCenterConfig via @Value.
# address is a comma-separated host:port list; the two timeouts are passed to the
# *Milliseconds setters of ZookeeperConfiguration.
zookeeper.address=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
zookeeper.namespace=elastic-job
zookeeper.connectionTimeout=10000
zookeeper.sessionTimeout=10000
zookeeper.maxRetries=3


# Settings for the simple job, injected into MySimpleJobConfig via @Value.
# Alternative schedule (every 10 seconds), kept for quick local testing:
##simpleJob.cron=0/10 * * * * ?
# Cron fields are: second minute hour day-of-month month day-of-week -> daily at 21:03:00.
simpleJob.cron=00 03 21 * * ?
simpleJob.shardingTotalCount=5
# Format: <sharding item index>=<parameter>, one entry per sharding item (0..shardingTotalCount-1).
simpleJob.shardingItemParameters=0=beijing,1=shanghai,2=changchun,3=changsha,4=hangzhou
simpleJob.jobParameter=source1=public,source2=private
simpleJob.failover=true
simpleJob.monitorExecution=true
simpleJob.monitorPort=8889
simpleJob.maxTimeDiffSeconds=-1
simpleJob.jobShardingStrategyClass=com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy

# Settings for a dataflow job; the class that consumes them is not part of this article.
dataflowJob.cron=0/10 * * * * ?
dataflowJob.shardingTotalCount=2
dataflowJob.shardingItemParameters=0=Beijing,1=Shanghai


# JDBC data source. Keep values free of trailing whitespace: properties loaders keep
# trailing spaces as part of the value, which would silently change the password.
spring.datasource.url=jdbc:mysql://localhost:3306/elasticjob?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=root


3.解析配置文件,创建 ZookeeperRegistryCenter 注册中心

`@ConditionalOnExpression("'${zookeeper.address}'.length() > 0")`:只有当 zookeeper.address 的长度大于 0 时,才会创建并注册注册中心。
package com.bfxy.esjob.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * Spring configuration that exposes the ZooKeeper-backed Elastic-Job registry center as a bean.
 *
 * <p>The configuration is only active when {@code zookeeper.address} is non-empty. The
 * placeholder carries an empty default ({@code ${zookeeper.address:}}): without it, an unset
 * property would leave the literal placeholder text inside the quoted string, whose length is
 * greater than zero, so the condition would pass and the {@code @Value} injection below would
 * then fail on the unresolved placeholder.
 */
@Configuration
@ConditionalOnExpression("'${zookeeper.address:}'.length() > 0")
public class RegistryCenterConfig {

	/**
	 * Creates the registry center bean; Spring calls its {@code init} method after construction
	 * (declared through {@code initMethod}).
	 *
	 * @param serverLists       value of {@code zookeeper.address}
	 * @param namespace         value of {@code zookeeper.namespace}
	 * @param connectionTimeout value of {@code zookeeper.connectionTimeout}, in milliseconds
	 * @param sessionTimeout    value of {@code zookeeper.sessionTimeout}, in milliseconds
	 * @param maxRetries        value of {@code zookeeper.maxRetries}
	 * @return a registry center built from the given settings
	 */
	@Bean(initMethod = "init")
	public ZookeeperRegistryCenter registryCenter(@Value("${zookeeper.address}") final String serverLists,
			@Value("${zookeeper.namespace}") final String namespace,
			@Value("${zookeeper.connectionTimeout}") final int connectionTimeout,
			@Value("${zookeeper.sessionTimeout}") final int sessionTimeout,
			@Value("${zookeeper.maxRetries}") final int maxRetries) {
		final ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverLists, namespace);
		zookeeperConfiguration.setConnectionTimeoutMilliseconds(connectionTimeout);
		zookeeperConfiguration.setSessionTimeoutMilliseconds(sessionTimeout);
		zookeeperConfiguration.setMaxRetries(maxRetries);
		return new ZookeeperRegistryCenter(zookeeperConfiguration);
	}

}

4.简单用例:

package com.bfxy.esjob.task;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

/**
 * Demo job that writes the sharding context it is invoked with to standard error.
 */
public class MySimpleJob implements SimpleJob {

	/**
	 * Prints, between a start and an end banner, the job name, job parameter, sharding item,
	 * sharding parameter, total sharding count and the name of the executing thread.
	 *
	 * @param shardingContext context of the sharding item being executed
	 */
	@Override
	public void execute(ShardingContext shardingContext) {
		emit("---------开始任务---------");

		emit(shardingContext.getJobName());
		emit(shardingContext.getJobParameter());
		emit(shardingContext.getShardingItem());
		emit(shardingContext.getShardingParameter());
		emit(shardingContext.getShardingTotalCount());
		emit("当前线程 : ---------------" + Thread.currentThread().getName());

		emit("----------结束任务------");
	}

	/** Writes one value on its own line to standard error. */
	private static void emit(Object value) {
		System.err.println(value);
	}

}

5.配置定时任务:MySimpleJobConfig

##simpleJob.cron=0/10 * * * * ?
simpleJob.cron=00 03 21 * * ?
simpleJob.shardingTotalCount=5
simpleJob.shardingItemParameters=0=beijing,1=shanghai,2=changchun,3=changsha,4=hangzhou
simpleJob.jobParameter=source1=public,source2=private
simpleJob.failover=true
simpleJob.monitorExecution=true
simpleJob.monitorPort=8889
simpleJob.maxTimeDiffSeconds=-1
simpleJob.jobShardingStrategyClass=com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
package com.bfxy.esjob.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.bfxy.esjob.listener.SimpleJobListener;
import com.bfxy.esjob.task.MySimpleJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * Spring configuration that registers {@link MySimpleJob} together with the scheduler that runs it.
 */
@Configuration
public class MySimpleJobConfig {

	/** Registry center handed to the scheduler; injected by Spring. */
	@Autowired
	private ZookeeperRegistryCenter registryCenter;

	/** Job event configuration handed to the scheduler; injected by Spring. */
	@Autowired
	private JobEventConfiguration jobEventConfiguration;

	/**
	 * @return the job instance to be scheduled
	 */
	@Bean
	public SimpleJob simpleJob() {
		return new MySimpleJob();
	}

	/**
	 * Creates the scheduler for the simple job; Spring calls its {@code init} method after
	 * construction (declared through {@code initMethod}). All settings come from the
	 * {@code simpleJob.*} properties.
	 *
	 * @param simpleJob                the job bean to schedule
	 * @param cron                     cron expression of the job
	 * @param shardingTotalCount       total number of sharding items
	 * @param shardingItemParameters   per-item parameters
	 * @param jobParameter             job-level parameter
	 * @param failover                 whether failover is enabled
	 * @param monitorExecution         whether execution monitoring is enabled
	 * @param monitorPort              monitor port
	 * @param maxTimeDiffSeconds       maximum tolerated time difference, in seconds
	 * @param jobShardingStrategyClass fully qualified name of the sharding strategy class
	 * @return the scheduler wired with the registry center, event configuration and a listener
	 */
	@Bean(initMethod = "init")
	public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
			@Value("${simpleJob.cron}") final String cron,
			@Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
			@Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters,
			@Value("${simpleJob.jobParameter}") final String jobParameter,
			@Value("${simpleJob.failover}") final boolean failover,
			@Value("${simpleJob.monitorExecution}") final boolean monitorExecution,
			@Value("${simpleJob.monitorPort}") final int monitorPort,
			@Value("${simpleJob.maxTimeDiffSeconds}") final int maxTimeDiffSeconds,
			@Value("${simpleJob.jobShardingStrategyClass}") final String jobShardingStrategyClass) {

		final LiteJobConfiguration liteConfig = buildLiteJobConfiguration(simpleJob.getClass(), cron,
				shardingTotalCount, shardingItemParameters, jobParameter, failover, monitorExecution,
				monitorPort, maxTimeDiffSeconds, jobShardingStrategyClass);

		return new SpringJobScheduler(simpleJob, registryCenter, liteConfig, jobEventConfiguration,
				new SimpleJobListener());
	}

	/**
	 * Assembles the Lite job configuration in three layers: core settings, the simple-job type
	 * wrapper, and the Lite-specific settings. Misfire handling is always enabled and
	 * {@code overwrite} is always disabled.
	 */
	private LiteJobConfiguration buildLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,
			int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,
			boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {

		// The job name is the class name of the job implementation.
		final JobCoreConfiguration coreConfig = JobCoreConfiguration
				.newBuilder(jobClass.getName(), cron, shardingTotalCount)
				.misfire(true)
				.failover(failover)
				.jobParameter(jobParameter)
				.shardingItemParameters(shardingItemParameters)
				.build();

		final SimpleJobConfiguration typeConfig =
				new SimpleJobConfiguration(coreConfig, jobClass.getCanonicalName());

		return LiteJobConfiguration.newBuilder(typeConfig)
				.jobShardingStrategyClass(jobShardingStrategyClass)
				.monitorExecution(monitorExecution)
				.monitorPort(monitorPort)
				.maxTimeDiffSeconds(maxTimeDiffSeconds)
				.overwrite(false)
				.build();
	}

}

6.监听器:在任务执行前后回调

package com.bfxy.esjob.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;

/**
 * Job listener that logs the sharding contexts before and after a job execution.
 */
public class SimpleJobListener implements ElasticJobListener {

	/** Shared logger; declared {@code final} so the reference cannot be reassigned. */
	private static final Logger LOGGER = LoggerFactory.getLogger(SimpleJobListener.class);

	/**
	 * Logs the sharding contexts at INFO level before the job runs.
	 *
	 * @param shardingContexts contexts of the sharding items about to be executed
	 */
	@Override
	public void beforeJobExecuted(ShardingContexts shardingContexts) {
		LOGGER.info("-----------------执行任务之前:{}", shardingContexts);
	}

	/**
	 * Logs the sharding contexts at INFO level after the job has run.
	 *
	 * @param shardingContexts contexts of the sharding items that were executed
	 */
	@Override
	public void afterJobExecuted(ShardingContexts shardingContexts) {
		LOGGER.info("-----------------执行任务之后:{}", shardingContexts);
	}

}

Logo

鸿蒙生态一站式服务平台。

更多推荐