C++14でshared_timed_mutexがstdに含まれましたが、リードからライトへ昇格(Upgrade)はサポートされませんでした。
実は昇格も比較的簡単に自前で実装できます。C++11以降はstd::threadやstd::atomicなど、スレッドに関する機能が大幅に強化されているので、これらを使って昇格可能なロックを実装してみます。
基本のロック
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Atomic { | |
std::atomic<int> val = 0; | |
public: | |
void Lock() | |
{ | |
int expected = 0; | |
while (!val.compare_exchange_weak(expected, 1)) { | |
expected = 0; | |
} | |
} | |
void Unlock() | |
{ | |
val.exchange(0); | |
} | |
}; |
std::atomicを使います。0ならロックされていない状態、1ならロック状態です。Lockを呼んだ時点ですでにロックされていたらその場でループして待ちます。
compare_exchange_weakというのは、いわゆる"Compare and Swap"、略してCASと呼ばれます。比較と交換を1命令で行う、マルチスレッドの実装に必須の命令です。
whileでループしているので、ロック取得の待機中はCPU時間を消費します。この方式をスピンロックといいます。比較的短時間のロックは効率的ですが、ロック時間が長くなると無駄が生じる特性ががあります。今回の記事はすべてスピンロックで実装しました。
最も簡単なreaders-writer lock
リードロックとライトロックを分けた実装です。リードロックは複数スレッドから同時に可能で、ライトは1つのスレッドから可能です。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Atomic { | |
std::atomic<int> val = 0; | |
static const int writeLockBit = 0x40000000; | |
void LockInternal(int delta, int testBits) | |
{ | |
int expected; | |
do { | |
do { | |
expected = val; | |
} while (expected & testBits); | |
} while (!val.compare_exchange_weak(expected, expected + delta)); | |
} | |
public: | |
void ReadLock() { LockInternal(1, writeLockBit); } | |
void ReadUnlock() { val.fetch_sub(1); } | |
void WriteLock() { LockInternal(writeLockBit, 0xffffffff); } | |
void WriteUnlock() { val.fetch_sub(writeLockBit); } | |
}; |
val変数の0x40000000ビットをライトロックの為に使い、それより下のビットはリードロックの個数を記録するカウンタとして動作します。
LockInternal関数がポイントで、valの中にtestBitと被るビットがある間は待機し、testBitと被らなくなったらdeltaをvalに加算します。この関数に渡す数値の組み合わせだけでリードロックとライトロックが実現できます。
これを表にするとこうなります。
ステート移動 | 立てるビット | 待機するビット |
アイドル→リード | リード | ライト |
アイドル→ライト | ライト | リード、ライト |
昇格可能なreaders-writer lock
リードロックからライトロックへの昇格を実装してみます。実は「昇格」をどう定義するかで実装が変わってしまうのですが、以下は実装の一例です。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Atomic { | |
std::atomic<int> val = 0; | |
static const int writeLockBit = 0x40000000; | |
static const int upgradingBit = 0x00010000; | |
static const int writeUpgradeingBits = 0x7fff0000; | |
static const int readerBits = 0x0000ffff; | |
void LockInternal(int delta, int testBits) | |
{ | |
int expected; | |
do { | |
do { | |
expected = val; | |
} while (expected & testBits); | |
} while (!val.compare_exchange_weak(expected, expected + delta)); | |
} | |
public: | |
void ReadLock() { LockInternal(1, writeUpgradeingBits); } | |
void ReadUnlock() { val.fetch_sub(1); } | |
void WriteLock() { LockInternal(writeLockBit, readerBits | writeLockBit); } | |
void WriteUnlock() { val.fetch_sub(writeLockBit); } | |
void Upgrade() | |
{ | |
val.fetch_add(upgradingBit - 1); | |
LockInternal(writeLockBit - upgradingBit, readerBits | writeLockBit); | |
} | |
}; |
実はUpgrade関数の内部で「昇格中」という別のステートが存在しています。
ステート移動 | 立てるビット | 待機するビット |
アイドル→リード | リード | ライト、昇格中 |
アイドル→ライト | ライト | リード、ライト |
アイドル→昇格中 | 昇格中 | 無し |
なぜ「昇格中」ステートが必要かを説明します。
ライトロック取得の為には他の全てのスレッドのリードロックの解除を待ちます。しかし、リードからライトに移行しようとする複数のスレッドがある時デッドロックが発生することになります。なぜならお互いがリードロックを手放さずに待ち続けているためです。
ここで、「昇格中」というステートを設けます。このステートに入ることで、自身のリードロックを解除し、他のスレッドのリードロックの解除も待ちます。すると残るのは「昇格中」ステートを持つスレッドのみになるので、順次ライトロックを取得できるようになります。
この方法は、デッドロックを避ける事ができますが、「Upgrade関数実行中」に他のスレッドがライトロックを取得できてしまうという欠点があります。Upgrade関数呼び出しの前後でロック対象のデータが変更されている可能性があるわけです。
リードロックを昇格可能なものと不可能なものに分けたreaders-writer lock
昇格の前後でデータが変更されたくない場合、「昇格可能な特殊なリードロックは一つだけ取得可能」とする方法があります。boostのupgrade_lockの実装のようなものです。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Atomic { | |
std::atomic<int> val = 0; | |
static const int writeLockBit = 0x40000000; | |
static const int upgradableBit = 0x00010000; | |
static const int readerBits = 0x0000ffff; | |
void LockInternal(int delta, int testBits) | |
{ | |
int expected; | |
do { | |
do { | |
expected = val; | |
} while (expected & testBits); | |
} while (!val.compare_exchange_weak(expected, expected + delta)); | |
} | |
public: | |
void ReadLock() { LockInternal(1, writeLockBit); } | |
void ReadUnlock() { val.fetch_sub(1); } | |
void ReadLockUpgradable() { LockInternal(upgradableBit, writeLockBit | upgradableBit); } | |
void ReadUnlockUpgradable() { val.fetch_sub(upgradableBit); } | |
void Upgrade() { LockInternal(writeLockBit - upgradableBit, readerBits | writeLockBit); } | |
void WriteLock() { LockInternal(writeLockBit, readerBits | writeLockBit); } | |
void WriteUnlock() { val.fetch_sub(writeLockBit); } | |
}; |
ReadLockUpgradableでロックを取得した場合、他のスレッドからのReadLockを妨げずに共存しますが、他のスレッドからReadLockUpgradableを同時に取得することはできません。1つのスレッドだけがReadLockUpgradableを取れるので、Upgradeの段階で競合が発生しないことと、Upgrade中に他のスレッドがライトロックを取得したりしないことが保障されます。
ステート移動 | 立てるビット | 待機するビット |
アイドル→リード | リード | ライト |
アイドル→ライト | ライト | 全ビット |
アイドル→昇格可能リード | 昇格可能リード | ライト、昇格可能リード |
リードロックを途切れなくライトロックに昇格可能、ただし失敗付き
全てのリードロックを昇格可能にしたいけれど、Upgrade中に別のWriteLockに割り込まれたくない場合の実装です。代償として、昇格の関数名がTryUpgradeとなっており、昇格が失敗する場合があります。
なぜ失敗するかというと、途切れない昇格を実現するために昇格中にリードロックを手放さないためです。二つ以上のスレッドが同時に昇格しようとするとお互いのリードロック解放を待つためにデッドロックします。そこで、最初に昇格しようとしたスレッドのみ昇格に成功させることにします。
万が一昇格に失敗した場合、次に出来ることはリードロックを解除することだけです。もし、どうしてもライトロックが欲しければ一度リードロックも解除して他のスレッドにライトロックを譲った後、改めてライトロックの取得を試みるしかありません。
この手法の短所は利用者が例外処理を書かなければならず、ただでさえバグを入れやすいマルチスレッド処理の実装がより複雑化しやすい事です。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Atomic { | |
std::atomic<int> val = 0; | |
static const int writeLockBit = 0x40000000; | |
static const int upgradingBit = 0x20000000; | |
static const int readerBits = 0x0000ffff; | |
void LockInternal(int delta, int testBits) | |
{ | |
int expected; | |
do { | |
do { | |
expected = val; | |
} while (expected & testBits); | |
} while (!val.compare_exchange_weak(expected, expected + delta)); | |
} | |
public: | |
void ReadLock() { LockInternal(1, writeLockBit | upgradingBit); } | |
void ReadUnlock() { val.fetch_sub(1); } | |
void WriteLock() { LockInternal(writeLockBit, 0xffffffff); } | |
void WriteUnlock() { val.fetch_sub(writeLockBit); } | |
bool TryUpgrade() | |
{ | |
int expected; | |
do { | |
expected = val; | |
if (expected & upgradingBit) { | |
return false; | |
} | |
} while (!val.compare_exchange_weak(expected, expected | upgradingBit)); | |
LockInternal(writeLockBit - upgradingBit - 1, writeLockBit | (readerBits & ~1)); | |
return true; | |
} | |
}; |
自身によって昇格中ビットを立てることに成功すればLockInternalでライトステートへ移行します。readerBitsの最下位ビットを待機対象から除外するのは、自身がリードロックを持っているためリードロックのカウンタが1の時は通過させなければいけないためです。
ステート移動 | 立てるビット | 待機するビット | 備考 |
アイドル→リード | リード | ライト、昇格中 | |
アイドル→ライト | ライト | 全ビット | |
リード→昇格中 | 昇格中 | 無し | 他のスレッドが昇格中ビットを取得したら失敗 |
昇格中→ライト | ライト | ライト、リード | リードロックの1つは自分が持っているので、最下位ビットを待機対象から除外 |
まとめ
リードロックからライトロックへの昇格が比較的簡単に実装できることがわかりましたが、問題は昇格の方式が色々あって、しかも一長一短があるために標準化しにくいということでした。
素直に昇格はあきらめてshared_timed_mutexで満足するのも手かもしれませんし、boostやIntel TBB等の有名なライブラリを使う場合はそのライブラリで提供された昇格機能に限って使うことにしてもいいかもしれません。
テストに使ったGitHubレポジトリ
https://github.com/yorung/ThreadTest