我想编写一个函数,该函数将执行shell命令并将其输出作为字符串返回,无论它是错误消息还是成功消息。我只想得到和用命令行得到的相同的结果。
什么样的代码示例可以做到这一点呢?
例如:
def run_command(cmd):
# ??????
print run_command('mysqladmin create test -uroot -pmysqladmin12')
# Should output something like:
# mysqladmin: CREATE DATABASE failed; error: 'Can't create database 'test'; database exists'
根据@senderle的说法,如果你像我一样使用python3.6:
def sh(cmd, input=""):
rst = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, input=input.encode("utf-8"))
assert rst.returncode == 0, rst.stderr.decode("utf-8")
return rst.stdout.decode("utf-8")
sh("ls -a")
将完全像您在bash中运行命令一样
这里有一个解决方案,如果你想在进程运行或不运行时打印输出。
我还添加了当前工作目录,这对我来说很有用不止一次。
希望这个解决方案能帮助到别人:)。
import subprocess
def run_command(cmd_and_args, print_constantly=False, cwd=None):
"""Runs a system command.
:param cmd_and_args: the command to run with or without a Pipe (|).
:param print_constantly: If True then the output is logged in continuous until the command ended.
:param cwd: the current working directory (the directory from which you will like to execute the command)
:return: - a tuple containing the return code, the stdout and the stderr of the command
"""
output = []
process = subprocess.Popen(cmd_and_args, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
while True:
next_line = process.stdout.readline()
if next_line:
output.append(str(next_line))
if print_constantly:
print(next_line)
elif not process.poll():
break
error = process.communicate()[1]
return process.returncode, '\n'.join(output), error
Vartec的答案不读取所有的行,所以我做了一个版本:
def run_command(command):
p = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
return iter(p.stdout.readline, b'')
用法与公认的答案相同:
command = 'mysqladmin create test -uroot -pmysqladmin12'.split()
for line in run_command(command):
print(line)
如果您需要在多个文件上运行shell命令,这对我来说很有用。
import os
import subprocess
# Define a function for running commands and capturing stdout line by line
# (Modified from Vartec's solution because it wasn't printing all lines)
def runProcess(exe):
p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return iter(p.stdout.readline, b'')
# Get all filenames in working directory
for filename in os.listdir('./'):
# This command will be run on each file
cmd = 'nm ' + filename
# Run the command and capture the output line by line.
for line in runProcess(cmd.split()):
# Eliminate leading and trailing whitespace
line.strip()
# Split the output
output = line.split()
# Filter the output and print relevant lines
if len(output) > 2:
if ((output[2] == 'set_program_name')):
print filename
print line
编辑:刚刚看到Max Persson的解决方案和J.F. Sebastian的建议。继续把它整合进去。
对于相同的问题,我有一个稍微不同的需求:
捕获并返回STDOUT消息,因为它们在STDOUT缓冲区中累积(即实时)。
@vartec用python方法解决了这个问题,他使用生成器和“yield”
上面的字
打印所有STDOUT行(即使进程在STDOUT缓冲区可以完全读取之前退出)
不要浪费CPU周期以高频轮询进程
检查子流程的返回代码
如果得到非零错误返回码,则打印STDERR(与STDOUT分开)。
我综合了之前的答案,得出了以下结论:
import subprocess
from time import sleep
def run_command(command):
p = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
# Read stdout from subprocess until the buffer is empty !
for line in iter(p.stdout.readline, b''):
if line: # Don't print blank lines
yield line
# This ensures the process has completed, AND sets the 'returncode' attr
while p.poll() is None:
sleep(.1) #Don't waste CPU-cycles
# Empty STDERR buffer
err = p.stderr.read()
if p.returncode != 0:
# The run_command() function is responsible for logging STDERR
print("Error: " + str(err))
这段代码的执行方式与前面的答案相同:
for line in run_command(cmd):
print(line)
在Python 3.7+中,使用subprocess.run并传递capture_output=True:
import subprocess
result = subprocess.run(['echo', 'hello', 'world'], capture_output=True)
print(repr(result.stdout))
这将返回字节:
b'hello world\n'
如果你想把字节转换成字符串,添加text=True:
result = subprocess.run(['echo', 'hello', 'world'], capture_output=True, text=True)
print(repr(result.stdout))
这将读取字节使用您的默认编码:
'hello world\n'
如果你需要手动指定不同的编码,使用encoding="your encoding"而不是text=True:
result = subprocess.run(['echo', 'hello', 'world'], capture_output=True, encoding="utf8")
print(repr(result.stdout))