I am opening a file which has 100,000 URLs. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far I have looked at the many confusing ways Python implements threading/concurrency. I have even looked at the Python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.


The simplest way is to use Python's built-in threading library. The threads aren't "real"/kernel threads and they have issues (such as serialization), but they are good enough. You want a queue and a thread pool. There are existing options, but it is also trivial to write your own. You can't parallelize all 100,000 calls, but you can fire off 100 (or so) of them at the same time.
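
A bare-bones sketch of that queue-plus-thread-pool approach (my own illustration, not the option referred to above; it assumes Python 2's Queue, threading and httplib modules and a file named urllist.txt):

# Home-rolled queue + thread pool: ~100 worker threads pull URLs from a queue.
from Queue import Queue
from threading import Thread
from urlparse import urlparse
import httplib

def check(url):
    parts = urlparse(url)
    conn = httplib.HTTPConnection(parts.netloc)
    conn.request("HEAD", parts.path or "/")
    print conn.getresponse().status, url

def worker(q):
    while True:
        url = q.get()
        try:
            check(url)
        except Exception:
            print "error", url
        q.task_done()

q = Queue()
for _ in range(100):                  # roughly 100 simultaneous requests
    t = Thread(target=worker, args=(q,))
    t.daemon = True
    t.start()

for line in open('urllist.txt'):
    q.put(line.strip())
q.join()                              # block until every URL has been handled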


Using a thread pool is a good option, and will make this fairly easy. Unfortunately, Python doesn't have a standard library that makes thread pools ultra easy, but here is a decent library that should get you started: http://www.chrisarndt.de/projects/threadpool/

Code example from their site:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
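
The snippet above leaves out the imports and the callables; for this problem they might be filled in roughly like this (my own sketch; it assumes the threadpool package's ThreadPool/makeRequests API and that the callback receives the work request and the callable's return value):

import threadpool
import urllib2

def fetch_status(url):
    # GET the URL and return (url, status code); urllib2 follows redirects.
    try:
        return url, urllib2.urlopen(url).getcode()
    except urllib2.HTTPError as e:
        return url, e.code

def print_result(request, result):
    # threadpool calls this with the WorkRequest and the callable's result.
    print result[1], result[0]

urls = [line.strip() for line in open('urllist.txt')]
pool = threadpool.ThreadPool(100)
for req in threadpool.makeRequests(fetch_status, urls, print_result):
    pool.putRequest(req)
pool.wait()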

Hope this helps.


In your case, threading will probably do the trick, since you will likely be spending most of the time waiting for a response. There are helpful modules like Queue in the standard library that might help.

I did a similar thing with parallel downloading of files before, and it was good enough for me, but it was not on the scale you are talking about.

If your task were more CPU-bound, you might want to look at the multiprocessing module, which lets you utilize more CPUs/cores/threads (more processes that won't block each other, since locking is per process).
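
For the CPU-bound case, a minimal multiprocessing.Pool sketch would look roughly like this (my own illustration; for this I/O-bound problem threads are usually the better fit):

# Spread a CPU-heavy function across processes with multiprocessing.Pool.
from multiprocessing import Pool

def crunch(n):
    # placeholder for a CPU-heavy task
    return sum(i * i for i in xrange(n))

if __name__ == '__main__':
    pool = Pool(processes=4)              # roughly one worker per core
    print pool.map(crunch, [10**6] * 8)   # blocks until all results are in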


A good approach to solving this problem is to first write the code required to get one result, and then incorporate threading code to parallelize the application.

In a perfect world this would simply mean simultaneously starting 100,000 threads which output their results into a dictionary or list for later processing, but in practice you are limited in how many parallel HTTP requests you can issue in this fashion. Locally, you have limits in how many sockets you can open concurrently and how many threads of execution your Python interpreter will allow. Remotely, you may be limited in the number of simultaneous connections, whether all the requests are against one server or many. These limitations will probably necessitate that you write the script in such a way as to only poll a small fraction of the URLs at any one time (100, as another poster mentioned, is probably a decent thread pool size, although you may find that you can successfully deploy many more).

You can follow this design pattern to resolve the issue described above:

Start a thread which launches new request threads until the number of currently running threads (you can track them via threading.active_count() or by pushing the thread objects into a data structure) is >= your maximum number of simultaneous requests (say 100), then sleeps for a short timeout. This thread should terminate when there are no more URLs to process. Thus, the thread will keep waking up, launching new threads, and sleeping until you are finished. Have the request threads store their results in some data structure for later retrieval and output. If the structure you are storing the results in is a list or dict in CPython, you can safely append or insert unique items from your threads without locks, but if you write to a file or require more complex cross-thread data interaction you should use a mutual exclusion lock to protect this state from corruption.

I suggest you use the threading module for this. You can use it to launch and track running threads. Python's threading support is bare-bones, but the description of your problem suggests that it is completely sufficient for your needs.
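
A rough sketch of the launcher pattern described above (my own illustration; it uses httplib for the HEAD request, runs the launcher loop in the main thread for brevity, and stores results in a plain dict):

import threading, time
from urlparse import urlparse
import httplib

MAX_THREADS = 100
results = {}   # unique keys, so CPython threads can insert without a lock

def fetch(url):
    try:
        parts = urlparse(url)
        conn = httplib.HTTPConnection(parts.netloc)
        conn.request("HEAD", parts.path or "/")
        results[url] = conn.getresponse().status
    except Exception:
        results[url] = "error"

def launch(urls):
    for url in urls:
        # throttle: note that active_count() includes the main thread too
        while threading.active_count() >= MAX_THREADS:
            time.sleep(0.1)
        threading.Thread(target=fetch, args=(url,)).start()

launch(line.strip() for line in open('urllist.txt'))
while threading.active_count() > 1:   # wait for the remaining request threads
    time.sleep(0.1)
for url, status in results.items():
    print status, url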

Finally, if you would like to see a fairly straightforward example of a parallel network application written in Python, check out ssh.py. It is a small library which uses Python threading to parallelize many SSH connections. The design is close enough to your requirements that you may find it to be a good resource.


If you are looking to get the best performance possible, you might want to consider using asynchronous I/O rather than threads. The overhead associated with thousands of OS threads is non-trivial, and the context switching within the Python interpreter adds even more on top of that. Threading will certainly get the job done, but I suspect that an asynchronous route will provide better overall performance.

Specifically, I would suggest the asynchronous web client in the Twisted library (http://www.twistedmatrix.com). It has an admittedly steep learning curve, but it is quite easy to use once you get a good handle on Twisted's style of asynchronous programming.

A HowTo on Twisted's asynchronous web client API is available at:

http://twistedmatrix.com/documents/current/web/howto/client.html
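
As a taste of what that looks like, here is a minimal sketch along the lines of that HowTo, using twisted.web.client.Agent to check a single URL (my own example; error handling is kept minimal):

from twisted.internet import reactor
from twisted.web.client import Agent

agent = Agent(reactor)

def show(response, url):
    print response.code, url
    reactor.stop()

def fail(err, url):
    # err holds the Failure; ignored here beyond reporting
    print 'error', url
    reactor.stop()

url = 'http://www.example.com/'
d = agent.request('HEAD', url)   # returns a Deferred that fires with the response
d.addCallback(show, url)
d.addErrback(fail, url)
reactor.run()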


Threads are absolutely not the answer here. They will create both process and kernel bottlenecks, as well as throughput limits that are not acceptable if the overall goal is "the fastest way".

A little bit of Twisted and its asynchronous HTTP client would give you much better results.


Consider using Windmill, although Windmill probably cannot do that many threads.

You could use a hand-rolled Python script on 5 machines, each one connecting outbound using ports 40000-60000, opening 100,000 port connections.

Also, it might help to do a sample test with a nicely threaded QA app such as OpenSTA in order to get an idea of how much each server can handle.

Also, look into using simple Perl with the LWP::ConnCache class. You will probably get more performance (more connections) that way.


A solution:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testtime:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

Twistedless solution:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

This solution is slightly faster than the Twisted one and uses less CPU.


Use grequests; it is a combination of requests and the Gevent module.

GRequests allows you to use Requests with Gevent to make asynchronous HTTP requests easily.

Usage is simple:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

Create a set of unsent requests:

>>> rs = (grequests.get(u) for u in urls)

Send them all at the same time:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
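
To turn that into status codes for a long URL list, something along these lines should work (my own addition; as far as I know the size argument caps how many requests Gevent runs at once, and failed requests come back as None):

rs = (grequests.get(u) for u in urls)
responses = grequests.map(rs, size=100)          # at most 100 requests in flight
statuses = [r.status_code if r is not None else 'error' for r in responses]
print(statuses)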

This Twisted asynchronous web client goes pretty fast.

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)

A solution using tornado's asynchronous networking library:

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()

This code uses non-blocking network I/O and does not impose any restriction. It can scale to tens of thousands of open connections. It will run in a single thread but will be faster than any threading solution. Check out non-blocking I/O.
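
If you do want to cap (or raise) the number of simultaneous fetches, the default client can be configured before the loop starts, roughly like this (my own addition, extending the snippet above):

# adjust the cap on simultaneous fetches handled by the default client
httpclient.AsyncHTTPClient.configure(None, max_clients=1000)
http_client = httpclient.AsyncHTTPClient()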


Things have changed quite a bit since 2010 when this was posted. I have not tried all the other answers, but I have tried a few, and I found this to work best for me using Python 3.6.

I was able to fetch about 150 unique domains per second running on AWS.

import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')

Create an epoll object, open many client TCP sockets, adjust their send buffers to be a bit more than the request header, send a request header (it should be immediate, just placing it into a buffer), register the socket in the epoll object, do .poll on the epoll object, read the first 3 bytes from each socket returned by .poll, write them to sys.stdout followed by \n (without flushing), and close the client socket.

Limit the number of sockets open at the same time: handle errors when sockets are created, and only create a new socket once another one has been closed. Adjust the OS limits. Try splitting the work across a few (not many) processes: this may help to use the CPU a bit more effectively. A rough sketch of this recipe is shown below.
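
A rough sketch of that recipe (my own illustration, not code from the answer above; it assumes Linux, Python 3, plain HTTP on port 80, and a file named urllist.txt):

# Sketch of the epoll recipe: non-blocking sockets, one HEAD request per
# socket, read just enough of the reply to print the status code.
import socket, select
from urllib.parse import urlsplit

MAX_OPEN = 100                           # cap on simultaneously open sockets
pending = [line.strip() for line in open('urllist.txt')]
inflight = {}                            # fd -> (socket, url, request bytes)
epoll = select.epoll()

def launch_more():
    while pending and len(inflight) < MAX_OPEN:
        url = pending.pop()
        parts = urlsplit(url if '//' in url else 'http://' + url)
        host, path = parts.hostname, parts.path or '/'
        req = ('HEAD %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, host)).encode()
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setblocking(False)
        try:
            s.connect((host, 80))        # note: name resolution still blocks here
        except BlockingIOError:
            pass                         # expected: the connect is in progress
        except OSError:
            print('error', url)
            s.close()
            continue
        inflight[s.fileno()] = (s, url, req)
        epoll.register(s.fileno(), select.EPOLLOUT)   # wait until writable

launch_more()
while inflight:
    for fd, event in epoll.poll(1.0):
        s, url, req = inflight[fd]
        if event & select.EPOLLIN:
            data = s.recv(64)
            # the status code is bytes 9-12 of "HTTP/1.x NNN ..."
            print(data[9:12].decode('ascii', 'replace'), url)
        elif event & select.EPOLLOUT and not event & (select.EPOLLERR | select.EPOLLHUP):
            s.send(req)                        # the request fits in the send buffer
            epoll.modify(fd, select.EPOLLIN)   # now wait for the reply
            continue
        else:
            print('error', url)
        epoll.unregister(fd)
        s.close()
        del inflight[fd]
        launch_more()                    # replace the closed socket with a new one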


I know this is an old question, but in Python 3.7 you can do this using asyncio and aiohttp.

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

You can read more about it and see an example here.


I found that using the tornado package is the fastest and simplest way to achieve this:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: {e}')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: {e}')
                continue
            else:
                print(f'URL \'{response.request.url}\' has status code <{response.code}>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages

my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])

(A tool)

Apache Bench is all you need. It is a command-line program for measuring the performance of HTTP web servers.

A nice blog post for you: https://www.petefreitag.com/item/689.cfm (from Pete Freitag)
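
For example, a run along these lines would measure a single endpoint with 100 concurrent connections (my own example; -n is the total number of requests and -c the concurrency):

$ ab -n 100000 -c 100 http://www.example.com/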


(Note to self for the next project)

A Python 3 solution using only requests. It is the simplest and it is fast, with no need for multiprocessing or complicated asynchronous libraries.

The most important aspect is to reuse connections, especially for HTTPS (TLS requires an extra round trip to open a connection). Note that a connection is specific to a subdomain. If you scrape many pages across many domains, you can sort the list of URLs to maximize connection reuse (it effectively sorts by domain).

It will be as fast as any asynchronous code when given enough threads (requests releases the Python GIL while waiting for the response).
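
The sorting step mentioned above is not shown in the code below; a one-liner along these lines would do it (my own addition):

from urllib.parse import urlsplit
urls.sort(key=lambda url: urlsplit(url).netloc)   # group URLs by (sub)domain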

[Production-grade code with logging and error handling]

import logging
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# source: https://stackoverflow.com/a/68583332/5994461

THREAD_POOL = 16

# This is how to create a reusable connection pool with python requests.
session = requests.Session()
session.mount(
    'https://',
    requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
                                  max_retries=3,
                                  pool_block=True)
)

def get(url):
    response = session.get(url)
    logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
    if response.status_code != 200:
        logging.error("request failed, error code %s [%s]", response.status_code, response.url)
    if 500 <= response.status_code < 600:
        # server is overloaded? give it a break
        time.sleep(5)
    return response

def download(urls):
    with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
        # wrap in a list() to wait for all requests to complete
        for response in list(executor.map(get, urls)):
            if response.status_code == 200:
                print(response.content)

def main():
    logging.basicConfig(
        format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    urls = [
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/404",
        "https://httpstat.us/503"
    ]

    download(urls)

if __name__ == "__main__":
    main()

pip install requests-threads

Usage example using async/await: send 100 concurrent requests

from requests_threads import AsyncSession

session = AsyncSession(n=100)

async def _main():
    rs = []
    for _ in range(100):
        rs.append(await session.get('http://httpbin.org/get'))
    print(rs)

if __name__ == '__main__':
    session.run(_main)

This example works only on Python 3. You can also provide your own asyncio event loop!

Usage example using Twisted

from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react
from requests_threads import AsyncSession

session = AsyncSession(n=100)

@inlineCallbacks
def main(reactor):
    responses = []
    for i in range(100):
        responses.append(session.get('http://httpbin.org/get'))

    for response in responses:
        r = yield response
        print(r)

if __name__ == '__main__':
    react(main)

This example works on both Python 2 and Python 3.

Maybe my repo can help as well; it is a basic example of writing fast asynchronous HTTP requests in Python.


Here is an "async" solution that does not use asyncio, but the lower-level mechanism that asyncio uses (on Linux): select(). (Or maybe asyncio uses poll or epoll, but it is a similar principle.)

It is a slightly modified version of the example from PyCurl.

(For simplicity, it requests the same URL multiple times, but you can easily modify it to retrieve a series of distinct URLs.)

(Another slight modification would make it retrieve the same URL over and over as an infinite loop. Hint: change while urls and handles to while handles, and change while nprocessed<nurls to while 1.)

import pycurl,io,gzip,signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN)  # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info

NCONNS = 2  # Number of concurrent GET requests
url    = 'example.com'
urls   = [url for i in range(0x7*NCONNS)]  # Copy the same URL over and over

# Check args
nurls  = len(urls)
NCONNS = min(NCONNS, nurls)
print("\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM))
print(f'\x1b[37m{nurls} \x1b[91m@ \x1b[92m{NCONNS}\x1b[0m')

# Pre-allocate a list of curl objects
m         = pycurl.CurlMulti()
m.handles = []
for i in range(NCONNS):
  c = pycurl.Curl()
  c.setopt(pycurl.FOLLOWLOCATION,  1)
  c.setopt(pycurl.MAXREDIRS,       5)
  c.setopt(pycurl.CONNECTTIMEOUT,  30)
  c.setopt(pycurl.TIMEOUT,         300)
  c.setopt(pycurl.NOSIGNAL,        1)
  m.handles.append(c)

handles    = m.handles  # MUST make a copy?!
nprocessed = 0
while nprocessed<nurls:

  while urls and handles:  # If there is an url to process and a free curl object, add to multi stack
    url   = urls.pop(0)
    c     = handles.pop()
    c.buf = io.BytesIO()
    c.url = url  # store some info
    c.t0  = time.perf_counter()
    c.setopt(pycurl.URL,        c.url)
    c.setopt(pycurl.WRITEDATA,  c.buf)
    c.setopt(pycurl.HTTPHEADER, [f'user-agent: {random.randint(0,(1<<256)-1):x}', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
    m.add_handle(c)

  while 1:  # Run the internal curl state machine for the multi stack
    ret, num_handles = m.perform()
    if ret!=pycurl.E_CALL_MULTI_PERFORM:  break

  while 1:  # Check for curl objects which have terminated, and add them to the handles
    nq, ok_list, ko_list = m.info_read()
    for c in ok_list:
      m.remove_handle(c)
      t1 = time.perf_counter()
      reply = gzip.decompress(c.buf.getvalue())
      print(f'\x1b[33mGET  \x1b[32m{t1-c.t0:.3f}  \x1b[37m{len(reply):9,}  \x1b[0m{reply[:32]}...')  # \x1b[35m{psutil.Process(os.getpid()).memory_info().rss:,} \x1b[0mbytes')
      handles.append(c)
    for c, errno, errmsg in ko_list:
      m.remove_handle(c)
      print(f'\x1b[31mFAIL {c.url} {errno} {errmsg}')
      handles.append(c)
    nprocessed = nprocessed + len(ok_list) + len(ko_list)
    if nq==0: break

  m.select(1.0)  # Currently no more I/O is pending, could do something in the meantime (display a progress bar, etc.). We just call select() to sleep until some more data is available.

for c in m.handles:
  c.close()
m.close()

The Scrapy framework will solve your problem quickly and professionally. It will also cache all the requests so that failed requests can be re-run later.

Save this script as quote_spiders.py.

# quote_spiders.py
import json
import string
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.item import Item, Field

class TextCleaningPipeline(object):
    def _clean_text(self, text):
        text = text.replace('“', '').replace('”', '')
        table = str.maketrans({key: None for key in string.punctuation})
        clean_text = text.translate(table)
        return clean_text.lower()

    def process_item(self, item, spider):
        item['text'] = self._clean_text(item['text'])
        return item

class JsonWriterPipeline(object):
    def open_spider(self, spider):
        self.file = open(spider.settings['JSON_FILE'], 'a')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        self.file.write(line)
        return item

class QuoteItem(Item):
    text = Field()
    author = Field()
    tags = Field()
    spider = Field()

class QuoteSpider(scrapy.Spider):
    name = "quotes"

    def start_requests(self):
        urls = [
            'http://quotes.toscrape.com/page/1/',
            'http://quotes.toscrape.com/page/2/',
            # ...
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def parse(self, response):
        for quote in response.css('div.quote'):
            item = QuoteItem()
            item['text'] = quote.css('span.text::text').get()
            item['author'] = quote.css('small.author::text').get()
            item['tags'] = quote.css('div.tags a.tag::text').getall()
            item['spider'] = self.name
            yield item

if __name__ == '__main__':
    settings = dict()
    settings['USER_AGENT'] = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)'
    settings['HTTPCACHE_ENABLED'] = True
    settings['CONCURRENT_REQUESTS'] = 20
    settings['CONCURRENT_REQUESTS_PER_DOMAIN'] = 20
    settings['JSON_FILE'] = 'items.jl'
    settings['ITEM_PIPELINES'] = dict()
    settings['ITEM_PIPELINES']['__main__.TextCleaningPipeline'] = 800
    settings['ITEM_PIPELINES']['__main__.JsonWriterPipeline'] = 801

    process = CrawlerProcess(settings=settings)
    process.crawl(QuoteSpider)
    process.start()

Followed by:

$ pip install Scrapy
$ python quote_spiders.py 

To fine-tune the scraper, adjust the CONCURRENT_REQUESTS and CONCURRENT_REQUESTS_PER_DOMAIN settings accordingly.