1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-09-13 13:50:07 +00:00

file participant batch add (#3914)

* file participants batch add view

* file participants batch add test

* file participants batch add in exist API

* fix database query
This commit is contained in:
sniper-py
2019-07-26 11:14:08 +08:00
committed by Daniel Pan
parent 4a4e280098
commit 75318ef1f9
2 changed files with 86 additions and 44 deletions

View File

@@ -62,7 +62,7 @@ class FileParticipantsView(APIView):
return Response({'participant_list': participant_list})
def post(self, request, repo_id, format=None):
"""Post a participant of a file.
"""batch add participants of a file.
"""
# argument check
path = request.data.get('path')
@@ -70,41 +70,67 @@ class FileParticipantsView(APIView):
return api_error(status.HTTP_400_BAD_REQUEST, 'path invalid.')
path = normalize_file_path(path)
email = request.data.get('email')
if not email or not is_valid_username(email):
return api_error(status.HTTP_400_BAD_REQUEST, 'email invalid.')
emails = request.data.get('emails')
if not emails or not isinstance(emails, list):
return api_error(status.HTTP_400_BAD_REQUEST, 'emails invalid.')
emails = list(set(emails))
# resource check
file_id = seafile_api.get_file_id_by_path(repo_id, path)
if not file_id:
return api_error(status.HTTP_404_NOT_FOUND, 'File %s not found.' % path)
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
return api_error(status.HTTP_404_NOT_FOUND, 'User %s not found.' % email)
# permission check
if not check_folder_permission(request, repo_id, '/'):
return api_error(status.HTTP_403_FORBIDDEN, 'Permission denied.')
if not seafile_api.check_permission_by_path(repo_id, '/', user.username):
return api_error(status.HTTP_403_FORBIDDEN, _('%s Permission denied.') % email)
# batch add
success = list()
failed = list()
# main
try:
file_uuid = FileUUIDMap.objects.get_or_create_fileuuidmap_by_path(repo_id, path, False)
if FileParticipant.objects.get_participant(file_uuid, email):
return api_error(status.HTTP_409_CONFLICT, _('Participant %s already exists.') % email)
FileParticipant.objects.add_participant(file_uuid, email)
participant = get_user_common_info(email)
uuid = FileUUIDMap.objects.get_or_create_fileuuidmap_by_path(repo_id, path, False)
participants_queryset = FileParticipant.objects.get_participants(uuid)
except Exception as e:
logger.error(e)
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, 'Internal Server Error.')
return Response(participant, status=201)
for email in emails:
if not is_valid_username(email):
error_dic = {'email': email, 'error_msg': 'email invalid.', 'error_code': 400}
failed.append(error_dic)
continue
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
error_dic = {'email': email, 'error_msg': 'User not found.', 'error_code': 404}
failed.append(error_dic)
continue
# permission check
if not seafile_api.check_permission_by_path(repo_id, '/', user.username):
error_dic = {'email': email, 'error_msg': 'Permission denied.', 'error_code': 403}
failed.append(error_dic)
continue
# main
try:
if participants_queryset.filter(uuid=uuid, username=email).count() > 0:
error_dic = {'email': email, 'error_msg': 'Participant already exists.', 'error_code': 409}
failed.append(error_dic)
continue
FileParticipant.objects.add_participant(uuid, email)
participant = get_user_common_info(email)
success.append(participant)
except Exception as e:
logger.error(e)
error_dic = {'email': email, 'error_msg': 'Internal Server Error.', 'error_code': 500}
failed.append(error_dic)
continue
return Response({'success': success, 'failed': failed})
class FileParticipantView(APIView):

View File

@@ -49,52 +49,68 @@ class FileParticipantsTest(BaseTestCase):
self.assertEqual(403, resp.status_code)
def test_can_post(self):
resp = self.client.post(self.url, {
'email': self.tmp_user.username,
data = {
'emails': [self.tmp_user.username, self.admin.username],
'path': self.file,
})
self.assertEqual(201, resp.status_code)
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['email'] == self.tmp_user.username
assert json_resp['avatar_url']
assert json_resp['contact_email']
assert json_resp['name']
assert FileParticipant.objects.filter(
uuid=self.file_uuid, username=self.tmp_user.username).count() == 1
assert len(json_resp['success']) == 1
assert json_resp['success'][0]['email'] == self.tmp_user.username
assert len(json_resp['failed']) == 1
assert json_resp['failed'][0]['email'] == self.admin.username
assert FileParticipant.objects.filter(uuid=self.file_uuid).count() == 2
def test_can_not_post_by_not_exists_path(self):
invalid_path = self.file + randstring(5)
resp = self.client.post(self.url, {
'email': self.tmp_user.username,
data = {
'emails': [self.tmp_user.username],
'path': invalid_path,
})
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(404, resp.status_code)
def test_can_not_post_by_invalid_user_permission(self):
self.logout()
self.login_as(self.admin)
resp = self.client.post(self.url, {
'email': self.tmp_user.username,
data = {
'emails': [self.tmp_user.username],
'path': self.file,
})
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
def test_can_not_post_by_not_exists_user(self):
invalid_email = randstring(5) + '@seafile.com'
resp = self.client.post(self.url, {
'email': invalid_email,
data = {
'emails': [invalid_email],
'path': self.file,
})
self.assertEqual(404, resp.status_code)
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['success']) == 0
assert len(json_resp['failed']) == 1
assert json_resp['failed'][0]['email'] == invalid_email
def test_can_not_post_by_invalid_email_permission(self):
resp = self.client.post(self.url, {
'email': self.admin.username,
data = {
'emails': [self.admin.username],
'path': self.file,
})
self.assertEqual(403, resp.status_code)
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['success']) == 0
assert len(json_resp['failed']) == 1
assert json_resp['failed'][0]['email'] == self.admin.username
class FileParticipantTest(BaseTestCase):