该文章内容根据mysql官方文档内容进行整理、验证(所有图片都摘自官网)
mysql通信协连接阶段主要内容包括:
交换客户端和服务端的信息
如果客户端设置了SSL,设置SSL通信通道
服务端根据客户端返回的数据进行用户验证
客户端开始连接服务端时,服务端可以发送ERR_Packet完成握手或发送客户端初始握手需要的数据包,在该阶段客户端还可以请求SSL连接,在身份验证之前建立SSL通信通道。
在初次握手时服务端会发送用于验证的方法,客户端根据收到的信息进行数据包组装返回,整个连接过程直到服务端返回ERR_Packet或OK_Packet结束,整个过程见下图(摘自官网):
初次握手:
初次握手从服务器发送Protocol::Handshake数据包开始,在此之后,客户端可以请求使用Protocol::SSLRequest包建立SSL连接,或者客户端直接发送Protocol::HandshakeResponse包,下面分别介绍两种连接方式的情况
ssl 连接:
服务端发送Protocol::Handshake数据包
客户端回复Protocol::SSLRequest包
服务端设置ssl通道
客户端发送Protocol::HandshakeResponse数据包
普通连接:
服务端发送Protocol::Handshake数据包
客户端发送Protocol::HandshakeResponse数据包
握手时当然不可能都是一帆风顺的,在这途中会有各种情况发生,比如身份验证失败、密码验证方法不符合、客服端未按要求返回验证方法等,这里只介绍密码验证方法错误的情况,想详细了解见官网文档:
客户端连接到服务端
服务端发送Protocol::Handshake
客户端返回Protocol::HandshakeResponse
服务端发送Protocol::AuthSwitchRequest 告诉客户端它需要切换到新的身份验证方法。
客户端和服务端可能根据客户端进行身份验证的方法,要求交换更多数据包。
服务端发送OK_Packet或ERR_Packet数据包
可以看出该情况比一步完成连接的情况多了Protocol::AuthSwitchRequest和客户端重新回复包的两步,当然在这其中也可能继续发生该异常或其他异常,需要继续交换数据包,下面来看下Protocol::Handshake和Protocol::HandshakeResponse两个数据包的组装情况,mysql通信协议中的所有包会有4bytes的head,前3bytes记录包的大小,第4位置的1byts记录数据包的顺序id,客户端和服务端的一次协议交互直到结束,这途中所有数据包的id都是顺序增长,所以下面介绍的结构都不包含这4bytes需要先明白:
Protocol::Handshake:
图中我们可以很清楚的看到包中包含有协议版本、mysql版本、当前连接的id、默认的字符集还有一些需要交换的信息,而密码验证方法的内容分成了两个部分,这可能是出于安全考虑吧.....
Protocol::HandshakeResponse:
Response包有Protocol::HandshakeResponse320和Protocol::HandshakeResponse41,因为Protocol::HandshakeResponse320是4.1以下版本的协议包,所以只关注了Protocol::HandshakeResponse41包的内容,毕竟现在mysql使用的主流都是5.7了,更多详细内容还是看官方文档吧,毕竟篇幅有限,这里主要说下密码的加密方式,8.0默认密码验证方式为caching_sha2_password,还没来得及认真研究,这里讲讲我们普遍使用的加密方式:mysql_navicat_password:
上面看到Protocol::Handshake有发送两部分的auth-plugin-data,这两部分数据起到加密用户密码的作用,加密方式如下:
SHA1( password ) XOR SHA1( "20-bytes random data from server" <concat> SHA1( SHA1( password ) ) )
假如服务端未发送auth-plugin-data数据将只使用SHA1(password) ,如果有发送该部分数据就将结合这部分数据对密码进行加密,这样可以让嗅探工具不能直接查看到密码,起到一定的安全作用
mysql协议中连接的建立大概内容基本就这些,还是需要自己动手来验证,下面写了一个小脚本,直接看代码吧,只做了常规的步骤,过程中的各种异常并未进行处理,仅供参考:
#!/usr/bin/env python # -*- encoding: utf-8 -*- ''' @author: xiao cai niao ''' import struct,sys from socket import * from contextlib import closing import hashlib,os from functools import partial sha1_new = partial(hashlib.new, 'sha1') SHA1_HASH_SIZE = 20 MULTI_RESULTS = 1 << 17 SECURE_CONNECTION = 1 << 15 CLIENT_PLUGIN_AUTH = 1 << 19 CLIENT_CONNECT_ATTRS = 1<< 20 CLIENT_PROTOCOL_41 = 1 << 9 CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA = 1<<21 LONG_PASSWORD = 1 LONG_FLAG = 1 << 2 PROTOCOL_41 = 1 << 9 TRANSACTIONS = 1 << 13 CAPABILITIES = ( LONG_PASSWORD | LONG_FLAG | PROTOCOL_41 | TRANSACTIONS | SECURE_CONNECTION | MULTI_RESULTS | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_ATTRS) CLIENT_CONNECT_WITH_DB = 9 max_packet_size = 2 ** 24 - 1 charset_id = 45 class TcpClient: def __init__(self,host_content,user_name,password,databases): _host_content = host_content.split(':') self.user = user_name self.password = password self.database = databases HOST = _host_content[0] PORT = int(_host_content[1]) self.BUFSIZ = 1024 self.ADDR = (HOST, PORT) self.client=socket(AF_INET, SOCK_STREAM) self.client.connect(self.ADDR) self.client.settimeout(1) self.server_packet_info = {} self.packet = None def header(self): self.offset = 0 self.payload_length = self.packet[2] << 16 | self.packet[1] << 8 | self.packet[0] self.sequence_id = self.packet[3] self.offset += 4 def check_packet(self): self.header() packet_header = self.packet[self.offset] self.offset += 1 if packet_header == 0x00: print('connection ok') elif packet_header in (0xfe,0xff): print(self.packet[self.offset:]) def Send(self): self.packet=self.client.recv(self.BUFSIZ) self.header() self.__read_server_info() self.__handshakeresponsepacket() response_payload = len(self.response_packet) self.client.send(struct.pack('<I',response_payload)[:3] + struct.pack('!B',1) + self.response_packet) self.packet = self.client.recv(self.BUFSIZ) self.header() packet_header = self.packet[self.offset] self.offset += 1 if packet_header == 0xff: error_code = struct.unpack('<H', self.packet[self.offset:self.offset + 2]) self.offset+= 2 print(error_code,self.packet[self.offset:]) elif packet_header == 0xfe: """AuthSwitchRequest""" _data = self.__authswitchrequest() self.client.send(struct.pack('<I', len(_data))[:3] + struct.pack('!B', 3) + _data) self.packet = self.client.recv(self.BUFSIZ) self.check_packet() elif packet_header in (0x00,0xfe): if self.payload_length > 7: print('ok packet') elif self.payload_length < 9: print('error packet') print(self.server_packet_info) """在这里停留一段时间,在mysql查看连接是否正常""" import time time.sleep(1000) def __authswitchrequest(self): end_pos = self.packet.find(b'\0', self.offset) auth_name = self.packet[self.offset:end_pos].decode() self.offset = end_pos + 1 auth_plugin_data = self.packet[self.offset:] if self.server_packet_info['capability_flags'] & CLIENT_PLUGIN_AUTH and auth_name: data = self.__sha1_password(auth_plugin_data) return data def __read_server_info(self): PLUGIN_AUTH = 1 << 19 #数据包内容 self.server_packet_info['packet_header'] = self.packet[self.offset] self.offset += 1 _s_end = self.packet.find(b'\0', self.offset) self.server_packet_info['server_version'] = self.packet[self.offset:_s_end] self.offset = _s_end + 1 self.server_packet_info['thread_id'] = struct.unpack('<I',self.packet[self.offset:self.offset+4]) self.offset += 4 self.server_packet_info['auth_plugin_data'] = self.packet[self.offset:self.offset+8] self.offset += 8 + 1 self.server_packet_info['capability_flags'] = struct.unpack('<H',self.packet[self.offset:self.offset+2])[0] self.offset += 2 self.server_packet_info['character_set_id'],\ self.server_packet_info['status_flags'],\ capability_flags_2,auth_plugin_data_len = struct.unpack('<BHHB',self.packet[self.offset:self.offset+6]) self.server_packet_info['capability_flags'] |= capability_flags_2 << 16 self.offset += 6 self.offset += 10 auth_plugin_data_len = max(13,auth_plugin_data_len-8) if len(self.packet) - 4 >= self.offset + auth_plugin_data_len: # salt_len includes auth_plugin_data_part_1 and filler self.server_packet_info['auth_plugin_data'] += self.packet[self.offset:self.offset + auth_plugin_data_len] self.offset += auth_plugin_data_len if self.server_packet_info['capability_flags'] & PLUGIN_AUTH and len(self.packet) - 4 >= self.offset: _s_end = self.packet.find(b'\0',self.offset) self.server_packet_info['auth_plugin_name'] = self.packet[self.offset:_s_end] def __handshakeresponsepacket(self): client_flag = 0 client_flag |= CAPABILITIES if self.database: client_flag |= CLIENT_CONNECT_WITH_DB server_version = (self.server_packet_info['server_version']).decode() if int(server_version.split('.', 1)[0]) >= 5: client_flag |= MULTI_RESULTS self.response_packet = struct.pack('<iIB23s',client_flag,max_packet_size,charset_id,b'') self.response_packet += self.user.encode() + b'\0' sha1_password = self.__sha1_password() if self.server_packet_info['capability_flags'] & CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA: self.response_packet += struct.pack('!B',len(sha1_password)) + sha1_password elif self.server_packet_info['capability_flags'] & SECURE_CONNECTION: self.response_packet += struct.pack('B',len(sha1_password)) + sha1_password else: self.response_packet += sha1_password + b'\0' if self.server_packet_info['capability_flags'] & CLIENT_CONNECT_WITH_DB: self.response_packet += self.database.encode() + b'\0' if self.server_packet_info['capability_flags'] & CLIENT_PLUGIN_AUTH: self.response_packet += b'' + b'\0' if self.server_packet_info['capability_flags'] & CLIENT_CONNECT_ATTRS: _connect_attrs = { '_client_name': 'pymysql', '_pid': str(os.getpid()), '_client_version': '3.6.5', 'program_name' : sys.argv[0] } connect_attrs = b'' for k, v in _connect_attrs.items(): k = k.encode('utf8') connect_attrs += struct.pack('B', len(k)) + k v = v.encode('utf8') connect_attrs += struct.pack('B', len(v)) + v self.response_packet += struct.pack('B', len(connect_attrs)) + connect_attrs def __sha1_password(self,auth_plugin_data=None): _pass1 = sha1_new(self.password.encode()).digest() _pass2 = sha1_new(_pass1).digest() s = sha1_new() if auth_plugin_data is None: s.update(self.server_packet_info['auth_plugin_data'][:SHA1_HASH_SIZE]) else: s.update(auth_plugin_data[:SHA1_HASH_SIZE]) s.update(_pass2) t = bytearray(s.digest()) for i in range(len(t)): t[i] ^= _pass1[i] return t def close(self): self.client.close() with closing(TcpClient('192.168.10.12:3306','root','root','sys')) as tcpclient: tcpclient.Send()
技术交流Q群(479472450)及个人公众号会不定期分享学习成果: