I am opening a file which has 100,000 URL's. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.


当前回答

Twistedless解决方案:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

这个方案比twisted方案稍微快一点,并且使用更少的CPU。

其他回答

下面是一个“异步”解决方案,它不使用asyncio,而是使用asyncio使用的低级机制(在Linux上):select()。(或者asyncio可能使用poll或epoll,但这是类似的原理。)

它是对PyCurl示例的稍微修改版本。

(为了简单起见,它多次请求相同的URL,但您可以轻松地修改它以检索一系列不同的URL。)

(另一个轻微的修改可以使这个检索相同的URL作为一个无限循环。提示:将while url和句柄更改为while句柄,将while nprocessed<nurls更改为while 1。)

import pycurl,io,gzip,signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN)  # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info

NCONNS = 2  # Number of concurrent GET requests
url    = 'example.com'
urls   = [url for i in range(0x7*NCONNS)]  # Copy the same URL over and over

# Check args
nurls  = len(urls)
NCONNS = min(NCONNS, nurls)
print("\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM))
print(f'\x1b[37m{nurls} \x1b[91m@ \x1b[92m{NCONNS}\x1b[0m')

# Pre-allocate a list of curl objects
m         = pycurl.CurlMulti()
m.handles = []
for i in range(NCONNS):
  c = pycurl.Curl()
  c.setopt(pycurl.FOLLOWLOCATION,  1)
  c.setopt(pycurl.MAXREDIRS,       5)
  c.setopt(pycurl.CONNECTTIMEOUT,  30)
  c.setopt(pycurl.TIMEOUT,         300)
  c.setopt(pycurl.NOSIGNAL,        1)
  m.handles.append(c)

handles    = m.handles  # MUST make a copy?!
nprocessed = 0
while nprocessed<nurls:

  while urls and handles:  # If there is an url to process and a free curl object, add to multi stack
    url   = urls.pop(0)
    c     = handles.pop()
    c.buf = io.BytesIO()
    c.url = url  # store some info
    c.t0  = time.perf_counter()
    c.setopt(pycurl.URL,        c.url)
    c.setopt(pycurl.WRITEDATA,  c.buf)
    c.setopt(pycurl.HTTPHEADER, [f'user-agent: {random.randint(0,(1<<256)-1):x}', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
    m.add_handle(c)

  while 1:  # Run the internal curl state machine for the multi stack
    ret, num_handles = m.perform()
    if ret!=pycurl.E_CALL_MULTI_PERFORM:  break

  while 1:  # Check for curl objects which have terminated, and add them to the handles
    nq, ok_list, ko_list = m.info_read()
    for c in ok_list:
      m.remove_handle(c)
      t1 = time.perf_counter()
      reply = gzip.decompress(c.buf.getvalue())
      print(f'\x1b[33mGET  \x1b[32m{t1-c.t0:.3f}  \x1b[37m{len(reply):9,}  \x1b[0m{reply[:32]}...')  # \x1b[35m{psutil.Process(os.getpid()).memory_info().rss:,} \x1b[0mbytes')
      handles.append(c)
    for c, errno, errmsg in ko_list:
      m.remove_handle(c)
      print('\x1b[31mFAIL {c.url} {errno} {errmsg}')
      handles.append(c)
    nprocessed = nprocessed + len(ok_list) + len(ko_list)
    if nq==0: break

  m.select(1.0)  # Currently no more I/O is pending, could do something in the meantime (display a progress bar, etc.). We just call select() to sleep until some more data is available.

for c in m.handles:
  c.close()
m.close()

一个解决方案:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testtime:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

考虑使用风车,虽然风车可能不能做那么多线程。

您可以在5台机器上使用手卷Python脚本,每台机器使用端口40000-60000连接出站,打开100,000个端口连接。

另外,使用一个线程良好的QA应用程序(如OpenSTA)做一个示例测试可能会有所帮助,以了解每个服务器可以处理多少。

另外,试着在LWP::ConnCache类中使用简单的Perl。这样您可能会获得更好的性能(更多的连接)。

最简单的方法是使用Python的内置线程库。它们不是“真正的”/内核线程。它们有问题(比如序列化),但足够好了。你需要一个队列和线程池。这里有一个选项,但是编写自己的选项很简单。您无法并行处理所有100,000个调用,但可以同时发出100个(或左右)调用。

创建epoll对象, 打开许多客户端TCP套接字, 调整他们的发送缓冲区比请求头多一点, 发送一个请求头-它应该是即时的,只是放置到缓冲区, 在epoll对象中注册套接字 在epoll obect上做。poll, 从.poll中读取每个套接字的前3个字节, 将它们写入sys。Stdout后面跟着\n(不刷新), 关闭客户端套接字。

限制同时打开的套接字数量-在创建套接字时处理错误。只有当另一个套接字关闭时才创建新的套接字。 调整操作系统限制。 尝试分成几个(不是很多)进程:这可能有助于更有效地使用CPU。