perf: 优化修改 sdk 位置

This commit is contained in:
ibuler
2021-10-21 16:50:11 +08:00
committed by xinwen
parent d5c9ec1c3d
commit 072865f3e5
26 changed files with 25 additions and 25 deletions

View File

View File

@@ -0,0 +1,178 @@
import time
import hmac
import base64
from common.utils import get_logger
from common.sdk.im.utils import digest, as_request
from common.sdk.im.mixin import BaseRequest
logger = get_logger(__file__)
def sign(secret, data):
digest = hmac.HMAC(
key=secret.encode('utf8'),
msg=data.encode('utf8'),
digestmod=hmac._hashlib.sha256
).digest()
signature = base64.standard_b64encode(digest).decode('utf8')
# signature = urllib.parse.quote(signature, safe='')
# signature = signature.replace('+', '%20').replace('*', '%2A').replace('~', '%7E').replace('/', '%2F')
return signature
class ErrorCode:
INVALID_TOKEN = 88
class URL:
QR_CONNECT = 'https://oapi.dingtalk.com/connect/qrconnect'
GET_USER_INFO_BY_CODE = 'https://oapi.dingtalk.com/sns/getuserinfo_bycode'
GET_TOKEN = 'https://oapi.dingtalk.com/gettoken'
SEND_MESSAGE_BY_TEMPLATE = 'https://oapi.dingtalk.com/topapi/message/corpconversation/sendbytemplate'
SEND_MESSAGE = 'https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2'
GET_SEND_MSG_PROGRESS = 'https://oapi.dingtalk.com/topapi/message/corpconversation/getsendprogress'
GET_USERID_BY_UNIONID = 'https://oapi.dingtalk.com/topapi/user/getbyunionid'
class DingTalkRequests(BaseRequest):
invalid_token_errcodes = (ErrorCode.INVALID_TOKEN,)
def __init__(self, appid, appsecret, agentid, timeout=None):
self._appid = appid or ''
self._appsecret = appsecret or ''
self._agentid = agentid or ''
super().__init__(timeout=timeout)
def get_access_token_cache_key(self):
return digest(self._appid, self._appsecret)
def request_access_token(self):
# https://developers.dingtalk.com/document/app/obtain-orgapp-token?spm=ding_open_doc.document.0.0.3a256573JEWqIL#topic-1936350
params = {'appkey': self._appid, 'appsecret': self._appsecret}
data = self.raw_request('get', url=URL.GET_TOKEN, params=params)
access_token = data['access_token']
expires_in = data['expires_in']
return access_token, expires_in
def add_token(self, kwargs: dict):
params = kwargs.get('params')
if params is None:
params = {}
kwargs['params'] = params
params['access_token'] = self.access_token
def get(self, url, params=None,
with_token=False, with_sign=False,
check_errcode_is_0=True,
**kwargs):
pass
get = as_request(get)
def post(self, url, json=None, params=None,
with_token=False, with_sign=False,
check_errcode_is_0=True,
**kwargs) -> dict:
pass
post = as_request(post)
def _add_sign(self, kwargs: dict):
params = kwargs.get('params')
if params is None:
params = {}
kwargs['params'] = params
timestamp = str(int(time.time() * 1000))
signature = sign(self._appsecret, timestamp)
params['timestamp'] = timestamp
params['signature'] = signature
params['accessKey'] = self._appid
def request(self, method, url,
with_token=False, with_sign=False,
check_errcode_is_0=True,
**kwargs):
if with_sign:
self._add_sign(kwargs)
data = super().request(
method, url, with_token=with_token,
check_errcode_is_0=check_errcode_is_0, **kwargs
)
return data
class DingTalk:
def __init__(self, appid, appsecret, agentid, timeout=None):
self._appid = appid or ''
self._appsecret = appsecret or ''
self._agentid = agentid or ''
self._request = DingTalkRequests(
appid=appid, appsecret=appsecret, agentid=agentid,
timeout=timeout
)
def get_userinfo_bycode(self, code):
# https://developers.dingtalk.com/document/app/obtain-the-user-information-based-on-the-sns-temporary-authorization?spm=ding_open_doc.document.0.0.3a256573y8Y7yg#topic-1995619
body = {
"tmp_auth_code": code
}
data = self._request.post(URL.GET_USER_INFO_BY_CODE, json=body, with_sign=True)
return data['user_info']
def get_userid_by_code(self, code):
user_info = self.get_userinfo_bycode(code)
unionid = user_info['unionid']
userid = self.get_userid_by_unionid(unionid)
return userid
def get_userid_by_unionid(self, unionid):
body = {
'unionid': unionid
}
data = self._request.post(URL.GET_USERID_BY_UNIONID, json=body, with_token=True)
userid = data['result']['userid']
return userid
def send_by_template(self, template_id, user_ids, dept_ids, data):
body = {
'agent_id': self._agentid,
'template_id': template_id,
'userid_list': ','.join(user_ids),
'dept_id_list': ','.join(dept_ids),
'data': data
}
data = self._request.post(URL.SEND_MESSAGE_BY_TEMPLATE, json=body, with_token=True)
def send_text(self, user_ids, msg):
body = {
'agent_id': self._agentid,
'userid_list': ','.join(user_ids),
# 'dept_id_list': '',
'to_all_user': False,
'msg': {
'msgtype': 'text',
'text': {
'content': msg
}
}
}
logger.info(f'Dingtalk send text: user_ids={user_ids} msg={msg}')
data = self._request.post(URL.SEND_MESSAGE, json=body, with_token=True)
return data
def get_send_msg_progress(self, task_id):
body = {
'agent_id': self._agentid,
'task_id': task_id
}
data = self._request.post(URL.GET_SEND_MSG_PROGRESS, json=body, with_token=True)
return data

