SequentialMessageQueues in Java

Benjamins Blog

  • Blog
  • Themen
  • Fotografie
  • Sneaker
  • Kontakt
  • Impressum
-22-
09
2009

SequentialMessageQueues in Java

Viele Netzwerkprotokolle teilen ihre Kommunikationsentitäten in Pakete oder Nachrichten auf. Manche Protokolle erwarten außerdem eine sequentielle Abarbeitung (z.B. TCP auf Transportebene), auch wenn die darunterliegenden Protokollschichten dass nicht unbedingt unterstützen. Für die Implementierung eines Nachrichtenpuffers für RTSP-Nachrichten, die ungeordnet ankommen können, aber sequentiell abgearbeitet werden müssen, habe ich eine entsprechende Datenstruktur in Java erstellt. RTSP-Nachrichten besitzen ein Cseq-Header-Feld, welches die einzelnen Nachrichten durchnummiert und als ordnendes Element genutzt werden kann.

Die folgende generische Implementierung erwartet, dass alle Nachrichtenelemete eindeutige und miteinander vergleichbare Identifier besitzen. Desweiteren erlaubt diese Implementierung zwar das parallele Schreiben in die Queue, allerdings sollte das Lesen durch einen einzelnen Thread realisiert werden. Ansonsten kann die Ordnung nach dem Entnehmen aus der Queue durch unterschiedlich lange Laufzeiten der Worker-Threads wieder verloren gehen. Außerdem realisiert diese Implementierung die sequentielle Ordnung eines vollständigen Nachrichtenstroms. Nachrichtenverluste oder Timeouts werden nicht behandelt. Hierfür eignen sich eher Automatic repeat request Protokolle.
Interface für MessageQueue
Eine MessageQueue bietet die grundlegenden Funktionen zum Einfügen und Entfernen von Elementen sowie Hilfsmethoden für die Größe der Queue an.
/**
 * Interface for message queues.
 * 
 * @author Benjamin Erb
 * 
 * @param 
 *            Element type
 */
public interface MessageQueue
{
	/**
	 * Adds element to queue. Blocks until element can be added.
	 * 
	 * @param e
	 *            new element
	 * @throws InterruptedException
	 */
	void push(E e) throws InterruptedException;

	/**
	 * Takes an element from the queue. Blocks until element becomes available.
	 * 
	 * @return taken element
	 * @throws InterruptedException
	 */
	E pop() throws InterruptedException;

	/**
	 * Gets the size of the queue
	 * 
	 * @return
	 */
	int size();

	/**
	 * Checks whether queue is empty or not.
	 * 
	 * @return
	 */
	boolean empty();

}


Interface für speziellen Comparator
Dieses spezielel Comparator-Interface fügt Methoden hinzu für das Erfragen von vorherigen und nachfolgenden Identifiern an. Außerdem können Nachrichten, Identifier oder eine Kombination aus beidem verglichen werden.
import java.util.Comparator;

/**
 * An enhanced comparator interface for retrieving preceding and subsequent identifiers.  
 * 
 * @author Benjamin Erb
 *
 * @param  Element type
 * @param  Element identifier
 */
/**
 */
public interface SequentialComparator> extends Comparator
{
	/**
	 * Returns the subsequent identifier of a given entity.
	 * 
	 * @param t
	 *            entity
	 * @return subsequent identifier
	 */
	T getNext(E e);

	/**
	 * Returns the preceding identifier of a given entity.
	 * 
	 * @param t
	 *            entity
	 * @return preceding identifier
	 */
	T getPrevious(E e);

	/**
	 * Compares two entities.
	 * 
	 * @param t1
	 *            entity 1
	 * @param t2
	 *            entity 2
	 * @return a negative integer, zero, or a positive integer as the first
	 *         argument is less than, equal to, or greater than the second.
	 */
	int compare(T t1, T t2);

	/**
	 * Compares an entity and an identifier.
	 * 
	 * @param t1
	 *            entity
	 * @param e2
	 *            identifier
	 * @return a negative integer, zero, or a positive integer as the first
	 *         argument is less than, equal to, or greater than the second.
	 */
	int compare(T t1, E e2);

	/**
	 * Compares an identifier and an entity.
	 * 
	 * @param e1
	 *            identifier
	 * @param t2
	 *            entity
	 * @return a negative integer, zero, or a positive integer as the first
	 *         argument is less than, equal to, or greater than the second.
	 */
	int compare(E e1, T t2);
}


SequentialMessageQueue Implementierung
Die eigentliche Queue-Implementierung greift intern auf eine PriorityBlockingQueue zurück, allerdings wird durch die Kapselung sichergestellt, dass nur das nächste zu erwartende Element entnommen werden kann.
import java.util.concurrent.PriorityBlockingQueue;

/**
 * A message queue for sequential message processing. This queue orders incoming
 * messages entities by their identifiers and outputs entites in-order. This
 * queue supports more than one writing thread. However, it is designed for one
 * reading/consuming thread in order to prevent out-of-order processing by
 * multiple threads.
 * 
 * @author Benjamin Erb
 * 
 * @param 
 *            Message entity
 * @param 
 *            Message identifier
 */
public class SequentialMessageQueue> implements MessageQueue
{
	/**
	 * Lock object for queueing
	 */
	private final Object lock = new Object();

	/**
	 * internal queue
	 */
	private final PriorityBlockingQueue internalQueue;
	/**
	 * comparator
	 */
	private final SequentialComparator comparator;

	/**
	 * Represents the identifier of the next expected entity
	 */
	private T expectedIdentifier;

	public SequentialMessageQueue(SequentialComparator comparator, T initialIdentifier)
	{
		this.internalQueue = new PriorityBlockingQueue(16, comparator);
		this.comparator = comparator;
		this.expectedIdentifier = initialIdentifier;
	}

