我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。

我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?


当前回答

for _ in range(5):
    try:
        # replace this with something that may fail
        raise ValueError("foo")

    # replace Exception with a more specific exception
    except Exception as e:
        err = e
        continue

    # no exception, continue remainder of code
    else:
        break

# did not break the for loop, therefore all attempts
# raised an exception
else:
    raise err

我的版本与上面的几个类似,但没有使用单独的while循环,如果所有重试都失败,则重新引发最新的异常。可以显式地在顶部设置err = None,但不是严格必要的,因为它只应该在出现错误时执行最后一个else块,因此设置了err。

其他回答

这里有一个与其他解决方案类似的解决方案,但是如果在规定的次数或重试次数内没有成功,它将引发异常。

tries = 3
for i in range(tries):
    try:
        do_the_thing()
    except KeyError as e:
        if i < tries - 1: # i is zero indexed
            continue
        else:
            raise
    break

我使用这个,它可以用于任何函数:

def run_with_retry(func: callable, max_retries: int = 3, wait_seconds: int = 2, **func_params):
num_retries = 1
while True:
    try:
        return func(*func_params.values())
    except Exception as e:
        if num_retries > max_retries:
            print('we have reached maximum errors and raising the exception')
            raise e
        else:
            print(f'{num_retries}/{max_retries}')
            print("Retrying error:", e)
            num_retries += 1
            sleep(wait_seconds)

像这样调用:

    def add(val1, val2):
        return val1 + val2

    run_with_retry(func=add, param1=10, param2=20)

重新尝试的替代方案:坚韧和退缩(2020年更新)

重新尝试库是以前的方法,但遗憾的是,它有一些bug,自2016年以来就没有任何更新。其他的选择似乎是后退和坚韧。在写这篇文章的时候,tenacity有更多的GItHub星(2.3k vs 1.2k),并且最近更新了,因此我选择使用它。这里有一个例子:

from functools import partial
import random # producing random errors for this example

from tenacity import retry, stop_after_delay, wait_fixed, retry_if_exception_type

# Custom error type for this example
class CommunicationError(Exception):
    pass

# Define shorthand decorator for the used settings.
retry_on_communication_error = partial(
    retry,
    stop=stop_after_delay(10),  # max. 10 seconds wait.
    wait=wait_fixed(0.4),  # wait 400ms 
    retry=retry_if_exception_type(CommunicationError),
)()


@retry_on_communication_error
def do_something_unreliable(i):
    if random.randint(1, 5) == 3:
        print('Run#', i, 'Error occured. Retrying.')
        raise CommunicationError()

for i in range(100):
    do_something_unreliable(i)

上面的代码输出如下:

Run# 3 Error occured. Retrying.
Run# 5 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 10 Error occured. Retrying.
.
.
.

坚韧的更多设置。坚韧GitHub页面上列出了重试。

你可以有一个专门的函数使用返回短路结果。比如这样:

def my_function_with_retries(..., max_retries=100):
    for attempt in range(max_retries):
        try:
            return my_function(...)
        except SomeSpecificException as error:
            logging.warning(f"Retrying after failed execution: {error}")

    raise SomeOtherException()

我喜欢laurent-laporte的回答。下面是我的版本,它包装在一个类与静态方法和一些例子。我实现了重试计数作为另一种重试方式。还增加了kwargs。

from typing import List
import time


class Retry:
    @staticmethod
    def onerror_retry(exception, callback, retries: int = 0, timeout: float = 0, timedelta: float = 0,
                      errors: List = None, **kwargs):
        """

        @param exception: The exception to trigger retry handling with.
        @param callback: The function that will potentially fail with an exception
        @param retries: Optional total number of retries, regardless of timing if this threshold is met, the call will
                        raise the exception.
        @param timeout: Optional total amount of time to do retries after which the call will raise an exception
        @param timedelta: Optional amount of time to sleep in between calls
        @param errors: A list to receive all the exceptions that were caught.
        @param kwargs: An optional key value parameters to pass to the function to retry.
        """
        for retry in Retry.__onerror_retry(exception, callback, retries, timeout, timedelta, errors, **kwargs):
            if retry: retry(**kwargs)  # retry will be None when all retries fail.

    @staticmethod
    def __onerror_retry(exception, callback, retries: int = 0, timeout: float = 0, timedelta: float = 0,
                        errors: List = None, **kwargs):
        end_time = time.time() + timeout
        continues = 0
        while True:
            try:
                yield callback(**kwargs)
                break
            except exception as ex:
                print(ex)
                if errors:
                    errors.append(ex)

                continues += 1
                if 0 < retries < continues:
                    print('ran out of retries')
                    raise

                if timeout > 0 and time.time() > end_time:
                    print('ran out of time')
                    raise
                elif timedelta > 0:
                    time.sleep(timedelta)


err = 0

#
# sample dumb fail function
def fail_many_times(**kwargs):
    global err
    err += 1
    max_errors = kwargs.pop('max_errors', '') or 1
    if err < max_errors:
        raise ValueError("I made boo boo.")
    print("Successfully did something.")

#
# Example calls
try:
    #
    # retries with a parameter that overrides retries... just because
    Retry.onerror_retry(ValueError, fail_many_times, retries=5, max_errors=3)
    err = 0
    #
    # retries that run out of time, with 1 second sleep between retries.
    Retry.onerror_retry(ValueError, fail_many_times, timeout=5, timedelta=1, max_errors=30)
except Exception as err:
    print(err)