1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-09-25 23:02:26 +00:00
Files
seahub/seahub/dingtalk/utils.py
lian 7f6f29ccfe Send notices to social account (#4606)
* dingtalk msg commit for master branch

* send_notices_to_social_account

Co-authored-by: lian <lian@seafile.com>
2020-07-10 16:00:52 +08:00

64 lines
1.9 KiB
Python

import logging
import requests
from django.core.cache import cache
from seahub.utils import normalize_cache_key
from seahub.dingtalk.settings import DINGTALK_DEPARTMENT_APP_KEY, \
DINGTALK_DEPARTMENT_APP_SECRET, \
DINGTALK_DEPARTMENT_GET_ACCESS_TOKEN_URL, \
DINGTALK_GET_USERID_BY_UNIONID
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def dingtalk_get_access_token(timeout=10):
    """Return a DingTalk access token, consulting the Django cache first.

    If a token is already cached it is returned directly. Otherwise the
    token endpoint is queried with the department app key and secret; a
    successful result is cached for the ``expires_in`` seconds reported in
    the response (7200 when that field is absent).

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The access token string, or ``''`` when it could not be obtained
        (request failure, non-JSON response, or a response without an
        ``access_token`` field).
    """
    cache_key = normalize_cache_key('DINGTALK_ACCESS_TOKEN')
    access_token = cache.get(cache_key, None)
    if access_token:
        return access_token

    data = {
        'appkey': DINGTALK_DEPARTMENT_APP_KEY,
        'appsecret': DINGTALK_DEPARTMENT_APP_SECRET,
    }

    try:
        # Without a timeout, requests may block forever on a stalled peer.
        resp_json = requests.get(DINGTALK_DEPARTMENT_GET_ACCESS_TOKEN_URL,
                                 params=data, timeout=timeout).json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON. Only the
        # exception class is logged: the exception message can embed the
        # request URL, whose query string carries the app secret.
        logger.error('failed to get dingtalk access_token')
        logger.error(DINGTALK_DEPARTMENT_GET_ACCESS_TOKEN_URL)
        logger.error('request error: %s', type(e).__name__)
        return ''

    access_token = resp_json.get('access_token', '')
    if not access_token:
        logger.error('failed to get dingtalk access_token')
        logger.error(DINGTALK_DEPARTMENT_GET_ACCESS_TOKEN_URL)
        # Log the app key only; the app secret must not end up in log files.
        logger.error({'appkey': DINGTALK_DEPARTMENT_APP_KEY})
        logger.error(resp_json)
        return ''

    expires_in = resp_json.get('expires_in', 7200)
    cache.set(cache_key, access_token, expires_in)

    return access_token
def dingtalk_get_userid_by_unionid(union_id, timeout=10):
    """Return the DingTalk userid that corresponds to ``union_id``.

    The Django cache is consulted first. On a miss, an access token is
    obtained via ``dingtalk_get_access_token()`` and the lookup endpoint is
    queried; a successful result is stored in the cache (no explicit
    timeout is passed to ``cache.set``).

    Args:
        union_id: The DingTalk unionid to resolve.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The userid string, or ``''`` when it could not be resolved (no
        access token, request failure, non-JSON response, or a response
        without a ``userid`` field).
    """
    cache_key = normalize_cache_key('DINGTALK_UNION_ID_%s' % union_id)
    user_id = cache.get(cache_key, None)
    if user_id:
        return user_id

    access_token = dingtalk_get_access_token()
    if not access_token:
        # The lookup cannot succeed without a token; skip the network call.
        logger.error('failed to get userid by unionid: %s', union_id)
        logger.error('no dingtalk access_token available')
        return ''

    data = {
        'access_token': access_token,
        'unionid': union_id,
    }

    try:
        # Without a timeout, requests may block forever on a stalled peer.
        resp_json = requests.get(DINGTALK_GET_USERID_BY_UNIONID,
                                 params=data, timeout=timeout).json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON. Only the
        # exception class is logged: the exception message can embed the
        # request URL, whose query string carries the access token.
        logger.error('failed to get userid by unionid: %s', union_id)
        logger.error(DINGTALK_GET_USERID_BY_UNIONID)
        logger.error('request error: %s', type(e).__name__)
        return ''

    user_id = resp_json.get('userid', '')
    if not user_id:
        logger.error('failed to get userid by unionid: %s', union_id)
        logger.error(DINGTALK_GET_USERID_BY_UNIONID)
        # Log the unionid only; the access token is a live credential.
        logger.error({'unionid': union_id})
        logger.error(resp_json)
        return ''

    cache.set(cache_key, user_id)

    return user_id