我想编写一个函数,该函数将执行shell命令并将其输出作为字符串返回,无论它是错误消息还是成功消息。我只想得到和用命令行得到的相同的结果。

什么样的代码示例可以做到这一点呢?

例如:

def run_command(cmd):
    # ??????

print run_command('mysqladmin create test -uroot -pmysqladmin12')
# Should output something like:
# mysqladmin: CREATE DATABASE failed; error: 'Can't create database 'test'; database exists'

当前回答

可以将输出重定向到文本文件,然后将其读回来。

import subprocess
import os
import tempfile

def execute_to_file(command):
    """
    This function execute the command
    and pass its output to a tempfile then read it back
    It is usefull for process that deploy child process
    """
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    path = temp_file.name
    command = command + " > " + path
    proc = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    if proc.stderr:
        # if command failed return
        os.unlink(path)
        return
    with open(path, 'r') as f:
        data = f.read()
    os.unlink(path)
    return data

if __name__ == "__main__":
    path = "Somepath"
    command = 'ecls.exe /files ' + path
    print(execute(command))

其他回答

对于相同的问题,我有一个稍微不同的需求:

捕获并返回STDOUT消息,因为它们在STDOUT缓冲区中累积(即实时)。 @vartec用python方法解决了这个问题,他使用生成器和“yield” 上面的字 打印所有STDOUT行(即使进程在STDOUT缓冲区可以完全读取之前退出) 不要浪费CPU周期以高频轮询进程 检查子流程的返回代码 如果得到非零错误返回码,则打印STDERR(与STDOUT分开)。

我综合了之前的答案,得出了以下结论:

import subprocess
from time import sleep

def run_command(command):
    p = subprocess.Popen(command,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         shell=True)
    # Read stdout from subprocess until the buffer is empty !
    for line in iter(p.stdout.readline, b''):
        if line: # Don't print blank lines
            yield line
    # This ensures the process has completed, AND sets the 'returncode' attr
    while p.poll() is None:                                                                                                                                        
        sleep(.1) #Don't waste CPU-cycles
    # Empty STDERR buffer
    err = p.stderr.read()
    if p.returncode != 0:
       # The run_command() function is responsible for logging STDERR 
       print("Error: " + str(err))

这段代码的执行方式与前面的答案相同:

for line in run_command(cmd):
    print(line)

如果您需要在多个文件上运行shell命令,这对我来说很有用。

import os
import subprocess

# Define a function for running commands and capturing stdout line by line
# (Modified from Vartec's solution because it wasn't printing all lines)
def runProcess(exe):    
    p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return iter(p.stdout.readline, b'')

# Get all filenames in working directory
for filename in os.listdir('./'):
    # This command will be run on each file
    cmd = 'nm ' + filename

    # Run the command and capture the output line by line.
    for line in runProcess(cmd.split()):
        # Eliminate leading and trailing whitespace
        line.strip()
        # Split the output 
        output = line.split()

        # Filter the output and print relevant lines
        if len(output) > 2:
            if ((output[2] == 'set_program_name')):
                print filename
                print line

编辑:刚刚看到Max Persson的解决方案和J.F. Sebastian的建议。继续把它整合进去。

这里有一个解决方案,如果你想在进程运行或不运行时打印输出。


我还添加了当前工作目录,这对我来说很有用不止一次。


希望这个解决方案能帮助到别人:)。

import subprocess

def run_command(cmd_and_args, print_constantly=False, cwd=None):
"""Runs a system command.

:param cmd_and_args: the command to run with or without a Pipe (|).
:param print_constantly: If True then the output is logged in continuous until the command ended.
:param cwd: the current working directory (the directory from which you will like to execute the command)
:return: - a tuple containing the return code, the stdout and the stderr of the command
"""
output = []

process = subprocess.Popen(cmd_and_args, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)

while True:
    next_line = process.stdout.readline()
    if next_line:
        output.append(str(next_line))
        if print_constantly:
            print(next_line)
    elif not process.poll():
        break

error = process.communicate()[1]

return process.returncode, '\n'.join(output), error

Vartec的答案不读取所有的行,所以我做了一个版本:

def run_command(command):
    p = subprocess.Popen(command,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    return iter(p.stdout.readline, b'')

用法与公认的答案相同:

command = 'mysqladmin create test -uroot -pmysqladmin12'.split()
for line in run_command(command):
    print(line)

下面是一个简单而灵活的解决方案,适用于各种操作系统版本,以及Python 2和3,在shell模式下使用IPython:

from IPython.terminal.embed import InteractiveShellEmbed
my_shell = InteractiveShellEmbed()
result = my_shell.getoutput("echo hello world")
print(result)

Out: ['hello world']

它有几个优点

它只需要安装IPython,所以在使用它时,你真的不需要担心特定的Python或操作系统版本,它附带Jupyter -具有广泛的支持 它默认接受一个简单的字符串-所以不需要使用shell模式参数或字符串拆分,使它在我看来稍微干净一些 它还使替换变量甚至字符串本身的整个Python命令更加干净


为了演示:

var = "hello world "
result = my_shell.getoutput("echo {var*2}")
print(result)

Out: ['hello world hello world']

只是想给你一个额外的选择,特别是如果你已经安装了Jupyter

当然,如果你在一个实际的Jupyter笔记本,而不是一个.py脚本,你也可以总是这样做:

result = !echo hello world
print(result)

为了达到同样的目的。