本图来自https://shardingsphere.apache.org/elasticjob/current/cn/dev-manual/sharding/
JobShardingStrategy
public interface JobShardingStrategy extends TypedSPI {
/**
* Sharding job: distribute the sharding items of a job among the participating job instances.
*
* @param jobInstances all job instances which participate in sharding
* @param jobName job name
* @param shardingTotalCount sharding total count
* @return sharding result, mapping each job instance to the list of sharding item indexes assigned to it
*/
Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}
类图
AverageAllocationJobShardingStrategy
基于平均分配算法的分片策略
规则
- If there are 3 job servers and the total sharding count is 9, each job server is divided into: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8];
- If there are 3 job servers and the total sharding count is 8, each job server is divided into: 1=[0,1,6], 2=[2,3,7], 3=[4,5];
- If there are 3 job servers and the total sharding count is 10, each job server is divided into: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].
源码
/**
 * Sharding strategy that allocates sharding items evenly across job instances.
 *
 * <p>
 * If the sharding total count cannot be divided evenly by the number of job servers,
 * the leftover sharding items are appended, one each, to the servers with the smallest sequence numbers.
 *
 * For example:
 *
 * 1. If there are 3 job servers and the total sharding count is 9, each job server is divided into: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8];
 * 2. If there are 3 job servers and the total sharding count is 8, each job server is divided into: 1=[0,1,6], 2=[2,3,7], 3=[4,5];
 * 3. If there are 3 job servers and the total sharding count is 10, each job server is divided into: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].
 * </p>
 */
public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {

    /**
     * Assign sharding items to job instances.
     *
     * @param jobInstances job instances participating in sharding, in allocation order
     * @param jobName job name (not used by this strategy)
     * @param shardingTotalCount total number of sharding items
     * @return insertion-ordered map from job instance to its sharding items; an empty map when there are no instances
     */
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        // Step 1: hand out the evenly divisible part.
        Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
        // Step 2: spread the remainder over the leading instances.
        addAliquant(jobInstances, shardingTotalCount, result);
        return result;
    }

    /**
     * Give every instance the same number of consecutive items.
     * With 8 items and 3 instances each one gets 8 / 3 = 2 items: A=[0,1], B=[2,3], C=[4,5].
     */
    private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
        int itemsPerUnit = shardingTotalCount / shardingUnits.size();
        int nextItem = 0;
        for (JobInstance each : shardingUnits) {
            // One spare slot so that a leftover item can be appended later without resizing.
            List<Integer> items = new ArrayList<>(itemsPerUnit + 1);
            for (int taken = 0; taken < itemsPerUnit; taken++) {
                items.add(nextItem++);
            }
            result.put(each, items);
        }
        return result;
    }

    /**
     * Append the items left over after the even split, one per instance, starting from the first instance.
     * With 8 items and 3 instances the remainder is 8 % 3 = 2 and the first leftover item is 8 / 3 * 3 = 6,
     * so the final result is A=[0,1,6], B=[2,3,7], C=[4,5].
     */
    private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
        int unitCount = shardingUnits.size();
        int remainder = shardingTotalCount % unitCount;
        int firstLeftoverItem = shardingTotalCount / unitCount * unitCount;
        int position = 0;
        for (List<Integer> items : shardingResults.values()) {
            if (position >= remainder) {
                break;
            }
            items.add(firstLeftoverItem + position);
            position++;
        }
    }

    @Override
    public String getType() {
        return "AVG_ALLOCATION";
    }
}
OdevitySortByNameJobShardingStrategy
规则
For example:
- If there are 3 job servers with 2 sharding item, and the hash value of job name is odd, then each server is divided into: 1 = [0], 2 = [1], 3 = [];
- If there are 3 job servers with 2 sharding item, and the hash value of job name is even, then each server is divided into: 3 = [0], 2 = [1], 1 = [].
源码
/**
 * Sharding strategy which uses the parity of the job name's hash code to decide the order of the job instances.
 *
 * <p>
 * The given instance order is kept if the job name's hash code is odd;
 * the order is reversed if the job name's hash code is even.
 * The (possibly reversed) list is then sharded by {@link AverageAllocationJobShardingStrategy}.
 * NOTE(review): the "IP asc / IP desc" wording of the original documentation presumes the caller passes
 * the instances sorted ascending by IP; this class itself does not sort - confirm against the caller.
 *
 * For example:
 * 1. If there are 3 job servers with 2 sharding item, and the hash value of job name is odd, then each server is divided into: 1 = [0], 2 = [1], 3 = [];
 * 2. If there are 3 job servers with 2 sharding item, and the hash value of job name is even, then each server is divided into: 3 = [0], 2 = [1], 1 = [].
 * </p>
 */