View File

@@ -0,0 +1,23 @@
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import APIException
class HTTPNot200(APIException):
default_code = 'http_not_200'
default_detail = 'HTTP status is not 200'
class ErrCodeNot0(APIException):
default_code = 'errcode_not_0'
default_detail = 'Error code is not 0'
class ResponseDataKeyError(APIException):
default_code = 'response_data_key_error'
default_detail = 'Response data key error'
class NetError(APIException):
default_code = 'net_error'
default_detail = _('Network error, please contact system administrator')

View File

@@ -0,0 +1,115 @@
import json
from django.utils.translation import ugettext_lazy as _
from rest_framework.exceptions import APIException
from common.utils.common import get_logger
from common.sdk.im.utils import digest
from common.sdk.im.mixin import RequestMixin, BaseRequest
logger = get_logger(__name__)
class URL:
AUTHEN = 'https://open.feishu.cn/open-apis/authen/v1/index'
GET_TOKEN = 'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/'
# https://open.feishu.cn/document/ukTMukTMukTM/uEDO4UjLxgDO14SM4gTN
GET_USER_INFO_BY_CODE = 'https://open.feishu.cn/open-apis/authen/v1/access_token'
SEND_MESSAGE = 'https://open.feishu.cn/open-apis/im/v1/messages'
class ErrorCode:
INVALID_APP_ACCESS_TOKEN = 99991664
INVALID_USER_ACCESS_TOKEN = 99991668
INVALID_TENANT_ACCESS_TOKEN = 99991663
class FeishuRequests(BaseRequest):
"""
处理系统级错误,抛出 API 异常,直接生成 HTTP 响应,业务代码无需关心这些错误
- 确保 status_code == 200
- 确保 access_token 无效时重试
"""
invalid_token_errcodes = (
ErrorCode.INVALID_USER_ACCESS_TOKEN, ErrorCode.INVALID_TENANT_ACCESS_TOKEN,
ErrorCode.INVALID_APP_ACCESS_TOKEN
)
code_key = 'code'
msg_key = 'msg'
def __init__(self, app_id, app_secret, timeout=None):
self._app_id = app_id
self._app_secret = app_secret
super().__init__(timeout=timeout)
def get_access_token_cache_key(self):
return digest(self._app_id, self._app_secret)
def request_access_token(self):
data = {'app_id': self._app_id, 'app_secret': self._app_secret}
response = self.raw_request('post', url=URL.GET_TOKEN, data=data)
self.check_errcode_is_0(response)
access_token = response['tenant_access_token']
expires_in = response['expire']
return access_token, expires_in
def add_token(self, kwargs: dict):
headers = kwargs.setdefault('headers', {})
headers['Authorization'] = f'Bearer {self.access_token}'
class FeiShu(RequestMixin):
"""
非业务数据导致的错误直接抛异常,说明是系统配置错误,业务代码不用理会
"""
def __init__(self, app_id, app_secret, timeout=None):
self._app_id = app_id or ''
self._app_secret = app_secret or ''
self._requests = FeishuRequests(
app_id=app_id,
app_secret=app_secret,
timeout=timeout
)
def get_user_id_by_code(self, code):
# https://open.feishu.cn/document/ukTMukTMukTM/uEDO4UjLxgDO14SM4gTN
body = {
'grant_type': 'authorization_code',
'code': code
}
data = self._requests.post(URL.GET_USER_INFO_BY_CODE, json=body, check_errcode_is_0=False)
self._requests.check_errcode_is_0(data)
return data['data']['user_id']
def send_text(self, user_ids, msg):
params = {
'receive_id_type': 'user_id'
}
body = {
'msg_type': 'text',
'content': json.dumps({'text': msg})
}
invalid_users = []
for user_id in user_ids:
body['receive_id'] = user_id
try:
logger.info(f'Feishu send text: user_ids={user_ids} msg={msg}')
self._requests.post(URL.SEND_MESSAGE, params=params, json=body)
except APIException as e:
# 只处理可预知的错误
logger.exception(e)
invalid_users.append(user_id)
return invalid_users

149
apps/common/sdk/im/mixin.py Normal file
View File

@@ -0,0 +1,149 @@
import requests
from requests import exceptions as req_exce
from rest_framework.exceptions import PermissionDenied
from django.core.cache import cache
from .utils import DictWrapper
from common.utils.common import get_logger
from common.utils import lazyproperty
from common.sdk.im.utils import set_default, as_request
from . import exceptions as exce
logger = get_logger(__name__)
class RequestMixin:
code_key: str
msg_key: str
class BaseRequest(RequestMixin):
"""
定义了 `access_token` 的过期刷新框架
"""
invalid_token_errcodes = ()
code_key = 'errcode'
msg_key = 'err_msg'
def __init__(self, timeout=None):
self._request_kwargs = {
'timeout': timeout
}
self.init_access_token()
@classmethod
def check_errcode_is_0(cls, data: DictWrapper):
errcode = data[cls.code_key]
if errcode != 0:
# 如果代码写的对,配置没问题,这里不该出错,系统性错误,直接抛异常
errmsg = data[cls.msg_key]
logger.error(f'Response 200 but errcode is not 0: '
f'errcode={errcode} '
f'errmsg={errmsg} ')
raise exce.ErrCodeNot0(detail=data.raw_data)
@staticmethod
def check_http_is_200(response):
if response.status_code != 200:
# 正常情况下不会返回非 200 响应码
logger.error(f'Response error: '
f'status_code={response.status_code} '
f'url={response.url}'
f'\ncontent={response.content}')
raise exce.HTTPNot200(detail=response.json())
def request_access_token(self):
"""
获取新的 `access_token` 的方法,子类需要实现
"""
raise NotImplementedError
def get_access_token_cache_key(self):
"""
获取 `access_token` 的缓存 key 子类需要实现
"""
raise NotImplementedError
def add_token(self, kwargs: dict):
"""
添加 token ,子类需要实现
"""
raise NotImplementedError
def is_token_invalid(self, data):
code = data[self.code_key]
if code in self.invalid_token_errcodes:
logger.error(f'OAuth token invalid: {data}')
return True
return False
@lazyproperty
def access_token_cache_key(self):
return self.get_access_token_cache_key()
def init_access_token(self):
access_token = cache.get(self.access_token_cache_key)
if access_token:
self.access_token = access_token
return
self.refresh_access_token()
def refresh_access_token(self):
access_token, expires_in = self.request_access_token()
self.access_token = access_token
cache.set(self.access_token_cache_key, access_token, expires_in - 10)
def raw_request(self, method, url, **kwargs):
set_default(kwargs, self._request_kwargs)
try:
response = getattr(requests, method)(url, **kwargs)
self.check_http_is_200(response)
raw_data = response.json()
data = DictWrapper(raw_data)
return data
except req_exce.ReadTimeout as e:
logger.exception(e)
raise exce.NetError
def token_request(self, method, url, **kwargs):
for i in range(3):
# 循环为了防止 access_token 失效
self.add_token(kwargs)
data = self.raw_request(method, url, **kwargs)
if self.is_token_invalid(data):
self.refresh_access_token()
continue
return data
logger.error(f'Get access_token error, check config: url={url} data={data.raw_data}')
raise PermissionDenied(data.raw_data)
def get(self, url, params=None, with_token=True,
check_errcode_is_0=True, **kwargs):
# self.request ...
pass
get = as_request(get)
def post(self, url, params=None, json=None,
with_token=True, check_errcode_is_0=True,
**kwargs):
# self.request ...
pass
post = as_request(post)
def request(self, method, url,
with_token=True,
check_errcode_is_0=True,
**kwargs):
if with_token:
data = self.token_request(method, url, **kwargs)
else:
data = self.raw_request(method, url, **kwargs)
if check_errcode_is_0:
self.check_errcode_is_0(data)
return data

