Состояния java.lang.Thread на граблях и примерах

воскресенье, 5 февраля 2017 г.
Каждый java разработчик знает что такое поток, как его запустить и, возможно, поменять ему приоритет или даже сделать его демоном. Сегодня этих поверхностных знаний зачастую достаточно для того, чтобы успешно справляться со своими повседневными задачами, в которых крутые фреймворки всеми силами пытаются скрыть от нас нюансы многопоточности. Но иногда жизнь заставляет спустится на дно  на уровень ниже и познакомиться с нюансами работы с потоками более детально.

В этой статье, по мере решения простых задач, через серию проб и ошибок, мы рассмотрим некоторые нюансы при работе с классом Thread в java, поговорим о том, какие у потоков бывают состояния и при каких условиях поток переходит из одного состояния в другое.

Задача про шагающего робота

Условия задачи:
Надо написать робота который умеет ходить. За движение каждой его ноги отвечает отдельный поток. Шаг выражается в выводе в консоль LEFT или RIGHT. 
В общем случае каркас решения может выглядеть следующим образом:
public class Robot {

    class Leg implements Runnable {
        private final String name;
        Leg(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            while(true) {
                step();
            }
        }
        private void step() {
            System.out.println(name);
        }
    }

    Leg left = new Leg("LEFT");
    Leg right = new Leg("RIGHT");
    void run() {
        new Thread(left).start();
        new Thread(right).start();
    }

    public static void main(String[] args) {
        new Robot().run();
    }
}
Нам остается только переписать метод run так, чтобы вывод в консоль выглядел чередой не повторяющихся строк LEFT и RIGHT:
LEFT
RIGHT
LEFT
RIGHT
...

Вариант первый: разделяемое состояние.

Идея проста: присвоим каждой ноге true или false, заведем разделяемое между потоками значение текущей ноги и на каждой итерации в каждой ноге будем сверять текущее значение разделяемой переменной с внутренним:
public class RobotInfinitloop {

    boolean currentLeg = true;

    class Leg implements Runnable {
        private final String name;
        private final boolean leg;

        Leg(String name, boolean leg) {
            this.name = name;
            this.leg = leg;
        }
        @Override
        public void run() {
            while(true) {
                if (leg == currentLeg) {
                    step();
                    currentLeg = !leg;
                    Thread.yield();
                }
            }
        }

        private void step() {
            System.out.println(name);
        }
    }

    Leg left = new Leg("LEFT", false);
    Leg right = new Leg("RIGHT", true);

    void run() {
        new Thread(left).start();
        new Thread(right).start();
    }

    public static void main(String[] args) {
        new RobotInfinitloop().run();
    }
}
Если запустить этот код, можно обнаружить, что прогулка нашего робота будет совсем не такой долгой как нам хотелось и в некоторых случаях может и вовсе ограничиться одним единственным шагом.

Все из-за того, что для оптимизации производительности для каждого потока создается локальная копия переменной currentLeg, изменения которой не видны другому потоку. Для решения этой проблемы существует ключевое слово volatile, которое говорит о том, что операция над переменной совершенная в одном потоке, должна быть видна в других.
...
volatile boolean currentLeg = true;
...
Теперь два потока бегая в бесконечных циклах и пытаясь перехватить друг у друга разделяемый ресурс, решают нашу задачу.

Обратите внимание на инструкцию Thread.yield(). Метод yield переводит состояние потока из Running в Ready и позволяет планировщику переключиться на другой поток.  Конкретно в нашем примере наличие или отсутствие вызова данного метода не сильно скажется на результате, но на практике может позволить сделать переключение между потоками более предсказуемым.

Но что на счет эффективности нашего решения? Наше приложение порождает два потока, которые не останавливаясь производят вычисления. Если открыть системный монитор в операционной системе, или запустить VisualVM, то можно заметить огромное потребление ресурсов CPU нашей программой. Пока один поток производит вывод в system out, другой наворачивает циклы в пустую. Чем дольше выполняет полезную нагрузку один поток, тем больше пустой работы выполняет второй:
Показания VisualVM
Показания System Monitor

