我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。

我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?


在for循环中执行while True,将try代码放入其中,只有当代码成功时才退出while循环。

for i in range(0,100):
    while True:
        try:
            # do stuff
        except SomeSpecificException:
            continue
        break

最清晰的方法是显式地设置i。例如:

i = 0
while i < 100:
    i += 1
    try:
        # do stuff

    except MyException:
        continue

只有当try子句成功时才增加循环变量


不使用那些丑陋的while循环的更“功能性”的方法:

def tryAgain(retries=0):
    if retries > 10: return
    try:
        # Do stuff
    except:
        retries+=1
        tryAgain(retries)

tryAgain()

在Python装饰器库中也有类似的东西。

请记住,它不测试异常,而是测试返回值。它会重新尝试,直到被修饰的函数返回True。

稍微修改一下版本就可以了。


我倾向于限制重试次数,这样如果某个特定项目出现问题,你就可以继续进行下一个项目,如下:

for i in range(100):
  for attempt in range(10):
    try:
      # do thing
    except:
      # perhaps reconnect, etc.
    else:
      break
  else:
    # we failed all the attempts - deal with the consequences.

更新2021-12-01:

自2016年6月起,不再维护重试包。 考虑使用活动的fork github.com/jd/tenacity,或者github.com/litl/backoff。


重试包是在失败时重试代码块的好方法。

例如:

@retry(wait_random_min=1000, wait_random_max=2000)
def wait_random_1_to_2_s():
    print("Randomly wait 1 to 2 seconds between retries")

带超时的通用解决方案:

import time

def onerror_retry(exception, callback, timeout=2, timedelta=.1):
    end_time = time.time() + timeout
    while True:
        try:
            yield callback()
            break
        except exception:
            if time.time() > end_time:
                raise
            elif timedelta > 0:
                time.sleep(timedelta)

用法:

for retry in onerror_retry(SomeSpecificException, do_stuff):
    retry()

以下是我关于如何解决这个问题的想法:

j = 19
def calc(y):
    global j
    try:
        j = j + 8 - y
        x = int(y/j)   # this will eventually raise DIV/0 when j=0
        print("i = ", str(y), " j = ", str(j), " x = ", str(x))
    except:
        j = j + 1   # when the exception happens, increment "j" and retry
        calc(y)
for i in range(50):
    calc(i)

这里有一个与其他解决方案类似的解决方案,但是如果在规定的次数或重试次数内没有成功,它将引发异常。

tries = 3
for i in range(tries):
    try:
        do_the_thing()
    except KeyError as e:
        if i < tries - 1: # i is zero indexed
            continue
        else:
            raise
    break

使用while和计数器:

count = 1
while count <= 3:  # try 3 times
    try:
        # do_the_logic()
        break
    except SomeSpecificException as e:
        # If trying 3rd time and still error?? 
        # Just throw the error- we don't have anything to hide :)
        if count == 3:
            raise
        count += 1

使用递归

for i in range(100):
    def do():
        try:
            ## Network related scripts
        except SpecificException as ex:
            do()
    do() ## invoke do() whenever required inside this loop

你可以使用Python重试包。 重试

它是用Python编写的,目的是简化向几乎任何东西添加重试行为的任务。


如果你想要一个没有嵌套循环和成功调用break的解决方案,你可以为任何可迭代对象开发一个快速的可检索包装。这里有一个我经常遇到的网络问题的例子——保存的身份验证过期。它的用法是这样的:

client = get_client()
smart_loop = retriable(list_of_values):

for value in smart_loop:
    try:
        client.do_something_with(value)
    except ClientAuthExpired:
        client = get_client()
        smart_loop.retry()
        continue
    except NetworkTimeout:
        smart_loop.retry()
        continue

我在我的代码中使用following,

   for i in range(0, 10):
    try:
        #things I need to do
    except ValueError:
        print("Try #{} failed with ValueError: Sleeping for 2 secs before next try:".format(i))
        time.sleep(2)
        continue
    break

for _ in range(5):
    try:
        # replace this with something that may fail
        raise ValueError("foo")

    # replace Exception with a more specific exception
    except Exception as e:
        err = e
        continue

    # no exception, continue remainder of code
    else:
        break