View File

@@ -0,0 +1,65 @@
from collections import OrderedDict
import importlib
from django.utils.translation import gettext_lazy as _
from django.db.models import TextChoices
from django.conf import settings
from common.utils import get_logger
from common.exceptions import JMSException
logger = get_logger(__file__)
class BACKENDS(TextChoices):
ALIBABA = 'alibaba', _('Alibaba cloud')
TENCENT = 'tencent', _('Tencent cloud')
class BaseSMSClient:
"""
短信终端的基类
"""
SIGN_AND_TMPL_SETTING_FIELD_PREFIX: str
@classmethod
def new_from_settings(cls):
raise NotImplementedError
def send_sms(self, phone_numbers: list, sign_name: str, template_code: str, template_param: dict, **kwargs):
raise NotImplementedError
class SMS:
client: BaseSMSClient
def __init__(self, backend=None):
backend = backend or settings.SMS_BACKEND
if backend not in BACKENDS:
raise JMSException(
code='sms_provider_not_support',
detail=_('SMS provider not support: {}').format(backend)
)
m = importlib.import_module(f'.{backend or settings.SMS_BACKEND}', __package__)
self.client = m.client.new_from_settings()
def send_sms(self, phone_numbers: list, sign_name: str, template_code: str, template_param: dict, **kwargs):
return self.client.send_sms(
phone_numbers=phone_numbers,
sign_name=sign_name,
template_code=template_code,
template_param=template_param,
**kwargs
)
def send_verify_code(self, phone_number, code):
sign_name = getattr(settings, f'{self.client.SIGN_AND_TMPL_SETTING_FIELD_PREFIX}_VERIFY_SIGN_NAME')
template_code = getattr(settings, f'{self.client.SIGN_AND_TMPL_SETTING_FIELD_PREFIX}_VERIFY_TEMPLATE_CODE')
if not (sign_name and template_code):
raise JMSException(
code='verify_code_sign_tmpl_invalid',
detail=_('SMS verification code signature or template invalid')
)
return self.send_sms([phone_number], sign_name, template_code, OrderedDict(code=code))

View File

