253 lines
8.8 KiB
Python
253 lines
8.8 KiB
Python
import datetime
|
||
import re
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
from requests.adapters import HTTPAdapter
|
||
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
|
||
|
||
from django.db.models import Q
|
||
from django.views.decorators.csrf import csrf_exempt
|
||
from rest_framework import status
|
||
from rest_framework.decorators import api_view, permission_classes
|
||
from rest_framework.permissions import AllowAny
|
||
from rest_framework.response import Response
|
||
|
||
from app.cityandprovince import correct_province, correct_city, search_city_list, \
|
||
search_province_list
|
||
from app.models import TransportingDetail, Guilds, AllProductsTransport
|
||
from app.serializers import TransportingDetailForUpdateSerializer, AllProductsTransportSerializer
|
||
|
||
|
||
@api_view(["POST"])
@permission_classes([AllowAny])
@csrf_exempt
def get_bar_info(request):
    """Return recent unloaded live-chicken transports for the given kill houses.

    The request body must carry a ``kill_house`` entry; its value is matched
    against ``jihadi_destination`` with an ``__in`` lookup. Only rows dated
    within the last three days are returned, newest first.

    Returns:
        ``200`` with the serialized ``AllProductsTransport`` rows, or ``400``
        when ``kill_house`` is missing from the payload.
    """
    # Imported locally under an alias on purpose: further down this module,
    # ``from datetime import datetime, timedelta`` rebinds the global name
    # ``datetime`` to the class, so ``datetime.datetime.now()`` would raise
    # AttributeError by the time this view is called.
    import datetime as _datetime

    payload = dict(request.data)
    if 'kill_house' not in payload:
        return Response(
            {'detail': 'kill_house is required'},
            status=status.HTTP_400_BAD_REQUEST,
        )
    kill_houses = payload['kill_house']

    since = _datetime.datetime.now() - _datetime.timedelta(days=3)
    bars = AllProductsTransport.objects.filter(
        trash=False,
        out=True,
        jihadi_destination__in=kill_houses,
        unloading='تخلیه شده.',
        product='مرغ زنده -جهت كشتار',
        hatching__isnull=False,
        date__gte=since,
    ).order_by('-date')

    ser_data = AllProductsTransportSerializer(bars, many=True).data
    return Response(ser_data, status=status.HTTP_200_OK)
|
||
|
||
|
||
@api_view(["GET"])
@permission_classes([AllowAny])
@csrf_exempt
def test_city(request):
    """Rewrite unrecognised Province/City values on ``TransportingDetail`` rows.

    Selects every row whose ``Province`` is not in ``search_province_list`` or
    whose ``City`` is not in ``search_city_list`` and passes the offending
    value through ``correct_province`` / ``correct_city``. Each modified row is
    saved once.

    NOTE(review): this endpoint writes to the database but is exposed as an
    unauthenticated GET (``AllowAny``) - confirm that is intended.

    Returns:
        A ``Response`` with the fixed body ``'lo'``.
    """
    known_provinces = search_province_list
    known_cities = search_city_list

    rows = TransportingDetail.objects.filter(
        ~Q(Province__in=known_provinces) | ~Q(City__in=known_cities)
    ).only('Province', 'City')

    for row in rows:
        changed = False

        if row.Province not in known_provinces:
            row.Province = correct_province(row.Province)
            changed = True

        if row.City not in known_cities:
            # The province is corrected first so the city lookup receives the
            # already-normalised province.
            # NOTE(review): the previous code assigned the result to
            # ``CityName``, while the query, ``only()`` and the check above all
            # use ``City`` - presumably a copy-paste slip from the Hatching
            # model; confirm TransportingDetail has no ``CityName`` field.
            row.City = correct_city(row.City, row.Province)
            changed = True

        if changed:
            # One write per row instead of one per corrected field.
            row.save()

    return Response('lo')
|
||
|
||
|
||
class SSLAdapter(HTTPAdapter):
    """Transport adapter that hands a customised SSL context to its pool manager.

    The context comes from ``create_urllib3_context()`` with option bit ``0x4``
    switched on (labelled OP_LEGACY_SERVER_CONNECT by the original author).
    """

    def __init__(self, *args, **kwargs):
        # The context is prepared before delegating to the parent constructor
        # so that it already exists when ``init_poolmanager`` is invoked.
        ssl_context = create_urllib3_context()
        ssl_context.options |= 0x4  # OP_LEGACY_SERVER_CONNECT
        self.context = ssl_context
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager, forcing ``ssl_context`` to this adapter's context."""
        kwargs.update(ssl_context=self.context)
        return super().init_poolmanager(*args, **kwargs)

    def build_response(self, req, resp):
        """Delegate to the parent implementation unchanged."""
        return super().build_response(req, resp)