# did not break the for loop, therefore all attempts
# raised an exception
else:
    raise err

我的版本与上面的几个类似,但没有使用单独的while循环,如果所有重试都失败,则重新引发最新的异常。可以显式地在顶部设置err = None,但不是严格必要的,因为它只应该在出现错误时执行最后一个else块,因此设置了err。


我最近用我的python解决了这个问题,我很高兴与stackoverflow的访问者分享,如果需要请给予反馈。

print("\nmonthly salary per day and year converter".title())
print('==' * 25)


def income_counter(day, salary, month):
    global result2, result, is_ready, result3
    result = salary / month
    result2 = result * day
    result3 = salary * 12
    is_ready = True
    return result, result2, result3, is_ready


i = 0
for i in range(5):
    try:
        month = int(input("\ntotal days of the current month: "))
        salary = int(input("total salary per month: "))
        day = int(input("Total Days to calculate> "))
        income_counter(day=day, salary=salary, month=month)
        if is_ready:
            print(f'Your Salary per one day is: {round(result)}')
            print(f'your income in {day} days will be: {round(result2)}')
            print(f'your total income in one year will be: {round(result3)}')
            break
        else:
            continue
    except ZeroDivisionError:
        is_ready = False
        i += 1
        print("a month does'nt have 0 days, please try again")
        print(f'total chances left: {5 - i}')
    except ValueError:
        is_ready = False
        i += 1
        print("Invalid value, please type a number")
        print(f'total chances left: {5 - i}')

尝试次数= 3 而尝试: 试一试: ... ... <状态好了> 打破 除了: 尝试- = 1 Else: #已执行的唯一break未被引发 < >失败状态


以下是我对这个问题的看法。下面的重试功能支持以下特性:

当调用成功时返回被调用函数的值 如果尝试失败,则引发被调用函数的异常 尝试次数限制(0表示无限) 在尝试之间等待(线性或指数) 仅当异常是特定异常类型的实例时重试。 可选的尝试记录

import time

def retry(func, ex_type=Exception, limit=0, wait_ms=100, wait_increase_ratio=2, logger=None):
    attempt = 1
    while True:
        try:
            return func()
        except Exception as ex:
            if not isinstance(ex, ex_type):
                raise ex
            if 0 < limit <= attempt:
                if logger:
                    logger.warning("no more attempts")
                raise ex

            if logger:
                logger.error("failed execution attempt #%d", attempt, exc_info=ex)

            attempt += 1
            if logger:
                logger.info("waiting %d ms before attempt #%d", wait_ms, attempt)
            time.sleep(wait_ms / 1000)
            wait_ms *= wait_increase_ratio

用法:

def fail_randomly():
    y = random.randint(0, 10)
    if y < 10:
        y = 0
    return x / y


logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

logger.info("starting")
result = retry.retry(fail_randomly, ex_type=ZeroDivisionError, limit=20, logger=logger)
logger.info("result is: %s", result)

更多信息请看我的帖子。


重新尝试的替代方案:坚韧和退缩(2020年更新)

重新尝试库是以前的方法,但遗憾的是,它有一些bug,自2016年以来就没有任何更新。其他的选择似乎是后退和坚韧。在写这篇文章的时候,tenacity有更多的GItHub星(2.3k vs 1.2k),并且最近更新了,因此我选择使用它。这里有一个例子:

from functools import partial
import random # producing random errors for this example

from tenacity import retry, stop_after_delay, wait_fixed, retry_if_exception_type

# Custom error type for this example
class CommunicationError(Exception):
    pass

# Define shorthand decorator for the used settings.
retry_on_communication_error = partial(
    retry,
    stop=stop_after_delay(10),  # max. 10 seconds wait.
    wait=wait_fixed(0.4),  # wait 400ms 
    retry=retry_if_exception_type(CommunicationError),
)()


@retry_on_communication_error
def do_something_unreliable(i):
    if random.randint(1, 5) == 3:
        print('Run#', i, 'Error occured. Retrying.')
        raise CommunicationError()

