Add server capabilities, refactor observer pattern
diff --git a/src/main/java/com/intel/jndn/utils/Client.java b/src/main/java/com/intel/jndn/utils/Client.java
index 3425950..39c4ada 100644
--- a/src/main/java/com/intel/jndn/utils/Client.java
+++ b/src/main/java/com/intel/jndn/utils/Client.java
@@ -13,15 +13,11 @@
import java.io.IOException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
-import net.named_data.jndn.ForwardingFlags;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.OnData;
-import net.named_data.jndn.OnInterest;
-import net.named_data.jndn.OnRegisterFailed;
import net.named_data.jndn.OnTimeout;
import net.named_data.jndn.encoding.EncodingException;
-import net.named_data.jndn.transport.Transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -60,7 +56,7 @@
public Data getSync(Face face, Interest interest) {
// setup event
long startTime = System.currentTimeMillis();
- final ClientEvent event = new ClientEvent(); // this event is used without observer/observables for speed; just serves as a final reference into the callbacks
+ final NDNEvent event = new NDNEvent(); // this event is used without observer/observables for speed; just serves as a final reference into the callbacks
// send interest
try {
@@ -113,28 +109,28 @@
/**
* Asynchronously retrieve the Data for a given interest; use the returned
- * ClientObserver to handle the Data when it arrives. For example (with lambdas):
+ NDNObserver to handle the Data when it arrives. For example (with lambdas):
* <pre><code>
* Client.getDefault().get(face, interest).then((event) -> doSomething(event));
* </code></pre>
*
* If you want to block until the response returns, try something like:
* <pre><code>
- * ClientObserver observer = Client.getDefault().get(face, interest);
- * while(observer.eventCount() == 0){
- * Thread.sleep(50);
- * }
- * doSomething(observer.getFirst());
- * </code></pre>
+ NDNObserver observer = Client.getDefault().get(face, interest);
+ while(observer.eventCount() == 0){
+ Thread.sleep(50);
+ }
+ doSomething(observer.getFirst());
+ </code></pre>
*
* @param face
* @param interest
* @return
*/
- public ClientObserver get(final Face face, final Interest interest) {
+ public NDNObserver get(final Face face, final Interest interest) {
// setup observer
- final ClientObserver observer = new ClientObserver();
- final ClientObservable eventHandler = new ClientObservable();
+ final NDNObserver observer = new NDNObserver();
+ final NDNObservable eventHandler = new NDNObservable();
eventHandler.addObserver(observer);
// setup background thread
@@ -182,151 +178,11 @@
* @param name
* @return
*/
- public ClientObserver get(Face face, Name name) {
+ public NDNObserver get(Face face, Name name) {
return get(face, getDefaultInterest(name));
}
/**
- * Synchronously serve a Data on the given face until one request accesses the
- * data; will return incoming Interest request. E.g.: Interest request =
- * Client.putSync(face, data);
- *
- * @param face
- * @param data
- * @return
- */
- public Interest putSync(Face face, final Data data) {
- // setup event
- long startTime = System.currentTimeMillis();
- final String dataName = data.getName().toUri();
- final ClientEvent event = new ClientEvent();
-
- // setup flags
- ForwardingFlags flags = new ForwardingFlags();
- flags.setCapture(true);
-
- // register the data name on the face
- try {
- face.registerPrefix(data.getName(), new OnInterest() {
- @Override
- public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
- try {
- transport.send(data.wireEncode().buf());
- logger.debug("Sent data: " + dataName);
- event.fromPacket(interest);
- } catch (IOException e) {
- logger.error("Failed to send data for: " + dataName);
- event.fromPacket(e);
- }
- }
- }, new OnRegisterFailed() {
- @Override
- public void onRegisterFailed(Name prefix) {
- event.fromPacket(new Exception("Failed to register name: " + dataName));
- }
- }, flags);
- logger.info("Registered data: " + dataName);
- } catch (IOException e) {
- logger.error("Could not connect to face to register prefix: " + dataName, e);
- event.fromPacket(e);
- } catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Error registering prefix: " + dataName, e);
- event.fromPacket(e);
- }
-
- // process eventCount until one response is sent or error
- while (event.getPacket() == null) {
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
- event.fromPacket(e);
- }
- sleep();
- }
-
- // return
- logger.debug("Request time (ms): " + (event.getTimestamp() - startTime));
- return (event.isSuccess()) ? (Interest) event.getPacket() : null;
- }
-
- /**
- * Asynchronously serve a Data on the given face until an observer stops it.
- * E.g.: ClientObserver observer = Client.put(face, data); // when finished
- * serving the data, stop the background thread observer.stop();
- *
- * @param face
- * @param data
- * @return
- */
- public ClientObserver put(final Face face, final Data data) {
- // setup observer
- final ClientObserver observer = new ClientObserver();
- final ClientObservable eventHandler = new ClientObservable();
- eventHandler.addObserver(observer);
-
- // setup handlers
- final OnInterest interestHandler = new OnInterest() {
- @Override
- public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
- try {
- transport.send(data.wireEncode().buf());
- } catch (IOException e) {
- logger.error("Failed to send data for: " + data.getName().toUri());
- eventHandler.onError(e);
- }
- }
- };
- final OnRegisterFailed failureHandler = new OnRegisterFailed() {
- @Override
- public void onRegisterFailed(Name prefix) {
- logger.error("Failed to register name to put: " + data.getName().toUri());
- eventHandler.onError(new Exception("Failed to register name to put: " + data.getName().toUri()));
- }
- };
- final ForwardingFlags flags = new ForwardingFlags();
- flags.setCapture(true);
-
- // setup background thread
- Thread backgroundThread = new Thread(new Runnable() {
- @Override
- public void run() {
- // register name on the face
- try {
- face.registerPrefix(data.getName(), interestHandler, failureHandler, flags);
- logger.info("Registered data : " + data.getName().toUri());
- } catch (IOException e) {
- logger.error("Could not connect to face to register prefix: " + data.getName().toUri(), e);
- eventHandler.onError(e);
- } catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Error registering prefix: " + data.getName().toUri(), e);
- eventHandler.onError(e);
- }
-
- // process eventCount until a request is received
- while (observer.interestCount() == 0 && observer.errorCount() == 0 && !observer.mustStop()) {
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
- observer.update(eventHandler, new ClientEvent());
- }
- sleep();
- }
- }
- });
- backgroundThread.setName(String.format("Client.put(%s)", data.getName().toUri()));
- backgroundThread.setDaemon(true);
- backgroundThread.start();
-
- return observer;
- }
-
- /**
* Put the current thread to sleep to allow time for IO
*/
protected void sleep() {
diff --git a/src/main/java/com/intel/jndn/utils/ClientEvent.java b/src/main/java/com/intel/jndn/utils/NDNEvent.java
similarity index 88%
rename from src/main/java/com/intel/jndn/utils/ClientEvent.java
rename to src/main/java/com/intel/jndn/utils/NDNEvent.java
index a125286..5316ae1 100644
--- a/src/main/java/com/intel/jndn/utils/ClientEvent.java
+++ b/src/main/java/com/intel/jndn/utils/NDNEvent.java
@@ -1,5 +1,5 @@
/*
- * File name: ClientEvent.java
+ * File name: NDNEvent.java
*
* Purpose: Signals a Client event for observers to act on
*
@@ -10,10 +10,10 @@
package com.intel.jndn.utils;
/**
- * Signals a Client event for observers to act on
+ * Signals an event (from Client or Server) for observers to act on
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class ClientEvent<T> {
+public class NDNEvent<T> {
protected boolean success;
protected long timestamp;
@@ -22,7 +22,7 @@
/**
* Constructor
*/
- public ClientEvent() {
+ public NDNEvent() {
timestamp = System.currentTimeMillis();
success = false; // an event without a packet is a failed event
}
@@ -32,7 +32,7 @@
*
* @param packet
*/
- public ClientEvent(T packet) {
+ public NDNEvent(T packet) {
fromPacket(packet);
}
diff --git a/src/main/java/com/intel/jndn/utils/ClientObservable.java b/src/main/java/com/intel/jndn/utils/NDNObservable.java
similarity index 91%
rename from src/main/java/com/intel/jndn/utils/ClientObservable.java
rename to src/main/java/com/intel/jndn/utils/NDNObservable.java
index 0a4b341..003fe45 100644
--- a/src/main/java/com/intel/jndn/utils/ClientObservable.java
+++ b/src/main/java/com/intel/jndn/utils/NDNObservable.java
@@ -1,5 +1,5 @@
/*
- * File name: ClientObservable.java
+ * File name: NDNObservable.java
*
* Purpose:
*
@@ -24,9 +24,9 @@
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class ClientObservable extends Observable implements OnData, OnTimeout, OnInterest {
+public class NDNObservable extends Observable implements OnData, OnTimeout, OnInterest {
- protected List<ClientEvent> events = new ArrayList<>();
+ protected List<NDNEvent> events = new ArrayList<>();
protected List<Interest> incomingInterestPackets = new ArrayList<>();
protected List<Data> incomingDataPackets;
@@ -38,7 +38,7 @@
*/
public <T> void notify(T packet) {
setChanged();
- notifyObservers(new ClientEvent(packet));
+ notifyObservers(new NDNEvent(packet));
}
/**
diff --git a/src/main/java/com/intel/jndn/utils/ClientObserver.java b/src/main/java/com/intel/jndn/utils/NDNObserver.java
similarity index 84%
rename from src/main/java/com/intel/jndn/utils/ClientObserver.java
rename to src/main/java/com/intel/jndn/utils/NDNObserver.java
index 9bfdd7e..5b9ddd9 100644
--- a/src/main/java/com/intel/jndn/utils/ClientObserver.java
+++ b/src/main/java/com/intel/jndn/utils/NDNObserver.java
@@ -1,7 +1,7 @@
/*
- * File name: ClientObserver.java
+ * File name: NDNObserver.java
*
- * Purpose: Track asynchronous events from Client and provide simplified API
+ * Purpose: Track asynchronous events from Client and Server
*
* © Copyright Intel Corporation. All rights reserved.
* Intel Corporation, 2200 Mission College Boulevard,
@@ -15,15 +15,14 @@
import java.util.Observer;
import net.named_data.jndn.Data;
import net.named_data.jndn.Interest;
-import net.named_data.jndn.OnData;
/**
- *
+ * Track asynchronous events from Client and Server
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class ClientObserver implements Observer {
+public class NDNObserver implements Observer {
- protected List<ClientEvent> events = new ArrayList<>();
+ protected List<NDNEvent> events = new ArrayList<>();
protected long timestamp;
protected OnEvent then;
protected boolean stopThread;
@@ -31,7 +30,7 @@
/**
* Constructor
*/
- public ClientObserver() {
+ public NDNObserver() {
timestamp = System.currentTimeMillis();
}
@@ -43,7 +42,7 @@
*/
@Override
public void update(Observable o, Object arg) {
- ClientEvent event = (ClientEvent) arg;
+ NDNEvent event = (NDNEvent) arg;
events.add(event);
// call onData callbacks
if (Data.class.isInstance(event.packet) && then != null) {
@@ -57,7 +56,7 @@
* @param callback
* @return
*/
- public ClientObserver then(OnEvent callback) {
+ public NDNObserver then(OnEvent callback) {
then = callback;
return this;
}
@@ -106,7 +105,7 @@
*/
public int count(Class type) {
int count = 0;
- for (ClientEvent event : events) {
+ for (NDNEvent event : events) {
if (type.isInstance(event.packet)) {
count++;
}
@@ -132,7 +131,7 @@
*
* @return event or null
*/
- public List<ClientEvent> getEvents() {
+ public List<NDNEvent> getEvents() {
return events;
}
@@ -141,7 +140,7 @@
*
* @return event or null
*/
- public ClientEvent getFirst() {
+ public NDNEvent getFirst() {
if (events.size() > 0) {
return events.get(0);
}
@@ -153,7 +152,7 @@
*
* @return event or null
*/
- public ClientEvent getLast() {
+ public NDNEvent getLast() {
if (events.size() > 0) {
return events.get(events.size() - 1);
}
diff --git a/src/main/java/com/intel/jndn/utils/OnEvent.java b/src/main/java/com/intel/jndn/utils/OnEvent.java
index e9c7f67..62bdd56 100644
--- a/src/main/java/com/intel/jndn/utils/OnEvent.java
+++ b/src/main/java/com/intel/jndn/utils/OnEvent.java
@@ -16,5 +16,5 @@
* @author Andrew Brown <andrew.brown@intel.com>
*/
public interface OnEvent {
- public void onEvent(ClientEvent event);
+ public void onEvent(NDNEvent event);
}
diff --git a/src/main/java/com/intel/jndn/utils/OnServeInterest.java b/src/main/java/com/intel/jndn/utils/OnServeInterest.java
new file mode 100644
index 0000000..0f235d6
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/OnServeInterest.java
@@ -0,0 +1,22 @@
+/*
+ * File name: OnServeInterest.java
+ *
+ * Purpose: Functional interface for serving data from Server.on()
+ *
+ * © Copyright Intel Corporation. All rights reserved.
+ * Intel Corporation, 2200 Mission College Boulevard,
+ * Santa Clara, CA 95052-8119, USA
+ */
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+/**
+ * Functional interface for serving data from Server.on()
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface OnServeInterest {
+ public Data onInterest(Name prefix, Interest interest);
+}
diff --git a/src/main/java/com/intel/jndn/utils/Server.java b/src/main/java/com/intel/jndn/utils/Server.java
new file mode 100644
index 0000000..d332acf
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Server.java
@@ -0,0 +1,283 @@
+/*
+ * File name: Server.java
+ *
+ * Purpose: Provide a server to simplify serving data over the NDN
+ * network.
+ *
+ * © Copyright Intel Corporation. All rights reserved.
+ * Intel Corporation, 2200 Mission College Boulevard,
+ * Santa Clara, CA 95052-8119, USA
+ */
+package com.intel.jndn.utils;
+
+import java.io.IOException;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.ForwardingFlags;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnInterest;
+import net.named_data.jndn.OnRegisterFailed;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.transport.Transport;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Provide a server to simplify serving data over the NDN network. Exposes two
+ * main methods: put() for serving static, known data packets and on() for
+ * serving dynamically created packets on-demand.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class Server {
+
+ public static final long DEFAULT_SLEEP_TIME = 20;
+ public static final long DEFAULT_TIMEOUT = 2000;
+ private static final Logger logger = LogManager.getLogger();
+ private static Server defaultInstance;
+
+ /**
+ * Singleton access for simpler server use
+ *
+ * @return
+ */
+ public static Server getDefault() {
+ if (defaultInstance == null) {
+ defaultInstance = new Server();
+ }
+ return defaultInstance;
+ }
+
+ /**
+ * Synchronously serve a Data on the given face until one request accesses the
+ * data; will return incoming Interest request.
+ * <pre><code> Interest request = Client.putSync(face, data); </code></pre>
+ *
+ * @param face
+ * @param data
+ * @return
+ */
+ public Interest putSync(Face face, final Data data) {
+ // setup event
+ long startTime = System.currentTimeMillis();
+ final String dataName = data.getName().toUri();
+ final NDNEvent event = new NDNEvent();
+
+ // setup flags
+ ForwardingFlags flags = new ForwardingFlags();
+ flags.setCapture(true);
+
+ // register the data name on the face
+ try {
+ face.registerPrefix(data.getName(), new OnInterest() {
+ @Override
+ public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
+ try {
+ transport.send(data.wireEncode().buf());
+ logger.debug("Sent data: " + dataName);
+ event.fromPacket(interest);
+ } catch (IOException e) {
+ logger.error("Failed to send data for: " + dataName);
+ event.fromPacket(e);
+ }
+ }
+ }, new OnRegisterFailed() {
+ @Override
+ public void onRegisterFailed(Name prefix) {
+ event.fromPacket(new Exception("Failed to register name: " + dataName));
+ }
+ }, flags);
+ logger.info("Registered data: " + dataName);
+ } catch (IOException e) {
+ logger.error("Could not connect to face to register prefix: " + dataName, e);
+ event.fromPacket(e);
+ } catch (net.named_data.jndn.security.SecurityException e) {
+ logger.error("Error registering prefix: " + dataName, e);
+ event.fromPacket(e);
+ }
+
+ // process eventCount until one response is sent or error
+ while (event.getPacket() == null) {
+ try {
+ synchronized (face) {
+ face.processEvents();
+ }
+ } catch (IOException | EncodingException e) {
+ logger.warn("Failed to process events.", e);
+ event.fromPacket(e);
+ }
+ sleep();
+ }
+
+ // return
+ logger.debug("Request time (ms): " + (event.getTimestamp() - startTime));
+ return (event.isSuccess()) ? (Interest) event.getPacket() : null;
+ }
+
+ /**
+ * Asynchronously serve a Data on the given face until an observer stops it.
+ * E.g.: NDNObserver observer = Client.put(face, data); // when finished
+ serving the data, stop the background thread observer.stop();
+ *
+ * @param face
+ * @param data
+ * @return
+ */
+ public NDNObserver put(final Face face, final Data data) {
+ // setup observer
+ final NDNObserver observer = new NDNObserver();
+ final NDNObservable eventHandler = new NDNObservable();
+ eventHandler.addObserver(observer);
+
+ // setup handlers
+ final OnInterest interestHandler = new OnInterest() {
+ @Override
+ public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
+ try {
+ transport.send(data.wireEncode().buf());
+ } catch (IOException e) {
+ logger.error("Failed to send data for: " + data.getName().toUri());
+ eventHandler.notify(e);
+ }
+ }
+ };
+ final OnRegisterFailed failureHandler = new OnRegisterFailed() {
+ @Override
+ public void onRegisterFailed(Name prefix) {
+ logger.error("Failed to register name to put: " + data.getName().toUri());
+ eventHandler.notify(new Exception("Failed to register name to put: " + data.getName().toUri()));
+ }
+ };
+ final ForwardingFlags flags = new ForwardingFlags();
+ flags.setCapture(true); // no shorter routes will answer for this prefix, see http://redmine.named-data.net/projects/nfd/wiki/RibMgmt#Route-inheritance
+ flags.setChildInherit(false); // the interest name must be exact, no child components after the prefix
+
+ // setup background thread
+ Thread backgroundThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ // register name on the face
+ try {
+ face.registerPrefix(data.getName(), interestHandler, failureHandler, flags);
+ logger.info("Registered data : " + data.getName().toUri());
+ } catch (IOException e) {
+ logger.error("Could not connect to face to register prefix: " + data.getName().toUri(), e);
+ eventHandler.notify(e);
+ } catch (net.named_data.jndn.security.SecurityException e) {
+ logger.error("Error registering prefix: " + data.getName().toUri(), e);
+ eventHandler.notify(e);
+ }
+
+ // process eventCount until a request is received
+ while (observer.interestCount() == 0 && observer.errorCount() == 0 && !observer.mustStop()) {
+ try {
+ synchronized (face) {
+ face.processEvents();
+ }
+ } catch (IOException | EncodingException e) {
+ logger.warn("Failed to process events.", e);
+ eventHandler.notify(e);
+ }
+ sleep();
+ }
+ }
+ });
+ backgroundThread.setName(String.format("Server.put(%s)", data.getName().toUri()));
+ backgroundThread.setDaemon(true);
+ backgroundThread.start();
+
+ return observer;
+ }
+
+ /**
+ * Register a prefix on the face to serve Data packets for incoming Interests.
+ * This method will create a background thread to process events until
+ * the user calls stop() on the returned observer
+ *
+ * @param face
+ * @param prefix
+ * @param handler
+ * @return
+ */
+ public NDNObserver on(final Face face, final Name prefix, final OnServeInterest handler) {
+ // setup observer
+ final NDNObserver observer = new NDNObserver();
+ final NDNObservable eventHandler = new NDNObservable();
+ eventHandler.addObserver(observer);
+
+ // setup handlers
+ final OnInterest interestHandler = new OnInterest() {
+ @Override
+ public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
+ eventHandler.notify(interest);
+ try {
+ Data data = handler.onInterest(prefix, interest);
+ // TODO do signing here?
+ transport.send(data.wireEncode().buf());
+ } catch (IOException e) {
+ logger.error("Failed to send data for: " + prefix.toUri());
+ eventHandler.notify(e);
+ }
+ }
+ };
+ final OnRegisterFailed failureHandler = new OnRegisterFailed() {
+ @Override
+ public void onRegisterFailed(Name prefix) {
+ logger.error("Failed to register name to put: " + prefix.toUri());
+ eventHandler.notify(new Exception("Failed to register name to put: " + prefix.toUri()));
+ }
+ };
+ final ForwardingFlags flags = new ForwardingFlags();
+ flags.setCapture(true); // no shorter routes will answer for this prefix, see http://redmine.named-data.net/projects/nfd/wiki/RibMgmt#Route-inheritance
+ flags.setChildInherit(true); // the interest name may have child components after the prefix
+
+ // setup background thread
+ Thread backgroundThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ // register name on the face
+ try {
+ face.registerPrefix(prefix, interestHandler, failureHandler, flags);
+ logger.info("Registered data : " + prefix.toUri());
+ } catch (IOException e) {
+ logger.error("Could not connect to face to register prefix: " + prefix.toUri(), e);
+ eventHandler.notify(e);
+ } catch (net.named_data.jndn.security.SecurityException e) {
+ logger.error("Error registering prefix: " + prefix.toUri(), e);
+ eventHandler.notify(e);
+ }
+
+ // process events until told to stop or error bubbles up
+ while (observer.errorCount() == 0 && !observer.mustStop()) {
+ try {
+ synchronized (face) {
+ face.processEvents();
+ }
+ } catch (IOException | EncodingException e) {
+ logger.warn("Failed to process events.", e);
+ eventHandler.notify(e);
+ }
+ sleep();
+ }
+ }
+ });
+ backgroundThread.setName(String.format("Client.put(%s)", prefix.toUri()));
+ backgroundThread.setDaemon(true);
+ backgroundThread.start();
+
+ return observer;
+ }
+
+ /**
+ * Put the current thread to sleep to allow time for IO
+ */
+ protected void sleep() {
+ try {
+ Thread.currentThread().sleep(DEFAULT_SLEEP_TIME);
+ } catch (InterruptedException e) {
+ logger.error("Event loop interrupted.", e);
+ }
+ }
+
+}