微信数据小助手
使用前先看 一步一步教你用wechaty+百度云主机打造一个带你穿越星际的微信机器人aistudio.baidu.com/aistudio/pr…, 基础配置都一样。
1.简介
你想导出喜欢得表情包嘛?自定义的,购买的,想看怎么办?导不出来看不了?重要文件能不能默认接受?发的语音、视频能不能自动保存?聊天记录自动保存?那么微信数据小助手来了,可以根据发送的文件类型,按原文件名保存,聊天记录也可以保存下来,完美满足各类需求。
2.功能
- 保存接收的图片到本地
- 保存接收的表情到本地
- 保存接收的语音到本地
- 保存接收的视频到本地
- 保存接收的文件到本地
- 保存接收的聊天记录到本地
3.外挂文件管理系统
通过文件管理系统挂载数据目录,可提供WEB操作界面
4.图片、视频展示
v0.2版本
v0.1版本
5.实现代码
import asyncio
import logging
from typing import Optional, Union
import os
from wechaty_puppet import FileBox, ScanStatus # type: ignore
from wechaty_puppet import MessageType
from wechaty import Wechaty, Contact
from wechaty.user import Message, Room
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
import time
class MyBot(Wechaty):
"""
listen wechaty event with inherited functions, which is more friendly for
oop developer
"""
def __init__(self):
super().__init__()
async def on_message(self, msg: Message):
"""
listen for message event
"""
from_contact = msg.talker()
text = msg.text()
room = msg.room()
if room:
await room.ready()
# 保存图片
if msg.type() == MessageType.MESSAGE_TYPE_IMAGE:
img = await msg.to_file_box()
# save the image as local file
await img.to_file(os.path.join('img', img.name))
# 保存视频
elif msg.type() == MessageType.MESSAGE_TYPE_VIDEO:
video = await msg.to_file_box()
# save the video as local file
await video.to_file(os.path.join('video', video.name))
# 保存语音
elif msg.type() == MessageType.MESSAGE_TYPE_AUDIO:
audio = await msg.to_file_box()
# save the audio file as local file
await audio.to_file(os.path.join('audio', audio.name))
# 保存表情
elif msg.type() == MessageType.MESSAGE_TYPE_EMOTICON:
emoticon = await msg.to_file_box()
# save the audio file as local file
await emoticon.to_file(os.path.join('emoticon', emoticon.name))
# 保存文件
elif msg.type() == MessageType.MESSAGE_TYPE_ATTACHMENT:
attachment = await msg.to_file_box()
# save the audio file as local file
await attachment.to_file(os.path.join('file', attachment.name))
# 保存聊天记录
elif msg.type() == MessageType.MESSAGE_TYPE_TEXT:
txt = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ',' + from_contact.name + ',' + text + '\n'
print(txt)
with open('chat_log.csv', 'a', encoding='utf-8') as f:
f.write(txt)
print('done')
async def on_login(self, contact: Contact):
log.info(f'user: {contact} has login')
async def on_scan(self, status: ScanStatus, qr_code: Optional[str] = None,
data: Optional[str] = None):
contact = self.Contact.load(self.contact_id)
await contact.ready()
print(f'user <{contact}> scan status: {status.name} , '
f'qr_code: {qr_code}')
bot: Optional[MyBot] = None
async def main():
"""doc"""
# pylint: disable=W0603
global bot
bot = MyBot()
await bot.start()
asyncio.run(main())
复制代码