您的位置:首页 > 编程语言 > Java开发

【Java并发编程】1. 线程基础、线程之间的共享和协作

2020-03-17 18:32 483 查看

文章目录

  • 2. 进程和线程
  • 3. Java线程工作方式
  • 4. 线程状态
  • 5. synchronized 用法
  • 5. volatile
  • 6. ThreadLocal
  • 7. wait notify
  • 8. join
  • 9. 调用yield() 、sleep()、wait()、notify()等方法对锁有何影响?
  • 参考

  • Java 并发编程基础

    一言以蔽之:Java 并发编程的目的就是充分利用计算机的资源,把计算机的性能发挥到最大

    1. 什么是高并发

    并发 concurrency 和并行 parallelism 的区别

    并发是指多个线程操作同一个资源,不是同时操作,而是交替操作,单核 CPU,只不过因为速度太快,看起来是同时执行(张三、李四,共用一口锅炒菜,交替执行),通过时间片轮转机制RR调度实现并发。

    并行才是真正的同时执行,多核 CPU,每个线程使用一个独立的 CPU 的资源来运行。(张三、李四,一人一口锅,一起炒菜)

    并发编程是指使系统允许多个任务在重叠的时间段内执行的设计结构。

    高并发我们设计的程序,可以支持海量任务的同时执行(任务的执行在时间段上有重叠的情况)

    • QPS:每秒响应的请求数,QPS 并不是并发数。
    • 吞吐量:单位时间内处理的请求数,QPS 和并发数决定。
    • 平均响应时间:系统对一个请求作出响应的平均时间,QPS = 并发数/平均响应时间
    • 并发用户数:系统可以承载的最大用户数量。

    高并发编程利弊

    • 利:充分利用cpu的资源、加快用户响应的时间,程序模块化,异步化
    • 弊:线程共享资源,存在冲突。容易导致死锁。启用太多的线程,就有搞垮机器的可能。

    互联网项目架构中,如何提高系统的并发能力?

    • 垂直扩展
    • 水平扩展

    垂直扩展

    提升单机的处理能力

    1、增强单机的硬件性能:增加 CPU 的核数,硬盘扩容,内存升级。

    2、提升系统的架构性能:使用 Cache 来提高效率,异步请求增加单个服务的吞吐量,使用 NoSQL 来提升数据的访问性能。

    水平扩展

    集群、分布式都是水平扩展的方案

    集群:多个人做同一件事情(3 个厨师同时炒菜)

    分布式:把一件复杂的事情,拆分成几个简单的步骤,分别找不同的人去完成 (1 洗菜、2 切菜、3 炒菜)



    并发跟并行区别跟联系:

    1. 分布式是指多个系统协同合作完成一个特定任务的系统。分布式是的问题,把所有解决中心化管理的任务叠加到一个节点处理,太慢了。把一个业务拆分为多个子业务(当某个子业务访问量突增时,增加该子任务的节点数即可),部署在多个服务器上 ,分布式的主要工作是分解任务,将职能拆解。
    2. 集群主要的使用场景是为了分担请求的压力,同一个业务,部署在多个服务器上 ,也就是在几个服务器上部署来==相同的应用程序,==分担客户端请求。当压力进一步增大的时候,可能在需要存储的部分,mysql 无法面对很多的写压力。因为在 mysql 做成集群之后,主要的写压力还是在 master 的机器上面,其他 slave 机器无法分担写压力,从而这个时候,也就引出来分布式。分布式的主要应用场景是单台机器已经无法满足这种性能的要求,必须要融合多个节点,并且节点之间是相关之间有交互的。相当于在写 mysql 的时候,每个节点存储部分数据,也就是分布式存储的由来。存储一些非结构化数据:静态文件、图片、pdf、小视频 … 这些也就是分布式文件系统的由来。
    3. 集群主要是。分布式中的某个子任简单加机器解决问题,对于问题本身不做任何分解;分布式处理里必然包含任务分解与答案归并务节点,可能由一个集群来代替;集群中任一节点,都是做一个完整的任务。集群和分布式都是由多个节点组成,。但是集群之间的通信协调基本不需要;而分布式各个节点的通信协调必不可少RPC
    4. 将一套系统拆分成不同子系统部署在不同服务器上(这叫分布式),
      然后部署多个相同的子系统在不同的服务器上(这叫集群),部署在不同服务器上的同一个子系统应做负载均衡。
    5. 负载均衡集群:一个集群节点来接受任务,然后根据其余节点的工作状态(CPU,内存等信息)派发任务量, 最终实现完成任务。
    6. SOA:业务系统分解为多个组件,让每个组件都独立提供离散,自治,可复用的服务能力,通过服务的组合和编排来实现上层的业务流程
      作用:简化维护,降低整体风险,伸缩灵活
      7.微服务:架构设计概念,各服务间隔离(分布式也是隔离),自治(分布式依赖整体组合)其它特性(单一职责,边界,异步通信,独立部署)是分布式概念的跟严格执行SOA到微服务架构的演进过程 作用:各服务可独立应用,组合服务也可系统应用。SpringClond就是很经典的微服务框架。

    1、站点层扩展:Nginx 方向代理,高并发系统,一个 Tomcat 带不起来,就找十个 Tomcat 去带


    2、服务层扩展:通过 RPC 框架实现远程调用,Dubbo、Spring Boot/Spring Cloud,将业务逻辑拆分到不同的 RPC Client,各自完成不同的业务,如果某些业务的并发量很大,就增加新的 RPC Client,理论上实现无限高并发。

    3、数据层扩展:一台数据库拆成多台,主从复制、读写分离、分表分库。

    2. 进程和线程

    进程就是计算机正在运行的一个独立的应用程序,进程是一个动态的概念,必须是运行状态,如果一个应用程序没有启动,那就不是进程。

    线程就是组成进程的基本单位,可以完成特定的功能,一个进程是由一个或多个线程组成。

    进程和线程的区别在于运行时是否拥有独立的内存空间,每个进程所拥有的空间都是独立的,互不干扰,多个线程是共享内存空间的,但是每个线程的执行是相互独立。

    线程必须依赖于进程才能执行,单独线程是无法执行的,由进程来控制多个线程的执行。
    Java 默认有几个线程?

    public class OnlyMain {
    public static void main(String[] args) {
    //虚拟机线程管理的接口
    ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    ThreadInfo[] threadInfos =
    threadMXBean.dumpAllThreads(false, false);
    for(ThreadInfo threadInfo:threadInfos) {
    System.out.println("["+threadInfo.getThreadId()+"]"+" "
    +threadInfo.getThreadName());
    }
    }
    }

    Java 默认有两个线程:Main 和 GC

    Java 本身是无法启动线程的

    new Thread(futureTask).start();
    
    public synchronized void start() {
    /**
    * This method is not invoked for the main method thread or "system"
    * group threads created/set up by the VM. Any new functionality added
    * to this method in the future may have to also be added to the VM.
    *
    * A zero status value corresponds to state "NEW".
    */
    if (threadStatus != 0)
    throw new IllegalThreadStateException();
    
    /* Notify the group that this thread is about to be started
    * so that it can be added to the group's list of threads
    * and the group's unstarted count can be decremented. */
    group.add(this);
    
    boolean started = false;
    try {
    start0();
    started = true;
    } finally {
    try {
    if (!started) {
    group.threadStartFailed(this);
    }
    } catch (Throwable ignore) {
    /* do nothing. If start0 threw a Throwable then
    it will be passed up the call stack */
    }
    }
    }
    
    private native void start0();// 调用本地方法C++动态函数库

    Java 无法操作底层硬件,只能通过调用本地方法,C++ 编写的动态函数库,由 C++ 去操作底层启动线程,Java 只是间接调用。

    多线程实现方式

    1. 继承 Thread
    2. 实现 Runnable
    3. 实现 Callable
      重点:Thread 是线程对象,Runnable 是任务,一线程启动的时候一定是对象。

    ```java
    package com.southwind.demo;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    
    public class Test {
    public static void main(String[] args) {
    MyCallable myCallable = new MyCallable();
    FutureTask futureTask = new FutureTask(myCallable);
    Thread thread = new Thread(futureTask);
    thread.start();
    //获取Callable的返回值
    try {
    System.out.println(futureTask.get());
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    }
    }
    
    class MyCallable implements Callable<String>{
    
    @Override
    public String call() throws Exception {
    System.out.println("callable");
    return "hello";
    }
    }

    Callable 与 Runnable 的区别:

    • Callable 的 call 方法有返回值,Runnable 的 run 方法没有返回值。

    • Callable 的 call 方法可以抛出异常,Runnable 的 run 方法不能抛出异常。

    • 在外部通过 FutureTask 的 get 方法异步获取执行结果,FutureTask 是一个可以控制的异步任务,是对 Runnable 实现的一种继承和扩展。

    • get 方法可能会产生阻塞,一般放在代码的最后。Callable 有缓存。

    3. Java线程工作方式

    java线程是协作式,而非抢占式。调用一个线程的interrupt() 方法中断一个线程,并不是强行关闭这个线程,只是跟这个线程打个招呼,将线程的中断标志位置为true,线程是否中断,由线程本身决定。
    static方法interrupted() 判定当前线程是否处于中断状态,同时中断标志位改为false。
    注意:方法里如果抛出InterruptedException,线程的中断标志位会被复位成false,如果确实是需要中断线程,要求我们自己在catch语句块里再次调用interrupt()。

    public class HasInterrputException {
    
    private static SimpleDateFormat formater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS");
    
    private static class UseThread extends Thread {
    
    public UseThread(String name) {
    super(name);
    }
    
    @Override
    public void run() {
    String threadName = Thread.currentThread().getName();
    while (!isInterrupted()) {
    try {
    System.out.println("UseThread:" + formater.format(new Date()));
    Thread.sleep(300);
    } catch (InterruptedException e) { //此处捕捉到异常后会将该进程的停止标志位变为false
    System.out.println(threadName + " catch interrput flag is "
    + isInterrupted() + " at " + (formater.format(new Date())));
    interrupt(); // 超级重要 如果要推出程序
    e.printStackTrace();
    }
    System.out.println(threadName);
    }
    System.out.println(threadName + " interrput flag is "
    + isInterrupted());
    }
    }
    
    public static void main(String[] args) throws InterruptedException {
    Thread endThread = new UseThread("HasInterrputEx");
    endThread.start();
    System.out.println("Main:" + formater.format(new Date()));
    Thread.sleep(800);
    System.out.println("Main begin interrupt thread:" + formater.format(new Date()));
    endThread.interrupt();
    }
    
    }

    4. 线程状态

    1. 线程只有6种状态。整个生命周期就是这几种状态的切换。
    2. run()和start() :run方法就是普通对象的普通方法,只有调用了start()后,Java才会将线程对象和操作系统中实际的线程进行映射,再来执行run方法。
    3. yield() :Thread.yield(),让出cpu的执行权,将线程从运行转到可运行状态,但是下个时间片,该线程依然有可能被再次选中运行 。
    4. 线程的优先级 取值为1~10,缺省为5,最高10最低1,但线程的优先级不可靠,只是一个大概率执行而已,不建议作为线程开发时候的手
    5. 守护线程:和主线程共死,finally不能保证一定执行

    5. synchronized 用法

    整体代码块如下:

    class SleepTools {
    
    /**
    * 按秒休眠
    * @param seconds 秒数
    */
    public static final void second(int seconds) {
    try {
    TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
    }
    }
    
    /**
    * 按毫秒数休眠
    * @param seconds 毫秒数
    */
    public static final void ms(int seconds) {
    try {
    TimeUnit.MILLISECONDS.sleep(seconds);
    } catch (InterruptedException e) {
    }
    }
    }
    
    public class SynClzAndInst {
    
    //使用类锁的线程
    private static class SynClass extends Thread {
    @Override
    public void run() {
    System.out.println("TestClass is running...");
    synClass();
    }
    }
    
    //使用对象锁的线程
    private static class InstanceSyn implements Runnable {
    private SynClzAndInst synClzAndInst;
    
    public InstanceSyn(SynClzAndInst synClzAndInst) {
    this.synClzAndInst = synClzAndInst;
    }
    
    @Override
    public void run() {
    System.out.println("TestInstance is running..." + synClzAndInst);
    synClzAndInst.instance();
    }
    }
    
    //使用对象锁的线程
    private static class Instance2Syn implements Runnable {
    private SynClzAndInst synClzAndInst;
    
    public Instance2Syn(SynClzAndInst synClzAndInst) {
    
    this.synClzAndInst = synClzAndInst;
    }
    
    @Override
    public void run() {
    System.out.println("TestInstance2 is running..." + synClzAndInst);
    synClzAndInst.instance2();
    }
    }
    
    //锁对象
    private synchronized void instance() {
    SleepTools.second(3);
    System.out.println("synInstance is going..." + this.toString());
    SleepTools.second(3);
    System.out.println("synInstance ended " + this.toString());
    }
    
    //锁对象
    private synchronized void instance2() {
    SleepTools.second(3);
    System.out.println("synInstance2 is going..." + this.toString());
    SleepTools.second(3);
    System.out.println("synInstance2 ended " + this.toString());
    }
    
    //类锁,实际是锁类的class对象
    private static synchronized void synClass() {
    SleepTools.second(3);
    System.out.println("synClass going...");
    SleepTools.second(3);
    System.out.println("synClass end");
    }
    }

    1. 普通方法对象锁

    调用不同对象时

    public static void main(String[] args) {
    
    SynClzAndInst synClzAndInst = new SynClzAndInst();
    Thread t1 = new Thread(new InstanceSyn(synClzAndInst));
    
    SynClzAndInst synClzAndInst2 = new SynClzAndInst();
    Thread t2 = new Thread(new Instance2Syn(synClzAndInst2 ));
    
    t1.start();
    t2.start();
    SleepTools.second(1);
    }


    结论:当我们操作不同对象时,是可以并行的。

    调用相同对象时

    public static void main(String[] args) {
    SynClzAndInst synClzAndInst = new SynClzAndInst();
    Thread t1 = new Thread(new InstanceSyn(synClzAndInst));
    
    SynClzAndInst synClzAndInst2 = new SynClzAndInst();
    Thread t2 = new Thread(new Instance2Syn(synClzAndInst)); // synClzAndInst2 变成 synClzAndInst  操作的是相同对象
    t1.start();
    t2.start();
    SleepTools.second(1);
    }


    结论:如果操作相同对象则必须排队执行。
    但是如果我们仅仅把 instance2 方法前面的synchronized取消掉,不同线程操作同一个对象,一个调用锁方法一个调用非锁方法也是可以并行的

    //锁对象
    private void instance2() {
    SleepTools.second(3);
    System.out.println("synInstance2 is going..." + this.toString());
    SleepTools.second(3);
    System.out.println("synInstance2 ended " + this.toString());
    }


    最终结论:当前实例对象this ,进入同步代码前要获得当前实例的锁。调用当前实例的非syn方法时可并行。

    2. 静态方法类锁

    一般就是在方法前面加入 static synchronized,锁是当前类的class对象,这是一个特殊对象而已 ,进入同步代码前要获得当前类对象的锁。此时同一个类创建出来对若干对象在调用静态方法时候会有先后顺序。但是如果一个操作实例对象一个
    操作类对象。是不影响的

    public static void main(String[] args) {
    SynClzAndInst synClzAndInst = new SynClzAndInst();
    Thread t1 = new Thread(new InstanceSyn(synClzAndInst));
    t1.start();
    SynClass synClass = new SynClass();
    synClass.start();
    SleepTools.second(1);
    }

    结论:锁是当前类的class对象 ,进入同步代码前要获得当前类对象的锁

    3. 方法块

    在某些情况下,我们编写的方法体可能比较大,同时存在一些比较耗时的操作,而需要同步的代码又只有一小部分,如果直接对整个方法进行同步操作,可能会得不偿失,此时我们可以使用同步代码块的方式对需要同步的代码进行包裹,这样就无需对整个方法进行同步操作了。

    public class synchronizedTest implements Runnable {
    static synchronizedTest instance=new synchronizedTest();
    static int i=0;
    @Override
    public void run() {
    //省略其他耗时操作....
    //使用同步代码块对变量i进行同步操作,锁对象为instance
    synchronized(instance){
    for(int j=0;j<10000;j++){
    i++;
    }
    }
    }
    public static void main(String[] args) throws InterruptedException {
    Thread t1=new Thread(instance);
    Thread t2=new Thread(instance);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println(i);
    }
    }


    分析:将synchronized作用于一个给定的实例对象instance,即当前实例对象就是锁对象,每次当线程进入synchronized包裹的代码块时就会要求当前线程持有instance实例对象锁,如果当前有其他线程正持有该对象锁,那么新到的线程就必须等待,这样也就保证了每次只有一个线程执行i++;操作。当然除了instance作为对象外,我们还可以使用this对象(代表当前实例)或者当前类的class对象作为锁,如下代码:

    //this,当前实例对象锁
    synchronized(this){
    for(int j=0;j<1000000;j++){
    i++;
    }
    }
    //class对象锁
    synchronized(AccountingSync.class){
    for(int j=0;j<1000000;j++){
    i++;
    }
    }

    Integer 的值 -128 - 127 之间,会使用包装类常量池,如果超出这个范围,不会使用包装类常量池。

    package com.sowhat.demo;
    
    import java.util.concurrent.TimeUnit;
    
    public class Test {
    public static void main(String[] args) {
    User user = new User();
    new Thread(()->{
    user.count();
    },"A").start();
    new Thread(()->{
    user.count();
    },"B").start();
    }
    }
    
    class User{
    private Integer num = 0;
    private Integer id = 0;
    public void count(){
    //        synchronized (num){  //不要锁num 因为该对象在同步块内变化了
    synchronized (id){
    num++;
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName()+"是第"+num+"位访问");
    }
    }
    }

    5. volatile

    适合于只有一个线程写,多个线程读的场景,因为它只能确保可见性。

    java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量。Java语言提供了volatile,在某些情况下比锁更加方便。如果一个字段被声明成volatile,java线程内存模型确保所有线程看到这个变量的值是一致的。

    这句话的含义有两层.

    1. volatile 的写操作, 需要将线程本地内存值,立马刷新到 主内存的共享变量中.
    2. volatile 的读操作, 需要从主内存的共享变量中读取,更新本地内存变量的值.

    由此引出 volatile 的内存语义.

    1. 当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存.
    2. 当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量,并更新本地内存的值.
    3. PS :本来在JVM中有共享变量区以及方法的栈区,栈区里有共享区变量的高速缓存的,一般方法区直接用高速缓存区数据来运算。

    volatile 的特性

    1. 可见性 : 对一个volatile的变量的读,总是能看到任意线程对这个变量最后的写入.
    2. 单个读或者写具有原子性 : 对于单个volatile变量的读或者写具有原子性,复合操作不具有.(如i++)
    3. 互斥性 : 同一时刻只允许一个线程对变量进行操作.(互斥锁的特点)

    JMM(java memory model)决定一个线程对共享变量的写入何时对另一个线程可见,JMM定义了线程和主内存之间的抽象关系:共享变量存储在主内存(Main Memory)中,每个线程都有一个私有的本地内存(Local Memory),本地内存保存了被该线程使用到的主内存的副本拷贝,线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存中的变量。

    声明变量为volatile后 方法区中的数据本地缓存就失效了,每次都要在主内存中拿。

    6. ThreadLocal

    线程局部变量。进来类型简单点哦,可以理解为是个map,比如类型 Map<Thread,Integer>,跟Python中的线程局部变量类似。

    public class UseThreadLocal {
    
    //可以理解为 一个map,类型 Map<Thread,Integer>
    static ThreadLocal<Integer> threadLaocl = new ThreadLocal<Integer>() {
    @Override
    protected Integer initialValue() {
    return 1;
    }
    };
    
    /**
    * 运行3个线程
    */
    public void StartThreadArray() {
    Thread[] runs = new Thread[3];
    for (int i = 0; i < runs.length; i++) {
    runs[i] = new Thread(new TestThread(i));
    }
    for (int i = 0; i < runs.length; i++) {
    runs[i].start();
    }
    }
    
    /**
    * 类说明:测试线程,线程的工作是将ThreadLocal变量的值变化,并写回,看看线程之间是否会互相影响
    */
    public static class TestThread implements Runnable {
    int id;
    
    public TestThread(int id) {
    this.id = id;
    }
    
    public void run() {
    System.out.println(Thread.currentThread().getName() + ":start");
    Integer s = threadLaocl.get();//获得变量的值
    s = s + id;
    threadLaocl.set(s);
    System.out.println(Thread.currentThread().getName() + ":"  + threadLaocl.get());
    //threadLaocl.remove();
    }
    }
    
    public static void main(String[] args) {
    UseThreadLocal test = new UseThreadLocal();
    test.StartThreadArray();
    }
    }

    7. wait notify

    wait是指在一个已经进入了同步锁的线程内,让自己暂时让出同步锁,以便其他正在等待此锁的线程可以得到同步锁并运行,只有其他线程调用了notify方法(notify并不释放锁,只是告诉调用过wait方法的线程可以去参与获得锁的竞争了,但不是马上得到锁,因为锁还在别人手里,别人还没释放),调用wait方法的一个或多个线程就会解除wait状态,重新参与竞争对象锁,程序如果可以再次得到锁,就可以继续向下运行。

    1. wait()、notify()和notifyAll()方法是本地方法,并且为final方法,无法被重写。
    2. 当前线程必须拥有此对象的monitor(即锁),才能调用某个对象的wait()方法能让当前线程阻塞。(这种阻塞是通过提前释放synchronized锁,重新去请求锁导致的阻塞,这种请求必须有其他线程通过notify()或者notifyAll()唤醒重新竞争获得锁)
    3. 调用某个对象的notify()方法能够唤醒一个正在等待这个对象的monitor的线程,如果有多个线程都在等待这个对象的monitor,则只能唤醒其中一个线程; (notify()或者notifyAll()方法并不是真正释放锁,必须等到synchronized方法或者语法块执行完才真正释放锁)
    4. 调用notifyAll()方法能够唤醒所有正在等待这个对象的monitor的线程,唤醒的线程获得锁的概率是随机的,取决于cpu调度
    实现一个连接池

    SqlConnectImpl实现类

    public class SqlConnectImpl implements Connection{
    
    /*拿一个数据库连接*/
    public static final Connection fetchConnection(){
    return new SqlConnectImpl();
    }
    }

    DBTool

    public class DBPool {
    
    //数据库池的容器
    private static LinkedList<Connection> pool = new LinkedList<>();
    
    public DBPool(int initalSize) {
    if (initalSize > 0) {
    for (int i = 0; i < initalSize; i++) {
    pool.addLast(SqlConnectImpl.fetchConnection());
    }
    }
    }
    
    //在mills时间内还拿不到数据库连接,返回一个null
    public Connection fetchConn(long mills) throws InterruptedException {
    synchronized (pool) {
    if (mills < 0) {
    while (pool.isEmpty()) {
    pool.wait();
    }
    return pool.removeFirst();
    } else {
    long overtime = System.currentTimeMillis() + mills;
    long remain = mills;
    while (pool.isEmpty() && remain > 0) // 如果池子为空 而等待时间大0
    {
    pool.wait(remain);
    remain = overtime - System.currentTimeMillis(); // 需要等待的时间
    }
    Connection result = null;
    if (!pool.isEmpty()) {
    result = pool.removeFirst();
    }
    return result;
    }
    }
    }
    
    //放回数据库连接
    public void releaseConn(Connection conn) {
    if (conn != null) {
    synchronized (pool) {
    pool.addLast(conn);
    pool.notifyAll();
    }
    }
    }
    }

    测试代码类

    public class DBPoolTest {
    static DBPool pool = new DBPool(10);
    // 控制器:控制main线程将会等待所有Woker结束后才能继续执行
    static CountDownLatch end;
    
    public static void main(String[] args) throws Exception {
    // 线程数量
    int threadCount = 50;
    end = new CountDownLatch(threadCount);
    int count = 20;//每个线程的操作次数
    AtomicInteger got = new AtomicInteger();//计数器:统计可以拿到连接的线程
    AtomicInteger notGot = new AtomicInteger();//计数器:统计没有拿到连接的线程
    for (int i = 0; i < threadCount; i++) {
    Thread thread = new Thread(new Worker(count, got, notGot), "worker_" + i);
    thread.start();
    }
    end.await();// main线程在此处等待
    System.out.println("总共尝试了: " + (threadCount * count));
    System.out.println("拿到连接的次数:  " + got);
    System.out.println("没能连接的次数: " + notGot);
    }
    
    static class Worker implements Runnable {
    int count;
    AtomicInteger got;
    AtomicInteger notGot;
    
    public Worker(int count, AtomicInteger got, AtomicInteger notGot) {
    this.count = count;
    this.got = got;
    this.notGot = notGot;
    }
    
    public void run() {
    while (count > 0) {
    try {
    // 从线程池中获取连接,如果1000ms内无法获取到,将会返回null
    // 分别统计连接获取的数量got和未获取到的数量notGot
    Connection connection = pool.fetchConn(1000);
    if (connection != null) {
    try {
    System.out.println(Thread.currentThread().getName() + "成功获得链接");
    connection.createStatement();
    connection.commit();
    } finally {
    pool.releaseConn(connection);
    got.incrementAndGet();
    }
    } else {
    notGot.incrementAndGet();
    System.out.println(Thread.currentThread().getName() + "等待超时!");
    }
    } catch (Exception ex) {
    } finally {
    count--;
    }
    }
    end.countDown();
    }
    }
    }

    8. join

    thread.join的含义是当前线程需要等待previousThread线程终止之后才从thread.join返回j继续执行下面。简单来说,就是线程没有执行完之前,会一直阻塞在join方法处。

    Thread.join的作用

    Thread.join一般用在规划多线程执行顺序,下面这段代码演示了Thread.join的作用

    public class JoinDemo extends Thread{
    int i;
    Thread previousThread; //上一个线程
    public JoinDemo(Thread previousThread,int i){
    this.previousThread=previousThread;
    this.i=i;
    }
    @Override
    public void run() {
    try {
    //调用上一个线程的join方法,大家可以自己演示的时候可以把这行代码注释掉
    previousThread.join();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("num:"+i);
    }
    public static void main(String[] args) {
    Thread previousThread=Thread.currentThread();
    for(int i=0;i<10;i++){
    JoinDemo joinDemo=new JoinDemo(previousThread,i);
    joinDemo.start();
    previousThread=joinDemo;
    }
    }
    }
    Thread.join的实现原理

    线程是如何被阻塞的?又是通过什么方法唤醒的呢?先来看看Thread.join方法做了什么事情

    public class Thread implements Runnable {
    ...
    public final void join() throws InterruptedException {
    join(0);
    }
    ...
    public final synchronized void join(long millis) throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;
    if (millis < 0) {
    throw new IllegalArgumentException("timeout value is negative");
    }
    if (millis == 0) { //判断是否携带阻塞的超时时间,等于0表示没有设置超时时间
    while (isAlive()) {//isAlive获取线程状态,无线等待直到previousThread线程结束
    wait(0); //调用Object中的wait方法实现线程的阻塞
    }
    } else { //阻塞直到超时
    while (isAlive()) {
    long delay = millis - now;
    if (delay <= 0) {
    break;
    }
    wait(delay);
    now = System.currentTimeMillis() - base;
    }
    }
    }
    ...

    从join方法的源码来看,join方法的本质调用的是Object中的wait方法实现线程的阻塞,但是我们需要知道的是,调用wait方法必须要获取锁,所以join方法是被synchronized修饰的,synchronized修饰在方法层面相当于synchronized(this),this就是previousThread本身的实例。

    有很多人不理解join为什么阻塞的是主线程呢? 不理解的原因是阻塞主线程的方法是放在previousThread这个实例作用,让大家误以为应该阻塞previousThread线程。实际上主线程会持有previousThread这个对象的锁,然后调用wait方法去阻塞,而这个方法的调用者是在主线程中的。所以造成主线程阻塞。
    为什么previousThread线程执行完毕就能够唤醒住线程呢?或者说是在什么时候唤醒的?
    要了解这个问题,我们又得翻jdk的源码,但是如果大家对线程有一定的基本了解的话,通过wait方法阻塞的线程,需要通过notify或者notifyall来唤醒。所以在线程执行完毕以后会有一个唤醒的操作,只是我们不需要关心。
    接下来在C++ hotspot的源码中找到 thread.cpp,看看线程退出以后有没有做相关的事情来证明我们的猜想.

    void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
    assert(this == JavaThread::current(),  "thread consistency check");
    ...
    // Notify waiters on thread object. This has to be done after exit() is called
    // on the thread (if the thread is the last thread in a daemon ThreadGroup the
    // group should have the destroyed bit set before waiters are notified).
    ensure_join(this);
    assert(!this->has_pending_exception(), "ensure_join should have cleared");
    ...

    观察一下 ensure_join(this)这行代码上的注释,唤醒处于等待的线程对象,这个是在线程终止之后做的清理工作,这个方法的定义代码片段如下

    static void ensure_join(JavaThread* thread) {
    // We do not need to grap the Threads_lock, since we are operating on ourself.
    Handle threadObj(thread, thread->threadObj());
    assert(threadObj.not_null(), "java thread object must exist");
    ObjectLocker lock(threadObj, thread);
    // Ignore pending exception (ThreadDeath), since we are exiting anyway
    thread->clear_pending_exception();
    // Thread is exiting. So set thread_status field in  java.lang.Thread class to TERMINATED.
    java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
    // Clear the native thread instance - this makes isAlive return false and allows the join()
    // to complete once we've done the notify_all below
    //这里是清除native线程,这个操作会导致isAlive()方法返回false
    java_lang_Thread::set_thread(threadObj(), NULL);
    lock.notify_all(thread);//注意这里
    // Ignore pending exception (ThreadDeath), since we are exiting anyway
    thread->clear_pending_exception();
    }

    ensure_join方法中,调用 lock.notify_all(thread); 唤醒所有等待thread锁的线程,意味着调用了join方法被阻塞的主线程会被唤醒
    总结:Thread.join其实底层是通过wait/notifyall来实现线程的通信达到线程阻塞的目的;当线程执行结束以后,会触发两个事情,第一个是设native 线程对象为null、第二个是通过notifyall方法,让等待在previousThread对象锁上的wait方法被唤醒。而上面多demo无非就是连环套性质多wait 跟 notifyall。

    什么时候会使用Thread.join

    在实际应用开发中,我们很少会使用thread.join。在实际使用过程中,我们可以通过join方法来等待线程执行的结果,其实有点类似future/callable的功能。也可以用 CountDownLatch 实现 ,这个更简单 。
    我们通过以下伪代码来说明join的使用场景

    public void joinDemo(){
    //....
    Thread t=new Thread(payService); //可以若干个线程
    t.start();
    //....
    //其他业务逻辑处理,不需要确定t线程是否执行完
    insertData();
    //后续的处理,需要依赖t线程的执行结果,可以在这里调用join方法等待t线程执行结束
    t.join();
    //然后做一个总结性工作,比如我们开多线程处理数据,最终要对数据处理多啊。
    }

    9. 调用yield() 、sleep()、wait()、notify()等方法对锁有何影响?

    1. 线程在执行yield()以后,持有的锁是的,操不释放作系统仍然可能选择到该进程。
    2. sleep()方法被调用以后,持有的锁是不释放的,休眠期间操作系统不会再执行该进程。
    3. 调动方法之前,必须要持有锁。调用了wait()方法以后,锁就会被释放,当wait方法返回的时候,线程会重新持有锁 .
    4. 调动方法之前,必须要持有锁,调用notify()方法本身不会释放锁的,只是告诉调用过wait方法的线程可以去参与获得锁的竞争了.

    参考

    Synchronized的使用
    深入理解Java并发之synchronized实现原理
    volatile
    守护线程共死

    • 点赞 10
    • 收藏
    • 分享
    • 文章举报
    SoWhat1412 博客专家 发布了347 篇原创文章 · 获赞 1996 · 访问量 156万+ 他的留言板 关注
    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: