Redis Protocol即为client与server交互时,所使用的数据格式;符合格式的数据能够被server端解析并返回结果,client端如果按照格式要求既可以解析“结果”并将结构化数据反馈给调用者。有些时候,我们可以通过改造协议的方式构建redis-client套层或者server端的Proxy。
Redis-Client与Server之间进行的任何通讯,均是通过普通的TCP链接进行,因为TCP通讯是面向字节流的,因此它和其他任何基于字节流的信息交互的平台一样,需要“protocol”(协议);任何protocol本身需要至少具备2种能力:1)字节码成帧策略 2)字符序列格式约束。
“字节码成帧”被广泛的应用在基于字节流的网络交互中,以TCP通讯为例,在Socket通讯中所发送的数据(特别是长链接,持续交付packet的场景下),对于socket的一端都需要知道每个packet的“终止符”位置,以及packet中每个数据field的偏移量;只有这样,对于socket的数据的接收端才能将“无边界流数据”有效的转化成结构化/可读的字符序列。如下展示一个packet帧的结构,其中"[""]"只是为了标记参数:
[magic-header][packet-bytes-length][field-name-bytes-length][field-name-bytes][field-data-bytes-length][field-data-bytes]
实际数据大概为:
[“magic-header”][10][4]["name".getBytes()][6]["012345".getBytes()]
不同的设计者可能考虑成帧的方式不同,但是都需要描述当前packet字节的长度/filed顺序/每个filed的字节长度等。上述例子则表达:此packet需要以“magic-header”开头(主要用来防止字节流被意外破坏或者乱序,同时用来表示一个新帧的开始),此后的总字节长度为10,其中“name”这个filed名称占4个字节,“name”这个filed对应的数据占6个字节;最终我们还原成name=012345这么一个信息。
“字符序列格式约束”即将字符串按照一定的规则进行解析,并获得有效数据;其中“规则”就是格式约束,对于符合规则的字符串才能被接受和实施,否则将会被丢弃。比如xml,json等等,只有符合格式约束的字符串才能被相应的引擎所解析。
Redis协议非常简单且容易理解,request和reply数据都遵循同一个协议;对于client端的每个request,最终会被描述为一个command,command所包含的信息只会包括:指令名称 + 参数列表。那么对于server端的response,最终会被描述为一个result,那么result可能包括此次操作的状态(status)码、错误信息、结果内容等。协议就是为双端交互中的数据格式提供约束。
1. Request部分:对于指令操作[command-name][arg...]最终将会转换成如下格式在网络中传输并交付给server端:
*[参数数量] \r\n $[参数字节个数]\r\n [参数字节序列] ... ##以“SET key value”指令为例: *3 #表示有3个参数 $3 #表示“参数”有三个字节("SET"字符串为3个字节) SET $4 name $5 01234 ##流的方式 *3\r\n$3\r\nSET\r\n$4\r\nname\r\n$5\r\n01234\r\n
2.Reply部分:server端需要返回结果的类型,redis中,reply的首个字符用来表示结果的类型,最终以“\r\n”结束.
- “+”:“状态”类型reply,对于无需实际数据返回的相应,只是用来表示此次操作成功与否,例如SET KEY-VALUE指令。比如“+OK”表示操作成功,如果不成功将会返回“-ERROR”。
- “-”:“异常”信息类型reply,对于操作失败时,将会返回此类型的信息告知client端。
lrange testtt w x -ERR value is not an integer or out of range
在“-”之后为“错误类型”,此后为一个空格或者“\r\n”,然后为异常信息内容,内容可能有多行。 - “:”:表示返回结果为integer类型,此结果只包括一个数字,也可以用来表示true/false的结果类型,比如INCR/DECR/EXISTS/SISMEMBER等指令;需要注意的是,integer结果也是按照“字符串”方式传输的,你不能按照“4个字节=integer”的思路去使用它。比如integer = 23456,那么“23456”实际上是5个字节
exists fck-me :0 exists fck-you :1
- “$”:表示返回的结果为普通数据结果,格式为:[$][字节个数][\r\n][字节序列][\r\n],如果结果中存在$-1则表示当前请求的数据为“null”。
get fck-you $3 123
- “*”:表示返回的结果为复合数据结果,结果中包括多个子数据集合。
lrange testlist 0 -1 *2 $1 b $1 a
qing@qing-tp:~$ telnet 127.0.0.1 6379 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]'. SET test 1 +OK incr test :2 exist test -ERR unknown command 'exist'3.协议与实践:在下文的代码中,我们将通过代码展示如何直接通过Socket-IO的方式直接与redis-server交互,可以比较直观的明白protocol数据格式;其实,可以认为它是一个java redis-client,不过我本人采取了和Jedis不同的请求处理手段,本人采取了“请求队列 + 同步”的方式进行,其实如下代码可以在极少的修改下,改变为“异步”的方式。在此需要提醒:protocol中的字节流需要UTF-8编码之后。 TestMain.java
public class TestMain { public static void main(String[] args){ Client client = new Client("127.0.0.1", 6379); client.set("testset", "012xyz中国_?"); System.out.println(client.get("testset")); List<String> list = client.lrange("testlist", 0, -1); for(String item : list){ System.out.println("--:" + item); } System.out.println("incr:" + client.incr("testincr")); } }Client.java (完整代码,参见附件)
public class Client { private BlockingQueue<Request> requests = new LinkedBlockingQueue<Request>(); private Charset charset = Charset.forName("utf-8"); private Handler handler; Client(String host,int port){ handler = new Handler(host, port); handler.setDaemon(true); handler.start(); } public void set(String key,String value){ Request request = new Request(Command.SET, key,value); try{ synchronized (request) { requests.put(request); request.wait(); } Reply reply = request.reply; if(reply == null || reply.code == -1 || !reply.success){ throw new RuntimeException("operation fail.."); } }catch(InterruptedException e){ return; } } public String get(String key){ Request request = new Request(Command.GET, key); try{ synchronized (request) { requests.put(request); request.wait(); } Reply reply = request.reply; if(reply == null || reply.code == -1 || !reply.success){ throw new RuntimeException("operation fail.."); } return reply.result; }catch(InterruptedException e){ return null; } } public List<String> lrange(String key,int from,int to){ Request request = new Request(Command.LRANGE, key,String.valueOf(from),String.valueOf(to)); try{ synchronized (request) { requests.put(request); request.wait(); } Reply reply = request.reply; if(reply == null || reply.code == -1 || !reply.success){ throw new RuntimeException("operation fail.."); } return reply.lresult; }catch(InterruptedException e){ return null; } } public Integer incr(String key){ Request request = new Request(Command.INCR,key); try{ synchronized (request) { requests.put(request); request.wait(); } Reply reply = request.reply; if(reply == null || reply.code == -1 || !reply.success){ throw new RuntimeException("operation fail.."); } return Integer.valueOf(reply.result); }catch(InterruptedException e){ return null; } } public void close(){ handler.close(); } class Handler extends Thread{ Socket socket = null; boolean closed = false; BufferedReader is = null; OutputStream os = null; String host; int port; Handler(String host,int port){ try{ this.host = host; this.port = port; connect(); }catch(Exception e){ e.printStackTrace(); } } private void connect() throws IOException{ socket = new Socket(); SocketAddress addr = new InetSocketAddress(host,port); socket.setKeepAlive(true); //socket.setSoTimeout(10000); socket.setSoLinger(true,0); //socket.setReceiveBufferSize(1024); socket.setTcpNoDelay(true); socket.connect(addr,10000); //blocking is = new BufferedReader(new InputStreamReader(socket.getInputStream(),charset)); os = socket.getOutputStream(); } public void close(){ closed = true; this.interrupt(); } private void write(Request request) throws IOException{ os.write('*'); String[] args = request.args; os.write(String.valueOf(args.length + 1).getBytes(charset)); os.write('\r'); os.write('\n'); //*2 os.write('$'); byte[] cb = request.command.name().getBytes(charset); os.write(String.valueOf(cb.length).getBytes(charset)); os.write('\r'); os.write('\n'); //$3 os.write(cb); os.write('\r'); os.write('\n'); //GET for(String arg : args){ byte[] ab = arg.getBytes(charset); os.write('$'); os.write(String.valueOf(ab.length).getBytes(charset)); os.write('\r'); os.write('\n'); os.write(ab); os.write('\r'); os.write('\n'); } } @Override public void run(){ try{ while(!closed){ Request request = requests.take(); try{ write(request); char status = (char)is.read(); Reply reply = new Reply(); if(status != '-'){ reply.success = true; } if(status == '+' || status == '-'){ reply.message = read(); }else if(status == '$'){ reply.result = readString(); }else if(status == '*'){ reply.lresult = readMulti(); }else if(status == ':'){ reply.result = read(); }else{ request.reply = new Reply(-1); throw new RuntimeException("packet error.."); } synchronized (request) { request.reply = reply; request.notifyAll(); } }catch(Exception e){ try{ socket.close(); this.connect(); }catch(Exception ex){ // } synchronized (request) { request.notifyAll(); } } } }catch(InterruptedException e){ try{ closed = true; socket.close(); for(Request request : requests){ request.blocker.interrupt(); } requests.clear(); }catch(Exception ex){ // } } } //read line private String read() throws IOException{ StringBuilder sb = new StringBuilder(); //\r\n必须互为成对 //不能直接使用is.readline() boolean lfcr = false; while(true){ char _char = (char)is.read(); if(_char == -1){ close(); break; } //如果上一个字符为\r if(lfcr == true){ if(_char == '\n'){ break; } sb.append('\r'); lfcr = false; } if(_char == '\r'){ lfcr = true; continue; } sb.append(_char); } return sb.toString(); } private List<String> readMulti() throws IOException{ Integer size = Integer.valueOf(read()); List<String> lresult = new ArrayList<String>(); //eg: *3 if(size > 0) { for(int i=0;i<size;i++){ while(true){ char _char = (char)is.read();//$3 if(_char == '$'){ lresult.add(readString()); break; } } } } return lresult; } //such as: //$3 //012 private String readString() throws IOException{ Integer size = Integer.valueOf(read()); //-1 is null if(size > 0){ return read(); } return null; } } }