【反馈】缓存命中率控制系统-----------来自《企业级编程与控制理论》

前言

需要多大的缓存才能维持特定的命中率?

Cache Hit Rate 缓存命中率指的是某处内存被缓存数据后被访问到(命中)的概率,即一段时间内命中数与访问数的比例。


简介PID

【反馈】这个专栏现在是把《企业级编程与控制理论》中的一些知识写出来,其实就是模拟系统、建模、信号与系统相关知识加上Python实现不同案列的模拟系统的知识。一个反馈系统不能避免被讨论的就是PID了,即比例积分微分三相调节器。这里再简单介绍下我现在所理解的PID。
在这里插入图片描述
为了方便理解,做一个简图。蓝色的线是设定的目标输出值,红色的是系统实际的输出值,二者之差是error,其乘以取样间隔就是此点附近的矩形面积——两条线之间的矩形。两个取样点间的error变化率就是上式中的微分项。u是实际系统输入。假设系统为y=nx,那么u就是x,y是实际输出,error=r-y

因为不知道系统内部的n是多少,所以只能在外面瞎猜!假设这真的就是个线性系统,那么只要输入正确的x,就一定能得到想要的y。如果是曲线的方程的话,取一小段也可以当做线性。怕的就是如果输入同样一个数得到不同的输出,那就麻烦点了,这种情况可以考虑概率,某个输入使输出的最有可能满足目标就行!!!

当实际输出满足设定的目标时,error=0,则比例项为零,但是只有一个取样值满足还不能确定输出就稳定了,可能只是两条曲线交叉了一下而已!所以需要再进行取样,两次取样间的error=0变化率如果为零,说明输出稳定了。但是!!!仅仅这样还是不行,输出稳定不一定就是输出和目标一样了——可能两条线平行了!那你也会问,不是说好了目标和输出一样了吗???答案是式子中的比例项不是有个系数吗!如果比例系数比较小,那么细小的差别就会被忽略,即接近零,如果比例系数太大,输入又太大。

   \;
然后我们看积分项,积分对应的就是两条曲线之间的面积,如果面积不变了,那么也能说明error=0,此时积分项为某个特定的常数,这个常数输入系统就有可能得到目标值!但是!!!此时仍然没有解决上面这个问题,可能两条曲线平行了。当两条曲线平行时,error值太小以至于需要好一会儿才能在输出端感觉到这种差异性。所以此时需要加上反应快一些的比例项……

  • 介绍到这里并没有得出PID就能满足所有系统反馈的情况!都是大概满足要求,并且上面我所写的式子到文章后面还有优化!

模拟控制系统框架

制作一个feedback包。

#__init__.py
__name__="feedback package with PID and some filter"
__author__="应急食品派蒙"

#全局变量
__all__=["deltaT","yForPlot"]

#离散系统取样间隔,默认1s
deltaT=1
#记录实际输出命中率的变量,用来绘图
yForPlot=[]

#Component.py

'''
组件的基类
'''
class Component:
	#封装了组件的动态参数,每个时间帧调用一次这个函数
	def work(self,arr):
		return arr

	#返回一组表示组件内部状态的任意字符串
	def monitoring(self):
		return ""

#PID_Controller.py
import feedback
import feedback.Component


'''
	PID系数
	kp比例系数
	ki积分系数
	kd微分系数
	i 积分项
	d 微分项
	prevError 前一时间帧的偏移
'''



class PID_Controller(feedback.Component.Component):

	def __init__(self,kp,ki,kd=0):
		self.kp,self.ki,self.kd=kp,ki,kp
		self.i=0
		self.d=0
		self.prevError=0
	def work(self,error):
		self.i+=feedback.deltaT*error
		self.d=(error-self.prevError)/feedback.deltaT
		self.prevError=error
		return self.kp*error +self.ki*self.i + self.kd*self.d
#ADV_Controller.py
import feedback
import feedback.Component


'''
ADV_Controller提供了比PID_Controller更高级的实现
多了两个功能:一个是防止积分器饱和的“限值”,防止积分器饱和。
如果控制器输出超过构造函数所指定的限制,积分项在下一时间帧
中不会更新。限值这一任务由控制器下面的“执行器”来决定

另一个是为微分计算的过滤器。
默认情况下,不会对微分部分做平滑处理,但是如果给平滑参数
传递小于一的正数时,将调用简单的递归过滤器(单一指数
平滑),以平滑微分部分的贡献。


	PID系数
	kp比例系数
	ki积分系数
	kd微分系数
	i 积分项
	d 微分项
	prevError 前一时间帧的偏移
	clamped   是否限值
	clamp_low 限值的低值
	clamp_high 限值的高值
	ratio     当前微分占之后微分的比率
	
	
'''