	@Override
	public boolean empty()
	{
		return internalQueue.isEmpty();
	}

	@Override
	public E pop() throws InterruptedException
	{
		synchronized (lock)
		{
			E firstElement;
			while ((firstElement = internalQueue.peek()) == null || comparator.compare(firstElement, expectedIdentifier) != 0)
			{
				lock.wait();
			}
			internalQueue.remove();
			expectedIdentifier = comparator.getNext(firstElement);
			return firstElement;
		}
	}

	@Override
	public void push(E e) throws InterruptedException
	{
		synchronized (lock)
		{
			internalQueue.put(e);
			lock.notifyAll();
		}

	}

	@Override
	public int size()
	{
		return internalQueue.size();
	}

	/**
	 * Returns the used comparator
	 * 
	 * @return
	 */
	protected SequentialComparator getSequentialComparator()
	{
		return comparator;
	}

}


Dieser Post ist ein Beitrag auf ioexception.de
Geschrieben von Benjamin Erb am 22.09.2009 in Programmierung Kommentar: (1) Trackbacks: (0)
Tags für diesen Artikel: coding, java
Artikel mit ähnlichen Themen:
Kurzpräsentation – Node.js
LRU-Cache in Java
diretto
Mehrere Werte in Java-Methoden typsicher zurückgeben
Twitterbot in Perl
Semantic Mashup
Schnelle Quellcode-Navigation in Eclipse
IOException.de
Mit Working Sets Eclipse aufräumen
Skript: Java Grundlagen

Trackbacks
Trackback für spezifische URI dieses Eintrags

Keine Trackbacks

Kommentare
Ansicht der Kommentare: (Linear | Verschachtelt)

#1 - Oli 23.09.2009 15:37 - (Antwort)

Hab ich da was nicht verstanden? Ich dachte immer RTSP kann man auf UDP oder TCP aufsetzen. Und falls man eine geordnete Reihenfolge wünscht, dann nimmmt man einfach TCP.
Oder ist deine Implementierung für den UDP Fall gedacht?


Kommentar schreiben

Umschließende Sterne heben ein Wort hervor (*wort*), per _wort_ kann ein Wort unterstrichen werden.
Standard-Text Smilies wie :-) und ;-) werden zu Bildern konvertiert.
Die angegebene E-Mail-Adresse wird nicht dargestellt, sondern nur für eventuelle Benachrichtigungen verwendet.

Um maschinelle und automatische Übertragung von Spamkommentaren zu verhindern, bitte die Zeichenfolge im dargestellten Bild in der Eingabemaske eintragen. Nur wenn die Zeichenfolge richtig eingegeben wurde, kann der Kommentar angenommen werden. Bitte beachten Sie, dass Ihr Browser Cookies unterstützen muss um dieses Verfahren anzuwenden.
CAPTCHA

 
 

Über den Autor

Benjamin Erb Benjamin Erb ist 24 Jahre alt und studiert an der Universität Ulm Medieninformatik.

Aktuelle Projekte

  • diretto.org
  • IOException.de

Quicklinks

  • Meine Amazon-Wishlist
  • Mein PGP-Schlüßel
  • twitter.com/b_erb
  • facebook.com/benjamin.erb

Blogroll

  • Davids Blog
  • Flos Warteschleife
  • stk bloggt.es
  • guido.demelo.de
  • Sina paints her life
  • Malte Wittkugel.net
  • Marcus bloggt.es
  • claus bloggt.es
  • floBLOG
  • Basti in Japan
  • Sven in Frankreich

Sneaker-Blogroll

  • tomat3.de
  • sneakerb0b.de
  • vEnoMaZn
  • sneakerized.com
  • welovesneaker.com

Kalender

Zurück September '10
Mo Di Mi Do Fr Sa So
    1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30      

Getaggte Artikel

acm aesthetic computing animation apache blog bücher c++ ccc coding context free data visualization datenschutz dd-wrt design design pattern diretto eclipse fachschaft fail fotos fun gadget gdg handy hardware hip-hop homepage imaging informatik information design italien java javascript joggen kiosk system laptop latein latex mail mathematik mozilla musik mysql netbook nike af1 nike air max nmap perl pgp php politik postgresql privacy processing progwerkstatt psychologie ravensburg rivoli rutenfest s9y samsung q25 sneaker sneaker photography software sopra spanien sport sql studium studivz svn tagato trac trainingscamp typografie ubiquitous computing ubuntu ulm usability user interfaces videos virtualisierung vnc web web 2.0 welfen wikipedia wishlist xslt zivildienst

Archive

September 2010
August 2010
Juli 2010
Das Neueste ...
Älteres ...

Kategorien

  • XML Allgemeines (33)
  • XML Fotos (33)
  • XML Homepage (6)
  • XML Italien (7)
  • XML Lustiges (25)
  • XML Musik (11)
  • XML Nachdenkliches (9)
  • XML Schuhe (24)
  • XML Sonstiges (3)
  • XML Sport (5)
  • XML Videos (2)
  • XML Design (18)
  • XML IT (20)
  • XML Hardware (16)
  • XML Open-Source (7)
  • XML Programmierung (41)
  • XML Studium (61)
  • XML Web (30)
  • XML Datenschutz (6)
  • XML Usability (13)

Alle Kategorien

Feeds

XML RSS 2.0 feed
ATOM/XML ATOM 1.0 feed
XML OPML 1.0 feed

Statistiken

Letzter Artikel: 01.06.2010 16:01
242 Artikel wurden geschrieben
123 Kommentare wurden abgegeben

Verwaltung des Blogs

Login
 

© 2002 - 2010 Benjamin Erb