如何在SQLAlchemy中执行原始SQL ?
我有一个python web应用程序,运行在flask和接口到数据库通过SQLAlchemy。
我需要一种方法来运行原始SQL。该查询涉及多个表连接和内联视图。
我试过了:
connection = db.session.connection()
connection.execute( <sql here> )
但是我总是得到网关错误。
如何在SQLAlchemy中执行原始SQL ?
我有一个python web应用程序,运行在flask和接口到数据库通过SQLAlchemy。
我需要一种方法来运行原始SQL。该查询涉及多个表连接和内联视图。
我试过了:
connection = db.session.connection()
connection.execute( <sql here> )
但是我总是得到网关错误。
当前回答
SQL Alchemy会话对象有自己的执行方法:
result = db.session.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})
All your application queries should be going through a session object, whether they're raw SQL or not. This ensures that the queries are properly managed by a transaction, which allows multiple queries in the same request to be committed or rolled back as a single unit. Going outside the transaction using the engine or the connection puts you at much greater risk of subtle, possibly hard to detect bugs that can leave you with corrupted data. Each request should be associated with only one transaction, and using db.session will ensure this is the case for your application.
还要注意,execute是为参数化查询设计的。使用参数,如示例中的:val,用于查询的任何输入,以保护自己免受SQL注入攻击。您可以通过传递dict作为第二个参数来提供这些参数的值,其中每个键都是查询中出现的参数的名称。参数本身的确切语法可能因数据库而异,但所有主要的关系数据库都以某种形式支持它们。
假设它是一个SELECT查询,这将返回一个RowProxy对象的可迭代对象。
您可以使用各种技术访问单个列:
for r in result:
print(r[0]) # Access by positional index
print(r['my_column']) # Access by column name as a string
r_dict = dict(r.items()) # convert to dict keyed by column names
就我个人而言,我更喜欢将结果转换为命名元组:
from collections import namedtuple
Record = namedtuple('Record', result.keys())
records = [Record(*r) for r in result.fetchall()]
for r in records:
print(r.my_column)
print(r)
如果您没有使用Flask-SQLAlchemy扩展,您仍然可以轻松地使用会话:
import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session
engine = sqlalchemy.create_engine('my connection string')
Session = scoped_session(sessionmaker(bind=engine))
s = Session()
result = s.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})
其他回答
对于SQLAlchemy≥1.4
从SQLAlchemy 1.4开始,已弃用无连接或隐式执行,即。
db.engine.execute(...) # DEPRECATED
以及裸字符串作为查询。
新的API需要显式连接,例如。
from sqlalchemy import text
with db.engine.connect() as connection:
result = connection.execute(text("SELECT * FROM ..."))
for row in result:
# ...
类似地,如果有可用的Session,则鼓励使用现有的Session:
result = session.execute(sqlalchemy.text("SELECT * FROM ..."))
或者使用参数:
session.execute(sqlalchemy.text("SELECT * FROM a_table WHERE a_column = :val"),
{'val': 5})
更多细节请参见文档中的“无连接执行,隐式执行”。
SQL表达式语言教程-使用文本
例子:
from sqlalchemy.sql import text
connection = engine.connect()
# recommended
cmd = 'select * from Employees where EmployeeGroup = :group'
employeeGroup = 'Staff'
employees = connection.execute(text(cmd), group = employeeGroup)
# or - wee more difficult to interpret the command
employeeGroup = 'Staff'
employees = connection.execute(
text('select * from Employees where EmployeeGroup = :group'),
group = employeeGroup)
# or - notice the requirement to quote 'Staff'
employees = connection.execute(
text("select * from Employees where EmployeeGroup = 'Staff'"))
for employee in employees: logger.debug(employee)
# output
(0, 'Tim', 'Gurra', 'Staff', '991-509-9284')
(1, 'Jim', 'Carey', 'Staff', '832-252-1910')
(2, 'Lee', 'Asher', 'Staff', '897-747-1564')
(3, 'Ben', 'Hayes', 'Staff', '584-255-2631')
你有没有试过:
result = db.engine.execute("<sql here>")
or:
from sqlalchemy import text
sql = text('select name from penguins')
result = db.engine.execute(sql)
names = [row[0] for row in result]
print names
注意,db.engine.execute()是“无连接的”,这在SQLAlchemy 2.0中已弃用。
SQL Alchemy会话对象有自己的执行方法:
result = db.session.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})
All your application queries should be going through a session object, whether they're raw SQL or not. This ensures that the queries are properly managed by a transaction, which allows multiple queries in the same request to be committed or rolled back as a single unit. Going outside the transaction using the engine or the connection puts you at much greater risk of subtle, possibly hard to detect bugs that can leave you with corrupted data. Each request should be associated with only one transaction, and using db.session will ensure this is the case for your application.
还要注意,execute是为参数化查询设计的。使用参数,如示例中的:val,用于查询的任何输入,以保护自己免受SQL注入攻击。您可以通过传递dict作为第二个参数来提供这些参数的值,其中每个键都是查询中出现的参数的名称。参数本身的确切语法可能因数据库而异,但所有主要的关系数据库都以某种形式支持它们。
假设它是一个SELECT查询,这将返回一个RowProxy对象的可迭代对象。
您可以使用各种技术访问单个列:
for r in result:
print(r[0]) # Access by positional index
print(r['my_column']) # Access by column name as a string
r_dict = dict(r.items()) # convert to dict keyed by column names
就我个人而言,我更喜欢将结果转换为命名元组:
from collections import namedtuple
Record = namedtuple('Record', result.keys())
records = [Record(*r) for r in result.fetchall()]
for r in records:
print(r.my_column)
print(r)
如果您没有使用Flask-SQLAlchemy扩展,您仍然可以轻松地使用会话:
import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session
engine = sqlalchemy.create_engine('my connection string')
Session = scoped_session(sessionmaker(bind=engine))
s = Session()
result = s.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})
result = db.engine.execute(text("<sql here>"))
执行<sql在>,但不提交它,除非你在自动提交模式。因此,插入和更新不会反映在数据库中。
要在更改后提交,请执行
result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))