Scala Actors – Tutorial Breve

Author: Philipp Haller
Spanish version: Juanjo Bazán (jjbazan .at. gmail .dot. com)
Date: May 24, 2007

Introducción

Con la llegada de los procesadores multi-core, la programación concurrente se está convirtiendo en indispensable. El instrumento principal que proporciona Scala para la concurrencia son los Actores(o actors). Los Actors son básicamente procesos concurrentes que se comunican a través del intercambio de mensajes. Puede verse también como un tipo de objetos activos en los que invocar a uno de sus métodos se corresponde con el envío de un mensaje.

La librería de Actors de Scala proporciona envío de mensajes tanto asíncronos con síncronos(estos últimos se implementan mediante el intercambio de varios mensajes asíncronos). Es más, los actores pueden comunicarse utilizando futuros, en los que las peticiones se tratan de manera asíncrona, pero devuelven una representación (un futuro) que permite esperar a la respuesta.

Este tutorial esta diseñado principalmente como un recorrido a través de varios ejemplos de programas completos que pueden ser compilados y ejecutados utilizando Scala versión 2.4 o posterior.

Primer Ejemplo

Nuestro primer ejemplo está formado por dos actores que intercambian una serie de mensajes y luego finalizan. El primer actor envía mensajes "ping" al segundo actor, que como respuesta envía mensajes "pong" (por cada mensaje "ping" recibido, un mensaje "pong").

Comenzamos definiendo los mensajes enviados y recibidos por nuestros actores. En este caso podemos usar objetos singleton(en programas más avanzados, los mensajes en general se parametrizan). Como queremos saber el tipo de cada mensaje mediante comparación de patrones, cada mensaje es un objeto a comparar con case:

case object Ping
case object Pong
case object Stop

El actor ping comienza el intercambio de mensajes enviando un mensjae Ping al actor pong. El mensaje Pong es la respuesta del actor pong. Cuando el actor ping ha enviado un cierto número de mensajes Ping , envía un mensaje Stop al actor pong.

Todas las clases, objetos y traits(el equivalente a los interfaces de Java) de la librería de actores de Scala se encuentran en el paquete scala.actors. Desde este paquete importamos la clase Actor que vamos a extender para definir nuestros propios actores. Además, importamos todos los miembros del objeto Actor porque éste contiene muchas acciones útiles sobre actores:

import scala.actors.Actor
import scala.actors.Actor._

Los actores son objetos normales que se crean instanciando subclases de la clase Actor. Definimos el comportamiento de los actores ping creando una subclase de Actor e implementando su método abstracto act:

class Ping(count: int, pong: Actor) extends Actor {
  def act() {
    var pingsLeft = count - 1
    pong ! Ping
    while (true) {
      receive {
        case Pong =>
          if (pingsLeft % 1000 == 0)
            Console.println("Ping: pong")
          if (pingsLeft > 0) {
            pong ! Ping
            pingsLeft -= 1
          } else {
            Console.println("Ping: stop")
            pong ! Stop
            exit()
          }
      }
    }
  }
}

El constructor recibe como argumentos la cantidad de mensajes Ping a ser enviados y el actor pong. La llamada al método receive dentro del bucle infinito deja en suspenso al actor hasta que que se le envía un mensaje Pong. En ese caso el mensaje se elimina de la cola de entrada y se ejecuta la acción correspondiente(a la derecha de la flecha).

En el caso de que pingsLeft sea mayor que cero enviamos un mensaje a pong utilizando el operador !, y decrementamos el contador pingsLeft. Si el contador pingsLeft ha llegado a cero, enviamos un mensaje Stop a pong, y terminamos la ejecución del actor actual llamando a exit().

La clase para nuestro actor pong se define de manera similar:

class Pong extends Actor {
  def act() {
    var pongCount = 0
    while (true) {
      receive {
        case Ping =>
          if (pongCount % 1000 == 0)
            Console.println("Pong: ping "+pongCount)
          sender ! Pong
          pongCount = pongCount + 1
        case Stop =>
          Console.println("Pong: stop")
          exit()
      }
    }
  }
}

Hay un punto interesante que señalar: cuando se recibe un mensaje Ping, se envia un mensaje Pong al actor sender. ¡Pero este actor no está definido en ninguna parte de nuestra clase! En realidad es un método de la clase Actor . Utilizando sender, uno puede hacer referencia al actor que envió el último mensaje recibido por el actor actual. Así se evita tener que pasar explicitamente el actor remitente como argumento a los mensajes.

Una vez definidas nuestras clases actor, estamos listos para crear la aplicación Scala que los use:

object pingpong extends Application {
  val pong = new Pong
  val ping = new Ping(100000, pong)
  ping.start
  pong.start
}

Análogamente a los Threads(también llamados hilos o hebras) de Java, los actores se ponen en marcha llamando a su método start.

A ejecutar!

Este ejemplo completo se incluye en la distribución de Scala, en doc/scala-devel/scala/examples/actors/pingpong.scala. Así es como se compila y se ejecuta:

$ scalac pingpong.scala
$ scala -cp . examples.actors.pingpong
Pong: ping 0
Ping: pong
Pong: ping 1000
Ping: pong
Pong: ping 2000
...
Ping: stop
Pong: stop

¡Ahora sin hilos!

Los Actors se ejecutan sobre un pool de threads/hilos. Inicialmente hay 4 threads disponibles. El pool va creciendo de tamaño si todos los threads están bloqueados pero todavía hay tareas pendientes de ser procesadas. Idealmente el tamaño del pool se corresponde con el número de procesadores de la máquina.

