我正在收集网站列表上的统计数据,为了简单起见,我正在使用请求。这是我的代码:
data=[]
websites=['http://google.com', 'http://bbc.co.uk']
for w in websites:
r= requests.get(w, verify=False)
data.append( (r.url, len(r.content), r.elapsed.total_seconds(), str([(l.status_code, l.url) for l in r.history]), str(r.headers.items()), str(r.cookies.items())) )
现在,我想要请求。10秒后进入超时,这样循环就不会卡住。
这个问题以前也很有趣,但没有一个答案是干净的。
我听说可能不使用请求是一个好主意,但我应该如何得到请求提供的好东西(元组中的那些)。
我相信你可以使用多处理,而不依赖于第三方包:
import multiprocessing
import requests
def call_with_timeout(func, args, kwargs, timeout):
manager = multiprocessing.Manager()
return_dict = manager.dict()
# define a wrapper of `return_dict` to store the result.
def function(return_dict):
return_dict['value'] = func(*args, **kwargs)
p = multiprocessing.Process(target=function, args=(return_dict,))
p.start()
# Force a max. `timeout` or wait for the process to finish
p.join(timeout)
# If thread is still active, it didn't finish: raise TimeoutError
if p.is_alive():
p.terminate()
p.join()
raise TimeoutError
else:
return return_dict['value']
call_with_timeout(requests.get, args=(url,), kwargs={'timeout': 10}, timeout=60)
传递给kwargs的超时是从服务器获取任何响应的超时,参数timeout是获取完整响应的超时。
我使用请求2.2.1和eventlet不适合我。相反,我可以使用gevent超时代替,因为gevent在我的服务中用于gunicorn。
import gevent
import gevent.monkey
gevent.monkey.patch_all(subprocess=True)
try:
with gevent.Timeout(5):
ret = requests.get(url)
print ret.status_code, ret.content
except gevent.timeout.Timeout as e:
print "timeout: {}".format(e.message)
请注意geevent .timeout. timeout不会被常规异常处理捕获。
所以要么显式地捕获getevent。timeout。timeout
或者传入一个不同的异常,像这样使用:with gevent。Timeout(5, requests.exceptions.Timeout):尽管在引发此异常时没有传递任何消息。
此代码工作socketError 11004和10060......
# -*- encoding:UTF-8 -*-
__author__ = 'ACE'
import requests
from PyQt4.QtCore import *
from PyQt4.QtGui import *
class TimeOutModel(QThread):
Existed = pyqtSignal(bool)
TimeOut = pyqtSignal()
def __init__(self, fun, timeout=500, parent=None):
"""
@param fun: function or lambda
@param timeout: ms
"""
super(TimeOutModel, self).__init__(parent)
self.fun = fun
self.timeer = QTimer(self)
self.timeer.setInterval(timeout)
self.timeer.timeout.connect(self.time_timeout)
self.Existed.connect(self.timeer.stop)
self.timeer.start()
self.setTerminationEnabled(True)
def time_timeout(self):
self.timeer.stop()
self.TimeOut.emit()
self.quit()
self.terminate()
def run(self):
self.fun()
bb = lambda: requests.get("http://ipv4.download.thinkbroadband.com/1GB.zip")
a = QApplication([])
z = TimeOutModel(bb, 500)
print 'timeout'
a.exec_()