Create working pub-sub integration test

Also refactor the locations of some classes
diff --git a/src/main/java/com/intel/jndn/utils/Cancellation.java b/src/main/java/com/intel/jndn/utils/Cancellation.java
new file mode 100644
index 0000000..1a8e29f
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Cancellation.java
@@ -0,0 +1,35 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+/**
+ * Represents a token used for canceling some ongoing action. This approach was chosen as it is lighter and more
+ * flexible than a cancellable {@link java.util.concurrent.Future}; perhaps it could be replaced by {@link
+ * AutoCloseable} although this seems to imply something else (however, the language support is a benefit, TODO
+ * re-look at this).
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface Cancellation {
+  /**
+   * Shortcut for a no-op, already-cancelled cancellation
+   */
+  Cancellation CANCELLED = () -> {/* do nothing */};
+
+  /**
+   * Cancel the action referenced by this token
+   */
+  void cancel();
+}
diff --git a/src/main/java/com/intel/jndn/utils/ContentStore.java b/src/main/java/com/intel/jndn/utils/ContentStore.java
index e0ccc51..e6ad68c 100644
--- a/src/main/java/com/intel/jndn/utils/ContentStore.java
+++ b/src/main/java/com/intel/jndn/utils/ContentStore.java
@@ -23,7 +23,7 @@
 import java.util.Optional;
 
 /**
- * TODO merge with Repository
+ * Represents a named content store of NDN data which allows querying by name or interest; TODO merge with Repository
  *
  * @author Andrew Brown, andrew.brown@intel.com
  */
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/On.java b/src/main/java/com/intel/jndn/utils/On.java
similarity index 95%
rename from src/main/java/com/intel/jndn/utils/pubsub/On.java
rename to src/main/java/com/intel/jndn/utils/On.java
index 5bb7951..96a84d2 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/On.java
+++ b/src/main/java/com/intel/jndn/utils/On.java
@@ -1,28 +1,28 @@
-/*

- * jndn-utils

- * Copyright (c) 2016, Intel Corporation.

- *

- * This program is free software; you can redistribute it and/or modify it

- * under the terms and conditions of the GNU Lesser General Public License,

- * version 3, as published by the Free Software Foundation.

- *

- * This program is distributed in the hope it will be useful, but WITHOUT ANY

- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

- * more details.

- */

-

-package com.intel.jndn.utils.pubsub;

-

-/**

- * Generic callback API for receiving some signal T when an event is fired.

- *

- * @author Andrew Brown, andrew.brown@intel.com

- */

-@FunctionalInterface

-public interface On<T> {

-  /**

-   * @param event the event object describing the occurrence

-   */

-  void on(T event);

-}

+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+/**
+ * Generic callback API for receiving some signal T when an event is fired.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+@FunctionalInterface
+public interface On<T> {
+  /**
+   * @param event the event object describing the occurrence
+   */
+  void on(T event);
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PendingInterestTable.java b/src/main/java/com/intel/jndn/utils/PendingInterestTable.java
similarity index 79%
rename from src/main/java/com/intel/jndn/utils/pubsub/PendingInterestTable.java
rename to src/main/java/com/intel/jndn/utils/PendingInterestTable.java
index 410a565..3e49f11 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/PendingInterestTable.java
+++ b/src/main/java/com/intel/jndn/utils/PendingInterestTable.java
@@ -1,42 +1,46 @@
-/*

- * jndn-utils

- * Copyright (c) 2016, Intel Corporation.

- *

- * This program is free software; you can redistribute it and/or modify it

- * under the terms and conditions of the GNU Lesser General Public License,

- * version 3, as published by the Free Software Foundation.

- *

- * This program is distributed in the hope it will be useful, but WITHOUT ANY

- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

- * more details.

- */

-

-package com.intel.jndn.utils.pubsub;

-

-import net.named_data.jndn.Interest;

-import net.named_data.jndn.Name;

-

-import java.util.Collection;

-

-/**

- * @author Andrew Brown, andrew.brown@intel.com

- */

-public interface PendingInterestTable {

-  /**

-   * @param interest the PIT entry to add

-   */

-  void add(Interest interest);

-

-  /**

-   * @param interest the incoming interest to match against

-   * @return true if the interest matches an entry already in the PIT

-   */

-  boolean has(Interest interest);

-

-  /**

-   * @param name the name to match against

-   * @return the PIT entries matching a name, removing them from the PIT

-   */

-  Collection<Interest> extract(Name name);

-}

+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+
+/**
+ * Represents a pending interest table; the jndn {@link net.named_data.jndn.impl.PendingInterestTable} depended on
+ * several patterns such as List output parameters for collecting output and did not allow querying without extracting
+ * (see {@link #has(Interest)}).
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface PendingInterestTable {
+  /**
+   * @param interest the PIT entry to add
+   */
+  void add(Interest interest);
+
+  /**
+   * @param interest the incoming interest to match against
+   * @return true if the interest matches an entry already in the PIT
+   */
+  boolean has(Interest interest);
+
+  /**
+   * @param name the name to match against
+   * @return the PIT entries matching a name, removing them from the PIT
+   */
+  Collection<Interest> extract(Name name);
+}
diff --git a/src/main/java/com/intel/jndn/utils/PushableRepository.java b/src/main/java/com/intel/jndn/utils/PushableRepository.java
deleted file mode 100644
index 4da471a..0000000
--- a/src/main/java/com/intel/jndn/utils/PushableRepository.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils;
-
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Name;
-
-import java.io.IOException;
-
-/**
- * @author Andrew Brown, andrew.brown@intel.com
- */
-public interface PushableRepository extends Repository {
-  /**
-   * Write data to a face. Each name must correspond to one datum, but the implementation may choose to write these
-   * as separate data packets (e.g. as segments of a file).
-   *
-   * @param face the face to write the data to
-   * @param name the name of the data to write
-   * @throws IOException if the writing fails
-   */
-  void push(Face face, Name name) throws IOException;
-}
diff --git a/src/main/java/com/intel/jndn/utils/Subscriber.java b/src/main/java/com/intel/jndn/utils/Subscriber.java
index 89e5168..2830cbc 100644
--- a/src/main/java/com/intel/jndn/utils/Subscriber.java
+++ b/src/main/java/com/intel/jndn/utils/Subscriber.java
@@ -14,18 +14,22 @@
 

 package com.intel.jndn.utils;

 

