[back]
Demonstration In Go Of Sync, Async, And Buffering Communication
modified: 2015-03-22 04:22:15

    /*
        Leonard Kevin McGuire Jr (kmcg3413@gmail.com)

        This demonstration shows two very important concepts.

        (1) It shows the difference between a synchronous request/reply model and
            an asynchronous one between two contexts of execution, with latency as
            a factor, showing how an asynchronous model can maximize the
            communication link throughput utilization.

        (2) It shows the deadlock problem if you blindly shovel data in an
            asynchronous model, and also shows the correct way to handle an
            asynchronous model. It does this by simulating an IPConn object
            using channels.
    */
    package main

    import          "fmt"
    import          "time"
    import rand     "math/rand"

    // WorkItem is the unit of data exchanged between the client and the
    // server in every demonstration below.
    type WorkItem struct {
        stime       time.Time // creation time; shovelEntry uses it to simulate link latency
        work        byte      // payload; the server increments it by one
    }

    // newWorkItem builds a WorkItem carrying `work`, stamped with the
    // current time so that link latency can be simulated later.
    func newWorkItem(work byte) (wi *WorkItem) {
        wi = &WorkItem{
            stime: time.Now(),
            work:  work,
        }
        return
    }

    // shovelEntry forwards work items from `in` to `out`, holding each
    // item until at least `linkDelay` has elapsed since it was created.
    // This simulates a communication link with a fixed one-way latency.
    //
    // NOTE: the original comment claimed a "2 seconds delay at minimum",
    // but the constant used was 500000000 ns — half a second. The named
    // constant below makes the actual latency explicit.
    func shovelEntry(in chan *WorkItem, out chan *WorkItem) {
        // minimum one-way latency of the simulated link
        const linkDelay = 500 * time.Millisecond

        for {
            // read work item
            wi := <-in
            // sleep off whatever part of the latency has not already
            // elapsed since the item was created
            if remaining := linkDelay - time.Since(wi.stime); remaining > 0 {
                time.Sleep(remaining)
            }
            // send work item
            out <- wi
        }
    }

    // serverEntry receives work items, performs the (trivial) unit of
    // work, and replies with the same item. The important part is how
    // little time the CPU spends per item compared to the link latency —
    // this server is effectively very fast.
    func serverEntry(in chan *WorkItem, out chan *WorkItem) {
        for {
            // receive a request, transform it, reply
            item := <-in
            item.work++
            out <- item
        }
    }

    // clientEntrySync issues one request at a time and blocks for the
    // reply before producing the next. With a half-second latency in each
    // direction the link sits idle for a full round trip per item, so
    // throughput (link utilization) is low — compare clientEntryAsync.
    //
    // (Idiom fix: the loop counter is now advanced in the for post
    // statement instead of a manual `x = x + 1` inside the body.)
    func clientEntrySync(in chan *WorkItem, out chan *WorkItem) {
        for x := 0; x < 10; x++ {
            // produce some work
            wi := newWorkItem(byte(x))
            // send the work
            out <- wi
            // get a reply (blocks for a full round trip)
            wi = <-in
            // print result
            fmt.Printf("[sync-demo] in:%d\n", wi.work)
        }
    }

    // clientEntryAsync keeps the link busy by overlapping sends and
    // receives: a new item is produced as soon as the previous one has
    // been handed off, and replies are drained whenever they arrive.
    // This yields much higher throughput (link utilization) than the
    // synchronous demonstration.
    func clientEntryAsync(in chan *WorkItem, out chan *WorkItem) {
        var pending *WorkItem

        for produced := 0; produced < 100; {
            // lazily create the next work item once the previous one
            // has been accepted by the link
            if pending == nil {
                pending = newWorkItem(byte(produced))
                produced++
            }

            // whichever becomes possible first: hand off the pending
            // item, or consume a reply
            select {
            case out <- pending:
                pending = nil
            case reply := <-in:
                fmt.Printf("[async-demo] in:%d\n", reply.work)
            }
        }
    }

    // this simulates using an output that has no way of determining
    // if it might block on write or read so we have to create two
    // other goroutines that are helpers
    //
    // ** to simulate this we simply disallow the usage of the select
    //    on the `in` and `out` channels because they represent for
    //     example a net.Conn's Read and Write methods
    //
    // ** we can do select on our `helperin` and `helperout` because
    //    they represent a true channel which is what we use to work
    //    around the potential to block
    //
    // ** we generate a variable number of work items but we can pretend
    //    that they are a single work packet of variable size
    //
    // ** the maxbuf represents the maximum memory in our machine as a
    //    outgoing and incoming application level buffer; OR they represent
    //    a static hard buffer size (instead of being able to expand)
    //
    // what will happen is we will get stuck trying to send something but
    // that will fail because the server's incoming buffer is full and our
    // incoming buffer is full causing the server to do an infinite block
    // and we are in an infinite block too because we can not read
    //
    // ** the solution is for us to implement some type of throttling
    //    to prevent exhausting all of our memory in the machine represented
    //    by the `helperin` and `helperout` and `maxbuf`
    func __helperout(in chan *WorkItem, out chan *WorkItem) {
        // dedicated writer goroutine; IPConn supports concurrent
        // Read/Write, so this side may block on the write alone
        for {
            item := <-in
            out <- item
        }
    }

    func __helperin(in chan *WorkItem, out chan *WorkItem) {
        // dedicated reader goroutine; mirrors __helperout on the
        // receive side of the simulated connection
        for {
            item := <-out
            in <- item
        }
    }
    // clientEntryAsyncBlocking simulates talking to the server through an
    // IPConn-like object: select is disallowed on `in`/`out` themselves,
    // so two helper goroutines bridge them to real channels. With
    // limit=true production is throttled to the free space in the outgoing
    // buffer and the function completes; with limit=false it deliberately
    // overruns the buffers and deadlocks (never returns).
    func clientEntryAsyncBlocking(in chan *WorkItem, out chan *WorkItem, limit bool) {
        //
        // ** remember we can't select on `in` or `out` because we are
        //    pretending that they do not support that feature as they
        //    are like an IPConn object
        //
        var wi          *WorkItem

        // application level buffers wrapping the simulated IPConn
        maxbuf := 10
        helperin := make(chan *WorkItem, maxbuf)
        helperout := make(chan *WorkItem, maxbuf)

        // writer and reader helpers so we never block directly on the conn
        go __helperout(helperin, out)
        go __helperin(helperout, in)

        var grp     []*WorkItem     // current group (pretend packet) to send
        var sz      int             // number of items in the current group
        var dosend  bool            // whether the outgoing buffer has room for the group
        var cycles  int             // how many groups to produce

        if limit {
            cycles = 500
        } else {
            // large enough that the unthrottled variant is certain to
            // overrun the buffers and deadlock before finishing
            cycles = 1000000
        }

        for x := 0; x < cycles; {
            if grp == nil {
                // to demonstrate, we build a random length group of
                // work items that must be sent at once
                sz = rand.Intn(3)
                grp = make([]*WorkItem, sz)
                for y := 0; y < sz; y++ {
                    wi := newWorkItem(byte(x))
                    grp[y] = wi
                }
                x++
            }

            // this is where we check that our buffer, being the
            // `helperin` channel, has room; if not we do not
            // produce any more work but instead wait for it to
            // have room
            if limit {
                if sz > (cap(helperin) - len(helperin)) {
                    // do not send anything
                    if dosend {
                        // we do not send.. this is where we could deadlock
                        // if we went ahead and tried to send even though the
                        // buffer can not hold all of it
                        fmt.Printf("waiting because buffer is too full\n")
                    }
                    dosend = false
                } else {
                    dosend = true
                }
            } else {
                dosend = true
            }

            if dosend {
                // send group
                for y := 0; y < sz; y++ {
                    // `helperin` represents our maximum memory in the machine
                    //  as an application level buffer.. the internal network
                    //  stack buffer for our pretend IPConn is represented by
                    //  `maxbuf` in the `main` function
                    //
                    //  without the throttle we eventually get stuck here trying
                    //  to send: the server's input buffer is full and our input
                    //  buffer is full, but we can't read from it because we are
                    //  stuck blocked here
                    helperin <- grp[y]
                }
                grp = nil
            }

            // non-blocking drain of replies; this lets us determine if we
            // are able to send or recv, but (when limit is false) we do not
            // keep the available `helperin` capacity in check, so we
            // eventually overrun it and get stuck
            select {
                case wi = <- helperout:
                    fmt.Printf("[async-nolimit-demo] in:%d\n", wi.work)
                default:
            }
            ////////
        }
    }

    // main wires up the simulated link and runs each demonstration in
    // sequence. The final (unthrottled) demonstration deadlocks on
    // purpose, so the program never terminates normally.
    func main() {
        const chanCap = 10

        cliSend := make(chan *WorkItem, chanCap) // client -> link
        cliRecv := make(chan *WorkItem, chanCap) // link -> client
        srvSend := make(chan *WorkItem, chanCap) // server -> link
        srvRecv := make(chan *WorkItem, chanCap) // link -> server

        // two shovels form a full-duplex link with simulated latency
        go shovelEntry(cliSend, srvRecv)
        go shovelEntry(srvSend, cliRecv)

        // the server goroutine echoes incremented work items
        go serverEntry(srvRecv, srvSend)

        // demonstrations using plain channels
        clientEntrySync(cliRecv, cliSend)
        clientEntryAsync(cliRecv, cliSend)

        // simulated-IPConn demonstrations: correct (throttled) buffering,
        // then incorrect buffering, which deadlocks
        clientEntryAsyncBlocking(cliRecv, cliSend, true)
        clientEntryAsyncBlocking(cliRecv, cliSend, false)
    }