我试图比较两个表,SQL Server,以验证一些数据。我想从两个表中返回数据在其中一个或另一个中的所有行。本质上,我想展示所有的差异。我需要检查这样做的三段数据,FirstName, LastName和产品。

我对SQL相当陌生,似乎我找到的很多解决方案都过于复杂了。我不需要担心null。

我是这样开始的:

SELECT DISTINCT [First Name], [Last Name], [Product Name] FROM [Temp Test Data]
WHERE ([First Name] NOT IN (SELECT [First Name] 
FROM [Real Data]))

不过我很难继续下去。

谢谢!

编辑:

基于@treaschf的回答,我一直在尝试使用以下查询的变体:

SELECT td.[First Name], td.[Last Name], td.[Product Name]
FROM [Temp Test Data] td FULL OUTER JOIN [Data] AS d 
ON td.[First Name] = d.[First Name] AND td.[Last Name] = d.[Last Name] 
WHERE (d.[First Name] = NULL) AND (d.[Last Name] = NULL)

但是我总是得到0结果,当我知道td中至少有1行不是在d中。

编辑:

好吧,我想我明白了。至少在我几分钟的测试中,它似乎工作得足够好。

SELECT [First Name], [Last Name]
FROM [Temp Test Data] AS td
WHERE (NOT EXISTS
        (SELECT [First Name], [Last Name]
         FROM [Data] AS d
         WHERE ([First Name] = td.[First Name]) OR ([Last Name] = td.[Last Name])))

这基本上会告诉我测试数据中哪些是真实数据中没有的。这完全可以满足我的需求。


当前回答

把迪夫斯的凯迪拉克作为一个SP。看里面的基本模板,这是基于@erikkallen的回答。它支持

重复行感知(这里的大多数其他答案都没有) 根据参数对结果排序 限制为特定列 忽略列(例如ModifiedUtc) 跨数据库表名 临时表(用作区分视图的变通方法)

用法:

exec Common.usp_DiffTableRows '#t1', '#t2';

exec Common.usp_DiffTableRows 
    @pTable0          = 'ydb.ysh.table1',
    @pTable1          = 'xdb.xsh.table2',
    @pOrderByCsvOpt   = null,  -- Order the results
    @pOnlyCsvOpt      = null,  -- Only compare these columns
    @pIgnoreCsvOpt    = null;  -- Ignore these columns (ignored if @pOnlyCsvOpt is specified)

代码:

alter proc [Common].[usp_DiffTableRows]    
    @pTable0          varchar(300),
    @pTable1          varchar(300),
    @pOrderByCsvOpt   nvarchar(1000) = null,  -- Order the Results
    @pOnlyCsvOpt      nvarchar(4000) = null,  -- Only compare these columns
    @pIgnoreCsvOpt    nvarchar(4000) = null,  -- Ignore these columns (ignored if @pOnlyCsvOpt is specified)
    @pDebug           bit = 0
as
/*---------------------------------------------------------------------------------------------------------------------
    Purpose:  Compare rows between two tables.

      Usage:  exec Common.usp_DiffTableRows '#a', '#b';

    Modified    By          Description
    ----------  ----------  -------------------------------------------------------------------------------------------
    2015.10.06  crokusek    Initial Version
    2019.03.13  crokusek    Added @pOrderByCsvOpt
    2019.06.26  crokusek    Support for @pIgnoreCsvOpt, @pOnlyCsvOpt.    
    2019.09.04  crokusek    Minor debugging improvement
    2020.03.12  crokusek    Detect duplicate rows in either source table
  ---------------------------------------------------------------------------------------------------------------------*/
