I am opening a file which has 100,000 URLs. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far I have looked at the many confusing ways Python implements threading/concurrency. I have even looked at the Python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.
Current answer
Things have changed quite a bit since this was posted in 2010. I have not tried all of the other answers, but I did try a few, and I found that this worked best for me using Python 3.6.
Running on AWS, I can fetch about 150 unique domains per second.
import concurrent.futures
import requests
import time
out = []
CONNECTIONS = 100
TIMEOUT = 5
tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]
def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)
            print(str(len(out)), end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
Other answers
I found that using the tornado package was the fastest and simplest way to achieve this:
from tornado import ioloop, httpclient, gen
def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: {e}')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: {e}')
                continue
            else:
                print(f'URL \'{response.request.url}\' has status code <{response.code}>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages


my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])
One solution, using Twisted to defer the blocking calls to a thread pool:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response, url):
    print response, url
    processedOne()

def processError(error, url):
    print "error", url  #, error
    processedOne()

def processedOne():
    if finished.next() == added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)

added = 0
for url in open('urllist.txt'):
    added += 1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()
Test time:
[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null
real 1m10.682s
user 0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
Ping times:
bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
Here is an "asynchronous" solution that does not use asyncio, but the lower-level mechanism asyncio uses (on Linux): select(). (Or asyncio may use poll or epoll, but the principle is similar.)
It is a slightly modified version of the example from PyCurl.
(For simplicity it requests the same URL multiple times, but you can easily modify it to retrieve a series of distinct URLs.)
(Another slight modification can turn this into an infinite loop that keeps retrieving the same URL. Hint: change while urls and handles to while handles, and change while nprocessed<nurls to while 1.)
import pycurl,io,gzip,signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN) # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info
NCONNS = 2 # Number of concurrent GET requests
url = 'example.com'
urls = [url for i in range(0x7*NCONNS)] # Copy the same URL over and over
# Check args
nurls = len(urls)
NCONNS = min(NCONNS, nurls)
print("\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM))
print(f'\x1b[37m{nurls} \x1b[91m@ \x1b[92m{NCONNS}\x1b[0m')
# Pre-allocate a list of curl objects
m = pycurl.CurlMulti()
m.handles = []
for i in range(NCONNS):
    c = pycurl.Curl()
    c.setopt(pycurl.FOLLOWLOCATION, 1)
    c.setopt(pycurl.MAXREDIRS, 5)
    c.setopt(pycurl.CONNECTTIMEOUT, 30)
    c.setopt(pycurl.TIMEOUT, 300)
    c.setopt(pycurl.NOSIGNAL, 1)
    m.handles.append(c)

handles = m.handles  # MUST make a copy?!
nprocessed = 0
while nprocessed < nurls:

    while urls and handles:  # If there is an url to process and a free curl object, add to multi stack
        url = urls.pop(0)
        c = handles.pop()
        c.buf = io.BytesIO()
        c.url = url  # store some info
        c.t0 = time.perf_counter()
        c.setopt(pycurl.URL, c.url)
        c.setopt(pycurl.WRITEDATA, c.buf)
        c.setopt(pycurl.HTTPHEADER, [f'user-agent: {random.randint(0,(1<<256)-1):x}', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
        m.add_handle(c)

    while 1:  # Run the internal curl state machine for the multi stack
        ret, num_handles = m.perform()
        if ret != pycurl.E_CALL_MULTI_PERFORM: break

    while 1:  # Check for curl objects which have terminated, and return them to the free handles
        nq, ok_list, ko_list = m.info_read()
        for c in ok_list:
            m.remove_handle(c)
            t1 = time.perf_counter()
            reply = gzip.decompress(c.buf.getvalue())
            print(f'\x1b[33mGET \x1b[32m{t1-c.t0:.3f} \x1b[37m{len(reply):9,} \x1b[0m{reply[:32]}...')
            handles.append(c)
        for c, errno, errmsg in ko_list:
            m.remove_handle(c)
            print(f'\x1b[31mFAIL {c.url} {errno} {errmsg}')
            handles.append(c)
        nprocessed = nprocessed + len(ok_list) + len(ko_list)
        if nq == 0: break

    m.select(1.0)  # No more I/O is pending right now; select() sleeps until more data is available (a progress bar could be updated here in the meantime)

for c in m.handles:
    c.close()
m.close()
A threadless recipe:

- Create an epoll object,
- open many client TCP sockets,
- adjust their send buffers to be a bit larger than the request header,
- send a request header - it should be immediate, since it just lands in the send buffer,
- register the socket in the epoll object,
- do .poll on the epoll object,
- read the first 3 bytes from each socket reported by .poll,
- write them to sys.stdout followed by \n (without flushing), and close the client socket.

Limit the number of sockets open at the same time - handle errors when sockets are created, and only create a new socket when another one has closed. Adjust the OS limits. Try splitting the work into a few (not many) processes: this may help use the CPU more effectively. A minimal sketch of this recipe follows below.
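For concreteness, here is a minimal, Linux-only sketch of that threadless recipe. It is written against Python 3 (the question targets Python 2.6, where urlparse lives in a different module), it assumes a urllist.txt of plain http:// URLs like the one in the Twisted answer above, MAX_OPEN is an assumed cap, and error handling is deliberately thin. It also reads a bit more than "the first 3 bytes" so it can print the actual status code.

import select, socket, sys
from urllib.parse import urlparse

MAX_OPEN = 100                         # assumed cap on simultaneously open sockets

def build_request(url):
    p = urlparse(url.strip())
    path = p.path or '/'
    return p.hostname, ('HEAD %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, p.netloc)).encode()

urls = iter(open('urllist.txt'))       # assumed input file, one http:// URL per line
poller = select.epoll()
conns = {}                             # fileno -> (socket, request bytes)
more_urls = True

while more_urls or conns:
    # Open new sockets only while we are under the limit
    while more_urls and len(conns) < MAX_OPEN:
        try:
            host, req = build_request(next(urls))
        except StopIteration:
            more_urls = False
            break
        s = socket.socket()
        s.setblocking(False)
        s.connect_ex((host, 80))       # non-blocking connect (the DNS lookup still blocks)
        conns[s.fileno()] = (s, req)
        poller.register(s.fileno(), select.EPOLLOUT)

    for fd, event in poller.poll(1.0):
        s, req = conns[fd]
        if event & select.EPOLLOUT and not event & (select.EPOLLERR | select.EPOLLHUP):
            s.send(req)                # the tiny header fits into the send buffer at once
            poller.modify(fd, select.EPOLLIN)
            continue
        # Readable, hung up, or failed: grab the status line and recycle the slot.
        try:
            data = s.recv(64)
        except OSError:
            data = b''
        # A status line looks like b'HTTP/1.0 200 OK'; the code sits at bytes 9..12.
        status = data[9:12].decode('ascii', 'replace') if len(data) >= 12 else 'ERR'
        sys.stdout.write(status + '\n')
        poller.unregister(fd)
        s.close()
        del conns[fd]

Because everything runs in one thread, the only real knobs are MAX_OPEN and the OS file-descriptor limit (ulimit -n), exactly as the recipe above suggests.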
The simplest way would be to use Python's built-in threading library. These are not "real"/kernel threads, and they have issues (such as serialization), but they are good enough. You want a queue and a thread pool. There is one option out there, but it is trivial to write your own. You cannot parallelize all 100,000 calls, but you can fire off 100 (or so) of them at the same time.
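As a rough illustration of that queue-plus-thread-pool pattern, here is a short sketch in Python 3 (on the Python 2.6 mentioned in the question, the modules are named Queue, httplib and urlparse instead). The file name urllist.txt, the worker count of 100, and the 10-second timeout are assumptions for the example, not part of the original answer, and it only speaks plain HTTP.

from queue import Queue
from threading import Thread
from urllib.parse import urlparse
import http.client

concurrent = 100                      # number of worker threads

def get_status(ourl):
    try:
        url = urlparse(ourl)
        conn = http.client.HTTPConnection(url.netloc, timeout=10)
        conn.request("HEAD", url.path or "/")
        return conn.getresponse().status, ourl
    except Exception:
        return "error", ourl

def worker():
    while True:
        url = q.get()
        status, url = get_status(url)
        print(status, url)
        q.task_done()

q = Queue(concurrent * 2)             # bounded, so the feeder cannot outrun the workers
for _ in range(concurrent):
    t = Thread(target=worker)
    t.daemon = True                   # daemon threads exit with the main thread
    t.start()

for url in open('urllist.txt'):
    q.put(url.strip())
q.join()                              # block until every queued URL has been processed

The bounded queue keeps the feeder loop from reading all 100,000 URLs into memory at once, and q.join() returns only after task_done() has been called for every item.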