我如何能看到什么是在S3桶与boto3?(例如,写一个“ls”)?
做以下事情:
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('some/path/')
返回:
s3.Bucket(name='some/path/')
我如何看到它的内容?
我如何能看到什么是在S3桶与boto3?(例如,写一个“ls”)?
做以下事情:
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('some/path/')
返回:
s3.Bucket(name='some/path/')
我如何看到它的内容?
当前回答
我的s3键实用函数本质上是@Hephaestus的答案的优化版本:
import boto3
s3_paginator = boto3.client('s3').get_paginator('list_objects_v2')
def keys(bucket_name, prefix='/', delimiter='/', start_after=''):
prefix = prefix.lstrip(delimiter)
start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
for content in page.get('Contents', ()):
yield content['Key']
在我的测试(boto3 1.9.84)中,它比等效的(但更简单)代码要快得多:
import boto3
def keys(bucket_name, prefix='/', delimiter='/'):
prefix = prefix.lstrip(delimiter)
bucket = boto3.resource('s3').Bucket(bucket_name)
return (_.key for _ in bucket.objects.filter(Prefix=prefix))
由于S3保证UTF-8二进制排序结果,因此在第一个函数中添加了start_after优化。
其他回答
从lambda函数运行aws cli命令也是一个不错的选择
import subprocess
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def run_command(command):
command_list = command.split(' ')
try:
logger.info("Running shell command: \"{}\"".format(command))
result = subprocess.run(command_list, stdout=subprocess.PIPE);
logger.info("Command output:\n---\n{}\n---".format(result.stdout.decode('UTF-8')))
except Exception as e:
logger.error("Exception: {}".format(e))
return False
return True
def lambda_handler(event, context):
run_command('/opt/aws s3 ls s3://bucket-name')
一种更节俭的方法,而不是通过一个for循环来迭代,你也可以只打印原始对象,其中包含S3桶中的所有文件:
session = Session(aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key)
s3 = session.resource('s3')
bucket = s3.Bucket('bucket_name')
files_in_s3 = bucket.objects.all()
#you can print this iterable with print(list(files_in_s3))
我的s3键实用函数本质上是@Hephaestus的答案的优化版本:
import boto3
s3_paginator = boto3.client('s3').get_paginator('list_objects_v2')
def keys(bucket_name, prefix='/', delimiter='/', start_after=''):
prefix = prefix.lstrip(delimiter)
start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
for content in page.get('Contents', ()):
yield content['Key']
在我的测试(boto3 1.9.84)中,它比等效的(但更简单)代码要快得多:
import boto3
def keys(bucket_name, prefix='/', delimiter='/'):
prefix = prefix.lstrip(delimiter)
bucket = boto3.resource('s3').Bucket(bucket_name)
return (_.key for _ in bucket.objects.filter(Prefix=prefix))
由于S3保证UTF-8二进制排序结果,因此在第一个函数中添加了start_after优化。
查看内容的一种方法是:
for my_bucket_object in my_bucket.objects.all():
print(my_bucket_object)
为了处理大型键列表(即当目录列表大于1000项时),我使用以下代码将多个列表中的键值(即文件名)累积起来(感谢上面的阿梅里奥的第一行)。代码是针对python3的:
from boto3 import client
bucket_name = "my_bucket"
prefix = "my_key/sub_key/lots_o_files"
s3_conn = client('s3') # type: BaseClient ## again assumes boto.cfg setup, assume AWS S3
s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter = "/")
if 'Contents' not in s3_result:
#print(s3_result)
return []
file_list = []
for key in s3_result['Contents']:
file_list.append(key['Key'])
print(f"List count = {len(file_list)}")
while s3_result['IsTruncated']:
continuation_key = s3_result['NextContinuationToken']
s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter="/", ContinuationToken=continuation_key)
for key in s3_result['Contents']:
file_list.append(key['Key'])
print(f"List count = {len(file_list)}")
return file_list