-22-
09
2009
09
2009
SequentialMessageQueues in Java
Viele Netzwerkprotokolle teilen ihre Kommunikationsentitäten in Pakete oder Nachrichten auf. Manche Protokolle erwarten außerdem eine sequentielle Abarbeitung (z.B. TCP auf Transportebene), auch wenn die darunterliegenden Protokollschichten dass nicht unbedingt unterstützen. Für die Implementierung eines Nachrichtenpuffers für RTSP-Nachrichten, die ungeordnet ankommen können, aber sequentiell abgearbeitet werden müssen, habe ich eine entsprechende Datenstruktur in Java erstellt. RTSP-Nachrichten besitzen ein
Die folgende generische Implementierung erwartet, dass alle Nachrichtenelemete eindeutige und miteinander vergleichbare Identifier besitzen. Desweiteren erlaubt diese Implementierung zwar das parallele Schreiben in die Queue, allerdings sollte das Lesen durch einen einzelnen Thread realisiert werden. Ansonsten kann die Ordnung nach dem Entnehmen aus der Queue durch unterschiedlich lange Laufzeiten der Worker-Threads wieder verloren gehen. Außerdem realisiert diese Implementierung die sequentielle Ordnung eines vollständigen Nachrichtenstroms. Nachrichtenverluste oder Timeouts werden nicht behandelt. Hierfür eignen sich eher Automatic repeat request Protokolle.
Cseq-Header-Feld, welches die einzelnen Nachrichten durchnummiert und als ordnendes Element genutzt werden kann.Die folgende generische Implementierung erwartet, dass alle Nachrichtenelemete eindeutige und miteinander vergleichbare Identifier besitzen. Desweiteren erlaubt diese Implementierung zwar das parallele Schreiben in die Queue, allerdings sollte das Lesen durch einen einzelnen Thread realisiert werden. Ansonsten kann die Ordnung nach dem Entnehmen aus der Queue durch unterschiedlich lange Laufzeiten der Worker-Threads wieder verloren gehen. Außerdem realisiert diese Implementierung die sequentielle Ordnung eines vollständigen Nachrichtenstroms. Nachrichtenverluste oder Timeouts werden nicht behandelt. Hierfür eignen sich eher Automatic repeat request Protokolle.
Interface für MessageQueue
Eine MessageQueue bietet die grundlegenden Funktionen zum Einfügen und Entfernen von Elementen sowie Hilfsmethoden für die Größe der Queue an.
Interface für speziellen Comparator
Dieses spezielel Comparator-Interface fügt Methoden hinzu für das Erfragen von vorherigen und nachfolgenden Identifiern an. Außerdem können Nachrichten, Identifier oder eine Kombination aus beidem verglichen werden.
SequentialMessageQueue Implementierung
Die eigentliche Queue-Implementierung greift intern auf eine PriorityBlockingQueue zurück, allerdings wird durch die Kapselung sichergestellt, dass nur das nächste zu erwartende Element entnommen werden kann.
Dieser Post ist ein Beitrag auf ioexception.de
Eine MessageQueue bietet die grundlegenden Funktionen zum Einfügen und Entfernen von Elementen sowie Hilfsmethoden für die Größe der Queue an.
/** * Interface for message queues. * * @author Benjamin Erb * * @param* Element type */ public interface MessageQueue { /** * Adds element to queue. Blocks until element can be added. * * @param e * new element * @throws InterruptedException */ void push(E e) throws InterruptedException; /** * Takes an element from the queue. Blocks until element becomes available. * * @return taken element * @throws InterruptedException */ E pop() throws InterruptedException; /** * Gets the size of the queue * * @return */ int size(); /** * Checks whether queue is empty or not. * * @return */ boolean empty(); }
Interface für speziellen Comparator
Dieses spezielel Comparator-Interface fügt Methoden hinzu für das Erfragen von vorherigen und nachfolgenden Identifiern an. Außerdem können Nachrichten, Identifier oder eine Kombination aus beidem verglichen werden.
import java.util.Comparator; /** * An enhanced comparator interface for retrieving preceding and subsequent identifiers. * * @author Benjamin Erb * * @paramElement type * @param Element identifier */ /** */ public interface SequentialComparator > extends Comparator { /** * Returns the subsequent identifier of a given entity. * * @param t * entity * @return subsequent identifier */ T getNext(E e); /** * Returns the preceding identifier of a given entity. * * @param t * entity * @return preceding identifier */ T getPrevious(E e); /** * Compares two entities. * * @param t1 * entity 1 * @param t2 * entity 2 * @return a negative integer, zero, or a positive integer as the first * argument is less than, equal to, or greater than the second. */ int compare(T t1, T t2); /** * Compares an entity and an identifier. * * @param t1 * entity * @param e2 * identifier * @return a negative integer, zero, or a positive integer as the first * argument is less than, equal to, or greater than the second. */ int compare(T t1, E e2); /** * Compares an identifier and an entity. * * @param e1 * identifier * @param t2 * entity * @return a negative integer, zero, or a positive integer as the first * argument is less than, equal to, or greater than the second. */ int compare(E e1, T t2); }
SequentialMessageQueue Implementierung
Die eigentliche Queue-Implementierung greift intern auf eine PriorityBlockingQueue zurück, allerdings wird durch die Kapselung sichergestellt, dass nur das nächste zu erwartende Element entnommen werden kann.
import java.util.concurrent.PriorityBlockingQueue; /** * A message queue for sequential message processing. This queue orders incoming * messages entities by their identifiers and outputs entites in-order. This * queue supports more than one writing thread. However, it is designed for one * reading/consuming thread in order to prevent out-of-order processing by * multiple threads. * * @author Benjamin Erb * * @param* Message entity * @param * Message identifier */ public class SequentialMessageQueue > implements MessageQueue { /** * Lock object for queueing */ private final Object lock = new Object(); /** * internal queue */ private final PriorityBlockingQueue internalQueue; /** * comparator */ private final SequentialComparator comparator; /** * Represents the identifier of the next expected entity */ private T expectedIdentifier; public SequentialMessageQueue(SequentialComparator comparator, T initialIdentifier) { this.internalQueue = new PriorityBlockingQueue (16, comparator); this.comparator = comparator; this.expectedIdentifier = initialIdentifier; } @Override public boolean empty() { return internalQueue.isEmpty(); } @Override public E pop() throws InterruptedException { synchronized (lock) { E firstElement; while ((firstElement = internalQueue.peek()) == null || comparator.compare(firstElement, expectedIdentifier) != 0) { lock.wait(); } internalQueue.remove(); expectedIdentifier = comparator.getNext(firstElement); return firstElement; } } @Override public void push(E e) throws InterruptedException { synchronized (lock) { internalQueue.put(e); lock.notifyAll(); } } @Override public int size() { return internalQueue.size(); } /** * Returns the used comparator * * @return */ protected SequentialComparator getSequentialComparator() { return comparator; } }
Dieser Post ist ein Beitrag auf ioexception.de
Trackbacks
Trackback für spezifische URI dieses Eintrags
Keine Trackbacks
Kommentare
Ansicht der Kommentare:
(Linear | Verschachtelt)
#1 - Oli 23.09.2009 15:37 - (Antwort)
Hab ich da was nicht verstanden? Ich dachte immer RTSP kann man auf UDP oder TCP aufsetzen. Und falls man eine geordnete Reihenfolge wünscht, dann nimmmt man einfach TCP.
Oder ist deine Implementierung für den UDP Fall gedacht?
Benjamin Erb ist 24 Jahre alt und studiert an der 



