Finish BoundedInMemoryContentStore
diff --git a/pom.xml b/pom.xml
index 323a19c..f5ca9c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
   <groupId>com.intel.jndn.utils</groupId>
   <artifactId>jndn-utils</artifactId>
   <version>1.0.4</version>
-  <prefix>jndn-utils</prefix>
+  <name>jndn-utils</name>
   <description>Collection of tools to simplify synchronous and asynchronous data transfer over the NDN network
   </description>
   <url>https://github.com/01org/jndn-utils</url>
@@ -14,14 +14,14 @@
 
   <licenses>
     <license>
-      <prefix>LGPL v3</prefix>
+      <name>LGPL v3</name>
       <url>https://www.gnu.org/licenses/lgpl.html</url>
     </license>
   </licenses>
 
   <developers>
     <developer>
-      <prefix>Andrew Brown</prefix>
+      <name>Andrew Brown</name>
       <url>http://github.com/andrewsbrown</url>
     </developer>
   </developers>
@@ -58,6 +58,12 @@
       <version>1.1.0</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>1.10.19</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <properties>
diff --git a/src/main/java/com/intel/jndn/utils/ContentStore.java b/src/main/java/com/intel/jndn/utils/ContentStore.java
index aad8a0d..f446dea 100644
--- a/src/main/java/com/intel/jndn/utils/ContentStore.java
+++ b/src/main/java/com/intel/jndn/utils/ContentStore.java
@@ -51,7 +51,8 @@
   Blob get(Name name);
 
   /**
-   * Write the data under a given name to the face
+   * Write the stored content to the face as Data packets. If no content exists for the given name, this method should
+   * have no effect.
    *
    * @param face the face to write to
    * @param name the name of the data to write
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/BlobContentStore.java b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java
similarity index 60%
rename from src/main/java/com/intel/jndn/utils/pubsub/BlobContentStore.java
rename to src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java
index 375fae5..477a573 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/BlobContentStore.java
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java
@@ -1,56 +1,71 @@
-/*

- * jndn-utils

- * Copyright (c) 2016, Intel Corporation.

- *

- * This program is free software; you can redistribute it and/or modify it

- * under the terms and conditions of the GNU Lesser General Public License,

- * version 3, as published by the Free Software Foundation.

- *

- * This program is distributed in the hope it will be useful, but WITHOUT ANY

- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

- * more details.

- */

-

-package com.intel.jndn.utils.pubsub;

-

-import com.intel.jndn.utils.ContentStore;

-import net.named_data.jndn.Face;

-import net.named_data.jndn.Name;

-import net.named_data.jndn.util.Blob;

-

-/**

- * @author Andrew Brown, andrew.brown@intel.com

- */

-class BlobContentStore implements ContentStore {

-  private final BoundedLinkedMap<Name, Blob> store;

-

-  BlobContentStore(int maxSize) {

-    this.store = new BoundedLinkedMap<>(maxSize);

-  }

-

-  @Override

-  public void put(Name name, Blob data) {

-    store.put(name, data);

-  }

-

-  @Override

-  public boolean has(Name name) {

-    return store.containsKey(name);

-  }

-

-  @Override

-  public Blob get(Name name) {

-    return store.get(name);

-  }

-

-  @Override

-  public void push(Face face, Name name) {

-    throw new UnsupportedOperationException();

-  }

-

-  @Override

-  public void clear() {

-    store.clear();

-  }

-}

