1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-09-05 17:02:47 +00:00

update search user

user can search if he/she has no permission of using global address book
This commit is contained in:
lian
2017-02-13 16:03:31 +08:00
parent 7736ff0893
commit c904884526
2 changed files with 143 additions and 69 deletions

View File

@@ -43,104 +43,92 @@ class SearchUser(APIView):
def get(self, request, format=None): def get(self, request, format=None):
if not self._can_use_global_address_book(request):
return api_error(status.HTTP_403_FORBIDDEN,
'Guest user can not use global address book.')
q = request.GET.get('q', None) q = request.GET.get('q', None)
if not q: if not q:
return api_error(status.HTTP_400_BAD_REQUEST, 'Argument missing.') return api_error(status.HTTP_400_BAD_REQUEST, 'q invalid.')
users_from_ccnet = [] email_list = []
users_from_profile = []
users_result = []
username = request.user.username username = request.user.username
if CLOUD_MODE: if self._can_use_global_address_book(request):
if is_org_context(request): # check user permission according to user's role(default, guest, etc.)
url_prefix = request.user.org.url_prefix # if current user can use global address book
users = seaserv.get_org_users_by_url_prefix(url_prefix, -1, -1) if CLOUD_MODE:
if is_org_context(request):
# search from org
email_list += search_user_from_org(request, q)
# search user from ccnet # search from profile, limit search range in all org users
users_from_ccnet = filter(lambda u: q in u.email, users) limited_emails = []
url_prefix = request.user.org.url_prefix
all_org_users = seaserv.get_org_users_by_url_prefix(url_prefix, -1, -1)
for user in all_org_users:
limited_emails.append(user.email)
# when search profile, only search users in org email_list += search_user_from_profile(q, limited_emails)
# 'nickname__icontains' for search by nickname
# 'contact_email__icontains' for search by contact email
users_from_profile = Profile.objects.filter(Q(user__in=[u.email for u in users]) &
(Q(nickname__icontains=q)) | Q(contact_email__icontains=q)).values('user')
elif ENABLE_GLOBAL_ADDRESSBOOK: elif ENABLE_GLOBAL_ADDRESSBOOK:
users_from_ccnet = search_user_from_ccnet(q) # search from ccnet
users_from_profile = Profile.objects.filter(Q(contact_email__icontains=q) | email_list += search_user_from_ccnet(q)
Q(nickname__icontains=q)).values('user')
# search from profile, NOT limit search range
email_list += search_user_from_profile(q)
else:
# in cloud mode, user will be added to Contact when share repo
# search user from user's contacts
email_list += search_user_from_contact(request, q)
else: else:
# in cloud mode, user will be added to Contact when share repo # not CLOUD_MODE
users = [] # search from ccnet
contacts = Contact.objects.get_contacts_by_user(username) email_list += search_user_from_ccnet(q)
for c in contacts:
try:
user = User.objects.get(email = c.contact_email)
c.is_active = user.is_active
except User.DoesNotExist:
continue
c.email = c.contact_email
users.append(c)
users_from_ccnet = filter(lambda u: q in u.email, users)
if is_valid_email(q):
users_from_ccnet += search_user_from_ccnet(q)
# 'user__in' for only get profile of contacts
# 'nickname__icontains' for search by nickname
# 'contact_email__icontains' for search by contact
users_from_profile = Profile.objects.filter(Q(user__in=[u.email for u in users]) &
(Q(nickname__icontains=q)) | Q(contact_email__icontains=q)).values('user')
# search from profile, NOT limit search range
email_list += search_user_from_profile(q)
else: else:
users_from_ccnet = search_user_from_ccnet(q) # if current user can NOT use global address book,
users_from_profile = Profile.objects.filter(Q(contact_email__icontains=q) | # he/she can also search `q` from Contact,
Q(nickname__icontains=q)).values('user') # search user from user's contacts
email_list += search_user_from_contact(request, q)
# remove inactive users and add to result ## search finished
for u in users_from_ccnet[:10]: # check if `q` is a valid email
if u.is_active: if is_valid_email(q):
users_result.append(u.email) email_list.append(q)
for p in users_from_profile[:10]: # remove duplicate emails
email_list = {}.fromkeys(email_list).keys()
email_result = []
# remove nonexistent or inactive user
for email in email_list:
try: try:
user = User.objects.get(email = p['user']) user = User.objects.get(email=email)
if user.is_active:
email_result.append(email)
except User.DoesNotExist: except User.DoesNotExist:
continue continue
if not user.is_active: # check if include myself in user result
continue
users_result.append(p['user'])
# remove duplicate emails
users_result = {}.fromkeys(users_result).keys()
try: try:
include_self = int(request.GET.get('include_self', 1)) include_self = int(request.GET.get('include_self', 1))
except ValueError: except ValueError:
include_self = 1 include_self = 1
if include_self == 0 and username in users_result: if include_self == 0 and username in email_result:
# reomve myself # reomve myself
users_result.remove(username) email_result.remove(username)
# format user result
try: try:
size = int(request.GET.get('avatar_size', 32)) size = int(request.GET.get('avatar_size', 32))
except ValueError: except ValueError:
size = 32 size = 32
formated_result = format_searched_user_result(request, users_result, size)[:10] formated_result = format_searched_user_result(
return HttpResponse(json.dumps({"users": formated_result}), status=200, request, email_result[:10], size)
content_type='application/json; charset=utf-8')
return HttpResponse(json.dumps({"users": formated_result}),
status=200, content_type='application/json; charset=utf-8')
def format_searched_user_result(request, users, size): def format_searched_user_result(request, users, size):
results = [] results = []
@@ -156,6 +144,20 @@ def format_searched_user_result(request, users, size):
return results return results
def search_user_from_org(request, q):
# get all org users
url_prefix = request.user.org.url_prefix
all_org_users = seaserv.get_org_users_by_url_prefix(url_prefix, -1, -1)
# search user from org users
email_list = []
for org_user in all_org_users:
if q in org_user.email:
email_list.append(org_user.email)
return email_list
def search_user_from_ccnet(q): def search_user_from_ccnet(q):
users = [] users = []
@@ -172,4 +174,47 @@ def search_user_from_ccnet(q):
all_ldap_users = seaserv.ccnet_threaded_rpc.search_ldapusers(q, 0, 10 - count) all_ldap_users = seaserv.ccnet_threaded_rpc.search_ldapusers(q, 0, 10 - count)
users.extend(all_ldap_users) users.extend(all_ldap_users)
return users # `users` is already search result, no need search more
email_list = []
for user in users:
email_list.append(user.email)
return email_list
def search_user_from_profile(q, limited_emails=[]):
users = []
if limited_emails:
# 'nickname__icontains' for search by nickname
# 'contact_email__icontains' for search by contact email
users = Profile.objects.filter(Q(user__in=limited_emails) &
(Q(nickname__icontains=q) | Q(contact_email__icontains=q))).values('user')
else:
users = Profile.objects.filter(
Q(nickname__icontains=q) | Q(contact_email__icontains=q)).values('user')
email_list = []
for user in users:
email_list.append(user['user'])
return email_list
def search_user_from_contact(request, q):
# get user's contact list
username = request.user.username
contacts = Contact.objects.get_contacts_by_user(username)
# search user from contact list
email_list = []
for contact in contacts:
if q in contact.contact_email:
email_list.append(contact.contact_email)
# search from profile, limit search range in user contacts
limited_emails = []
for contact in contacts:
limited_emails.append(contact.contact_email)
email_list += search_user_from_profile(q, limited_emails)
return email_list

