ES-JOB——分布式定时任务基础使用
参考文档:概览 :: ElasticJob https://shardingsphere.apache.org/elasticjob/current/cn/overview/
·
参考文档:概览 :: ElasticJob https://shardingsphere.apache.org/elasticjob/current/cn/overview/
ElasticJob 可以把同一类定时任务拆分成多个分片,由多个任务实例分别处理各自的分片,从而提高处理效率。
1.pom引入:
<!-- elastic-job dependency -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>${elastic-job.version}</version>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>${elastic-job.version}</version>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the es-job demo: a Spring Boot application that pulls in Elastic-Job Lite. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the Spring Boot 2.0.2.RELEASE starter parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this module. -->
<groupId>com.bfxy</groupId>
<artifactId>es-job</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>es-job</name>
<description>es-job</description>
<!-- Build-wide settings; elastic-job.version is referenced by both elastic-job dependencies below. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<elastic-job.version>2.1.4</elastic-job.version>
</properties>
<!-- Dependencies without a <version> element rely on inherited dependency management. -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- elastic-job dependency -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<!-- spring boot dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Database access: JDBC starter, MySQL driver and JPA starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>
<!-- The artifact is named es-job and is built with the Spring Boot Maven plugin. -->
<build>
<finalName>es-job</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.配置文件:
# HTTP port of the application.
server.port=8881
# ZooKeeper settings; RegistryCenterConfig injects each of these via @Value.
# zookeeper.address is a comma-separated host:port list and also drives the
# @ConditionalOnExpression on RegistryCenterConfig.
zookeeper.address=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
zookeeper.namespace=elastic-job
# Passed to setConnectionTimeoutMilliseconds / setSessionTimeoutMilliseconds.
zookeeper.connectionTimeout=10000
zookeeper.sessionTimeout=10000
zookeeper.maxRetries=3
# simpleJob.* settings; MySimpleJobConfig.simpleJobScheduler injects each of these via @Value.
# The next line is an alternative schedule that has been commented out.
##simpleJob.cron=0/10 * * * * ?
# NOTE(review): presumably a Quartz-style expression (second minute hour ...), i.e. 21:03:00 daily - confirm.
simpleJob.cron=00 03 21 * * ?
# Five shards; the parameter list below provides one value per shard index 0-4.
simpleJob.shardingTotalCount=5
simpleJob.shardingItemParameters=0=beijing,1=shanghai,2=changchun,3=changsha,4=hangzhou
simpleJob.jobParameter=source1=public,source2=private
simpleJob.failover=true
simpleJob.monitorExecution=true
simpleJob.monitorPort=8889
simpleJob.maxTimeDiffSeconds=-1
simpleJob.jobShardingStrategyClass=com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
# dataflowJob.* settings; no class shown in this document reads them.
dataflowJob.cron=0/10 * * * * ?
dataflowJob.shardingTotalCount=2
dataflowJob.shardingItemParameters=0=Beijing,1=Shanghai
# MySQL data source (database "elasticjob" on localhost, SSL disabled).
# NOTE(review): credentials are stored in plain text here - confirm this is demo-only.
spring.datasource.url=jdbc:mysql://localhost:3306/elasticjob?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=root
3.解析配置文件,创建 ZookeeperRegistryCenter 注册中心
@ConditionalOnExpression("'${zookeeper.address}'.length() > 0"):只有当 zookeeper.address 的长度大于 0 时,才会向 Spring 容器注册注册中心。
package com.bfxy.esjob.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
/**
 * Spring configuration that exposes the ZooKeeper registry center as a bean.
 *
 * <p>The condition uses an empty default ({@code ${zookeeper.address:}}) so that a missing
 * {@code zookeeper.address} property resolves to an empty string and this configuration is
 * skipped. Without the default, an unresolved placeholder is passed through as literal text,
 * the quoted string has a non-zero length, and the condition can never evaluate to false
 * for a missing property.
 */