|
||
|
||
|
||
def _has_quarantine_record(html):
    """Return True when the tracking page shows a dated record with a numeric quantity."""
    soup = BeautifulSoup(html, 'html.parser')

    tables = soup.find_all('table')
    if len(tables) < 6:
        return False

    header = soup.find('div', align="right")
    if header is None:
        return False
    header_text = header.get_text(separator=" ", strip=True)
    if not re.search(r'تاريخ:(\d{4}/\d{2}/\d{2})', header_text):
        return False

    # Only the second row of the sixth table is inspected.
    rows = tables[5].find_all('tr')
    if len(rows) < 2:
        return False
    first_cell = rows[1].find('td')
    if first_cell is None:
        # A row without any <td> used to crash with AttributeError.
        return False
    return re.search(r'\d+', first_cell.text) is not None


def check_quarantine_code(lst):
    """Return the codes from ``lst`` that the e.ivo.ir tracking page recognises.

    Each code is POSTed as ``gid`` to the tracking page. A code is kept when
    the response has at least six tables, a right-aligned ``div`` containing a
    ``YYYY/MM/DD`` date after the date label, and a digit in the first cell of
    the second row of the sixth table.

    Args:
        lst: iterable of codes; each one is sent as ``str(code)``.

    Returns:
        A list with the accepted codes, in input order.

    Raises:
        requests.RequestException: on connection problems or when a request
            exceeds the 20 second timeout.
    """
    result_code = []
    for code in lst:
        # A fresh session per code keeps cookies isolated between lookups, as
        # before; the ``with`` block releases its pooled connections.
        with requests.Session() as session:
            session.mount('https://', SSLAdapter())
            response = session.post(
                'https://e.ivo.ir/Rahgiri/Gidprnt.aspx',
                data={'gid': str(code)},
                # NOTE(review): certificate verification is disabled.
                verify=False,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                                  ' (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
                },
                # Same limit as get_hatching_permit_code; without it a stalled
                # server blocks the caller indefinitely.
                timeout=20,
            )
        if _has_quarantine_record(response.text):
            result_code.append(code)
    return result_code
|
||
|
||
|
||
# Label "hatching permit code" followed by a colon and the digits.
# Kaf and Yeh are accepted in both their Persian (U+06A9, U+06CC) and Arabic
# (U+0643, U+064A) code points, since the page may presumably use either form.
# The colon may be ASCII or full-width (the original class listed ':' twice,
# presumably a lost full-width colon). Digits may be ASCII, Persian or
# Arabic-Indic. Everything the original pattern matched still matches.
_PERMIT_CODE_RE = re.compile(
    r'[\u06A9\u0643]د\s*مجوز\s*جوجه\s*ر[\u06CC\u064A]ز[\u06CC\u064A]'
    r'\s*[:\uFF1A]\s*([0-9\u06F0-\u06F9\u0660-\u0669]+)'
)

# Maps Persian (U+06F0..) and Arabic-Indic (U+0660..) digits to ASCII digits.
_TO_ASCII_DIGITS = {
    base + offset: str(offset)
    for base in (0x06F0, 0x0660)
    for offset in range(10)
}


def get_hatching_permit_code(code):
    """Look up the hatching permit code shown on the e.ivo.ir tracking page.

    Args:
        code: tracking code; sent to the page as ``str(code)`` in ``gid``.

    Returns:
        ``{str(code): permit}`` with the permit as ASCII digits when the label
        is found in the response body, otherwise an empty dict.

    Raises:
        requests.RequestException: on connection problems or when the request
            exceeds the 20 second timeout.
    """
    # The ``with`` block releases the session's pooled connections.
    with requests.Session() as session:
        session.mount('https://', SSLAdapter())
        response = session.post(
            'https://e.ivo.ir/Rahgiri/Gidprnt.aspx',
            data={'gid': str(code)},
            # NOTE(review): certificate verification is disabled.
            verify=False,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                              ' (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
            },
            timeout=20,
        )

    found = _PERMIT_CODE_RE.search(response.text)
    if found is None:
        return {}
    return {str(code): found.group(1).translate(_TO_ASCII_DIGITS)}