Вариант второй: общий монитор

Было бы здорово останавливать выполнение одного потока на время работы другого, а затем просыпаться и останавливать второй поток.

В классе Thread есть методы suspend() и resume(),  но они помечены как устаревшие и считаются опасными для использования.

Чтобы ответить на вопрос почему, давайте представим, что в программе, выполняющейся в потоке, есть работа с критическими ресурсами (на пример System.out), доступ к которым мы должны получать только через монитор:
public class RobotSuspendResume {

    static final Object monitor = new Object();

    class Leg implements Runnable {
        private final String name;
        Leg(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            while(true) {
                synchronized (monitor) {
                    step();
                    Thread.currentThread().suspend();
                }
            }
        }
        private void step() {
            System.out.println(name);
        }
    }

    Leg left = new Leg("LEFT");
    Leg right = new Leg("RIGHT");
    void run() {
        new Thread(left).start();
        new Thread(right).start(); // DEADLOCK!
    }

    public static void main(String[] args) {
        new RobotSuspendResume().run();
    }
}
Теперь нам требуется чтобы кто-то из вне вовремя будил наши потоки. Но прямая ссылка на поток не часто доступна для вызова resume() и велика вероятность того, что наши потоки так и останутся в состоянии "frozen" processes.

Альтернативой методам suspend() и resume() являются методы wait(), notify() и notifyAll().

Метод wait() переводит поток в состояние Waiting (или Timed Waiting, если указан таймаут ожидания), а методы notify() и notifyAll() возвращают его в состояние Runnable.

Важно понимать, что это методы не класса Thread, а класса Object который может быть легко расшарен между потоками, что позволяет избежать вышеописанных трудностей с методами suspend и resume.

Теперь мы можем разделить между нашими потоками общий монитор и сделав шаг будить всех его владельцев, после чего спокойно начинать ждать пока нас кто-нибудь разбудит:
public class RobotWait {

    private final Object monitor = new Object();

    class Leg implements Runnable {
        private final String name;

