Многопоточность

Зачем нужна?

Поток исполнения - независимая последовательность выполнения инструкций внутри одного процесса с общей памятью, но собственными регистрами и стеком.

Банальное применение - чтобы использовать несколько имеющихся ядер, необходимо в программе завести несколько потоков.

Стоит отметить, что нагрузить ядра можно не только несколькими потоками, но и несколькими процессами. Кроме того, API для работы с несколькими потоками появился ещё до того, как процессоры с несколькими ядрами стали популярными - получается, что потребность в потоках возникла не только для нагрузки ядер. Одна из потребностей заключается в использовании потоков для функций, которые должны работать параллельно основной программе (запускаем какую-нибудь функцию из библиотеки, но не хотим, чтобы она была блокирующей - например, если параллельно нужно читать ввод от пользователя).

API для работы с потоками в C++ появился в 11-м стандарте и был расширен в 20-м. До этого стандарт не предоставлял ничего для работы с многопоточными программами, поэтому люди делали всё средствами ОС.

std::thread

Просто пример:

int main() {
    std::thread th([]{
        std::cout << "Hello, world!\n";
    });
    th.join();
}

Конструктор std::thread принимает функциональный объект, который нужно исполнить в другом потоке.

Деструктор потока аварийно завершает программу вызовом std::terminate, если на момент вызова деструктора поток все ещё работает и не была сделана ни одна из следующих операций: join - дождаться, пока поток выйдет, detach - "отделить поток", он продолжит исполняться, а объект std::thread перестанет быть привязан к этому потоку.

Может возникнуть вопрос, почему не выбрали одну из этих операций, чтобы вызывать её дефолтно в деструкторе. Если это был бы detach, то не понятно, что происходит в таком случае:

int main() {
    std::thread th([]{
        std::cout << "Hello, world!\n";
    });
    th.detach();
}

После выхода из main всё равно все потоки будут убиты при завершении программы, кроме того, при выходе из main вызываются деструкторы глобальных переменных (например, std::cout), к которым может обращаться поток, поэтому делать detach не очень хорошо. В принципе, detach нужен вообще достаточно редко, потому что в основном поток привязан к каким-то глобальным данным (как в примере выше, к глобальной переменной std::cout).

Делать join по дефолту тоже не сработает - некоторые потоки могут остаться бесконечно ждать, если потоку не сообщили, что он должен выходить (например, если поток повиснет в ожидании ввода каких-нибудь данных).

В C++20 появился механизм, позволяющий сообщить потоку, что ему нужно выходить. В связи с этим появился std::jthread, деструктор которого по умолчанию сообщает потоку, что нужно выйти, а потом делает join.

Способа принудительно завершить поток в стандартной библиотеке нет. Если бы такой способ был, то он бы завершал поток в неопределённой точке, что было бы странно для многих программ - например, если бы тред делал какие-то операции со структурой данных, в таком случае она бы осталась в невалидном состоянии.

Такие механизмы есть у ОС - в Windows есть TerminateThread, в описании которого сказано следующее: You should call TerminateThread only if you know exactly what the target thread is doing, and you control all of the code that the target thread could possibly be running at the time of the termination. В принципе, её можно было бы использовать, если мы точно уверены, что поток крутится в каком-нибудь бесконечном цикле и не делает в нём никаких выделений памяти и не трогает глобальные данные. Из-за того, что такая фича редко нужна, она не была внесена в стандартную библиотеку.

std::mutex

std::array<int, 1000> accounts;

void transfer(size_t to, size_t from, int amount) {
    if (accounts[from] < amount) {
        throw std::runtime_error("insufficient funds");
    }
    accounts[from] -= amount;
    accounts[to] += amount;
}

Если функция transfer вызывается из нескольких потоков, то это считается UB. Может возникнуть следующая проблема: если вызывается transfer(1, 2, 100) из двух разных потоков, а accounts[2] == 101, то может возникнуть так, что они оба сначала проверят, что на счету достаточно денег, а потом вычтут по 100. Кроме того, нет никакой гарантии, что операция -= транслируется не в одну инструкцию, а в несколько, что тоже может вызвать проблему. В общем случае, такую проблему называют race condition (состояние гонки).

Нам может помочь примитив синхронизации под названием mutex (от слов mutual exclusion - взаимное исключение). В стандартной библиотеке это класс std::mutex с двумя операциями: lock и unlock. У него может быть два состояния - разблокирован (по умолчанию при создании) и заблокирован. Вызов lock ждёт, пока mutex будет разблокирован, а потом блокирует его. Метод unlock, очевидно, разблокирует mutex.

std::mutex m;
std::array<int, 1000> accounts;

void transfer(size_t to, size_t from, int amount) {
    m.lock();
    if (accounts[from] < amount) {
        m.unlock();
        throw std::runtime_error("insufficient funds");
    }
    accounts[from] -= amount;
    accounts[to] += amount;
    m.unlock();
}

В стандартной библиотеке есть std::lock_guard - RAII-обёртка над мьютексом, которая в конструкторе блокирует его, а в деструкторе разблокирует. Кроме того, обычно мьютекс привязан к каким-то данным, а не функциям, работающим с ними - например, в примере выше, идейно можно думать, что это часть accounts, поэтому иногда мьютекс с данными можно объединить в один объект.

Проблема такого примера в том, что мы блокируем все аккаунты одновременно, лишаясь параллельности. С одной стороны, это плохо, с другой, функция transfer не обязательна должна уметь параллельно работать с несколькими аккаунтами - например, если программа в несколько потоков делает кучу другой работы, но иногда вызывает transfer. Здесь можно сослаться на известный Закон Амдала, который говорит о том, что рост производительности с увеличением числа потоков, ограничен долей операций, которые можно идеально распараллелить.

Допустим, что нам хочется, чтобы независимые по данным вызовы функции transfer могли работать параллельно - можно создать больше мьютексов! Такой приём называется мелкогранулярными блокировками.

struct account {
    std::mutex m;
    int32_t balance;
};

std::array<account, 1000> accounts;

void transfer(size_t to, size_t from, int amount) {
    std::lock_guard<std::mutex> lg_from(accounts[from].m);
    std::lock_guard<std::mutex> lg_to(accounts[to].m);
    if (accounts[from].balance < amount) {
        throw std::runtime_error("insufficient funds");
    }
    accounts[from].balance -= amount;
    accounts[to].balance += amount;
}

И тут мы сталкиваемся с ситуацией, которая называется deadlock: при вызовах transfer(a, b, 42), transfer(b, a, 42) может произойти так, что первый поток залочит мьютекс a, второй залочит b и в итоге первый поток будет бесконечно ждать b, который заблокирован вторым потоком, который бесконечно ждёт a, в итоге программа зависнет. Формально, deadlock - это ситуация, при которой ни один поток находится в ожидании ресурсов, занятых друг другом, и не может продолжить своё исполнение.

Это фиксится введением порядка на мьютексах - назначим им номера и введём на них отношения порядка, в котором будем их блокировать (например, в примере выше номерами могут быть номера аккаунтов, а блокировать можно всегда меньший). Кстати, повторная блокировка мьютекса внутри одного потока это UB, поэтому в примере надо проверитьfrom != to.

Почему блокировать в каком-то порядке всегда это верно? Можно представить ориентированный граф, в котором вершины это мьютексы, а ребро uv означает, что в программе есть место, в котором, удерживая мьютекс u, поток попытается заблокировать мьютекс v. Утверждается, что если в этом графе нет циклов, то дедлок возникнуть не может. Если же на мьютексах введён порядок, то циклы возникнуть не могут, так как все рёбра будут проведены от меньшего к большему.

Ещё одна проблема кода выше - мьютекс на каждый аккаунт заводить дорого, так как sizeof мьютекса это 40 байт. Можно, к примеру, завести по мьютексу на какую-то часть аккаунтов (например, по одному на каждые 100 аккаунтов).

Проблема возникала из-за того, что мы блокировали один мьютекс и ждали разблокировки следующего, не отпуская заблокированный. Если бы мы, например, блокировали один, затем пробовали заблокировать второй, если же не получилось, отпускали первый и повторяли всё заново, то это позволило бы решить проблему. У std::mutex есть метод try_lock, который возвращает false для заблокированного мьютекса, иначе блокирует его и возвращает true.

Кроме того, в стандартной библиотеке есть функция std::lock, которая блокирует несколько объектов, избегая дедлоков неспецифицированной последовательностью вызовов функций lock, unlock, try_lock. В таком случае нельзя использовать lock_guard, потому что он в конструкторе пытается залочить мьютекс. Можно воспользоваться unique_lock, который имеет более тонкую настройку и даёт возможность создать незаблокированный лок:

void transfer(size_t to, size_t from, int32_t amount) {
    if (from == to) {
        return;
    }
    std::unique_lock<std::mutex> lg_from(accounts[from].m, std::defer_lock);
    std::unique_lock<std::mutex> lg_to(accounts[to].m, std::defer_lock);
    std::lock(lg_from, lg_to);
    // ...
}

std::condition_variable

Нельзя просто вставить в каждую функцию очереди мьютекс:

template <typename T>
struct concurrent_queue {
    void push(T value) {
        std::lock_guard<std::mutex> lg(m);
        q.push_back(std::move(value));
    }
    
    bool empty() const {
        std::lock_guard<std::mutex> lg(m);
        return q.empty();
    }
    
    T pop() {
        std::lock_guard<std::mutex> lg(m);
        T result = q.front();
        q.pop_front();
        return result;
    }
private:
    mutable std::mutex m;
    std::deque<T> q;
};

