I am opening a file which has 100,000 URL's. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.
当前回答
对于您的情况,线程可能会做的技巧,因为您可能会花费大部分时间等待响应。标准库中有一些有用的模块,如Queue,可能会有所帮助。
我以前做过类似的并行下载文件的事情,对我来说已经足够好了,但它不是你所说的那种规模。
如果您的任务对cpu的限制更大,您可能需要考虑multiprocessing模块,它将允许您利用更多的cpu /内核/线程(更多的进程不会相互阻塞,因为锁定是每个进程)
其他回答
一个使用tornado的异步网络库解决方案
from tornado import ioloop, httpclient
i = 0
def handle_request(response):
print(response.code)
global i
i -= 1
if i == 0:
ioloop.IOLoop.instance().stop()
http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
i += 1
http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
这段代码使用非阻塞网络I/O,没有任何限制。它可以扩展到数万个打开的连接。它将在单个线程中运行,但比任何线程解决方案都要快。签出非阻塞I/O
(工具)
Apache Bench是您所需要的全部。—用于测量HTTP web服务器性能的命令行计算机程序
给你一篇不错的博客文章:https://www.petefreitag.com/item/689.cfm(来自Pete Freitag)
使用线程池是一个很好的选择,这将使这相当容易。不幸的是,python并没有一个标准库来简化线程池。但这里有一个不错的图书馆,你应该开始: http://www.chrisarndt.de/projects/threadpool/
来自他们网站的代码示例:
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
希望这能有所帮助。
一个解决方案:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status
def processResponse(response,url):
print response, url
processedOne()
def processError(error,url):
print "error", url#, error
processedOne()
def processedOne():
if finished.next()==added:
reactor.stop()
def addTask(url):
req = threads.deferToThread(getStatus, url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
added=0
for url in open('urllist.txt'):
added+=1
addTask(url.strip())
try:
reactor.run()
except KeyboardInterrupt:
reactor.stop()
Testtime:
[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null
real 1m10.682s
user 0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
Pingtime:
bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
下面是一个“异步”解决方案,它不使用asyncio,而是使用asyncio使用的低级机制(在Linux上):select()。(或者asyncio可能使用poll或epoll,但这是类似的原理。)
它是对PyCurl示例的稍微修改版本。
(为了简单起见,它多次请求相同的URL,但您可以轻松地修改它以检索一系列不同的URL。)
(另一个轻微的修改可以使这个检索相同的URL作为一个无限循环。提示:将while url和句柄更改为while句柄,将while nprocessed<nurls更改为while 1。)
import pycurl,io,gzip,signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN) # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info
NCONNS = 2 # Number of concurrent GET requests
url = 'example.com'
urls = [url for i in range(0x7*NCONNS)] # Copy the same URL over and over
# Check args
nurls = len(urls)
NCONNS = min(NCONNS, nurls)
print("\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM))
print(f'\x1b[37m{nurls} \x1b[91m@ \x1b[92m{NCONNS}\x1b[0m')
# Pre-allocate a list of curl objects
m = pycurl.CurlMulti()
m.handles = []
for i in range(NCONNS):
c = pycurl.Curl()
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.MAXREDIRS, 5)
c.setopt(pycurl.CONNECTTIMEOUT, 30)
c.setopt(pycurl.TIMEOUT, 300)
c.setopt(pycurl.NOSIGNAL, 1)
m.handles.append(c)
handles = m.handles # MUST make a copy?!
nprocessed = 0
while nprocessed<nurls:
while urls and handles: # If there is an url to process and a free curl object, add to multi stack
url = urls.pop(0)
c = handles.pop()
c.buf = io.BytesIO()
c.url = url # store some info
c.t0 = time.perf_counter()
c.setopt(pycurl.URL, c.url)
c.setopt(pycurl.WRITEDATA, c.buf)
c.setopt(pycurl.HTTPHEADER, [f'user-agent: {random.randint(0,(1<<256)-1):x}', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
m.add_handle(c)
while 1: # Run the internal curl state machine for the multi stack
ret, num_handles = m.perform()
if ret!=pycurl.E_CALL_MULTI_PERFORM: break
while 1: # Check for curl objects which have terminated, and add them to the handles
nq, ok_list, ko_list = m.info_read()
for c in ok_list:
m.remove_handle(c)
t1 = time.perf_counter()
reply = gzip.decompress(c.buf.getvalue())
print(f'\x1b[33mGET \x1b[32m{t1-c.t0:.3f} \x1b[37m{len(reply):9,} \x1b[0m{reply[:32]}...') # \x1b[35m{psutil.Process(os.getpid()).memory_info().rss:,} \x1b[0mbytes')
handles.append(c)
for c, errno, errmsg in ko_list:
m.remove_handle(c)
print('\x1b[31mFAIL {c.url} {errno} {errmsg}')
handles.append(c)
nprocessed = nprocessed + len(ok_list) + len(ko_list)
if nq==0: break
m.select(1.0) # Currently no more I/O is pending, could do something in the meantime (display a progress bar, etc.). We just call select() to sleep until some more data is available.
for c in m.handles:
c.close()
m.close()
推荐文章
- 为什么这个用于初始化列表列表的代码明显地将列表链接在一起?
- 在Python中,在函数内部导入时会发生什么?
- 用Python列表中的值创建一个.csv文件
- 在Python中哪个更快:x**。5还是math.sqrt(x)?
- 有哪些好的Python ORM解决方案?
- 如何在f字符串中转义括号?
- Python void返回类型注释
- 如何为python模块的argparse部分编写测试?
- 在python中是否有用于均方根误差(RMSE)的库函数?
- 如何从matplotlib (pyplot。Figure vs matplotlib。figure) (frameon=False matplotlib中有问题)
- django test app error -在创建测试数据库时出现错误:创建数据库的权限被拒绝
- 识别使用pip安装的python包的依赖关系
- 从字符串变量导入模块
- 如何删除Python中的前导空白?
- python中的assertEquals和assertEqual