        Leg(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            while (true) {
                step();
                monitor.notify();
                try {
                    monitor.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        private void step() {
            System.out.println(name);
        }
    }

    Leg left = new Leg("LEFT");
    Leg right = new Leg("RIGHT");

    void run() {
        new Thread(left).start();
        new Thread(right).start();
    }

    public static void main(String[] args) {
        new RobotWait().run();
    }
}
Запуск этого кода приведет к исключению:
LEFT
Exception in thread "Thread-0" java.lang.IllegalMonitorStateException
RIGHT
 at java.lang.Object.notifyAll(Native Method)
 at ru.dokwork.RobotWait$Leg.run(RobotWait.java:20)
 at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
 at java.lang.Object.notifyAll(Native Method)
 at ru.dokwork.RobotWait$Leg.run(RobotWait.java:20)
 at java.lang.Thread.run(Thread.java:745)

Process finished with exit code 0
Дело в том, что прежде чем выполнить операцию notify(), notifyAll() или wait() поток должен завладеть монитором, на котором он собирается ее выполнить:
while(true) {
    synchronized (monitor) {
        step();
        monitor.notify();
        try {
            monitor.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Это позволяет избежать взаимных блокировок, когда оба потока уходят в ожидание одного и того же монитора. Конкретно в нашем примере велика вероятность того, что оба потока одновременно вызовут метод notify, а затем вместе уснут, и некому будет их разбудить.

Важно не забывать об этом, тк к сожалению, компилятор не может вам об этом напомнить, а вспоминать об этом в в runtime очень не приятно!

С другой стороны метод wait выполняется внутри блока синхронизации и может создаться впечатление, что поток уже никогда не освободит монитор. Но на самом деле это не так. Все дело в том, что метод wait освобождает монитор, а проснувшись, поток попадает в состояние ожидания монитора, который он освободил перед сном.

Раз засыпая поток освобождает монитор, то таких заснувших потоков может быть много - отсюда существование метода notifyAll() и замечание о том, что метод notify() пробуждает случайный из спящих потоков.

Теперь наше решение выглядит куда более разумным, ведь мы избавились от пустой траты ресурсов на выполнение холостых циклов!

Показания VisualVM

Показания System Monitor

Потокам тоже снятся страшные сны

У потока в java есть одна очень важная особенность, которую можно запросто упустить из виду. Она называется spurious wakeup и заключается в том, что поток может выйти из состояния ожидания без явных на то причин. Об этом можно явно прочитать в документации:
A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup.
Непосредственно для нашего примера с шагающим роботом случайное пробуждение не будет иметь сколько нибудь серьезных последствий. И чтобы прочувствовать всю боль от этого эффекта, давайте несколько изменим условия задачи. Пусть наш робот научится лазать по канату. Для этого в отдельном потоке мы будем управлять движением его рук: схватить канат, отпустить канат:
public class RobotClimber {

    private final Object monitor = new Object();

    class Hand implements Runnable {
        private final String name;
        Hand(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            while(true) {
                synchronized (monitor) {
                    grab();
                    monitor.notify();
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                release();
            }
        }
        private void grab() {
            System.out.println(name + ": GRAB");
        }
        private void release() {
            System.out.println(name + ": RELEASE");
        }
    }

    Hand left = new Hand("LEFT");
    Hand right = new Hand("RIGHT");
    void run() {
        new Thread(left).start();
        new Thread(right).start();
    }

    public static void main(String[] args) {
        new RobotClimber().run();
    }

LEFT: GRAB
RIGHT: GRAB
LEFT: RELEASE
LEFT: GRAB
RIGHT: RELEASE
RIGHT: GRAB
LEFT: RELEASE
LEFT: GRAB
RIGHT: RELEASE
RIGHT: GRAB
LEFT: RELEASE
...
Теперь, из-за непреднамеренного просыпания, может случиться непоправимое! Наш робот может сорваться вниз*:

  1. Левая рука схватила канат и уснула
  2. Правая рука разбудила левую и схватила канат
  3. Левая рука проснулась
  4. Левая рука отпустила канат
  5. Неожиданно проснулась правая рука!
  6. Правая рука отпустила канат
  7. Крах!
*Пример приведен чисто теоретический, для наглядной демонстрации последствий описываемой проблемы. Мне не удавалось (да я и не сильно пытался) воспроизвести описываемую последовательность событий на практике

Чтобы это предотвратить, документация советует нам вызывать метод wait в цикле с явной проверкой необходимости проснуться:
public class RobotClimber {

    private final Object monitor = new Object();
    private volatile boolean currentHand = false;

    class Hand implements Runnable {
        private final String name;
        private final boolean hand;
        Hand(String name, boolean hand) {
            this.name = name;
            this.hand = hand;
        }
        @Override
        public void run() {
            while(true) {
                synchronized (monitor) {
                    currentHand = !hand;
                    grab();
                    monitor.notify();
                    while (currentHand != hand) {
                        try {
                            monitor.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                release();
            }
        }
        private void grab() {
            System.out.println(name + ": GRAB");
        }
        private void release() {
            System.out.println(name + ": RELEASE");
        }
    }

    Hand left = new Hand("LEFT", false);
    Hand right = new Hand("RIGHT", true);
    void run() {
        new Thread(left).start();
        new Thread(right).start();
    }

    public static void main(String[] args) {
        new RobotClimber().run();
    }
}

Как остановить поток?

У многих начинающих изучать потоки в java людей возникает вопрос: как в общем случае мне остановить поток? Короткий ответ: никак.

Не смотря на то, что в классе Thread есть подходящий метод stop(), он отмечен как устаревший и вообще предан жесточайшей анафеме!

Почему? Одна из причин заключается в том, что поток может быть остановлен в момент владения монитором и проведения критических операций, в результате чего изменяемый объект будет доступен во вне в непредсказуемом состоянии.

Для примера рассмотрим пример с денежными счетами и переводом средств между ними:
public class Transactions {

    static class Account {
        private long balance = 0;
        Account(long initial) {
            this.balance = initial;
        }
        public long getBalance() {
            return balance;
        }
        void add(long value) {
            this.balance += value;
        }
        void withdraw(long value) {
            if (value > this.balance) {
                throw new IllegalArgumentException();
            }
            this.balance -= value;
        }
    }

    static class Transaction extends Thread {
        private final Object context;
        private final Account from;
        private final Account to;
        private final long value;

        Transaction(Object context, Account from, Account to, long value) {
            this.context = context;
            this.from = from;
            this.to = to;
            this.value = value;
        }

        @Override
        public void run() {
            synchronized (context) {
                from.withdraw(value);
                to.add(value);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Object context = new Object();
        Account first = new Account(100L);
        Account second = new Account(100L);
        Transaction tx_1 = new Transaction(context, first, second, 50L);
        Transaction tx_2 = new Transaction(context, second, first, 150L);
        tx_1.start();
        // tx_1.stop(); <- подобная операция может привести систему в неконсистентное состояние
        tx_1.join(); // ожидаем пополнения второго счета до 150
        tx_2.start();
        tx_2.join(); // ожидаем пополнения первого счета до 200
Метод join() - это метод для ожидания завершения работы потока. Он переводит поток, в котором был вызван, в состояние Waiting до тех пор, пока не завершится тот поток, для которого этот метод был вызван.

Если попытаться остановить первую транзакцию с помощью метода stop() сразу после запуска, то есть вероятность того, что поток будет прерван после списания средств с первого счета, но до перевода их на второй и мы получим неприятную ситуацию с исчезновением средств в системе, что приведет к исключению во время проведения второй транзакции.

Как быть, если нам очень надо сделать транзакции прерываемыми и все таки останавливать потоки? В таком случае мы должны уметь понять, что транзакцию пытаются прервать и предпринять меры по откату уже произведенных операций.

Чтобы указать потоку, что его работа должна быть прервана, существует метод interrupt(). Если поток на момент вызова метода interrupt() находился в ожидании выполнения метода wait(), sleep() или join(), будет сгенерировано исключение InterruptedException. При этом, если поток выполнял вычисления (например бегал в цикле), то эти вычисления не будут прерваны, а поток просто будет отмечен как прерванный.
public class TransactionsInterrupt {

    static class Account {
        private long balance = 0;
        Account(long initial) {
            this.balance = initial;
        }
        public long getBalance() {
            return balance;
        }
        void add(long value) {
            this.balance += value;
        }
        void withdraw(long value) {
            if (value > this.balance) {
                throw new IllegalArgumentException();
            }
            this.balance -= value;
        }
    }

    static class Transaction extends Thread {
        private final Object context;
        private final Account from;
        private final Account to;
        private final long value;

        Transaction(Object context, Account from, Account to, long value) {
            this.context = context;
            this.from = from;
            this.to = to;
            this.value = value;
        }

        @Override
        public void run() {
            synchronized (context) {
                from.withdraw(value);
                if (isInterrupted()) {
                    from.add(value);
                    return;
                }
                to.add(value);
                if (isInterrupted()) {
                    to.withdraw(value);
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Object context = new Object();
        Account first = new Account(100L);
        Account second = new Account(100L);
        Transaction tx_1 = new Transaction(context, first, second, 50L);
        tx_1.start();
        tx_1.interrupt();
        tx_1.join();
        // транзакция должна была откатиться, баланс не должен был измениться
        assert first.getBalance() == 100L;
        assert second.getBalance() == 100L;
    }
}

Не глотайте бездумно InterruptedException!

Исключение InterruptedException таит в себе опасность. Чтобы наглядно ее продемонстрировать, добавим новое условие в старую задачу: добавим метод для остановки нашего шагающего робота. Для этого заменим бесконечный цикл на цикл с условием остановки, когда текущий поток будет прерван:
public class RobotStop {

    private final Object monitor = new Object();

    class Leg implements Runnable {
        private final String name;
        Leg(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            while(!Thread.currentThread().isInterrupted()) {
                synchronized (monitor) {
                    step();
                    monitor.notify();
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        private void step() {
            System.out.println(name);
        }
    }

    Thread left = new Thread(new Leg("LEFT"));
    Thread right = new Thread(new Leg("RIGHT"));

    void run() {
        left.start();
        right.start();
    }

    void stop() throws InterruptedException {
        left.interrupt();
        right.interrupt();
        left.join();
        right.join();
    }

    public static void main(String[] args) throws InterruptedException {
        RobotStop robot = new RobotStop();
        robot.run();
        Thread.sleep(1000L);
        robot.stop();
    }
}
Здесь и ранее, допущена ошибка в обработке исключения InterruptedException из метода wait(). Проблема заключается в том, что при возникновении этого исключения поток не помечается как прерванный и цикл в нашем примере не будет прерван! 

К сожалению, исключение InterruptedException создано как checked, и должно либо фигурировать в сигнатуре метода run(), либо явно обработано внутри метода. Перехватив это исключение мы не должны его просто игнорировать, нам необходимо самостоятельно отметить поток как прерванный:
try {
    monitor.wait();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

Так какие же бывают у потока состояния?

В статье неоднократно упоминались состояния потока и условия перехода из одного состояния в другое. В конце статьи мне бы хотелось подвести итог этому вопросу, приведя наглядную диаграмму правил перехода:

*данная диаграмма не отражает условий смены состояний потока через вызовы методов класса java.util.concurrent.locks.LockSupport.


[1] Java Thread Primitive Deprecation
[2] "Случайный" выход из Object.wait()
[3] Enum Thread.State
[4] А как же всё-таки работает многопоточность?

2 комментария :

  1. В задаче про робота в случае разных реализаций еще интересно наблюдать за количеством шагов за единицу времени для каждой реализации:
    У меня были следующие реализации
    1) синхронизация только лишь по volatile переменной. Прожигание циклов процессора впустую
    2) с использованием монитора и wait/notify

    Интересно, что робот с первой реализацией шагал в 10 раз быстрее второго.

    P.S. эту задачу про робота меня просили решить на собеседовании в одну большую российскую компанию

    ОтветитьУдалить
  2. У меня получилось так

    public static void main(String[] args) {
    new Robot();
    }

    public static class Robot{
    Object CommunicationBetweenFoots = new Object();
    RightFoot RightFoot = new RightFoot(CommunicationBetweenFoots);
    LeftFoot LeftFoot = new LeftFoot(CommunicationBetweenFoots);

    Robot(){
    RightFoot.start();
    LeftFoot.start();
    }
    }
    public static class RightFoot extends Thread{
    Object CommunicationBetweenFoots = new Object();

    public RightFoot(Object communicationBetweenFoots) {
    this.CommunicationBetweenFoots = communicationBetweenFoots;
    }

    @Override
    public void run() {

    while(true) {
    try {
    synchronized(CommunicationBetweenFoots) {
    CommunicationBetweenFoots.wait();
    }
    } catch (InterruptedException e) {}
    System.out.println("RIGHT");
    try {
    sleep(2000);
    } catch (InterruptedException e1) {}
    synchronized(CommunicationBetweenFoots) {
    CommunicationBetweenFoots.notify();
    }
    }
    }

    }
    public static class LeftFoot extends Thread{
    Object CommunicationBetweenFoots = new Object();

    public LeftFoot(Object communicationBetweenFoots) {
    this.CommunicationBetweenFoots = communicationBetweenFoots;
    }

    @Override
    public void run() {
    while(true) {
    System.out.println("LEFT");
    try {
    sleep(2000);
    } catch (InterruptedException e1) {}
    synchronized(CommunicationBetweenFoots) {
    CommunicationBetweenFoots.notify();
    }
    try {
    synchronized(CommunicationBetweenFoots) {
    CommunicationBetweenFoots.wait();
    }
    } catch (InterruptedException e) {}
    }
    }

    }

    ОтветитьУдалить

Ваше мнение мне искренне интересно. Смелее!

Технологии Blogger.