Это не будет работать, когда функция pop вызывается одним потоком у непустой очереди, но пока он ждёт мьютекс, другой поток её опустошает. Одно из решений - возвращать std::optional<T> из pop.

Иногда может понадобиться "ждущий pop". Например, можно крутить в цикле какой-нибудь try_pop, но это не очень выгодно - он не будет совершать полезной работы. Появляется желание уметь "усыпить поток" и потом "пробудить его" из другого потока - для этого существует специальный класс std::condition_variable. С помощью него можно переписать очередь следующим образом:

template <typename T>
struct concurrent_queue {
    void push(T value) {
        std::unique_lock<std::mutex> lg(m);
        q.push_back(std::move(value));
        lg.unlock();
        cv.notify_one();
    }

    bool empty() const {
        std::lock_guard<std::mutex> lg(m);
        return q.empty();
    }

    T pop() {
        std::unique_lock<std::mutex> lg(m);
        cv.wait(lg, [&] {
            return !q.empty();
        });
        T result = q.front();
        q.pop_front();
        return result;
    }
private:
    mutable std::mutex m;
    std::deque<T> q;
    std::condition_variable cv;
};

У него существует несколько операций:

  • wait - поток начинает спать
  • notify_one - пробуждает один поток
  • notify_all - пробуждает все потоки

wait принимает в качестве аргумента unique_lock. Это нужно, чтобы между unlock и вызовом wait другой поток ничего не испортил.

У wait есть две перегрузки - одна принимает только лок, а вторая ещё и предикат. wait, принимающий предикат, эквивалентен следующему:

while (!pred()) {
    wait(lock);
}

Так как между пробуждением потока и захватом блокировки, условие может нарушиться другим потоком, проверку нужно крутить в цикле. Кроме того, потоки могут случайно пробуждаться (spurious wakeup) без нотифая.

Существует специальная перегрузка у wait для таких ситуаций:

cv.wait(lg, [&]{
    return !q.empty();
})

Кажется, что можно оптимизировать функцию push следующим образом - если очередь не пуста, то не пробуждать потоки.

void push(T value) {
    std::lock_guard<std::mutex> lg(m);
    bool was_empty = q.empty()
    q.push_back(std::move(value));
    if (was_empty) {
        cv.notify_one();
    }
}

Проблема в том, что если ждут несколько pop, то может произойти следующее: в очередь произойдёт вставка, она сделает notify только одного pop, затем произойдёт ещё несколько вставок, которые не пробудят другие pop, потому что очередь ещё не пуста.

Может появиться желание ограничить размер очереди, например, если в неё пушат сильно быстрее, чем попают. В таком случае можно сделать ждущий push, заведя для этого вторую condition_variable.

Как это устроено внутри?

Для реализации этих примитивов нужно иметь возможность "усыпить поток", что возможно только с использованием средств ОС. В игрушечных реализациях используют sleep или std::this_thread::yield. Проблема yield в том, что операционная система (а конкретно scheduler) может решить продолжить выполнять этот поток, даже если есть другие кандидаты - как минимум, переключаться на другой поток дороже из-за сброса TLB-кэша и т.д. В ОС есть специальные операции, которые в каком-то смысле говорят, что данный поток будет ждать чего-то. В Linux это называется futex, который говорит, что поток будет ждать "по какой-то переменной/адресу в памяти" и пробуждает другой поток.

Бывают случаи, когда засыпать потоку может быть невыгодно и можно просто немного подождать, пока мьютекс не разлочится. Для этого применяется примитив синхронизации под названием spinlock, который просто крутится в цикле, проверяя, не залочен ли. Большинство реализацией мьютексов совмещают эти подходы: крутятся в цикле какое-то время, а потом засыпают, если мьютекс не разблокировался. В x86 есть специальная инструкция Spin Loop Hint, которую можно вставить внутрь цикла спинлока, чтобы процессор как-нибудь оптимизировал это.

std::atomic

В начале лекции мы говорили о том, что многопоточный не read-only доступ к обычным данным это UB. Почему так было специфицировано?

Во-первых, на это влияет количество разных архитектур процессоров - во многих из них отличаются инструкции, они предоставляют разные гарантии. Во-вторых, не любую переменную можно модифицировать атомарно. Атомарность - это неделимость операции с точки зрения других потоков ("они не видят её промежуточного результата"). Особенно это вызывает проблемы, если переменная находится на стыке кэш-линий, из-за чего возникает необходимость атомарно работать с двумя кэш-линиями сразу, в x86 такое возможно, но со спецэффектами уровня "другие ядра не могут читать из памяти в этот момент" (гуглится по запросу split lock).

