Create working pub-sub integration test
Also, refactors locations of some classes
diff --git a/src/main/java/com/intel/jndn/utils/Cancellation.java b/src/main/java/com/intel/jndn/utils/Cancellation.java
new file mode 100644
index 0000000..1a8e29f
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Cancellation.java
@@ -0,0 +1,35 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+/**
+ * Represents a token used for canceling some ongoing action. This approach was chosen as it is lighter and more
+ * flexible than a cancellable {@link java.util.concurrent.Future}; perhaps it could be replaced by {@link
+ * AutoCloseable} although this seems to imply something else (however, the language support is a benefit, TODO
+ * re-look at this).
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface Cancellation {
+ /**
+ * Shortcut for a no-op, already-cancelled cancellation
+ */
+ Cancellation CANCELLED = () -> {/* do nothing */};
+
+ /**
+ * Cancel the action referenced by this token
+ */
+ void cancel();
+}
diff --git a/src/main/java/com/intel/jndn/utils/ContentStore.java b/src/main/java/com/intel/jndn/utils/ContentStore.java
index e0ccc51..e6ad68c 100644
--- a/src/main/java/com/intel/jndn/utils/ContentStore.java
+++ b/src/main/java/com/intel/jndn/utils/ContentStore.java
@@ -23,7 +23,7 @@
import java.util.Optional;
/**
- * TODO merge with Repository
+ * Represents a named content store of NDN data which allows querying by name or interest; TODO merge with Repository
*
* @author Andrew Brown, andrew.brown@intel.com
*/
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/On.java b/src/main/java/com/intel/jndn/utils/On.java
similarity index 95%
rename from src/main/java/com/intel/jndn/utils/pubsub/On.java
rename to src/main/java/com/intel/jndn/utils/On.java
index 5bb7951..96a84d2 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/On.java
+++ b/src/main/java/com/intel/jndn/utils/On.java
@@ -1,28 +1,28 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils.pubsub;
-
-/**
- * Generic callback API for receiving some signal T when an event is fired.
- *
- * @author Andrew Brown, andrew.brown@intel.com
- */
-@FunctionalInterface
-public interface On<T> {
- /**
- * @param event the event object describing the occurrence
- */
- void on(T event);
-}
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+/**
+ * Generic callback API for receiving some signal T when an event is fired.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+@FunctionalInterface
+public interface On<T> {
+ /**
+ * @param event the event object describing the occurrence
+ */
+ void on(T event);
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PendingInterestTable.java b/src/main/java/com/intel/jndn/utils/PendingInterestTable.java
similarity index 79%
rename from src/main/java/com/intel/jndn/utils/pubsub/PendingInterestTable.java
rename to src/main/java/com/intel/jndn/utils/PendingInterestTable.java
index 410a565..3e49f11 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/PendingInterestTable.java
+++ b/src/main/java/com/intel/jndn/utils/PendingInterestTable.java
@@ -1,42 +1,46 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils.pubsub;
-
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.Name;
-
-import java.util.Collection;
-
-/**
- * @author Andrew Brown, andrew.brown@intel.com
- */
-public interface PendingInterestTable {
- /**
- * @param interest the PIT entry to add
- */
- void add(Interest interest);
-
- /**
- * @param interest the incoming interest to match against
- * @return true if the interest matches an entry already in the PIT
- */
- boolean has(Interest interest);
-
- /**
- * @param name the name to match against
- * @return the PIT entries matching a name, removing them from the PIT
- */
- Collection<Interest> extract(Name name);
-}
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+
+/**
+ * Represent a pending interest table; the jndn {@link net.named_data.jndn.impl.PendingInterestTable} depended on
+ * several patterns such as List output parameters for collecting output and did not allow querying without extracting
+ * (see {@link #has(Interest)}).
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface PendingInterestTable {
+ /**
+ * @param interest the PIT entry to add
+ */
+ void add(Interest interest);
+
+ /**
+ * @param interest the incoming interest to match against
+ * @return true if the interest matches an entry already in the PIT
+ */
+ boolean has(Interest interest);
+
+ /**
+ * @param name the name to match against
+ * @return the PIT entries matching a name, removing them from the PIT
+ */
+ Collection<Interest> extract(Name name);
+}
diff --git a/src/main/java/com/intel/jndn/utils/PushableRepository.java b/src/main/java/com/intel/jndn/utils/PushableRepository.java
deleted file mode 100644
index 4da471a..0000000
--- a/src/main/java/com/intel/jndn/utils/PushableRepository.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils;
-
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Name;
-
-import java.io.IOException;
-
-/**
- * @author Andrew Brown, andrew.brown@intel.com
- */
-public interface PushableRepository extends Repository {
- /**
- * Write data to a face. Each name must correspond to one datum, but the implementation may choose to write these
- * as separate data packets (e.g. as segments of a file).
- *
- * @param face the face to write the data to
- * @param name the name of the data to write
- * @throws IOException if the writing fails
- */
- void push(Face face, Name name) throws IOException;
-}
diff --git a/src/main/java/com/intel/jndn/utils/Subscriber.java b/src/main/java/com/intel/jndn/utils/Subscriber.java
index 89e5168..2830cbc 100644
--- a/src/main/java/com/intel/jndn/utils/Subscriber.java
+++ b/src/main/java/com/intel/jndn/utils/Subscriber.java
@@ -14,18 +14,22 @@
package com.intel.jndn.utils;
-import com.intel.jndn.utils.pubsub.Cancellation;
-import com.intel.jndn.utils.pubsub.On;
-import net.named_data.jndn.util.Blob;
+import java.io.IOException;
+import java.util.Set;
/**
* @author Andrew Brown, andrew.brown@intel.com
*/
public interface Subscriber extends AutoCloseable {
/**
- * @param onMessage called every time a new message is received
- * @param onError called every time an error occurs
- * @return a cancellation token for stopping the subscription
+ * @return the currently known publishers
*/
- Cancellation subscribe(On<Blob> onMessage, On<Exception> onError);
+ Set<Long> knownPublishers();
+
+ /**
+ * Open the subscriber transport mechanisms and retrieve publisher messages
+ *
+ * @throws IOException if the subscriber fails during network access
+ */
+ void open() throws IOException;
}
diff --git a/src/main/java/com/intel/jndn/utils/Topic.java b/src/main/java/com/intel/jndn/utils/Topic.java
new file mode 100644
index 0000000..8816e5d
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Topic.java
@@ -0,0 +1,68 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import com.intel.jndn.utils.On;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import com.intel.jndn.utils.pubsub.PubSubFactory;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+
+/**
+ * A publish-subscrib topic; see the {@code pubsub} package for implementation details
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public final class Topic {
+ private final Name name;
+
+ /**
+ * @param name the NDN name of the topic
+ */
+ public Topic(Name name) {
+ this.name = name;
+ }
+
+ /**
+ * @return the NDN name of the topic; the return is wrapped so that the original value is immutable and the return
+ * type can be modified
+ */
+ public Name name() {
+ return new Name(name);
+ }
+
+ /**
+ * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+ * @param onMessage callback fired when a message is received
+ * @param onError callback fired when an error happens after subscription
+ * @return an open subscriber
+ * @throws IOException if the subscription fails
+ */
+ public Subscriber subscribe(Face face, On<Blob> onMessage, On<Exception> onError) throws IOException {
+ return PubSubFactory.newSubscriber(face, name, onMessage, onError);
+ }
+
+ /**
+ * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+ * @return a factory-built publisher
+ */
+ public Publisher newPublisher(Face face) {
+ return PubSubFactory.newPublisher(face, name);
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java
deleted file mode 100644
index 5b269bf..0000000
--- a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils.impl;
-
-import com.intel.jndn.utils.ContentStore;
-import com.intel.jndn.utils.NameTree;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.util.Blob;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * @author Andrew Brown, andrew.brown@intel.com
- */
-public class BoundedInMemoryContentStore implements ContentStore {
- private final NameTree<Blob> store;
- private final Data template;
-
- public BoundedInMemoryContentStore(int maxSize, int freshnessMs) {
- this.template = new Data();
- this.template.getMetaInfo().setFreshnessPeriod(freshnessMs);
- this.store = DefaultNameTree.newRootTree();
- }
-
- @Override
- public void put(Name name, Blob data) {
- store.insert(name, data);
- }
-
- @Override
- public Optional<Blob> get(Interest interest) {
- Optional<NameTree<Blob>> possibleBlob = store.find(interest.getName());
- if (!possibleBlob.isPresent()) {
- return Optional.empty();
- } else if (hasSelectors(interest)) {
- List<NameTree<Blob>> children = new ArrayList<>(possibleBlob.get().children());
- if (children.isEmpty()) {
- return Optional.empty();
- } else if (children.size() == 1) {
- return children.get(0).content();
- } else {
- if (isRightMost(interest)) {
- Collections.sort(children, (a, b) -> b.lastComponent().compare(a.lastComponent()));
- } else if (isLeftMost(interest)) {
- Collections.sort(children, (a, b) -> a.lastComponent().compare(b.lastComponent()));
- }
- return children.get(0).content();
- }
-
- // TODO max min suffix components
-
- } else {
- return possibleBlob.get().content();
- }
- }
-
- // TODO merge with above
- private Optional<NamePair> getNamePair(Interest interest) {
- Optional<NameTree<Blob>> possibleBlob = store.find(interest.getName());
- if (!possibleBlob.isPresent()) {
- return Optional.empty();
- } else if (hasSelectors(interest)) {
- List<NameTree<Blob>> children = new ArrayList<>(possibleBlob.get().children());
- if (children.isEmpty()) {
- return Optional.empty();
- } else if (children.size() == 1) {
- return NamePair.fromContent(children.get(0));
- } else {
- if (isRightMost(interest)) {
- Collections.sort(children, (a, b) -> b.lastComponent().compare(a.lastComponent()));
- } else if (isLeftMost(interest)) {
- Collections.sort(children, (a, b) -> a.lastComponent().compare(b.lastComponent()));
- }
- return NamePair.fromContent(children.get(0));
- }
-
- // TODO max min suffix components
-
- } else {
- return NamePair.fromContent(possibleBlob.get());
- }
- }
-
- private boolean hasSelectors(Interest interest) {
- return interest.getChildSelector() != -1 || interest.getExclude().size() > 0;
- }
-
- private boolean isRightMost(Interest interest) {
- return interest.getChildSelector() == Interest.CHILD_SELECTOR_RIGHT;
- }
-
- private boolean isLeftMost(Interest interest) {
- return interest.getChildSelector() == Interest.CHILD_SELECTOR_LEFT;
- }
-
- @Override
- public boolean has(Name name) {
- return store.find(name).isPresent();
- }
-
- @Override
- public boolean has(Interest interest) {
- return get(interest).isPresent();
- }
-
- @Override
- public Optional<Blob> get(Name name) {
- Optional<NameTree<Blob>> tree = store.find(name);
- return tree.isPresent() ? tree.get().content() : Optional.empty();
- }
-
- @Override
- public void push(Face face, Name name) throws IOException {
- Optional<Blob> blob = get(name);
- if (blob.isPresent()) {
- Data t = new Data(template);
- t.setName(name);
- ByteArrayInputStream b = new ByteArrayInputStream(blob.get().getImmutableArray());
- for (Data d : SegmentationHelper.segment(t, b)) {
- face.putData(d);
- }
- }
- }
-
- @Override
- public void push(Face face, Interest interest) throws IOException {
- Optional<NamePair> pair = getNamePair(interest);
- if (pair.isPresent()) {
- Data t = new Data(template);
- t.setName(pair.get().name);
- ByteArrayInputStream b = new ByteArrayInputStream(pair.get().blob.getImmutableArray());
- for (Data d : SegmentationHelper.segment(t, b)) {
- face.putData(d);
- }
- }
- }
-
- @Override
- public void clear() {
- store.clear();
- }
-
- private static class NamePair {
- public final Name name;
- public final Blob blob;
-
- public NamePair(Name name, Blob blob) {
- this.name = name;
- this.blob = blob;
- }
-
- static Optional<NamePair> fromContent(NameTree<Blob> leaf) {
- Optional<Blob> content = leaf.content();
- if (content.isPresent()) {
- return Optional.of(new NamePair(leaf.fullName(), leaf.content().get()));
- } else {
- return Optional.empty();
- }
- }
- }
-}
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
index 460d08f..5ed94cb 100644
--- a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
@@ -14,7 +14,7 @@
package com.intel.jndn.utils.impl;
-import com.intel.jndn.utils.pubsub.PendingInterestTable;
+import com.intel.jndn.utils.PendingInterestTable;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
@@ -24,6 +24,8 @@
import java.util.stream.Collectors;
/**
+ * TODO use NameTree for storage? or Set and override Interest equals()
+ *
* @author Andrew Brown, andrew.brown@intel.com
*/
public class BoundedInMemoryPendingInterestTable implements PendingInterestTable {
@@ -44,6 +46,7 @@
public boolean has(Interest interest) {
if (interest.getChildSelector() != -1) {
for (Name name : table.keySet()) {
+ // TODO this logic must be more complex; must match selectors as well
if (interest.matchesName(name)) {
return true;
}
diff --git a/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java b/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java
index 5916cc3..048f304 100644
--- a/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java
+++ b/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java
@@ -19,13 +19,15 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
- * // TODO need a way to bound the size
+ * TODO need a way to bound the size
+ *
* @author Andrew Brown, andrew.brown@intel.com
*/
public class DefaultNameTree<T> implements NameTree<T> {
@@ -49,7 +51,14 @@
@Override
public Name fullName() {
- throw new UnsupportedOperationException("Need recursive call");
+ ArrayList<Name.Component> components = new ArrayList<>();
+ NameTree<T> self = this;
+ while (self.lastComponent() != null) {
+ components.add(self.lastComponent());
+ self = self.parent();
+ }
+ Collections.reverse(components);
+ return new Name(components);
}
@Override
@@ -81,7 +90,7 @@
if (child == null) {
return Optional.empty();
} else {
- return child.find(name.size() > 1 ? name.getSubName(-1) : new Name());
+ return child.find(name.size() > 1 ? name.getSubName(1) : new Name());
}
}
}
diff --git a/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java b/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java
new file mode 100644
index 0000000..713442c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java
@@ -0,0 +1,141 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.ContentStore;
+import com.intel.jndn.utils.NameTree;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.logging.Logger;
+
+/**
+ * TODO must bound the size of the content store
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class InMemoryContentStore implements ContentStore {
+ private static final Logger LOGGER = Logger.getLogger(InMemoryContentStore.class.getName());
+ private final NameTree<Blob> store;
+ private final Data template;
+
+ public InMemoryContentStore(int freshnessMs) {
+ this.template = new Data();
+ this.template.getMetaInfo().setFreshnessPeriod(freshnessMs);
+ this.store = DefaultNameTree.newRootTree();
+ }
+
+ @Override
+ public void put(Name name, Blob data) {
+ store.insert(name, data);
+ }
+
+ @Override
+ public Optional<Blob> get(Interest interest) {
+ Optional<NameTree<Blob>> leaf = getWithSelectors(interest);
+ return leaf.isPresent() ? leaf.get().content() : Optional.empty();
+ }
+
+ private Optional<NameTree<Blob>> getWithSelectors(Interest interest) {
+ Optional<NameTree<Blob>> possibleBlob = store.find(interest.getName());
+ if (possibleBlob.isPresent() && hasSelectors(interest)) {
+ List<NameTree<Blob>> children = new ArrayList<>(possibleBlob.get().children());
+ if (children.isEmpty()) {
+ return Optional.empty();
+ } else if (children.size() == 1) {
+ return Optional.of(children.get(0));
+ } else if (isRightMost(interest)) {
+ Collections.sort(children, (a, b) -> b.lastComponent().compare(a.lastComponent()));
+ return Optional.of(children.get(0));
+ } else if (isLeftMost(interest)) {
+ Collections.sort(children, (a, b) -> a.lastComponent().compare(b.lastComponent()));
+ return Optional.of(children.get(0));
+ } else {
+ // TODO max/min suffix components and excludes
+ LOGGER.warning("The interest requested has selectors not yet implemented; returning empty content");
+ return Optional.empty();
+ }
+ }
+ return possibleBlob;
+ }
+
+ private static boolean hasSelectors(Interest interest) {
+ return interest.getChildSelector() != -1 || interest.getExclude().size() > 0;
+ }
+
+ private static boolean isRightMost(Interest interest) {
+ return interest.getChildSelector() == Interest.CHILD_SELECTOR_RIGHT;
+ }
+
+ private static boolean isLeftMost(Interest interest) {
+ return interest.getChildSelector() == Interest.CHILD_SELECTOR_LEFT;
+ }
+
+ @Override
+ public boolean has(Name name) {
+ return store.find(name).isPresent();
+ }
+
+ @Override
+ public boolean has(Interest interest) {
+ return get(interest).isPresent();
+ }
+
+ @Override
+ public Optional<Blob> get(Name name) {
+ Optional<NameTree<Blob>> tree = store.find(name);
+ return tree.isPresent() ? tree.get().content() : Optional.empty();
+ }
+
+ @Override
+ public void push(Face face, Name name) throws IOException {
+ Optional<Blob> blob = get(name);
+ if (blob.isPresent()) {
+ Data t = new Data(template);
+ t.setName(name);
+ ByteArrayInputStream b = new ByteArrayInputStream(blob.get().getImmutableArray());
+ for (Data d : SegmentationHelper.segment(t, b)) {
+ face.putData(d);
+ }
+ }
+ }
+
+ @Override
+ public void push(Face face, Interest interest) throws IOException {
+ Optional<NameTree<Blob>> leaf = getWithSelectors(interest);
+ if (leaf.isPresent() && leaf.get().content().isPresent()) {
+ Data t = new Data(template);
+ t.setName(leaf.get().fullName());
+ ByteArrayInputStream b = new ByteArrayInputStream(leaf.get().content().get().getImmutableArray());
+ for (Data d : SegmentationHelper.segment(t, b)) {
+ face.putData(d);
+ }
+ }
+ }
+
+ @Override
+ public void clear() {
+ store.clear();
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
index 92be1ba..363a9ce 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
@@ -14,15 +14,50 @@
package com.intel.jndn.utils.pubsub;
+import com.intel.jndn.utils.Cancellation;
+import com.intel.jndn.utils.On;
+
import java.io.IOException;
/**
+ * API for entering and exiting a group and tracking the accompanying announcements.
+ *
* @author Andrew Brown, andrew.brown@intel.com
*/
interface AnnouncementService {
+ /**
+ * Announce entrance into the group
+ *
+ * @param id the ID of the agent
+ * @throws IOException if the announcement fails
+ */
void announceEntrance(long id) throws IOException;
+
+ /**
+ * Announce exit from the group
+ *
+ * @param id the ID of the agent
+ * @throws IOException if the announcement fails
+ */
void announceExit(long id) throws IOException;
+ /**
+ * Discover all previously announced entrances; the implementation is expected to be driven by network IO
+ * @param onFound callback fired when a group member is discovered
+ * @param onComplete callback fired when all group members have been discovered
+ * @param onError callback fired if errors occur during discovery
+ * @return a cancellation token for stopping discovery
+ * @throws IOException if the network fails during discovery
+ */
Cancellation discoverExistingAnnouncements(On<Long> onFound, On<Void> onComplete, On<Exception> onError) throws IOException;
- Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws RegistrationFailureException;
+
+ /**
+ * Observe any new entrances/exits; the implementation is expected to be driven by network IO
+ * @param onAdded callback fired when a group member announces its entry into the group, see {@link #announceEntrance(long)}
+ * @param onRemoved callback fired when a group member announces its exit from the group, see {@link #announceExit(long)}
+ * @param onError callback fired if errors are encountered while processing announcements
+ * @return a cancellation token for stopping the observer
+ * @throws IOException if the network fails while setting up the observer
+ */
+ Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws IOException;
}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Cancellation.java b/src/main/java/com/intel/jndn/utils/pubsub/Cancellation.java
deleted file mode 100644
index 2fe42df..0000000
--- a/src/main/java/com/intel/jndn/utils/pubsub/Cancellation.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils.pubsub;
-
-/**
- * @author Andrew Brown, andrew.brown@intel.com
- */
-public interface Cancellation {
- Cancellation CANCELLED = () -> {/* do nothing */};
-
- void cancel();
-}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
index 654051f..b1e5180 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
@@ -14,6 +14,8 @@
package com.intel.jndn.utils.pubsub;
+import com.intel.jndn.utils.Cancellation;
+import com.intel.jndn.utils.On;
import com.intel.jndn.utils.client.impl.BackoffRetryClient;
import net.named_data.jndn.Exclude;
import net.named_data.jndn.Face;
@@ -32,7 +34,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import static com.intel.jndn.utils.pubsub.Cancellation.CANCELLED;
+import static com.intel.jndn.utils.Cancellation.CANCELLED;
/**
* @author Andrew Brown, andrew.brown@intel.com
@@ -91,6 +93,7 @@
return () -> stopped = true;
}
+ // TODO need special namespace for discovery
private Interest discover(BackoffRetryClient client, On<Long> onFound, On<Void> onComplete, On<Exception> onError) throws IOException {
Interest interest = new Interest(topicPrefix);
interest.setInterestLifetimeMilliseconds(STARTING_DISCOVERY_LIFETIME);
@@ -141,7 +144,7 @@
}
@Override
- public Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws RegistrationFailureException {
+ public Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws IOException {
LOGGER.log(Level.INFO, "Observing new announcements: {0}", broadcastPrefix);
CompletableFuture<Void> future = new CompletableFuture<>();
OnRegistration onRegistration = new OnRegistration(future);
@@ -150,8 +153,8 @@
try {
long registeredPrefix = face.registerPrefix(broadcastPrefix, onAnnouncement, (OnRegisterFailed) onRegistration, onRegistration);
return () -> face.removeRegisteredPrefix(registeredPrefix);
- } catch (IOException | SecurityException e) {
- throw new RegistrationFailureException(e);
+ } catch (SecurityException e) {
+ throw new IOException("Failed while using transport security key chain", e);
}
}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
index 48c11bb..adb069b 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
@@ -15,6 +15,7 @@
package com.intel.jndn.utils.pubsub;
import com.intel.jndn.utils.ContentStore;
+import com.intel.jndn.utils.PendingInterestTable;
import com.intel.jndn.utils.Publisher;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
@@ -36,6 +37,8 @@
import java.util.logging.Logger;
/**
+ * TODO look at thread safety
+ *
* @author Andrew Brown, andrew.brown@intel.com
*/
class NdnPublisher implements Publisher, OnInterestCallback {
@@ -60,7 +63,7 @@
this.contentStore = contentStore;
}
- synchronized void open() throws RegistrationFailureException {
+ synchronized void open() throws IOException {
opened = true;
CompletableFuture<Void> future = new CompletableFuture<>();
OnRegistration onRegistration = new OnRegistration(future);
@@ -71,12 +74,16 @@
future.get(10, TimeUnit.SECONDS);
announcementService.announceEntrance(publisherId);
} catch (IOException | SecurityException | InterruptedException | ExecutionException | TimeoutException e) {
- throw new RegistrationFailureException(e);
+ throw new IOException("Failed to register NDN prefix; pub-sub IO will be impossible", e);
}
}
- // TODO this should not clear content store or remove registered prefix; do that in the future to allow subscribers
- // to retrieve still-alive messages
+ /**
+ * TODO this should not clear content store or remove registered prefix; do that in the future to allow subscribers
+ * to retrieve still-alive messages
+ *
+ * @throws IOException if the group exit announcement fails
+ */
@Override
public synchronized void close() throws IOException {
if (opened) {
@@ -89,17 +96,13 @@
@Override
public void publish(Blob message) throws IOException {
if (!opened) {
- try {
- open();
- } catch (RegistrationFailureException e) {
- throw new IOException(e);
- }
+ open();
}
long id = latestMessageId.getAndIncrement();
- Name name = PubSubNamespace.toMessageName(prefix, publisherId, id);
+ Name name = PubSubNamespace.toMessageName(prefix, id);
- contentStore.put(name, message);
+ contentStore.put(name, PubSubNamespace.toResponse(null, message));
LOGGER.log(Level.INFO, "Published message {0} to content store: {1}", new Object[]{id, name});
if (pendingInterestTable.has(new Interest(name))) {
@@ -122,7 +125,7 @@
}
}
- private boolean isAttributesRequest(Name name, Interest interest) {
+ private static boolean isAttributesRequest(Name name, Interest interest) {
return name.equals(interest.getName()) && interest.getChildSelector() == -1;
}
@@ -142,7 +145,7 @@
}
}
- private void sendAttributes(Face face, Name publisherName) {
+ private static void sendAttributes(Face face, Name publisherName) {
Data data = new Data(publisherName);
data.setContent(new Blob("[attributes here]"));
data.getMetaInfo().setFreshnessPeriod(ATTRIBUTES_FRESHNESS_PERIOD);
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
index 37f71fa..51aee3a 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
@@ -14,7 +14,9 @@
package com.intel.jndn.utils.pubsub;
+import com.intel.jndn.utils.Cancellation;
import com.intel.jndn.utils.Client;
+import com.intel.jndn.utils.On;
import com.intel.jndn.utils.Subscriber;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
@@ -24,14 +26,16 @@
import net.named_data.jndn.util.Blob;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
+ * TODO look at thread safety
+ *
* @author Andrew Brown, andrew.brown@intel.com
*/
class NdnSubscriber implements Subscriber {
@@ -42,7 +46,7 @@
private final On<Exception> onError;
private final AnnouncementService announcementService;
private final Client client;
- private final Map<Long, Subscription> subscriptions = new HashMap<>();
+ private final Map<Long, Subscription> subscriptions = new ConcurrentHashMap<>();
private Cancellation newAnnouncementCancellation;
private Cancellation existingAnnouncementsCancellation;
@@ -55,13 +59,19 @@
this.client = client;
}
- void open() throws RegistrationFailureException, IOException {
- LOGGER.log(Level.INFO, "Starting subscriber: {0}", prefix);
- existingAnnouncementsCancellation = announcementService.discoverExistingAnnouncements(this::add, null, e -> close());
- newAnnouncementCancellation = announcementService.observeNewAnnouncements(this::add, this::remove, e -> close());
+ @Override
+ public Set<Long> knownPublishers() {
+ return subscriptions.keySet();
}
- void add(long publisherId) {
+ @Override
+ public void open() throws IOException {
+ LOGGER.log(Level.INFO, "Starting subscriber: {0}", prefix);
+ existingAnnouncementsCancellation = announcementService.discoverExistingAnnouncements(this::addPublisher, null, e -> close());
+ newAnnouncementCancellation = announcementService.observeNewAnnouncements(this::addPublisher, this::removePublisher, e -> close());
+ }
+
+ void addPublisher(long publisherId) {
if (subscriptions.containsKey(publisherId)) {
LOGGER.log(Level.WARNING, "Duplicate publisher ID {} received from announcement service; this should not happen and will be ignored", publisherId);
} else {
@@ -71,7 +81,7 @@
}
}
- void remove(long publisherId) {
+ void removePublisher(long publisherId) {
Subscription removed = subscriptions.remove(publisherId);
removed.cancel();
}
@@ -93,32 +103,6 @@
}
}
- Set<Long> knownPublishers() {
- return subscriptions.keySet();
- }
-
- // TODO repeated calls?
- // TODO remove this, do topic.subscribe(On..., On...) instead; or new Subscriber(On..., On..., ...).open()
- @Override
- public Cancellation subscribe(On<Blob> onMessage, On<Exception> onError) {
-// if (!started) {
-// try {
-// open();
-// } catch (RegistrationFailureException e) {
-// LOGGER.log(Level.SEVERE, "Failed to start announcement service, aborting subscription", e);
-// onError.on(e);
-// return Cancellation.CANCELLED;
-// }
-// }
-//
-// LOGGER.log(Level.INFO, "Subscribing: {0}", prefix);
-// for (Subscription c : subscriptions.values()) {
-// c.subscribe();
-// }
-//
- return this::close;
- }
-
private class Subscription implements Cancellation {
final long publisherId;
long messageId;
@@ -128,12 +112,6 @@
this.publisherId = publisherId;
}
- synchronized void subscribe() {
- // would prefer this to be getAsync(on<>, on<>)?
- currentRequest = client.getAsync(face, buildLatestInterest(publisherId)); // TODO backoff
- currentRequest.handle(this::handleResponse);
- }
-
@Override
public synchronized void cancel() {
if (currentRequest != null) {
@@ -141,12 +119,35 @@
}
}
+ synchronized void subscribe() {
+ // would prefer this to be getAsync(on<>, on<>)?
+ currentRequest = client.getAsync(face, buildLatestInterest(publisherId));
+ currentRequest.handle(this::handleResponse);
+ }
+
+ private Interest buildLatestInterest(long publisherId) {
+ Name name = PubSubNamespace.toPublisherName(prefix, publisherId);
+ Interest interest = new Interest(name); // TODO ms lifetime
+ interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
+ return interest;
+ }
+
+ synchronized void next(long publisherId, long messageId) {
+ currentRequest = client.getAsync(face, buildNextInterest(publisherId, messageId));
+ currentRequest.handle(this::handleResponse);
+ }
+
+ private Interest buildNextInterest(long publisherId, long messageId) {
+ Name name = PubSubNamespace.toMessageName(prefix, publisherId, messageId + 1);
+ return new Interest(name); // TODO ms lifetime
+ }
+
private Void handleResponse(Data data, Throwable throwable) {
if (throwable != null) {
onError.on((Exception) throwable); // TODO avoid cast?
} else {
try {
- Response response = PubSubNamespace.toResponse(data);
+ Response response = PubSubNamespace.parseResponse(data);
this.messageId = response.messageId();
onMessage.on(response.content()); // TODO buffer and catch exceptions
} catch (EncodingException e) {
@@ -160,23 +161,6 @@
return null;
}
- private void next(long publisherId, long messageId) {
- currentRequest = client.getAsync(face, buildNextInterest(publisherId, messageId));
- currentRequest.handle(this::handleResponse);
- }
-
- private Interest buildLatestInterest(long publisherId) {
- Name name = PubSubNamespace.toPublisherName(prefix, publisherId);
- Interest interest = new Interest(name); // TODO ms lifetime
- interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
- return interest;
- }
-
- private Interest buildNextInterest(long publisherId, long messageId) {
- Name name = PubSubNamespace.toMessageName(prefix, publisherId, messageId + 1);
- return new Interest(name); // TODO ms lifetime
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java b/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
index 05f631c..7b5a160 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
@@ -22,6 +22,8 @@
import java.util.logging.Logger;
/**
+ * Helper for tracking and logging prefix registrations; TODO replace or remove this in the future
+ *
* @author Andrew Brown, andrew.brown@intel.com
*/
class OnRegistration implements OnRegisterFailed, OnRegisterSuccess {
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
new file mode 100644
index 0000000..c0caddc
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.On;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import com.intel.jndn.utils.client.impl.AdvancedClient;
+import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;
+import com.intel.jndn.utils.impl.InMemoryContentStore;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Assemble the necessary elements for building the {@link Publisher} and {@link Subscriber} implementations
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class PubSubFactory {
+
+ private PubSubFactory() {
+ // do not instantiate this factory
+ }
+
+ /**
+ * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+ * @param prefix the NDN namespace under which messages are published
+ * @param onMessage callback fired when a message is received
+ * @param onError callback fired when an error happens after subscription
+ * @return an open, publisher-discovering, message-retrieving subscriber
+ * @throws IOException if the initial subscription or announcement IO fails
+ */
+ public static Subscriber newSubscriber(Face face, Name prefix, On<Blob> onMessage, On<Exception> onError) throws IOException {
+ NdnAnnouncementService announcementService = new NdnAnnouncementService(face, prefix);
+ AdvancedClient client = new AdvancedClient();
+ NdnSubscriber subscriber = new NdnSubscriber(face, prefix, onMessage, onError, announcementService, client);
+ subscriber.open();
+ return subscriber;
+ }
+
+ /**
+ * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+ * @param prefix the NDN namespace under which messages are published
+ * @return a group-announcing, unopened subscriber (it will automatically open on first publish)
+ */
+ public static Publisher newPublisher(Face face, Name prefix) {
+ long publisherId = Math.abs(new Random().nextLong());
+ return new NdnPublisher(face, prefix, publisherId, new NdnAnnouncementService(face, prefix), new BoundedInMemoryPendingInterestTable(1024), new InMemoryContentStore(2000));
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
index 94da2db..f524a10 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
@@ -17,7 +17,9 @@
import net.named_data.jndn.Data;
import net.named_data.jndn.Name;
import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.encoding.tlv.Tlv;
import net.named_data.jndn.encoding.tlv.TlvDecoder;
+import net.named_data.jndn.encoding.tlv.TlvEncoder;
import net.named_data.jndn.util.Blob;
import java.util.Collections;
@@ -36,6 +38,7 @@
static final int MESSAGE_CONTENT_MARKER = 102;
static final Name DEFAULT_BROADCAST_PREFIX = new Name("/ndn/broadcast");
+ public static final int MESSAGE_PAYLOAD_INITIAL_CAPACITY = 4096;
private PubSubNamespace() {
// do not instantiate this class
@@ -49,18 +52,48 @@
return new Name(prefix).append(toPublisherComponent(publisherId));
}
+ static long parsePublisher(Name name) throws EncodingException {
+ return findMarkerFromEnd(name, PUBLISHER_ID_MARKER);
+ }
+
static Name toAnnouncement(Name prefix, long publisherId, Announcement announcement) {
Name.Component announcementComponent = Name.Component.fromNumberWithMarker(announcement.id, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);
return toPublisherName(prefix, publisherId).append(announcementComponent);
}
+ static Announcement parseAnnouncement(Name name) throws EncodingException {
+ return Announcement.from((int) name.get(-1).toNumberWithMarker(ANNOUNCEMENT_ACTION_MARKER));
+ }
+
static Name toMessageName(Name prefix, long publisherId, long messageId) {
Name.Component messageComponent = Name.Component.fromNumberWithMarker(messageId, PubSubNamespace.MESSAGE_ID_MARKER);
return toPublisherName(prefix, publisherId).append(messageComponent);
}
- static Response toResponse(Data data) throws EncodingException {
- long messageId = findMarkerFromEnd(data.getName(), MESSAGE_ID_MARKER);
+ static Name toMessageName(Name publisherPrefix, long messageId) {
+ Name.Component messageComponent = Name.Component.fromNumberWithMarker(messageId, PubSubNamespace.MESSAGE_ID_MARKER);
+ return new Name(publisherPrefix).append(messageComponent);
+ }
+
+ static long parseMessageId(Name name) throws EncodingException {
+ return findMarkerFromEnd(name, MESSAGE_ID_MARKER);
+ }
+
+ static Blob toResponse(Blob attributes, Blob content) {
+ TlvEncoder encoder = new TlvEncoder(MESSAGE_PAYLOAD_INITIAL_CAPACITY);
+ int initialLength = encoder.getLength();
+
+ encoder.writeBlobTlv(MESSAGE_CONTENT_MARKER, content.buf()); // encode backwards, see Tlv0_1_1WireFormat.java
+ if(attributes != null && attributes.size() > 0){
+ encoder.writeBlobTlv(MESSAGE_ATTRIBUTES_MARKER, content.buf());
+ }
+ encoder.writeTypeAndLength(MESSAGE_MARKER, encoder.getLength() - initialLength);
+
+ return new Blob(encoder.getOutput(), false);
+ }
+
+ static Response parseResponse(Data data) throws EncodingException {
+ long messageId = parseMessageId(data.getName());
TlvDecoder decoder = new TlvDecoder(data.getContent().buf());
int endOffset = decoder.readNestedTlvsStart(MESSAGE_MARKER);
Blob attributes = new Blob(decoder.readOptionalBlobTlv(MESSAGE_ATTRIBUTES_MARKER, endOffset), true);
@@ -68,14 +101,6 @@
return new Response(data.getName(), messageId, Collections.singletonMap("*", attributes), content);
}
- static long parsePublisher(Name name) throws EncodingException {
- return findMarkerFromEnd(name, PUBLISHER_ID_MARKER);
- }
-
- static Announcement parseAnnouncement(Name name) throws EncodingException {
- return Announcement.from((int) name.get(-1).toNumberWithMarker(ANNOUNCEMENT_ACTION_MARKER));
- }
-
private static long findMarkerFromEnd(Name name, int marker) throws EncodingException {
for (int i = name.size() - 1; i >= 0; i--) {
try {
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Response.java b/src/main/java/com/intel/jndn/utils/pubsub/Response.java
index 9d84f44..29aa1cd 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/Response.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/Response.java
@@ -20,6 +20,11 @@
import java.util.Map;
/**
+ * Represent a publisher response to a subscriber request for a message. Note that the {@link #attributes} act as a type
+ * of header optionally prepended to the message content. The intent of this is to allow publishers to inform
+ * subscribers of publisher-side state changes without requiring a separate mechanism (with accompanying complexity and
+ * overhead); the full set of publisher state attributes should be served elsewhere, not in this class.
+ *
* @author Andrew Brown, andrew.brown@intel.com
*/
class Response {
@@ -35,10 +40,16 @@
this.content = content;
}
+ /**
+ * @return the message ID of the published message
+ */
long messageId() {
return messageId;
}
+ /**
+ * @return the content of the message
+ */
Blob content() {
return content;
}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Topic.java b/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
deleted file mode 100644
index b0b4ad4..0000000
--- a/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils.pubsub;
-
-import com.intel.jndn.utils.Publisher;
-import com.intel.jndn.utils.Subscriber;
-import com.intel.jndn.utils.client.impl.AdvancedClient;
-import com.intel.jndn.utils.impl.BoundedInMemoryContentStore;
-import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.util.Blob;
-
-import java.io.IOException;
-import java.util.Random;
-
-/**
- * @author Andrew Brown, andrew.brown@intel.com
- */
-public final class Topic {
- private final Name name;
-
- public Topic(Name name) {
- this.name = name;
- }
-
- public Name name() {
- return new Name(name);
- }
-
- // TODO move to PubSubFactory? change this to subscribe()
- public Subscriber subscribe(Face face, On<Blob> onMessage, On<Exception> onError) throws RegistrationFailureException, IOException {
- NdnSubscriber subscriber = new NdnSubscriber(face, name, onMessage, onError, new NdnAnnouncementService(face, name), new AdvancedClient());
- subscriber.open();
- return subscriber;
- }
-
- // TODO move to PubSubFactory? change this to publish()
- public Publisher newPublisher(Face face) {
- long publisherId = Math.abs(new Random().nextLong());
- return new NdnPublisher(face, name, publisherId, new NdnAnnouncementService(face, name), new BoundedInMemoryPendingInterestTable(1024), new BoundedInMemoryContentStore(1024, 2000));
- }
-}