"""Functions required for connecting to the Arvan storage (S3-compatible) API."""
import base64
import io
import logging
import os

import boto3
from botocore.exceptions import ClientError
from django.http import HttpResponse
from PIL import Image

ARVAN_STORAGE_URL = "https://dmstore.s3.ir-thr-at1.arvanstorage.ir/36bba98f-a813-4667-bd60-33aef708bcba.jpg?AWSAccessKeyId=d5739a44-e663-4f43-99f3-13121a62a9e6&Signature=KpBpHBtAS77Y3hHx53g6bmjlGpc%3D&Expires=1651552380"

# NOTE(review): these credentials are committed to source control. They should
# be rotated and supplied only through the environment. The literals are kept
# as fallbacks so that existing deployments keep working unchanged.
_DEFAULT_ENDPOINT_URL = 'https://s3.ir-thr-at1.arvanstorage.ir'
_DEFAULT_ACCESS_KEY_ID = 'b0b563b2-bb60-4faf-b09a-660982e70b00'
_DEFAULT_SECRET_ACCESS_KEY = (
    'abdcccaadbd3d897b5432f72bc91048940e012ffa4f308ba0fe16f28e3c80e57'
)

# Image modes the Pillow JPEG writer accepts; anything else (RGBA, P, LA, ...)
# has to be converted before saving as JPEG.
_JPEG_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})


def connect():
    """Return a boto3 S3 resource bound to the Arvan storage endpoint.

    The endpoint and credentials are read from the environment variables
    ``ARVAN_S3_ENDPOINT_URL``, ``ARVAN_S3_ACCESS_KEY_ID`` and
    ``ARVAN_S3_SECRET_ACCESS_KEY`` when set, otherwise the module defaults
    are used.

    Raises:
        Exception: whatever ``boto3.resource`` raised, after logging it.
            (Previously the error was swallowed and the function then failed
            with an unrelated ``UnboundLocalError``.)
    """
    logging.basicConfig(level=logging.INFO)
    try:
        return boto3.resource(
            's3',
            endpoint_url=os.environ.get(
                'ARVAN_S3_ENDPOINT_URL', _DEFAULT_ENDPOINT_URL
            ),
            aws_access_key_id=os.environ.get(
                'ARVAN_S3_ACCESS_KEY_ID', _DEFAULT_ACCESS_KEY_ID
            ),
            aws_secret_access_key=os.environ.get(
                'ARVAN_S3_SECRET_ACCESS_KEY', _DEFAULT_SECRET_ACCESS_KEY
            ),
        )
    except Exception:
        logging.exception('could not create the S3 resource')
        raise


def get_bucket_list():
    """Return the name of the first bucket visible to the account.

    Despite its name, this function returns a single bucket name (the first
    one listed), not a list. Every bucket name is logged at INFO level.

    Raises:
        IndexError: if no bucket could be listed, either because the account
            has none or because listing failed with a ``ClientError`` (which
            is logged).
    """
    s3_resource = connect()
    bucket_names = []
    try:
        for bucket in s3_resource.buckets.all():
            logging.info('bucket_name: %s', bucket.name)
            bucket_names.append(bucket.name)
    except ClientError as exc:
        logging.error(exc)
    if not bucket_names:
        # Same exception type as the original bare ``li[0]``, but explicit.
        raise IndexError('no buckets available')
    return bucket_names[0]


def _decode_image(image_data):
    """Decode base64 ``image_data`` and open it as a Pillow image."""
    return Image.open(io.BytesIO(base64.b64decode(image_data)))


def _encode_image(img, image_format, **save_options):
    """Serialise ``img`` to bytes in ``image_format`` without touching disk."""
    if image_format == "JPEG" and img.mode not in _JPEG_MODES:
        # JPEG has no alpha/palette support; Pillow raises OSError otherwise.
        img = img.convert("RGB")
    buffer = io.BytesIO()
    img.save(buffer, format=image_format, **save_options)
    return buffer.getvalue()


def _put_public_object(bucket_name, object_name, body):
    """Upload ``body`` (bytes) to ``bucket_name`` under key ``object_name``."""
    bucket = connect().Bucket(bucket_name)
    bucket.put_object(
        ACL='public-read',
        Body=body,
        Key=object_name,
    )


def upload_object(image_data, bucket_name, object_name):
    """Resize an image to 500x500, re-encode it as PNG and upload it.

    This function can be used anywhere in the system to store an image on
    Arvan storage.

    Args:
        image_data: base64-encoded image.
        bucket_name: name of the destination bucket.
        object_name: key to store the object under.

    The upload is done from memory; no local file named ``object_name`` is
    created any more.
    """
    resized = _decode_image(image_data).resize((500, 500))  # x, y
    _put_public_object(bucket_name, object_name, _encode_image(resized, "PNG"))


def compress_image(image_data, quality):
    """Re-encode a base64 image as JPEG and return it base64-encoded.

    Args:
        image_data: base64-encoded image.
        quality: value passed through to Pillow's JPEG ``quality`` option.

    Returns:
        bytes: the base64 encoding of the JPEG data.
    """
    jpeg_bytes = _encode_image(
        _decode_image(image_data), "JPEG", quality=quality
    )
    return base64.b64encode(jpeg_bytes)


def upload_object_resize(image_data, bucket_name, object_name):
    """Upload the decoded image bytes as-is.

    Despite the name, the image is neither resized nor re-encoded: the base64
    payload is decoded and the resulting bytes are stored unchanged.

    Args:
        image_data: base64-encoded image.
        bucket_name: name of the destination bucket.
        object_name: key to store the object under.
    """
    _put_public_object(bucket_name, object_name, base64.b64decode(image_data))


def upload_object_less_size(image_data, bucket_name, object_name):
    """Re-encode an image as PNG at its original dimensions and upload it.

    Args:
        image_data: base64-encoded image.
        bucket_name: name of the destination bucket.
        object_name: key to store the object under.

    The upload is done from memory; no local file named ``object_name`` is
    created any more.
    """
    png_bytes = _encode_image(_decode_image(image_data), "PNG")
    _put_public_object(bucket_name, object_name, png_bytes)


def upload_object_for_poultry_science(image_data, bucket_name, object_name):
    """Re-encode an image as JPEG and upload it.

    Images in a mode the JPEG writer cannot store (for example RGBA) are
    converted to RGB first instead of failing.

    Args:
        image_data: base64-encoded image.
        bucket_name: name of the destination bucket.
        object_name: key to store the object under.

    The upload is done from memory; no local file named ``object_name`` is
    created any more.
    """
    jpeg_bytes = _encode_image(_decode_image(image_data), "JPEG")
    _put_public_object(bucket_name, object_name, jpeg_bytes)