В C++ есть класс std::atomic со специализациями для всех встроенных (и не только) типов, предоставляющий атомарный доступ к переменным. Несмотря на то, что многие операции для него транслируются в те же инструкции, что и операции с обычными переменными (например, в x86 load и store для std::atomic<int> это будет просто mov), нельзя считать, что можно пользоваться обычными типами вместо атомиков. Во-первых, в разных архитектурах атомики могут транслироваться в разные инструкции (например, в ARM для этого есть специальные инструкции). Во-вторых, обычные переменные могут по-разному оптимизироваться компилятором, потому что они не дают никаких гарантий на многопоточный доступ.

Через std::atomic первый пример можно переписать так:

std::array<std::atomic<int32_t>, 1000> accounts;

void transfer(size_t to, size_t from, int amount) {
    if (from == to) {
        return;
    }
    int32_t old = accounts[from].load();
    do {
        if (old < amount) {
            throw std::runtime_error("insufficient funds");
        }
    } while (!accounts[from].compare_exchange_weak(old, old - amount));
    accounts[to].fetch_add(amount);
}

int32_t get_balance(size_t account) {
    return accounts[account].load();
}

Операция compare_exchange принимает два аргумента expected и desired и записывает в атомик значение desired, если там записано значение expected, иначе в expected записывается значение из атомика.

Есть compare_exchange_weak и compare_exchange_strong:weak-форме разрешено "спонтанно фейлиться" (вести себя так, будто *this != excepted, даже если они равны), strong-форма гарантирует ожидаемое поведение (внутри там что-то вроде цикла). В случаях, как выше, выгоднее использовать weak-форму, потому что у нас уже есть внешний цикл. На x86 разницы между ними нет, там обе формы выражаются одной ассемблерной инструкцией.

memory_order

Почти у всех операций с атомиками есть дополнительный параметр memory_order, у которого всегда есть дефолтное значение. На эту тему рекомендуется посмотреть C++ and Beyond 2012: Herb Sutter - atomic Weapons 1 of 2, далее следует краткий пересказ основных поинтов доклада.

В упрощённой модели программа исполняется следующим образом: есть какие-то ядра, они соединены в память, из которой они читают и в которую они пишут. Реальный мир устроен не так: есть память, есть кэши разных уровней, какие-то из уровней общие для нескольких ядер и т.д. В такой системе дорого поддерживать иллюзию наивного представления "процессор - память", но как может быть по-другому? Самый простой пример - из-за бранч-предиктора процессор может читать спекулятивно из памяти или просто префетчить при последовательном чтении, аналогично запись может быть отложенной.

То, в каком порядке ядро читает из памяти и пишет в неё становится непредсказуемым из-за оптимизаций, хотя в x86 есть некоторые гарантии на это, о них можно почитать в Intel Software Developer Manual: чтения не переупорядочиваются с другими чтениями, записи не переупорядочиваются с чтениями, идущими до них, записи не переупорядочиваются с другими записями кроме некоторых исключений, чтения могут быть переупорядочены с идущими до них записями в другие места памяти. На эту тему есть забавный пример о том, что даже в однопоточной программе в зависимости от порядка инструкций меняется производительность, потому что процессор обязан соблюдать ордеринг. Для борьбы с реордерингом применяются так называемые барьеры памяти.

Пример:

std::atomic<int> x, y;

void thread_1() {
    x.store(1);
}

void thread_2() {
    y.store(1);
}

void thread_3() {
    int x3 = x.load();
    int y3 = y.load();
    if (x3 == 1 && y3 == 0) {
        std::cout << "x written before y\n";
    }
}

void thread_4() {
    int y4 = y.load();
    int x4 = x.load();
    if (y4 == 1 && x4 == 0) {
        std::cout << "y written before x\n";
    }
}

Представим, что все 4 потоки запускаются одновременно. Если x3 = y3, то поток 3 отработал раньше (или позже) обоих потоков 1 и 2, случай неинтересный, если x3 = 0, y3 = 1, то запись в y могла случиться после чтения x, поэтому ничего сказать нельзя. Интересен один случай - когда x3 = 1, y3 = 0, тогда точно можно сказать, что запись в x произошла раньше записи в y. Аналогично если y4 = 1, x4 = 0, то запись в y произошла раньше записи в x.

Кажется логичным, что одновременно нельзя получить два сообщения - и это правда для атомиков в C++, они гарантируют такое поведение, потому что для locked инструкций реордеринг не наблюдается (например, для записи в атомике используется инструкция xchg, которая по умолчанию считается locked). Но на многих архитектурах обычные операции чтения и записи не обеспечивают такой гарантии. Это означает, что потоки 3 и 4 видят записи от других потоков в разном порядке. Такое, в принципе, может происходит из-за того, на каких ядрах они исполняются, так же на это влияют и кэши. В x86 такого эффекта нет, но в мануале есть интересная фраза "Any two stores are seen in a consistent order by processors other than those perfoming the stores", самый известный пример на эту тему - Memory Reordering Caught in the Act.

