Initial pubsub; waiting on MockFace to fully test
diff --git a/src/main/java/com/intel/jndn/utils/Publisher.java b/src/main/java/com/intel/jndn/utils/Publisher.java
index 862f001..0ae79b1 100644
--- a/src/main/java/com/intel/jndn/utils/Publisher.java
+++ b/src/main/java/com/intel/jndn/utils/Publisher.java
@@ -41,175 +41,188 @@
*/
public class Publisher implements OnInterest, OnRegisterFailed {
- public static final long DEFAULT_TIMEOUT = 2000;
- private static final Logger logger = LogManager.getLogger();
- protected HashMap<Class, String> types = new HashMap<>();
- protected HashMap<Name, PublishedObject> published = new HashMap<>();
- protected Face face;
- protected Name channel;
- protected boolean registered = true;
- protected long publishLifetime = -1; // in milliseconds, -1 will persist as long as this instance lives
+ // lifetime, in milliseconds, given to the alert Interests built by getAlert()
+ public static final long DEFAULT_TIMEOUT = 2000;
+ private static final Logger logger = LogManager.getLogger();
+ // maps each publishable class to the alias appended to the channel name on publish
+ protected HashMap<Class, String> types = new HashMap<>();
+ // objects published so far, keyed by the full name they were published under
+ protected HashMap<Name, PublishedObject> published = new HashMap<>();
+ protected Face face;
+ protected Name channel;
+ // cleared when prefix registration fails; publish() refuses to send while false
+ protected boolean registered = true;
+ protected long publishLifetime = -1; // in milliseconds, -1 will persist as long as this instance lives
- /**
- * Initialize a publisher on the given channel; published objects will
- * persist as long as this instance lives.
- *
- * @param face
- * @param channel
- */
- public Publisher(final Face face, final Name channel) {
- this.face = face;
- this.channel = channel;
- try {
- this.face.registerPrefix(this.channel, this, this, new ForwardingFlags());
- } catch (IOException e) {
- logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e);
- } catch (SecurityException e) {
- logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e);
- }
- }
+ /**
+ * Initialize a publisher on the given channel; published objects will persist
+ * as long as this instance lives.
+ * <p>
+ * If the registration request cannot be sent, the failure is logged and the
+ * publisher is marked as unregistered, so that publish() refuses to send
+ * alerts for a prefix that was never registered.
+ *
+ * @param face the face used to register the channel prefix and to send alerts
+ * @param channel the name prefix under which objects are published
+ * @param registerOnForwarder set this to false to disable interaction with the forwarder (e.g. as in tests)
+ */
+ public Publisher(final Face face, final Name channel, boolean registerOnForwarder) {
+ this.face = face;
+ this.channel = channel;
+ if (registerOnForwarder) {
+ try {
+ this.face.registerPrefix(this.channel, this, this, new ForwardingFlags());
+ } catch (IOException e) {
+ logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e);
+ // same outcome as an asynchronous onRegisterFailed(): stop publishing
+ registered = false;
+ } catch (SecurityException e) {
+ logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e);
+ registered = false;
+ }
+ }
+ }
- /**
- * Initialize a publisher on the given channel; limit the lifetime of
- * published objects to save memory.
- *
- * @param face
- * @param channel
- * @param publishLifetime
- */
- public Publisher(final Face face, final Name channel, long publishLifetime) {
- this(face, channel);
- this.publishLifetime = publishLifetime;
- }
+ /**
+ * Initialize a publisher on the given channel and register the channel
+ * prefix on the forwarder; published objects will persist as long as this
+ * instance lives.
+ *
+ * @param face the face passed on to the three-argument constructor
+ * @param channel the name prefix under which objects are published
+ */
+ public Publisher(final Face face, final Name channel) {
+ this(face, channel, true);
+ }
- /**
- * Add a type and its alias to this publisher
- *
- * @param typeAlias
- * @param type
- */
- public void addType(String typeAlias, Class type) {
- types.put(type, typeAlias);
- }
+ /**
+ * Initialize a publisher on the given channel; limit the lifetime of
+ * published objects to save memory.
+ *
+ * @param face the face passed on to the two-argument constructor
+ * @param channel the name prefix under which objects are published
+ * @param publishLifetime how long published objects are kept, in milliseconds;
+ * -1 keeps them for as long as this instance lives
+ */
+ public Publisher(final Face face, final Name channel, long publishLifetime) {
+ this(face, channel);
+ this.publishLifetime = publishLifetime;
+ }
+ /**
+ * Add a type and its alias to this publisher; publish() only accepts objects
+ * whose class has been added here, and appends the alias to the channel name
+ * of each object it publishes.
+ *
+ * @param typeAlias the name component appended for objects of this type
+ * @param type the class to publish; publish() looks up the object's exact runtime class
+ */
+ public void addType(String typeAlias, Class type) {
+ types.put(type, typeAlias);
+ }
- /**
- * Publish the given object by sending an alert to all subscribers
- *
- * @param publishedObject
- */
- public void publish(Object publishedObject) {
- // check if this publisher is registered on the face
- if (!registered) {
- logger.error("Publisher failed to register; cannot publish data to: " + channel.toUri());
- return;
- }
+ /**
+ * Publish the given object by sending an alert to all subscribers. Nothing
+ * is published (and an error is logged) if the object is null, if prefix
+ * registration has failed, or if the object's class was not added with
+ * addType().
+ *
+ * @param publishedObject the object to publish; its class must have been added with addType()
+ */
+ public void publish(Object publishedObject) {
+ // reject null up front; getClass() below would otherwise throw a NullPointerException
+ if (publishedObject == null) {
+ logger.error("Attempted (and failed) to publish a null object to: " + channel.toUri());
+ return;
+ }
+ // check if this publisher is registered on the face
+ if (!registered) {
+ logger.error("Publisher failed to register; cannot publish data to: " + channel.toUri());
+ return;
+ }
- // check known types
- if (!types.containsKey(publishedObject.getClass())) {
- logger.error("Attempted (and failed) to publish unknown type: " + publishedObject.getClass().getName());
- return;
- }
+ // check known types
+ if (!types.containsKey(publishedObject.getClass())) {
+ logger.error("Attempted (and failed) to publish unknown type: " + publishedObject.getClass().getName());
+ return;
+ }
- // check if old published objects need to be removed
- clearOldPublished();
+ // check if old published objects need to be removed
+ clearOldPublished();
- // track published objects
- Name publishedName = new Name(channel).append(types.get(publishedObject.getClass())).appendTimestamp(System.currentTimeMillis());
- published.put(publishedName, new PublishedObject(publishedObject));
+ // track published objects
+ Name publishedName = new Name(channel).append(types.get(publishedObject.getClass())).appendTimestamp(System.currentTimeMillis());
+ published.put(publishedName, new PublishedObject(publishedObject));
- // send out alert and don't wait for response
- Client client = new Client();
- client.get(face, getAlert(publishedName));
- logger.debug("Notified channel of new published object: " + publishedName.toUri());
- }
+ // send out alert and don't wait for response
+ Client client = new Client();
+ client.get(face, getAlert(publishedName));
+ logger.debug("Notified channel of new published object: " + publishedName.toUri());
+ }
- /**
- * Handle incoming requests for published objects; will check the published
- * list and respond with objects that exactly match the incoming interest.
- * TODO signing, encryption
- *
- * @param prefix
- * @param interest
- * @param transport
- * @param registeredPrefixId
- */
- @Override
- public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
- // check for old published objects
- clearOldPublished();
+ /**
+ * Handle incoming requests for published objects; will check the published
+ * list and respond with objects that exactly match the incoming interest.
+ * Interests whose name is not in the published list are left unanswered.
+ * TODO signing, encryption
+ *
+ * @param prefix not used by this implementation
+ * @param interest the incoming interest; its name is looked up in the published list
+ * @param transport the transport on which the encoded data packet is sent back
+ * @param registeredPrefixId not used by this implementation
+ */
+ @Override
+ public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
+ // check for old published objects
+ clearOldPublished();
- // publish object if available
- if (published.containsKey(interest.getName())) {
- try {
- Data data = new Data(interest.getName());
- data.setContent(new Blob("TODO parse object"));
- if (publishLifetime != -1) {
- data.getMetaInfo().setFreshnessPeriod(publishLifetime);
- }
- // data.setSignature(...);
- transport.send(data.wireEncode().signedBuf());
- logger.debug("Sent data: " + interest.getName().toUri());
- } catch (IOException e) {
- logger.error("Failed to send data: " + interest.getName().toUri());
- }
- }
- }
+ // publish object if available
+ if (published.containsKey(interest.getName())) {
+ try {
+ Data data = new Data(interest.getName());
+ data.setContent(new Blob("TODO parse object"));
+ if (publishLifetime != -1) {
+ data.getMetaInfo().setFreshnessPeriod(publishLifetime);
+ }
+ // data.setSignature(...);
+ transport.send(data.wireEncode().signedBuf());
+ logger.debug("Sent data: " + interest.getName().toUri());
+ } catch (IOException e) {
+ // pass the exception along so the cause of the failure is not lost
+ logger.error("Failed to send data: " + interest.getName().toUri(), e);
+ }
+ }
+ }
- /**
- * Handle registration failure; this will stop the publisher from sending
- * notifications since the routes will not be setup to respond
- *
- * @param prefix
- */
- @Override
- public void onRegisterFailed(Name prefix) {
- logger.error("Failed to register publisher: " + channel.toUri());
- registered = false;
- }
+ /**
+ * Handle registration failure; this will stop the publisher from sending
+ * notifications since the routes will not be setup to respond
+ *
+ * @param prefix not used; the publisher's own channel name is logged instead
+ */
+ @Override
+ public void onRegisterFailed(Name prefix) {
+ logger.error("Failed to register publisher: " + channel.toUri());
+ // publish() checks this flag and returns early once it is false
+ registered = false;
+ }
- /**
- * Remove any published objects that have outlived their lifetime; a
- * lifetime of -1 persists them forever.
- */
- public void clearOldPublished() {
- if (publishLifetime == -1) {
- return;
- }
- for (Entry<Name, PublishedObject> e : published.entrySet()) {
- if (System.currentTimeMillis() - e.getValue().publishedOn > publishLifetime) {
- published.remove(e.getKey());
- logger.debug("Removing old published object: " + e.getKey().toUri());
- }
- }
- }
-
- /**
- * Build an alert Interest to notify subscribers to retrieve data from this
- * publisher; the Interest will have the form
- * /this/published/channel/ALERT/[encoded channel to request]
- *
- * @param nameToPublish
- * @return
- */
- protected Interest getAlert(Name nameToPublish) {
- Name alertName = new Name(channel).append("ALERT").append(nameToPublish.wireEncode());
- Interest alert = new Interest(alertName);
- alert.setMustBeFresh(true); // interest must reach the application subscribers
- alert.setInterestLifetimeMilliseconds(DEFAULT_TIMEOUT);
- return alert;
- }
+ /**
+ * Remove any published objects that have outlived their lifetime; a lifetime
+ * of -1 persists them forever.
+ * <p>
+ * Entries are removed through the entry-set iterator; removing from the map
+ * directly while iterating over it throws ConcurrentModificationException.
+ */
+ public void clearOldPublished() {
+ if (publishLifetime == -1) {
+ return;
+ }
+ final long now = System.currentTimeMillis();
+ // fully-qualified type so that no additional import is needed
+ final java.util.Iterator<Entry<Name, PublishedObject>> it = published.entrySet().iterator();
+ while (it.hasNext()) {
+ final Entry<Name, PublishedObject> e = it.next();
+ if (now - e.getValue().publishedOn > publishLifetime) {
+ final Name expiredName = e.getKey();
+ it.remove();
+ logger.debug("Removing old published object: " + expiredName.toUri());
+ }
+ }
+ }
- /**
- * Helper class to track published objects and their timestamps
- */
- private class PublishedObject {
+ /**
+ * Build an alert Interest to notify subscribers to retrieve data from this
+ * publisher; the Interest will have the form
+ * /this/published/channel/ALERT/[wire-encoded name of the published object]
+ *
+ * @param nameToPublish the name of the published object; appended in wire-encoded form
+ * @return the alert Interest, marked must-be-fresh and with a lifetime of DEFAULT_TIMEOUT milliseconds
+ */
+ protected Interest getAlert(Name nameToPublish) {
+ Name alertName = new Name(channel).append("ALERT").append(nameToPublish.wireEncode());
+ Interest alert = new Interest(alertName);
+ alert.setMustBeFresh(true); // interest must reach the application subscribers
+ alert.setInterestLifetimeMilliseconds(DEFAULT_TIMEOUT);
+ return alert;
+ }
- Object publishedObject;
- long publishedOn;
+ /**
+ * Helper class to track published objects and their timestamps
+ */
+ private class PublishedObject {
- public PublishedObject(Object object) {
- publishedObject = object;
- }
- }
+ Object publishedObject;
+ long publishedOn;
+
+ public PublishedObject(Object object) {
+ publishedObject = object;
+ // stamp the publication time; left at its default of 0, clearOldPublished()
+ // would treat every object as expired as soon as a publish lifetime is set
+ publishedOn = System.currentTimeMillis();
+ }
+ }
}