微信接入机器人实现对别人消息和群at消息的自动回复
微信接入机器人实现对别人消息和群at消息的自动回复
有时候,我们想让我们的微信号对别人发出的各种消息做出回复。我们可以通过接入图灵机器人的方式实现。
- IDLE编写py文件并保存,命名为wxbot。
#!/usr/bin/env python
# coding: utf-8
import os
import sys
import webbrowser
import pyqrcode
import requests
import json
import xml.dom.minidom
import urllib
import time
import re
import random
from requests.exceptions import ConnectionError, ReadTimeout
import HTMLParser
UNKONWN = ‘unkonwn‘
SUCCESS = ‘200‘
SCANED = ‘201‘
TIMEOUT = ‘408‘
def show_image(file):
"""
跨平台显示图片文件
:param file: 图片文件路径
"""
if sys.version_info >= (3, 3):
from shlex import quote
else:
from pipes import quote
if sys.platform == "darwin":
command = "open -a /Applications/Preview.app %s&" % quote(file)
os.system(command)
else:
webbrowser.open(file)
class WXBot:
"""WXBot功能类"""
def __init__(self):
self.DEBUG = False
self.uuid = ‘‘
self.base_uri = ‘‘
self.redirect_uri = ‘‘
self.uin = ‘‘
self.sid = ‘‘
self.skey = ‘‘
self.pass_ticket = ‘‘
self.device_id = ‘e‘ + repr(random.random())[2:17]
self.base_request = {}
self.sync_key_str = ‘‘
self.sync_key = []
self.sync_host = ‘‘
self.session = requests.Session()
self.session.headers.update({‘User-Agent‘: ‘Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5‘})
self.conf = {‘qr‘: ‘png‘}
self.my_account = {} # 当前账户
# 所有相关账号: 联系人, 公众号, 群组, 特殊账号
self.member_list = []
# 所有群组的成员, {‘group_id1‘: [member1, member2, ...], ...}
self.group_members = {}
# 所有账户, {‘group_member‘:{‘id‘:{‘type‘:‘group_member‘, ‘info‘:{}}, ...}, ‘normal_member‘:{‘id‘:{}, ...}}
self.account_info = {‘group_member‘: {}, ‘normal_member‘: {}}
self.contact_list = [] # 联系人列表
self.public_list = [] # 公众账号列表
self.group_list = [] # 群聊列表
self.special_list = [] # 特殊账号列表
@staticmethod
def to_unicode(string, encoding=‘utf-8‘):
"""
将字符串转换为Unicode
:param string: 待转换字符串
:param encoding: 字符串解码方式
:return: 转换后的Unicode字符串
"""
if isinstance(string, str):
return string.decode(encoding)
elif isinstance(string, unicode):
return string
else:
raise Exception(‘Unknown Type‘)
def get_contact(self):
"""获取当前账户的所有相关账号(包括联系人、公众号、群聊、特殊账号)"""
url = self.base_uri + ‘/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s‘ % (self.pass_ticket, self.skey, int(time.time()))
r = self.session.post(url, data=‘{}‘)
r.encoding = ‘utf-8‘
if self.DEBUG:
with open(‘contacts.json‘, ‘w‘) as f:
f.write(r.text.encode(‘utf-8‘))
dic = json.loads(r.text)
self.member_list = dic[‘MemberList‘]
special_users = [‘newsapp‘, ‘fmessage‘, ‘filehelper‘, ‘weibo‘, ‘qqmail‘,
‘fmessage‘, ‘tmessage‘, ‘qmessage‘, ‘qqsync‘, ‘floatbottle‘,
‘lbsapp‘, ‘shakeapp‘, ‘medianote‘, ‘qqfriend‘, ‘readerapp‘,
‘blogapp‘, ‘facebookapp‘, ‘masssendapp‘, ‘meishiapp‘,
‘feedsapp‘, ‘voip‘, ‘blogappweixin‘, ‘weixin‘, ‘brandsessionholder‘,
‘weixinreminder‘, ‘wxid_novlwrv3lqwv11‘, ‘gh_22b87fa7cb3c‘,
‘officialaccounts‘, ‘notification_messages‘, ‘wxid_novlwrv3lqwv11‘,
‘gh_22b87fa7cb3c‘, ‘wxitil‘, ‘userexperience_alarm‘, ‘notification_messages‘]
self.contact_list = []
self.public_list = []
self.special_list = []
self.group_list = []
for contact in self.member_list:
if contact[‘VerifyFlag‘] & 8 != 0: # 公众号
self.public_list.append(contact)
self.account_info[‘normal_member‘][contact[‘UserName‘]] = {‘type‘: ‘public‘, ‘info‘: contact}
elif contact[‘UserName‘] in special_users: # 特殊账户
self.special_list.append(contact)
self.account_info[‘normal_member‘][contact[‘UserName‘]] = {‘type‘: ‘special‘, ‘info‘: contact}
elif contact[‘UserName‘].find(‘@@‘) != -1: # 群聊
self.group_list.append(contact)
self.account_info[‘normal_member‘][contact[‘UserName‘]] = {‘type‘: ‘group‘, ‘info‘: contact}
elif contact[‘UserName‘] == self.my_account[‘UserName‘]: # 自己
self.account_info[‘normal_member‘][contact[‘UserName‘]] = {‘type‘: ‘self‘, ‘info‘: contact}
pass
else:
self.contact_list.append(contact)
self.account_info[‘normal_member‘][contact[‘UserName‘]] = {‘type‘: ‘contact‘, ‘info‘: contact}
self.group_members = self.batch_get_group_members()
for group in self.group_members:
for member in self.group_members[group]:
if member[‘UserName‘] not in self.account_info:
self.account_info[‘group_member‘][member[‘UserName‘]] = {‘type‘: ‘group_member‘,
‘info‘: member,
‘group‘: group}
if self.DEBUG:
with open(‘contact_list.json‘, ‘w‘) as f:
f.write(json.dumps(self.contact_list))
with open(‘special_list.json‘, ‘w‘) as f:
f.write(json.dumps(self.special_list))
with open(‘group_list.json‘, ‘w‘) as f:
f.write(json.dumps(self.group_list))
with open(‘public_list.json‘, ‘w‘) as f:
f.write(json.dumps(self.public_list))
with open(‘member_list.json‘, ‘w‘) as f:
f.write(json.dumps(self.member_list))
with open(‘group_users.json‘, ‘w‘) as f:
f.write(json.dumps(self.group_members))
with open(‘account_info.json‘, ‘w‘) as f:
f.write(json.dumps(self.account_info))
return True
def batch_get_group_members(self):
"""批量获取所有群聊成员信息"""
url = self.base_uri + ‘/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s‘ % (int(time.time()), self.pass_ticket)
params = {
‘BaseRequest‘: self.base_request,
"Count": len(self.group_list),
"List": [{"UserName": group[‘UserName‘], "EncryChatRoomId": ""} for group in self.group_list]
}
r = self.session.post(url, data=json.dumps(params))
r.encoding = ‘utf-8‘
dic = json.loads(r.text)
group_members = {}
for group in dic[‘ContactList‘]:
gid = group[‘UserName‘]
members = group[‘MemberList‘]
group_members[gid] = members
return group_members
def get_group_member_name(self, gid, uid):
"""
获取群聊中指定成员的名称信息
:param gid: 群id
:param uid: 群聊成员id
:return: 名称信息,类似 {"display_name": "test_user", "nickname": "test", "remark_name": "for_test" }
"""
if gid not in self.group_members:
return None
group = self.group_members[gid]
for member in group:
if member[‘UserName‘] == uid:
names = {}
if ‘RemarkName‘ in member and member[‘RemarkName‘]:
names[‘remark_name‘] = member[‘RemarkName‘]
if ‘NickName‘ in member and member[‘NickName‘]:
names[‘nickname‘] = member[‘NickName‘]
if ‘DisplayName‘ in member and member[‘DisplayName‘]:
names[‘display_name‘] = member[‘DisplayName‘]
return names
return None
def get_contact_info(self, uid):
if uid in self.account_info[‘normal_member‘]:
return self.account_info[‘normal_member‘][uid]
else:
return None
def get_group_member_info(self, uid):
if uid in self.account_info[‘group_member‘]:
return self.account_info[‘group_member‘][uid]
else:
return None
def get_group_member_info(self, uid, gid):
if gid not in self.group_members:
return None
for member in self.group_members[gid]:
if member[‘UserName‘] == uid:
return {‘type‘: ‘group_member‘, ‘info‘: member}
return None
def get_contact_name(self, uid):
info = self.get_contact_info(uid)
if info is None:
return None
info = info[‘info‘]
name = {}
if ‘RemarkName‘ in info and info[‘RemarkName‘]:
name[‘remark_name‘] = info[‘RemarkName‘]
if ‘NickName‘ in info and info[‘NickName‘]:
name[‘nickname‘] = info[‘NickName‘]
if ‘DisplayName‘ in info and info[‘DisplayName‘]:
name[‘display_name‘] = info[‘DisplayName‘]
if len(name) == 0:
return None
else:
return name
def get_group_member_name(self, uid):
info = self.get_group_member_info(uid)
if info is None:
return None
info = info[‘info‘]
name = {}
if ‘RemarkName‘ in info and info[‘RemarkName‘]:
name[‘remark_name‘] = info[‘RemarkName‘]
if ‘NickName‘ in info and info[‘NickName‘]:
name[‘nickname‘] = info[‘NickName‘]
if ‘DisplayName‘ in info and info[‘DisplayName‘]:
name[‘display_name‘] = info[‘DisplayName‘]
if len(name) == 0:
return None
else:
return name
def get_group_member_name(self, uid, gid):
info = self.get_group_member_info(uid, gid)
if info is None:
return None
info = info[‘info‘]
name = {}
if ‘RemarkName‘ in info and info[‘RemarkName‘]:
name[‘remark_name‘] = info[‘RemarkName‘]
if ‘NickName‘ in info and info[‘NickName‘]:
name[‘nickname‘] = info[‘NickName‘]
if ‘DisplayName‘ in info and info[‘DisplayName‘]:
name[‘display_name‘] = info[‘DisplayName‘]
if len(name) == 0:
return None
else:
return name
@staticmethod
def get_contact_prefer_name(name):
if name is None:
return None
if ‘remark_name‘ in name:
return name[‘remark_name‘]
if ‘nickname‘ in name:
return name[‘nickname‘]
if ‘display_name‘ in name:
return name[‘display_name‘]
return None
@staticmethod
def get_group_member_prefer_name(name):
if name is None:
return None
if ‘remark_name‘ in name:
return name[‘remark_name‘]
if ‘display_name‘ in name:
return name[‘display_name‘]
if ‘nickname‘ in name:
return name[‘nickname‘]
return None
def get_user_type(self, wx_user_id):
"""
获取特定账号与自己的关系
:param wx_user_id: 账号id:
:return: 与当前账号的关系
"""
for account in self.contact_list:
if wx_user_id == account[‘UserName‘]:
return ‘contact‘
for account in self.public_list:
if wx_user_id == account[‘UserName‘]:
return ‘public‘
for account in self.special_list:
if wx_user_id == account[‘UserName‘]:
return ‘special‘
for account in self.group_list:
if wx_user_id == account[‘UserName‘]:
return ‘group‘
for group in self.group_members:
for member in self.group_members[group]:
if member[‘UserName‘] == wx_user_id:
return ‘group_member‘
return ‘unknown‘
def is_contact(self, uid):
for account in self.contact_list:
if uid == account[‘UserName‘]:
return True
return False
def is_public(self, uid):
for account in self.public_list:
if uid == account[‘UserName‘]:
return True
return False
def is_special(self, uid):
for account in self.special_list:
if uid == account[‘UserName‘]:
return True
return False
def handle_msg_all(self, msg):
"""
处理所有消息,请子类化后覆盖此函数
msg:
msg_id -> 消息id
msg_type_id -> 消息类型id
user -> 发送消息的账号id
content -> 消息内容
:param msg: 收到的消息
"""
pass
@staticmethod
def proc_at_info(msg):
if not msg:
return ‘‘, []
segs = msg.split(u‘\u2005‘)
str_msg_all = ‘‘
str_msg = ‘‘
infos = []
if len(segs) > 1:
for i in range(0, len(segs)-1):
segs[i] += u‘\u2005‘
pm = re.search(u‘@.*\u2005‘, segs[i]).group()
if pm:
name = pm[1:-1]
string = segs[i].replace(pm, ‘‘)
str_msg_all += string + ‘@‘ + name + ‘ ‘
str_msg += string
if string:
infos.append({‘type‘: ‘str‘, ‘value‘: string})
infos.append({‘type‘: ‘at‘, ‘value‘: name})
else:
infos.append({‘type‘: ‘str‘, ‘value‘: segs[i]})
str_msg_all += segs[i]
str_msg += segs[i]
str_msg_all += segs[-1]
str_msg += segs[-1]
infos.append({‘type‘: ‘str‘, ‘value‘: segs[-1]})
else:
infos.append({‘type‘: ‘str‘, ‘value‘: segs[-1]})
str_msg_all = msg
str_msg = msg
return str_msg_all.replace(u‘\u2005‘, ‘‘), str_msg.replace(u‘\u2005‘, ‘‘), infos
def extract_msg_content(self, msg_type_id, msg):
"""
content_type_id:
0 -> Text
1 -> Location
3 -> Image
4 -> Voice
5 -> Recommend
6 -> Animation
7 -> Share
8 -> Video
9 -> VideoCall
10 -> Redraw
11 -> Empty
99 -> Unknown
:param msg_type_id: 消息类型id
:param msg: 消息结构体
:return: 解析的消息
"""
mtype = msg[‘MsgType‘]
content = HTMLParser.HTMLParser().unescape(msg[‘Content‘])
msg_id = msg[‘MsgId‘]
msg_content = {}
if msg_type_id == 0:
return {‘type‘: 11, ‘data‘: ‘‘}
elif msg_type_id == 2: # File Helper
return {‘type‘: 0, ‘data‘: content.replace(‘<br/>‘, ‘\n‘)}
elif msg_type_id == 3: # 群聊
sp = content.find(‘<br/>‘)
uid = content[:sp]
content = content[sp:]
content = content.replace(‘<br/>‘, ‘‘)
uid = uid[:-1]
name = self.get_contact_prefer_name(self.get_contact_name(uid))
if not name:
name = self.get_group_member_prefer_name(self.get_group_member_name(uid, msg[‘FromUserName‘]))
if not name:
name = ‘unknown‘
msg_content[‘user‘] = {‘id‘: uid, ‘name‘: name}
else: # Self, Contact, Special, Public, Unknown
pass
msg_prefix = (msg_content[‘user‘][‘name‘] + ‘:‘) if ‘user‘ in msg_content else ‘‘
if mtype == 1:
if content.find(‘http://weixin.qq.com/cgi-bin/redirectforward?args=‘) != -1:
r = self.session.get(content)
r.encoding = ‘gbk‘
data = r.text
pos = self.search_content(‘title‘, data, ‘xml‘)
msg_content[‘type‘] = 1
msg_content[‘data‘] = pos
msg_content[‘detail‘] = data
if self.DEBUG:
print ‘ %s[Location] %s ‘ % (msg_prefix, pos)
else:
msg_content[‘type‘] = 0
if msg_type_id == 3 or (msg_type_id == 1 and msg[‘ToUserName‘][:2] == ‘@@‘): # Group text message
msg_infos = self.proc_at_info(content)
str_msg_all = msg_infos[0]
str_msg = msg_infos[1]
detail = msg_infos[2]
msg_content[‘data‘] = str_msg_all
msg_content[‘detail‘] = detail
msg_content[‘desc‘] = str_msg
else:
msg_content[‘data‘] = content
if self.DEBUG:
try:
print ‘ %s[Text] %s‘ % (msg_prefix, msg_content[‘data‘])
except UnicodeEncodeError:
print ‘ %s[Text] (illegal text).‘ % msg_prefix
elif mtype == 3:
msg_content[‘type‘] = 3
msg_content[‘data‘] = self.get_msg_img_url(msg_id)
if self.DEBUG:
image = self.get_msg_img(msg_id)
print ‘ %s[Image] %s‘ % (msg_prefix, image)
elif mtype == 34:
msg_content[‘type‘] = 4
msg_content[‘data‘] = self.get_voice_url(msg_id)
if self.DEBUG:
voice = self.get_voice(msg_id)
print ‘ %s[Voice] %s‘ % (msg_prefix, voice)
elif mtype == 42:
msg_content[‘type‘] = 5
info = msg[‘RecommendInfo‘]
msg_content[‘data‘] = {‘nickname‘: info[‘NickName‘],
‘alias‘: info[‘Alias‘],
‘province‘: info[‘Province‘],
‘city‘: info[‘City‘],
‘gender‘: [‘unknown‘, ‘male‘, ‘female‘][info[‘Sex‘]]}
if self.DEBUG:
print ‘ %s[Recommend]‘ % msg_prefix
print ‘ -----------------------------‘
print ‘ | NickName: %s‘ % info[‘NickName‘]
print ‘ | Alias: %s‘ % info[‘Alias‘]
print ‘ | Local: %s %s‘ % (info[‘Province‘], info[‘City‘])
print ‘ | Gender: %s‘ % [‘unknown‘, ‘male‘, ‘female‘][info[‘Sex‘]]
print ‘ -----------------------------‘
elif mtype == 47:
msg_content[‘type‘] = 6
msg_content[‘data‘] = self.search_content(‘cdnurl‘, content)
if self.DEBUG:
print ‘ %s[Animation] %s‘ % (msg_prefix, msg_content[‘data‘])
elif mtype == 49:
msg_content[‘type‘] = 7
app_msg_type = ‘‘
if msg[‘AppMsgType‘] == 3:
app_msg_type = ‘music‘
elif msg[‘AppMsgType‘] == 5:
app_msg_type = ‘link‘
elif msg[‘AppMsgType‘] == 7:
app_msg_type = ‘weibo‘
else:
app_msg_type = ‘unknown‘
msg_content[‘data‘] = {‘type‘: app_msg_type,
‘title‘: msg[‘FileName‘],
‘desc‘: self.search_content(‘des‘, content, ‘xml‘),
‘url‘: msg[‘Url‘],
‘from‘: self.search_content(‘appname‘, content, ‘xml‘)}
if self.DEBUG:
print ‘ %s[Share] %s‘ % (msg_prefix, app_msg_type)
print ‘ --------------------------‘
print ‘ | title: %s‘ % msg[‘FileName‘]
print ‘ | desc: %s‘ % self.search_content(‘des‘, content, ‘xml‘)
print ‘ | link: %s‘ % msg[‘Url‘]
print ‘ | from: %s‘ % self.search_content(‘appname‘, content, ‘xml‘)
print ‘ --------------------------‘
elif mtype == 62:
msg_content[‘type‘] = 8
msg_content[‘data‘] = content
if self.DEBUG:
print ‘ %s[Video] Please check on mobiles‘ % msg_prefix
elif mtype == 53:
msg_content[‘type‘] = 9
msg_content[‘data‘] = content
if self.DEBUG:
print ‘ %s[Video Call]‘ % msg_prefix
elif mtype == 10002:
msg_content[‘type‘] = 10
msg_content[‘data‘] = content
if self.DEBUG:
print ‘ %s[Redraw]‘ % msg_prefix
elif mtype == 10000: # unknown, maybe red packet, or group invite
msg_content[‘type‘] = 12
msg_content[‘data‘] = msg[‘Content‘]
if self.DEBUG:
print ‘ [Unknown]‘
else:
msg_content[‘type‘] = 99
msg_content[‘data‘] = content
if self.DEBUG:
print ‘ %s[Unknown]‘ % msg_prefix
return msg_content
def handle_msg(self, r):
"""
处理原始微信消息的内部函数
msg_type_id:
0 -> Init
1 -> Self
2 -> FileHelper
3 -> Group
4 -> Contact
5 -> Public
6 -> Special
99 -> Unknown
:param r: 原始微信消息
"""
for msg in r[‘AddMsgList‘]:
msg_type_id = 99
user = {‘id‘: msg[‘FromUserName‘], ‘name‘: ‘unknown‘}
if msg[‘MsgType‘] == 51: # init message
msg_type_id = 0
user[‘name‘] = ‘system‘
elif msg[‘FromUserName‘] == self.my_account[‘UserName‘]: # Self
msg_type_id = 1
user[‘name‘] = ‘self‘
elif msg[‘ToUserName‘] == ‘filehelper‘: # File Helper
msg_type_id = 2
user[‘name‘] = ‘file_helper‘
elif msg[‘FromUserName‘][:2] == ‘@@‘: # Group
msg_type_id = 3
user[‘name‘] = self.get_contact_prefer_name(self.get_contact_name(user[‘id‘]))
elif self.is_contact(msg[‘FromUserName‘]): # Contact
msg_type_id = 4
user[‘name‘] = self.get_contact_prefer_name(self.get_contact_name(user[‘id‘]))
elif self.is_public(msg[‘FromUserName‘]): # Public
msg_type_id = 5
user[‘name‘] = self.get_contact_prefer_name(self.get_contact_name(user[‘id‘]))
elif self.is_special(msg[‘FromUserName‘]): # Special
msg_type_id = 6
user[‘name‘] = self.get_contact_prefer_name(self.get_contact_name(user[‘id‘]))
else:
msg_type_id = 99
user[‘name‘] = ‘unknown‘
if not user[‘name‘]:
user[‘name‘] = ‘unknown‘
user[‘name‘] = HTMLParser.HTMLParser().unescape(user[‘name‘])
if self.DEBUG and msg_type_id != 0:
print ‘[MSG] %s:‘ % user[‘name‘]
content = self.extract_msg_content(msg_type_id, msg)
message = {‘msg_type_id‘: msg_type_id,
‘msg_id‘: msg[‘MsgId‘],
‘content‘: content,
‘to_user_id‘: msg[‘ToUserName‘],
‘user‘: user}
self.handle_msg_all(message)
def schedule(self):
"""
做任务型事情的函数,如果需要,可以在子类中覆盖此函数
此函数在处理消息的间隙被调用,请不要长时间阻塞此函数
"""
pass
def proc_msg(self):
self.test_sync_check()
while True:
check_time = time.time()
[retcode, selector] = self.sync_check()
if retcode == ‘1100‘: # 从微信客户端上登出
break
elif retcode == ‘1101‘: # 从其它设备上登了网页微信
break
elif retcode == ‘0‘:
if selector == ‘2‘: # 有新消息
r = self.sync()
if r is not None:
self.handle_msg(r)
elif selector == ‘7‘: # 在手机上操作了微信
r = self.sync()
if r is not None:
self.handle_msg(r)
elif selector == ‘0‘: # 无事件
pass
else:
pass
self.schedule()
check_time = time.time() - check_time
if check_time < 0.8:
time.sleep(1 - check_time)
def send_msg_by_uid(self, word, dst=‘filehelper‘):
url = self.base_uri + ‘/webwxsendmsg?pass_ticket=%s‘ % self.pass_ticket
msg_id = str(int(time.time() * 1000)) + str(random.random())[:5].replace(‘.‘, ‘‘)
if type(word) == ‘str‘:
word = word.decode(‘utf-8‘)
params = {
‘BaseRequest‘: self.base_request,
‘Msg‘: {
"Type": 1,
"Content": word,
"FromUserName": self.my_account[‘UserName‘],
"ToUserName": dst,
"LocalID": msg_id,
"ClientMsgId": msg_id
}
}
headers = {‘content-type‘: ‘application/json; charset=UTF-8‘}
data = json.dumps(params, ensure_ascii=False).encode(‘utf8‘)
try:
r = self.session.post(url, data=data, headers=headers)
except (ConnectionError, ReadTimeout):
return False
dic = r.json()
return dic[‘BaseResponse‘][‘Ret‘] == 0
def get_user_id(self, name):
if name == ‘‘:
return ‘‘
for contact in self.contact_list:
if ‘RemarkName‘ in contact and contact[‘RemarkName‘] == name:
return contact[‘UserName‘]
elif ‘NickName‘ in contact and contact[‘NickName‘] == name:
return contact[‘UserName‘]
elif ‘DisplayName‘ in contact and contact[‘DisplayName‘] == name:
return contact[‘UserName‘]
return ‘‘
def send_msg(self, name, word, isfile=False):
uid = self.get_user_id(name)
if uid:
if isfile:
with open(word, ‘r‘) as f:
result = True
for line in f.readlines():
line = line.replace(‘\n‘, ‘‘)
print ‘-> ‘ + name + ‘: ‘ + line
if self.send_msg_by_uid(line, uid):
pass
else:
result = False
time.sleep(1)
return result
else:
if self.send_msg_by_uid(word, uid):
return True
else:
return False
else:
if self.DEBUG:
print ‘[ERROR] This user does not exist .‘
return True
@staticmethod
def search_content(key, content, fmat=‘attr‘):
if fmat == ‘attr‘:
pm = re.search(key + ‘\s?=\s?"([^"<]+)"‘, content)
if pm:
return pm.group(1)
elif fmat == ‘xml‘:
pm = re.search(‘<{0}>([^<]+)</{0}>‘.format(key), content)
if pm:
return pm.group(1)
return ‘unknown‘
def run(self):
self.get_uuid()
self.gen_qr_code(‘qr.png‘)
print ‘[INFO] Please use WeChat to scan the QR code .‘
result = self.wait4login()
if result != SUCCESS:
print ‘[ERROR] Web WeChat login failed. failed code=%s‘%(result, )
return
if self.login():
print ‘[INFO] Web WeChat login succeed .‘
else:
print ‘[ERROR] Web WeChat login failed .‘
return
if self.init():
print ‘[INFO] Web WeChat init succeed .‘
else:
print ‘[INFO] Web WeChat init failed‘
return
self.status_notify()
self.get_contact()
print ‘[INFO] Get %d contacts‘ % len(self.contact_list)
print ‘[INFO] Start to process messages .‘
self.proc_msg()
def get_uuid(self):
url = ‘https://login.weixin.qq.com/jslogin‘
params = {
‘appid‘: ‘wx782c26e4c19acffb‘,
‘fun‘: ‘new‘,
‘lang‘: ‘zh_CN‘,
‘_‘: int(time.time()) * 1000 + random.randint(1, 999),
}
r = self.session.get(url, params=params)
r.encoding = ‘utf-8‘
data = r.text
regx = r‘window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"‘
pm = re.search(regx, data)
if pm:
code = pm.group(1)
self.uuid = pm.group(2)
return code == ‘200‘
return False
def gen_qr_code(self, qr_file_path):
string = ‘https://login.weixin.qq.com/l/‘ + self.uuid
qr = pyqrcode.create(string)
if self.conf[‘qr‘] == ‘png‘:
qr.png(qr_file_path, scale=8)
show_image(qr_file_path)
# img = Image.open(qr_file_path)
# img.show()
elif self.conf[‘qr‘] == ‘tty‘:
print(qr.terminal(quiet_zone=1))
def do_request(self, url):
r = self.session.get(url)
r.encoding = ‘utf-8‘
data = r.text
param = re.search(r‘window.code=(\d+);‘, data)
code = param.group(1)
return code, data
def wait4login(self):
"""
http comet:
tip=1, 等待用户扫描二维码,
201: scaned
408: timeout
tip=0, 等待用户确认登录,
200: confirmed
"""
LOGIN_TEMPLATE = ‘https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s‘
tip = 1
try_later_secs = 1
MAX_RETRY_TIMES = 10
code = UNKONWN
retry_time = MAX_RETRY_TIMES
while retry_time > 0:
url = LOGIN_TEMPLATE % (tip, self.uuid, int(time.time()))
code, data = self.do_request(url)
if code == SCANED:
print ‘[INFO] Please confirm to login .‘
tip = 0
elif code == SUCCESS: # 确认登录成功
param = re.search(r‘window.redirect_uri="(\S+?)";‘, data)
redirect_uri = param.group(1) + ‘&fun=new‘
self.redirect_uri = redirect_uri
self.base_uri = redirect_uri[:redirect_uri.rfind(‘/‘)]
return code
elif code == TIMEOUT:
print ‘[ERROR] WeChat login timeout. retry in %s secs later...‘%(try_later_secs, )
tip = 1 # 重置
retry_time -= 1
time.sleep(try_later_secs)
else:
print (‘[ERROR] WeChat login exception return_code=%s. retry in %s secs later...‘ %
(code, try_later_secs))
tip = 1
retry_time -= 1
time.sleep(try_later_secs)
return code
def login(self):
if len(self.redirect_uri) < 4:
print ‘[ERROR] Login failed due to network problem, please try again.‘
return False
r = self.session.get(self.redirect_uri)
r.encoding = ‘utf-8‘
data = r.text
doc = xml.dom.minidom.parseString(data)
root = doc.documentElement
for node in root.childNodes:
if node.nodeName == ‘skey‘:
self.skey = node.childNodes[0].data
elif node.nodeName == ‘wxsid‘:
self.sid = node.childNodes[0].data
elif node.nodeName == ‘wxuin‘:
self.uin = node.childNodes[0].data
elif node.nodeName == ‘pass_ticket‘:
self.pass_ticket = node.childNodes[0].data
if ‘‘ in (self.skey, self.sid, self.uin, self.pass_ticket):
return False
self.base_request = {
‘Uin‘: self.uin,
‘Sid‘: self.sid,
‘Skey‘: self.skey,
‘DeviceID‘: self.device_id,
}
return True
def init(self):
url = self.base_uri + ‘/webwxinit?r=%i&lang=en_US&pass_ticket=%s‘ % (int(time.time()), self.pass_ticket)
params = {
‘BaseRequest‘: self.base_request
}
r = self.session.post(url, data=json.dumps(params))
r.encoding = ‘utf-8‘
dic = json.loads(r.text)
self.sync_key = dic[‘SyncKey‘]
self.my_account = dic[‘User‘]
self.sync_key_str = ‘|‘.join([str(keyVal[‘Key‘]) + ‘_‘ + str(keyVal[‘Val‘])
for keyVal in self.sync_key[‘List‘]])
return dic[‘BaseResponse‘][‘Ret‘] == 0
def status_notify(self):
url = self.base_uri + ‘/webwxstatusnotify?lang=zh_CN&pass_ticket=%s‘ % self.pass_ticket
self.base_request[‘Uin‘] = int(self.base_request[‘Uin‘])
params = {
‘BaseRequest‘: self.base_request,
"Code": 3,
"FromUserName": self.my_account[‘UserName‘],
"ToUserName": self.my_account[‘UserName‘],
"ClientMsgId": int(time.time())
}
r = self.session.post(url, data=json.dumps(params))
r.encoding = ‘utf-8‘
dic = json.loads(r.text)
return dic[‘BaseResponse‘][‘Ret‘] == 0
def test_sync_check(self):
for host in [‘webpush‘, ‘webpush2‘]:
self.sync_host = host
retcode = self.sync_check()[0]
if retcode == ‘0‘:
return True
return False
def sync_check(self):
params = {
‘r‘: int(time.time()),
‘sid‘: self.sid,
‘uin‘: self.uin,
‘skey‘: self.skey,
‘deviceid‘: self.device_id,
‘synckey‘: self.sync_key_str,
‘_‘: int(time.time()),
}
url = ‘https://‘ + self.sync_host + ‘.weixin.qq.com/cgi-bin/mmwebwx-bin/synccheck?‘ + urllib.urlencode(params)
try:
r = self.session.get(url, timeout=60)
except (ConnectionError, ReadTimeout):
return [-1, -1]
r.encoding = ‘utf-8‘
data = r.text
pm = re.search(r‘window.synccheck=\{retcode:"(\d+)",selector:"(\d+)"\}‘, data)
retcode = pm.group(1)
selector = pm.group(2)
return [retcode, selector]
def sync(self):
url = self.base_uri + ‘/webwxsync?sid=%s&skey=%s&lang=en_US&pass_ticket=%s‘ % (self.sid, self.skey, self.pass_ticket)
params = {
‘BaseRequest‘: self.base_request,
‘SyncKey‘: self.sync_key,
‘rr‘: ~int(time.time())
}
try:
r = self.session.post(url, data=json.dumps(params), timeout=60)
except (ConnectionError, ReadTimeout):
return None
r.encoding = ‘utf-8‘
dic = json.loads(r.text)
if dic[‘BaseResponse‘][‘Ret‘] == 0:
self.sync_key = dic[‘SyncKey‘]
self.sync_key_str = ‘|‘.join([str(keyVal[‘Key‘]) + ‘_‘ + str(keyVal[‘Val‘])
for keyVal in self.sync_key[‘List‘]])
return dic
def get_icon(self, uid):
url = self.base_uri + ‘/webwxgeticon?username=%s&skey=%s‘ % (uid, self.skey)
r = self.session.get(url)
data = r.content
fn = ‘img_‘ + uid + ‘.jpg‘
with open(fn, ‘wb‘) as f:
f.write(data)
return fn
def get_head_img(self, uid):
url = self.base_uri + ‘/webwxgetheadimg?username=%s&skey=%s‘ % (uid, self.skey)
r = self.session.get(url)
data = r.content
fn = ‘img_‘ + uid + ‘.jpg‘
with open(fn, ‘wb‘) as f:
f.write(data)
return fn
def get_msg_img_url(self, msgid):
return self.base_uri + ‘/webwxgetmsgimg?MsgID=%s&skey=%s‘ % (msgid, self.skey)
def get_msg_img(self, msgid):
url = self.base_uri + ‘/webwxgetmsgimg?MsgID=%s&skey=%s‘ % (msgid, self.skey)
r = self.session.get(url)
data = r.content
fn = ‘img_‘ + msgid + ‘.jpg‘
with open(fn, ‘wb‘) as f:
f.write(data)
return fn
def get_voice_url(self, msgid):
return self.base_uri + ‘/webwxgetvoice?msgid=%s&skey=%s‘ % (msgid, self.skey)
def get_voice(self, msgid):
url = self.base_uri + ‘/webwxgetvoice?msgid=%s&skey=%s‘ % (msgid, self.skey)
r = self.session.get(url)
data = r.content
fn = ‘voice_‘ + msgid + ‘.mp3‘
with open(fn, ‘wb‘) as f:
f.write(data)
return fn
这样我们就做好了一个微信消息处理模块,我们只要在需要的时候将其导入我们的主程序,就可以调用其相关的类和方法。
- 接着我们要做的是去图灵机器人官网申请一个接口,也就是要有一个用于对别人消息做出自动回复的机器人。注册申请成功后,将其key记录下来。用记事本编写配置文件conf.ini,以供程序调用。
[main]
key=31fd87ea28a0e6cc774ef7913d4499c1
这里的key要换成你自己的。将这个文件放在和主程序相同的目录下。
- 编写主函数如下:
#!/usr/bin/env python
# coding: utf-8
from wxbot import *#导入类函数
import ConfigParser
import json
class TulingWXBot(WXBot):#图灵key的读入
def __init__(self):
WXBot.__init__(self)
self.tuling_key = ""
self.robot_switch = True
try:
cf = ConfigParser.ConfigParser()
cf.read(‘conf.ini‘)
self.tuling_key = cf.get(‘main‘, ‘key‘)#对配置文件写入
except Exception:
pass
print ‘tuling_key:‘, self.tuling_key
def tuling_auto_reply(self, uid, msg):
if self.tuling_key:
url = "http://www.tuling123.com/openapi/api"
user_id = uid.replace(‘@‘, ‘‘)[:30]
body = {‘key‘: self.tuling_key, ‘info‘: msg.encode(‘utf8‘), ‘userid‘: user_id}
r = requests.post(url, data=body)
respond = json.loads(r.text)
result = ‘‘
if respond[‘code‘] == 100000:
result = respond[‘text‘].replace(‘<br>‘, ‘ ‘)
elif respond[‘code‘] == 200000:
result = respond[‘url‘]
else:
result = respond[‘text‘].replace(‘<br>‘, ‘ ‘)
print ‘ ROBOT:‘, result
return result
else:
return u"有点忙,回聊哦。"
def auto_switch(self, msg):
msg_data = msg[‘content‘][‘data‘]
stop_cmd = [u‘退下‘, u‘走开‘, u‘关闭‘, u‘关掉‘, u‘休息‘, u‘滚开‘]
start_cmd = [u‘出来‘, u‘启动‘, u‘工作‘]
if self.robot_switch:
for i in stop_cmd:
if i == msg_data:
self.robot_switch = False
self.send_msg_by_uid(u‘[Robot]‘ + u‘机器人已关闭!‘, msg[‘to_user_id‘])
else:
for i in start_cmd:
if i == msg_data:
self.robot_switch = True
self.send_msg_by_uid(u‘[Robot]‘ + u‘机器人已开启!‘, msg[‘to_user_id‘])
def handle_msg_all(self, msg):
if not self.robot_switch and msg[‘msg_type_id‘] != 1:
return
if msg[‘msg_type_id‘] == 1 and msg[‘content‘][‘type‘] == 0: # reply to self
self.auto_switch(msg)
elif msg[‘msg_type_id‘] == 4 and msg[‘content‘][‘type‘] == 0: # text message from contact
self.send_msg_by_uid(self.tuling_auto_reply(msg[‘user‘][‘id‘], msg[‘content‘][‘data‘]), msg[‘user‘][‘id‘])
elif msg[‘msg_type_id‘] == 3 and msg[‘content‘][‘type‘] == 0: # group text message
if ‘detail‘ in msg[‘content‘]:
my_names = self.get_group_member_name(self.my_account[‘UserName‘], msg[‘user‘][‘id‘])
if my_names is None:
my_names = {}
if ‘NickName‘ in self.my_account and self.my_account[‘NickName‘]:
my_names[‘nickname2‘] = self.my_account[‘NickName‘]
if ‘RemarkName‘ in self.my_account and self.my_account[‘RemarkName‘]:
my_names[‘remark_name2‘] = self.my_account[‘RemarkName‘]
is_at_me = False
for detail in msg[‘content‘][‘detail‘]:
if detail[‘type‘] == ‘at‘:
for k in my_names:
if my_names[k] and my_names[k] == detail[‘value‘]:
is_at_me = True
break
if is_at_me:
src_name = msg[‘content‘][‘user‘][‘name‘]
reply = ‘to ‘ + src_name + ‘: ‘
if msg[‘content‘][‘type‘] == 0: # text message
reply += self.tuling_auto_reply(msg[‘content‘][‘user‘][‘id‘], msg[‘content‘][‘desc‘])
else:
reply += u"对不起,读的书少,不认识你发的乱七八糟的东西。"
self.send_msg_by_uid(reply, msg[‘user‘][‘id‘])
def main():
bot = TulingWXBot()
bot.DEBUG = True
bot.conf[‘qr‘] = ‘png‘
bot.run()
if __name__ == ‘__main__‘:
main()
摁F5,轻松执行。
wxBot 是用Python包装Web微信协议实现的微信机器人框架。
目前的消息支持情况:
[ ] 群消息
- [x] 文本
- [x] 图片
- [x] 地理位置
- [x] 个人名片
- [x] 语音
- [x] 动画
- [ ] 语音电话
- [ ] 红包
[ ] 联系人消息
- [x] 文本
- [x] 图片
- [x] 地理位置
- [x] 个人名片
- [x] 语音
- [x] 小视频
- [x] 动画
- [ ] 视频电话
- [ ] 红包
- [ ] 转账
Web微信协议参考资料:
qwx: WeChat Qt frontend 微信Qt前端
1 环境与依赖
此版本只能运行于Python 2环境 。
wxBot 用到了Python requests , pypng , Pillow* 以及 **pyqrcode 库。
使用之前需要所依赖的库:
pip install requests
pip install pyqrcode
pip install pypng
pip install Pillow
2 快速开发
利用 wxBot 最简单的方法就是继承WXBot类并实现 handle_msg_all
或者 schedule
函数,然后实例化子类并调用 run
方法 。
2.1 代码
以下的代码对所有来自好友的文本消息回复 hi , 并不断向好友 tb 发送 schedule 。
handle_msg_all
函数用于处理收到的每条消息,而 schedule
函数可以做一些任务性的工作(例如不断向好友推送信息或者一些定时任务)。
#!/usr/bin/env python
# coding: utf-8
import time
from wxbot import *
class MyWXBot(WXBot):
def handle_msg_all(self, msg):
if msg[‘msg_type_id‘] == 4 and msg[‘content‘][‘type‘] == 0:
self.send_msg_by_uid(‘hi‘, msg[‘user‘][‘id‘])
def schedule(self):
self.send_msg(‘tb‘, ‘schedule‘)
time.sleep(1)
def main():
bot = MyWXBot()
bot.DEBUG = True
bot.run()
if __name__ == ‘__main__‘:
main()
2.2 运行
直接用 python
运行代码(如运行测试代码 test.py ):
python test.py
2.3 登录微信
程序运行之后,会在当前目录下生成二维码图片文件 qr.png 并自动打开,用微信扫描此二维码并按操作指示确认登录网页微信。
如果运行在Linux下,还可以通过设置 WXBot 对象的 conf[‘qr‘]
为 tty
的方式直接在终端打印二维码(此方法只能在Linux终端下使用),效果如下:
3 接口
3.1 handle_msg_all
handle_msg_all
函数的参数 msg
是代表一条消息的字典。字段的内容为:
字段名 | 字段内容 |
---|---|
msg_type_id |
整数,消息类型,具体解释可以查看 消息类型表 |
msg_id |
字符串,消息id |
content |
字典,消息内容,具体含有的字段请参考 消息类型表 ,一般含有 type (数据类型)与 data (数据内容)字段,type 与 data 的对应关系可以参考 数据类型表 |
user |
字典,消息来源,字典包含 name (发送者名称,如果是群则为群名称,如果为微信号,有备注则为备注名,否则为微信号或者群昵称)字段与 id (发送者id)字段,都是字符串 |
3.2 消息类型表
类型号 | 消息类型 | content |
---|---|---|
0 | 初始化消息,内部数据 | 无意义,可以忽略 |
1 | 自己发送的消息 | 无意义,可以忽略 |
2 | 文件消息 | 字典,包含 type 与 data 字段 |
3 | 群消息 | 字典, 包含 user (字典,包含 id 与 name 字段,都是字符串,表示发送此消息的群用户)与 type 、 data 字段,红包消息只有 type 字段, 文本消息还有detail、desc字段, 参考 群文本消息 |
4 | 联系人消息 | 字典,包含 type 与 data 字段 |
5 | 公众号消息 | 字典,包含 type 与 data 字段 |
6 | 特殊账号消息 | 字典,包含 type 与 data 字段 |
99 | 未知账号消息 | 无意义,可以忽略 |
3.3 数据类型表
type |
数据类型 | data |
---|---|---|
0 | 文本 | 字符串,表示文本消息的具体内容 |
1 | 地理位置 | 字符串,表示地理位置 |
3 | 图片 | 字符串,图片数据的url,HTTP POST请求此url可以得到jpg文件格式的数据 |
4 | 语音 | 字符串,语音数据的url,HTTP POST请求此url可以得到mp3文件格式的数据 |
5 | 名片 | 字典,包含 nickname (昵称), alias (别名),province (省份),city (城市), gender (性别)字段 |
6 | 动画 | 字符串, 动画url, HTTP POST请求此url可以得到gif文件格式的数据 |
7 | 分享 | 字典,包含 type (类型),title (标题),desc (描述),url (链接),from (源网站)字段 |
8 | 视频 | 不可用 |
9 | 视频电话 | 不可用 |
10 | 撤回消息 | 不可用 |
11 | 空内容 | 空字符串 |
12 | 红包 | 不可用 |
99 | 未知类型 | 不可用 |
3.4 群文本消息
由于群文本消息中可能含有@信息,因此群文本消息的 content
字典除了含有 type
与 data
字段外,还含有 detail
与 desc
字段。
各字段内容为:
字段 | 内容 |
---|---|
type |
数据类型, 为0(文本) |
data |
字符串,消息内容,含有@信息 |
desc |
字符串,删除了所有@信息 |
detail |
数组,元素类型为含有 type 与 value 字段的字典, type 为字符串 str (表示元素为普通字符串,此时value为消息内容) 或 at (表示元素为@信息, 此时value为所@的用户名) |
3.5 WXBot对象属性
WXBot 对象在登录并初始化之后,含有以下的可用数据:
属性 | 描述 |
---|---|
contact_list |
当前用户的微信联系人列表 |
group_list |
当前用户的微信群列表 |
public_list |
当前用户关注的公众号列表 |
special_list |
特殊账号列表 |
session |
WXBot 与WEB微信服务器端交互所用的 Requests Session 对象 |
3.6 WXBot对象方法
WXBot 对象还含有一些可以利用的方法
方法 | 描述 |
---|---|
get_icon(id) |
获取用户icon并保存到本地文件 img_[id].jpg , id 为用户id(Web微信数据) |
get_head_img(id) |
获取用户头像并保存到本地文件 img_[id].jpg ,id 为用户id(Web微信数据) |
get_msg_img(msgid) |
获取图像消息并保存到本地文件 img_[msgid].jpg , msgid 为消息id(Web微信数据) |
get_voice(msgid) |
获取语音消息并保存到本地文件 voice_[msgid].mp3 , msgid 为消息id(Web微信数据) |
get_contact_name(uid) |
获取微信id对应的名称,返回一个可能包含 remark_name (备注名), nickname (昵称), display_name (群名称)的字典 |
send_msg_by_uid(word, dst) |
向好友发送消息,word 为消息字符串,dst 为好友用户id(Web微信数据) |
send_msg(name, word, isfile) |
向好友发送消息,name 为好友的备注名或者好友微信号, isfile 为 False 时 word 为消息,isfile 为 True 时 word 为文件路径(此时向好友发送文件里的每一行),此方法在有重名好友时会有问题,因此更推荐使用 send_msg_by_uid(word, dst) |
is_contact(uid) |
判断id为 uid 的账号是否是本帐号的好友,返回 True (是)或 False (不是) |
is_public(uid) |
判断id为 uid 的账号是否是本帐号所关注的公众号,返回 True (是)或 False (不是) |
4 群聊机器人示例
bot.py 用 图灵机器人 API 以及 wxBot 实现了一个自动回复机器人.
此机器人会回复来自联系人的消息,以及群里@此账号的消息。
并且本帐号可以通过发送 退下 、 走开 、 关闭 、 关掉 、 休息 、 滚开 来关闭机器人的自动回复。
也可以通过发送 出来 、 启动 、 工作 来再次开启机器人的自动回复。
群聊时需要将对应的群保存到联系人列表。
群聊实现效果:
bot.py 的运行方法:
要接入图灵机器人API时:
[main] key=1d2678900f734aa0a23734ace8aec5b1
- 运行 bot.py
python bot.py
不接入图灵机器人API时(此时机器人对联系人消息以及群里@自己的消息统一回复 知道了 ):
- 运行 bot.py
python bot.py
5 帮助项目
欢迎对本项目提意见、贡献代码,参考: 如何帮助项目