В некоторых случаях можно ослабить лишние гарантии для атомиков, чтобы получиться большую эффективность. Для этого и нужен memory_order.

std::string value;
std::atomic<bool> value_present;

void produce() {
    value = "hello";
    value_present.store(true, std::memory_order_release);
    // ... 
}

void try_consume() {
    if (value_present.load(std::memory_order_acquire)) {
        std::string  tmp = value;
    }
}

Все операции, сделанные до memory_order_release не могут быть сделаны после него, аналогично все операции, сделанные после memory_order_acquire не могут быть сделаны до него, и есть дополнительное правило, что если один поток записал в переменную с ордерингом release, а другой прочитал с ордерингом acquire, то все записи, сделанные в первом потоке, будут сделаны до всех чтений во втором потоке. Пример выше без них накладывает больше ненужных гарантий, которые как раз ослабляются параметром memory_order.

Если посмотреть, во что это транслируется, то будет видно, что с дефолтным memory_order запись делается инструкцией xchg, а в случае с memory_order_release, там будет mov.

Ещё есть memory_order_relaxed, который не накладывает дополнительных ограничений на упорядочивание этой операции. По сути, relaxed означает, что "значение когда-нибудь запишется", а нужный ордеринг обеспечивается за счёт других вещей, например, это может быть join, который гарантирует, что все записи потока, который джойнится, будут видны из того, который вызывает join.

void thread1() {
    for (;;) {
        // ...
        number_of_events.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    std::thread th(&thread 1);
    // ...
    th.join();
    number_of_events.load(std::memory_order_relaxed);
}

Также memory_order_relaxed можно использовать при записи в память до создания других потоков, потому что их создание также гарантирует, что они увидят все записи, сделанные до этого.

Если посмотреть на маппинг между операциями атомика и инструкциями процессора, то для x86 имеет смысл использовать разные memory_order только для записи, потому что там любой load мапится в mov.

volatile

Модификатор volatile у переменных в C++ никак не связан с многопоточностью. Если вы вдруг забыли это, то есть полезный сайт, к которому всегда можно обратиться.

Почему у людей возникает сомнение в этом? Во-первых, volatile в других языках имеет другой смысл: например, в Java это аналог атомиков. Во-вторых, строго говоря, набор ограничений на volatile переменные похож на те, которые накладываются на атомики, но не одинаков. В-третьих, до C++11 и в Си volatile использовался при работе с потоками - например, в MSVC есть ключик для совместимости со старыми программами, заменяющий volatile переменные на атомики.

Для чего на самом деле используется volatile?

  • Device memory: подавить оптимизации компилятора при работе с памятью какого-либо устройства (например, чтобы компилятор не заменял несколько записей подряд на последнюю из них)
  • setjmp/longjmp: что-то из Си, нам неактуально, можете погуглить
  • UNIX-signal: если внешний обработчик сигнала работает с переменными, то они должны быть volatile
volatile int a;
void foo() {
    a = 1;
    a = 2; // компилятор обязан сохранить порядок записи
}

std::atomic<int> b;
void bar() {
    b.store(1);
    b.store(2); // компилятор имеет право оставить только запись 2, сделав вид, что другой поток не успел прочитать между записью 1 и 2
    b.load(); // может не читать
}

void progress_bar() {
    for (size_t i = 0; i != 10000; ++i) {
        // ...
        b += 1;
    }
    // может быть соптимизировано в b += 10000; снаружи цикла
}

Но при этом обычно компилятор не оптимизирует такие штуки, потому что это может испортить смысл программы (как в примере с progress_bar), а так же на самом деле редко встречаются юзкейсы, когда можно так соптимизировать. Подробнее про это можно прочитать No Sane Compiler Would Optimize Atomics.

Кэш и многопоточные программы

Есть вот такой пример:

namespace po = boost::program_options;

void thread_proc(std::atomic<int>& v, std::atomic<bool>& finish) {
    while (!finish.load()) {
        v += 1;
    }
}

int main(int argc, char* argv[]) {
    try {
        po::options_description desc("Allowed options");
        desc.add_options()("threads,j", po::value<size_t>(), "set number of threads");
        po::variables_map vm;
        po::store(po::parse_command_line(argc, argv, desc), vm);
        po::notify(vm);
        size_t number_of_threads = 1;
        if (vm.count("threads") != 0) {
            number_of_threads = vm["threads"].as<size_t>();
        }
        for (;;) {
            std::cout << number_of_threads << " threads";
            std::atomic<int> val(0);
            std::atomic<bool> finished(value);
            std::vector<std::threads> threads;
            for (size_t i = 0; i != number_of_threads; ++i) {
                threads.emplace_back(&thread_proc, std::ref(val), std::ref(finished));
            }
            std::this_thread::sleep_for(std::chrono::seconds(2));
            finished.store(true);
            for (auto i = threads.rbegin(); i != threads.rend(); ++i) {
                auto& th = *j;
                th.join();
            }
            std::cout << ", " << val.load() << " iterations" << std::endl;
        }
    } catch(std::exception const& e) {
        std::cer << e.what() << std::endl;
        return EXIT_FAILURE;
    }
}

Эта программа принимает аргументом число N и создаёт N потоков, которые внутри себя инкрементят переменную val в цикле, пока их не остановит значение переменной finished, которое записывается через 2 секунды. В итоге выводим сколько раз за 2 секунды N потоков инкрементят переменную val. Интересно следующее - как при увеличении числа потоков изменится значение переменной.

При запуске на одном потоке результат получился в 4 раза больше, чем на двух потоках, при этом на двух потоках результат получился шумным (скачет от 2 до 4 раз). При увеличении числа потоков дальше результат остаётся примерно одинаковым.

Дело в том, что работать из разных потоков с одной переменной одновременно всё равно нельзя - инкременты у процессора будут выполняться последовательно, поэтому получить результат лучше, чем на одном потоке, не получится. Почему результат для двух потоков был шумным и в несколько раз хуже? Ответ кроется в кэшах - запись в переменную от одного ядра инвалидирует её в других кэшах.

С помощью команды taskset можно ограничить ядра, на которых запускается программа. Если запускать с двумя потоками на ядрах 0, 1 (которые на самом деле одно физическое ядро, просто hyperthreading, а значит кэш у них общий), то результат примерно такой же, как если запускать программу в один поток. Если же запустить на ядрах 0, 2 (разные физические ядра), то результат в 4 раза хуже и при этом он не шумит, потому что кэш у них разный. Шумный результат означал, что иногда планировщик закидывал потоки на одно физическое ядро, а иногда на разные.

Один из способов решения такой проблемы - делать по переменной на каждый поток, а в конце складывать результаты. Давайте заведём N этих переменных:

std::vector<std::atomic<int>> vals(number_of_threads);

И каждому потоку выдадим по своей переменной.

Кажется, что должно стать лучше, ведь теперь все потоки пишут в разные переменные, но результат получился тот же: для одного потока результат тот же, для двух потоков получили в 2-3 раза меньше итераций. Аналогично с исполнениями на одном ядре и на разных. Проблема опять в кэше - соседние переменные из вектора попадают в одну кэш-линию, поэтому, несмотря на то, что переменные разные, один поток инвалидирует кэш-линию, которая нужна второму потоку. Кроме того, даже один и тот же поток может перекидываться ОС с одного ядра на другое, что тоже сказывается на производительности (хоть и не так заметно). Кстати, для двух потоков стало чуть быстрее, чем в прошлом случае TODO: почему - не понятно, если знаете, допишите.

Можно аккуратно подобрать оффсет так, чтобы переменные для разных потоков попадали в разные кэш-линии. Размер кэш-линии на x86 64 байта, размер атомик инта 4 байта, поэтому кажется, что надо делать сдвиг на 16. Тогда при запуске на разных ядрах двух потоков результат получается в 2 раза больше, чем при запуске на одном ядре.

Если покрутить значения, то заметно, что при сдвиге меньше 8 на двух потоках результат как на одном, при сдвиге больше 8 результат на двух потоках стабильно в 2 раза больше. Откуда берётся число 8, если мы посчитали, что сдвиг должен быть равен 16? Вектор выделяет память, выравнивая по 32 байта, поэтому он мог выделить память так, что первые 8 элементов попали в одну кэш-линию, а следующие 8 уже в следующую, поэтому и было достаточно сдвига 8.

Применение многопоточности

Самый канонический пример многопоточности - аллокаторы памяти. Раньше, когда процессоры были одноядерные, проблема нескольких потоков была проблемой корректности, а не перфоманса, поэтому в аллокаторах памяти просто все операции оборачивали в мьютексы. В многоядерных процессорах это стало приводить к проблемам - если два потока аллоцируют и освобождают память в процессе работы, при этом не работая с общими данными, узким местом будет аллокатор памяти.

Один из первых известных аллокаторов с понятным описанием это Hoard. Хоть он и устарел и на практике уже давно не применяется, концепции, описанные там, переняли разработчики других аллокаторов. Они попытались сделать так, чтобы аллокатор хорошо масштабировался на много потоков, при этом пытаясь избежать false sharing памяти, которую возвращает аллокатор - например, если аллокатор выделил два куска памяти для разных потоков, они не должны попадать в одну кэш-линию.

Концептуально они добились этого тем, что сделали N копий однопоточного аллокатора, назначая каждому потоку аллокатор по хешу потока, в итоге потоки равномерно распределяются по аллокаторам, кроме того случая, когда один поток выделил память, а другой освобождает её (в таком случае освобождать должен аллокатор первого потока). Заведение нескольких копий позволило решить проблему false sharing - нужно просто гарантировать, что каждая кэш-линия принадлежит хипу ровно одного из аллокаторов. Ещё один хак - если поток обращается к аллокатору, а тот залочен, он может переходить к следующему и так, пока не найдёт свободный, который он запомнит и будет дальше обращаться только к нему. Подробнее про всё это можно почитать в пейпере по Hoard.

Пример про дедлок

struct integer_generator {
    integer_generator(std::function<void (uint64_t)> on_next_number, uint64_t val)
        : on_next_number(std::move(on_next_number))
        , current_val(val)
        , quit(false)
        , inner_thread([this] { thread_proc(); }) {}

    integer_generator(integer_generator const&) = delete;
    integer_generator& operator=(integer_generator const&) = delete;

    ~integer_generator(){
        {
            std::lock_guard<std::mutex> lg(m1);
            quit = true;
        }
        inner_thread.join();
    }

    void reset(uint64_t val) {
        std::lock_guard<std::mutex> lg(m1);
        current_val = val;
    }
private:
    void thread_proc() {
        for (;;) {
            std::lock_guard<std::mutex> lg(m1);
            if (quit)
                break;
            ++current_val;
            on_next_number(current_val);
        }
    }
    
    std::mutex m1;
    std::function<void (uint64_t)> const on_next_number;
    uint64_t current_val;
    bool quit;
    std::thread inner_thread;
};

struct integer_accumulator
{
    integer_accumulator(uint64_t sum, uint64_t val)
        : current_sum(sum)
        , gen([this] (uint64_t next) {
            std::lock_guard<std::mutex> lg(m2);
            current_sum += next;
            std::cout << "sum = " << current_sum << ", " << next << '\n';
        }, val) {}

    void reset(uint64_t sum, uint64_t val) {
        std::lock_guard<std::mutex> lg(m2);
        current_sum = sum;
        gen.reset(val);
    }

private:
    std::mutex m2;
    uint64_t current_sum;
    integer_generator gen;
};
int main() {
    integer_accumulator acc(0, 0);
    for (;;) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        acc.reset(0, 0);
    }
}