class ADV_Controller(feedback.Component.Component):

	#默认PI调节,默认没有微分过滤器
	def __init__(self, kp, ki, kd=0,clampLimit=(-1e10,1e10),ratio=1):
		self.kp, self.ki, self.kd = kp, ki, kp
		self.i = 0
		self.d = 0
		self.prevError = 0
		self.clamped=True
		self.clamp_low,self.clamp_high=clampLimit
		self.ratio=ratio

	def clamp(self,arr):
		return self.clamp_low<arr<self.clamp_high

	def work(self, error):
		#如果没有限值(clamped=False),积分项就保持不变
		if self.clamped:
			self.i+=feedback.deltaT*error

		#微分项除了此时间帧的微分外,还增加了之前的微分项
		self.d=(self.ratio*(error-self.prevError)/feedback.deltaT + (1.0-self.ratio)*self.d )
		self.prevError=error

		u=self.kp*error + self.ki*self.i + self.kd*self.d

		self.clamped=ADV_Controller.clamp(self,u)
		return u

#Integrator.py
import feedback
import feedback.Component



'''
计算输入的积分,需要将累积项乘以每个时间帧的间隔
	data 存放所有arr的和
'''
class Integrator(feedback.Component.Component):
	def __init__(self):self.data=0
	def work(self,arr):
		self.data+=arr
		return feedback.deltaT*self.data

#FixedFilter.py
import feedback.Component



'''
平滑过滤器FixedFilter计算最近n次输入的未加权的平均值

	n    输入的未加权的数的个数
	data 存放所有arr的和
'''
class FixedFilter(feedback.Component.Component):
	def __init__(self,n):
		self.n=n
		self.data=[]

	def work(self,arr):
		self.data.append(arr)
		if len(self.data)>self.n:
			self.data.pop(0)  #移除列表data的第一个元素
		return float(sum(self.data))/len(self.data)
#RecurFilter.py
import feedback.Component


'''
平滑过滤器RecurFilter用简单的指数平滑算法实现
S(t)=α * X(t) + (1 - α) * S(t-1)

	ratio  当前值占输出值的比例
	y      输出值
'''
class RecurFilter(feedback.Component.Component):
	def __init__(self,ratio):
		self.ratio=ratio
		self.y=0
	#混合当前值与原始值
	def work(self,arr):
		self.y=self.ratio * arr +(1- self.ratio)*self.y
		return self.y


#Plant.py
import feedback.Component

'''
工厂类,也是组件之一。在模拟中作为系统的基类存在!

'''
class Plant(feedback.Component.Component):
	pass

#Loop.py
import feedback
import feedback.Component


'''
    GetTarget	传递的目标值函数
	controller  传递的控制器,例如PID_Controller的实例
	plant       传递的模拟系统
	tm			系统经历的时间,以deltaT为单位(默认为秒)。
	inverted    是否反转error的公式。反转为error=y-r,本来error=r-y。
	trainer     传递的执行器。执行器放在控制器和系统之间!
	filter   	传递的过滤器。过滤器放在反馈回路中,即把系统输出值y过滤后得到y2,用这个y2来算error!
	
'''


def ClosedLoop(GetTarget,controller,plant,
			   tm=500,inverted=False,
				trainer=feedback.Component.Component(),
			   filter=feedback.Component.Component()):

	print("==============模拟开始==================")

    #系统输出值经过滤器后的值
	y2=0
	for t in range(tm):
		r=GetTarget(t)
		error=r-y2
		if inverted : error = - error
		#得到控制器的输出值u
		u=controller.work(error)
		#得到执行器输出值v,即系统实际输入值
		v=trainer.work(u)
		#得到系统输出值y
		y=plant.work(v)
		#记录输出值
		feedback.yForPlot.append(y)
		#过滤输出值
		y2=filter.work(y)

		#打印看看!!!
		#print("遍历:",t,",时间:",t*feedback.deltaT,",目标:",r,",输出:",y,"\n",
		#	  "中间变量:u=",u,",v=",v,",y2=",y2,"[",plant.monitoring(),"]\n")

	print("==============模拟结束==================")







'''
这个函数内设置我们需要的目标值,交给用户自己设置
'''
def GetTarget(arr):
	pass



'''
	plant	模拟系统
	traversal	输入的遍历变量,是一个列表
	tm	遍历最大值
'''
def OpenedLoop(plant,traversal,tm=500):
	print("==============模拟开始==================")
	for t in range(tm):
		#得到系统输出值
		y=plant.work(traversal[t])
		#记录输出值
		feedback.yForPlot.append(y)
	print("==============模拟结束==================")


