I am opening a file which has 100,000 URLs. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far I have looked at the many confusing ways Python implements threading/concurrency. I have even looked at the Python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.
当前回答
(下一个项目的自我提示)
这个Python 3解决方案只使用requests库。它是最简单且快速的方案,不需要多进程(multiprocessing)或复杂的异步库。
最重要的方面是重用连接,特别是对于HTTPS (TLS需要额外的往返才能打开)。注意,连接是特定于子域的。如果在多个域上抓取多个页面,则可以对url列表进行排序,以最大化连接重用(它有效地按域进行排序)。
只要线程数足够,它就能与任何异步代码一样快(requests在等待响应时会释放Python GIL)。
[带有日志记录和错误处理的生产等级代码]
import logging
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
# source: https://stackoverflow.com/a/68583332/5994461
# Number of worker threads used by download(); the connection pool below is
# sized to match so every worker can hold a connection at the same time.
THREAD_POOL = 16

# This is how to create a reusable connection pool with python requests.
# A single shared Session lets all requests reuse already-open connections.
session = requests.Session()

# Mount the tuned adapter for both schemes. Mounting it for 'https://' only
# would leave plain-HTTP URLs on the default adapter, i.e. without the retry
# policy and without the pool sized to the thread count.
for _scheme in ('https://', 'http://'):
    session.mount(
        _scheme,
        requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
                                      max_retries=3,
                                      pool_block=True)
    )
def get(url, timeout=30):
    """Fetch *url* through the shared session and log the outcome.

    Args:
        url: Address to request with HTTP GET.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block its worker thread forever.

    Returns:
        The ``requests.Response`` object, whatever its status code.

    Raises:
        requests.RequestException: If the request could not be completed
            (connection error, timeout, ...).
    """
    response = session.get(url, timeout=timeout)
    logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
    if response.status_code != 200:
        logging.error("request failed, error code %s [%s]", response.status_code, response.url)
    if 500 <= response.status_code < 600:
        # server is overloaded? give it a break
        time.sleep(5)
    return response
def download(urls):
    """Fetch every URL concurrently and print the body of each 200 response.

    Results are reported in the same order as *urls*. A request that raises
    is logged and skipped instead of aborting the whole batch (iterating
    ``executor.map`` would re-raise the first failure and drop every result
    that follows it).

    Args:
        urls: Iterable of URL strings.
    """
    urls = list(urls)
    with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
        futures = [executor.submit(get, url) for url in urls]
    # Leaving the `with` block waits for all requests to complete.
    for url, future in zip(urls, futures):
        try:
            response = future.result()
        except requests.RequestException as exc:
            logging.error("request raised %s [%s]", exc, url)
            continue
        if response.status_code == 200:
            print(response.content)
def main():
    """Set up INFO-level logging and download a short demo list of URLs."""
    log_options = {
        "format": '%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
        "level": logging.INFO,
        "datefmt": '%Y-%m-%d %H:%M:%S',
    }
    logging.basicConfig(**log_options)

    # Three URLs ending in /200, then one /404 and one /503.
    ok_url = "https://httpstat.us/200"
    demo_urls = [ok_url, ok_url, ok_url]
    demo_urls.append("https://httpstat.us/404")
    demo_urls.append("https://httpstat.us/503")
    download(demo_urls)


if __name__ == "__main__":
    main()
其他回答
使用线程池是一个很好的选择,它会让这件事变得相当容易。不幸的是,Python(当时)并没有一个简化线程池使用的标准库。不过这里有一个不错的库,可以帮助你入门: http://www.chrisarndt.de/projects/threadpool/
来自他们网站的代码示例:
# Create the pool with `poolsize` as its size argument.
pool = ThreadPool(poolsize)
# Build the work requests for `some_callable` from `list_of_args`.
requests = makeRequests(some_callable, list_of_args, callback)
# Queue every request; a plain loop avoids building a throwaway list of
# return values just for the side effect.
for req in requests:
    pool.putRequest(req)
# NOTE(review): per the threadpool project's example this waits for the
# queued work to finish - confirm against the library documentation.
pool.wait()
希望这能有所帮助。
不使用Twisted的解决方案:
from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue
# Number of worker threads to start; the work queue is bounded to twice this.
concurrent = 200
def doWork():
    """Worker loop: take URLs from the queue and report their status forever.

    Every dequeued URL is acknowledged with ``q.task_done()`` even when
    processing fails; otherwise the main thread's ``q.join()`` would never
    return. A failure is reported on stderr and the worker keeps running.
    """
    while True:
        url = q.get()
        try:
            status, url = getStatus(url)
            doSomethingWithResult(status, url)
        except Exception as exc:
            sys.stderr.write("worker failed on %s: %s\n" % (url, exc))
        finally:
            q.task_done()
