在。net 4.0中看到新的System.Collections.Concurrent命名空间,我非常激动,非常棒!我已经看到ConcurrentDictionary, ConcurrentQueue, ConcurrentStack, ConcurrentBag和BlockingCollection。
有一样东西似乎神秘地丢失了,那就是ConcurrentList<T>。我必须自己写吗(或者从网上得到:))?
我是不是遗漏了什么明显的东西?
在。net 4.0中看到新的System.Collections.Concurrent命名空间,我非常激动,非常棒!我已经看到ConcurrentDictionary, ConcurrentQueue, ConcurrentStack, ConcurrentBag和BlockingCollection。
有一样东西似乎神秘地丢失了,那就是ConcurrentList<T>。我必须自己写吗(或者从网上得到:))?
我是不是遗漏了什么明显的东西?
当前回答
如果不需要处理太多项,无锁复制和写入方法非常有效。 下面是我写的一个类:
public class CopyAndWriteList<T>
{
public static List<T> Clear(List<T> list)
{
var a = new List<T>(list);
a.Clear();
return a;
}
public static List<T> Add(List<T> list, T item)
{
var a = new List<T>(list);
a.Add(item);
return a;
}
public static List<T> RemoveAt(List<T> list, int index)
{
var a = new List<T>(list);
a.RemoveAt(index);
return a;
}
public static List<T> Remove(List<T> list, T item)
{
var a = new List<T>(list);
a.Remove(item);
return a;
}
}
使用示例: orders_BUY = CopyAndWriteList.Clear(orders_BUY);
其他回答
我很惊讶没有人提到使用LinkedList作为编写专业类的基础。
通常我们不需要各种集合类的完整API,如果您编写的主要是功能性副作用的免费代码,尽可能使用不可变的类,那么您实际上不会希望改变集合,以支持各种快照实现。
LinkedList solves some difficult problems of creating snapshot copies/clones of large collections. I also use it to create "threadsafe" enumerators to enumerate over the collection. I can cheat, because I know that I'm not changing the collection in any way other than appending, I can keep track of the list size, and only lock on changes to list size. Then my enumerator code simply enumerates from 0 to n for any thread that wants a "snapshot" of the append only collection, that will be guaranteed to represent a "snapshot" of the collection at any moment in time, regardless of what other threads are appending to the head of the collection.
我非常确定大多数需求通常非常简单,您只需要2或3个方法。编写一个真正的泛型库是非常困难的,但解决您自己的代码需求有时可以通过一两个技巧很容易。
LinkedList和优秀的函数式编程万岁。
干杯,……爱你们! 艾尔
附注:样本hack AppendOnly类在这里:https://github.com/goblinfactory/AppendOnly
ConcurrentList(作为一个可调整大小的数组,而不是一个链表)不容易用非阻塞操作编写。它的API不能很好地转换为“并发”版本。
在顺序执行的代码中,使用的数据结构与(编写良好的)并发执行的代码不同。原因是顺序代码隐含着隐式的顺序。然而并发代码并不意味着任何顺序;更妙的是,它暗示着缺乏任何明确的秩序!
因此,具有隐含顺序的数据结构(如List)对于解决并发问题不是很有用。列表意味着顺序,但它并没有清楚地定义这个顺序是什么。因此,操作列表的代码的执行顺序将(在某种程度上)决定列表的隐式顺序,这与有效的并发解决方案直接冲突。
记住,并发是一个数据问题,而不是代码问题!您不能先实现代码(或重写现有的顺序代码),然后获得设计良好的并发解决方案。您需要首先设计数据结构,同时记住在并发系统中不存在隐式排序。
我试过一段时间(也:在GitHub)。我的实现出现了一些问题,我在这里就不赘述了。让我告诉你,更重要的是,我学到了什么。
首先,你不可能得到一个完整的IList<T>的无锁和线程安全的实现。特别是,随机插入和删除是行不通的,除非你也忘记了O(1)随机访问(也就是说,除非你“欺骗”,只是使用某种链表,让索引糟糕透顶)。
我认为可能值得的是一个线程安全的IList<T>的有限子集:特别是一个允许添加并通过索引提供随机只读访问的子集(但没有Insert、RemoveAt等,也没有随机写访问)。
This was the goal of my ConcurrentList<T> implementation. But when I tested its performance in multithreaded scenarios, I found that simply synchronizing adds to a List<T> was faster. Basically, adding to a List<T> is lightning fast already; the complexity of the computational steps involved is miniscule (increment an index and assign to an element in an array; that's really it). You would need a ton of concurrent writes to see any sort of lock contention on this; and even then, the average performance of each write would still beat out the more expensive albeit lockless implementation in ConcurrentList<T>.
在相对罕见的情况下,列表的内部数组需要调整自身的大小,您确实需要付出一点代价。因此,最终我得出结论,这是一个适合的场景,其中仅添加ConcurrentList<T>集合类型是有意义的:当您希望保证在每次调用中添加元素的开销较低时(因此,与平摊性能目标相反)。
它并不是一个像您想象的那样有用的类。
我实现了一个类似于Brian的方法。我的情况不同:
我直接管理数组。 我没有在try块中输入锁。 我使用yield return生成枚举器。 我支持锁递归。这允许在迭代期间从列表中读取。 我尽可能使用可升级的读锁。 DoSync和GetSync方法允许需要独占访问列表的顺序交互。
代码:
public class ConcurrentList<T> : IList<T>, IDisposable
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private int _count = 0;
public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _count;
}
finally
{
_lock.ExitReadLock();
}
}
}
public int InternalArrayLength
{
get
{
_lock.EnterReadLock();
try
{
return _arr.Length;
}
finally
{
_lock.ExitReadLock();
}
}
}
private T[] _arr;
public ConcurrentList(int initialCapacity)
{
_arr = new T[initialCapacity];
}
public ConcurrentList():this(4)
{ }
public ConcurrentList(IEnumerable<T> items)
{
_arr = items.ToArray();
_count = _arr.Length;
}
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
_arr[_count] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
public void AddRange(IEnumerable<T> items)
{
if (items == null)
throw new ArgumentNullException("items");
_lock.EnterWriteLock();
try
{
var arr = items as T[] ?? items.ToArray();
var newCount = _count + arr.Length;
EnsureCapacity(newCount);
Array.Copy(arr, 0, _arr, _count, arr.Length);
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
private void EnsureCapacity(int capacity)
{
if (_arr.Length >= capacity)
return;
int doubled;
checked
{
try
{
doubled = _arr.Length * 2;
}
catch (OverflowException)
{
doubled = int.MaxValue;
}
}
var newLength = Math.Max(doubled, capacity);
Array.Resize(ref _arr, newLength);
}
public bool Remove(T item)
{
_lock.EnterUpgradeableReadLock();
try
{
var i = IndexOfInternal(item);
if (i == -1)
return false;
_lock.EnterWriteLock();
try
{
RemoveAtInternal(i);
return true;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
_lock.EnterReadLock();
try
{
for (int i = 0; i < _count; i++)
// deadlocking potential mitigated by lock recursion enforcement
yield return _arr[i];
}
finally
{
_lock.ExitReadLock();
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
public int IndexOf(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item);
}
finally
{
_lock.ExitReadLock();
}
}
private int IndexOfInternal(T item)
{
return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
}
public void Insert(int index, T item)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index > _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
// shift everything right by one, starting at index
Array.Copy(_arr, index, _arr, index + 1, _count - index);
// insert
_arr[index] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public void RemoveAt(int index)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
RemoveAtInternal(index);
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
private void RemoveAtInternal(int index)
{
Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
_count--;
// release last element
Array.Clear(_arr, _count, 1);
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
Array.Clear(_arr, 0, _count);
_count = 0;
}
finally
{
_lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item) != -1;
}
finally
{
_lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterReadLock();
try
{
if(_count > array.Length - arrayIndex)
throw new ArgumentException("Destination array was not long enough.");
Array.Copy(_arr, 0, array, arrayIndex, _count);
}
finally
{
_lock.ExitReadLock();
}
}
public bool IsReadOnly
{
get { return false; }
}
public T this[int index]
{
get
{
_lock.EnterReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
return _arr[index];
}
finally
{
_lock.ExitReadLock();
}
}
set
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
_arr[index] = value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
}
public void DoSync(Action<ConcurrentList<T>> action)
{
GetSync(l =>
{
action(l);
return 0;
});
}
public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
{
_lock.EnterWriteLock();
try
{
return func(this);
}
finally
{
_lock.ExitWriteLock();
}
}
public void Dispose()
{
_lock.Dispose();
}
}