聊聊高并發(十一)實現幾種自旋鎖(五)
來源:程序員人生 發布時間:2014-11-17 08:07:52 閱讀次數:3591次
在聊聊高并發(9)實現幾種自旋鎖(4)中實現的限時隊列鎖是1個基于鏈表的限時無界隊列鎖,它的tryLock方法支持限時操作和中斷操作,無饑餓,保證了先來先服務的公平性,在多個同享狀態上自旋,是低爭用的。但是它的1個缺點是犧牲了空間,為了讓線程可以屢次使用鎖,每次Lock的時候都要new
QNode,并設置給線程,而不能重復使用原來的節點。
這篇說說限時有界隊列鎖,它采取了有界隊列,并且和ArrayLock不同,它不限制線程的個數。它的特點主要有
1. 采取有界隊列,減小了空間復雜度,L把鎖的空間復雜度在最壞的情況下(有界隊列長度為1)是O(L)
2. 非公平,不保證先來先服務,這也是1個很常見的需求
3. 由于是有界隊列,所以在高并發下存在高爭用,需要結合回退鎖來下降爭用
它的實現思路是:
1. 采取了1個有界的等待隊列,等待隊列的每一個節點都有多種狀態,每一個節點是可復用的
2. 采取了1個工作隊列,Tail指針指向工作隊列的隊尾節點。獲得和是不是鎖的操作是在工作隊列中的節點之間進行
3. 由因而限時隊列,并支持中斷,所以隊列中的節點都是可以退出隊列的
4. 算法分為3步,第1步是線程從有界的等待隊列中取得1個節點,并設置為WAITING,如果沒有取得,就自旋
第2步是把這個節點加入工作隊列,并取得前1個節點的指針
第3步是在前1個節點的狀態上自旋,直到取得鎖,并把前1個節點RELEASED狀態改成FREE
節點有4種狀態:
1. FREE: 表示節點可以被取得。當前1個節點釋放鎖,并設置狀態為RELEASED的時候,后1個節點需要把前1個節點設置為FREE。當節點在沒有進入工作隊列時超時,也被設置為FREE.
2. RELEASED:節點釋放鎖時設置為RELEASED,需要后續節點把它設置為FREE。如果是工作隊列的最后1個節點,那末RELEASED狀態的節點在第1步時可被取得
3. WAITING:表示取得了鎖或在工作隊列中等待鎖。是在第1步中被設置的,第1步的結果就是取得1個狀態為WAITING的節點
4. ABORTED:工作隊列中的節點超時或中斷的節點被設置為ABORTED。 隊尾的ABORTED節點可以被第1步取得,隊中的ABORTED節點不能被第1步獲得,只能把它的preNode指針指向它的前1個節點,表示它自己不能被獲得了
理解節點這4種狀態的轉變是理解這個設計的關鍵。這個設計比較復雜,從篇幅斟酌,這篇只介紹Lock和UnLock操作,下1篇說tryLock限時操作
1. 創建枚舉類型State來表示狀態
2. 創建QNode表示節點,使用1個AtomicReference原子變量指向它的State,以便于支持CAS操作。節點保護1個PreNode援用,只有節點被Aborted的時候才設置這個援用的值,表示跳過這個節點
3. 1個有界的QNode隊列,使用數組表示
4. MIN_BACKOFF和MAX_BACKOFF支持回退操作,單位是毫秒。這兩個值依賴于硬件性能,需要通過不斷測試來獲得最優值
5. 1個Random隨機數,來產生隨即的數組下標,非公平性需要
6. 1個AtomicStampedReference類型的原子變量作為隊尾指針tail。AtomicStampedReference采取了版本號來避免CAS操作的ABA問題。這很重要,由于有界等待隊列的節點會屢次進出工作隊列,所以可能產生同1個節點被前1個線程準備CAS操作時,已被后幾個線程進出了工作隊列,致使第1個線程拿到的QNode的狀態不正確。
7. lock實現分為3步,上文已說過了
8. unlock操作就是兩步,第1修改狀態通知其他線程獲得鎖。第2是設置自己的節點援用,以便下次可再次取得鎖而不影響其他線程的狀態。這里是把線程指向的節點狀態設置為RELEASED,同時設置線程的節點援用為空,這樣其他線程可以繼續使用這個節點。
package com.zc.lock;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* 限時有界隊列鎖,并且直接不限數量的線程
* 由因而有界的隊列,所以爭用劇烈,可以復合回退鎖的概念,減少高爭用
* 分為3步:
* 第1步是獲得1個State為FREE的節點,設置為WAITING
* 第2步是把這個節點加入隊列,獲得前1個節點
* 第3步是在前1個節點上自旋
*
* 優點是L個鎖的空間復雜度是O(L),而限時無界隊列鎖的空間復雜度為O(Ln)
* **/
public class CompositeLock implements TryLock{
enum State {FREE, WAITING, RELEASED, ABORTED}
class QNode{
AtomicReference<State> state = new AtomicReference<CompositeLock.State>(State.FREE);
volatile QNode preNode;
}
private final int SIZE = 10;
private final int MIN_BACKOFF = 1;
private final int MAX_BACKOFF = 10;
private Random random = new Random();
// 有界的QNode數組,表示隊列總共可使用的節點數
private QNode[] waitings = new QNode[10];
// 指向隊尾節點,使用AtomicStampedReference帶版本號的原子援用變量,可以避免ABA問題,由于這個算法實現需要對同1個Node屢次進出隊列
private AtomicStampedReference<QNode> tail = new AtomicStampedReference<CompositeLock.QNode>(null, 0);
// 每一個線程保護1個QNode援用
private ThreadLocal<QNode> myNode = new ThreadLocal<CompositeLock.QNode>(){
public QNode initialValue(){
return null;
}
};
public CompositeLock(){
for(int i = 0; i < SIZE; i ++){
waitings[i] = new QNode();
}
}
@Override
public void lock() {
Backoff backoff = new Backoff(MIN_BACKOFF, MAX_BACKOFF);
QNode node = waitings[random.nextInt(SIZE)];
// 第1步: 先取得數組里的1個Node,并把它的狀態設置為WAITING,否則就自旋
GETNODE:
while(true){
while(node.state.get() != State.FREE){
// 由于釋放鎖時只是設置了State為RELEASED,由后繼的線程來設置RELEASED為FREE
// 如果該節點已是隊尾節點了并且是RELEASED,那末可以直接可以被使用
// 獲得當前原子援用變量的版本號
int[] currentStamp = new int[1];
QNode tailNode = tail.get(currentStamp);
if(tailNode == node && tailNode.state.get() == State.RELEASED){
if(tail.compareAndSet(tailNode, null, currentStamp[0], currentStamp[0] + 1)){
node.state.set(State.WAITING);
break GETNODE;
}
}
}
if(node.state.compareAndSet(State.FREE, State.WAITING)){
break;
}
try {
backoff.backoff();
} catch (InterruptedException e) {
throw new RuntimeException("Thread interrupted, stop to get the lock");
}
}
// 第2步加入隊列
int[] currentStamp = new int[1];
QNode preTailNode = null;
do{
preTailNode = tail.get(currentStamp);
}
// 如果沒加入隊列,就1直自旋
while(!tail.compareAndSet(preTailNode, node, currentStamp[0], currentStamp[0] + 1));
// 第3步在前1個節點自旋,如果前1個節點為null,證明是第1個加入隊列的節點
if(preTailNode != null){
// 在前1個節點的狀態自旋
while(preTailNode.state.get() != State.RELEASED){}
// 設置前1個節點的狀態為FREE,可以被其他線程使用
preTailNode.state.set(State.FREE);
}
// 將線程的myNode指向取得鎖的node
myNode.set(node);
return;
}
@Override
public void unlock() {
QNode node = myNode.get();
node.state.set(State.RELEASED);
myNode.set(null);
}
@Override
public boolean trylock(long time, TimeUnit unit)
throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
}
采取我們之前的驗證鎖正確性的測試用例來測試lock, unlock操作。
package com.zc.lock;
public class Main {
//private static Lock lock = new TimeCost(new ArrayLock(150));
private static Lock lock = new CompositeLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
lock.lock();
System.out.println("Value: " + ++value);
lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
結果是順序打印的,證明鎖是正確的,每次只有1個線程取得了鎖
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 49
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