1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-05-13 02:15:59 +00:00

update search user web api ()

only return active users
This commit is contained in:
lian 2024-11-09 16:34:11 +08:00 committed by GitHub
parent 3235466eb3
commit f316cbd2a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 114 additions and 136 deletions

View File

@ -1,7 +1,6 @@
# Copyright (c) 2012-2016 Seafile Ltd.
import os
import sys
import json
import logging
from django.db.models import Q
@ -18,12 +17,13 @@ from seaserv import ccnet_api
from seahub.api2.authentication import TokenAuthentication
from seahub.api2.throttling import UserRateThrottle
from seahub.api2.utils import api_error
from seahub.utils import is_valid_email, is_org_context
from seahub.utils.ccnet_db import CcnetDB
from seahub.base.accounts import User
from seahub.base.templatetags.seahub_tags import email2nickname, \
email2contact_email
from seahub.profile.models import Profile
from seahub.contacts.models import Contact
from seahub.avatar.templatetags.avatar_tags import api_avatar_url
from seahub.settings import ENABLE_GLOBAL_ADDRESSBOOK, \
@ -38,12 +38,11 @@ except ImportError:
try:
current_path = os.path.dirname(os.path.abspath(__file__))
seafile_conf_dir = os.path.join(current_path, \
'../../../../../conf')
seafile_conf_dir = os.path.join(current_path, '../../../../../conf')
sys.path.append(seafile_conf_dir)
from seahub_custom_functions import custom_search_user
CUSTOM_SEARCH_USER = True
except ImportError as e:
except ImportError:
CUSTOM_SEARCH_USER = False
@ -84,6 +83,10 @@ class SearchUser(APIView):
limited_emails = []
for org_user in all_org_users:
if not org_user.is_active:
continue
# prepare limited emails for search from profile
limited_emails.append(org_user.email)
@ -121,18 +124,7 @@ class SearchUser(APIView):
# remove duplicate emails
# get_emailusers_in_list can only accept 20 users at most
email_list = list({}.fromkeys(email_list).keys())[:20]
email_result = []
# remove nonexistent or inactive user
email_list_json = json.dumps(email_list)
user_obj_list = ccnet_api.get_emailusers_in_list('DB', email_list_json) + \
ccnet_api.get_emailusers_in_list('LDAP', email_list_json)
for user_obj in user_obj_list:
if user_obj.is_active:
email_result.append(user_obj.email)
email_result = list({}.fromkeys(email_list).keys())[:20]
# specific search `q`
user_q_obj = ccnet_api.get_emailuser(q)
@ -177,7 +169,7 @@ def format_searched_user_result(request, users, size):
results = []
if ENABLE_SHOW_LOGIN_ID_WHEN_SEARCH_USER:
profile_queryset = Profile.objects.filter(user__in=users)
profile_dict = { p.user: p.login_id for p in profile_queryset if p.login_id }
profile_dict = {p.user: p.login_id for p in profile_queryset if p.login_id}
for email in users:
url, is_default, date_uploaded = api_avatar_url(email, size)
@ -194,21 +186,16 @@ def format_searched_user_result(request, users, size):
return results
def search_user_from_ccnet(q):
def search_user_from_ccnet(q, offset=0, limit=10):
""" Return 10 items at most.
"""
users = []
db_users = ccnet_api.search_emailusers('DB', q, 0, 10)
users.extend(db_users)
ccnet_db = CcnetDB()
users, count = ccnet_db.list_eligible_users(offset, limit,
is_active=True, q=q)
count = len(users)
if count < 10:
ldap_imported_users = ccnet_api.search_emailusers('LDAP', q, 0, 10 - count)
users.extend(ldap_imported_users)
count = len(users)
if count < 10 and ENABLE_SEARCH_FROM_LDAP_DIRECTLY:
all_ldap_users = ccnet_api.search_ldapusers(q, 0, 10 - count)
users.extend(all_ldap_users)
@ -221,29 +208,44 @@ def search_user_from_ccnet(q):
return email_list
def search_user_from_profile(q):
""" Return 10 items at most.
def search_user_from_profile(q, offset=0, limit=10, max_attempts=10):
"""Return a list of email addresses, with a length of at least 10.
If not enough results are found, continue searching up to max_attempts times.
"""
# 'nickname__icontains' for search by nickname
# 'contact_email__icontains' for search by contact email
users = Profile.objects.filter(Q(nickname__icontains=q) | \
Q(contact_email__icontains=q) | \
Q(login_id__icontains=q)).values('user')[:10]
email_list = []
for user in users:
email_list.append(user['user'])
attempts = 0
return email_list
ccnet_db = CcnetDB()
while len(email_list) < 10 and attempts < max_attempts:
# Search by nickname, contact email, or login ID
users = Profile.objects.filter(
Q(nickname__icontains=q) |
Q(contact_email__icontains=q) |
Q(login_id__icontains=q)
).values('user')[offset:offset + limit]
new_emails = ccnet_db.get_active_users_by_user_list([user['user'] for user in users])
email_list.extend([email for email in new_emails if email not in email_list])
offset += limit
attempts += 1
return email_list[:10]
def search_user_from_profile_with_limits(q, limited_emails):
""" Return 10 items at most.
"""
# search within limited_emails
users = Profile.objects.filter(Q(user__in=limited_emails) & (Q(nickname__icontains=q) | \
Q(contact_email__icontains=q) | \
Q(login_id__icontains=q))).values('user')[:10]
users = Profile.objects.filter(
Q(user__in=limited_emails) & (
Q(nickname__icontains=q) |
Q(contact_email__icontains=q) |
Q(login_id__icontains=q)
)
).values('user')[:10]
email_list = []
for user in users:
@ -259,23 +261,6 @@ def search_user_when_global_address_book_disabled(request, q):
email_list = []
username = request.user.username
# search from contact
# get user's contact list
contacts = Contact.objects.get_contacts_by_user(username)
for contact in contacts:
# search user from contact list
if q in contact.contact_email:
email_list.append(contact.contact_email)
# search from profile, limit search range in user's contacts
limited_emails = []
for contact in contacts:
limited_emails.append(contact.contact_email)
email_list += search_user_from_profile_with_limits(q, limited_emails)
current_user = User.objects.get(email=username)
if current_user.role.lower() != 'guest':

View File

@ -27,12 +27,12 @@ MANAGERS = ADMINS
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3', # Add 'postgresql_psycopg2', 'mysql', 'sqlite3' or 'oracle'.
'NAME': '%s/seahub/seahub.db' % PROJECT_ROOT, # Or path to database file if using sqlite3.
'USER': '', # Not used with sqlite3.
'PASSWORD': '', # Not used with sqlite3.
'HOST': '', # Set to empty string for localhost. Not used with sqlite3.
'PORT': '', # Set to empty string for default. Not used with sqlite3.
'ENGINE': 'django.db.backends.mysql',
'NAME': 'seahub',
'USER': 'root',
'PASSWORD': 'root',
'HOST': '127.0.0.1',
'PORT': '3306',
}
}

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import os
import configparser
from django.db import connection
def get_ccnet_db_name():
@ -24,11 +25,6 @@ def get_ccnet_db_name():
return db_name, None
import os
import configparser
from django.db import connection
class CcnetGroup(object):
def __init__(self, **kwargs):
@ -89,23 +85,29 @@ class CcnetDB:
groups.append(group_obj)
return groups
def list_eligible_users(self, start, limit, is_active=None, role=None):
def list_eligible_users(self, start, limit,
is_active=None, role=None, q=None):
def status(is_active):
return 'AND t1.is_active=%s ' % is_active
return f'AND t1.is_active={is_active} '
def is_role(role):
if role == 'default':
return 'AND (t2.role is null or t2.role = "default")'
return 'AND (t2.role is null or t2.role = "default") '
else:
return 'AND t2.role = "%s"' % role
return f'AND t2.role = "{role}" '
def search(q):
return f'AND t1.email LIKE "%{q}%" '
search_clause = ''
if is_active:
search_clause += status(is_active)
if role:
search_clause += is_role(role)
if q:
search_clause += search(q)
count_sql = f"""
SELECT count(1)
FROM
@ -130,16 +132,15 @@ class CcnetDB:
WHERE
t1.email NOT LIKE '%%@seafile_group' %s
ORDER BY t1.id
LIMIT {limit} OFFSET {start}
LIMIT {limit} OFFSET {start};
""" % search_clause
users = []
total = 0
with connection.cursor() as cursor:
cursor.execute(count_sql)
cursor.execute(count_sql)
total_count = int(cursor.fetchone()[0])
cursor.execute(sql)
for item in cursor.fetchall():
user_id = item[0]
@ -168,9 +169,9 @@ class CcnetDB:
group_ids_str = ','.join(str(id) for id in group_ids)
sql = f"""
SELECT user_name, group_id
FROM
FROM
`{self.db_name}`.`GroupUser`
WHERE
WHERE
group_id IN ({group_ids_str}) AND is_staff = 1
"""
with connection.cursor() as cursor:
@ -210,10 +211,11 @@ class CcnetDB:
SELECT `email`
FROM `{self.db_name}`.`EmailUser`
WHERE
email IN ({user_list_str}) AND is_active = 1 AND email NOT LIKE '%%@seafile_group'
email IN ({user_list_str}) AND is_active = 1 AND email NOT LIKE '%%@seafile_group'
"""
with connection.cursor() as cursor:
cursor.execute(sql)
for user in cursor.fetchall():
active_users.append(user[0])
return active_users

