Java 多线程的竞争条件、互斥和同步

竞争条件

在《操作系统精髓与设计原理》一书中,对竞争条件的定义如下:多个进程或线程同时读写某些数据项,导致最后的结果取决于这些进程中指令的执行顺序。

也就是说,竞争条件下,计算结果由单线程下的确定状态,变成了多线程下的不确定状态。这个不确定性产生的原因是什么?我们下面来讨论。

在只有单个线程的情况下,所有代码都按照既定的顺序依次执行。例如我们操作某个内存区域,修改某个变量,得到的结果跟代码的逻辑都是相一致的。

但是当多个线程同时执行时,情况就变得麻烦得多了。所谓同时执行,也就是并发,是通过处理器的时间分片来实现的,即每个线程轮流执行一个时间片。在这种情况下,某个线程在时间片用完时,可能还未执行完所有操作,而处理器在中途被切换到其它线程执行。

最糟糕的是,如果被打断的线程正在操作资源 A,而切换到的线程也要操作资源 A,那么处理器再次执行被打断的线程时,资源 A 可能已经被修改得面目全非了,得到的结果自然也就有问题。

我们通过一个例子来理解竞争条件。

/**
 * @author: Wray Zheng
 * @date: 2018-02-01
 * @description: An example of multi-thread
 */
public class MyThread implements Runnable {

    private static String globalBuffer = "";
    private String m_msg;

    public static void main(String[] args) {
        Thread t1 = new Thread(new MyThread("A"), "Thread-A");
        Thread t2 = new Thread(new MyThread("B"), "Thread-B");

        t1.start();
        t2.start();
    }

    public MyThread(String msg) {
        m_msg = msg;
    }

    public static void print(String msg) {
        globalBuffer = msg;
        System.out.println(Thread.currentThread().getName() + ": " + globalBuffer);
    }

    @Override
    public void run() {
        try {
            while (true) {
                print(m_msg);
                Thread.sleep(500);
            }
        } catch(Exception e) {}
    }

}

示例程序的关键点在于全局变量 globalBuffer 和打印函数 print。print 函数非常简单,就是将参数先赋值给 globalBuffer,然后将它打印出来。

按照代码的逻辑,输出的结果应该是线程 A 打印字符 A,线程 B 打印字符 B。但结果却是:

Thread-B: A
Thread-A: A
Thread-B: B
Thread-A: A
Thread-A: B
Thread-B: B
Thread-A: A
Thread-B: A

导致输出错误的原因在于,线程 A 和 B 都需要对共享资源 globalBuffer 进行读写,而读写的过程,即 print 函数,并不是一个原子(atomic)操作。

由于并发的特点,线程 A 在执行 print 函数的第一条赋值语句之后,可能会被打断,处理器转而执行线程 B 的 print 函数。当再次回到线程 A 执行 print 函数时,此时的 globalBuffer 已经被线程 B 修改了,因此输出错误的结果。

互斥

为了解决竞争条件带来的问题,我们可以对资源上锁。多个线程共同读写的资源称为共享资源,也叫 临界资源。涉及操作临界资源的代码区域称为 临界区(Critical Section)。同一时刻,只能有一个线程进入临界区。我们把这种情况称为互斥,即不允许多个线程同时对共享资源进行操作。

临界区是如何实现互斥的?我们继续分析。

进入临界区前,需要先获得互斥锁。如果已经有线程正在使用资源,那么需要一直等待,直到其它线程归还互斥锁。

操作完共享资源之后,即退出临界区时,需要归还互斥锁,以便其它等待使用该资源的线程能够进入临界区。

伪代码示例:

wait(lock); //获得互斥锁
{
    临界区,操作共享资源
}
signal(lock); //归还互斥锁

Java 中可以使用 ReentrantLock 对临界区上锁,防止多个线程同时进入临界区:

private static Lock bufferLock = new ReentrantLock();

public static void print(String msg) {
    bufferLock.lock();
    //临界区,操作临界资源 globalBuffer
    bufferLock.unlock();
}

这里我们只需要在临界区前使用 lock() 上锁,在临界区后使用 unlock() 解锁即可,java.util.concurrent 帮我们实现了临界区前判断锁状态的工作,会自己决定是阻塞还是进入临界区。

synchronized 关键字

java 为我们提供了更加简便的方式,用于实现临界区的互斥。

例如,我们可以为操作共享资源的函数加上 synchronized 关键字:

public synchronized void myFunction() {
    //操作共享资源 A
}

synchronized 关键字本质上是使用对象内部的锁,因为每个对象自身有且只有一个锁,因此加了 synchronized 关键字的方法在执行时,该对象所有的 synchronized 方法都不能被其它线程执行,但是这些方法和该类的其它对象的方法并不互相干扰,也就是说其他对象的 synchronized 方法此时也可以执行。

