您的当前位置:首页正文

AKKA学习笔记

2024-12-17 来源:东饰资讯网

有两种方式可以获取Actor引用,一是创建Actor,而是查找Actor。

创建Actor

一个actor系统通常由ActorSystem.actorOf构造根Actor下的次一级Actor开始,然后使用新创建的Actor的ActorContext.actorOf 方法开始构造Actor树,这些方法返回新创建的Actor的引用。每个Actor可以直接访问其父Actor的引用,自身的引用和其子Actor的引用。

object Pi extends App {
 calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
 // actors and messages ...
 def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
   // Create an Akka system
   val system = ActorSystem("PiSystem")
    // create the result listener, which will print the result and shutdown the system
    val listener = system.actorOf(Props[Listener], name = "listener")
    // create the master
    val master = system.actorOf(Props(new Master(
      nrOfWorkers, nrOfMessages, nrOfElements, listener)),
      name = "master")
    // start the calculation
    master ! Calculate
  }
}

actor创建完成后创建路由规则。一般使用轮询规则RoundRobinRouter

val workerRouter = context.actorOf(

Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")

实现一个receive方法,应该在receive方法中定义一系列的case语句,基于标准Scala的模式匹配方法,来实现每一种消息的处理逻辑。

def receive = {
      case Calculate ⇒
        for (i ← 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements)
      case Result(value) ⇒
        pi += value
        nrOfResults += 1
        if (nrOfResults == nrOfMessages) {
          // Send the result to the listener
          listener ! PiApproximation(pi, duration = (System.currentTimeMillis - start).millis)
          // Stops this actor and all its supervised children
          context.stop(self)
        }
    }

context.stop(self)中context的由来

implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(
        s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
   "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
    val c = contextStack.head
    ActorCell.contextStack.set(null :: contextStack)
    c
  }

使用Actor路径查找Actor

此外也可以使用ActorSystem.actorSelection方法来查找一个Actor引用。

绝对路径和相对路径

除了ActorSystem.actorSelection方法之外,还有一个方法ActorContext.actorSelection,该方法可以应用到任意的Actor引用。它的作用和ActorSystem.actorSelection类似,但不同的是,它无需从根Actor开始查找,而是从该actor开始查找actor树。路径名如果使用”..”代表父Actor。 比如,你可以使用如下的路径来给某个特定的同级的其它Actor发送消息:

context.actorSelection("../brother") ! msg

决定路径也可以使用如下的方式来访问:(和文件路径类似)

context.actorSelection("/user/serviceA") ! msg

由于Actor系统和文件系统类似,因此可以使用文件系统类似的方法来匹配路径,比如你可以使用“*”通配符来匹配路径,比如你可以使用如下的方法,给所有匹配的Actor发送消息:

context.actorSelection("../*") ! msg

最上层次路径

在”/”根路径之下包含如下几个路径:

“/user” 为所有用户创建的Actor对象的最上级Actor,所有由ActorSystem.actorOf 创建的actor都在该Actor之下。

“/system” 为所有系统Actor的最上级Actor,比如日志服务。

“/deadLetter” 为死信处理Actor。

“/temp” 为所有临时创建的Actor的管理员Actor。

“/remote” 代表远程Actor的根管理员Actor。

附上路径小例子

package com.test

import akka.actor.Actor

import akka.actor.Props

import akka.actor.ActorSystem

class Actor1 extends Actor {

              override def preStart = {

                        context.actorOf(Props[Actor2], "actor2")

              }

              def receive = {

                      case _ =>

               }

}

class Actor2 extends Actor {

          override def preStart = { println("my path is: " + context.self.path) }

          def receive = {

                       case _ =>

            }

}

object FutureTesting {

      def main(args: Array[String]) {

          val sys = ActorSystem("test")

          implicit val ec = sys.dispatcher

           //Starting an Actor2 from system

           sys.actorOf(Props[Actor2], "actor2")

           //Starting an Actor1 from system which in turn starts an Actor2

          sys.actorOf(Props[Actor1], "actor1")

       }

}

会输出

my path is: akka://test/user/actor2

my path is: akka://test/user/actor1/actor2

路径表明了actor的父子关系。

显示全文