# #测试样式
# p=Plant()
# c=PID_Controller(0.5,0.05)
# ClosedLoop(GetTarget,c,p)



案例:高速缓存命中率

这个案例会介绍一种经典的反馈原理应用,它使用调整高速缓存容量的方法来改进高速缓存的命中率

受控系统是一个高速缓存器,例如Web缓存器或者数据库缓存器。假设高速缓存器容量为n,使用“最近访问协议”的高速缓存器。

“最近访问协议”
如果用户请求的条目已经在高速缓存器中找到,就将其返回给请求者,如果没有找到就从后台存储器中获取,然后返回给请求者,同时将此内容添加到高速缓存器中。如果高速缓存器中的条目超过n,那么就把最老的条目删除掉。

要求就是70%的请求应从高速缓存器中获取,并且高速缓存器应尽可能小,以便最大程度地减少内存消耗,并为其他任务腾出空间!

定义组件

在这里插入图片描述
请求”设在某个概率上浮动的数A,想访问的条目在某个概率上的数B,因为实际系统千差万别,这里可以随便设个A与B的关系,比如没有关系!

缓存的设计就简单了,只需要用一个字典表示高速缓存器就行,后台缓存器可以省略。因为后台缓存器的使用是在没命中的情况下,去后台缓存中查找条目,再返回给用户,并且保留一份在高速缓存中——在这个过程中,后台查找的步骤可以省略,返回给用户也可以省略,我们只需要命中率就行!所以后台缓存器也就可以省略了。

因为这里讨论的只是如何用反馈思维解决控制的问题,不考虑性能,所以能省就省。

系统输入为缓存大小,输出为命中率!这样就可以通过缓存大小来影响命中率了!!!

如何判断条目的新旧
用遍历的变量就可以,当某个条目被访问到,即在字典中被查找到后,就更新该条目的值为变量的变量。
当条目被访问到就返回1,未被访问到就返回0,通过取多次的平均值,记为该次访问的命中率。

#main.py
import feedback as fb
import feedback.Plant
import feedback.FixedFilter
import feedback.RecurFilter
import feedback.PID_Controller
import feedback.ADV_Controller
import feedback.Loop
import random
import numpy as np
import matplotlib.pyplot as plt
from  mpl_toolkits.axisartist.axislines import SubplotZero








'''
缓存器
这个系统传入u,传出0/1,经过过滤后会得到一个小数,这个小数就是命中率


	t 			遍历变量,可作为时间。
				可以当做条目新旧的判据,当访问到某条目时就更新其数据为t
	size		条目数量
	cache		缓存。这个字典的键是条目,值是遍历变量t
	SendDemand	传递的要求函数
	
'''

#记录缓存器大小的变量,用于作图
yCacheSize=[]


class Cache(fb.Plant.Plant):
	def __init__(self,size,SendDemand):
		self.t=0
		self.size=size
		self.cache={
    
    }
		self.SendDemand=SendDemand

	def work(self,u):
		self.t+=1
		#size为非零整数
		self.size = max(0,int(u))

		#记录缓存器大小
		yCacheSize.append(self.size)


		#取出对应的条目,这里是个小的不确定的模拟系统,就用一个概率代替了
		#item是个随机数,模拟请求到的条目/数据
		item = self.SendDemand(self.t)


		if  item in self.cache:
			self.cache[item]=self.t  #更新条目,表示此条目最近访问过
			return 1


		#删除旧的条目
		cacheLen=len(self.cache)
		if cacheLen>= self.size:
			#要被删除的数量
			num = cacheLen - self.size + 1
			tmp={
    
    }
			for k in self.cache.keys():
				tmp[self.cache[k]]=k
			for t in sorted(tmp.keys()):
				#删除最老的元素
				del self.cache[tmp[t]]
				num -= 1
				if num == 0:break

		#不在缓存中的条目添加进缓存,这里忽略了从后台缓存中查找。
		self.cache[item]=self.t
		return 0







