Initial PubSub implementation
Includes initial AnnouncementService, NdnSubscriber, and NdnPublisher implementations; TODO still a WIP
diff --git a/src/main/java/com/intel/jndn/utils/Publisher.java b/src/main/java/com/intel/jndn/utils/Publisher.java
new file mode 100644
index 0000000..7755b5c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Publisher.java
@@ -0,0 +1,24 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.util.Blob;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface Publisher {
+ void publish(Blob message);
+}
diff --git a/src/main/java/com/intel/jndn/utils/Repository.java b/src/main/java/com/intel/jndn/utils/Repository.java
index fcc0172..6e314f4 100644
--- a/src/main/java/com/intel/jndn/utils/Repository.java
+++ b/src/main/java/com/intel/jndn/utils/Repository.java
@@ -20,7 +20,7 @@
/**
* Define API for storing and retrieving NDN packets
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface Repository {
@@ -29,7 +29,7 @@
*
* @param data a {@link Data} packet
*/
- public void put(Data data);
+ void put(Data data);
/**
* Retrieve a {@link Data} packet in the repository; this method should
@@ -39,7 +39,7 @@
* @return a {@link Data} packet
* @throws DataNotFoundException if the packet is not found
*/
- public Data get(Interest interest) throws DataNotFoundException;
+ Data get(Interest interest) throws DataNotFoundException;
/**
* Check if this repository can satisfy the {@link Interest} with a
@@ -49,10 +49,10 @@
* @param interest the {@link Interest} to attempt to satisfy
* @return true if a {@link Data} exists that satisfies the {@link Interest}
*/
- public boolean satisfies(Interest interest);
+ boolean satisfies(Interest interest);
/**
* Remove all stale {@link Data} packets from the repository.
*/
- public void cleanup();
+ void cleanup();
}
diff --git a/src/main/java/com/intel/jndn/utils/Subscriber.java b/src/main/java/com/intel/jndn/utils/Subscriber.java
new file mode 100644
index 0000000..4949826
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Subscriber.java
@@ -0,0 +1,26 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import com.intel.jndn.utils.pubsub.Cancellation;
+import com.intel.jndn.utils.pubsub.On;
+import net.named_data.jndn.util.Blob;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface Subscriber {
+ Cancellation subscribe(On<Blob> onMessage, On<Exception> onError);
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/DataStream.java b/src/main/java/com/intel/jndn/utils/client/DataStream.java
index 7f33870..3f517a3 100644
--- a/src/main/java/com/intel/jndn/utils/client/DataStream.java
+++ b/src/main/java/com/intel/jndn/utils/client/DataStream.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -22,6 +22,8 @@
* Define a stream of {@link Data} packets as they are retrieved from the
* network and provide methods for observing stream events
*
+ * TODO merge with Observable
+ *
* @author Andrew Brown <andrew.brown@intel.com>
*/
public interface DataStream extends OnData, OnTimeout, OnComplete, OnException {
@@ -29,47 +31,47 @@
/**
* @return true if the stream is complete
*/
- public boolean isComplete();
+ boolean isComplete();
/**
* @return the current list of packets retrieved; this may change as more
* packets are added
*/
- public Data[] list();
+ Data[] list();
/**
* @return an assembled packet containing the concatenated bytes of all
* received packets
* @throws StreamException if assembly fails
*/
- public Data assemble() throws StreamException;
+ Data assemble() throws StreamException;
/**
* Watch all {@link OnData} events
*
* @param onData the callback fired
*/
- public void observe(OnData onData);
+ void observe(OnData onData);
/**
* Watch all {@link OnComplete} events
*
* @param onComplete the callback fired
*/
- public void observe(OnComplete onComplete);
+ void observe(OnComplete onComplete);
/**
* Watch all {@link OnException} events
*
* @param onException the callback fired
*/
- public void observe(OnException onException);
+ void observe(OnException onException);
/**
* Watch all {@link OnTimeout} events
*
* @param onTimeout the callback fired
*/
- public void observe(OnTimeout onTimeout);
+ void observe(OnTimeout onTimeout);
}
diff --git a/src/main/java/com/intel/jndn/utils/client/RetryClient.java b/src/main/java/com/intel/jndn/utils/client/RetryClient.java
index 9a5dacb..b559802 100644
--- a/src/main/java/com/intel/jndn/utils/client/RetryClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/RetryClient.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -13,16 +13,17 @@
*/
package com.intel.jndn.utils.client;
-import java.io.IOException;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.OnData;
import net.named_data.jndn.OnTimeout;
+import java.io.IOException;
+
/**
* Define a client that can retry {@link Interest} packets on timeout.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface RetryClient {
@@ -40,5 +41,5 @@
* @param onTimeout the application's failure callback
* @throws IOException when the client cannot perform the necessary network IO
*/
- public void retry(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException;
+ void retry(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException;
}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
new file mode 100644
index 0000000..34818a4
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
@@ -0,0 +1,27 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import java.io.IOException;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface AnnouncementService {
+ void announceEntrance(long id) throws IOException;
+ void announceExit(long id) throws IOException;
+ Cancellation discoverExistingAnnouncements(On<Long> onFound, On<Void> onComplete, On<Exception> onError);
+ Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws RegistrationFailureException;
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/BlobContentStore.java b/src/main/java/com/intel/jndn/utils/pubsub/BlobContentStore.java
new file mode 100644
index 0000000..823bd85
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/BlobContentStore.java
@@ -0,0 +1,49 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class BlobContentStore implements ContentStore<Blob> {
+ @Override
+ public void put(Name name, Blob data) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean has(Name name) {
+ return false;
+ }
+
+ @Override
+ public Blob get(Name name) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void push(Face face, Name name) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Cancellation.java b/src/main/java/com/intel/jndn/utils/pubsub/Cancellation.java
new file mode 100644
index 0000000..2fe42df
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/Cancellation.java
@@ -0,0 +1,24 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface Cancellation {
+ Cancellation CANCELLED = () -> {/* do nothing */};
+
+ void cancel();
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/ContentStore.java b/src/main/java/com/intel/jndn/utils/pubsub/ContentStore.java
new file mode 100644
index 0000000..34c6ac0
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/ContentStore.java
@@ -0,0 +1,35 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+
+import java.io.IOException;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+interface ContentStore<T> {
+ void put(Name name, T data);
+
+ boolean has(Name name);
+
+ T get(Name name);
+
+ void push(Face face, Name name) throws IOException;
+
+ void clear();
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/ForLoopPendingInterestTable.java b/src/main/java/com/intel/jndn/utils/pubsub/ForLoopPendingInterestTable.java
new file mode 100644
index 0000000..10e4084
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/ForLoopPendingInterestTable.java
@@ -0,0 +1,51 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class ForLoopPendingInterestTable implements PendingInterestTable {
+ Map<Name, Interest> table = new ConcurrentHashMap<>();
+
+ @Override
+ public void add(Interest interest) {
+ table.put(interest.getName(), interest);
+ }
+
+ @Override
+ public boolean has(Interest interest) {
+ // TODO must handle more complex logic (selectors)
+ return has(interest.getName());
+ }
+
+ public boolean has(Name name) {
+ return table.containsKey(name);
+ }
+
+ @Override
+ public Collection<Interest> extract(Name name) {
+ // TODO more complexity to return multiple results
+ return has(name) ? Collections.singleton(table.get(name)) : Collections.emptyList();
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
new file mode 100644
index 0000000..9679d7e
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
@@ -0,0 +1,163 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.InterestFilter;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnInterestCallback;
+import net.named_data.jndn.OnRegisterFailed;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.security.SecurityException;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.intel.jndn.utils.pubsub.Cancellation.CANCELLED;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class NdnAnnouncementService implements AnnouncementService {
+ private static final Logger LOGGER = Logger.getLogger(NdnAnnouncementService.class.getName());
+ private final Face face;
+ private final Name broadcastPrefix;
+ private final Name topicPrefix;
+ private final Name usablePrefix;
+ private final Set<Long> known = new HashSet<>();
+
+ private NdnAnnouncementService(Face face, Name broadcastPrefix, Name topicPrefix) {
+ this.face = face;
+ this.broadcastPrefix = broadcastPrefix;
+ this.topicPrefix = topicPrefix;
+ this.usablePrefix = new Name(broadcastPrefix).append(topicPrefix);
+ }
+
+ NdnAnnouncementService(Face face, Name topicPrefix) {
+ this(face, PubSubNamespace.DEFAULT_BROADCAST_PREFIX, topicPrefix);
+ }
+
+ @Override
+ public void announceEntrance(long id) throws IOException {
+ Name.Component publisherId = Name.Component.fromNumberWithMarker(id, PubSubNamespace.PUBLISHER_ID_MARKER);
+ Name.Component announcementAction = Name.Component.fromNumberWithMarker(PubSubNamespace.ANNOUNCEMENT_ENTRANCE, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);
+ Interest interest = new Interest(new Name(broadcastPrefix).append(topicPrefix).append(publisherId).append(announcementAction));
+ face.expressInterest(interest, null);
+ }
+
+ @Override
+ public void announceExit(long id) throws IOException {
+ Name.Component publisherId = Name.Component.fromNumberWithMarker(id, PubSubNamespace.PUBLISHER_ID_MARKER);
+ Name.Component announcementAction = Name.Component.fromNumberWithMarker(PubSubNamespace.ANNOUNCEMENT_EXIT, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);
+ Interest interest = new Interest(new Name(broadcastPrefix).append(topicPrefix).append(publisherId).append(announcementAction));
+ face.expressInterest(interest, null);
+ }
+
+ @Override
+ public Cancellation discoverExistingAnnouncements(On<Long> onFound, On<Void> onComplete, On<Exception> onError) {
+ if (onFound == null) {
+ return CANCELLED;
+ }
+
+ for (long id : known) {
+ LOGGER.info("Passing known publisher: " + id);
+ onFound.on(id);
+ }
+
+ // TODO while !backoff interval maxed out, send out interests with excludes
+ Interest interest = new Interest();
+ try {
+ long pendingInterest = face.expressInterest(interest, (i, d) -> {
+ }, (i) -> {
+ });
+ return () -> face.removePendingInterest(pendingInterest);
+ } catch (IOException e) {
+ onError.on(e);
+ return CANCELLED;
+ }
+ }
+
+ private void found(long publisherId) {
+ known.add(publisherId);
+ }
+
+ @Override
+ public Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws RegistrationFailureException {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ OnRegistration onRegistration = new OnRegistration(future);
+ OnAnnouncement onAnnouncement = new OnAnnouncement(onAdded, onRemoved, onError);
+
+ try {
+ long registeredPrefix = face.registerPrefix(usablePrefix, onAnnouncement, (OnRegisterFailed) onRegistration, onRegistration);
+ return () -> face.removeRegisteredPrefix(registeredPrefix);
+ } catch (IOException | SecurityException e) {
+ throw new RegistrationFailureException(e);
+ }
+ }
+
+ private class OnAnnouncement implements OnInterestCallback {
+ private final On<Long> onAdded;
+ private final On<Long> onRemoved;
+ private final On<Exception> onError;
+
+ OnAnnouncement(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) {
+ this.onAdded = onAdded;
+ this.onRemoved = onRemoved;
+ this.onError = onError;
+ }
+
+ @Override
+ public void onInterest(Name name, Interest interest, Face face, long l, InterestFilter interestFilter) {
+ try {
+ long publisherId = interest.getName().get(-2).toNumberWithMarker(PubSubNamespace.PUBLISHER_ID_MARKER);
+ int announcement = (int) interest.getName().get(-1).toNumberWithMarker(PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);
+ switch (announcement) {
+ case PubSubNamespace.ANNOUNCEMENT_ENTRANCE:
+ add(publisherId);
+ break;
+ case PubSubNamespace.ANNOUNCEMENT_EXIT:
+ remove(publisherId);
+ break;
+ default:
+ LOGGER.warning("Unknown announcement action, ignoring: " + interest.toUri());
+ }
+ } catch (EncodingException e) {
+ LOGGER.log(Level.SEVERE, "Failed to decode announcement: " + interest.toUri(), e);
+ onError.on(e);
+ }
+ }
+
+ private void remove(long publisherId) {
+ LOGGER.info("Publisher leaving topic: " + publisherId);
+ known.remove(publisherId);
+ if (onRemoved != null) {
+ onRemoved.on(publisherId);
+ }
+ }
+
+ private void add(long publisherId) {
+ LOGGER.info("Publisher entering topic: " + publisherId);
+ known.add(publisherId);
+ if (onAdded != null) {
+ onAdded.on(publisherId);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
new file mode 100644
index 0000000..50469ea
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
@@ -0,0 +1,120 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.Publisher;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.InterestFilter;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnInterestCallback;
+import net.named_data.jndn.OnRegisterFailed;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.security.SecurityException;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class NdnPublisher implements Publisher, OnInterestCallback {
+ private static final Logger LOGGER = Logger.getLogger(NdnPublisher.class.getName());
+ private final Face face; // TODO only needed in start, remove?
+ private final Name prefix; // TODO only needed in start, remove?
+ private final AnnouncementService announcementService;
+ private final PendingInterestTable pendingInterestTable;
+ private final ContentStore<Blob> contentStore;
+ private final long publisherId;
+ private volatile long latestMessageId = 0;
+ private long registrationId;
+ // TODO need pit
+
+ NdnPublisher(Face face, Name prefix, long publisherId, AnnouncementService announcementService, PendingInterestTable pendingInterestTable, ContentStore<Blob> contentStore) {
+ this.face = face;
+ this.prefix = prefix;
+ this.publisherId = publisherId;
+ this.announcementService = announcementService;
+ this.pendingInterestTable = pendingInterestTable;
+ this.contentStore = contentStore;
+ }
+
+ public void start() throws RegistrationFailureException {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ OnRegistration onRegistration = new OnRegistration(future);
+
+ try {
+ registrationId = face.registerPrefix(prefix, this, (OnRegisterFailed) onRegistration, onRegistration);
+ future.get(10, TimeUnit.SECONDS);
+ announcementService.announceEntrance(publisherId);
+ } catch (IOException | SecurityException | InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RegistrationFailureException(e);
+ }
+ }
+
+ public void stop() throws IOException {
+ face.removeRegisteredPrefix(registrationId);
+ contentStore.clear();
+ announcementService.announceExit(publisherId);
+ }
+
+ // TODO should throw IOException?
+ @Override
+ public void publish(Blob message) {
+ long id = latestMessageId++; // TODO synchronize?
+ Name name = PubSubNamespace.toMessageName(prefix, publisherId, id);
+
+ contentStore.put(name, message);
+ LOGGER.log(Level.INFO, "Published message {0} to content store", id);
+
+ if(pendingInterestTable.has(new Interest(name))){
+ try {
+ contentStore.push(face, name);
+ } catch (IOException e) {
+ LOGGER.log(Level.SEVERE, "Failed to send message {0} for pending interests: {1}", new Object[]{id, name});
+ }
+ }
+ }
+
+ @Override
+ public void onInterest(Name name, Interest interest, Face face, long registrationId, InterestFilter interestFilter) {
+ try {
+ if(contentStore.has(interest.getName())){
+ contentStore.push(face, interest.getName());
+ } else {
+ pendingInterestTable.add(interest);
+ }
+
+ long id = interest.getName().get(-1).toNumberWithMarker(43);
+ Blob blob = contentStore.get(interest.getName());
+ Data data = new Data(interest.getName());
+ data.setContent(blob);
+ face.putData(data);
+ LOGGER.info("Published message " + id + " for interest: " + interest.toUri());
+ } catch (IOException e) {
+ LOGGER.log(Level.SEVERE, "Failed to publish message for interest: " + interest.toUri(), e);
+ } catch (EncodingException e) {
+ LOGGER.log(Level.SEVERE, "Failed to decode message ID for interest: " + interest.toUri(), e);
+ }
+ }
+
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
new file mode 100644
index 0000000..556b91f
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
@@ -0,0 +1,197 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.Client;
+import com.intel.jndn.utils.Subscriber;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.util.Blob;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class NdnSubscriber implements Subscriber {
+ private static final Logger LOGGER = Logger.getLogger(NdnSubscriber.class.getName());
+ private final Face face;
+ private final Name prefix;
+ private final AnnouncementService announcementService;
+ private final Client client;
+ private final Set<Context> known = new HashSet<>();
+ private Cancellation newAnnouncementCancellation;
+ private Cancellation existingAnnouncementsCancellation;
+ private volatile boolean started = false;
+
+ NdnSubscriber(Face face, Name prefix, AnnouncementService announcementService, Client client) {
+ this.face = face;
+ this.prefix = prefix;
+ this.announcementService = announcementService;
+ this.client = client;
+ }
+
+ private void start() throws RegistrationFailureException {
+ LOGGER.log(Level.INFO, "Starting subscriber");
+
+ existingAnnouncementsCancellation = announcementService.discoverExistingAnnouncements(this::add, null, e -> stop());
+ newAnnouncementCancellation = announcementService.observeNewAnnouncements(this::add, this::remove, e -> stop());
+
+ started = true;
+ }
+
+ private void add(long publisherId) {
+ known.add(new Context(publisherId));
+ }
+
+ private void remove(long publisherId) {
+ known.remove(publisherId); // TODO incorrect
+ }
+
+ private void stop() {
+ LOGGER.log(Level.INFO, "Stopping subscriber, knows of {0} publishers: {1} ", new Object[]{known.size(), known});
+
+ if (newAnnouncementCancellation != null) {
+ newAnnouncementCancellation.cancel();
+ }
+ if (existingAnnouncementsCancellation != null) {
+ existingAnnouncementsCancellation.cancel();
+ }
+ for (Context c : known) {
+ c.cancel();
+ }
+
+ started = false;
+ }
+
+ Set<Long> knownPublishers() {
+ return known.stream().map(c -> c.publisherId).collect(Collectors.toSet());
+ }
+
+ // TODO repeated calls?
+ @Override
+ public Cancellation subscribe(On<Blob> onMessage, On<Exception> onError) {
+ if (!started) {
+ try {
+ start();
+ } catch (RegistrationFailureException e) {
+ LOGGER.log(Level.SEVERE, "Failed to start announcement service, aborting subscription", e);
+ onError.on(e);
+ return Cancellation.CANCELLED;
+ }
+ }
+
+ for (Context c : known) {
+ c.subscribe(onMessage, onError);
+ }
+
+ return this::stop;
+ }
+
+ private class Context implements Cancellation {
+ final long publisherId;
+ long messageId;
+ boolean subscribed = false;
+ On<Blob> onMessage;
+ On<Exception> onError;
+ CompletableFuture<Data> currentRequest;
+
+ Context(long publisherId) {
+ this.publisherId = publisherId;
+ }
+
+ synchronized void subscribe(On<Blob> onMessage, On<Exception> onError) {
+ this.onMessage = onMessage;
+ this.onError = onError;
+
+ if (subscribed) {
+ return;
+ }
+
+ currentRequest = client.getAsync(face, buildLatestInterest(publisherId)); // TODO backoff
+ currentRequest.handle(this::handleResponse);
+ subscribed = true;
+ }
+
+ @Override
+ public synchronized void cancel() {
+ if (currentRequest != null) {
+ currentRequest.cancel(true);
+ }
+ }
+
+ private Void handleResponse(Data data, Throwable throwable) {
+ if (throwable != null) {
+ onError.on((Exception) throwable); // TODO avoid cast?
+ } else {
+ try {
+ Response response = PubSubNamespace.toResponse(data);
+ this.messageId = response.messageId();
+ onMessage.on(response.content()); // TODO buffer and catch exceptions
+ } catch (EncodingException e) {
+ onError.on(e);
+ }
+
+ // TODO catchup and loop detection (if above fails next() is called again with same params)
+ next(this.publisherId, this.messageId);
+ }
+
+ return null;
+ }
+
+ private void next(long publisherId, long messageId) {
+ CompletableFuture<Data> nextRequest = client.getAsync(face, buildNextInterest(publisherId, messageId));
+ nextRequest.handle(this::handleResponse);
+ }
+
+ private Interest buildLatestInterest(long publisherId) {
+ Name name = PubSubNamespace.toPublisherName(prefix, publisherId);
+ Interest interest = new Interest(name); // TODO ms lifetime
+ interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
+ return interest;
+ }
+
+ private Interest buildNextInterest(long publisherId, long messageId) {
+ Name name = PubSubNamespace.toMessageName(prefix, publisherId, messageId + 1);
+ return new Interest(name); // TODO ms lifetime
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Context context = (Context) o;
+ return publisherId == context.publisherId;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (publisherId ^ (publisherId >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "Context{" + "publisherId=" + publisherId + '}';
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/On.java b/src/main/java/com/intel/jndn/utils/pubsub/On.java
new file mode 100644
index 0000000..5bb7951
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/On.java
@@ -0,0 +1,28 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+/**
+ * Generic callback API for receiving some signal T when an event is fired.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+@FunctionalInterface
+public interface On<T> {
+ /**
+ * @param event the event object describing the occurrence
+ */
+ void on(T event);
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java b/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
new file mode 100644
index 0000000..05f631c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
@@ -0,0 +1,47 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnRegisterFailed;
+import net.named_data.jndn.OnRegisterSuccess;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Logger;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class OnRegistration implements OnRegisterFailed, OnRegisterSuccess {
+ private static final Logger LOGGER = Logger.getLogger(OnRegistration.class.getName());
+ private final CompletableFuture<Void> future;
+
+ OnRegistration(CompletableFuture<Void> future) {
+ this.future = future;
+ }
+
+ @Override
+ public void onRegisterSuccess(Name name, long l) {
+ LOGGER.info("Registered prefix: " + name);
+ future.complete(null);
+ }
+
+ @Override
+ public void onRegisterFailed(Name name) {
+ String message = "Failed to register prefix: " + name;
+ LOGGER.severe(message);
+ future.completeExceptionally(new RegistrationFailureException(message));
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PendingInterestTable.java b/src/main/java/com/intel/jndn/utils/pubsub/PendingInterestTable.java
new file mode 100644
index 0000000..410a565
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PendingInterestTable.java
@@ -0,0 +1,42 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface PendingInterestTable {
+ /**
+ * @param interest the PIT entry to add
+ */
+ void add(Interest interest);
+
+ /**
+ * @param interest the incoming interest to match against
+ * @return true if the interest matches an entry already in the PIT
+ */
+ boolean has(Interest interest);
+
+ /**
+ * @param name the name to match against
+ * @return the PIT entries matching a name, removing them from the PIT
+ */
+ Collection<Interest> extract(Name name);
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
new file mode 100644
index 0000000..5df1c79
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
@@ -0,0 +1,75 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.encoding.tlv.TlvDecoder;
+import net.named_data.jndn.util.Blob;
+
+import java.util.Collections;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class PubSubNamespace {
+ static final int MESSAGE_ID_MARKER = 42;
+ static final int PUBLISHER_ID_MARKER = 43;
+ static final int ANNOUNCEMENT_ACTION_MARKER = 44;
+
+ static final int ANNOUNCEMENT_ENTRANCE = 1;
+ static final int ANNOUNCEMENT_EXIT = 0;
+
+ static final int MESSAGE_MARKER = 100;
+ static final int MESSAGE_ATTRIBUTES_MARKER = 101;
+ static final int MESSAGE_CONTENT_MARKER = 102;
+
+ static final Name DEFAULT_BROADCAST_PREFIX = new Name("/bcast");
+
+ private PubSubNamespace() {
+ // do not instantiate this class
+ }
+
+ static Name toPublisherName(Name prefix, long publisherId) {
+ Name.Component publisherComponent = Name.Component.fromNumberWithMarker(publisherId, PubSubNamespace.PUBLISHER_ID_MARKER);
+ return new Name(prefix).append(publisherComponent);
+ }
+
+ static Name toMessageName(Name prefix, long publisherId, long messageId) {
+ Name.Component messageComponent = Name.Component.fromNumberWithMarker(messageId, PubSubNamespace.MESSAGE_ID_MARKER);
+ return toPublisherName(prefix, publisherId).append(messageComponent);
+ }
+
+ static Response toResponse(Data data) throws EncodingException {
+ long messageId = extractMessageId(data.getName());
+ TlvDecoder decoder = new TlvDecoder(data.getContent().buf());
+ int endOffset = decoder.readNestedTlvsStart(MESSAGE_MARKER);
+ Blob attributes = new Blob(decoder.readOptionalBlobTlv(MESSAGE_ATTRIBUTES_MARKER, endOffset), true);
+ Blob content = new Blob(decoder.readBlobTlv(MESSAGE_CONTENT_MARKER), true);
+ return new Response(data.getName(), messageId, Collections.singletonMap("*", attributes), content);
+ }
+
+ private static long extractMessageId(Name name) throws EncodingException {
+ for (int i = name.size(); i >= 0; i--) {
+ try {
+ return name.get(i).toNumberWithMarker(MESSAGE_ID_MARKER);
+ } catch (EncodingException e) {
+ // do nothing
+ }
+ }
+ throw new EncodingException("Failed to find message ID marker in name: " + name.toUri());
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/RegistrationFailureException.java b/src/main/java/com/intel/jndn/utils/pubsub/RegistrationFailureException.java
new file mode 100644
index 0000000..aa7919d
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/RegistrationFailureException.java
@@ -0,0 +1,28 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class RegistrationFailureException extends Exception {
+ RegistrationFailureException(String message) {
+ super(message);
+ }
+
+ public RegistrationFailureException(Exception e) {
+ super(e);
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Response.java b/src/main/java/com/intel/jndn/utils/pubsub/Response.java
new file mode 100644
index 0000000..9d84f44
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/Response.java
@@ -0,0 +1,45 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.util.Map;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class Response {
+ private final Name name;
+ private final long messageId;
+ private final Map<String, Object> attributes;
+ private final Blob content;
+
+ Response(Name name, long messageId, Map<String, Object> attributes, Blob content) {
+ this.name = name;
+ this.messageId = messageId;
+ this.attributes = attributes;
+ this.content = content;
+ }
+
+ long messageId() {
+ return messageId;
+ }
+
+ Blob content() {
+ return content;
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Topic.java b/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
new file mode 100644
index 0000000..71c7d9f
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/Topic.java
@@ -0,0 +1,46 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import com.intel.jndn.utils.client.impl.AdvancedClient;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public final class Topic {
+ private final Name name;
+
+ public Topic(Name name) {
+ this.name = name;
+ }
+
+ public Name name() {
+ return new Name(name);
+ }
+
+ // TODO move to PubSubFactory?
+ public Subscriber newSubscriber(Face face) {
+ return new NdnSubscriber(face, name, new NdnAnnouncementService(face, name), new AdvancedClient());
+ }
+
+ // TODO move to PubSubFactory?
+ public Publisher newPublisher(Face face) {
+ return new NdnPublisher(face, name, 23, new NdnAnnouncementService(face, name), new ForLoopPendingInterestTable(), new BlobContentStore());
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java b/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
index e40890d..e83e4e9 100644
--- a/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
+++ b/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
@@ -26,7 +26,7 @@
* {@link net.named_data.jndn.util.MemoryContentCache} and borrows the matching
* logic from there.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class ForLoopRepository implements Repository {
@@ -95,7 +95,7 @@
* components (e.g. c); if the Data is not longer than the Interest, return an
* empty component.
*/
- private Name.Component getNextComponentAfterLastInterestComponent(Data content, Interest interest) {
+ private static Name.Component getNextComponentAfterLastInterestComponent(Data content, Interest interest) {
if (content.getName().size() > interest.getName().size()) {
return content.getName().get(interest.getName().size());
} else {
@@ -132,11 +132,7 @@
* record is fresh
*/
private boolean hasAcceptableFreshness(Interest interest, Record record) {
- if (!interest.getMustBeFresh()) {
- return true;
- } else {
- return isFresh(record);
- }
+ return !interest.getMustBeFresh() || isFresh(record);
}
/**
@@ -171,10 +167,10 @@
*/
private class Record {
- public final Data data;
- public final long addedAt;
+ final Data data;
+ final long addedAt;
- public Record(Data data) {
+ Record(Data data) {
this.data = data;
this.addedAt = System.currentTimeMillis();
}