我想编写一个函数,该函数将执行shell命令并将其输出作为字符串返回,无论它是错误消息还是成功消息。我只想得到和用命令行得到的相同的结果。
什么样的代码示例可以做到这一点呢?
例如:
def run_command(cmd):
# ??????
print run_command('mysqladmin create test -uroot -pmysqladmin12')
# Should output something like:
# mysqladmin: CREATE DATABASE failed; error: 'Can't create database 'test'; database exists'
如果您需要在多个文件上运行shell命令,这对我来说很有用。
import os
import subprocess
# Define a function for running commands and capturing stdout line by line
# (Modified from Vartec's solution because it wasn't printing all lines)
def runProcess(exe):
p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return iter(p.stdout.readline, b'')
# Get all filenames in working directory
for filename in os.listdir('./'):
# This command will be run on each file
cmd = 'nm ' + filename
# Run the command and capture the output line by line.
for line in runProcess(cmd.split()):
# Eliminate leading and trailing whitespace
line.strip()
# Split the output
output = line.split()
# Filter the output and print relevant lines
if len(output) > 2:
if ((output[2] == 'set_program_name')):
print filename
print line
编辑:刚刚看到Max Persson的解决方案和J.F. Sebastian的建议。继续把它整合进去。
拆分子进程的初始命令可能非常棘手和麻烦。
使用shlex.split()来帮助自己。
示例命令
Git log -n 5——5年前到2年前
的代码
from subprocess import check_output
from shlex import split
res = check_output(split('git log -n 5 --since "5 years ago" --until "2 year ago"'))
print(res)
>>> b'commit 7696ab087a163e084d6870bb4e5e4d4198bdc61a\nAuthor: Artur Barseghyan...'
如果没有shlex.split(),代码将如下所示
res = check_output([
'git',
'log',
'-n',
'5',
'--since',
'5 years ago',
'--until',
'2 year ago'
])
print(res)
>>> b'commit 7696ab087a163e084d6870bb4e5e4d4198bdc61a\nAuthor: Artur Barseghyan...'
Vartec的答案不读取所有的行,所以我做了一个版本:
def run_command(command):
p = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
return iter(p.stdout.readline, b'')
用法与公认的答案相同:
command = 'mysqladmin create test -uroot -pmysqladmin12'.split()
for line in run_command(command):
print(line)
对于相同的问题,我有一个稍微不同的需求:
捕获并返回STDOUT消息,因为它们在STDOUT缓冲区中累积(即实时)。
@vartec用python方法解决了这个问题,他使用生成器和“yield”
上面的字
打印所有STDOUT行(即使进程在STDOUT缓冲区可以完全读取之前退出)
不要浪费CPU周期以高频轮询进程
检查子流程的返回代码
如果得到非零错误返回码,则打印STDERR(与STDOUT分开)。
我综合了之前的答案,得出了以下结论:
import subprocess
from time import sleep
def run_command(command):
p = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
# Read stdout from subprocess until the buffer is empty !
for line in iter(p.stdout.readline, b''):
if line: # Don't print blank lines
yield line
# This ensures the process has completed, AND sets the 'returncode' attr
while p.poll() is None:
sleep(.1) #Don't waste CPU-cycles
# Empty STDERR buffer
err = p.stderr.read()
if p.returncode != 0:
# The run_command() function is responsible for logging STDERR
print("Error: " + str(err))
这段代码的执行方式与前面的答案相同:
for line in run_command(cmd):
print(line)
这里有一个解决方案,如果你想在进程运行或不运行时打印输出。
我还添加了当前工作目录,这对我来说很有用不止一次。
希望这个解决方案能帮助到别人:)。
import subprocess
def run_command(cmd_and_args, print_constantly=False, cwd=None):
"""Runs a system command.
:param cmd_and_args: the command to run with or without a Pipe (|).
:param print_constantly: If True then the output is logged in continuous until the command ended.
:param cwd: the current working directory (the directory from which you will like to execute the command)
:return: - a tuple containing the return code, the stdout and the stderr of the command
"""
output = []
process = subprocess.Popen(cmd_and_args, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
while True:
next_line = process.stdout.readline()
if next_line:
output.append(str(next_line))
if print_constantly:
print(next_line)
elif not process.poll():
break
error = process.communicate()[1]
return process.returncode, '\n'.join(output), error