|
||
|
||
|
||
@api_view(["GET"])
@permission_classes([AllowAny])
@csrf_exempt
def api_get_hatching_permit_code(request):
    """HTTP wrapper around ``get_hatching_permit_code``.

    Reads the ``code`` query parameter and responds with:
      * 400 and a ``detail`` message when ``code`` is absent or empty,
      * 200 and the helper's result on success,
      * 500 and ``str(exception)`` as ``detail`` when anything raises.
    """
    code = request.GET.get('code')
    if code:
        try:
            response = Response(get_hatching_permit_code(code), status=status.HTTP_200_OK)
        except Exception as exc:
            # View boundary: any failure is reported to the client as a 500.
            response = Response({'detail': str(exc)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        return response

    return Response({'detail': 'code query param is required'}, status=status.HTTP_400_BAD_REQUEST)
|
||
|
||
|
||
def normalize_persian_arabic_text(text):
    """Normalise look-alike Persian/Arabic characters in ``text``.

    Persian Yeh (U+06CC) is replaced with Arabic Yeh (U+064A); per the original
    author's note this is meant to match the form stored in the database.

    NOTE: Persian Kaf (U+06A9) is NOT converted to Arabic Kaf (U+0643); that
    replacement is commented out in the original code.

    Args:
        text: value to normalise. Falsy values (``None``, ``''``, ``0`` ...)
            are returned as-is; anything else is converted with ``str()``.

    Returns:
        The normalised string, or the untouched input when it is falsy.
    """
    if text:
        # Single-character mapping: Persian Yeh -> Arabic Yeh.
        return str(text).translate({0x06CC: 0x064A})
    return text
|
||
|
||
|
||
def create_guild(**info):
    """Instantiate an empty ``Guilds`` object and discard it.

    The keyword arguments collected in ``info`` are accepted but not used; the
    instance is neither saved nor returned, so the function returns ``None``.
    """
    # NOTE(review): looks unfinished - presumably meant to populate and persist
    # a Guilds row from ``info``; confirm the intended fields before relying on it.
    Guilds()
|
||
|
||
|
||
class SSLAdapter(HTTPAdapter):
    """Transport adapter that hands a customised SSL context to its pool manager.

    The context comes from ``create_urllib3_context()`` with option bit ``0x4``
    (OP_LEGACY_SERVER_CONNECT) switched on.

    This definition rebinds the name ``SSLAdapter`` declared earlier in the
    module, so it is the class the request helpers instantiate at call time.
    It therefore has to pass the context on itself: without
    ``init_poolmanager`` the context would be built but never used.
    """

    def __init__(self, *args, **kwargs):
        # Build the context before delegating to the parent constructor so it
        # already exists when ``init_poolmanager`` is invoked.
        self.context = create_urllib3_context()
        self.context.options |= 0x4  # OP_LEGACY_SERVER_CONNECT
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager, forcing ``ssl_context`` to this adapter's context."""
        kwargs['ssl_context'] = self.context
        return super().init_poolmanager(*args, **kwargs)
|
||
|
||
|
||
|
||
|
||
|
||
from datetime import datetime, timedelta
|
||
from django.utils import timezone
|
||
|
||
|
||
def _start_of_day(moment):
    """Return ``moment`` truncated to 00:00:00.000000 of the same day."""
    return moment.replace(hour=0, minute=0, second=0, microsecond=0)


def apply_date_filter(queryset, date_filter):
    """Restrict ``queryset`` to a relative date range described by ``date_filter``.

    Args:
        queryset: object exposing ``.filter(**lookups)``.
        date_filter: mapping with the optional keys
            ``field`` (model field to filter on, default ``"Date"``),
            ``type`` (one of ``today``, ``yesterday``, ``last_n_days``,
            ``this_week``, ``this_month``, ``last_n_month``, ``this_year``,
            ``last_n_year``) and ``value`` (the N for the ``last_n_*`` types).
            A falsy ``date_filter`` leaves the queryset untouched.

    Returns:
        The filtered queryset, or ``queryset`` unchanged when the type is
        unknown or a ``last_n_*`` type comes without a truthy ``value``.

    Raises:
        ValueError: when ``value`` cannot be converted with ``int()``.
    """
    if not date_filter:
        return queryset

    # NOTE(review): ``field`` is interpolated into the lookup name; if
    # ``date_filter`` can come from a client, restrict it to known fields.
    field = date_filter.get("field", "Date")
    filter_type = date_filter.get("type")
    value = date_filter.get("value")

    now = timezone.now()
    if timezone.is_aware(now):
        # With USE_TZ enabled ``timezone.now()`` is in UTC; convert to the
        # active time zone so day/week/month boundaries fall on local
        # midnight rather than UTC midnight.
        now = timezone.localtime(now)
    today = _start_of_day(now)

    # Closed one-day windows.
    if filter_type in ("today", "yesterday"):
        if filter_type == "today":
            start = today
        else:
            start = _start_of_day(now - timedelta(days=1))
        end = start + timedelta(days=1)
        return queryset.filter(**{f"{field}__gte": start, f"{field}__lt": end})

    # Open-ended windows: everything from ``start`` onwards.
    start = None
    if filter_type == "last_n_days" and value:
        # Rolling window: keeps the current time of day, not midnight.
        start = now - timedelta(days=int(value))
    elif filter_type == "this_week":
        # ``weekday()`` is 0 on Monday, so the week starts on Monday.
        start = _start_of_day(now - timedelta(days=now.weekday()))
    elif filter_type == "this_month":
        start = today.replace(day=1)
    elif filter_type == "last_n_month" and value:
        start = today
        for _ in range(int(value)):
            # First of this month -> last day of previous month -> its first day.
            start = (start.replace(day=1) - timedelta(days=1)).replace(day=1)
    elif filter_type == "this_year":
        start = today.replace(month=1, day=1)
    elif filter_type == "last_n_year" and value:
        start = today.replace(year=now.year - int(value), month=1, day=1)

    if start is None:
        return queryset
    return queryset.filter(**{f"{field}__gte": start})