是否有一种方法可以在执行查询时显示Django正在运行的SQL ?


请参阅文档常见问题:“如何查看Django正在运行的原始SQL查询?”

django.db.connection.queries包含一个SQL查询列表:

from django.db import connection
print(connection.queries)

queryset也有一个包含要执行的查询的query属性:

print(MyModel.objects.filter(name="my name").query)

注意,查询的输出不是有效的SQL,因为:

Django从未真正插入参数:它将查询和参数分别发送到数据库适配器,由数据库适配器执行适当的操作。

来自Django错误报告#17741。

因此,不应该将查询输出直接发送到数据库。

如果你需要重置查询,例如,查看在给定的时间内有多少查询在运行,你可以使用reset_queries from django.db:

from django.db import reset_queries
from django.db import connection

reset_queries()
# Run your query here
print(connection.queries)
>>> []

虽然可以使用提供的代码来完成,但我发现使用调试工具栏应用程序是显示查询的一个很好的工具。你可以从github下载。

这使您可以选择显示在给定页面上运行的所有查询以及查询所花费的时间。它还汇总了页面上的查询数量以及用于快速查看的总时间。当你想了解Django ORM在幕后做了什么时,这是一个很好的工具。它也有很多其他不错的功能,你可以使用,如果你喜欢。


看一下debug_toolbar,它对调试非常有用。

文档和源代码可以在http://django-debug-toolbar.readthedocs.io/上找到。


如果你确保你的settings.py文件有:

django.core.context_processors.debug中列出的 DEBUG = True INTERNAL_IPS元组中的IP

然后您应该可以访问sql_queries变量。我在每个页面上都添加了一个页脚,如下所示:

{%if sql_queries %}
  <div class="footNav">
    <h2>Queries</h2>
    <p>
      {{ sql_queries|length }} Quer{{ sql_queries|pluralize:"y,ies" }}, {{sql_time_sum}} Time
    {% ifnotequal sql_queries|length 0 %}
      (<span style="cursor: pointer;" onclick="var s=document.getElementById('debugQueryTable').style;s.disp\
lay=s.display=='none'?'':'none';this.innerHTML=this.innerHTML=='Show'?'Hide':'Show';">Show</span>)
    {% endifnotequal %}
    </p>
    <table id="debugQueryTable" style="display: none;">
      <col width="1"></col>
      <col></col>
      <col width="1"></col>
      <thead>
        <tr>
          <th scope="col">#</th>
          <th scope="col">SQL</th>
          <th scope="col">Time</th>
        </tr>
      </thead>
      <tbody>
        {% for query in sql_queries %}
          <tr class="{% cycle odd,even %}">
            <td>{{ forloop.counter }}</td>
            <td>{{ query.sql|escape }}</td>
            <td>{{ query.time }}</td>
          </tr>
        {% endfor %}
      </tbody>
    </table>
  </div>
{% endif %}

通过添加一行,我得到了变量sql_time_sum

context_extras['sql_time_sum'] = sum([float(q['time']) for q in connection.queries])

到django_src/django/core/context_processors.py中的debug函数。


查询实际上嵌入在模型API中:

q = Query.objects.values('val1','val2','val_etc')

print(q.query)

另一个选项,请参阅本文中描述的settings.py中的日志记录选项

http://dabapps.com/blog/logging-sql-queries-django-13/

Debug_toolbar会降低开发服务器上每个页面的加载速度,而日志记录则不会,因此速度更快。输出可以转储到控制台或文件,所以UI不是很好。但是对于包含大量sql的视图,通过debug_toolbar调试和优化sql可能需要很长时间,因为每个页面加载都非常慢。


没有其他答案涵盖这个方法,所以:

我发现迄今为止最有用、最简单、最可靠的方法是询问您的数据库。例如,在Linux的Postgres上,你可能会这样做:

sudo su postgres
tail -f /var/log/postgresql/postgresql-8.4-main.log

每个数据库的过程略有不同。在数据库日志中,你不仅可以看到原始SQL,还可以看到django在系统上设置的任何连接或事务开销。


django扩展有一个带参数print-sql的命令shell_plus

./manage.py shell_plus --print-sql

在django-shell中,所有执行的查询都会被打印出来

Ex.:

User.objects.get(pk=1)
SELECT "auth_user"."id",
       "auth_user"."password",
       "auth_user"."last_login",
       "auth_user"."is_superuser",
       "auth_user"."username",
       "auth_user"."first_name",
       "auth_user"."last_name",
       "auth_user"."email",
       "auth_user"."is_staff",
       "auth_user"."is_active",
       "auth_user"."date_joined"
FROM "auth_user"
WHERE "auth_user"."id" = 1

