多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

中國最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2

scala教程

Scala 并發編程

閱讀 (2090)

Scala 并發編程

Runnable/Callable

Runnable 接口只有一個沒有返回值的方法。

trait Runnable {
  def run(): Unit
}
Callable與之類似,除了它有一個返回值

trait Callable[V] {
  def call(): V
}

線程

Scala 并發是建立在 Java 并發模型基礎上的。

在 Sun JVM 上,對 IO 密集的任務,我們可以在一臺機器運行成千上萬個線程。

一個線程需要一個 Runnable。你必須調用線程的 start 方法來運行 Runnable。

scala> val hello = new Thread(new Runnable {
  def run() {
    println("hello world")
  }
})
hello: java.lang.Thread = Thread[Thread-3,5,main]

scala> hello.start
hello world

當你看到一個類實現了 Runnable 接口,你就知道它的目的是運行在一個線程中。

單線程代碼

這里有一個可以工作但有問題的代碼片斷。

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)

  def run() {
    while (true) {
      // This will block until a connection comes in.
      val socket = serverSocket.accept()
      (new Handler(socket)).run()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run

每個請求都會回應當前線程的名稱,所以結果始終是 main 。

這段代碼的主要缺點是在同一時間,只有一個請求可以被相應!

你可以把每個請求放入一個線程中處理。只要簡單改變

(new Handler(socket)).run()

(new Thread(new Handler(socket))).start()

但如果你想重用線程或者對線程的行為有其他策略呢?

Executors

隨著 Java 5 的發布,它決定提供一個針對線程的更抽象的接口。

你可以通過 Executors 對象的靜態方法得到一個 ExecutorService 對象。這些方法為你提供了可以通過各種政策配置的 ExecutorService ,如線程池。

下面改寫我們之前的阻塞式網絡服務器來允許并發請求。

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)
  val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

  def run() {
    try {
      while (true) {
        // This will block until a connection comes in.
        val socket = serverSocket.accept()
        pool.execute(new Handler(socket))
      }
    } finally {
      pool.shutdown()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run

這里有一個連接腳本展示了內部線程是如何重用的。

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

Futures

Future 代表異步計算。你可以把你的計算包裝在 Future 中,當你需要計算結果的時候,你只需調用一個阻塞的 get() 方法就可以了。一個 Executor 返回一個 Future 。如果使用 Finagle RPC 系統,你可以使用 Future 實例持有可能尚未到達的結果。

一個 FutureTask 是一個 Runnable 實現,就是被設計為由 Executor 運行的

val future = new FutureTask[String](new Callable[String]() {
  def call(): String = {
    searcher.search(target);
}})
executor.execute(future)

現在我需要結果,所以阻塞直到其完成。

val blockingResult = future.get()

參考 Scala School 的 Finagle 介紹中大量使用了 Future,包括一些把它們結合起來的不錯的方法。以及 Effective Scala 對 [Futures](http://twitter.github.com/effectivescala/ standard libraries-Futures) 的意見。

線程安全問題

class Person(var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

這個程序在多線程環境中是不安全的。如果有兩個線程有引用到同一個 Person 實例,并調用 set ,你不能預測兩個調用結束后 name 的結果。

在 Java 內存模型中,允許每個處理器把值緩存在 L1 或 L2 緩存中,所以在不同處理器上運行的兩個線程都可以有自己的數據視圖。

讓我們來討論一些工具,來使線程保持一致的數據視圖。

三種工具

同步

互斥鎖(Mutex)提供所有權語義。當你進入一個互斥體,你擁有它。同步是 JVM 中使用互斥鎖最常見的方式。在這個例子中,我們會同步 Person。

在 JVM 中,你可以同步任何不為 null 的實例。

class Person(var name: String) {
  def set(changedName: String) {
    this.synchronized {
      name = changedName
    }
  }
}

volatile

隨著 Java 5 內存模型的變化,volatile 和 synchronized 基本上是相同的,除了 volatile 允許空值。

synchronized 允許更細粒度的鎖。 而 volatile 則對每次訪問同步。

class Person(@volatile var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

AtomicReference

此外,在 Java 5 中還添加了一系列低級別的并發原語。 AtomicReference 類是其中之一

import java.util.concurrent.atomic.AtomicReference

class Person(val name: AtomicReference[String]) {
  def set(changedName: String) {
    name.set(changedName)
  }
}

這個成本是什么?

AtomicReference 是這兩種選擇中最昂貴的,因為你必須去通過方法調度(method dispatch)來訪問值。

volatile 和 synchronized 是建立在 Java 的內置監視器基礎上的。如果沒有資源爭用,監視器的成本很小。由于 synchronized 允許你進行更細粒度的控制權,從而會有更少的爭奪,所以 synchronized 往往是最好的選擇。

當你進入同步點,訪問 volatile 引用,或去掉 AtomicReferences 引用時, Java 會強制處理器刷新其緩存線從而提供了一致的數據視圖。

如果我錯了,請大家指正。這是一個復雜的課題,我敢肯定要弄清楚這一點需要一個漫長的課堂討論。

Java 5 的其他靈巧的工具

正如前面提到的 AtomicReference ,Java 5 帶來了許多很棒的工具。

CountDownLatch

CountDownLatch 是一個簡單的多線程互相通信的機制。

val doneSignal = new CountDownLatch(2)
doAsyncWork(1)
doAsyncWork(2)

doneSignal.await()
println("both workers finished!")

先不說別的,這是一個優秀的單元測試。比方說,你正在做一些異步工作,并要確保功能完成。你的函數只需要 倒數計數(countDown) 并在測試中 等待(await) 就可以了。

AtomicInteger/Long

由于對 Int 和 Long 遞增是一個經常用到的任務,所以增加了 AtomicInteger 和 AtomicLong 。

AtomicBoolean

我可能不需要解釋這是什么。

ReadWriteLocks

讀寫鎖(ReadWriteLock) 使你擁有了讀線程和寫線程的鎖控制。當寫線程獲取鎖的時候讀線程只能等待。

讓我們構建一個不安全的搜索引擎

下面是一個簡單的倒排索引,它不是線程安全的。我們的倒排索引按名字映射到一個給定的用戶。

這里的代碼天真地假設只有單個線程來訪問。

注意使用了 mutable.HashMap 替代了默認的構造函數 this()

import scala.collection.mutable

case class User(name: String, id: Int)

class InvertedIndex(val userMap: mutable.Map[String, User]) {

  def this() = this(new mutable.HashMap[String, User])

  def tokenizeName(name: String): Seq[String] = {
    name.split(" ").map(_.toLowerCase)
  }

  def add(term: String, user: User) {
    userMap += term -> user
  }

  def add(user: User) {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

這里沒有寫如何從索引中獲取用戶。稍后我們會補充。

讓我們把它變為線程安全

在上面的倒排索引例子中,userMap 不能保證是線程安全的。多個客戶端可以同時嘗試添加項目,并有可能出現前面 Person 例子中的視圖錯誤。

由于 userMap 不是線程安全的,那我們怎樣保持在同一個時間只有一個線程能改變它呢?

你可能會考慮在做添加操作時鎖定 userMap。

def add(user: User) {
  userMap.synchronized {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

不幸的是,這個粒度太粗了。一定要試圖在互斥鎖以外做盡可能多的耗時的工作。還記得我說過如果不存在資源爭奪,鎖開銷就會很小嗎。如果在鎖代碼塊里面做的工作越少,爭奪就會越少。

def add(user: User) {
  // tokenizeName was measured to be the most expensive operation.
  val tokens = tokenizeName(user.name)

  tokens.foreach { term =>
    userMap.synchronized {
      add(term, user)
    }
  }
}

SynchronizedMap

我們可以通過 SynchronizedMap 特質將同步混入一個可變的 HashMap。

我們可以擴展現有的 InvertedIndex,提供給用戶一個簡單的方式來構建同步索引。

import scala.collection.mutable.SynchronizedMap

class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
  def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}

如果你看一下其實現,你就會意識到,它只是在每個方法上加同步鎖來保證其安全性,所以它很可能沒有你希望的性能。

Java ConcurrentHashMap

Java 有一個很好的線程安全的 ConcurrentHashMap。值得慶幸的是,我們可以通過 JavaConverters 獲得不錯的 Scala 語義。

事實上,我們可以通過擴展老的不安全的代碼,來無縫地接入新的線程安全 InvertedIndex。

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
    extends InvertedIndex(userMap) {

  def this() = this(new ConcurrentHashMap[String, User] asScala)
}

讓我們加載 InvertedIndex

原始方式

trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class FileRecordProducer(path: String) extends UserMaker {
  def run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      index.add(makeUser(line))
    }
  }
}

對于文件中的每一行,我們可以調用 makeUser 然后 add 到 InvertedIndex中。如果我們使用并發 InvertedIndex,我們可以并行調用 add 因為 makeUser 沒有副作用,所以我們的代碼已經是線程安全的了。

我們不能并行讀取文件,但我們可以并行構造用戶并且把它添加到索引中。

一個解決方案:生產者/消費者

異步計算的一個常見模式是把消費者和生產者分開,讓他們只能通過隊列(Queue) 溝通。讓我們看看如何將這個模式應用在我們的搜索引擎索引中。

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {
  def run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      queue.put(line)
    }
  }
}

// Abstract consumer
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
  def run() {
    while (true) {
      val item = queue.take()
      consume(item)
    }
  }

  def consume(x: T)
}

val queue = new LinkedBlockingQueue[String]()

// One thread for the producer
val producer = new Producer[String]("users.txt", q)
new Thread(producer).start()

trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
  def consume(t: String) = index.add(makeUser(t))
}

// Let's pretend we have 8 cores on this machine.
val cores = 8
val pool = Executors.newFixedThreadPool(cores)

// Submit one consumer per core.
for (i <- i to cores) {
  pool.submit(new IndexerConsumer[String](index, q))
}
關閉
程序員人生
主站蜘蛛池模板: 亚洲乱码一二三四区国产 | 亚州视频一区 | 亚洲人成影院在线高清 | 久久精品中文字幕不卡一二区 | 国产日韩亚洲欧洲一区二区三区 | 久久久久99这里有精品10 | 欧美一级毛片久久精品 | 影音先锋成人影院 | 性欧美大战久久久久久久 | 上海一级毛片 | 波多野氏免费一区 | 精品国产91久久久久 | 亚洲乱码一二三四五六区 | 美国毛片亚洲社区在线观看 | 日本免费乱人伦在线观看 | 秋霞一级特黄真人毛片 | 中文字幕免费在线观看 | аⅴ中文在线天堂 | 亚洲日韩欧美一区二区在线 | 欧美天天综合 | 亚洲自拍偷拍视频 | 91久久亚洲精品一区二区 | 爽爽影院色黄网站在线观看 | 花蝴蝶亚洲一区二区三区 | 欧美18videosex性孕妇 | 国产区成人综合色在线 | 欧美一级做a爰片免费 | 欧美人与动性xxxxx杂性 | 日本最新伦中文字幕 | 天堂在线亚洲 | 日本一道本中文字幕 | 二区国产| 亚洲综合在线观看视频 | 情侣偷偷看的羞羞视频网站 | 国产成人一区二区三区小说 | 日韩一区二区三区四区五区 | 老司机福利在线播放 | 中文字幕乱码视频 | 永久免费毛片在线播放 | 国产成+人+综合+亚洲不卡 | 亚洲26uuuu最新地址 |