在。net 4.0中看到新的System.Collections.Concurrent命名空间,我非常激动,非常棒!我已经看到ConcurrentDictionary, ConcurrentQueue, ConcurrentStack, ConcurrentBag和BlockingCollection。
有一样东西似乎神秘地丢失了,那就是ConcurrentList<T>。我必须自己写吗(或者从网上得到:))?
我是不是遗漏了什么明显的东西?
在。net 4.0中看到新的System.Collections.Concurrent命名空间,我非常激动,非常棒!我已经看到ConcurrentDictionary, ConcurrentQueue, ConcurrentStack, ConcurrentBag和BlockingCollection。
有一样东西似乎神秘地丢失了,那就是ConcurrentList<T>。我必须自己写吗(或者从网上得到:))?
我是不是遗漏了什么明显的东西?
当前回答
我试过一段时间(也:在GitHub)。我的实现出现了一些问题,我在这里就不赘述了。让我告诉你,更重要的是,我学到了什么。
首先,你不可能得到一个完整的IList<T>的无锁和线程安全的实现。特别是,随机插入和删除是行不通的,除非你也忘记了O(1)随机访问(也就是说,除非你“欺骗”,只是使用某种链表,让索引糟糕透顶)。
我认为可能值得的是一个线程安全的IList<T>的有限子集:特别是一个允许添加并通过索引提供随机只读访问的子集(但没有Insert、RemoveAt等,也没有随机写访问)。
This was the goal of my ConcurrentList<T> implementation. But when I tested its performance in multithreaded scenarios, I found that simply synchronizing adds to a List<T> was faster. Basically, adding to a List<T> is lightning fast already; the complexity of the computational steps involved is miniscule (increment an index and assign to an element in an array; that's really it). You would need a ton of concurrent writes to see any sort of lock contention on this; and even then, the average performance of each write would still beat out the more expensive albeit lockless implementation in ConcurrentList<T>.
在相对罕见的情况下,列表的内部数组需要调整自身的大小,您确实需要付出一点代价。因此,最终我得出结论,这是一个适合的场景,其中仅添加ConcurrentList<T>集合类型是有意义的:当您希望保证在每次调用中添加元素的开销较低时(因此,与平摊性能目标相反)。
它并不是一个像您想象的那样有用的类。
其他回答
如果不需要处理太多项,无锁复制和写入方法非常有效。 下面是我写的一个类:
public class CopyAndWriteList<T>
{
public static List<T> Clear(List<T> list)
{
var a = new List<T>(list);
a.Clear();
return a;
}
public static List<T> Add(List<T> list, T item)
{
var a = new List<T>(list);
a.Add(item);
return a;
}
public static List<T> RemoveAt(List<T> list, int index)
{
var a = new List<T>(list);
a.RemoveAt(index);
return a;
}
public static List<T> Remove(List<T> list, T item)
{
var a = new List<T>(list);
a.Remove(item);
return a;
}
}
使用示例: orders_BUY = CopyAndWriteList.Clear(orders_BUY);
我实现了一个类似于Brian的方法。我的情况不同:
我直接管理数组。 我没有在try块中输入锁。 我使用yield return生成枚举器。 我支持锁递归。这允许在迭代期间从列表中读取。 我尽可能使用可升级的读锁。 DoSync和GetSync方法允许需要独占访问列表的顺序交互。
代码:
public class ConcurrentList<T> : IList<T>, IDisposable
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private int _count = 0;
public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _count;
}
finally
{
_lock.ExitReadLock();
}
}
}
public int InternalArrayLength
{
get
{
_lock.EnterReadLock();
try
{
return _arr.Length;
}
finally
{
_lock.ExitReadLock();
}
}
}
private T[] _arr;
public ConcurrentList(int initialCapacity)
{
_arr = new T[initialCapacity];
}
public ConcurrentList():this(4)
{ }
public ConcurrentList(IEnumerable<T> items)
{
_arr = items.ToArray();
_count = _arr.Length;
}
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
_arr[_count] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
public void AddRange(IEnumerable<T> items)
{
if (items == null)
throw new ArgumentNullException("items");
_lock.EnterWriteLock();
try
{
var arr = items as T[] ?? items.ToArray();
var newCount = _count + arr.Length;
EnsureCapacity(newCount);
Array.Copy(arr, 0, _arr, _count, arr.Length);
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
private void EnsureCapacity(int capacity)
{
if (_arr.Length >= capacity)
return;
int doubled;
checked
{
try
{
doubled = _arr.Length * 2;
}
catch (OverflowException)
{
doubled = int.MaxValue;
}
}
var newLength = Math.Max(doubled, capacity);
Array.Resize(ref _arr, newLength);
}
public bool Remove(T item)
{
_lock.EnterUpgradeableReadLock();
try
{
var i = IndexOfInternal(item);
if (i == -1)
return false;
_lock.EnterWriteLock();
try
{
RemoveAtInternal(i);
return true;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
_lock.EnterReadLock();
try
{
for (int i = 0; i < _count; i++)
// deadlocking potential mitigated by lock recursion enforcement
yield return _arr[i];
}
finally
{
_lock.ExitReadLock();
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
public int IndexOf(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item);
}
finally
{
_lock.ExitReadLock();
}
}
private int IndexOfInternal(T item)
{
return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
}
public void Insert(int index, T item)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index > _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
// shift everything right by one, starting at index
Array.Copy(_arr, index, _arr, index + 1, _count - index);
// insert
_arr[index] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public void RemoveAt(int index)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
RemoveAtInternal(index);
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
private void RemoveAtInternal(int index)
{
Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
_count--;
// release last element
Array.Clear(_arr, _count, 1);
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
Array.Clear(_arr, 0, _count);
_count = 0;
}
finally
{
_lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item) != -1;
}
finally
{
_lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterReadLock();
try
{
if(_count > array.Length - arrayIndex)
throw new ArgumentException("Destination array was not long enough.");
Array.Copy(_arr, 0, array, arrayIndex, _count);
}
finally
{
_lock.ExitReadLock();
}
}
public bool IsReadOnly
{
get { return false; }
}
public T this[int index]
{
get
{
_lock.EnterReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
return _arr[index];
}
finally
{
_lock.ExitReadLock();
}
}
set
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
_arr[index] = value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
}
public void DoSync(Action<ConcurrentList<T>> action)
{
GetSync(l =>
{
action(l);
return 0;
});
}
public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
{
_lock.EnterWriteLock();
try
{
return func(this);
}
finally
{
_lock.ExitWriteLock();
}
}
public void Dispose()
{
_lock.Dispose();
}
}
ConcurrentList(作为一个可调整大小的数组,而不是一个链表)不容易用非阻塞操作编写。它的API不能很好地转换为“并发”版本。
在顺序执行的代码中,使用的数据结构与(编写良好的)并发执行的代码不同。原因是顺序代码隐含着隐式的顺序。然而并发代码并不意味着任何顺序;更妙的是,它暗示着缺乏任何明确的秩序!
因此,具有隐含顺序的数据结构(如List)对于解决并发问题不是很有用。列表意味着顺序,但它并没有清楚地定义这个顺序是什么。因此,操作列表的代码的执行顺序将(在某种程度上)决定列表的隐式顺序,这与有效的并发解决方案直接冲突。
记住,并发是一个数据问题,而不是代码问题!您不能先实现代码(或重写现有的顺序代码),然后获得设计良好的并发解决方案。您需要首先设计数据结构,同时记住在并发系统中不存在隐式排序。
没有ConcurrentList的原因是因为它根本不能被写入。原因是在IList中有几个重要的操作依赖于索引,而这根本行不通。例如:
int catIndex = list.IndexOf("cat");
list.Insert(catIndex, "dog");
作者所追求的效果是在“cat”之前插入“dog”,但在多线程环境中,这两行代码之间的列表可能发生任何事情。例如,另一个线程可能执行list. removeat(0),将整个列表向左移动,但关键的是,catIndex不会改变。这里的影响是Insert操作实际上将“狗”放在猫之后,而不是在它之前。
The several implementations that you see offered as "answers" to this question are well-meaning, but as the above shows, they don't offer reliable results. If you really want list-like semantics in a multithreaded environment, you can't get there by putting locks inside the list implementation methods. You have to ensure that any index you use lives entirely inside the context of the lock. The upshot is that you can use a List in a multithreaded environment with the right locking, but the list itself cannot be made to exist in that world.
如果你认为你需要一个并发列表,实际上只有两种可能:
你真正需要的是一个ConcurrentBag 您需要创建自己的集合,可能使用List和自己的并发控制来实现。
If you have a ConcurrentBag and are in a position where you need to pass it as an IList, then you have a problem, because the method you're calling has specified that they might try to do something like I did above with the cat & dog. In most worlds, what that means is that the method you're calling is simply not built to work in a multi-threaded environment. That means you either refactor it so that it is or, if you can't, you're going to have to handle it very carefully. You you'll almost certainly be required to create your own collection with its own locks, and call the offending method within a lock.