Files
Rasadyar_Users/Authentication/views.py
2026-01-18 12:05:56 +03:30

636 lines
21 KiB
Python

import cryptocode
from django.core.cache import cache
from rest_framework.decorators import permission_classes, api_view
from .models import UserIdentity
from rest_framework.permissions import AllowAny
from django.contrib.auth.models import User, Group
from rest_framework.response import Response
from django.http import HttpResponse
from django.shortcuts import render
from rest_framework import status, viewsets
from oauth2_provider.models import AccessToken
import json
import requests
import random
import uuid
import cryptocode
from oauth2_provider.contrib.rest_framework import (
TokenHasReadWriteScope,
OAuth2Authentication, )
from rest_framework.decorators import authentication_classes
from Authentication.models import ClientToken
from Authentication.sms import send_otp_code
from .serializers import UserIdentitySerializer
from datetime import timedelta
BASE_URL = "https://userbackend.rasadyar.com/api/"
ARTA_CLIENT_ID = 'cpxlBf9GPPnk0nfOMLEa6fZyUrew6Z17wujOUMJr'
ARTA_CLIENT_SECRET = 'ONFoHxBCPOtIUw72QnLL4oa0wOKQNQ6h3Hc8pZrk3qHcR759hmgFn7fJZJMh1nQRWMeRGUHbRoTBFCIQn7OsiKrY7y4JM975T7mjM7WXJs3Ezl30gMAUgfpuEpzJgChz'
CHICKEN_CLIENT_SECRET = '4EK8EAPBOGsUHTeTHpgXrjQwbOQKAnNnQIOHmZa3IlOYVafwV1rmoKHhJE91OmLJ201yp7UkGu5TikiesoZxhNj0FYOyTtC7YtcqvopdBO36e2PSnjuqkLt0yCmaK2ph'
CHICKEN_CLIENT_ID = 'DhL3VMce6p3CBPSTwBg1AJjcaREvddWoOP8G8pHc'
LO_CHICKEN_CLIENT_SECRET = "xqZM6iTDe0XDS1mC8iVhahXqb2TWIZ07mx7yYOZrzTYHyHoFYIpvBm6IcM169fsGZ8uQs3gBHmicgbUMVXwbHyJIaCOeFp9SNK72E4v2OR51om3eH43VMQSK4pEKmMX6"
LO_CHICKEN_CLIENT_ID = "kSHxeTGASY8JsczTinnt5t820clWOKC3X1NHnMOi"
HA_CHICKEN_CLIENT_SECRET = 'l2Gt9AgwOfIneoQU2hamnGYCOiIUdAY2nmLI9eCkNo7wXU6TvNEU93oHtk8IzSHzJc5vVkm9scJaAlWGbzumNenGsQbIESbA1mAsLXWoWSllZKCuGyCBTJtKQ7BhnHZ6'
HA_CHICKEN_CLIENT_ID = 'WwpP780hSemYh8K93MqeuZ3HAir3ahQxDTGG43nG'
DM_CLIENT_ID = '2fDx0CopuiLnRz7YyCQD8nBXKjpxzqZg38Fcl02l'
DM_CLIENT_SECRET = 'PKStjauydu4k157bSaoPVenKHvLVtLI9Upn4JxU7tnHhuHPfAUp1abkfWp55orh7dFCXdE09E5CeWu7vBJsv1VpXz13EBl7OSW2LAceo3ztvq4FNAEVmEEt56cEmQzpF'
INSPECTION_CLIENT_ID = 'R2Ox6eqrXPeh1KbeWLDO5MCapuOFpHDvstOOD1XC'
INSPECTION_CLIENT_SECRET = 'imFgEGkcs248XZkLE7JNMo6mwVkiUMGYUBenBAlgZFwW0lyCYILrmh5Akh8dpHbgpCYaSvuYepFu3WdUXY3ZXPDZq11KbqlrmjHwf8wuW2DUsa0oSDozDv4p9Lx3lJPO'
# # Create your views here.
# @api_view(["POST"])
# @permission_classes([AllowAny])
# def GernalSendOtp(request):
# mobile = request.data["mobile"]
# state = request.data["state"]
# try:
# user = User.objects.get(username__exact=mobile)
# user_identity = UserIdentity.objects.get(user)
# client = ClientToken.objects.get(key=user_identity.client.key)
# except User.DoesNotExist:
# return Response({'is_user': False}, status=status.HTTP_401_UNAUTHORIZED)
# if len(mobile) < 11 or len(mobile) > 11:
# return Response(
# {
# "pattern": "wrong",
# },
# status=status.HTTP_403_FORBIDDEN,
# )
# key = str(uuid.uuid4())
# rand = random.randint(10000, 99000)
# cache.set(key, str(rand), timeout=120)
# if not User.objects.filter(username=mobile).exists():
# receptor = mobile
# send_otp_code(receptor, rand)
# return Response(
# {
# "is_user": False,
# "key": key,
# },
# status=status.HTTP_404_NOT_FOUND,
# )
#
# if state == "forget_password":
# receptor = mobile
# send_otp_code(receptor, rand)
# return Response(
# {
# "is_user": True,
# "key": key,
# },
# status=status.HTTP_200_OK,
# )
#
# elif state == "change_password":
# receptor = mobile
# send_otp_code(receptor, rand)
# return Response(
# {
# "is_user": True,
# "key": key,
# },
# status=status.HTTP_200_OK,
# )
#
# elif state == "":
# return Response(
# {
# "is_user": True,
# },
# status=status.HTTP_200_OK,
# )
@api_view(["POST"])
@permission_classes([AllowAny])
def send_otp(request):
# frontend_url = request.headers.get("Origin")
# frontend_url = request.data.get("frontend_url", frontend_url)
# if "https://rasadyaar.ir" in frontend_url:
# return Response({'result': 'https://rasadyar.net'}, status.HTTP_401_UNAUTHORIZED)
mobile = request.data["mobile"]
state = request.data["state"]
try:
user = User.objects.get(username__exact=mobile)
user_identity = UserIdentity.objects.get(user=user)
except User.DoesNotExist:
return Response({'is_user': False}, status=status.HTTP_404_NOT_FOUND)
if len(mobile) < 11 or len(mobile) > 11:
return Response(
{
"pattern": "wrong",
},
status=status.HTTP_403_FORBIDDEN,
)
key = str(uuid.uuid4())
rand = random.randint(10000, 99000)
cache.set(key, str(rand), timeout=120)
if not User.objects.filter(username=mobile).exists():
receptor = mobile
# send_otp_code(receptor, rand)
return Response(
{
"is_user": False,
"key": key,
},
status=status.HTTP_404_NOT_FOUND,
)
if state == "forget_password":
receptor = mobile
send_otp_code(receptor, rand)
return Response(
{
"is_user": True,
"key": key,
"address": user_identity.client.client_web_address,
"backend": user_identity.client.client_web_address_backend,
"api_key": user_identity.client.client_token,
},
status=status.HTTP_200_OK,
)
elif state == "change_password":
receptor = mobile
send_otp_code(receptor, rand)
return Response(
{
"is_user": True,
"key": key,
"address": user_identity.client.client_web_address,
"backend": user_identity.client.client_web_address_backend,
"api_key": user_identity.client.client_token,
},
status=status.HTTP_200_OK,
)
elif state == "":
return Response(
{
"is_user": True,
"address": user_identity.client.client_web_address,
"backend": user_identity.client.client_web_address_backend,
"api_key": user_identity.client.client_token,
},
status=status.HTTP_200_OK,
)
@api_view(["POST"])
@permission_classes([AllowAny])
def store_send_otp(request):
mobile = request.data["mobile"]
key = str(uuid.uuid4())
rand = random.randint(10000, 99000)
cache.set(key, str(rand), timeout=120)
receptor = mobile
send_otp_code(receptor, rand)
return Response(
{
"key": key,
},
status=status.HTTP_200_OK,
)
@api_view(["POST"])
@permission_classes([AllowAny])
def change_user_mobile(request):
first_mobile = request.data["first_mobile_number"]
second_mobile = request.data["second_mobile_number"]
user = User.objects.get(username=first_mobile)
user.username = second_mobile
user.save()
# user_identity=UserIdentity.objects.get(mobile=first_mobile)
# user_identity.mobile=second_mobile
# user_identity.save()
return Response({"result": "number changed"}, status=status.HTTP_200_OK)
@api_view(["POST"])
@permission_classes([AllowAny])
def check_otp(request):
key = request.data["key"]
code = cache.get(key)
if request.data["code"] == code:
return Response(
{
"code": True,
},
status=status.HTTP_200_OK,
)
else:
return Response(
{
"code": False,
},
status=status.HTTP_403_FORBIDDEN,
)
@api_view(["POST"])
@permission_classes([AllowAny])
# @permission_classes([TokenHasReadWriteScope])
@authentication_classes([OAuth2Authentication])
def change_password(request):
username = request.data["username"]
password = request.data["password"]
user = User.objects.get(username=username)
user.password = cryptocode.encrypt(password, password)
user.save()
return Response({"password": "changed"}, status=status.HTTP_200_OK)
@api_view(["POST"])
@permission_classes([AllowAny])
def register(request):
# if 'role' in request.data.keys() and 'tenant' in request.data.keys():
# request.data.pop('role')
# request.data.pop('tenant')
username = request.data["username"]
password = request.data["password"]
api_key = request.data["api_key"]
client = ClientToken.objects.get(client_token=api_key)
if User.objects.filter(username__exact=username).exists():
return Response({"result": "user exist"}, status=status.HTTP_400_BAD_REQUEST)
if 'first_name' in request.data.keys() and 'last_name' in request.data.keys():
user = User(
username=username, password=cryptocode.encrypt(password, password), first_name=request.data['first_name'],
last_name=request.data['last_name']
)
else:
user = User(
username=username, password=cryptocode.encrypt(password, password)
)
user.save()
# if 'role' in request.data.keys():
# group = Group.objects.get(name__exact=request.data['role'])
if not UserIdentity.objects.filter(user=user):
user_identity = UserIdentity(
user=user,
client=client
)
user_identity.save()
if 'national_code' in request.data.keys():
user_identity.national_id = request.data['national_code']
if 'first_name' in request.data.keys() and 'last_name' in request.data.keys():
user_identity.first_name = request.data['first_name']
user_identity.last_name = request.data['last_name']
user_identity.mobile = request.data['username']
user_identity.save()
# user_identity.role.add(group)
data = {
"username": str(user.username),
"password": user.password,
"client_id": client.client_id,
"client_secret": client.client_secret,
"grant_type": "client_credentials",
# "scope": "read"
"scope": "read write",
}
r = requests.post(url=BASE_URL + "token/", data=json.dumps(data), verify=False)
access = AccessToken.objects.get(token=r.json()["access_token"])
access.user = user
access.save()
dict_info = {
"access_token": r.json()["access_token"],
"expires_in": r.json()["expires_in"],
"token_type": r.json()["token_type"],
"scope": r.json()["scope"],
"expire_time": access.expires,
}
# r.json()["expire_time"]=access.expires
return Response(dict_info, status=status.HTTP_200_OK)
@api_view(["POST"])
@permission_classes([AllowAny])
def register_all(request):
username = request.data["username"]
password = request.data["password"]
api_key = request.data["api_key"]
client = ClientToken.objects.get(client_token=api_key)
if User.objects.filter(username__exact=username).exists():
pass
else:
if 'first_name' in request.data.keys() and 'last_name' in request.data.keys():
user = User(
username=username, password=password, first_name=request.data['first_name'],
last_name=request.data['last_name']
)
else:
user = User(
username=username, password=password
)
user.save()
if not UserIdentity.objects.filter(user=user):
user_identity = UserIdentity(
user=user,
client=client
)
user_identity.save()
if 'national_code' in request.data.keys():
user_identity.national_id = request.data['national_code']
if 'first_name' in request.data.keys() and 'last_name' in request.data.keys():
user_identity.first_name = request.data['first_name']
user_identity.last_name = request.data['last_name']
user_identity.mobile = request.data['username']
user_identity.save()
return Response("ok", status=status.HTTP_200_OK)
@api_view(["POST"])
@permission_classes([AllowAny])
def login(request):
username = request.data['username']
password = (request.data['password'],)
api_key = request.data["api_key"]
roles = []
roles_from_request = []
client = ClientToken.objects.get(client_token=api_key)
try:
user = User.objects.get(username__exact=username)
except User.DoesNotExist:
return Response({'is_user': False}, status=status.HTTP_401_UNAUTHORIZED)
if 'role' in request.data.keys():
if type(request.data['role']) is list:
roles_from_request = request.data['role']
else:
roles_from_request.append(request.data['role'])
if 'user_key' in request.data.keys():
for item in roles_from_request:
group = Group.objects.get(name__exact=item)
if not UserIdentity.objects.filter(user=user, role=group):
if not UserIdentity.objects.filter(user=user).exists():
user_identity = UserIdentity()
else:
user_identity = UserIdentity.objects.get(user=user)
user_identity.user = user
user_identity.key = request.data['user_key']
user_identity.client = client
user_identity.save()
user_identity.role.add(group)
else:
user_identity = UserIdentity.objects.get(user=user)
user_identity.key = request.data['user_key']
user_identity.client = client
user_identity.save()
for item in user_identity.role.all():
roles.append(item.name)
decrypted_password = cryptocode.decrypt(user.password, password[0])
if decrypted_password != password[0]:
return Response({'password': 'wrong'}, status=status.HTTP_401_UNAUTHORIZED)
data = {
"username": username,
"password": password,
"client_id": client.client_id,
"client_secret": client.client_secret,
"grant_type": "client_credentials",
"scope": "read write",
}
r = requests.post(url=BASE_URL + "token/", data=json.dumps(data), verify=False)
access = AccessToken.objects.get(token=r.json()["access_token"])
access.user = user
access.save()
dict_info = {
"access_token": r.json()["access_token"],
"expires_in": r.json()["expires_in"],
"token_type": r.json()["token_type"],
"scope": r.json()["scope"],
"expire_time": access.expires,
"role": roles
}
return Response(dict_info, status=status.HTTP_200_OK)
class UserIdentityViewSet(viewsets.ModelViewSet):
queryset = UserIdentity.objects.all()
serializer_class = UserIdentitySerializer
permission_classes = [TokenHasReadWriteScope]
def list(self, request, *args, **kwargs):
pass
def retrieve(self, request, *args, **kwargs):
pass
def create(self, request, *args, **kwargs):
edit_type = request.data['type']
request.data.pop('type')
if edit_type == 'check_user':
# return Response({'sss': 'exist'}, status=status.HTTP_201_CREATED)
# if user exists in system
if self.queryset.filter(
mobile=request.data['value']
).exists() or self.queryset.filter(
national_id=request.data['value']
).exists():
if self.queryset.filter(
mobile=request.data['value']
).exists():
# contains user object
user = self.queryset.get(
mobile=request.data['value'],
)
if self.queryset.filter(
national_id=request.data['value']
).exists():
# contains user object
user = self.queryset.get(
national_id=request.data['value'],
)
serializer = self.serializer_class(user)
return Response(serializer.data, status=status.HTTP_200_OK)
return Response(status=status.HTTP_404_NOT_FOUND)
def update(self, request, *args, **kwargs):
# contains user identity object
user_identity = UserIdentity.objects.get(key=request.data['userprofile_key'])
request.data.pop('userprofile_key') # remove user key from data
serializer = self.serializer_class(data=request.data)
if serializer.is_valid():
identity_obj = serializer.update(validated_data=request.data, instance=user_identity)
serializer = self.serializer_class(identity_obj)
return Response(serializer.data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def destroy(self, request, *args, **kwargs):
pass
@api_view(["GET"])
@permission_classes([AllowAny])
def Find_User(request):
data = request.GET["data"]
if UserIdentity.objects.filter(mobile=data).exists():
user = UserIdentity.objects.get(mobile=data)
elif UserIdentity.objects.filter(national_id=data).exists():
user = UserIdentity.objects.get(national_id=data)
else:
return Response({"result": "user not found"}, status=status.HTTP_401_UNAUTHORIZED)
return Response({
"firstname": user.first_name,
"lastname": user.last_name,
"national_id": user.national_id,
"mobile": user.mobile,
"city": user.city,
"province": user.province,
})
@api_view(["POST"])
@permission_classes([AllowAny])
def Identity(request):
user = UserIdentity.objects.get(user__username=request.data["mobile"])
user.mobile = request.data["mobile"]
user.first_name = request.data["first_name"]
user.last_name = request.data["last_name"]
user.national_id = request.data["national_id"]
user.city = request.data["city"]
user.province = request.data["province"]
user.save()
return Response({"mobile": user.mobile, "first_name": user.first_name, "last_name": user.last_name})
@api_view(["GET"])
@permission_classes([AllowAny])
def NumberOfActiveUsers(request):
from datetime import datetime
now=datetime.now().date()
access = AccessToken.objects.filter(expires__date__gte=now)
return Response({"number_of_active_users":len(access)})
@api_view(["GET"])
@permission_classes([AllowAny])
def remove_access_token(request):
import datetime
token=request.GET.get('token')
now = datetime.datetime.now()
accesses = AccessToken.objects.filter(created__date__gte=now.date() - timedelta(days=3))
if token is not None:
accesses=accesses.filter(token=token)
for access in accesses:
access.expires = now - timedelta(days=2)
access.save()
return Response("ok",status=status.HTTP_200_OK)
@api_view(["GET"])
@permission_classes([AllowAny])
def check_user_exists(request):
mobile = request.GET.get('mobile')
if not mobile:
return Response(
{"error": "mobile parameter is required"},
status=status.HTTP_400_BAD_REQUEST
)
try:
user = User.objects.get(username__exact=mobile)
return Response(
{
"exists": True,
"mobile": mobile,
"user_id": user.id
},
status=status.HTTP_404_NOT_FOUND
)
except User.DoesNotExist:
return Response(
{
"exists": False,
"mobile": mobile
},
status=status.HTTP_200_OK
)
@api_view(["POST"])
@permission_classes([AllowAny])
def remove_user_role(request):
mobile = request.data.get('mobile')
role = request.data.get('role')
if not mobile:
return Response(
{"error": "mobile parameter is required"},
status=status.HTTP_400_BAD_REQUEST
)
if not role:
return Response(
{"error": "role parameter is required"},
status=status.HTTP_400_BAD_REQUEST
)
try:
user = User.objects.get(username__exact=mobile)
except User.DoesNotExist:
return Response(
{"error": "user not found"},
status=status.HTTP_404_NOT_FOUND
)
try:
user_identity = UserIdentity.objects.get(user=user)
except UserIdentity.DoesNotExist:
return Response(
{"error": "user identity not found"},
status=status.HTTP_404_NOT_FOUND
)
try:
group = Group.objects.get(name__exact=role)
except Group.DoesNotExist:
return Response(
{"error": "role not found"},
status=status.HTTP_404_NOT_FOUND
)
if user_identity.role.filter(id=group.id).exists():
user_identity.role.remove(group)
return Response(
{
"result": "role removed successfully",
"mobile": mobile,
"role": role
},
status=status.HTTP_200_OK
)
else:
return Response(
{
"error": "user does not have this role",
"mobile": mobile,
"role": role
},
status=status.HTTP_400_BAD_REQUEST
)