我如何能看到什么是在S3桶与boto3?(例如,写一个“ls”)?

做以下事情:

import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('some/path/')

返回:

s3.Bucket(name='some/path/')

我如何看到它的内容?


当前回答

我的s3键实用函数本质上是@Hephaestus的答案的优化版本:

import boto3


s3_paginator = boto3.client('s3').get_paginator('list_objects_v2')


def keys(bucket_name, prefix='/', delimiter='/', start_after=''):
    prefix = prefix.lstrip(delimiter)
    start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
    for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
        for content in page.get('Contents', ()):
            yield content['Key']

在我的测试(boto3 1.9.84)中,它比等效的(但更简单)代码要快得多:

import boto3


def keys(bucket_name, prefix='/', delimiter='/'):
    prefix = prefix.lstrip(delimiter)
    bucket = boto3.resource('s3').Bucket(bucket_name)
    return (_.key for _ in bucket.objects.filter(Prefix=prefix))

由于S3保证UTF-8二进制排序结果,因此在第一个函数中添加了start_after优化。

其他回答

从lambda函数运行aws cli命令也是一个不错的选择

import subprocess
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def run_command(command):
    command_list = command.split(' ')

    try:
        logger.info("Running shell command: \"{}\"".format(command))
        result = subprocess.run(command_list, stdout=subprocess.PIPE);
        logger.info("Command output:\n---\n{}\n---".format(result.stdout.decode('UTF-8')))
    except Exception as e:
        logger.error("Exception: {}".format(e))
        return False

    return True

def lambda_handler(event, context):
    run_command('/opt/aws s3 ls s3://bucket-name')

一种更节俭的方法,而不是通过一个for循环来迭代,你也可以只打印原始对象,其中包含S3桶中的所有文件:

session = Session(aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key)
s3 = session.resource('s3')
bucket = s3.Bucket('bucket_name')

files_in_s3 = bucket.objects.all() 
#you can print this iterable with print(list(files_in_s3))

我的s3键实用函数本质上是@Hephaestus的答案的优化版本:

import boto3


s3_paginator = boto3.client('s3').get_paginator('list_objects_v2')


def keys(bucket_name, prefix='/', delimiter='/', start_after=''):
    prefix = prefix.lstrip(delimiter)
    start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
    for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
        for content in page.get('Contents', ()):
            yield content['Key']

在我的测试(boto3 1.9.84)中,它比等效的(但更简单)代码要快得多:

import boto3


def keys(bucket_name, prefix='/', delimiter='/'):
    prefix = prefix.lstrip(delimiter)
    bucket = boto3.resource('s3').Bucket(bucket_name)
    return (_.key for _ in bucket.objects.filter(Prefix=prefix))

由于S3保证UTF-8二进制排序结果,因此在第一个函数中添加了start_after优化。

查看内容的一种方法是:

for my_bucket_object in my_bucket.objects.all():
    print(my_bucket_object)

为了处理大型键列表(即当目录列表大于1000项时),我使用以下代码将多个列表中的键值(即文件名)累积起来(感谢上面的阿梅里奥的第一行)。代码是针对python3的:

    from boto3  import client
    bucket_name = "my_bucket"
    prefix      = "my_key/sub_key/lots_o_files"

    s3_conn   = client('s3')  # type: BaseClient  ## again assumes boto.cfg setup, assume AWS S3
    s3_result =  s3_conn.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter = "/")

    if 'Contents' not in s3_result:
        #print(s3_result)
        return []

    file_list = []
    for key in s3_result['Contents']:
        file_list.append(key['Key'])
    print(f"List count = {len(file_list)}")

    while s3_result['IsTruncated']:
        continuation_key = s3_result['NextContinuationToken']
        s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter="/", ContinuationToken=continuation_key)
        for key in s3_result['Contents']:
            file_list.append(key['Key'])
        print(f"List count = {len(file_list)}")
    return file_list