Thread Safe Number Dispenser

The NumberDispenser class that we introduced in the previous blogs works fine for a single thread. However, if multiple threads start requesting numbers, two threads might get the same number. The problem is that a thread in the dispense method might be preempted right after it reads the next number to be dispensed, but before it advances the stream.

[sourcecode language="scala"]
def dispense: Int = {
  val nextNum = nextNumberStream.head
  // Thread might be preempted right here …
  nextNumberStream = nextNumberStream.tail
  nextNum
}
[/sourcecode]

To illustrate the problem, let’s make the thread sleep right where we suspect that it might be preempted.

[sourcecode language="scala"]
def dispense: Int = {
  val nextNum = nextNumberStream.head
  Thread.sleep(10)
  nextNumberStream = nextNumberStream.tail
  nextNum
}
[/sourcecode]

To test our theory, we will add another test case to our ScalaTest suite that runs multiple threads, each asking the same NumberDispenser to dispense a number. In the spirit of this blog we will use Scala’s Actor model to achieve this. Actors are thread-like entities that can be viewed as concurrent processes that communicate by exchanging messages. Here the two actors run concurrently on separate threads. For this test case the actors do not need to communicate with each other; they only need to run concurrently, share the same NumberDispenser, and ask it to dispense numbers. Because we have introduced a sleep in the dispense method, the window in which a second thread can read the same head of the stream is much wider. We will see that the two threads get the same number, which is clearly not desirable.

The new test case looks like this:

[sourcecode language="scala"]
import actors.Actor

test("multithreaded") {
  val a = new NumberDispenser(100)
  object actorOne extends Actor {
    def act() {
      for (i <- 1 to 100)
        println("ActorOne => " + a.dispense)
    }
  }
  object actorTwo extends Actor {
    def act() {
      for (i <- 1 to 100) {
        println("ActorTwo => " + a.dispense)
      }
    }
  }
  actorOne.start()
  actorTwo.start()
}
[/sourcecode]

This test case is simple. We create a single NumberDispenser as “val a”. A val in Scala stands for value (I guess) and is similar to a final variable in Java: once a value is assigned to a val, it cannot be changed. We create two actors in this test case. These actors are standalone objects that extend a trait called Actor. An object in Scala is a singleton, meaning that only one instance exists. In the above example, there is a single actorOne and a single actorTwo instance, which is all we need in this case. An Actor’s behavior is specified by defining its <em>act</em> method, as we are doing above. The code inside <em>act</em> is simple: each actor loops 100 times, asking the shared NumberDispenser for a number. For simplicity we just print out the number along with the name of the actor. We start each actor by calling its “<em>start</em>” method. The output looks like this:

[sourcecode language="text"]
ActorTwo => 100
ActorOne => 100
ActorOne => 102
ActorTwo => 102
ActorTwo => 103
ActorOne => 103
ActorTwo => 104
ActorOne => 104
ActorTwo => 105
ActorOne => 105
ActorTwo => 107
ActorOne => 107
ActorTwo => 109
ActorOne => 109
ActorOne => 110
ActorTwo => 110
ActorOne => 111
ActorTwo => 111
ActorOne => 112
ActorTwo => 112
ActorTwo => 113
ActorOne => 113
ActorOne => 114
ActorTwo => 114
ActorTwo => 115
ActorOne => 115
ActorTwo => 116
ActorOne => 116
ActorTwo => 117
ActorOne => 117
ActorOne => 118
ActorTwo => 118

[/sourcecode]

As we can see, the two threads get the same numbers, and some numbers (101 and 106, for example) do not appear in the output at all. This needs to be fixed.
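
The output above depends on thread timing, so it can differ from run to run. As a rough sketch, the same interleaving can be forced deterministically with plain JVM threads. The RacyDispenser class below is a hypothetical stand-in for the non-actor NumberDispenser (its body is assumed from the snippets in this post), and the pause parameter and the CyclicBarrier are illustration-only additions that take the place of Thread.sleep.

[sourcecode language="scala"]
import java.util.concurrent.CyclicBarrier

// Hypothetical stand-in for the non-actor NumberDispenser from the earlier posts.
// `pause` is called exactly where we suspect the thread gets preempted.
class RacyDispenser(startingNumber: Int, pause: () => Unit) {
  private def numberStream(num: Int): Stream[Int] = num #:: numberStream(num + 1)
  private var nextNumberStream = numberStream(startingNumber)

  def dispense: Int = {
    val nextNum = nextNumberStream.head
    pause() // stands in for the preemption point
    nextNumberStream = nextNumberStream.tail
    nextNum
  }
}

object RaceDemo {
  // Two threads call dispense once each. The barrier makes both threads read
  // the head before either of them advances the stream.
  def dispenseFromTwoThreads(): Seq[Int] = {
    val barrier = new CyclicBarrier(2)
    val dispenser = new RacyDispenser(100, () => barrier.await())
    val results = new Array[Int](2)
    val threads = (0 until 2).map { i =>
      new Thread(new Runnable {
        def run(): Unit = { results(i) = dispenser.dispense }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    results.toSeq
  }
}
[/sourcecode]

Calling RaceDemo.dispenseFromTwoThreads() hands the number 100 to both threads, which is the duplicate we saw in the actor test.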

One way of fixing this problem and making the NumberDispenser thread safe is to use the typical Java threading model based on shared data and locks. Basically, we synchronize access to the code that dispenses a number, so that only one thread at a time can execute it. Here is the modified code.

[sourcecode language="scala"]
def dispense: Int = synchronized {
  val nextNum = nextNumberStream.head
  Thread.sleep(10)
  nextNumberStream = nextNumberStream.tail
  nextNum
}
[/sourcecode]

Now the threads no longer get the same number.

[sourcecode language="text"]
ActorOne => 100
ActorTwo => 101
ActorOne => 102
ActorTwo => 103
ActorOne => 104
ActorTwo => 105
ActorOne => 106
ActorTwo => 107
ActorOne => 108
ActorTwo => 109
ActorOne => 110
ActorTwo => 111
ActorOne => 112
ActorTwo => 113
ActorOne => 114
ActorTwo => 115
ActorOne => 116
ActorTwo => 117
ActorOne => 118
ActorTwo => 119
ActorOne => 120

[/sourcecode]
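
To check this without reading printed output, here is a minimal sketch using plain JVM threads. SynchronizedDispenser is a hypothetical stand-in for the non-actor NumberDispenser (assumed from the snippets above, with the sleep removed), and dispenseConcurrently is a helper invented for this example.

[sourcecode language="scala"]
// Hypothetical non-actor dispenser, reconstructed from the snippets in this post.
class SynchronizedDispenser(startingNumber: Int) {
  private def numberStream(num: Int): Stream[Int] = num #:: numberStream(num + 1)
  private var nextNumberStream = numberStream(startingNumber)

  // Only one thread at a time can run this body, so reading the head and
  // advancing the stream happen as one indivisible step.
  def dispense: Int = synchronized {
    val nextNum = nextNumberStream.head
    nextNumberStream = nextNumberStream.tail
    nextNum
  }
}

object SynchronizedDemo {
  // Runs `threadCount` threads that each dispense `perThread` numbers and
  // returns every number that was handed out.
  def dispenseConcurrently(threadCount: Int, perThread: Int): Seq[Int] = {
    val dispenser = new SynchronizedDispenser(100)
    val results = Array.ofDim[Int](threadCount, perThread)
    val threads = (0 until threadCount).map { t =>
      new Thread(new Runnable {
        def run(): Unit =
          for (i <- 0 until perThread) results(t)(i) = dispenser.dispense
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    results.toSeq.flatMap(_.toSeq)
  }
}
[/sourcecode]

With four threads dispensing 250 numbers each, the result should contain every number from 100 to 1099 exactly once, in whatever order the threads happened to run.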

The shared data and lock approach has several well-documented issues, such as the possibility of deadlocks and of race conditions wherever a lock is missed. This makes it hard for developers to write correct thread safe code with this strategy.

Scala provides the Actor model, which makes it easier to develop thread safe, multi-threaded applications. Let us see how to convert our simple NumberDispenser into an actor. Here is the modified source code.

[sourcecode language="scala"]
import actors.Actor

class NumberDispenser(startingNumber: Int) extends Actor {
  private def numberStream(num: Int): Stream[Int] = num #:: numberStream(num + 1)

  private var nextNumberStream = numberStream(startingNumber)

  // Methods of this class
  def act() {
    loop {
      react {
        case ("DISPENSE", requester: Actor) => {
          requester ! nextNumberStream.head
          nextNumberStream = nextNumberStream.tail
        }
        case ("RESET", resetTo: Int) => nextNumberStream = numberStream(resetTo)

        case ("PEEK", requester: Actor) => requester ! nextNumberStream.head
      }
    }
  }
}
[/sourcecode]

The first thing we do is mix the Actor trait into our NumberDispenser class. This is similar to what we did earlier when we created the multi-threaded test case, where we saw that we have to define the act method to make our actor behave. Actors communicate with each other using messages. An actor does not block when it sends a message, and it processes one message at a time, so handling one message is never interleaved with handling another. When a message arrives, the actor can either receive it by calling the receive method or react to it by calling the react method. The difference between the two is that receive returns from the call (possibly returning a value), whereas react does not return. Its return type in Scala is Nothing, which indicates that the method never returns normally, so there is no need to preserve the call stack of the current thread. This means the same thread can be reused for the next actor that wakes up, which makes this approach very efficient.
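
To get a feel for why handling one message at a time keeps the state safe, here is a hand-rolled sketch that uses a plain blocking queue as the mailbox. It is not how scala.actors is implemented, and unlike react it ties up a dedicated thread; MailboxDispenser and its members are names made up for this illustration.

[sourcecode language="scala"]
import java.util.concurrent.LinkedBlockingQueue

// A hand-rolled stand-in for an actor (NOT the scala.actors implementation):
// any thread may enqueue a request, but only the worker thread touches `next`.
class MailboxDispenser(startingNumber: Int) {
  // Each request carries a one-slot queue on which the reply is delivered.
  private val mailbox = new LinkedBlockingQueue[LinkedBlockingQueue[Int]]()
  private var next = startingNumber // only ever read or written by the worker

  private val worker = new Thread(new Runnable {
    def run(): Unit = while (true) {
      val replyTo = mailbox.take() // blocks until a request arrives
      replyTo.put(next)            // "send" the reply to the requester
      next += 1
    }
  })
  worker.setDaemon(true) // do not keep the JVM alive for this thread
  worker.start()

  // Sends a request and waits for the answer, much like `!` followed by `receive`.
  def dispense(): Int = {
    val replyTo = new LinkedBlockingQueue[Int](1)
    mailbox.put(replyTo)
    replyTo.take()
  }
}
[/sourcecode]

No lock appears around next because requests are handled strictly one after another by a single thread, which is the property the actor gives us for nextNumberStream.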

So we are using the react method to react to the messages that arrive in the NumberDispenser actor’s “mailbox”. The react and receive methods expect what is called a partial function as their argument. A partial function is one that provides an implementation for only some of the possible inputs. In the example above, of all the possible messages that can arrive at the door of our NumberDispenser actor, its react method can only handle three kinds. If a different kind of message shows up in NumberDispenser’s mailbox, it is skipped and left in the mailbox, and the next invocation of react handles the next matching message in the mailbox’s queue.
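
Partial functions are ordinary Scala values, so we can try one out without any actors. A small sketch follows; describe is a made-up function that only knows two of the message shapes.

[sourcecode language="scala"]
object PartialFunctionDemo {
  // Defined only for the two message shapes listed here; for anything else
  // the function is simply "not defined".
  val describe: PartialFunction[Any, String] = {
    case ("PEEK", _)       => "peek request"
    case ("RESET", n: Int) => "reset to " + n
  }

  def main(args: Array[String]): Unit = {
    println(describe.isDefinedAt(("RESET", 5))) // true
    println(describe.isDefinedAt("HELLO"))      // false
    println(describe(("RESET", 5)))             // reset to 5
    // Calling describe("HELLO") directly would throw a scala.MatchError.
  }
}
[/sourcecode]

The isDefinedAt check is what lets a caller find out whether a given message can be handled before applying the function to it.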

The three kinds of messages that our actor can handle are listed as three case clauses. A case clause pattern matches the message that arrives in the mailbox; if the message matches its pattern, the body of code after the => operator is executed.

The three messages that NumberDispenser can handle are:

  1. A tuple that has 2 elements, a token string “DISPENSE” and the actor that made the request.
  2. A tuple that has 2 elements, a token string “RESET” and a number to reset the NumberDispenser to.
  3. A tuple that has 2 elements, a token string “PEEK” and the actor that made the request.
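
The effect of each message can also be written down as a plain function from the current stream and a message to the new stream and an optional reply. This is only a sketch for reasoning about the protocol: DispenserProtocol and handle are hypothetical names, the requester is ignored (any second tuple element is accepted), and the reply is returned instead of being sent.

[sourcecode language="scala"]
object DispenserProtocol {
  // Infinite stream of consecutive integers, as in NumberDispenser.
  def numberStream(num: Int): Stream[Int] = num #:: numberStream(num + 1)

  // Returns the next state plus the reply (if any) that the actor would send.
  def handle(state: Stream[Int], message: Any): (Stream[Int], Option[Int]) =
    message match {
      case ("DISPENSE", _)         => (state.tail, Some(state.head))
      case ("RESET", resetTo: Int) => (numberStream(resetTo), None)
      case ("PEEK", _)             => (state, Some(state.head))
      case _                       => (state, None) // unknown message: state unchanged
    }
}
[/sourcecode]

Starting from numberStream(100), a DISPENSE yields 100 and moves the stream on, a following PEEK yields 101 and leaves the stream alone, and a RESET to 7 produces no reply but makes 7 the next number.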

Why are we passing the actor that made the request as part of the message? Remember that the react method does not return, so the partial function that is passed to it needs to do all the remaining work for the actor. In the case of DISPENSE and PEEK messages, our actor needs to reply to the requester with an answer, and it does that by sending a message back to the requester.

That brings us to the question we have been skirting all along: how do you send a message to an actor? In Scala you use the ! operator. So in the example above, “requester ! nextNumberStream.head” sends the element sitting at the head of the number stream to the requester. We will see how the requester uses this information when we write the test case for this new thread safe NumberDispenser.

For now, let’s look at the loop that encloses our react method. Why do we have a loop? Remember that the react method does not return after it services a message, but we want to keep servicing messages as they arrive. For this pattern, Scala’s actor library provides the loop method, which repeatedly executes the block passed to it. This works perfectly in our case because we want an unending supply of numbers for our virtual queuing system.
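
Here is the same send, react and reply round trip in isolation, as a small sketch. It assumes a Scala version that still ships the scala.actors library used throughout this post. The doubling actor, the "STOP" message and the names SendAndReplyDemo, startDoubler and askForDouble are all made up for this illustration.

[sourcecode language="scala"]
import scala.actors.Actor
import scala.actors.Actor.{actor, loop, react, self}

object SendAndReplyDemo {
  // Hypothetical helper actor: doubles each Int it is sent and replies to
  // the requester that came along in the message.
  def startDoubler(): Actor = actor {
    loop {
      react {
        case (n: Int, requester: Actor) => requester ! (n * 2)
        case "STOP"                     => Actor.exit() // leave the loop for good
      }
    }
  }

  // Sends one request from the calling thread and waits for the reply.
  def askForDouble(n: Int): Int = {
    val doubler = startDoubler()
    doubler ! ((n, self)) // `!` returns immediately
    val answer = self.receive { case reply: Int => reply }
    doubler ! "STOP"
    answer
  }
}
[/sourcecode]

The requester is passed along explicitly, just as in the DISPENSE and PEEK messages, because the code inside react has no return value through which it could hand the result back.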

So now let us change our test cases to make use of this new thread safe NumberDispenser. Here is the modified code for our test cases.

First, the ScalaTest case

[sourcecode language="scala"]
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import actors.Actor
import actors.Actor.{actor, self}

@RunWith(classOf[JUnitRunner])
class NumberDispenserSuiteTest extends FunSuite with BeforeAndAfter {
  private val numberDispenser = new NumberDispenser(100)
  numberDispenser.start()

  before {
    numberDispenser ! ("RESET", 200)
  }

  test("dispense") {
    numberDispenser ! ("DISPENSE", self)
    assert(200 == self.receive { case nextNum: Int => nextNum })
    numberDispenser ! ("DISPENSE", self)
    assert(201 == self.receive { case nextNum: Int => nextNum })
    numberDispenser ! ("DISPENSE", self)
    assert(202 == self.receive { case nextNum: Int => nextNum })
  }

  test("peek") {
    numberDispenser ! ("PEEK", self)
    assert(200 == self.receive { case nextNum: Int => nextNum })
    numberDispenser ! ("PEEK", self)
    assert(200 == self.receive { case nextNum: Int => nextNum })
    numberDispenser ! ("DISPENSE", self)
    numberDispenser ! ("DISPENSE", self)
    self.receive { case nextNum: Int => nextNum }
    self.receive { case nextNum: Int => nextNum }
    numberDispenser ! ("PEEK", self)
    assert(202 == self.receive { case nextNum: Int => nextNum })
  }

  test("reset") {
    numberDispenser ! ("PEEK", self)
    assert(200 == self.receive { case nextNum: Int => nextNum })
  }

  test("multithreaded") {
    // Both actors share the suite's numberDispenser; inside an actor block,
    // self refers to that actor.
    val actorOne = actor {
      for (i <- 1 to 100) {
        numberDispenser ! ("DISPENSE", self)
        println("ActorOne => " + self.receive { case nextNum: Int => nextNum })
      }
    }
    val actorTwo = actor {
      for (i <- 1 to 100) {
        numberDispenser ! ("DISPENSE", self)
        println("ActorTwo => " + self.receive { case nextNum: Int => nextNum })
      }
    }
  }
}
[/sourcecode]

And next the specs test case

[sourcecode language="scala"]
import org.specs._
import actors.Actor._

class NumberDispenserExperimentTest extends SpecificationWithJUnit {
  private val numberDispenser = new NumberDispenser(200)
  numberDispenser.start()

  "Number Dispenser" should {
    "dispense numbers in a sequential order" in {
      numberDispenser ! ("DISPENSE", self)
      numberDispenser ! ("DISPENSE", self)
      self.receive { case nextNum: Int => nextNum } must_== 200
      self.receive { case nextNum: Int => nextNum } must_== 201
    }

    "allow us to reset it to any number. It should then dispense numbers from that number onwards" in {
      numberDispenser ! ("PEEK", self)
      self.receive { case nextNum: Int => nextNum } must_== 200
      numberDispenser ! ("RESET", 100)
      numberDispenser ! ("PEEK", self)
      self.receive { case nextNum: Int => nextNum } must_== 100
    }

    "let us peek at the next number without dispensing or advancing to the next number" in {
      numberDispenser ! ("PEEK", self)
      self.receive { case nextNum: Int => nextNum } must_== 200
      numberDispenser ! ("PEEK", self)
      self.receive { case nextNum: Int => nextNum } must_== 200
      numberDispenser ! ("PEEK", self)
      self.receive { case nextNum: Int => nextNum } must_== 200
    }
  }
}
[/sourcecode]

The main change that we had to make in order to use the modified thread safe class is that, instead of invoking the methods of the NumberDispenser class directly, we now send messages to the NumberDispenser actor. This actor is created inside the test class and is started using the start method. The start method ensures that some thread calls the act method of NumberDispenser. At that point the act method invokes the react method, which waits for a message to arrive. If the message matches one of the kinds of messages that the partial function passed to react can handle, react schedules the execution of the body of the partial function and then throws an exception. The thread that called react catches the exception and moves on to other things. In the test cases we send only messages that our actor can handle, namely the three tuples.

The NumberDispenser actor in turn reacts to the messages sent by the test thread. If a message requires a return value, the NumberDispenser actor sends the value back to the test thread as a message. Notice that right after sending the message that requests the next number, the test thread calls receive on its self reference, passing it a partial function. That partial function handles messages of just one type, Int, which is what the NumberDispenser actor sends back.

Conclusion

In today’s blog we converted our simple NumberDispenser into a thread safe version. Next we will introduce two more domain objects, Token and TokenDispenser, and see how a TokenDispenser uses the NumberDispenser class to hand out Tokens. In developing this we will learn about scope and object visibility.
