feat: 企业微信、钉钉 工作台免密登录(飞书已实现) (#7855)

* feat: 添加oauth接口

* feat: 企业微信支持OAuth认证,工作台免密登录

* feat: 钉钉支持OAuth认证,工作台免密登录

* fix: 修复参数错误

Co-authored-by: halo <wuyihuangw@gmail.com>
This commit is contained in:
fit2bot
2022-03-29 13:19:13 +08:00
committed by GitHub
parent a20de3df16
commit 52709d2efa
5 changed files with 188 additions and 29 deletions

View File

@@ -28,7 +28,7 @@ logger = get_logger(__file__)
DINGTALK_STATE_SESSION_KEY = '_dingtalk_state'
class DingTalkQRMixin(PermissionsMixin, View):
class DingTalkBaseMixin(PermissionsMixin, View):
def dispatch(self, request, *args, **kwargs):
try:
return super().dispatch(request, *args, **kwargs)
@@ -54,20 +54,6 @@ class DingTalkQRMixin(PermissionsMixin, View):
msg = _("The system configuration is incorrect. Please contact your administrator")
return self.get_failed_response(redirect_uri, msg, msg)
def get_qr_url(self, redirect_uri):
state = random_string(16)
self.request.session[DINGTALK_STATE_SESSION_KEY] = state
params = {
'appid': settings.DINGTALK_APPKEY,
'response_type': 'code',
'scope': 'snsapi_login',
'state': state,
'redirect_uri': redirect_uri,
}
url = URL.QR_CONNECT + '?' + urlencode(params)
return url
@staticmethod
def get_success_response(redirect_url, title, msg):
message_data = {
@@ -94,6 +80,42 @@ class DingTalkQRMixin(PermissionsMixin, View):
return response
class DingTalkQRMixin(DingTalkBaseMixin, View):
def get_qr_url(self, redirect_uri):
state = random_string(16)
self.request.session[DINGTALK_STATE_SESSION_KEY] = state
params = {
'appid': settings.DINGTALK_APPKEY,
'response_type': 'code',
'scope': 'snsapi_login',
'state': state,
'redirect_uri': redirect_uri,
}
url = URL.QR_CONNECT + '?' + urlencode(params)
return url
class DingTalkOAuthMixin(DingTalkBaseMixin, View):
def get_oauth_url(self, redirect_uri):
if not settings.AUTH_DINGTALK:
return reverse('authentication:login')
state = random_string(16)
self.request.session[DINGTALK_STATE_SESSION_KEY] = state
params = {
'appid': settings.DINGTALK_APPKEY,
'response_type': 'code',
'scope': 'snsapi_auth',
'state': state,
'redirect_uri': redirect_uri,
}
url = URL.OAUTH_CONNECT + '?' + urlencode(params)
return url
class DingTalkQRBindView(DingTalkQRMixin, View):
permission_classes = (IsAuthenticated,)
@@ -230,3 +252,57 @@ class DingTalkQRLoginCallbackView(AuthMixin, DingTalkQRMixin, View):
return response
return self.redirect_to_guard_view()
class DingTalkOAuthLoginView(DingTalkOAuthMixin, View):
permission_classes = (AllowAny,)
def get(self, request: HttpRequest):
redirect_url = request.GET.get('redirect_url')
redirect_uri = reverse('authentication:dingtalk-oauth-login-callback', external=True)
redirect_uri += '?' + urlencode({'redirect_url': redirect_url})
url = self.get_oauth_url(redirect_uri)
return HttpResponseRedirect(url)
class DingTalkOAuthLoginCallbackView(AuthMixin, DingTalkOAuthMixin, View):
permission_classes = (AllowAny,)
def get(self, request: HttpRequest):
code = request.GET.get('code')
redirect_url = request.GET.get('redirect_url')
login_url = reverse('authentication:login')
if not self.verify_state():
return self.get_verify_state_failed_response(redirect_url)
dingtalk = DingTalk(
appid=settings.DINGTALK_APPKEY,
appsecret=settings.DINGTALK_APPSECRET,
agentid=settings.DINGTALK_AGENTID
)
userid = dingtalk.get_userid_by_code(code)
if not userid:
# 正常流程不会出这个错误hack 行为
msg = _('Failed to get user from DingTalk')
response = self.get_failed_response(login_url, title=msg, msg=msg)
return response
user = get_object_or_none(User, dingtalk_id=userid)
if user is None:
title = _('DingTalk is not bound')
msg = _('Please login with a password and then bind the DingTalk')
response = self.get_failed_response(login_url, title=title, msg=msg)
return response
try:
self.check_oauth2_auth(user, settings.AUTH_BACKEND_DINGTALK)
except errors.AuthFailedError as e:
self.set_login_failed_mark()
msg = e.msg
response = self.get_failed_response(login_url, title=msg, msg=msg)
return response
return self.redirect_to_guard_view()