Finish BoundedInMemoryContentStore
diff --git a/pom.xml b/pom.xml
index 323a19c..f5ca9c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.intel.jndn.utils</groupId>
<artifactId>jndn-utils</artifactId>
<version>1.0.4</version>
- <prefix>jndn-utils</prefix>
+ <name>jndn-utils</name>
<description>Collection of tools to simplify synchronous and asynchronous data transfer over the NDN network
</description>
<url>https://github.com/01org/jndn-utils</url>
@@ -14,14 +14,14 @@
<licenses>
<license>
- <prefix>LGPL v3</prefix>
+ <name>LGPL v3</name>
<url>https://www.gnu.org/licenses/lgpl.html</url>
</license>
</licenses>
<developers>
<developer>
- <prefix>Andrew Brown</prefix>
+ <name>Andrew Brown</name>
<url>http://github.com/andrewsbrown</url>
</developer>
</developers>
@@ -58,6 +58,12 @@
<version>1.1.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.10.19</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<properties>
diff --git a/src/main/java/com/intel/jndn/utils/ContentStore.java b/src/main/java/com/intel/jndn/utils/ContentStore.java
index aad8a0d..f446dea 100644
--- a/src/main/java/com/intel/jndn/utils/ContentStore.java
+++ b/src/main/java/com/intel/jndn/utils/ContentStore.java
@@ -51,7 +51,8 @@
Blob get(Name name);
/**
- * Write the data under a given name to the face
+ * Write the stored content to the face as Data packets. If no content exists for the given name, this method should
+ * have no effect.
*
* @param face the face to write to
* @param name the name of the data to write
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/BlobContentStore.java b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java
similarity index 60%
rename from src/main/java/com/intel/jndn/utils/pubsub/BlobContentStore.java
rename to src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java
index 375fae5..477a573 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/BlobContentStore.java
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStore.java
@@ -1,56 +1,71 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils.pubsub;
-
-import com.intel.jndn.utils.ContentStore;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.util.Blob;
-
-/**
- * @author Andrew Brown, andrew.brown@intel.com
- */
-class BlobContentStore implements ContentStore {
- private final BoundedLinkedMap<Name, Blob> store;
-
- BlobContentStore(int maxSize) {
- this.store = new BoundedLinkedMap<>(maxSize);
- }
-
- @Override
- public void put(Name name, Blob data) {
- store.put(name, data);
- }
-
- @Override
- public boolean has(Name name) {
- return store.containsKey(name);
- }
-
- @Override
- public Blob get(Name name) {
- return store.get(name);
- }
-
- @Override
- public void push(Face face, Name name) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void clear() {
- store.clear();
- }
-}
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.ContentStore;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryContentStore implements ContentStore {
+ private final BoundedLinkedMap<Name, Blob> store;
+ private final Data template;
+
+ public BoundedInMemoryContentStore(int maxSize, int freshnessMs) {
+ this.store = new BoundedLinkedMap<>(maxSize);
+ this.template = new Data();
+ this.template.getMetaInfo().setFreshnessPeriod(freshnessMs);
+ }
+
+ @Override
+ public void put(Name name, Blob data) {
+ store.put(name, data);
+ }
+
+ @Override
+ public boolean has(Name name) {
+ return store.containsKey(name);
+ }
+
+ @Override
+ public Blob get(Name name) {
+ return store.get(name);
+ }
+
+ @Override
+ public void push(Face face, Name name) throws IOException {
+ Blob blob = get(name);
+ if (blob != null) {
+ Data t = new Data(template);
+ t.setName(name);
+ ByteArrayInputStream b = new ByteArrayInputStream(blob.getImmutableArray());
+ for (Data d : SegmentationHelper.segment(t, b)) {
+ face.putData(d);
+ }
+ }
+ }
+
+ @Override
+ public void clear() {
+ store.clear();
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
new file mode 100644
index 0000000..b214019
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
@@ -0,0 +1,62 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.pubsub.PendingInterestTable;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryPendingInterestTable implements PendingInterestTable {
+ //private static final Logger LOGGER = Logger.getLogger(BoundedInMemoryPendingInterestTable.class.getName());
+ private final BoundedLinkedMap<Name, Interest> table;
+
+ public BoundedInMemoryPendingInterestTable(int maxSize) {
+ this.table = new BoundedLinkedMap<>(maxSize);
+ }
+
+ @Override
+ public void add(Interest interest) {
+ table.put(interest.getName(), interest);
+ }
+
+ @Override
+ public boolean has(Interest interest) {
+ if (interest.getChildSelector() != -1) {
+ for (Name name : table.keySet()) {
+ if (interest.matchesName(name)) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return has(interest.getName());
+ }
+ }
+
+ public boolean has(Name name) {
+ return table.containsKey(name);
+ }
+
+ @Override
+ public Collection<Interest> extract(Name name) {
+ return table.values().stream().filter(i -> i.matchesName(name)).collect(Collectors.toList());
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/BoundedLinkedMap.java b/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
similarity index 97%
rename from src/main/java/com/intel/jndn/utils/pubsub/BoundedLinkedMap.java
rename to src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
index 8211fe1..3cc4538 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/BoundedLinkedMap.java
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
@@ -12,7 +12,7 @@
* more details.
*/
-package com.intel.jndn.utils.pubsub;
+package com.intel.jndn.utils.impl;
import java.util.Collection;
import java.util.LinkedHashMap;
@@ -106,7 +106,9 @@
@Override
public synchronized V remove(Object key) {
V value = map.remove(key);
- if (key == latest) latest = findLatest(map);
+ if (key == latest) {
+ latest = findLatest(map);
+ }
return value;
}
diff --git a/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java b/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java
index f63fe2f..1ddef62 100644
--- a/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java
+++ b/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java
@@ -98,8 +98,8 @@
}
/**
- * Segment a stream of bytes into a list of Data packets; this must read all
- * the bytes first in order to determine the end segment for FinalBlockId.
+ * Segment a stream of bytes into a list of Data packets; this must read all the bytes first in order to determine the
+ * end segment for FinalBlockId. TODO this could be smarter and only add segments if necessary
*
* @param template the {@link Data} packet to use for the segment {@link Name}, {@link net.named_data.jndn.MetaInfo},
* etc.
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/ForLoopPendingInterestTable.java b/src/main/java/com/intel/jndn/utils/pubsub/ForLoopPendingInterestTable.java
deleted file mode 100644
index 10e4084..0000000
--- a/src/main/java/com/intel/jndn/utils/pubsub/ForLoopPendingInterestTable.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2016, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-
-package com.intel.jndn.utils.pubsub;
-
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.Name;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author Andrew Brown, andrew.brown@intel.com
- */
-public class ForLoopPendingInterestTable implements PendingInterestTable {
- Map<Name, Interest> table = new ConcurrentHashMap<>();
-
- @Override
- public void add(Interest interest) {
- table.put(interest.getName(), interest);
- }
-
- @Override
- public boolean has(Interest interest) {
- // TODO must handle more complex logic (selectors)
- return has(interest.getName());
- }
-
- public boolean has(Name name) {
- return table.containsKey(name);
- }
-
- @Override
- public Collection<Interest> extract(Name name) {
- // TODO more complexity to return multiple results
- return has(name) ? Collections.singleton(table.get(name)) : Collections.emptyList();
- }
-}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
index 9679d7e..4fa8409 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
@@ -56,22 +56,23 @@
@Override
public void announceEntrance(long id) throws IOException {
- Name.Component publisherId = Name.Component.fromNumberWithMarker(id, PubSubNamespace.PUBLISHER_ID_MARKER);
- Name.Component announcementAction = Name.Component.fromNumberWithMarker(PubSubNamespace.ANNOUNCEMENT_ENTRANCE, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);
- Interest interest = new Interest(new Name(broadcastPrefix).append(topicPrefix).append(publisherId).append(announcementAction));
+ LOGGER.log(Level.INFO, "Announcing publisher entrance: {0} to {1}", new Object[]{id, topicPrefix});
+ Name name = PubSubNamespace.toAnnouncement(new Name(broadcastPrefix).append(topicPrefix), id, PubSubNamespace.Announcement.ENTRANCE);
+ Interest interest = new Interest(name);
face.expressInterest(interest, null);
}
@Override
public void announceExit(long id) throws IOException {
- Name.Component publisherId = Name.Component.fromNumberWithMarker(id, PubSubNamespace.PUBLISHER_ID_MARKER);
- Name.Component announcementAction = Name.Component.fromNumberWithMarker(PubSubNamespace.ANNOUNCEMENT_EXIT, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);
- Interest interest = new Interest(new Name(broadcastPrefix).append(topicPrefix).append(publisherId).append(announcementAction));
+ LOGGER.log(Level.INFO, "Announcing publisher exit: {0} from {1}", new Object[]{id, topicPrefix});
+ Name name = PubSubNamespace.toAnnouncement(new Name(broadcastPrefix).append(topicPrefix), id, PubSubNamespace.Announcement.EXIT);
+ Interest interest = new Interest(name);
face.expressInterest(interest, null);
}
@Override
public Cancellation discoverExistingAnnouncements(On<Long> onFound, On<Void> onComplete, On<Exception> onError) {
+ LOGGER.log(Level.INFO, "Discover existing publishers: {0}", topicPrefix);
if (onFound == null) {
return CANCELLED;
}
@@ -100,6 +101,7 @@
@Override
public Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws RegistrationFailureException {
+ LOGGER.log(Level.INFO, "Observing new announcements: {0}", topicPrefix);
CompletableFuture<Void> future = new CompletableFuture<>();
OnRegistration onRegistration = new OnRegistration(future);
OnAnnouncement onAnnouncement = new OnAnnouncement(onAdded, onRemoved, onError);
@@ -127,12 +129,12 @@
public void onInterest(Name name, Interest interest, Face face, long l, InterestFilter interestFilter) {
try {
long publisherId = interest.getName().get(-2).toNumberWithMarker(PubSubNamespace.PUBLISHER_ID_MARKER);
- int announcement = (int) interest.getName().get(-1).toNumberWithMarker(PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);
+ PubSubNamespace.Announcement announcement = PubSubNamespace.parseAnnouncement(interest.getName());
switch (announcement) {
- case PubSubNamespace.ANNOUNCEMENT_ENTRANCE:
+ case ENTRANCE:
add(publisherId);
break;
- case PubSubNamespace.ANNOUNCEMENT_EXIT:
+ case EXIT:
remove(publisherId);
break;
default:
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
index f68e62b..a6e1b87 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
@@ -16,14 +16,12 @@
import com.intel.jndn.utils.ContentStore;
import com.intel.jndn.utils.Publisher;
-import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.InterestFilter;
import net.named_data.jndn.Name;
import net.named_data.jndn.OnInterestCallback;
import net.named_data.jndn.OnRegisterFailed;
-import net.named_data.jndn.encoding.EncodingException;
import net.named_data.jndn.security.SecurityException;
import net.named_data.jndn.util.Blob;
@@ -32,6 +30,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -46,9 +45,9 @@
private final PendingInterestTable pendingInterestTable;
private final ContentStore contentStore;
private final long publisherId;
- private volatile long latestMessageId = 0;
+ private final AtomicLong latestMessageId = new AtomicLong(0);
private long registrationId;
- private boolean started = false;
+ private boolean opened = false;
NdnPublisher(Face face, Name prefix, long publisherId, AnnouncementService announcementService, PendingInterestTable pendingInterestTable, ContentStore contentStore) {
this.face = face;
@@ -59,13 +58,14 @@
this.contentStore = contentStore;
}
- void start() throws RegistrationFailureException {
- started = true;
+ synchronized void open() throws RegistrationFailureException {
+ opened = true;
CompletableFuture<Void> future = new CompletableFuture<>();
OnRegistration onRegistration = new OnRegistration(future);
try {
registrationId = face.registerPrefix(prefix, this, (OnRegisterFailed) onRegistration, onRegistration);
+ // assumes face.processEvents is driven concurrently elsewhere
future.get(10, TimeUnit.SECONDS);
announcementService.announceEntrance(publisherId);
} catch (IOException | SecurityException | InterruptedException | ExecutionException | TimeoutException e) {
@@ -73,33 +73,37 @@
}
}
+ // TODO this should not clear content store or remove registered prefix; do that in the future to allow subscribers
+ // to retrieve still-alive messages
@Override
- public void close() throws IOException {
- face.removeRegisteredPrefix(registrationId);
- contentStore.clear();
- announcementService.announceExit(publisherId);
+ public synchronized void close() throws IOException {
+ if (opened) {
+ face.removeRegisteredPrefix(registrationId);
+ contentStore.clear();
+ announcementService.announceExit(publisherId);
+ }
}
- // TODO should throw IOException?
@Override
public void publish(Blob message) throws IOException {
- if (!started) {
+ if (!opened) {
try {
- start();
+ open();
} catch (RegistrationFailureException e) {
throw new IOException(e);
}
}
- long id = latestMessageId++; // TODO synchronize?
+ long id = latestMessageId.getAndIncrement();
Name name = PubSubNamespace.toMessageName(prefix, publisherId, id);
contentStore.put(name, message);
- LOGGER.log(Level.INFO, "Published message {0} to content store", id);
+ LOGGER.log(Level.INFO, "Published message {0} to content store: {1}", new Object[]{id, name});
if (pendingInterestTable.has(new Interest(name))) {
try {
contentStore.push(face, name);
+ // TODO extract satisfied interests
} catch (IOException e) {
LOGGER.log(Level.SEVERE, "Failed to send message {0} for pending interests: {1}", new Object[]{id, name, e});
}
@@ -108,23 +112,15 @@
@Override
public void onInterest(Name name, Interest interest, Face face, long registrationId, InterestFilter interestFilter) {
+ LOGGER.log(Level.INFO, "Client requesting message: {0}", interest.toUri());
try {
if (contentStore.has(interest.getName())) {
contentStore.push(face, interest.getName());
} else {
pendingInterestTable.add(interest);
}
-
- long id = interest.getName().get(-1).toNumberWithMarker(43);
- Blob blob = contentStore.get(interest.getName());
- Data data = new Data(interest.getName());
- data.setContent(blob);
- face.putData(data);
- LOGGER.info("Published message " + id + " for interest: " + interest.toUri());
} catch (IOException e) {
LOGGER.log(Level.SEVERE, "Failed to publish message for interest: " + interest.toUri(), e);
- } catch (EncodingException e) {
- LOGGER.log(Level.SEVERE, "Failed to decode message ID for interest: " + interest.toUri(), e);
}
}
}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
index e2deebd..164dd89 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
@@ -37,9 +37,11 @@
private static final Logger LOGGER = Logger.getLogger(NdnSubscriber.class.getName());
private final Face face;
private final Name prefix;
+ private final On<Blob> onMessage = null;
+ private final On<Exception> onError = null;
private final AnnouncementService announcementService;
private final Client client;
- private final Map<Long, Context> known = new HashMap<>();
+ private final Map<Long, Subscription> subscriptions = new HashMap<>();
private Cancellation newAnnouncementCancellation;
private Cancellation existingAnnouncementsCancellation;
private volatile boolean started = false;
@@ -51,8 +53,8 @@
this.client = client;
}
- private void start() throws RegistrationFailureException {
- LOGGER.log(Level.INFO, "Starting subscriber");
+ void open() throws RegistrationFailureException {
+ LOGGER.log(Level.INFO, "Starting subscriber: {0}", prefix);
existingAnnouncementsCancellation = announcementService.discoverExistingAnnouncements(this::add, null, e -> close());
newAnnouncementCancellation = announcementService.observeNewAnnouncements(this::add, this::remove, e -> close());
@@ -60,21 +62,24 @@
started = true;
}
- private void add(long publisherId) {
- if (known.containsKey(publisherId)) {
+ void add(long publisherId) {
+ if (subscriptions.containsKey(publisherId)) {
LOGGER.log(Level.WARNING, "Duplicate publisher ID {} received from announcement service; this should not happen and will be ignored", publisherId);
} else {
- known.put(publisherId, new Context(publisherId));
+ Subscription subscription = new Subscription(publisherId);
+ subscriptions.put(publisherId, subscription);
+ subscription.subscribe();
}
}
- private void remove(long publisherId) {
- known.remove(publisherId);
+ void remove(long publisherId) {
+ Subscription removed = subscriptions.remove(publisherId);
+ removed.cancel();
}
@Override
public void close() {
- LOGGER.log(Level.INFO, "Stopping subscriber, knows of {0} publishers: {1} ", new Object[]{known.size(), known});
+ LOGGER.log(Level.INFO, "Stopping subscriber, knows of {0} publishers: {1} ", new Object[]{subscriptions.size(), subscriptions});
if (newAnnouncementCancellation != null) {
newAnnouncementCancellation.cancel();
@@ -84,7 +89,7 @@
existingAnnouncementsCancellation.cancel();
}
- for (Context c : known.values()) {
+ for (Subscription c : subscriptions.values()) {
c.cancel();
}
@@ -92,15 +97,16 @@
}
Set<Long> knownPublishers() {
- return known.keySet();
+ return subscriptions.keySet();
}
// TODO repeated calls?
+ // TODO remove this, do topic.subscribe(On..., On...) instead; or new Subscriber(On..., On..., ...).open()
@Override
public Cancellation subscribe(On<Blob> onMessage, On<Exception> onError) {
if (!started) {
try {
- start();
+ open();
} catch (RegistrationFailureException e) {
LOGGER.log(Level.SEVERE, "Failed to start announcement service, aborting subscription", e);
onError.on(e);
@@ -108,29 +114,25 @@
}
}
- for (Context c : known.values()) {
- c.subscribe(onMessage, onError);
+ LOGGER.log(Level.INFO, "Subscribing: {0}", prefix);
+ for (Subscription c : subscriptions.values()) {
+ c.subscribe();
}
return this::close;
}
- private class Context implements Cancellation {
+ private class Subscription implements Cancellation {
final long publisherId;
long messageId;
boolean subscribed = false;
- On<Blob> onMessage;
- On<Exception> onError;
CompletableFuture<Data> currentRequest;
- Context(long publisherId) {
+ Subscription(long publisherId) {
this.publisherId = publisherId;
}
- synchronized void subscribe(On<Blob> onMessage, On<Exception> onError) {
- this.onMessage = onMessage;
- this.onError = onError;
-
+ synchronized void subscribe() {
if (subscribed) {
return;
}
@@ -191,8 +193,8 @@
if (o == null || getClass() != o.getClass()) {
return false;
}
- Context context = (Context) o;
- return publisherId == context.publisherId;
+ Subscription subscription = (Subscription) o;
+ return publisherId == subscription.publisherId;
}
@Override
@@ -202,7 +204,7 @@
@Override
public String toString() {
- return "Context{" + "publisherId=" + publisherId + '}';
+ return "Subscription{" + "publisherId=" + publisherId + '}';
}
}
}
\ No newline at end of file
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
index 5df1c79..89221cc 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
@@ -30,10 +30,8 @@
static final int PUBLISHER_ID_MARKER = 43;
static final int ANNOUNCEMENT_ACTION_MARKER = 44;
- static final int ANNOUNCEMENT_ENTRANCE = 1;
- static final int ANNOUNCEMENT_EXIT = 0;
-
static final int MESSAGE_MARKER = 100;
+
static final int MESSAGE_ATTRIBUTES_MARKER = 101;
static final int MESSAGE_CONTENT_MARKER = 102;
@@ -48,13 +46,18 @@
return new Name(prefix).append(publisherComponent);
}
+ static Name toAnnouncement(Name prefix, long publisherId, Announcement announcement) {
+ Name.Component announcementComponent = Name.Component.fromNumberWithMarker(announcement.id, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);
+ return toPublisherName(prefix, publisherId).append(announcementComponent);
+ }
+
static Name toMessageName(Name prefix, long publisherId, long messageId) {
Name.Component messageComponent = Name.Component.fromNumberWithMarker(messageId, PubSubNamespace.MESSAGE_ID_MARKER);
return toPublisherName(prefix, publisherId).append(messageComponent);
}
static Response toResponse(Data data) throws EncodingException {
- long messageId = extractMessageId(data.getName());
+ long messageId = findMarkerFromEnd(data.getName(), MESSAGE_ID_MARKER);
TlvDecoder decoder = new TlvDecoder(data.getContent().buf());
int endOffset = decoder.readNestedTlvsStart(MESSAGE_MARKER);
Blob attributes = new Blob(decoder.readOptionalBlobTlv(MESSAGE_ATTRIBUTES_MARKER, endOffset), true);
@@ -62,14 +65,48 @@
return new Response(data.getName(), messageId, Collections.singletonMap("*", attributes), content);
}
- private static long extractMessageId(Name name) throws EncodingException {
- for (int i = name.size(); i >= 0; i--) {
+ static long parsePublisher(Name name) throws EncodingException {
+ return findMarkerFromEnd(name, PUBLISHER_ID_MARKER);
+ }
+
+ static Announcement parseAnnouncement(Name name) throws EncodingException {
+ return Announcement.from((int) name.get(-1).toNumberWithMarker(ANNOUNCEMENT_ACTION_MARKER));
+ }
+
+ private static long findMarkerFromEnd(Name name, int marker) throws EncodingException {
+ for (int i = name.size() - 1; i >= 0; i--) {
try {
- return name.get(i).toNumberWithMarker(MESSAGE_ID_MARKER);
+ return name.get(i).toNumberWithMarker(marker);
} catch (EncodingException e) {
// do nothing
}
}
- throw new EncodingException("Failed to find message ID marker in name: " + name.toUri());
+ throw new EncodingException("Failed to find marker " + marker + " in name: " + name.toUri());
+ }
+
+ enum Announcement {
+ ENTRANCE(0),
+ EXIT(1);
+
+ private final int id;
+
+ Announcement(int id) {
+ this.id = id;
+ }
+
+ static Announcement from(int value) {
+ for (Announcement a : Announcement.values()) {
+ if (a.id == value) return a;
+ }
+ return null;
+ }
+
+ int value() {
+ return id;
+ }
+
+ public int getValue() {
+ return id;
+ }
}
}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Topic.java b/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
index 4a60b1c..5575712 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
@@ -17,6 +17,8 @@
import com.intel.jndn.utils.Publisher;
import com.intel.jndn.utils.Subscriber;
import com.intel.jndn.utils.client.impl.AdvancedClient;
+import com.intel.jndn.utils.impl.BoundedInMemoryContentStore;
+import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;
import net.named_data.jndn.Face;
import net.named_data.jndn.Name;
@@ -36,13 +38,13 @@
return new Name(name);
}
- // TODO move to PubSubFactory?
+ // TODO move to PubSubFactory? change this to subscribe()
public Subscriber newSubscriber(Face face) {
return new NdnSubscriber(face, name, new NdnAnnouncementService(face, name), new AdvancedClient());
}
- // TODO move to PubSubFactory?
+ // TODO move to PubSubFactory? change this to publish()
public Publisher newPublisher(Face face) {
- return new NdnPublisher(face, name, new Random().nextLong(), new NdnAnnouncementService(face, name), new ForLoopPendingInterestTable(), new BlobContentStore(1024));
+ return new NdnPublisher(face, name, new Random().nextLong(), new NdnAnnouncementService(face, name), new BoundedInMemoryPendingInterestTable(1024), new BoundedInMemoryContentStore(1024, 2000));
}
}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/package.html b/src/main/java/com/intel/jndn/utils/pubsub/package.html
new file mode 100644
index 0000000..562c542
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/package.html
@@ -0,0 +1,26 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
+<html>
+<body>
+<p>See [need link here] for algorithm details.</p>
+
+<p>The NDN pub-sub algorithm is designed using the following criteria:</p>
+<ul>
+ <li>subscriber-driven</li>
+ <li>lightweight</li>
+ <li>avoid unnecessary IO</li>
+</ul>
+
+<p>It makes the following guarantees:</p>
+<ul>
+ <li>each publisher will order its own messages by the order in which they were published; however, the mechanism
+ for this ordering (i.e. a monotonically-increasing ID) will not maintain the guarantee across multiple
+ publishers (in this case, an application-level mechanism, like a time-synchronized timestamp, must be used;
+ alternatively, try ChronoSync)
+ </li>
+ <li>publishers will provide the messages published on a topic for a specified lifetime; even after being closed,
+ the publisher must maintain a registered prefix and respond to subscriber requests for data
+ </li>
+ <li>subscribers will eventually discover all routable publishers</li>
+</ul>
+</body>
+</html>
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStoreTest.java b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStoreTest.java
new file mode 100644
index 0000000..aa896ba
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryContentStoreTest.java
@@ -0,0 +1,82 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.mock.MockFace;
+import com.intel.jndn.utils.ContentStore;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryContentStoreTest {
+ ContentStore instance = new BoundedInMemoryContentStore(5, 1000);
+
+ @Before
+ public void setUp() throws Exception {
+
+ }
+
+ @Test
+ public void basicUsage() throws Exception {
+ instance.put(new Name("/a"), new Blob("."));
+
+ assertTrue(instance.has(new Name("/a")));
+ assertArrayEquals(".".getBytes(), instance.get(new Name("/a")).getImmutableArray());
+ }
+
+ @Test
+ public void replacement() throws Exception {
+ instance.put(new Name("/a"), new Blob("."));
+ instance.put(new Name("/a/b"), new Blob(".."));
+ instance.put(new Name("/a/b/c"), new Blob("..."));
+ instance.put(new Name("/a/b/c/d"), new Blob("...."));
+ instance.put(new Name("/a/b/c/d/e"), new Blob("...."));
+ assertTrue(instance.has(new Name("/a")));
+
+ instance.put(new Name("/replace/oldest"), new Blob("."));
+
+ assertFalse(instance.has(new Name("/a")));
+ assertTrue(instance.has(new Name("/replace/oldest")));
+ }
+
+ @Test
+ public void push() throws Exception {
+ MockFace face = new MockFace();
+ Name name = new Name("/a");
+ instance.put(name, new Blob("."));
+
+ instance.push(face, name);
+
+ assertEquals(1, face.sentData.size());
+ assertEquals(name.appendSegment(0), face.sentData.get(0).getName()); // TODO this should probably be smarter and avoid appending segments if not needed
+ }
+
+ @Test
+ public void clear() throws Exception {
+ instance.put(new Name("/a"), new Blob("."));
+ assertTrue(instance.has(new Name("/a")));
+
+ instance.clear();
+
+ assertFalse(instance.has(new Name("/a")));
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java
new file mode 100644
index 0000000..739e215
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryPendingInterestTableTest {
+
+ private BoundedInMemoryPendingInterestTable instance;
+
+ @Before
+ public void before() {
+ instance = new BoundedInMemoryPendingInterestTable(5);
+ }
+
+ @Test
+ public void add() throws Exception {
+ Name name = new Name("/a/b/c");
+ instance.add(new Interest(name));
+ assertTrue(instance.has(name));
+ }
+
+ @Test
+ public void has() throws Exception {
+ Name name = new Name("/a/b/c");
+ instance.add(new Interest(name));
+
+ Interest interest = new Interest(new Name("/a/b"));
+ interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
+ assertTrue(instance.has(interest));
+ }
+
+ @Test
+ public void extract() throws Exception {
+ instance.add(new Interest(new Name("/a")));
+ instance.add(new Interest(new Name("/a/b")));
+ instance.add(new Interest(new Name("/a/b/c")));
+
+ Collection<Interest> extracted = instance.extract(new Name("/a/b"));
+ assertEquals(2, extracted.size()); // TODO not sure about this...
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/BoundedLinkedMapTest.java b/src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java
similarity index 98%
rename from src/test/java/com/intel/jndn/utils/pubsub/BoundedLinkedMapTest.java
rename to src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java
index 3107db6..d4ca544 100644
--- a/src/test/java/com/intel/jndn/utils/pubsub/BoundedLinkedMapTest.java
+++ b/src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java
@@ -12,7 +12,7 @@
* more details.
*/
-package com.intel.jndn.utils.pubsub;
+package com.intel.jndn.utils.impl;
import org.junit.Before;
import org.junit.Test;
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java
new file mode 100644
index 0000000..26cd0d8
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java
@@ -0,0 +1,72 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnData;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnAnnouncementServiceTest {
+
+ private NdnAnnouncementService instance;
+ private Face face;
+
+ @Before
+ public void before() {
+ face = mock(Face.class);
+ instance = new NdnAnnouncementService(face, new Name("/topic/prefix"));
+ }
+
+ @Test
+ public void announceEntrance() throws Exception {
+ instance.announceEntrance(42);
+
+ ArgumentCaptor<Interest> interest = ArgumentCaptor.forClass(Interest.class);
+ verify(face, times(1)).expressInterest(interest.capture(), any(OnData.class));
+ assertEquals(42, PubSubNamespace.parsePublisher(interest.getValue().getName()));
+ assertEquals(PubSubNamespace.Announcement.ENTRANCE, PubSubNamespace.parseAnnouncement(interest.getValue().getName()));
+ }
+
+ @Test
+ public void announceExit() throws Exception {
+ instance.announceExit(42);
+
+ ArgumentCaptor<Interest> interest = ArgumentCaptor.forClass(Interest.class);
+ verify(face, times(1)).expressInterest(interest.capture(), any(OnData.class));
+ assertEquals(42, PubSubNamespace.parsePublisher(interest.getValue().getName()));
+ assertEquals(PubSubNamespace.Announcement.EXIT, PubSubNamespace.parseAnnouncement(interest.getValue().getName()));
+ }
+
+ @Test
+ public void discoverExistingAnnouncements() throws Exception {
+
+ }
+
+ @Test
+ public void observeNewAnnouncements() throws Exception {
+
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java
new file mode 100644
index 0000000..bdb0693
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java
@@ -0,0 +1,169 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.mock.MeasurableFace;
+import com.intel.jndn.mock.MockForwarder;
+import com.intel.jndn.utils.impl.BoundedInMemoryContentStore;
+import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.util.Blob;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnPublisherTest {
+ private static final Logger LOGGER = Logger.getLogger(NdnPublisherTest.class.getName());
+ private static final Name PUBLISHER_PREFIX = new Name("/publisher");
+ private static final long PUBLISHER_ID = new Random().nextLong();
+ private NdnPublisher instance;
+ private Face face;
+ private MockForwarder forwarder;
+
+ @Before
+ public void before() throws Exception {
+ forwarder = new MockForwarder();
+ face = forwarder.connect();
+ NdnAnnouncementService announcementService = new NdnAnnouncementService(face, PUBLISHER_PREFIX);
+ BoundedInMemoryPendingInterestTable pendingInterestTable = new BoundedInMemoryPendingInterestTable(1024);
+ BoundedInMemoryContentStore contentStore = new BoundedInMemoryContentStore(1024, 2000);
+ instance = new NdnPublisher(face, PUBLISHER_PREFIX, PUBLISHER_ID, announcementService, pendingInterestTable, contentStore);
+
+ driveFace(face); // TODO preferably do this in MockForwarder
+ }
+
+ @Test
+ public void openClose() throws Exception {
+ instance.open();
+ assertEquals(1, numSentInterests()); // doesn't count prefix registration, only announcement
+
+ instance.close();
+ assertEquals(2, numSentInterests());
+ assertEquals(0, numReceivedDatas());
+ }
+
+ @Test
+ public void close() throws Exception {
+ instance.close();
+
+ assertEquals(0, numSentInterests()); // if the publisher has not been opened, it will not announce an exit
+ assertEquals(0, numReceivedDatas());
+ }
+
+ @Test
+ public void publish() throws Exception {
+ instance.open();
+ assertEquals(1, numSentInterests());
+
+ instance.publish(new Blob("..."));
+ assertEquals(0, numSentDatas());
+ assertEquals(1, numSentInterests()); // none more than from opening
+ }
+
+ @Test
+ public void publishWithPendingInterest() throws Exception {
+ instance.open();
+ assertEquals(1, numSentInterests());
+
+ CountDownLatch latch = new CountDownLatch(1);
+ Face client = forwarder.connect();
+ client.expressInterest(new Interest(PubSubNamespace.toMessageName(PUBLISHER_PREFIX, PUBLISHER_ID, 0)), (interest, data) -> latch.countDown());
+
+ client.processEvents(); // TODO remove somehow
+ face.processEvents();
+
+ instance.publish(new Blob("..."));
+ assertEquals(1, numSentDatas());
+ assertEquals(1, numSentInterests()); // none more than from opening
+
+ client.processEvents(); // TODO remove somehow
+ face.processEvents();
+
+ latch.await(1, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void publishAndRespondToNewInterest() throws Exception {
+ Face client = forwarder.connect();
+ instance.open();
+ assertEquals(1, numSentInterests());
+
+ instance.publish(new Blob("..."));
+ assertEquals(0, numSentDatas());
+ assertEquals(1, numSentInterests()); // none more than from opening
+
+ client.processEvents(); // TODO remove somehow
+ face.processEvents();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.expressInterest(new Interest(PubSubNamespace.toMessageName(PUBLISHER_PREFIX, PUBLISHER_ID, 0)), (interest, data) -> latch.countDown());
+
+ client.processEvents(); // TODO remove somehow
+ face.processEvents();
+
+ client.processEvents(); // TODO remove somehow
+ face.processEvents();
+
+ latch.await(1, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
+
+ private int numSentDatas() {
+ return ((MeasurableFace) face).sentDatas().size();
+ }
+
+ private int numReceivedDatas() {
+ return ((MeasurableFace) face).receivedDatas().size();
+ }
+
+ private int numSentInterests() {
+ return ((MeasurableFace) face).sentInterests().size();
+ }
+
+ private void driveFace(Face face) throws InterruptedException {
+ ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
+ //CountDownLatch latch = new CountDownLatch(1);
+
+ pool.schedule(() -> {
+ //latch.countDown();
+ try {
+ face.processEvents();
+ } catch (IOException | EncodingException e) {
+ LOGGER.log(Level.SEVERE, "Failed to process face events", e);
+ fail(e.getMessage());
+ }
+ }, 50, TimeUnit.MILLISECONDS);
+
+ //latch.await();
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java
new file mode 100644
index 0000000..b3ca959
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java
@@ -0,0 +1,82 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.Client;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnSubscriberTest {
+
+ public static final Name TOPIC_NAME = new Name("/subscriber");
+ private NdnSubscriber instance;
+ private AnnouncementService announcementService;
+
+ @Before
+ public void before() {
+ Face face = mock(Face.class);
+ announcementService = mock(AnnouncementService.class);
+ Client client = mock(Client.class);
+ instance = new NdnSubscriber(face, TOPIC_NAME, announcementService, client);
+ }
+
+ @Test
+ public void open() throws Exception {
+ instance.open();
+ }
+
+ @Test
+ public void close() throws Exception {
+ instance.close();
+ }
+
+ @Test
+ public void knownPublishers() throws Exception {
+ ArgumentCaptor<On<Long>> existingPublishersSignal = ArgumentCaptor.forClass((Class) On.class);
+ ArgumentCaptor<On<Long>> newPublishersSignal = ArgumentCaptor.forClass((Class) On.class);
+ when(announcementService.discoverExistingAnnouncements(existingPublishersSignal.capture(), any(), any())).thenReturn(null);
+ when(announcementService.observeNewAnnouncements(newPublishersSignal.capture(), any(), any())).thenReturn(null);
+ instance.open();
+
+ assertEquals(0, instance.knownPublishers().size());
+
+ existingPublishersSignal.getValue().on(42L);
+ assertEquals(1, instance.knownPublishers().size());
+
+ newPublishersSignal.getValue().on(42L);
+ assertEquals(1, instance.knownPublishers().size());
+
+ existingPublishersSignal.getValue().on(99L);
+ assertEquals(2, instance.knownPublishers().size());
+ }
+
+ @Test
+ public void subscribe() throws Exception {
+ instance.add(1);
+ instance.remove(1);
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/TopicTestIT.java b/src/test/java/com/intel/jndn/utils/pubsub/TopicTestIT.java
new file mode 100644
index 0000000..2b06e6d
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/TopicTestIT.java
@@ -0,0 +1,74 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.mock.MockKeyChain;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.ThreadPoolFace;
+import net.named_data.jndn.security.KeyChain;
+import net.named_data.jndn.security.SecurityException;
+import net.named_data.jndn.transport.AsyncTcpTransport;
+import net.named_data.jndn.util.Blob;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class TopicTestIT {
+ private static final Logger LOGGER = Logger.getLogger(TopicTestIT.class.getName());
+
+ @Test
+ public void basicUsage() throws Exception {
+ Face pubFace = connect("localhost");
+ Face subFace = connect("localhost");
+ Topic topic = new Topic(new Name("/pub/sub/topic"));
+ CountDownLatch latch = new CountDownLatch(3);
+
+ Subscriber subscriber = topic.newSubscriber(pubFace);
+ subscriber.subscribe(b -> latch.countDown(), e -> fail(e.getMessage()));
+
+ Publisher publisher = topic.newPublisher(subFace);
+ publisher.publish(new Blob("."));
+ publisher.publish(new Blob(".."));
+ publisher.publish(new Blob("..."));
+
+ latch.await(2, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
+
+ private Face connect(String hostName) throws SecurityException {
+ ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
+ AsyncTcpTransport transport = new AsyncTcpTransport(pool);
+ AsyncTcpTransport.ConnectionInfo connectionInfo = new AsyncTcpTransport.ConnectionInfo(hostName, 6363, true);
+ ThreadPoolFace face = new ThreadPoolFace(pool, transport, connectionInfo);
+
+ KeyChain keyChain = MockKeyChain.configure(new Name("/topic/test/it"));
+ face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName());
+
+ return face;
+ }
+}
\ No newline at end of file