@Configuration
@ConditionalOnExpression("'${zookeeper.address:}'.length() > 0")
public class RegistryCenterConfig {

    /**
     * Registers the registry center in the Spring container.
     *
     * <p>Spring invokes {@code init} on the returned object because of
     * {@code initMethod = "init"}.
     *
     * @param serverLists value of {@code zookeeper.address}
     * @param namespace value of {@code zookeeper.namespace}
     * @param connectionTimeout value of {@code zookeeper.connectionTimeout}, passed to
     *     {@code setConnectionTimeoutMilliseconds}
     * @param sessionTimeout value of {@code zookeeper.sessionTimeout}, passed to
     *     {@code setSessionTimeoutMilliseconds}
     * @param maxRetries value of {@code zookeeper.maxRetries}
     * @return a registry center built from the supplied settings
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter registryCenter(@Value("${zookeeper.address}") final String serverLists,
            @Value("${zookeeper.namespace}") final String namespace,
            @Value("${zookeeper.connectionTimeout}") final int connectionTimeout,
            @Value("${zookeeper.sessionTimeout}") final int sessionTimeout,
            @Value("${zookeeper.maxRetries}") final int maxRetries) {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverLists, namespace);
        zookeeperConfiguration.setConnectionTimeoutMilliseconds(connectionTimeout);
        zookeeperConfiguration.setSessionTimeoutMilliseconds(sessionTimeout);
        zookeeperConfiguration.setMaxRetries(maxRetries);
        return new ZookeeperRegistryCenter(zookeeperConfiguration);
    }
}
4.简单用例:
package com.bfxy.esjob.task;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
/**
 * Sample {@link SimpleJob} that writes the contents of its sharding context to standard error.
 */
public class MySimpleJob implements SimpleJob {

    /**
     * Prints a start banner, then the job name, job parameter, sharding item, sharding
     * parameter and sharding total count of the given context, then the name of the
     * executing thread, and finally an end banner.
     *
     * @param shardingContext context supplied for this invocation
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        emit("---------开始任务---------");
        emit(shardingContext.getJobName());
        emit(shardingContext.getJobParameter());
        emit(shardingContext.getShardingItem());
        emit(shardingContext.getShardingParameter());
        emit(shardingContext.getShardingTotalCount());
        emit("当前线程 : ---------------" + Thread.currentThread().getName());
        emit("----------结束任务------");
    }

    /** Writes one line to standard error. */
    private static void emit(Object line) {
        System.err.println(line);
    }
}
5.配置定时任务 MySimpleJobConfig
##simpleJob.cron=0/10 * * * * ?
simpleJob.cron=00 03 21 * * ?
simpleJob.shardingTotalCount=5
simpleJob.shardingItemParameters=0=beijing,1=shanghai,2=changchun,3=changsha,4=hangzhou
simpleJob.jobParameter=source1=public,source2=private
simpleJob.failover=true
simpleJob.monitorExecution=true
simpleJob.monitorPort=8889
simpleJob.maxTimeDiffSeconds=-1
simpleJob.jobShardingStrategyClass=com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
package com.bfxy.esjob.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.bfxy.esjob.listener.SimpleJobListener;
import com.bfxy.esjob.task.MySimpleJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
/**
 * Spring wiring for {@link MySimpleJob}: declares the job bean and the scheduler bean that
 * is built from the {@code simpleJob.*} properties.
 */
@Configuration
public class MySimpleJobConfig {

    @Autowired
    private ZookeeperRegistryCenter registryCenter;

    @Autowired
    private JobEventConfiguration jobEventConfiguration;

    /**
     * Exposes a new {@link MySimpleJob} as the {@link SimpleJob} bean.
     *
     * @return the job instance
     */
    @Bean
    public SimpleJob simpleJob() {
        return new MySimpleJob();
    }

