在。net 4.0中看到新的System.Collections.Concurrent命名空间,我非常激动,非常棒!我已经看到ConcurrentDictionary, ConcurrentQueue, ConcurrentStack, ConcurrentBag和BlockingCollection。
有一样东西似乎神秘地丢失了,那就是ConcurrentList<T>。我必须自己写吗(或者从网上得到:))?
我是不是遗漏了什么明显的东西?
在。net 4.0中看到新的System.Collections.Concurrent命名空间,我非常激动,非常棒!我已经看到ConcurrentDictionary, ConcurrentQueue, ConcurrentStack, ConcurrentBag和BlockingCollection。
有一样东西似乎神秘地丢失了,那就是ConcurrentList<T>。我必须自己写吗(或者从网上得到:))?
我是不是遗漏了什么明显的东西?
当前回答
我实现了一个类似于Brian的方法。我的情况不同:
我直接管理数组。 我没有在try块中输入锁。 我使用yield return生成枚举器。 我支持锁递归。这允许在迭代期间从列表中读取。 我尽可能使用可升级的读锁。 DoSync和GetSync方法允许需要独占访问列表的顺序交互。
代码:
public class ConcurrentList<T> : IList<T>, IDisposable
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private int _count = 0;
public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _count;
}
finally
{
_lock.ExitReadLock();
}
}
}
public int InternalArrayLength
{
get
{
_lock.EnterReadLock();
try
{
return _arr.Length;
}
finally
{
_lock.ExitReadLock();
}
}
}
private T[] _arr;
public ConcurrentList(int initialCapacity)
{
_arr = new T[initialCapacity];
}
public ConcurrentList():this(4)
{ }
public ConcurrentList(IEnumerable<T> items)
{
_arr = items.ToArray();
_count = _arr.Length;
}
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
_arr[_count] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
public void AddRange(IEnumerable<T> items)
{
if (items == null)
throw new ArgumentNullException("items");
_lock.EnterWriteLock();
try
{
var arr = items as T[] ?? items.ToArray();
var newCount = _count + arr.Length;
EnsureCapacity(newCount);
Array.Copy(arr, 0, _arr, _count, arr.Length);
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
private void EnsureCapacity(int capacity)
{
if (_arr.Length >= capacity)
return;
int doubled;
checked
{
try
{
doubled = _arr.Length * 2;
}
catch (OverflowException)
{
doubled = int.MaxValue;
}
}
var newLength = Math.Max(doubled, capacity);
Array.Resize(ref _arr, newLength);
}
public bool Remove(T item)
{
_lock.EnterUpgradeableReadLock();
try
{
var i = IndexOfInternal(item);
if (i == -1)
return false;
_lock.EnterWriteLock();
try
{
RemoveAtInternal(i);
return true;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
_lock.EnterReadLock();
try
{
for (int i = 0; i < _count; i++)
// deadlocking potential mitigated by lock recursion enforcement
yield return _arr[i];
}
finally
{
_lock.ExitReadLock();
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
public int IndexOf(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item);
}
finally
{
_lock.ExitReadLock();
}
}
private int IndexOfInternal(T item)
{
return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
}
public void Insert(int index, T item)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index > _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
// shift everything right by one, starting at index
Array.Copy(_arr, index, _arr, index + 1, _count - index);
// insert
_arr[index] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public void RemoveAt(int index)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
RemoveAtInternal(index);
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
private void RemoveAtInternal(int index)
{
Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
_count--;
// release last element
Array.Clear(_arr, _count, 1);
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
Array.Clear(_arr, 0, _count);
_count = 0;
}
finally
{
_lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item) != -1;
}
finally
{
_lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterReadLock();
try
{
if(_count > array.Length - arrayIndex)
throw new ArgumentException("Destination array was not long enough.");
Array.Copy(_arr, 0, array, arrayIndex, _count);
}
finally
{
_lock.ExitReadLock();
}
}
public bool IsReadOnly
{
get { return false; }
}
public T this[int index]
{
get
{
_lock.EnterReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
return _arr[index];
}
finally
{
_lock.ExitReadLock();
}
}
set
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
_arr[index] = value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
}
public void DoSync(Action<ConcurrentList<T>> action)
{
GetSync(l =>
{
action(l);
return 0;
});
}
public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
{
_lock.EnterWriteLock();
try
{
return func(this);
}
finally
{
_lock.ExitWriteLock();
}
}
public void Dispose()
{
_lock.Dispose();
}
}
其他回答
你用ConcurrentList做什么?
在线程世界中,随机访问容器的概念并不像它看起来那样有用。该声明
if (i < MyConcurrentList.Count)
x = MyConcurrentList[i];
总的来说仍然不是线程安全的。
与其创建ConcurrentList,不如尝试使用现有的内容构建解决方案。最常见的类是ConcurrentBag,尤其是BlockingCollection。
我实现了一个类似于Brian的方法。我的情况不同:
我直接管理数组。 我没有在try块中输入锁。 我使用yield return生成枚举器。 我支持锁递归。这允许在迭代期间从列表中读取。 我尽可能使用可升级的读锁。 DoSync和GetSync方法允许需要独占访问列表的顺序交互。
代码:
public class ConcurrentList<T> : IList<T>, IDisposable
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private int _count = 0;
public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _count;
}
finally
{
_lock.ExitReadLock();
}
}
}
public int InternalArrayLength
{
get
{
_lock.EnterReadLock();
try
{
return _arr.Length;
}
finally
{
_lock.ExitReadLock();
}
}
}
private T[] _arr;
public ConcurrentList(int initialCapacity)
{
_arr = new T[initialCapacity];
}
public ConcurrentList():this(4)
{ }
public ConcurrentList(IEnumerable<T> items)
{
_arr = items.ToArray();
_count = _arr.Length;
}
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
_arr[_count] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
public void AddRange(IEnumerable<T> items)
{
if (items == null)
throw new ArgumentNullException("items");
_lock.EnterWriteLock();
try
{
var arr = items as T[] ?? items.ToArray();
var newCount = _count + arr.Length;
EnsureCapacity(newCount);
Array.Copy(arr, 0, _arr, _count, arr.Length);
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
private void EnsureCapacity(int capacity)
{
if (_arr.Length >= capacity)
return;
int doubled;
checked
{
try
{
doubled = _arr.Length * 2;
}
catch (OverflowException)
{
doubled = int.MaxValue;
}
}
var newLength = Math.Max(doubled, capacity);
Array.Resize(ref _arr, newLength);
}
public bool Remove(T item)
{
_lock.EnterUpgradeableReadLock();
try
{
var i = IndexOfInternal(item);
if (i == -1)
return false;
_lock.EnterWriteLock();
try
{
RemoveAtInternal(i);
return true;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
_lock.EnterReadLock();
try
{
for (int i = 0; i < _count; i++)
// deadlocking potential mitigated by lock recursion enforcement
yield return _arr[i];
}
finally
{
_lock.ExitReadLock();
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
public int IndexOf(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item);
}
finally
{
_lock.ExitReadLock();
}
}
private int IndexOfInternal(T item)
{
return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
}
public void Insert(int index, T item)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index > _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
// shift everything right by one, starting at index
Array.Copy(_arr, index, _arr, index + 1, _count - index);
// insert
_arr[index] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public void RemoveAt(int index)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
RemoveAtInternal(index);
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
private void RemoveAtInternal(int index)
{
Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
_count--;
// release last element
Array.Clear(_arr, _count, 1);
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
Array.Clear(_arr, 0, _count);
_count = 0;
}
finally
{
_lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item) != -1;
}
finally
{
_lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterReadLock();
try
{
if(_count > array.Length - arrayIndex)
throw new ArgumentException("Destination array was not long enough.");
Array.Copy(_arr, 0, array, arrayIndex, _count);
}
finally
{
_lock.ExitReadLock();
}
}
public bool IsReadOnly
{
get { return false; }
}
public T this[int index]
{
get
{
_lock.EnterReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
return _arr[index];
}
finally
{
_lock.ExitReadLock();
}
}
set
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
_arr[index] = value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
}
public void DoSync(Action<ConcurrentList<T>> action)
{
GetSync(l =>
{
action(l);
return 0;
});
}
public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
{
_lock.EnterWriteLock();
try
{
return func(this);
}
finally
{
_lock.ExitWriteLock();
}
}
public void Dispose()
{
_lock.Dispose();
}
}
在读的数量远远超过写的情况下,或者(无论多么频繁)写是非并发的情况下,在写时复制方法可能是合适的。
实现如下所示
lockless blazingly fast for concurrent reads, even while concurrent modifications are ongoing - no matter how long they take because "snapshots" are immutable, lockless atomicity is possible, i.e. var snap = _list; snap[snap.Count - 1]; will never (well, except for an empty list of course) throw, and you also get thread-safe enumeration with snapshot semantics for free.. how I LOVE immutability! implemented generically, applicable to any data structure and any type of modification dead simple, i.e. easy to test, debug, verify by reading the code usable in .Net 3.5
为了让写时复制工作,你必须让你的数据结构有效地保持不可变,也就是说,在你让它们对其他线程可用之后,任何人都不允许改变它们。当你想修改的时候,你
克隆结构 对克隆进行修改 原子地交换对修改后的克隆的引用
Code
static class CopyOnWriteSwapper
{
public static void Swap<T>(ref T obj, Func<T, T> cloner, Action<T> op)
where T : class
{
while (true)
{
var objBefore = Volatile.Read(ref obj);
var newObj = cloner(objBefore);
op(newObj);
if (Interlocked.CompareExchange(ref obj, newObj, objBefore) == objBefore)
return;
}
}
}
使用
CopyOnWriteSwapper.Swap(ref _myList,
orig => new List<string>(orig),
clone => clone.Add("asdf"));
如果你需要更多的性能,它将有助于非泛化方法,例如,为你想要的每一种类型的修改(添加,删除,…)创建一个方法,并硬编码函数指针克隆和op。
注意事项#1:确保没有人修改(假定)不可变的数据结构是你的责任。在泛型实现中我们无法阻止这种情况,但当专门化到List<T>时,可以使用List. asreadonly()来防止修改。
注意事项2 .注意列表中的值。上面的copy on write方法只保护它们的列表成员关系,但如果你放的不是字符串,而是一些其他可变对象,你必须注意线程安全(例如锁定)。但这与这个解决方案是正交的,例如,可变值的锁定可以很容易地使用而没有问题。你只需要意识到这一点。
注意事项#3如果您的数据结构很大,并且您经常修改它,那么从内存消耗和复制所涉及的CPU成本两方面考虑,全部在写时复制的方法可能是不可取的。在这种情况下,你可能想要使用MS的不可变集合。
有些人强调了一些好处(以及我的一些想法):
It could looklikes insane to unable random accesser (indexer) but to me it appears fine. You only have to think that there is many methods on multi-threaded collections that could fail like Indexer and Delete. You could also define failure (fallback) action for write accessor like "fail" or simply "add at the end". It is not because it is a multithreaded collection that it will always be used in a multithreaded context. Or it could also be used by only one writer and one reader. Another way to be able to use indexer in a safe manner could be to wrap actions into a lock of the collection using its root (if made public). For many people, making a rootLock visible goes agaist "Good practice". I'm not 100% sure about this point because if it is hidden you remove a lot of flexibility to the user. We always have to remember that programming multithread is not for anybody. We can't prevent every kind of wrong usage. Microsoft will have to do some work and define some new standard to introduce proper usage of Multithreaded collection. First the IEnumerator should not have a moveNext but should have a GetNext that return true or false and get an out paramter of type T (this way the iteration would not be blocking anymore). Also, Microsoft already use "using" internally in the foreach but sometimes use the IEnumerator directly without wrapping it with "using" (a bug in collection view and probably at more places) - Wrapping usage of IEnumerator is a recommended pratice by Microsoft. This bug remove good potential for safe iterator... Iterator that lock collection in constructor and unlock on its Dispose method - for a blocking foreach method.
这不是答案。这只是一些不适合特定场合的注释。
... My conclusion, Microsoft has to make some deep changes to the "foreach" to make MultiThreaded collection easier to use. Also it has to follow there own rules of IEnumerator usage. Until that, we can write a MultiThreadList easily that would use a blocking iterator but that will not follow "IList". Instead, you will have to define own "IListPersonnal" interface that could fail on "insert", "remove" and random accessor (indexer) without exception. But who will want to use it if it is not standard ?
在顺序执行的代码中,使用的数据结构与(编写良好的)并发执行的代码不同。原因是顺序代码隐含着隐式的顺序。然而并发代码并不意味着任何顺序;更妙的是,它暗示着缺乏任何明确的秩序!
因此,具有隐含顺序的数据结构(如List)对于解决并发问题不是很有用。列表意味着顺序,但它并没有清楚地定义这个顺序是什么。因此,操作列表的代码的执行顺序将(在某种程度上)决定列表的隐式顺序,这与有效的并发解决方案直接冲突。
记住,并发是一个数据问题,而不是代码问题!您不能先实现代码(或重写现有的顺序代码),然后获得设计良好的并发解决方案。您需要首先设计数据结构,同时记住在并发系统中不存在隐式排序。