import - non parallel version of link rancher to org
This commit is contained in:
96
apps/herd/management/commands/link_ranchers_on_org.py
Normal file
96
apps/herd/management/commands/link_ranchers_on_org.py
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
from django.core.management.base import BaseCommand
|
||||||
|
from django.db import transaction
|
||||||
|
|
||||||
|
from apps.authentication.models import Organization
|
||||||
|
from apps.herd.models import Rancher, RancherOrganizationLink
|
||||||
|
|
||||||
|
|
||||||
|
class Command(BaseCommand):
|
||||||
|
help = "Link ranchers to cooperative of their city"
|
||||||
|
|
||||||
|
BATCH_SIZE = 2000
|
||||||
|
|
||||||
|
def handle(self, *args, **options):
|
||||||
|
|
||||||
|
qs = (
|
||||||
|
Rancher.objects
|
||||||
|
.filter(city__isnull=False)
|
||||||
|
.select_related('city')
|
||||||
|
)
|
||||||
|
|
||||||
|
total = qs.count()
|
||||||
|
processed = 0
|
||||||
|
created = 0
|
||||||
|
skipped = 0
|
||||||
|
ambiguous = 0
|
||||||
|
|
||||||
|
start = time.time()
|
||||||
|
|
||||||
|
buffer = []
|
||||||
|
|
||||||
|
for rancher in qs.iterator(chunk_size=self.BATCH_SIZE):
|
||||||
|
|
||||||
|
processed += 1
|
||||||
|
|
||||||
|
# اگر قبلاً لینک داره
|
||||||
|
if RancherOrganizationLink.objects.filter(rancher=rancher).exists():
|
||||||
|
skipped += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
orgs = Organization.objects.filter(
|
||||||
|
city=rancher.city,
|
||||||
|
type__key='CO' # ⚠️ کد type تعاونی خودت
|
||||||
|
)
|
||||||
|
|
||||||
|
if not orgs.exists():
|
||||||
|
skipped += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
if orgs.count() > 1:
|
||||||
|
ambiguous += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
buffer.append(
|
||||||
|
RancherOrganizationLink(
|
||||||
|
rancher=rancher,
|
||||||
|
organization=orgs.first()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
print(len(buffer))
|
||||||
|
if len(buffer) >= self.BATCH_SIZE:
|
||||||
|
created += self.bulk_create(buffer)
|
||||||
|
buffer.clear()
|
||||||
|
|
||||||
|
# progress log
|
||||||
|
if processed % 10_000 == 0:
|
||||||
|
elapsed = time.time() - start
|
||||||
|
speed = processed / elapsed if elapsed else 0
|
||||||
|
|
||||||
|
self.stdout.write(
|
||||||
|
self.style.NOTICE(
|
||||||
|
f"[{processed:,}/{total:,}] "
|
||||||
|
f"created={created:,} "
|
||||||
|
f"skipped={skipped:,} "
|
||||||
|
f"ambiguous={ambiguous:,} "
|
||||||
|
f"{speed:.0f} r/s"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if buffer:
|
||||||
|
created += self.bulk_create(buffer)
|
||||||
|
|
||||||
|
self.stdout.write(
|
||||||
|
self.style.SUCCESS(
|
||||||
|
f"DONE ✅ created={created}, skipped={skipped}, ambiguous={ambiguous}"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
@transaction.atomic
|
||||||
|
def bulk_create(self, objs):
|
||||||
|
RancherOrganizationLink.objects.bulk_create(
|
||||||
|
objs,
|
||||||
|
ignore_conflicts=True
|
||||||
|
)
|
||||||
|
return len(objs)
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import os
|
||||||
from multiprocessing import cpu_count, Pool
|
from multiprocessing import cpu_count, Pool
|
||||||
|
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
@@ -5,6 +6,19 @@ from django.db import connection
|
|||||||
|
|
||||||
from .link_ranchers_parallel_worker import process_city
|
from .link_ranchers_parallel_worker import process_city
|
||||||
|
|
||||||
|
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
|
||||||
|
|
||||||
|
|
||||||
|
def run_worker(chunk):
|
||||||
|
import os
|
||||||
|
import django
|
||||||
|
|
||||||
|
os.environ.setdefault(
|
||||||
|
"DJANGO_SETTINGS_MODULE",
|
||||||
|
"Rasaddam_Backend.settings"
|
||||||
|
)
|
||||||
|
django.setup()
|
||||||
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
help = "Parallel link ranchers to cooperative by city"
|
help = "Parallel link ranchers to cooperative by city"
|
||||||
@@ -20,6 +34,14 @@ class Command(BaseCommand):
|
|||||||
from apps.herd.models import Rancher
|
from apps.herd.models import Rancher
|
||||||
workers = options['worker']
|
workers = options['worker']
|
||||||
|
|
||||||
|
from multiprocessing import get_context
|
||||||
|
|
||||||
|
chunks = self.prepare_chunks()
|
||||||
|
|
||||||
|
ctx = get_context("spawn")
|
||||||
|
with ctx.Pool(processes=options["workers"]) as pool:
|
||||||
|
pool.map(run_worker, chunks)
|
||||||
|
|
||||||
city_ids = (
|
city_ids = (
|
||||||
Rancher.objects.filter(city__isnull=False)
|
Rancher.objects.filter(city__isnull=False)
|
||||||
.values_list('city_id', flat=True)
|
.values_list('city_id', flat=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user