我想编写一个函数,该函数将执行shell命令并将其输出作为字符串返回,无论它是错误消息还是成功消息。我只想得到和用命令行得到的相同的结果。

什么样的代码示例可以做到这一点呢?

例如:

def run_command(cmd):
    # ??????

print run_command('mysqladmin create test -uroot -pmysqladmin12')
# Should output something like:
# mysqladmin: CREATE DATABASE failed; error: 'Can't create database 'test'; database exists'

就像这样:

def runProcess(exe):    
    p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    while(True):
        # returns None while subprocess is running
        retcode = p.poll() 
        line = p.stdout.readline()
        yield line
        if retcode is not None:
            break

注意,我将stderr重定向到stdout,这可能不是你想要的,但我也想要错误消息。

这个函数一行一行地输出(通常你必须等待子进程完成才能得到完整的输出)。

对于你的情况,用法是:

for line in runProcess('mysqladmin create test -uroot -pmysqladmin12'.split()):
    print line,

在所有官方维护的Python版本中,最简单的方法是使用子进程。check_output功能:

>>> subprocess.check_output(['ls', '-l'])
b'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

Check_output运行一个单独的程序,只接受参数作为输入它返回打印到stdout的结果。如果您需要向stdin写入输入,请跳过运行或Popen部分。如果您想执行复杂的shell命令,请参阅答案末尾shell=True的说明。

check_output函数适用于所有官方维护的Python版本。但是对于最近的版本,可以使用更灵活的方法。

Python(3.5或更高版本)的现代版本:运行

如果您使用的是Python 3.5+,并且不需要向后兼容,那么对于大多数任务,官方文档都推荐使用新的run函数。它为子流程模块提供了一个非常通用的高级API。要捕获程序的输出,请传递子流程。PIPE标志指向stdout关键字参数。然后访问返回的CompletedProcess对象的stdout属性:

>>> import subprocess
>>> result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)
>>> result.stdout
b'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

返回值是一个bytes对象,所以如果想要一个合适的字符串,就需要解码它。假设被调用进程返回一个utf -8编码的字符串:

>>> result.stdout.decode('utf-8')
'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

如果需要,这些都可以压缩成一行代码:

>>> subprocess.run(['ls', '-l'], stdout=subprocess.PIPE).stdout.decode('utf-8')
'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

如果你想把输入传递给进程的stdin,你可以把一个bytes对象传递给input关键字参数:

>>> cmd = ['awk', 'length($0) > 5']
>>> ip = 'foo\nfoofoo\n'.encode('utf-8')
>>> result = subprocess.run(cmd, stdout=subprocess.PIPE, input=ip)
>>> result.stdout.decode('utf-8')
'foofoo\n'

你可以通过传递stderr=subprocess来捕捉错误。PIPE(捕获到result.stderr)或stderr=subprocess。捕获结果。标准输出和常规输出)。如果希望在流程返回非零退出码时运行抛出异常,可以传递check=True。(或者你可以检查上面result的returncode属性。)当安全性不是问题时,您还可以通过传递shell=True来运行更复杂的shell命令,如答案末尾所述。

Python的后续版本进一步简化了上述内容。在Python 3.7+中,上面的一行代码可以这样拼写:

>>> subprocess.run(['ls', '-l'], capture_output=True, text=True).stdout
'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

与旧的操作方式相比,以这种方式使用run只增加了一点复杂性。但是现在您可以单独使用run函数完成几乎所有需要做的事情。

旧版本的Python(3-3.4):更多关于check_output

如果您使用的是较旧版本的Python,或者需要适度的向后兼容性,您可以使用上面简要描述的check_output函数。它从Python 2.7开始就可用了。

subprocess.check_output(*popenargs, **kwargs)  

它接受与Popen相同的参数(见下文),并返回包含程序输出的字符串。这个回答的开头有一个更详细的用法示例。在Python 3.5+中,check_output相当于执行带有check=True和stdout=PIPE的run,并且只返回stdout属性。

你可以传递stderr=subprocess。STDOUT以确保返回的输出中包含错误消息。当安全性不是问题时,您还可以通过传递shell=True来运行更复杂的shell命令,如答案末尾所述。

如果您需要从stderr管道或将输入传递给流程,check_output将无法完成该任务。在这种情况下,请参阅下面的Popen示例。

复杂应用程序和Python(2.6及以下)的遗留版本:Popen

如果需要深度向后兼容性,或者需要比check_output或运行provider更复杂的功能,则必须直接使用Popen对象,它为子进程封装了低级API。