def getStatus(ourl):
    """Send a HEAD request to *ourl* and return ``(status, ourl)``.

    Args:
        ourl: The URL string to probe.

    Returns:
        A tuple of the HTTP status code and the original URL, or
        ``("error", ourl)`` if the URL could not be parsed or requested.
    """
    conn = None
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)
        # Keep the query string: requesting only the path would probe a
        # different resource than the one listed.
        target = url.path
        if url.query:
            target += "?" + url.query
        conn.request("HEAD", target)
        res = conn.getresponse()
        return res.status, ourl
    except Exception:
        # Best effort: any failure is reported as "error". Unlike a bare
        # `except:`, this does not swallow SystemExit/KeyboardInterrupt.
        return "error", ourl
    finally:
        # Always release the socket; with thousands of URLs leaked
        # connections exhaust file descriptors.
        if conn is not None:
            conn.close()
def doSomethingWithResult(status, url):
    """Print ``"<status> <url>"`` on its own line.

    The line is emitted with a single write so that output from concurrently
    running worker threads cannot interleave in the middle of a line; this
    form is also valid on both Python 2 and Python 3.

    Args:
        status: HTTP status code, or the string ``"error"``.
        url: The URL the status belongs to.
    """
    sys.stdout.write("%s %s\n" % (status, url))
# Bounded queue: the producer below blocks once 2 * concurrent URLs are
# waiting, so the whole URL file is never held in memory at once.
q = Queue(concurrent * 2)

# Start the workers as daemon threads so they do not keep the process alive
# after the main thread exits.
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()

try:
    # `with` closes the URL file even if the run is interrupted.
    with open('urllist.txt') as url_file:
        for url in url_file:
            q.put(url.strip())
    # Wait until every queued URL has been marked done by a worker.
    q.join()
except KeyboardInterrupt:
    sys.exit(1)
这个方案比twisted方案稍微快一点,并且使用更少的CPU。
使用Twisted的解决方案:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
# Thread-pool size suggested to the reactor below.
concurrent = 200
# Counter starting at 1; processedOne() advances it once per finished URL.
finished=itertools.count(1)
# Ask the reactor to size its thread pool to `concurrent`.
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
    """Send a HEAD request to *ourl* and return the HTTP status code.

    Exceptions are deliberately not caught here: addTask() attaches an
    errback that handles them.

    Args:
        ourl: The URL string to probe.

    Returns:
        The integer HTTP status of the response.
    """
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)
    try:
        # Keep the query string: requesting only the path would probe a
        # different resource than the one listed.
        target = url.path
        if url.query:
            target += "?" + url.query
        conn.request("HEAD", target)
        return conn.getresponse().status
    finally:
        # Always release the socket, on success and on failure.
        conn.close()
def processResponse(response,url):
    """Success callback: print the status and URL, then count the task."""
    print("%s %s" % (response, url))
    processedOne()
def processError(error,url):
    """Failure callback: print ``error`` and the URL, then count the task.

    The *error* argument itself is not printed.
    """
    print("%s %s" % ("error", url))
    processedOne()
def processedOne():
    """Record one finished task and stop the reactor after the last one.

    Uses the built-in ``next()`` (available since Python 2.6) instead of the
    Python 2-only ``.next()`` method, so the counter also works on Python 3.
    """
    if next(finished) == added:
        reactor.stop()
def addTask(url):
    """Run getStatus(url) in the reactor's thread pool and attach handlers.

    processResponse receives the result and processError receives any
    failure; both are also passed *url*.
    """
    deferred = threads.deferToThread(getStatus, url)
    deferred.addCallback(processResponse, url).addErrback(processError, url)
# Number of scheduled tasks; processedOne() compares its counter against it.
added=0
# `with` closes the URL file once all tasks have been scheduled.
with open('urllist.txt') as url_file:
    for url in url_file:
        added+=1
        addTask(url.strip())

# With no tasks nothing would ever call reactor.stop(), so only start the
# reactor when there is work to do.
if added:
    try:
        reactor.run()
    except KeyboardInterrupt:
        reactor.stop()
