我想知道boto3中是否存在一个键。我可以循环桶内容并检查键是否匹配。
但这似乎太长了,也太过分了。Boto3官方文档明确说明了如何做到这一点。
也许我忽略了最明显的一点。有人能告诉我怎么做吗?
我想知道boto3中是否存在一个键。我可以循环桶内容并检查键是否匹配。
但这似乎太长了,也太过分了。Boto3官方文档明确说明了如何做到这一点。
也许我忽略了最明显的一点。有人能告诉我怎么做吗?
当前回答
不仅是客户端,还有桶:
import boto3
import botocore
bucket = boto3.resource('s3', region_name='eu-west-1').Bucket('my-bucket')
try:
bucket.Object('my-file').get()
except botocore.exceptions.ClientError as ex:
if ex.response['Error']['Code'] == 'NoSuchKey':
print('NoSuchKey')
其他回答
使用对象。过滤器和检查结果列表是目前为止检查文件是否存在于S3桶中最快的方法。
使用这个简洁的联机程序,当你不得不在一个现有的项目中抛出它而不修改很多代码时,它会减少干扰。
s3_file_exists = lambda filename: bool(list(bucket.objects.filter(Prefix=filename)))
上面的函数假设bucket变量已经声明。
您可以扩展lambda以支持其他参数,例如
s3_file_exists = lambda filename, bucket: bool(list(bucket.objects.filter(Prefix=filename)))
import boto3
client = boto3.client('s3')
s3_key = 'Your file without bucket name e.g. abc/bcd.txt'
bucket = 'your bucket name'
content = client.head_object(Bucket=bucket,Key=s3_key)
if content.get('ResponseMetadata',None) is not None:
print "File exists - s3://%s/%s " %(bucket,s3_key)
else:
print "File does not exist - s3://%s/%s " %(bucket,s3_key)
在Boto3中,如果您正在检查文件夹(前缀)或使用list_objects的文件。您可以使用响应字典中的“Contents”是否存在来检查对象是否存在。这是另一种避免try/except捕获的方法,就像@EvilPuppetMaster建议的那样
import boto3
client = boto3.client('s3')
results = client.list_objects(Bucket='my-bucket', Prefix='dootdoot.jpg')
return 'Contents' in results
您可以使用S3Fs,它本质上是boto3的包装器,它公开了典型的文件系统风格操作:
import s3fs
s3 = s3fs.S3FileSystem()
s3.exists('myfile.txt')
FWIW,这里是我正在使用的非常简单的函数
import boto3
def get_resource(config: dict={}):
"""Loads the s3 resource.
Expects AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to be in the environment
or in a config dictionary.
Looks in the environment first."""
s3 = boto3.resource('s3',
aws_access_key_id=os.environ.get(
"AWS_ACCESS_KEY_ID", config.get("AWS_ACCESS_KEY_ID")),
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", config.get("AWS_SECRET_ACCESS_KEY")))
return s3
def get_bucket(s3, s3_uri: str):
"""Get the bucket from the resource.
A thin wrapper, use with caution.
Example usage:
>> bucket = get_bucket(get_resource(), s3_uri_prod)"""
return s3.Bucket(s3_uri)
def isfile_s3(bucket, key: str) -> bool:
"""Returns T/F whether the file exists."""
objs = list(bucket.objects.filter(Prefix=key))
return len(objs) == 1 and objs[0].key == key
def isdir_s3(bucket, key: str) -> bool:
"""Returns T/F whether the directory exists."""
objs = list(bucket.objects.filter(Prefix=key))
return len(objs) > 1