'''
高速缓存器

	n 		使用FixedFilter过滤器时,平均计算间隔
	ratio	使用RecurFilter过滤器时,当前传入数占之后数的比例
			因为要对传出的0/1进行求和平均,所以只能使用FixedFilter这个过滤器
	f		过滤器
'''
class SmoothCache(Cache):
	def __init__(self,size,demand,n):
		super().__init__(size,demand)
		self.f = fb.FixedFilter.FixedFilter(n)
	# def __init__(self, size, demand, ratio):
	# 	super().__init__(size, demand)
	# 	self.f = fb.RecurFilter.RecurFilter(ratio)
	#arr是输入值,先经过一个缓存系统,得到的输出值再经过一个过滤器输出
	def work(self,arr):
		#print("系统输入>",arr,end="")
		y=super().work(arr)
		yy=self.f.work(y)
		#print("系统输出>",yy)
		return yy
	def monitoring(self):
		return "高速缓存器"




'''
目标函数,返回需要的曲线。这里是命中率

目标曲线如下
t=[0,  2500,  2500, 4500, 4500, 7800, 7800, 10000]
y=[0.7,0.7,  0.9,0 .9, 0.15,0.15 ,0.5, 0.5]
'''
def GetTarget(arr):
	if arr<2500:return 0.7
	if arr<4500:return 0.9
	if arr<7800:return 0.15
	return 0.5




'''
需求函数
要是分三个不同的概率那就特别不准了。。。

高斯分布:以 mu 为均值,sigma 为标准差
random.gauss(mu, sigma)        
'''
def SendDemand(arr):
	# if arr<3000:return int(random.gauss(0,15))
	# elif arr<5000:return int(random.gauss(0,35))
	# else:return int(random.gauss(100,15))
	return int(random.gauss(0,15))



'''
工具作图
'''
def showInPlot():
	# 实际的输出数量的曲线
	xS = np.arange(0, 10000, 1)
	yS = fb.yForPlot

	# 目标曲线
	xTarget = [0, 2500, 2500, 4500, 4500, 7800, 7800, 10000]
	yTarget = [0.7, 0.7, 0.9, 0.9, 0.15, 0.15, 0.5, 0.5]

	#第一个子图
	plt.subplot(2,1,1)
	#实际命中率(系统输出)
	plt.plot(xS, yS, color="g", linestyle="-", linewidth=1.0, label="real rate")
	#目标命中率
	plt.plot(xTarget, yTarget, color="b", linestyle="-", linewidth=1.0, label="targeted rate")
	plt.xlabel("traversal variable is t")
	plt.ylabel("cache hit rate")
	plt.legend(loc="upper right", bbox_to_anchor=(0.9, 0.95))


	#第二个子图
	plt.subplot(2, 1,2)
	#缓存器大小(系统输入)
	plt.plot(xS, yCacheSize, color="m", linestyle="-", linewidth=1.0, label="cache size")
	plt.xlabel("traversal variable is t")
	plt.ylabel("cache size")
	plt.legend(loc="upper right", bbox_to_anchor=(0.9, 0.95))
	plt.show()




if __name__=="__main__":
	#
	# #传递条目数量为零、需求函数、过滤器的间隔或之前值占之后值比例
	p=SmoothCache(0,SendDemand,150) #每10个平均计算一次命中率

	#传递比例项系数、积分项系数、微分项系数
	#c=fb.PID_Controller.PID_Controller(1.5,1.3)
	c=fb.ADV_Controller.ADV_Controller(1.5,1.0,1.01,ratio=0.9)


	fb.Loop.ClosedLoop(GetTarget,c,p,10000)


	#作图
	showInPlot()


	# print(fb.__name__,fb.__author__,fb.__all__,fb.deltaT)
	# if 'deltaT' in globals().keys():	print('yes')
	# else:print('no')
	#
	# print(globals().keys())

具有控制器的系统输出和控制系统输出的效果
在这里插入图片描述

没有控制器的系统输出的效果

	#不进行控制,显示本来的面貌
	p = SmoothCache(0, SendDemand, 150)
	t=np.arange(0, 200, 0.02)
	fb.Loop.OpenedLoop(p,t,10000)

在这里插入图片描述

高速缓存器越大,命中率越大,但是容量超过100后命中率基本没有变化。一百以内容量与命中率的关系是线性的。

挺奇怪的结论!100这个数也太小了,实际容量都远远高于这个数,等于就是说高速缓存器容量和命中率没什么关系(误)。

其实原因是请求太少了,请求的种类多少影响缓存器的大小。

#重写SendDemand
return int(random.gauss(300,115))
#更改遍历的参数
t=np.arange(0, 900, 0.09)

在增加了请求种类后,相同的缓存大小下命中率就下降了。
在这里插入图片描述

结论

当请求种类大小不变时,增加缓存器大小可以增加命中率。但是超过某个临界点,命中率就与缓存器大小无关了。


参考:《企业级编程与控制理论》

猜你喜欢

转载自blog.csdn.net/weixin_41374099/article/details/104019657