Testtime:
[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null
real 1m10.682s
user 0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
Pingtime:
bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
Scrapy框架将快速和专业地解决您的问题。它还将缓存所有请求,以便稍后可以重新运行失败的请求。
将该脚本保存为quote_spiders.py。
# quote_spiders.py
import json
import string
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.item import Item, Field
class TextCleaningPipeline(object):
    """Item pipeline that normalizes the ``text`` field of each item.

    Curly quotation marks and ASCII punctuation are removed and the result
    is lower-cased.
    """

    # Built once for the class instead of once per item: deletes every ASCII
    # punctuation character plus the two curly quotation marks.
    _REMOVE_TABLE = str.maketrans('', '', string.punctuation + '“' + '”')

    def _clean_text(self, text):
        """Return *text* without punctuation or curly quotes, lower-cased."""
        return text.translate(self._REMOVE_TABLE).lower()

    def process_item(self, item, spider):
        """Clean ``item['text']`` in place and return the same item."""
        item['text'] = self._clean_text(item['text'])
        return item
class JsonWriterPipeline(object):
    """Item pipeline that appends every item to a file, one JSON object per line."""

    def open_spider(self, spider):
        """Open the file named by the spider's ``JSON_FILE`` setting in append mode."""
        target_path = spider.settings['JSON_FILE']
        self.file = open(target_path, 'a')

    def close_spider(self, spider):
        """Close the output file."""
        self.file.close()

    def process_item(self, item, spider):
        """Write *item* as one JSON line and return it unchanged."""
        record = dict(item)
        self.file.write(json.dumps(record) + "\n")
        return item
class QuoteItem(Item):
    """Item describing one scraped quote; populated by QuoteSpider.parse."""
    # Text of the quote.
    text = Field()
    # Author name.
    author = Field()
    # List of tag strings attached to the quote.
    tags = Field()
    # Name of the spider that produced the item.
    spider = Field()
class QuoteSpider(scrapy.Spider):
    """Spider that turns each quote on the listed pages into a QuoteItem."""

    name = "quotes"

    def start_requests(self):
        """Yield one request per start page, each handled by ``parse``."""
        start_pages = [
            'http://quotes.toscrape.com/page/1/',
            'http://quotes.toscrape.com/page/2/',
            # ...
        ]
        for page_url in start_pages:
            yield scrapy.Request(url=page_url, callback=self.parse)

    def parse(self, response):
        """Yield a QuoteItem for every ``div.quote`` element in *response*."""
        for quote_node in response.css('div.quote'):
            yield QuoteItem(
                text=quote_node.css('span.text::text').get(),
                author=quote_node.css('small.author::text').get(),
                tags=quote_node.css('div.tags a.tag::text').getall(),
                spider=self.name,
            )
if __name__ == '__main__':
    # All crawler settings in one literal: user agent, HTTP cache,
    # concurrency limits, output file and the two item pipelines.
    settings = {
        'USER_AGENT': 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)',
        'HTTPCACHE_ENABLED': True,
        'CONCURRENT_REQUESTS': 20,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 20,
        'JSON_FILE': 'items.jl',
        'ITEM_PIPELINES': {
            '__main__.TextCleaningPipeline': 800,
            '__main__.JsonWriterPipeline': 801,
        },
    }

    process = CrawlerProcess(settings=settings)
    process.crawl(QuoteSpider)
    process.start()
然后运行:
$ pip install Scrapy
$ python quote_spiders.py
为了微调scraper,相应地调整CONCURRENT_REQUESTS和CONCURRENT_REQUESTS_PER_DOMAIN设置。
我知道这是一个老问题,但在Python 3.7中,您可以使用asyncio和aiohttp来做到这一点。
import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError
async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    """Send a GET request to *url* and return ``(url, status)``.

    The response is used as an async context manager so its connection is
    released as soon as the status has been read; awaiting the request
    without releasing the response keeps the connection checked out.

    Args:
        url: Address to request.
        session: Shared aiohttp client session.
        **kwargs: Extra arguments forwarded to ``session.request``.

    Returns:
        ``(url, status)``. If the connection cannot be established the
        status is reported as 404; that value is a placeholder, not a real
        server response.
    """
    try:
        async with session.request(method="GET", url=url, **kwargs) as resp:
            return (url, resp.status)
    except ClientConnectorError:
        return (url, 404)
async def make_requests(urls: set, **kwargs) -> None:
    """Request every URL concurrently and print ``<status> - <url>`` for each.

    Args:
        urls: URLs to request.
        **kwargs: Extra arguments forwarded to ``fetch_html``.
    """
    async with ClientSession() as session:
        pending = [
            fetch_html(url=url, session=session, **kwargs)
            for url in urls
        ]
        results = await asyncio.gather(*pending)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')
if __name__ == "__main__":
    import pathlib
    import sys

    # An explicit check instead of `assert`, which is stripped under `-O`.
    if sys.version_info < (3, 7):
        raise SystemExit("Script requires Python 3.7+.")

    # urls.txt is expected next to this script, one URL per line.
    here = pathlib.Path(__file__).parent
    with open(here.joinpath("urls.txt")) as infile:
        # Skip blank lines: an empty string is not a valid URL and would
        # abort the whole batch.
        urls = {line.strip() for line in infile if line.strip()}

    asyncio.run(make_requests(urls=urls))
你可以阅读更多关于它的内容,并在这里看到一个例子。
推荐文章
- 为什么这个用于初始化列表列表的代码明显地将列表链接在一起?
- 在Python中,在函数内部导入时会发生什么?
- 用Python列表中的值创建一个.csv文件
- 在Python中哪个更快:x**.5还是math.sqrt(x)?
- 有哪些好的Python ORM解决方案?
- 如何在f字符串中转义括号?
- Python void返回类型注释
- 如何为python模块的argparse部分编写测试?
- 在python中是否有用于均方根误差(RMSE)的库函数?
- 如何从matplotlib中移除边框 (pyplot.figure vs matplotlib.figure) (frameon=False在matplotlib中有问题)
- django test app error -在创建测试数据库时出现错误:创建数据库的权限被拒绝
- 识别使用pip安装的python包的依赖关系
- 从字符串变量导入模块
- 如何删除Python中的前导空白?
- python中的assertEquals和assertEqual