View File

@ -220,12 +220,12 @@ class SeafileDB:
if start == -1 and limit == -1:
sql = f"""
SELECT
u.repo_id, o.owner_id, u.email, e.token,
u.repo_id, o.owner_id, u.email, e.token,
p.peer_id, p.peer_ip, p.peer_name, p.sync_time, p.client_ver, e.error_time, e.error_con, i.name
FROM
`{self.db_name}`.`RepoSyncError` e
LEFT JOIN `{self.db_name}`.`RepoUserToken` u ON e.token = u.token
LEFT JOIN `{self.db_name}`.`RepoInfo` i ON u.repo_id = i.repo_id
`{self.db_name}`.`RepoSyncError` e
LEFT JOIN `{self.db_name}`.`RepoUserToken` u ON e.token = u.token
LEFT JOIN `{self.db_name}`.`RepoInfo` i ON u.repo_id = i.repo_id
LEFT JOIN `{self.db_name}`.`RepoTokenPeerInfo` p ON e.token = p.token
CROSS JOIN `{self.db_name}`.`RepoOwner` o
WHERE
@ -234,14 +234,14 @@ class SeafileDB:
e.error_time DESC
"""
else:
sql =f"""
sql = f"""
SELECT
u.repo_id, o.owner_id, u.email, e.token,
u.repo_id, o.owner_id, u.email, e.token,
p.peer_id, p.peer_ip, p.peer_name, p.sync_time, p.client_ver, e.error_time, e.error_con, i.name
FROM
`{self.db_name}`.`RepoSyncError` e
LEFT JOIN `{self.db_name}`.`RepoUserToken` u ON e.token = u.token
LEFT JOIN `{self.db_name}`.`RepoInfo` i ON u.repo_id = i.repo_id
`{self.db_name}`.`RepoSyncError` e
LEFT JOIN `{self.db_name}`.`RepoUserToken` u ON e.token = u.token
LEFT JOIN `{self.db_name}`.`RepoInfo` i ON u.repo_id = i.repo_id
LEFT JOIN `{self.db_name}`.`RepoTokenPeerInfo` p ON e.token = p.token
CROSS JOIN `{self.db_name}`.`RepoOwner` o
WHERE
@ -273,7 +273,7 @@ class SeafileDB:
def get_org_trash_repo_list(self, org_id, start, limit):
sql = f"""
SELECT repo_id, repo_name, head_id, owner_id, `size`, del_time
SELECT repo_id, repo_name, head_id, owner_id, `size`, del_time
FROM `{self.db_name}`.`RepoTrash`
WHERE org_id = {org_id}
ORDER BY del_time DESC
@ -306,15 +306,15 @@ class SeafileDB:
"""
empty org repo trash
"""
def del_repo_trash(cursor,repo_ids):
def del_repo_trash(cursor, repo_ids):
del_file_count_sql = """
DELETE FROM
`%s`.`RepoFileCount`
WHERE
WHERE
repo_id in %%s;
""" % self.db_name
cursor.execute(del_file_count_sql, (repo_ids, ))
del_repo_info_sql = """
DELETE FROM
`%s`.`RepoInfo`
@ -322,7 +322,7 @@ class SeafileDB:
repo_id in %%s;
""" % self.db_name
cursor.execute(del_repo_info_sql, (repo_ids, ))
del_trash_sql = """
DELETE FROM
`%s`.`RepoTrash`
@ -330,14 +330,13 @@ class SeafileDB:
repo_id in %%s;
""" % self.db_name
cursor.execute(del_trash_sql, (repo_ids,))
sql_list_repo_id = f"""
SELECT
SELECT
t.repo_id
FROM
`{self.db_name}`.`RepoTrash` t
WHERE
WHERE
org_id={org_id};
"""
with connection.cursor() as cursor:
@ -348,7 +347,7 @@ class SeafileDB:
repo_ids.append(repo_id)
del_repo_trash(cursor, repo_ids)
cursor.close()
def add_repos_to_org_user(self, org_id, username, repo_ids):
for repo_id in repo_ids:
sql = f"""
@ -364,7 +363,7 @@ class SeafileDB:
SET `type`= '%s'
WHERE `repo_id`='%s'
""" % (repo_type, repo_id)
with connection.cursor() as cursor:
cursor.execute(sql)

View File

@ -4,24 +4,24 @@ from mock import patch
from django.urls import reverse
from django.test import override_settings
from seahub.contacts.models import Contact
from seahub.profile.models import Profile
from seahub.profile.utils import refresh_cache
from seahub.api2.endpoints.search_user import SearchUser
from seahub.test_utils import BaseTestCase
from tests.common.utils import randstring
class SearchUserTest(BaseTestCase):
def setUp(self):
self.login_as(self.user)
self.endpoint = reverse('search-user')
@override_settings(CLOUD_MODE = False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN = False)
@override_settings(CLOUD_MODE=False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN=False)
def test_can_search(self):
email = self.admin.email
nickname = 'admin_test'
contact_email= 'new_admin_test@test.com'
contact_email = 'new_admin_test@test.com'
p = Profile.objects.add_or_update(email, nickname=nickname)
p.contact_email = contact_email
p.save()
@ -36,11 +36,11 @@ class SearchUserTest(BaseTestCase):
assert json_resp['users'][0]['name'] == nickname
assert json_resp['users'][0]['contact_email'] == contact_email
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN = True)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN=True)
def test_search_when_enable_addressbook_opt_in(self):
email = self.admin.email
nickname = 'admin_test'
contact_email= 'new_admin_test@test.com'
contact_email = 'new_admin_test@test.com'
p = Profile.objects.add_or_update(email, nickname=nickname)
p.contact_email = contact_email
p.save()
@ -65,12 +65,12 @@ class SearchUserTest(BaseTestCase):
assert json_resp['users'][0]['name'] == nickname
assert json_resp['users'][0]['contact_email'] == contact_email
@override_settings(CLOUD_MODE = False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN = False)
@override_settings(CLOUD_MODE=False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN=False)
def test_search_myself(self):
email = self.user.email
nickname = 'user_test'
contact_email= 'new_user_test@test.com'
contact_email = 'new_user_test@test.com'
p = Profile.objects.add_or_update(email, nickname=nickname)
p.contact_email = contact_email
p.save()
@ -85,8 +85,8 @@ class SearchUserTest(BaseTestCase):
assert json_resp['users'][0]['name'] == nickname
assert json_resp['users'][0]['contact_email'] == contact_email
@override_settings(CLOUD_MODE = False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN = False)
@override_settings(CLOUD_MODE=False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN=False)
def test_search_without_myself(self):
email = self.user.email
resp = self.client.get(self.endpoint + '?include_self=0&q=' + email)
@ -95,8 +95,8 @@ class SearchUserTest(BaseTestCase):
self.assertEqual(200, resp.status_code)
assert len(json_resp['users']) == 0
@override_settings(CLOUD_MODE = False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN = False)
@override_settings(CLOUD_MODE=False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN=False)
def test_search_unregistered_user(self):
resp = self.client.get(self.endpoint + '?q=unregistered_user@seafile.com')
json_resp = json.loads(resp.content)
@ -104,8 +104,8 @@ class SearchUserTest(BaseTestCase):
self.assertEqual(200, resp.status_code)
assert len(json_resp['users']) == 0
@override_settings(CLOUD_MODE = False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN = False)
@override_settings(CLOUD_MODE=False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN=False)
def test_can_search_by_nickname(self):
admin_email = self.admin.email
@ -125,8 +125,8 @@ class SearchUserTest(BaseTestCase):
assert json_resp['users'][0]['name'] == 'Carl Smith'
assert json_resp['users'][0]['contact_email'] == 'new_mail@test.com'
@override_settings(CLOUD_MODE = False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN = False)
@override_settings(CLOUD_MODE=False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN=False)
def test_can_search_by_nickname_insensitive(self):
admin_email = self.admin.email
@ -158,8 +158,8 @@ class SearchUserTest(BaseTestCase):
assert json_resp['users'][0]['name'] == 'Carl Smith'
assert json_resp['users'][0]['contact_email'] == 'new_mail@test.com'
@override_settings(CLOUD_MODE = False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN = False)
@override_settings(CLOUD_MODE=False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN=False)
def test_can_search_by_contact_email(self):
admin_email = self.admin.email
nickname = 'admin_test'
@ -180,9 +180,9 @@ class SearchUserTest(BaseTestCase):
assert json_resp['users'][0]['name'] == nickname
assert json_resp['users'][0]['contact_email'] == 'new_mail@test.com'
@override_settings(CLOUD_MODE = True)
@override_settings(ENABLE_GLOBAL_ADDRESSBOOK = False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN = False)
@override_settings(CLOUD_MODE=True)
@override_settings(ENABLE_GLOBAL_ADDRESSBOOK=False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN=False)
@patch('seahub.api2.endpoints.search_user.is_org_context')
def test_search_full_email(self, mock_is_org_context):
@ -195,7 +195,7 @@ class SearchUserTest(BaseTestCase):
assert json_resp['users'][0]['email'] == self.admin.username
@patch.object(SearchUser, '_can_use_global_address_book')
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN = False)
@override_settings(ENABLE_ADDRESSBOOK_OPT_IN=False)
def test_search_when_not_use_global_address_book(self, mock_can_use_global_address_book):
mock_can_use_global_address_book.return_value = False
@ -210,19 +210,10 @@ class SearchUserTest(BaseTestCase):
resp = self.client.get(self.endpoint + '?q=%s' % contact_email)
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['users'][0]['email'] == self.admin.username
assert json_resp['users'][0]['email'] == contact_email
# search with invalid email & has no contacts
resp = self.client.get(self.endpoint + '?q=%s' % contact_email[:6])
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert len(json_resp['users']) == 0
# search with invalid email & has contact
nickname_of_admin = randstring(6)
Contact.objects.add_contact(self.user.username, self.admin.username)
Profile.objects.add_or_update(self.admin.username, nickname=nickname_of_admin)
resp = self.client.get(self.endpoint + '?q=%s' % nickname_of_admin)
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['users'][0]['email'] == self.admin.username

View File

@ -1,5 +1,6 @@
from seahub.test_utils import BaseTestCase
from seahub.api2.models import TokenV2, TokenV2Manager
from seahub.api2.models import TokenV2
class TokenV2ManagerTest(BaseTestCase):
def setUp(self):
@ -37,7 +38,7 @@ class TokenV2ManagerTest(BaseTestCase):
self.admin.username, self.token.platform, self.token.device_id,
self.token.device_name, '1.1.1', '0.1.1', self.ip_v6)
assert len(TokenV2.objects.all()) == 2
assert TokenV2.objects.all()[1].user == self.admin.username
assert self.admin.username in [item.user for item in TokenV2.objects.all()]
def test_delete_device_token(self):
TokenV2.objects.delete_device_token(