Merge pull request #3 from 01org/pubsub

Initial publish/subscribe implementation
diff --git a/pom.xml b/pom.xml
index 9ef8dbd..19093d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
 
   <groupId>com.intel.jndn.utils</groupId>
   <artifactId>jndn-utils</artifactId>
-  <version>1.0.4</version>
+  <version>1.1.0</version>
   <name>jndn-utils</name>
   <description>Collection of tools to simplify synchronous and asynchronous data transfer over the NDN network
   </description>
@@ -58,6 +58,12 @@
       <version>1.1.0</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>1.10.19</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <properties>
diff --git a/src/main/java/com/intel/jndn/utils/Cancellation.java b/src/main/java/com/intel/jndn/utils/Cancellation.java
new file mode 100644
index 0000000..1a8e29f
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Cancellation.java
@@ -0,0 +1,35 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+/**
+ * Represents a token used for canceling some ongoing action. This approach was chosen as it is lighter and more
+ * flexible than a cancellable {@link java.util.concurrent.Future}; perhaps it could be replaced by {@link
+ * AutoCloseable} although this seems to imply something else (however, the language support is a benefit, TODO
+ * re-look at this).
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface Cancellation {
+  /**
+   * Shortcut for a no-op, already-cancelled cancellation
+   */
+  Cancellation CANCELLED = () -> {/* do nothing */};
+
+  /**
+   * Cancel the action referenced by this token
+   */
+  void cancel();
+}
diff --git a/src/main/java/com/intel/jndn/utils/Client.java b/src/main/java/com/intel/jndn/utils/Client.java
index 711d24c..e38d38d 100644
--- a/src/main/java/com/intel/jndn/utils/Client.java
+++ b/src/main/java/com/intel/jndn/utils/Client.java
@@ -13,17 +13,18 @@
  */
 package com.intel.jndn.utils;
 
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
 import net.named_data.jndn.Name;
 
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Base functionality provided by all NDN clients in this package.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface Client {
 
@@ -38,7 +39,7 @@
    * @param interest the {@link Interest} to send over the network
    * @return a future {@link Data} packet
    */
-  public CompletableFuture<Data> getAsync(Face face, Interest interest);
+  CompletableFuture<Data> getAsync(Face face, Interest interest);
 
   /**
    * Convenience method for calling
@@ -50,7 +51,7 @@
    * @param name the {@link Name} to wrap inside a default {@link Interest}
    * @return a future {@link Data} packet
    */
-  public CompletableFuture<Data> getAsync(Face face, Name name);
+  CompletableFuture<Data> getAsync(Face face, Name name);
 
   /**
    * Synchronously retrieve the {@link Data} for an {@link Interest}; this will
@@ -64,7 +65,7 @@
    * @return a {@link Data} packet
    * @throws java.io.IOException if the request fails
    */
-  public Data getSync(Face face, Interest interest) throws IOException;
+  Data getSync(Face face, Interest interest) throws IOException;
   
   /**
    * Convenience method for calling
@@ -77,5 +78,5 @@
    * @return a {@link Data} packet
    * @throws java.io.IOException if the request fails
    */
-  public Data getSync(Face face, Name name) throws IOException;
+  Data getSync(Face face, Name name) throws IOException;
 }
diff --git a/src/main/java/com/intel/jndn/utils/ContentStore.java b/src/main/java/com/intel/jndn/utils/ContentStore.java
new file mode 100644
index 0000000..e6ad68c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/ContentStore.java
@@ -0,0 +1,95 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Represents a named content store of NDN data which allows querying by name or interest; TODO merge with Repository
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface ContentStore {
+  /**
+   * Store some content under a name
+   *
+   * @param name the name of the content
+   * @param content the bytes of data
+   */
+  void put(Name name, Blob content);
+
+  /**
+   * Check if the content exists
+   *
+   * @param name the name of the content
+   * @return true if the content exists
+   */
+  boolean has(Name name);
+
+  /**
+   * Check if the content exists
+   *
+   * @param interest a selector-enabled interest to search for content in the store
+   * @return true if the content exists
+   */
+  boolean has(Interest interest);
+
+  /**
+   * Retrieve the content by name
+   *
+   * @param name the name of the content
+   * @return the content if it exists
+   */
+  Optional<Blob> get(Name name);
+
+  /**
+   * Retrieve the content by name
+   *
+   * @param interest the name of the content
+   * @return the content if it exists
+   */
+  Optional<Blob> get(Interest interest);
+
+  /**
+   * Write the stored content to the face as Data packets. If no content exists for the given name, this method should
+   * have no effect.
+   *
+   * @param face the face to write to
+   * @param name the name of the data to write
+   * @throws IOException if the writing fails
+   */
+  void push(Face face, Name name) throws IOException;
+
+  /**
+   * Write the stored content to the face as Data packets. If no content exists for the given name, this method should
+   * have no effect.
+   *
+   * @param face the face to write to
+   * @param interest the interest identifying of the data to write
+   * @throws IOException if the writing fails
+   */
+  void push(Face face, Interest interest) throws IOException;
+
+  /**
+   * Remove all stored content
+   */
+  void clear();
+}
diff --git a/src/main/java/com/intel/jndn/utils/NameTree.java b/src/main/java/com/intel/jndn/utils/NameTree.java
new file mode 100644
index 0000000..a07c791
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/NameTree.java
@@ -0,0 +1,84 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Represent a tree of nodes, branching based on their component values
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface NameTree<T> {
+  /**
+   * @return the full name leading from the root of the tree to this node
+   */
+  Name fullName();
+
+  /**
+   * @return the last component identifying this node; e.g. if the {@link #fullName()} is {@code /a/b/c} for this node,
+   * this method must return {@code c}
+   */
+  Name.Component lastComponent();
+
+  /**
+   * @return the optional content stored at this location in the tree
+   */
+  Optional<T> content();
+
+  /**
+   * @return the children of this node or an empty collection
+   */
+  Collection<NameTree<T>> children();
+
+  /**
+   * @return the parent of this node; note that calling this on the root node will return {@code null}
+   */
+  NameTree<T> parent();
+
+  /**
+   * @param name the full name leading to the location in the tree; if intervening nodes do not yet exist, they will be
+   * created
+   * @param content the content to store at this location; this may overwrite previously existing content, much like a
+   * {@link java.util.Map}
+   * @return a reference to the node inserted
+   */
+  NameTree<T> insert(Name name, T content);
+
+  /**
+   * @param query the name to use as a path through the tree
+   * @return an optional node; if there is no node at the end of the query, the {@link Optional} will be empty
+   */
+  Optional<NameTree<T>> find(Name query);
+
+  /**
+   * @param name the name to use as a path through the tree
+   * @return the removed node or an empty {@link Optional} if the node was not found
+   */
+  Optional<NameTree<T>> delete(Name name);
+
+  /**
+   * @return the count of all nodes in the tree below this one that are non-empty (e.g. have some content)
+   */
+  int count();
+
+  /**
+   * Remove all nodes beneath this one; will have no effect on a leaf node
+   */
+  void clear();
+}
diff --git a/src/main/java/com/intel/jndn/utils/On.java b/src/main/java/com/intel/jndn/utils/On.java
new file mode 100644
index 0000000..96a84d2
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/On.java
@@ -0,0 +1,28 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+/**
+ * Generic callback API for receiving some signal T when an event is fired.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+@FunctionalInterface
+public interface On<T> {
+  /**
+   * @param event the event object describing the occurrence
+   */
+  void on(T event);
+}
diff --git a/src/main/java/com/intel/jndn/utils/PendingInterestTable.java b/src/main/java/com/intel/jndn/utils/PendingInterestTable.java
new file mode 100644
index 0000000..3e49f11
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/PendingInterestTable.java
@@ -0,0 +1,46 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+
+/**
+ * Represent a pending interest table; the jndn {@link net.named_data.jndn.impl.PendingInterestTable} depended on
+ * several patterns such as List output parameters for collecting output and did not allow querying without extracting
+ * (see {@link #has(Interest)}).
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface PendingInterestTable {
+  /**
+   * @param interest the PIT entry to add
+   */
+  void add(Interest interest);
+
+  /**
+   * @param interest the incoming interest to match against
+   * @return true if the interest matches an entry already in the PIT
+   */
+  boolean has(Interest interest);
+
+  /**
+   * @param name the name to match against
+   * @return the PIT entries matching a name, removing them from the PIT
+   */
+  Collection<Interest> extract(Name name);
+}
diff --git a/src/main/java/com/intel/jndn/utils/ProcessingStage.java b/src/main/java/com/intel/jndn/utils/ProcessingStage.java
index 68b2649..d665c84 100644
--- a/src/main/java/com/intel/jndn/utils/ProcessingStage.java
+++ b/src/main/java/com/intel/jndn/utils/ProcessingStage.java
@@ -18,7 +18,7 @@
  * processing stage may convert a data packet with unencrypted content to one
  * with encrypted content.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface ProcessingStage<T, Y> {
 
@@ -30,5 +30,5 @@
    * may be a new object)
    * @throws Exception if the processing fails
    */
-  public Y process(T input) throws Exception;
+  Y process(T input) throws Exception;
 }
diff --git a/src/main/java/com/intel/jndn/utils/Publisher.java b/src/main/java/com/intel/jndn/utils/Publisher.java
new file mode 100644
index 0000000..4a93084
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Publisher.java
@@ -0,0 +1,30 @@
+/*

+ * jndn-utils

+ * Copyright (c) 2016, Intel Corporation.

+ *

+ * This program is free software; you can redistribute it and/or modify it

+ * under the terms and conditions of the GNU Lesser General Public License,

+ * version 3, as published by the Free Software Foundation.

+ *

+ * This program is distributed in the hope it will be useful, but WITHOUT ANY

+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

+ * more details.

+ */

+

+package com.intel.jndn.utils;

+

+import net.named_data.jndn.util.Blob;

+

+import java.io.IOException;

+

+/**

+ * @author Andrew Brown, andrew.brown@intel.com

+ */

+public interface Publisher extends AutoCloseable {

+  /**

+   * @param message a binary blob to publish to a topic

+   * @throws IOException if the publication fails

+   */

+  void publish(Blob message) throws IOException;

+}

diff --git a/src/main/java/com/intel/jndn/utils/Repository.java b/src/main/java/com/intel/jndn/utils/Repository.java
index fcc0172..6e314f4 100644
--- a/src/main/java/com/intel/jndn/utils/Repository.java
+++ b/src/main/java/com/intel/jndn/utils/Repository.java
@@ -20,7 +20,7 @@
 /**
  * Define API for storing and retrieving NDN packets
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface Repository {
 
@@ -29,7 +29,7 @@
    *
    * @param data a {@link Data} packet
    */
-  public void put(Data data);
+  void put(Data data);
 
   /**
    * Retrieve a {@link Data} packet in the repository; this method should
@@ -39,7 +39,7 @@
    * @return a {@link Data} packet
    * @throws DataNotFoundException if the packet is not found
    */
-  public Data get(Interest interest) throws DataNotFoundException;
+  Data get(Interest interest) throws DataNotFoundException;
 
   /**
    * Check if this repository can satisfy the {@link Interest} with a
@@ -49,10 +49,10 @@
    * @param interest the {@link Interest} to attempt to satisfy
    * @return true if a {@link Data} exists that satisfies the {@link Interest}
    */
-  public boolean satisfies(Interest interest);
+  boolean satisfies(Interest interest);
 
   /**
    * Remove all stale {@link Data} packets from the repository.
    */
-  public void cleanup();
+  void cleanup();
 }
diff --git a/src/main/java/com/intel/jndn/utils/Server.java b/src/main/java/com/intel/jndn/utils/Server.java
index 197a52a..422074c 100644
--- a/src/main/java/com/intel/jndn/utils/Server.java
+++ b/src/main/java/com/intel/jndn/utils/Server.java
@@ -13,12 +13,13 @@
  */
 package com.intel.jndn.utils;
 
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Name;
 import net.named_data.jndn.OnInterestCallback;
 
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
 /**
  * Base interface for defining a server; see descendant interfaces for different
  * modes of serving packets. This class extends {@link Runnable} expecting
@@ -26,20 +27,20 @@
  * {@link Runnable#run()} block, thus allowing different ways to manage servers
  * (e.g. single-thread vs {@link ScheduledThreadPoolExecutor}.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface Server extends Runnable, OnInterestCallback {
 
   /**
    * @return the {@link Name} prefix this server is serving on.
    */
-  public Name getPrefix();
+  Name getPrefix();
 
   /**
    * @return the registered prefix ID of the server on the {@link Face} or -1 if
    * unregistered
    */
-  public long getRegisteredPrefixId();
+  long getRegisteredPrefixId();
 
   /**
    * Add a stage to the server pipeline. Each stage should be processed once the
@@ -49,5 +50,5 @@
    *
    * @param stage a Data-to-Data processing stage
    */
-  public void addPostProcessingStage(ProcessingStage<Data, Data> stage);
+  void addPostProcessingStage(ProcessingStage<Data, Data> stage);
 }
diff --git a/src/main/java/com/intel/jndn/utils/Subscriber.java b/src/main/java/com/intel/jndn/utils/Subscriber.java
new file mode 100644
index 0000000..2830cbc
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Subscriber.java
@@ -0,0 +1,35 @@
+/*

+ * jndn-utils

+ * Copyright (c) 2016, Intel Corporation.

+ *

+ * This program is free software; you can redistribute it and/or modify it

+ * under the terms and conditions of the GNU Lesser General Public License,

+ * version 3, as published by the Free Software Foundation.

+ *

+ * This program is distributed in the hope it will be useful, but WITHOUT ANY

+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

+ * more details.

+ */

+

+package com.intel.jndn.utils;

+

+import java.io.IOException;

+import java.util.Set;

+

+/**

+ * @author Andrew Brown, andrew.brown@intel.com

+ */

+public interface Subscriber extends AutoCloseable {

+  /**

+   * @return the currently known publishers

+   */

+  Set<Long> knownPublishers();

+

+  /**

+   * Open the subscriber transport mechanisms and retrieve publisher messages

+   *

+   * @throws IOException if the subscriber fails during network access

+   */

+  void open() throws IOException;

+}

diff --git a/src/main/java/com/intel/jndn/utils/Topic.java b/src/main/java/com/intel/jndn/utils/Topic.java
new file mode 100644
index 0000000..8816e5d
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Topic.java
@@ -0,0 +1,68 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import com.intel.jndn.utils.On;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import com.intel.jndn.utils.pubsub.PubSubFactory;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+
+/**
+ * A publish-subscrib topic; see the {@code pubsub} package for implementation details
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public final class Topic {
+  private final Name name;
+
+  /**
+   * @param name the NDN name of the topic
+   */
+  public Topic(Name name) {
+    this.name = name;
+  }
+
+  /**
+   * @return the NDN name of the topic; the return is wrapped so that the original value is immutable and the return
+   * type can be modified
+   */
+  public Name name() {
+    return new Name(name);
+  }
+
+  /**
+   * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+   * @param onMessage callback fired when a message is received
+   * @param onError callback fired when an error happens after subscription
+   * @return an open subscriber
+   * @throws IOException if the subscription fails
+   */
+  public Subscriber subscribe(Face face, On<Blob> onMessage, On<Exception> onError) throws IOException {
+    return PubSubFactory.newSubscriber(face, name, onMessage, onError);
+  }
+
+  /**
+   * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+   * @return a factory-built publisher
+   */
+  public Publisher newPublisher(Face face) {
+    return PubSubFactory.newPublisher(face, name);
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/DataStream.java b/src/main/java/com/intel/jndn/utils/client/DataStream.java
index 7f33870..f558ab4 100644
--- a/src/main/java/com/intel/jndn/utils/client/DataStream.java
+++ b/src/main/java/com/intel/jndn/utils/client/DataStream.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -22,54 +22,56 @@
  * Define a stream of {@link Data} packets as they are retrieved from the
  * network and provide methods for observing stream events
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * TODO merge with Observable
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface DataStream extends OnData, OnTimeout, OnComplete, OnException {
 
   /**
    * @return true if the stream is complete
    */
-  public boolean isComplete();
+  boolean isComplete();
 
   /**
    * @return the current list of packets retrieved; this may change as more
    * packets are added
    */
-  public Data[] list();
+  Data[] list();
 
   /**
    * @return an assembled packet containing the concatenated bytes of all
    * received packets
    * @throws StreamException if assembly fails
    */
-  public Data assemble() throws StreamException;
+  Data assemble() throws StreamException;
 
   /**
    * Watch all {@link OnData} events
    *
    * @param onData the callback fired
    */
-  public void observe(OnData onData);
+  void observe(OnData onData);
 
   /**
    * Watch all {@link OnComplete} events
    *
    * @param onComplete the callback fired
    */
-  public void observe(OnComplete onComplete);
+  void observe(OnComplete onComplete);
 
   /**
    * Watch all {@link OnException} events
    *
    * @param onException the callback fired
    */
-  public void observe(OnException onException);
+  void observe(OnException onException);
 
   /**
    * Watch all {@link OnTimeout} events
    *
    * @param onTimeout the callback fired
    */
-  public void observe(OnTimeout onTimeout);
+  void observe(OnTimeout onTimeout);
 
 }
diff --git a/src/main/java/com/intel/jndn/utils/client/OnComplete.java b/src/main/java/com/intel/jndn/utils/client/OnComplete.java
index 6cbbe86..5bd6811 100644
--- a/src/main/java/com/intel/jndn/utils/client/OnComplete.java
+++ b/src/main/java/com/intel/jndn/utils/client/OnComplete.java
@@ -16,12 +16,12 @@
 /**
  * Callback fired when an activity is finished.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface OnComplete {
 
   /**
    * Called when an activity is finished
    */
-  public void onComplete();
+  void onComplete();
 }
diff --git a/src/main/java/com/intel/jndn/utils/client/OnException.java b/src/main/java/com/intel/jndn/utils/client/OnException.java
index d4c440d..976a874 100644
--- a/src/main/java/com/intel/jndn/utils/client/OnException.java
+++ b/src/main/java/com/intel/jndn/utils/client/OnException.java
@@ -17,7 +17,7 @@
  * Callback fired when an activity throws an exception; this is necessary because
  * some actions are performed asynchronously (e.g. in a thread pool) and exceptions
  * could otherwise be lost.
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface OnException {
 
@@ -25,5 +25,5 @@
    * Called when an activity throws an exception
    * @param exception the exception thrown
    */
-  public void onException(Exception exception);
+  void onException(Exception exception);
 }
diff --git a/src/main/java/com/intel/jndn/utils/client/RetryClient.java b/src/main/java/com/intel/jndn/utils/client/RetryClient.java
index 9a5dacb..432db0b 100644
--- a/src/main/java/com/intel/jndn/utils/client/RetryClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/RetryClient.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -11,18 +11,21 @@
  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
  * more details.
  */
+
 package com.intel.jndn.utils.client;
 
-import java.io.IOException;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
 import net.named_data.jndn.OnData;
 import net.named_data.jndn.OnTimeout;
 
