我如何创建一个并发的列表实例,在那里我可以通过索引访问元素?JDK是否有我可以使用的类或工厂方法?


因为获取位置和从给定位置获取元素的行为自然需要一些锁定(您不能让列表在这两个操作之间发生结构变化)。

并发收集的思想是,每个操作本身都是原子的,可以在没有显式锁定/同步的情况下完成。

因此,从给定的List中获取位置n的元素作为原子操作在期望并发访问的情况下没有太大意义。


在java.util.concurrent中有一个并发列表实现。特别是CopyOnWriteArrayList。


免责声明:这个答案发表于2011年,在JDK 5之前,也在更高级和最优的并发api之前。因此,虽然下面的方法可以工作,但它不是最好的选择。


如果你需要的只是简单的调用同步,你可以很好地使用Collections.synchronizedList(List):

 List<Object> objList = Collections.synchronizedList(new ArrayList<Object>());

ConcurrentLinkedQueue

如果您不关心是否具有基于索引的访问,而只想要List的插入顺序保持特性,那么可以考虑java.util.concurrent.ConcurrentLinkedQueue。因为它实现了Iterable,一旦你添加完所有的项,你可以使用增强的for语法循环内容:

Queue<String> globalQueue = new ConcurrentLinkedQueue<String>();

//Multiple threads can safely call globalQueue.add()...

for (String href : globalQueue) {
    //do something with href
}

CopyOnWriteArrayList是ArrayList的线程安全变体 所有可变操作(添加、设置等)都由 创建底层数组的新副本。

CopyOnWriteArrayList是同步列表实现列表接口的并发替代,它是java.util.concurrent包的一部分,它是线程安全的集合。

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable

CopyOnWriteArrayList是故障安全的,当底层的CopyOnWriteArrayList在迭代期间使用ArrayList的单独副本被修改时,不会抛出ConcurrentModificationException。

这通常代价太大,因为拷贝数组涉及每个更新操作,将创建一个克隆副本。CopyOnWriteArrayList是频繁读操作的最佳选择。

/**
         * Returns a shallow copy of this list.  (The elements themselves
         * are not copied.)
         *
         * @return a clone of this list
         */
        public Object clone() {
            try {
                @SuppressWarnings("unchecked")
                CopyOnWriteArrayList<E> clone =
                    (CopyOnWriteArrayList<E>) super.clone();
                clone.resetLock();
                return clone;
            } catch (CloneNotSupportedException e) {
                // this shouldn't happen, since we are Cloneable
                throw new InternalError();
            }
        }

http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/CopyOnWriteArrayList.html https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/CopyOnWriteArrayList.html https://www.logicbig.com/tutorials/core-java-tutorial/java-collections/concurrent-collection-cheatsheet.html


你有这些选择:

Collections.synchronizedList(): you can wrap any List implementation (ArrayList, LinkedList or a 3rd-party list). Access to every method (reading and writing) will be protected using synchronized. When using iterator() or enhanced for loop, you must manually synchronize the whole iteration. While iterating, other threads are fully blocked even from reading. You can also synchronize separately for each hasNext and next calls, but then ConcurrentModificationException is possible. CopyOnWriteArrayList: it's expensive to modify, but wait-free to read. Iterators never throw ConcurrentModificationException, they return a snapshot of the list at the moment of iterator creation even if the list is modified by another thread while iterating. Useful for infrequently updated lists. Bulk operations like addAll are preferred for updates - the internal array is copied less many times. Vector: very much like synchronizedList(new ArrayList<>()), but iteration is synchronized too. However, iterators can throw ConcurrentModificationException if the vector is modified by another thread while iterating.

其他选项:

Queue or Deque might be an alternative if you only add/remove at the ends of the list or iterate it. Queue allows only adding at one end and removing from the other end, Deque allows adding and removing on both ends. There's no access by index. There are multiple implementations with better concurrency properties than any List can provide. Look at "All Known Implementing Classes" in the Queue javadoc, those implementations that are in the java.util.concurrent package are concurrent. You can also have a look at JCTools, it contains faster queue implementations specialized for single consumer or single producer. Collections.unmodifiableList(): wait-free, thread-safe, but non-modifiable List.of & List.copyOf: Another non-modifiable list in Java 9 and later.


如果你不打算从列表中删除元素(因为这需要在删除元素之后更改所有元素的索引),那么你可以使用ConcurrentSkipListMap<Integer, T>来代替ArrayList<T>,例如:

NavigableMap<Integer, T> map = new ConcurrentSkipListMap<>();

只要只有一个写线程(否则map.size()和map.put()之间存在竞争条件),这将允许你像下面这样在“列表”的末尾添加项:

// Add item to end of the "list":
map.put(map.size(), item);

显然,你也可以通过简单地调用map来修改“列表”(即map)中任何项的值。把(指数项)。

将项放入映射或按索引检索它们的平均代价是O(log(n)),并且ConcurrentSkipListMap是无锁的,这使得它明显优于Vector (ArrayList的旧同步版本)。

您可以通过使用NavigableMap接口的方法在“列表”中来回迭代。

只要您理解竞态条件警告(或者您可以只同步写入方法),您可以将上述所有内容包装到实现List接口的类中,并且您需要为remove方法抛出一个不受支持的操作异常。实现所有必需的方法需要相当多的样板文件,但这里有一个快速的实现尝试。

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConcurrentAddOnlyList<V> implements List<V> {
    private NavigableMap<Integer, V> map = new ConcurrentSkipListMap<>();

    @Override
    public int size() {
        return map.size();
    }

    @Override
    public boolean isEmpty() {
        return map.isEmpty();
    }

    @Override
    public boolean contains(Object o) {
        return map.values().contains(o);
    }

    @Override
    public Iterator<V> iterator() {
        return map.values().iterator();
    }

    @Override
    public Object[] toArray() {
        return map.values().toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return map.values().toArray(a);
    }

    @Override
    public V get(int index) {
        return map.get(index);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return map.values().containsAll(c);
    }

    @Override
    public int indexOf(Object o) {
        for (Entry<Integer, V> ent : map.entrySet()) {
            if (Objects.equals(ent.getValue(), o)) {
                return ent.getKey();
            }
        }
        return -1;
    }

    @Override
    public int lastIndexOf(Object o) {
        for (Entry<Integer, V> ent : map.descendingMap().entrySet()) {
            if (Objects.equals(ent.getValue(), o)) {
                return ent.getKey();
            }
        }
        return -1;
    }

    @Override
    public ListIterator<V> listIterator(int index) {
        return new ListIterator<V>() {
            private int currIdx = 0;

            @Override
            public boolean hasNext() {
                return currIdx < map.size();
            }

            @Override
            public V next() {
                if (currIdx >= map.size()) {
                    throw new IllegalArgumentException(
                            "next() called at end of list");
                }
                return map.get(currIdx++);
            }

            @Override
            public boolean hasPrevious() {
                return currIdx > 0;
            }

            @Override
            public V previous() {
                if (currIdx <= 0) {
                    throw new IllegalArgumentException(
                            "previous() called at beginning of list");
                }
                return map.get(--currIdx);
            }

            @Override
            public int nextIndex() {
                return currIdx + 1;
            }

            @Override
            public int previousIndex() {
                return currIdx - 1;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }

            @Override
            public void set(V e) {
                // Might change size of map if currIdx == map.size(),
                // so need to synchronize 
                synchronized (map) {
                    map.put(currIdx, e);
                }
            }

            @Override
            public void add(V e) {
                synchronized (map) {
                    // Insertion is not supported except at end of list
                    if (currIdx < map.size()) {
                        throw new UnsupportedOperationException();
                    }
                    map.put(currIdx++, e);
                }
            }
        };
    }

    @Override
    public ListIterator<V> listIterator() {
        return listIterator(0);
    }

    @Override
    public List<V> subList(int fromIndex, int toIndex) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public boolean add(V e) {
        synchronized (map) {
            map.put(map.size(), e);
            return true;
        }
    }

    @Override
    public boolean addAll(Collection<? extends V> c) {
        synchronized (map) {
            for (V val : c) {
                add(val);
            }
            return true;
        }
    }

    @Override
    public V set(int index, V element) {
        synchronized (map) {
            if (index < 0 || index > map.size()) {
                throw new IllegalArgumentException("Index out of range");
            }
            return map.put(index, element);
        }
    }

    @Override
    public void clear() {
        synchronized (map) {
            map.clear();
        }
    }

    @Override
    public synchronized void add(int index, V element) {
        synchronized (map) {
            if (index < map.size()) {
                // Insertion is not supported except at end of list
                throw new UnsupportedOperationException();
            } else if (index < 0 || index > map.size()) {
                throw new IllegalArgumentException("Index out of range");
            }
            // index == map.size()
            add(element);
        }
    }

    @Override
    public synchronized boolean addAll(
            int index, Collection<? extends V> c) {
        synchronized (map) {
            if (index < map.size()) {
                // Insertion is not supported except at end of list
                throw new UnsupportedOperationException();
            } else if (index < 0 || index > map.size()) {
                throw new IllegalArgumentException("Index out of range");
            }
            // index == map.size()
            for (V val : c) {
                add(val);
            }
            return true;
        }
    }

    @Override
    public boolean remove(Object o) {
        throw new UnsupportedOperationException();
    }

    @Override
    public V remove(int index) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        throw new UnsupportedOperationException();
    }
}

