在Python中,是否有一种方法可以通过ICMP来ping服务器,如果服务器响应则返回TRUE,如果没有响应则返回FALSE ?
当前回答
在linux上,可以创建ICMP数据报(不是原始)套接字,而不需要root(或setuid或CAP_NET_RAW): https://unix.stackexchange.com/a/592914。最后我得到了
$ id
uid=1000(raylu) gid=1000(raylu) [...]
$ sudo sysctl net.ipv4.ping_group_range='1000 1000'
import socket
import struct
import time
def main():
ping('192.168.1.10')
def ping(destination):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.getprotobyname('icmp'))
sock.settimeout(10.0)
start_time = time.time_ns() # python 3.7+ only
payload = struct.pack('L', start_time)
sock.sendto(encode(payload), (destination, 0))
while (time.time_ns() - start_time) // 1_000_000_000 < 10:
try:
data, source = sock.recvfrom(256)
except socket.timeout:
print('timed out')
return
message_type, message_code, check, identifier, sequence_number = struct.unpack('bbHHh', data[:8])
if source == (destination, 0) and message_type == ICMP.ECHO_REPLY and data[8:] == payload:
print((time.time_ns() - start_time) // 1_000_000, 'ms')
break
else:
print('got unexpected packet from %s:' % source[0], message_type, data[8:])
else:
print('timed out')
def encode(payload: bytes):
# calculate checksum with check set to 0
checksum = calc_checksum(icmp_header(ICMP.ECHO_REQUEST, 0, 0, 1, 1) + payload)
# craft the packet again with the checksum set
return icmp_header(ICMP.ECHO_REQUEST, 0, checksum, 1, 1) + payload
def icmp_header(message_type, message_code, check, identifier, sequence_number) -> bytes:
return struct.pack('bbHHh', message_type, message_code, check, identifier, sequence_number)
def calc_checksum(data: bytes) -> int:
'''RFC 1071'''
# code stolen from https://github.com/alessandromaggio/pythonping/blob/a59ce65a/pythonping/icmp.py#L8
'''
MIT License
Copyright (c) 2018 Alessandro Maggio
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''
subtotal = 0
for i in range(0, len(data)-1, 2):
subtotal += (data[i] << 8) + data[i+1]
if len(data) % 2:
subtotal += (data[len(data)-1] << 8)
while subtotal >> 16:
subtotal = (subtotal & 0xFFFF) + (subtotal >> 16)
check = ~subtotal
return ((check << 8) & 0xFF00) | ((check >> 8) & 0x00FF)
class ICMP:
ECHO_REPLY = 0
ECHO_REQUEST = 8
虽然这里其他答案建议的许多软件包也可以工作
其他回答
使用这个,它在python 2.7上测试过,工作很好,如果成功,它返回以毫秒为单位的ping时间,失败时返回False。
import platform,subproccess,re
def Ping(hostname,timeout):
if platform.system() == "Windows":
command="ping "+hostname+" -n 1 -w "+str(timeout*1000)
else:
command="ping -i "+str(timeout)+" -c 1 " + hostname
proccess = subprocess.Popen(command, stdout=subprocess.PIPE)
matches=re.match('.*time=([0-9]+)ms.*', proccess.stdout.read(),re.DOTALL)
if matches:
return matches.group(1)
else:
return False
确保安装了pyping或安装它
#!/usr/bin/python
import pyping
response = pyping.ping('Your IP')
if response.ret_code == 0:
print("reachable")
else:
print("unreachable")
我有类似的要求,所以我实现它如下所示。它在Windows 64位和Linux上进行了测试。
import subprocess
def systemCommand(Command):
Output = ""
Error = ""
try:
Output = subprocess.check_output(Command,stderr = subprocess.STDOUT,shell='True')
except subprocess.CalledProcessError as e:
#Invalid command raises this exception
Error = e.output
if Output:
Stdout = Output.split("\n")
else:
Stdout = []
if Error:
Stderr = Error.split("\n")
else:
Stderr = []
return (Stdout,Stderr)
#in main
Host = "ip to ping"
NoOfPackets = 2
Timeout = 5000 #in milliseconds
#Command for windows
Command = 'ping -n {0} -w {1} {2}'.format(NoOfPackets,Timeout,Host)
#Command for linux
#Command = 'ping -c {0} -w {1} {2}'.format(NoOfPackets,Timeout,Host)
Stdout,Stderr = systemCommand(Command)
if Stdout:
print("Host [{}] is reachable.".format(Host))
else:
print("Host [{}] is unreachable.".format(Host))
当IP不可达时,subprocess.check_output()将引发异常。额外的验证可以通过从输出行“Packets: Sent = 2, Received = 2, Lost = 0 (0% loss)”中提取信息来完成。
在linux上,可以创建ICMP数据报(不是原始)套接字,而不需要root(或setuid或CAP_NET_RAW): https://unix.stackexchange.com/a/592914。最后我得到了
$ id
uid=1000(raylu) gid=1000(raylu) [...]
$ sudo sysctl net.ipv4.ping_group_range='1000 1000'
import socket
import struct
import time
def main():
ping('192.168.1.10')
def ping(destination):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.getprotobyname('icmp'))
sock.settimeout(10.0)
start_time = time.time_ns() # python 3.7+ only
payload = struct.pack('L', start_time)
sock.sendto(encode(payload), (destination, 0))
while (time.time_ns() - start_time) // 1_000_000_000 < 10:
try:
data, source = sock.recvfrom(256)
except socket.timeout:
print('timed out')
return
message_type, message_code, check, identifier, sequence_number = struct.unpack('bbHHh', data[:8])
if source == (destination, 0) and message_type == ICMP.ECHO_REPLY and data[8:] == payload:
print((time.time_ns() - start_time) // 1_000_000, 'ms')
break
else:
print('got unexpected packet from %s:' % source[0], message_type, data[8:])
else:
print('timed out')
def encode(payload: bytes):
# calculate checksum with check set to 0
checksum = calc_checksum(icmp_header(ICMP.ECHO_REQUEST, 0, 0, 1, 1) + payload)
# craft the packet again with the checksum set
return icmp_header(ICMP.ECHO_REQUEST, 0, checksum, 1, 1) + payload
def icmp_header(message_type, message_code, check, identifier, sequence_number) -> bytes:
return struct.pack('bbHHh', message_type, message_code, check, identifier, sequence_number)
def calc_checksum(data: bytes) -> int:
'''RFC 1071'''
# code stolen from https://github.com/alessandromaggio/pythonping/blob/a59ce65a/pythonping/icmp.py#L8
'''
MIT License
Copyright (c) 2018 Alessandro Maggio
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''
subtotal = 0
for i in range(0, len(data)-1, 2):
subtotal += (data[i] << 8) + data[i+1]
if len(data) % 2:
subtotal += (data[len(data)-1] << 8)
while subtotal >> 16:
subtotal = (subtotal & 0xFFFF) + (subtotal >> 16)
check = ~subtotal
return ((check << 8) & 0xFF00) | ((check >> 8) & 0x00FF)
class ICMP:
ECHO_REPLY = 0
ECHO_REQUEST = 8
虽然这里其他答案建议的许多软件包也可以工作
在windows或linux中Ping它们,返回一个排序的列表。这是@Ahmed Essam和@Arno回复的混合/修正。
import asyncio
import re
import platform
isWindows = platform.system()
async def ping(host):
cmd = 'ping {} {} 1'.format(host, '-n' if isWindows else '-c')
ping_proc = \
await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await ping_proc.communicate()
outstr = stdout.decode()
if ping_proc.returncode == 0:
delay = int(re.search(r'(?:time=)([\d]*)', outstr).group(1)) if 'time=' in outstr else -1
if delay >= 0:
# print('{} {}ms'.format(host, delay))
return [host, delay]
return [host, None]
async def ping_all():
tasks = []
for i in range(1, 256):
ip = "192.168.1.{}".format(i)
task = asyncio.ensure_future(ping(ip))
tasks.append(task)
retList = await asyncio.gather(*tasks, return_exceptions=True)
retList = [x for x in retList if x[1] is not None]
retList.sort(key=lambda x: int(x[0].split('.')[-1]))
return retList
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
pingRet = loop.run_until_complete(ping_all())
for ip, d in pingRet:
print('{:<16s} {}ms'.format(ip, d))
推荐文章
- python用什么方法避免默认参数为空列表?
- ValueError: numpy。Ndarray大小改变,可能表示二进制不兼容。期望从C头得到88,从PyObject得到80
- Anaconda /conda -安装特定的软件包版本
- 我在哪里调用Keras的BatchNormalization函数?
- 打印测试执行时间并使用py.test锁定缓慢的测试
- 插入一行到熊猫数据框架
- 要列出Pandas DataFrame列
- 在Django模型中存储电话号码的最佳方法是什么?
- 从导入的模块中模拟函数
- 滚动或滑动窗口迭代器?
- python的方法找到最大值和它的索引在一个列表?
- 如何读取文件的前N行?
- 如何删除matplotlib中的顶部和右侧轴?
- 解析.py文件,读取AST,修改它,然后写回修改后的源代码
- Visual Studio Code:如何调试Python脚本的参数