Класс integer_generator хранит внутри себя поток inner_thread, который создаётся в конструкторе и крутится, пока не quit, вызывая функцию on_next_number с увеличивающимся каждый раз значением. Функция reset сбрасывает текущее значение. В деструкторе quit присваивается true и джойнится поток.

Класс integer_accumulator создаёт генератор, передавая функцию для суммирования чисел.

В примере дедлок - acc.reset берёт мьютекс m2 и, удерживая его, заходит в gen.reset и пытается взять мьютекс m1, а в thread_proc происходит наоборот: берётся m1, вызывается on_next_number, который пытается взять m2.

Одно из решений - объединить эти мьютексы в один, другое - заметить, что при вызове on_next_number не требуется удерживать мьютекс m1, но для этого нужно копировать current_val, а это может быть дорого или вообще запрещено в случае более сложного объекта.

Пример может показаться надуманным, но такое возникает достаточно часто, если писать многопоточный код, разбивая на классы. Одна из формулировок, которую используют - "не нужно делать коллбэки (в примере это функция on_next_number), удерживая мьютекс", другая - "то, какие мьютексы удерживает класс - часть интерфейса". Интересно, что в примере оба класса по отдельности корректны, но вместе вызывают дедлок. Получается, что рассуждая о корректности программы, нужно знать, что генератор, вызывая коллбэк, держит тот же внутренний мьютекс, что и лочит reset. Это можно считать проблемой того, как код разбит на классы.

cancellation

Почти наверняка, когда запускаем какой-то поток или даём какому-то потоку сделать какую-то операцию, от него хочется получить какой-то ответ или результат. Есть проблема - может произойти так, что в момент, когда операция завершится, её результат уже не будет никому нужен, тогда возникает несколько вопросов - не получится ли так, что когда операция завершилась, поток обращается к каким-то несуществующим объектам или из программы вышли совсем. Это пример, когда полезна следующая техника cancellation - сообщить потоку, что он больше не нужен и может выходить, например, это можно сделать через std::atomic<bool>.

В C++20 такое стандартизовали, появился std::jthread с похожим функционалом. У него есть stop_token:

int main() {
    std::jthread jt{ [](std::stop_token st) {
        while (!st.stop_requested()) {
            // ...
        }
    }};
    sleep(5);
    jt.request_stop();
    jt.join();
}

request_stop устанавливает stop_token. Деструктор jthread так же вызывает request_stop, если он не был вызван до удаления объекта, и делает jt.join(). Кстати, тут видно, что удобно думать о потоках как об обычных ресурсах.

