可重入函数
可重入函数主要用于多任务环境中,一个可重入的函数简单来说就是可以被中断的函数,也就是说,可以在这个函数执行的任何时刻中断它,转入OS调度下去执行另外一段代码,而返回控制时不会出现什么错误;而不可重入的函数由于使用了一些系统资源,比如全局变量区,中断向量表等
write 和 send
recv和send函数提供了和read和write差不多的功能。但是他们提供了第四个参数来控制读写操作.
int recv(int sockfd,void *buf,int len,int flags)
int send(int sockfd,void *buf,int len,int flags)
前面的三个参数和read,write相同,第四个参数能够是0或是以下的组合:
MSG_DONTROUTE | 不查找路由表 |
MSG_OOB | 接受或发送带外数据 |
MSG_PEEK | 查看数据,并不从系统缓冲区移走数据 |
MSG_WAITALL | 等待任何数据 |
如果flags为0,则和read,write一样的操作。
在unix中,所有的设备都可以看成是一个文件,所以我们可以用read来读取socket数据。
信号和系统调用
EINTR说明: 如果进程在一个慢系统调用(slow system call)中阻塞时,当捕获到某个信号且相应信号处理函数返回时,这个系统调用被中断,调用返回错误,设置errno为EINTR(相应的错误描述为“Interrupted system call”)。
怎么看哪些系统条用会产生EINTR错误呢?man 7 signal,在ubuntu 10.04上可以查看,哪些系统调用会产生 EINTR错误。
如何处理被中断的系统调用
既然系统调用会被中断,那么别忘了要处理被中断的系统调用。有三种处理方式:
-
人为重启被中断的系统调用
-
安装信号时设置 SA_RESTART属性(该方法对有的系统调用无效)
-
忽略信号(让系统不产生信号中断)
信号掩码
POSIX下,每个进程有一个信号掩码(signal mask)。简单地说,信号掩码是一个“位图”,其中每一位都对应着一种信号。如果位图中的某一位为1,就表示在执行当前信号的处理程序期间相应的信号暂时被“屏蔽”,使得在执行的过程中不会嵌套地响应那种信号。
https
https的请求流程:
客户端(浏览器)向服务器请求https连接。
服务器返回证书(公钥)到客户端。
客户端随机的秘钥A(用于对称加密)。
客户端用公钥对A进行加密。
客户端将加密A后的密文发送给服务器。
服务器通过私钥对密文进行解密得到对称加密的秘钥。
客户端与服务器通过对称秘钥加密的密文通信。
上述过程中第2步骤中是存在风险的,因为公钥是暴露出来的,当公钥被中间人非法截获时,同时将公钥替换成中间人自己的公钥发送给客户端,从而得到对称加密的秘钥,进而伪装与客户端通信。
为了解决这种问题,就引入了数字证书与数字签名
所以在第2步骤时,服务器发送了一个SSL证书给客户端,SSL证书中包含了具体的内容有证书的颁发机构、有效期、公钥、证书持有者、签名,通过第三方的校验保证身份的合法。
- 数字签名是对摘要(正文hash后)的一个加密编码,附在正文后面,证明正文未经过修改
- 证书中心(CA)存放着服务器的数字证书,包含服务器的公钥和一些相关信息
- 客户端请求服务器时,先向CA请求当前服务器的数字证书和数字证书的签名
- 客户端用CA公钥来解密证书的hash值,再计算CA发来的数字证书的hash值,两者比对,如果相同则说明CA发来的证书为真,且未被修改。
- 利用证书中的公钥与服务器通信
post
通过 form 表单提交文件操作如下:
<FORM method="POST" action="http://w.sohu.com/t2/upload.do" enctype="multipart/form-data">
<INPUT type="text" name="city" value="Santa colo">
<INPUT type="text" name="desc">
<INPUT type="file" name="pic">
</FORM>
浏览器将会发送以下数据:
POST /t2/upload.do HTTP/1.1
User-Agent: SOHUWapRebot
Accept-Language: zh-cn,zh;q=0.5
Accept-Charset: GBK,utf-8;q=0.7,*;q=0.7
Connection: keep-alive
Content-Length: 60408
Content-Type:multipart/form-data; boundary=ZnGpDtePMx0KrHh_G0X99Yef9r8JZsRJSXC
Host: w.sohu.com
--ZnGpDtePMx0KrHh_G0X99Yef9r8JZsRJSXC
Content-Disposition: form-data; name="city"
Santa colo
--ZnGpDtePMx0KrHh_G0X99Yef9r8JZsRJSXC
Content-Disposition: form-data;name="desc"
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
...
--ZnGpDtePMx0KrHh_G0X99Yef9r8JZsRJSXC
Content-Disposition: form-data;name="pic"; filename="photo.jpg"
Content-Type: application/octet-stream
Content-Transfer-Encoding: binary
... binary data of the jpg ...
--ZnGpDtePMx0KrHh_G0X99Yef9r8JZsRJSXC--
从上面的 multipart/form-data 格式发送的请求的样式来看,它包含了多个 Parts,每个 Part 都包含头信息部分,
Part 头信息中必须包含一个 Content-Disposition 头,其他的头信息则为可选项, 比如 Content-Type 等。
Content-Disposition 包含了 type 和 一个名字为 name 的 parameter,type 是 form-data,name 参数的值则为表单控件(也即 field)的名字,如果是文件,那么还有一个 filename 参数,值就是文件名。
base64
我们知道在计算机中任何数据都是按ascii码存储的,而ascii码的128~255之间的值是不可见字符。而在网络上交换数据时,比如说从A地传到B地,往往要经过多个路由设备,由于不同的设备对字符的处理方式有一些不同,这样那些不可见字符就有可能被处理错误,这是不利于传输的。所以就先把数据先做一个Base64编码,统统变成可见字符,这样出错的可能性就大降低了。
打洞过程
打洞过程
(1)ClientA请求Server。
(2)ClientB请求Server。
(3)Server把ClientA的IP和端口信息发给ClientB。
(4)Server把ClientB的IP和端口信息发给ClientA。
(5)ClientA利用信息给ClientB发消息。(A信任B)
(6)ClinetB利用信息给ClientA发消息。(B信任A)
(7)连接已经建立。两者可以直接通信了。
html引擎
- 上下文context,引擎每接受一个请求,就为其生成一个新的context.
type Context struct { // origin objects Writer http.ResponseWriter Req *http.Request // request info Path string Method string Params map[string]string // response info StatusCode int // middleware handlers []HandlerFunc index int // engine pointer engine *Engine }
- html模板的使用
在template中,点"."代表当前作用域的当前对象。使用 range .后, 会产生一个新的作用域,其中当前对象的子对象作为当前对象
package main
import (
"os"
"text/template"
)
type Friend struct {
Fname string
}
type Person struct {
UserName string
Emails []string
Friends []*Friend
}
func main() {
f1 := Friend{
Fname: "xiaofang"}
f2 := Friend{
Fname: "wugui"}
t := template.New("test")
t = template.Must(t.Parse(
`hello {
{.UserName}}!
{
{ range .Emails }}
an email {
{ . }}
{
{- end }}
{
{ with .Friends }}
{
{- range . }}
my friend name is {
{.Fname}}
{
{- end }}
{
{ end }}`))
p := Person{
UserName: "longshuai",
Emails: []string{
"[email protected]", "[email protected]"},
Friends: []*Friend{
&f1, &f2}}
t.Execute(os.Stdout, p)
}
输出结果:
hello longshuai!
an email [email protected]
an email [email protected]
my friend name is xiaofang
my friend name is wugui
这里定义了一个Person结构,它有两个slice结构的字段。在Parse()方法中:
顶级作用域的{
{.UserName}}、{
{.Emails}}、{
{.Friends}}中的点都代表Execute()的第二个参数,也就是Person对象p,它们在执行的时候会分别被替换成p.UserName、p.Emails、p.Friends。
因为Emails和Friend字段都是可迭代的,在{
{range .Emails}}…{
{end}}这一段结构内部an email {
{.}},这个".“代表的是range迭代时的每个元素对象,也就是p.Emails这个slice中的每个元素。
同理,with结构内部{
{range .}}的”."代表的是p.Friends,也就是各个,再此range中又有一层迭代,此内层{
{.Fname}}的点代表Friend结构的实例,分别是&f1和&f2,所以{
{.Fname}}代表实例对象的Fname字段。
-
路由
使用前缀树来进行路径的匹配
节点的数据结构type node struct { pattern string part string children []*node isWild bool }
每一个节点对应路径中的一段,段用part表示。如果节点对应某路径中的最后一段,则pattern属性是该完整路径的值,否则patern为空。若节点对应段的前缀为:或 *,则isWild属性为true。:和 *节点能匹配任意段,不同的是,节点匹配到 *就停止了。一次匹配返回唯一的成功节点或一个失败的空节点。之后,同时遍历待匹配的path和成功节点的pattern,可以找到 :段和 * 段对应的值并保存到params字典中。
在解析过程中把pattern对应的handler以及params保存到context对象中。
使用例子:func main() { r := gee.New() r.GET("/", func(c *gee.Context) { c.HTML(http.StatusOK, "<h1>Hello Gee</h1>") }) r.GET("/hello", func(c *gee.Context) { // expect /hello?name=geektutu c.String(http.StatusOK, "hello %s, you're at %s\n", c.Query("name"), c.Path) }) r.GET("/hello/:name", func(c *gee.Context) { // expect /hello/geektutu c.String(http.StatusOK, "hello %s, you're at %s\n", c.Param("name"), c.Path) }) r.GET("/assets/*filepath", func(c *gee.Context) { c.JSON(http.StatusOK, gee.H{ "filepath": c.Param("filepath")}) }) r.Run(":9999") }
-
分组
引入分组机制后routergroup和http引擎的数据结构type ( RouterGroup struct { //路由组 prefix string middlewares []HandlerFunc // 每个组的中间件 parent *RouterGroup // support nesting engine *Engine // 每个组共享一个引擎的实例 } Engine struct { // http处理引擎 *RouterGroup router *router groups []*RouterGroup // 储存所有组 } )
这里用go的组合机制实现engine对RouterGroup的继承
所有routerGroup共享一个engine的实例,prefix是每个组的前缀。在组上调用addRoute(method string, pattern string, handler HandlerFunc)时,组内部把组的前缀+pattern作为作为engine.addroute的参数。
分组可以嵌套:v1 := r.Group("/v1") v2 := v1.Group("/v2")
这时组2的前缀为/v1/v2,并且v2的parent为v1.
-
中间件
处理一个请求时,先匹配path和各个组的前缀,并把匹配到的组的中间件(handler)追加到请求对应的context中的handlers数组中。 之后再根据path匹配前缀树中的节点,拿到节点对应的handler,同样追加到handlers数组中。
利用context的next函数执行这些handler:func (c *Context) Next() { c.index++ s := len(c.handlers) for ; c.index < s; c.index++ { c.handlers[c.index](c) } }
-
错误恢复
向最大的组engine注册一个用于恢复错误的handler, 并在这个handler内部执行Next(),相当于把其它所有的handler包围起来:func Recovery() HandlerFunc { return func(c *Context) { defer func() { if err := recover(); err != nil { message := fmt.Sprintf("%s", err) log.Printf("%s\n\n", trace(message)) c.Fail(http.StatusInternalServerError, "Internal Server Error") } }() c.Next() } }
ORM
- 用法
var (
user1 = &User{
"Tom", 18}
user2 = &User{
"Sam", 25}
user3 = &User{
"Jack", 25}
)
s := NewSession().Model(&User{
})
err1 := s.DropTable()
err2 := s.CreateTable()
_, err3 := s.Insert(user1, user2)
Session负责和数据库进行交互,dialect保存数据库(比如sqlite3)中类型和go语言基本类型的映射.
type Session struct {
db *sql.DB
dialect dialect.Dialect
refTable *schema.Schema
clause clause.Clause
sql strings.Builder
sqlVars []interface{
}
}
reftable的基本类型是Schema,负责保存对象类型中各个字段(field)在数据库中的类型以及tag(例如主键),clause内部有不同的函数,可以根据参数生成对应的查询子句,例如insert操作可以分成INSERT 子句和VALUES两个子句,分别对应两个构造函数
func _insert(values ...interface{
}) (string, []interface{
}) {
// INSERT INTO $tableName ($fields)
tableName := values[0]
fields := strings.Join(values[1].([]string), ",")
return fmt.Sprintf("INSERT INTO %s (%v)", tableName, fields), []interface{
}{
}
}
func _values(values ...interface{
}) (string, []interface{
}) {
// VALUES ($v1), ($v2), ...
var bindStr string
var sql strings.Builder
var vars []interface{
}
sql.WriteString("VALUES ")
for i, value := range values {
v := value.([]interface{
})
if bindStr == "" {
bindStr = genBindVars(len(v))
}
sql.WriteString(fmt.Sprintf("(%v)", bindStr))
if i+1 != len(values) {
sql.WriteString(", ")
}
vars = append(vars, v...)
}
return sql.String(), vars
}
session可以调用Model函数解析一个对象,并把输出的schema保存在reftable字段中。注意,保存在schema中的字段必须是可导出的,并且顺序和在对象中保持一致
type Field struct {
Name string
Type string
Tag string
}
// Schema represents a table of database
type Schema struct {
Model interface{
}
Name string
Fields []*Field
FieldNames []string
fieldMap map[string]*Field
}
在下面的类型中,Name字段有一个值为PRIMARY KEY的tag, 在
解析时用modeltype.Field(0).Tag.Lookup(“geeorm”)来获取。
type User struct {
Name string `geeorm:"PRIMARY KEY"`
Age int
}
- find
find的任务是把查询结果保存到传入的对象数组中,因为涉及把查询结果反序列化为对象,所以事先比insert复杂一些。 这里通过dest.FieldByName(name).Addr().Interface()获取对象每个字段的指针,然后传给rows.Scan函数。
func (s *Session) Find(values interface{
}) error {
destSlice := reflect.Indirect(reflect.ValueOf(values))
destType := destSlice.Type().Elem()
table := s.Model(reflect.New(destType).Elem().Interface()).RefTable()
s.clause.Set(clause.SELECT, table.Name, table.FieldNames)
sql, vars := s.clause.Build(clause.SELECT, clause.WHERE, clause.ORDERBY, clause.LIMIT)
rows, err := s.Raw(sql, vars...).QueryRows()
if err != nil {
return err
}
for rows.Next() {
dest := reflect.New(destType).Elem()
var values []interface{
}
for _, name := range table.FieldNames {
values = append(values, dest.FieldByName(name).Addr().Interface())
}
if err := rows.Scan(values...); err != nil {
return err
}
destSlice.Set(reflect.Append(destSlice, dest))
}
return rows.Close()
}
分布式key-value cache
-
lru(Least Recently Used)
数据结构type Cache struct { maxBytes int64 nbytes int64 ll *list.List cache map[string]*list.Element // optional and executed when an entry is purged. OnEvicted func(key string, value Value) } type entry struct { key string value Value } // Value use Len to count how many bytes it takes type Value interface { Len() int }
nbytes为总占用内存,OnEvicted为删除一条数据时自动执行的函数。ll是一个双向列表,表中的元素是entry指针, entry中的value是任何实现了Len()的类型。cache字典中保存的value是压入列表后自动返回的类型*list.Element。
用list来记录加入的先后。get(key)先在cache字典中根据key找到列表元素,然后把列表元素压入队尾。add时,先检查nbytes是否大于maxBytes,若大于则从队首删除元素。 -
删除队首元素
func (c *Cache) RemoveOldest() { ele := c.ll.Back() if ele != nil { c.ll.Remove(ele) kv := ele.Value.(*entry) delete(c.cache, kv.key) c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len()) if c.OnEvicted != nil { c.OnEvicted(kv.key, kv.value) } } }
先从list中移出队首,然后利用返回的entry中key,删除cache中的key
-
ByteView
type ByteView struct { b []byte } // Len returns the view's length func (v ByteView) Len() int { return len(v.b) }
byteview是LRU里Value接口的实现
-
group
type Group struct { name string getter Getter mainCache cache } var ( mu sync.RWMutex groups = make(map[string]*Group) )
每个主机上有一个以group为元素的map, 每个group相当于一个命名空间,并且对应于一个独立的cache
type Getter interface { Get(key string) ([]byte, error) } // A GetterFunc implements Getter with a function. type GetterFunc func(key string) ([]byte, error) // Get implements Getter interface function func (f GetterFunc) Get(key string) ([]byte, error) { return f(key) }
getter 是在cache中get失败时的回调函数,用于从其它数据源获取数据并加入cache
-
http
约定访问路径格式为/<basepath>/<groupname>/<key>
最终使用 w.Write() 将缓存值作为 httpResponse 的 body 返回 -
一致性哈希
意义: 使相同的key对应相同的节点,即便中途新加入或删除了部分主机导致节点总数发生了变化,解决方法是把所有的key组织成一个环,并分割成多个区域,每个区域对应一个节点,如果某个区域的节点被删除则把区域内的节点加入下一个区域。为了解决数据倾斜,即某个节点对应特别多的key,可以为每个节点设置多个虚拟节点,使其在圆上的分布更加均匀。type Map struct { hash Hash replicas int keys []int // Sorted hashMap map[int]string }
节点用ip地址表示,虚拟节点在真实节点的ip地址前面加编号。 利用hash函数把节点ip转为int,并加入keys和hashmap中。
-
新group
加入分布式节点后group的数据结构变为type Group struct { name string getter Getter mainCache cache peers PeerPicker }
peerpicker是一个http客户端,负责利用一致性哈希进行节点选择,并从其它缓存节点获取数据。一个缓存节点初始化时,首先初始化group,然后把其它缓存节点的ip(已知)加入到peerpicker的一致性map里。如果当前节点找不到key就利用peerpicker,从当前缓存节点发起请求到其它节点去找。如果一致性哈希后还是指向本节点,就利用回调函数getter从获取数据
-
缓存击穿
大量相同请求一个不在缓存中的数据,所以每次都请求数据库,导致其宕机 -
防止缓存击穿
首先创建 call 和 Group 类型。package singleflight import "sync" type call struct { wg sync.WaitGroup val interface{ } err error } type Group struct { mu sync.Mutex // protects m m map[string]*call }
func (g *Group) Do(key string, fn func() (interface{ }, error)) (interface{ }, error) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { g.mu.Unlock() c.wg.Wait() return c.val, c.err } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() c.val, c.err = fn() c.wg.Done() g.mu.Lock() delete(g.m, key) g.mu.Unlock() return c.val, c.err }
用一个map加上waitgroup来确保,第一时间,对同一个key只有一个请求,用WaitGroup原因是,唤醒之后不会像互斥锁那样继续占有锁。从而多个协程可以同时唤醒并继续运行
RPC
一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 Option 得 CodeType 解码剩余的内容。即报文将以这样的形式发送:
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod …} | Body interface{} |
| <------固定 JSON 编码 ------> | <-----编码方式由 CodeType 决定---->|
在一次连接中,Option 固定在报文的最开始,Header 和 Body 可以有多个,即报文可能是这样的。
| Option | Header1 | Body1 | Header2 | Body2 | … 不管是客户端还是服务端,报文都是header后跟着body,客户端基本上就是把服务端发来的header再发送回去,
这里的RPC要求客户端知道远程调用的参数类型和返回类型。
把指针或者接口形式的对象传给glob.decoder, 后者就能从二进制数据中反序列化出这个对象的内容。
服务器解析函数参数的过程:先解析固定类型的header,从中可以知道服务名和方法名。
利用这两个字符串检索得到methodtype对象,进而利用methodtype对象创建待调用函数的参数实例argv, 并把argv以接口形式传给gob.Decoder.
type Option struct {
MagicNumber int // MagicNumber marks this's a geerpc request
CodecType codec.Type // client may choose different Codec to encode body
}
type Header struct {
ServiceMethod string // format "Service.Method"
Seq uint64 // sequence number chosen by client
Error string
}
type request struct {
h *codec.Header // header of request
argv, replyv reflect.Value // argv and replyv of request
mtype *methodType
svc *service
}
type methodType struct {
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint64
}
type service struct {
name string
typ reflect.Type
rcvr reflect.Value
method map[string]*methodType
}
客户端在和服务端建立连接后,会go一个receive协成,专门负责接受服务端回复的远程调用结果。客户端每发出一个远程调用请求,都会单独go一个send协程,并且创建一个call对象,其中的Done字段用于send协程和主线程的同步。客户端维护一个map,用来记录所有未收到回复的call对象。
type Call struct {
Seq uint64
ServiceMethod string // format "<service>.<method>"
Args interface{
} // arguments to the function
Reply interface{
} // reply from the function
Error error // if error occurs, it will be set
Done chan *Call // Strobes when call is complete.
}
type Server struct {
serviceMap sync.Map
}
reflect.Method 类型的方法的调用过程:
f := m.method.Func
returnValues := f.Call([]reflect.Value{
s.rcvr, argv, replyv})
service 对应一个类, 其中的rcvr对应类函数的reciever,. map中的所有方法必须在原类中是可导出的,并且满足只有arg和reply两个参数。
服务器启动时,手动把能对外提供服务的结构体注册到Server的一个线程安全的map(serviceMap)中, map中每一个service都保存了自己的实例和方法列表,方法列表中的每一项methodType存储了反射后的方法,反射后的输入参数ArgType和返回参数ReplyType。
客户端和服务器建立连接后,先发来一个option,用来协商编码方式(gob 或json),接着连续先后发header和body. header中制定了结构体名和方法名(A.method1), 然后服务器据此找到对应的service和methodtype,并利用其中的ArgType来解码body中的输入参数并存入argv,然后组合service,methodtype,argv, 和一个初始化的ReplyType成一个request ,交给call函数进行方法的调用和返回。
服务器每收到一个连接建立请求就go一个ServeConn协程,该协程负责解析对应客户端的所有请求,每解析完成一个请求就go一个handleRequest协程负责处理请求和发送回复,这里handleRequest协程和ServeConn协程用waitgroup来同步,所以handleRequest协程开始前调用wg.add,结束或意外退出前调用wg.done.
在创建一个新协程时,可以传入一个context.Context,用于在协程的外部利用context中的done管道来控制这个协程,协程内部又可以封装context(比如添加计时器)并层层传递给子协程。 只要某一层的context关闭,后面所有层的context的done都会受到关闭信号。
注册中心的实现很简单,它负责维护一个server地址数组,每当server发来一个心跳包就更新server数组,并定期清理数组中很久没更新的地址。xclient是可以进行负载均衡的客户端对象,当进行远程调用时,首先从注册中心拉取server数组,然后利用robin选择一个地址,再生成一个普通的client对象(已存在就相同地址的client就跳过)。
网盘项目
实现网盘共享列表,秒传等功能
-
mysql 中存在 三个表
用户信息表 (user_info):
记录用户账户密码信息,密码用MD5保存
文件信息表(不包含文件名):
文件在fastdfs上的路径,大小,md5值,和引用计数
用户文件列表:
文件所属用户,文件名,md5,下载量,共享状态,创建时间
共享文件列表:
文件所属用户,md5, 文件名,文件下载量,共享时间 -
秒传
客户端传来文件名和md5, 先利用md5查找文件信息表,
如果存在则进入下一步: 利用md5和文件名和用户名的组合
查询用户文件列表,如果存在则不需要秒传否则进入秒传步
骤:在用户文件列表中加入这一项,并且在文件信息表中的增
加文件的引用计数 -
上传
由客户端提供md5:Content-Disposition: form-data; user="mike"; filename="xxx.jpg"; md5="xxxx"; size=10240\r\n
服务要等nginx把用户上传的文件全部转发过来后再转存到fastdfs中。之后把fastdfs返回的信息写入mysql中
-
分享
- 先判断此文件是否已经分享, 利用文件名+md5的方式查询redis
- 利用判断集合有没有这个文件,如果有,说明别人已经分享此文件(redis操作)
因为客户端此时已经通过从用户文件列表获取到的文件信息知道此时文件的分享状态,所以不会重复发出分享请求 - 如果集合没有此元素,可能因为redis中没有记录,再从mysql中查询(利用md5, filename 查询共享文件列表),如果mysql也没有,说明真没有(mysql操作)
- 如果mysql有记录,而redis没有记录,说明redis没有保存此文件,redis保存此文件信息后,再中断操作
- 此时都没有记录,在用户文件列表中(user, md5, filename)查询并更新共享标志 ,更新共享文件列表,redis集合增加此项,更新一项文件名+md5到文件名的映射
-
删除
和分享一样,依次查询redis集合和用户文件列表判断共享状态,有则删除redis中的项,共享列表中和用户文件列表中的项(user, md5, filename),文件信息表中==(MD5)==引用计数减1, 如果计数为0,则删除此项和fastdfs中的数据 -
分享列表
-
排序
利用redis的有序集合zset保存下载量key用的是分享文件时用一样的集合
mysql共享文件数量和redis共享文件数量对比,判断是否相等
如果不相等,清空redis数据,从mysql中导入数据到redis (mysql和redis交互)
从redis读取数据,给前端反馈相应信息 -
下载
直接用url下载,用另外的api单独更新共享文件下载量:
先更新mysql,再更新redis。
-
mysql和redis 一致性
-
延时双删策略
具体的步骤就是:
先删除缓存;再写数据库;休眠500毫秒;再次删除缓存。
那么,这个500毫秒怎么确定的,具体该休眠多久呢?
需要评估自己的项目的读数据业务逻辑的耗时。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。
当然这种策略还要考虑redis和数据库主从同步的耗时。最后的的写数据的休眠时间:则在读数据业务逻辑的耗时基础上,加几百ms即可。比如:休眠1秒。
设置缓存过期时间
从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案。所有的写操作以数据库为准,只要到达缓存过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存。
该方案的弊端
结合双删策略+缓存超时设置,这样最差的情况就是在超时时间内数据存在不一致,而且又增加了写请求的耗时。
-
异步更新缓存
读取binlog后分析 ,利用消息队列,推送更新各台的redis缓存数据。这样一旦MySQL中产生了新的写入、更新、删除等操作,就可以把binlog相关的消息推送至Redis,Redis再根据binlog中的记录,对Redis进行更新。
spark 简介
我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。
在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批Task,然后将这些Task分配到各个Executor进程中执行。Task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个Task处理的数据不同而已。一个stage的所有Task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的Task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。
Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个Task可能都会从上一个stage的Task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。
当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个Task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。
因此Executor的内存主要分为三块:第一块是让Task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让Task通过shuffle过程拉取了上一个stage的Task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。
Task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个Task,都是以每个Task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的Task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些Task线程。
以上就是Spark作业的基本运行原理的说明.