Execution time: 0.002466s [Database: default]

<User: username>

下面基于https://code.djangoproject.com/ticket/17741:返回有效的SQL查询

def str_query(qs):
    """
    qs.query returns something that isn't valid SQL, this returns the actual
    valid SQL that's executed: https://code.djangoproject.com/ticket/17741
    """
    cursor = connections[qs.db].cursor()
    query, params = qs.query.sql_with_params()
    cursor.execute('EXPLAIN ' + query, params)
    res = str(cursor.db.ops.last_executed_query(cursor, query, params))
    assert res.startswith('EXPLAIN ')
    return res[len('EXPLAIN '):]

我已经做了一个你可以使用的小片段:

from django.conf import settings
from django.db import connection


def sql_echo(method, *args, **kwargs):
    settings.DEBUG = True
    result = method(*args, **kwargs)
    for query in connection.queries:
        print(query)
    return result


# HOW TO USE EXAMPLE:
# 
# result = sql_echo(my_method, 'whatever', show=True)

它以参数函数(包含sql查询)来检查和args, kwargs需要调用该函数。结果它返回函数返回的内容,并在控制台中打印SQL查询。


我为此开发了一个扩展,所以你可以很容易地在你的视图函数上放一个装饰器,看看有多少查询被执行了。

如何安装:

$ pip install django-print-sql

用作上下文管理器:

from django_print_sql import print_sql

# set `count_only` to `True` will print the number of executed SQL statements only
with print_sql(count_only=False):

  # write the code you want to analyze in here,
  # e.g. some complex foreign key lookup,
  # or analyzing a DRF serializer's performance

  for user in User.objects.all()[:10]:
      user.groups.first()

装饰:用作装饰:

from django_print_sql import print_sql_decorator


@print_sql_decorator(count_only=False)  # this works on class-based views as well
def get(request):
    # your view code here

Github: https://github.com/rabbit-aaron/django-print-sql


我相信这应该工作,如果你使用PostgreSQL:

from django.db import connections
from app_name import models
from django.utils import timezone

# Generate a queryset, use your favorite filter, QS objects, and whatnot.
qs=models.ThisDataModel.objects.filter(user='bob',date__lte=timezone.now())

# Get a cursor tied to the default database
cursor=connections['default'].cursor()

# Get the query SQL and parameters to be passed into psycopg2, then pass
# those into mogrify to get the query that would have been sent to the backend
# and print it out. Note F-strings require python 3.6 or later.
print(f'{cursor.mogrify(*qs.query.sql_with_params())}')

我把这个函数放在我项目中的一个应用程序的util文件中:

import logging
import re

from django.db import connection

logger = logging.getLogger(__name__)

def sql_logger():
    logger.debug('TOTAL QUERIES: ' + str(len(connection.queries)))
    logger.debug('TOTAL TIME: ' + str(sum([float(q['time']) for q in connection.queries])))

    logger.debug('INDIVIDUAL QUERIES:')
    for i, query in enumerate(connection.queries):
        sql = re.split(r'(SELECT|FROM|WHERE|GROUP BY|ORDER BY|INNER JOIN|LIMIT)', query['sql'])
        if not sql[0]: sql = sql[1:]
        sql = [(' ' if i % 2 else '') + x for i, x in enumerate(sql)]
        logger.debug('\n### {} ({} seconds)\n\n{};\n'.format(i, query['time'], '\n'.join(sql)))

然后,当需要时,我只是导入它,并从任何上下文(通常是视图)调用它是必要的,例如:

# ... other imports
from .utils import sql_logger

class IngredientListApiView(generics.ListAPIView):
    # ... class variables and such

    # Main function that gets called when view is accessed
    def list(self, request, *args, **kwargs):
        response = super(IngredientListApiView, self).list(request, *args, **kwargs)

        # Call our function
        sql_logger()

        return response

在模板之外这样做很好,因为如果你有API视图(通常是Django Rest框架),它也适用于模板。


使用django.db.connection.queries查看查询

from django.db import connection
print(connection.queries)

访问QuerySet对象上的原始SQL查询

 qs = MyModel.objects.all()
 print(qs.query)

对于Django 2.2:

当我使用。/manage.py shell时,大多数答案对我没有多大帮助。最后我找到了答案。希望这能对别人有所帮助。

查询所有查询信息。

from django.db import connection
connection.queries

查看单个查询的查询:

q=Query.objects.all()
q.query.__str__()

q。query只是为我显示对象。 使用__str__()(字符串表示)显示完整的查询。


在django中,如果你有这样的查询:

MyModel.objects.all()

do:

MyModel.objects.all().query.sql_with_params()

or:

str(MyModel.objects.all().query)

来获取SQL字符串