Popen构造函数可以接受不带参数的单个命令,也可以接受包含命令作为第一项的列表,后面跟着任意数量的参数,每个参数作为列表中的单独项。shlex。Split可以帮助将字符串解析为适当格式化的列表。Popen对象还接受许多用于进程IO管理和低级配置的不同参数。

要发送输入和捕获输出,通信几乎总是首选的方法。如:

output = subprocess.Popen(["mycmd", "myarg"], 
                          stdout=subprocess.PIPE).communicate()[0]

Or

>>> import subprocess
>>> p = subprocess.Popen(['ls', '-a'], stdout=subprocess.PIPE, 
...                                    stderr=subprocess.PIPE)
>>> out, err = p.communicate()
>>> print out
.
..
foo

如果你设置了stdin=PIPE, communication也允许你通过stdin向进程传递数据:

>>> cmd = ['awk', 'length($0) > 5']
>>> p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
...                           stderr=subprocess.PIPE,
...                           stdin=subprocess.PIPE)
>>> out, err = p.communicate('foo\nfoofoo\n')
>>> print out
foofoo

请注意Aaron Hall的回答,这表明在某些系统上,您可能需要将stdout、stderr和stdin全部设置为PIPE(或DEVNULL)才能使通信正常工作。

在极少数情况下,您可能需要复杂的实时输出捕获。Vartec的回答提出了一条前进的道路,但如果不小心使用,除了沟通之外的其他方法很容易陷入僵局。

与上述所有函数一样,当安全性不是问题时,您可以通过传递shell=True来运行更复杂的shell命令。

笔记

1. 执行shell命令:参数shell=True

通常,对run、check_output或Popen构造函数的每次调用都执行一个程序。这意味着没有花哨的敲击式管道。如果你想运行复杂的shell命令,你可以传递shell=True,这三个函数都支持。例如:

>>> subprocess.check_output('cat books/* | wc', shell=True, text=True)
' 1299377 17005208 101299376\n'

然而,这样做会引起安全问题。如果您要做的不是简单的脚本,那么最好分别调用每个进程,并将每个进程的输出作为输入传递给下一个进程,即via

run(cmd, [stdout=etc...], input=other_output)

Or

Popen(cmd, [stdout=etc...]).communicate(other_output)

直接连接管道的诱惑很强烈;抵制它。否则,您可能会看到死锁,或者不得不做类似这样的讨厌的事情。


这要简单得多,但只适用于Unix(包括Cygwin)和Python2.7。

import commands
print commands.getstatusoutput('wc -l file')

它返回一个元组(return_value, output)。

对于在Python2和Python3中都可以工作的解决方案,请使用subprocess模块:

from subprocess import Popen, PIPE
output = Popen(["date"],stdout=PIPE)
response = output.communicate()
print response

Vartec的答案不读取所有的行,所以我做了一个版本:

def run_command(command):
    p = subprocess.Popen(command,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    return iter(p.stdout.readline, b'')

用法与公认的答案相同:

command = 'mysqladmin create test -uroot -pmysqladmin12'.split()
for line in run_command(command):
    print(line)

我在Python 2.6.5的Windows上尝试了@senderle对Vartec解决方案的旋转,但我得到了错误,没有其他解决方案有效。我的错误是:WindowsError:[错误6]句柄无效。

我发现我必须将PIPE分配给每个句柄,以使它返回我所期望的输出——下面的方法对我有用。

import subprocess

def run_command(cmd):
    """given shell command, returns communication tuple of stdout and stderr"""
    return subprocess.Popen(cmd, 
                            stdout=subprocess.PIPE, 
                            stderr=subprocess.PIPE, 
                            stdin=subprocess.PIPE).communicate()

然后像这样调用,([0]获取元组的第一个元素,stdout):

run_command('tracert 11.1.0.1')[0]

在了解了更多信息后,我认为我需要这些管道参数,因为我正在开发一个使用不同句柄的自定义系统,因此我必须直接控制所有std。

要停止控制台弹出窗口(Windows),请执行以下操作:

def run_command(cmd):
    """given shell command, returns communication tuple of stdout and stderr"""
    # instantiate a startupinfo obj:
    startupinfo = subprocess.STARTUPINFO()
    # set the use show window flag, might make conditional on being in Windows:
    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    # pass as the startupinfo keyword argument:
    return subprocess.Popen(cmd,
                            stdout=subprocess.PIPE, 
                            stderr=subprocess.PIPE, 
                            stdin=subprocess.PIPE, 
                            startupinfo=startupinfo).communicate()

run_command('tracert 11.1.0.1')

如果您需要在多个文件上运行shell命令,这对我来说很有用。

import os
import subprocess

# Define a function for running commands and capturing stdout line by line
# (Modified from Vartec's solution because it wasn't printing all lines)
def runProcess(exe):    
    p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return iter(p.stdout.readline, b'')

# Get all filenames in working directory
for filename in os.listdir('./'):
    # This command will be run on each file
    cmd = 'nm ' + filename

    # Run the command and capture the output line by line.
    for line in runProcess(cmd.split()):
        # Eliminate leading and trailing whitespace
        line.strip()
        # Split the output 
        output = line.split()

        # Filter the output and print relevant lines
        if len(output) > 2:
            if ((output[2] == 'set_program_name')):
                print filename
                print line

编辑:刚刚看到Max Persson的解决方案和J.F. Sebastian的建议。继续把它整合进去。


这是一个棘手但超级简单的解决方案,适用于许多情况:

import os
os.system('sample_cmd > tmp')
print(open('tmp', 'r').read())

使用命令的输出创建一个临时文件(这里是tmp),您可以从中读取所需的输出。

备注: 如果是一次性作业,可以删除tmp文件。如果需要多次执行此操作,则不需要删除tmp。

os.remove('tmp')

对于相同的问题,我有一个稍微不同的需求:

捕获并返回STDOUT消息,因为它们在STDOUT缓冲区中累积(即实时)。 @vartec用python方法解决了这个问题,他使用生成器和“yield” 上面的字 打印所有STDOUT行(即使进程在STDOUT缓冲区可以完全读取之前退出) 不要浪费CPU周期以高频轮询进程 检查子流程的返回代码 如果得到非零错误返回码,则打印STDERR(与STDOUT分开)。

我综合了之前的答案,得出了以下结论:

import subprocess
from time import sleep

def run_command(command):
    p = subprocess.Popen(command,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         shell=True)
    # Read stdout from subprocess until the buffer is empty !
    for line in iter(p.stdout.readline, b''):
        if line: # Don't print blank lines
            yield line
    # This ensures the process has completed, AND sets the 'returncode' attr
    while p.poll() is None:                                                                                                                                        
        sleep(.1) #Don't waste CPU-cycles
    # Empty STDERR buffer
    err = p.stderr.read()
    if p.returncode != 0:
       # The run_command() function is responsible for logging STDERR 
       print("Error: " + str(err))

这段代码的执行方式与前面的答案相同:

for line in run_command(cmd):
    print(line)

我也遇到了同样的问题,但我想出了一个非常简单的方法:

import subprocess
output = subprocess.getoutput("ls -l")
print(output)

注意:此解决方案是特定于Python3的subprocess.getoutput()在Python2中不起作用


可以使用以下命令运行任何shell命令。我在ubuntu上使用过它们。

import os
os.popen('your command here').read()

注意:自python 2.6起已弃用。现在必须使用subprocess.Popen。下面是示例

import subprocess

p = subprocess.Popen("Your command", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
print p.split("\n")

例如,execute('ls -ahl') 区分三/四种可能的回报和操作系统平台:

没有输出,但运行成功 输出空行,运行成功 运行失败 输出一些东西,成功运行

下面的函数

def execute(cmd, output=True, DEBUG_MODE=False):
"""Executes a bash command.
(cmd, output=True)
output: whether print shell output to screen, only affects screen display, does not affect returned values
return: ...regardless of output=True/False...
        returns shell output as a list with each elment is a line of string (whitespace stripped both sides) from output
        could be 
        [], ie, len()=0 --> no output;    
        [''] --> output empty line;     
        None --> error occured, see below

        if error ocurs, returns None (ie, is None), print out the error message to screen
"""
if not DEBUG_MODE:
    print "Command: " + cmd

    # https://stackoverflow.com/a/40139101/2292993
    def _execute_cmd(cmd):
        if os.name == 'nt' or platform.system() == 'Windows':
            # set stdin, out, err all to PIPE to get results (other than None) after run the Popen() instance
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        else:
            # Use bash; the default is sh
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, executable="/bin/bash")

        # the Popen() instance starts running once instantiated (??)
        # additionally, communicate(), or poll() and wait process to terminate
        # communicate() accepts optional input as stdin to the pipe (requires setting stdin=subprocess.PIPE above), return out, err as tuple
        # if communicate(), the results are buffered in memory

        # Read stdout from subprocess until the buffer is empty !
        # if error occurs, the stdout is '', which means the below loop is essentially skipped
        # A prefix of 'b' or 'B' is ignored in Python 2; 
        # it indicates that the literal should become a bytes literal in Python 3 
        # (e.g. when code is automatically converted with 2to3).
        # return iter(p.stdout.readline, b'')
        for line in iter(p.stdout.readline, b''):
            # # Windows has \r\n, Unix has \n, Old mac has \r
            # if line not in ['','\n','\r','\r\n']: # Don't print blank lines
                yield line
        while p.poll() is None:                                                                                                                                        
            sleep(.1) #Don't waste CPU-cycles
        # Empty STDERR buffer
        err = p.stderr.read()
        if p.returncode != 0:
            # responsible for logging STDERR 
            print("Error: " + str(err))
            yield None

    out = []
    for line in _execute_cmd(cmd):
        # error did not occur earlier
        if line is not None:
            # trailing comma to avoid a newline (by print itself) being printed
            if output: print line,
            out.append(line.strip())
        else:
            # error occured earlier
            out = None
    return out
else:
    print "Simulation! The command is " + cmd
    print ""

拆分子进程的初始命令可能非常棘手和麻烦。

使用shlex.split()来帮助自己。

示例命令

Git log -n 5——5年前到2年前

的代码

from subprocess import check_output
from shlex import split

res = check_output(split('git log -n 5 --since "5 years ago" --until "2 year ago"'))
print(res)
>>> b'commit 7696ab087a163e084d6870bb4e5e4d4198bdc61a\nAuthor: Artur Barseghyan...'

如果没有shlex.split(),代码将如下所示

res = check_output([
    'git', 
    'log', 
    '-n', 
    '5', 
    '--since', 
    '5 years ago', 
    '--until', 
    '2 year ago'
])
print(res)
>>> b'commit 7696ab087a163e084d6870bb4e5e4d4198bdc61a\nAuthor: Artur Barseghyan...'

根据@senderle的说法,如果你像我一样使用python3.6:

def sh(cmd, input=""):
    rst = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, input=input.encode("utf-8"))
    assert rst.returncode == 0, rst.stderr.decode("utf-8")
    return rst.stdout.decode("utf-8")