@@ -0,0 +1,61 @@
import json
from django.utils.translation import gettext_lazy as _
from django.conf import settings
from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models
from Tea.exceptions import TeaException
from common.utils import get_logger
from common.exceptions import JMSException
from . import BaseSMSClient
logger = get_logger(__file__)
class AlibabaSMS(BaseSMSClient):
SIGN_AND_TMPL_SETTING_FIELD_PREFIX = 'ALIBABA'
@classmethod
def new_from_settings(cls):
return cls(
access_key_id=settings.ALIBABA_ACCESS_KEY_ID,
access_key_secret=settings.ALIBABA_ACCESS_KEY_SECRET
)
def __init__(self, access_key_id: str, access_key_secret: str):
config = open_api_models.Config(
# 您的AccessKey ID,
access_key_id=access_key_id,
# 您的AccessKey Secret,
access_key_secret=access_key_secret
)
# 访问的域名
config.endpoint = 'dysmsapi.aliyuncs.com'
self.client = Dysmsapi20170525Client(config)
def send_sms(self, phone_numbers: list, sign_name: str, template_code: str, template_param: dict, **kwargs):
phone_numbers_str = ','.join(phone_numbers)
send_sms_request = dysmsapi_20170525_models.SendSmsRequest(
phone_numbers=phone_numbers_str, sign_name=sign_name,
template_code=template_code, template_param=json.dumps(template_param)
)
try:
logger.info(f'Alibaba sms send: '
f'phone_numbers={phone_numbers} '
f'sign_name={sign_name} '
f'template_code={template_code} '
f'template_param={template_param}')
response = self.client.send_sms(send_sms_request)
# 这里只判断是否成功,失败抛出异常
if response.body.code != 'OK':
raise JMSException(detail=response.body.message, code=response.body.code)
except TeaException as e:
if e.code == 'SignatureDoesNotMatch':
raise JMSException(code=e.code, detail=_('Signature does not match'))
raise JMSException(code=e.code, detail=e.message)
return response
client = AlibabaSMS

View File

@@ -0,0 +1,98 @@
from collections import OrderedDict
from django.conf import settings
from common.exceptions import JMSException
from common.utils import get_logger
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
# 导入对应产品模块的client models。
from tencentcloud.sms.v20210111 import sms_client, models
# 导入可选配置类
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from . import BaseSMSClient
logger = get_logger(__file__)
class TencentSMS(BaseSMSClient):
"""
https://cloud.tencent.com/document/product/382/43196#.E5.8F.91.E9.80.81.E7.9F.AD.E4.BF.A1
"""
SIGN_AND_TMPL_SETTING_FIELD_PREFIX = 'TENCENT'
@classmethod
def new_from_settings(cls):
return cls(
secret_id=settings.TENCENT_SECRET_ID,
secret_key=settings.TENCENT_SECRET_KEY,
sdkappid=settings.TENCENT_SDKAPPID
)
def __init__(self, secret_id: str, secret_key: str, sdkappid: str):
self.sdkappid = sdkappid
cred = credential.Credential(secret_id, secret_key)
httpProfile = HttpProfile()
httpProfile.reqMethod = "POST" # post请求(默认为post请求)
httpProfile.reqTimeout = 30 # 请求超时时间,单位为秒(默认60秒)
httpProfile.endpoint = "sms.tencentcloudapi.com"
clientProfile = ClientProfile()
clientProfile.signMethod = "TC3-HMAC-SHA256" # 指定签名算法
clientProfile.language = "en-US"
clientProfile.httpProfile = httpProfile
self.client = sms_client.SmsClient(cred, "ap-guangzhou", clientProfile)
def send_sms(self, phone_numbers: list, sign_name: str, template_code: str, template_param: OrderedDict, **kwargs):
try:
req = models.SendSmsRequest()
# 基本类型的设置:
# SDK采用的是指针风格指定参数即使对于基本类型你也需要用指针来对参数赋值。
# SDK提供对基本类型的指针引用封装函数
# 帮助链接:
# 短信控制台: https://console.cloud.tencent.com/smsv2
# sms helper: https://cloud.tencent.com/document/product/382/3773
# 短信应用ID: 短信SdkAppId在 [短信控制台] 添加应用后生成的实际SdkAppId示例如1400006666
req.SmsSdkAppId = self.sdkappid
# 短信签名内容: 使用 UTF-8 编码,必须填写已审核通过的签名,签名信息可登录 [短信控制台] 查看
req.SignName = sign_name
# 短信码号扩展号: 默认未开通,如需开通请联系 [sms helper]
req.ExtendCode = ""
# 用户的 session 内容: 可以携带用户侧 ID 等上下文信息server 会原样返回
req.SessionContext = "Jumpserver"
# 国际/港澳台短信 senderid: 国内短信填空,默认未开通,如需开通请联系 [sms helper]
req.SenderId = ""
# 下发手机号码,采用 E.164 标准,+[国家或地区码][手机号]
# 示例如:+8613711112222 其中前面有一个+号 86为国家码13711112222为手机号最多不要超过200个手机号
req.PhoneNumberSet = phone_numbers
# 模板 ID: 必须填写已审核通过的模板 ID。模板ID可登录 [短信控制台] 查看
req.TemplateId = template_code
# 模板参数: 若无模板参数,则设置为空
req.TemplateParamSet = list(template_param.values())
# 通过client对象调用DescribeInstances方法发起请求。注意请求方法名与请求对象是对应的。
# 返回的resp是一个DescribeInstancesResponse类的实例与请求对象对应。
logger.info(f'Tencent sms send: '
f'phone_numbers={phone_numbers} '
f'sign_name={sign_name} '
f'template_code={template_code} '
f'template_param={template_param}')
resp = self.client.SendSms(req)
try:
code = resp.SendStatusSet[0].Code
msg = resp.SendStatusSet[0].Message
except IndexError:
raise JMSException(code='response_bad', detail=resp)
if code.lower() != 'ok':
raise JMSException(code=code, detail=msg)
return resp
except TencentCloudSDKException as e:
raise JMSException(code=e.code, detail=e.message)
client = TencentSMS

