no way to compare when less than two revisions
Differences
This shows you the differences between two versions of the page.
— | blog:scala_distributed_mandelbrot [2009/11/27 17:53] (current) – created - external edit 127.0.0.1 | ||
---|---|---|---|
Line 1: | Line 1: | ||
+ | ====== Scala distributed Mandelbrot ====== | ||
+ | |||
+ | My first foray into distributed programming with scala. | ||
+ | |||
+ | It has no bells and whistles with regards to zooming and panning so as keep the code as simple as possible. | ||
+ | |||
+ | {{: | ||
+ | |||
+ | Running a worker is as easy as. The IP and port on the worker are the IP address of the GUI, and the LOCAL port which the worker will used for listening. | ||
+ | < | ||
+ | # export CLASSPATH=.:/ | ||
+ | # cd MandelbrotDistributed/ | ||
+ | # java mandelbrotdistributed/ | ||
+ | " | ||
+ | # java mandelbrotdistributed/ | ||
+ | </ | ||
+ | |||
+ | This is the entire code: | ||
+ | |||
+ | <code scala> | ||
+ | /* | ||
+ | * Raster.scala | ||
+ | */ | ||
+ | package mandelbrotdistributed | ||
+ | |||
+ | @serializable | ||
+ | class Raster(xlen : Int) { | ||
+ | var line = new Array[Int](xlen) | ||
+ | def width = line.length | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | <code scala> | ||
+ | /* | ||
+ | * Complex.scala | ||
+ | */ | ||
+ | package mandelbrotdistributed | ||
+ | |||
+ | class Complex ( | ||
+ | val a: Double, | ||
+ | val b: Double) | ||
+ | { | ||
+ | def abs() = Math.sqrt(a*a + b*b) | ||
+ | // (a,b)(c,d) = (ac - bd, bc + ad) | ||
+ | def *(that: Complex) = new Complex(a*that.a-b*that.b, | ||
+ | def +(that: Complex) = new Complex(a+that.a, | ||
+ | override def toString = a + " + " + b+" | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | <code scala> | ||
+ | /* | ||
+ | * MandelWorker.scala | ||
+ | */ | ||
+ | package mandelbrotdistributed | ||
+ | |||
+ | import scala.actors.Actor._ | ||
+ | import scala.actors.remote._ | ||
+ | import scala.actors.remote.RemoteActor.select | ||
+ | import java.lang.Math | ||
+ | import java.net.InetAddress | ||
+ | |||
+ | case class Register(me: | ||
+ | case class RenderedLine(m: | ||
+ | case class RenderAction(row: | ||
+ | case class Tick | ||
+ | |||
+ | class MandelActor(me : Locator, clientLoc : Locator) { | ||
+ | RemoteActor.classLoader = getClass().getClassLoader() | ||
+ | |||
+ | actor { | ||
+ | println(" | ||
+ | RemoteActor.alive(me.node.port) | ||
+ | RemoteActor.register(me.name, | ||
+ | loop { | ||
+ | react { | ||
+ | case RenderAction(row : Int, width : Int, height : Int, level : Int) => | ||
+ | println(" | ||
+ | sender ! RenderedLine(me, | ||
+ | case msg => | ||
+ | println(" | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | | ||
+ | // Register with the GUI every 5 secs (heartbeat) | ||
+ | val client = select(clientLoc.node, | ||
+ | ActorPing.scheduleAtFixedRate(client, | ||
+ | |||
+ | def iterate(z: | ||
+ | if(z.abs > 2 || i > level) (z,i) else iterate(z*z+c, | ||
+ | | ||
+ | def generate(width : Int, height : Int, row : Int, level: Int) : Raster = { | ||
+ | val raster = new Raster(width) | ||
+ | val y = -1.5 + row*3.0/ | ||
+ | for { x0 <- 0 until width} | ||
+ | { | ||
+ | val x = -2.0 + x0*3.0/ | ||
+ | val (z, i) = iterate(new Complex(0, | ||
+ | raster.line(x0) = if (z.abs < 2) 0 else i | ||
+ | } | ||
+ | raster | ||
+ | } | ||
+ | } | ||
+ | |||
+ | object MandelWorker { | ||
+ | def main(args: Array[String]) :Unit = { | ||
+ | // arg0: remote IP of where the MandelGUI program is running | ||
+ | // arg1: a local port for the mandel worker | ||
+ | val host = if (args.length >= 1) args(0) else " | ||
+ | val port = if (args.length >= 2) args(1).toInt else 9010 | ||
+ | val gui = Locator(Node(host, | ||
+ | val me = new Locator(Node(InetAddress.getLocalHost.getHostAddress, | ||
+ | new MandelActor(me, | ||
+ | } | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | <code scala> | ||
+ | /* | ||
+ | * MandelGui.scala | ||
+ | */ | ||
+ | |||
+ | package mandelbrotdistributed | ||
+ | |||
+ | import scala.swing._ | ||
+ | import scala.swing.event.ButtonClicked | ||
+ | import scala.actors.{Actor, | ||
+ | import scala.actors.Actor._ | ||
+ | import scala.actors.remote.{RemoteActor, | ||
+ | import scala.actors.remote.RemoteActor._ | ||
+ | import scala.collection.mutable.Stack | ||
+ | import java.awt.image.BufferedImage | ||
+ | import java.awt.{Graphics, | ||
+ | import java.awt.Color | ||
+ | |||
+ | object MandelGui extends SimpleGUIApplication { | ||
+ | val img = new BufferedImage(480, | ||
+ | val mandel = Mandel | ||
+ | |||
+ | val drawing = new Panel { | ||
+ | background = Color.black | ||
+ | preferredSize = (img.getWidth, | ||
+ | |||
+ | override def paintComponent(g : Graphics) : Unit = { | ||
+ | g.asInstanceOf[Graphics2D].drawImage(img, | ||
+ | } | ||
+ | } | ||
+ | |||
+ | def clearDrawing() { | ||
+ | var g = img.getGraphics | ||
+ | g.setColor(Color.BLACK) | ||
+ | g.fillRect(0, | ||
+ | } | ||
+ | |||
+ | def top = new MainFrame { | ||
+ | title=" | ||
+ | contents = new BorderPanel { | ||
+ | |||
+ | val control = new BoxPanel(Orientation.Horizontal) { | ||
+ | val start = new Button { | ||
+ | text = " | ||
+ | } | ||
+ | val stop = new Button { | ||
+ | text = " | ||
+ | } | ||
+ | val continue = new Button { | ||
+ | text = " | ||
+ | } | ||
+ | contents.append(start, | ||
+ | listenTo(start, | ||
+ | reactions += { | ||
+ | case ButtonClicked(`start`) => | ||
+ | clearDrawing | ||
+ | Mandel.startup | ||
+ | case ButtonClicked(`stop`) => | ||
+ | Mandel.shutdown | ||
+ | case ButtonClicked(`continue`) => | ||
+ | Mandel.process | ||
+ | } | ||
+ | } | ||
+ | drawing | ||
+ | |||
+ | import BorderPanel.Position._ | ||
+ | layout(control) = North | ||
+ | layout(drawing) = Center | ||
+ | } | ||
+ | } | ||
+ | |||
+ | object WorkerMgmt { | ||
+ | private var allWorkers : List[Worker] = List() | ||
+ | val defaultTTL = 6 //sweeps a worker can survive without a register | ||
+ | |||
+ | def foreach(op: Worker => Unit) = allWorkers.foreach(op) | ||
+ | |||
+ | def findWorkerForRow(row : int) : Worker = { | ||
+ | allWorkers.filter(w=> | ||
+ | } | ||
+ | |||
+ | def find(m: | ||
+ | if (allWorkers.isEmpty) null | ||
+ | else { | ||
+ | val list = allWorkers.filter(w=> | ||
+ | if(list.isEmpty) null else list(0) | ||
+ | } | ||
+ | } | ||
+ | |||
+ | def register(m: | ||
+ | var worker = find(m) | ||
+ | if (worker == null) { | ||
+ | worker = new Worker(m, defaultTTL) | ||
+ | allWorkers = worker :: allWorkers | ||
+ | } | ||
+ | worker.keepAlive | ||
+ | } | ||
+ | |||
+ | def sweep() = { | ||
+ | allWorkers.foreach(_.decTTL) | ||
+ | val (ok, expired) = allWorkers span (_.ttl >= 0) | ||
+ | allWorkers = ok | ||
+ | expired | ||
+ | } | ||
+ | } | ||
+ | |||
+ | class Worker(val loc: | ||
+ | var row : Int = 0 | ||
+ | var ttl:Int = 0 | ||
+ | val actor = select(loc.node, | ||
+ | val iterationDepth = 2048 | ||
+ | |||
+ | def decTTL = ttl -= 1 | ||
+ | |||
+ | def keepAlive() = { | ||
+ | ttl = defaultTTL | ||
+ | this | ||
+ | } | ||
+ | |||
+ | def render(row : Int) { | ||
+ | this.row = row | ||
+ | actor ! RenderAction(row, | ||
+ | } | ||
+ | |||
+ | override def toString = loc.toString | ||
+ | } | ||
+ | |||
+ | object Mandel { | ||
+ | |||
+ | object State extends Enumeration { | ||
+ | val Running, Stopped = Value | ||
+ | } | ||
+ | private var state = State.Stopped | ||
+ | private var workQueue : Stack[Int] = new Stack() | ||
+ | |||
+ | val draw = actor { | ||
+ | Actor.loop { | ||
+ | react { | ||
+ | case (row:Int, raster: | ||
+ | for(x <- 0 until raster.width) { | ||
+ | val shade = raster.line(x) % 256 | ||
+ | val rgb = new Color(shade, | ||
+ | img.setRGB(x, | ||
+ | } | ||
+ | drawing.repaint | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | |||
+ | val a = actor { | ||
+ | RemoteActor.alive(9999) // Port | ||
+ | RemoteActor.register(' | ||
+ | // Sweep non responsive workers every 2sec | ||
+ | ActorPing.scheduleAtFixedRate(Actor.self, | ||
+ | |||
+ | Actor.loop { | ||
+ | react { | ||
+ | case " | ||
+ | // | ||
+ | WorkerMgmt.foreach(farmWork) | ||
+ | case RenderedLine(m: | ||
+ | draw ! (row, raster) // Get it on the screen | ||
+ | if(state == State.Running) farmWork(WorkerMgmt.find(m)) | ||
+ | case Register(m: | ||
+ | println(" | ||
+ | // Register and assign it work; Immediate load balance | ||
+ | farmWork(WorkerMgmt.register(m)) | ||
+ | case Tick => | ||
+ | for(w <- WorkerMgmt.sweep) { | ||
+ | println(" | ||
+ | workQueue.push(w.row) | ||
+ | } | ||
+ | case msg => | ||
+ | println(" | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | |||
+ | def farmWork(worker : Worker) { | ||
+ | if(workQueue.isEmpty) | ||
+ | shutdown | ||
+ | else { | ||
+ | if(worker != null) worker.render(workQueue.pop) | ||
+ | } | ||
+ | } | ||
+ | |||
+ | def shutdown() { | ||
+ | state = State.Stopped | ||
+ | } | ||
+ | |||
+ | def startup() { | ||
+ | if(state == State.Stopped) { | ||
+ | workQueue.clear | ||
+ | for(row <- 0 to img.getHeight-1) workQueue.push(row) | ||
+ | process | ||
+ | } | ||
+ | } | ||
+ | |||
+ | def process() { | ||
+ | state = State.Running | ||
+ | a ! " | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | <code scala> | ||
+ | /* | ||
+ | * ActorPing.scala | ||
+ | */ | ||
+ | |||
+ | package mandelbrotdistributed | ||
+ | |||
+ | import java.util.concurrent._ | ||
+ | import scala.actors._ | ||
+ | |||
+ | // ============================================= | ||
+ | /** | ||
+ | * Pings an actor every X seconds. | ||
+ | * | ||
+ | * Borrowed from Scala TIM sample; which borrows from: | ||
+ | * | ||
+ | * Code based on code from the ActorPing class in the /lift/ repository (http:// | ||
+ | * Copyright: | ||
+ | * | ||
+ | * (c) 2007 WorldWide Conferencing, | ||
+ | * Distributed under an Apache License | ||
+ | * http:// | ||
+ | */ | ||
+ | object ActorPing { | ||
+ | |||
+ | def scheduleAtFixedRate(to: | ||
+ | val cmd = new Runnable { | ||
+ | def run { | ||
+ | // println(" | ||
+ | try { | ||
+ | to ! msg | ||
+ | } | ||
+ | catch { | ||
+ | case t:Throwable => t.printStackTrace | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | service.scheduleAtFixedRate(cmd, | ||
+ | } | ||
+ | |||
+ | private val service = Executors.newSingleThreadScheduledExecutor(threadFactory) | ||
+ | |||
+ | private object threadFactory extends ThreadFactory { | ||
+ | val threadFactory = Executors.defaultThreadFactory() | ||
+ | def newThread(r: | ||
+ | val d: Thread = threadFactory.newThread(r) | ||
+ | d setName " | ||
+ | d setDaemon true | ||
+ | d | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | Entire work as NetBeans project : {{: | ||
+ | |||
+ | {{tag> | ||