I am opening a file which has 100,000 URLs. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far have looked at the many confusing ways Python implements threading/concurrency. I have even looked at the Python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.


当前回答

(工具)

Apache Bench(ab)就是您所需要的全部——它是一个用于测量HTTP web服务器性能的命令行程序。

给你一篇不错的博客文章:https://www.petefreitag.com/item/689.cfm(来自Pete Freitag)

其他回答

不使用Twisted的解决方案:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

# Number of worker threads to start; the work queue is bounded at twice this.
concurrent = 200

def doWork():
    """Worker loop: take URLs from the shared queue ``q`` and report each status.

    Never returns; it is intended as the target of a daemon thread.
    """
    while True:
        url = q.get()
        try:
            status, url = getStatus(url)
            doSomethingWithResult(status, url)
        finally:
            # Always acknowledge the item, even if reporting the result
            # raised; otherwise q.join() in the main thread never returns.
            q.task_done()

def getStatus(ourl):
    """Send a HEAD request for ``ourl`` and return ``(status, ourl)``.

    ``status`` is the numeric HTTP status code on success, or the string
    ``"error"`` if the URL could not be parsed or the request failed.
    """
    conn = None
    try:
        url = urlparse(ourl)
        # The query string is part of the resource being asked about, so
        # send it along with the path instead of silently dropping it.
        target = url.path or "/"
        if url.query:
            target += "?" + url.query
        conn = httplib.HTTPConnection(url.netloc)
        conn.request("HEAD", target)
        res = conn.getresponse()
        return res.status, ourl
    except Exception:
        # Any parse or network failure is reported as "error"; unlike a bare
        # ``except`` this lets KeyboardInterrupt/SystemExit propagate.
        return "error", ourl
    finally:
        # Close explicitly rather than relying on garbage collection.
        if conn is not None:
            conn.close()

def doSomethingWithResult(status, url):
    """Report one result as a ``<status> <url>`` line on standard output."""
    # Emit the whole line with a single write: the print statement issues a
    # separate write for each piece, so lines produced by concurrent worker
    # threads could interleave.
    sys.stdout.write("%s %s\n" % (status, url))

# Bounded queue: q.put() blocks once the workers fall behind, so the whole
# URL file is never held in memory at once.
q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    # Daemon threads do not keep the process alive once the main thread exits.
    t.daemon = True
    t.start()
try:
    # Close the input file deterministically instead of leaking the handle.
    with open('urllist.txt') as url_file:
        for line in url_file:
            url = line.strip()
            if url:  # skip blank lines rather than queueing empty URLs
                q.put(url)
    # Block until every queued URL has been marked done by a worker.
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

这个方案比twisted方案稍微快一点,并且使用更少的CPU。

这个基于Twisted的异步web客户端运行得相当快。

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

# Connection pool shared by every request issued through `agent`.
pool = HTTPConnectionPool(reactor)
# Per-host cap on persistent connections kept by the pool.
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
# One DeferredLock per (host, slot) key, created on first lookup in getLock().
locks = defaultdict(DeferredLock)
# Maps each URL to its response code, or to the error text if the request failed.
codes = {}

def getLock(url, simultaneous = 1):
    """Return one of the ``simultaneous`` locks associated with ``url``'s host.

    The lock is looked up in the module-level ``locks`` mapping under the key
    ``(netloc, slot)``, where ``slot`` is drawn at random from
    ``range(simultaneous)``.
    """
    host = urlparse(url).netloc
    slot = randrange(simultaneous)
    return locks[host, slot]

@inlineCallbacks
def getMapping(url):
    """Send a HEAD request for ``url`` and record the outcome in ``codes``.

    On success ``codes[url]`` is the response code; if the request raises,
    ``codes[url]`` is the exception's string form instead.
    """
    # Requests to one host are spread over 4 locks, which caps us at
    # 4 simultaneous connections per host. The number can be tuned, but it
    # should not exceed pool.maxPersistentPerHost.
    host_lock = getLock(url,4)
    yield host_lock.acquire()
    try:
        response = yield agent.request('HEAD', url)
        codes[url] = response.code
    except Exception as err:
        codes[url] = str(err)
    finally:
        # Release whether or not the request succeeded.
        host_lock.release()


def _start():
    """Start one request per non-blank input line; stop the reactor when all finish."""
    dl = DeferredList([getMapping(url)
                       for url in (line.strip() for line in fileinput.input())
                       if url])
    dl.addCallback(lambda _: reactor.stop())