View File

@@ -0,0 +1,78 @@
import hashlib
import inspect
from inspect import Parameter
from common.utils.common import get_logger
from common.sdk.im import exceptions as exce
logger = get_logger(__name__)
def digest(corp_id, corp_secret):
md5 = hashlib.md5()
md5.update(corp_id.encode())
md5.update(corp_secret.encode())
dist = md5.hexdigest()
return dist
def update_values(default: dict, others: dict):
for key in default.keys():
if key in others:
default[key] = others[key]
def set_default(data: dict, default: dict):
for key in default.keys():
if key not in data:
data[key] = default[key]
class DictWrapper:
def __init__(self, data:dict):
self.raw_data = data
def __getitem__(self, item):
# 网络请求返回的数据,不能完全信任,所以字典操作包在异常里
try:
return self.raw_data[item]
except KeyError as e:
msg = f'Response 200 but get field from json error: error={e} data={self.raw_data}'
logger.error(msg)
raise exce.ResponseDataKeyError(detail=self.raw_data)
def __getattr__(self, item):
return getattr(self.raw_data, item)
def __contains__(self, item):
return item in self.raw_data
def __str__(self):
return str(self.raw_data)
def __repr__(self):
return str(self.raw_data)
def as_request(func):
def inner(*args, **kwargs):
signature = inspect.signature(func)
bound_args = signature.bind(*args, **kwargs)
bound_args.apply_defaults()
arguments = bound_args.arguments
self = arguments['self']
request_method = func.__name__
parameters = {}
for k, v in signature.parameters.items():
if k == 'self':
continue
if v.kind is Parameter.VAR_KEYWORD:
parameters.update(arguments[k])
continue
parameters[k] = arguments[k]
response = self.request(request_method, **parameters)
return response
return inner

View File