+import java.io.IOException;
+
 /**
- * Define a client that can retry {@link Interest} packets on timeout.
+ * Define a client that can retry {@link Interest} packets on timeout. TODO should return Cancellation so that users
+ * can stop the process
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface RetryClient {
 
@@ -40,5 +43,5 @@
    * @param onTimeout the application's failure callback
    * @throws IOException when the client cannot perform the necessary network IO
    */
-  public void retry(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException;
+  void retry(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException;
 }
diff --git a/src/main/java/com/intel/jndn/utils/client/SegmentationType.java b/src/main/java/com/intel/jndn/utils/client/SegmentationType.java
index 166d81f..5d7dd88 100644
--- a/src/main/java/com/intel/jndn/utils/client/SegmentationType.java
+++ b/src/main/java/com/intel/jndn/utils/client/SegmentationType.java
@@ -17,14 +17,14 @@
  * Documents known partition types from
  * http://named-data.net/doc/tech-memos/naming-conventions.pdf
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public enum SegmentationType {
 
   SEGMENT((byte) 0x00),
   BYTE_OFFSET((byte) 0xFB);
 
-  private byte marker;
+  private final byte marker;
 
   SegmentationType(byte marker) {
     this.marker = marker;
diff --git a/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java b/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java
index d866675..6155398 100644
--- a/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java
@@ -13,15 +13,16 @@
  */
 package com.intel.jndn.utils.client;
 
-import java.io.IOException;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
 
+import java.io.IOException;
+
 /**
  * Define a client that can retrieve segmented packets into a
  * {@link DataStream}.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface SegmentedClient {
 
@@ -35,5 +36,5 @@
    * @return a data stream of packets returned
    * @throws IOException if the initial request fails
    */
-  public DataStream getSegmentsAsync(Face face, Interest interest) throws IOException;
+  DataStream getSegmentsAsync(Face face, Interest interest) throws IOException;
 }
diff --git a/src/main/java/com/intel/jndn/utils/client/StreamingClient.java b/src/main/java/com/intel/jndn/utils/client/StreamingClient.java
index 3ab5fb6..9e75840 100644
--- a/src/main/java/com/intel/jndn/utils/client/StreamingClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/StreamingClient.java
@@ -13,16 +13,17 @@
  */
 package com.intel.jndn.utils.client;
 
-import java.io.IOException;
-import java.io.InputStream;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 /**
  * Define a client that can stream content bytes that are partitioned over
  * multiple packets.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface StreamingClient {
 
@@ -39,5 +40,5 @@
    * @return a stream of content bytes
    * @throws IOException if the stream setup fails
    */
-  public InputStream getStreamAsync(Face face, Interest interest, SegmentationType partitionMarker) throws IOException;
+  InputStream getStreamAsync(Face face, Interest interest, SegmentationType partitionMarker) throws IOException;
 }
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java b/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java
index 2659656..d1cdd51 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java
@@ -13,51 +13,40 @@
  */
 package com.intel.jndn.utils.client.impl;
 
+import com.intel.jndn.utils.client.DataStream;
 import com.intel.jndn.utils.client.OnComplete;
 import com.intel.jndn.utils.client.OnException;
 import com.intel.jndn.utils.client.RetryClient;
-import com.intel.jndn.utils.client.SegmentedClient;
-import com.intel.jndn.utils.client.DataStream;
 import com.intel.jndn.utils.client.SegmentationType;
+import com.intel.jndn.utils.client.SegmentedClient;
 import com.intel.jndn.utils.client.StreamingClient;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
 import net.named_data.jndn.OnTimeout;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
 /**
  * Implementation of a client that can handle segmented data, retries after
  * failed requests, and streaming of data packets.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class AdvancedClient extends SimpleClient implements SegmentedClient, StreamingClient {
 
-  private static final Logger logger = Logger.getLogger(AdvancedClient.class.getName());
   public static final int DEFAULT_MAX_RETRIES = 3;
+  private static final Logger logger = Logger.getLogger(AdvancedClient.class.getName());
   private static AdvancedClient defaultInstance;
   private final SegmentedClient segmentedClient;
   private final RetryClient retryClient;
   private final StreamingClient streamingClient;
 
   /**
-   * Singleton access for simpler client use
-   *
-   * @return a default client
-   */
-  public static AdvancedClient getDefault() {
-    if (defaultInstance == null) {
-      defaultInstance = new AdvancedClient();
-    }
-    return defaultInstance;
-  }
-
-  /**
    * Build an advanced client
    *
    * @param sleepTime for synchronous processing, the time to sleep the thread
@@ -90,6 +79,18 @@
   }
 
   /**
+   * Singleton access for simpler client use
+   *
+   * @return a default client
+   */
+  public static AdvancedClient getDefault() {
+    if (defaultInstance == null) {
+      defaultInstance = new AdvancedClient();
+    }
+    return defaultInstance;
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/BackoffRetryClient.java b/src/main/java/com/intel/jndn/utils/client/impl/BackoffRetryClient.java
new file mode 100644
index 0000000..ca8357f
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/impl/BackoffRetryClient.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.client.RetryClient;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.OnData;
+import net.named_data.jndn.OnTimeout;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BackoffRetryClient implements RetryClient {
+
+  private static final Logger LOGGER = Logger.getLogger(BackoffRetryClient.class.getName());
+  private final double cutoffLifetime;
+  private final int backoffFactor;
+
+  public BackoffRetryClient(double cutoffLifetime, int backoffFactor) {
+    this.cutoffLifetime = cutoffLifetime;
+    this.backoffFactor = backoffFactor;
+  }
+
+  @Override
+  public void retry(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException {
+    retryInterest(face, interest, onData, onTimeout);
+  }
+
+  private void retryInterest(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException {
+    double newLifetime = interest.getInterestLifetimeMilliseconds() * backoffFactor;
+    if (newLifetime < cutoffLifetime) {
+      interest.setInterestLifetimeMilliseconds(newLifetime);
+      resend(face, interest, onData, onTimeout);
+    } else {
+      onTimeout.onTimeout(interest);
+    }
+  }
+
+  private void resend(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException {
+    LOGGER.log(Level.INFO, "Resending interest with {0}ms lifetime: {1}", new Object[]{interest.getInterestLifetimeMilliseconds(), interest.getName()});
+    face.expressInterest(interest, onData, timedOutInterest -> {
+      try {
+        retryInterest(face, timedOutInterest, onData, onTimeout);
+      } catch (IOException e) {
+        onTimeout.onTimeout(interest);
+      }
+    });
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java b/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java
index c1ca4fe..be642fc 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -13,18 +13,19 @@
  */
 package com.intel.jndn.utils.client.impl;
 
-import com.intel.jndn.utils.client.SegmentedClient;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
+import com.intel.jndn.utils.impl.SegmentationHelper;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.util.Blob;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
 /**
  * Internal class for assembling a list of {@link Data} packets into one large
  * data packet; this implementation will use all properties of the first packet
  * in the list and concatenate the content bytes of all packets in order.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 class DataAssembler {
 
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java b/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java
index 21f1c0e..cb90496 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -14,23 +14,24 @@
 package com.intel.jndn.utils.client.impl;
 
 import com.intel.jndn.utils.client.RetryClient;
-import java.io.IOException;
-import java.util.logging.Logger;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
 import net.named_data.jndn.OnData;
 import net.named_data.jndn.OnTimeout;
 
+import java.io.IOException;
+import java.util.logging.Logger;
+
 /**
  * Default implementation of {@link RetryClient}; on request failure, this class
  * immediately retries the request until a maximum number of retries is reached.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class DefaultRetryClient implements RetryClient {
 
-  private static final Logger logger = Logger.getLogger(DefaultRetryClient.class.getName());
+  private static final Logger LOGGER = Logger.getLogger(DefaultRetryClient.class.getName());
   private final int numRetriesAllowed;
   private volatile int totalRetries = 0;
 
@@ -61,8 +62,8 @@
    * @param context the current request context
    * @throws IOException when the client cannot perform the necessary network IO
    */
-  synchronized private void retryInterest(RetryContext context) throws IOException {
-    logger.info("Retrying interest: " + context.interest.toUri());
+  private synchronized void retryInterest(RetryContext context) throws IOException {
+    LOGGER.info("Retrying interest: " + context.interest.toUri());
     context.face.expressInterest(context.interest, context, context);
     totalRetries++;
   }
@@ -70,7 +71,7 @@
   /**
    * @return the total number of retries logged by this client
    */
-  public int totalRetries() {
+  int totalRetries() {
     return totalRetries;
   }
 
@@ -80,20 +81,20 @@
    */
   private class RetryContext implements OnData, OnTimeout {
 
-    public int numFailures = 0;
-    public final Face face;
-    public final Interest interest;
-    public final OnData applicationOnData;
-    public final OnTimeout applicationOnTimeout;
+    final Face face;
+    final Interest interest;
+    final OnData applicationOnData;
+    final OnTimeout applicationOnTimeout;
+    int numFailures = 0;
 
-    public RetryContext(Face face, Interest interest, OnData applicationOnData, OnTimeout applicationOnTimeout) {
+    RetryContext(Face face, Interest interest, OnData applicationOnData, OnTimeout applicationOnTimeout) {
       this.face = face;
       this.interest = interest;
       this.applicationOnData = applicationOnData;
       this.applicationOnTimeout = applicationOnTimeout;
     }
 
-    public boolean shouldRetry() {
+    boolean shouldRetry() {
       return numFailures < numRetriesAllowed;
     }
 
@@ -105,7 +106,7 @@
     @Override
     public void onTimeout(Interest interest) {
       numFailures++;
-      logger.finest("Request failed, count " + numFailures + ": " + interest.toUri());
+      LOGGER.finest("Request failed, count " + numFailures + ": " + interest.toUri());
 
       if (shouldRetry()) {
         try {
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java b/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java
index 83e0baa..74b7144 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -13,10 +13,9 @@
  */
 package com.intel.jndn.utils.client.impl;
 
-import com.intel.jndn.utils.client.SegmentedClient;
 import com.intel.jndn.utils.client.DataStream;
-import java.io.IOException;
-import java.util.logging.Logger;
+import com.intel.jndn.utils.client.SegmentedClient;
+import com.intel.jndn.utils.impl.SegmentationHelper;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
@@ -24,18 +23,21 @@
 import net.named_data.jndn.Name.Component;
 import net.named_data.jndn.OnData;
 
+import java.io.IOException;
+import java.util.logging.Logger;
+
 /**
  * Retrieve segments one by one until the FinalBlockId indicates an end segment;
  * then request remaining packets. This class currently only handles segmented
  * data (not yet byte-offset segmented data).
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class DefaultSegmentedClient implements SegmentedClient {
 
   private static final Logger logger = Logger.getLogger(DefaultSegmentedClient.class.getName());
   private static DefaultSegmentedClient defaultInstance;
-  private byte marker = 0x00;
+  private final byte marker = 0x00;
 
   /**
    * Singleton access for simpler client use
@@ -67,6 +69,26 @@
   }
 
   /**
+   * Replace the final component of an interest name with a segmented component;
+   * if the interest name does not have a segmented component, this will add
+   * one.
+   *
+   * @param interest the request
+   * @param segmentNumber a segment number
+   * @param marker a marker to use for segmenting the packet
+   * @return a segmented interest (a copy of the passed interest)
+   */
+  protected Interest replaceFinalComponent(Interest interest, long segmentNumber, byte marker) {
+    Interest copied = new Interest(interest);
+    Component lastComponent = Component.fromNumberWithMarker(segmentNumber, marker);
+    Name newName = (SegmentationHelper.isSegmented(copied.getName(), marker))
+        ? copied.getName().getPrefix(-1)
+        : new Name(copied.getName());
+    copied.setName(newName.append(lastComponent));
+    return copied;
+  }
+
+  /**
    * Helper class to track the last requested segment for a given request
    * context and request follow-on packets
    */
@@ -126,24 +148,4 @@
     }
   }
 
-  /**
-   * Replace the final component of an interest name with a segmented component;
-   * if the interest name does not have a segmented component, this will add
-   * one.
-   *
-   * @param interest the request
-   * @param segmentNumber a segment number
-   * @param marker a marker to use for segmenting the packet
-   * @return a segmented interest (a copy of the passed interest)
-   */
-  protected Interest replaceFinalComponent(Interest interest, long segmentNumber, byte marker) {
-    Interest copied = new Interest(interest);
-    Component lastComponent = Component.fromNumberWithMarker(segmentNumber, marker);
-    Name newName = (SegmentationHelper.isSegmented(copied.getName(), marker))
-            ? copied.getName().getPrefix(-1)
-            : new Name(copied.getName());
-    copied.setName(newName.append(lastComponent));
-    return copied;
-  }
-
 }
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java b/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
index 55fa0f0..ce5db19 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
@@ -18,28 +18,27 @@
 import com.intel.jndn.utils.client.SegmentationType;
 import com.intel.jndn.utils.client.SegmentedClient;
 import com.intel.jndn.utils.client.StreamingClient;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.OnData;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.OnData;
 
 /**
  * Default implementation of {@link StreamingClient}; uses a segmented client to
  * retrieve packets asynchronously and pipes them to a stream as they are
  * received.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class DefaultStreamingClient implements StreamingClient {
-
-  private static final Logger logger = Logger.getLogger(DefaultStreamingClient.class.getName());
-  private SegmentedClient client;
+  private static final Logger LOGGER = Logger.getLogger(DefaultStreamingClient.class.getName());
 
   /**
    * {@inheritDoc}
@@ -50,13 +49,14 @@
   }
 
   /**
-   * 
-   * @param face
-   * @param interest
-   * @param partitionMarker
-   * @param onException
-   * @return
-   * @throws IOException
+   * @param face the {@link Face} on which to make the request; call {@link Face#processEvents()} separately to complete
+   * the request
+   * @param interest the {@link Interest} to send over the network
+   * @param partitionMarker the byte marker identifying how the data packets are partitioned (e.g. segmentation, see
+   * http://named-data.net/doc/tech-memos/naming-conventions.pdf)
+   * @param onException callback fired if a failure occurs during streaming
+   * @return a stream of content bytes
+   * @throws IOException if the stream setup fails
    */
   public InputStream getStreamAsync(Face face, Interest interest, SegmentationType partitionMarker, OnException onException) throws IOException {
     SegmentedClient client = DefaultSegmentedClient.getDefault();
@@ -64,11 +64,10 @@
   }
 
   /**
-   *
-   * @param onDataStream
-   * @param onException
-   * @return
-   * @throws IOException
+   * @param onDataStream the data stream of incoming data packets
+   * @param onException callback fired if a failure occurs during streaming
+   * @return a stream of content bytes
+   * @throws IOException if the stream setup fails
    */
   public InputStream getStreamAsync(DataStream onDataStream, OnException onException) throws IOException {
     PipedInputStream in = new PipedInputStream();
@@ -97,7 +96,7 @@
 
     @Override
     public void onException(Exception exception) {
-      logger.log(Level.SEVERE, "Streaming failed", exception);
+      LOGGER.log(Level.SEVERE, "Streaming failed", exception);
     }
   }
 }
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/SegmentationHelper.java b/src/main/java/com/intel/jndn/utils/client/impl/SegmentationHelper.java
deleted file mode 100644
index 7a051ee..0000000
--- a/src/main/java/com/intel/jndn/utils/client/impl/SegmentationHelper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.client.impl;
-
-import net.named_data.jndn.Name;
-import net.named_data.jndn.encoding.EncodingException;
-
-/**
- * Helper methods for dealing with segmented packets (see
- * http://named-data.net/doc/tech-memos/naming-conventions.pdf).
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SegmentationHelper {
-
-  /**
-   * Determine if a name is segmented, i.e. if it ends with the correct marker
-   * type.
-   *
-   * @param name the name of a packet
-   * @param marker the marker type (the initial byte of the component)
-   * @return true if the name is segmented
-   */
-  public static boolean isSegmented(Name name, byte marker) {
-    return name.size() > 0 ? name.get(-1).getValue().buf().get(0) == marker : false;
-  }
-
-  /**
-   * Retrieve the segment number from the last component of a name.
-   *
-   * @param name the name of a packet
-   * @param marker the marker type (the initial byte of the component)
-   * @return the segment number
-   * @throws EncodingException if the name does not have a final component of
-   * the correct marker type
-   */
-  public static long parseSegment(Name name, byte marker) throws EncodingException {
-    if (name.size() == 0) {
-      throw new EncodingException("No components to parse.");
-    }
-    return name.get(-1).toNumberWithMarker(marker);
-  }
-
-  /**
-   * Remove a segment component from the end of a name
-   *
-   * @param name the name of a packet
-   * @param marker the marker type (the initial byte of the component)
-   * @return the new name with the segment component removed or a copy of the
-   * name if no segment component was present
-   */
-  public static Name removeSegment(Name name, byte marker) {
-    return isSegmented(name, marker) ? name.getPrefix(-1) : new Name(name);
-  }
-}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java b/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java
index 41fd86a..43f6839 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -13,9 +13,17 @@
  */
 package com.intel.jndn.utils.client.impl;
 
+import com.intel.jndn.utils.client.DataStream;
 import com.intel.jndn.utils.client.OnComplete;
 import com.intel.jndn.utils.client.OnException;
-import com.intel.jndn.utils.client.DataStream;
+import com.intel.jndn.utils.impl.SegmentationHelper;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnData;
+import net.named_data.jndn.OnTimeout;
+import net.named_data.jndn.encoding.EncodingException;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -23,12 +31,6 @@
 import java.util.Map;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.OnData;
-import net.named_data.jndn.OnTimeout;
-import net.named_data.jndn.encoding.EncodingException;
 
 /**
  * As packets are received, they are mapped by their last component's segment
@@ -40,7 +42,7 @@
  * data is received; if data is received out of order, the callbacks will not be
  * fired until adjoining packets are received.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SegmentedDataStream implements DataStream {
 
@@ -48,8 +50,8 @@
   private final byte PARTITION_MARKER = 0x00;
   private volatile long current = -1;
   private volatile long end = Long.MAX_VALUE;
-  private Map<Long, Data> packets = new HashMap<>();
-  private List<Object> observers = new ArrayList<>();
+  private final Map<Long, Data> packets = new HashMap<>();
+  private final List<Object> observers = new ArrayList<>();
   private Exception exception;
 
   @Override
@@ -84,7 +86,6 @@
       throw new StreamException(exception);
     }
 
-    Object o = new LinkedList();
     return new DataAssembler(list(), PARTITION_MARKER).assemble();
   }
 
@@ -146,7 +147,7 @@
         current++;
         assert (packets.containsKey(current));
         Data retrieved = packets.get(current);
-        observersOfType(OnData.class).stream().forEach((OnData cb) -> {
+        observersOfType(OnData.class).forEach((OnData cb) -> {
           cb.onData(interest, retrieved);
         });
       } while (hasNextPacket());
@@ -172,14 +173,14 @@
 
   @Override
   public synchronized void onComplete() {
-    observersOfType(OnComplete.class).stream().forEach((OnComplete cb) -> {
+    observersOfType(OnComplete.class).forEach((OnComplete cb) -> {
       cb.onComplete();
     });
   }
 
   @Override
   public synchronized void onTimeout(Interest interest) {
-    observersOfType(OnTimeout.class).stream().forEach((OnTimeout cb) -> {
+    observersOfType(OnTimeout.class).forEach((OnTimeout cb) -> {
       cb.onTimeout(interest);
     });
   }
@@ -188,7 +189,7 @@
   public synchronized void onException(Exception exception) {
     this.exception = exception;
 
-    observersOfType(OnException.class).stream().forEach((OnException cb) -> {
+    observersOfType(OnException.class).forEach((OnException cb) -> {
       cb.onException(exception);
     });
   }
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java b/src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java
index 9e2e594..5ebf05a 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java
@@ -14,24 +14,25 @@
 package com.intel.jndn.utils.client.impl;
 
 import com.intel.jndn.utils.Client;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
 import net.named_data.jndn.Name;
 import net.named_data.jndn.OnData;
 import net.named_data.jndn.OnTimeout;
-import java.util.logging.Logger;
 import net.named_data.jndn.encoding.EncodingException;
 
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 /**
  * Provide a client to simplify information retrieval over the NDN network.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SimpleClient implements Client {
 
@@ -43,18 +44,6 @@
   private final long interestLifetime;
 
   /**
-   * Singleton access for simpler client use
-   *
-   * @return a default client
-   */
-  public static SimpleClient getDefault() {
-    if (defaultInstance == null) {
-      defaultInstance = new SimpleClient();
-    }
-    return defaultInstance;
-  }
-
-  /**
    * Build a simple client
    *
    * @param sleepTime for synchronous processing, the time to sleep the thread
@@ -76,6 +65,18 @@
   }
 
   /**
+   * Singleton access for simpler client use
+   *
+   * @return a default client
+   */
+  public static SimpleClient getDefault() {
+    if (defaultInstance == null) {
+      defaultInstance = new SimpleClient();
+    }
+    return defaultInstance;
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
@@ -159,7 +160,6 @@
    * @return a default interest for the given name
    */
   public Interest getDefaultInterest(Name name) {
-    Interest interest = new Interest(name, interestLifetime);
-    return interest;
+    return new Interest(name, interestLifetime);
   }
 }
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/StreamException.java b/src/main/java/com/intel/jndn/utils/client/impl/StreamException.java
index 59ad55a..7cf5c6f 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/StreamException.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/StreamException.java
@@ -15,7 +15,7 @@
 
 /**
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class StreamException extends Exception {
 
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
new file mode 100644
index 0000000..5ed94cb
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
@@ -0,0 +1,68 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.PendingInterestTable;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ * TODO use NameTree for storage? or Set and override Interest equals()
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryPendingInterestTable implements PendingInterestTable {
+  private static final Logger LOGGER = Logger.getLogger(BoundedInMemoryPendingInterestTable.class.getName());
+  private final BoundedLinkedMap<Name, Interest> table;
+
+  public BoundedInMemoryPendingInterestTable(int maxSize) {
+    this.table = new BoundedLinkedMap<>(maxSize);
+  }
+
+  @Override
+  public void add(Interest interest) {
+    LOGGER.log(Level.INFO, "Adding pending interest: {0}", interest.toUri());
+    table.put(interest.getName(), interest);
+  }
+
+  @Override
+  public boolean has(Interest interest) {
+    if (interest.getChildSelector() != -1) {
+      for (Name name : table.keySet()) {
+        // TODO this logic must be more complex; must match selectors as well
+        if (interest.matchesName(name)) {
+          return true;
+        }
+      }
+      return false;
+    } else {
+      return has(interest.getName());
+    }
+  }
+
+  public boolean has(Name name) {
+    return table.containsKey(name);
+  }
+
+  @Override
+  public Collection<Interest> extract(Name name) {
+    return table.values().stream().filter(i -> i.matchesName(name)).collect(Collectors.toList());
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java b/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
new file mode 100644
index 0000000..6102a9b
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
@@ -0,0 +1,159 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Linked hash map exposing the earliest and latest entries added; it is bounded to a configurable size to save memory.
+ * When benchmarked against CircularBufferCache, this class had slower insertions but faster reads and was therefore
+ * retained for use.
+ * <p>
+ * It limits the amount of memory by replacing the oldest added element; this involves overriding the LinkedHashMap
+ * implementation's removeEldestEntry() method; see the <a href="https://docs.oracle.com/javase/8/docs/api/java/util/LinkedHashMap.html#removeEldestEntry-java.util.Map.Entry-">Javadoc
+ * entry</a> for more information.
+ * <p>
+ * Additionally, we implemented Josh Bloch's item 16 of Effective Java so that calls are forwarded to the underlying
+ * LinkedHashMap. This allows us to decorate with some custom behavior and synchronize as we need.
+ * <p>
+ * This class is coarsely thread-safe; every public method is synchronized for one-at-a-time access to the underlying
+ * map.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedLinkedMap<K, V> implements Map<K, V> {
+  private final LinkedHashMap<K, V> map;
+  private K latest;
+
+  /**
+   * @param maxSize the maximum allowed number of records to store
+   */
+  public BoundedLinkedMap(int maxSize) {
+    this.map = new LinkedHashMap<K, V>(maxSize) {
+      @Override
+      public boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+        return size() > maxSize;
+      }
+    };
+  }
+
+  /**
+   * @return the earliest key added to this set or null if none are added
+   */
+  public synchronized K earliest() {
+    for (K key : map.keySet()) {
+      return key; // the LinkedHashMap guarantees iteration in order of insertion
+    }
+    return null;
+  }
+
+  /**
+   * @return the latest key added to this set or null if none are added
+   */
+  public synchronized K latest() {
+    return latest;
+  }
+
+  @Override
+  public synchronized V put(K key, V value) {
+    latest = key;
+    return map.put(key, value);
+  }
+
+  @Override
+  public synchronized int size() {
+    return map.size();
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    return map.isEmpty();
+  }
+
+  @Override
+  public synchronized boolean containsKey(Object key) {
+    return map.containsKey(key);
+  }
+
+  @Override
+  public synchronized boolean containsValue(Object value) {
+    return map.containsValue(value);
+  }
+
+  @Override
+  public synchronized V get(Object key) {
+    return map.get(key);
+  }
+
+
+  @Override
+  public synchronized V remove(Object key) {
+    V value = map.remove(key);
+    if (key == latest) {
+      latest = findLatest(map);
+    }
+    return value;
+  }
+
+  @Override
+  public synchronized void putAll(Map<? extends K, ? extends V> m) {
+    map.putAll(m);
+    latest = findLatest(map);
+  }
+
+  @Override
+  public synchronized void clear() {
+    map.clear();
+    latest = null;
+  }
+
+  @Override
+  public synchronized Set<K> keySet() {
+    return map.keySet();
+  }
+
+  @Override
+  public synchronized Collection<V> values() {
+    return map.values();
+  }
+
+  @Override
+  public synchronized Set<java.util.Map.Entry<K, V>> entrySet() {
+    return map.entrySet();
+  }
+
+  @Override
+  public String toString() {
+    return map.toString();
+  }
+
+  /**
+   * To find the latest key in a LinkedHashMap, iterate and return the last one found. The LinkedHashMap guarantees
+   * iteration in order of insertion
+   *
+   * @param m the map to inspect
+   * @return the latest key added to the map
+   */
+  private K findLatest(LinkedHashMap<K, V> m) {
+    K newLatest = null;
+    for (K key : m.keySet()) {
+      newLatest = key;
+    }
+    return newLatest;
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java b/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java
new file mode 100644
index 0000000..048f304
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java
@@ -0,0 +1,154 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.NameTree;
+import net.named_data.jndn.Name;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * TODO need a way to bound the size
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class DefaultNameTree<T> implements NameTree<T> {
+  private final DefaultNameTree<T> parent;
+  private final Map<Name.Component, DefaultNameTree<T>> children = new HashMap<>();
+  private Name.Component component;
+  private T content;
+
+  DefaultNameTree(DefaultNameTree<T> parent) {
+    this.parent = parent;
+  }
+
+  public static <T> NameTree<T> newRootTree() {
+    return new DefaultNameTree<>(null);
+  }
+
+  @Override
+  public Optional<T> content() {
+    return Optional.ofNullable(content);
+  }
+
+  @Override
+  public Name fullName() {
+    ArrayList<Name.Component> components = new ArrayList<>();
+    NameTree<T> self = this;
+    while (self.lastComponent() != null) {
+      components.add(self.lastComponent());
+      self = self.parent();
+    }
+    Collections.reverse(components);
+    return new Name(components);
+  }
+
+  @Override
+  public Name.Component lastComponent() {
+    return component;
+  }
+
+  @Override
+  public Collection<NameTree<T>> children() {
+    List<NameTree<T>> c = new ArrayList<>(children.size());
+    for (NameTree<T> nt : children.values()) {
+      c.add(nt);
+    }
+    return c;
+  }
+
+  @Override
+  public NameTree<T> parent() {
+    return parent;
+  }
+
+  @Override
+  public Optional<NameTree<T>> find(Name name) {
+    if (name.size() == 0) {
+      return Optional.of(this);
+    } else {
+      Name.Component first = name.get(0);
+      DefaultNameTree<T> child = children.get(first);
+      if (child == null) {
+        return Optional.empty();
+      } else {
+        return child.find(name.size() > 1 ? name.getSubName(1) : new Name());
+      }
+    }
+  }
+
+  @Override
+  public NameTree<T> insert(Name name, T content) {
+    Name.Component first = name.get(0);
+
+    DefaultNameTree<T> child = children.get(first);
+    if (child == null) {
+      child = new DefaultNameTree<>(this);
+      child.component = first;
+      children.put(first, child);
+    }
+
+    if (name.size() == 1) {
+      child.content = content;
+      return child;
+    } else {
+      return child.insert(name.getSubName(1), content);
+    }
+  }
+
+  @Override
+  public Optional<NameTree<T>> delete(Name name) {
+    if (name.size() == 0) {
+      return Optional.of(parent.deleteChild(this.component));
+    } else {
+      Name.Component first = name.get(0);
+      DefaultNameTree<T> child = children.get(first);
+      if (child == null) {
+        return Optional.empty();
+      } else {
+        if (children.size() == 1) {
+          children.remove(first);
+        }
+        return child.delete(name.getSubName(1));
+      }
+    }
+  }
+
+  private NameTree<T> deleteChild(Name.Component component) {
+    return children.remove(component);
+  }
+
+  @Override
+  public int count() {
+    throw new UnsupportedOperationException("Breadth-first search?");
+  }
+
+  @Override
+  public void clear() {
+    children.clear();
+  }
+
+  @Override
+  public String toString() {
+    String c = (component == null) ? null : component.toEscapedString();
+    return "DefaultNameTree{" + c + ": " + content + '}';
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java b/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java
new file mode 100644
index 0000000..713442c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java
@@ -0,0 +1,141 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.ContentStore;
+import com.intel.jndn.utils.NameTree;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.logging.Logger;
+
+/**
+ * TODO must bound the size of the content store
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class InMemoryContentStore implements ContentStore {
+  private static final Logger LOGGER = Logger.getLogger(InMemoryContentStore.class.getName());
+  private final NameTree<Blob> store;
+  private final Data template;
+
+  public InMemoryContentStore(int freshnessMs) {
+    this.template = new Data();
+    this.template.getMetaInfo().setFreshnessPeriod(freshnessMs);
+    this.store = DefaultNameTree.newRootTree();
+  }
+
+  @Override
+  public void put(Name name, Blob data) {
+    store.insert(name, data);
+  }
+
+  @Override
+  public Optional<Blob> get(Interest interest) {
+    Optional<NameTree<Blob>> leaf = getWithSelectors(interest);
+    return leaf.isPresent() ? leaf.get().content() : Optional.empty();
+  }
+
+  private Optional<NameTree<Blob>> getWithSelectors(Interest interest) {
+    Optional<NameTree<Blob>> possibleBlob = store.find(interest.getName());
+    if (possibleBlob.isPresent() && hasSelectors(interest)) {
+      List<NameTree<Blob>> children = new ArrayList<>(possibleBlob.get().children());
+      if (children.isEmpty()) {
+        return Optional.empty();
+      } else if (children.size() == 1) {
+        return Optional.of(children.get(0));
+      } else if (isRightMost(interest)) {
+        Collections.sort(children, (a, b) -> b.lastComponent().compare(a.lastComponent()));
+        return Optional.of(children.get(0));
+      } else if (isLeftMost(interest)) {
+        Collections.sort(children, (a, b) -> a.lastComponent().compare(b.lastComponent()));
+        return Optional.of(children.get(0));
+      } else {
+        // TODO max/min suffix components and excludes
+        LOGGER.warning("The interest requested has selectors not yet implemented; returning empty content");
+        return Optional.empty();
+      }
+    }
+    return possibleBlob;
+  }
+
+  private static boolean hasSelectors(Interest interest) {
+    return interest.getChildSelector() != -1 || interest.getExclude().size() > 0;
+  }
+
+  private static boolean isRightMost(Interest interest) {
+    return interest.getChildSelector() == Interest.CHILD_SELECTOR_RIGHT;
+  }
+
+  private static boolean isLeftMost(Interest interest) {
+    return interest.getChildSelector() == Interest.CHILD_SELECTOR_LEFT;
+  }
+
+  @Override
+  public boolean has(Name name) {
+    return store.find(name).isPresent();
+  }
+
+  @Override
+  public boolean has(Interest interest) {
+    return get(interest).isPresent();
+  }
+
+  @Override
+  public Optional<Blob> get(Name name) {
+    Optional<NameTree<Blob>> tree = store.find(name);
+    return tree.isPresent() ? tree.get().content() : Optional.empty();
+  }
+
+  @Override
+  public void push(Face face, Name name) throws IOException {
+    Optional<Blob> blob = get(name);
+    if (blob.isPresent()) {
+      Data t = new Data(template);
+      t.setName(name);
+      ByteArrayInputStream b = new ByteArrayInputStream(blob.get().getImmutableArray());
+      for (Data d : SegmentationHelper.segment(t, b)) {
+        face.putData(d);
+      }
+    }
+  }
+
+  @Override
+  public void push(Face face, Interest interest) throws IOException {
+    Optional<NameTree<Blob>> leaf = getWithSelectors(interest);
+    if (leaf.isPresent() && leaf.get().content().isPresent()) {
+      Data t = new Data(template);
+      t.setName(leaf.get().fullName());
+      ByteArrayInputStream b = new ByteArrayInputStream(leaf.get().content().get().getImmutableArray());
+      for (Data d : SegmentationHelper.segment(t, b)) {
+        face.putData(d);
+      }
+    }
+  }
+
+  @Override
+  public void clear() {
+    store.clear();
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java b/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java
new file mode 100644
index 0000000..1ddef62
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java
@@ -0,0 +1,149 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.util.Blob;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper methods for reading and writing segmented NDN packets. See <a
+ * href="http://named-data.net/doc/tech-memos/naming-conventions.pdf">NDN Naming Conventions</a> for information used
+ * in this class
+ * <p>
+ * For segmentation of streams: the current use of the default segment size of 4096
+ * (only for {@link #segment(net.named_data.jndn.Data, java.io.InputStream)} is based on several assumptions: NDN packet
+ * size was limited to 8000 at the time this was written and the signature size is unknown.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class SegmentationHelper {
+
+  public static final int DEFAULT_SEGMENT_SIZE = 4096;
+  private static final byte NDN_SEGMENT_MARKER = 0x00;
+
+  private SegmentationHelper() {
+    // do not instantiate this class
+  }
+
+  /**
+   * Determine if a name is segmented, i.e. if it ends with the correct marker type.
+   *
+   * @param name the name of a packet
+   * @param marker the marker type (the initial byte of the component)
+   * @return true if the name is segmented
+   */
+  public static boolean isSegmented(Name name, byte marker) {
+    return name.size() > 0 && name.get(-1).getValue().buf().get(0) == marker;
+  }
+
+  /**
+   * Retrieve the segment number from the last component of a name.
+   *
+   * @param name the name of a packet
+   * @param marker the marker type (the initial byte of the component)
+   * @return the segment number
+   * @throws EncodingException if the name does not have a final component of the correct marker type
+   */
+  public static long parseSegment(Name name, byte marker) throws EncodingException {
+    if (name.size() == 0) {
+      throw new EncodingException("No components to parse.");
+    }
+    return name.get(-1).toNumberWithMarker(marker);
+  }
+
+  /**
+   * Remove a segment component from the end of a name
+   *
+   * @param name the name of a packet
+   * @param marker the marker type (the initial byte of the component)
+   * @return the new name with the segment component removed or a copy of the name if no segment component was present
+   */
+  public static Name removeSegment(Name name, byte marker) {
+    return isSegmented(name, marker) ? name.getPrefix(-1) : new Name(name);
+  }
+
+  /**
+   * Segment a stream of bytes into a list of Data packets; this must read all
+   * the bytes first in order to determine the end segment for FinalBlockId.
+   *
+   * @param template the {@link Data} packet to use for the segment {@link Name}, {@link net.named_data.jndn.MetaInfo},
+   * etc.
+   * @param bytes an {@link InputStream} to the bytes to segment
+   * @return a list of segmented {@link Data} packets
+   * @throws IOException if the stream fails
+   */
+  public static List<Data> segment(Data template, InputStream bytes) throws IOException {
+    return segment(template, bytes, DEFAULT_SEGMENT_SIZE);
+  }
+
+  /**
+   * Segment a stream of bytes into a list of Data packets; this must read all the bytes first in order to determine the
+   * end segment for FinalBlockId. TODO this could be smarter and only add segments if necessary
+   *
+   * @param template the {@link Data} packet to use for the segment {@link Name}, {@link net.named_data.jndn.MetaInfo},
+   * etc.
+   * @param bytes an {@link InputStream} to the bytes to segment
+   * @return a list of segmented {@link Data} packets
+   * @throws IOException if the stream fails
+   */
+  public static List<Data> segment(Data template, InputStream bytes, int segmentSize) throws IOException {
+    List<Data> segments = new ArrayList<>();
+    byte[] readBytes = readAll(bytes);
+    int numBytes = readBytes.length;
+    int numPackets = (int) Math.ceil((double) numBytes / segmentSize);
+    ByteBuffer buffer = ByteBuffer.wrap(readBytes, 0, numBytes);
+    Name.Component lastSegment = Name.Component.fromNumberWithMarker((long) numPackets - 1, NDN_SEGMENT_MARKER);
+
+    for (int i = 0; i < numPackets; i++) {
+      Data segment = new Data(template);
+      segment.getName().appendSegment(i);
+      segment.getMetaInfo().setFinalBlockId(lastSegment);
+      byte[] content = new byte[Math.min(segmentSize, buffer.remaining())];
+      buffer.get(content);
+      segment.setContent(new Blob(content));
+      segments.add(segment);
+    }
+
+    return segments;
+  }
+
+  /**
+   * Read all of the bytes in an input stream.
+   *
+   * @param bytes the {@link InputStream} of bytes to read
+   * @return an array of all bytes retrieved from the stream
+   * @throws IOException if the stream fails
+   */
+  public static byte[] readAll(InputStream bytes) throws IOException {
+    ByteArrayOutputStream builder = new ByteArrayOutputStream();
+    int read = bytes.read();
+    while (read != -1) {
+      builder.write(read);
+      read = bytes.read();
+    }
+    builder.flush();
+    bytes.close();
+    return builder.toByteArray();
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java b/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
index 21862fa..5befb6d 100644
--- a/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
+++ b/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
@@ -14,15 +14,16 @@
 package com.intel.jndn.utils.processing.impl;
 
 import com.intel.jndn.utils.ProcessingStage;
-import java.io.ByteArrayOutputStream;
-import java.util.zip.GZIPOutputStream;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.util.Blob;
 
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPOutputStream;
+
 /**
  * Sample stage for compressing {@link Data} content using GZIP
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class CompressionStage implements ProcessingStage<Data, Data> {
 
diff --git a/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java b/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
index 3125db8..d9eb897 100644
--- a/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
+++ b/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
@@ -20,10 +20,9 @@
 import net.named_data.jndn.security.SecurityException;
 
 /**
- * As a part of a {@link com.intel.jndn.utils.ServerIn} pipeline, this stage
- * will sign a {@link Data} packet.
+ * As a part of a server pipeline, this stage will sign a {@link Data} packet.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SigningStage implements ProcessingStage<Data, Data> {
 
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
new file mode 100644
index 0000000..363a9ce
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
@@ -0,0 +1,63 @@
+/*

+ * jndn-utils

+ * Copyright (c) 2016, Intel Corporation.

+ *

+ * This program is free software; you can redistribute it and/or modify it

+ * under the terms and conditions of the GNU Lesser General Public License,

+ * version 3, as published by the Free Software Foundation.

+ *

+ * This program is distributed in the hope it will be useful, but WITHOUT ANY

+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

+ * more details.

+ */

+

+package com.intel.jndn.utils.pubsub;

+

+import com.intel.jndn.utils.Cancellation;

+import com.intel.jndn.utils.On;

+

+import java.io.IOException;

+

+/**

+ * API for entering and exiting a group and tracking the accompanying announcements.

+ *

+ * @author Andrew Brown, andrew.brown@intel.com

+ */

+interface AnnouncementService {

+  /**

+   * Announce entrance into the group

+   *

+   * @param id the ID of the agent

+   * @throws IOException if the announcement fails

+   */

+  void announceEntrance(long id) throws IOException;

+

+  /**

+   * Announce exit from the group

+   *

+   * @param id the ID of the agent

+   * @throws IOException if the announcement fails

+   */

+  void announceExit(long id) throws IOException;

+

+  /**

+   * Discover all previously announced entrances; the implementation is expected to be driven by network IO

+   * @param onFound callback fired when a group member is discovered

+   * @param onComplete callback fired when all group members have been discovered

+   * @param onError callback fired if errors occur during discovery

+   * @return a cancellation token for stopping discovery

+   * @throws IOException if the network fails during discovery

+   */

+  Cancellation discoverExistingAnnouncements(On<Long> onFound, On<Void> onComplete, On<Exception> onError) throws IOException;

+

+  /**

+   * Observe any new entrances/exits; the implementation is expected to be driven by network IO

+   * @param onAdded callback fired when a group member announces its entry into the group, see {@link #announceEntrance(long)}

+   * @param onRemoved callback fired when a group member announces its exit from the group, see {@link #announceExit(long)}

+   * @param onError callback fired if errors are encountered while processing announcements

+   * @return a cancellation token for stopping the observer

+   * @throws IOException if the network fails while setting up the observer

+   */

+  Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws IOException;

+}

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
new file mode 100644
index 0000000..ccb058c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
@@ -0,0 +1,210 @@
+/*

+ * jndn-utils

+ * Copyright (c) 2016, Intel Corporation.

+ *

+ * This program is free software; you can redistribute it and/or modify it

+ * under the terms and conditions of the GNU Lesser General Public License,

+ * version 3, as published by the Free Software Foundation.

+ *

+ * This program is distributed in the hope it will be useful, but WITHOUT ANY

+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

+ * more details.

+ */

+

+package com.intel.jndn.utils.pubsub;

+

+import com.intel.jndn.utils.Cancellation;

+import com.intel.jndn.utils.On;

+import com.intel.jndn.utils.client.impl.BackoffRetryClient;

+import net.named_data.jndn.Exclude;

+import net.named_data.jndn.Face;

+import net.named_data.jndn.Interest;

+import net.named_data.jndn.InterestFilter;

+import net.named_data.jndn.Name;

+import net.named_data.jndn.OnInterestCallback;

+import net.named_data.jndn.OnRegisterFailed;

+import net.named_data.jndn.encoding.EncodingException;

+import net.named_data.jndn.security.SecurityException;

+

+import java.io.IOException;

+import java.util.HashSet;

+import java.util.Set;

+import java.util.concurrent.CompletableFuture;

+import java.util.logging.Level;

+import java.util.logging.Logger;

+

+import static com.intel.jndn.utils.Cancellation.CANCELLED;

+

+/**

+ * @author Andrew Brown, andrew.brown@intel.com

+ */

+class NdnAnnouncementService implements AnnouncementService {

+  private static final Logger LOGGER = Logger.getLogger(NdnAnnouncementService.class.getName());

+  private static final double STARTING_DISCOVERY_LIFETIME = 100.0;

+  private static final double MAX_DISCOVERY_LIFETIME = 45000.0;

+  private final Face face;

+  private final Name topicPrefix;

+  private final Name broadcastPrefix;

+  private final Set<Long> known = new HashSet<>();

+  private final BackoffRetryClient client;

+  private boolean stopped = false;

+

+  private NdnAnnouncementService(Face face, Name broadcastPrefix, Name topicPrefix) {

+    this.face = face;

+    this.topicPrefix = topicPrefix;

+    this.broadcastPrefix = new Name(broadcastPrefix).append(topicPrefix);

+    this.client = new BackoffRetryClient(MAX_DISCOVERY_LIFETIME, 2);

+  }

+

+  NdnAnnouncementService(Face face, Name topicPrefix) {

+    this(face, PubSubNamespace.DEFAULT_BROADCAST_PREFIX, topicPrefix);

+  }

+

+  @Override

+  public void announceEntrance(long id) throws IOException {

+    LOGGER.log(Level.INFO, "Announcing publisher entrance: {0} to {1}", new Object[]{id, broadcastPrefix});

+    Name name = PubSubNamespace.toAnnouncement(broadcastPrefix, id, PubSubNamespace.Announcement.ENTRANCE);

+    Interest interest = new Interest(name);

+    face.expressInterest(interest, null);

+  }

+

+  @Override

+  public void announceExit(long id) throws IOException {

+    LOGGER.log(Level.INFO, "Announcing publisher exit: {0} from {1}", new Object[]{id, broadcastPrefix});

+    Name name = PubSubNamespace.toAnnouncement(broadcastPrefix, id, PubSubNamespace.Announcement.EXIT);

+    Interest interest = new Interest(name);

+    face.expressInterest(interest, null);

+  }

+

+  @Override

+  public Cancellation discoverExistingAnnouncements(On<Long> onFound, On<Void> onComplete, On<Exception> onError) throws IOException {

+    LOGGER.log(Level.INFO, "Discover existing publishers: {0}", topicPrefix);

+    if (onFound == null) {

+      return CANCELLED;

+    }

+

+    for (long id : known) {

+      LOGGER.info("Passing known publisher: " + id);

+      onFound.on(id);

+    }

+

+    discover(client, onFound, onComplete, onError);

+    return () -> stopped = true;

+  }

+

+  // TODO need special namespace for discovery

+  private Interest discover(BackoffRetryClient client, On<Long> onFound, On<Void> onComplete, On<Exception> onError) throws IOException {

+    Interest interest = new Interest(topicPrefix);

+    interest.setInterestLifetimeMilliseconds(STARTING_DISCOVERY_LIFETIME);

+    interest.setExclude(excludeKnownPublishers());

+    client.retry(face, interest, (interest1, data) -> {

+      if (stopped) {

+        return;

+      }

+

+      LOGGER.log(Level.INFO, "Received discovery data ({0} bytes): {1}", new Object[]{data.getContent().size(), data.getName()});

+      found(data.getName(), onFound, onError);

+

+      // TODO instead of inspecting name should look at content and examine for multiple publishers

+      try {

+        discover(client, onFound, onComplete, onError); // recursion

+      } catch (IOException e) {

+        LOGGER.log(Level.SEVERE, "Failed while discovering publishers, aborting: {0}", new Object[]{broadcastPrefix, e});

+        onError.on(e);

+        // TODO re-call discover here?

+      }

+    }, interest2 -> {

+      // TODO recall client here, we should never be done

+    });

+    return interest;

+  }

+

+  private Exclude excludeKnownPublishers() {

+    Exclude exclude = new Exclude();

+    for (long pid : known) {

+      exclude.appendComponent(PubSubNamespace.toPublisherComponent(pid));

+    }

+    return exclude;

+  }

+

+  private void found(Name publisherName, On<Long> onFound, On<Exception> onError) {

+    try {

+      found(PubSubNamespace.parsePublisher(publisherName), onFound);

+    } catch (EncodingException e) {

+      LOGGER.log(Level.SEVERE, "Failed to parse new publisher name, ignoring: {0}", publisherName);

+      onError.on(e);

+    }

+  }

+

+  private void found(long publisherId, On<Long> onFound) {

+    LOGGER.log(Level.INFO, "Found new publisher: {0}", publisherId);

+    known.add(publisherId);

+    onFound.on(publisherId);

+  }

+

+  @Override

+  public Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws IOException {

+    LOGGER.log(Level.INFO, "Observing new announcements: {0}", broadcastPrefix);

+    CompletableFuture<Void> future = new CompletableFuture<>();

+    OnRegistration onRegistration = new OnRegistration(future);

+    OnAnnouncement onAnnouncement = new OnAnnouncement(onAdded, onRemoved, onError);

+

+    try {

+      long registeredPrefix = face.registerPrefix(broadcastPrefix, onAnnouncement, (OnRegisterFailed) onRegistration, onRegistration);

+      return () -> face.removeRegisteredPrefix(registeredPrefix);

+    } catch (SecurityException e) {

+      throw new IOException("Failed while using transport security key chain", e);

+    }

+  }

+

+  private class OnAnnouncement implements OnInterestCallback {

+    private final On<Long> onAdded;

+    private final On<Long> onRemoved;

+    private final On<Exception> onError;

+

+    OnAnnouncement(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) {

+      this.onAdded = onAdded;

+      this.onRemoved = onRemoved;

+      this.onError = onError;

+    }

+

+    @Override

+    public void onInterest(Name name, Interest interest, Face face, long l, InterestFilter interestFilter) {

+      LOGGER.log(Level.INFO, "Received announcement: {0}", interest.toUri());

+      try {

+        long publisherId = PubSubNamespace.parsePublisher(interest.getName());

+        PubSubNamespace.Announcement announcement = PubSubNamespace.parseAnnouncement(interest.getName());

+        switch (announcement) {

+          case ENTRANCE:

+            add(publisherId);

+            break;

+          case EXIT:

+            remove(publisherId);

+            break;

+          default:

+            LOGGER.warning("Unknown announcement action, ignoring: " + interest.toUri());

+        }

+      } catch (EncodingException e) {

+        LOGGER.log(Level.SEVERE, "Failed to decode announcement: " + interest.toUri(), e);

+        onError.on(e);

+      }

+    }

+

+    private void remove(long publisherId) {

+      LOGGER.info("Publisher leaving topic: " + publisherId);

+      known.remove(publisherId);

+      if (onRemoved != null) {

+        onRemoved.on(publisherId);

+      }

+    }

+

+    private void add(long publisherId) {

+      LOGGER.info("Publisher entering topic: " + publisherId);

+      known.add(publisherId);

+      if (onAdded != null) {

+        onAdded.on(publisherId);

+      }

+    }

+  }

+}

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
new file mode 100644
index 0000000..adb069b
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
@@ -0,0 +1,158 @@
+/*

+ * jndn-utils

+ * Copyright (c) 2016, Intel Corporation.

+ *

+ * This program is free software; you can redistribute it and/or modify it

+ * under the terms and conditions of the GNU Lesser General Public License,

+ * version 3, as published by the Free Software Foundation.

+ *

+ * This program is distributed in the hope it will be useful, but WITHOUT ANY

+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

+ * more details.

+ */

+

+package com.intel.jndn.utils.pubsub;

+

+import com.intel.jndn.utils.ContentStore;

+import com.intel.jndn.utils.PendingInterestTable;

+import com.intel.jndn.utils.Publisher;

+import net.named_data.jndn.Data;

+import net.named_data.jndn.Face;

+import net.named_data.jndn.Interest;

+import net.named_data.jndn.InterestFilter;

+import net.named_data.jndn.Name;

+import net.named_data.jndn.OnInterestCallback;

+import net.named_data.jndn.OnRegisterFailed;

+import net.named_data.jndn.security.SecurityException;

+import net.named_data.jndn.util.Blob;

+

+import java.io.IOException;

+import java.util.concurrent.CompletableFuture;

+import java.util.concurrent.ExecutionException;

+import java.util.concurrent.TimeUnit;

+import java.util.concurrent.TimeoutException;

+import java.util.concurrent.atomic.AtomicLong;

+import java.util.logging.Level;

+import java.util.logging.Logger;

+

+/**

+ * TODO look at thread safety

+ *

+ * @author Andrew Brown, andrew.brown@intel.com

+ */

+class NdnPublisher implements Publisher, OnInterestCallback {

+  private static final Logger LOGGER = Logger.getLogger(NdnPublisher.class.getName());

+  private static final int ATTRIBUTES_FRESHNESS_PERIOD = 2000;

+  private final Face face;

+  private final Name prefix;

+  private final AnnouncementService announcementService;

+  private final PendingInterestTable pendingInterestTable;

+  private final ContentStore contentStore;

+  private final long publisherId;

+  private final AtomicLong latestMessageId = new AtomicLong(0);

+  private long registrationId;

+  private boolean opened = false;

+

+  NdnPublisher(Face face, Name prefix, long publisherId, AnnouncementService announcementService, PendingInterestTable pendingInterestTable, ContentStore contentStore) {

+    this.face = face;

+    this.prefix = PubSubNamespace.toPublisherName(prefix, publisherId);

+    this.publisherId = publisherId;

+    this.announcementService = announcementService;

+    this.pendingInterestTable = pendingInterestTable;

+    this.contentStore = contentStore;

+  }

+

+  synchronized void open() throws IOException {

+    opened = true;

+    CompletableFuture<Void> future = new CompletableFuture<>();

+    OnRegistration onRegistration = new OnRegistration(future);

+

+    try {

+      registrationId = face.registerPrefix(prefix, this, (OnRegisterFailed) onRegistration, onRegistration);

+      // assumes face.processEvents is driven concurrently elsewhere

+      future.get(10, TimeUnit.SECONDS);

+      announcementService.announceEntrance(publisherId);

+    } catch (IOException | SecurityException | InterruptedException | ExecutionException | TimeoutException e) {

+      throw new IOException("Failed to register NDN prefix; pub-sub IO will be impossible", e);

+    }

+  }

+

+  /**

+   * TODO this should not clear content store or remove registered prefix; do that in the future to allow subscribers

+   * to retrieve still-alive messages

+   *

+   * @throws IOException if the group exit announcement fails

+   */

+  @Override

+  public synchronized void close() throws IOException {

+    if (opened) {

+      face.removeRegisteredPrefix(registrationId);

+      contentStore.clear();

+      announcementService.announceExit(publisherId);

+    }

+  }

+

+  @Override

+  public void publish(Blob message) throws IOException {

+    if (!opened) {

+      open();

+    }

+

+    long id = latestMessageId.getAndIncrement();

+    Name name = PubSubNamespace.toMessageName(prefix, id);

+

+    contentStore.put(name, PubSubNamespace.toResponse(null, message));

+    LOGGER.log(Level.INFO, "Published message {0} to content store: {1}", new Object[]{id, name});

+

+    if (pendingInterestTable.has(new Interest(name))) {

+      sendContent(face, name);

+      // TODO extract satisfied interests

+    }

+  }

+

+  @Override

+  public void onInterest(Name name, Interest interest, Face face, long registrationId, InterestFilter interestFilter) {

+    LOGGER.log(Level.INFO, "Client requesting message: {0}", interest.toUri());

+    if (isAttributesRequest(name, interest)) {

+      sendAttributes(face, name);

+    } else {

+      if (contentStore.has(interest)) {

+        sendContent(face, interest);

+      } else {

+        pendingInterestTable.add(interest);

+      }

+    }

+  }

+

+  private static boolean isAttributesRequest(Name name, Interest interest) {

+    return name.equals(interest.getName()) && interest.getChildSelector() == -1;

+  }

+

+  private void sendContent(Face face, Name name) {

+    try {

+      contentStore.push(face, name);

+    } catch (IOException e) {

+      LOGGER.log(Level.SEVERE, "Failed to publish message, aborting: {0}", new Object[]{name, e});

+    }

+  }

+

+  private void sendContent(Face face, Interest interest) {

+    try {

+      contentStore.push(face, interest);

+    } catch (IOException e) {

+      LOGGER.log(Level.SEVERE, "Failed to publish message, aborting: {0}", new Object[]{interest.getName(), e});

+    }

+  }

+

+  private static void sendAttributes(Face face, Name publisherName) {

+    Data data = new Data(publisherName);

+    data.setContent(new Blob("[attributes here]"));

+    data.getMetaInfo().setFreshnessPeriod(ATTRIBUTES_FRESHNESS_PERIOD);

+    try {

+      face.putData(data);

+    } catch (IOException e) {

+      LOGGER.log(Level.SEVERE, "Failed to publish attributes for publisher: " + publisherName, e);

+    }

+  }

+}

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
new file mode 100644
index 0000000..51aee3a
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
@@ -0,0 +1,186 @@
+/*

+ * jndn-utils

+ * Copyright (c) 2016, Intel Corporation.

+ *

+ * This program is free software; you can redistribute it and/or modify it

+ * under the terms and conditions of the GNU Lesser General Public License,

+ * version 3, as published by the Free Software Foundation.

+ *

+ * This program is distributed in the hope it will be useful, but WITHOUT ANY

+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

+ * more details.

+ */

+

+package com.intel.jndn.utils.pubsub;

+

+import com.intel.jndn.utils.Cancellation;

+import com.intel.jndn.utils.Client;

+import com.intel.jndn.utils.On;

+import com.intel.jndn.utils.Subscriber;

+import net.named_data.jndn.Data;

+import net.named_data.jndn.Face;

+import net.named_data.jndn.Interest;

+import net.named_data.jndn.Name;

+import net.named_data.jndn.encoding.EncodingException;

+import net.named_data.jndn.util.Blob;

+

+import java.io.IOException;

+import java.util.Map;

+import java.util.Set;

+import java.util.concurrent.CompletableFuture;

+import java.util.concurrent.ConcurrentHashMap;

+import java.util.logging.Level;

+import java.util.logging.Logger;

+

+/**

+ * TODO look at thread safety

+ *

+ * @author Andrew Brown, andrew.brown@intel.com

+ */

+class NdnSubscriber implements Subscriber {

+  private static final Logger LOGGER = Logger.getLogger(NdnSubscriber.class.getName());

+  private final Face face;

+  private final Name prefix;

+  private final On<Blob> onMessage;

+  private final On<Exception> onError;

+  private final AnnouncementService announcementService;

+  private final Client client;

+  private final Map<Long, Subscription> subscriptions = new ConcurrentHashMap<>();

+  private Cancellation newAnnouncementCancellation;

+  private Cancellation existingAnnouncementsCancellation;

+

+  NdnSubscriber(Face face, Name prefix, On<Blob> onMessage, On<Exception> onError, AnnouncementService announcementService, Client client) {

+    this.face = face;

+    this.prefix = prefix;

+    this.onMessage = onMessage;

+    this.onError = onError;

+    this.announcementService = announcementService;

+    this.client = client;

+  }

+

+  @Override

+  public Set<Long> knownPublishers() {

+    return subscriptions.keySet();

+  }

+

+  @Override

+  public void open() throws IOException {

+    LOGGER.log(Level.INFO, "Starting subscriber: {0}", prefix);

+    existingAnnouncementsCancellation = announcementService.discoverExistingAnnouncements(this::addPublisher, null, e -> close());

+    newAnnouncementCancellation = announcementService.observeNewAnnouncements(this::addPublisher, this::removePublisher, e -> close());

+  }

+

+  void addPublisher(long publisherId) {

+    if (subscriptions.containsKey(publisherId)) {

+      LOGGER.log(Level.WARNING, "Duplicate publisher ID {} received from announcement service; this should not happen and will be ignored", publisherId);

+    } else {

+      Subscription subscription = new Subscription(publisherId);

+      subscriptions.put(publisherId, subscription);

+      subscription.subscribe();

+    }

+  }

+

+  void removePublisher(long publisherId) {

+    Subscription removed = subscriptions.remove(publisherId);

+    removed.cancel();

+  }

+

+  @Override

+  public void close() {

+    LOGGER.log(Level.INFO, "Stopping subscriber, knows of {0} publishers: {1} ", new Object[]{subscriptions.size(), subscriptions});

+

+    if (newAnnouncementCancellation != null) {

+      newAnnouncementCancellation.cancel();

+    }

+

+    if (existingAnnouncementsCancellation != null) {

+      existingAnnouncementsCancellation.cancel();

+    }

+

+    for (Subscription c : subscriptions.values()) {

+      c.cancel();

+    }

+  }

+

+  private class Subscription implements Cancellation {

+    final long publisherId;

+    long messageId;

+    CompletableFuture<Data> currentRequest;

+

+    Subscription(long publisherId) {

+      this.publisherId = publisherId;

+    }

+

+    @Override

+    public synchronized void cancel() {

+      if (currentRequest != null) {

+        currentRequest.cancel(true);

+      }

+    }

+

+    synchronized void subscribe() {

+      // would prefer this to be getAsync(on<>, on<>)?

+      currentRequest = client.getAsync(face, buildLatestInterest(publisherId));

+      currentRequest.handle(this::handleResponse);

+    }

+

+    private Interest buildLatestInterest(long publisherId) {

+      Name name = PubSubNamespace.toPublisherName(prefix, publisherId);

+      Interest interest = new Interest(name); // TODO ms lifetime

+      interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);

+      return interest;

+    }

+

+    synchronized void next(long publisherId, long messageId) {

+      currentRequest = client.getAsync(face, buildNextInterest(publisherId, messageId));

+      currentRequest.handle(this::handleResponse);

+    }

+

+    private Interest buildNextInterest(long publisherId, long messageId) {

+      Name name = PubSubNamespace.toMessageName(prefix, publisherId, messageId + 1);

+      return new Interest(name); // TODO ms lifetime

+    }

+

+    private Void handleResponse(Data data, Throwable throwable) {

+      if (throwable != null) {

+        onError.on((Exception) throwable); // TODO avoid cast?

+      } else {

+        try {

+          Response response = PubSubNamespace.parseResponse(data);

+          this.messageId = response.messageId();

+          onMessage.on(response.content()); // TODO buffer and catch exceptions

+        } catch (EncodingException e) {

+          onError.on(e);

+        }

+

+        // TODO catchup and loop detection (if above fails next() is called again with same params)

+        next(this.publisherId, this.messageId);

+      }

+

+      return null;

+    }

+

+    @Override

+    public boolean equals(Object o) {

+      if (this == o) {

+        return true;

+      }

+      if (o == null || getClass() != o.getClass()) {

+        return false;

+      }

+      Subscription subscription = (Subscription) o;

+      return publisherId == subscription.publisherId;

+    }

+

+    @Override

+    public int hashCode() {

+      return (int) (publisherId ^ (publisherId >>> 32));

+    }

+

+    @Override

+    public String toString() {

+      return "Subscription{" + "publisherId=" + publisherId + '}';

+    }

+  }

+}
\ No newline at end of file
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java b/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
new file mode 100644
index 0000000..7b5a160
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
@@ -0,0 +1,49 @@
+/*

+ * jndn-utils

+ * Copyright (c) 2016, Intel Corporation.

+ *

+ * This program is free software; you can redistribute it and/or modify it

+ * under the terms and conditions of the GNU Lesser General Public License,

+ * version 3, as published by the Free Software Foundation.

+ *

+ * This program is distributed in the hope it will be useful, but WITHOUT ANY

+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

+ * more details.

+ */

+

+package com.intel.jndn.utils.pubsub;

+

+import net.named_data.jndn.Name;

+import net.named_data.jndn.OnRegisterFailed;

+import net.named_data.jndn.OnRegisterSuccess;

+

+import java.util.concurrent.CompletableFuture;

+import java.util.logging.Logger;

+

+/**

+ * Helper for tracking and logging prefix registrations; TODO replace or remove this in the future

+ *

+ * @author Andrew Brown, andrew.brown@intel.com

+ */

+class OnRegistration implements OnRegisterFailed, OnRegisterSuccess {

+  private static final Logger LOGGER = Logger.getLogger(OnRegistration.class.getName());

+  private final CompletableFuture<Void> future;

+

+  OnRegistration(CompletableFuture<Void> future) {

+    this.future = future;

+  }

+

+  @Override

+  public void onRegisterSuccess(Name name, long l) {

+    LOGGER.info("Registered prefix: " + name);

+    future.complete(null);

+  }

+

+  @Override

+  public void onRegisterFailed(Name name) {

+    String message = "Failed to register prefix: " + name;

+    LOGGER.severe(message);

+    future.completeExceptionally(new RegistrationFailureException(message));

+  }

+}

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
new file mode 100644
index 0000000..c0caddc
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.On;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import com.intel.jndn.utils.client.impl.AdvancedClient;
+import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;
+import com.intel.jndn.utils.impl.InMemoryContentStore;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Assemble the necessary elements for building the {@link Publisher} and {@link Subscriber} implementations
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class PubSubFactory {
+
+  private PubSubFactory() {
+    // do not instantiate this factory
+  }
+
+  /**
+   * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+   * @param prefix the NDN namespace under which messages are published
+   * @param onMessage callback fired when a message is received
+   * @param onError callback fired when an error happens after subscription
+   * @return an open, publisher-discovering, message-retrieving subscriber
+   * @throws IOException if the initial subscription or announcement IO fails
+   */
+  public static Subscriber newSubscriber(Face face, Name prefix, On<Blob> onMessage, On<Exception> onError) throws IOException {
+    NdnAnnouncementService announcementService = new NdnAnnouncementService(face, prefix);
+    AdvancedClient client = new AdvancedClient();
+    NdnSubscriber subscriber = new NdnSubscriber(face, prefix, onMessage, onError, announcementService, client);
+    subscriber.open();
+    return subscriber;
+  }
+
+  /**
+   * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+   * @param prefix the NDN namespace under which messages are published
+   * @return a group-announcing, unopened subscriber (it will automatically open on first publish)
+   */
+  public static Publisher newPublisher(Face face, Name prefix) {
+    long publisherId = Math.abs(new Random().nextLong());
+    return new NdnPublisher(face, prefix, publisherId, new NdnAnnouncementService(face, prefix), new BoundedInMemoryPendingInterestTable(1024), new InMemoryContentStore(2000));
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
new file mode 100644
index 0000000..53c8cbc
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
@@ -0,0 +1,139 @@
+/*

+ * jndn-utils

+ * Copyright (c) 2016, Intel Corporation.

+ *

+ * This program is free software; you can redistribute it and/or modify it

+ * under the terms and conditions of the GNU Lesser General Public License,

+ * version 3, as published by the Free Software Foundation.

+ *

+ * This program is distributed in the hope it will be useful, but WITHOUT ANY

+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

+ * more details.

+ */

+

+package com.intel.jndn.utils.pubsub;

+

+import net.named_data.jndn.Data;

+import net.named_data.jndn.Name;

+import net.named_data.jndn.encoding.EncodingException;

+import net.named_data.jndn.encoding.tlv.TlvDecoder;

+import net.named_data.jndn.encoding.tlv.TlvEncoder;

+import net.named_data.jndn.util.Blob;

+

+import java.util.Collections;

+

+/**

+ * @author Andrew Brown, andrew.brown@intel.com

+ */

+class PubSubNamespace {

+  static final int MESSAGE_ID_MARKER = 42;

+  static final int PUBLISHER_ID_MARKER = 43;

+  static final int ANNOUNCEMENT_ACTION_MARKER = 44;

+

+  static final int MESSAGE_MARKER = 100;

+

+  static final int MESSAGE_ATTRIBUTES_MARKER = 101;

+  static final int MESSAGE_CONTENT_MARKER = 102;

+

+  static final Name DEFAULT_BROADCAST_PREFIX = new Name("/ndn/broadcast");

+  public static final int MESSAGE_PAYLOAD_INITIAL_CAPACITY = 4096;

+

+  private PubSubNamespace() {

+    // do not instantiate this class

+  }

+

+  static Name.Component toPublisherComponent(long publisherId) {

+    return Name.Component.fromNumberWithMarker(publisherId, PubSubNamespace.PUBLISHER_ID_MARKER);

+  }

+

+  static Name toPublisherName(Name prefix, long publisherId) {

+    return new Name(prefix).append(toPublisherComponent(publisherId));

+  }

+

+  static long parsePublisher(Name name) throws EncodingException {

+    return findMarkerFromEnd(name, PUBLISHER_ID_MARKER);

+  }

+

+  static Name toAnnouncement(Name prefix, long publisherId, Announcement announcement) {

+    Name.Component announcementComponent = Name.Component.fromNumberWithMarker(announcement.id, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);

+    return toPublisherName(prefix, publisherId).append(announcementComponent);

+  }

+

+  static Announcement parseAnnouncement(Name name) throws EncodingException {

+    return Announcement.from((int) name.get(-1).toNumberWithMarker(ANNOUNCEMENT_ACTION_MARKER));

+  }

+

+  static Name toMessageName(Name prefix, long publisherId, long messageId) {

+    Name.Component messageComponent = Name.Component.fromNumberWithMarker(messageId, PubSubNamespace.MESSAGE_ID_MARKER);

+    return toPublisherName(prefix, publisherId).append(messageComponent);

+  }

+

+  static Name toMessageName(Name publisherPrefix, long messageId) {

+    Name.Component messageComponent = Name.Component.fromNumberWithMarker(messageId, PubSubNamespace.MESSAGE_ID_MARKER);

+    return new Name(publisherPrefix).append(messageComponent);

+  }

+

+  static long parseMessageId(Name name) throws EncodingException {

+    return findMarkerFromEnd(name, MESSAGE_ID_MARKER);

+  }

+

+  static Blob toResponse(Blob attributes, Blob content) {

+    TlvEncoder encoder = new TlvEncoder(MESSAGE_PAYLOAD_INITIAL_CAPACITY);

+    int initialLength = encoder.getLength();

+

+    encoder.writeBlobTlv(MESSAGE_CONTENT_MARKER, content.buf()); // encode backwards, see Tlv0_1_1WireFormat.java

+    if(attributes != null && attributes.size() > 0){

+      encoder.writeBlobTlv(MESSAGE_ATTRIBUTES_MARKER, content.buf());

+    }

+    encoder.writeTypeAndLength(MESSAGE_MARKER, encoder.getLength() - initialLength);

+

+    return new Blob(encoder.getOutput(), false);

+  }

+

+  static Response parseResponse(Data data) throws EncodingException {

+    long messageId = parseMessageId(data.getName());

+    TlvDecoder decoder = new TlvDecoder(data.getContent().buf());

+    int endOffset = decoder.readNestedTlvsStart(MESSAGE_MARKER);

+    Blob attributes = new Blob(decoder.readOptionalBlobTlv(MESSAGE_ATTRIBUTES_MARKER, endOffset), true);

+    Blob content = new Blob(decoder.readBlobTlv(MESSAGE_CONTENT_MARKER), true);

+    return new Response(data.getName(), messageId, Collections.singletonMap("*", attributes), content);

+  }

+

+  private static long findMarkerFromEnd(Name name, int marker) throws EncodingException {

+    for (int i = name.size() - 1; i >= 0; i--) {

+      try {

+        return name.get(i).toNumberWithMarker(marker);

+      } catch (EncodingException e) {

+        // do nothing

+      }

+    }

+    throw new EncodingException("Failed to find marker " + marker + " in name: " + name.toUri());

+  }

+

+  enum Announcement {

+    ENTRANCE(0),

+    EXIT(1);

+

+    private final int id;

+

+    Announcement(int id) {

+      this.id = id;

+    }

+

+    static Announcement from(int value) {

+      for (Announcement a : Announcement.values()) {

+        if (a.id == value) return a;

+      }

+      return null;

+    }

+

+    int value() {

+      return id;

+    }

+

+    public int getValue() {

+      return id;

+    }

+  }

+}

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/RegistrationFailureException.java b/src/main/java/com/intel/jndn/utils/pubsub/RegistrationFailureException.java
new file mode 100644
index 0000000..aa7919d
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/RegistrationFailureException.java
@@ -0,0 +1,28 @@
+/*

+ * jndn-utils

+ * Copyright (c) 2016, Intel Corporation.

+ *

+ * This program is free software; you can redistribute it and/or modify it

+ * under the terms and conditions of the GNU Lesser General Public License,

+ * version 3, as published by the Free Software Foundation.

+ *

+ * This program is distributed in the hope it will be useful, but WITHOUT ANY

+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

+ * more details.

+ */

+

+package com.intel.jndn.utils.pubsub;

+

+/**

+ * @author Andrew Brown, andrew.brown@intel.com

+ */

+class RegistrationFailureException extends Exception {

+  RegistrationFailureException(String message) {

+    super(message);

+  }

+

+  public RegistrationFailureException(Exception e) {

+    super(e);

+  }

+}

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Response.java b/src/main/java/com/intel/jndn/utils/pubsub/Response.java
new file mode 100644
index 0000000..29aa1cd
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/Response.java
@@ -0,0 +1,56 @@
+/*

+ * jndn-utils

+ * Copyright (c) 2016, Intel Corporation.

+ *

+ * This program is free software; you can redistribute it and/or modify it

+ * under the terms and conditions of the GNU Lesser General Public License,

+ * version 3, as published by the Free Software Foundation.

+ *

+ * This program is distributed in the hope it will be useful, but WITHOUT ANY

+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

+ * more details.

+ */

+

+package com.intel.jndn.utils.pubsub;

+

+import net.named_data.jndn.Name;

+import net.named_data.jndn.util.Blob;

+

+import java.util.Map;

+

+/**

+ * Represent a publisher response to a subscriber request for a message. Note that the {@link #attributes} act as a type

+ * of header optionally prepended to the message content. The intent of this is to allow publishers to inform

+ * subscribers of publisher-side state changes without requiring a separate mechanism (with accompanying complexity and

+ * overhead); the full set of publisher state attributes should be served elsewhere, not in this class.

+ *

+ * @author Andrew Brown, andrew.brown@intel.com

+ */

+class Response {

+  private final Name name;

+  private final long messageId;

+  private final Map<String, Object> attributes;

+  private final Blob content;

+

+  Response(Name name, long messageId, Map<String, Object> attributes, Blob content) {

+    this.name = name;

+    this.messageId = messageId;

+    this.attributes = attributes;

+    this.content = content;

+  }

+

+  /**

+   * @return the message ID of the published message

+   */

+  long messageId() {

+    return messageId;

+  }

+

+  /**

+   * @return the content of the message

+   */

+  Blob content() {

+    return content;

+  }

+}

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/package.html b/src/main/java/com/intel/jndn/utils/pubsub/package.html
new file mode 100644
index 0000000..562c542
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/package.html
@@ -0,0 +1,26 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
+<html>
+<body>
+<p>See [need link here] for algorithm details.</p>
+
+<p>The NDN pub-sub algorithm is designed using the following criteria:</p>
+<ul>
+    <li>subscriber-driven</li>
+    <li>lightweight</li>
+    <li>avoid unnecessary IO</li>
+</ul>
+
+<p>It makes the following guarantees:</p>
+<ul>
+    <li>each publisher will order its own messages by the order in which they were published; however, the mechanism
+        for this ordering (i.e. a monotonically-increasing ID) will not maintain the guarantee across multiple
+        publishers (in this case, an application-level mechanism, like a time-synchronized timestamp, must be used;
+        alternatively, try ChronoSync)
+    </li>
+    <li>publishers will provide the messages published on a topic for a specified lifetime; even after being closed,
+        the publisher must maintain a registered prefix and respond to subscriber requests for data
+    </li>
+    <li>subscribers will eventually discover all routable publishers</li>
+</ul>
+</body>
+</html>
\ No newline at end of file
diff --git a/src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java b/src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java
index eba700a..df0659f 100644
--- a/src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java
+++ b/src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java
@@ -14,9 +14,9 @@
 package com.intel.jndn.utils.repository.impl;
 
 /**
- * Thrown when a {@link Repository} cannot retrieve a stored packet.
+ * Thrown when a {@link com.intel.jndn.utils.Repository} cannot retrieve a stored packet.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class DataNotFoundException extends Exception {
 
diff --git a/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java b/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
index e40890d..bb259c1 100644
--- a/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
+++ b/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
@@ -26,11 +26,11 @@
  * {@link net.named_data.jndn.util.MemoryContentCache} and borrows the matching
  * logic from there.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class ForLoopRepository implements Repository {
 
-  private List<Record> storage = new ArrayList<>();
+  private final List<Record> storage = new ArrayList<>();
 
   /**
    * {@inheritDoc}
@@ -95,7 +95,7 @@
    * components (e.g. c); if the Data is not longer than the Interest, return an
    * empty component.
    */
-  private Name.Component getNextComponentAfterLastInterestComponent(Data content, Interest interest) {
+  private static Name.Component getNextComponentAfterLastInterestComponent(Data content, Interest interest) {
     if (content.getName().size() > interest.getName().size()) {
       return content.getName().get(interest.getName().size());
     } else {
@@ -132,11 +132,7 @@
    * record is fresh
    */
   private boolean hasAcceptableFreshness(Interest interest, Record record) {
-    if (!interest.getMustBeFresh()) {
-      return true;
-    } else {
-      return isFresh(record);
-    }
+    return !interest.getMustBeFresh() || isFresh(record);
   }
 
   /**
@@ -171,10 +167,10 @@
    */
   private class Record {
 
-    public final Data data;
-    public final long addedAt;
+    final Data data;
+    final long addedAt;
 
-    public Record(Data data) {
+    Record(Data data) {
       this.data = data;
       this.addedAt = System.currentTimeMillis();
     }
diff --git a/src/main/java/com/intel/jndn/utils/server/DynamicServer.java b/src/main/java/com/intel/jndn/utils/server/DynamicServer.java
index 60caf35..f8432ca 100644
--- a/src/main/java/com/intel/jndn/utils/server/DynamicServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/DynamicServer.java
@@ -14,27 +14,27 @@
 package com.intel.jndn.utils.server;
 
 import com.intel.jndn.utils.Server;
-import com.intel.jndn.utils.server.RespondWithData;
+
 import java.io.IOException;
 
 /**
- * Defines the API for a {@link Server} producing {@link Data} packets
- * dynamically; in other words, when an {@link Interest} arrives, this server
+ * Defines the API for a {@link Server} producing data packets
+ * dynamically; in other words, when an interest arrives, this server
  * will run a callback to determine what packet to send back. As good practice,
  * keep callback methods as short as possible.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface DynamicServer extends Server {
 
   /**
-   * Set the callback method to run when an {@link Interest} packet is passed to
-   * this server. This method should either return a {@link Data} packet that
+   * Set the callback method to run when an interest packet is passed to
+   * this server. This method should either return a data packet that
    * satisfies the Interest or throw an Exception to avoid sending. Calling this
    * method a second time should replace the callback.
    *
    * @param callback the callback instance
    * @throws java.io.IOException if the server fails to register a prefix
    */
-  public void respondUsing(RespondWithData callback) throws IOException;
+  void respondUsing(RespondWithData callback) throws IOException;
 }
diff --git a/src/main/java/com/intel/jndn/utils/server/RepositoryServer.java b/src/main/java/com/intel/jndn/utils/server/RepositoryServer.java
index 09ebc23..a0db7e9 100644
--- a/src/main/java/com/intel/jndn/utils/server/RepositoryServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/RepositoryServer.java
@@ -14,15 +14,16 @@
 package com.intel.jndn.utils.server;
 
 import com.intel.jndn.utils.Server;
-import java.io.IOException;
 import net.named_data.jndn.Data;
 
+import java.io.IOException;
+
 /**
  * Defines the API for a {@link Server} producing {@link Data} packets and
  * storing them until they are requested; this server corresponds closely to use
  * cases such as: cache, file system, web server.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface RepositoryServer extends Server {
 
@@ -32,10 +33,10 @@
    * @param data the {@link Data} packet to store and serve
    * @throws IOException if the underlying server fails to store the packet
    */
-  public void serve(Data data) throws IOException;
+  void serve(Data data) throws IOException;
   
   /**
    * Clean up stale {@link Data} packets from the underlying content store.
    */
-  public void cleanup();
+  void cleanup();
 }
diff --git a/src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java b/src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java
index fe7f74a..d83e1d6 100644
--- a/src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java
+++ b/src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java
@@ -20,9 +20,9 @@
 /**
  * Functional interface for serving data from Server.on()
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface RespondWithBlob {
 
-  public Blob onInterest(Name prefix, Interest interest) throws Exception;
+  Blob onInterest(Name prefix, Interest interest) throws Exception;
 }
diff --git a/src/main/java/com/intel/jndn/utils/server/RespondWithData.java b/src/main/java/com/intel/jndn/utils/server/RespondWithData.java
index 95d0e6a..556d1d6 100644
--- a/src/main/java/com/intel/jndn/utils/server/RespondWithData.java
+++ b/src/main/java/com/intel/jndn/utils/server/RespondWithData.java
@@ -20,9 +20,9 @@
 /**
  * Functional interface for serving data from Server.on()
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public interface RespondWithData {
 
-  public Data onInterest(Name prefix, Interest interest) throws Exception;
+  Data onInterest(Name prefix, Interest interest) throws Exception;
 }
diff --git a/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java b/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java
index 6be4280..38a4fde 100644
--- a/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -13,17 +13,10 @@
  */
 package com.intel.jndn.utils.server.impl;
 
-import com.intel.jndn.utils.server.RepositoryServer;
-import com.intel.jndn.utils.repository.impl.ForLoopRepository;
 import com.intel.jndn.utils.Repository;
-import com.intel.jndn.utils.server.impl.SegmentedServerHelper;
-import com.intel.jndn.utils.server.impl.ServerBaseImpl;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import com.intel.jndn.utils.impl.SegmentationHelper;
+import com.intel.jndn.utils.repository.impl.ForLoopRepository;
+import com.intel.jndn.utils.server.RepositoryServer;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
@@ -31,11 +24,18 @@
 import net.named_data.jndn.Name;
 import net.named_data.jndn.encoding.EncodingException;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 /**
  * Implementation of a {@link RepositoryServer} that segments packets stored in
  * its repository.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SegmentedServer extends ServerBaseImpl implements RepositoryServer {
 
@@ -58,9 +58,9 @@
       register();
     }
 
-    if (data.getContent().size() >= SegmentedServerHelper.DEFAULT_SEGMENT_SIZE) {
+    if (data.getContent().size() >= SegmentationHelper.DEFAULT_SEGMENT_SIZE) {
       InputStream stream = new ByteArrayInputStream(data.getContent().getImmutableArray());
-      List<Data> segments = SegmentedServerHelper.segment(data, stream);
+      List<Data> segments = SegmentationHelper.segment(data, stream);
       for (Data segment : segments) {
         logger.fine("Adding segment: " + segment.getName().toUri());
         repository.put(segment);
diff --git a/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServerHelper.java b/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServerHelper.java
deleted file mode 100644
index 78e3c27..0000000
--- a/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServerHelper.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.server.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.util.Blob;
-
-/**
- * Helper for segmenting an input stream into a list of Data packets. Current
- * use of the default segment size of 4096 (only for
- * {@link #segment(net.named_data.jndn.Data, java.io.InputStream)} is based on
- * several assumptions: NDN packet size was limited to 8000 at the time this was
- * written and signature size is unknown.
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SegmentedServerHelper {
-
-  public static final int DEFAULT_SEGMENT_SIZE = 4096;
-
-  /**
-   * Segment a stream of bytes into a list of Data packets; this must read all
-   * the bytes first in order to determine the end segment for FinalBlockId.
-   *
-   * @param template the {@link Data} packet to use for the segment {@link Name},
-   * {@link net.named_data.jndn.MetaInfo}, etc.
-   * @param bytes an {@link InputStream} to the bytes to segment
-   * @return a list of segmented {@link Data} packets
-   * @throws IOException if the stream fails
-   */
-  public static List<Data> segment(Data template, InputStream bytes) throws IOException {
-    return segment(template, bytes, DEFAULT_SEGMENT_SIZE);
-  }
-
-  /**
-   * Segment a stream of bytes into a list of Data packets; this must read all
-   * the bytes first in order to determine the end segment for FinalBlockId.
-   *
-   * @param template the {@link Data} packet to use for the segment {@link Name},
-   * {@link net.named_data.jndn.MetaInfo}, etc.
-   * @param bytes an {@link InputStream} to the bytes to segment
-   * @return a list of segmented {@link Data} packets
-   * @throws IOException if the stream fails
-   */
-  public static List<Data> segment(Data template, InputStream bytes, int segmentSize) throws IOException {
-    List<Data> segments = new ArrayList<>();
-    byte[] buffer_ = readAll(bytes);
-    int numBytes = buffer_.length;
-    int numPackets = (int) Math.ceil((double) numBytes / segmentSize);
-    ByteBuffer buffer = ByteBuffer.wrap(buffer_, 0, numBytes);
-    Name.Component lastSegment = Name.Component.fromNumberWithMarker(numPackets - 1, 0x00);
-
-    for (int i = 0; i < numPackets; i++) {
-      Data segment = new Data(template);
-      segment.getName().appendSegment(i);
-      segment.getMetaInfo().setFinalBlockId(lastSegment);
-      byte[] content = new byte[Math.min(segmentSize, buffer.remaining())];
-      buffer.get(content);
-      segment.setContent(new Blob(content));
-      segments.add(segment);
-    }
-
-    return segments;
-  }
-
-  /**
-   * Read all of the bytes in an input stream.
-   *
-   * @param bytes the {@link InputStream} of bytes to read
-   * @return an array of all bytes retrieved from the stream
-   * @throws IOException if the stream fails
-   */
-  public static byte[] readAll(InputStream bytes) throws IOException {
-    ByteArrayOutputStream builder = new ByteArrayOutputStream();
-    int read = bytes.read();
-    while (read != -1) {
-      builder.write(read);
-      read = bytes.read();
-    }
-    builder.flush();
-    bytes.close();
-    return builder.toByteArray();
-  }
-}
diff --git a/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java b/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
index 6110005..64130bd 100644
--- a/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
@@ -15,11 +15,6 @@
 
 import com.intel.jndn.utils.ProcessingStage;
 import com.intel.jndn.utils.Server;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.ForwardingFlags;
@@ -27,10 +22,16 @@
 import net.named_data.jndn.OnRegisterFailed;
 import net.named_data.jndn.encoding.EncodingException;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 /**
  * Base implementation for a {@link Server}.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public abstract class ServerBaseImpl implements Server {
 
diff --git a/src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java b/src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java
index cee7d7f..1c1a643 100644
--- a/src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java
@@ -14,14 +14,8 @@
 package com.intel.jndn.utils.server.impl;
 
 import com.intel.jndn.utils.server.DynamicServer;
-import com.intel.jndn.utils.server.RespondWithData;
-import com.intel.jndn.utils.server.RespondWithBlob;
 import com.intel.jndn.utils.server.RespondWithBlob;
 import com.intel.jndn.utils.server.RespondWithData;
-import com.intel.jndn.utils.server.impl.ServerBaseImpl;
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
@@ -30,11 +24,15 @@
 import net.named_data.jndn.OnInterest;
 import net.named_data.jndn.util.Blob;
 
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 /**
  * Implementation of a {@link DynamicServer} that wraps the {@link OnInterest}
  * callback with some encoding and pipeline support.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SimpleServer extends ServerBaseImpl implements DynamicServer {
 
diff --git a/src/test/java/com/intel/jndn/utils/TestHelper.java b/src/test/java/com/intel/jndn/utils/TestHelper.java
index a73aba4..72a96d7 100644
--- a/src/test/java/com/intel/jndn/utils/TestHelper.java
+++ b/src/test/java/com/intel/jndn/utils/TestHelper.java
@@ -14,6 +14,15 @@
 package com.intel.jndn.utils;
 
 import com.intel.jndn.mock.MockKeyChain;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.security.KeyChain;
+import net.named_data.jndn.security.SecurityException;
+import net.named_data.jndn.transport.UdpTransport;
+import net.named_data.jndn.util.Blob;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -26,19 +35,11 @@
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.encoding.EncodingException;
-import net.named_data.jndn.security.KeyChain;
-import net.named_data.jndn.security.SecurityException;
-import net.named_data.jndn.transport.UdpTransport;
-import net.named_data.jndn.util.Blob;
 
 /**
  * Collect assorted methods to help with testing
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class TestHelper {
 
@@ -104,7 +105,7 @@
 
     public ScheduledExecutorService executor;
     public KeyChain keyChain;
-    public List<Face> faces = new ArrayList<>();
+    public final List<Face> faces = new ArrayList<>();
   }
 
   public static class EventProcessor implements Runnable {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java
index 45572a7..d778a40 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -15,16 +15,7 @@
 
 import com.intel.jndn.utils.Client;
 import com.intel.jndn.utils.TestHelper;
-import com.intel.jndn.utils.server.impl.SegmentedServerHelper;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
+import com.intel.jndn.utils.impl.SegmentationHelper;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
@@ -34,11 +25,21 @@
 import net.named_data.jndn.security.SecurityException;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 /**
  * Test AdvancedClient.java; requires a hostname to an NFD accepting a generated
  * key to register prefixes, e.g. mvn test -Dnfd.ip=10.10.10.1
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class AdvancedClientFileTestIT {
 
@@ -68,7 +69,7 @@
     long startTime = System.currentTimeMillis();
     AdvancedClient client = new AdvancedClient();
     List<CompletableFuture<Data>> requests = expressInterests(client, consumer, PREFIX, NUM_MESSAGES);
-    List<Data> datas = requests.stream().map((f) -> TestHelper.retrieve(f)).collect(Collectors.toList());
+    requests.stream().map((f) -> TestHelper.retrieve(f)).collect(Collectors.toList());
     long endTime = System.currentTimeMillis();
 
     logger.info(String.format("Transfered %d bytes in %d ms", MESSAGE_SIZE_BYTES * NUM_MESSAGES, endTime - startTime));
@@ -106,7 +107,7 @@
       data.getMetaInfo().setFreshnessPeriod(0);
 
       try {
-        for (Data segment : SegmentedServerHelper.segment(data, bytes, segmentSize)) {
+        for (Data segment : SegmentationHelper.segment(data, bytes, segmentSize)) {
           logger.log(Level.INFO, "Put data: " + segment.getName().toUri());
           face.putData(segment);
         }
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java
index 6a99599..b09ae6f 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -15,16 +15,7 @@
 
 import com.intel.jndn.utils.Client;
 import com.intel.jndn.utils.TestHelper;
-import com.intel.jndn.utils.server.impl.SegmentedServerHelper;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
+import com.intel.jndn.utils.impl.SegmentationHelper;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
@@ -34,11 +25,21 @@
 import net.named_data.jndn.security.SecurityException;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 /**
  * Test AdvancedClient.java; requires a hostname to an NFD accepting a generated
  * key to register prefixes, e.g. mvn test -Dnfd.ip=10.10.10.1
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class AdvancedClientStressTestIT {
 
@@ -68,7 +69,7 @@
     long startTime = System.currentTimeMillis();
     AdvancedClient client = new AdvancedClient();
     List<CompletableFuture<Data>> requests = expressInterests(client, consumer, PREFIX, NUM_MESSAGES);
-    List<Data> datas = requests.stream().map((f) -> TestHelper.retrieve(f)).collect(Collectors.toList());
+    requests.stream().map((f) -> TestHelper.retrieve(f)).collect(Collectors.toList());
     long endTime = System.currentTimeMillis();
 
     logger.info(String.format("Transfered %d bytes in %d ms", MESSAGE_SIZE_BYTES * NUM_MESSAGES, endTime - startTime));
@@ -106,7 +107,7 @@
       data.getMetaInfo().setFreshnessPeriod(0);
 
       try {
-        for (Data segment : SegmentedServerHelper.segment(data, bytes, segmentSize)) {
+        for (Data segment : SegmentationHelper.segment(data, bytes, segmentSize)) {
           logger.log(Level.INFO, "Put data: " + segment.getName().toUri());
           face.putData(segment);
         }
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java
index 3ee3bb8..2462f23 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -17,6 +17,7 @@
 import com.intel.jndn.mock.MockForwarder;
 import com.intel.jndn.utils.TestHelper;
 import com.intel.jndn.utils.client.SegmentationType;
+import com.intel.jndn.utils.impl.SegmentationHelper;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
@@ -70,7 +71,7 @@
 
     Face face = forwarder.connect();
     face.registerPrefix(name, new OnInterestCallback() {
-      private int max = 9;
+      private final int max = 9;
 
       @Override
       public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/BackoffRetryClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/BackoffRetryClientTest.java
new file mode 100644
index 0000000..f070911
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/client/impl/BackoffRetryClientTest.java
@@ -0,0 +1,65 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.client.impl;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnTimeout;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static junit.framework.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BackoffRetryClientTest {
+
+  private BackoffRetryClient instance;
+
+  @Before
+  public void before() {
+    instance = new BackoffRetryClient(10, 2);
+  }
+
+  @Test
+  public void retry() throws Exception {
+    Face face = mock(Face.class);
+    ArgumentCaptor<Interest> interestCaptor = ArgumentCaptor.forClass(Interest.class);
+    ArgumentCaptor<OnTimeout> onTimeoutCaptor = ArgumentCaptor.forClass(OnTimeout.class);
+    when(face.expressInterest(interestCaptor.capture(), any(), onTimeoutCaptor.capture())).then(new Answer<Long>() {
+      @Override
+      public Long answer(InvocationOnMock invocation) throws Throwable {
+        onTimeoutCaptor.getValue().onTimeout(interestCaptor.getValue());
+        return -1L;
+      }
+    });
+    Interest interest = new Interest(new Name("/backoff/test"), 1);
+    AtomicInteger timeouts = new AtomicInteger();
+
+    instance.retry(face, interest, null, interest1 -> timeouts.incrementAndGet());
+
+    assertEquals(1, timeouts.get());
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java
index 39b6586..5d15a89 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java
@@ -14,16 +14,18 @@
 package com.intel.jndn.utils.client.impl;
 
 import com.intel.jndn.utils.TestHelper;
-import java.util.concurrent.ExecutionException;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Name;
-import static org.junit.Assert.*;
 import org.junit.Test;
 
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+
 /**
  * Test that segment data packets are re-assembled correctly.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class DataAssemblerTest {
 
@@ -71,6 +73,6 @@
     byte marker = 0x00;
     DataAssembler instance = new DataAssembler(packets, marker);
 
-    Data reassembled = instance.assemble();
+    instance.assemble();
   }
 }
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java
index f529b58..c5f93b2 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java
@@ -34,9 +34,9 @@
 
   private static final double INTEREST_LIFETIME_MS = 1.0;
   private DefaultRetryClient client;
-  private Name name = new Name("/test/retry/client");
-  private Interest interest = new Interest(name, INTEREST_LIFETIME_MS);
-  private TestCounter counter = new TestCounter();
+  private final Name name = new Name("/test/retry/client");
+  private final Interest interest = new Interest(name, INTEREST_LIFETIME_MS);
+  private final TestCounter counter = new TestCounter();
 
   @Before
   public void before() throws Exception {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java
index 05c85f5..e4082c3 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java
@@ -34,7 +34,7 @@
  */
 public class DefaultSegmentedClientTest {
 
-  private DefaultSegmentedClient instance = new DefaultSegmentedClient();
+  private final DefaultSegmentedClient instance = new DefaultSegmentedClient();
 
   @Test
   public void testGetSegmentsAsync() throws Exception {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java
index aaf43df..7497f72 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java
@@ -15,18 +15,21 @@
 
 import com.intel.jndn.utils.TestHelper;
 import com.intel.jndn.utils.client.OnException;
-import java.io.InputStream;
 import net.named_data.jndn.Name;
 import org.junit.Test;
-import static org.junit.Assert.*;
+
+import java.io.InputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class DefaultStreamingClientTest {
 
-  DefaultStreamingClient instance = new DefaultStreamingClient();
+  private final DefaultStreamingClient instance = new DefaultStreamingClient();
 
   @Test
   public void testGetAsyncStream() throws Exception {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java b/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java
index 2f165b9..a8203a8 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java
@@ -14,24 +14,26 @@
 package com.intel.jndn.utils.client.impl;
 
 import com.intel.jndn.utils.TestHelper;
-import java.util.ArrayList;
-import java.util.stream.IntStream;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Interest;
 import net.named_data.jndn.Name;
 import net.named_data.jndn.Name.Component;
 import net.named_data.jndn.encoding.EncodingException;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.stream.IntStream;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import org.junit.Test;
 
 /**
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SegmentedDataStreamTest {
 
-  SegmentedDataStream instance = new SegmentedDataStream();
+  private final SegmentedDataStream instance = new SegmentedDataStream();
 
   @Test
   public void testAddingSequentialData() throws StreamException {
@@ -131,7 +133,7 @@
     assertEquals(7, instance.list().length);
   }
 
-  protected void addPacketToInstance(long i) {
+  private void addPacketToInstance(long i) {
     Name name = new Name().appendSegment(i);
     instance.onData(new Interest(name), new Data(name));
   }
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java
index 4e529be..fc5ac4b 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java
@@ -36,7 +36,7 @@
 /**
  * Test SimpleClient.java
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SimpleClientTest {
 
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java
index f49525b..67f5ce7 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java
@@ -13,14 +13,7 @@
  */
 package com.intel.jndn.utils.client.impl;
 
-import com.intel.jndn.utils.client.impl.SimpleClient;
 import com.intel.jndn.utils.TestHelper;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
@@ -30,24 +23,31 @@
 import net.named_data.jndn.security.SecurityException;
 import net.named_data.jndn.util.Blob;
 import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import org.junit.Test;
 
 /**
  * Test SimpleClient.java; requires a hostname to an NFD accepting a generated
  * key to register prefixes, e.g. mvn test -Dnfd.ip=10.10.10.1
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SimpleClientTestIT {
 
   private static final Logger logger = Logger.getLogger(SimpleClientTestIT.class.getName());
   private static final Name PREFIX_RETRIEVE = new Name("/test/simple-client/retrieve-one").append(TestHelper.buildRandomString(10));
   private static final Name PREFIX_RETRIEVE_MULTIPLE = new Name("/test/simple-client/retrieve-multiple").append(TestHelper.buildRandomString(10));
-
-  SimpleClient instance;
   private final TestHelper.NdnEnvironment environment;
+  private SimpleClient instance;
 
   public SimpleClientTestIT() throws SecurityException {
     String ip = System.getProperty("nfd.ip");
@@ -116,7 +116,7 @@
 
   private class DataServer implements OnInterestCallback {
 
-    private Data data;
+    private final Data data;
 
     public DataServer(Data data) {
       this.data = data;
diff --git a/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java
new file mode 100644
index 0000000..739e215
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryPendingInterestTableTest {
+
+  private BoundedInMemoryPendingInterestTable instance;
+
+  @Before
+  public void before() {
+    instance = new BoundedInMemoryPendingInterestTable(5);
+  }
+
+  @Test
+  public void add() throws Exception {
+    Name name = new Name("/a/b/c");
+    instance.add(new Interest(name));
+    assertTrue(instance.has(name));
+  }
+
+  @Test
+  public void has() throws Exception {
+    Name name = new Name("/a/b/c");
+    instance.add(new Interest(name));
+
+    Interest interest = new Interest(new Name("/a/b"));
+    interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
+    assertTrue(instance.has(interest));
+  }
+
+  @Test
+  public void extract() throws Exception {
+    instance.add(new Interest(new Name("/a")));
+    instance.add(new Interest(new Name("/a/b")));
+    instance.add(new Interest(new Name("/a/b/c")));
+
+    Collection<Interest> extracted = instance.extract(new Name("/a/b"));
+    assertEquals(2, extracted.size()); // TODO not sure about this...
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java b/src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java
new file mode 100644
index 0000000..d4ca544
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java
@@ -0,0 +1,177 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.function.Consumer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test BoundedLinkedMapTest
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedLinkedMapTest {
+  private static final Logger LOGGER = Logger.getLogger(BoundedLinkedMapTest.class.getName());
+  private BoundedLinkedMap<String, Object> instance;
+
+  @Before
+  public void beforeTest() {
+    instance = new BoundedLinkedMap<>(2);
+  }
+
+  @Test
+  public void testUsage() {
+    Object object0 = new Object();
+    Object object1 = new Object();
+    Object object2 = new Object();
+
+    instance.put("0", object0);
+    assertEquals(1, instance.size());
+
+    instance.put("1", object1);
+    assertEquals(2, instance.size());
+
+    instance.put("2", object2);
+    assertEquals(2, instance.size());
+
+    assertNull(instance.get("0"));
+    assertEquals("2", instance.latest());
+  }
+
+  @Test
+  public void testEarliestLatest() {
+    assertNull(instance.earliest());
+    assertNull(instance.latest());
+
+    instance.put(".", new Object());
+    assertEquals(instance.earliest(), instance.latest());
+
+    instance.put("..", new Object());
+    assertEquals(".", instance.earliest());
+    assertEquals("..", instance.latest());
+
+    instance.put("...", new Object());
+    assertEquals("..", instance.earliest());
+    assertEquals("...", instance.latest());
+  }
+
+  @Test
+  public void testIsEmpty() {
+    assertTrue(instance.isEmpty());
+    instance.put("...", new Object());
+    assertFalse(instance.isEmpty());
+  }
+
+  @Test
+  public void testContainsKey() {
+    assertFalse(instance.containsKey("..."));
+    instance.put("...", new Object());
+    assertTrue(instance.containsKey("..."));
+  }
+
+  @Test
+  public void testContainsValue() {
+    Object o = new Object();
+    assertFalse(instance.containsValue(o));
+    instance.put("...", o);
+    assertTrue(instance.containsValue(o));
+  }
+
+  @Test
+  public void testRemove() {
+    Object o = new Object();
+    String key = "...";
+
+    instance.put(key, o);
+    assertTrue(instance.containsKey(key));
+    assertTrue(instance.containsValue(o));
+
+    instance.remove(key);
+    assertFalse(instance.containsKey(key));
+    assertFalse(instance.containsValue(o));
+    assertEquals(0, instance.size());
+  }
+
+  @Test
+  public void testPutAll() {
+    HashMap<String, Object> map = new HashMap<>();
+    map.put("1", new Object());
+    map.put("2", new Object());
+    map.put("99", new Object());
+
+    instance.putAll(map);
+    LOGGER.log(Level.FINE, "Map passed to putAll(): {0}", map.toString());
+    LOGGER.log(Level.FINE, "Resulting bounded map after putAll(): {0}", instance.toString());
+
+    assertEquals(2, instance.size()); // note: this is not 3 because the max size is bounded
+    assertEquals("2", instance.latest()); // note: put all doesn't do the FIFO replacement
+  }
+
+  @Test
+  public void testClear() {
+    instance.put("...", new Object());
+
+    instance.clear();
+
+    assertEquals(0, instance.size());
+    assertNull(instance.get("..."));
+    assertNull(instance.latest());
+    assertNull(instance.earliest());
+  }
+
+  @Test
+  public void testConversions() {
+    instance.put("...", new Object());
+
+    assertEquals(1, instance.keySet().size());
+    assertEquals(1, instance.values().size());
+    assertEquals(1, instance.entrySet().size());
+  }
+
+  @Test
+  public void testPerformanceAgainstArrayList() {
+    int numMessages = 10000;
+    BoundedLinkedMap<Integer, Object> map = new BoundedLinkedMap<>(numMessages);
+    ArrayList<Object> list = new ArrayList<>(numMessages);
+
+    long mapPutTime = measure(numMessages, i -> map.put(i, new Object()));
+    long listPutTime = measure(numMessages, i -> list.add(i, new Object()));
+    LOGGER.log(Level.FINE, "Custom map put has overhead of {0}% versus list put", toPercent((mapPutTime - listPutTime) / (double) listPutTime));
+
+    long mapGetTime = measure(numMessages, map::get);
+    long listGetTime = measure(numMessages, list::get);
+    LOGGER.log(Level.FINE, "Custom map get has overhead of {0}% versus list get", toPercent((mapGetTime - listGetTime) / (double) listPutTime));
+  }
+
+  private long measure(int numTimes, Consumer<Integer> work) {
+    long start = System.nanoTime();
+    for (int i = 0; i < numTimes; i++) {
+      work.accept(i);
+    }
+    return System.nanoTime() - start;
+  }
+
+  private double toPercent(double number) {
+    return Math.round(number * 100);
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/DefaultNameTreeTest.java b/src/test/java/com/intel/jndn/utils/impl/DefaultNameTreeTest.java
new file mode 100644
index 0000000..9af62c9
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/DefaultNameTreeTest.java
@@ -0,0 +1,91 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.NameTree;
+import net.named_data.jndn.Name;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class DefaultNameTreeTest {
+
+  private NameTree<String> instance;
+
+  @Before
+  public void setUp() throws Exception {
+    instance = DefaultNameTree.newRootTree();
+    instance.insert(new Name("/a/b/c"), ".");
+    instance.insert(new Name("/a/b/d"), "..");
+    instance.insert(new Name("/a/e"), "...");
+  }
+
+  @Test
+  public void content() throws Exception {
+    // TODO
+  }
+
+  @Test
+  public void retrieveName() throws Exception {
+    assertEquals("/a", instance.find(new Name("/a")).get().fullName().toString());
+    assertEquals("/a/b", instance.find(new Name("/a/b")).get().fullName().toString());
+    assertEquals("/a/b/c", instance.find(new Name("/a/b/c")).get().fullName().toString());
+  }
+
+  @Test
+  public void children() throws Exception {
+    // TODO
+  }
+
+  @Test
+  public void parent() throws Exception {
+    assertNull(instance.parent());
+  }
+
+  @Test
+  public void find() throws Exception {
+    assertEquals(2, instance.find(new Name("/a/b")).get().children().size());
+  }
+
+  @Test
+  public void findDeep() throws Exception {
+    instance.insert(new Name("/a/e/x/y/z"), "...");
+    assertEquals(2, instance.find(new Name("/a")).get().children().size());
+    assertEquals(1, instance.find(new Name("/a/e")).get().children().size());
+    assertEquals(1, instance.find(new Name("/a/e/x")).get().children().size());
+    assertEquals(1, instance.find(new Name("/a/e/x/y")).get().children().size());
+    assertEquals(0, instance.find(new Name("/a/e/x/y/z")).get().children().size());
+  }
+
+  @Test
+  public void insert() throws Exception {
+    // TODO
+  }
+
+  @Test
+  public void delete() throws Exception {
+    // TODO
+  }
+
+  @Test
+  public void count() throws Exception {
+    // TODO
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/InMemoryContentStoreTest.java b/src/test/java/com/intel/jndn/utils/impl/InMemoryContentStoreTest.java
new file mode 100644
index 0000000..449af17
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/InMemoryContentStoreTest.java
@@ -0,0 +1,92 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.mock.MockFace;
+import com.intel.jndn.utils.ContentStore;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class InMemoryContentStoreTest {
+  private final ContentStore instance = new InMemoryContentStore(1000);
+
+  @Test
+  public void basicUsage() throws Exception {
+    instance.put(new Name("/a"), new Blob("."));
+
+    assertTrue(instance.has(new Name("/a")));
+    assertArrayEquals(".".getBytes(), instance.get(new Name("/a")).get().getImmutableArray());
+  }
+
+  @Ignore // TODO need bounds on NameTree
+  @Test
+  public void replacement() throws Exception {
+    instance.put(new Name("/a"), new Blob("."));
+    instance.put(new Name("/a/b"), new Blob(".."));
+    instance.put(new Name("/a/b/c"), new Blob("..."));
+    instance.put(new Name("/a/b/c/d"), new Blob("...."));
+    instance.put(new Name("/a/b/c/d/e"), new Blob("...."));
+    assertTrue(instance.has(new Name("/a")));
+
+    instance.put(new Name("/replace/oldest"), new Blob("."));
+
+    assertFalse(instance.has(new Name("/a")));
+    assertTrue(instance.has(new Name("/replace/oldest")));
+  }
+
+  @Test
+  public void push() throws Exception {
+    MockFace face = new MockFace();
+    Name name = new Name("/a");
+    instance.put(name, new Blob("."));
+
+    instance.push(face, name);
+
+    assertEquals(1, face.sentData.size());
+    assertEquals(name.appendSegment(0), face.sentData.get(0).getName()); // TODO this should probably be smarter and avoid appending segments if not needed
+  }
+
+  @Test
+  public void clear() throws Exception {
+    instance.put(new Name("/a"), new Blob("."));
+    assertTrue(instance.has(new Name("/a")));
+
+    instance.clear();
+
+    assertFalse(instance.has(new Name("/a")));
+  }
+
+  @Test
+  public void retrieveWithSelectors() throws Exception {
+    instance.put(new Name("/a/1"), new Blob("."));
+    instance.put(new Name("/a/2"), new Blob(".."));
+    instance.put(new Name("/a/3"), new Blob("..."));
+
+    Interest interest1 = new Interest(new Name("/a")).setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
+    assertTrue(instance.has(interest1));
+    assertEquals("...", instance.get(interest1).get().toString());
+
+    Interest interest2 = new Interest(new Name("/a")).setChildSelector(Interest.CHILD_SELECTOR_LEFT);
+    assertEquals(".", instance.get(interest2).get().toString());
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/SegmentationHelperTest.java b/src/test/java/com/intel/jndn/utils/impl/SegmentationHelperTest.java
new file mode 100644
index 0000000..f9d5384
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/SegmentationHelperTest.java
@@ -0,0 +1,112 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test the SegmentationHelper
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class SegmentationHelperTest {
+
+  private static final byte MARKER = 0x0F;
+
+  @Test
+  public void testSegmentation() throws Exception {
+    final Data template = new Data(new Name("/segmented/data"));
+    final InputStream content = new ByteArrayInputStream("0123456789".getBytes());
+    List<Data> segments = SegmentationHelper.segment(template, content, 1);
+    assertEquals(10, segments.size());
+
+    // test first packet
+    assertEquals(0, segments.get(0).getName().get(-1).toSegment());
+    assertEquals(9, segments.get(0).getMetaInfo().getFinalBlockId().toSegment());
+    assertEquals("0", segments.get(0).getContent().toString());
+
+    // test last packet
+    assertEquals(9, segments.get(9).getName().get(-1).toSegment());
+    assertEquals(9, segments.get(9).getMetaInfo().getFinalBlockId().toSegment());
+    assertEquals("9", segments.get(9).getContent().toString());
+  }
+
+  @Test
+  public void testSegmentationDifferentSizes() throws Exception {
+    final Data template = new Data(new Name("/segmented/data"));
+
+    // size 2
+    final InputStream content2 = new ByteArrayInputStream("0123456789".getBytes());
+    List<Data> segments2 = SegmentationHelper.segment(template, content2, 2);
+    assertEquals(5, segments2.size());
+    assertEquals("89", segments2.get(4).getContent().toString());
+
+    // size 3
+    final InputStream content3 = new ByteArrayInputStream("0123456789".getBytes());
+    List<Data> segments3 = SegmentationHelper.segment(template, content3, 3);
+    assertEquals(4, segments3.size());
+    assertEquals("9", segments3.get(3).getContent().toString());
+
+    // size 4
+    final InputStream content4 = new ByteArrayInputStream("0123456789".getBytes());
+    List<Data> segments4 = SegmentationHelper.segment(template, content4, 4);
+    assertEquals(3, segments4.size());
+    assertEquals("89", segments4.get(2).getContent().toString());
+  }
+
+  @Test
+  public void isSegmented() {
+    Name.Component component = Name.Component.fromNumberWithMarker(42, MARKER);
+    assertTrue(SegmentationHelper.isSegmented(new Name("/segmented/data").append(component), MARKER));
+  }
+
+  @Test
+  public void isNotSegmented() {
+    assertFalse(SegmentationHelper.isSegmented(new Name("/segmented/data"), MARKER));
+  }
+
+  @Test
+  public void parseSegment() throws Exception {
+    Name.Component component = Name.Component.fromNumberWithMarker(42, MARKER);
+    assertEquals(42, SegmentationHelper.parseSegment(new Name("/segmented/data").append(component), MARKER));
+  }
+
+  @Test(expected = EncodingException.class)
+  public void parseMissingSegment() throws Exception {
+    SegmentationHelper.parseSegment(new Name("/segmented/data"), MARKER);
+  }
+
+  @Test
+  public void removeSegment() throws Exception {
+    Name unsegmentedName = new Name("/name");
+    Name segmentedName = new Name(unsegmentedName).append(Name.Component.fromNumberWithMarker(42, MARKER));
+    assertEquals(unsegmentedName, SegmentationHelper.removeSegment(segmentedName, MARKER));
+  }
+
+  @Test
+  public void removeNoSegment() throws Exception {
+    Name unsegmentedName = new Name("/unsegmented/name");
+    assertEquals(unsegmentedName, SegmentationHelper.removeSegment(unsegmentedName, MARKER));
+  }
+}
diff --git a/src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java b/src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java
index 763c361..495e307 100644
--- a/src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java
+++ b/src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java
@@ -14,17 +14,19 @@
 package com.intel.jndn.utils.processing.impl;
 
 import com.intel.jndn.utils.client.impl.AdvancedClient;
-import java.util.logging.Logger;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Name;
 import net.named_data.jndn.util.Blob;
 import org.junit.Test;
-import static org.junit.Assert.*;
+
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test {@link CompressionStage}.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class CompressionStageTest {
 
diff --git a/src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java b/src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java
index 107e6c1..e118fe2 100644
--- a/src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java
+++ b/src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java
@@ -20,16 +20,17 @@
 import net.named_data.jndn.security.SecurityException;
 import net.named_data.jndn.util.Blob;
 import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test {@link SigningStage}.
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SigningStageTest {
 
-  KeyChain keyChain;
-  SigningStage instance;
+  private KeyChain keyChain;
+  private SigningStage instance;
 
   public SigningStageTest() throws SecurityException {
     keyChain = MockKeyChain.configure(new Name("/test/signer"));
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java
new file mode 100644
index 0000000..65bd063
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java
@@ -0,0 +1,74 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnData;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnAnnouncementServiceTest {
+
+  private NdnAnnouncementService instance;
+  private Face face;
+
+  @Before
+  public void before() {
+    face = mock(Face.class);
+    instance = new NdnAnnouncementService(face, new Name("/topic/prefix"));
+  }
+
+  @Test
+  public void announceEntrance() throws Exception {
+    instance.announceEntrance(42);
+
+    ArgumentCaptor<Interest> interest = ArgumentCaptor.forClass(Interest.class);
+    verify(face, times(1)).expressInterest(interest.capture(), any(OnData.class));
+    assertEquals(42, PubSubNamespace.parsePublisher(interest.getValue().getName()));
+    assertEquals(PubSubNamespace.Announcement.ENTRANCE, PubSubNamespace.parseAnnouncement(interest.getValue().getName()));
+  }
+
+  @Test
+  public void announceExit() throws Exception {
+    instance.announceExit(42);
+
+    ArgumentCaptor<Interest> interest = ArgumentCaptor.forClass(Interest.class);
+    verify(face, times(1)).expressInterest(interest.capture(), any(OnData.class));
+    assertEquals(42, PubSubNamespace.parsePublisher(interest.getValue().getName()));
+    assertEquals(PubSubNamespace.Announcement.EXIT, PubSubNamespace.parseAnnouncement(interest.getValue().getName()));
+  }
+
+  @Test
+  public void discoverExistingAnnouncements() throws Exception {
+
+  }
+
+  @Test
+  public void observeNewAnnouncements() throws Exception {
+
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java
new file mode 100644
index 0000000..acd18c1
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java
@@ -0,0 +1,166 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.mock.MeasurableFace;
+import com.intel.jndn.mock.MockForwarder;
+import com.intel.jndn.utils.impl.InMemoryContentStore;
+import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.util.Blob;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnPublisherTest {
+  private static final Logger LOGGER = Logger.getLogger(NdnPublisherTest.class.getName());
+  private static final Name PUBLISHER_PREFIX = new Name("/publisher");
+  private static final long PUBLISHER_ID = new Random().nextLong();
+  private NdnPublisher instance;
+  private Face face;
+  private MockForwarder forwarder;
+
+  @Before
+  public void before() throws Exception {
+    forwarder = new MockForwarder();
+    face = forwarder.connect();
+    NdnAnnouncementService announcementService = new NdnAnnouncementService(face, PUBLISHER_PREFIX);
+    BoundedInMemoryPendingInterestTable pendingInterestTable = new BoundedInMemoryPendingInterestTable(1024);
+    InMemoryContentStore contentStore = new InMemoryContentStore(2000);
+    instance = new NdnPublisher(face, PUBLISHER_PREFIX, PUBLISHER_ID, announcementService, pendingInterestTable, contentStore);
+
+    driveFace(face); // TODO preferably do this in MockForwarder; causes failures if run in certain orders
+  }
+
+  @Test
+  public void basicUsage() throws Exception {
+    instance.open();
+    assertEquals(1, numSentInterests()); // doesn't count prefix registration, only announcement
+
+    instance.publish(new Blob("..."));
+
+    instance.close();
+    assertEquals(2, numSentInterests());
+    assertEquals(0, numReceivedDatas());
+  }
+
+  @Test
+  public void closeWithoutOpen() throws Exception {
+    instance.close();
+
+    assertEquals(0, numSentInterests()); // if the publisher has not been opened, it will not announce an exit
+    assertEquals(0, numReceivedDatas());
+  }
+
+  @Test
+  public void publish() throws Exception {
+    instance.open();
+    assertEquals(1, numSentInterests());
+
+    instance.publish(new Blob("..."));
+    assertEquals(0, numSentDatas());
+    assertEquals(1, numSentInterests()); // none more than from opening
+  }
+
+  @Test
+  public void publishWithPendingInterest() throws Exception {
+    instance.open();
+    assertEquals(1, numSentInterests());
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Face client = forwarder.connect();
+    client.expressInterest(new Interest(PubSubNamespace.toMessageName(PUBLISHER_PREFIX, PUBLISHER_ID, 0)), (interest, data) -> latch.countDown());
+
+    client.processEvents(); // TODO remove somehow
+    face.processEvents();
+
+    instance.publish(new Blob("..."));
+    assertEquals(1, numSentDatas());
+    assertEquals(1, numSentInterests()); // none more than from opening
+
+    client.processEvents(); // TODO remove somehow
+    face.processEvents();
+
+    latch.await(1, TimeUnit.SECONDS);
+    assertEquals(0, latch.getCount());
+  }
+
+  @Test
+  public void publishAndRespondToNewInterest() throws Exception {
+    Face client = forwarder.connect();
+    instance.open();
+    assertEquals(1, numSentInterests());
+
+    instance.publish(new Blob("..."));
+    assertEquals(0, numSentDatas());
+    assertEquals(1, numSentInterests()); // none more than from opening
+
+    client.processEvents(); // TODO remove somehow
+    face.processEvents();
+
+    CountDownLatch latch = new CountDownLatch(1);
+    client.expressInterest(new Interest(PubSubNamespace.toMessageName(PUBLISHER_PREFIX, PUBLISHER_ID, 0)), (interest, data) -> latch.countDown());
+
+    client.processEvents(); // TODO remove somehow
+    face.processEvents();
+
+    client.processEvents(); // TODO remove somehow
+    face.processEvents();
+
+    latch.await(1, TimeUnit.SECONDS);
+    assertEquals(0, latch.getCount());
+  }
+
+  private int numSentDatas() {
+    return ((MeasurableFace) face).sentDatas().size();
+  }
+
+  private int numReceivedDatas() {
+    return ((MeasurableFace) face).receivedDatas().size();
+  }
+
+  private int numSentInterests() {
+    return ((MeasurableFace) face).sentInterests().size();
+  }
+
+  private void driveFace(Face face) {
+    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
+    pool.schedule(() -> {
+      try {
+        face.processEvents();
+      } catch (IOException | EncodingException e) {
+        LOGGER.log(Level.SEVERE, "Failed to process face events", e);
+        fail(e.getMessage());
+      }
+    }, 50, TimeUnit.MILLISECONDS);
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java
new file mode 100644
index 0000000..5370f29
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java
@@ -0,0 +1,92 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.Client;
+import com.intel.jndn.utils.On;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnSubscriberTest {
+
+  private static final Name TOPIC_NAME = new Name("/subscriber");
+  private NdnSubscriber instance;
+  private AnnouncementService announcementService;
+
+  @Before
+  public void before() {
+    Face face = mock(Face.class);
+    announcementService = mock(AnnouncementService.class);
+    Client client = mock(Client.class);
+    when(client.getAsync(any(Face.class), any(Interest.class))).thenReturn(new CompletableFuture<>());
+    instance = new NdnSubscriber(face, TOPIC_NAME, null, null, announcementService, client);
+  }
+
+  @Test
+  public void basicUsage() throws Exception {
+    instance.open();
+
+    // subscriber is driven by IO from announcements and publisher messages
+
+    instance.close();
+  }
+
+  @Test
+  public void knownPublishersUsingCallbacks() throws Exception {
+    ArgumentCaptor<On<Long>> existingPublishersSignal = ArgumentCaptor.forClass((Class) On.class);
+    ArgumentCaptor<On<Long>> newPublishersSignal = ArgumentCaptor.forClass((Class) On.class);
+    when(announcementService.discoverExistingAnnouncements(existingPublishersSignal.capture(), any(), any())).thenReturn(null);
+    when(announcementService.observeNewAnnouncements(newPublishersSignal.capture(), any(), any())).thenReturn(null);
+    instance.open();
+
+    assertEquals(0, instance.knownPublishers().size());
+
+    existingPublishersSignal.getValue().on(42L);
+    assertEquals(1, instance.knownPublishers().size());
+
+    newPublishersSignal.getValue().on(42L);
+    assertEquals(1, instance.knownPublishers().size());
+
+    existingPublishersSignal.getValue().on(99L);
+    assertEquals(2, instance.knownPublishers().size());
+  }
+
+  @Test
+  public void knownPublishersUsingMethods() throws Exception {
+    instance.addPublisher(99);
+
+    assertEquals(1, instance.knownPublishers().size());
+    assertTrue(instance.knownPublishers().contains(99L));
+
+    instance.removePublisher(99);
+
+    assertEquals(0, instance.knownPublishers().size());
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/PubSubTestIT.java b/src/test/java/com/intel/jndn/utils/pubsub/PubSubTestIT.java
new file mode 100644
index 0000000..77c93f6
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/PubSubTestIT.java
@@ -0,0 +1,84 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.mock.MockKeyChain;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Topic;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.ThreadPoolFace;
+import net.named_data.jndn.security.KeyChain;
+import net.named_data.jndn.security.SecurityException;
+import net.named_data.jndn.transport.AsyncTcpTransport;
+import net.named_data.jndn.util.Blob;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class PubSubTestIT {
+  private static final Logger LOGGER = Logger.getLogger(PubSubTestIT.class.getName());
+  private Face pubFace;
+  private Face subFace;
+
+  @Before
+  public void before() throws Exception {
+    String hostname = System.getProperty("nfd.ip");
+    LOGGER.info("Testing on NFD at: " + hostname);
+    pubFace = connect(hostname);
+    subFace = connect(hostname);
+  }
+
+  @Test
+  public void basicUsage() throws Exception {
+    Topic topic = new Topic(new Name("/pub/sub/topic"));
+    CountDownLatch latch = new CountDownLatch(1);
+
+    topic.subscribe(pubFace, b -> latch.countDown(), e -> fail(e.getMessage()));
+
+    Publisher publisher = topic.newPublisher(subFace);
+    publisher.publish(new Blob("."));
+    publisher.publish(new Blob(".."));
+    publisher.publish(new Blob("..."));
+
+    latch.await(20, TimeUnit.SECONDS);
+    assertEquals(0, latch.getCount());
+  }
+
+  private Face connect(String hostName) throws SecurityException {
+    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
+    AsyncTcpTransport transport = new AsyncTcpTransport(pool);
+    AsyncTcpTransport.ConnectionInfo connectionInfo = new AsyncTcpTransport.ConnectionInfo(hostName, 6363, true);
+    ThreadPoolFace face = new ThreadPoolFace(pool, transport, connectionInfo);
+
+    Name signatureName = new Name("/topic/test/it").appendVersion(new Random().nextLong()); // note that using the same signature name seemed to cause registration failures
+    KeyChain keyChain = MockKeyChain.configure(signatureName);
+    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName());
+
+    return face;
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java b/src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java
index 5691b8b..d73604e 100644
--- a/src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java
+++ b/src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java
@@ -16,7 +16,7 @@
 /**
  * Test {@link ForLoopRepository}.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class ForLoopRepositoryTest extends RepositoryTest {
 
diff --git a/src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java b/src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java
index fac32f9..e12e2da 100644
--- a/src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java
+++ b/src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java
@@ -21,7 +21,7 @@
 /**
  * Helper methods for testing.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class RepoHelper {
 
diff --git a/src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java b/src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java
index d981f91..abe0480 100644
--- a/src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java
+++ b/src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java
@@ -14,17 +14,18 @@
 package com.intel.jndn.utils.repository.impl;
 
 import com.intel.jndn.utils.Repository;
-import static com.intel.jndn.utils.repository.impl.RepoHelper.*;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Interest;
 import net.named_data.jndn.Name;
 import org.junit.Test;
+
+import static com.intel.jndn.utils.repository.impl.RepoHelper.*;
 import static org.junit.Assert.*;
 
 /**
  * Extend this in descendant tests.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public abstract class RepositoryTest {
 
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerHelperTest.java b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerHelperTest.java
deleted file mode 100644
index b93e1a8..0000000
--- a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerHelperTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.server.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.util.List;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Name;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-/**
- * Test the SegmentedServerHelper
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SegmentedServerHelperTest {
-
-  /**
-   * Test of segment method, of class SegmentedServerHelper.
-   *
-   * @throws java.lang.Exception
-   */
-  @Test
-  public void testSegmentation() throws Exception {
-    final Data template = new Data(new Name("/segmented/data"));
-    final InputStream content = new ByteArrayInputStream("0123456789".getBytes());
-    List<Data> segments = SegmentedServerHelper.segment(template, content, 1);
-    assertEquals(10, segments.size());
-
-    // test first packet
-    assertEquals(0, segments.get(0).getName().get(-1).toSegment());
-    assertEquals(9, segments.get(0).getMetaInfo().getFinalBlockId().toSegment());
-    assertEquals("0", segments.get(0).getContent().toString());
-
-    // test last packet
-    assertEquals(9, segments.get(9).getName().get(-1).toSegment());
-    assertEquals(9, segments.get(9).getMetaInfo().getFinalBlockId().toSegment());
-    assertEquals("9", segments.get(9).getContent().toString());
-  }
-
-   /**
-   * Test of segment method, of class SegmentedServerHelper.
-   *
-   * @throws java.lang.Exception
-   */
-  @Test
-  public void testSegmentationDifferentSizes() throws Exception {
-    final Data template = new Data(new Name("/segmented/data"));
-
-    // size 2
-    final InputStream content2 = new ByteArrayInputStream("0123456789".getBytes());
-    List<Data> segments2 = SegmentedServerHelper.segment(template, content2, 2);
-    assertEquals(5, segments2.size());
-    assertEquals("89", segments2.get(4).getContent().toString());
-
-    // size 3
-    final InputStream content3 = new ByteArrayInputStream("0123456789".getBytes());
-    List<Data> segments3 = SegmentedServerHelper.segment(template, content3, 3);
-    assertEquals(4, segments3.size());
-    assertEquals("9", segments3.get(3).getContent().toString());
-
-    // size 4
-    final InputStream content4 = new ByteArrayInputStream("0123456789".getBytes());
-    List<Data> segments4 = SegmentedServerHelper.segment(template, content4, 4);
-    assertEquals(3, segments4.size());
-    assertEquals("89", segments4.get(2).getContent().toString());
-  }
-}
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java
index fd6b516..c21d40b 100644
--- a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java
@@ -15,26 +15,27 @@
 
 import com.intel.jndn.mock.MockForwarder;
 import com.intel.jndn.utils.client.impl.AdvancedClient;
-import com.intel.jndn.mock.MockFace;
-import java.io.IOException;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
 import net.named_data.jndn.Name;
 import net.named_data.jndn.util.Blob;
-import static org.junit.Assert.*;
-
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 /**
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SegmentedServerTest {
 
-  Face face;
-  SegmentedServer instance;
+  private Face face;
+  private SegmentedServer instance;
 
   @Before
   public void before() throws Exception {
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java
index a3409a3..8eb991f 100644
--- a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java
@@ -16,8 +16,6 @@
 import com.intel.jndn.mock.MockKeyChain;
 import com.intel.jndn.utils.TestHelper;
 import com.intel.jndn.utils.client.impl.AdvancedClient;
-import java.io.IOException;
-import java.util.logging.Logger;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
@@ -26,23 +24,28 @@
 import net.named_data.jndn.security.KeyChain;
 import net.named_data.jndn.security.SecurityException;
 import net.named_data.jndn.util.Blob;
-import static org.junit.Assert.*;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 /**
  * Test the {@link SegmentedServer} on an actual NFD; to run this, pass the NFD
  * IP/host name as a parameter like <code>-Dnfd.ip=10.1.1.1</code>.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SegmentedServerTestIT {
 
+  private static final int DATA_SIZE_BYTES = 10000;
   private static final Logger logger = Logger.getLogger(SegmentedServerTestIT.class.getName());
   private static final Name PREFIX = new Name("/test/for/segmented-server");
-  protected static final int DATA_SIZE_BYTES = 10000;
-  Face face;
-  SegmentedServer instance;
-  private String ip;
+  private final Face face;
+  private final SegmentedServer instance;
+  private final String ip;
 
   public SegmentedServerTestIT() throws SecurityException {
     this.ip = System.getProperty("nfd.ip");
@@ -95,7 +98,7 @@
     logger.info("Retrieved data: " + retrieved.getName().toUri());
   }
 
-  Data buildDataPacket(Name name) {
+  private Data buildDataPacket(Name name) {
     Data data = new Data(new Name(name).appendSegment(0));
     data.setContent(new Blob(TestHelper.buildRandomBytes(DATA_SIZE_BYTES)));
     data.getMetaInfo().setFreshnessPeriod(30000);
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java b/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
index bd054cc..346c570 100644
--- a/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
@@ -13,10 +13,8 @@
  */
 package com.intel.jndn.utils.server.impl;
 
-import com.intel.jndn.utils.ProcessingStage;
 import com.intel.jndn.mock.MockFace;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.intel.jndn.utils.ProcessingStage;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
@@ -24,17 +22,21 @@
 import net.named_data.jndn.Name;
 import org.junit.Before;
 import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import static org.junit.Assert.*;
 
 /**
  * Test base server implementation.
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class ServerBaseImplTest {
 
-  Face face;
-  ServerBaseImpl instance;
+  private Face face;
+  private ServerBaseImpl instance;
 
   @Before
   public void before() throws Exception {
@@ -42,18 +44,6 @@
     instance = new ServerBaseImplImpl(face, new Name("/test/base"));
   }
 
-  public class ServerBaseImplImpl extends ServerBaseImpl {
-
-    public ServerBaseImplImpl(Face face, Name prefix) {
-      super(face, prefix);
-    }
-
-    @Override
-    public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
-      throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-    }
-  }
-
   /**
    * Test of getPrefix method, of class ServerBaseImpl.
    */
@@ -61,7 +51,7 @@
   public void testGetPrefix() {
     assertNotNull(instance.getPrefix());
   }
-  
+
   /**
    * Test of getRegisteredPrefixId method, of class ServerBaseImpl
    */
@@ -106,4 +96,16 @@
     assertTrue(instance.isRegistered());
     executor.shutdownNow();
   }
+
+  public class ServerBaseImplImpl extends ServerBaseImpl {
+
+    public ServerBaseImplImpl(Face face, Name prefix) {
+      super(face, prefix);
+    }
+
+    @Override
+    public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
+      throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    }
+  }
 }
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java b/src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java
index 4ce40ae..0b05ff1 100644
--- a/src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java
@@ -14,32 +14,32 @@
 package com.intel.jndn.utils.server.impl;
 
 import com.intel.jndn.mock.MeasurableFace;
-import com.intel.jndn.mock.MockFace;
 import com.intel.jndn.mock.MockForwarder;
 import com.intel.jndn.utils.server.RespondWithBlob;
 import com.intel.jndn.utils.server.RespondWithData;
-import java.io.IOException;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
 import net.named_data.jndn.Name;
-import net.named_data.jndn.OnData;
 import net.named_data.jndn.encoding.EncodingException;
 import net.named_data.jndn.util.Blob;
-import static org.junit.Assert.*;
-
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 /**
  * Test {@link SimpleServer}
  *
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
  */
 public class SimpleServerTest {
 
-  Face face;
-  SimpleServer instance;
+  private Face face;
+  private SimpleServer instance;
 
   @Before
   public void before() throws Exception {