blog:scala_distributed_mandelbrot

no way to compare when less than two revisions

Differences

This shows you the differences between two versions of the page.


blog:scala_distributed_mandelbrot [2009/11/27 17:53] (current) – created - external edit 127.0.0.1
Line 1: Line 1:
 +====== Scala distributed Mandelbrot ======
 +
 +My first foray into distributed programming with scala.  This worked out pretty well.  The whole system automatically load balances and recovers elegantly if a worker is killed half way through a render or indeed if another joins whilst a render is in progress.
 +
 +It has no bells and whistles with regards to zooming and panning so as keep the code as simple as possible.
 +
 +{{:blog:mandelbrot.png|}}
 +
 +Running a worker is as easy as.  The IP and port on the worker are the IP address of the GUI, and the LOCAL port which the worker will used for listening.  If you want to many worker on the same server give each a unique port number
 +<code>
 +# export CLASSPATH=.:/usr/local/scala/lib/scala-library.jar
 +# cd MandelbrotDistributed/build/classes
 +# java mandelbrotdistributed/MandelWorker 192.168.1.137 4000
 +"another worker on the same server"
 +# java mandelbrotdistributed/MandelWorker 192.168.1.137 4001
 +</code>
 +
 +This is the entire code:
 +
 +<code scala>
 +/*
 + * Raster.scala
 + */
 +package mandelbrotdistributed
 +
 +@serializable
 +class Raster(xlen : Int) {
 +    var line = new Array[Int](xlen)
 +    def width = line.length
 +}
 +</code>
 +
 +<code scala>
 +/*
 + * Complex.scala
 + */
 +package mandelbrotdistributed
 +
 +class Complex (
 +    val a: Double,
 +      val b: Double)
 +{
 +    def abs() = Math.sqrt(a*a + b*b)
 +    // (a,b)(c,d) = (ac - bd, bc + ad)
 +    def *(that: Complex) = new Complex(a*that.a-b*that.b, b*that.a+a*that.b)
 +    def +(that: Complex) = new Complex(a+that.a, b+that.b)
 +    override def toString = a + " + " + b+"i"
 +}
 +</code>
 +
 +<code scala>
 +/*
 + * MandelWorker.scala
 + */
 +package mandelbrotdistributed
 +
 +import scala.actors.Actor._
 +import scala.actors.remote._
 +import scala.actors.remote.RemoteActor.select
 +import java.lang.Math
 +import java.net.InetAddress
 +
 +case class Register(me:Locator)
 +case class RenderedLine(m:Locator,row:Int,raster:Raster)
 +case class RenderAction(row:Int,width:Int,height:Int,level:Int)
 +case class Tick
 +
 +class MandelActor(me : Locator, clientLoc : Locator) {
 +    RemoteActor.classLoader = getClass().getClassLoader()
 +
 +    actor {
 +        println("Worker Ready")
 +        RemoteActor.alive(me.node.port)
 +        RemoteActor.register(me.name, self)
 +        loop {
 +            react {
 +                case RenderAction(row : Int, width : Int, height : Int, level : Int) =>
 +                    println("Raster row "+row)
 +                    sender ! RenderedLine(me,row, generate(width, height, row, level))
 +                case msg =>
 +                    println("Unhandled message: " + msg)
 +            }
 +        }
 +    }
 +    
 +    // Register with the GUI every 5 secs (heartbeat)
 +    val client = select(clientLoc.node,clientLoc.name)
 +    ActorPing.scheduleAtFixedRate(client, Register(me), 0L, 5000L)
 +
 +    def iterate(z:Complex, c:Complex, level:Int, i:Int): (Complex,Int) =
 +        if(z.abs > 2 || i > level) (z,i) else iterate(z*z+c, c, level, i+1)
 +    
 +    def generate(width : Int, height : Int, row : Int, level: Int) : Raster = {
 +        val raster = new Raster(width)
 +        val y = -1.5 + row*3.0/height
 +        for { x0 <- 0 until width}
 +        {
 +            val x = -2.0 + x0*3.0/width
 +            val (z, i) = iterate(new Complex(0,0), new Complex(x,y), level, 0)
 +            raster.line(x0) = if (z.abs < 2) 0 else i
 +        }
 +        raster
 +    }
 +}
 +
 +object MandelWorker {
 +    def main(args: Array[String]) :Unit = {
 +        // arg0: remote IP of where the MandelGUI program is running
 +        // arg1: a local port for the mandel worker
 +        val host = if (args.length >= 1) args(0) else "127.0.0.1"
 +        val port = if (args.length >= 2) args(1).toInt else 9010
 +        val gui = Locator(Node(host,9999), 'MandelGUI)
 +        val me = new Locator(Node(InetAddress.getLocalHost.getHostAddress, port), 'MandelWorker)
 +        new MandelActor(me, gui)
 +    }
 +}
 +</code>
 +
 +<code scala>
 +/*
 + * MandelGui.scala
 + */
 +
 +package mandelbrotdistributed
 +
 +import scala.swing._
 +import scala.swing.event.ButtonClicked
 +import scala.actors.{Actor,OutputChannel}
 +import scala.actors.Actor._
 +import scala.actors.remote.{RemoteActor,Locator,Node}
 +import scala.actors.remote.RemoteActor._
 +import scala.collection.mutable.Stack
 +import java.awt.image.BufferedImage
 +import java.awt.{Graphics,Graphics2D}
 +import java.awt.Color
 +
 +object MandelGui extends SimpleGUIApplication {
 +    val img = new BufferedImage(480,480,BufferedImage.TYPE_INT_RGB)
 +    val mandel = Mandel
 +
 +    val drawing = new Panel {
 +        background = Color.black
 +        preferredSize = (img.getWidth, img.getHeight)
 +
 +        override def paintComponent(g : Graphics) : Unit = {
 +            g.asInstanceOf[Graphics2D].drawImage(img,null,0,0)
 +        }
 +    }
 +
 +    def clearDrawing() {
 +        var g = img.getGraphics
 +        g.setColor(Color.BLACK)
 +        g.fillRect(0,0,img.getWidth,img.getHeight)
 +    }
 +
 +    def top = new MainFrame {
 +        title="Mandelbrot"
 +        contents = new BorderPanel {
 +
 +            val control = new BoxPanel(Orientation.Horizontal) {
 +                val start = new Button {
 +                    text = "Start"
 +                }
 +                val stop = new Button {
 +                    text = "Stop"
 +                }
 +                val continue = new Button {
 +                    text = "Continue"
 +                }
 +                contents.append(start,stop,continue)
 +                listenTo(start,stop,continue)
 +                reactions += {
 +                    case ButtonClicked(`start`) =>
 +                        clearDrawing
 +                        Mandel.startup
 +                    case ButtonClicked(`stop`) =>
 +                        Mandel.shutdown
 +                    case ButtonClicked(`continue`) =>
 +                        Mandel.process
 +                }
 +            }            
 +            drawing
 +
 +            import BorderPanel.Position._
 +            layout(control) = North
 +            layout(drawing) = Center                                   
 +        }
 +    }
 +
 +    object WorkerMgmt {
 +        private var allWorkers : List[Worker] = List()
 +        val defaultTTL = 6  //sweeps a worker can survive without a register
 +
 +        def foreach(op: Worker => Unit) = allWorkers.foreach(op)
 +
 +        def findWorkerForRow(row : int) : Worker = {
 +            allWorkers.filter(w=> w.row == row)(0)
 +        }
 +
 +        def find(m:Locator) : Worker = {
 +            if (allWorkers.isEmpty) null
 +            else {
 +                val list = allWorkers.filter(w=> w.loc == m)
 +                if(list.isEmpty) null else list(0)
 +            }
 +        }
 +
 +        def register(m:Locator) : Worker = {
 +            var worker = find(m)
 +            if (worker == null) {
 +                worker = new Worker(m, defaultTTL)
 +                allWorkers = worker :: allWorkers
 +            }
 +            worker.keepAlive
 +        }
 +
 +        def sweep() = {
 +            allWorkers.foreach(_.decTTL)
 +            val (ok, expired) = allWorkers span (_.ttl >= 0)
 +            allWorkers = ok
 +            expired
 +        }
 +    }
 +
 +    class Worker(val loc:Locator, val defaultTTL:int) {
 +        var row : Int = 0
 +        var ttl:Int = 0
 +        val actor = select(loc.node,loc.name)
 +        val iterationDepth = 2048
 +
 +        def decTTL = ttl -= 1
 +
 +        def keepAlive() = {
 +            ttl = defaultTTL
 +            this
 +        }
 +
 +        def render(row : Int) {
 +            this.row = row
 +            actor ! RenderAction(row, img.getWidth, img.getHeight, iterationDepth)
 +        }
 +
 +        override def toString = loc.toString
 +    }
 +
 +    object Mandel {        
 +
 +        object State extends Enumeration {
 +            val Running, Stopped = Value
 +        }
 +        private var state = State.Stopped        
 +        private var workQueue : Stack[Int] = new Stack()
 +
 +        val draw = actor {
 +            Actor.loop {
 +                react {
 +                    case (row:Int, raster:Raster) =>
 +                        for(x <- 0 until raster.width) {
 +                            val shade = raster.line(x) % 256
 +                            val rgb = new Color(shade,shade,shade).getRGB
 +                            img.setRGB(x, row, rgb)
 +                        }
 +                        drawing.repaint
 +                }
 +            }
 +        }
 +
 +        val a = actor {
 +            RemoteActor.alive(9999) // Port
 +            RemoteActor.register('MandelGUI, Actor.self)
 +            // Sweep non responsive workers every 2sec
 +            ActorPing.scheduleAtFixedRate(Actor.self, Tick, 0L, 2000L)
 +
 +            Actor.loop {
 +                react {
 +                    case "StartWork" =>
 +                        //print("Start work")
 +                        WorkerMgmt.foreach(farmWork)
 +                    case RenderedLine(m:Locator,row:int, raster:Raster) =>
 +                        draw ! (row, raster) // Get it on the screen
 +                        if(state == State.Running) farmWork(WorkerMgmt.find(m))
 +                    case Register(m:Locator) =>
 +                        println("Register "+m)
 +                        // Register and assign it work; Immediate load balance
 +                        farmWork(WorkerMgmt.register(m))
 +                    case Tick =>
 +                        for(w <- WorkerMgmt.sweep) {
 +                            println("Unregister "+w)
 +                            workQueue.push(w.row)  // push their row
 +                        }
 +                    case msg =>
 +                        println("Unhandled message: "+msg)
 +                }
 +            }
 +        }
 +
 +        def farmWork(worker : Worker) {
 +            if(workQueue.isEmpty)
 +                shutdown
 +            else {
 +                if(worker != null) worker.render(workQueue.pop)
 +            }
 +        }
 +
 +        def shutdown() {
 +            state = State.Stopped
 +        }
 +
 +        def startup() {
 +            if(state == State.Stopped) {                
 +                workQueue.clear
 +                for(row <- 0 to img.getHeight-1) workQueue.push(row)
 +                process
 +            }
 +        }
 +
 +        def process() {
 +            state = State.Running                
 +            a ! "StartWork"
 +        }
 +    }
 +}
 +</code>
 +
 +<code scala>
 +/*
 + * ActorPing.scala
 + */
 +
 +package mandelbrotdistributed
 +
 +import java.util.concurrent._
 +import scala.actors._
 +
 +// =============================================
 +/**
 + * Pings an actor every X seconds.
 + *
 + * Borrowed from Scala TIM sample; which borrows from:
 + *
 + * Code based on code from the ActorPing class in the /lift/ repository (http://liftweb.net).
 + * Copyright:
 + *
 + * (c) 2007 WorldWide Conferencing, LLC
 + * Distributed under an Apache License
 + * http://www.apache.org/licenses/LICENSE-2.0
 + */
 +object ActorPing {
 +
 +  def scheduleAtFixedRate(to: AbstractActor, msg: Any, initialDelay: Long, period: Long): ScheduledFuture[T] forSome {type T} = {
 +    val cmd = new Runnable {
 +      def run {
 +        // println("***ActorPing Event***");
 +        try {
 +          to ! msg
 +        }
 +        catch {
 +        case t:Throwable => t.printStackTrace
 +        }
 +      }
 +    }
 +    service.scheduleAtFixedRate(cmd, initialDelay, period, TimeUnit.MILLISECONDS)
 +  }
 +
 +  private val service = Executors.newSingleThreadScheduledExecutor(threadFactory)
 +
 +  private object threadFactory extends ThreadFactory {
 +    val threadFactory = Executors.defaultThreadFactory()
 +    def newThread(r: Runnable) : Thread = {
 +      val d: Thread = threadFactory.newThread(r)
 +      d setName "ActorPing"
 +      d setDaemon true
 +      d
 +    }
 +  }
 +}
 +</code>
 +
 +Entire work as NetBeans project : {{:blog:mandelbrotdistributed.zip|}}
 +
 +{{tag>scala programming}}
  
  • blog/scala_distributed_mandelbrot.txt
  • Last modified: 2009/11/27 17:53
  • by 127.0.0.1