feat: 支持 passkey 登录 (#11519)

* perf: 基本完成功能

* perf: 优化 passkey

* perf: 优化 passkey

* perf: 完成 passkey

---------

Co-authored-by: ibuler <ibuler@qq.com>
This commit is contained in:
fit2bot
2023-09-11 18:15:03 +08:00
committed by GitHub
parent d7ca1a09d4
commit 72b215ed03
37 changed files with 899 additions and 144 deletions

View File

@@ -1,15 +1,15 @@
# -*- coding: utf-8 -*-
#
from .connection_token import *
from .token import *
from .mfa import *
from .access_key import *
from .confirm import *
from .login_confirm import *
from .sso import *
from .wecom import *
from .connection_token import *
from .dingtalk import *
from .feishu import *
from .login_confirm import *
from .mfa import *
from .password import *
from .sso import *
from .temp_token import *
from .token import *
from .wecom import *

View File

@@ -0,0 +1 @@
from .backends import *

View File

@@ -0,0 +1,59 @@
from django.conf import settings
from django.http import JsonResponse
from django.shortcuts import render
from django.utils.translation import gettext as _
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework.viewsets import ModelViewSet
from authentication.mixins import AuthMixin
from .fido import register_begin, register_complete, auth_begin, auth_complete
from .models import Passkey
from .serializer import PasskeySerializer
from ...views import FlashMessageMixin
class PasskeyViewSet(AuthMixin, FlashMessageMixin, ModelViewSet):
serializer_class = PasskeySerializer
permission_classes = (IsAuthenticated,)
def get_queryset(self):
return Passkey.objects.filter(user=self.request.user)
@action(methods=['get', 'post'], detail=False, url_path='register')
def register(self, request):
if request.method == 'GET':
register_data, state = register_begin(request)
return JsonResponse(dict(register_data))
else:
passkey = register_complete(request)
return JsonResponse({'id': passkey.id.__str__(), 'name': passkey.name})
@action(methods=['get'], detail=False, url_path='login', permission_classes=[AllowAny])
def login(self, request):
return render(request, 'authentication/passkey.html', {})
def redirect_to_error(self, error):
self.send_auth_signal(success=False, username='unknown', reason='passkey')
return render(self.request, 'authentication/passkey.html', {'error': error})
@action(methods=['get', 'post'], detail=False, url_path='auth', permission_classes=[AllowAny])
def auth(self, request):
if request.method == 'GET':
auth_data = auth_begin(request)
return JsonResponse(dict(auth_data))
try:
user = auth_complete(request)
except ValueError as e:
return self.redirect_to_error(str(e))
if not user:
return self.redirect_to_error(_('Auth failed'))
try:
self.check_oauth2_auth(user, settings.AUTH_BACKEND_PASSKEY)
return self.redirect_to_guard_view()
except Exception as e:
msg = getattr(e, 'msg', '') or str(e)
return self.redirect_to_error(msg)

View File

@@ -0,0 +1,9 @@
from django.conf import settings
from ..base import JMSModelBackend
class PasskeyAuthBackend(JMSModelBackend):
@staticmethod
def is_enabled():
return settings.AUTH_PASSKEY

View File

@@ -0,0 +1,155 @@
import json
from urllib.parse import urlparse
import fido2.features
from django.conf import settings
from django.utils import timezone
from django.utils.translation import gettext as _
from fido2.server import Fido2Server
from fido2.utils import websafe_decode, websafe_encode
from fido2.webauthn import PublicKeyCredentialRpEntity, AttestedCredentialData, PublicKeyCredentialUserEntity
from rest_framework.serializers import ValidationError
from user_agents.parsers import parse as ua_parse
from common.utils import get_logger
from .models import Passkey
logger = get_logger(__name__)
try:
fido2.features.webauthn_json_mapping.enabled = True
except:
pass
def get_current_platform(request):
ua = ua_parse(request.META["HTTP_USER_AGENT"])
if 'Safari' in ua.browser.family:
return "Apple"
elif 'Chrome' in ua.browser.family and ua.os.family == "Mac OS X":
return "Chrome on Apple"
elif 'Android' in ua.os.family:
return "Google"
elif "Windows" in ua.os.family:
return "Microsoft"
else:
return "Key"
def get_server_id_from_request(request, allowed=()):
origin = request.META.get('HTTP_REFERER')
if not origin:
origin = request.get_host()
p = urlparse(origin)
if p.netloc in allowed or p.hostname in allowed:
return p.hostname
else:
return 'localhost'
def default_server_id(request):
domains = settings.ALLOWED_DOMAINS
return get_server_id_from_request(request, allowed=domains)
def get_server(request=None):
"""Get Server Info from settings and returns a Fido2Server"""
server_id = settings.FIDO_SERVER_ID or default_server_id(request)
if callable(server_id):
fido_server_id = settings.FIDO_SERVER_ID(request)
elif ',' in server_id:
fido_server_id = get_server_id_from_request(request, allowed=server_id.split(','))
else:
fido_server_id = server_id
logger.debug('Fido server id: {}'.format(fido_server_id))
if callable(settings.FIDO_SERVER_NAME):
fido_server_name = settings.FIDO_SERVER_NAME(request)
else:
fido_server_name = settings.FIDO_SERVER_NAME
rp = PublicKeyCredentialRpEntity(id=fido_server_id, name=fido_server_name)
return Fido2Server(rp)
def get_user_credentials(username):
user_passkeys = Passkey.objects.filter(user__username=username)
return [AttestedCredentialData(websafe_decode(uk.token)) for uk in user_passkeys]
def register_begin(request):
server = get_server(request)
user = request.user
user_credentials = get_user_credentials(user.username)
prefix = request.query_params.get('name', '')
prefix = '(' + prefix + ')'
user_entity = PublicKeyCredentialUserEntity(
id=str(user.id).encode('utf8'),
name=user.username + prefix,
display_name=user.name,
)
auth_attachment = getattr(settings, 'KEY_ATTACHMENT', None)
data, state = server.register_begin(
user_entity, user_credentials,
authenticator_attachment=auth_attachment,
resident_key_requirement=fido2.webauthn.ResidentKeyRequirement.PREFERRED
)
request.session['fido2_state'] = state
data = dict(data)
return data, state
def register_complete(request):
if not request.session.get("fido2_state"):
raise ValidationError("No state found")
data = request.data
server = get_server(request)
state = request.session.pop("fido2_state")
auth_data = server.register_complete(state, response=data)
encoded = websafe_encode(auth_data.credential_data)
platform = get_current_platform(request)
name = data.pop("key_name", '') or platform
passkey = Passkey.objects.create(
user=request.user,
token=encoded,
name=name,
platform=platform,
credential_id=data.get('id')
)
return passkey
def auth_begin(request):
server = get_server(request)
credentials = []
username = None
if request.user.is_authenticated:
username = request.user.username
if username:
credentials = get_user_credentials(username)
auth_data, state = server.authenticate_begin(credentials)
request.session['fido2_state'] = state
return auth_data
def auth_complete(request):
server = get_server(request)
data = request.data.get("passkeys")
data = json.loads(data)
cid = data['id']
key = Passkey.objects.filter(credential_id=cid, is_active=True).first()
if not key:
raise ValueError(_("This key is not registered"))
credentials = [AttestedCredentialData(websafe_decode(key.token))]
state = request.session.get('fido2_state')
server.authenticate_complete(state, credentials=credentials, response=data)
request.session["passkey"] = '{}_{}'.format(key.id, key.name)
key.date_last_used = timezone.now()
key.save(update_fields=['date_last_used'])
return key.user

View File

@@ -0,0 +1,19 @@
from django.conf import settings
from django.db import models
from django.utils.translation import gettext_lazy as _
from common.db.models import JMSBaseModel
class Passkey(JMSBaseModel):
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
name = models.CharField(max_length=255, verbose_name=_("Name"))
is_active = models.BooleanField(default=True, verbose_name=_("Enabled"))
platform = models.CharField(max_length=255, default='', verbose_name=_("Platform"))
added_on = models.DateTimeField(auto_now_add=True, verbose_name=_("Added on"))
date_last_used = models.DateTimeField(null=True, default=None, verbose_name=_("Date last used"))
credential_id = models.CharField(max_length=255, unique=True, null=False, verbose_name=_("Credential ID"))
token = models.CharField(max_length=255, null=False, verbose_name=_("Token"))
def __str__(self):
return self.name

View File

@@ -0,0 +1,13 @@
from rest_framework import serializers
from .models import Passkey
class PasskeySerializer(serializers.ModelSerializer):
class Meta:
model = Passkey
fields = [
'id', 'name', 'is_active', 'created_by',
'date_last_used', 'date_created',
]
read_only_fields = list(set(fields) - {'is_active'})

View File

@@ -0,0 +1,9 @@
from rest_framework.routers import DefaultRouter
from . import api
router = DefaultRouter()
router.register('passkeys', api.PasskeyViewSet, 'passkey')
urlpatterns = []
urlpatterns += router.urls

View File

@@ -0,0 +1,39 @@
# Generated by Django 4.1.10 on 2023-09-08 08:10
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import uuid
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('authentication', '0021_auto_20230713_1459'),
]
operations = [
migrations.CreateModel(
name='Passkey',
fields=[
('created_by', models.CharField(blank=True, max_length=128, null=True, verbose_name='Created by')),
('updated_by', models.CharField(blank=True, max_length=128, null=True, verbose_name='Updated by')),
('date_created', models.DateTimeField(auto_now_add=True, null=True, verbose_name='Date created')),
('date_updated', models.DateTimeField(auto_now=True, verbose_name='Date updated')),
('comment', models.TextField(blank=True, default='', verbose_name='Comment')),
('id', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
('name', models.CharField(max_length=255, verbose_name='Name')),
('is_active', models.BooleanField(default=True, verbose_name='Enabled')),
('platform', models.CharField(default='', max_length=255, verbose_name='Platform')),
('added_on', models.DateTimeField(auto_now_add=True, verbose_name='Added on')),
('date_last_used', models.DateTimeField(default=None, null=True, verbose_name='Date last used')),
('credential_id', models.CharField(max_length=255, unique=True, verbose_name='Credential ID')),
('token', models.CharField(max_length=255, verbose_name='Token')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
options={
'abstract': False,
},
),
]

View File

@@ -132,11 +132,11 @@ class CommonMixin:
return user
user_id = self.request.session.get('user_id')
auth_password = self.request.session.get('auth_password')
auth_ok = self.request.session.get('auth_password')
auth_expired_at = self.request.session.get('auth_password_expired_at')
auth_expired = auth_expired_at < time.time() if auth_expired_at else False
if not user_id or not auth_password or auth_expired:
if not user_id or not auth_ok or auth_expired:
raise errors.SessionEmptyError()
user = get_object_or_404(User, pk=user_id)
@@ -479,6 +479,7 @@ class AuthMixin(CommonMixin, AuthPreCheckMixin, AuthACLMixin, MFAMixin, AuthPost
request.session['auto_login'] = auto_login
if not auth_backend:
auth_backend = getattr(user, 'backend', settings.AUTH_BACKEND_MODEL)
request.session['auth_backend'] = auth_backend
def check_oauth2_auth(self, user: User, auth_backend):
@@ -511,7 +512,8 @@ class AuthMixin(CommonMixin, AuthPreCheckMixin, AuthACLMixin, MFAMixin, AuthPost
def clear_auth_mark(self):
keys = [
'auth_password', 'user_id', 'auth_confirm_required', 'auth_ticket_id', 'auth_acl_id'
'auth_password', 'user_id', 'auth_confirm_required',
'auth_ticket_id', 'auth_acl_id'
]
for k in keys:
self.request.session.pop(k, '')

View File

@@ -0,0 +1,191 @@
{% load static %}
{% load i18n %}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Login passkey</title>
<script src="{% static "js/jquery-3.6.1.min.js" %}?_=9"></script>
</head>
<body>
<form action='{% url 'api-auth:passkey-auth' %}' method="post" id="loginForm">
<input type="hidden" name="passkeys" id="passkeys"/>
</form>
</body>
<script>
const loginUrl = "/core/auth/login/";
window.conditionalUI = false;
window.conditionUIAbortController = new AbortController();
window.conditionUIAbortSignal = conditionUIAbortController.signal;
const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
// Use a lookup table to find the index.
const lookup = new Uint8Array(256)
for (let i = 0; i < chars.length; i++) {
lookup[chars.charCodeAt(i)] = i
}
const encode = function (arraybuffer) {
const bytes = new Uint8Array(arraybuffer)
let i;
const len = bytes.length;
let base64url = ''
for (i = 0; i < len; i += 3) {
base64url += chars[bytes[i] >> 2]
base64url += chars[((bytes[i] & 3) << 4) | (bytes[i + 1] >> 4)]
base64url += chars[((bytes[i + 1] & 15) << 2) | (bytes[i + 2] >> 6)]
base64url += chars[bytes[i + 2] & 63]
}
if ((len % 3) === 2) {
base64url = base64url.substring(0, base64url.length - 1)
} else if (len % 3 === 1) {
base64url = base64url.substring(0, base64url.length - 2)
}
return base64url
}
const decode = function (base64string) {
const bufferLength = base64string.length * 0.75
const len = base64string.length;
let i;
let p = 0
let encoded1;
let encoded2;
let encoded3;
let encoded4
const bytes = new Uint8Array(bufferLength)
for (i = 0; i < len; i += 4) {
encoded1 = lookup[base64string.charCodeAt(i)]
encoded2 = lookup[base64string.charCodeAt(i + 1)]
encoded3 = lookup[base64string.charCodeAt(i + 2)]
encoded4 = lookup[base64string.charCodeAt(i + 3)]
bytes[p++] = (encoded1 << 2) | (encoded2 >> 4)
bytes[p++] = ((encoded2 & 15) << 4) | (encoded3 >> 2)
bytes[p++] = ((encoded3 & 3) << 6) | (encoded4 & 63)
}
return bytes.buffer
}
function checkConditionalUI(form) {
if (!navigator.credentials) {
alert('WebAuthn is not supported in this browser')
return
}
if (window.PublicKeyCredential && PublicKeyCredential.isConditionalMediationAvailable) {
// Check if conditional mediation is available.
PublicKeyCredential.isConditionalMediationAvailable().then((result) => {
window.conditionalUI = result;
if (!window.conditionalUI) {
alert("Conditional UI is not available. Please use the legacy UI.");
} else {
return true
}
});
}
}
const publicKeyCredentialToJSON = (pubKeyCred) => {
if (pubKeyCred instanceof Array) {
const arr = []
for (const i of pubKeyCred) {
arr.push(publicKeyCredentialToJSON(i))
}
return arr
}
if (pubKeyCred instanceof ArrayBuffer) {
return encode(pubKeyCred)
}
if (pubKeyCred instanceof Object) {
const obj = {}
for (const key in pubKeyCred) {
obj[key] = publicKeyCredentialToJSON(pubKeyCred[key])
}
return obj
}
return pubKeyCred
}
function GetAssertReq(getAssert) {
getAssert.publicKey.challenge = decode(getAssert.publicKey.challenge)
for (const allowCred of getAssert.publicKey.allowCredentials) {
allowCred.id = decode(allowCred.id)
}
return getAssert
}
function startAuthn(form, conditionalUI = false) {
window.loginForm = form
fetch('/api/v1/authentication/passkeys/auth/', {method: 'GET'}).then(function (response) {
if (response.ok) {
return response.json().then(function (req) {
return GetAssertReq(req)
})
}
throw new Error('No credential available to authenticate!')
}).then(function (options) {
if (conditionalUI) {
options.mediation = 'conditional'
options.signal = window.conditionUIAbortSignal
} else {
window.conditionUIAbortController.abort()
}
return navigator.credentials.get(options)
}).then(function (assertion) {
const pk = $('#passkeys')
if (pk.length === 0) {
retry("Did you add the 'passkeys' hidden input field")
return
}
pk.val(JSON.stringify(publicKeyCredentialToJSON(assertion)))
const x = document.getElementById(window.loginForm)
if (x === null || x === undefined) {
console.error('Did you pass the correct form id to auth function')
return
}
x.submit()
}).catch(function (err) {
retry(err)
})
}
function safeStartAuthn(form) {
checkConditionalUI('loginForm')
const errorMsg = "{% trans 'This page is not served over HTTPS. Please use HTTPS to ensure security of your credentials.' %}"
const isSafe = window.location.protocol === 'https:'
if (!isSafe && location.hostname !== 'localhost') {
alert(errorMsg)
window.location.href = loginUrl
} else {
setTimeout(() => startAuthn('loginForm'), 100)
}
}
function retry(error) {
const fullError = "{% trans 'Error' %}" + ': ' + error + "\n\n " + "{% trans 'Do you want to retry ?' %}"
const result = confirm(fullError)
if (result) {
safeStartAuthn()
} else {
window.location.href = loginUrl
}
}
{% if not error %}
window.onload = function () {
safeStartAuthn()
}
{% else %}
const error = "{{ error }}"
retry(error)
{% endif %}
</script>
</html>

View File

@@ -4,6 +4,7 @@ from django.urls import path
from rest_framework.routers import DefaultRouter
from .. import api
from ..backends.passkey.urls import urlpatterns as passkey_urlpatterns
app_name = 'authentication'
router = DefaultRouter()
@@ -13,17 +14,19 @@ router.register('temp-tokens', api.TempTokenViewSet, 'temp-token')
router.register('connection-token', api.ConnectionTokenViewSet, 'connection-token')
router.register('super-connection-token', api.SuperConnectionTokenViewSet, 'super-connection-token')
urlpatterns = [
path('wecom/qr/unbind/', api.WeComQRUnBindForUserApi.as_view(), name='wecom-qr-unbind'),
path('wecom/qr/unbind/<uuid:user_id>/', api.WeComQRUnBindForAdminApi.as_view(), name='wecom-qr-unbind-for-admin'),
path('dingtalk/qr/unbind/', api.DingTalkQRUnBindForUserApi.as_view(), name='dingtalk-qr-unbind'),
path('dingtalk/qr/unbind/<uuid:user_id>/', api.DingTalkQRUnBindForAdminApi.as_view(), name='dingtalk-qr-unbind-for-admin'),
path('dingtalk/qr/unbind/<uuid:user_id>/', api.DingTalkQRUnBindForAdminApi.as_view(),
name='dingtalk-qr-unbind-for-admin'),
path('feishu/qr/unbind/', api.FeiShuQRUnBindForUserApi.as_view(), name='feishu-qr-unbind'),
path('feishu/qr/unbind/<uuid:user_id>/', api.FeiShuQRUnBindForAdminApi.as_view(), name='feishu-qr-unbind-for-admin'),
path('feishu/event/subscription/callback/', api.FeiShuEventSubscriptionCallback.as_view(), name='feishu-event-subscription-callback'),
path('feishu/qr/unbind/<uuid:user_id>/', api.FeiShuQRUnBindForAdminApi.as_view(),
name='feishu-qr-unbind-for-admin'),
path('feishu/event/subscription/callback/', api.FeiShuEventSubscriptionCallback.as_view(),
name='feishu-event-subscription-callback'),
path('auth/', api.TokenCreateApi.as_view(), name='user-auth'),
path('confirm/', api.ConfirmApi.as_view(), name='user-confirm'),
@@ -38,4 +41,4 @@ urlpatterns = [
path('login-confirm-ticket/status/', api.TicketStatusApi.as_view(), name='login-confirm-ticket-status'),
]
urlpatterns += router.urls
urlpatterns += router.urls + passkey_urlpatterns

View File

@@ -1,11 +1,11 @@
# coding:utf-8
#
from django.urls import path, include
from django.db.transaction import non_atomic_requests
from django.urls import path, include
from .. import views
from users import views as users_view
from .. import views
app_name = 'authentication'
@@ -18,7 +18,8 @@ urlpatterns = [
path('logout/', views.UserLogoutView.as_view(), name='logout'),
# 原来在users中的
path('password/forget/previewing/', users_view.UserForgotPasswordPreviewingView.as_view(), name='forgot-previewing'),
path('password/forget/previewing/', users_view.UserForgotPasswordPreviewingView.as_view(),
name='forgot-previewing'),
path('password/forgot/', users_view.UserForgotPasswordView.as_view(), name='forgot-password'),
path('password/reset/', users_view.UserResetPasswordView.as_view(), name='reset-password'),
path('password/verify/', users_view.UserVerifyPasswordView.as_view(), name='user-verify-password'),
@@ -26,7 +27,8 @@ urlpatterns = [
path('wecom/bind/start/', views.WeComEnableStartView.as_view(), name='wecom-bind-start'),
path('wecom/qr/bind/', views.WeComQRBindView.as_view(), name='wecom-qr-bind'),
path('wecom/qr/login/', views.WeComQRLoginView.as_view(), name='wecom-qr-login'),
path('wecom/qr/bind/<uuid:user_id>/callback/', views.WeComQRBindCallbackView.as_view(), name='wecom-qr-bind-callback'),
path('wecom/qr/bind/<uuid:user_id>/callback/', views.WeComQRBindCallbackView.as_view(),
name='wecom-qr-bind-callback'),
path('wecom/qr/login/callback/', views.WeComQRLoginCallbackView.as_view(), name='wecom-qr-login-callback'),
path('wecom/oauth/login/', views.WeComOAuthLoginView.as_view(), name='wecom-oauth-login'),
path('wecom/oauth/login/callback/', views.WeComOAuthLoginCallbackView.as_view(), name='wecom-oauth-login-callback'),
@@ -34,10 +36,12 @@ urlpatterns = [
path('dingtalk/bind/start/', views.DingTalkEnableStartView.as_view(), name='dingtalk-bind-start'),
path('dingtalk/qr/bind/', views.DingTalkQRBindView.as_view(), name='dingtalk-qr-bind'),
path('dingtalk/qr/login/', views.DingTalkQRLoginView.as_view(), name='dingtalk-qr-login'),
path('dingtalk/qr/bind/<uuid:user_id>/callback/', views.DingTalkQRBindCallbackView.as_view(), name='dingtalk-qr-bind-callback'),
path('dingtalk/qr/bind/<uuid:user_id>/callback/', views.DingTalkQRBindCallbackView.as_view(),
name='dingtalk-qr-bind-callback'),
path('dingtalk/qr/login/callback/', views.DingTalkQRLoginCallbackView.as_view(), name='dingtalk-qr-login-callback'),
path('dingtalk/oauth/login/', views.DingTalkOAuthLoginView.as_view(), name='dingtalk-oauth-login'),
path('dingtalk/oauth/login/callback/', views.DingTalkOAuthLoginCallbackView.as_view(), name='dingtalk-oauth-login-callback'),
path('dingtalk/oauth/login/callback/', views.DingTalkOAuthLoginCallbackView.as_view(),
name='dingtalk-oauth-login-callback'),
path('feishu/bind/start/', views.FeiShuEnableStartView.as_view(), name='feishu-bind-start'),
path('feishu/qr/bind/', views.FeiShuQRBindView.as_view(), name='feishu-qr-bind'),

View File

@@ -90,6 +90,12 @@ class UserLoginContextMixin:
'enabled': settings.AUTH_FEISHU,
'url': reverse('authentication:feishu-qr-login'),
'logo': static('img/login_feishu_logo.png')
},
{
'name': _("Passkey"),
'enabled': settings.AUTH_PASSKEY,
'url': reverse('api-auth:passkey-login'),
'logo': static('img/login_passkey.png')
}
]
return [method for method in auth_methods if method['enabled']]
@@ -304,6 +310,12 @@ class UserLoginGuardView(mixins.AuthMixin, RedirectView):
age = self.request.session.get_expiry_age()
self.request.session.set_expiry(age)
def get(self, request, *args, **kwargs):
response = super().get(request, *args, **kwargs)
if request.user.is_authenticated:
response.set_cookie('jms_username', request.user.username)
return response
def get_redirect_url(self, *args, **kwargs):
try:
user = self.get_user_from_session()

View File

@@ -16,12 +16,19 @@ class METAMixin:
class FlashMessageMixin:
@staticmethod
def get_response(redirect_url, title, msg, m_type='message'):
message_data = {'title': title, 'interval': 5, 'redirect_url': redirect_url, m_type: msg}
def get_response(redirect_url='', title='', msg='', m_type='message', interval=5):
message_data = {
'title': title, 'interval': interval,
'redirect_url': redirect_url,
}
if m_type == 'error':
message_data['error'] = msg
else:
message_data['message'] = msg
return FlashMessageUtil.gen_and_redirect_to(message_data)
def get_success_response(self, redirect_url, title, msg):
return self.get_response(redirect_url, title, msg)
def get_success_response(self, redirect_url, title, msg, **kwargs):
return self.get_response(redirect_url, title, msg, m_type='success', **kwargs)
def get_failed_response(self, redirect_url, title, msg):
return self.get_response(redirect_url, title, msg, 'error')
def get_failed_response(self, redirect_url, title, msg, interval=10):
return self.get_response(redirect_url, title, msg, 'error', interval)