martedì 18 marzo 2008

The Aglets MessageQueue

Each agent stores incoming messages into a message queue (com.ibm.aglets.MessageQueue); the message insertion into the message queue is transparently done by the delivering of a message.

The idea is that a message manager creates its own message queue and uses it to store message before it has activate a thread to process it. The use of a message queue allows the uncoupling of agent threads. In fact when an agent sends a message to another agent, the sending thread thru several method calls inserts the message in the message queue, and thus the sender thread returns. Thus the sender thread is involved until the message has been inserted in the message queue. Once this is done, it is the addressee thread in charge of processing the message itself, and this is done by the message manager that gets a thread and pops a message from the message queue.

The following is the implementation of the MessageQueue in the current development tree. As readers can see, the queue exploits a simple List to store messages that must be processed. All the methods implements the appropriate logic (e.g., priority) working around such list. Please note that the message queue is parametrizable thru Java Generics.

public class MessageQueue {


/**
* The messages will be stored into a linked list.
*/
private List messages = new LinkedList();



/**
* Appends a message, that is places the message at the end of the queue.
* @param msg the message to append
*/
public synchronized void append(MSG msg){
// check params
if( msg == null )
return;

// append the message at the tail
this.messages.add(msg);
}

/**
* Inserts a new message in the queue. The alghoritm is the following:
* each message is extracted from the queue and analyzed; at the first message with
* a priority less than the one we want to insert, the message is inserted. If no one
* message is found, than the message is placed at the tail of the queue.
* @param msg the message to insert
*/
public synchronized void insert(MSG msg){
// check params
if( msg == null )
return;


Iterator iter = this.messages.iterator();

while (iter != null && iter.hasNext()) {
MSG currentMessage = (MSG) iter.next();
if( currentMessage.getPriority() < msg.getPriority() ){
int index = this.messages.indexOf(currentMessage);
this.messages.add(index, msg);
return;
}
}
// if here the message must be placed at the tail of the queue
this.messages.add(msg);
}

/**
* Inserts the message at the top of the queue. Please note that this method will
* overtake the priority mechanism, thus it is possible to insert at the top a message
* with a low priority.
* @param top the message to place at the head of the list
*/
public synchronized void insertAtTop(MSG top){
if( top == null ) return;
// place the message at the head of the queue
this.messages.add(0, top);
}

public synchronized void insertAtTop(MessageQueue queue) {
if( queue == null || queue.isEmpty() )
return;
else{
this.messages.addAll(0, queue.messages);
}
}


public int size(){
return this.messages.size();
}

public boolean isEmpty(){
return this.messages.isEmpty();
}

/**
* Gets (but don't remove) the message at the top of the queue.
* @return the message at the top.
*/
public synchronized MSG peek(){
if( ! this.messages.isEmpty() )
return this.messages.get(0);
else
return null;
}


/**
* Extract (i.e., remove) the message at the top of the queue.
* @return the message at the top or null
*/
public synchronized MSG pop(){
if( this.messages.isEmpty() )
return null;
else
return this.messages.remove(0);
}



/**
* Removes a message from the queue.
* @param msg the message to remove
* @return true if the message has been removed, false otherwise
*/
public synchronized boolean remove(MSG msg){
if( this.messages.contains(msg) ){
this.messages.remove(msg);
return true;
}
else
return false;
}

/**
* Removes all the messages from the queue, that is after this the queue will be
* empty.
*
*/
public synchronized void removeAll(){
if( ! this.messages.isEmpty() )
this.messages.clear();
}


public String toString() {
StringBuffer buffer = new StringBuffer(50 * this.messages.size() );

buffer.append("The message queue contains " + this.messages.size() + " messages");
for(int i=0; i< this.messages.size(); i++){
buffer.append("\n");
for(int j=i; j>0; j--) // make a few indentation spaces
buffer.append(" ");

buffer.append("message " + i +")" + this.messages.get(i));
}

return buffer.toString();
}
}

Nessun commento: