我正在收集网站列表上的统计数据,为了简单起见,我正在使用请求。这是我的代码:

data=[]
websites=['http://google.com', 'http://bbc.co.uk']
for w in websites:
    r= requests.get(w, verify=False)
    data.append( (r.url, len(r.content), r.elapsed.total_seconds(), str([(l.status_code, l.url) for l in r.history]), str(r.headers.items()), str(r.cookies.items())) )
 

现在,我想要请求。10秒后进入超时,这样循环就不会卡住。

这个问题以前也很有趣,但没有一个答案是干净的。

我听说可能不使用请求是一个好主意,但我应该如何得到请求提供的好东西(元组中的那些)。


当前回答

Timeout =(连接超时,数据读取超时)或给出单个参数(Timeout =1)

import requests

try:
    req = requests.request('GET', 'https://www.google.com',timeout=(1,1))
    print(req)
except requests.ReadTimeout:
    print("READ TIME OUT")

其他回答

要创建超时,您可以使用信号。

解决这个案子最好的办法可能是

设置一个异常作为告警信号的处理程序 延迟十秒发出警报信号 在try-except-finally块中调用函数。 如果函数超时,则到达except块。 在finally块中,你中止了警报,所以它不会在以后发出信号。

下面是一些示例代码:

import signal
from time import sleep

class TimeoutException(Exception):
    """ Simple Exception to be called on timeouts. """
    pass

def _timeout(signum, frame):
    """ Raise an TimeoutException.

    This is intended for use as a signal handler.
    The signum and frame arguments passed to this are ignored.

    """
    # Raise TimeoutException with system default timeout message
    raise TimeoutException()

# Set the handler for the SIGALRM signal:
signal.signal(signal.SIGALRM, _timeout)
# Send the SIGALRM signal in 10 seconds:
signal.alarm(10)

try:    
    # Do our code:
    print('This will take 11 seconds...')
    sleep(11)
    print('done!')
except TimeoutException:
    print('It timed out!')
finally:
    # Abort the sending of the SIGALRM signal:
    signal.alarm(0)

这里有一些注意事项:

它不是线程安全的,信号总是传递到主线程,所以你不能把它放在任何其他线程中。 在调度信号和执行实际代码之后会有一个轻微的延迟。这意味着示例即使只休眠了10秒也会超时。

但是,这些都在标准python库中!除了sleep函数导入,它只是一个导入。如果你要在很多地方使用超时,你可以很容易地把TimeoutException, _timeout和singaling放在一个函数中,然后调用它。或者你可以创建一个装饰器,并把它放在函数上,请看下面链接的答案。

你也可以将它设置为“上下文管理器”,这样你就可以在with语句中使用它:

import signal
class Timeout():
    """ Timeout for use with the `with` statement. """

    class TimeoutException(Exception):
        """ Simple Exception to be called on timeouts. """
        pass

    def _timeout(signum, frame):
        """ Raise an TimeoutException.

        This is intended for use as a signal handler.
        The signum and frame arguments passed to this are ignored.

        """
        raise Timeout.TimeoutException()

    def __init__(self, timeout=10):
        self.timeout = timeout
        signal.signal(signal.SIGALRM, Timeout._timeout)

    def __enter__(self):
        signal.alarm(self.timeout)

    def __exit__(self, exc_type, exc_value, traceback):
        signal.alarm(0)
        return exc_type is Timeout.TimeoutException

# Demonstration:
from time import sleep

print('This is going to take maximum 10 seconds...')
with Timeout(10):
    sleep(15)
    print('No timeout?')
print('Done')

这种上下文管理器方法的一个可能的缺点是,您无法知道代码是否实际超时。

资料来源及推荐阅读:

关于信号的文档 这是@David Narayan对暂停的回答。他以装饰者的身份组织了上面的代码。

只是另一个解决方案(从http://docs.python-requests.org/en/master/user/advanced/#streaming-uploads获得)

在上传之前,你可以找出内容大小:

TOO_LONG = 10*1024*1024  # 10 Mb
big_url = "http://ipv4.download.thinkbroadband.com/1GB.zip"
r = requests.get(big_url, stream=True)
print (r.headers['content-length'])
# 1073741824  

if int(r.headers['content-length']) < TOO_LONG:
    # upload content:
    content = r.content

但是要小心,发送方可以在“content-length”响应字段中设置不正确的值。

设置stream=True并使用r.iter_content(1024)。是的,eventlet。我就是不喜欢超时。

try:
    start = time()
    timeout = 5
    with get(config['source']['online'], stream=True, timeout=timeout) as r:
        r.raise_for_status()
        content = bytes()
        content_gen = r.iter_content(1024)
        while True:
            if time()-start > timeout:
                raise TimeoutError('Time out! ({} seconds)'.format(timeout))
            try:
                content += next(content_gen)
            except StopIteration:
                break
        data = content.decode().split('\n')
        if len(data) in [0, 1]:
            raise ValueError('Bad requests data')
except (exceptions.RequestException, ValueError, IndexError, KeyboardInterrupt,
        TimeoutError) as e:
    print(e)
    with open(config['source']['local']) as f:
        data = [line.strip() for line in f.readlines()]

讨论在这里https://redd.it/80kp1h

这可能有点过分,但是芹菜分布式任务队列对超时有很好的支持。

特别是,您可以定义一个软时间限制,它只在您的流程中引发一个异常(这样您就可以清理)和/或一个硬时间限制,它在超过时间限制时终止任务。

在封面之下,这使用了与你的“之前”帖子中引用的相同的信号方法,但以一种更可用和更易于管理的方式。如果你监控的网站列表很长,你可能会从它的主要功能中受益——各种各样的方法来管理大量任务的执行。

如果遇到这种情况,创建一个看门狗线程,在10秒后搞乱请求的内部状态,例如:

关闭底层套接字,理想情况下 如果请求重试该操作,则触发异常

请注意,根据系统库的不同,您可能无法设置DNS解析的截止日期。