+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.ContentStore;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryContentStore implements ContentStore {
+  private final BoundedLinkedMap<Name, Blob> store;
+  private final Data template;
+
+  public BoundedInMemoryContentStore(int maxSize, int freshnessMs) {
+    this.store = new BoundedLinkedMap<>(maxSize);
+    this.template = new Data();
+    this.template.getMetaInfo().setFreshnessPeriod(freshnessMs);
+  }
+
+  @Override
+  public void put(Name name, Blob data) {
+    store.put(name, data);
+  }
+
+  @Override
+  public boolean has(Name name) {
+    return store.containsKey(name);
+  }
+
+  @Override
+  public Blob get(Name name) {
+    return store.get(name);
+  }
+
+  @Override
+  public void push(Face face, Name name) throws IOException {
+    Blob blob = get(name);
+    if (blob != null) {
+      Data t = new Data(template);
+      t.setName(name);
+      ByteArrayInputStream b = new ByteArrayInputStream(blob.getImmutableArray());
+      for (Data d : SegmentationHelper.segment(t, b)) {
+        face.putData(d);
+      }
+    }
+  }
+
+  @Override
+  public void clear() {
+    store.clear();
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
new file mode 100644
index 0000000..b214019
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
@@ -0,0 +1,62 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.pubsub.PendingInterestTable;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryPendingInterestTable implements PendingInterestTable {
+  //private static final Logger LOGGER = Logger.getLogger(BoundedInMemoryPendingInterestTable.class.getName());
+  private final BoundedLinkedMap<Name, Interest> table;
+
+  public BoundedInMemoryPendingInterestTable(int maxSize) {
+    this.table = new BoundedLinkedMap<>(maxSize);
+  }
+
+  @Override
+  public void add(Interest interest) {
+    table.put(interest.getName(), interest);
+  }
+
+  @Override
+  public boolean has(Interest interest) {
+    if (interest.getChildSelector() != -1) {
+      for (Name name : table.keySet()) {
+        if (interest.matchesName(name)) {
+          return true;
+        }
+      }
+      return false;
+    } else {
+      return has(interest.getName());
+    }
+  }
+
+  public boolean has(Name name) {
+    return table.containsKey(name);
+  }
+
+  @Override
+  public Collection<Interest> extract(Name name) {
+    return table.values().stream().filter(i -> i.matchesName(name)).collect(Collectors.toList());
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/BoundedLinkedMap.java b/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
similarity index 97%
rename from src/main/java/com/intel/jndn/utils/pubsub/BoundedLinkedMap.java
rename to src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
index 8211fe1..3cc4538 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/BoundedLinkedMap.java
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
@@ -12,7 +12,7 @@
  * more details.
  */
 
-package com.intel.jndn.utils.pubsub;
+package com.intel.jndn.utils.impl;
 
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -106,7 +106,9 @@
   @Override
   public synchronized V remove(Object key) {
     V value = map.remove(key);
-    if (key == latest) latest = findLatest(map);
+    if (key == latest) {
+      latest = findLatest(map);
+    }
     return value;
   }
 
diff --git a/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java b/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java
index f63fe2f..1ddef62 100644
--- a/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java
+++ b/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java
@@ -98,8 +98,8 @@
   }
 
   /**
-   * Segment a stream of bytes into a list of Data packets; this must read all
-   * the bytes first in order to determine the end segment for FinalBlockId.
+   * Segment a stream of bytes into a list of Data packets; this must read all the bytes first in order to determine the
+   * end segment for FinalBlockId. TODO this could be smarter and only add segments if necessary
    *
    * @param template the {@link Data} packet to use for the segment {@link Name}, {@link net.named_data.jndn.MetaInfo},
    * etc.
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/ForLoopPendingInterestTable.java b/src/main/java/com/intel/jndn/utils/pubsub/ForLoopPendingInterestTable.java
deleted file mode 100644
index 10e4084..0000000
--- a/src/main/java/com/intel/jndn/utils/pubsub/ForLoopPendingInterestTable.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils.pubsub;
-
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.Name;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author Andrew Brown, andrew.brown@intel.com
- */
-public class ForLoopPendingInterestTable implements PendingInterestTable {
-  Map<Name, Interest> table = new ConcurrentHashMap<>();
-
-  @Override
-  public void add(Interest interest) {
-    table.put(interest.getName(), interest);
-  }
-
-  @Override
-  public boolean has(Interest interest) {
-    // TODO must handle more complex logic (selectors)
-    return has(interest.getName());
-  }
-
-  public boolean has(Name name) {
-    return table.containsKey(name);
-  }
-
-  @Override
-  public Collection<Interest> extract(Name name) {
-    // TODO more complexity to return multiple results
-    return has(name) ? Collections.singleton(table.get(name)) : Collections.emptyList();
-  }
-}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
index 9679d7e..4fa8409 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
@@ -56,22 +56,23 @@
 

   @Override

   public void announceEntrance(long id) throws IOException {

-    Name.Component publisherId = Name.Component.fromNumberWithMarker(id, PubSubNamespace.PUBLISHER_ID_MARKER);

-    Name.Component announcementAction = Name.Component.fromNumberWithMarker(PubSubNamespace.ANNOUNCEMENT_ENTRANCE, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);

-    Interest interest = new Interest(new Name(broadcastPrefix).append(topicPrefix).append(publisherId).append(announcementAction));

+    LOGGER.log(Level.INFO, "Announcing publisher entrance: {0} to {1}", new Object[]{id, topicPrefix});

+    Name name = PubSubNamespace.toAnnouncement(new Name(broadcastPrefix).append(topicPrefix), id, PubSubNamespace.Announcement.ENTRANCE);

+    Interest interest = new Interest(name);

     face.expressInterest(interest, null);

   }

 

   @Override

   public void announceExit(long id) throws IOException {

-    Name.Component publisherId = Name.Component.fromNumberWithMarker(id, PubSubNamespace.PUBLISHER_ID_MARKER);

-    Name.Component announcementAction = Name.Component.fromNumberWithMarker(PubSubNamespace.ANNOUNCEMENT_EXIT, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);

-    Interest interest = new Interest(new Name(broadcastPrefix).append(topicPrefix).append(publisherId).append(announcementAction));

+    LOGGER.log(Level.INFO, "Announcing publisher exit: {0} from {1}", new Object[]{id, topicPrefix});

+    Name name = PubSubNamespace.toAnnouncement(new Name(broadcastPrefix).append(topicPrefix), id, PubSubNamespace.Announcement.EXIT);

+    Interest interest = new Interest(name);

     face.expressInterest(interest, null);

   }

 

   @Override

   public Cancellation discoverExistingAnnouncements(On<Long> onFound, On<Void> onComplete, On<Exception> onError) {

+    LOGGER.log(Level.INFO, "Discover existing publishers: {0}", topicPrefix);

     if (onFound == null) {

       return CANCELLED;

     }

@@ -100,6 +101,7 @@
 

   @Override

   public Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws RegistrationFailureException {

+    LOGGER.log(Level.INFO, "Observing new announcements: {0}", topicPrefix);

     CompletableFuture<Void> future = new CompletableFuture<>();

     OnRegistration onRegistration = new OnRegistration(future);

     OnAnnouncement onAnnouncement = new OnAnnouncement(onAdded, onRemoved, onError);

@@ -127,12 +129,12 @@
     public void onInterest(Name name, Interest interest, Face face, long l, InterestFilter interestFilter) {

       try {

         long publisherId = interest.getName().get(-2).toNumberWithMarker(PubSubNamespace.PUBLISHER_ID_MARKER);

-        int announcement = (int) interest.getName().get(-1).toNumberWithMarker(PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);

+        PubSubNamespace.Announcement announcement = PubSubNamespace.parseAnnouncement(interest.getName());

         switch (announcement) {

-          case PubSubNamespace.ANNOUNCEMENT_ENTRANCE:

+          case ENTRANCE:

             add(publisherId);

             break;

-          case PubSubNamespace.ANNOUNCEMENT_EXIT:

+          case EXIT:

             remove(publisherId);

             break;

           default:

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
index f68e62b..a6e1b87 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
@@ -16,14 +16,12 @@
 

 import com.intel.jndn.utils.ContentStore;

 import com.intel.jndn.utils.Publisher;

-import net.named_data.jndn.Data;

 import net.named_data.jndn.Face;

 import net.named_data.jndn.Interest;

 import net.named_data.jndn.InterestFilter;

 import net.named_data.jndn.Name;

 import net.named_data.jndn.OnInterestCallback;

 import net.named_data.jndn.OnRegisterFailed;

-import net.named_data.jndn.encoding.EncodingException;

 import net.named_data.jndn.security.SecurityException;

 import net.named_data.jndn.util.Blob;

 

@@ -32,6 +30,7 @@
 import java.util.concurrent.ExecutionException;

 import java.util.concurrent.TimeUnit;

 import java.util.concurrent.TimeoutException;

+import java.util.concurrent.atomic.AtomicLong;

 import java.util.logging.Level;

 import java.util.logging.Logger;

 

@@ -46,9 +45,9 @@
   private final PendingInterestTable pendingInterestTable;

   private final ContentStore contentStore;

   private final long publisherId;

-  private volatile long latestMessageId = 0;

+  private final AtomicLong latestMessageId = new AtomicLong(0);

   private long registrationId;

-  private boolean started = false;

+  private boolean opened = false;

 

   NdnPublisher(Face face, Name prefix, long publisherId, AnnouncementService announcementService, PendingInterestTable pendingInterestTable, ContentStore contentStore) {

     this.face = face;

@@ -59,13 +58,14 @@
     this.contentStore = contentStore;

   }

 

-  void start() throws RegistrationFailureException {

-    started = true;

+  synchronized void open() throws RegistrationFailureException {

+    opened = true;

     CompletableFuture<Void> future = new CompletableFuture<>();

     OnRegistration onRegistration = new OnRegistration(future);

 

     try {

       registrationId = face.registerPrefix(prefix, this, (OnRegisterFailed) onRegistration, onRegistration);

+      // assumes face.processEvents is driven concurrently elsewhere

       future.get(10, TimeUnit.SECONDS);

       announcementService.announceEntrance(publisherId);

     } catch (IOException | SecurityException | InterruptedException | ExecutionException | TimeoutException e) {

@@ -73,33 +73,37 @@
     }

   }

 

+  // TODO this should not clear content store or remove registered prefix; do that in the future to allow subscribers

+  // to retrieve still-alive messages

   @Override

-  public void close() throws IOException {

-    face.removeRegisteredPrefix(registrationId);

-    contentStore.clear();

-    announcementService.announceExit(publisherId);

+  public synchronized void close() throws IOException {

+    if (opened) {

+      face.removeRegisteredPrefix(registrationId);

+      contentStore.clear();

+      announcementService.announceExit(publisherId);

+    }

   }

 

-  // TODO should throw IOException?

   @Override

   public void publish(Blob message) throws IOException {

-    if (!started) {

+    if (!opened) {

       try {

-        start();

+        open();

       } catch (RegistrationFailureException e) {

         throw new IOException(e);

       }

     }

 

-    long id = latestMessageId++; // TODO synchronize?

+    long id = latestMessageId.getAndIncrement();

     Name name = PubSubNamespace.toMessageName(prefix, publisherId, id);

 

     contentStore.put(name, message);

-    LOGGER.log(Level.INFO, "Published message {0} to content store", id);

+    LOGGER.log(Level.INFO, "Published message {0} to content store: {1}", new Object[]{id, name});

 

     if (pendingInterestTable.has(new Interest(name))) {

       try {

         contentStore.push(face, name);

+        // TODO extract satisfied interests

       } catch (IOException e) {

         LOGGER.log(Level.SEVERE, "Failed to send message {0} for pending interests: {1}", new Object[]{id, name, e});

       }

@@ -108,23 +112,15 @@
 

   @Override

   public void onInterest(Name name, Interest interest, Face face, long registrationId, InterestFilter interestFilter) {

+    LOGGER.log(Level.INFO, "Client requesting message: {0}", interest.toUri());

     try {

       if (contentStore.has(interest.getName())) {

         contentStore.push(face, interest.getName());

       } else {

         pendingInterestTable.add(interest);

       }

-

-      long id = interest.getName().get(-1).toNumberWithMarker(43);

-      Blob blob = contentStore.get(interest.getName());

-      Data data = new Data(interest.getName());

-      data.setContent(blob);

-      face.putData(data);

-      LOGGER.info("Published message " + id + " for interest: " + interest.toUri());

     } catch (IOException e) {

       LOGGER.log(Level.SEVERE, "Failed to publish message for interest: " + interest.toUri(), e);

-    } catch (EncodingException e) {

-      LOGGER.log(Level.SEVERE, "Failed to decode message ID for interest: " + interest.toUri(), e);

     }

   }

 }

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
index e2deebd..164dd89 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
@@ -37,9 +37,11 @@
   private static final Logger LOGGER = Logger.getLogger(NdnSubscriber.class.getName());

   private final Face face;

   private final Name prefix;

+  private final On<Blob> onMessage = null;

+  private final On<Exception> onError = null;

   private final AnnouncementService announcementService;

   private final Client client;

-  private final Map<Long, Context> known = new HashMap<>();

+  private final Map<Long, Subscription> subscriptions = new HashMap<>();

   private Cancellation newAnnouncementCancellation;

   private Cancellation existingAnnouncementsCancellation;

   private volatile boolean started = false;

@@ -51,8 +53,8 @@
     this.client = client;

   }

 

-  private void start() throws RegistrationFailureException {

-    LOGGER.log(Level.INFO, "Starting subscriber");

+  void open() throws RegistrationFailureException {

+    LOGGER.log(Level.INFO, "Starting subscriber: {0}", prefix);

 

     existingAnnouncementsCancellation = announcementService.discoverExistingAnnouncements(this::add, null, e -> close());

     newAnnouncementCancellation = announcementService.observeNewAnnouncements(this::add, this::remove, e -> close());

@@ -60,21 +62,24 @@
     started = true;

   }

 

-  private void add(long publisherId) {

-    if (known.containsKey(publisherId)) {

+  void add(long publisherId) {

+    if (subscriptions.containsKey(publisherId)) {

       LOGGER.log(Level.WARNING, "Duplicate publisher ID {} received from announcement service; this should not happen and will be ignored", publisherId);

     } else {

-      known.put(publisherId, new Context(publisherId));

+      Subscription subscription = new Subscription(publisherId);

+      subscriptions.put(publisherId, subscription);

+      subscription.subscribe();

     }

   }

 

-  private void remove(long publisherId) {

-    known.remove(publisherId);

+  void remove(long publisherId) {

+    Subscription removed = subscriptions.remove(publisherId);

+    removed.cancel();

   }

 

   @Override

   public void close() {

-    LOGGER.log(Level.INFO, "Stopping subscriber, knows of {0} publishers: {1} ", new Object[]{known.size(), known});

+    LOGGER.log(Level.INFO, "Stopping subscriber, knows of {0} publishers: {1} ", new Object[]{subscriptions.size(), subscriptions});

 

     if (newAnnouncementCancellation != null) {

       newAnnouncementCancellation.cancel();

@@ -84,7 +89,7 @@
       existingAnnouncementsCancellation.cancel();

     }

 

-    for (Context c : known.values()) {

+    for (Subscription c : subscriptions.values()) {

       c.cancel();

     }

 

@@ -92,15 +97,16 @@
   }

 

   Set<Long> knownPublishers() {

-    return known.keySet();

+    return subscriptions.keySet();

   }

 

   // TODO repeated calls?

+  // TODO remove this, do topic.subscribe(On..., On...) instead; or new Subscriber(On..., On..., ...).open()

   @Override

   public Cancellation subscribe(On<Blob> onMessage, On<Exception> onError) {

     if (!started) {

       try {

-        start();

+        open();

       } catch (RegistrationFailureException e) {

         LOGGER.log(Level.SEVERE, "Failed to start announcement service, aborting subscription", e);

         onError.on(e);

@@ -108,29 +114,25 @@
       }

     }

 

-    for (Context c : known.values()) {

-      c.subscribe(onMessage, onError);

+    LOGGER.log(Level.INFO, "Subscribing: {0}", prefix);

+    for (Subscription c : subscriptions.values()) {

+      c.subscribe();

     }

 

     return this::close;

   }

 

-  private class Context implements Cancellation {

+  private class Subscription implements Cancellation {

     final long publisherId;

     long messageId;

     boolean subscribed = false;

-    On<Blob> onMessage;

-    On<Exception> onError;

     CompletableFuture<Data> currentRequest;

 

-    Context(long publisherId) {

+    Subscription(long publisherId) {

       this.publisherId = publisherId;

     }

 

-    synchronized void subscribe(On<Blob> onMessage, On<Exception> onError) {

-      this.onMessage = onMessage;

-      this.onError = onError;

-

+    synchronized void subscribe() {

       if (subscribed) {

         return;

       }

@@ -191,8 +193,8 @@
       if (o == null || getClass() != o.getClass()) {

         return false;

       }

-      Context context = (Context) o;

-      return publisherId == context.publisherId;

+      Subscription subscription = (Subscription) o;

+      return publisherId == subscription.publisherId;

     }

 

     @Override

@@ -202,7 +204,7 @@
 

     @Override

     public String toString() {

-      return "Context{" + "publisherId=" + publisherId + '}';

+      return "Subscription{" + "publisherId=" + publisherId + '}';

     }

   }

 }
\ No newline at end of file
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
index 5df1c79..89221cc 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
@@ -30,10 +30,8 @@
   static final int PUBLISHER_ID_MARKER = 43;

   static final int ANNOUNCEMENT_ACTION_MARKER = 44;

 

-  static final int ANNOUNCEMENT_ENTRANCE = 1;

-  static final int ANNOUNCEMENT_EXIT = 0;

-

   static final int MESSAGE_MARKER = 100;

+

   static final int MESSAGE_ATTRIBUTES_MARKER = 101;

   static final int MESSAGE_CONTENT_MARKER = 102;

 

@@ -48,13 +46,18 @@
     return new Name(prefix).append(publisherComponent);

   }

 

+  static Name toAnnouncement(Name prefix, long publisherId, Announcement announcement) {

+    Name.Component announcementComponent = Name.Component.fromNumberWithMarker(announcement.id, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);

+    return toPublisherName(prefix, publisherId).append(announcementComponent);

+  }

+

   static Name toMessageName(Name prefix, long publisherId, long messageId) {

     Name.Component messageComponent = Name.Component.fromNumberWithMarker(messageId, PubSubNamespace.MESSAGE_ID_MARKER);

     return toPublisherName(prefix, publisherId).append(messageComponent);

   }

 

   static Response toResponse(Data data) throws EncodingException {

-    long messageId = extractMessageId(data.getName());

+    long messageId = findMarkerFromEnd(data.getName(), MESSAGE_ID_MARKER);

     TlvDecoder decoder = new TlvDecoder(data.getContent().buf());

     int endOffset = decoder.readNestedTlvsStart(MESSAGE_MARKER);

     Blob attributes = new Blob(decoder.readOptionalBlobTlv(MESSAGE_ATTRIBUTES_MARKER, endOffset), true);

@@ -62,14 +65,48 @@
     return new Response(data.getName(), messageId, Collections.singletonMap("*", attributes), content);

   }

 

-  private static long extractMessageId(Name name) throws EncodingException {

-    for (int i = name.size(); i >= 0; i--) {

+  static long parsePublisher(Name name) throws EncodingException {

+    return findMarkerFromEnd(name, PUBLISHER_ID_MARKER);

+  }

+

+  static Announcement parseAnnouncement(Name name) throws EncodingException {

+    return Announcement.from((int) name.get(-1).toNumberWithMarker(ANNOUNCEMENT_ACTION_MARKER));

+  }

+

+  private static long findMarkerFromEnd(Name name, int marker) throws EncodingException {

+    for (int i = name.size() - 1; i >= 0; i--) {

       try {

-        return name.get(i).toNumberWithMarker(MESSAGE_ID_MARKER);

+        return name.get(i).toNumberWithMarker(marker);

       } catch (EncodingException e) {

         // do nothing

       }

     }

-    throw new EncodingException("Failed to find message ID marker in name: " + name.toUri());

+    throw new EncodingException("Failed to find marker " + marker + " in name: " + name.toUri());

+  }

+

+  enum Announcement {

+    ENTRANCE(0),

+    EXIT(1);

+

+    private final int id;

+

+    Announcement(int id) {

+      this.id = id;

+    }

+

+    static Announcement from(int value) {

+      for (Announcement a : Announcement.values()) {

+        if (a.id == value) return a;

+      }

+      return null;

+    }

+

+    int value() {

+      return id;

+    }

+

+    public int getValue() {

+      return id;

+    }

   }

 }

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Topic.java b/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
index 4a60b1c..5575712 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
@@ -17,6 +17,8 @@
 import com.intel.jndn.utils.Publisher;

 import com.intel.jndn.utils.Subscriber;

 import com.intel.jndn.utils.client.impl.AdvancedClient;

+import com.intel.jndn.utils.impl.BoundedInMemoryContentStore;

+import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;

 import net.named_data.jndn.Face;

 import net.named_data.jndn.Name;

 

@@ -36,13 +38,13 @@
     return new Name(name);

   }

 

-  // TODO move to PubSubFactory?

+  // TODO move to PubSubFactory? change this to subscribe()

   public Subscriber newSubscriber(Face face) {

     return new NdnSubscriber(face, name, new NdnAnnouncementService(face, name), new AdvancedClient());

   }

 

-  // TODO move to PubSubFactory?

+  // TODO move to PubSubFactory? change this to publish()

   public Publisher newPublisher(Face face) {

-    return new NdnPublisher(face, name, new Random().nextLong(), new NdnAnnouncementService(face, name), new ForLoopPendingInterestTable(), new BlobContentStore(1024));

+    return new NdnPublisher(face, name, new Random().nextLong(), new NdnAnnouncementService(face, name), new BoundedInMemoryPendingInterestTable(1024), new BoundedInMemoryContentStore(1024, 2000));

   }

 }

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/package.html b/src/main/java/com/intel/jndn/utils/pubsub/package.html
new file mode 100644
index 0000000..562c542
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/package.html
@@ -0,0 +1,26 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
+<html>
+<body>
+<p>See [need link here] for algorithm details.</p>
+
+<p>The NDN pub-sub algorithm is designed using the following criteria:</p>
+<ul>
+    <li>subscriber-driven</li>
+    <li>lightweight</li>
+    <li>avoid unnecessary IO</li>
+</ul>
+
+<p>It makes the following guarantees:</p>
+<ul>
+    <li>each publisher will order its own messages by the order in which they were published; however, the mechanism
+        for this ordering (i.e. a monotonically-increasing ID) will not maintain the guarantee across multiple
+        publishers (in this case, an application-level mechanism, like a time-synchronized timestamp, must be used;
+        alternatively, try ChronoSync)
+    </li>
+    <li>publishers will provide the messages published on a topic for a specified lifetime; even after being closed,
+        the publisher must maintain a registered prefix and respond to subscriber requests for data
+    </li>
+    <li>subscribers will eventually discover all routable publishers</li>
+</ul>
+</body>
+</html>
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStoreTest.java b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStoreTest.java
new file mode 100644
index 0000000..aa896ba
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStoreTest.java
@@ -0,0 +1,82 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.mock.MockFace;
+import com.intel.jndn.utils.ContentStore;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryContentStoreTest {
+  ContentStore instance = new BoundedInMemoryContentStore(5, 1000);
+
+  @Before
+  public void setUp() throws Exception {
+
+  }
+
+  @Test
+  public void basicUsage() throws Exception {
+    instance.put(new Name("/a"), new Blob("."));
+
+    assertTrue(instance.has(new Name("/a")));
+    assertArrayEquals(".".getBytes(), instance.get(new Name("/a")).getImmutableArray());
+  }
+
+  @Test
+  public void replacement() throws Exception {
+    instance.put(new Name("/a"), new Blob("."));
+    instance.put(new Name("/a/b"), new Blob(".."));
+    instance.put(new Name("/a/b/c"), new Blob("..."));
+    instance.put(new Name("/a/b/c/d"), new Blob("...."));
+    instance.put(new Name("/a/b/c/d/e"), new Blob("...."));
+    assertTrue(instance.has(new Name("/a")));
+
+    instance.put(new Name("/replace/oldest"), new Blob("."));
+
+    assertFalse(instance.has(new Name("/a")));
+    assertTrue(instance.has(new Name("/replace/oldest")));
+  }
+
+  @Test
+  public void push() throws Exception {
+    MockFace face = new MockFace();
+    Name name = new Name("/a");
+    instance.put(name, new Blob("."));
+
+    instance.push(face, name);
+
+    assertEquals(1, face.sentData.size());
+    assertEquals(name.appendSegment(0), face.sentData.get(0).getName()); // TODO this should probably be smarter and avoid appending segments if not needed
+  }
+
+  @Test
+  public void clear() throws Exception {
+    instance.put(new Name("/a"), new Blob("."));
+    assertTrue(instance.has(new Name("/a")));
+
+    instance.clear();
+
+    assertFalse(instance.has(new Name("/a")));
+  }
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java
new file mode 100644
index 0000000..739e215
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryPendingInterestTableTest {
+
+  private BoundedInMemoryPendingInterestTable instance;
+
+  @Before
+  public void before() {
+    instance = new BoundedInMemoryPendingInterestTable(5);
+  }
+
+  @Test
+  public void add() throws Exception {
+    Name name = new Name("/a/b/c");
+    instance.add(new Interest(name));
+    assertTrue(instance.has(name));
+  }
+
+  @Test
+  public void has() throws Exception {
+    Name name = new Name("/a/b/c");
+    instance.add(new Interest(name));
+
+    Interest interest = new Interest(new Name("/a/b"));
+    interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
+    assertTrue(instance.has(interest));
+  }
+
+  @Test
+  public void extract() throws Exception {
+    instance.add(new Interest(new Name("/a")));
+    instance.add(new Interest(new Name("/a/b")));
+    instance.add(new Interest(new Name("/a/b/c")));
+
+    Collection<Interest> extracted = instance.extract(new Name("/a/b"));
+    assertEquals(2, extracted.size()); // TODO not sure about this...
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/BoundedLinkedMapTest.java b/src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java
similarity index 98%
rename from src/test/java/com/intel/jndn/utils/pubsub/BoundedLinkedMapTest.java
rename to src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java
index 3107db6..d4ca544 100644
--- a/src/test/java/com/intel/jndn/utils/pubsub/BoundedLinkedMapTest.java
+++ b/src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java
@@ -12,7 +12,7 @@
  * more details.
  */
 
-package com.intel.jndn.utils.pubsub;
+package com.intel.jndn.utils.impl;
 
 import org.junit.Before;
 import org.junit.Test;
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java
new file mode 100644
index 0000000..26cd0d8
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java
@@ -0,0 +1,72 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnData;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnAnnouncementServiceTest {
+
+  private NdnAnnouncementService instance;
+  private Face face;
+
+  @Before
+  public void before() {
+    face = mock(Face.class);
+    instance = new NdnAnnouncementService(face, new Name("/topic/prefix"));
+  }
+
+  @Test
+  public void announceEntrance() throws Exception {
+    instance.announceEntrance(42);
+
+    ArgumentCaptor<Interest> interest = ArgumentCaptor.forClass(Interest.class);
+    verify(face, times(1)).expressInterest(interest.capture(), any(OnData.class));
+    assertEquals(42, PubSubNamespace.parsePublisher(interest.getValue().getName()));
+    assertEquals(PubSubNamespace.Announcement.ENTRANCE, PubSubNamespace.parseAnnouncement(interest.getValue().getName()));
+  }
+
+  @Test
+  public void announceExit() throws Exception {
+    instance.announceExit(42);
+
+    ArgumentCaptor<Interest> interest = ArgumentCaptor.forClass(Interest.class);
+    verify(face, times(1)).expressInterest(interest.capture(), any(OnData.class));
+    assertEquals(42, PubSubNamespace.parsePublisher(interest.getValue().getName()));
+    assertEquals(PubSubNamespace.Announcement.EXIT, PubSubNamespace.parseAnnouncement(interest.getValue().getName()));
+  }
+
+  @Test
+  public void discoverExistingAnnouncements() throws Exception {
+
+  }
+
+  @Test
+  public void observeNewAnnouncements() throws Exception {
+
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java
new file mode 100644
index 0000000..bdb0693
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java
@@ -0,0 +1,169 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.mock.MeasurableFace;
+import com.intel.jndn.mock.MockForwarder;
+import com.intel.jndn.utils.impl.BoundedInMemoryContentStore;
+import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.util.Blob;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnPublisherTest {
+  private static final Logger LOGGER = Logger.getLogger(NdnPublisherTest.class.getName());
+  private static final Name PUBLISHER_PREFIX = new Name("/publisher");
+  private static final long PUBLISHER_ID = new Random().nextLong();
+  private NdnPublisher instance;
+  private Face face;
+  private MockForwarder forwarder;
+
+  @Before
+  public void before() throws Exception {
+    forwarder = new MockForwarder();
+    face = forwarder.connect();
+    NdnAnnouncementService announcementService = new NdnAnnouncementService(face, PUBLISHER_PREFIX);
+    BoundedInMemoryPendingInterestTable pendingInterestTable = new BoundedInMemoryPendingInterestTable(1024);
+    BoundedInMemoryContentStore contentStore = new BoundedInMemoryContentStore(1024, 2000);
+    instance = new NdnPublisher(face, PUBLISHER_PREFIX, PUBLISHER_ID, announcementService, pendingInterestTable, contentStore);
+
+    driveFace(face); // TODO preferably do this in MockForwarder
+  }
+
+  @Test
+  public void openClose() throws Exception {
+    instance.open();
+    assertEquals(1, numSentInterests()); // doesn't count prefix registration, only announcement
+
+    instance.close();
+    assertEquals(2, numSentInterests());
+    assertEquals(0, numReceivedDatas());
+  }
+
+  @Test
+  public void close() throws Exception {
+    instance.close();
+
+    assertEquals(0, numSentInterests()); // if the publisher has not been opened, it will not announce an exit
+    assertEquals(0, numReceivedDatas());
+  }
+
+  @Test
+  public void publish() throws Exception {
+    instance.open();
+    assertEquals(1, numSentInterests());
+
+    instance.publish(new Blob("..."));
+    assertEquals(0, numSentDatas());
+    assertEquals(1, numSentInterests()); // none more than from opening
+  }
+
+  @Test
+  public void publishWithPendingInterest() throws Exception {
+    instance.open();
+    assertEquals(1, numSentInterests());
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Face client = forwarder.connect();
+    client.expressInterest(new Interest(PubSubNamespace.toMessageName(PUBLISHER_PREFIX, PUBLISHER_ID, 0)), (interest, data) -> latch.countDown());
+
+    client.processEvents(); // TODO remove somehow
+    face.processEvents();
+
+    instance.publish(new Blob("..."));
+    assertEquals(1, numSentDatas());
+    assertEquals(1, numSentInterests()); // none more than from opening
+
+    client.processEvents(); // TODO remove somehow
+    face.processEvents();
+
+    latch.await(1, TimeUnit.SECONDS);
+    assertEquals(0, latch.getCount());
+  }
+
+  @Test
+  public void publishAndRespondToNewInterest() throws Exception {
+    Face client = forwarder.connect();
+    instance.open();
+    assertEquals(1, numSentInterests());
+
+    instance.publish(new Blob("..."));
+    assertEquals(0, numSentDatas());
+    assertEquals(1, numSentInterests()); // none more than from opening
+
+    client.processEvents(); // TODO remove somehow
+    face.processEvents();
+
+    CountDownLatch latch = new CountDownLatch(1);
+    client.expressInterest(new Interest(PubSubNamespace.toMessageName(PUBLISHER_PREFIX, PUBLISHER_ID, 0)), (interest, data) -> latch.countDown());
+
+    client.processEvents(); // TODO remove somehow
+    face.processEvents();
+
+    client.processEvents(); // TODO remove somehow
+    face.processEvents();
+
+    latch.await(1, TimeUnit.SECONDS);
+    assertEquals(0, latch.getCount());
+  }
+
+  private int numSentDatas() {
+    return ((MeasurableFace) face).sentDatas().size();
+  }
+
+  private int numReceivedDatas() {
+    return ((MeasurableFace) face).receivedDatas().size();
+  }
+
+  private int numSentInterests() {
+    return ((MeasurableFace) face).sentInterests().size();
+  }
+
+  private void driveFace(Face face) throws InterruptedException {
+    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
+    //CountDownLatch latch = new CountDownLatch(1);
+
+    pool.schedule(() -> {
+      //latch.countDown();
+      try {
+        face.processEvents();
+      } catch (IOException | EncodingException e) {
+        LOGGER.log(Level.SEVERE, "Failed to process face events", e);
+        fail(e.getMessage());
+      }
+    }, 50, TimeUnit.MILLISECONDS);
+
+    //latch.await();
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java
new file mode 100644
index 0000000..b3ca959
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java
@@ -0,0 +1,82 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.Client;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnSubscriberTest {
+
+  public static final Name TOPIC_NAME = new Name("/subscriber");
+  private NdnSubscriber instance;
+  private AnnouncementService announcementService;
+
+  @Before
+  public void before() {
+    Face face = mock(Face.class);
+    announcementService = mock(AnnouncementService.class);
+    Client client = mock(Client.class);
+    instance = new NdnSubscriber(face, TOPIC_NAME, announcementService, client);
+  }
+
+  @Test
+  public void open() throws Exception {
+    instance.open();
+  }
+
+  @Test
+  public void close() throws Exception {
+    instance.close();
+  }
+
+  @Test
+  public void knownPublishers() throws Exception {
+    ArgumentCaptor<On<Long>> existingPublishersSignal = ArgumentCaptor.forClass((Class) On.class);
+    ArgumentCaptor<On<Long>> newPublishersSignal = ArgumentCaptor.forClass((Class) On.class);
+    when(announcementService.discoverExistingAnnouncements(existingPublishersSignal.capture(), any(), any())).thenReturn(null);
+    when(announcementService.observeNewAnnouncements(newPublishersSignal.capture(), any(), any())).thenReturn(null);
+    instance.open();
+
+    assertEquals(0, instance.knownPublishers().size());
+
+    existingPublishersSignal.getValue().on(42L);
+    assertEquals(1, instance.knownPublishers().size());
+
+    newPublishersSignal.getValue().on(42L);
+    assertEquals(1, instance.knownPublishers().size());
+
+    existingPublishersSignal.getValue().on(99L);
+    assertEquals(2, instance.knownPublishers().size());
+  }
+
+  @Test
+  public void subscribe() throws Exception {
+    instance.add(1);
+    instance.remove(1);
+  }
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/TopicTestIT.java b/src/test/java/com/intel/jndn/utils/pubsub/TopicTestIT.java
new file mode 100644
index 0000000..2b06e6d
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/TopicTestIT.java
@@ -0,0 +1,74 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.mock.MockKeyChain;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.ThreadPoolFace;
+import net.named_data.jndn.security.KeyChain;
+import net.named_data.jndn.security.SecurityException;
+import net.named_data.jndn.transport.AsyncTcpTransport;
+import net.named_data.jndn.util.Blob;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class TopicTestIT {
+  private static final Logger LOGGER = Logger.getLogger(TopicTestIT.class.getName());
+
+  @Test
+  public void basicUsage() throws Exception {
+    Face pubFace = connect("localhost");
+    Face subFace = connect("localhost");
+    Topic topic = new Topic(new Name("/pub/sub/topic"));
+    CountDownLatch latch = new CountDownLatch(3);
+
+    Subscriber subscriber = topic.newSubscriber(pubFace);
+    subscriber.subscribe(b -> latch.countDown(), e -> fail(e.getMessage()));
+
+    Publisher publisher = topic.newPublisher(subFace);
+    publisher.publish(new Blob("."));
+    publisher.publish(new Blob(".."));
+    publisher.publish(new Blob("..."));
+
+    latch.await(2, TimeUnit.SECONDS);
+    assertEquals(0, latch.getCount());
+  }
+
+  private Face connect(String hostName) throws SecurityException {
+    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
+    AsyncTcpTransport transport = new AsyncTcpTransport(pool);
+    AsyncTcpTransport.ConnectionInfo connectionInfo = new AsyncTcpTransport.ConnectionInfo(hostName, 6363, true);
+    ThreadPoolFace face = new ThreadPoolFace(pool, transport, connectionInfo);
+
+    KeyChain keyChain = MockKeyChain.configure(new Name("/topic/test/it"));
+    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName());
+
+    return face;
+  }
+}
\ No newline at end of file