sh("ls -a")

将完全像您在bash中运行命令一样


如果你使用subprocess python模块,你可以分别处理命令的STDOUT、STDERR和返回码。您可以看到完整命令调用器实现的示例。当然你可以用try.扩展它,除非你想。

下面的函数返回STDOUT, STDERR和Return代码,以便您可以在另一个脚本中处理它们。

import subprocess

def command_caller(command=None)
    sp = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=False)
    out, err = sp.communicate()
    if sp.returncode:
        print(
            "Return code: %(ret_code)s Error message: %(err_msg)s"
            % {"ret_code": sp.returncode, "err_msg": err}
            )
    return sp.returncode, out, err

可以将输出重定向到文本文件,然后将其读回来。

import subprocess
import os
import tempfile

def execute_to_file(command):
    """
    This function execute the command
    and pass its output to a tempfile then read it back
    It is usefull for process that deploy child process
    """
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    path = temp_file.name
    command = command + " > " + path
    proc = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    if proc.stderr:
        # if command failed return
        os.unlink(path)
        return
    with open(path, 'r') as f:
        data = f.read()
    os.unlink(path)
    return data

if __name__ == "__main__":
    path = "Somepath"
    command = 'ecls.exe /files ' + path
    print(execute(command))

这里有一个解决方案,如果你想在进程运行或不运行时打印输出。


我还添加了当前工作目录,这对我来说很有用不止一次。


希望这个解决方案能帮助到别人:)。

import subprocess

def run_command(cmd_and_args, print_constantly=False, cwd=None):
"""Runs a system command.

:param cmd_and_args: the command to run with or without a Pipe (|).
:param print_constantly: If True then the output is logged in continuous until the command ended.
:param cwd: the current working directory (the directory from which you will like to execute the command)
:return: - a tuple containing the return code, the stdout and the stderr of the command
"""
output = []

process = subprocess.Popen(cmd_and_args, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)

while True:
    next_line = process.stdout.readline()
    if next_line:
        output.append(str(next_line))
        if print_constantly:
            print(next_line)
    elif not process.poll():
        break

error = process.communicate()[1]

return process.returncode, '\n'.join(output), error

我想建议你考虑一下simppl。它是一个可以通过pypi: pip install simppl获得的模块,运行在python3上。

Simppl允许用户运行shell命令并从屏幕上读取输出。

开发人员建议使用三种类型的用例:

