import os import json import urllib3 class WinxinApi(object): def __init__(self,corpid,secret,chatid): self.secret = secret # 企业微信应用凭证 self.corpid = corpid # 企业微信id self.chatid = chatid # 企业微信群聊id self.http = urllib3.PoolManager() def __get_token(self): '''token获取''' url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}".format(self.corpid,self.secret) r = self.http.request('GET', url) req = r.data.decode("utf-8") data = json.loads(req) if data["errcode"] == 0: return data['access_token'] else: raise data["errmsg"] def __upload_file(self,file_path,type='file'): '''上传临时文件''' if not os.path.exists(file_path): raise ValueError("{},文件不存在".format(file_path)) file_name = file_path.split("\\")[-1] token = self.__get_token() with open(file_path,'rb') as f: file_content = f.read() files = {'filefield': (file_name, file_content, 'text/plain')} url = "https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={}&type={}".format(token,type) r = self.http.request('POST', url,fields=files) req = r.data.decode("utf-8") data = json.loads(req) if data["errcode"] == 0: return data['media_id'] else: raise data["errmsg"] def send_file_message(self,file_path): token = self.__get_token() media_id = self.__upload_file(file_path) body = { "chatid" : self.chatid, # 群聊id "msgtype" : "file", # 消息类型,此时固定为:file "file" : {"media_id" : media_id},# 文件id,可以调用上传临时素材接口获取 "safe":0 # 表示是否是保密消息,0表示否,1表示是 } url = "https://qyapi.weixin.qq.com/cgi-bin/appchat/send?access_token={}".format(token) bodyjson = json.dumps(body) r = self.http.request('POST', url, body=bodyjson) req = r.data.decode("utf-8") data = json.loads(req) if data["errcode"] == 0: print("发送文件到企业微信成功") else: raise data["errmsg"]