不要忘记,即使使用如上所示的写入线程同步,您也需要小心不要遇到可能导致删除项的竞态条件,例如,如果您试图在读取线程中遍历列表,而写入线程正在向列表的末尾添加内容。

您甚至可以使用ConcurrentSkipListMap作为双结束列表,只要您不需要每个项的键来表示列表中的实际位置(即添加到列表的开头将为项分配负键)。(同样的竞争条件警告也适用于这里,即应该只有一个写入线程。)

// Add item after last item in the "list":
map.put(map.isEmpty() ? 0 : map.lastKey() + 1, item);

// Add item before first item in the "list":
map.put(map.isEmpty() ? 0 : map.firstKey() - 1, item);

大多数情况下,如果您需要并发列表,则它位于模型对象内部(因为您不应该使用抽象数据类型(如列表)来表示应用程序模型图中的节点),或者它是特定服务的一部分,您可以自己同步访问。

class MyClass {
  List<MyType> myConcurrentList = new ArrayList<>();
  void myMethod() {
    synchronzied(myConcurrentList) {
      doSomethingWithList;
    }
  }
}

通常这就足够让你继续下去了。如果你需要迭代,迭代列表的一个副本,而不是列表本身,并且只同步你复制列表的部分,而不是在你迭代它的时候。

另外,当并发处理一个列表时,通常要做的事情不仅仅是添加、删除或复制,这意味着该操作变得足够有意义,以保证它自己的方法,并且该列表成为一个特殊类的成员,仅表示这个具有线程安全行为的特定列表。

即使我同意需要一个并发列表实现和Vector / collections . synchronizelist (list)不做的技巧,你肯定需要像compareAndAdd或compareAndRemove或get(…, ifAbsentDo),即使您有ConcurrentList实现,开发人员在使用并发列表(和映射)时也经常会因为没有考虑真正的事务而引入错误。

在这些场景中,事务对于与并发ADT(抽象数据类型)交互的预期目的来说太小,总是导致我将列表隐藏在一个特殊的类中,并在方法级别上使用synchronized同步访问这个类对象方法。这是确保交易正确的唯一方法。

我已经看到了太多的错误,不能用其他方法来做——至少如果代码很重要,处理一些像金钱或安全或保证一些服务质量措施(例如至少发送一次且只发送一次消息)。