I am opening a file which has 100,000 URL's. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.


当前回答

这个扭曲的异步web客户端运行得相当快。

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)

其他回答

pip install requests-threads

使用实例使用async/await - send 100个并发请求

from requests_threads import AsyncSession

session = AsyncSession(n=100)

async def _main():
    rs = []
    for _ in range(100):
        rs.append(await session.get('http://httpbin.org/get'))
    print(rs)

if __name__ == '__main__':
    session.run(_main)

此示例仅适用于Python 3。您还可以提供自己的asyncio事件循环!

使用实例Twisted

from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react
from requests_threads import AsyncSession

session = AsyncSession(n=100)

@inlineCallbacks
def main(reactor):
    responses = []
    for i in range(100):
        responses.append(session.get('http://httpbin.org/get'))

    for response in responses:
        r = yield response
        print(r)

if __name__ == '__main__':
    react(main)

这个例子在Python 2和Python 3上都可以运行。

也许这对我的回购有帮助,一个基本的例子, 用python编写快速异步HTTP请求

我发现使用tornado包是最快和最简单的方法来实现这一点:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: {e}')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: {e}')
                continue
            else:
                print(f'URL \'{response.request.url}\' has status code <{response.code}>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages

my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])

最简单的方法是使用Python的内置线程库。它们不是“真正的”/内核线程。它们有问题(比如序列化),但足够好了。你需要一个队列和线程池。这里有一个选项,但是编写自己的选项很简单。您无法并行处理所有100,000个调用,但可以同时发出100个(或左右)调用。

对于您的情况,线程可能会做的技巧,因为您可能会花费大部分时间等待响应。标准库中有一些有用的模块,如Queue,可能会有所帮助。

我以前做过类似的并行下载文件的事情,对我来说已经足够好了,但它不是你所说的那种规模。

如果您的任务对cpu的限制更大,您可能需要考虑multiprocessing模块,它将允许您利用更多的cpu /内核/线程(更多的进程不会相互阻塞,因为锁定是每个进程)

下面是一个“异步”解决方案,它不使用asyncio,而是使用asyncio使用的低级机制(在Linux上):select()。(或者asyncio可能使用poll或epoll,但这是类似的原理。)

它是对PyCurl示例的稍微修改版本。

(为了简单起见,它多次请求相同的URL,但您可以轻松地修改它以检索一系列不同的URL。)

(另一个轻微的修改可以使这个检索相同的URL作为一个无限循环。提示:将while url和句柄更改为while句柄,将while nprocessed<nurls更改为while 1。)

import pycurl,io,gzip,signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN)  # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info

NCONNS = 2  # Number of concurrent GET requests
url    = 'example.com'
urls   = [url for i in range(0x7*NCONNS)]  # Copy the same URL over and over

# Check args
nurls  = len(urls)
NCONNS = min(NCONNS, nurls)
print("\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM))
print(f'\x1b[37m{nurls} \x1b[91m@ \x1b[92m{NCONNS}\x1b[0m')

# Pre-allocate a list of curl objects
m         = pycurl.CurlMulti()
m.handles = []
for i in range(NCONNS):
  c = pycurl.Curl()
  c.setopt(pycurl.FOLLOWLOCATION,  1)
  c.setopt(pycurl.MAXREDIRS,       5)
  c.setopt(pycurl.CONNECTTIMEOUT,  30)
  c.setopt(pycurl.TIMEOUT,         300)
  c.setopt(pycurl.NOSIGNAL,        1)
  m.handles.append(c)

handles    = m.handles  # MUST make a copy?!
nprocessed = 0
while nprocessed<nurls:

  while urls and handles:  # If there is an url to process and a free curl object, add to multi stack
    url   = urls.pop(0)
    c     = handles.pop()
    c.buf = io.BytesIO()
    c.url = url  # store some info
    c.t0  = time.perf_counter()
    c.setopt(pycurl.URL,        c.url)
    c.setopt(pycurl.WRITEDATA,  c.buf)
    c.setopt(pycurl.HTTPHEADER, [f'user-agent: {random.randint(0,(1<<256)-1):x}', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
    m.add_handle(c)

  while 1:  # Run the internal curl state machine for the multi stack
    ret, num_handles = m.perform()
    if ret!=pycurl.E_CALL_MULTI_PERFORM:  break

  while 1:  # Check for curl objects which have terminated, and add them to the handles
    nq, ok_list, ko_list = m.info_read()
    for c in ok_list:
      m.remove_handle(c)
      t1 = time.perf_counter()
      reply = gzip.decompress(c.buf.getvalue())
      print(f'\x1b[33mGET  \x1b[32m{t1-c.t0:.3f}  \x1b[37m{len(reply):9,}  \x1b[0m{reply[:32]}...')  # \x1b[35m{psutil.Process(os.getpid()).memory_info().rss:,} \x1b[0mbytes')
      handles.append(c)
    for c, errno, errmsg in ko_list:
      m.remove_handle(c)
      print('\x1b[31mFAIL {c.url} {errno} {errmsg}')
      handles.append(c)
    nprocessed = nprocessed + len(ok_list) + len(ko_list)
    if nq==0: break

  m.select(1.0)  # Currently no more I/O is pending, could do something in the meantime (display a progress bar, etc.). We just call select() to sleep until some more data is available.

for c in m.handles:
  c.close()
m.close()