Здесь возникает сразу несколько проблем. Во-первых, stop_token внутри нужно достаточно часто проверять, чтобы обеспечить реакцию на отмену потока. Во-вторых, даже если проверять часто, во время блокирующих операций (в том числе банального взятия мьютекса) не получится прервать поток. Блокирующие операции специфичны для операционной системы, поэтому нужно пользоваться механизмами, которые предоставляет ОС.

Асинхронные операции

Вместо блокирующих операций можно использовать асинхронные операции, которые более-менее независимо появились в UI и в серверных программах.

Как работает сервер: у него есть n клиентов, из каждого из которых он хочет читать, поэтому просто так делать блокирующие операции не получается. Традиционно делали следующее: каждому соединению заводили поток, что не очень экономно.

Альтернативный подход - спросить у ОС, у какого из сокетов (или файловых дескрипторов), можно "взять данные", такой механизм называется poll, он пробегается по всему массиву файловых дескрипторов и проверяет, готовы ли они. Недостаток poll в том, что он не возвращает, какие именно дескрипторы готовы. Продвинутая его версия - epoll, в который можно не сразу передать n дескрипторов, а добавлять их или убирать по одному. В отличие от poll, он возвращает список только тех дескрипторов, для которых произошли события. Подробнее и понятнее можно почитать в статье на хабре.

Операции такого рода называются асинхронными. Преимущества таких операций в том, что не нужно создавать много потоков, можно самостоятельно приоритизировать сокеты.

Кроме сетевых программ, асинхронные операции нужны в UI-программах. Различные ивенты (нажатие кнопки, клик мышки и т.д.) складывают в одну очередь, не заводя отдельные потоки. Часто внутри там тоже epoll.

Тут был пример UI-программы на QT, смотрите запись.

Про блокирующие операции и cancellation

Пусть есть поток, который повис на одной из блокирующих операций. Как убить такой поток? Поскольку все эти операции зависят от ОС, то и механизмы cancellation'а специфичны для ОС.

Просто пример - сокеты. У них есть функция shutdown, которая отменяет вызов recv на сокете. После её вызова, с сокетом работать нельзя. Важно: не нужно пытаться вызывать close, это не отменит блокирующую операцию.

Пусть есть поток, который сидит в вызове sleep. Как его прервать? Прямого способа нет, поэтому нужно начать издалека и поговорить про UNIX-сигналы (не надо путать с сигналами, о которых говорили на прошлых лекциях). Сигналы похожи на прерывания, но внутри одного процесса. Например, обращение по нулевому указателю, программа прерывается с сигналом SIGSEGV. На самом деле, программа может зарегистрировать функцию, которая называется обработчик сигнала и будет вызвана на него. Обработчик сигнала может быть вызван на любой инструкции, поэтому он не может полагаться на какое-либо состояние программы, вследствие чего внутри обработчика сигнала можно пользоваться ограниченным набором системных вызовов.

void sigsegv_handler(int) {
    int a = 5;
}
int* p;
int main() {
    struct sigaction new_action;
    new_action.sa_handler = &sigsegv_handler;
    sigemptyset(&new_action.sa_mask);
    new_action.sa_flags = 0;
    sigaction(SIGSEGV, &new_action, nullptr);
    *p = 42; // SIGSEGV
}

Если потоку приходит сигнал, то системные вызовы либо дорабатывают до начала обработки сигнала, либо возвращают код ошибки. Это сделано для того, чтобы избежать ситуации, когда обработчик сигнала вызывают системный вызов, на инструкции которого он уже был вызван.

На линуксе есть ещё одно API:

void thread_proc() {
    for (;;) {
        pthread_testcancel();
    }
}
int main() {
    pthread_cancel(handle);
}

Как это работает? pthread_testcancel() пробрасывает исключение, если поток отменён. Его можно использовать самостоятельно, но в куче блокирующих операций из стандартной библиотеки эта проверка уже стоит (например, в read). Применяется это как-то так:

int main() {
    std::thread th([]{
        for (;;) {
            std::string s;
            std::cin >> s; // блокирующая, внутри есть pthread_testcancel
            // вылетает abi::__forced_unwind
            if (!std::cin) {
                break;
            }
            std::cout << s << '\n';
        }
    });
    std::this_thread::sleep_for(std::chrono::seconds(1));
    pthread_cancel(th.native_handle());
    th.join();
}

Хоть это API и кажется удобным, но его хейтят по некоторым причинам:

  • Код, через который пролетает исключение, должен быть exception-safe: сложно использовать в коде на Си.

  • Если где-то используется noexcept, то исключение, которое вылетает при отмене потока, нарушает эти гарантии.

  • В некоторых кастомных реализациях стандартной библиотеки нет пробрасывания исключения при отмене потока, в них поток просто убивают, из-за чего деструкторы не вызываются.

  • abi::__force_unwind нельзя ловить и не пробрасывать, поэтому, если в программе есть catch(...), который не пробрасывает исключение, тогда pthread_cancel тоже использовать нельзя.