Move Publisher, Subscriber to jndn-pubsub; remove PublisherTest
diff --git a/src/main/java/com/intel/jndn/utils/InternalFace.java b/src/main/java/com/intel/jndn/utils/InternalFace.java
index 980a457..1985902 100644
--- a/src/main/java/com/intel/jndn/utils/InternalFace.java
+++ b/src/main/java/com/intel/jndn/utils/InternalFace.java
@@ -11,7 +11,7 @@
package com.intel.jndn.utils;
/**
- *
+ * TODO: blocked until Face becomes overridable.
* @author Andrew Brown <andrew.brown@intel.com>
*/
public class InternalFace {
diff --git a/src/main/java/com/intel/jndn/utils/Publisher.java b/src/main/java/com/intel/jndn/utils/Publisher.java
deleted file mode 100644
index 0ae79b1..0000000
--- a/src/main/java/com/intel/jndn/utils/Publisher.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * File channel: Publisher.java
- *
- * Purpose: Provide the publisher side of a pub-sub API.
- *
- * © Copyright Intel Corporation. All rights reserved.
- * Intel Corporation, 2200 Mission College Boulevard,
- * Santa Clara, CA 95052-8119, USA
- */
-package com.intel.jndn.utils;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.ForwardingFlags;
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.OnInterest;
-import net.named_data.jndn.OnRegisterFailed;
-import net.named_data.jndn.security.SecurityException;
-import net.named_data.jndn.transport.Transport;
-import net.named_data.jndn.util.Blob;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * This implementation requires both ends (publisher and subscriber) to be
- * routable to each other. When this class receives data to publish (through
- * publish()), it sends an alert to all subscribers indicating new data is
- * available to retrieve; subscribers then retrieve this data using the channel
- * provided in the alert packet. The flow is as follows: - Publisher listens on
- * a given channel (e.g. /my/channel) - Subscriber also listens on a given
- * channel (e.g. /my/channel) - Publisher receives a new Foo to publish -
- * Publisher notifies subscribers of new data (e.g.
- * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]} - Subscriber
- * retrieves data (e.g. /my/channel/foo/[timestamp])
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class Publisher implements OnInterest, OnRegisterFailed {
-
- public static final long DEFAULT_TIMEOUT = 2000;
- private static final Logger logger = LogManager.getLogger();
- protected HashMap<Class, String> types = new HashMap<>();
- protected HashMap<Name, PublishedObject> published = new HashMap<>();
- protected Face face;
- protected Name channel;
- protected boolean registered = true;
- protected long publishLifetime = -1; // in milliseconds, -1 will persist as long as this instance lives
-
- /**
- * Initialize a publisher on the given channel; published objects will persist
- * as long as this instance lives.
- *
- * @param face
- * @param channel
- * @param registerOnForwarder set this to false to disable interaction with the forwarder (e.g. as in tests)
- */
- public Publisher(final Face face, final Name channel, boolean registerOnForwarder) {
- this.face = face;
- this.channel = channel;
- if (registerOnForwarder) {
- try {
- this.face.registerPrefix(this.channel, this, this, new ForwardingFlags());
- } catch (IOException e) {
- logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e);
- } catch (SecurityException e) {
- logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e);
- }
- }
- }
-
- /**
- * Initialize a publisher on the given channel; published objects will persist
- * as long as this instance lives.
- *
- * @param face
- * @param channel
- */
- public Publisher(final Face face, final Name channel) {
- this(face, channel, true);
- }
-
- /**
- * Initialize a publisher on the given channel; limit the lifetime of
- * published objects to save memory.
- *
- * @param face
- * @param channel
- * @param publishLifetime
- */
- public Publisher(final Face face, final Name channel, long publishLifetime) {
- this(face, channel);
- this.publishLifetime = publishLifetime;
- }
-
- /**
- * Add a type and its alias to this publisher
- *
- * @param typeAlias
- * @param type
- */
- public void addType(String typeAlias, Class type) {
- types.put(type, typeAlias);
- }
-
- /**
- * Publish the given object by sending an alert to all subscribers
- *
- * @param publishedObject
- */
- public void publish(Object publishedObject) {
- // check if this publisher is registered on the face
- if (!registered) {
- logger.error("Publisher failed to register; cannot publish data to: " + channel.toUri());
- return;
- }
-
- // check known types
- if (!types.containsKey(publishedObject.getClass())) {
- logger.error("Attempted (and failed) to publish unknown type: " + publishedObject.getClass().getName());
- return;
- }
-
- // check if old published objects need to be removed
- clearOldPublished();
-
- // track published objects
- Name publishedName = new Name(channel).append(types.get(publishedObject.getClass())).appendTimestamp(System.currentTimeMillis());
- published.put(publishedName, new PublishedObject(publishedObject));
-
- // send out alert and don't wait for response
- Client client = new Client();
- client.get(face, getAlert(publishedName));
- logger.debug("Notified channel of new published object: " + publishedName.toUri());
- }
-
- /**
- * Handle incoming requests for published objects; will check the published
- * list and respond with objects that exactly match the incoming interest.
- * TODO signing, encryption
- *
- * @param prefix
- * @param interest
- * @param transport
- * @param registeredPrefixId
- */
- @Override
- public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
- // check for old published objects
- clearOldPublished();
-
- // publish object if available
- if (published.containsKey(interest.getName())) {
- try {
- Data data = new Data(interest.getName());
- data.setContent(new Blob("TODO parse object"));
- if (publishLifetime != -1) {
- data.getMetaInfo().setFreshnessPeriod(publishLifetime);
- }
- // data.setSignature(...);
- transport.send(data.wireEncode().signedBuf());
- logger.debug("Sent data: " + interest.getName().toUri());
- } catch (IOException e) {
- logger.error("Failed to send data: " + interest.getName().toUri());
- }
- }
- }
-
- /**
- * Handle registration failure; this will stop the publisher from sending
- * notifications since the routes will not be setup to respond
- *
- * @param prefix
- */
- @Override
- public void onRegisterFailed(Name prefix) {
- logger.error("Failed to register publisher: " + channel.toUri());
- registered = false;
- }
-
- /**
- * Remove any published objects that have outlived their lifetime; a lifetime
- * of -1 persists them forever.
- */
- public void clearOldPublished() {
- if (publishLifetime == -1) {
- return;
- }
- for (Entry<Name, PublishedObject> e : published.entrySet()) {
- if (System.currentTimeMillis() - e.getValue().publishedOn > publishLifetime) {
- published.remove(e.getKey());
- logger.debug("Removing old published object: " + e.getKey().toUri());
- }
- }
- }
-
- /**
- * Build an alert Interest to notify subscribers to retrieve data from this
- * publisher; the Interest will have the form
- * /this/published/channel/ALERT/[encoded channel to request]
- *
- * @param nameToPublish
- * @return
- */
- protected Interest getAlert(Name nameToPublish) {
- Name alertName = new Name(channel).append("ALERT").append(nameToPublish.wireEncode());
- Interest alert = new Interest(alertName);
- alert.setMustBeFresh(true); // interest must reach the application subscribers
- alert.setInterestLifetimeMilliseconds(DEFAULT_TIMEOUT);
- return alert;
- }
-
- /**
- * Helper class to track published objects and their timestamps
- */
- private class PublishedObject {
-
- Object publishedObject;
- long publishedOn;
-
- public PublishedObject(Object object) {
- publishedObject = object;
- }
- }
-}
diff --git a/src/main/java/com/intel/jndn/utils/Subscriber.java b/src/main/java/com/intel/jndn/utils/Subscriber.java
deleted file mode 100644
index df6d375..0000000
--- a/src/main/java/com/intel/jndn/utils/Subscriber.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * File name: Subscriber.java
- *
- * Purpose: Provide the subscriber side of a pub-sub API.
- *
- * © Copyright Intel Corporation. All rights reserved.
- * Intel Corporation, 2200 Mission College Boulevard,
- * Santa Clara, CA 95052-8119, USA
- */
-package com.intel.jndn.utils;
-
-import java.io.IOException;
-import java.util.HashMap;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.ForwardingFlags;
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.Name.Component;
-import net.named_data.jndn.OnInterest;
-import net.named_data.jndn.OnRegisterFailed;
-import net.named_data.jndn.encoding.EncodingException;
-import net.named_data.jndn.transport.Transport;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * This implementation requires both ends (publisher and subscriber) to be
- * routable to each other. When this class receives data to publish (through
- * publish()), it sends an alert to all subscribers indicating new data is
- * available to retrieve; subscribers then retrieve this data using the channel
- * provided in the alert packet. The flow is as follows: - Publisher listens on
- * a given channel (e.g. /my/channel) - Subscriber also listens on a given
- * channel (e.g. /my/channel) - Publisher receives a new Foo to publish -
- * Publisher notifies subscribers of new data (e.g.
- * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]} - Subscriber
- * retrieves data (e.g. /my/channel/foo/[timestamp])
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class Subscriber implements OnInterest, OnRegisterFailed {
-
- public static final long DEFAULT_TIMEOUT = 2000;
- private static final Logger logger = LogManager.getLogger();
- protected HashMap<String, Class> types = new HashMap<>();
- protected HashMap<Class, OnPublish> handlers = new HashMap<>();
- protected Face face;
- protected Name channel;
-
- /**
- * Initialize a publisher on the given channel; published objects will persist
- * as long as this instance lives.
- *
- * @param face
- * @param channel
- * @param registerOnForwarder set this to false to disable interaction with the forwarder (e.g. as in tests)
- */
- public Subscriber(final Face face, final Name channel, boolean registerOnForwarder) {
- this.face = face;
- this.channel = channel;
- if (registerOnForwarder) {
- try {
- this.face.registerPrefix(this.channel, this, this, new ForwardingFlags());
- } catch (IOException e) {
- logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e);
- } catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e);
- }
- }
- }
-
- /**
- * Initialize a publisher on the given channel; published objects will persist
- * as long as this instance lives.
- *
- * @param face
- * @param channel
- */
- public Subscriber(final Face face, final Name channel) {
- this(face, channel, true);
- }
-
- /**
- * Add a type and its alias to this publisher
- *
- * @param typeAlias
- * @param type
- */
- public void addType(String typeAlias, Class type) {
- types.put(typeAlias, type);
- }
-
- /**
- * Functional interface defining action to take upon receipt of a published
- * object
- *
- * @param <T>
- */
- public interface OnPublish<T> {
-
- public void onPublish(T publishedObject);
- }
-
- /**
- * Register a handler for receipt of a certain published type
- *
- * @param <T>
- * @param type
- * @param handler
- */
- public <T> void on(Class<T> type, OnPublish<T> handler) {
- handlers.put(type, handler);
- }
-
- /**
- * Register a handler for receipt of a certain published type
- *
- * @param typeAlias
- * @param handler
- */
- public void on(String typeAlias, OnPublish handler) {
- if (types.containsKey(typeAlias)) {
- handlers.put(types.get(typeAlias), handler);
- } else {
- logger.warn("Unrecognized type (no handler registered): " + typeAlias);
- }
- }
-
- /**
- * Handle alert notifications for published objects; once the published object
- * is parsed, this will pass the object to the handle registered in on().
- *
- * @param prefix
- * @param interest
- * @param transport
- * @param registeredPrefixId
- */
- @Override
- public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
- // check format
- if (!isAlert(interest)) {
- logger.warn("Incoming interest was not in ALERT format: " + interest.getName().toUri());
- return;
- }
-
- // check signature
- // TODO
- // retrieve name
- Name publishedName = getAlertName(interest);
- if (publishedName == null) {
- return;
- }
-
- // retrieve data
- Client client = new Client();
- Data data = client.getSync(face, publishedName);
- if (data == null) {
- logger.warn("Faled to retrieve published object: " + publishedName.toUri());
- return;
- }
-
- // get handler
- String typeAlias = getPublishedType(data);
- if (!types.containsKey(typeAlias)) {
- logger.warn("Type not found: " + typeAlias);
- return;
- }
- Class type = types.get(typeAlias);
- if (!handlers.containsKey(type)) {
- logger.warn("No handler found for type: " + typeAlias);
- return;
- }
-
- // build object
- Object publishedObject = null;
- // TODO parse object
-
- // call
- handlers.get(type).onPublish(type.cast(publishedObject));
- }
-
- /**
- * Handle registration failure;
- *
- * @param prefix
- */
- @Override
- public void onRegisterFailed(Name prefix) {
- logger.error("Failed to register: " + prefix.toUri());
- }
-
- /**
- * Check if an incoming interest is an alert notification to retrieve data
- *
- * @param interest
- * @return
- */
- protected boolean isAlert(Interest interest) {
- Component alertComponent = interest.getName().get(-2);
- return alertComponent == null || !alertComponent.equals(new Component("ALERT"));
- }
-
- /**
- * Parse an Interest to retrieve the remote name of the published object to
- * then request; the Interest must have the form
- * /this/published/channel/ALERT/[encoded name to request]
- *
- * @param interest
- * @return
- */
- protected Name getAlertName(Interest interest) {
- try {
- Name publishedName = new Name();
- publishedName.wireDecode(interest.getName().get(-1).getValue());
- return publishedName;
- } catch (EncodingException e) {
- logger.error("Failed to parse remote published name", e);
- return null;
- }
- }
-
- /**
- * Parse the published type from a Data packet; must be the second to last
- * component of the name
- *
- * @param data
- * @return
- */
- protected String getPublishedType(Data data) {
- return data.getName().get(-2).toEscapedString();
- }
-}
diff --git a/src/test/java/com/intel/jndn/utils/ClientTest.java b/src/test/java/com/intel/jndn/utils/ClientTest.java
index 62b67d5..787a395 100644
--- a/src/test/java/com/intel/jndn/utils/ClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/ClientTest.java
@@ -9,10 +9,6 @@
*/
package com.intel.jndn.utils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import com.intel.jndn.mock.MockTransport;
@@ -28,8 +24,7 @@
* @author Andrew Brown <andrew.brown@intel.com>
*/
public class ClientTest {
-
-
+
/**
* Setup logging
*/
@@ -58,8 +53,8 @@
/**
* Test retrieving data asynchronously
- *
- * @throws InterruptedException
+ *
+ * @throws InterruptedException
*/
@Test
public void testGetAsync() throws InterruptedException {
diff --git a/src/test/java/com/intel/jndn/utils/PublisherTest.java b/src/test/java/com/intel/jndn/utils/PublisherTest.java
deleted file mode 100644
index 3c780ef..0000000
--- a/src/test/java/com/intel/jndn/utils/PublisherTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * File name: PublisherTest.java
- *
- * Purpose:
- *
- * © Copyright Intel Corporation. All rights reserved.
- * Intel Corporation, 2200 Mission College Boulevard,
- * Santa Clara, CA 95052-8119, USA
- */
-package com.intel.jndn.utils;
-
-import com.intel.jndn.mock.MockTransport;
-import java.io.IOException;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.encoding.EncodingException;
-import net.named_data.jndn.encoding.tlv.Tlv;
-import net.named_data.jndn.encoding.tlv.TlvEncoder;
-import net.named_data.jndn.security.SecurityException;
-import net.named_data.jndn.util.Blob;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-/**
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class PublisherTest {
-
- /**
- * Test of publish method, of class Publisher.
- */
- @Test
- public void testPublishSubscribe() throws IOException, EncodingException, InterruptedException, SecurityException {
- MockTransport transport = new MockTransport();
- Face face = new Face(transport, null);
- final Counter counter = new Counter();
-
- // setup subscriber
- Subscriber subscriber = new Subscriber(face, new Name("/test/channel"));
- subscriber.addType("example", TestExample.class);
- subscriber.on(TestExample.class, new Subscriber.OnPublish<TestExample>() {
- @Override
- public void onPublish(TestExample publishedObject) {
- counter.inc();
- assertEquals(1, publishedObject.a);
- assertEquals(true, publishedObject.b);
- }
- });
-
- // setup publisher
- Publisher publisher = new Publisher(face, new Name("/test/channel"), false);
- publisher.addType("example", TestExample.class);
- publisher.publish(new TestExample(1, true));
-
- // process events
- while(counter.get() == 0){
- Thread.sleep(10);
- }
-
- // check
- assertEquals(1, counter.get());
- }
-
- public Data fakeManagementSuccess(Name forName, int statusCode, String statusText){
- TlvEncoder encoder = new TlvEncoder(1500);
- int saveLength = encoder.getLength();
-
- // encode backwards
- encoder.writeBlobTlv(Tlv.NfdCommand_StatusText, new Blob(statusText).buf());
- encoder.writeNonNegativeIntegerTlv(Tlv.NfdCommand_StatusCode, statusCode);
- encoder.writeTypeAndLength(Tlv.NfdCommand_ControlResponse, encoder.getLength() - saveLength);
- Blob content = new Blob(encoder.getOutput(), false);
-
- // now create data packet
- Data data = new Data(forName);
- data.setContent(content);
- return data;
- }
-
- class TestExample {
-
- int a;
- boolean b;
-
- public TestExample(int a, boolean b) {
- this.a = a;
- this.b = b;
- }
- }
-
- /**
- * Count reference
- */
- class Counter {
-
- int count = 0;
-
- public void inc() {
- count++;
- }
-
- public int get() {
- return count;
- }
- }
-
-// /**
-// * Test of clearOldPublished method, of class Publisher.
-// */
-// @Test
-// public void testClearOldPublished() {
-// System.out.println("clearOldPublished");
-// Publisher instance = null;
-// instance.clearOldPublished();
-// // TODO review the generated test code and remove the default call to fail.
-// fail("The test case is a prototype.");
-// }
-}