鸿 网 互 联 www.68idc.cn

当前位置 : 服务器租用 > 编程语言开发 > java > >

74 java多线程_6 _线程安全的集合

来源:互联网 作者:佚名 时间:2022-07-19 11:12
文章目录 ??线程安全的集合?? ??问题演示?? ??解决问题?? ??并发包中提供的线程安全集合?? ??CopyOnWriteArrayList?? ??CopyOnWriteArraySet?? ??Queue接口(队列)


文章目录

  • ??线程安全的集合??
  • ??问题演示??
  • ??解决问题??
  • ??并发包中提供的线程安全集合??
  • ??CopyOnWriteArrayList??
  • ??CopyOnWriteArraySet??
  • ??Queue接口(队列)??
  • ??ConcurrentLinkedQueue(无界队列,非阻塞队列)??
  • ??BlockingQueue接口(阻塞队列,有界的队列)??
  • ??阻塞队列实现类:??
  • ??ArrayBlockingQueue??
  • ??LinkedBlock ingQueue:??
  • ??ConcurrentHashMap??
  • ??ConcurrentHashMap,jdk1.7与jdk1.8的区别??
  • ??concurrentHashMap jdk7版本??
  • ??concurrentHashMap jdk8版本??

线程安全的集合

  • Collection体系集合中,除Vector以外的线程安全集合。
  • 74 java多线程_6 _线程安全的集合_数组

  • Map安全集合体系
  • 74 java多线程_6 _线程安全的集合_线程安全_02

    问题演示

    package com.wlw.thread.threaddemo03;

    import java.util.ArrayList;

    /**
    * 演示:使用多线程操作线程不安全的集合
    */
    public class Demo01 {
    public static void main(String[] args) {
    //创建一个线程不安全的集合
    ArrayList<String> arrayList = new ArrayList<>();
    //创建多个线程
    for (int i = 0; i < 10; i++) {
    int temp = i;
    new Thread(new Runnable() {
    @Override
    public void run() {
    for (int j = 0; j < 10; j++) {
    arrayList.add(Thread.currentThread().getPriority()+"===="+temp+"===="+j);
    System.out.println(arrayList.toString());
    }
    }
    }).start();
    }
    }
    }
    /*并发修改异常:
    Exception in thread "Thread-9" java.util.ConcurrentModificationException
    */

    解决问题

    • jdk1.5之前并没有上面提到的并发包中的安全集合(jdk1.5之后才有)。
    • JDK1. 2提供,接口统一、维护性高,但性能没有提升,均以synchoni zed实现。
    • Collections工具类中提供了多个可以获得线程安全集合的方法:(现在是不用了)
    • public static Collection synchronizedCollection (Collection c)
    • public static List synchronizedList(List list)
    • public static Set synchionizedSet (Set s)
    • public static <K, V> Map<K, V> synchronizedMap (Map<K, V> m)
    • public static SortedSet synchronizedSortedSet (SortedSet s)
    • public static <K, V> SortedMap<K, V> synchronizedSortedMap (SortedMap<K, V> m)

    代码:

    package com.wlw.thread.threaddemo03;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    /**
    * 演示:使用多线程操作线程不安全的集合
    */
    public class Demo01 {
    public static void main(String[] args) {
    //1.创建一个线程不安全的集合
    ArrayList<String> arrayList = new ArrayList<>();
    //1.1 使用Collections中线程安全方法 将其转成线程安全的集合(jdk1.2)
    List<String> synList = Collections.synchronizedList(arrayList);
    //1.2 使用并发包中提供的线程安全集合(jdk1.5)
    CopyOnWriteArrayList<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();


    //2.创建多个线程
    for (int i = 0; i < 10; i++) {
    int temp = i;
    new Thread(new Runnable() {
    @Override
    public void run() {
    for (int j = 0; j < 10; j++) {
    //synList.add(Thread.currentThread().getPriority()+"===="+temp+"===="+j);
    //System.out.println(synList.toString());

    copyOnWriteArrayList.add(Thread.currentThread().getPriority()+"===="+temp+"===="+j);
    System.out.println(copyOnWriteArrayList.toString());
    }
    }
    }).start();
    }
    }
    }

    并发包中提供的线程安全集合

    jdk1.5之后给我们提供了并发包中的线程安全集合

    CopyOnWriteArrayList

  • 线程安全的ArrayList, 加强版的读写分离。
  • 写有锁,读无锁,读写之间不阻塞,优于读写锁。
  • 写入时,先copy一个容器副本、再添加新元素,最后替换引用。
  • (缺点就是:浪费空间,以空间来换取线程安全)
  • 使用方式与ArrayList无异。
  • 代码:

    package com.wlw.thread.threaddemo03;

    import java.util.Arrays;
    import java.util.Random;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /**
    *使用多线程来操作 CopyOnWriteArrayList
    */
    public class CopyOnWriteArrayListDemo {
    public static void main(String[] args) {

    //1.创建集合
    CopyOnWriteArrayList<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();

    //2.创建线程池
    ExecutorService es = Executors.newFixedThreadPool(5);

    //3.提交任务(提交五次)
    for (int i = 0; i < 5; i++) {
    es.submit(new Runnable() {
    @Override
    public void run() {
    for (int i = 0; i < 10; i++) {
    copyOnWriteArrayList.add(Thread.currentThread().getName()+"----"+new Random().nextInt(1000));

    }
    }
    });
    }

    //4.关闭线程池
    es.shutdown();
    //判断
    while (!es.isTerminated()){}
    //打印集合
    for (String s : copyOnWriteArrayList) {
    System.out.println(s);
    }

    }
    }/*
    CopyOnWriteArrayList的部分源码
    */
    final transient ReentrantLock lock = new ReentrantLock(); //重入锁
    private transient volatile Object[] array; //数组
    final Object[] getArray() {return array;}
    final void setArray(Object[] a) {array = a;}
    public CopyOnWriteArrayList() {setArray(new Object[0]);} //构造方法,一开始为空数组

    //add()方法
    public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();//上锁
    try {
    Object[] elements = getArray();
    int len = elements.length;
    Object[] newElements = Arrays.copyOf(elements, len + 1);//复制了一份新的数组
    newElements[len] = e;
    setArray(newElements);
    return true;
    } finally {
    lock.unlock();
    }
    }
    //remove()方法
    public E remove(int index) {
    final ReentrantLock lock = this.lock;
    lock.lock();//上锁
    try {
    Object[] elements = getArray();
    int len = elements.length;
    E oldValue = get(elements, index);
    int numMoved = len - index - 1;
    if (numMoved == 0)
    setArray(Arrays.copyOf(elements, len - 1));//复制了一份新的数组
    else {
    Object[] newElements = new Object[len - 1];
    System.arraycopy(elements, 0, newElements, 0, index);
    System.arraycopy(elements, index + 1, newElements, index,
    numMoved);
    setArray(newElements);
    }
    return oldValue;
    } finally {
    lock.unlock();
    }
    }

    //get()方法
    private E get(Object[] a, int index) {
    return (E) a[index]; //没有上锁,直接就获取了
    }

    CopyOnWriteArraySet

  • 线程安全的Set,底层使用CopyOnWriteArrayList实现。
  • 唯一不同在于, 使用addIfAbsent() 添加元素,会遍历数组,
  • 如存在元素,则不添加(扔掉副本)。
  • 代码:

    package com.wlw.thread.threaddemo03;

    import java.util.concurrent.CopyOnWriteArraySet;

    /**
    *操作 CopyOnWriteArraySet
    重复依据:equals方法来判断
    */
    public class CopyOnWriteArraySetDemo {
    public static void main(String[] args) {
    //1.创建集合
    CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
    //添加
    set.add("pingguo");
    set.add("xiaomi");
    set.add("huawei");
    set.add("oppo");
    set.add("pingguo"); //重复,不会添加
    //打印
    System.out.println(set.size());
    System.out.println(set.toString());
    }
    }/*
    CopyOnWriteArraySet的部分源码
    */
    private final CopyOnWriteArrayList<E> al; //
    //构造方法
    public CopyOnWriteArraySet() {
    al = new CopyOnWriteArrayList<E>();//底层就是CopyOnWriteArrayList
    }
    //add()方法
    public boolean add(E e) {
    return al.addIfAbsent(e);
    }
    public boolean addIfAbsent(E e) {
    Object[] snapshot = getArray();
    return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
    addIfAbsent(e, snapshot);
    }
    //判断是否重复,用equals来判断,重复则返回下标,不重复返回-1,添加
    private static int indexOf(Object o, Object[] elements,
    int index, int fence) {
    if (o == null) {
    for (int i = index; i < fence; i++)
    if (elements[i] == null)
    return i;
    } else {
    for (int i = index; i < fence; i++)
    if (o.equals(elements[i]))
    return i;
    }
    return -1;
    }
    //添加 和CopyOnWriteArrayList的添加一样
    private boolean addIfAbsent(E e, Object[] snapshot) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    Object[] current = getArray();
    int len = current.length;
    if (snapshot != current) {
    // Optimize for lost race to another addXXX operation
    int common = Math.min(snapshot.length, len);
    for (int i = 0; i < common; i++)
    if (current[i] != snapshot[i] && eq(e, current[i]))
    return false;
    if (indexOf(e, current, common, len) >= 0)
    return false;
    }
    Object[] newElements = Arrays.copyOf(current, len + 1);//复制了一份新的数组
    newElements[len] = e;
    setArray(newElements);
    return true;
    } finally {
    lock.unlock();
    }
    }

    Queue接口(队列)

  • Collection的子接口, 表示队列FIFO (First In First Out)先进先出。
  • 常用方法:
  • 抛出异常:
  • boolean add(E e) //顺序添加一个元素(到达上限后,再添加则会抛出异常)
  • E remove() //获得第-一个元素并移除(如果队列没有元素时,则抛异常)
  • E element() //获得第-一个元素但不移除(如果队列没有元素时,则抛异常)
  • 返回特殊值:(推荐使用)
  • boolean offer(E e) //顺序添加一个元素 (到达上限后,再添加则会返回false)
  • E poll() //获得第一个元素并移除 (如果队列没有元素时,则返回null)
  • E peek() //获得第一个元素但不移除 (如果队列没有元素时,则返回null)
  • 代码:

    package com.wlw.thread.threaddemo03;

    import java.util.LinkedList;
    import java.util.Queue;

    /**
    * 队列Queue的使用
    */
    public class QueueDemo {
    public static void main(String[] args) {
    //创建队列( LinkedList 是线程不安全的 先不要线程)
    Queue<String> queue = new LinkedList<>();
    //添加
    queue.offer("苹果");
    queue.offer("橘子");
    queue.offer("香蕉");
    queue.offer("榴莲");
    System.out.println("元素个数:" + queue.size());

    //获取元素
    String s = queue.peek(); //获得第一个元素但不移除 (如果队列没有元素时,则返回null)
    System.out.println(s);
    System.out.println(queue.peek());

    int size = queue.size();
    //出队 移除元素
    System.out.println("出队元素:");
    for (int i = 0; i < size; i++) {
    System.out.println(queue.poll());//获得第一个元素并移除 (如果队列没有元素时,则返回null)
    }
    System.out.println("出队之后:"+queue.size());
    }

    }
    /*
    元素个数:4
    苹果
    苹果
    出队元素:
    苹果
    橘子
    香蕉
    榴莲
    出队之后:0
    */

    ConcurrentLinkedQueue(无界队列,非阻塞队列)

  • 线程安全、可高效读写的队列,高并发下性能最好的队列。
  • 无锁、CAS比较交换算法,修改的方法包含三个核心参数(V, E, N)
  • V:要更新的变量、E:预期值、N:新值。
  • 只有当V==E时,V=N(更新操作);否则表示已被更新过,则取消当前操作。
  • 代码:

    package com.wlw.thread.threaddemo03;

    import java.util.concurrent.ConcurrentLinkedQueue;

    /**
    * ConcurrentLinkedQueue的使用
    * 它是线程安全的
    */
    public class ConcurrentLinkedQueueDemo {
    public static void main(String[] args) throws Exception {
    //创建安全队列
    ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
    //用线程做入队操作
    Thread t1 = new Thread(new Runnable() {
    @Override
    public void run() {
    for (int i = 1; i <= 5; i++) {
    concurrentLinkedQueue.offer(i);
    }
    }
    });
    Thread t2 = new Thread(new Runnable() {
    @Override
    public void run() {
    for (int i = 6; i <= 10; i++) {
    concurrentLinkedQueue.offer(i);
    }
    }
    });

    //启动线程
    t1.start();
    t2.start();
    t1.join(); //确保两个线程都运行完之后,再出队
    t2.join();

    System.out.println("元素个数:"+concurrentLinkedQueue.size());
    //出队
    System.out.println("------------出队-----------");
    int size = concurrentLinkedQueue.size();
    for (int i = 0; i < size; i++) {
    System.out.println(concurrentLinkedQueue.poll());
    }
    }
    }
    /*
    元素个数:10
    ------------出队-----------
    6
    1
    7
    8
    2
    9
    3
    10
    4
    5
    */

    BlockingQueue接口(阻塞队列,有界的队列)

  • Queue的子接口,阻塞的队列,增加了两个线程状态为无限期等待的方法。
  • 方法:
  • void put(E e) //将指定元素插入此队列中,如果没有可用空间,则等待。
  • E take() //获取并移除此队列头部元素,如果没有可用元素,则等待。
  • 可用于解决生产生、消费者问题。
  • 阻塞队列实现类:

    ArrayBlockingQueue
    • 数组结构实现,有界队列。(手工固定 上限)
    LinkedBlock ingQueue:
    • 链表结构实现,有界队列。(默认 上限Integer. MAX_ VALUE)

    案例一:

    package com.wlw.thread.threaddemo03;

    import java.util.concurrent.ArrayBlockingQueue;
    /**
    * 阻塞队列的使用
    * 案例1:创建一个有界队列,添加元素
    * 案例2:使用阻塞队列实现生产者和消费者
    */
    public class BlockingQueueDemo {
    public static void main(String[] args) throws Exception {
    //1.创建一个有界队列,添加元素(只能添加5个元素)
    ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<String>(5);
    //添加
    arrayBlockingQueue.put("aaa");//将指定元素插入此队列中,如果没有可用空间,则等待。
    arrayBlockingQueue.put("bbb");
    arrayBlockingQueue.put("ccc");
    arrayBlockingQueue.put("ddd");
    arrayBlockingQueue.put("eee");
    arrayBlockingQueue.take();//获取并移除此队列头部元素,如果没有可用元素,则等待。
    System.out.println("1.队列元素:"+arrayBlockingQueue.size());

    arrayBlockingQueue.put("fff");
    //这一句是加不进去的,因为只能加5个,并且线程一直处在阻塞状态,
    //只有将队列中的元素移除一个之后,这一句才能执行成功
    System.out.println("2.队列元素:"+arrayBlockingQueue.size());
    System.out.println(arrayBlockingQueue.toString());

    }
    }

    案例二:

    package com.wlw.thread.threaddemo03;

    import java.util.concurrent.ArrayBlockingQueue;

    /**
    * 阻塞队列的使用
    * 案例1:创建一个有界队列,添加元素
    * 案例2:使用阻塞队列实现生产者和消费者***
    */
    public class BlockingQueueDemo02 {
    public static void main(String[] args) throws Exception {
    //1.创建一个有界队列,(6个)
    ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<Integer>(6);
    //创建线程并创建操纵
    Thread t1 = new Thread(new Runnable() {
    @Override
    public void run() {
    for (int i = 0; i < 10; i++) {
    try {
    arrayBlockingQueue.put(i);
    System.out.println(Thread.currentThread().getName()+",生产了第"+i+"号面包");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    }
    }
    });
    Thread t2 = new Thread(new Runnable() {
    @Override
    public void run() {
    for (int i = 0; i < 10; i++) {
    try {
    arrayBlockingQueue.take();
    System.out.println(Thread.currentThread().getName()+",消费了第"+i+"号面包");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    }
    }
    });

    //启动线程
    t1.start();
    t2.start();
    }
    }
    /*
    因为两个输出语句并没有同步,所以输出语句表示的结果有时候有可能先打印了消费了某号面包,再打印生产
    但我们保证了这个队列(容器)最多只能放六个
    */

    ConcurrentHashMap

  • 初始容量默认为16段(Segment) ,使用(jdk1.7)分段锁设计。(jdk1.8)是采用cas+syschronized实现
  • 不对整个Map加锁,而是为每个Segment加锁。
  • 当多个对象存入同一个Segment时, 才需要互斥(需要锁)。
  • 最理想状态为16个对象分别存入16个Segment, 并行数量16。
  • 使用方式与HashMap无异。
  • 代码:

    package com.wlw.thread.threaddemo03;
    import java.util.concurrent.ConcurrentHashMap;

    /**
    * ConcurrentHashMap的使用
    */
    public class ConcurrentHashMapDemo {
    public static void main(String[] args) {
    //创建集合
    ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
    //创建线程
    for (int i = 0; i < 5; i++) {
    new Thread(new Runnable() {
    @Override
    public void run() {
    for (int j = 0; j < 10; j++) {
    concurrentHashMap.put(Thread.currentThread().getName()+"---"+j,j+" ");
    System.out.println(concurrentHashMap);
    }
    }
    }).start();
    }

    }
    }

    ConcurrentHashMap,jdk1.7与jdk1.8的区别

    concurrentHashMap jdk7版本

    • 线程安全,不允许key value为空。
    • 两个比较重要的参数,Segment(实现了ReentrantLock的类)数组,HashEntry数组,Segment中包含HashEntry。
    • 每个Segment类似一个HashMap对象。同HashMap一样,Segment包含一个HashEntry数组,数组中的每一个HashEntry既是一个键值对,也是一个链表的头节点,hash冲突时会变成链表连接,与jdk7的hashmap类似。默认Segment的个数是16个,同时会保证是2的幂次方(类似hashMap),HashEntry默认的个数是2个(initialCapacity/concurrencyLevel,最小是2个)
    • 构造函数:默认构造函数设置3个参数的值,initialCapacity(用于计算HashEntry数组容量,initialCapacity/concurrencyLevel), loadFactor 负载因子(Segment的节点数超过 负载因子*entry容量后开始扩容), concurrencyLevel(Segment的个数,如果不是2的幂次数会扩大到最接近的2的幂次数)。默认构造函数Segment的个数是16,但是只会先初始化第一个Segment,其他的懒加载。
    • get方法,通过计算key的hash值(根据key的hashCode再hash,防止散列不均匀),再通过int j =(hash >>> segmentShift) & segmentMask这个算法获取到所在的Segment的位置j。segmentShift=32-sshift,而2的sshift次方=ssize,ssize即Segment分个数,默认Segment个数为16即ssize为16,sshift即为4,segmentShift=32-4=28。segmentMask为ssize-1即默认为15(二进制位全是1,因ssize是2的幂次数),即段的位置j=(hash >>> 28) & 15,即hash值右移28位(保留高位),再和15相与(相当于和16取余计算)。获取当前key所在的Segment,再根据hash值和数组的大小取余获取HashEntry数组对应的位置。
    • 总结就是计算Segment的位置是获取到key的hashCode再hash后,根据hash值先右移保留高位,再取余。计算HashEntry的位置是直接根据hash值取余(之所以一个取高位,一个直接使用hash值是为了防止两个数值相同,基于Segment已经散列开,但是基于HashEntry没有散列开)。补充一点get方法是无锁的,HashEntry的value是volatile,确保线程的修改立即可见,而且即使同个线程同时操作,一个写入一个读取,因为对 volatile 字段的写入操作先于读操作,所以也能确保拿到新数据。
    • put方法,获取Segment位置和get同样,再获取这个Segment的锁,通过hash计算对应的entry,存放数据,如果该数组存在数据,则判断是否重复,重复则替换,不重复则头插法插入数组。容量超过负载因子*容量即会扩容,与hashMap相反是先判断要不要扩容再存放数据,防止扩容后就没数据进来,浪费空间。扩容基于某个Segment扩容,两倍扩容。
    • size()方法,Segment中维护两个值,一个是数据的总数count,一个是HashEntry改动的次数modCount。获取map中数据的总数,需要加各个Segment中数据的总数count累加起来。这个累加过程是不加锁的,因此可能会在统计过程中某个Segment的个数又增加。所以基于乐观锁和悲观锁的理念,累加会进行修改次数的比较,相等则返回,不相等则重试(乐观锁),重试默认超过2次就会把所有的Segment都加锁(悲观锁),再统计size返回。在第一次循环统计modCount和统计count,然后比较last(记录上次统计修改次数,第一次循环last=0)和这次累加modCount是否相等(第一次循环last默认值为0,所以只有集合没有改动过即没有数据,这一次modCount的统计才会和last相等,才会第一次循环就退出)。第二次循环的累加count和累加modCount,然后比较last和这次累加modCount是否相等,相等说明统计count前后没有发现改变跳出循环,否则重复第二次循环的操作,超过重试次数就加锁统计。

    concurrentHashMap jdk8版本

    • 通过cas+syschronized实现,底层基于数组+链表+红黑树,类似hashMap。且concurrentHashMap通过Unsafe类基于内存地址操作变量值。
    • Unsafe类,类似c、c++的指针,可以直接通过内存地址操作内存数据,性能会更好,但是也会存在指针相关问题的风险,并且unsafe支持操作堆外内存(concurrentHashMap是用堆内内存,节点数组是通过new的方式创建的),但是需要自己管理内存,使得java不安全,因此也叫不安全类(但是因为不是堆内内存,不收垃圾回收管理,因此也避免了垃圾回收造成的停顿,并且提高io性能,避免io过程堆内存和堆外内存进行数据拷贝的操作)。java底层很多用到这个类进行操作。且该类支持执行处理指令cas指令和内存屏障指令,concurrentHashMap通过该类执行cas、操作数组、操作变量值。
    • 几个比较重要的参数和数值说明:
  • sizeCtl参数。初始化或扩容时的标志参数,-1表示正在初始化,-N表示有N-1个线程正在扩容(concurrentHashMap 1.8支持多线程扩容),0表示没有初始化,正数表示下一次扩容的时机,即0.75size。
  • 节点主要有四种节点,分别是Node(普通节点)。TreeNode(树节点)。TreeBin(封装树的头结点,也就是数组存的是一个TreeBin,TreeBin基于TreeNode创建,TreeBin的构造函数会基于TreeNode构造红黑树(转换红黑树时,传入的TreeNode只是基于next连接节点))。ForwardingNode(正在迁移的节点,当一个数组正在被迁移时,旧数组上的节点会被设置成ForwardingNode,ForwardingNode保存了正在迁移的新数组数据)。
  • 节点的hash值:node节点和TreeNode的hash值即为key的hash(hashCode再hash)值,TreeBin的hash值为-2,ForwardingNode的hash值为-1。
  • MIN_TRANSFER_STRIDE: 最小切割数,默认值为16。为了支持多线程扩容,concurrentHashMap通过切分数组,每个数组单独扩容,即每个线程最少处理16个数组位置的扩容。
  • TRANSFERINDEX: 存储当前还未迁移的数组位置的最大下标,比如旧数组大小为64,每个线程处理16个数组位置,一开始TRANSFERINDEX设置成64,一个线程开始处理之后,这个值就变成64-16=48,那么第一个线程处理迁移的范围就是 49 ~ 64。
    • 构造函数是空方法,put方法判断为空再cas初始化数组(防止多线程初始化,cas失败会重试,重试如果判断),默认数组大小是16,sizeCtl为16*0.75=12。
    • get 方法:为无锁操作。通过hash值计算数组位置。(1)如果判断改位置的节点hash值和key都相等就返回该节点。(2)判断如果hash值小于0(因为TreeBin和ForwardingNode的hash值都被固定设置成负数),就通过节点本身的find方法查询节点。TreeBin的find方法就是通过红黑树的方式查找节点,ForwardingNode的find方法会再次确认,如果确认是ForwardingNode,会基于新数组查询,如果不再次确认时已经不是ForwardingNode,就类似之前的判断决定用遍历联表方式还是用红黑树的find。(3) 如果hash值大于0,就通过链表形式遍历查询。
    • put方法: 外层嵌套一个无限循环,break再跳出。(1)如果节点数组为空则初始化创建一个节点数组,继续下次循环判断。(2)通过计算hash值对应数组位置,如果该位置不存在节点则cas存放一个新节点,cas成功则break退出,失败则继续下一次循环判断。(3)判断改数组位置节点的hash值是不是-1(ForwardingNode节点),如果是代表正在扩容迁移数据到新数组中,调用helpTransfer方法帮助迁移。(4)通过syschronized锁住当前节点,判断如果数组上的节点的hash值大于0,代表是普通节点,通过链表形式扩容。如果hash值小于0,通过红黑树形式扩容。释放锁。(5) 存放节点之后,判断链表的长度binCount是否大于等于8,如果是则调用treeifyBin方法转换红黑树。
    • treeifyBin()方法:这个方法的作用是将数组某个位置的链表转为红黑树。(1)先判断节点数组的大小是否大于64,如果没有大于64则调用tryPresize方法进行数组扩容。扩大两倍扩容(2)如果节点数组的大小已经大于等于64,syschronized锁住当前节点,将当前数组位置的链表转换成红黑树。转红黑树先循环创建TreeNode节点,并将多个TreeNode节点通过next连接起来,循环结束后创建TreeBin节点,TreeBin接收第一个TreeNode节点,在构造函数中构建红黑树结构。
    • tryPresize 尝试扩容方法:根据传入的size扩大1.5倍+1,再扩大到最接近的2的幂次数(假设传入32,则放大到64)。(1)如果调用这个方法时,节点数组为空,创建节点数组。(2)调用transfer方法扩容。
    • transfer 扩容方法:通过cpu核数计算每个线程负责多少个数组位置的扩容,最少负责16个数组位置。再根据TRANSFERINDEX计算负责迁移的范围,遍历这个范围的数组节点处理。(1)判断节点为空则将老数组节点设置成ForwardingNode节点,继续遍历查找。(2)如果节点的hash值为-1(即已经是ForwardingNode),则说明改节点已经迁移完成,无需处理。(3)通过syschronized锁住当前节点,开始迁移。迁移与jdk7版本类似。一个线程处理完会重复这个流程。最后通过TRANSFERINDEX遍历到下标小于0,即所有数组位置都处理完就退出了。
    • addCount方法:case增加baseCount的值,校验是否需要扩容
    • 为什么数组位置为空的时候,put采用cas,不为空的时候还要使用syschronized? 个人理解有两个原因。(1)数组位置为空的时候,cas有参照物且只需要一步操作,不为空则需要连接链表或者红黑树。链表添加值不可以用cas,遍历到尾节点,设置尾节点的next为空则替换成新元素,虽然能保证插入不会被重复设值,但是如果尾节点被另一个线程删除,那么新插入的节点也会丢失。红黑树因为存放数据后还需要调节平衡,如果调节平衡过程中put一个新数据会影响红黑树的调节出现问题,而且也没办法做cas操作。(2)数组位置为空时,扩容的时候可以同样通过cas将空位置替换成ForwardingNode,而put也是cas将空位置替换成普通节点,因此不会出现线程安全问题,只有一个成功,另一个失败的会重试。而在节点不为空的情况下,如果不加锁,因为扩容过程流程多,非原子操作,可能会出现正在复制节点时,刚复制好,还没把老数组设置成ForwardingNode,另一个线程又往老数组put了新数组,这时就出现数据丢失的情况。因此需要用syschronized加锁变成原子操作。
    • concurrentHashMap利用cas的方式大部分都是在外面嵌套一个无限循环,cas成功或者其他别的情况处理成功再break跳出循环。


    上一篇:71 java多线程_3 _线程状态与常见方法
    下一篇:没有了
    网友评论
    <