public final class OdevitySortByNameJobShardingStrategy implements JobShardingStrategy {

    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();

    /**
     * Shard the job, reversing the instance order first when the job name's hash code is even.
     *
     * @param jobInstances job instances participating in sharding; never modified by this method
     * @param jobName job name whose hash code parity selects the order
     * @param shardingTotalCount total number of sharding items
     * @return sharding result produced by the average allocation strategy
     */
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        long jobNameHash = jobName.hashCode();
        if (0 != jobNameHash % 2) {
            return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);
        }
        // Reverse a private copy: reversing the argument in place would silently reorder the caller's list
        // and would throw UnsupportedOperationException for an unmodifiable list.
        List<JobInstance> reversedInstances = new java.util.ArrayList<>(jobInstances);
        Collections.reverse(reversedInstances);
        return averageAllocationJobShardingStrategy.sharding(reversedInstances, jobName, shardingTotalCount);
    }

    @Override
    public String getType() {
        return "ODEVITY";
    }
}
RoundRobinByNameJobShardingStrategy
/**
 * Sharding strategy which rotates the job instance list by the hash code of the job name
 * and then shards the rotated list with {@link AverageAllocationJobShardingStrategy}.
 */
public final class RoundRobinByNameJobShardingStrategy implements JobShardingStrategy {

    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();

    /**
     * Shard the job over the instance list rotated by the job name's hash code.
     *
     * @param jobInstances job instances participating in sharding; never modified by this method
     * @param jobName job name whose hash code selects the rotation offset
     * @param shardingTotalCount total number of sharding items
     * @return sharding result produced by the average allocation strategy
     */
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        return averageAllocationJobShardingStrategy.sharding(rotateServerList(jobInstances, jobName), jobName, shardingTotalCount);
    }

    /**
     * Rotate the list left by {@code |hash(jobName)| % size}.
     * For example, with offset 2 and size 6 the resulting order of original indexes is 2, 3, 4, 5, 0, 1.
     */
    private List<JobInstance> rotateServerList(final List<JobInstance> shardingUnits, final String jobName) {
        int shardingUnitsSize = shardingUnits.size();
        if (0 == shardingUnitsSize) {
            // Nothing to rotate; also avoids the division by zero in the modulo below.
            return shardingUnits;
        }
        // Take the absolute value in long arithmetic: Math.abs(int) returns a negative number for
        // Integer.MIN_VALUE, which would produce a negative offset and an out-of-range index.
        int offset = (int) (Math.abs((long) jobName.hashCode()) % shardingUnitsSize);
        if (0 == offset) {
            // No rotation needed, hand back the list as is.
            return shardingUnits;
        }
        List<JobInstance> result = new ArrayList<>(shardingUnitsSize);
        for (int i = 0; i < shardingUnitsSize; i++) {
            // e.g. offset = 2, size = 6: (0+2)%6=2 ... (4+2)%6=0, (5+2)%6=1
            int index = (i + offset) % shardingUnitsSize;
            result.add(shardingUnits.get(index));
        }
        return result;
    }

    @Override
    public String getType() {
        return "ROUND_ROBIN";
    }
}
JobShardingStrategyFactory
/**
 * Factory that looks up job sharding strategies by their type name.
 * The strategies are discovered once, at class initialization, through {@link ServiceLoader}.
 */
public final class JobShardingStrategyFactory {

    private static final Map<String, JobShardingStrategy> STRATEGIES = new LinkedHashMap<>();

    private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";

    static {
        // Register every strategy found on the class path under the name it reports via getType().
        ServiceLoader.load(JobShardingStrategy.class).forEach(each -> STRATEGIES.put(each.getType(), each));
    }

    /**
     * Get job sharding strategy.
     *
     * @param type job sharding strategy type; null or empty selects the default strategy
     * @return job sharding strategy registered under the given type
     * @throws JobConfigurationException if a non-empty type is given and no strategy is registered for it
     */
    public static JobShardingStrategy getStrategy(final String type) {
        if (!Strings.isNullOrEmpty(type)) {
            JobShardingStrategy found = STRATEGIES.get(type);
            if (null == found) {
                throw new JobConfigurationException("Can not find sharding sharding type '%s'.", type);
            }
            return found;
        }
        return STRATEGIES.get(DEFAULT_STRATEGY);
    }
}
SPI 文件定义（位于 META-INF/services/ 目录下：下面第一行是以接口全限定名命名的文件名，其后各行是文件内容，即各实现类的全限定名）
org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy
org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl.AverageAllocationJobShardingStrategy
org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl.OdevitySortByNameJobShardingStrategy
org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl.RoundRobinByNameJobShardingStrategy