I am opening a file which has 100,000 URL's. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.
当前回答
最简单的方法是使用Python的内置线程库。它们不是“真正的”/内核线程。它们有问题(比如序列化),但足够好了。你需要一个队列和线程池。这里有一个选项,但是编写自己的选项很简单。您无法并行处理所有100,000个调用,但可以同时发出100个(或左右)调用。
其他回答
自从2010年这篇文章发布以来,事情发生了很大的变化,我还没有尝试过所有其他的答案,但我尝试了一些,我发现使用python3.6对我来说这是最好的。
在AWS上运行时,我每秒可以获取大约150个独特的域名。
import concurrent.futures
import requests
import time
out = []
CONNECTIONS = 100
TIMEOUT = 5
tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]
def load_url(url, timeout):
ans = requests.head(url, timeout=timeout)
return ans.status_code
with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
time1 = time.time()
for future in concurrent.futures.as_completed(future_to_url):
try:
data = future.result()
except Exception as exc:
data = str(type(exc))
finally:
out.append(data)
print(str(len(out)),end="\r")
time2 = time.time()
print(f'Took {time2-time1:.2f} s')
最简单的方法是使用Python的内置线程库。它们不是“真正的”/内核线程。它们有问题(比如序列化),但足够好了。你需要一个队列和线程池。这里有一个选项,但是编写自己的选项很简单。您无法并行处理所有100,000个调用,但可以同时发出100个(或左右)调用。
我知道这是一个老问题,但在Python 3.7中,您可以使用asyncio和aiohttp来做到这一点。
import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError
async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
try:
resp = await session.request(method="GET", url=url, **kwargs)
except ClientConnectorError:
return (url, 404)
return (url, resp.status)
async def make_requests(urls: set, **kwargs) -> None:
async with ClientSession() as session:
tasks = []
for url in urls:
tasks.append(
fetch_html(url=url, session=session, **kwargs)
)
results = await asyncio.gather(*tasks)
for result in results:
print(f'{result[1]} - {str(result[0])}')
if __name__ == "__main__":
import pathlib
import sys
assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
here = pathlib.Path(__file__).parent
with open(here.joinpath("urls.txt")) as infile:
urls = set(map(str.strip, infile))
asyncio.run(make_requests(urls=urls))
你可以阅读更多关于它的内容,并在这里看到一个例子。
(下一个项目的自我提示)
Python 3解决方案只使用请求。它是最简单且快速的,不需要多处理或复杂的异步库。
最重要的方面是重用连接,特别是对于HTTPS (TLS需要额外的往返才能打开)。注意,连接是特定于子域的。如果在多个域上抓取多个页面,则可以对url列表进行排序,以最大化连接重用(它有效地按域进行排序)。
当给定足够的线程时,它将与任何异步代码一样快。(请求在等待响应时释放python GIL)。
[带有日志记录和错误处理的生产等级代码]
import logging
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
# source: https://stackoverflow.com/a/68583332/5994461
THREAD_POOL = 16
# This is how to create a reusable connection pool with python requests.
session = requests.Session()
session.mount(
'https://',
requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
max_retries=3,
pool_block=True)
)
def get(url):
response = session.get(url)
logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
if response.status_code != 200:
logging.error("request failed, error code %s [%s]", response.status_code, response.url)
if 500 <= response.status_code < 600:
# server is overloaded? give it a break
time.sleep(5)
return response
def download(urls):
with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
# wrap in a list() to wait for all requests to complete
for response in list(executor.map(get, urls)):
if response.status_code == 200:
print(response.content)
def main():
logging.basicConfig(
format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S'
)
urls = [
"https://httpstat.us/200",
"https://httpstat.us/200",
"https://httpstat.us/200",
"https://httpstat.us/404",
"https://httpstat.us/503"
]
download(urls)
if __name__ == "__main__":
main()
使用线程池是一个很好的选择,这将使这相当容易。不幸的是,python并没有一个标准库来简化线程池。但这里有一个不错的图书馆,你应该开始: http://www.chrisarndt.de/projects/threadpool/
来自他们网站的代码示例:
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
希望这能有所帮助。
推荐文章
- 为什么这个用于初始化列表列表的代码明显地将列表链接在一起?
- 在Python中,在函数内部导入时会发生什么?
- 用Python列表中的值创建一个.csv文件
- 在Python中哪个更快:x**。5还是math.sqrt(x)?
- 有哪些好的Python ORM解决方案?
- 如何在f字符串中转义括号?
- Python void返回类型注释
- 如何为python模块的argparse部分编写测试?
- 在python中是否有用于均方根误差(RMSE)的库函数?
- 如何从matplotlib (pyplot。Figure vs matplotlib。figure) (frameon=False matplotlib中有问题)
- django test app error -在创建测试数据库时出现错误:创建数据库的权限被拒绝
- 识别使用pip安装的python包的依赖关系
- 从字符串变量导入模块
- 如何删除Python中的前导空白?
- python中的assertEquals和assertEqual