Cuando los actores llaman a operaciones que bloquean hilos de ejecución, como receive (o incluso wait), el hilo que está ejecutando al actor actual (self) se bloquea. Por lo que básicamente el actor queda representado por un hilo bloqueado. Dependiendo del número de actores que quieras utilizar, deberías evitar estas situaciones, puesto que la mayoria de máquinas virtuales de Java sobre hardware estándar no pueden manejar más de unos pocos miles de hilos.

Las operaciones que bloquean hebras se pueden evitar utilizando react para esperar mensaje nuevos(la versión basada en eventos de receive). De todas maneras hay un precio(en general pequeño) que pagar: react no devuelve valor de respuesta. Lo que en la práctica significa que al acabar la reacción a la llegada de un mensaje, uno ha de llamar a alguna función que contenga el resto de la computación a realizar por parte del actor. Nótese que usar react dentro de un bucle while no funciona. Pero, como un bucle es algo común, la librería proporciona soporte para ello en la forma de la función loop. Que se puede usar así:

loop {
  react {
    case A => ...
    case B => ...
  }
}

Las llamadas a react se pueden anidar, lo que permite recibir una secuencia de varios mensajes, así:

react {
  case A => ...
  case B => react { // Si recibimos un mensaje B también queremos un C
    case C => ...
  }
}

Para hacer nuestros actores ping y pong libres de hilos, basta simplemente con reemplazar while(true) con loop, y receive con react. Por ejemplo, aquí vemos el método act modificado de nuestro actor pong:

def act() {
  var pongCount = 0
  loop {
    react {
      case Ping =>
        if (pongCount % 1000 == 0)
          Console.println("Pong: ping "+pongCount)
        sender ! Pong
        pongCount = pongCount + 1
      case Stop =>
        Console.println("Pong: stop")
        exit()
    }
  }
}

Segundo Ejemplo

En este ejemplo vamos a escribir una abstracción de productores, que proporcione un interfaz iterador estándar para obtener una serie de valores que se vayan produciendo

Los productores específicos se definen implementando un método abstracto produceValues. Los valores individuales se generan mediante el método produce. Ambos métodos se heredan de la clase Producer. Por ejemplo, un productor que genera valores contenidos en un arbol binario en preorden, se puede definir así:

class PreOrder(n: Tree) extends Producer[int] {
  def produceValues = traverse(n)
  def traverse(n: Tree) {
    if (n != null) {
      produce(n.elem)
      traverse(n.left)
      traverse(n.right)
    }
  }
}

Los productores se implementan como dos actores, un actor productor(producer) y un actor coordinador(coordinator). Aquí vemos como podemos implementar el actor productor:

abstract class Producer[T] {
  protected def produceValues: unit

  protected def produce(x: T) {
    coordinator ! Some(x)
    receive { case Next => }
  }

  private val producer: Actor = actor {
    receive {
      case Next =>
        produceValues
        coordinator ! None
    }
  }
  ...
}

Vease como hemos definido al actor producer.En esta ocasión no nos hemos molestado en crear una subclase de Actor y dar código a su método act. En vez de eso, simplemente definimos el comportamiento de actor directamente usando la función actor. ¡Mucho más conciso! Es más, los actores definidos mediante la función actor comienzan automáticamente, sin necesidad de invocar su método start.

Así pues, ¿cómo funciona el productor? Cuando recibe un mensaje Next ejecuta el método(abstracto) produceValues que a su vez llama al método produce. Esto se traduce en el envío de una serie de valores, incluidos en mensajes de tipo Some al coordinador. La secuencia termina con un mensaje None. Some y None son los dos casos posibles de la clase estándar de Scala Option.

El coordinador sincroniza las peticiones de los clientes y los valores que llegan desde el productor. Lo podemos implementar así:

private val coordinator: Actor = actor {
  loop {
    react {
      case Next =>
        producer ! Next
        reply {
          receive { case x: Option[_] => x }
        }
      case Stop => exit('stop)
    }
  }
}

Notese cómo cuando se recibe un mensaje Next usamos reply para devolver el valor de Option recibido desde el productor al actor que realizó la petición. Explicaremos esto en la siguiente sección...

El interfaz Iterator

Queremos crear productores que se puedan utilizar como un iterador normal. Para ello implementamos un método iterator que devuelve(¡sorpresa!) un iterador. Sus métodos hasNext and next envían mensajes al actor coordinador para cumplir su tarea. Vemoslo:

def iterator = new Iterator[T] {
  private var current: Any = Undefined
  private def lookAhead = {
    if (current == Undefined) current = coordinator !? Next
    current
  }

  def hasNext: boolean = lookAhead match {
    case Some(x) => true
    case None => { coordinator ! Stop; false }
  }

  def next: T = lookAhead match {
    case Some(x) => current = Undefined; x.asInstanceOf[T]
  }
}

Usamos un método privado lookAhead para implementar la lógica del iterador. Mientras no se conozca el siguiente valor, la variable current tiene como valor Undefined , que no es más que un objeto que reserva el sitio en espera de ser utilizado:

private val Undefined = new Object

Lo interesante está en el método lookAhead. Cuando el valor de current es Undefined significa que tenemos que conseguir el siguiente valor. Para ello usamos el operado de envíor de mensaje síncrono !?. Éste envía el mensaje Next a coordinator, pero en vez de devolver el control como un envío de mensaje normal(asíncrono), se queda esperando una respuesta del coordinador. Esta respuesta es el valor que devuelve !?. Un mensaje que se envía con !? se responde con reply. Fíjate que enviar un mensaje a sender no nos vale, porque !? espera recibir un mensaje por un canal de respuesta privado en vez de por la cola de entrada. Esto es necesario para diferenciar las respuestas "verdaderas" de las "falsas" respuestas que provengas de mensajes anteriores que pueda haber en la cola de entrada.

El ejemplo de los productores también se incluye en la distribución de Scala en doc/scala-devel/scala/examples/actors/producers.scala.