Tuesday, 18 March 2008

The Aglets threading scheme

The Aglets platform is multi-threaded: a thread can serve an agent depending on the actions the agent must perform during its life. The thread used by Aglets is implemented by the AgletThread class in the com.ibm.aglets.thread package.

Note that a thread is activated by the delivery of one or more messages to an agent. Once a message is delivered, the addressee agent must process it, so a thread is woken up and used to handle the message through the addressee agent.


When an agent must receive and process a message, a thread is activated and used to process that message. In other words, the handleMessage() method of an aglet runs on top of an AgletThread.
Thread management involves both the message manager and the message itself. The steps are the following:
1) a message is delivered from a sender to the destination agent. The addressee agent stores the message in a message queue, held by the message manager.
2) once one or more messages have been stored in the queue, the message manager pops a thread and starts processing the messages one by one. In particular:
a) the message is passed to the thread;
b) the thread invokes the handle() method on the message itself;
c) the message invokes handleMessage() on the aglet, passing itself as the argument;
d) the thread exits the monitor, that is, it informs the message manager that the message has been delivered. The message manager then either processes another message or releases the thread. In the first case a chain forms: the thread is held to process messages until the message queue is empty. In the second case the thread is free, and the message manager waits until a new message arrives.
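
The steps above can be sketched as follows. This is a minimal, single-threaded illustration and not the Aglets source: SketchAglet, SketchMessage and SketchMessageManager are hypothetical stand-ins, and the queue is drained by the calling thread rather than by an AgletThread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Demo entry point: posts two messages and lets the manager drain them.
class DeliveryChainDemo {
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        // The "aglet" only records the kind of each message it handles.
        SketchAglet aglet = msg -> { seen.add(msg.kind); return true; };
        SketchMessageManager manager = new SketchMessageManager(aglet);
        manager.post(new SketchMessage("hello")); // step 1
        manager.post(new SketchMessage("bye"));   // step 1
        manager.processAll();                     // step 2
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical stand-in for an aglet (not the real Aglets API).
interface SketchAglet {
    boolean handleMessage(SketchMessage msg);
}

// Hypothetical stand-in for a message.
class SketchMessage {
    final String kind;

    SketchMessage(String kind) { this.kind = kind; }

    // Step (c): the message passes itself to the aglet.
    void handle(SketchAglet aglet) {
        aglet.handleMessage(this);
    }
}

// Hypothetical stand-in for the per-agent message manager.
class SketchMessageManager {
    private final Queue<SketchMessage> queue = new ArrayDeque<>();
    private final SketchAglet aglet;

    SketchMessageManager(SketchAglet aglet) { this.aglet = aglet; }

    // Step 1: store the incoming message in the queue.
    void post(SketchMessage msg) { queue.add(msg); }

    // Step 2: process the queued messages one by one until the queue is empty.
    int processAll() {
        int handled = 0;
        SketchMessage msg;
        while ((msg = queue.poll()) != null) {
            msg.handle(aglet); // steps (a) to (c)
            handled++;         // step (d): the message counts as delivered
        }
        return handled;
    }
}
```

The point of the chain is that the thread never calls the aglet directly: it only knows the message, and the message knows how to reach the aglet.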

In the 2.0.2 scheme each agent has its own threadSpool, that is, a stack of threads used to manage only the messages of the owner agent. Once a thread has delivered its message, it is pushed back onto that stack (which is contained in the message manager).

In 2.1.0, currently under development, the scheme is different: a global thread pool provides threads for the whole messaging system. The message manager therefore no longer handles a private stack of threads but requests them from the pool. Once the thread has delivered the message and no more messages must be processed for this agent, the message manager pushes it back into the thread pool. This allows a thread to be used for different agents at different times. Note that this implies that a thread must know not only the message it is going to process, but also the message manager that ordered it, for coherence.
The thread also has two ways of locking, depending on its state:
_ processing = it is processing a message, thus it cannot accept changes to the message itself or to the message manager;
_ changing = it is changing either the message manager or the message to process, and thus cannot process the message.
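
A minimal sketch of these two mutually exclusive states, assuming a plain Java monitor is enough to model them. ThreadSlotSketch and all of its methods are hypothetical names chosen for illustration; the real AgletThread has a setProcessing(..) method, but the rest of its locking is not reproduced here.

```java
// Demo entry point: shows that reassignment is refused while processing.
class ThreadSlotDemo {
    public static void main(String[] args) {
        ThreadSlotSketch slot = new ThreadSlotSketch();
        System.out.println(slot.assign("managerA", "msg1"));
        System.out.println(slot.beginProcessing());
        System.out.println(slot.assign("managerB", "msg2")); // refused: still processing
        slot.endProcessing();
        System.out.println(slot.assign("managerB", "msg2"));
    }
}

// Hypothetical model of a pooled thread's bookkeeping (not the Aglets class).
class ThreadSlotSketch {
    private Object manager;     // the message manager that requested the thread
    private Object message;     // the message to process
    private boolean processing; // true while a message is being handled

    // "changing" state: allowed only while no message is being processed.
    synchronized boolean assign(Object newManager, Object newMessage) {
        if (processing) {
            return false;
        }
        manager = newManager;
        message = newMessage;
        return true;
    }

    // Enter the "processing" state; requires an assigned message.
    synchronized boolean beginProcessing() {
        if (processing || message == null) {
            return false;
        }
        processing = true;
        return true;
    }

    // Leave the "processing" state and forget the handled message.
    synchronized void endProcessing() {
        processing = false;
        message = null;
    }

    synchronized Object currentManager() {
        return manager;
    }
}
```

Because every method is synchronized on the same object, a change and the start of processing can never interleave, which is the coherence the two states are meant to guarantee.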

Note that the message manager pushes and pops the thread again when it processes the next message. This can lead to situations where the next message is processed by a different thread and, in general, wastes some resources. Maybe this will be fixed in the future.

It is important to note that when a thread is woken up to handle a message, it is assigned to a specific MessageManager, that is, a handler that owns the messages for a specific agent (through a message queue) as well as a reference to the agent. So when the thread restarts its execution, it knows exactly which message manager to obtain the message and the agent from. The message is already directly available to the thread, so the thread can process it directly. Processing the message means invoking the handle(..) method of the MessageImpl object, which in turn calls the handleMessage(..) method on the agent itself.

In more detail, the message manager is the decoupling point between a sender thread and a receiver thread. When an agent sends a message to another agent, the call reaches the message manager of the addressee agent and enters its postMessage method. This method is quite complex but, in brief, it stores the message in the addressee's message queue and then notifies the message manager itself that there is at least one new pending message. The addressee's message manager then pops a thread from the thread pool and processes the message at the head of the queue (and all the following ones) until the queue is empty. After that the message manager waits for other messages to arrive.
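
The decoupling can be illustrated with a small sketch. It is not the Aglets implementation: MailboxSketch is a hypothetical class, a standard java.util.concurrent ExecutorService stands in for the Aglets thread pool, and messages are plain strings. Only the shape is the same: the sender enqueues and returns, and a pooled thread drains the queue until it is empty.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Demo entry point: one sender thread, a shared pool of receiver threads.
class PostMessageDemo {
    static List<String> run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> received = new CopyOnWriteArrayList<>();
        MailboxSketch mailbox = new MailboxSketch(pool, received::add);
        mailbox.postMessage("m1");
        mailbox.postMessage("m2");
        mailbox.postMessage("m3");
        pool.shutdown(); // already submitted drain tasks still run
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical per-agent mailbox playing the role of the message manager.
class MailboxSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final ExecutorService pool;   // assumption: stands in for the thread pool
    private final Consumer<String> agent; // assumption: stands in for handleMessage
    private boolean draining;             // true while a pooled thread serves this agent

    MailboxSketch(ExecutorService pool, Consumer<String> agent) {
        this.pool = pool;
        this.agent = agent;
    }

    // Called on the sender's thread: enqueue, then request a thread if none is active.
    void postMessage(String msg) {
        synchronized (this) {
            queue.add(msg);
            if (draining) {
                return; // the active thread will pick this message up
            }
            draining = true;
        }
        pool.execute(this::drain);
    }

    // Runs on a pooled thread: process messages until the queue is empty.
    private void drain() {
        while (true) {
            String msg;
            synchronized (this) {
                msg = queue.poll();
                if (msg == null) {
                    draining = false; // hand the thread back to the pool
                    return;
                }
            }
            agent.accept(msg); // handled outside the lock so senders are not blocked
        }
    }
}
```

The draining flag ensures that at most one pooled thread serves a given agent at a time, so messages for that agent are handled in the order they were posted.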

Once the thread has processed the message, it looks for a new message to process (pushThreadAndExitMonitorIfOwner(..)). Note that the MessageManager implementation (MessageManagerImpl) has the concept of owner: a message that has just been picked from the message queue or that has just been processed by a thread. So, if the thread was processing the owner message (i.e., the message for which the thread was woken up), a new message to process is searched for. If no new (or remaining) messages are present in the message queue, the thread is forced to suspend itself (and to return to the thread pool).
In the special case of a re-entrant message (a message that issued a new message to process, as for instance in a request-response protocol), a new message is popped from the message queue and processed; if no remaining messages are available (this should not happen for a re-entrant message) the thread suspends itself as above.
The run() method of AgletThread is the following:
public void run() {
    // if the message-handling loop has already started, return, so that
    // run cannot be called more than once.
    if (loop_started) {
        // ensures that an aglet cannot call run on this thread.
        return;
    }

    // set this thread as "started to handle messages"
    loop_started = true;
    start = false;

    // make sure a message manager is attached before looking up the agent behind it
    if (this.messageManager == null)
        return; // the message manager is not valid!

    try {
        while (valid) {
            try {
                logger.debug("AgletThread is starting processing");
                // if the flag is still set after handling, the message was
                // re-entrant and the thread must not suspend itself yet.
                this.setReentrant(false);
                this.setProcessing(true);
                // get the right reference to the aglet behind the current
                // message manager. This must be done on every iteration because
                // the thread could have been suspended, or the message manager
                // could have changed if the thread has passed through the pool.
                MessageManagerImpl manager = this.getMessageManager();
                logger.debug("The message manager is " + manager + ", the message is " + message);
                LocalAgletRef ref = manager.getAgletRef();
                message.handle(ref);   // handle the message
                this.messageHandled++; // count the messages handled by this thread

                synchronized (this) {
                    if (!this.isReentrant()) {
                        message = null; // invalidate the message so that it is not handled twice
                        logger.debug("AgletThread has invalidate the message just processed (no reentrant find!)");
                    }
                }

                this.setProcessing(false);
                logger.debug("AgletThread finished processing a message");
            } catch (RuntimeException ex) {
                logger.error("Exception caught while processing a message", ex);
                valid = false;
                throw ex;
            } catch (Error ex) {
                logger.error("Error caught while processing a message");
                valid = false;
                throw ex;
            } catch (InvalidAgletException ex) {
                logger.error("Exception caught while processing a message", ex);
                valid = false;
                start = true;
            } finally {
                // if the thread is valid, i.e., it has not been stopped, and the
                // message was not re-entrant, ask the message manager to process
                // another message (if present) and to push the thread back into
                // the pool; otherwise only exit the monitor.
                if (valid && (!this.isReentrant())) {
                    // push the thread back into the pool...
                    logger.debug("The thread is going to be pushed back in the pool...");
                    messageManager.pushThreadAndExitMonitorIfOwner(this);
                } else {
                    // process one more message...
                    messageManager.exitMonitorIfOwner();
                }
            }

            // here the message has been processed, thus the thread can suspend
            // itself, waiting for a new message to process
            synchronized (this) {
                while (valid && this.message == null && (!this.isReentrant())) {
                    try {
                        logger.debug("Thread suspending waiting for a next message...");
                        this.wait();
                    } catch (InterruptedException ex) {
                        logger.error("Exception caught while waiting for an incoming message", ex);
                    }
                }
            }
        }
    } finally {
        message = null;
    }
}
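
The listing shows only the waiting side of the hand-off: the thread parks in wait() until a message is assigned to it. The waking side is not reproduced in this post, so the following is only a sketch of how such a hand-off can work with a plain Java monitor. ParkedWorkerSketch, assign(..) and shutdown() are hypothetical names, and unlike AgletThread the sketch uses a Runnable with its own monitor instead of synchronizing on the Thread object.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Demo entry point: hands two messages to a parked worker, then stops it.
class ParkedWorkerDemo {
    static List<String> run() {
        ParkedWorkerSketch worker = new ParkedWorkerSketch();
        Thread thread = new Thread(worker);
        thread.start();
        try {
            worker.assign("first");
            worker.assign("second");
            worker.shutdown();
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return worker.handled;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical worker that sleeps until a message is assigned to it.
class ParkedWorkerSketch implements Runnable {
    final List<String> handled = new CopyOnWriteArrayList<>();
    private String message;       // the pending message, or null when idle
    private boolean valid = true; // false once the worker has been stopped

    // Waking side: wait for a free slot, store the message, wake the worker.
    synchronized void assign(String msg) throws InterruptedException {
        while (message != null) {
            wait();
        }
        message = msg;
        notifyAll();
    }

    // Stop the worker once any pending message has been handled.
    synchronized void shutdown() {
        valid = false;
        notifyAll();
    }

    @Override
    public void run() {
        while (true) {
            String msg;
            synchronized (this) {
                // Waiting side: same loop shape as in the run() listing.
                while (valid && message == null) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (message == null) {
                    return; // stopped and nothing left to handle
                }
                msg = message;
                message = null; // invalidate so it is not handled twice
                notifyAll();    // let a blocked assign(..) proceed
            }
            handled.add(msg); // "handle" the message outside the lock
        }
    }
}
```

The wait() call sits inside a loop that re-checks the condition, so a spurious wake-up or a notification meant for the other side does not cause a message to be skipped or handled twice.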