通过这种方式,能够确保同一时刻最多只有一个线程在执行该对象的某个 synchronized 方法。如果资源 A 只在该方法中读写,那么可以保证资源 A 不会出现被多个线程同时读写的情况。

下面我们给出另一种方法来实现资源使用的互斥。

synchronized 代码块

通过声明函数为 synchronized 的方式,只能实现函数体的互斥。要确保资源使用的互斥,即同一时刻只能有一个线程使用该资源,可以将操作资源 A 的语句放入 synchronized 代码块:

public void function1() {
    ......
    synchronized (A) {
        //操作资源 A
    }
    ......
}

public void function2() {
    ......
    synchronized (A) {
        //操作资源 A
    }
    ......
}

这样,对于资源 A 来说,同一时刻,只能有一个对应的 synchronized 代码块执行。因此,无论是在哪个地方使用资源 A,都不会出现多个线程竞争该资源的情况。

同步

多个线程通过协作的方式,对相同资源进行操作,这种行为称为同步。同步实际上就是线程间的合作,只不过合作时需要操作同一资源。

一个著名的例子就是生产者-消费者问题。

现在有一个生产者和一个消费者,生产者负责生产资源,并放在盒子中,盒子的容量无限大;消费者从盒子中取走资源,如果盒子中没有资源,则需要等待。

这个问题涉及两个方面:互斥、同步(合作)。互斥是指同一时刻,只能有一方使用盒子。同步指的是消费者消费时,需要满足一个条件:盒子中的资源不为零,这个条件需要双方的合作才能完成,即消费者不能无限制地消费资源,需要在生产者生产的资源有剩余时才能进行消费。

这个例子中,消费者不仅受到盒子互斥锁的限制,还必须等到资源不为零时才可消费。

用伪代码表示双方的操作如下:

private static Box box = new Box();
private static int boxSize = 0;

public static void producer() {
    wait(box);
    //往 box 中放入资源,boxSize++
    signal(box);
}

public static void consumer() {
    while (boxSize == 0); //资源为零时阻塞
    wait(box);
    //从 box 中取出资源,boxSize--
    signal(box);
}

public static void main(String[] args) {
    parbegin(producer, consumer); //两个函数由两个线程并发执行
}

判断“资源不为零”这个条件需要在取得互斥锁之前进行,否则,如果先取得盒子的互斥锁,再进行条件判断,就可能出现死锁,即两个线程都被永久性阻塞。

这是因为消费者取得互斥锁后,如果发现盒子不满足“资源不为零”这个条件,就会阻塞,等待生产者生产资源。而此时由于盒子被消费者使用,因此生产者也会阻塞,等待消费者用完盒子。于是两个线程均处于阻塞状态,且再也无法解除阻塞。

Condition 对象

我们上面讲到,资源使用条件的判断应该在获得互斥锁之前,防止死锁的产生,对于正常的代码逻辑确实应该如此,但是 Java 中锁的实现较为特殊,应该先获得互斥锁,再判断是否满足使用条件,如果不满足条件,则使用 Condition 对象的 await() 方法放弃已获得的互斥锁,并阻塞到条件满足为止。

也就是说,通常的逻辑是:先判断资源使用条件,如果满足再获得互斥锁。而 Java 的逻辑是:先获得互斥锁,如果不满足资源使用条件,再放弃已获得的互斥锁。

在 Java 中通过调用 Lock 对象的 newCondition() 方法来创建 Condition 对象,这也意味着 Condition 与 Lock 是相关联的。当获得互斥锁之后,如果条件不满足,那么我们使用 Condition 对象的 await() 方法来放弃已经获得的锁,以便其它线程使用共享资源,这样可以避免死锁的产生。同时,当前线程会开始等待,一旦其它线程调用了 Condition 对象的 signalAll() 方法,当前线程就会收到通知,再次检查条件,如果满足就会等待获取互斥锁。

下面给出具体示例:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author: Wray Zheng
 * @date: 2018-02-02
 * @description: An example of synchronization between multiple threads
 */
public class Synchronization {

    private static int resourceCount = 3;
    private static Lock boxLock = new ReentrantLock();
    private static Condition resourceAvailable = boxLock.newCondition();

    public static void main(String[] args) {
        Thread producer = new Thread(() -> {
            try {
                while (true) producer();
            } catch (InterruptedException e) {}
        });
        Thread consumer = new Thread(() -> {
            try {
                while (true) consumer();
            } catch (InterruptedException e) {}
        });

        producer.start();
        consumer.start();
    }

    public static void producer() throws InterruptedException {
        boxLock.lock();
        resourceCount++;
        resourceAvailable.signalAll();
        System.out.println("Producer: boxSize + 1 = " + resourceCount);
        boxLock.unlock();
        Thread.sleep(1000);
    }

    public static void consumer() throws InterruptedException {
        boxLock.lock();
        try {
            while (resourceCount == 0) 
                resourceAvailable.await();
            resourceCount--;
            System.out.println("Consumer: boxSize - 1 = " + resourceCount);
        }
        finally {
            boxLock.unlock();
        }
        Thread.sleep(500);
    }

}

示例中创建了两个线程:消费者线程和生产者线程。生产者不断生产资源,而消费者不断消耗资源,为了验证消费者在资源为零时会阻塞到生产者再次产生资源为止,这里让消费者的消耗速度大于生产者的生产速度,并且将初始的资源数 resourceCount 设置为 3。

某次运行该程序,得到如下结果:

Producer: resourceCount + 1 = 4
Consumer: resourceCount - 1 = 3
Consumer: resourceCount - 1 = 2
Producer: resourceCount + 1 = 3
Consumer: resourceCount - 1 = 2
Consumer: resourceCount - 1 = 1
Consumer: resourceCount - 1 = 0
Producer: resourceCount + 1 = 1
Consumer: resourceCount - 1 = 0
Producer: resourceCount + 1 = 1
Consumer: resourceCount - 1 = 0
Producer: resourceCount + 1 = 1
Consumer: resourceCount - 1 = 0

可以看到,一开始资源消耗速度比生产速度快得多,但消耗完之后,消费者不得不每次都等待生产者生产资源,于是每次都被迫阻塞一段时间,最后消耗速度与生产速度保持一致。

wait()、notifyAll()

实际上,Java 中的 Object 对象自带了与 Condition 对象的 await() 和 signalAll() 相对应的方法:wait() 和 notifyAll()。

由于所有对象都继承自 Object,因此我们可以通过 synchronized 关键字使用共享资源对象内部的锁,并且使用对象的 wait() 和 notifyAll() 来实现线程间的同步。

前面的生产者-消费者程序,可以改写如下:

/**
 * @author: Wray Zheng
 * @date: 2018-02-02
 * @description: An example of synchronization between multiple threads
 */
public class Synchronization {

    private static int resourceCount = 3;
    private static Object box = new Object();

    public static void main(String[] args) {
        Thread producer = new Thread(() -> {
            try {
                while (true) producer();
            } catch (InterruptedException e) {}
        });
        Thread consumer = new Thread(() -> {
            try {
                while (true) consumer();
            } catch (InterruptedException e) {}
        });

        producer.start();
        consumer.start();
    }

    public static void producer() throws InterruptedException {
        synchronized (box) {
            resourceCount++;
            box.notifyAll();
            System.out.println("Producer: resourceCount + 1 = " + resourceCount);
        }
        Thread.sleep(1000);
    }

    public static void consumer() throws InterruptedException {
        synchronized (box) {
            while (resourceCount == 0) 
                box.wait();
            resourceCount--;
            System.out.println("Consumer: resourceCount - 1 = " + resourceCount);
        }
        Thread.sleep(500);
    }

}

比起手动创建 Lock 对象、Condition 对象来实现线程间的同步,直接使用 synchronized 代码块和对象自带的 wait()、notifyAll() 是不是方便多了?

补充

对于声明了 synchronized 的函数而言,可以通过以下方式实现同步:

public synchronized void function() {
    while (!condition)
        wait();
    //操作共享资源
    notifyAll();
}

总结

Java 中多线程的互斥和同步,与操作系统中进程的互斥和同步的原理都是类似的。

在 Java 中,多个线程操作同一资源时,可以使用 ReentrantLock 对象的 lock() 和 unlock() 来实现资源的使用互斥,也可以使用 synchronized 代码块来实现。声明函数时添加 synchronized 关键字,可以实现函数体的互斥。

多线程的同步,不仅需要实现资源的使用互斥,还需要满足一定的条件才能使用。

可以通过 Lock 对象的 newCondition() 方法得到一个 Condition 对象,在获得互斥锁时,如果不满足资源的使用条件,那么可以调用 Condition 对象的 await() 方法来放弃互斥锁,同时阻塞到条件满足时才再次获取互斥锁。

还可以使用 Java 对象自带的 wait() 和 notify() 方法,与 synchronized 关键字共同使用,可以更简便地实现多线程的同步。

相关文章

Loading Likes...

发表评论

电子邮件地址不会被公开。 必填项已用*标注