-import com.intel.jndn.utils.pubsub.Cancellation;

-import com.intel.jndn.utils.pubsub.On;

-import net.named_data.jndn.util.Blob;

+import java.io.IOException;

+import java.util.Set;

 

 /**

  * @author Andrew Brown, andrew.brown@intel.com

  */

 public interface Subscriber extends AutoCloseable {

   /**

-   * @param onMessage called every time a new message is received

-   * @param onError called every time an error occurs

-   * @return a cancellation token for stopping the subscription

+   * @return the currently known publishers

    */

-  Cancellation subscribe(On<Blob> onMessage, On<Exception> onError);

+  Set<Long> knownPublishers();

+

+  /**

+   * Open the subscriber transport mechanisms and retrieve publisher messages

+   *

+   * @throws IOException if the subscriber fails during network access

+   */

+  void open() throws IOException;

 }

diff --git a/src/main/java/com/intel/jndn/utils/Topic.java b/src/main/java/com/intel/jndn/utils/Topic.java
new file mode 100644
index 0000000..8816e5d
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Topic.java
@@ -0,0 +1,68 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import com.intel.jndn.utils.On;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import com.intel.jndn.utils.pubsub.PubSubFactory;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+
+/**
+ * A publish-subscribe topic; see the {@code pubsub} package for implementation details
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public final class Topic {
+  private final Name name;
+
+  /**
+   * @param name the NDN name of the topic
+   */
+  public Topic(Name name) {
+    this.name = name;
+  }
+
+  /**
+   * @return the NDN name of the topic; a copy is returned so that the original value stays immutable while the
+   * returned instance can be freely modified
+   */
+  public Name name() {
+    return new Name(name);
+  }
+
+  /**
+   * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+   * @param onMessage callback fired when a message is received
+   * @param onError callback fired when an error happens after subscription
+   * @return an open subscriber
+   * @throws IOException if the subscription fails
+   */
+  public Subscriber subscribe(Face face, On<Blob> onMessage, On<Exception> onError) throws IOException {
+    return PubSubFactory.newSubscriber(face, name, onMessage, onError);
+  }
+
+  /**
+   * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+   * @return a factory-built publisher
+   */
+  public Publisher newPublisher(Face face) {
+    return PubSubFactory.newPublisher(face, name);
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java
deleted file mode 100644
index 5b269bf..0000000
--- a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils.impl;
-
-import com.intel.jndn.utils.ContentStore;
-import com.intel.jndn.utils.NameTree;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.util.Blob;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * @author Andrew Brown, andrew.brown@intel.com
- */
-public class BoundedInMemoryContentStore implements ContentStore {
-  private final NameTree<Blob> store;
-  private final Data template;
-
-  public BoundedInMemoryContentStore(int maxSize, int freshnessMs) {
-    this.template = new Data();
-    this.template.getMetaInfo().setFreshnessPeriod(freshnessMs);
-    this.store = DefaultNameTree.newRootTree();
-  }
-
-  @Override
-  public void put(Name name, Blob data) {
-    store.insert(name, data);
-  }
-
-  @Override
-  public Optional<Blob> get(Interest interest) {
-    Optional<NameTree<Blob>> possibleBlob = store.find(interest.getName());
-    if (!possibleBlob.isPresent()) {
-      return Optional.empty();
-    } else if (hasSelectors(interest)) {
-      List<NameTree<Blob>> children = new ArrayList<>(possibleBlob.get().children());
-      if (children.isEmpty()) {
-        return Optional.empty();
-      } else if (children.size() == 1) {
-        return children.get(0).content();
-      } else {
-        if (isRightMost(interest)) {
-          Collections.sort(children, (a, b) -> b.lastComponent().compare(a.lastComponent()));
-        } else if (isLeftMost(interest)) {
-          Collections.sort(children, (a, b) -> a.lastComponent().compare(b.lastComponent()));
-        }
-        return children.get(0).content();
-      }
-
-      // TODO max min suffix components
-
-    } else {
-      return possibleBlob.get().content();
-    }
-  }
-
-  // TODO merge with above
-  private Optional<NamePair> getNamePair(Interest interest) {
-    Optional<NameTree<Blob>> possibleBlob = store.find(interest.getName());
-    if (!possibleBlob.isPresent()) {
-      return Optional.empty();
-    } else if (hasSelectors(interest)) {
-      List<NameTree<Blob>> children = new ArrayList<>(possibleBlob.get().children());
-      if (children.isEmpty()) {
-        return Optional.empty();
-      } else if (children.size() == 1) {
-        return NamePair.fromContent(children.get(0));
-      } else {
-        if (isRightMost(interest)) {
-          Collections.sort(children, (a, b) -> b.lastComponent().compare(a.lastComponent()));
-        } else if (isLeftMost(interest)) {
-          Collections.sort(children, (a, b) -> a.lastComponent().compare(b.lastComponent()));
-        }
-        return NamePair.fromContent(children.get(0));
-      }
-
-      // TODO max min suffix components
-
-    } else {
-      return NamePair.fromContent(possibleBlob.get());
-    }
-  }
-
-  private boolean hasSelectors(Interest interest) {
-    return interest.getChildSelector() != -1 || interest.getExclude().size() > 0;
-  }
-
-  private boolean isRightMost(Interest interest) {
-    return interest.getChildSelector() == Interest.CHILD_SELECTOR_RIGHT;
-  }
-
-  private boolean isLeftMost(Interest interest) {
-    return interest.getChildSelector() == Interest.CHILD_SELECTOR_LEFT;
-  }
-
-  @Override
-  public boolean has(Name name) {
-    return store.find(name).isPresent();
-  }
-
-  @Override
-  public boolean has(Interest interest) {
-    return get(interest).isPresent();
-  }
-
-  @Override
-  public Optional<Blob> get(Name name) {
-    Optional<NameTree<Blob>> tree = store.find(name);
-    return tree.isPresent() ? tree.get().content() : Optional.empty();
-  }
-
-  @Override
-  public void push(Face face, Name name) throws IOException {
-    Optional<Blob> blob = get(name);
-    if (blob.isPresent()) {
-      Data t = new Data(template);
-      t.setName(name);
-      ByteArrayInputStream b = new ByteArrayInputStream(blob.get().getImmutableArray());
-      for (Data d : SegmentationHelper.segment(t, b)) {
-        face.putData(d);
-      }
-    }
-  }
-
-  @Override
-  public void push(Face face, Interest interest) throws IOException {
-    Optional<NamePair> pair = getNamePair(interest);
-    if (pair.isPresent()) {
-      Data t = new Data(template);
-      t.setName(pair.get().name);
-      ByteArrayInputStream b = new ByteArrayInputStream(pair.get().blob.getImmutableArray());
-      for (Data d : SegmentationHelper.segment(t, b)) {
-        face.putData(d);
-      }
-    }
-  }
-
-  @Override
-  public void clear() {
-    store.clear();
-  }
-
-  private static class NamePair {
-    public final Name name;
-    public final Blob blob;
-
-    public NamePair(Name name, Blob blob) {
-      this.name = name;
-      this.blob = blob;
-    }
-
-    static Optional<NamePair> fromContent(NameTree<Blob> leaf) {
-      Optional<Blob> content = leaf.content();
-      if (content.isPresent()) {
-        return Optional.of(new NamePair(leaf.fullName(), leaf.content().get()));
-      } else {
-        return Optional.empty();
-      }
-    }
-  }
-}
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
index 460d08f..5ed94cb 100644
--- a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
@@ -14,7 +14,7 @@
 
 package com.intel.jndn.utils.impl;
 
-import com.intel.jndn.utils.pubsub.PendingInterestTable;
+import com.intel.jndn.utils.PendingInterestTable;
 import net.named_data.jndn.Interest;
 import net.named_data.jndn.Name;
 
@@ -24,6 +24,8 @@
 import java.util.stream.Collectors;
 
 /**
+ * TODO use NameTree for storage? or Set and override Interest equals()
+ *
  * @author Andrew Brown, andrew.brown@intel.com
  */
 public class BoundedInMemoryPendingInterestTable implements PendingInterestTable {
@@ -44,6 +46,7 @@
   public boolean has(Interest interest) {
     if (interest.getChildSelector() != -1) {
       for (Name name : table.keySet()) {
+        // TODO this logic must be more complex; must match selectors as well
         if (interest.matchesName(name)) {
           return true;
         }
diff --git a/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java b/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java
index 5916cc3..048f304 100644
--- a/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java
+++ b/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java
@@ -19,13 +19,15 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 /**
- * // TODO need a way to bound the size
+ * TODO need a way to bound the size
+ *
  * @author Andrew Brown, andrew.brown@intel.com
  */
 public class DefaultNameTree<T> implements NameTree<T> {
@@ -49,7 +51,14 @@
 
   @Override
   public Name fullName() {
-    throw new UnsupportedOperationException("Need recursive call");
+    ArrayList<Name.Component> components = new ArrayList<>();
+    NameTree<T> self = this;
+    while (self.lastComponent() != null) {
+      components.add(self.lastComponent());
+      self = self.parent();
+    }
+    Collections.reverse(components);
+    return new Name(components);
   }
 
   @Override
@@ -81,7 +90,7 @@
       if (child == null) {
         return Optional.empty();
       } else {
-        return child.find(name.size() > 1 ? name.getSubName(-1) : new Name());
+        return child.find(name.size() > 1 ? name.getSubName(1) : new Name());
       }
     }
   }
diff --git a/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java b/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java
new file mode 100644
index 0000000..713442c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java
@@ -0,0 +1,141 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.ContentStore;
+import com.intel.jndn.utils.NameTree;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.logging.Logger;
+
+/**
+ * TODO must bound the size of the content store
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class InMemoryContentStore implements ContentStore {
+  private static final Logger LOGGER = Logger.getLogger(InMemoryContentStore.class.getName());
+  private final NameTree<Blob> store;
+  private final Data template;
+
+  public InMemoryContentStore(int freshnessMs) {
+    this.template = new Data();
+    this.template.getMetaInfo().setFreshnessPeriod(freshnessMs);
+    this.store = DefaultNameTree.newRootTree();
+  }
+
+  @Override
+  public void put(Name name, Blob data) {
+    store.insert(name, data);
+  }
+
+  @Override
+  public Optional<Blob> get(Interest interest) {
+    Optional<NameTree<Blob>> leaf = getWithSelectors(interest);
+    return leaf.isPresent() ? leaf.get().content() : Optional.empty();
+  }
+
+  private Optional<NameTree<Blob>> getWithSelectors(Interest interest) {
+    Optional<NameTree<Blob>> possibleBlob = store.find(interest.getName());
+    if (possibleBlob.isPresent() && hasSelectors(interest)) {
+      List<NameTree<Blob>> children = new ArrayList<>(possibleBlob.get().children());
+      if (children.isEmpty()) {
+        return Optional.empty();
+      } else if (children.size() == 1) {
+        return Optional.of(children.get(0));
+      } else if (isRightMost(interest)) {
+        Collections.sort(children, (a, b) -> b.lastComponent().compare(a.lastComponent()));
+        return Optional.of(children.get(0));
+      } else if (isLeftMost(interest)) {
+        Collections.sort(children, (a, b) -> a.lastComponent().compare(b.lastComponent()));
+        return Optional.of(children.get(0));
+      } else {
+        // TODO max/min suffix components and excludes
+        LOGGER.warning("The interest requested has selectors not yet implemented; returning empty content");
+        return Optional.empty();
+      }
+    }
+    return possibleBlob;
+  }
+
+  private static boolean hasSelectors(Interest interest) {
+    return interest.getChildSelector() != -1 || interest.getExclude().size() > 0;
+  }
+
+  private static boolean isRightMost(Interest interest) {
+    return interest.getChildSelector() == Interest.CHILD_SELECTOR_RIGHT;
+  }
+
+  private static boolean isLeftMost(Interest interest) {
+    return interest.getChildSelector() == Interest.CHILD_SELECTOR_LEFT;
+  }
+
+  @Override
+  public boolean has(Name name) {
+    return store.find(name).isPresent();
+  }
+
+  @Override
+  public boolean has(Interest interest) {
+    return get(interest).isPresent();
+  }
+
+  @Override
+  public Optional<Blob> get(Name name) {
+    Optional<NameTree<Blob>> tree = store.find(name);
+    return tree.isPresent() ? tree.get().content() : Optional.empty();
+  }
+
+  @Override
+  public void push(Face face, Name name) throws IOException {
+    Optional<Blob> blob = get(name);
+    if (blob.isPresent()) {
+      Data t = new Data(template);
+      t.setName(name);
+      ByteArrayInputStream b = new ByteArrayInputStream(blob.get().getImmutableArray());
+      for (Data d : SegmentationHelper.segment(t, b)) {
+        face.putData(d);
+      }
+    }
+  }
+
+  @Override
+  public void push(Face face, Interest interest) throws IOException {
+    Optional<NameTree<Blob>> leaf = getWithSelectors(interest);
+    if (leaf.isPresent() && leaf.get().content().isPresent()) {
+      Data t = new Data(template);
+      t.setName(leaf.get().fullName());
+      ByteArrayInputStream b = new ByteArrayInputStream(leaf.get().content().get().getImmutableArray());
+      for (Data d : SegmentationHelper.segment(t, b)) {
+        face.putData(d);
+      }
+    }
+  }
+
+  @Override
+  public void clear() {
+    store.clear();
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
index 92be1ba..363a9ce 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
@@ -14,15 +14,50 @@
 

 package com.intel.jndn.utils.pubsub;

 

+import com.intel.jndn.utils.Cancellation;

+import com.intel.jndn.utils.On;

+

 import java.io.IOException;

 

 /**

+ * API for entering and exiting a group and tracking the accompanying announcements.

+ *

  * @author Andrew Brown, andrew.brown@intel.com

  */

 interface AnnouncementService {

+  /**

+   * Announce entrance into the group

+   *

+   * @param id the ID of the agent

+   * @throws IOException if the announcement fails

+   */

   void announceEntrance(long id) throws IOException;

+

+  /**

+   * Announce exit from the group

+   *

+   * @param id the ID of the agent

+   * @throws IOException if the announcement fails

+   */

   void announceExit(long id) throws IOException;

 

+  /**

+   * Discover all previously announced entrances; the implementation is expected to be driven by network IO

+   * @param onFound callback fired when a group member is discovered

+   * @param onComplete callback fired when all group members have been discovered

+   * @param onError callback fired if errors occur during discovery

+   * @return a cancellation token for stopping discovery

+   * @throws IOException if the network fails during discovery

+   */

   Cancellation discoverExistingAnnouncements(On<Long> onFound, On<Void> onComplete, On<Exception> onError) throws IOException;

-  Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws RegistrationFailureException;

+

+  /**

+   * Observe any new entrances/exits; the implementation is expected to be driven by network IO

+   * @param onAdded callback fired when a group member announces its entry into the group, see {@link #announceEntrance(long)}

+   * @param onRemoved callback fired when a group member announces its exit from the group, see {@link #announceExit(long)}

+   * @param onError callback fired if errors are encountered while processing announcements

+   * @return a cancellation token for stopping the observer

+   * @throws IOException if the network fails while setting up the observer

+   */

+  Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws IOException;

 }

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Cancellation.java b/src/main/java/com/intel/jndn/utils/pubsub/Cancellation.java
deleted file mode 100644
index 2fe42df..0000000
--- a/src/main/java/com/intel/jndn/utils/pubsub/Cancellation.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*

- * jndn-utils

- * Copyright (c) 2016, Intel Corporation.

- *

- * This program is free software; you can redistribute it and/or modify it

- * under the terms and conditions of the GNU Lesser General Public License,

- * version 3, as published by the Free Software Foundation.

- *

- * This program is distributed in the hope it will be useful, but WITHOUT ANY

- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

- * more details.

- */

-

-package com.intel.jndn.utils.pubsub;

-

-/**

- * @author Andrew Brown, andrew.brown@intel.com

- */

-public interface Cancellation {

-  Cancellation CANCELLED = () -> {/* do nothing */};

-

-  void cancel();

-}

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
index 654051f..b1e5180 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
@@ -14,6 +14,8 @@
 

 package com.intel.jndn.utils.pubsub;

 

+import com.intel.jndn.utils.Cancellation;

+import com.intel.jndn.utils.On;

 import com.intel.jndn.utils.client.impl.BackoffRetryClient;

 import net.named_data.jndn.Exclude;

 import net.named_data.jndn.Face;

@@ -32,7 +34,7 @@
 import java.util.logging.Level;

 import java.util.logging.Logger;

 

-import static com.intel.jndn.utils.pubsub.Cancellation.CANCELLED;

+import static com.intel.jndn.utils.Cancellation.CANCELLED;

 

 /**

  * @author Andrew Brown, andrew.brown@intel.com

@@ -91,6 +93,7 @@
     return () -> stopped = true;

   }

 

+  // TODO need special namespace for discovery

   private Interest discover(BackoffRetryClient client, On<Long> onFound, On<Void> onComplete, On<Exception> onError) throws IOException {

     Interest interest = new Interest(topicPrefix);

     interest.setInterestLifetimeMilliseconds(STARTING_DISCOVERY_LIFETIME);

@@ -141,7 +144,7 @@
   }

 

   @Override

-  public Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws RegistrationFailureException {

+  public Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws IOException {

     LOGGER.log(Level.INFO, "Observing new announcements: {0}", broadcastPrefix);

     CompletableFuture<Void> future = new CompletableFuture<>();

     OnRegistration onRegistration = new OnRegistration(future);

@@ -150,8 +153,8 @@
     try {

       long registeredPrefix = face.registerPrefix(broadcastPrefix, onAnnouncement, (OnRegisterFailed) onRegistration, onRegistration);

       return () -> face.removeRegisteredPrefix(registeredPrefix);

-    } catch (IOException | SecurityException e) {

-      throw new RegistrationFailureException(e);

+    } catch (SecurityException e) {

+      throw new IOException("Failed while using transport security key chain", e);

     }

   }

 

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
index 48c11bb..adb069b 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
@@ -15,6 +15,7 @@
 package com.intel.jndn.utils.pubsub;

 

 import com.intel.jndn.utils.ContentStore;

+import com.intel.jndn.utils.PendingInterestTable;

 import com.intel.jndn.utils.Publisher;

 import net.named_data.jndn.Data;

 import net.named_data.jndn.Face;

@@ -36,6 +37,8 @@
 import java.util.logging.Logger;

 

 /**

+ * TODO look at thread safety

+ *

  * @author Andrew Brown, andrew.brown@intel.com

  */

 class NdnPublisher implements Publisher, OnInterestCallback {

@@ -60,7 +63,7 @@
     this.contentStore = contentStore;

   }

 

-  synchronized void open() throws RegistrationFailureException {

+  synchronized void open() throws IOException {

     opened = true;

     CompletableFuture<Void> future = new CompletableFuture<>();

     OnRegistration onRegistration = new OnRegistration(future);

@@ -71,12 +74,16 @@
       future.get(10, TimeUnit.SECONDS);

       announcementService.announceEntrance(publisherId);

     } catch (IOException | SecurityException | InterruptedException | ExecutionException | TimeoutException e) {

-      throw new RegistrationFailureException(e);

+      throw new IOException("Failed to register NDN prefix; pub-sub IO will be impossible", e);

     }

   }

 

-  // TODO this should not clear content store or remove registered prefix; do that in the future to allow subscribers

-  // to retrieve still-alive messages

+  /**

+   * TODO this should not clear content store or remove registered prefix; do that in the future to allow subscribers

+   * to retrieve still-alive messages

+   *

+   * @throws IOException if the group exit announcement fails

+   */

   @Override

   public synchronized void close() throws IOException {

     if (opened) {

@@ -89,17 +96,13 @@
   @Override

   public void publish(Blob message) throws IOException {

     if (!opened) {

-      try {

-        open();

-      } catch (RegistrationFailureException e) {

-        throw new IOException(e);

-      }

+      open();

     }

 

     long id = latestMessageId.getAndIncrement();

-    Name name = PubSubNamespace.toMessageName(prefix, publisherId, id);

+    Name name = PubSubNamespace.toMessageName(prefix, id);

 

-    contentStore.put(name, message);

+    contentStore.put(name, PubSubNamespace.toResponse(null, message));

     LOGGER.log(Level.INFO, "Published message {0} to content store: {1}", new Object[]{id, name});

 

     if (pendingInterestTable.has(new Interest(name))) {

@@ -122,7 +125,7 @@
     }

   }

 

-  private boolean isAttributesRequest(Name name, Interest interest) {

+  private static boolean isAttributesRequest(Name name, Interest interest) {

     return name.equals(interest.getName()) && interest.getChildSelector() == -1;

   }

 

@@ -142,7 +145,7 @@
     }

   }

 

-  private void sendAttributes(Face face, Name publisherName) {

+  private static void sendAttributes(Face face, Name publisherName) {

     Data data = new Data(publisherName);

     data.setContent(new Blob("[attributes here]"));

     data.getMetaInfo().setFreshnessPeriod(ATTRIBUTES_FRESHNESS_PERIOD);

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
index 37f71fa..51aee3a 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
@@ -14,7 +14,9 @@
 

 package com.intel.jndn.utils.pubsub;

 

+import com.intel.jndn.utils.Cancellation;

 import com.intel.jndn.utils.Client;

+import com.intel.jndn.utils.On;

 import com.intel.jndn.utils.Subscriber;

 import net.named_data.jndn.Data;

 import net.named_data.jndn.Face;

@@ -24,14 +26,16 @@
 import net.named_data.jndn.util.Blob;

 

 import java.io.IOException;

-import java.util.HashMap;

 import java.util.Map;

 import java.util.Set;

 import java.util.concurrent.CompletableFuture;

+import java.util.concurrent.ConcurrentHashMap;

 import java.util.logging.Level;

 import java.util.logging.Logger;

 

 /**

+ * TODO look at thread safety

+ *

  * @author Andrew Brown, andrew.brown@intel.com

  */

 class NdnSubscriber implements Subscriber {

@@ -42,7 +46,7 @@
   private final On<Exception> onError;

   private final AnnouncementService announcementService;

   private final Client client;

-  private final Map<Long, Subscription> subscriptions = new HashMap<>();

+  private final Map<Long, Subscription> subscriptions = new ConcurrentHashMap<>();

   private Cancellation newAnnouncementCancellation;

   private Cancellation existingAnnouncementsCancellation;

 

@@ -55,13 +59,19 @@
     this.client = client;

   }

 

-  void open() throws RegistrationFailureException, IOException {

-    LOGGER.log(Level.INFO, "Starting subscriber: {0}", prefix);

-    existingAnnouncementsCancellation = announcementService.discoverExistingAnnouncements(this::add, null, e -> close());

-    newAnnouncementCancellation = announcementService.observeNewAnnouncements(this::add, this::remove, e -> close());

+  @Override

+  public Set<Long> knownPublishers() {

+    return subscriptions.keySet();

   }

 

-  void add(long publisherId) {

+  @Override

+  public void open() throws IOException {

+    LOGGER.log(Level.INFO, "Starting subscriber: {0}", prefix);

+    existingAnnouncementsCancellation = announcementService.discoverExistingAnnouncements(this::addPublisher, null, e -> close());

+    newAnnouncementCancellation = announcementService.observeNewAnnouncements(this::addPublisher, this::removePublisher, e -> close());

+  }

+

+  void addPublisher(long publisherId) {

     if (subscriptions.containsKey(publisherId)) {

       LOGGER.log(Level.WARNING, "Duplicate publisher ID {} received from announcement service; this should not happen and will be ignored", publisherId);

     } else {

@@ -71,7 +81,7 @@
     }

   }

 

-  void remove(long publisherId) {

+  void removePublisher(long publisherId) {

     Subscription removed = subscriptions.remove(publisherId);

     removed.cancel();

   }

@@ -93,32 +103,6 @@
     }

   }

 

-  Set<Long> knownPublishers() {

-    return subscriptions.keySet();

-  }

-

-  // TODO repeated calls?

-  // TODO remove this, do topic.subscribe(On..., On...) instead; or new Subscriber(On..., On..., ...).open()

-  @Override

-  public Cancellation subscribe(On<Blob> onMessage, On<Exception> onError) {

-//    if (!started) {

-//      try {

-//        open();

-//      } catch (RegistrationFailureException e) {

-//        LOGGER.log(Level.SEVERE, "Failed to start announcement service, aborting subscription", e);

-//        onError.on(e);

-//        return Cancellation.CANCELLED;

-//      }

-//    }

-//

-//    LOGGER.log(Level.INFO, "Subscribing: {0}", prefix);

-//    for (Subscription c : subscriptions.values()) {

-//      c.subscribe();

-//    }

-//

-    return this::close;

-  }

-

   private class Subscription implements Cancellation {

     final long publisherId;

     long messageId;

@@ -128,12 +112,6 @@
       this.publisherId = publisherId;

     }

 

-    synchronized void subscribe() {

-      // would prefer this to be getAsync(on<>, on<>)?

-      currentRequest = client.getAsync(face, buildLatestInterest(publisherId)); // TODO backoff

-      currentRequest.handle(this::handleResponse);

-    }

-

     @Override

     public synchronized void cancel() {

       if (currentRequest != null) {

@@ -141,12 +119,35 @@
       }

     }

 

+    synchronized void subscribe() {

+      // would prefer this to be getAsync(on<>, on<>)?

+      currentRequest = client.getAsync(face, buildLatestInterest(publisherId));

+      currentRequest.handle(this::handleResponse);

+    }

+

+    private Interest buildLatestInterest(long publisherId) {

+      // ask for the right-most child under the publisher's name (presumably its most recent message -- TODO confirm)

+      Interest latest = new Interest(PubSubNamespace.toPublisherName(prefix, publisherId)); // TODO ms lifetime

+      latest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);

+      return latest;

+    }

+

+    synchronized void next(long publisherId, long messageId) {

+      currentRequest = client.getAsync(face, buildNextInterest(publisherId, messageId));

+      currentRequest.handle(this::handleResponse);

+    }

+

+    private Interest buildNextInterest(long publisherId, long messageId) {

+      long followingId = messageId + 1; // request the message right after the one identified by messageId

+      return new Interest(PubSubNamespace.toMessageName(prefix, publisherId, followingId)); // TODO ms lifetime

+    }

+

     private Void handleResponse(Data data, Throwable throwable) {

       if (throwable != null) {

         onError.on((Exception) throwable); // TODO avoid cast?

       } else {

         try {

-          Response response = PubSubNamespace.toResponse(data);

+          Response response = PubSubNamespace.parseResponse(data);

           this.messageId = response.messageId();

           onMessage.on(response.content()); // TODO buffer and catch exceptions

         } catch (EncodingException e) {

@@ -160,23 +161,6 @@
       return null;

     }

 

-    private void next(long publisherId, long messageId) {

-      currentRequest = client.getAsync(face, buildNextInterest(publisherId, messageId));

-      currentRequest.handle(this::handleResponse);

-    }

-

-    private Interest buildLatestInterest(long publisherId) {

-      Name name = PubSubNamespace.toPublisherName(prefix, publisherId);

-      Interest interest = new Interest(name); // TODO ms lifetime

-      interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);

-      return interest;

-    }

-

-    private Interest buildNextInterest(long publisherId, long messageId) {

-      Name name = PubSubNamespace.toMessageName(prefix, publisherId, messageId + 1);

-      return new Interest(name); // TODO ms lifetime

-    }

-

     @Override

     public boolean equals(Object o) {

       if (this == o) {

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java b/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
index 05f631c..7b5a160 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
@@ -22,6 +22,8 @@
 import java.util.logging.Logger;

 

 /**

+ * Helper for tracking and logging prefix registrations; TODO replace or remove this in the future

+ *

  * @author Andrew Brown, andrew.brown@intel.com

  */

 class OnRegistration implements OnRegisterFailed, OnRegisterSuccess {

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
new file mode 100644
index 0000000..c0caddc
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.On;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import com.intel.jndn.utils.client.impl.AdvancedClient;
+import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;
+import com.intel.jndn.utils.impl.InMemoryContentStore;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Assemble the necessary elements for building the {@link Publisher} and {@link Subscriber} implementations
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class PubSubFactory {
+
+  private PubSubFactory() {
+    // do not instantiate this factory
+  }
+
+  /**
+   * Build an {@link NdnSubscriber} with its own {@link NdnAnnouncementService} and {@link AdvancedClient}, then open it.
+   * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+   * @param prefix the NDN namespace under which messages are published
+   * @param onMessage callback fired when a message is received
+   * @param onError callback fired when an error happens after subscription
+   * @return an open, publisher-discovering, message-retrieving subscriber
+   * @throws IOException if the initial subscription or announcement IO fails
+   */
+  public static Subscriber newSubscriber(Face face, Name prefix, On<Blob> onMessage, On<Exception> onError) throws IOException {
+    NdnSubscriber subscriber = new NdnSubscriber(face, prefix, onMessage, onError, new NdnAnnouncementService(face, prefix), new AdvancedClient());
+    subscriber.open();
+    return subscriber;
+  }
+
+  /**
+   * Build an {@link NdnPublisher} identified by a random, non-negative publisher ID; the publisher is not opened here.
+   * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+   * @param prefix the NDN namespace under which messages are published
+   * @return a group-announcing, unopened publisher (it will automatically open on first publish)
+   */
+  public static Publisher newPublisher(Face face, Name prefix) {
+    long publisherId = new Random().nextLong() & Long.MAX_VALUE; // clear the sign bit; Math.abs(Long.MIN_VALUE) stays negative
+    return new NdnPublisher(face, prefix, publisherId, new NdnAnnouncementService(face, prefix), new BoundedInMemoryPendingInterestTable(1024), new InMemoryContentStore(2000));
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
index 94da2db..f524a10 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
@@ -17,7 +17,9 @@
 import net.named_data.jndn.Data;

 import net.named_data.jndn.Name;

 import net.named_data.jndn.encoding.EncodingException;

+import net.named_data.jndn.encoding.tlv.Tlv;

 import net.named_data.jndn.encoding.tlv.TlvDecoder;

+import net.named_data.jndn.encoding.tlv.TlvEncoder;

 import net.named_data.jndn.util.Blob;

 

 import java.util.Collections;

@@ -36,6 +38,7 @@
   static final int MESSAGE_CONTENT_MARKER = 102;

 

   static final Name DEFAULT_BROADCAST_PREFIX = new Name("/ndn/broadcast");

+  public static final int MESSAGE_PAYLOAD_INITIAL_CAPACITY = 4096;

 

   private PubSubNamespace() {

     // do not instantiate this class

@@ -49,18 +52,48 @@
     return new Name(prefix).append(toPublisherComponent(publisherId));

   }

 

+  static long parsePublisher(Name name) throws EncodingException {

+    return findMarkerFromEnd(name, PUBLISHER_ID_MARKER);

+  }

+

   static Name toAnnouncement(Name prefix, long publisherId, Announcement announcement) {

     Name.Component announcementComponent = Name.Component.fromNumberWithMarker(announcement.id, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);

     return toPublisherName(prefix, publisherId).append(announcementComponent);

   }

 

+  static Announcement parseAnnouncement(Name name) throws EncodingException {

+    return Announcement.from((int) name.get(-1).toNumberWithMarker(ANNOUNCEMENT_ACTION_MARKER));

+  }

+

   static Name toMessageName(Name prefix, long publisherId, long messageId) {

     Name.Component messageComponent = Name.Component.fromNumberWithMarker(messageId, PubSubNamespace.MESSAGE_ID_MARKER);

     return toPublisherName(prefix, publisherId).append(messageComponent);

   }

 

-  static Response toResponse(Data data) throws EncodingException {

-    long messageId = findMarkerFromEnd(data.getName(), MESSAGE_ID_MARKER);

+  static Name toMessageName(Name publisherPrefix, long messageId) {

+    Name messageName = new Name(publisherPrefix); // build on a new Name rather than appending to the caller's prefix

+    return messageName.append(Name.Component.fromNumberWithMarker(messageId, MESSAGE_ID_MARKER));

+  }

+

+  static long parseMessageId(Name name) throws EncodingException {

+    return findMarkerFromEnd(name, MESSAGE_ID_MARKER);

+  }

+

+  static Blob toResponse(Blob attributes, Blob content) {

+    TlvEncoder encoder = new TlvEncoder(MESSAGE_PAYLOAD_INITIAL_CAPACITY);

+    int initialLength = encoder.getLength();

+    // encode backwards (see Tlv0_1_1WireFormat.java): content is written first, then the optional attributes

+    encoder.writeBlobTlv(MESSAGE_CONTENT_MARKER, content.buf());

+    if (attributes != null && attributes.size() > 0) {

+      encoder.writeBlobTlv(MESSAGE_ATTRIBUTES_MARKER, attributes.buf());

+    }

+    encoder.writeTypeAndLength(MESSAGE_MARKER, encoder.getLength() - initialLength);

+

+    return new Blob(encoder.getOutput(), false);

+  }

+

+  static Response parseResponse(Data data) throws EncodingException {

+    long messageId = parseMessageId(data.getName());

     TlvDecoder decoder = new TlvDecoder(data.getContent().buf());

     int endOffset = decoder.readNestedTlvsStart(MESSAGE_MARKER);

     Blob attributes = new Blob(decoder.readOptionalBlobTlv(MESSAGE_ATTRIBUTES_MARKER, endOffset), true);

@@ -68,14 +101,6 @@
     return new Response(data.getName(), messageId, Collections.singletonMap("*", attributes), content);

   }

 

-  static long parsePublisher(Name name) throws EncodingException {

-    return findMarkerFromEnd(name, PUBLISHER_ID_MARKER);

-  }

-

-  static Announcement parseAnnouncement(Name name) throws EncodingException {

-    return Announcement.from((int) name.get(-1).toNumberWithMarker(ANNOUNCEMENT_ACTION_MARKER));

-  }

-

   private static long findMarkerFromEnd(Name name, int marker) throws EncodingException {

     for (int i = name.size() - 1; i >= 0; i--) {

       try {

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Response.java b/src/main/java/com/intel/jndn/utils/pubsub/Response.java
index 9d84f44..29aa1cd 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/Response.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/Response.java
@@ -20,6 +20,11 @@
 import java.util.Map;

 

 /**

+ * Represents a publisher response to a subscriber request for a message. Note that the {@link #attributes} act as a type

+ * of header optionally prepended to the message content. The intent of this is to allow publishers to inform

+ * subscribers of publisher-side state changes without requiring a separate mechanism (with accompanying complexity and

+ * overhead); the full set of publisher state attributes should be served elsewhere, not in this class.

+ *

  * @author Andrew Brown, andrew.brown@intel.com

  */

 class Response {

@@ -35,10 +40,16 @@
     this.content = content;

   }

 

+  /**

+   * @return the message ID of the published message

+   */

   long messageId() {

     return messageId;

   }

 

+  /**

+   * @return the content of the message

+   */

   Blob content() {

     return content;

   }

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Topic.java b/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
deleted file mode 100644
index b0b4ad4..0000000
--- a/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*

- * jndn-utils

- * Copyright (c) 2016, Intel Corporation.

- *

- * This program is free software; you can redistribute it and/or modify it

- * under the terms and conditions of the GNU Lesser General Public License,

- * version 3, as published by the Free Software Foundation.

- *

- * This program is distributed in the hope it will be useful, but WITHOUT ANY

- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS

- * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for

- * more details.

- */

-

-package com.intel.jndn.utils.pubsub;

-

-import com.intel.jndn.utils.Publisher;

-import com.intel.jndn.utils.Subscriber;

-import com.intel.jndn.utils.client.impl.AdvancedClient;

-import com.intel.jndn.utils.impl.BoundedInMemoryContentStore;

-import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;

-import net.named_data.jndn.Face;

-import net.named_data.jndn.Name;

-import net.named_data.jndn.util.Blob;

-

-import java.io.IOException;

-import java.util.Random;

-

-/**

- * @author Andrew Brown, andrew.brown@intel.com

- */

-public final class Topic {

-  private final Name name;

-

-  public Topic(Name name) {

-    this.name = name;

-  }

-

-  public Name name() {

-    return new Name(name);

-  }

-

-  // TODO move to PubSubFactory? change this to subscribe()

-  public Subscriber subscribe(Face face, On<Blob> onMessage, On<Exception> onError) throws RegistrationFailureException, IOException {

-    NdnSubscriber subscriber = new NdnSubscriber(face, name, onMessage, onError, new NdnAnnouncementService(face, name), new AdvancedClient());

-    subscriber.open();

-    return subscriber;

-  }

-

-  // TODO move to PubSubFactory? change this to publish()

-  public Publisher newPublisher(Face face) {

-    long publisherId = Math.abs(new Random().nextLong());

-    return new NdnPublisher(face, name, publisherId, new NdnAnnouncementService(face, name), new BoundedInMemoryPendingInterestTable(1024), new BoundedInMemoryContentStore(1024, 2000));

-  }

-}