Кратко
СкопированоAtomics
— объект, который содержит статические методы для выполнения атомарных операций.
Потребность в использовании атомарных операций возникает при работе с SharedArray
, благодаря которому появляется возможность разделения общей памяти между потоками. При работе с общей памятью есть риск возникновения состояния гонки (race condition) без контроля доступа к общему состоянию.
При правильном использовании Atomics
мы можем гарантировать, что изменения записываются и читаются, а операции не прерываются и завершаются до начала следующей.
Рассмотрим пример, позволяющий лучше понять важность применения атомарных операций.
const buffer = new SharedArrayBuffer(1024);const sharedMemory = new Int32Array(buffer);sharedMemory[0] = sharedMemory[0] + 5// Эквивалентная запись (только при выполнении в одном потоке)let temp = sharedMemory[0]temp += 5sharedMemory[0] = temp
const buffer = new SharedArrayBuffer(1024); const sharedMemory = new Int32Array(buffer); sharedMemory[0] = sharedMemory[0] + 5 // Эквивалентная запись (только при выполнении в одном потоке) let temp = sharedMemory[0] temp += 5 sharedMemory[0] = temp
При работе в одном потоке, как все мы привыкли, в такой записи не будет проблемы. Потому что в одном потоке в единый момент времени выполняется только одна функция. При работе с общей памятью подобной гарантии нет. Пока в одном потоке увеличивается значение переменной temp
, другой поток успевает изменить значение shared
.
Для решения этой проблемы можно использовать метод Atomics
и Atomics
.
const buffer = new SharedArrayBuffer(1024);const sharedMemory = new Int32Array(buffer);Atomics.add(sharedMemory, 0, 5)// 0Atomics.load(sharedMemory, 0)// 5
const buffer = new SharedArrayBuffer(1024); const sharedMemory = new Int32Array(buffer); Atomics.add(sharedMemory, 0, 5) // 0 Atomics.load(sharedMemory, 0) // 5
Использование Atomics
СкопированоAtomics
— многофункциональный инструмент, необходимый для работы с многопоточностью. К сожалению, он очень не прост в использовании из-за низкоуровнего API. По этой причине в чистом виде Atomics
применяют не часто. Часто проще использовать более высокоуровневое API, построенное на Atomics
.
Один из распространённых паттернов многопоточного программирования — критические секции. Вы можете знать о таких примитивах из других языков программирования, как: Mutex
, Semaphore
, ReadWrite
и других. Их возможно реализовать и в JavaScript. Для того чтобы лучше понять область применения Atomics
, рассмотрим пример получения состояния гонки.
Как получить состояние гонки
СкопированоДопустим, у нас есть общая переменная. В одном потоке хотим 10 000 000 раз увеличить её на 1, а в другом 10 000 000 раз уменьшить на 1. Что мы получим в результате? 0?
В примерах специально опущены детали передачи данных между потоками, потому что это отдельная тема
// Главный поток (main thread)// Инициализируем памятьconst buffer = new SharedArrayBuffer(1024);const sharedMemory = new Int32Array(buffer);// Отправка данных в worker...for (let i = 0; i < 10_000_000; i++) { sharedMemory[0] = sharedMemory[0] + 1}// Worker// Получаем данные из main threadconst sharedMemory = new Int32Array(buffer);for (let i = 0; i < 10_000_000; i++) { sharedMemory[0] = sharedMemory[0] - 1}
// Главный поток (main thread) // Инициализируем память const buffer = new SharedArrayBuffer(1024); const sharedMemory = new Int32Array(buffer); // Отправка данных в worker... for (let i = 0; i < 10_000_000; i++) { sharedMemory[0] = sharedMemory[0] + 1 } // Worker // Получаем данные из main thread const sharedMemory = new Int32Array(buffer); for (let i = 0; i < 10_000_000; i++) { sharedMemory[0] = sharedMemory[0] - 1 }
Вывод в консоль может выглядеть как в примере ниже, но каждый раз он будет случайным, все будет зависить от того, что у системного планировщика на уме.
> Main Thread Started: 0 > Worker Started: 352 > Main Thread Finished: -2573302 > Worker Finished: -2741254
Разрешение состояния гонки
СкопированоПри разрешении состояния гонки мы можем использовать Mutex
. Mutex
обеспечивает механизм критических секций. Критическая секция — это часть кода между вызовами блокировки и разблокировки доступа к общему состоянию. Проще говоря, Mutex
позволяет только одному потоку в единый момент времени владеть общим состоянием (SharedArrayBuffer в контексте JavaScript).
Если кратко, то:
- У
Mutex
есть два состояния «заблокирован» и «разблокирован». - Пока
Mutex
заблокирован, остальные потоки ожидают получения права на блокировку общего состояния. - В момент разблокировки
Mutex
уведомляет один ожидающий поток. Mutex
не может быть разблокирован дважды.
Реализация Mutex
на Atomics
СкопированоВ JavaScript Mutex
на Atomics
реализуется разными способами. Самый распространённый основан на методах Atomics
, Atomics
, Atomics
и Atomics
. Посмотрим на такую реализацию.
const INDEX = 0const UNLOCKED = 0const LOCKED = 1class Mutex { constructor(sharedArrayBuffer) { this.arrayView = new Int32Array(sharedArrayBuffer) } lock() { while (true) { const oldValue = Atomics.compareExchange( this.arrayView, INDEX, UNLOCKED, LOCKED ) if (oldValue === UNLOCKED) { return } Atomics.wait(this.arrayView, INDEX, LOCKED) } } unlock() { const oldValue = Atomics.compareExchange( this.arrayView, INDEX, LOCKED, UNLOCKED ) if (oldValue === UNLOCKED) { throw new Error('Mutex уже был разблокирован!') } Atomics.notify(this.arrayView, INDEX, 1) } executeLocked(callback) { const tryGetLock = async () => { while (true) { const oldValue = Atomics.compareExchange( this.arrayView, INDEX, UNLOCKED, LOCKED ); if (oldValue === UNLOCKED) { callback(); this.unlock() return } const result = Atomics.waitAsync(this.arrayView, INDEX, LOCKED) await result.value } } tryGetLock() }}
const INDEX = 0 const UNLOCKED = 0 const LOCKED = 1 class Mutex { constructor(sharedArrayBuffer) { this.arrayView = new Int32Array(sharedArrayBuffer) } lock() { while (true) { const oldValue = Atomics.compareExchange( this.arrayView, INDEX, UNLOCKED, LOCKED ) if (oldValue === UNLOCKED) { return } Atomics.wait(this.arrayView, INDEX, LOCKED) } } unlock() { const oldValue = Atomics.compareExchange( this.arrayView, INDEX, LOCKED, UNLOCKED ) if (oldValue === UNLOCKED) { throw new Error('Mutex уже был разблокирован!') } Atomics.notify(this.arrayView, INDEX, 1) } executeLocked(callback) { const tryGetLock = async () => { while (true) { const oldValue = Atomics.compareExchange( this.arrayView, INDEX, UNLOCKED, LOCKED ); if (oldValue === UNLOCKED) { callback(); this.unlock() return } const result = Atomics.waitAsync(this.arrayView, INDEX, LOCKED) await result.value } } tryGetLock() } }
Состояние Mutex
может принимать значения LOCKED
и UNLOCKED
и хранится по индексу INDEX
. Учитывайте, что в коде используем Int32Array
для хранения состояния, потому что некоторые атомарные операции, такие как Atomics
, Atomics
и другие, работают только с Int32Array
или BigInt64
.
Метод lock
пытается перевести Mutex
в заблокированное состояние. Это делается с помощью Atomics
и Atomics
. Благодаря Atomics
, состояние изменяется на заблокировано только в том случае, если оно было разблокировано. Atomics
возвращает предыдущее состояние. Его используют, чтобы проверить, удалось ли получить блокировку или нет. Если нет, то с помощью Atomics
ожидаем, пока кто-нибудь не разблокирует Mutex
и не уведомит нас об этом.
С методом unlock
всё проще. Состояние изменится на разблокировано только в том случае, если оно было заблокировано. В ином случае выкидывается ошибка. Далее, с помощью Atomics
, один ожидающий агент уведомляется о возможности получения блокировки. Если ожидающих агентов нет, уведомление игнорируется.
Метод execute
реализован по аналогии с lock
, но он асинхронный, чтобы использовать его в главном потоке (main thread).
Применение Mutex
на Atomics
СкопированоВот так можно использовать Mutex
, построенный на Atomics
, для разрешения состояния гонки.
// Главный поток (main thread)const buffer = new SharedArrayBuffer(1024);const sharedMemory = new Int32Array(buffer);const mutexBuffer = new SharedArrayBuffer(4)const mutex = new Mutex(mutexBuffer)// Отправка данных в worker...for (let i = 0; i < 10_000_000; i++) { // Асинхронно получаем блокировку mutex.executeLocked(() => { sharedMemory[0] = sharedMemory[0] + 1 })}// Worker// Получаем данные из main threadconst sharedMemory = new Int32Array(buffer);const mutex = new Mutex(mutexBuffer)for (let i = 0; i < 10_000_000; i++) { // Синхронно получаем блокировку mutex.lock() sharedMemory[0] = sharedMemory[0] - 1 mutex.unlock()}
// Главный поток (main thread) const buffer = new SharedArrayBuffer(1024); const sharedMemory = new Int32Array(buffer); const mutexBuffer = new SharedArrayBuffer(4) const mutex = new Mutex(mutexBuffer) // Отправка данных в worker... for (let i = 0; i < 10_000_000; i++) { // Асинхронно получаем блокировку mutex.executeLocked(() => { sharedMemory[0] = sharedMemory[0] + 1 }) } // Worker // Получаем данные из main thread const sharedMemory = new Int32Array(buffer); const mutex = new Mutex(mutexBuffer) for (let i = 0; i < 10_000_000; i++) { // Синхронно получаем блокировку mutex.lock() sharedMemory[0] = sharedMemory[0] - 1 mutex.unlock() }
Ниже пример вывода в консоль. Вне зависимости от ресурсов компьютера или очерёдности работы потоков, в результате всегда будет 0. Так поисходит благодаря Mutex
и, в частности, Atomics
. Потоки по очереди получают блокировку и изменяют состояние.
> Main Thread Started: 0 > Worker Started: 0 > Main Thread Finished: 3232452 > Worker Finished: 0
Конечно, в идеальной ситуации Mutex
встраивается в структуру данных, с которой работаем. Предыдущий пример просто показывает, как используют Atomics
в JavaScript.
Статические методы
СкопированоAtomics
прибавляет заданное значение по индексу, возвращает предыдущее значение.
Atomics
вычитает заданное значение по индексу, возвращает предыдущее значение.
Atomics
вычисляет побитовое «И» с заданным значением и значением по индексу. Возвращает предыдущее значение.
Atomics
вычисляет побитовое «ИЛИ» с заданным значением и значением по индексу. Возвращает предыдущее значение.
Atomics
вычисляет побитовое исключающее «ИЛИ» с заданным значением и значением по индексу. Возвращает предыдущее значение.
Atomics
обновляет значение по индексу только если оно равно указанному значению. Возвращает предыдущее значение.
Atomics
обновляет заданное значение по индексу, возвращает предыдущее значение.
Atomics
возвращает значение по индексу.
Atomics
уведомляет о числе агентов, ожидающих по указанному индексу. Возвращает число уведомлённых агентов.
Atomics
сохраняет заданное значение по индексу, возвращает сохранённое значение.
Atomics
проверяет значение по индексу и уходит в режим ожидания до уведомления о пробуждении или истечения времени ожидания. Нельзя использовать в главном потоке в большинстве браузеров, так как является блокирующим.
Atomics
— неблокирующий аналог Atomics
, возвращает Promise
.