# Kick the work off only once the reactor is running. If the DeferredList
# fired before reactor.run() (empty input, or every request failing
# immediately), reactor.stop() would raise inside the callback and the
# reactor would then run forever.
reactor.callWhenRunning(_start)
reactor.run()
pprint(codes)

Scrapy框架将快速和专业地解决您的问题。它还将缓存所有请求,以便稍后可以重新运行失败的请求。

将该脚本保存为quote_spiders.py。

# quote_spiders.py
import json
import string
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.item import Item, Field

class TextCleaningPipeline(object):
    """Item pipeline that normalises an item's ``text`` field.

    Curly quotation marks and ASCII punctuation are removed and the result
    is lower-cased.
    """

    # Built once at class creation: the table is identical for every item,
    # so rebuilding it on each call is wasted work. The curly quotes are
    # folded into the same table so the text is cleaned in a single pass.
    _STRIP_TABLE = str.maketrans('', '', '“”' + string.punctuation)

    def _clean_text(self, text):
        """Return ``text`` without curly quotes or punctuation, lower-cased."""
        return text.translate(self._STRIP_TABLE).lower()

    def process_item(self, item, spider):
        """Clean ``item['text']`` in place and return the same item."""
        item['text'] = self._clean_text(item['text'])
        return item

class JsonWriterPipeline(object):
    """Item pipeline that appends every item to a file, one JSON object per line.

    The output path is read from the spider's ``JSON_FILE`` setting.
    """

    def open_spider(self, spider):
        """Open the output file in append mode."""
        target_path = spider.settings['JSON_FILE']
        self.file = open(target_path, 'a')

    def close_spider(self, spider):
        """Close the output file."""
        self.file.close()

    def process_item(self, item, spider):
        """Write ``item`` as one JSON line and pass it on unchanged."""
        serialized = json.dumps(dict(item))
        self.file.write(serialized)
        self.file.write("\n")
        return item

class QuoteItem(Item):
    """Item yielded by QuoteSpider.parse() for each quote block on a page."""
    text = Field()    # first match of 'span.text::text' in the quote block
    author = Field()  # first match of 'small.author::text'
    tags = Field()    # all matches of 'div.tags a.tag::text'
    spider = Field()  # name of the spider that produced the item

class QuoteSpider(scrapy.Spider):
    """Spider that turns every ``div.quote`` block on its start pages into a QuoteItem."""
    name = "quotes"

    def start_requests(self):
        """Yield one request per start page, each handled by ``parse``."""
        start_pages = (
            'http://quotes.toscrape.com/page/1/',
            'http://quotes.toscrape.com/page/2/',
            # ...
        )
        for page_url in start_pages:
            yield scrapy.Request(url=page_url, callback=self.parse)

    def parse(self, response):
        """Yield a QuoteItem for each quote block found in ``response``."""
        for block in response.css('div.quote'):
            yield QuoteItem(
                text=block.css('span.text::text').get(),
                author=block.css('small.author::text').get(),
                tags=block.css('div.tags a.tag::text').getall(),
                spider=self.name,
            )

if __name__ == '__main__':
    # Crawler configuration. JSON_FILE is read by JsonWriterPipeline; the
    # cleaning pipeline is registered with order value 800 and the writer
    # with 801.
    settings = {
        'USER_AGENT': 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)',
        'HTTPCACHE_ENABLED': True,
        'CONCURRENT_REQUESTS': 20,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 20,
        'JSON_FILE': 'items.jl',
        'ITEM_PIPELINES': {
            '__main__.TextCleaningPipeline': 800,
            '__main__.JsonWriterPipeline': 801,
        },
    }

    # Run the spider in-process and block until the crawl finishes.
    process = CrawlerProcess(settings=settings)
    process.crawl(QuoteSpider)
    process.start()

然后运行:

$ pip install Scrapy
$ python quote_spiders.py 

为了微调scraper,相应地调整CONCURRENT_REQUESTS和CONCURRENT_REQUESTS_PER_DOMAIN设置。

下面是一个“异步”解决方案,它不使用asyncio,而是使用asyncio使用的低级机制(在Linux上):select()。(或者asyncio可能使用poll或epoll,但这是类似的原理。)

它是对PyCurl示例的稍微修改版本。

(为了简单起见,它多次请求相同的URL,但您可以轻松地修改它以检索一系列不同的URL。)

(另一个小改动可以让它无限循环地检索同一个URL。提示:将 while urls and handles 改为 while handles,并将 while nprocessed<nurls 改为 while 1。)

import pycurl,io,gzip,signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN)  # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info

