在Python中,是否有一种方法可以通过ICMP来ping服务器,如果服务器响应则返回TRUE,如果没有响应则返回FALSE ?


当前回答

我需要一个更快的ping扫描,我不想使用任何外部库,所以我决定使用内置的asyncio来实现并发。

此代码需要python 3.7+,仅在Linux上制作和测试。它不能在Windows上工作,但我相信你可以很容易地将其更改为在Windows上工作。

我不是asyncio方面的专家,但我使用了这篇很棒的文章,用并发性加速你的Python程序,我想出了这些代码行。我试图使它尽可能简单,所以很可能您需要添加更多的代码来满足您的需求。

它不会返回true或false,我认为让它打印响应ping请求的IP会更方便。我认为这是相当快的,在近10秒内ping 255个ip。

#!/usr/bin/python3

import asyncio

async def ping(host):
    """
    Prints the hosts that respond to ping request
    """
    ping_process = await asyncio.create_subprocess_shell("ping -c 1 " + host + " > /dev/null 2>&1")
    await ping_process.wait()

    if ping_process.returncode == 0:
        print(host)
    return 


async def ping_all():
    tasks = []

    for i in range(1,255):
        ip = "192.168.1.{}".format(i)
        task = asyncio.ensure_future(ping(ip))
        tasks.append(task)

    await asyncio.gather(*tasks, return_exceptions = True)

asyncio.run(ping_all())

样例输出:

192.168.1.1
192.168.1.3
192.168.1.102
192.168.1.106
192.168.1.6

请注意,IP不是按顺序排列的,因为IP在它响应时就会打印出来,所以先响应的就会先打印出来。

其他回答

在四处寻找之后,我最终编写了自己的ping模块,该模块旨在监视大量地址,是异步的,并且不使用大量系统资源。你可以在这里找到它:https://github.com/romana/multi-ping/它是Apache授权的,所以你可以在你的项目中以任何你认为合适的方式使用它。

实施我自己的方法的主要原因是其他方法的限制:

这里提到的许多解决方案都需要将exec输出到命令行实用程序。如果您需要监控大量的IP地址,这是非常低效和资源消耗的。 其他人提到了一些较老的python ping模块。我看了这些,最后,他们都有这样或那样的问题(比如没有正确设置数据包id),不能处理大量地址的ping-ing。

import subprocess
ping_response = subprocess.Popen(["/bin/ping", "-c1", "-w100", "192.168.0.1"], stdout=subprocess.PIPE).stdout.read()

此函数适用于任何操作系统(Unix、Linux、macOS和Windows) Python 2和Python 3

编辑: @radato os。System被subprocess.call取代。这避免了在没有验证主机名字符串的情况下shell注入漏洞。

import platform    # For getting the operating system name
import subprocess  # For executing a shell command

def ping(host):
    """
    Returns True if host (str) responds to a ping request.
    Remember that a host may not respond to a ping (ICMP) request even if the host name is valid.
    """

    # Option for the number of packets as a function of
    param = '-n' if platform.system().lower()=='windows' else '-c'

    # Building the command. Ex: "ping -c 1 google.com"
    command = ['ping', param, '1', host]

    return subprocess.call(command) == 0

注意,根据Windows上的@ikrase,如果你得到一个目标主机不可达错误,这个函数仍然会返回True。

解释

该命令在Windows和类unix系统中都是ping。 选项-n (Windows)或-c (Unix)控制包的数量,在本例中设置为1。

platform.system()返回平台名称。前女友。macOS上的“达尔文”。 Subprocess.call()执行一个系统调用。例如subprocess.call ([' ls ', ' - l '])。

在linux上,可以创建ICMP数据报(不是原始)套接字,而不需要root(或setuid或CAP_NET_RAW): https://unix.stackexchange.com/a/592914。最后我得到了

$ id
uid=1000(raylu) gid=1000(raylu) [...]
$ sudo sysctl net.ipv4.ping_group_range='1000 1000'
import socket
import struct
import time

def main():
    ping('192.168.1.10')

def ping(destination):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.getprotobyname('icmp'))
    sock.settimeout(10.0)
    start_time = time.time_ns() # python 3.7+ only

    payload = struct.pack('L', start_time)
    sock.sendto(encode(payload), (destination, 0))
    while (time.time_ns() - start_time) // 1_000_000_000 < 10:
        try:
            data, source = sock.recvfrom(256)
        except socket.timeout:
            print('timed out')
            return
        message_type, message_code, check, identifier, sequence_number = struct.unpack('bbHHh', data[:8])
        if source == (destination, 0) and message_type == ICMP.ECHO_REPLY and data[8:] == payload:
            print((time.time_ns() - start_time) // 1_000_000, 'ms')
            break
        else:
            print('got unexpected packet from %s:' % source[0], message_type, data[8:])
    else:
        print('timed out')

def encode(payload: bytes):
    # calculate checksum with check set to 0
    checksum = calc_checksum(icmp_header(ICMP.ECHO_REQUEST, 0, 0, 1, 1) + payload)
    # craft the packet again with the checksum set
    return icmp_header(ICMP.ECHO_REQUEST, 0, checksum, 1, 1) + payload

def icmp_header(message_type, message_code, check, identifier, sequence_number) -> bytes:
    return struct.pack('bbHHh', message_type, message_code, check, identifier, sequence_number)

def calc_checksum(data: bytes) -> int:
    '''RFC 1071'''
    # code stolen from https://github.com/alessandromaggio/pythonping/blob/a59ce65a/pythonping/icmp.py#L8
    '''
    MIT License

    Copyright (c) 2018 Alessandro Maggio

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
    '''
    subtotal = 0
    for i in range(0, len(data)-1, 2):
        subtotal += (data[i] << 8) + data[i+1]
    if len(data) % 2:
        subtotal += (data[len(data)-1] << 8)
    while subtotal >> 16:
        subtotal = (subtotal & 0xFFFF) + (subtotal >> 16)
    check = ~subtotal
    return ((check << 8) & 0xFF00) | ((check >> 8) & 0x00FF)

class ICMP:
    ECHO_REPLY = 0
    ECHO_REQUEST = 8

虽然这里其他答案建议的许多软件包也可以工作

下面是一个使用Python的子进程模块和底层操作系统提供的ping CLI工具的解决方案。在Windows和Linux上测试。支持设置网络超时。不需要根权限(至少在Windows和Linux上)。

import platform
import subprocess

def ping(host, network_timeout=3):
    """Send a ping packet to the specified host, using the system "ping" command."""
    args = [
        'ping'
    ]

    platform_os = platform.system().lower()

    if platform_os == 'windows':
        args.extend(['-n', '1'])
        args.extend(['-w', str(network_timeout * 1000)])
    elif platform_os in ('linux', 'darwin'):
        args.extend(['-c', '1'])
        args.extend(['-W', str(network_timeout)])
    else:
        raise NotImplemented('Unsupported OS: {}'.format(platform_os))

    args.append(host)

    try:
        if platform_os == 'windows':
            output = subprocess.run(args, check=True, universal_newlines=True).stdout

            if output and 'TTL' not in output:
                return False
        else:
            subprocess.run(args, check=True)

        return True
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
        return False