Skip to content
Snippets Groups Projects
ceph_service.py 1006 B
Newer Older
  • Learn to ignore specific revisions
  • import boto3
    import json
    import os
    
    MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://minio:9000")
    AWS_ACCESS_KEY_ID = os.getenv("MINIO_ACCESS_KEY_ID", "minioadmin")
    AWS_SECRET_ACCESS_KEY = os.getenv("MINIO_SECRET_ACCESS_KEY", "minioadmin")
    AWS_SESSION_TOKEN = None
    
    
    def get_s3_client():
        resource = boto3.resource('s3',
                                  endpoint_url=MINIO_ENDPOINT,
                                  aws_access_key_id=AWS_ACCESS_KEY_ID,
                                  aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                  aws_session_token=AWS_SESSION_TOKEN,
                                  config=boto3.session.Config(signature_version='s3v4'),
                                  verify=False
                                  )
        client = resource.meta.client
        return client
    
    
    def list_buckets() -> json:
        s3 = get_s3_client()
        buckets = s3.list_buckets()
        buckets = {key: buckets[key] for key in ["Buckets", "Owner"]}
        return json.dumps(buckets, default=str)