Differences

This shows you the differences between two versions of the page.

Link to this comparison view

blog:scala_distributed_mandelbrot [2009/11/27 17:53] (current)
Line 1: Line 1:
 +====== Scala distributed Mandelbrot ======
 +
 +My first foray into distributed programming with scala. ​ This worked out pretty well.  The whole system automatically load balances and recovers elegantly if a worker is killed half way through a render or indeed if another joins whilst a render is in progress.
 +
 +It has no bells and whistles with regards to zooming and panning so as keep the code as simple as possible.
 +
 +{{:​blog:​mandelbrot.png|}}
 +
 +Running a worker is as easy as.  The IP and port on the worker are the IP address of the GUI, and the LOCAL port which the worker will used for listening. ​ If you want to many worker on the same server give each a unique port number
 +<​code>​
 +# export CLASSPATH=.:/​usr/​local/​scala/​lib/​scala-library.jar
 +# cd MandelbrotDistributed/​build/​classes
 +# java mandelbrotdistributed/​MandelWorker 192.168.1.137 4000
 +"​another worker on the same server"​
 +# java mandelbrotdistributed/​MandelWorker 192.168.1.137 4001
 +</​code>​
 +
 +This is the entire code:
 +
 +<code scala>
 +/*
 + * Raster.scala
 + */
 +package mandelbrotdistributed
 +
 +@serializable
 +class Raster(xlen : Int) {
 +    var line = new Array[Int](xlen)
 +    def width = line.length
 +}
 +</​code>​
 +
 +<code scala>
 +/*
 + * Complex.scala
 + */
 +package mandelbrotdistributed
 +
 +class Complex (
 +    val a: Double,
 +      val b: Double)
 +{
 +    def abs() = Math.sqrt(a*a + b*b)
 +    // (a,b)(c,d) = (ac - bd, bc + ad)
 +    def *(that: Complex) = new Complex(a*that.a-b*that.b,​ b*that.a+a*that.b)
 +    def +(that: Complex) = new Complex(a+that.a,​ b+that.b)
 +    override def toString = a + " + " + b+"​i"​
 +}
 +</​code>​
 +
 +<code scala>
 +/*
 + * MandelWorker.scala
 + */
 +package mandelbrotdistributed
 +
 +import scala.actors.Actor._
 +import scala.actors.remote._
 +import scala.actors.remote.RemoteActor.select
 +import java.lang.Math
 +import java.net.InetAddress
 +
 +case class Register(me:​Locator)
 +case class RenderedLine(m:​Locator,​row:​Int,​raster:​Raster)
 +case class RenderAction(row:​Int,​width:​Int,​height:​Int,​level:​Int)
 +case class Tick
 +
 +class MandelActor(me : Locator, clientLoc : Locator) {
 +    RemoteActor.classLoader = getClass().getClassLoader()
 +
 +    actor {
 +        println("​Worker Ready"​)
 +        RemoteActor.alive(me.node.port)
 +        RemoteActor.register(me.name,​ self)
 +        loop {
 +            react {
 +                case RenderAction(row : Int, width : Int, height : Int, level : Int) =>
 +                    println("​Raster row "+row)
 +                    sender ! RenderedLine(me,​row,​ generate(width,​ height, row, level))
 +                case msg =>
 +                    println("​Unhandled message: " + msg)
 +            }
 +        }
 +    }
 +    ​
 +    // Register with the GUI every 5 secs (heartbeat)
 +    val client = select(clientLoc.node,​clientLoc.name)
 +    ActorPing.scheduleAtFixedRate(client,​ Register(me),​ 0L, 5000L)
 +
 +    def iterate(z:​Complex,​ c:Complex, level:Int, i:Int): (Complex,​Int) =
 +        if(z.abs > 2 || i > level) (z,i) else iterate(z*z+c,​ c, level, i+1)
 +    ​
 +    def generate(width : Int, height : Int, row : Int, level: Int) : Raster = {
 +        val raster = new Raster(width)
 +        val y = -1.5 + row*3.0/​height
 +        for { x0 <- 0 until width}
 +        {
 +            val x = -2.0 + x0*3.0/​width
 +            val (z, i) = iterate(new Complex(0,​0),​ new Complex(x,​y),​ level, 0)
 +            raster.line(x0) = if (z.abs < 2) 0 else i
 +        }
 +        raster
 +    }
 +}
 +
 +object MandelWorker {
 +    def main(args: Array[String]) :Unit = {
 +        // arg0: remote IP of where the MandelGUI program is running
 +        // arg1: a local port for the mandel worker
 +        val host = if (args.length >= 1) args(0) else "​127.0.0.1"​
 +        val port = if (args.length >= 2) args(1).toInt else 9010
 +        val gui = Locator(Node(host,​9999),​ '​MandelGUI)
 +        val me = new Locator(Node(InetAddress.getLocalHost.getHostAddress,​ port), '​MandelWorker)
 +        new MandelActor(me,​ gui)
 +    }
 +}
 +</​code>​
 +
 +<code scala>
 +/*
 + * MandelGui.scala
 + */
 +
 +package mandelbrotdistributed
 +
 +import scala.swing._
 +import scala.swing.event.ButtonClicked
 +import scala.actors.{Actor,​OutputChannel}
 +import scala.actors.Actor._
 +import scala.actors.remote.{RemoteActor,​Locator,​Node}
 +import scala.actors.remote.RemoteActor._
 +import scala.collection.mutable.Stack
 +import java.awt.image.BufferedImage
 +import java.awt.{Graphics,​Graphics2D}
 +import java.awt.Color
 +
 +object MandelGui extends SimpleGUIApplication {
 +    val img = new BufferedImage(480,​480,​BufferedImage.TYPE_INT_RGB)
 +    val mandel = Mandel
 +
 +    val drawing = new Panel {
 +        background = Color.black
 +        preferredSize = (img.getWidth,​ img.getHeight)
 +
 +        override def paintComponent(g : Graphics) : Unit = {
 +            g.asInstanceOf[Graphics2D].drawImage(img,​null,​0,​0)
 +        }
 +    }
 +
 +    def clearDrawing() {
 +        var g = img.getGraphics
 +        g.setColor(Color.BLACK)
 +        g.fillRect(0,​0,​img.getWidth,​img.getHeight)
 +    }
 +
 +    def top = new MainFrame {
 +        title="​Mandelbrot"​
 +        contents = new BorderPanel {
 +
 +            val control = new BoxPanel(Orientation.Horizontal) {
 +                val start = new Button {
 +                    text = "​Start"​
 +                }
 +                val stop = new Button {
 +                    text = "​Stop"​
 +                }
 +                val continue = new Button {
 +                    text = "​Continue"​
 +                }
 +                contents.append(start,​stop,​continue)
 +                listenTo(start,​stop,​continue)
 +                reactions += {
 +                    case ButtonClicked(`start`) =>
 +                        clearDrawing
 +                        Mandel.startup
 +                    case ButtonClicked(`stop`) =>
 +                        Mandel.shutdown
 +                    case ButtonClicked(`continue`) =>
 +                        Mandel.process
 +                }
 +            }            ​
 +            drawing
 +
 +            import BorderPanel.Position._
 +            layout(control) = North
 +            layout(drawing) = Center ​                                  
 +        }
 +    }
 +
 +    object WorkerMgmt {
 +        private var allWorkers : List[Worker] = List()
 +        val defaultTTL = 6  //sweeps a worker can survive without a register
 +
 +        def foreach(op: Worker => Unit) = allWorkers.foreach(op)
 +
 +        def findWorkerForRow(row : int) : Worker = {
 +            allWorkers.filter(w=>​ w.row == row)(0)
 +        }
 +
 +        def find(m:​Locator) : Worker = {
 +            if (allWorkers.isEmpty) null
 +            else {
 +                val list = allWorkers.filter(w=>​ w.loc == m)
 +                if(list.isEmpty) null else list(0)
 +            }
 +        }
 +
 +        def register(m:​Locator) : Worker = {
 +            var worker = find(m)
 +            if (worker == null) {
 +                worker = new Worker(m, defaultTTL)
 +                allWorkers = worker :: allWorkers
 +            }
 +            worker.keepAlive
 +        }
 +
 +        def sweep() = {
 +            allWorkers.foreach(_.decTTL)
 +            val (ok, expired) = allWorkers span (_.ttl >= 0)
 +            allWorkers = ok
 +            expired
 +        }
 +    }
 +
 +    class Worker(val loc:​Locator,​ val defaultTTL:​int) {
 +        var row : Int = 0
 +        var ttl:Int = 0
 +        val actor = select(loc.node,​loc.name)
 +        val iterationDepth = 2048
 +
 +        def decTTL = ttl -= 1
 +
 +        def keepAlive() = {
 +            ttl = defaultTTL
 +            this
 +        }
 +
 +        def render(row : Int) {
 +            this.row = row
 +            actor ! RenderAction(row,​ img.getWidth,​ img.getHeight,​ iterationDepth)
 +        }
 +
 +        override def toString = loc.toString
 +    }
 +
 +    object Mandel {        ​
 +
 +        object State extends Enumeration {
 +            val Running, Stopped = Value
 +        }
 +        private var state = State.Stopped ​       ​
 +        private var workQueue : Stack[Int] = new Stack()
 +
 +        val draw = actor {
 +            Actor.loop {
 +                react {
 +                    case (row:Int, raster:​Raster) =>
 +                        for(x <- 0 until raster.width) {
 +                            val shade = raster.line(x) % 256
 +                            val rgb = new Color(shade,​shade,​shade).getRGB
 +                            img.setRGB(x,​ row, rgb)
 +                        }
 +                        drawing.repaint
 +                }
 +            }
 +        }
 +
 +        val a = actor {
 +            RemoteActor.alive(9999) // Port
 +            RemoteActor.register('​MandelGUI,​ Actor.self)
 +            // Sweep non responsive workers every 2sec
 +            ActorPing.scheduleAtFixedRate(Actor.self,​ Tick, 0L, 2000L)
 +
 +            Actor.loop {
 +                react {
 +                    case "​StartWork"​ =>
 +                        //​print("​Start work")
 +                        WorkerMgmt.foreach(farmWork)
 +                    case RenderedLine(m:​Locator,​row:​int,​ raster:​Raster) =>
 +                        draw ! (row, raster) // Get it on the screen
 +                        if(state == State.Running) farmWork(WorkerMgmt.find(m))
 +                    case Register(m:​Locator) =>
 +                        println("​Register "+m)
 +                        // Register and assign it work; Immediate load balance
 +                        farmWork(WorkerMgmt.register(m))
 +                    case Tick =>
 +                        for(w <- WorkerMgmt.sweep) {
 +                            println("​Unregister "+w)
 +                            workQueue.push(w.row) ​ // push their row
 +                        }
 +                    case msg =>
 +                        println("​Unhandled message: "+msg)
 +                }
 +            }
 +        }
 +
 +        def farmWork(worker : Worker) {
 +            if(workQueue.isEmpty)
 +                shutdown
 +            else {
 +                if(worker != null) worker.render(workQueue.pop)
 +            }
 +        }
 +
 +        def shutdown() {
 +            state = State.Stopped
 +        }
 +
 +        def startup() {
 +            if(state == State.Stopped) {                ​
 +                workQueue.clear
 +                for(row <- 0 to img.getHeight-1) workQueue.push(row)
 +                process
 +            }
 +        }
 +
 +        def process() {
 +            state = State.Running ​               ​
 +            a ! "​StartWork"​
 +        }
 +    }
 +}
 +</​code>​
 +
 +<code scala>
 +/*
 + * ActorPing.scala
 + */
 +
 +package mandelbrotdistributed
 +
 +import java.util.concurrent._
 +import scala.actors._
 +
 +// =============================================
 +/**
 + * Pings an actor every X seconds.
 + *
 + * Borrowed from Scala TIM sample; which borrows from:
 + *
 + * Code based on code from the ActorPing class in the /lift/ repository (http://​liftweb.net).
 + * Copyright:
 + *
 + * (c) 2007 WorldWide Conferencing,​ LLC
 + * Distributed under an Apache License
 + * http://​www.apache.org/​licenses/​LICENSE-2.0
 + */
 +object ActorPing {
 +
 +  def scheduleAtFixedRate(to:​ AbstractActor,​ msg: Any, initialDelay:​ Long, period: Long): ScheduledFuture[T] forSome {type T} = {
 +    val cmd = new Runnable {
 +      def run {
 +        // println("​***ActorPing Event***"​);​
 +        try {
 +          to ! msg
 +        }
 +        catch {
 +        case t:Throwable => t.printStackTrace
 +        }
 +      }
 +    }
 +    service.scheduleAtFixedRate(cmd,​ initialDelay,​ period, TimeUnit.MILLISECONDS)
 +  }
 +
 +  private val service = Executors.newSingleThreadScheduledExecutor(threadFactory)
 +
 +  private object threadFactory extends ThreadFactory {
 +    val threadFactory = Executors.defaultThreadFactory()
 +    def newThread(r:​ Runnable) : Thread = {
 +      val d: Thread = threadFactory.newThread(r)
 +      d setName "​ActorPing"​
 +      d setDaemon true
 +      d
 +    }
 +  }
 +}
 +</​code>​
 +
 +Entire work as NetBeans project : {{:​blog:​mandelbrotdistributed.zip|}}
 +
 +{{tag>​scala programming}}