NCONNS = 2  # How many GET requests may be in flight at once
url    = 'example.com'
urls   = [url] * (0x7*NCONNS)  # The same URL repeated, 7 entries per connection

# Never use more connections than there are URLs to fetch
nurls  = len(urls)
NCONNS = min(NCONNS, nurls)
version_banner = "\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM)
print(version_banner)
workload_banner = f'\x1b[37m{nurls} \x1b[91m@ \x1b[92m{NCONNS}\x1b[0m'
print(workload_banner)

# Build the multi handle and a fixed pool of NCONNS identically configured
# curl objects, kept on m.handles
m         = pycurl.CurlMulti()
m.handles = []
for _ in range(NCONNS):
  c = pycurl.Curl()
  for option, value in (
      (pycurl.FOLLOWLOCATION, 1),
      (pycurl.MAXREDIRS,      5),
      (pycurl.CONNECTTIMEOUT, 30),
      (pycurl.TIMEOUT,        300),
      (pycurl.NOSIGNAL,       1),
  ):
    c.setopt(option, value)
  m.handles.append(c)

# Pool of idle curl objects. Take a real copy so that m.handles keeps every
# object for the cleanup at the end, regardless of which are checked out.
handles    = m.handles[:]
nprocessed = 0
while nprocessed<nurls:

  while urls and handles:  # If there is an url to process and a free curl object, add to multi stack
    url   = urls.pop(0)
    c     = handles.pop()
    c.buf = io.BytesIO()
    c.url = url  # store some info
    c.t0  = time.perf_counter()
    c.setopt(pycurl.URL,        c.url)
    c.setopt(pycurl.WRITEDATA,  c.buf)
    c.setopt(pycurl.HTTPHEADER, [f'user-agent: {random.randint(0,(1<<256)-1):x}', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
    m.add_handle(c)

  while 1:  # Run the internal curl state machine for the multi stack
    ret, num_handles = m.perform()
    if ret!=pycurl.E_CALL_MULTI_PERFORM:  break

  while 1:  # Check for curl objects which have terminated, and add them to the handles
    nq, ok_list, ko_list = m.info_read()
    for c in ok_list:
      m.remove_handle(c)
      t1 = time.perf_counter()
      body = c.buf.getvalue()
      # Only gunzip when the body really is gzip (magic bytes 1f 8b); a server
      # may ignore accept-encoding, and gzip.decompress() raises on plain data.
      # NOTE: a deflate-encoded body is shown as received.
      reply = gzip.decompress(body) if body[:2]==b'\x1f\x8b' else body
      print(f'\x1b[33mGET  \x1b[32m{t1-c.t0:.3f}  \x1b[37m{len(reply):9,}  \x1b[0m{reply[:32]}...')  # \x1b[35m{psutil.Process(os.getpid()).memory_info().rss:,} \x1b[0mbytes')
      handles.append(c)
    for c, errno, errmsg in ko_list:
      m.remove_handle(c)
      # f-string, so the url and error details are actually interpolated;
      # the trailing reset keeps the red from leaking into later output.
      print(f'\x1b[31mFAIL {c.url} {errno} {errmsg}\x1b[0m')
      handles.append(c)
    nprocessed = nprocessed + len(ok_list) + len(ko_list)
    if nq==0: break

  m.select(1.0)  # Currently no more I/O is pending, could do something in the meantime (display a progress bar, etc.). We just call select() to sleep until some more data is available.

# Release every curl object, then the multi handle itself
for c in m.handles:
  c.close()
m.close()

我发现使用tornado包是最快和最简单的方法来实现这一点:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Send a HEAD request to every URL concurrently and print each status code.
    :param urls: A list of URLs to query.
    :return: List of the response objects that were received; a fetch that
        raised is reported on stdout and left out of the list.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        client = httpclient.AsyncHTTPClient()
        # Start every fetch up front, then consume them through the iterator.
        pending = [client.fetch(target, raise_error=False, method='HEAD')
                   for target in urls]
        waiter = gen.WaitIterator(*pending)
        collected = []
        while not waiter.done():
            try:
                reply = yield waiter.next()
            except httpclient.HTTPError as err:
                print(f'Non-200 HTTP response returned: {err}')
            except Exception as err:
                print(f'An unexpected error occurred querying: {err}')
            else:
                print(f'URL \'{reply.request.url}\' has status code <{reply.code}>')
                collected.append(reply)
        return collected

    # Drive the coroutine to completion on the current IOLoop.
    return ioloop.IOLoop.current().run_sync(fetch_and_handle)

# Placeholder URLs; substitute the real list to check.
my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
# Show the first response that main() collected.
print(responses[0])