最简单的用法是这样的: 从simppl。simple_pipeline导入simepipeline sp = SimplePipeline(start=0, end=100): sp.print_and_run(“< YOUR_FIRST_OS_COMMAND >”) sp.print_and_run(“< YOUR_SECOND_OS_COMMAND >”)' ' '


要同时运行多个命令,请使用: commands = ['<YOUR_FIRST_OS_COMMAND>', '<YOUR_SECOND_OS_COMMAND>'] Max_number_of_processes = 4 max_number_of_processes sp.run_parallel(命令)' ' '


最后,如果你的项目使用cli模块,你可以直接运行另一个command_line_tool作为管道的一部分。另一个工具会 从相同的进程中运行,但它将从日志中显示为 管道中的另一个命令。这使得调试和 重构调用其他工具的工具。 从example_module导入example_tool Sp.print_and_run_clt (example_tool.run, ['first_number', 'second_nmber'], {'-key1': 'val1', '-key2': 'val2'}, {“——国旗”})' ' '

请注意,打印到STDOUT/STDERR是通过python的日志模块。


下面是simppl工作原理的完整代码:

import logging
from logging.config import dictConfig

logging_config = dict(
    version = 1,
    formatters = {
        'f': {'format':
              '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'}
        },
    handlers = {
        'h': {'class': 'logging.StreamHandler',
              'formatter': 'f',
              'level': logging.DEBUG}
        },
    root = {
        'handlers': ['h'],
        'level': logging.DEBUG,
        },
)
dictConfig(logging_config)

from simppl.simple_pipeline import SimplePipeline
sp = SimplePipeline(0, 100)
sp.print_and_run('ls')

在Python 3.7+中,使用subprocess.run并传递capture_output=True:

import subprocess
result = subprocess.run(['echo', 'hello', 'world'], capture_output=True)
print(repr(result.stdout))

这将返回字节:

b'hello world\n'

如果你想把字节转换成字符串,添加text=True:

result = subprocess.run(['echo', 'hello', 'world'], capture_output=True, text=True)
print(repr(result.stdout))

这将读取字节使用您的默认编码:

'hello world\n'

如果你需要手动指定不同的编码,使用encoding="your encoding"而不是text=True:

result = subprocess.run(['echo', 'hello', 'world'], capture_output=True, encoding="utf8")
print(repr(result.stdout))

下面是一个简单而灵活的解决方案,适用于各种操作系统版本,以及Python 2和3,在shell模式下使用IPython:

from IPython.terminal.embed import InteractiveShellEmbed
my_shell = InteractiveShellEmbed()
result = my_shell.getoutput("echo hello world")
print(result)

Out: ['hello world']

它有几个优点

它只需要安装IPython,所以在使用它时,你真的不需要担心特定的Python或操作系统版本,它附带Jupyter -具有广泛的支持 它默认接受一个简单的字符串-所以不需要使用shell模式参数或字符串拆分,使它在我看来稍微干净一些 它还使替换变量甚至字符串本身的整个Python命令更加干净


为了演示:

var = "hello world "
result = my_shell.getoutput("echo {var*2}")
print(result)

Out: ['hello world hello world']

只是想给你一个额外的选择,特别是如果你已经安装了Jupyter

当然,如果你在一个实际的Jupyter笔记本,而不是一个.py脚本,你也可以总是这样做:

result = !echo hello world
print(result)

为了达到同样的目的。


出于某种原因,这个方法适用于Python 2.7,你只需要导入操作系统!

import os 

def bash(command):
    output = os.popen(command).read()
    return output

print_me = bash('ls -l')
print(print_me)

改进以获得更好的日志记录。 为了获得更好的输出,可以使用迭代器。 从下面开始,我们变得更好

from subprocess import Popen, getstatusoutput, PIPE
def shell_command(cmd):
    result = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)

    output = iter(result.stdout.readline, b'')
    error = iter(result.stderr.readline, b'')
    print("##### OutPut ###")
    for line in output:
        print(line.decode("utf-8"))
    print("###### Error ########")
    for line in error:
        print(error.decode("utf-8")) # Convert bytes to str

    status, terminal_output = run_command(cmd)
    print(terminal_output)

shell_command("ls") # this will display all the files & folders in directory

使用getstatusoutput的其他方法(容易理解)

from subprocess import Popen, getstatusoutput, PIPE

status_Code, output = getstausoutput(command)
print(output) # this will give the terminal output

# status_code, output = getstatusoutput("ls") # this will print the all files & folder available in the directory