View File

@@ -4,8 +4,10 @@ from mock import patch
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from django.test import override_settings from django.test import override_settings
from seahub.contacts.models import Contact
from seahub.profile.models import Profile from seahub.profile.models import Profile
from seahub.profile.utils import refresh_cache from seahub.profile.utils import refresh_cache
from seahub.api2.endpoints.search_user import SearchUser
from seahub.test_utils import BaseTestCase from seahub.test_utils import BaseTestCase
class SearchUserTest(BaseTestCase): class SearchUserTest(BaseTestCase):
@@ -154,3 +156,30 @@ class SearchUserTest(BaseTestCase):
self.assertEqual(200, resp.status_code) self.assertEqual(200, resp.status_code)
assert json_resp['users'][0]['email'] == self.admin.username assert json_resp['users'][0]['email'] == self.admin.username
@patch.object(SearchUser, '_can_use_global_address_book')
def test_search_with_not_use_global_address_book(self, mock_can_use_global_address_book):
mock_can_use_global_address_book.return_value = False
resp = self.client.get(self.endpoint + '?q=%s' % self.admin.username)
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['users'][0]['email'] == self.admin.username
@patch.object(SearchUser, '_can_use_global_address_book')
def test_search_by_contact_with_not_use_global_address_book(self, mock_can_use_global_address_book):
mock_can_use_global_address_book.return_value = False
Contact.objects.add_contact(self.user.username, self.admin.username)
nickname_of_admin = 'nickname of admin'
Profile.objects.add_or_update(self.admin.username,
nickname=nickname_of_admin)
resp = self.client.get(self.endpoint + '?q=%s' % nickname_of_admin)
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['users'][0]['email'] == self.admin.username