在。net 4.0中看到新的System.Collections.Concurrent命名空间,我非常激动,非常棒!我已经看到ConcurrentDictionary, ConcurrentQueue, ConcurrentStack, ConcurrentBag和BlockingCollection。

有一样东西似乎神秘地丢失了,那就是ConcurrentList<T>。我必须自己写吗(或者从网上得到:))?

我是不是遗漏了什么明显的东西?


当前回答

我实现了一个类似于Brian的方法。我的情况不同:

我直接管理数组。 我没有在try块中输入锁。 我使用yield return生成枚举器。 我支持锁递归。这允许在迭代期间从列表中读取。 我尽可能使用可升级的读锁。 DoSync和GetSync方法允许需要独占访问列表的顺序交互。

代码:

public class ConcurrentList<T> : IList<T>, IDisposable
{
    private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    private int _count = 0;

    public int Count
    {
        get
        { 
            _lock.EnterReadLock();
            try
            {           
                return _count;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }

    public int InternalArrayLength
    { 
        get
        { 
            _lock.EnterReadLock();
            try
            {           
                return _arr.Length;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }

    private T[] _arr;

    public ConcurrentList(int initialCapacity)
    {
        _arr = new T[initialCapacity];
    }

    public ConcurrentList():this(4)
    { }

    public ConcurrentList(IEnumerable<T> items)
    {
        _arr = items.ToArray();
        _count = _arr.Length;
    }

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {       
            var newCount = _count + 1;          
            EnsureCapacity(newCount);           
            _arr[_count] = item;
            _count = newCount;                  
        }
        finally
        {
            _lock.ExitWriteLock();
        }       
    }

    public void AddRange(IEnumerable<T> items)
    {
        if (items == null)
            throw new ArgumentNullException("items");

        _lock.EnterWriteLock();

        try
        {           
            var arr = items as T[] ?? items.ToArray();          
            var newCount = _count + arr.Length;
            EnsureCapacity(newCount);           
            Array.Copy(arr, 0, _arr, _count, arr.Length);       
            _count = newCount;
        }
        finally
        {
            _lock.ExitWriteLock();          
        }
    }

    private void EnsureCapacity(int capacity)
    {   
        if (_arr.Length >= capacity)
            return;

        int doubled;
        checked
        {
            try
            {           
                doubled = _arr.Length * 2;
            }
            catch (OverflowException)
            {
                doubled = int.MaxValue;
            }
        }

        var newLength = Math.Max(doubled, capacity);            
        Array.Resize(ref _arr, newLength);
    }

    public bool Remove(T item)
    {
        _lock.EnterUpgradeableReadLock();

        try
        {           
            var i = IndexOfInternal(item);

            if (i == -1)
                return false;

            _lock.EnterWriteLock();
            try
            {   
                RemoveAtInternal(i);
                return true;
            }
            finally
            {               
                _lock.ExitWriteLock();
            }
        }
        finally
        {           
            _lock.ExitUpgradeableReadLock();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        _lock.EnterReadLock();

        try
        {    
            for (int i = 0; i < _count; i++)
                // deadlocking potential mitigated by lock recursion enforcement
                yield return _arr[i]; 
        }
        finally
        {           
            _lock.ExitReadLock();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    public int IndexOf(T item)
    {
        _lock.EnterReadLock();
        try
        {   
            return IndexOfInternal(item);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    private int IndexOfInternal(T item)
    {
        return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
    }

    public void Insert(int index, T item)
    {
        _lock.EnterUpgradeableReadLock();

        try
        {                       
            if (index > _count)
                throw new ArgumentOutOfRangeException("index"); 

            _lock.EnterWriteLock();
            try
            {       
                var newCount = _count + 1;
                EnsureCapacity(newCount);

                // shift everything right by one, starting at index
                Array.Copy(_arr, index, _arr, index + 1, _count - index);

                // insert
                _arr[index] = item;     
                _count = newCount;
            }
            finally
            {           
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();            
        }


    }

    public void RemoveAt(int index)
    {   
        _lock.EnterUpgradeableReadLock();
        try
        {   
            if (index >= _count)
                throw new ArgumentOutOfRangeException("index");

            _lock.EnterWriteLock();
            try
            {           
                RemoveAtInternal(index);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();            
        }
    }

    private void RemoveAtInternal(int index)
    {           
        Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
        _count--;

        // release last element
        Array.Clear(_arr, _count, 1);
    }

    public void Clear()
    {
        _lock.EnterWriteLock();
        try
        {        
            Array.Clear(_arr, 0, _count);
            _count = 0;
        }
        finally
        {           
            _lock.ExitWriteLock();
        }   
    }

    public bool Contains(T item)
    {
        _lock.EnterReadLock();
        try
        {   
            return IndexOfInternal(item) != -1;
        }
        finally
        {           
            _lock.ExitReadLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {       
        _lock.EnterReadLock();
        try
        {           
            if(_count > array.Length - arrayIndex)
                throw new ArgumentException("Destination array was not long enough.");

            Array.Copy(_arr, 0, array, arrayIndex, _count);
        }
        finally
        {
            _lock.ExitReadLock();           
        }
    }

    public bool IsReadOnly
    {   
        get { return false; }
    }

    public T this[int index]
    {
        get
        {
            _lock.EnterReadLock();
            try
            {           
                if (index >= _count)
                    throw new ArgumentOutOfRangeException("index");

                return _arr[index]; 
            }
            finally
            {
                _lock.ExitReadLock();               
            }           
        }
        set
        {
            _lock.EnterUpgradeableReadLock();
            try
            {

                if (index >= _count)
                    throw new ArgumentOutOfRangeException("index");

                _lock.EnterWriteLock();
                try
                {                       
                    _arr[index] = value;
                }
                finally
                {
                    _lock.ExitWriteLock();              
                }
            }
            finally
            {
                _lock.ExitUpgradeableReadLock();
            }

        }
    }

    public void DoSync(Action<ConcurrentList<T>> action)
    {
        GetSync(l =>
        {
            action(l);
            return 0;
        });
    }

    public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
    {
        _lock.EnterWriteLock();
        try
        {           
            return func(this);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public void Dispose()
    {   
        _lock.Dispose();
    }
}

其他回答

你用ConcurrentList做什么?

在线程世界中,随机访问容器的概念并不像它看起来那样有用。该声明

  if (i < MyConcurrentList.Count)  
      x = MyConcurrentList[i]; 

总的来说仍然不是线程安全的。

与其创建ConcurrentList,不如尝试使用现有的内容构建解决方案。最常见的类是ConcurrentBag,尤其是BlockingCollection。

我很惊讶没有人提到使用LinkedList作为编写专业类的基础。

通常我们不需要各种集合类的完整API,如果您编写的主要是功能性副作用的免费代码,尽可能使用不可变的类,那么您实际上不会希望改变集合,以支持各种快照实现。

LinkedList solves some difficult problems of creating snapshot copies/clones of large collections. I also use it to create "threadsafe" enumerators to enumerate over the collection. I can cheat, because I know that I'm not changing the collection in any way other than appending, I can keep track of the list size, and only lock on changes to list size. Then my enumerator code simply enumerates from 0 to n for any thread that wants a "snapshot" of the append only collection, that will be guaranteed to represent a "snapshot" of the collection at any moment in time, regardless of what other threads are appending to the head of the collection.

我非常确定大多数需求通常非常简单,您只需要2或3个方法。编写一个真正的泛型库是非常困难的,但解决您自己的代码需求有时可以通过一两个技巧很容易。

LinkedList和优秀的函数式编程万岁。

干杯,……爱你们! 艾尔

附注:样本hack AppendOnly类在这里:https://github.com/goblinfactory/AppendOnly

在顺序执行的代码中,使用的数据结构与(编写良好的)并发执行的代码不同。原因是顺序代码隐含着隐式的顺序。然而并发代码并不意味着任何顺序;更妙的是,它暗示着缺乏任何明确的秩序!

因此,具有隐含顺序的数据结构(如List)对于解决并发问题不是很有用。列表意味着顺序,但它并没有清楚地定义这个顺序是什么。因此,操作列表的代码的执行顺序将(在某种程度上)决定列表的隐式顺序,这与有效的并发解决方案直接冲突。

记住,并发是一个数据问题,而不是代码问题!您不能先实现代码(或重写现有的顺序代码),然后获得设计良好的并发解决方案。您需要首先设计数据结构,同时记住在并发系统中不存在隐式排序。

ConcurrentList(作为一个可调整大小的数组,而不是一个链表)不容易用非阻塞操作编写。它的API不能很好地转换为“并发”版本。

我实现了一个类似于Brian的方法。我的情况不同:

我直接管理数组。 我没有在try块中输入锁。 我使用yield return生成枚举器。 我支持锁递归。这允许在迭代期间从列表中读取。 我尽可能使用可升级的读锁。 DoSync和GetSync方法允许需要独占访问列表的顺序交互。

代码:

public class ConcurrentList<T> : IList<T>, IDisposable
{
    private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    private int _count = 0;

    public int Count
    {
        get
        { 
            _lock.EnterReadLock();
            try
            {           
                return _count;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }

    public int InternalArrayLength
    { 
        get
        { 
            _lock.EnterReadLock();
            try
            {           
                return _arr.Length;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }

    private T[] _arr;

    public ConcurrentList(int initialCapacity)
    {
        _arr = new T[initialCapacity];
    }

    public ConcurrentList():this(4)
    { }

    public ConcurrentList(IEnumerable<T> items)
    {
        _arr = items.ToArray();
        _count = _arr.Length;
    }

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {       
            var newCount = _count + 1;          
            EnsureCapacity(newCount);           
            _arr[_count] = item;
            _count = newCount;                  
        }
        finally
        {
            _lock.ExitWriteLock();
        }       
    }

    public void AddRange(IEnumerable<T> items)
    {
        if (items == null)
            throw new ArgumentNullException("items");

        _lock.EnterWriteLock();

        try
        {           
            var arr = items as T[] ?? items.ToArray();          
            var newCount = _count + arr.Length;
            EnsureCapacity(newCount);           
            Array.Copy(arr, 0, _arr, _count, arr.Length);       
            _count = newCount;
        }
        finally
        {
            _lock.ExitWriteLock();          
        }
    }

    private void EnsureCapacity(int capacity)
    {   
        if (_arr.Length >= capacity)
            return;

        int doubled;
        checked
        {
            try
            {           
                doubled = _arr.Length * 2;
            }
            catch (OverflowException)
            {
                doubled = int.MaxValue;
            }
        }

        var newLength = Math.Max(doubled, capacity);            
        Array.Resize(ref _arr, newLength);
    }

    public bool Remove(T item)
    {
        _lock.EnterUpgradeableReadLock();

        try
        {           
            var i = IndexOfInternal(item);

            if (i == -1)
                return false;

            _lock.EnterWriteLock();
            try
            {   
                RemoveAtInternal(i);
                return true;
            }
            finally
            {               
                _lock.ExitWriteLock();
            }
        }
        finally
        {           
            _lock.ExitUpgradeableReadLock();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        _lock.EnterReadLock();

        try
        {    
            for (int i = 0; i < _count; i++)
                // deadlocking potential mitigated by lock recursion enforcement
                yield return _arr[i]; 
        }
        finally
        {           
            _lock.ExitReadLock();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    public int IndexOf(T item)
    {
        _lock.EnterReadLock();
        try
        {   
            return IndexOfInternal(item);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    private int IndexOfInternal(T item)
    {
        return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
    }

    public void Insert(int index, T item)
    {
        _lock.EnterUpgradeableReadLock();

        try
        {                       
            if (index > _count)
                throw new ArgumentOutOfRangeException("index"); 

            _lock.EnterWriteLock();
            try
            {       
                var newCount = _count + 1;
                EnsureCapacity(newCount);

                // shift everything right by one, starting at index
                Array.Copy(_arr, index, _arr, index + 1, _count - index);

                // insert
                _arr[index] = item;     
                _count = newCount;
            }
            finally
            {           
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();            
        }


    }

    public void RemoveAt(int index)
    {   
        _lock.EnterUpgradeableReadLock();
        try
        {   
            if (index >= _count)
                throw new ArgumentOutOfRangeException("index");

            _lock.EnterWriteLock();
            try
            {           
                RemoveAtInternal(index);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();            
        }
    }

    private void RemoveAtInternal(int index)
    {           
        Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
        _count--;

        // release last element
        Array.Clear(_arr, _count, 1);
    }

    public void Clear()
    {
        _lock.EnterWriteLock();
        try
        {        
            Array.Clear(_arr, 0, _count);
            _count = 0;
        }
        finally
        {           
            _lock.ExitWriteLock();
        }   
    }

    public bool Contains(T item)
    {
        _lock.EnterReadLock();
        try
        {   
            return IndexOfInternal(item) != -1;
        }
        finally
        {           
            _lock.ExitReadLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {       
        _lock.EnterReadLock();
        try
        {           
            if(_count > array.Length - arrayIndex)
                throw new ArgumentException("Destination array was not long enough.");

            Array.Copy(_arr, 0, array, arrayIndex, _count);
        }
        finally
        {
            _lock.ExitReadLock();           
        }
    }

    public bool IsReadOnly
    {   
        get { return false; }
    }

    public T this[int index]
    {
        get
        {
            _lock.EnterReadLock();
            try
            {           
                if (index >= _count)
                    throw new ArgumentOutOfRangeException("index");

                return _arr[index]; 
            }
            finally
            {
                _lock.ExitReadLock();               
            }           
        }
        set
        {
            _lock.EnterUpgradeableReadLock();
            try
            {

                if (index >= _count)
                    throw new ArgumentOutOfRangeException("index");

                _lock.EnterWriteLock();
                try
                {                       
                    _arr[index] = value;
                }
                finally
                {
                    _lock.ExitWriteLock();              
                }
            }
            finally
            {
                _lock.ExitUpgradeableReadLock();
            }

        }
    }

    public void DoSync(Action<ConcurrentList<T>> action)
    {
        GetSync(l =>
        {
            action(l);
            return 0;
        });
    }

    public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
    {
        _lock.EnterWriteLock();
        try
        {           
            return func(this);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public void Dispose()
    {   
        _lock.Dispose();
    }
}