Netty简介
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Redis也是基于事件驱动框架开发的。
Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
Netty官网:Netty: Home
源码在github镜像下载链接:gitclone.com
HashedWheelTimer
本篇文章的主角是Netty框架工具类下面的HashedWheelTimer,它是一个为I/O超时调度而优化的计时器。
官方文档:HashedWheelTimer (Netty API Reference (4.0.56.Final))
下面以一个简单的Demo为例,每秒打印一下当前时间,
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
public class HashedWheelTimerDemo {
// 1个槽代表的时间,默认100ms
private static final int TICK_DURATION = 1;
// 延时任务时间,即等待多久开始执行时间轮任务
private static final int DELAY_TASK_TIME = 60;
// hashedWheelTimer设置3600个槽,相当于一个圆的1/3600,每移动一个槽的时间是1秒。
private static final int TICKS_PER_WHEEL = 3600;
private static final HashedWheelTimer TIMER = new HashedWheelTimer(TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL);
private static final int EACH_TASK_TIME = 1;
public static void main(String[] args) {
// 任务需要经过的tick数为: 60000 / 1000 = 60次,即经过1分钟后才是正常的每秒输出当前时间
executeTask(TIMER.newTimeout(HashedWheelTimerDemo::executeTask, DELAY_TASK_TIME, TimeUnit.SECONDS));
}
private static void executeTask(Timeout timeout) {
try {
LocalDateTime now = LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault());
LocalDateTime expiredTime = now.minusSeconds(EACH_TASK_TIME);
System.out.println(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(expiredTime));
} finally {
TIMER.newTimeout(timeout.task(), EACH_TASK_TIME, TimeUnit.SECONDS);
}
}
}
上面是一个简单的Demo,当然使用它还是很简单的,那么接下来了解一下它怎么实现的,可以找到HashedWheelTimer的源码,看看每次超时是在做什么?
HashedWheelTimer的有参构造函数传入了三个参数:
- tickDuration:一个 bucket 代表的时间,默认为 100ms
- unit:时间单位
- ticksPerWheel:一轮含有多少个 bucket ,默认为 512 个
HashedWheelTimer类中有多个有参构造函数,把默认值传递到了最下面的6个参数的有参构造函数中去。
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
this.worker = new HashedWheelTimer.Worker();
this.startTimeInitialized = new CountDownLatch(1);
this.timeouts = PlatformDependent.newMpscQueue();
this.cancelledTimeouts = PlatformDependent.newMpscQueue();
this.pendingTimeouts = new AtomicLong(0L);
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
} else if (unit == null) {
throw new NullPointerException("unit");
} else if (tickDuration <= 0L) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
} else if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
} else {
this.wheel = createWheel(ticksPerWheel);
this.mask = this.wheel.length - 1;
long duration = unit.toNanos(tickDuration);
if (duration >= 9223372036854775807L / (long)this.wheel.length) {
throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, 9223372036854775807L / (long)this.wheel.length));
} else {
if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
this.workerThread = threadFactory.newThread(this.worker);
this.leak = !leakDetection && this.workerThread.isDaemon() ? null : leakDetector.track(this);
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > 64 && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
}
}
线程工厂根据worker创建的工作线程在下面的newTimeout方法中启动,
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 计时器任务为空抛异常
if (task == null) {
throw new NullPointerException("task");
// 时延单位为空抛异常
} else if (unit == null) {
throw new NullPointerException("unit");
} else {
// 原子计数器 +1
long pendingTimeoutsCount = this.pendingTimeouts.incrementAndGet();
// 计数器大于最大计数超时值时 -1并抛异常
if (this.maxPendingTimeouts > 0L && pendingTimeoutsCount > this.maxPendingTimeouts) {
this.pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending timeouts (" + this.maxPendingTimeouts + ")");
} else {
// 工作状态更新,并启动工作线程
this.start();
long deadline = System.nanoTime() + unit.toNanos(delay) - this.startTime;
// 如果时延大于0且deadline小于0,则循环
if (delay > 0L && deadline < 0L) {
deadline = 9223372036854775807L;
}
// 时间轮超时队列排队
HashedWheelTimer.HashedWheelTimeout timeout = new HashedWheelTimer.HashedWheelTimeout(this, task, deadline);
this.timeouts.add(timeout);
return timeout;
}
}
}
大体的实现结构如下图,
下面看一下Worker线程中做了什么?
private final class Worker implements Runnable {
... ...
public void run() {
// 初始化启动时间
HashedWheelTimer.this.startTime = System.nanoTime();
// startTime 0表示未被初始化,1表示已被初始化
if (HashedWheelTimer.this.startTime == 0L) {
HashedWheelTimer.this.startTime = 1L;
}
// CountDownLatch.countDown减1,直到计数器为0继续往下走
HashedWheelTimer.this.startTimeInitialized.countDown();
int idx;
HashedWheelTimer.HashedWheelBucket bucket;
do {
long deadline = this.waitForNextTick();
// 处理超时的任务
if (deadline > 0L) {
idx = (int)(this.tick & (long)HashedWheelTimer.this.mask);
this.processCancelledTasks();
bucket = HashedWheelTimer.this.wheel[idx];
this.transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
++this.tick;
}
} while(HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 1);
HashedWheelTimer.HashedWheelBucket[] var5 = HashedWheelTimer.this.wheel;
int var2 = var5.length;
for(idx = 0; idx < var2; ++idx) {
bucket = var5[idx];
bucket.clearTimeouts(this.unprocessedTimeouts);
}
while(true) {
HashedWheelTimer.HashedWheelTimeout timeout = (HashedWheelTimer.HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
if (timeout == null) {
this.processCancelledTasks();
return;
}
if (!timeout.isCancelled()) {
this.unprocessedTimeouts.add(timeout);
}
}
}
... ...
}