这是一个以存储过程(SP)形式实现的“凯迪拉克级”(即功能最全面的)表差异比较工具。其中的基本模板基于 @erikkallen 的回答。它支持:
重复行检测(本页的大多数其他答案都不支持)
根据参数对结果排序
只比较指定的列
忽略指定的列(例如 ModifiedUtc)
跨数据库的表名
临时表(可作为比较视图差异的变通方法:先把视图内容存入临时表再比较)
用法:
-- Minimal call: compare two temp tables using every column they share.
execute Common.usp_DiffTableRows
    @pTable0 = '#t1',
    @pTable1 = '#t2';

-- Full call: tables may live in different databases; each optional argument
-- is shown with the value it takes when omitted.
execute Common.usp_DiffTableRows
    @pTable0        = 'ydb.ysh.table1',
    @pTable1        = 'xdb.xsh.table2',
    @pOrderByCsvOpt = null,   -- Order the results
    @pOnlyCsvOpt    = null,   -- Only compare these columns
    @pIgnoreCsvOpt  = null;   -- Ignore these columns (ignored if @pOnlyCsvOpt is specified)
代码:
alter proc [Common].[usp_DiffTableRows]
    @pTable0        varchar(300),
    @pTable1        varchar(300),
    @pOrderByCsvOpt nvarchar(1000) = null,  -- Order the Results
    @pOnlyCsvOpt    nvarchar(4000) = null,  -- Only compare these columns
    @pIgnoreCsvOpt  nvarchar(4000) = null,  -- Ignore these columns (ignored if @pOnlyCsvOpt is specified)
    @pDebug         bit = 0
as
/*---------------------------------------------------------------------------------------------------------------------
    Purpose:  Compare rows between two tables.

    Usage:    exec Common.usp_DiffTableRows '#a', '#b';

    Params:   @pTable0, @pTable1  Tables to compare ([db.][schema.]table or #temp).  Only user tables are
                                  inspected (object type 'U'); copy a view into a temp table to diff it.
              @pOrderByCsvOpt     Optional ORDER BY list, spliced verbatim into the generated query.
                                  Defaults to all compared columns followed by SourceTable.
              @pOnlyCsvOpt        Optional CSV of the only columns to compare.
              @pIgnoreCsvOpt      Optional CSV of columns to skip (ignored if @pOnlyCsvOpt is specified).
              @pDebug             1 = print the generated SQL and select the intermediate column lists.

    Returns:  One result set holding the rows that exist in only one of the tables.  SourceTable is 0 for
              rows found only in @pTable0 and 1 for rows found only in @pTable1.  A per-row duplicate ordinal
              takes part in the comparison, so a row that occurs more often in one table is reported too.

    Raises:   Severity 16 errors when a table name is not recognized, when the compared column sets differ,
              or when no columns are left to compare.  Errors are rethrown from the catch block.

    Notes:    - Table names and @pOrderByCsvOpt are spliced into dynamic SQL; only pass trusted values.
              - Column names are wrapped with quotename(), so names containing spaces, reserved words or
                characters such as & < > are handled.
              - All dynamic SQL buffers are nvarchar(max).  replace() truncates non-max results, and the
                column list is substituted several times, so wide tables used to overflow nvarchar(4000).

    Modified    By           Description
    ----------  ----------   -------------------------------------------------------------------------------------------
    2015.10.06  crokusek     Initial Version
    2019.03.13  crokusek     Added @pOrderByCsvOpt
    2019.06.26  crokusek     Support for @pIgnoreCsvOpt, @pOnlyCsvOpt.
    2019.09.04  crokusek     Minor debugging improvement
    2020.03.12  crokusek     Detect duplicate rows in either source table
  ---------------------------------------------------------------------------------------------------------------------*/
