I am opening a file which has 100,000 URL's. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.


当前回答

我发现使用tornado包是最快和最简单的方法来实现这一点:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: {e}')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: {e}')
                continue
            else:
                print(f'URL \'{response.request.url}\' has status code <{response.code}>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages

my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])

其他回答

我发现使用tornado包是最快和最简单的方法来实现这一点:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: {e}')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: {e}')
                continue
            else:
                print(f'URL \'{response.request.url}\' has status code <{response.code}>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages

my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])

创建epoll对象, 打开许多客户端TCP套接字, 调整他们的发送缓冲区比请求头多一点, 发送一个请求头-它应该是即时的,只是放置到缓冲区, 在epoll对象中注册套接字 在epoll obect上做。poll, 从.poll中读取每个套接字的前3个字节, 将它们写入sys。Stdout后面跟着\n(不刷新), 关闭客户端套接字。

限制同时打开的套接字数量-在创建套接字时处理错误。只有当另一个套接字关闭时才创建新的套接字。 调整操作系统限制。 尝试分成几个(不是很多)进程:这可能有助于更有效地使用CPU。

解决这个问题的一个好方法是首先编写获得一个结果所需的代码,然后合并线程代码来并行化应用程序。

In a perfect world this would simply mean simultaneously starting 100,000 threads which output their results into a dictionary or list for later processing, but in practice you are limited in how many parallel HTTP requests you can issue in this fashion. Locally, you have limits in how many sockets you can open concurrently, how many threads of execution your Python interpreter will allow. Remotely, you may be limited in the number of simultaneous connections if all the requests are against one server, or many. These limitations will probably necessitate that you write the script in such a way as to only poll a small fraction of the URLs at any one time (100, as another poster mentioned, is probably a decent thread pool size, although you may find that you can successfully deploy many more).

您可以遵循以下设计模式来解决上述问题:

Start a thread which launches new request threads until the number of currently running threads (you can track them via threading.active_count() or by pushing the thread objects into a data structure) is >= your maximum number of simultaneous requests (say 100), then sleeps for a short timeout. This thread should terminate when there is are no more URLs to process. Thus, the thread will keep waking up, launching new threads, and sleeping until your are finished. Have the request threads store their results in some data structure for later retrieval and output. If the structure you are storing the results in is a list or dict in CPython, you can safely append or insert unique items from your threads without locks, but if you write to a file or require in more complex cross-thread data interaction you should use a mutual exclusion lock to protect this state from corruption.

我建议您使用threading模块。您可以使用它来启动和跟踪正在运行的线程。Python的线程支持是完全的,但是对问题的描述表明它完全满足了您的需求。

最后,如果您希望看到用Python编写的并行网络应用程序的相当简单的应用程序,请查看ssh.py。它是一个小型库,使用Python线程并行处理许多SSH连接。该设计非常接近您的需求,您可能会发现它是一个很好的资源。

一个解决方案:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testtime:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

Scrapy框架将快速和专业地解决您的问题。它还将缓存所有请求,以便稍后可以重新运行失败的请求。

将该脚本保存为quotes_spider.py。

# quote_spiders.py
import json
import string
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.item import Item, Field

class TextCleaningPipeline(object):
    def _clean_text(self, text):
        text = text.replace('“', '').replace('”', '')
        table = str.maketrans({key: None for key in string.punctuation})
        clean_text = text.translate(table)
        return clean_text.lower()

    def process_item(self, item, spider):
        item['text'] = self._clean_text(item['text'])
        return item

class JsonWriterPipeline(object):
    def open_spider(self, spider):
        self.file = open(spider.settings['JSON_FILE'], 'a')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        self.file.write(line)
        return item

class QuoteItem(Item):
    text = Field()
    author = Field()
    tags = Field()
    spider = Field()

class QuoteSpider(scrapy.Spider):
    name = "quotes"

    def start_requests(self):
        urls = [
            'http://quotes.toscrape.com/page/1/',
            'http://quotes.toscrape.com/page/2/',
            # ...
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def parse(self, response):
        for quote in response.css('div.quote'):
            item = QuoteItem()
            item['text'] = quote.css('span.text::text').get()
            item['author'] = quote.css('small.author::text').get()
            item['tags'] = quote.css('div.tags a.tag::text').getall()
            item['spider'] = self.name
            yield item

if __name__ == '__main__':
    settings = dict()
    settings['USER_AGENT'] = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)'
    settings['HTTPCACHE_ENABLED'] = True
    settings['CONCURRENT_REQUESTS'] = 20
    settings['CONCURRENT_REQUESTS_PER_DOMAIN'] = 20
    settings['JSON_FILE'] = 'items.jl'
    settings['ITEM_PIPELINES'] = dict()
    settings['ITEM_PIPELINES']['__main__.TextCleaningPipeline'] = 800
    settings['ITEM_PIPELINES']['__main__.JsonWriterPipeline'] = 801

    process = CrawlerProcess(settings=settings)
    process.crawl(QuoteSpider)
    process.start()

紧随其后的是

$ pip install Scrapy
$ python quote_spiders.py 

为了微调scraper,相应地调整CONCURRENT_REQUESTS和CONCURRENT_REQUESTS_PER_DOMAIN设置。