您的位置:首页 > 运维架构

CopyOnWriteArrayList与ConcurrentHashMap原理解析

2017-07-13 22:49 585 查看

CopyOnWriteArrayList

这两个都是非常常用的并发类,先从CopyOnWriteArrayList讲起。这个类我们从名字可以看出,他是在进行写操作时进行复制,因而其它线程进行读操作时不会出现并发问题。它的实现也很简单,我们来看一段简单源码:

public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

它在进行add操作时先加锁,然后将数组内容复制到一个新数组中,然后在新数组上进行add操作。操作完后再将旧数组的指针指向新数组,解锁。

public E get(int index) {
return get(getArray(), index);
}

get操作则就连锁都没有了,非常简单。

CopyOnWriteArrayList体现了一个非常重要的思想,就是“读写分离”,它非常适合读操作频繁,但写操作很少的情况。

但它也不是完美的,举个栗子:

package com.app.JavaMaven;/**
* Created by Tim on 2017/7/13.
*/

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* create by 陈樟杰
*/
public class CopyOnWriteArrayListTest {

@org.junit.Test
public void test() {
final List list = new CopyOnWriteArrayList();
for (int i = 0; i < 100; i++)
list.add(i);

new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println(list.get(list.size()-1));
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
while (true) {
for (int i = 0; i < list.size(); i++)
list.remove(list.size() - 1);
}
}
}).start();

while (true) ;

}
}
这种情况下就分分钟抛数组越界了。

ConcurrentHashMap:

下面再说一说ConcurrentHashMap,它体现了另一个非常重要的思想,那就是分段锁。它比Hashtable,优化的一点就是进行了分段加锁而不是将整个数组都锁上。

我先来介绍一下几种常见的锁优化方案:

缩小锁范围

优化前
public synchronized void synchronizedOnMethod(){ //粗粒度直接在方法上加synchronized,这样会提高锁冲突的概率
prefix();
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
}
post();
}
private void post(){
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
}
}
private void prefix(){
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
}
}
}优化后
//假设prefix和post方法是线程安全的(与锁无关的代码)
static class SynchronizedClazz{
public void mineSynOnMethod(){
prefix();
synchronized (this){ //synchronized代码块只保护有竞争的代码
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
}
}
post();
}


分离锁

优化前
static class DecomposeClazz{
private final Set<String> allUsers = new HashSet<String>();
private final Set<String> allComputers = new HashSet<String>();

public synchronized void addUser(String user){ //公用一把锁
allUsers.add(user);
}

public synchronized void addComputer(String computer){
allComputers.add(computer);
}
}

优化后
static class DecompossClazz2{
private final Set<String> allUsers = new HashSet<String>();
private final Set<String> allComputers = new HashSet<String>();
public void addUser(String user){ //分解为两把锁
synchronized (allUsers){
allUsers.add(user);
}
}
public void addComputer(String computer){
synchronized (allComputers){
allComputers.add(computer);
}
}
}


分段锁

package com.app.JavaMaven;/**
* Created by Tim on 2017/7/13.
*/

import java.util.HashMap;
import java.util.Map;

/**
* create by 陈樟杰
*/
public class ConcurrentHashMapTest {

@org.junit.Test
public void test() {
final MyConcurrentHashMap map = new MyConcurrentHashMap();

// Map map = new HashMap();

new Thread(new Runnable() {
@Override
public void run() {
while (true)
map.put("100", "100");
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
while (true)
map.put("100", null);
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
while (true)
if (map.get("100") == null)
System.out.println(map.get("100"));
}
}).start();

while (true) ;

}
}

class MyConcurrentHashMap<K, V> {
private final int LOCK_COUNT = 16;
private final Map<K, V> map;
private final Object[] locks;

public MyConcurrentHashMap() {
this.map = new HashMap<K, V>();
locks = new Object[LOCK_COUNT];
for (int i = 0; i < LOCK_COUNT; i++) {
locks[i] = new Object();
}
}

private int keyHashCode(K k) {
return Math.abs(k.hashCode() % LOCK_COUNT);
}

public V get(K k) {
int keyHashCode = keyHashCode(k);
synchronized (locks[keyHashCode % LOCK_COUNT]) {
return map.get(k);
}
}

public V put(K k, V v) {
int keyHashCode = keyHashCode(k);
synchronized (locks[keyHashCode % LOCK_COUNT]) {
return map.put(k, v);
}
}

}
自己实现的一个简单ConcurrentHashMap,和jdk的相差还是很大的。但能大致反应原理。

初始化

传入的参数有initialCapacity,loadFactor,concurrencyLevel这三个。

initialCapacity表示新创建的这个ConcurrentHashMap的初始容量,也就是上面的结构图中的Entry数量。默认值为static final int DEFAULT_INITIAL_CAPACITY = 16;

loadFactor表示负载因子,就是当ConcurrentHashMap中的元素个数大于loadFactor * 最大容量时就需要rehash,扩容。默认值为static final float DEFAULT_LOAD_FACTOR = 0.75f;

concurrencyLevel表示并发级别,这个值用来确定Segment的个数,Segment的个数是大于等于concurrencyLevel的第一个2的n次方的数。比如,如果concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为static final int DEFAULT_CONCURRENCY_LEVEL = 16;。理想情况下ConcurrentHashMap的真正的并发访问量能够达到concurrencyLevel,因为有concurrencyLevel个Segment,假如有concurrencyLevel个线程需要访问Map,并且需要访问的数据都恰好分别落在不同的Segment中,则这些线程能够无竞争地自由访问(因为他们不需要竞争同一把锁),达到同时访问的效果。这也是为什么这个参数起名为“并发级别”的原因。

JDK1.8的改动

改进一:取消segments字段,直接采用transient volatile HashEntry<K,V> table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。


改进二:
将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息