begin try

    if (substring(@pTable0, 1, 1) = '#')
        set @pTable0 = 'tempdb..' + @pTable0; -- object_id test below needs full names for temp tables

    if (substring(@pTable1, 1, 1) = '#')
        set @pTable1 = 'tempdb..' + @pTable1; -- object_id test below needs full names for temp tables

    -- Severity 16 inside the try block transfers control straight to the catch block.
    if (object_id(@pTable0) is null)
        raiserror('Table name is not recognized: ''%s''', 16, 1, @pTable0);

    if (object_id(@pTable1) is null)
        raiserror('Table name is not recognized: ''%s''', 16, 1, @pTable1);

    -- Raw column inventory of both tables; TableArg is 0 for @pTable0 and 1 for @pTable1.
    create table #ColumnGathering
    (
        Name nvarchar(300) not null,
        Sequence int not null,
        TableArg tinyint not null
    );

    declare
        @sql nvarchar(max),
        @sqlTemplate nvarchar(max) =
        '
            use $database$;

            insert into #ColumnGathering
            select Name, column_id as Sequence, $TableArg$ as TableArg
              from sys.columns c
             where object_id = object_id(''$table$'', ''U'')
        ';

    -- The table name lands inside a string literal, so embedded single quotes must be doubled.
    set @sql = replace(replace(replace(@sqlTemplate,
                   '$TableArg$', N'0'),
                   '$database$', (select DatabaseName from Common.ufn_SplitDbIdentifier(@pTable0))),
                   '$table$', replace(@pTable0, '''', ''''''));

    if (@pDebug = 1)
        print 'Sql #CG 0: ' + @sql;

    exec sp_executesql @sql;

    set @sql = replace(replace(replace(@sqlTemplate,
                   '$TableArg$', N'1'),
                   '$database$', (select DatabaseName from Common.ufn_SplitDbIdentifier(@pTable1))),
                   '$table$', replace(@pTable1, '''', ''''''));

    if (@pDebug = 1)
        print 'Sql #CG 1: ' + @sql;

    exec sp_executesql @sql;

    if (@pDebug = 1)
        select * from #ColumnGathering;

    -- One row per column name (after the only/ignore filters) flagged with the tables that contain it.
    select Name,
           min(Sequence) as Sequence,
           convert(bit, iif(min(TableArg) = 0, 1, 0)) as InTable0,
           convert(bit, iif(max(TableArg) = 1, 1, 0)) as InTable1
      into #Columns
      from #ColumnGathering
     group by Name
    having (     @pOnlyCsvOpt is not null
             and Name in (select Value from Common.ufn_UsvToNVarcharKeyTable(@pOnlyCsvOpt, default)))
        or
           (     @pOnlyCsvOpt is null
             and @pIgnoreCsvOpt is not null
             and Name not in (select Value from Common.ufn_UsvToNVarcharKeyTable(@pIgnoreCsvOpt, default)))
        or
           (     @pOnlyCsvOpt is null
             and @pIgnoreCsvOpt is null)

    if (exists (select 1 from #Columns where InTable0 = 0 or InTable1 = 0))
    begin
        select 1; -- without this the debugging info doesn't stream sometimes
        select * from #Columns order by Sequence;

        waitfor delay '00:00:02';  -- give results chance to stream before raising exception
        raiserror('Columns are not equal between tables, consider using args @pIgnoreCsvOpt, @pOnlyCsvOpt.  See Result Sets for details.', 16, 1);
    end

    -- With nothing to compare the generated SQL would be NULL and the call would quietly return nothing.
    if (not exists (select 1 from #Columns))
        raiserror('No columns found to compare.  Check @pOnlyCsvOpt / @pIgnoreCsvOpt and that both arguments are tables (views are not supported, copy them to temp tables first).', 16, 1);

    if (@pDebug = 1)
        select * from #Columns order by Sequence;

    -- Build the bracket-quoted column list.  A cursor is used instead of FOR XML PATH so that column
    -- names containing & < > are not XML-entitized and no xml-method SET options are required.
    declare
        @columns nvarchar(max),
        @columnName nvarchar(300);

    declare ColumnCursor cursor local fast_forward for
        select Name
          from #Columns
         order by Sequence, Name;

    open ColumnCursor;
    fetch next from ColumnCursor into @columnName;

    while (@@fetch_status = 0)
    begin
        set @columns = coalesce(@columns + ',', '') + quotename(@columnName);
        fetch next from ColumnCursor into @columnName;
    end

    close ColumnCursor;
    deallocate ColumnCursor;

    if (@pDebug = 1)
    begin
        print 'Columns: ' + @columns;
        waitfor delay '00:00:02';  -- give results chance to stream before possibly raising exception
    end

    -- Based on https://stackoverflow.com/a/2077929/538763
    --     - Added sensing for duplicate rows
    --     - Added reporting of source table location
    --
    -- '~' is the placeholder for the column list.  Duplicates numbers identical rows 0, 1, 2, ... within
    -- each table so that EXCEPT also reports a row that occurs more times in one table than the other.
    set @sqlTemplate = '
            with
               a as (select ~, Row_Number() over (partition by ~ order by (select null)) -1 as Duplicates from $a$),
               b as (select ~, Row_Number() over (partition by ~ order by (select null)) -1 as Duplicates from $b$)
            select 0 as SourceTable, ~
              from
                 (
                   select * from a
                   except
                   select * from b
                 )  anb
              union all
             select 1 as SourceTable, ~
               from
                 (
                   select * from b
                   except
                   select * from a
                 )  bna
             order by $orderBy$
        ';

    set @sql = replace(replace(replace(replace(@sqlTemplate,
                   '$a$', @pTable0),
                   '$b$', @pTable1),
                   '~', @columns),
                   '$orderBy$', coalesce(@pOrderByCsvOpt, @columns + ', SourceTable')
               );

    if (@pDebug = 1)
        print 'Sql: ' + @sql;

    exec sp_executesql @sql;

end try
begin catch
    declare
        @CatchingUsp varchar(100) = object_name(@@procid);  -- only used by the disabled logging calls below

    -- An uncommittable transaction can only be rolled back.
    if (xact_state() = -1)
        rollback;

    -- Disabled for S.O. post

    --exec Common.usp_Log
        --@pMethod = @CatchingUsp;

    --exec Common.usp_RethrowError
        --@pCatchingMethod = @CatchingUsp;

    throw;
end catch
go
create function Common.Trim
(
    @pOriginalString nvarchar(max),
    @pCharsToTrim    nvarchar(50) = null  -- pass null (or the keyword default) to trim whitespace
)
returns table
with schemabinding
as
/*--------------------------------------------------------------------------------------------------
    Purpose:   Trim the specified characters from both ends of a string.

    Params:    @pOriginalString  String to trim.
               @pCharsToTrim     Characters to strip.  They are placed inside a LIKE-style [^...]
                                 set, so ']', '^' and '-' keep their pattern meaning.  When null,
                                 tab, space, carriage return and line feed are stripped.

    Returns:   A single row with column TrimmedString.  The value is null for null input and an
               empty string when the input is empty or consists only of characters to trim.

    Modified    By              Description
    ----------  --------------  --------------------------------------------------------------------
    2012.09.25  S.Rutszy/crok   Modified from https://dba.stackexchange.com/a/133044/9415
  --------------------------------------------------------------------------------------------------*/
return
    with cte as
    (
        -- Position of the first character to keep, counted from the left and from the right.
        select patindex(N'%[^' + c.EffCharsToTrim + N']%', @pOriginalString) as [FirstChar],
               patindex(N'%[^' + c.EffCharsToTrim + N']%', reverse(@pOriginalString)) as [LastChar],
               -- len() ignores trailing spaces; appending a sentinel makes them count.
               len(@pOriginalString + N'~') - 1 as [ActualLength]
          from
             (
               select EffCharsToTrim = coalesce(@pCharsToTrim, nchar(0x09) + nchar(0x20) + nchar(0x0d) + nchar(0x0a))
             ) c
    )
    select case
               -- No character survives the trim.  Without this guard substring() would start at
               -- position 0 and hand back the whole untrimmed input.
               when cte.[FirstChar] = 0
                   then convert(nvarchar(max), N'')
               else substring(@pOriginalString, cte.[FirstChar],
                              ((cte.[ActualLength] - cte.[LastChar]) - cte.[FirstChar] + 2))
           end as [TrimmedString]
      from cte;
go
create function [Common].[ufn_UsvToNVarcharKeyTable] (
    @pCsvList     nvarchar(MAX),
    @pSeparator   nvarchar(1) = ','       -- can pass keyword 'default' when calling using ()'s
    )
    --
    -- SQL Server 2012 distinguishes nvarchar keys up to maximum of 450 in length (900 bytes)
    --
    returns @tbl table (Value nvarchar(450) not null primary key(Value)) as
/*-------------------------------------------------------------------------------------------------
    Purpose:  Converts a separator delimited list of strings into a sql NVarchar table.  From
              http://www.programmingado.net/a-398/SQL-Server-parsing-CSV-into-table.aspx

              This may be called from RunSelectQuery:
                  GRANT SELECT ON Common.ufn_UsvToNVarcharTable TO MachCloudDynamicSql;
              NOTE(review): the grant above names ufn_UsvToNVarcharTable, not this function - confirm.

    Params:   @pCsvList    The list to split.  Null or blank input yields an empty table.
              @pSeparator  Single character separator, ',' by default.

    Returns:  One row per element with surrounding whitespace trimmed.  Value is the primary key,
              so a repeated element raises a key violation.  Empty elements between separators are
              kept; an empty element after the last separator is dropped.

    Notes:    The list is scanned by position over the nvarchar(max) input.  The earlier version
              copied 4000 character chunks and sized them with len(), which ignores trailing
              spaces, so long lists could lose characters at a chunk boundary or never terminate.

    Modified    By              Description
    ----------  --------------  -------------------------------------------------------------------
    2011.07.13  internet        Initial version
    2011.11.22  crokusek        Support nvarchar strings and a custom separator.
    2017.12.06  crokusek        Trim leading and trailing whitespace from each element.
    2019.01.26  crokusek        Remove newlines
  -------------------------------------------------------------------------------------------------*/
begin
    declare
        @start    bigint = 1,      -- position of the first character of the current element
        @pos      bigint,          -- position of the next separator, 0 when none is left
        @str      nvarchar(4000),
        @csvList  nvarchar(max) = iif(@pSeparator not in (char(13), char(10), char(13) + char(10)),
                                      replace(replace(@pCsvList, char(13), ''), char(10), ''),
                                      @pCsvList); -- remove newlines

    -- Skip leading spaces before looking for a separator (matters when the separator is a space).
    -- unicode() returns null past the end of the string, which ends the loop.
    while unicode(substring(@csvList, @start, 1)) = 32
        set @start += 1;

    set @pos = charindex(@pSeparator, @csvList, @start);

    while @pos > 0
    begin
        set @str = (select TrimmedString
                      from Common.Trim(substring(@csvList, @start, @pos - @start), default));
        insert @tbl (Value) values(@str);

        set @start = @pos + 1;

        while unicode(substring(@csvList, @start, 1)) = 32
            set @start += 1;

        set @pos = charindex(@pSeparator, @csvList, @start);
    end

    -- Whatever follows the last separator is the final element; skip it when blank.
    set @str = (select TrimmedString
                  from Common.Trim(substring(@csvList, @start, len(@csvList)), default));

    if @str <> ''
        insert @tbl (Value) values(@str);

    return
end
GO
create function Common.ufn_SplitDbIdentifier(@pIdentifier nvarchar(300))
returns @table table
(
    InstanceName          nvarchar(300) not null,
    DatabaseName          nvarchar(300) not null,
    SchemaName            nvarchar(300),
    BaseName              nvarchar(300) not null,
    FullTempDbBaseName    nvarchar(300),            -- non-null for tempdb (e.g. #Abc____...)
    InstanceWasSpecified  bit not null,
    DatabaseWasSpecified  bit not null,
    SchemaWasSpecified    bit not null,
    IsCurrentInstance     bit not null,
    IsCurrentDatabase     bit not null,
    IsTempDb              bit not null,
    OrgIdentifier         nvarchar(300) not null
) as
/*-----------------------------------------------------------------------------------------------------------
    Purpose:  Split a Sql Server Identifier into its parts, providing appropriate default values and
              handling temp table (tempdb) references.

    Params:   @pIdentifier  Up to four dot separated parts: [instance.][database.][schema.]name.
                            The split is purely on '.' characters, so a bracketed part that itself
                            contains a dot is not supported.

    Returns:  One row.  Parts that were not specified default to the current instance, the current
              database (tempdb for #names) and the caller's default schema (dbo for #names).

    Example:  select * from Common.ufn_SplitDbIdentifier('t')
              union all
              select * from Common.ufn_SplitDbIdentifier('s.t')
              union all
              select * from Common.ufn_SplitDbIdentifier('d.s.t')
              union all
              select * from Common.ufn_SplitDbIdentifier('i.d.s.t')
              union all
              select * from Common.ufn_SplitDbIdentifier('#d')
              union all
              select * from Common.ufn_SplitDbIdentifier('tempdb..#d');

              -- Returned an empty table while the identifier validation below was enabled;
              -- with the validation removed the name is split like any other.
              select * from Common.ufn_SplitDbIdentifier('illegal name');

    Modified    By              Description
    ----------  --------------  -----------------------------------------------------------------------------
    2013.09.27  crokusek        Initial version.
  -----------------------------------------------------------------------------------------------------------*/
begin
    declare
        @name nvarchar(300) = ltrim(rtrim(@pIdentifier));

    -- Return an empty table as a "throw"
    --
    --Removed for SO post
    --if (Common.ufn_IsSpacelessLiteralIdentifier(@name) = 0)
    --    return;

    -- Find dots starting from the right by reversing first.

    declare
        @revName nvarchar(300) = reverse(@name);

    declare
        @firstDot int = charindex('.', @revName);

    declare
        @secondDot int = iif(@firstDot = 0, 0, charindex('.', @revName, @firstDot + 1));

    declare
        @thirdDot int = iif(@secondDot = 0, 0, charindex('.', @revName, @secondDot + 1));

    declare
        @fourthDot int = iif(@thirdDot = 0, 0, charindex('.', @revName, @thirdDot + 1));

    --select @firstDot, @secondDot, @thirdDot, @fourthDot, len(@name);

    -- Undo the reverse() (first dot is first from the right).
    --
    set @firstDot  = iif(@firstDot  = 0, 0, len(@name) - @firstDot  + 1);
    set @secondDot = iif(@secondDot = 0, 0, len(@name) - @secondDot + 1);
    set @thirdDot  = iif(@thirdDot  = 0, 0, len(@name) - @thirdDot  + 1);
    set @fourthDot = iif(@fourthDot = 0, 0, len(@name) - @fourthDot + 1);

    --select @firstDot, @secondDot, @thirdDot, @fourthDot, len(@name);

    -- Variable names are spelled with one consistent casing: on a case-sensitive instance
    -- collation a reference such as @firstdot does not resolve to @firstDot.
    declare
        @baseName nvarchar(300) = substring(@name, @firstDot + 1, len(@name) - @firstDot);

    -- A part is null when its two bounding dots are adjacent or missing (e.g. 'tempdb..#d').
    declare
        @schemaName nvarchar(300) = iif(@firstDot - @secondDot - 1 <= 0,
                                        null,
                                        substring(@name, @secondDot + 1, @firstDot - @secondDot - 1));
    declare
        @dbName nvarchar(300) = iif(@secondDot - @thirdDot - 1 <= 0,
                                    null,
                                    substring(@name, @thirdDot + 1, @secondDot - @thirdDot - 1));
    declare
        @instName nvarchar(300) = iif(@thirdDot - @fourthDot - 1 <= 0,
                                      null,
                                      substring(@name, @fourthDot + 1, @thirdDot - @fourthDot - 1));

    with input as (
        select
           coalesce(@instName, '[' + @@servername + ']') as InstanceName,
           coalesce(@dbName, iif(left(@baseName, 1) = '#', 'tempdb', db_name())) as DatabaseName,
           coalesce(@schemaName, iif(left(@baseName, 1) = '#', 'dbo', schema_name())) as SchemaName,
           @baseName as BaseName,
           -- For temp tables look up the padded internal name stored in tempdb.sys.objects.
           iif(left(@baseName, 1) = '#',
               (
                  select [name] from tempdb.sys.objects
                  where object_id = object_id('tempdb..' + @baseName)
               ),
               null) as FullTempDbBaseName,
           iif(@instName is null, 0, 1) InstanceWasSpecified,
           iif(@dbName is null, 0, 1) DatabaseWasSpecified,
           iif(@schemaName is null, 0, 1) SchemaWasSpecified
     )
     insert into @table
     select i.InstanceName, i.DatabaseName, i.SchemaName, i.BaseName, i.FullTempDbBaseName,
            i.InstanceWasSpecified, i.DatabaseWasSpecified, i.SchemaWasSpecified,
            iif(i.InstanceName = '[' + @@servername + ']', 1, 0) as IsCurrentInstance,
            iif(i.DatabaseName = db_name(), 1, 0) as IsCurrentDatabase,
            iif(left(@baseName, 1) = '#', 1, 0) as IsTempDb,
            @name as OrgIdentifier
       from input i;

    return;
end
GO