我正在寻找插入实体框架的最快方法。
我之所以问这个问题,是因为您有一个活动的TransactionScope,并且插入量很大(4000+)。它可能会持续超过10分钟(事务的默认超时),这将导致事务不完整。
我正在寻找插入实体框架的最快方法。
我之所以问这个问题,是因为您有一个活动的TransactionScope,并且插入量很大(4000+)。它可能会持续超过10分钟(事务的默认超时),这将导致事务不完整。
您应该考虑为此使用System.Data.SqlClient.SqlBulkCopy。这是文档,当然还有很多在线教程。
抱歉,我知道您正在寻找一个简单的答案来让EF做您想做的事情,但批量操作并不是ORM真正的用途。
据我所知,EntityFramework中没有BulkInsert来提高大型插件的性能。
在这种情况下,您可以使用ADO.net中的SqlBulkCopy来解决问题
秘密是插入到相同的空白暂存表中。插件快速发光。然后在主大表中运行一个单独的插入。然后截断临时表,为下一批做好准备。
ie.
insert into some_staging_table using Entity Framework.
-- Single insert into main table (this could be a tiny stored proc call)
insert into some_main_already_large_table (columns...)
select (columns...) from some_staging_table
truncate table some_staging_table
对于您在问题评论中的评论:
“…保存更改(每个记录)。。。"
这是你能做的最糟糕的事情!对每个记录调用SaveChanges()会大大降低批量插入的速度。我会做一些简单的测试,这很可能会提高性能:
在所有记录后调用SaveChanges()一次。例如,在100条记录之后调用SaveChanges()。例如,在100条记录之后调用SaveChanges(),并释放上下文并创建新的上下文。禁用更改检测
对于批量插入,我正在使用这样的模式进行工作和试验:
using (TransactionScope scope = new TransactionScope())
{
MyDbContext context = null;
try
{
context = new MyDbContext();
context.Configuration.AutoDetectChangesEnabled = false;
int count = 0;
foreach (var entityToInsert in someCollectionOfEntitiesToInsert)
{
++count;
context = AddToContext(context, entityToInsert, count, 100, true);
}
context.SaveChanges();
}
finally
{
if (context != null)
context.Dispose();
}
scope.Complete();
}
private MyDbContext AddToContext(MyDbContext context,
Entity entity, int count, int commitCount, bool recreateContext)
{
context.Set<Entity>().Add(entity);
if (count % commitCount == 0)
{
context.SaveChanges();
if (recreateContext)
{
context.Dispose();
context = new MyDbContext();
context.Configuration.AutoDetectChangesEnabled = false;
}
}
return context;
}
我有一个测试程序,它将560.000个实体(9个标量财产,没有导航财产)插入数据库。使用此代码,它只需不到3分钟即可工作。
为了提高性能,在“许多”记录之后调用SaveChanges()非常重要(“许多”约为100或1000)。它还提高了在SaveChanges之后释放上下文并创建新上下文的性能。这将从所有实体中清除上下文,SaveChanges不会这样做,实体仍然以状态Unchanged附加到上下文。正是由于上下文中附加实体的大小不断增加,才导致插入过程一步步变慢。因此,在一段时间后清除它是有帮助的。
以下是我的560000个实体的一些测量值:
commitCount=1,recreateContext=false:许多小时(这是您当前的过程)commitCount=100,recreateContext=false:超过20分钟commitCount=1000,recreateContext=false:242秒commitCount=10000,recreateContext=false:202秒commitCount=100000,recreateContext=false:199秒commitCount=1000000,recreateContext=false:内存不足异常commitCount=1,recreateContext=true:超过10分钟commitCount=10,recreateContext=true:241秒commitCount=100,recreateContext=true:164秒commitCount=1000,recreateContext=true:191秒
上述第一个测试中的行为是,性能非常非线性,并且随着时间的推移会大大降低。(“很多小时”是一个估计,我从未完成过这项测试,20分钟后我停在了50000个实体。)这种非线性行为在所有其他测试中都不那么重要。
这种组合可以很好地提高速度。
context.Configuration.AutoDetectChangesEnabled = false;
context.Configuration.ValidateOnSaveEnabled = false;
我同意亚当·拉基斯的观点。SqlBulkCopy是将批量记录从一个数据源传输到另一数据源的最快方法。我用这个复制了20K张唱片,只用了不到3秒钟。看看下面的例子。
public static void InsertIntoMembers(DataTable dataTable)
{
using (var connection = new SqlConnection(@"data source=;persist security info=True;user id=;password=;initial catalog=;MultipleActiveResultSets=True;App=EntityFramework"))
{
SqlTransaction transaction = null;
connection.Open();
try
{
transaction = connection.BeginTransaction();
using (var sqlBulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, transaction))
{
sqlBulkCopy.DestinationTableName = "Members";
sqlBulkCopy.ColumnMappings.Add("Firstname", "Firstname");
sqlBulkCopy.ColumnMappings.Add("Lastname", "Lastname");
sqlBulkCopy.ColumnMappings.Add("DOB", "DOB");
sqlBulkCopy.ColumnMappings.Add("Gender", "Gender");
sqlBulkCopy.ColumnMappings.Add("Email", "Email");
sqlBulkCopy.ColumnMappings.Add("Address1", "Address1");
sqlBulkCopy.ColumnMappings.Add("Address2", "Address2");
sqlBulkCopy.ColumnMappings.Add("Address3", "Address3");
sqlBulkCopy.ColumnMappings.Add("Address4", "Address4");
sqlBulkCopy.ColumnMappings.Add("Postcode", "Postcode");
sqlBulkCopy.ColumnMappings.Add("MobileNumber", "MobileNumber");
sqlBulkCopy.ColumnMappings.Add("TelephoneNumber", "TelephoneNumber");
sqlBulkCopy.ColumnMappings.Add("Deleted", "Deleted");
sqlBulkCopy.WriteToServer(dataTable);
}
transaction.Commit();
}
catch (Exception)
{
transaction.Rollback();
}
}
}
如果您添加的实体()依赖于上下文中的其他预加载实体(例如导航财产),则Dispose()上下文会产生问题
我使用类似的概念来保持我的上下文较小,以实现相同的性能
但我只是分离已经SaveChanges()的实体,而不是Dispose()上下文并重新创建
public void AddAndSave<TEntity>(List<TEntity> entities) where TEntity : class {
const int CommitCount = 1000; //set your own best performance number here
int currentCount = 0;
while (currentCount < entities.Count())
{
//make sure it don't commit more than the entities you have
int commitCount = CommitCount;
if ((entities.Count - currentCount) < commitCount)
commitCount = entities.Count - currentCount;
//e.g. Add entities [ i = 0 to 999, 1000 to 1999, ... , n to n+999... ] to conext
for (int i = currentCount; i < (currentCount + commitCount); i++)
_context.Entry(entities[i]).State = System.Data.EntityState.Added;
//same as calling _context.Set<TEntity>().Add(entities[i]);
//commit entities[n to n+999] to database
_context.SaveChanges();
//detach all entities in the context that committed to database
//so it won't overload the context
for (int i = currentCount; i < (currentCount + commitCount); i++)
_context.Entry(entities[i]).State = System.Data.EntityState.Detached;
currentCount += commitCount;
} }
如果需要,用try-catch和TrasactionScope()将其包装起来,为了保持代码干净,没有在这里显示它们
我已经研究了Slauma的答案(这太棒了,感谢创意人),我已经减少了批量,直到达到最佳速度。查看Slauma的结果:
commitCount=1,recreateContext=true:超过10分钟commitCount=10,recreateContext=true:241秒commitCount=100,recreateContext=true:164秒commitCount=1000,recreateContext=true:191秒
可以看出,当从1移动到10,以及从10移动到100时,速度会增加,但从100到1000的插入速度会再次下降。
因此,我重点关注了当您将批量大小减少到10到100之间时会发生什么,下面是我的结果(我使用了不同的行内容,因此我的时间值不同):
Quantity | Batch size | Interval
1000 1 3
10000 1 34
100000 1 368
1000 5 1
10000 5 12
100000 5 133
1000 10 1
10000 10 11
100000 10 101
1000 20 1
10000 20 9
100000 20 92
1000 27 0
10000 27 9
100000 27 92
1000 30 0
10000 30 9
100000 30 92
1000 35 1
10000 35 9
100000 35 94
1000 50 1
10000 50 10
100000 50 106
1000 100 1
10000 100 14
100000 100 141
根据我的结果,批量大小的实际最佳值约为30。它小于10和100。问题是,我不知道为什么30是最优的,也找不到任何合理的解释。
您是否尝试过通过后台工作人员或任务插入?
在我的例子中,我插入了7760个寄存器,分布在182个具有外键关系的不同表中(通过NavigationProperties)。
没有这项任务,花了2分半钟。在一个Task(Task.Factory.StartNew(…))中,花费了15秒。
我只在将所有实体添加到上下文之后才执行SaveChanges()。(确保数据完整性)
下面是使用实体框架和使用SqlBulkCopy类之间的性能比较,具体示例为:如何将复杂对象批量插入SQL Server数据库
正如其他人已经强调的,ORM不应用于批量操作。它们提供了灵活性、关注点分离和其他好处,但批量操作(批量读取除外)不是其中之一。
最快的方法是使用批量插入扩展,这是我开发的
注:这是一种商业产品,不是免费的
它使用SqlBulkCopy和自定义数据读取器来获得最大性能。因此,它比使用常规插入或AddRange快20倍以上
用法非常简单
context.BulkInsert(hugeAmountOfEntities);
我将推荐这篇关于如何使用EF进行批量插入的文章。
实体框架和慢速批量INSERT
他探索了这些领域并比较了绩效:
默认EF(57分钟完成添加30000条记录)替换为ADO.NET代码(对于相同的30000,25秒)上下文膨胀-通过为每个工作单元使用一个新的上下文来保持活动的上下文图较小(相同的30000个插入需要33秒)大列表-关闭AutoDetectChangesEnabled(将时间缩短至约20秒)批处理(最短16秒)DbTable.AddRange()-(性能在12范围内)
正如其他人所说,如果您想要真正好的插入性能,SqlBulkCopy是一种实现方法。
它的实现有点麻烦,但有一些库可以帮助您实现它。有一些库,但这次我将无耻地使用我自己的库:https://github.com/MikaelEliasson/EntityFramework.Utilities#batch-插入实体
您需要的唯一代码是:
using (var db = new YourDbContext())
{
EFBatchOperation.For(db, db.BlogPosts).InsertAll(list);
}
那么它快多少?很难说,因为这取决于许多因素,计算机性能、网络、对象大小等。我所做的性能测试表明,如果您像其他答案中提到的那样优化EF配置,则可以在10秒左右以标准方式在本地主机上插入25k个实体。使用EFUtilities,大约需要300毫秒。更有趣的是,我使用这种方法在不到15秒内保存了大约300万个实体,平均每秒大约200万个实体。
当然,一个问题是若需要插入相关数据。这可以使用上述方法在sql server中高效地完成,但它需要您有一个Id生成策略,允许您在应用程序代码中为父级生成Id,以便您可以设置外键。这可以使用GUID或类似HiLo id生成的方法来完成。
这里编写的所有解决方案都无济于事,因为当您执行SaveChanges()时,insert语句会一个接一个地发送到数据库,这就是Entity的工作方式。
例如,如果您的数据库往返行程是50毫秒,那么插入所需的时间是记录数x 50毫秒。
您必须使用BulkInsert,以下是链接:https://efbulkinsert.codeplex.com/
通过使用它,我的插入时间从5-6分钟减少到10-12秒。
我对上面的@Slauma示例进行了一个通用扩展;
public static class DataExtensions
{
public static DbContext AddToContext<T>(this DbContext context, object entity, int count, int commitCount, bool recreateContext, Func<DbContext> contextCreator)
{
context.Set(typeof(T)).Add((T)entity);
if (count % commitCount == 0)
{
context.SaveChanges();
if (recreateContext)
{
context.Dispose();
context = contextCreator.Invoke();
context.Configuration.AutoDetectChangesEnabled = false;
}
}
return context;
}
}
用法:
public void AddEntities(List<YourEntity> entities)
{
using (var transactionScope = new TransactionScope())
{
DbContext context = new YourContext();
int count = 0;
foreach (var entity in entities)
{
++count;
context = context.AddToContext<TenancyNote>(entity, count, 100, true,
() => new YourContext());
}
context.SaveChanges();
transactionScope.Complete();
}
}
另一种选择是使用Nuget提供的SqlBulkTools。它非常容易使用,并且具有一些强大的功能。
例子:
var bulk = new BulkOperations();
var books = GetBooks();
using (TransactionScope trans = new TransactionScope())
{
using (SqlConnection conn = new SqlConnection(ConfigurationManager
.ConnectionStrings["SqlBulkToolsTest"].ConnectionString))
{
bulk.Setup<Book>()
.ForCollection(books)
.WithTable("Books")
.AddAllColumns()
.BulkInsert()
.Commit(conn);
}
trans.Complete();
}
有关更多示例和高级用法,请参阅文档。免责声明:我是这个图书馆的作者,任何观点都是我自己的观点。
我正在寻找插入实体框架的最快方法
有一些支持大容量插入的第三方库可用:
Z.EntityFramework.Extensions(推荐)EF实用程序实体框架.BulkInsert
请参见:实体框架大容量插入库
选择大容量插入库时要小心。只有实体框架扩展支持所有类型的关联和继承,并且它是唯一一个仍然受支持的实体框架扩展。
免责声明:我是实体框架扩展的所有者
此库允许您执行场景所需的所有批量操作:
批量保存更改大容量插入批量删除批量更新批量合并
实例
// Easy to use
context.BulkSaveChanges();
// Easy to customize
context.BulkSaveChanges(bulk => bulk.BatchSize = 100);
// Perform Bulk Operations
context.BulkDelete(customers);
context.BulkInsert(customers);
context.BulkUpdate(customers);
// Customize Primary Key
context.BulkMerge(customers, operation => {
operation.ColumnPrimaryKeyExpression =
customer => customer.Code;
});
使用SqlBulkCopy:
void BulkInsert(GpsReceiverTrack[] gpsReceiverTracks)
{
if (gpsReceiverTracks == null)
{
throw new ArgumentNullException(nameof(gpsReceiverTracks));
}
DataTable dataTable = new DataTable("GpsReceiverTracks");
dataTable.Columns.Add("ID", typeof(int));
dataTable.Columns.Add("DownloadedTrackID", typeof(int));
dataTable.Columns.Add("Time", typeof(TimeSpan));
dataTable.Columns.Add("Latitude", typeof(double));
dataTable.Columns.Add("Longitude", typeof(double));
dataTable.Columns.Add("Altitude", typeof(double));
for (int i = 0; i < gpsReceiverTracks.Length; i++)
{
dataTable.Rows.Add
(
new object[]
{
gpsReceiverTracks[i].ID,
gpsReceiverTracks[i].DownloadedTrackID,
gpsReceiverTracks[i].Time,
gpsReceiverTracks[i].Latitude,
gpsReceiverTracks[i].Longitude,
gpsReceiverTracks[i].Altitude
}
);
}
string connectionString = (new TeamTrackerEntities()).Database.Connection.ConnectionString;
using (var connection = new SqlConnection(connectionString))
{
connection.Open();
using (var transaction = connection.BeginTransaction())
{
using (var sqlBulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, transaction))
{
sqlBulkCopy.DestinationTableName = dataTable.TableName;
foreach (DataColumn column in dataTable.Columns)
{
sqlBulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
sqlBulkCopy.WriteToServer(dataTable);
}
transaction.Commit();
}
}
return;
}
[POSTGRESQL的新解决方案]嘿,我知道这是一篇很老的文章,但我最近遇到了类似的问题,但我们使用的是Postgresql。我想使用有效的膨胀剂,但结果很难。我在这个数据库上找不到合适的免费库。我只找到了这个助手:https://bytefish.de/blog/postgresql_bulk_insert/也在Nuget上。我编写了一个小映射器,它以实体框架的方式自动映射财产:
public static PostgreSQLCopyHelper<T> CreateHelper<T>(string schemaName, string tableName)
{
var helper = new PostgreSQLCopyHelper<T>("dbo", "\"" + tableName + "\"");
var properties = typeof(T).GetProperties();
foreach(var prop in properties)
{
var type = prop.PropertyType;
if (Attribute.IsDefined(prop, typeof(KeyAttribute)) || Attribute.IsDefined(prop, typeof(ForeignKeyAttribute)))
continue;
switch (type)
{
case Type intType when intType == typeof(int) || intType == typeof(int?):
{
helper = helper.MapInteger("\"" + prop.Name + "\"", x => (int?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
break;
}
case Type stringType when stringType == typeof(string):
{
helper = helper.MapText("\"" + prop.Name + "\"", x => (string)typeof(T).GetProperty(prop.Name).GetValue(x, null));
break;
}
case Type dateType when dateType == typeof(DateTime) || dateType == typeof(DateTime?):
{
helper = helper.MapTimeStamp("\"" + prop.Name + "\"", x => (DateTime?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
break;
}
case Type decimalType when decimalType == typeof(decimal) || decimalType == typeof(decimal?):
{
helper = helper.MapMoney("\"" + prop.Name + "\"", x => (decimal?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
break;
}
case Type doubleType when doubleType == typeof(double) || doubleType == typeof(double?):
{
helper = helper.MapDouble("\"" + prop.Name + "\"", x => (double?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
break;
}
case Type floatType when floatType == typeof(float) || floatType == typeof(float?):
{
helper = helper.MapReal("\"" + prop.Name + "\"", x => (float?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
break;
}
case Type guidType when guidType == typeof(Guid):
{
helper = helper.MapUUID("\"" + prop.Name + "\"", x => (Guid)typeof(T).GetProperty(prop.Name).GetValue(x, null));
break;
}
}
}
return helper;
}
我使用它的方式如下(我有一个名为“承诺”的实体):
var undertakingHelper = BulkMapper.CreateHelper<Model.Undertaking>("dbo", nameof(Model.Undertaking));
undertakingHelper.SaveAll(transaction.UnderlyingTransaction.Connection as Npgsql.NpgsqlConnection, undertakingsToAdd));
我展示了一个事务的示例,但它也可以通过从上下文中检索到的正常连接来完成。undertakingsToAdd是普通实体记录的枚举,我想将其批量插入数据库。
这个解决方案,我经过几个小时的研究和尝试后得到的,正如你所期望的那样,速度更快,最终易于使用和免费!我真的建议你使用这个解决方案,不仅因为上面提到的原因,而且因为它是唯一一个我对Postgresql本身没有问题的解决方案,许多其他解决方案都可以完美地工作,例如SqlServer。
但是,对于超过(+4000)个插入,我建议使用存储过程。附上经过的时间。我确实插入了20英寸的11.788行
这就是代码
public void InsertDataBase(MyEntity entity)
{
repository.Database.ExecuteSqlCommand("sp_mystored " +
"@param1, @param2"
new SqlParameter("@param1", entity.property1),
new SqlParameter("@param2", entity.property2));
}
我知道这是一个非常古老的问题,但这里的一个家伙说,他开发了一个扩展方法,可以在EF中使用批量插入,当我检查时,我发现这个库今天的价格是599美元(对于一个开发人员来说)。也许对于整个库来说这是有意义的,但是对于大容量插入来说这太多了。
这是我做的一个非常简单的扩展方法。我首先将其与数据库配对使用(不首先使用代码进行测试,但我认为这是一样的)。使用上下文名称更改YourEntitys:
public partial class YourEntities : DbContext
{
public async Task BulkInsertAllAsync<T>(IEnumerable<T> entities)
{
using (var conn = new SqlConnection(Database.Connection.ConnectionString))
{
await conn.OpenAsync();
Type t = typeof(T);
var bulkCopy = new SqlBulkCopy(conn)
{
DestinationTableName = GetTableName(t)
};
var table = new DataTable();
var properties = t.GetProperties().Where(p => p.PropertyType.IsValueType || p.PropertyType == typeof(string));
foreach (var property in properties)
{
Type propertyType = property.PropertyType;
if (propertyType.IsGenericType &&
propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
propertyType = Nullable.GetUnderlyingType(propertyType);
}
table.Columns.Add(new DataColumn(property.Name, propertyType));
}
foreach (var entity in entities)
{
table.Rows.Add(
properties.Select(property => property.GetValue(entity, null) ?? DBNull.Value).ToArray());
}
bulkCopy.BulkCopyTimeout = 0;
await bulkCopy.WriteToServerAsync(table);
}
}
public void BulkInsertAll<T>(IEnumerable<T> entities)
{
using (var conn = new SqlConnection(Database.Connection.ConnectionString))
{
conn.Open();
Type t = typeof(T);
var bulkCopy = new SqlBulkCopy(conn)
{
DestinationTableName = GetTableName(t)
};
var table = new DataTable();
var properties = t.GetProperties().Where(p => p.PropertyType.IsValueType || p.PropertyType == typeof(string));
foreach (var property in properties)
{
Type propertyType = property.PropertyType;
if (propertyType.IsGenericType &&
propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
propertyType = Nullable.GetUnderlyingType(propertyType);
}
table.Columns.Add(new DataColumn(property.Name, propertyType));
}
foreach (var entity in entities)
{
table.Rows.Add(
properties.Select(property => property.GetValue(entity, null) ?? DBNull.Value).ToArray());
}
bulkCopy.BulkCopyTimeout = 0;
bulkCopy.WriteToServer(table);
}
}
public string GetTableName(Type type)
{
var metadata = ((IObjectContextAdapter)this).ObjectContext.MetadataWorkspace;
var objectItemCollection = ((ObjectItemCollection)metadata.GetItemCollection(DataSpace.OSpace));
var entityType = metadata
.GetItems<EntityType>(DataSpace.OSpace)
.Single(e => objectItemCollection.GetClrType(e) == type);
var entitySet = metadata
.GetItems<EntityContainer>(DataSpace.CSpace)
.Single()
.EntitySets
.Single(s => s.ElementType.Name == entityType.Name);
var mapping = metadata.GetItems<EntityContainerMapping>(DataSpace.CSSpace)
.Single()
.EntitySetMappings
.Single(s => s.EntitySet == entitySet);
var table = mapping
.EntityTypeMappings.Single()
.Fragments.Single()
.StoreEntitySet;
return (string)table.MetadataProperties["Table"].Value ?? table.Name;
}
}
您可以对继承自IEnumerable的任何集合使用它,如下所示:
await context.BulkInsertAllAsync(items);
保存列表的最快方法之一必须应用以下代码
context.Configuration.AutoDetectChangesEnabled = false;
context.Configuration.ValidateOnSaveEnabled = false;
AutoDetectChangesEnabled=false
添加、添加范围和保存更改:无法检测更改。
ValidateOnSaveEnabled=false;
未检测到更改跟踪器
您必须添加nuget
Install-Package Z.EntityFramework.Extensions
现在您可以使用以下代码
var context = new MyContext();
context.Configuration.AutoDetectChangesEnabled = false;
context.Configuration.ValidateOnSaveEnabled = false;
context.BulkInsert(list);
context.BulkSaveChanges();
SqlBulkCopy速度极快
这是我的实现:
// at some point in my calling code, I will call:
var myDataTable = CreateMyDataTable();
myDataTable.Rows.Add(Guid.NewGuid,tableHeaderId,theName,theValue); // e.g. - need this call for each row to insert
var efConnectionString = ConfigurationManager.ConnectionStrings["MyWebConfigEfConnection"].ConnectionString;
var efConnectionStringBuilder = new EntityConnectionStringBuilder(efConnectionString);
var connectionString = efConnectionStringBuilder.ProviderConnectionString;
BulkInsert(connectionString, myDataTable);
private DataTable CreateMyDataTable()
{
var myDataTable = new DataTable { TableName = "MyTable"};
// this table has an identity column - don't need to specify that
myDataTable.Columns.Add("MyTableRecordGuid", typeof(Guid));
myDataTable.Columns.Add("MyTableHeaderId", typeof(int));
myDataTable.Columns.Add("ColumnName", typeof(string));
myDataTable.Columns.Add("ColumnValue", typeof(string));
return myDataTable;
}
private void BulkInsert(string connectionString, DataTable dataTable)
{
using (var connection = new SqlConnection(connectionString))
{
connection.Open();
SqlTransaction transaction = null;
try
{
transaction = connection.BeginTransaction();
using (var sqlBulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, transaction))
{
sqlBulkCopy.DestinationTableName = dataTable.TableName;
foreach (DataColumn column in dataTable.Columns) {
sqlBulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
sqlBulkCopy.WriteToServer(dataTable);
}
transaction.Commit();
}
catch (Exception)
{
transaction?.Rollback();
throw;
}
}
}
因为这里从未提到过,我想在这里重新推荐EFCore.BulkExtensions
context.BulkInsert(entitiesList); context.BulkInsertAsync(entitiesList);
context.BulkUpdate(entitiesList); context.BulkUpdateAsync(entitiesList);
context.BulkDelete(entitiesList); context.BulkDeleteAsync(entitiesList);
context.BulkInsertOrUpdate(entitiesList); context.BulkInsertOrUpdateAsync(entitiesList); // Upsert
context.BulkInsertOrUpdateOrDelete(entitiesList); context.BulkInsertOrUpdateOrDeleteAsync(entitiesList); // Sync
context.BulkRead(entitiesList); context.BulkReadAsync(entitiesList);
使用此技术可以提高实体框架中插入记录的速度。这里我使用一个简单的存储过程来插入记录。为了执行这个存储过程,我使用实体框架的.FromSql()方法来执行Raw SQL。
存储过程代码:
CREATE PROCEDURE TestProc
@FirstParam VARCHAR(50),
@SecondParam VARCHAR(50)
AS
Insert into SomeTable(Name, Address) values(@FirstParam, @SecondParam)
GO
接下来,循环遍历所有4000条记录,并添加执行存储的
该过程每100次循环一次。
为此,我创建了一个字符串查询来执行这个过程,并继续将每一组记录附加到它。
然后检查循环是否以100的倍数运行,在这种情况下,使用.FromSql()执行它。
所以对于4000条记录,我只需要执行以下步骤4000/100=40次。
检查以下代码:
string execQuery = "";
var context = new MyContext();
for (int i = 0; i < 4000; i++)
{
execQuery += "EXEC TestProc @FirstParam = 'First'" + i + "'', @SecondParam = 'Second'" + i + "''";
if (i % 100 == 0)
{
context.Student.FromSql(execQuery);
execQuery = "";
}
}
〔2019更新〕EF Core 3.1
如上所述,在EF Core中禁用AutoDetectChangesEnabled非常有效:插入时间除以100(从几分钟到几秒,10k条记录具有交叉表关系)
更新的代码为:
context.ChangeTracker.AutoDetectChangesEnabled = false;
foreach (IRecord record in records) {
//Add records to your database
}
context.ChangeTracker.DetectChanges();
context.SaveChanges();
context.ChangeTracker.AutoDetectChangesEnabled = true; //do not forget to re-enable
TL;博士我知道这是一个老帖子,但我已经实施了一个解决方案,从其中一个提议开始,扩展它并解决其中的一些问题;此外,我还阅读了所提出的其他解决方案,与这些方案相比,我似乎提出了一种更适合原始问题中提出的要求的解决方案。
在这个解决方案中,我扩展了Slauma的方法,我认为它非常适合原始问题中提出的情况,即使用实体框架和事务范围对数据库进行昂贵的写入操作。
在Slauma的解决方案中,这只是一个草稿,只是用来了解EF的速度与实施批量插入的策略-存在以下问题:
交易超时(默认情况下,1分钟可通过代码延长至最多10分钟);复制宽度等于事务结束时使用的提交大小的第一个数据块(这个问题很奇怪,可以通过变通方法解决)。
我还报告了一个例子,其中包括几个从属实体的上下文插入,从而扩展了Slauma提出的案例研究。
我能够验证的性能是10K记录/分钟,在数据库中插入200K宽的记录块,每个记录块大约1KB。速度是恒定的,性能没有下降,测试需要大约20分钟才能成功运行。
详细的解决方案
主持在示例存储库类中插入的批量插入操作的方法:
abstract class SomeRepository {
protected MyDbContext myDbContextRef;
public void ImportData<TChild, TFather>(List<TChild> entities, TFather entityFather)
where TChild : class, IEntityChild
where TFather : class, IEntityFather
{
using (var scope = MyDbContext.CreateTransactionScope())
{
MyDbContext context = null;
try
{
context = new MyDbContext(myDbContextRef.ConnectionString);
context.Configuration.AutoDetectChangesEnabled = false;
entityFather.BulkInsertResult = false;
var fileEntity = context.Set<TFather>().Add(entityFather);
context.SaveChanges();
int count = 0;
//avoids an issue with recreating context: EF duplicates the first commit block of data at the end of transaction!!
context = MyDbContext.AddToContext<TChild>(context, null, 0, 1, true);
foreach (var entityToInsert in entities)
{
++count;
entityToInsert.EntityFatherRefId = fileEntity.Id;
context = MyDbContext.AddToContext<TChild>(context, entityToInsert, count, 100, true);
}
entityFather.BulkInsertResult = true;
context.Set<TFather>().Add(fileEntity);
context.Entry<TFather>(fileEntity).State = EntityState.Modified;
context.SaveChanges();
}
finally
{
if (context != null)
context.Dispose();
}
scope.Complete();
}
}
}
仅用于示例目的的接口:
public interface IEntityChild {
//some properties ...
int EntityFatherRefId { get; set; }
}
public interface IEntityFather {
int Id { get; set; }
bool BulkInsertResult { get; set; }
}
db上下文中,我将解决方案的各个元素实现为静态方法:
public class MyDbContext : DbContext
{
public string ConnectionString { get; set; }
public MyDbContext(string nameOrConnectionString)
: base(nameOrConnectionString)
{
Database.SetInitializer<MyDbContext>(null);
ConnectionString = Database.Connection.ConnectionString;
}
/// <summary>
/// Creates a TransactionScope raising timeout transaction to 30 minutes
/// </summary>
/// <param name="_isolationLevel"></param>
/// <param name="timeout"></param>
/// <remarks>
/// It is possible to set isolation-level and timeout to different values. Pay close attention managing these 2 transactions working parameters.
/// <para>Default TransactionScope values for isolation-level and timeout are the following:</para>
/// <para>Default isolation-level is "Serializable"</para>
/// <para>Default timeout ranges between 1 minute (default value if not specified a timeout) to max 10 minute (if not changed by code or updating max-timeout machine.config value)</para>
/// </remarks>
public static TransactionScope CreateTransactionScope(IsolationLevel _isolationLevel = IsolationLevel.Serializable, TimeSpan? timeout = null)
{
SetTransactionManagerField("_cachedMaxTimeout", true);
SetTransactionManagerField("_maximumTimeout", timeout ?? TimeSpan.FromMinutes(30));
var transactionOptions = new TransactionOptions();
transactionOptions.IsolationLevel = _isolationLevel;
transactionOptions.Timeout = TransactionManager.MaximumTimeout;
return new TransactionScope(TransactionScopeOption.Required, transactionOptions);
}
private static void SetTransactionManagerField(string fieldName, object value)
{
typeof(TransactionManager).GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Static).SetValue(null, value);
}
/// <summary>
/// Adds a generic entity to a given context allowing commit on large block of data and improving performance to support db bulk-insert operations based on Entity Framework
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="context"></param>
/// <param name="entity"></param>
/// <param name="count"></param>
/// <param name="commitCount">defines the block of data size</param>
/// <param name="recreateContext"></param>
/// <returns></returns>
public static MyDbContext AddToContext<T>(MyDbContext context, T entity, int count, int commitCount, bool recreateContext) where T : class
{
if (entity != null)
context.Set<T>().Add(entity);
if (count % commitCount == 0)
{
context.SaveChanges();
if (recreateContext)
{
var contextConnectionString = context.ConnectionString;
context.Dispose();
context = new MyDbContext(contextConnectionString);
context.Configuration.AutoDetectChangesEnabled = false;
}
}
return context;
}
}
是的,SqlBulkUpdate确实是这类任务最快的工具。我想在.NETCore中找到“最省力”的通用方法,所以我最终使用了MarcGravell的优秀库FastMember,并为实体框架DB上下文编写了一个小小的扩展方法。工作速度极快:
using System.Collections.Generic;
using System.Linq;
using FastMember;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
namespace Services.Extensions
{
public static class DbContextExtensions
{
public static void BulkCopyToServer<T>(this DbContext db, IEnumerable<T> collection)
{
var messageEntityType = db.Model.FindEntityType(typeof(T));
var tableName = messageEntityType.GetSchema() + "." + messageEntityType.GetTableName();
var tableColumnMappings = messageEntityType.GetProperties()
.ToDictionary(p => p.PropertyInfo.Name, p => p.GetColumnName());
using (var connection = new SqlConnection(db.Database.GetDbConnection().ConnectionString))
using (var bulkCopy = new SqlBulkCopy(connection))
{
foreach (var (field, column) in tableColumnMappings)
{
bulkCopy.ColumnMappings.Add(field, column);
}
using (var reader = ObjectReader.Create(collection, tableColumnMappings.Keys.ToArray()))
{
bulkCopy.DestinationTableName = tableName;
connection.Open();
bulkCopy.WriteToServer(reader);
connection.Close();
}
}
}
}
}
Configuration.LazyLoadingEnabled=false;Configuration.ProxyCreationEnabled=false;
如果没有AutoDetectChangesEnabled=false,这些速度太快;我建议使用不同于dbo的表头。通常我使用nop、sop、tbl等。。
记下几点,这是我的实施,我的改进以及其他回答和评论。
改进:
从我的实体获取SQL连接字符串仅在某些部分使用SQLBulk,其余部分仅使用实体框架使用与SQL数据库相同的日期表列名,无需映射每个列使用与SQL Datatable相同的Datatable名称public void InsertBulkDatatable(DataTable数据表){EntityConnectionStringBuilder entityBuilder=新的EntityConnectionStringBuilder(ConfigurationManager.ConnectionStrings[“MyDbContextConnectionName”].ConnectionString);string cs=entityBuilder.ProviderConnectionString;使用(varconnection=newSqlConnection(cs)){SqlTransaction事务=null;connection.Open();尝试{transaction=connection.BegginTransaction();使用(var sqlBulkCopy=新的sqlBulkCopy(连接,SqlBulkCopyOptions.TableLock,事务)){sqlBulkCopy.DestinationTableName=dataTable.TableName//使用SQL数据表在c中命名数据表#//映射列foreach(dataTable.Columns中的DataColumn列){sqlBulkCopy.ColumnMappings.Add(column.ColumnName,column.ColumnName);}sqlBulkCopy.WriteToServer(数据表);}transaction.Commit();}catch(异常){transaction.Rollback();}}}