首先,网页版微信登录大致分为以下几个流程(都是大家可以通过抓包得到):
1、登陆主页后,会生成一个UUID,这是个用户标识,在后面请求二维码会用到
def get_uuid(self): '''获取uuid''' url = 'https://login.wx.qq.com/jslogin?appid=wx782c26e4c19acffb&redirect_uri=https%3A%2F%2Fwx.qq.com%2Fcgi-bin%2Fmmwebwx-bin%2Fwebwxnewloginpage&fun=new&lang=zh_CN&_={}'.format(get_time()) response = self.session.get(url).text self.uuid = re.findall(r'uuid = "(.*?)"',response)[0] # 文本较少,用正则匹配即可 return self.uuid
2、请求二维码图片
def qrcode(self): url = 'https://login.weixin.qq.com/qrcode/{}'.format(self.uuid) response = self.session.get(url).content # 请求得到二维码,由于是图片,得到字节码即可 with open ('qrcode.jpg','wb') as f: f.write(response) # 把二维码保存到一张图片里 im = Image.open('qrcode.jpg') im.show() # 把二维码图片展示出来
3、扫描二维码,得到重定向的链接
def get_redirect_uri(self): while True: # 由于需要不断请求,让我们有时间来扫描二维码 url = 'https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid={}&tip=0&r=-2109595288&_={}'.format(self.uuid,get_time()) result = self.session.get(url,allow_redirects=False) code = re.findall(r'window.code=(.*?);',result.text)[0] if code == '200': # 没有扫描的时候是400,扫描之后就是200 print('已成功扫描二维码!') break self.redirect_uri = re.findall(r'window.redirect_uri="(.*?)"',result.text)[0] # 得到一个链接,请求之后会得到一些有用的参数
这个url里面有个参数在前面请求应答中没有的,在js中可以找到,大概是个13位的时间戳
4、请求上面得到的链接,得到一些必要的参数(这些参数在后面的登录、收发消息都是必需的),注意这里一定要不允许重定向,因为请求这个url的时候,会跳转到一个初始化的链接,这样我们将不能正确获得这些参数。
def get_require_data(self): result = self.session.get(self.redirect_uri,allow_redirects=False).text # 这里注意一定要去掉重定向 self.skey = re.findall(r'<skey>(.*?)</skey>',result)[0] # 这里用Beautifulsoup也能解析,文本是xml格式的,用匹配标签就可以得到 self.wxsid = re.findall(r'<wxsid>(.*?)</wxsid>',result)[0] self.wxuin = re.findall(r'<wxuin>(.*?)</wxuin>',result)[0] self.pass_ticket = re.findall(r'<pass_ticket>(.*?)</pass_ticket>',result)[0]
5、发送登录请求
def login(self): url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?r=-2109580211&pass_ticket={}'.format(self.pass_ticket) params = { "BaseRequest":{ "Uin":self.wxuin, "Sid":self.wxsid, "Skey":self.skey, "DeviceID":get_DeviceID(), # DeviceID这个参数在前面的应答中没有找到,它是js生成的,通过读js代码,我们可以构造出来 } } result = self.session.post(url,data=json.dumps(params,ensure_ascii=False)) result.encoding = 'utf-8' data = result.json() user = data['User'] nickname = user['NickName'] username = user['UserName'] self.user_list[nickname] = username # 把自己账号的昵称和用户编号保存起来,方便后面收发消息 self.synckey_list = data['SyncKey'] # synckey 用于后面同步消息 print('已成功登录!!') self.synckey = format_synckey(self.synckey_list['List']) # 把 synckey构造成查询字符串的模式
至此,就完成了基本的登陆认证过程,其实大部分的登录都差不多,可能有得是通过验证码和账号密码加密的方式,这些到后面再说
6、得到好友列表并保存起来
def get_userlist(self): '''把用户的好友列表保存起来,方便后面收发消息''' url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetcontact?pass_ticket={}&r={}&seq=0&skey={}'.format(self.pass_ticket,get_time(),self.skey) response = self.session.get(url) response.encoding = 'utf-8' result = response.json() memberlist = result['MemberList'] for member in memberlist: nickname = member['NickName'] username = member['UserName'] self.user_list[nickname] = username # 按照昵称-用户编号的一一对应关系保存
下面就到了比较繁琐的环节了,收发消息:
我们在登录的时候就得到一个synckey了,这个东西在每次收消息的请求中都要带上,而且每次收发消息,这个值都会向服务器请求更新一次。那么我们怎么确定正在收发消息呢,微信网页版用的是轮询的方式发送一个请求,这个请求的响应大概是{retcode:”0″,selector:”0″}这样的,如果selector变得不为0了,说明有消息要收发。所以我们也可以通过不断地发送请求,然后判断响应里的selector的值来处理。
1、更新synckey
def get_new_synckey(self): # 得到最新的synckey 发送或者接收消息后都有这个操作 url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid={}&skey={}&pass_ticket={}'.format(self.wxsid,self.skey,self.pass_ticket) data = {"BaseRequest":{"Uin":self.wxuin,"Sid":self.wxsid,"Skey":self.skey,"DeviceID":get_DeviceID()},"SyncKey":self.synckey_list,"rr":2112925520} response = self.session.post(url,data=json.dumps(data,ensure_ascii=False)) response.encoding = 'utf8' result = response.json() self.synckey_list = result['SyncKey'] self.synckey = format_synckey(self.synckey_list['List'])
2、检查selector值是否不为0
def sync_check(self): # 检查是否有新消息 while True: # 注意这里的synckey是在url里面,所以要进行urlencode,而且synckey改变之后,url也会变,所以url要放在while True里面 url = 'https://webpush.wx.qq.com/cgi-bin/mmwebwx-bin/synccheck?r={}&skey={}&sid={}&uin={}&deviceid={}&{}'.format(get_time(),self.skey,self.wxsid,self.wxuin,get_DeviceID(),self.synckey) response = self.session.get(url) response.encoding = 'utf8' result = response.text # print('正在轮询是否有新消息...') selector = re.findall(r'selector:"(.*?)"',result)[0] if selector != '0': self.get_msg() # 我这里发消息是在另一个线程,收消息在这个线程 self.get_new_synckey() # 每次都需要更新
3、接受消息
def get_msg(self): '''接收消息''' url = ' https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid={}&skey={}&pass_ticket={}'.format(self.wxsid,self.skey,self.pass_ticket) data = { "BaseRequest":{ "Uin":self.wxuin, "Sid":self.wxsid, "Skey":self.skey, "DeviceID":get_DeviceID()}, "SyncKey":self.synckey_list, "rr":-2123282759} response = self.session.post(url,data=json.dumps(data)) response.encoding = 'utf-8' result = response.json() self.synckey_list = result['SyncKey'] # 接受消息的时候,响应里面有这个值,在更新synckey的时候,参数和响应里都有synckey self.synckey = format_synckey(self.synckey_list['List']) msglist = result['AddMsgList'] for msg in msglist: if msg['ToUserName'] == self.user_list['XXXX']: # 这里面填你自己微信的昵称,不过加上之后不会受到群消息,大家可以试试不加这个判断 fromName = msg['FromUserName'] for k,v in self.user_list.items(): if v == fromName: fromNickName = k content = msg['Content'] print('来自{}的消息:{}'.format(fromNickName,content))
4、发送消息
def send_msg(self): url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsg?pass_ticket={}'.format(self.pass_ticket) while True: msg = input('>>>>>') if msg == 'q': break data = { "BaseRequest":{ "Uin":self.wxuin, "Sid":self.wxsid, "Skey":self.skey, "DeviceID":get_DeviceID()}, "Msg":{ "Type":1, "Content":msg, "FromUserName":self.user_list['xxx'], # 这里填你自己的昵称 "ToUserName":self.user_list['xxx'], # 这里填你想发送消息的好友的昵称,你也可以用input键盘输入的方式 "LocalID":get_time(), "ClientMsgId":get_time()}, "Scene":0 } self.session.post(url,data=(json.dumps(data,ensure_ascii=False)).encode('utf-8')) self.get_new_synckey() # 发送消息之后也要更新synckey
5、把发送消息和接受消息写成多线程的方式
大致流程就是这样啦!下面展示全部代码:
1 import time 2 import re 3 import random 4 import requests 5 import urllib3 6 import json 7 from urllib import parse 8 from PIL import Image 9 from threading import Thread 10 11 urllib3.disable_warnings() 12 13 14 def get_time(): 15 return str(int(time.time()*1000)) 16 17 def get_DeviceID(): 18 return 'e'+str(round(random.random(),15))[2:17] 19 20 def format_synckey(synckey_list): 21 ''' 22 把列表形式转成查询字符串 23 ''' 24 tem_synckey = '' 25 for synckey in synckey_list: 26 tem_synckey += str(synckey['Key']) + '_' +str(synckey['Val']) + '|' 27 new_synckey = {'synckey':tem_synckey.rstrip('|')} 28 return parse.urlencode(new_synckey) 29 30 class WeChat(): 31 def __init__(self): 32 headers = { 33 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36' 34 } # 请求头信息 35 proxies = { 36 'http': '192.168.105.71:80', 37 'https': '192.168.105.71:80' 38 } # 使用代理 39 self.user_list = {} 40 self.session = requests.session() 41 self.session.headers = headers 42 self.session.proxies = proxies 43 self.session.verify = False 44 45 def get_uuid(self): 46 '''获取uuid''' 47 url = 'https://login.wx.qq.com/jslogin?appid=wx782c26e4c19acffb&redirect_uri=https%3A%2F%2Fwx.qq.com%2Fcgi-bin%2Fmmwebwx-bin%2Fwebwxnewloginpage&fun=new&lang=zh_CN&_={}'.format(get_time()) 48 response = self.session.get(url).text 49 self.uuid = re.findall(r'uuid = "(.*?)"',response)[0] # 文本较少,用正则匹配即可 50 return self.uuid 51 52 def qrcode(self): 53 url = 'https://login.weixin.qq.com/qrcode/{}'.format(self.uuid) 54 response = self.session.get(url).content # 请求得到二维码,由于是图片,得到字节码即可 55 with open ('qrcode.jpg','wb') as f: 56 f.write(response) # 把二维码保存到一张图片里 57 im = Image.open('qrcode.jpg') 58 im.show() # 把二维码图片展示出来 59 60 def get_redirect_uri(self): 61 while True: # 由于需要不断请求,让我们有时间来扫描二维码 62 url = 'https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid={}&tip=0&r=-2109595288&_={}'.format(self.uuid,get_time()) 63 result = self.session.get(url,allow_redirects=False) 64 code = re.findall(r'window.code=(.*?);',result.text)[0] 65 if code == '200': # 没有扫描的时候是400,扫描之后就是200 66 print('已成功扫描二维码!') 67 break 68 self.redirect_uri = re.findall(r'window.redirect_uri="(.*?)"',result.text)[0] # 得到一个链接,请求之后会得到一些有用的参数 69 70 def get_require_data(self): 71 result = self.session.get(self.redirect_uri,allow_redirects=False).text # 这里注意一定要去掉重定向 72 self.skey = re.findall(r'<skey>(.*?)</skey>',result)[0] # 这里用Beautifulsoup也能解析,文本是xml格式的,用匹配标签就可以得到 73 self.wxsid = re.findall(r'<wxsid>(.*?)</wxsid>',result)[0] 74 self.wxuin = re.findall(r'<wxuin>(.*?)</wxuin>',result)[0] 75 self.pass_ticket = re.findall(r'<pass_ticket>(.*?)</pass_ticket>',result)[0] 76 77 def login(self): 78 url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?r=-2109580211&pass_ticket={}'.format(self.pass_ticket) 79 params = { 80 "BaseRequest":{ 81 "Uin":self.wxuin, 82 "Sid":self.wxsid, 83 "Skey":self.skey, 84 "DeviceID":get_DeviceID(), # DeviceID这个参数在前面的应答中没有找到,它是js生成的 85 } 86 } 87 result = self.session.post(url,data=json.dumps(params,ensure_ascii=False)) 88 result.encoding = 'utf-8' 89 data = result.json() 90 user = data['User'] 91 nickname = user['NickName'] 92 username = user['UserName'] 93 self.user_list[nickname] = username # 把自己账号的昵称和用户编号保存起来,方便后面收发消息 94 self.synckey_list = data['SyncKey'] # synckey 用于后面同步消息 95 print('已成功登录!!') 96 self.synckey = format_synckey(self.synckey_list['List']) # 把 synckey构造成查询字符串的模式 97 98 99 100 def get_userlist(self): 101 '''把用户的好友列表保存起来,方便后面收发消息''' 102 url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetcontact?pass_ticket={}&r={}&seq=0&skey={}'.format(self.pass_ticket,get_time(),self.skey) 103 response = self.session.get(url) 104 response.encoding = 'utf-8' 105 result = response.json() 106 memberlist = result['MemberList'] 107 for member in memberlist: 108 nickname = member['NickName'] 109 username = member['UserName'] 110 self.user_list[nickname] = username # 按照昵称-用户编号的一一对应关系保存 111 112 def get_new_synckey(self): # 得到最新的synckey 发送或者接收消息后都有这个操作 113 url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid={}&skey={}&pass_ticket={}'.format(self.wxsid,self.skey,self.pass_ticket) 114 data = {"BaseRequest":{"Uin":self.wxuin,"Sid":self.wxsid,"Skey":self.skey,"DeviceID":get_DeviceID()},"SyncKey":self.synckey_list,"rr":2112925520} 115 response = self.session.post(url,data=json.dumps(data,ensure_ascii=False)) 116 response.encoding = 'utf8' 117 result = response.json() 118 self.synckey_list = result['SyncKey'] 119 self.synckey = format_synckey(self.synckey_list['List']) 120 121 def sync_check(self): # 检查是否有新消息 122 while True: 123 # 注意这里的synckey是在url里面,所以要进行urlencode,而且synckey改变之后,url也会变,所以url要放在while True里面 124 url = 'https://webpush.wx.qq.com/cgi-bin/mmwebwx-bin/synccheck?r={}&skey={}&sid={}&uin={}&deviceid={}&{}'.format(get_time(),self.skey,self.wxsid,self.wxuin,get_DeviceID(),self.synckey) 125 response = self.session.get(url) 126 response.encoding = 'utf8' 127 result = response.text 128 # print('正在轮询是否有新消息...') 129 selector = re.findall(r'selector:"(.*?)"',result)[0] 130 if selector != '0': 131 self.get_msg() # 我这里发消息是在另一个线程,收消息在这个线程 132 self.get_new_synckey() # 每次都需要更新 133 134 135 136 def send_msg(self): 137 url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsg?pass_ticket={}'.format(self.pass_ticket) 138 while True: 139 msg = input('>>>>>') 140 if msg == 'q': 141 break 142 data = { 143 "BaseRequest":{ 144 "Uin":self.wxuin, 145 "Sid":self.wxsid, 146 "Skey":self.skey, 147 "DeviceID":get_DeviceID()}, 148 "Msg":{ 149 "Type":1, 150 "Content":msg, 151 "FromUserName":self.user_list['xxx'], # 这里填你自己微信昵称 152 "ToUserName":self.user_list['xxx'], # 这里填你想发送消息的好友的昵称,你也可以用input键盘输入的方式 153 "LocalID":get_time(), 154 "ClientMsgId":get_time()}, 155 "Scene":0 156 } 157 158 self.session.post(url,data=(json.dumps(data,ensure_ascii=False)).encode('utf-8')) 159 self.get_new_synckey() # 发送消息之后也会更新synckey 160 161 162 163 def get_msg(self): 164 '''接收消息''' 165 url = ' https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid={}&skey={}&pass_ticket={}'.format(self.wxsid,self.skey,self.pass_ticket) 166 data = { 167 "BaseRequest":{ 168 "Uin":self.wxuin, 169 "Sid":self.wxsid, 170 "Skey":self.skey, 171 "DeviceID":get_DeviceID()}, 172 "SyncKey":self.synckey_list, 173 "rr":-2123282759} 174 response = self.session.post(url,data=json.dumps(data)) 175 response.encoding = 'utf-8' 176 result = response.json() 177 self.synckey_list = result['SyncKey'] # 接受消息的时候,响应里面有这个值,在更新synckey的时候,参数和响应里都有synckey 178 self.synckey = format_synckey(self.synckey_list['List']) 179 msglist = result['AddMsgList'] 180 for msg in msglist: 181 if msg['ToUserName'] == self.user_list['XXXX']: # 这里面填你自己微信的昵称,不过加上之后不会受到群消息,大家可以试试不加这个判断 182 fromName = msg['FromUserName'] 183 for k,v in self.user_list.items(): 184 if v == fromName: 185 fromNickName = k 186 content = msg['Content'] 187 print('来自{}的消息:{}'.format(fromNickName,content)) 188 189 def main(self): 190 self.get_uuid() 191 self.qrcode() 192 self.get_redirect_uri() 193 self.get_require_data() 194 self.login() 195 self.get_userlist() 196 send_msg = Thread(target=self.send_msg) 197 recieve_msg = Thread(target=self.sync_check) 198 send_msg.start() 199 recieve_msg.start() 200 send_msg.join() 201 recieve_msg.join() 202 203 204 if __name__ == '__main__': 205 wechat = WeChat() 206 wechat.main()