mirror of
https://github.com/haiwen/seahub.git
synced 2025-09-25 14:50:29 +00:00
Add webdav secret api
This commit is contained in:
@@ -19,3 +19,5 @@ django-simple-captcha==0.5.6
|
||||
gunicorn==19.8.1
|
||||
django-webpack-loader==0.6.0
|
||||
git+git://github.com/haiwen/python-cas.git@ffc49235fd7cc32c4fdda5acfa3707e1405881df#egg=python_cas
|
||||
|
||||
pycrypto==2.6.1
|
||||
|
53
seahub/api2/endpoints/webdav_secret.py
Normal file
53
seahub/api2/endpoints/webdav_secret.py
Normal file
@@ -0,0 +1,53 @@
|
||||
# Copyright (c) 2012-2016 Seafile Ltd.
|
||||
import logging
|
||||
from django.conf import settings
|
||||
from rest_framework import status
|
||||
from rest_framework.authentication import SessionAuthentication
|
||||
from rest_framework.permissions import IsAuthenticated
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
|
||||
from seahub.api2.authentication import TokenAuthentication
|
||||
from seahub.api2.throttling import UserRateThrottle
|
||||
from seahub.api2.utils import api_error
|
||||
from seahub.options.models import UserOptions
|
||||
from seahub.utils.hasher import AESPasswordHasher
|
||||
|
||||
# Get an instance of a logger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebdavSecretView(APIView):
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAuthenticated, )
|
||||
throttle_classes = (UserRateThrottle, )
|
||||
|
||||
def get(self, request, format=None):
|
||||
if not settings.ENABLE_WEBDAV_SECRET:
|
||||
return api_error(status.HTTP_403_FORBIDDEN,
|
||||
'Feature is not enabled.')
|
||||
|
||||
username = request.user.username
|
||||
decoded = UserOptions.objects.get_webdav_decoded_secret(username)
|
||||
|
||||
return Response({
|
||||
'secret': decoded,
|
||||
})
|
||||
|
||||
def put(self, request, format=None):
|
||||
if not settings.ENABLE_WEBDAV_SECRET:
|
||||
return api_error(status.HTTP_403_FORBIDDEN,
|
||||
'Feature is not enabled.')
|
||||
|
||||
aes = AESPasswordHasher()
|
||||
|
||||
username = request.user.username
|
||||
secret = request.data.get("secret", None)
|
||||
|
||||
if secret:
|
||||
encoded = aes.encode(secret)
|
||||
UserOptions.objects.set_webdav_secret(username, encoded)
|
||||
else:
|
||||
UserOptions.objects.unset_webdav_secret(username)
|
||||
|
||||
return self.get(request, format)
|
@@ -28,6 +28,7 @@ KEY_USER_LOGGED_IN = "user_logged_in"
|
||||
VAL_USER_LOGGED_IN = "1"
|
||||
|
||||
KEY_DEFAULT_REPO = "default_repo"
|
||||
KEY_WEBDAV_SECRET = "webdav_secret"
|
||||
|
||||
class CryptoOptionNotSetError(Exception):
|
||||
pass
|
||||
@@ -231,6 +232,32 @@ class UserOptionsManager(models.Manager):
|
||||
except UserOptions.DoesNotExist:
|
||||
return False
|
||||
|
||||
def set_webdav_secret(self, username, secret):
|
||||
return self.set_user_option(username, KEY_WEBDAV_SECRET,
|
||||
secret)
|
||||
|
||||
def unset_webdav_secret(self, username):
|
||||
return self.unset_user_option(username, KEY_WEBDAV_SECRET)
|
||||
|
||||
def get_webdav_secret(self, username):
|
||||
try:
|
||||
r = super(UserOptionsManager, self).get(
|
||||
email=username, option_key=KEY_WEBDAV_SECRET
|
||||
)
|
||||
return r.option_val
|
||||
except UserOptions.DoesNotExist:
|
||||
return None
|
||||
|
||||
def get_webdav_decoded_secret(self, username):
|
||||
from seahub.utils.hasher import AESPasswordHasher
|
||||
|
||||
secret = UserOptions.objects.get_webdav_secret(username)
|
||||
if secret:
|
||||
aes = AESPasswordHasher()
|
||||
decoded = aes.decode(secret)
|
||||
else:
|
||||
decoded = None
|
||||
return decoded
|
||||
|
||||
class UserOptions(models.Model):
|
||||
email = LowerCaseCharField(max_length=255, db_index=True)
|
||||
|
@@ -647,6 +647,8 @@ THUMBNAIL_VIDEO_FRAME_TIME = 5 # use the frame at 5 second as thumbnail
|
||||
# template for create new office file
|
||||
OFFICE_TEMPLATE_ROOT = os.path.join(MEDIA_ROOT, 'office-template')
|
||||
|
||||
ENABLE_WEBDAV_SECRET = False
|
||||
|
||||
#####################
|
||||
# Global AddressBook #
|
||||
#####################
|
||||
|
@@ -70,6 +70,7 @@ from seahub.api2.endpoints.wikis import WikisView, WikiView
|
||||
from seahub.api2.endpoints.wiki_pages import WikiPageView, WikiPagesView, WikiPagesDirView, WikiPageContentView
|
||||
from seahub.api2.endpoints.revision_tag import TaggedItemsView, TagNamesView
|
||||
from seahub.api2.endpoints.user import User
|
||||
from seahub.api2.endpoints.webdav_secret import WebdavSecretView
|
||||
|
||||
# Admin
|
||||
from seahub.api2.endpoints.admin.revision_tag import AdminTaggedItemsView
|
||||
@@ -309,6 +310,9 @@ urlpatterns = [
|
||||
## user::avatar
|
||||
url(r'^api/v2.1/user-avatar/$', UserAvatarView.as_view(), name='api-v2.1-user-avatar'),
|
||||
|
||||
## user:webdav
|
||||
url(r'^api/v2.1/webdav-secret/$', WebdavSecretView.as_view(), name='api-v2.1-webdav-secret'),
|
||||
|
||||
## user::wiki
|
||||
url(r'^api/v2.1/wikis/$', WikisView.as_view(), name='api-v2.1-wikis'),
|
||||
url(r'^api/v2.1/wikis/(?P<slug>[^/]+)/$', WikiView.as_view(), name='api-v2.1-wiki'),
|
||||
|
51
seahub/utils/hasher.py
Normal file
51
seahub/utils/hasher.py
Normal file
@@ -0,0 +1,51 @@
|
||||
# Copyright (c) 2012-2016 Seafile Ltd.
|
||||
import base64
|
||||
try:
|
||||
from Crypto.Cipher import AES
|
||||
except ImportError:
|
||||
AES = None
|
||||
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
|
||||
class AESPasswordDecodeError(Exception):
|
||||
pass
|
||||
|
||||
# the block size for the cipher object; must be 16, 24, or 32 for AES
|
||||
BLOCK_SIZE = 32
|
||||
|
||||
# the character used for padding--with a block cipher such as AES, the value
|
||||
# you encrypt must be a multiple of BLOCK_SIZE in length. This character is
|
||||
# used to ensure that your value is always a multiple of BLOCK_SIZE
|
||||
PADDING = '{'
|
||||
|
||||
# one-liner to sufficiently pad the text to be encrypted
|
||||
pad = lambda s: s + (16 - len(s) % 16) * PADDING
|
||||
|
||||
# one-liners to encrypt/encode and decrypt/decode a string
|
||||
# encrypt with AES, encode with base64
|
||||
EncodeAES = lambda c, s: base64.b64encode(c.encrypt(pad(s)))
|
||||
DecodeAES = lambda c, e: c.decrypt(base64.b64decode(e)).rstrip(PADDING)
|
||||
|
||||
class AESPasswordHasher:
|
||||
algorithm = 'aes'
|
||||
|
||||
def __init__(self, secret=None):
|
||||
if not secret:
|
||||
secret = settings.SECRET_KEY[:BLOCK_SIZE]
|
||||
|
||||
self.cipher = AES.new(secret)
|
||||
|
||||
def encode(self, password):
|
||||
return "%s$%s" % (self.algorithm, EncodeAES(self.cipher, password))
|
||||
|
||||
def verify(self, password, encoded):
|
||||
return self.decode(encoded) == password
|
||||
|
||||
def decode(self, encoded):
|
||||
algorithm, data = encoded.split('$', 1)
|
||||
if algorithm != self.algorithm:
|
||||
raise AESPasswordDecodeError
|
||||
|
||||
return DecodeAES(self.cipher, data)
|
37
tests/api/endpoints/test_webdav_secret.py
Normal file
37
tests/api/endpoints/test_webdav_secret.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import json
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test import override_settings
|
||||
|
||||
from seahub.test_utils import BaseTestCase
|
||||
|
||||
|
||||
@override_settings(ENABLE_WEBDAV_SECRET=True)
|
||||
class WebdavSecretTest(BaseTestCase):
|
||||
def setUp(self, ):
|
||||
self.login_as(self.user)
|
||||
|
||||
def test_can_get(self, ):
|
||||
resp = self.client.get(reverse('api-v2.1-webdav-secret'))
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
assert json_resp['secret'] is None
|
||||
|
||||
def test_can_put(self, ):
|
||||
resp = self.client.put(
|
||||
reverse('api-v2.1-webdav-secret'), 'secret=123456',
|
||||
'application/x-www-form-urlencoded',
|
||||
)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
assert json_resp['secret'] == '123456'
|
||||
|
||||
resp = self.client.put(
|
||||
reverse('api-v2.1-webdav-secret'), 'secret=',
|
||||
'application/x-www-form-urlencoded',
|
||||
)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
assert json_resp['secret'] is None
|
@@ -2,7 +2,8 @@ from seahub.test_utils import BaseTestCase
|
||||
from seahub.options.models import (UserOptions, KEY_USER_GUIDE,
|
||||
VAL_USER_GUIDE_ON, VAL_USER_GUIDE_OFF,
|
||||
KEY_DEFAULT_REPO,
|
||||
KEY_FORCE_2FA, VAL_FORCE_2FA)
|
||||
KEY_FORCE_2FA, VAL_FORCE_2FA,
|
||||
KEY_WEBDAV_SECRET)
|
||||
|
||||
class UserOptionsManagerTest(BaseTestCase):
|
||||
def test_is_user_guide_enabled(self):
|
||||
@@ -69,3 +70,18 @@ class UserOptionsManagerTest(BaseTestCase):
|
||||
assert len(UserOptions.objects.filter(email=self.user.email,
|
||||
option_key=KEY_FORCE_2FA)) == 0
|
||||
assert UserOptions.objects.is_force_2fa(self.user.email) is False
|
||||
|
||||
def test_webdav_secret(self, ):
|
||||
assert len(UserOptions.objects.filter(email=self.user.email,
|
||||
option_key=KEY_WEBDAV_SECRET)) == 0
|
||||
|
||||
assert UserOptions.objects.get_webdav_secret(self.user.email) is None
|
||||
|
||||
UserOptions.objects.set_webdav_secret(self.user.email, '123456')
|
||||
assert UserOptions.objects.get_webdav_secret(self.user.email) == '123456'
|
||||
|
||||
UserOptions.objects.unset_webdav_secret(self.user.email)
|
||||
assert UserOptions.objects.get_webdav_secret(self.user.email) is None
|
||||
|
||||
assert len(UserOptions.objects.filter(email=self.user.email,
|
||||
option_key=KEY_WEBDAV_SECRET)) == 0
|
||||
|
Reference in New Issue
Block a user