我正在寻找插入实体框架的最快方法。
我之所以问这个问题,是因为您有一个活动的TransactionScope,并且插入量很大(4000+)。它可能会持续超过10分钟(事务的默认超时),这将导致事务不完整。
我正在寻找插入实体框架的最快方法。
我之所以问这个问题,是因为您有一个活动的TransactionScope,并且插入量很大(4000+)。它可能会持续超过10分钟(事务的默认超时),这将导致事务不完整。
当前回答
如果您添加的实体()依赖于上下文中的其他预加载实体(例如导航财产),则Dispose()上下文会产生问题
我使用类似的概念来保持我的上下文较小,以实现相同的性能
但我只是分离已经SaveChanges()的实体,而不是Dispose()上下文并重新创建
public void AddAndSave<TEntity>(List<TEntity> entities) where TEntity : class {
const int CommitCount = 1000; //set your own best performance number here
int currentCount = 0;
while (currentCount < entities.Count())
{
//make sure it don't commit more than the entities you have
int commitCount = CommitCount;
if ((entities.Count - currentCount) < commitCount)
commitCount = entities.Count - currentCount;
//e.g. Add entities [ i = 0 to 999, 1000 to 1999, ... , n to n+999... ] to conext
for (int i = currentCount; i < (currentCount + commitCount); i++)
_context.Entry(entities[i]).State = System.Data.EntityState.Added;
//same as calling _context.Set<TEntity>().Add(entities[i]);
//commit entities[n to n+999] to database
_context.SaveChanges();
//detach all entities in the context that committed to database
//so it won't overload the context
for (int i = currentCount; i < (currentCount + commitCount); i++)
_context.Entry(entities[i]).State = System.Data.EntityState.Detached;
currentCount += commitCount;
} }
如果需要,用try-catch和TrasactionScope()将其包装起来,为了保持代码干净,没有在这里显示它们
其他回答
我同意亚当·拉基斯的观点。SqlBulkCopy是将批量记录从一个数据源传输到另一数据源的最快方法。我用这个复制了20K张唱片,只用了不到3秒钟。看看下面的例子。
public static void InsertIntoMembers(DataTable dataTable)
{
using (var connection = new SqlConnection(@"data source=;persist security info=True;user id=;password=;initial catalog=;MultipleActiveResultSets=True;App=EntityFramework"))
{
SqlTransaction transaction = null;
connection.Open();
try
{
transaction = connection.BeginTransaction();
using (var sqlBulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, transaction))
{
sqlBulkCopy.DestinationTableName = "Members";
sqlBulkCopy.ColumnMappings.Add("Firstname", "Firstname");
sqlBulkCopy.ColumnMappings.Add("Lastname", "Lastname");
sqlBulkCopy.ColumnMappings.Add("DOB", "DOB");
sqlBulkCopy.ColumnMappings.Add("Gender", "Gender");
sqlBulkCopy.ColumnMappings.Add("Email", "Email");
sqlBulkCopy.ColumnMappings.Add("Address1", "Address1");
sqlBulkCopy.ColumnMappings.Add("Address2", "Address2");
sqlBulkCopy.ColumnMappings.Add("Address3", "Address3");
sqlBulkCopy.ColumnMappings.Add("Address4", "Address4");
sqlBulkCopy.ColumnMappings.Add("Postcode", "Postcode");
sqlBulkCopy.ColumnMappings.Add("MobileNumber", "MobileNumber");
sqlBulkCopy.ColumnMappings.Add("TelephoneNumber", "TelephoneNumber");
sqlBulkCopy.ColumnMappings.Add("Deleted", "Deleted");
sqlBulkCopy.WriteToServer(dataTable);
}
transaction.Commit();
}
catch (Exception)
{
transaction.Rollback();
}
}
}
是的,SqlBulkUpdate确实是这类任务最快的工具。我想在.NETCore中找到“最省力”的通用方法,所以我最终使用了MarcGravell的优秀库FastMember,并为实体框架DB上下文编写了一个小小的扩展方法。工作速度极快:
using System.Collections.Generic;
using System.Linq;
using FastMember;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
namespace Services.Extensions
{
public static class DbContextExtensions
{
public static void BulkCopyToServer<T>(this DbContext db, IEnumerable<T> collection)
{
var messageEntityType = db.Model.FindEntityType(typeof(T));
var tableName = messageEntityType.GetSchema() + "." + messageEntityType.GetTableName();
var tableColumnMappings = messageEntityType.GetProperties()
.ToDictionary(p => p.PropertyInfo.Name, p => p.GetColumnName());
using (var connection = new SqlConnection(db.Database.GetDbConnection().ConnectionString))
using (var bulkCopy = new SqlBulkCopy(connection))
{
foreach (var (field, column) in tableColumnMappings)
{
bulkCopy.ColumnMappings.Add(field, column);
}
using (var reader = ObjectReader.Create(collection, tableColumnMappings.Keys.ToArray()))
{
bulkCopy.DestinationTableName = tableName;
connection.Open();
bulkCopy.WriteToServer(reader);
connection.Close();
}
}
}
}
}
我已经研究了Slauma的答案(这太棒了,感谢创意人),我已经减少了批量,直到达到最佳速度。查看Slauma的结果:
commitCount=1,recreateContext=true:超过10分钟commitCount=10,recreateContext=true:241秒commitCount=100,recreateContext=true:164秒commitCount=1000,recreateContext=true:191秒
可以看出,当从1移动到10,以及从10移动到100时,速度会增加,但从100到1000的插入速度会再次下降。
因此,我重点关注了当您将批量大小减少到10到100之间时会发生什么,下面是我的结果(我使用了不同的行内容,因此我的时间值不同):
Quantity | Batch size | Interval
1000 1 3
10000 1 34
100000 1 368
1000 5 1
10000 5 12
100000 5 133
1000 10 1
10000 10 11
100000 10 101
1000 20 1
10000 20 9
100000 20 92
1000 27 0
10000 27 9
100000 27 92
1000 30 0
10000 30 9
100000 30 92
1000 35 1
10000 35 9
100000 35 94
1000 50 1
10000 50 10
100000 50 106
1000 100 1
10000 100 14
100000 100 141
根据我的结果,批量大小的实际最佳值约为30。它小于10和100。问题是,我不知道为什么30是最优的,也找不到任何合理的解释。
我知道这是一个非常古老的问题,但这里的一个家伙说,他开发了一个扩展方法,可以在EF中使用批量插入,当我检查时,我发现这个库今天的价格是599美元(对于一个开发人员来说)。也许对于整个库来说这是有意义的,但是对于大容量插入来说这太多了。
这是我做的一个非常简单的扩展方法。我首先将其与数据库配对使用(不首先使用代码进行测试,但我认为这是一样的)。使用上下文名称更改YourEntitys:
public partial class YourEntities : DbContext
{
public async Task BulkInsertAllAsync<T>(IEnumerable<T> entities)
{
using (var conn = new SqlConnection(Database.Connection.ConnectionString))
{
await conn.OpenAsync();
Type t = typeof(T);
var bulkCopy = new SqlBulkCopy(conn)
{
DestinationTableName = GetTableName(t)
};
var table = new DataTable();
var properties = t.GetProperties().Where(p => p.PropertyType.IsValueType || p.PropertyType == typeof(string));
foreach (var property in properties)
{
Type propertyType = property.PropertyType;
if (propertyType.IsGenericType &&
propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
propertyType = Nullable.GetUnderlyingType(propertyType);
}
table.Columns.Add(new DataColumn(property.Name, propertyType));
}
foreach (var entity in entities)
{
table.Rows.Add(
properties.Select(property => property.GetValue(entity, null) ?? DBNull.Value).ToArray());
}
bulkCopy.BulkCopyTimeout = 0;
await bulkCopy.WriteToServerAsync(table);
}
}
public void BulkInsertAll<T>(IEnumerable<T> entities)
{
using (var conn = new SqlConnection(Database.Connection.ConnectionString))
{
conn.Open();
Type t = typeof(T);
var bulkCopy = new SqlBulkCopy(conn)
{
DestinationTableName = GetTableName(t)
};
var table = new DataTable();
var properties = t.GetProperties().Where(p => p.PropertyType.IsValueType || p.PropertyType == typeof(string));
foreach (var property in properties)
{
Type propertyType = property.PropertyType;
if (propertyType.IsGenericType &&
propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
propertyType = Nullable.GetUnderlyingType(propertyType);
}
table.Columns.Add(new DataColumn(property.Name, propertyType));
}
foreach (var entity in entities)
{
table.Rows.Add(
properties.Select(property => property.GetValue(entity, null) ?? DBNull.Value).ToArray());
}
bulkCopy.BulkCopyTimeout = 0;
bulkCopy.WriteToServer(table);
}
}
public string GetTableName(Type type)
{
var metadata = ((IObjectContextAdapter)this).ObjectContext.MetadataWorkspace;
var objectItemCollection = ((ObjectItemCollection)metadata.GetItemCollection(DataSpace.OSpace));
var entityType = metadata
.GetItems<EntityType>(DataSpace.OSpace)
.Single(e => objectItemCollection.GetClrType(e) == type);
var entitySet = metadata
.GetItems<EntityContainer>(DataSpace.CSpace)
.Single()
.EntitySets
.Single(s => s.ElementType.Name == entityType.Name);
var mapping = metadata.GetItems<EntityContainerMapping>(DataSpace.CSSpace)
.Single()
.EntitySetMappings
.Single(s => s.EntitySet == entitySet);
var table = mapping
.EntityTypeMappings.Single()
.Fragments.Single()
.StoreEntitySet;
return (string)table.MetadataProperties["Table"].Value ?? table.Name;
}
}
您可以对继承自IEnumerable的任何集合使用它,如下所示:
await context.BulkInsertAllAsync(items);
下面是使用实体框架和使用SqlBulkCopy类之间的性能比较,具体示例为:如何将复杂对象批量插入SQL Server数据库
正如其他人已经强调的,ORM不应用于批量操作。它们提供了灵活性、关注点分离和其他好处,但批量操作(批量读取除外)不是其中之一。