@@ -0,0 +1,172 @@
from typing import Iterable, AnyStr
from django.utils.translation import ugettext_lazy as _
from rest_framework.exceptions import APIException
from common.utils.common import get_logger
from common.sdk.im.utils import digest, DictWrapper, update_values, set_default
from common.sdk.im.mixin import RequestMixin, BaseRequest
logger = get_logger(__name__)
class WeComError(APIException):
default_code = 'wecom_error'
default_detail = _('WeCom error, please contact system administrator')
class URL:
GET_TOKEN = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
SEND_MESSAGE = 'https://qyapi.weixin.qq.com/cgi-bin/message/send'
QR_CONNECT = 'https://open.work.weixin.qq.com/wwopen/sso/qrConnect'
# https://open.work.weixin.qq.com/api/doc/90000/90135/91437
GET_USER_ID_BY_CODE = 'https://qyapi.weixin.qq.com/cgi-bin/user/getuserinfo'
GET_USER_DETAIL = 'https://qyapi.weixin.qq.com/cgi-bin/user/get'
class ErrorCode:
# https://open.work.weixin.qq.com/api/doc/90000/90139/90313#%E9%94%99%E8%AF%AF%E7%A0%81%EF%BC%9A81013
RECIPIENTS_INVALID = 81013 # UserID、部门ID、标签ID全部非法或无权限。
# https: // open.work.weixin.qq.com / devtool / query?e = 82001
RECIPIENTS_EMPTY = 82001 # 指定的成员/部门/标签全部为空
# https://open.work.weixin.qq.com/api/doc/90000/90135/91437
INVALID_CODE = 40029
INVALID_TOKEN = 40014 # 无效的 access_token
class WeComRequests(BaseRequest):
"""
处理系统级错误,抛出 API 异常,直接生成 HTTP 响应,业务代码无需关心这些错误
- 确保 status_code == 200
- 确保 access_token 无效时重试
"""
invalid_token_errcodes = (ErrorCode.INVALID_TOKEN,)
def __init__(self, corpid, corpsecret, agentid, timeout=None):
self._corpid = corpid or ''
self._corpsecret = corpsecret or ''
self._agentid = agentid or ''
super().__init__(timeout=timeout)
def get_access_token_cache_key(self):
return digest(self._corpid, self._corpsecret)
def request_access_token(self):
params = {'corpid': self._corpid, 'corpsecret': self._corpsecret}
data = self.raw_request('get', url=URL.GET_TOKEN, params=params)
access_token = data['access_token']
expires_in = data['expires_in']
return access_token, expires_in
def add_token(self, kwargs: dict):
params = kwargs.get('params')
if params is None:
params = {}
kwargs['params'] = params
params['access_token'] = self.access_token
class WeCom(RequestMixin):
"""
非业务数据导致的错误直接抛异常,说明是系统配置错误,业务代码不用理会
"""
def __init__(self, corpid, corpsecret, agentid, timeout=None):
self._corpid = corpid or ''
self._corpsecret = corpsecret or ''
self._agentid = agentid or ''
self._requests = WeComRequests(
corpid=corpid,
corpsecret=corpsecret,
agentid=agentid,
timeout=timeout
)
def send_text(self, users: Iterable, msg: AnyStr, **kwargs):
"""
https://open.work.weixin.qq.com/api/doc/90000/90135/90236
对于业务代码,只需要关心由 用户id 或 消息不对 导致的错误,其他错误不予理会
"""
users = tuple(users)
extra_params = {
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0,
"duplicate_check_interval": 1800
}
update_values(extra_params, kwargs)
body = {
"touser": '|'.join(users),
"msgtype": "text",
"agentid": self._agentid,
"text": {
"content": msg
},
**extra_params
}
logger.info(f'Wecom send text: users={users} msg={msg}')
data = self._requests.post(URL.SEND_MESSAGE, json=body, check_errcode_is_0=False)
errcode = data['errcode']
if errcode in (ErrorCode.RECIPIENTS_INVALID, ErrorCode.RECIPIENTS_EMPTY):
# 全部接收人无权限或不存在
return users
self._requests.check_errcode_is_0(data)
invaliduser = data['invaliduser']
if not invaliduser:
return ()
if isinstance(invaliduser, str):
logger.error(f'WeCom send text 200, but invaliduser is not str: invaliduser={invaliduser}')
raise WeComError
invalid_users = invaliduser.split('|')
return invalid_users
def get_user_id_by_code(self, code):
# # https://open.work.weixin.qq.com/api/doc/90000/90135/91437
params = {
'code': code,
}
data = self._requests.get(URL.GET_USER_ID_BY_CODE, params=params, check_errcode_is_0=False)
errcode = data['errcode']
if errcode == ErrorCode.INVALID_CODE:
logger.warn(f'WeCom get_user_id_by_code invalid code: code={code}')
return None, None
self._requests.check_errcode_is_0(data)
USER_ID = 'UserId'
OPEN_ID = 'OpenId'
if USER_ID in data:
return data[USER_ID], USER_ID
elif OPEN_ID in data:
return data[OPEN_ID], OPEN_ID
else:
logger.error(f'WeCom response 200 but get field from json error: fields=UserId|OpenId')
raise WeComError
def get_user_detail(self, id):
# https://open.work.weixin.qq.com/api/doc/90000/90135/90196
params = {
'userid': id,
}
data = self._requests.get(URL.GET_USER_DETAIL, params)
return data