1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-08-11 20:01:40 +00:00
seahub/thirdpart/social_django/storage.py

220 lines
7.3 KiB
Python
Raw Normal View History

"""Django ORM models for Social Auth"""
import base64
import six
import sys
from django.db import transaction
from django.db.utils import IntegrityError
from social_core.storage import UserMixin, AssociationMixin, NonceMixin, \
CodeMixin, PartialMixin, BaseStorage
from seahub.base.accounts import User
class DjangoUserMixin(UserMixin):
"""Social Auth association model"""
@classmethod
def changed(cls, user):
user.save()
def set_extra_data(self, extra_data=None):
if super(DjangoUserMixin, self).set_extra_data(extra_data):
self.save()
@classmethod
def allowed_to_disconnect(cls, user, backend_name, association_id=None):
if association_id is not None:
qs = cls.objects.exclude(id=association_id)
else:
qs = cls.objects.exclude(provider=backend_name)
qs = qs.filter(username=user.username)
if hasattr(user, 'has_usable_password'):
valid_password = user.has_usable_password()
else:
valid_password = True
return valid_password or qs.count() > 0
@classmethod
def disconnect(cls, entry):
entry.delete()
@classmethod
def username_field(cls):
return 'username'
# return getattr(cls.user_model(), 'USERNAME_FIELD', 'username')
@classmethod
def user_exists(cls, *args, **kwargs):
"""
Return True/False if a User instance exists with the given arguments.
Arguments are directly passed to filter() manager method.
"""
if 'username' in kwargs:
kwargs[cls.username_field()] = kwargs.pop('username')
assert 'username' in kwargs
try:
User.objects.get(email=kwargs['username'])
return True
except User.DoesNotExist:
return False
# return cls.user_model().objects.filter(*args, **kwargs).count() > 0
@classmethod
def get_username(cls, user):
return getattr(user, cls.username_field(), None)
@classmethod
def create_user(cls, *args, **kwargs):
username_field = cls.username_field()
if 'username' in kwargs and username_field not in kwargs:
kwargs[username_field] = kwargs.pop('username')
assert 'username' in kwargs
user = User.objects.create_user(email=kwargs['username'],
2019-01-28 07:31:12 +00:00
is_active=True)
# try:
# if hasattr(transaction, 'atomic'):
# # In Django versions that have an "atomic" transaction decorator / context
# # manager, there's a transaction wrapped around this call.
# # If the create fails below due to an IntegrityError, ensure that the transaction
# # stays undamaged by wrapping the create in an atomic.
# with transaction.atomic():
# user = cls.user_model().objects.create_user(*args, **kwargs)
# else:
# user = cls.user_model().objects.create_user(*args, **kwargs)
# except IntegrityError:
# # User might have been created on a different thread, try and find them.
# # If we don't, re-raise the IntegrityError.
# exc_info = sys.exc_info()
# # If email comes in as None it won't get found in the get
# if kwargs.get('email', True) is None:
# kwargs['email'] = ''
# try:
# user = cls.user_model().objects.get(*args, **kwargs)
# except cls.user_model().DoesNotExist:
# six.reraise(*exc_info)
return user
@classmethod
def get_user(cls, pk=None, **kwargs):
if pk:
kwargs = {'pk': pk}
try:
return User.objects.get(email=pk)
except User.DoesNotExist:
return None
# try:
# return cls.user_model().objects.get(**kwargs)
# except cls.user_model().DoesNotExist:
# return None
@classmethod
def get_users_by_email(cls, email):
user_model = cls.user_model()
email_field = getattr(user_model, 'EMAIL_FIELD', 'email')
return user_model.objects.filter(**{email_field + '__iexact': email})
@classmethod
def get_social_auth(cls, provider, uid):
if not isinstance(uid, six.string_types):
uid = str(uid)
try:
return cls.objects.get(provider=provider, uid=uid)
except cls.DoesNotExist:
return None
@classmethod
def get_social_auth_for_user(cls, user, provider=None, id=None):
qs = cls.objects.filter(username=user.username)
if provider:
qs = qs.filter(provider=provider)
if id:
qs = qs.filter(id=id)
return qs
@classmethod
def create_social_auth(cls, user, uid, provider):
if not isinstance(uid, six.string_types):
uid = str(uid)
if hasattr(transaction, 'atomic'):
# In Django versions that have an "atomic" transaction decorator / context
# manager, there's a transaction wrapped around this call.
# If the create fails below due to an IntegrityError, ensure that the transaction
# stays undamaged by wrapping the create in an atomic.
with transaction.atomic():
social_auth = cls.objects.create(username=user.username, uid=uid, provider=provider)
else:
social_auth = cls.objects.create(username=user.username, uid=uid, provider=provider)
return social_auth
class DjangoNonceMixin(NonceMixin):
@classmethod
def use(cls, server_url, timestamp, salt):
return cls.objects.get_or_create(server_url=server_url,
timestamp=timestamp,
salt=salt)[1]
class DjangoAssociationMixin(AssociationMixin):
@classmethod
def store(cls, server_url, association):
# Don't use get_or_create because issued cannot be null
try:
assoc = cls.objects.get(server_url=server_url,
handle=association.handle)
except cls.DoesNotExist:
assoc = cls(server_url=server_url,
handle=association.handle)
assoc.secret = base64.encodestring(association.secret)
assoc.issued = association.issued
assoc.lifetime = association.lifetime
assoc.assoc_type = association.assoc_type
assoc.save()
@classmethod
def get(cls, *args, **kwargs):
return cls.objects.filter(*args, **kwargs)
@classmethod
def remove(cls, ids_to_delete):
cls.objects.filter(pk__in=ids_to_delete).delete()
class DjangoCodeMixin(CodeMixin):
@classmethod
def get_code(cls, code):
try:
return cls.objects.get(code=code)
except cls.DoesNotExist:
return None
class DjangoPartialMixin(PartialMixin):
@classmethod
def load(cls, token):
try:
return cls.objects.get(token=token)
except cls.DoesNotExist:
return None
@classmethod
def destroy(cls, token):
partial = cls.load(token)
if partial:
partial.delete()
class BaseDjangoStorage(BaseStorage):
user = DjangoUserMixin
nonce = DjangoNonceMixin
association = DjangoAssociationMixin
code = DjangoCodeMixin