begin try

    if (substring(@pTable0, 1, 1) = '#')
        set @pTable0 = 'tempdb..' + @pTable0; -- object_id test below needs full names for temp tables

    if (substring(@pTable1, 1, 1) = '#')
        set @pTable1 = 'tempdb..' + @pTable1; -- object_id test below needs full names for temp tables

    if (object_id(@pTable0) is null)
        raiserror('Table name is not recognized: ''%s''', 16, 1, @pTable0);

    if (object_id(@pTable1) is null)
        raiserror('Table name is not recognized: ''%s''', 16, 1, @pTable1);

    create table #ColumnGathering
    (
        Name nvarchar(300) not null,
        Sequence int not null,
        TableArg tinyint not null
    );

    declare
        @usp          varchar(100) = object_name(@@procid),    
        @sql          nvarchar(4000),
        @sqlTemplate  nvarchar(4000) = 
        '  
            use $database$;

            insert into #ColumnGathering
            select Name, column_id as Sequence, $TableArg$ as TableArg
              from sys.columns c
             where object_id = object_id(''$table$'', ''U'')
        ';          

    set @sql = replace(replace(replace(@sqlTemplate,
        '$TableArg$', 0),
        '$database$', (select DatabaseName from Common.ufn_SplitDbIdentifier(@pTable0))),
        '$table$', @pTable0);

    if (@pDebug = 1)
        print 'Sql #CG 0: ' + @sql;

    exec sp_executesql @sql;

    set @sql = replace(replace(replace(@sqlTemplate,
        '$TableArg$', 1),
        '$database$', (select DatabaseName from Common.ufn_SplitDbIdentifier(@pTable1))),
        '$table$', @pTable1);

    if (@pDebug = 1)
        print 'Sql #CG 1: ' + @sql;

    exec sp_executesql @sql;

    if (@pDebug = 1)
        select * from #ColumnGathering;

    select Name, 
           min(Sequence) as Sequence, 
           convert(bit, iif(min(TableArg) = 0, 1, 0)) as InTable0,
           convert(bit, iif(max(TableArg) = 1, 1, 0)) as InTable1
      into #Columns
      from #ColumnGathering
     group by Name
    having (     @pOnlyCsvOpt is not null 
             and Name in (select Value from Common.ufn_UsvToNVarcharKeyTable(@pOnlyCsvOpt, default)))
        or 
           (     @pOnlyCsvOpt is null
             and @pIgnoreCsvOpt is not null 
             and Name not in (select Value from Common.ufn_UsvToNVarcharKeyTable(@pIgnoreCsvOpt, default)))
        or 
           (     @pOnlyCsvOpt is null
             and @pIgnoreCsvOpt is null)

    if (exists (select 1 from #Columns where InTable0 = 0 or InTable1 = 0))
    begin
        select 1; -- without this the debugging info doesn't stream sometimes
        select * from #Columns order by Sequence;        
        waitfor delay '00:00:02';  -- give results chance to stream before raising exception
        raiserror('Columns are not equal between tables, consider using args @pIgnoreCsvOpt, @pOnlyCsvOpt.  See Result Sets for details.', 16, 1);    
    end

    if (@pDebug = 1)
        select * from #Columns order by Sequence;

    declare 
        @columns nvarchar(4000) = --iif(@pOnlyCsvOpt is null and @pIgnoreCsvOpt is null,
           -- '*',     
            (
              select substring((select ',' + ac.name
                from #Columns ac
               order by Sequence
                 for xml path('')),2,200000) as csv
            );

    if (@pDebug = 1)
    begin
        print 'Columns: ' + @columns;
        waitfor delay '00:00:02';  -- give results chance to stream before possibly raising exception
    end

    -- Based on https://stackoverflow.com/a/2077929/538763
    --     - Added sensing for duplicate rows
    --     - Added reporting of source table location
    --
    set @sqlTemplate = '
            with 
               a as (select ~, Row_Number() over (partition by ~ order by (select null)) -1 as Duplicates from $a$), 
               b as (select ~, Row_Number() over (partition by ~ order by (select null)) -1 as Duplicates from $b$)
            select 0 as SourceTable, ~
              from 
                 (
                   select * from a
                   except
                   select * from b
                 )  anb
              union all
             select 1 as SourceTable, ~
               from 
                 (
                   select * from b
                   except
                   select * from a
                 )  bna
             order by $orderBy$
        ';    

     set @sql = replace(replace(replace(replace(@sqlTemplate, 
            '$a$', @pTable0), 
            '$b$', @pTable1),
            '~', @columns),
            '$orderBy$', coalesce(@pOrderByCsvOpt, @columns + ', SourceTable')
        );

     if (@pDebug = 1)
        print 'Sql: ' + @sql;

     exec sp_executesql @sql;

end try
begin catch
    declare        
        @CatchingUsp  varchar(100) = object_name(@@procid);    

    if (xact_state() = -1)
        rollback;    

    -- Disabled for S.O. post

    --exec Common.usp_Log
        --@pMethod = @CatchingUsp;

    --exec Common.usp_RethrowError        
        --@pCatchingMethod = @CatchingUsp;

    throw;
end catch
go

create function Common.Trim
(
    @pOriginalString nvarchar(max), 
    @pCharsToTrim nvarchar(50) = null -- specify null or 'default' for whitespae 
)  
returns table
with schemabinding
as 
/*--------------------------------------------------------------------------------------------------
    Purpose:   Trim the specified characters from a string.

    Modified    By              Description
    ----------  --------------  --------------------------------------------------------------------
    2012.09.25  S.Rutszy/crok   Modified from https://dba.stackexchange.com/a/133044/9415    
  --------------------------------------------------------------------------------------------------*/ 
return
with cte AS
(
  select patindex(N'%[^' + EffCharsToTrim + N']%', @pOriginalString) AS [FirstChar],
         patindex(N'%[^' + EffCharsToTrim + N']%', reverse(@pOriginalString)) AS [LastChar],
         len(@pOriginalString + N'~') - 1 AS [ActualLength]
   from
   (
         select EffCharsToTrim = coalesce(@pCharsToTrim, nchar(0x09) + nchar(0x20) + nchar(0x0d) + nchar(0x0a))
   ) c
)
select substring(@pOriginalString, [FirstChar],
                 ((cte.[ActualLength] - [LastChar]) - [FirstChar] + 2)
       ) AS [TrimmedString]
       --
       --cte.[ActualLength],
       --[FirstChar],
       --((cte.[ActualLength] - [LastChar]) + 1) AS [LastChar]              
from cte;
go

create function [Common].[ufn_UsvToNVarcharKeyTable] (
    @pCsvList     nvarchar(MAX),
    @pSeparator   nvarchar(1) = ','       -- can pass keyword 'default' when calling using ()'s
    )    
    --
    -- SQL Server 2012 distinguishes nvarchar keys up to maximum of 450 in length (900 bytes)
    -- 
    returns @tbl table (Value nvarchar(450) not null primary key(Value)) as
/*-------------------------------------------------------------------------------------------------
    Purpose:  Converts a comma separated list of strings into a sql NVarchar table.  From

              http://www.programmingado.net/a-398/SQL-Server-parsing-CSV-into-table.aspx     

              This may be called from RunSelectQuery:

                  GRANT SELECT ON Common.ufn_UsvToNVarcharTable TO MachCloudDynamicSql;

    Modified    By              Description
    ----------  --------------  -------------------------------------------------------------------
    2011.07.13  internet        Initial version
    2011.11.22  crokusek        Support nvarchar strings and a custom separator.
    2017.12.06  crokusek        Trim leading and trailing whitespace from each element.
    2019.01.26  crokusek        Remove newlines
  -------------------------------------------------------------------------------------------------*/     
begin
    declare 
        @pos      int,
        @textpos  int,
        @chunklen smallint,
        @str      nvarchar(4000),
        @tmpstr   nvarchar(4000),
        @leftover nvarchar(4000),
        @csvList nvarchar(max) = iif(@pSeparator not in (char(13), char(10), char(13) + char(10)),
            replace(replace(@pCsvList, char(13), ''), char(10), ''),
            @pCsvList); -- remove newlines

    set @textpos = 1
    set @leftover = ''  
    while @textpos <= len(@csvList)
    begin
        set @chunklen = 4000 - len(@leftover)
        set @tmpstr = ltrim(@leftover + substring(@csvList, @textpos, @chunklen))
        set @textpos = @textpos + @chunklen

        set @pos = charindex(@pSeparator, @tmpstr)
        while @pos > 0
        begin
            set @str = substring(@tmpstr, 1, @pos - 1)
            set @str = (select TrimmedString from Common.Trim(@str, default));
            insert @tbl (value) values(@str);
            set @tmpstr = ltrim(substring(@tmpstr, @pos + 1, len(@tmpstr)))
            set @pos = charindex(@pSeparator, @tmpstr)
        end

        set @leftover = @tmpstr
    end

    -- Handle @leftover

    set @str = (select TrimmedString from Common.Trim(@leftover, default));

    if @str <> ''
       insert @tbl (value) values(@str);

    return
end
GO

create function Common.ufn_SplitDbIdentifier(@pIdentifier nvarchar(300))
returns @table table 
(    
    InstanceName          nvarchar(300) not null,
    DatabaseName          nvarchar(300) not null,
    SchemaName            nvarchar(300),
    BaseName              nvarchar(300) not null,
    FullTempDbBaseName    nvarchar(300),            -- non-null for tempdb (e.g. #Abc____...)
    InstanceWasSpecified  bit not null,
    DatabaseWasSpecified  bit not null,
    SchemaWasSpecified    bit not null,
    IsCurrentInstance     bit not null,
    IsCurrentDatabase     bit not null,
    IsTempDb              bit not null,
    OrgIdentifier         nvarchar(300) not null
) as
/*-----------------------------------------------------------------------------------------------------------
    Purpose:  Split a Sql Server Identifier into its parts, providing appropriate default values and
              handling temp table (tempdb) references.

    Example:  select * from Common.ufn_SplitDbIdentifier('t')
              union all
              select * from Common.ufn_SplitDbIdentifier('s.t')
              union all
              select * from Common.ufn_SplitDbIdentifier('d.s.t')
              union all
              select * from Common.ufn_SplitDbIdentifier('i.d.s.t')
              union all
              select * from Common.ufn_SplitDbIdentifier('#d')
              union all
              select * from Common.ufn_SplitDbIdentifier('tempdb..#d'); 

              -- Empty
              select * from Common.ufn_SplitDbIdentifier('illegal name'); 

    Modified    By              Description
    ----------  --------------  -----------------------------------------------------------------------------
    2013.09.27  crokusek        Initial version.  
  -----------------------------------------------------------------------------------------------------------*/
begin
    declare 
        @name nvarchar(300) = ltrim(rtrim(@pIdentifier));

    -- Return an empty table as a "throw"
    --
    --Removed for SO post
    --if (Common.ufn_IsSpacelessLiteralIdentifier(@name) = 0)
      --  return;

    -- Find dots starting from the right by reversing first.

    declare 
        @revName nvarchar(300) = reverse(@name);

    declare
        @firstDot int = charindex('.', @revName);

    declare
        @secondDot  int = iif(@firstDot = 0,  0, charindex('.', @revName, @firstDot + 1));

    declare
        @thirdDot   int = iif(@secondDot = 0, 0, charindex('.', @revName, @secondDot + 1));

    declare
        @fourthDot  int = iif(@thirdDot = 0, 0, charindex('.', @revName, @thirdDot + 1));

    --select @firstDot, @secondDot, @thirdDot, @fourthDot, len(@name);

    -- Undo the reverse() (first dot is first from the right).
    --
    set @firstDot = iif(@firstDot = 0, 0, len(@name) - @firstDot + 1);
    set @secondDot = iif(@secondDot = 0, 0, len(@name) - @secondDot + 1);
    set @thirdDot = iif(@thirdDot = 0, 0, len(@name) - @thirdDot + 1);
    set @fourthDot = iif(@fourthDot = 0, 0, len(@name) - @fourthDot + 1);

    --select @firstDot, @secondDot, @thirdDot, @fourthDot, len(@name);

    declare
        @baseName   nvarchar(300)  = substring(@name, @firstDot + 1, len(@name) - @firstdot);

    declare
        @schemaName nvarchar(300) = iif(@firstDot - @secondDot - 1 <= 0, 
                                        null,
                                        substring(@name, @secondDot + 1, @firstDot - @secondDot - 1));
    declare
        @dbName     nvarchar(300) = iif(@secondDot - @thirdDot - 1 <= 0, 
                                        null,
                                        substring(@name, @thirdDot + 1, @secondDot - @thirdDot - 1));
    declare
        @instName   nvarchar(300) = iif(@thirdDot - @fourthDot - 1 <= 0, 
                                        null, 
                                        substring(@name, @fourthDot + 1, @thirdDot - @fourthDot - 1));

    with input as (
        select
           coalesce(@instName, '[' + @@servername + ']') as InstanceName,
           coalesce(@dbName,     iif(left(@baseName, 1) = '#', 'tempdb', db_name())) as DatabaseName,
           coalesce(@schemaName, iif(left(@baseName, 1) = '#', 'dbo', schema_name())) as SchemaName,
           @baseName as BaseName,
           iif(left(@baseName, 1) = '#',
               (
                  select [name] from tempdb.sys.objects
                  where object_id = object_id('tempdb..' + @baseName)
               ), 
               null) as FullTempDbBaseName,                
           iif(@instName is null, 0, 1) InstanceWasSpecified,       
           iif(@dbName is null, 0, 1) DatabaseWasSpecified,
           iif(@schemaName is null, 0, 1) SchemaWasSpecified    
     )
     insert into @table           
     select i.InstanceName, i.DatabaseName, i.SchemaName, i.BaseName, i.FullTempDbBaseName,
            i.InstanceWasSpecified, i.DatabaseWasSpecified, i.SchemaWasSpecified,
            iif(i.InstanceName = '[' + @@servername + ']', 1, 0) as IsCurrentInstance,
            iif(i.DatabaseName = db_name(), 1, 0) as IsCurrentDatabase,
            iif(left(@baseName, 1) = '#', 1, 0) as IsTempDb,
            @name as OrgIdentifier
       from input i;

    return;
end
GO

其他回答

EXCEPT和NOT EXISTS是快速发现数据集之间差异的好方法,但我经常想知道哪些列是不同的,以及它们是如何不同的。

在这些情况下,我发现有用的方法是使用UNPIVOT将列转换为可以逐行比较的键值对。

不过,为了有用,需要有一种方法来匹配你想要比较的记录,在你的例子中,比如社会安全号码或“个人id”:

CREATE TABLE [RealData] 
(
  [PersonId] INT,
  [FirstName] NVARCHAR(100), 
  [LastName] NVARCHAR(100),
  [ProductName] NVARCHAR(100)
)

CREATE TABLE [TempTestData] 
(
  [PersonId] INT,
  [FirstName] NVARCHAR(100), 
  [LastName] NVARCHAR(100),
  [ProductName] NVARCHAR(100)
)

INSERT INTO [RealData] ([PersonId], [FirstName], [LastName], [ProductName])
VALUES 
(1, 'Al', 'Bundy', 'Ladies Size 12'),
(2, 'Peggy', 'Bundy', 'TV Guide')

INSERT INTO [TempTestData] ([PersonId], [FirstName], [LastName], [ProductName])
VALUES 
(1, 'Al', 'Bundy', 'Ladies Size 13'),
(2, 'Peggy', 'Bundy', 'TV Guide')

UNPIVOT加上一些cte

;WITH RealDataCte AS (
  SELECT 
   'Real Data' AS [DataSource],
   unpivotedRealData.* 
  FROM
    (SELECT 
      CAST([PersonId] AS NVARCHAR(100)) AS [PersonId],
      [FirstName],
      [LastName], 
      [ProductName]
    FROM [RealData]) AS realData
  UNPIVOT
  (ColumnValue FOR ColumnName IN ([FirstName], [LastName],  [ProductName])) AS unpivotedRealData
),
TempTestDataCte AS (
  SELECT 
    'Temp Test Data' AS [DataSource],
     unpivotedDempTestData.* 
  FROM
    (SELECT 
      CAST([PersonId] AS NVARCHAR(100)) AS [PersonId],
      [FirstName],
      [LastName], 
      [ProductName]
    FROM [TempTestData]) AS tempTestData
  UNPIVOT
  (ColumnValue FOR ColumnName IN ([FirstName], [LastName],  [ProductName])) AS unpivotedDempTestData
)
SELECT
  RealDataCte.[DataSource],
  RealDataCte.[ColumnName],
  RealDataCte.[ColumnValue],
  TempTestDataCte.[DataSource],
  TempTestDataCte.[ColumnName],
  TempTestDataCte.[ColumnValue],
  CASE WHEN RealDataCte.[ColumnValue] <> TempTestDataCte.[ColumnValue] THEN 'YES' ELSE 'NO' END AS ColumnsDiffer
FROM RealDataCte
INNER JOIN
TempTestDataCte 
ON RealDataCte.[ColumnName] = TempTestDataCte.[ColumnName] 
AND RealDataCte.[PersonId] = TempTestDataCte.[PersonId] 
WHERE 
RealDataCte.[ColumnValue] <> TempTestDataCte.[ColumnValue] 

结果-两种产品是不同的:

如果要在大量的行中比较很多很多列,这是非常好的。

不过,它可能需要一段时间来设置,并且您确实需要将每个列值转换为相同的类型,这可能需要一段时间(注意PersonId的CAST)。

如果数据集非常大,您可能还想使用临时表而不是cte。

SQL小提琴示例

你可以用except,比如这样:

-- DB1..Tb1 have values than DB2..Tb1 not have
Select Col1,Col2,Col3 From DB1..Tb1
except
Select Col1,Col2,Col3 From DB2..Tb1
-- Now we change order
-- DB2..Tb1 have values than DB1..Tb1 not have
Select Col1,Col2,Col3 From DB2..Tb1
except
Select Col1,Col2,Col3 From DB1..Tb1

如果你有表A和表B,都有列C,下面是在表A中出现而在表B中没有的记录:

SELECT A.*
FROM A
    LEFT JOIN B ON (A.C = B.C)
WHERE B.C IS NULL

要用一个查询获得所有的差异,必须使用一个完整的连接,就像这样:

SELECT A.*, B.*
FROM A
    FULL JOIN B ON (A.C = B.C)
WHERE A.C IS NULL OR B.C IS NULL

在这种情况下,您需要知道的是,当一个记录可以在a中找到,但在B中找不到,那么来自B的列将为NULL,类似地,对于那些存在于B而不在a中的记录,来自a的列将为NULL。

如果你想要得到哪些列值是不同的,你可以使用实体-属性-值模型:

declare @Data1 xml, @Data2 xml

select @Data1 = 
(
    select * 
    from (select * from Test1 except select * from Test2) as a
    for xml raw('Data')
)

select @Data2 = 
(
    select * 
    from (select * from Test2 except select * from Test1) as a
    for xml raw('Data')
)

;with CTE1 as (
    select
        T.C.value('../@ID', 'bigint') as ID,
        T.C.value('local-name(.)', 'nvarchar(128)') as Name,
        T.C.value('.', 'nvarchar(max)') as Value
    from @Data1.nodes('Data/@*') as T(C)    
), CTE2 as (
    select
        T.C.value('../@ID', 'bigint') as ID,
        T.C.value('local-name(.)', 'nvarchar(128)') as Name,
        T.C.value('.', 'nvarchar(max)') as Value
    from @Data2.nodes('Data/@*') as T(C)     
)
select
    isnull(C1.ID, C2.ID) as ID, isnull(C1.Name, C2.Name) as Name, C1.Value as Value1, C2.Value as Value2
from CTE1 as C1
    full outer join CTE2 as C2 on C2.ID = C1.ID and C2.Name = C1.Name
where
not
(
    C1.Value is null and C2.Value is null or
    C1.Value is not null and C2.Value is not null and C1.Value = C2.Value
)

SQL小提琴示例

@erikkallen回答的简单变化,显示行在哪个表中:

(   SELECT 'table1' as source, * FROM table1
    EXCEPT
    SELECT * FROM table2)  
UNION ALL
(   SELECT 'table2' as source, * FROM table2
    EXCEPT
    SELECT * FROM table1) 

如果你得到一个错误

所有使用UNION、INTERSECT或EXCEPT操作符组合的查询在其目标列表中必须有相等数量的表达式。

然后可能会有帮助

(   SELECT 'table1' as source, * FROM table1
    EXCEPT
    SELECT 'table1' as source, * FROM table2)  
UNION ALL
(   SELECT 'table2' as source, * FROM table2
    EXCEPT
    SELECT 'table2' as source, * FROM table1)