    /**
     * Creates the scheduler bean for the simple job. Spring invokes {@code init} on the
     * returned object because of {@code initMethod = "init"}.
     *
     * @param simpleJob the job to schedule
     * @param cron value of {@code simpleJob.cron}
     * @param shardingTotalCount value of {@code simpleJob.shardingTotalCount}
     * @param shardingItemParameters value of {@code simpleJob.shardingItemParameters}
     * @param jobParameter value of {@code simpleJob.jobParameter}
     * @param failover value of {@code simpleJob.failover}
     * @param monitorExecution value of {@code simpleJob.monitorExecution}
     * @param monitorPort value of {@code simpleJob.monitorPort}
     * @param maxTimeDiffSeconds value of {@code simpleJob.maxTimeDiffSeconds}
     * @param jobShardingStrategyClass value of {@code simpleJob.jobShardingStrategyClass}
     * @return a scheduler wired with the registry center, the job event configuration and a
     *     new {@link SimpleJobListener}
     */
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
            @Value("${simpleJob.cron}") final String cron,
            @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
            @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters,
            @Value("${simpleJob.jobParameter}") final String jobParameter,
            @Value("${simpleJob.failover}") final boolean failover,
            @Value("${simpleJob.monitorExecution}") final boolean monitorExecution,
            @Value("${simpleJob.monitorPort}") final int monitorPort,
            @Value("${simpleJob.maxTimeDiffSeconds}") final int maxTimeDiffSeconds,
            @Value("${simpleJob.jobShardingStrategyClass}") final String jobShardingStrategyClass) {
        final LiteJobConfiguration liteJobConfig = getLiteJobConfiguration(
                simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters, jobParameter,
                failover, monitorExecution, monitorPort, maxTimeDiffSeconds, jobShardingStrategyClass);
        return new SpringJobScheduler(
                simpleJob, registryCenter, liteJobConfig, jobEventConfiguration, new SimpleJobListener());
    }

    /**
     * Assembles the lite job configuration: a core configuration wrapped in a
     * {@link SimpleJobConfiguration}, plus the sharding strategy and monitoring settings.
     * {@code overwrite} is always set to {@code false}.
     */
    private LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,
            int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,
            boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {
        final JobCoreConfiguration coreConfig = coreConfiguration(
                jobClass.getName(), cron, shardingTotalCount, shardingItemParameters, jobParameter, failover);
        final SimpleJobConfiguration jobTypeConfig =
                new SimpleJobConfiguration(coreConfig, jobClass.getCanonicalName());
        return LiteJobConfiguration.newBuilder(jobTypeConfig)
                .jobShardingStrategyClass(jobShardingStrategyClass)
                .monitorExecution(monitorExecution)
                .monitorPort(monitorPort)
                .maxTimeDiffSeconds(maxTimeDiffSeconds)
                .overwrite(false)
                .build();
    }

    /** Builds the core job configuration; {@code misfire} is always set to {@code true}. */
    private static JobCoreConfiguration coreConfiguration(String jobName, String cron, int shardingTotalCount,
            String shardingItemParameters, String jobParameter, boolean failover) {
        return JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount)
                .misfire(true)
                .failover(failover)
                .jobParameter(jobParameter)
                .shardingItemParameters(shardingItemParameters)
                .build();
    }
}
6.监听器:在任务执行前后回调
package com.bfxy.esjob.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
/**
 * {@link ElasticJobListener} that logs the sharding contexts before and after a job runs.
 */
public class SimpleJobListener implements ElasticJobListener {

    // Declared final so the shared logger reference cannot be reassigned.
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleJobListener.class);

    /**
     * Logs the sharding contexts at INFO level before the job is executed.
     *
     * @param shardingContexts contexts supplied to the listener callback
     */
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        LOGGER.info("-----------------执行任务之前:{}", shardingContexts);
    }

    /**
     * Logs the sharding contexts at INFO level after the job has been executed.
     *
     * @param shardingContexts contexts supplied to the listener callback
     */
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        LOGGER.info("-----------------执行任务之后:{}", shardingContexts);
    }
}
更多推荐
已为社区贡献1条内容
所有评论(0)