for i in range(100):
    do_something_unreliable(i)

上面的代码输出如下:

Run# 3 Error occured. Retrying.
Run# 5 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 10 Error occured. Retrying.
.
.
.

坚韧的更多设置。坚韧GitHub页面上列出了重试。


Decorator是一个很好的方法。

from functools import wraps
import time

class retry:
    def __init__(self, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True):
        self.success = success
        self.times = times
        self.raiseexception = raiseexception
        self.echo = echo
        self.delay = delay
    def retry(fun, *args, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True, **kwargs):
        ex = Exception(f"{fun} failed.")
        r = None
        for i in range(times):
            if i > 0:
                time.sleep(delay*2**(i-1))
            try:
                r = fun(*args, **kwargs)
                s = success(r)
            except Exception as e:
                s = False
                ex = e
                # raise e
            if not s:
                continue
            return r
        else:
            if echo:
                print(f"{fun} failed.", "args:", args, kwargs, "\nresult: %s"%r)
            if raiseexception:
                raise ex
    def __call__(self, fun):
        @wraps(fun)
        def wraper(*args, retry=0, **kwargs):
            retry = retry if retry>0 else self.times
            return self.__class__.retry(fun, *args, 
                                        success=self.success, 
                                        times=retry,
                                        delay=self.delay,
                                        raiseexception = self.raiseexception,
                                        echo = self.echo,
                                        **kwargs)
        return wraper

一些用法示例:

@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf1(x=[]):
    x.append(1)
    print(x)
    return len(x)
> rf1()

[1]
[1, 1]
[1, 1, 1]
[1, 1, 1, 1]

4
@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf2(l=[], v=1):
    l.append(v)
    print(l)
    assert len(l)>4
    return len(l)
> rf2(v=2, retry=10) #overwite times=4

[2]
[2, 2]
[2, 2, 2]
[2, 2, 2, 2]
[2, 2, 2, 2, 2]

5
> retry.retry(lambda a,b:a+b, 1, 2, times=2)

3
> retry.retry(lambda a,b:a+b, 1, "2", times=2)

TypeError: unsupported operand type(s) for +: 'int' and 'str'

我喜欢使用bool值,如下所示:

success = False
num_try = 0
while success is False:
    if num_try >= 10: # or any number
        # handle error how  you please
    try:
        # code
        success = True
    except Exception as e:
        # record or do something with exception if needed
        num_try += 1

使用这个装饰器,您可以轻松地控制错误

class catch:
    def __init__(self, max=1, callback=None):
        self.max = max 
        self.callback = callback 
    
    def set_max(self, max):
        self.max = max
    
    def handler(self, *args, **kwargs):
        self.index = 0
        while self.index < self.max: 
            self.index += 1
            try:
                self.func(self, *args, **kwargs)
        
            except Exception as error:
                if callable(self.callback):
                    self.callback(self, error, args, kwargs)
                
    def __call__(self, func):
        self.func = func
        return self.handler

import time
def callback(cls, error, args, kwargs):
    print('func args', args, 'func kwargs', kwargs)
    print('error', repr(error), 'trying', cls.index)
    if cls.index == 2:
        cls.set_max(4)
    
    else:
        time.sleep(1)
    
    
@catch(max=2, callback=callback)  
def test(cls, ok, **kwargs):
    raise ValueError('ok')

test(1, message='hello')

我使用这个,它可以用于任何函数:

def run_with_retry(func: callable, max_retries: int = 3, wait_seconds: int = 2, **func_params):
num_retries = 1
while True:
    try:
        return func(*func_params.values())
    except Exception as e:
        if num_retries > max_retries:
            print('we have reached maximum errors and raising the exception')
            raise e
        else:
            print(f'{num_retries}/{max_retries}')
            print("Retrying error:", e)
            num_retries += 1
            sleep(wait_seconds)

像这样调用:

    def add(val1, val2):
        return val1 + val2

    run_with_retry(func=add, param1=10, param2=20)

如果您正在寻找的是重新尝试x次失败的尝试,那么单个for else循环可能就是您想要的。考虑这个例子,尝试了3次:

attempts = 3

for attempt in range(1, attempts+1):
    try:
        if attempt < 4:
            raise TypeError(f"Error raised on attempt: {attempt}")
        else:
            print(f'Attempt {attempt} finally worked.')
    except (TypeError) as error:
        print(f'Attempt {attempt} hit the exception.')
        continue
    else:
        break
else:
    print(f'Exit after final attempt: {attempt}')

print(f'\nGo on to execute other code ...')

给出输出:

Attempt 1 hit the exception.
Attempt 2 hit the exception.
Attempt 3 hit the exception.
Exit after final attempt: 3

Go on to execute other code ...

再试一次它就成功了

attempts = 4

给出输出:

Attempt 1 hit the exception.
Attempt 2 hit the exception.
Attempt 3 hit the exception.
Attempt 4 finally worked.

Go on to execute other code ...

你可以有一个专门的函数使用返回短路结果。比如这样:

def my_function_with_retries(..., max_retries=100):
    for attempt in range(max_retries):
        try:
            return my_function(...)
        except SomeSpecificException as error:
            logging.warning(f"Retrying after failed execution: {error}")

    raise SomeOtherException()

这里有一个快速装饰器来处理这个问题。7行,没有依赖关系。

def retry(exception=Exception, retries=3, delay=0):
    def wrap(func):
        for i in range(retries):
            try:
                return func()
            except exception as e:
                print(f'Retrying {func.__name__}: {i}/{retries}')
                time.sleep(delay)
        raise e
    return wrap

@retry()
def do_something():
  ...
@retry(HTTPError, retries=100, delay=3)
def download_something():
  ...

可以添加的一个功能是扩展异常以处理多个异常(splat一个列表)。


我喜欢laurent-laporte的回答。下面是我的版本,它包装在一个类与静态方法和一些例子。我实现了重试计数作为另一种重试方式。还增加了kwargs。

from typing import List
import time


class Retry:
    @staticmethod
    def onerror_retry(exception, callback, retries: int = 0, timeout: float = 0, timedelta: float = 0,
                      errors: List = None, **kwargs):
        """

        @param exception: The exception to trigger retry handling with.
        @param callback: The function that will potentially fail with an exception
        @param retries: Optional total number of retries, regardless of timing if this threshold is met, the call will
                        raise the exception.
        @param timeout: Optional total amount of time to do retries after which the call will raise an exception
        @param timedelta: Optional amount of time to sleep in between calls
        @param errors: A list to receive all the exceptions that were caught.
        @param kwargs: An optional key value parameters to pass to the function to retry.
        """
        for retry in Retry.__onerror_retry(exception, callback, retries, timeout, timedelta, errors, **kwargs):
            if retry: retry(**kwargs)  # retry will be None when all retries fail.

    @staticmethod
    def __onerror_retry(exception, callback, retries: int = 0, timeout: float = 0, timedelta: float = 0,
                        errors: List = None, **kwargs):
        end_time = time.time() + timeout
        continues = 0
        while True:
            try:
                yield callback(**kwargs)
                break
            except exception as ex:
                print(ex)
                if errors:
                    errors.append(ex)

                continues += 1
                if 0 < retries < continues:
                    print('ran out of retries')
                    raise

                if timeout > 0 and time.time() > end_time:
                    print('ran out of time')
                    raise
                elif timedelta > 0:
                    time.sleep(timedelta)


err = 0

#
# sample dumb fail function
def fail_many_times(**kwargs):
    global err
    err += 1
    max_errors = kwargs.pop('max_errors', '') or 1
    if err < max_errors:
        raise ValueError("I made boo boo.")
    print("Successfully did something.")

#
# Example calls
try:
    #
    # retries with a parameter that overrides retries... just because
    Retry.onerror_retry(ValueError, fail_many_times, retries=5, max_errors=3)
    err = 0
    #
    # retries that run out of time, with 1 second sleep between retries.
    Retry.onerror_retry(ValueError, fail_many_times, timeout=5, timedelta=1, max_errors=30)
except Exception as err:
    print(err)