I am opening a file which has 100,000 URL's. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.
当前回答
下面是一个“异步”解决方案,它不使用asyncio,而是使用asyncio使用的低级机制(在Linux上):select()。(或者asyncio可能使用poll或epoll,但这是类似的原理。)
它是对PyCurl示例的稍微修改版本。
(为了简单起见,它多次请求相同的URL,但您可以轻松地修改它以检索一系列不同的URL。)
(另一个轻微的修改可以使这个检索相同的URL作为一个无限循环。提示:将while url和句柄更改为while句柄,将while nprocessed<nurls更改为while 1。)
import pycurl,io,gzip,signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN) # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info
NCONNS = 2 # Number of concurrent GET requests
url = 'example.com'
urls = [url for i in range(0x7*NCONNS)] # Copy the same URL over and over
# Check args
nurls = len(urls)
NCONNS = min(NCONNS, nurls)
print("\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM))
print(f'\x1b[37m{nurls} \x1b[91m@ \x1b[92m{NCONNS}\x1b[0m')
# Pre-allocate a list of curl objects
m = pycurl.CurlMulti()
m.handles = []
for i in range(NCONNS):
c = pycurl.Curl()
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.MAXREDIRS, 5)
c.setopt(pycurl.CONNECTTIMEOUT, 30)
c.setopt(pycurl.TIMEOUT, 300)
c.setopt(pycurl.NOSIGNAL, 1)
m.handles.append(c)
handles = m.handles # MUST make a copy?!
nprocessed = 0
while nprocessed<nurls:
while urls and handles: # If there is an url to process and a free curl object, add to multi stack
url = urls.pop(0)
c = handles.pop()
c.buf = io.BytesIO()
c.url = url # store some info
c.t0 = time.perf_counter()
c.setopt(pycurl.URL, c.url)
c.setopt(pycurl.WRITEDATA, c.buf)
c.setopt(pycurl.HTTPHEADER, [f'user-agent: {random.randint(0,(1<<256)-1):x}', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
m.add_handle(c)
while 1: # Run the internal curl state machine for the multi stack
ret, num_handles = m.perform()
if ret!=pycurl.E_CALL_MULTI_PERFORM: break
while 1: # Check for curl objects which have terminated, and add them to the handles
nq, ok_list, ko_list = m.info_read()
for c in ok_list:
m.remove_handle(c)
t1 = time.perf_counter()
reply = gzip.decompress(c.buf.getvalue())
print(f'\x1b[33mGET \x1b[32m{t1-c.t0:.3f} \x1b[37m{len(reply):9,} \x1b[0m{reply[:32]}...') # \x1b[35m{psutil.Process(os.getpid()).memory_info().rss:,} \x1b[0mbytes')
handles.append(c)
for c, errno, errmsg in ko_list:
m.remove_handle(c)
print('\x1b[31mFAIL {c.url} {errno} {errmsg}')
handles.append(c)
nprocessed = nprocessed + len(ok_list) + len(ko_list)
if nq==0: break
m.select(1.0) # Currently no more I/O is pending, could do something in the meantime (display a progress bar, etc.). We just call select() to sleep until some more data is available.
for c in m.handles:
c.close()
m.close()
其他回答
(工具)
Apache Bench是您所需要的全部。—用于测量HTTP web服务器性能的命令行计算机程序
给你一篇不错的博客文章:https://www.petefreitag.com/item/689.cfm(来自Pete Freitag)
如果您希望获得尽可能好的性能,您可能会考虑使用异步I/O而不是线程。与成千上万个操作系统线程相关的开销是不小的,Python解释器内的上下文切换甚至增加了更多的开销。线程当然可以完成工作,但我怀疑异步路由将提供更好的整体性能。
具体来说,我建议使用Twisted库中的异步web客户端(http://www.twistedmatrix.com)。它有一个公认的陡峭的学习曲线,但一旦你很好地掌握了Twisted的异步编程风格,它就很容易使用。
Twisted的异步web客户端API的HowTo可以在以下地址找到:
http://twistedmatrix.com/documents/current/web/howto/client.html
pip install requests-threads
使用实例使用async/await - send 100个并发请求
from requests_threads import AsyncSession
session = AsyncSession(n=100)
async def _main():
rs = []
for _ in range(100):
rs.append(await session.get('http://httpbin.org/get'))
print(rs)
if __name__ == '__main__':
session.run(_main)
此示例仅适用于Python 3。您还可以提供自己的asyncio事件循环!
使用实例Twisted
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react
from requests_threads import AsyncSession
session = AsyncSession(n=100)
@inlineCallbacks
def main(reactor):
responses = []
for i in range(100):
responses.append(session.get('http://httpbin.org/get'))
for response in responses:
r = yield response
print(r)
if __name__ == '__main__':
react(main)
这个例子在Python 2和Python 3上都可以运行。
也许这对我的回购有帮助,一个基本的例子, 用python编写快速异步HTTP请求
创建epoll对象, 打开许多客户端TCP套接字, 调整他们的发送缓冲区比请求头多一点, 发送一个请求头-它应该是即时的,只是放置到缓冲区, 在epoll对象中注册套接字 在epoll obect上做。poll, 从.poll中读取每个套接字的前3个字节, 将它们写入sys。Stdout后面跟着\n(不刷新), 关闭客户端套接字。
限制同时打开的套接字数量-在创建套接字时处理错误。只有当另一个套接字关闭时才创建新的套接字。 调整操作系统限制。 尝试分成几个(不是很多)进程:这可能有助于更有效地使用CPU。
(下一个项目的自我提示)
Python 3解决方案只使用请求。它是最简单且快速的,不需要多处理或复杂的异步库。
最重要的方面是重用连接,特别是对于HTTPS (TLS需要额外的往返才能打开)。注意,连接是特定于子域的。如果在多个域上抓取多个页面,则可以对url列表进行排序,以最大化连接重用(它有效地按域进行排序)。
当给定足够的线程时,它将与任何异步代码一样快。(请求在等待响应时释放python GIL)。
[带有日志记录和错误处理的生产等级代码]
import logging
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
# source: https://stackoverflow.com/a/68583332/5994461
THREAD_POOL = 16
# This is how to create a reusable connection pool with python requests.
session = requests.Session()
session.mount(
'https://',
requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
max_retries=3,
pool_block=True)
)
def get(url):
response = session.get(url)
logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
if response.status_code != 200:
logging.error("request failed, error code %s [%s]", response.status_code, response.url)
if 500 <= response.status_code < 600:
# server is overloaded? give it a break
time.sleep(5)
return response
def download(urls):
with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
# wrap in a list() to wait for all requests to complete
for response in list(executor.map(get, urls)):
if response.status_code == 200:
print(response.content)
def main():
logging.basicConfig(
format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S'
)
urls = [
"https://httpstat.us/200",
"https://httpstat.us/200",
"https://httpstat.us/200",
"https://httpstat.us/404",
"https://httpstat.us/503"
]
download(urls)
if __name__ == "__main__":
main()
推荐文章
- 在每个列表元素上调用int()函数?
- 当使用代码存储库时,如何引用资源的相对路径
- 如何在Flask-SQLAlchemy中按id删除记录
- 在Python中插入列表的第一个位置
- Python Pandas只合并某些列
- 如何在一行中连接两个集而不使用“|”
- 从字符串中移除前缀
- 代码结束时发出警报
- 如何在Python中按字母顺序排序字符串中的字母
- 在matplotlib中将y轴标签添加到次要y轴
- 为什么说“HTTP是无状态协议”?
- 如何消除数独方块的凹凸缺陷?
- 为什么出现这个UnboundLocalError(闭包)?
- 使用Python请求的异步请求
- 如何检查一个对象是否是python中的生成器对象?