我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。

我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?


当前回答

尝试次数= 3 而尝试: 试一试: ... ... <状态好了> 打破 除了: 尝试- = 1 Else: #已执行的唯一break未被引发 < >失败状态

其他回答

Decorator是一个很好的方法。

from functools import wraps
import time

class retry:
    def __init__(self, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True):
        self.success = success
        self.times = times
        self.raiseexception = raiseexception
        self.echo = echo
        self.delay = delay
    def retry(fun, *args, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True, **kwargs):
        ex = Exception(f"{fun} failed.")
        r = None
        for i in range(times):
            if i > 0:
                time.sleep(delay*2**(i-1))
            try:
                r = fun(*args, **kwargs)
                s = success(r)
            except Exception as e:
                s = False
                ex = e
                # raise e
            if not s:
                continue
            return r
        else:
            if echo:
                print(f"{fun} failed.", "args:", args, kwargs, "\nresult: %s"%r)
            if raiseexception:
                raise ex
    def __call__(self, fun):
        @wraps(fun)
        def wraper(*args, retry=0, **kwargs):
            retry = retry if retry>0 else self.times
            return self.__class__.retry(fun, *args, 
                                        success=self.success, 
                                        times=retry,
                                        delay=self.delay,
                                        raiseexception = self.raiseexception,
                                        echo = self.echo,
                                        **kwargs)
        return wraper

一些用法示例:

@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf1(x=[]):
    x.append(1)
    print(x)
    return len(x)
> rf1()

[1]
[1, 1]
[1, 1, 1]
[1, 1, 1, 1]

4
@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf2(l=[], v=1):
    l.append(v)
    print(l)
    assert len(l)>4
    return len(l)
> rf2(v=2, retry=10) #overwite times=4

[2]
[2, 2]
[2, 2, 2]
[2, 2, 2, 2]
[2, 2, 2, 2, 2]

5
> retry.retry(lambda a,b:a+b, 1, 2, times=2)

3
> retry.retry(lambda a,b:a+b, 1, "2", times=2)

TypeError: unsupported operand type(s) for +: 'int' and 'str'

如果你想要一个没有嵌套循环和成功调用break的解决方案,你可以为任何可迭代对象开发一个快速的可检索包装。这里有一个我经常遇到的网络问题的例子——保存的身份验证过期。它的用法是这样的:

client = get_client()
smart_loop = retriable(list_of_values):

for value in smart_loop:
    try:
        client.do_something_with(value)
    except ClientAuthExpired:
        client = get_client()
        smart_loop.retry()
        continue
    except NetworkTimeout:
        smart_loop.retry()
        continue

使用递归

for i in range(100):
    def do():
        try:
            ## Network related scripts
        except SpecificException as ex:
            do()
    do() ## invoke do() whenever required inside this loop

我喜欢laurent-laporte的回答。下面是我的版本,它包装在一个类与静态方法和一些例子。我实现了重试计数作为另一种重试方式。还增加了kwargs。

from typing import List
import time


class Retry:
    @staticmethod
    def onerror_retry(exception, callback, retries: int = 0, timeout: float = 0, timedelta: float = 0,
                      errors: List = None, **kwargs):
        """

        @param exception: The exception to trigger retry handling with.
        @param callback: The function that will potentially fail with an exception
        @param retries: Optional total number of retries, regardless of timing if this threshold is met, the call will
                        raise the exception.
        @param timeout: Optional total amount of time to do retries after which the call will raise an exception
        @param timedelta: Optional amount of time to sleep in between calls
        @param errors: A list to receive all the exceptions that were caught.
        @param kwargs: An optional key value parameters to pass to the function to retry.
        """
        for retry in Retry.__onerror_retry(exception, callback, retries, timeout, timedelta, errors, **kwargs):
            if retry: retry(**kwargs)  # retry will be None when all retries fail.

    @staticmethod
    def __onerror_retry(exception, callback, retries: int = 0, timeout: float = 0, timedelta: float = 0,
                        errors: List = None, **kwargs):
        end_time = time.time() + timeout
        continues = 0
        while True:
            try:
                yield callback(**kwargs)
                break
            except exception as ex:
                print(ex)
                if errors:
                    errors.append(ex)

                continues += 1
                if 0 < retries < continues:
                    print('ran out of retries')
                    raise

                if timeout > 0 and time.time() > end_time:
                    print('ran out of time')
                    raise
                elif timedelta > 0:
                    time.sleep(timedelta)


err = 0

#
# sample dumb fail function
def fail_many_times(**kwargs):
    global err
    err += 1
    max_errors = kwargs.pop('max_errors', '') or 1
    if err < max_errors:
        raise ValueError("I made boo boo.")
    print("Successfully did something.")

#
# Example calls
try:
    #
    # retries with a parameter that overrides retries... just because
    Retry.onerror_retry(ValueError, fail_many_times, retries=5, max_errors=3)
    err = 0
    #
    # retries that run out of time, with 1 second sleep between retries.
    Retry.onerror_retry(ValueError, fail_many_times, timeout=5, timedelta=1, max_errors=30)
except Exception as err:
    print(err)

你可以使用Python重试包。 重试

它是用Python编写的,目的是简化向几乎任何东西添加重试行为的任务。