mirror of
https://github.com/haiwen/seahub.git
synced 2025-08-02 07:47:32 +00:00
file tag
This commit is contained in:
parent
f55d23b095
commit
b8a4a12880
@ -13,6 +13,7 @@ from django.utils.html import escape
|
||||
from seahub.api2.throttling import UserRateThrottle
|
||||
from seahub.api2.authentication import TokenAuthentication
|
||||
from seahub.api2.utils import api_error
|
||||
from seahub.signals import rename_dirent_successful
|
||||
|
||||
from seahub.views import check_folder_permission
|
||||
from seahub.utils import check_filename_with_rename
|
||||
@ -137,6 +138,11 @@ class CopyMoveTaskView(APIView):
|
||||
src_dirent_name, dst_repo_id, dst_parent_dir,
|
||||
new_dirent_name, replace=False, username=username,
|
||||
need_progress=1)
|
||||
is_dir = True if dirent_type == 'dir' else False
|
||||
rename_dirent_successful.send(sender=None, src_repo_id=src_repo_id,
|
||||
src_parent_dir=src_parent_dir, src_filename=src_dirent_name,
|
||||
dst_repo_id=dst_repo_id, dst_parent_dir=dst_parent_dir,
|
||||
dst_filename=dst_dirent_name, is_dir=is_dir)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
error_msg = 'Internal Server Error'
|
||||
|
@ -14,6 +14,7 @@ from seahub.api2.authentication import TokenAuthentication
|
||||
from seahub.api2.utils import api_error
|
||||
from seahub.api2.views import get_dir_recursively, \
|
||||
get_dir_entrys_by_id
|
||||
from seahub.signals import rename_dirent_successful
|
||||
|
||||
from seahub.views import check_folder_permission
|
||||
from seahub.utils import check_filename_with_rename, is_valid_dirent_name, \
|
||||
@ -214,6 +215,10 @@ class DirView(APIView):
|
||||
# rename dir
|
||||
seafile_api.rename_file(repo_id, parent_dir, old_dir_name,
|
||||
new_dir_name, username)
|
||||
rename_dirent_successful.send(sender=None, src_repo_id=repo_id,
|
||||
src_parent_dir=parent_dir, src_filename=old_dir_name,
|
||||
dst_repo_id=repo_id, dst_parent_dir=parent_dir,
|
||||
dst_filename=new_dir_name, is_dir=True)
|
||||
|
||||
new_dir_path = posixpath.join(parent_dir, new_dir_name)
|
||||
dir_info = self.get_dir_info(repo_id, new_dir_path)
|
||||
|
@ -15,6 +15,7 @@ from django.utils.translation import ugettext as _
|
||||
from seahub.api2.throttling import UserRateThrottle
|
||||
from seahub.api2.authentication import TokenAuthentication
|
||||
from seahub.api2.utils import api_error
|
||||
from seahub.signals import rename_dirent_successful
|
||||
|
||||
from seahub.utils import check_filename_with_rename, is_pro_version, \
|
||||
gen_file_upload_url, is_valid_dirent_name
|
||||
@ -244,6 +245,10 @@ class FileView(APIView):
|
||||
try:
|
||||
seafile_api.rename_file(repo_id, parent_dir, oldname,
|
||||
new_file_name, username)
|
||||
rename_dirent_successful.send(sender=None, src_repo_id=repo_id,
|
||||
src_parent_dir=parent_dir, src_filename=oldname,
|
||||
dst_repo_id=repo_id, dst_parent_dir=parent_dir,
|
||||
dst_filename=new_file_name, is_dir=False)
|
||||
except SearpcError as e:
|
||||
logger.error(e)
|
||||
error_msg = 'Internal Server Error'
|
||||
@ -314,6 +319,10 @@ class FileView(APIView):
|
||||
seafile_api.move_file(src_repo_id, src_dir, filename,
|
||||
dst_repo_id, dst_dir, new_file_name, replace=False,
|
||||
username=username, need_progress=0, synchronous=1)
|
||||
rename_dirent_successful.send(sender=None, src_repo_id=src_repo_id,
|
||||
src_parent_dir=src_dir, src_filename=filename,
|
||||
dst_repo_id=dst_repo_id, dst_parent_dir=dst_dir,
|
||||
dst_filename=new_file_name, is_dir=False)
|
||||
except SearpcError as e:
|
||||
logger.error(e)
|
||||
error_msg = 'Internal Server Error'
|
||||
|
174
seahub/api2/endpoints/file_tag.py
Normal file
174
seahub/api2/endpoints/file_tag.py
Normal file
@ -0,0 +1,174 @@
|
||||
import os
|
||||
import re
|
||||
import logging
|
||||
|
||||
from rest_framework import status
|
||||
from rest_framework.authentication import SessionAuthentication
|
||||
from rest_framework.permissions import IsAuthenticated
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from seahub.api2.authentication import TokenAuthentication
|
||||
from seahub.api2.throttling import UserRateThrottle
|
||||
from seahub.api2.utils import api_error, user_to_dict
|
||||
from seahub.tags.models import FileUUIDMap, FileTag, Tags
|
||||
from seahub.views import check_folder_permission
|
||||
|
||||
from seaserv import seafile_api
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def check_parameter(func):
|
||||
"""check if the param is given, check if the file or dir exists, and split file_path to
|
||||
parent_path and filename
|
||||
"""
|
||||
def _decorated(view, request, *args, **kwargs):
|
||||
file_path = None
|
||||
is_dir = None
|
||||
repo_id = kwargs.get('repo_id')
|
||||
if request.method == 'GET':
|
||||
file_path = request.GET.get('path', '')
|
||||
is_dir = request.GET.get('is_dir', '')
|
||||
elif request.method in ['POST', 'PUT']:
|
||||
file_path = request.data.get('path', '')
|
||||
is_dir = request.data.get('is_dir', '')
|
||||
elif request.method == 'DELETE':
|
||||
file_path = request.GET.get('path', '')
|
||||
is_dir = request.GET.get('is_dir', '')
|
||||
|
||||
try:
|
||||
repo = seafile_api.get_repo(repo_id)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
error_msg = _('Internal Server Error')
|
||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||
if not repo:
|
||||
error_msg = 'Library %s not found.' % repo_id
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
if not file_path:
|
||||
error_msg = "p %s invalid." % file_path
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
is_dir = is_dir.lower()
|
||||
if is_dir not in ['true', 'false']:
|
||||
error_msg = 'is_dir %s invalid' % is_dir
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
# split file_path to filename and parent_path
|
||||
# and check if the file exists
|
||||
new_file_path = file_path.rstrip('/')
|
||||
parent_path = os.path.dirname(new_file_path)
|
||||
filename = os.path.basename(new_file_path)
|
||||
if is_dir == 'true':
|
||||
dir_id = seafile_api.get_dir_id_by_path(repo_id, new_file_path)
|
||||
if not dir_id:
|
||||
error_msg = 'Folder %s not found.' % file_path
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
if check_folder_permission(request, repo_id, new_file_path) != 'rw':
|
||||
error_msg = _('Permission denied.')
|
||||
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
|
||||
else:
|
||||
if filename.strip() == '':
|
||||
error_msg = 'p %s invalid' % file_path
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
file_id = seafile_api.get_file_id_by_path(repo_id, new_file_path)
|
||||
if not file_id:
|
||||
error_msg = 'File %s not found.' % file_path
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
if check_folder_permission(request, repo_id, parent_path) != 'rw':
|
||||
error_msg = _('Permission denied.')
|
||||
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
|
||||
|
||||
kwargs['parent_path'] = parent_path
|
||||
kwargs['filename'] = filename
|
||||
kwargs['is_dir'] = is_dir
|
||||
return func(view, request, *args, **kwargs)
|
||||
return _decorated
|
||||
|
||||
def check_tagname(tagname):
|
||||
return True if re.match('^[\.\w-]+$', tagname, re.U) else False
|
||||
|
||||
class FileTagsView(APIView):
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAuthenticated, )
|
||||
throttle_classes = (UserRateThrottle, )
|
||||
|
||||
@check_parameter
|
||||
def get(self, request, repo_id, parent_path, filename, is_dir):
|
||||
tag_list = FileTag.objects.get_all_file_tag_by_path(
|
||||
repo_id, parent_path,
|
||||
filename, is_dir
|
||||
)
|
||||
tag_list = [tag.to_dict() for tag in tag_list]
|
||||
return Response({"tags": tag_list}, status=status.HTTP_200_OK)
|
||||
|
||||
@check_parameter
|
||||
def put(self, request, repo_id, parent_path, filename, is_dir):
|
||||
names = request.data.get('names', None)
|
||||
if names is None:
|
||||
error_msg = "Tag can not be empty"
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
res_tag_list = []
|
||||
if not names.strip():
|
||||
name_list = []
|
||||
else:
|
||||
name_list = [name.strip() for name in names.split(",")]
|
||||
for name in name_list:
|
||||
if not check_tagname(name):
|
||||
error_msg = _(u'Tag can only contain letters, numbers, dot, hyphen or underscore')
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
FileTag.objects.delete_all_filetag_by_path(repo_id, parent_path,
|
||||
filename, is_dir)
|
||||
for name in name_list:
|
||||
tag_obj, created = FileTag.objects.get_or_create_file_tag(
|
||||
repo_id, parent_path, filename, is_dir, name,
|
||||
request.user.username
|
||||
)
|
||||
res_tag_list.append(tag_obj.to_dict())
|
||||
|
||||
return Response({"tags": res_tag_list}, status=status.HTTP_200_OK)
|
||||
|
||||
@check_parameter
|
||||
def post(self, request, repo_id, parent_path, filename, is_dir):
|
||||
names = request.POST.get('names', None)
|
||||
if names is None or not names.strip():
|
||||
error_msg = "Tag can not be empty"
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
name_list = [name.strip() for name in names.split(",")]
|
||||
for name in name_list:
|
||||
if not check_tagname(name):
|
||||
error_msg = _(u'Tag can only contain letters, numbers, dot, hyphen or underscore')
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
res_tag_list = []
|
||||
for name in name_list:
|
||||
tag_obj, created = FileTag.objects.get_or_create_file_tag(
|
||||
repo_id, parent_path, filename, is_dir, name,
|
||||
request.user.username
|
||||
)
|
||||
res_tag_list.append(tag_obj.to_dict())
|
||||
return Response({"tags": res_tag_list}, status=status.HTTP_200_OK)
|
||||
|
||||
class FileTagView(APIView):
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAuthenticated, )
|
||||
throttle_classes = (UserRateThrottle, )
|
||||
|
||||
@check_parameter
|
||||
def delete(self, request, repo_id, parent_path, filename, name, is_dir):
|
||||
if not name or not check_tagname(name):
|
||||
error_msg = _(u'Tag can only contain letters, numbers, dot, hyphen or underscore')
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
if FileTag.objects.delete_file_tag_by_path(repo_id,
|
||||
parent_path,filename,is_dir,name):
|
||||
return Response({"success": True}, status=status.HTTP_200_OK)
|
||||
else:
|
||||
return Response({"success": True}, status=status.HTTP_202_ACCEPTED)
|
@ -224,6 +224,7 @@ INSTALLED_APPS = (
|
||||
'seahub.password_session',
|
||||
'seahub.admin_log',
|
||||
'seahub.wopi',
|
||||
'seahub.tags',
|
||||
)
|
||||
|
||||
# Enabled or disable constance(web settings).
|
||||
|
@ -6,3 +6,6 @@ repo_created = django.dispatch.Signal(providing_args=["org_id", "creator", "repo
|
||||
repo_deleted = django.dispatch.Signal(providing_args=["org_id", "usernames", "repo_owner", "repo_id", "repo_name"])
|
||||
upload_file_successful = django.dispatch.Signal(providing_args=["repo_id", "file_path", "owner"])
|
||||
comment_file_successful = django.dispatch.Signal(providing_args=["repo", "file_path", "comment", "author", "notify_users"])
|
||||
rename_dirent_successful = django.dispatch.Signal(providing_args=["src_repo_id", "src_parent_dir",
|
||||
"src_filename", "dst_repo_id",
|
||||
"dst_parent_dir", "dst_filename", "is_dir"])
|
||||
|
0
seahub/tags/__init__.py
Normal file
0
seahub/tags/__init__.py
Normal file
201
seahub/tags/models.py
Normal file
201
seahub/tags/models.py
Normal file
@ -0,0 +1,201 @@
|
||||
# Copyright (c) 2012-2016 Seafile Ltd.
|
||||
# -*- coding: utf-8 -*-
|
||||
import uuid
|
||||
|
||||
from django.db import models
|
||||
|
||||
from seahub.base.fields import LowerCaseCharField
|
||||
|
||||
########## Manager
|
||||
class FileUUIDMapManager(models.Manager):
|
||||
|
||||
def get_fileuuidmap_by_uuid(self, uuid):
|
||||
try:
|
||||
return super(FileUUIDMapManager, self).get(uuid=uuid)
|
||||
except self.model.DoesNotExist:
|
||||
return None
|
||||
|
||||
def get_or_create_fileuuidmap(self, repo_id, parent_path, filename, is_dir):
|
||||
""" create filemap by repo_id、 parent_path、filename、id_dir
|
||||
args:
|
||||
- `repo_id`:
|
||||
- `parent_path`:
|
||||
- `filename`: input a dirname if it's dir
|
||||
- `id_dir`: input True or False
|
||||
return:
|
||||
uuid of filemap
|
||||
"""
|
||||
uuid = self.get_fileuuidmap_by_path(repo_id, parent_path, filename, is_dir)
|
||||
if not uuid:
|
||||
uuid = self.model(repo_id=repo_id, parent_path=parent_path,
|
||||
filename=filename, is_dir=is_dir)
|
||||
uuid.save(using=self._db)
|
||||
return uuid
|
||||
|
||||
def get_fileuuidmap_by_path(self, repo_id, parent_path, filename, is_dir):
|
||||
""" get filemap uuid by repoid、 parent_path 、 filename 、is_dir
|
||||
args:
|
||||
- `repo_id`:
|
||||
- `parent_path`:
|
||||
- `filename`: input a dirname if it's dir
|
||||
- `id_dir`: input True or False
|
||||
return:
|
||||
return uuid if it's exist,otherwise return None
|
||||
"""
|
||||
try:
|
||||
uuid = super(FileUUIDMapManager, self).get(
|
||||
repo_id=repo_id, parent_path=parent_path,
|
||||
filename=filename, is_dir=is_dir)
|
||||
return uuid
|
||||
except self.model.DoesNotExist:
|
||||
return None
|
||||
|
||||
class TagsManager(models.Manager):
|
||||
def get_or_create_tag(self, tagname):
|
||||
try:
|
||||
return super(TagsManager, self).get(name=tagname)
|
||||
except self.model.DoesNotExist:
|
||||
tag = self.model(name=tagname)
|
||||
tag.save()
|
||||
return tag
|
||||
|
||||
class FileTagManager(models.Manager):
|
||||
def get_or_create_file_tag(self, repo_id, parent_path, filename, is_dir, tagname, creator):
|
||||
""" create filetag if tag does not exist, otherwise directly to True
|
||||
args:
|
||||
- `uuid`: uuid of filemap
|
||||
- `tagname`:
|
||||
- `creator`:
|
||||
|
||||
return:
|
||||
(tag_obj, is_created)
|
||||
"""
|
||||
fileuuidmap = FileUUIDMap.objects.get_or_create_fileuuidmap(repo_id, parent_path, filename, is_dir)
|
||||
tag = self.exists_filetag(fileuuidmap.uuid, tagname)
|
||||
if tag[1]:
|
||||
return (tag[0], False)
|
||||
else:
|
||||
tag = self.model(
|
||||
uuid=FileUUIDMap.objects.get_fileuuidmap_by_uuid(fileuuidmap.uuid),
|
||||
tag=Tags.objects.get_or_create_tag(tagname),
|
||||
username=creator
|
||||
)
|
||||
tag.save()
|
||||
return (tag, True)
|
||||
|
||||
def exists_filetag(self, uuid_id, tagname):
|
||||
""" To determine whether the filetag exists.
|
||||
args:
|
||||
- `uuid`:uuid of filemap
|
||||
- `tagname`: tag name
|
||||
return:
|
||||
(tag_obj, is_exist)
|
||||
"""
|
||||
try:
|
||||
tag = super(FileTagManager, self).get(uuid=uuid_id, tag__name=tagname)
|
||||
return (tag, True)
|
||||
except self.model.DoesNotExist:
|
||||
return (None, False)
|
||||
|
||||
def get_all_file_tag_by_path(self, repo_id, parent_path, filename, is_dir):
|
||||
"""
|
||||
args:
|
||||
- `repo_id`:
|
||||
- `parent_path`:
|
||||
- `filename`: file name or dir name
|
||||
- `is_dir`: True or False
|
||||
return list of filetag
|
||||
"""
|
||||
return super(FileTagManager, self).filter(
|
||||
uuid__repo_id=repo_id,
|
||||
uuid__parent_path=parent_path,
|
||||
uuid__filename=filename, uuid__is_dir=is_dir
|
||||
)
|
||||
|
||||
def delete_file_tag_by_path(self, repo_id, parent_path, filename, is_dir, tagname):
|
||||
""" delete one specific filetag
|
||||
args:
|
||||
- `uuid_id`:id of uuid in filemap
|
||||
- `tagname`:
|
||||
return:
|
||||
always return True
|
||||
"""
|
||||
try:
|
||||
filetag = super(FileTagManager, self).get(
|
||||
uuid__repo_id=repo_id,
|
||||
uuid__parent_path=parent_path,
|
||||
uuid__filename=filename,
|
||||
uuid__is_dir=is_dir,
|
||||
tag__name=tagname
|
||||
)
|
||||
filetag.delete()
|
||||
return True
|
||||
except Exception as e:
|
||||
return False
|
||||
|
||||
def delete_all_filetag_by_path(self, repo_id, parent_path, filename, is_dir):
|
||||
""" delete all filetag
|
||||
args:
|
||||
- `repo_id`:
|
||||
- `parent_path`
|
||||
- `filename`
|
||||
- `is_dir`
|
||||
return:
|
||||
always return True
|
||||
"""
|
||||
filetags = super(FileTagManager, self).filter(
|
||||
uuid__repo_id=repo_id,
|
||||
uuid__parent_path=parent_path,
|
||||
uuid__filename=filename,
|
||||
uuid__is_dir=is_dir
|
||||
).delete()
|
||||
|
||||
########## Model
|
||||
class FileUUIDMap(models.Model):
|
||||
uuid = models.UUIDField(primary_key=True, default=uuid.uuid4)
|
||||
repo_id = models.CharField(max_length=36)
|
||||
parent_path = models.TextField()
|
||||
filename = models.CharField(max_length=1024)
|
||||
is_dir = models.BooleanField()
|
||||
|
||||
objects = FileUUIDMapManager()
|
||||
|
||||
class Tags(models.Model):
|
||||
name = models.CharField(max_length=1024, unique=True)
|
||||
|
||||
objects = TagsManager()
|
||||
|
||||
class FileTag(models.Model):
|
||||
uuid = models.ForeignKey(FileUUIDMap, on_delete=models.CASCADE)
|
||||
tag = models.ForeignKey(Tags)
|
||||
username = LowerCaseCharField(max_length=255)
|
||||
|
||||
objects = FileTagManager()
|
||||
|
||||
def to_dict(self):
|
||||
return {'id': self.tag.id,'name': self.tag.name,'creator': self.username}
|
||||
|
||||
########## handle signals
|
||||
import logging
|
||||
|
||||
from django.dispatch import receiver
|
||||
from seahub.signals import rename_dirent_successful
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@receiver(rename_dirent_successful)
|
||||
def update_fileuuidmap(sender, **kwargs):
|
||||
src_repo_id = kwargs.get('src_repo_id')
|
||||
src_parent_dir = kwargs.get('src_parent_dir')
|
||||
src_filename = kwargs.get('src_filename')
|
||||
dst_repo_id = kwargs.get('dst_repo_id')
|
||||
dst_parent_dir = kwargs.get('dst_parent_dir')
|
||||
dst_filename = kwargs.get('dst_filename')
|
||||
is_dir = kwargs.get('is_dir')
|
||||
src_fileuuidmap = FileUUIDMap.objects.get_fileuuidmap_by_path(src_repo_id,src_parent_dir, src_filename, is_dir)
|
||||
if src_fileuuidmap:
|
||||
src_fileuuidmap.repo_id = dst_repo_id
|
||||
src_fileuuidmap.parent_dir = dst_parent_dir
|
||||
src_fileuuidmap.filename = dst_filename
|
||||
src_fileuuidmap.is_dir = is_dir
|
||||
src_fileuuidmap.save()
|
@ -29,6 +29,9 @@ from seahub.api2.endpoints.repos_batch import ReposBatchView
|
||||
from seahub.api2.endpoints.repos import RepoView
|
||||
from seahub.api2.endpoints.file import FileView
|
||||
from seahub.api2.endpoints.dir import DirView, DirDetailView
|
||||
from seahub.api2.endpoints.file_tag import FileTagView
|
||||
from seahub.api2.endpoints.file_tag import FileTagsView
|
||||
from seahub.api2.endpoints.dir import DirView
|
||||
from seahub.api2.endpoints.repo_trash import RepoTrash
|
||||
from seahub.api2.endpoints.deleted_repos import DeletedRepos
|
||||
from seahub.api2.endpoints.repo_history import RepoHistory
|
||||
@ -212,6 +215,8 @@ urlpatterns = patterns(
|
||||
url(r'^api/v2.1/repos/batch/$', ReposBatchView.as_view(), name='api-v2.1-repos-batch'),
|
||||
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/$', RepoView.as_view(), name='api-v2.1-repo-view'),
|
||||
url(r'^api/v2.1/deleted-repos/$', DeletedRepos.as_view(), name='api2-v2.1-deleted-repos'),
|
||||
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/tags/$', FileTagsView.as_view(), name="api-v2.1-filetags-view"),
|
||||
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/tags/(?P<name>.*?)/$',FileTagView.as_view(), name="api-v2.1-filetag-view"),
|
||||
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/file/$', FileView.as_view(), name='api-v2.1-file-view'),
|
||||
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/dir/$', DirView.as_view(), name='api-v2.1-dir-view'),
|
||||
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/dir/detail/$', DirDetailView.as_view(), name='api-v2.1-dir-detail-view'),
|
||||
|
147
tests/api/endpoints/test_file_tag.py
Normal file
147
tests/api/endpoints/test_file_tag.py
Normal file
@ -0,0 +1,147 @@
|
||||
#coding:utf-8
|
||||
import os
|
||||
import json
|
||||
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test.client import encode_multipart
|
||||
|
||||
from seahub.test_utils import BaseTestCase
|
||||
from seaserv import seafile_api
|
||||
class FileTagTest(BaseTestCase):
|
||||
def setUp(self):
|
||||
self.login_as(self.user)
|
||||
|
||||
self.test_filepath ='/test_file.txt'
|
||||
self.test_folderpath = '/test_folder'
|
||||
self.test_parentpath = '/'
|
||||
self.test_filename = 'test_file.txt'
|
||||
self.test_folder_name = 'test_folder'
|
||||
self.new_repo = seafile_api.get_repo(self.create_repo(
|
||||
name='test-repo', desc='', username=self.user.username,
|
||||
passwd=None))
|
||||
self.endpoint = reverse('api-v2.1-filetags-view', args=[self.new_repo.id])
|
||||
self._endpoint = reverse('api-v2.1-filetag-view', args=[self.new_repo.id, 'test_tagname'])
|
||||
|
||||
self.test_file = self.create_file( repo_id=self.new_repo.id,
|
||||
parent_dir=self.test_parentpath, filename=self.test_filename,
|
||||
username=self.user.username
|
||||
)
|
||||
self.test_folder = self.create_folder(repo_id = self.new_repo.id,
|
||||
parent_dir=self.test_parentpath, dirname=self.test_folder_name,
|
||||
username=self.user.username)
|
||||
|
||||
def test_default(self):
|
||||
#test for create file
|
||||
response = self.client.post(self.endpoint, { 'path': self.test_filepath,
|
||||
'names': 'test_tagname', 'is_dir': False,
|
||||
})
|
||||
assert response.status_code == 200
|
||||
self.filetag_id = response.data['tags'][0]['id']
|
||||
self.filetag_name = response.data['tags'][0]['name']
|
||||
self.filetag_username = response.data['tags'][0]['creator']
|
||||
assert self.filetag_id
|
||||
assert self.filetag_name
|
||||
assert self.filetag_username
|
||||
|
||||
#test for create folder
|
||||
folder_response = self.client.post(self.endpoint, {
|
||||
'path': self.test_folderpath, 'names': 'test_tagname',
|
||||
'is_dir': True,
|
||||
})
|
||||
assert folder_response.status_code == 200
|
||||
self.foldertag_id = folder_response.data["tags"][0]['id']
|
||||
self.foldertag_name = folder_response.data["tags"][0]['name']
|
||||
self.foldertag_username = folder_response.data["tags"][0]['creator']
|
||||
assert self.foldertag_id
|
||||
assert self.foldertag_name
|
||||
assert self.foldertag_username
|
||||
|
||||
#test for get file tag
|
||||
response = self.client.get(self.endpoint, {
|
||||
'path': self.test_filepath,
|
||||
'is_dir': False,
|
||||
})
|
||||
assert response.status_code == 200
|
||||
assert response.data['tags'][0]['id'] == self.filetag_id
|
||||
assert response.data['tags'][0]['name'] == self.filetag_name
|
||||
assert response.data['tags'][0]['creator'] == self.filetag_username
|
||||
|
||||
#test for get folder tag
|
||||
response = self.client.get(self.endpoint, {
|
||||
'path': self.test_folderpath,
|
||||
'is_dir': True,
|
||||
})
|
||||
assert response.status_code == 200
|
||||
assert response.data['tags'][0]['id'] == self.foldertag_id
|
||||
assert response.data['tags'][0]['name'] == self.foldertag_name
|
||||
assert response.data['tags'][0]['creator'] == self.foldertag_username
|
||||
|
||||
#test for del file tag
|
||||
response = self.client.delete(self._endpoint + "?path=%s&is_dir=%s"
|
||||
%(self.test_filepath, False))
|
||||
assert response.status_code == 200
|
||||
response = self.client.get(self.endpoint, {
|
||||
'path': self.test_filepath,
|
||||
'is_dir': False,
|
||||
})
|
||||
assert len(response.data['tags']) == 0
|
||||
#test for del folder tag
|
||||
response = self.client.delete(self._endpoint + "?path=%s&is_dir=%s"
|
||||
%(self.test_folderpath, True))
|
||||
assert response.status_code == 200
|
||||
response = self.client.get(self.endpoint, {
|
||||
'path': self.test_folderpath,
|
||||
'is_dir': True,
|
||||
})
|
||||
assert len(response.data['tags']) == 0
|
||||
|
||||
def test_post(self):
|
||||
# add one
|
||||
response = self.client.post(self.endpoint, {
|
||||
'path': self.test_filepath, 'names': 'test_tagname',
|
||||
'is_dir': False,
|
||||
})
|
||||
assert response.status_code == 200
|
||||
assert response.data["tags"][0]["id"]
|
||||
assert response.data["tags"][0]["name"] == "test_tagname"
|
||||
assert response.data["tags"][0]["creator"] == self.user.username
|
||||
# add more
|
||||
response = self.client.post(self.endpoint, {
|
||||
'path': self.test_filepath,
|
||||
'names': 'test_tagname, test_tagname1, test_tagnm天',
|
||||
'is_dir': False,
|
||||
})
|
||||
assert response.status_code == 200
|
||||
|
||||
assert response.data["tags"][0]["id"]
|
||||
tags_names = [tags["name"] for tags in response.data["tags"]]
|
||||
assert "test_tagname" in tags_names
|
||||
assert "test_tagname1" in tags_names
|
||||
assert "test_tagnm天".decode('utf-8') in tags_names
|
||||
assert response.data["tags"][0]["creator"] == self.user.username
|
||||
response = self.client.get(self.endpoint, {
|
||||
'path': self.test_filepath,
|
||||
'is_dir': False,
|
||||
})
|
||||
tags_names = [tags["name"] for tags in response.data["tags"]]
|
||||
assert "test_tagname" in tags_names
|
||||
assert "test_tagname1" in tags_names
|
||||
assert "test_tagnm天".decode('utf-8') in tags_names
|
||||
#test delete all filetag and add specifiy tag
|
||||
data = 'names=test_zm-.&path=%s&is_dir=%s' % (self.test_filepath, False)
|
||||
response = self.client.put(self.endpoint, data,'application/x-www-form-urlencoded')
|
||||
assert response.status_code == 200
|
||||
response = self.client.get(self.endpoint, { 'path': self.test_filepath,
|
||||
'is_dir': False,
|
||||
})
|
||||
tags_names = [tags["name"] for tags in response.data["tags"]]
|
||||
assert "test_tagname" not in tags_names
|
||||
assert "test_tagname1" not in tags_names
|
||||
assert "test_tagnm" not in tags_names
|
||||
assert "test_zm-." in tags_names
|
||||
#delete delete all filetag
|
||||
data = 'names=&path=%s&is_dir=%s' % (self.test_filepath, False)
|
||||
response = self.client.put(self.endpoint, data,'application/x-www-form-urlencoded')
|
||||
tags_names = [tags["name"] for tags in response.data["tags"]]
|
||||
assert response.status_code == 200
|
||||
assert "test_zm" not in tags_names
|
Loading…
Reference in New Issue
Block a user