从django获取查询结果到数据库(使用正确的参数替换) 你可以使用这个函数:

from django.db import connection

def print_database_query_formatted(query):
    sql, params = query.sql_with_params()
    cursor = connection.cursor()
    cursor.execute('EXPLAIN ' + sql, params)
    db_query = cursor.db.ops.last_executed_query(cursor, sql, params).replace('EXPLAIN ', '')

    parts = '{}'.format(db_query).split('FROM')
    print(parts[0])
    if len(parts) > 1:
        parts = parts[1].split('WHERE')
        print('FROM{}'.format(parts[0]))
        if len(parts) > 1:
            parts = parts[1].split('ORDER BY')
            print('WHERE{}'.format(parts[0]))
            if len(parts) > 1:
                print('ORDER BY{}'.format(parts[1]))

# USAGE
users = User.objects.filter(email='admin@admin.com').order_by('-id')
print_database_query_formatted(users.query)

输出示例

SELECT "users_user"."password", "users_user"."last_login", "users_user"."is_superuser", "users_user"."deleted", "users_user"."id", "users_user"."phone", "users_user"."username", "users_user"."userlastname", "users_user"."email", "users_user"."is_staff", "users_user"."is_active", "users_user"."date_joined", "users_user"."latitude", "users_user"."longitude", "users_user"."point"::bytea, "users_user"."default_search_radius", "users_user"."notifications", "users_user"."admin_theme", "users_user"."address", "users_user"."is_notify_when_buildings_in_radius", "users_user"."active_campaign_id", "users_user"."is_unsubscribed", "users_user"."sf_contact_id", "users_user"."is_agree_terms_of_service", "users_user"."is_facebook_signup", "users_user"."type_signup" 
FROM "users_user" 
WHERE "users_user"."email" = 'admin@admin.com' 
ORDER BY "users_user"."id" DESC

它基于这个票证评论:https://code.djangoproject.com/ticket/17741#comment:4


Django SQL Sniffer是查看(和查看)从Django ORM的任何进程中执行的原始查询的另一种选择。我已经构建了它来满足我所拥有的一个特定用例,我在任何地方都没有看到过,即:

不需要更改目标进程正在执行的源代码(不需要在django设置中注册一个新的应用程序,到处导入装饰器等)。 不更改日志配置(例如,因为我对一个特定的进程感兴趣,而不是配置应用的整个进程群) 不需要重新启动目标进程(例如,因为它是一个重要的组件,重新启动可能会导致一些停机时间)

因此,Django SQL Sniffer可以特别使用,并附加到已经运行的进程。然后,该工具“嗅探”已执行的查询,并在执行时将它们打印到控制台。当工具停止时,将显示一个统计摘要,其中包含基于某些可能的度量(计数、最大持续时间和总组合持续时间)的异常值查询。

这是我附加到Python shell的示例的截图

你可以在github页面上查看现场演示和更多细节。


为CREATE / UPDATE / DELETE /命令生成SQL,这在Django中是即时的

from django.db.models import sql

def generate_update_sql(queryset, update_kwargs):
    """Converts queryset with update_kwargs
    like : queryset.update(**update_kwargs) to UPDATE SQL"""

    query = queryset.query.clone(sql.UpdateQuery)
    query.add_update_values(update_kwargs)
    compiler = query.get_compiler(queryset.db)
    sql, params = compiler.as_sql()
    return sql % params
from django.db.models import sql

def generate_delete_sql(queryset):
    """Converts select queryset to DELETE SQL """
    query = queryset.query.chain(sql.DeleteQuery)
    compiler = query.get_compiler(queryset.db)
    sql, params = compiler.as_sql()
    return sql % params
from django.db.models import sql

def generate_create_sql(model, model_data):
    """Converts queryset with create_kwargs
    like if was: queryset.create(**create_kwargs) to SQL CREATE"""
    
    not_saved_instance = model(**model_data)
    not_saved_instance._for_write = True

    query = sql.InsertQuery(model)

    fields = [f for f in model._meta.local_concrete_fields if not isinstance(f, AutoField)]
    query.insert_values(fields, [not_saved_instance], raw=False)

    compiler = query.get_compiler(model.objects.db)
    sql, params = compiler.as_sql()[0]
    return sql % params

测试和使用

    def test_generate_update_sql_with_F(self):
        qs = Event.objects.all()
        update_kwargs = dict(description=F('slug'))
        result = generate_update_sql(qs, update_kwargs)
        sql = "UPDATE `api_event` SET `description` = `api_event`.`slug`"
        self.assertEqual(sql, result)

    def test_generate_create_sql(self):
        result = generate_create_sql(Event, dict(slug='a', app='b', model='c', action='e'))
        sql = "INSERT INTO `api_event` (`slug`, `app`, `model`, `action`, `action_type`, `description`) VALUES (a, b, c, e, , )"
        self.assertEqual(sql, result)

这是一个很晚的回答,但其他的都是通过搜索得到的。

我想介绍一种日志记录方法,非常简单;在settings .py中添加django.db.backends logger

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
        },
    },
    'loggers': {
        'django.db.backends': {
            'handlers': ['console'],
            'level': 'DEBUG',
        },
    },
}

我还使用了一个环境变量来设置级别。 所以当我想看到SQL查询时,我只是设置了环境变量,调试日志显示了实际的查询。


如果您需要为一些自定义SQL重用查询,还有另一种非常有用的方法。我曾在一个分析应用程序中使用过这种方法,它远远超出了Django的ORM所能轻松完成的范围,所以我将ORM生成的SQL作为子查询包含进来。

from django.db import connection
from myapp.models import SomeModel

queryset = SomeModel.objects.filter(foo='bar')

sql_query, params = queryset.query.as_sql(None, connection)

这将为您提供带有占位符的SQL,以及可以使用的带有查询参数的元组。你可以直接把它传递给DB:

with connection.connection.cursor(cursor_factory=DictCursor) as cursor:
    cursor.execute(sql_query, params)
    data = cursor.fetchall()

from django.db import reset_queries, connection
class ShowSQL(object):
    def __enter__(self):
        reset_queries()
        return self

    def __exit__(self, *args):
        for sql in connection.queries:
            print('Time: %s\nSQL: %s' % (sql['time'], sql['sql']))

然后你可以使用:

with ShowSQL() as t:
    some queries <select>|<annotate>|<update> or other 

它打印

时间:% s SQL: % s


你可以使用连接。在Django中运行原始SQL查询,如下所示:

# "store/views.py"

from django.db import transaction
from .models import Person
from django.db import connection
from django.http import HttpResponse

@transaction.atomic
def test(request):
    Person.objects.create(name="John") # INSERT
    
    qs = Person.objects.select_for_update().get(name="John") # SELECT FOR UPDATE
    qs.name = "Tom"
    qs.save() # UPDATE
    qs.delete() # DELETE
                 
    for query in connection.queries: # Here
        print(query)

    return HttpResponse("Test")

然后,原始查询打印在控制台,如下所示:

{'sql': 'INSERT INTO "store_person" ("name") VALUES (\'John\') RETURNING "store_person"."id"', 'time': '0.000'}
{'sql': 'SELECT "store_person"."id", "store_person"."name" FROM "store_person" WHERE "store_person"."name" = \'John\' LIMIT 21 FOR UPDATE', 'time': '0.000'}      
{'sql': 'UPDATE "store_person" SET "name" = \'Tom\' WHERE "store_person"."id" = 179', 'time': '0.000'}
{'sql': 'DELETE FROM "store_person" WHERE "store_person"."id" IN (179)', 'time': '0.000'}
[24/Dec/2022 06:29:32] "GET /store/test/ HTTP/1.1" 200 9

然后,把reset_queries()放在Person.objects.select_for_update()之后,如果你想只得到UPDATE和DELETE查询,而没有INSERT和SELECT FOR UPDATE查询,如下所示:

# "store/views.py"

from django.db import transaction
from .models import Person
from django.db import reset_queries
from django.db import connection
from django.http import HttpResponse

@transaction.atomic
def test(request):
    Person.objects.create(name="John") # INSERT
    
    qs = Person.objects.select_for_update().get(name="John") # SELECT FOR UPDATE
    reset_queries() # Here
    qs.name = "Tom"
    qs.save() # UPDATE
    qs.delete() # DELETE
                 
    for query in connection.queries: # Here
        print(query)

    return HttpResponse("Test")

然后,只打印UPDATE和DELETE查询,不打印INSERT和SELECT FOR UPDATE查询,如下所示:

{'sql': 'UPDATE "store_person" SET "name" = \'Tom\' WHERE "store_person"."id" = 190', 'time': '0.000'}
{'sql': 'DELETE FROM "store_person" WHERE "store_person"."id" IN (190)', 'time': '0.000'}
[24/Dec/2022 07:00:01] "GET /store/test/ HTTP/1.1" 200 9

这里已经有几个很好的答案了。

还有一种方法。

在测试中,这样做:

with self.assertNumQueries(3):
    response = self.client.post(reverse('payments:pay_list'))
    # or whatever

如果查询数量错误,则测试失败,并在控制台中打印所有原始SQL查询。

此外,此类测试有助于控制SQL查询的数量不会随着代码的更改而增加,并且数据库负载不会过多。