Add server capabilities, refactor observer pattern
diff --git a/src/main/java/com/intel/jndn/utils/Client.java b/src/main/java/com/intel/jndn/utils/Client.java
index 3425950..39c4ada 100644
--- a/src/main/java/com/intel/jndn/utils/Client.java
+++ b/src/main/java/com/intel/jndn/utils/Client.java
@@ -13,15 +13,11 @@
import java.io.IOException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
-import net.named_data.jndn.ForwardingFlags;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.OnData;
-import net.named_data.jndn.OnInterest;
-import net.named_data.jndn.OnRegisterFailed;
import net.named_data.jndn.OnTimeout;
import net.named_data.jndn.encoding.EncodingException;
-import net.named_data.jndn.transport.Transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -60,7 +56,7 @@
public Data getSync(Face face, Interest interest) {
// setup event
long startTime = System.currentTimeMillis();
- final ClientEvent event = new ClientEvent(); // this event is used without observer/observables for speed; just serves as a final reference into the callbacks
+ final NDNEvent event = new NDNEvent(); // this event is used without observer/observables for speed; just serves as a final reference into the callbacks
// send interest
try {
@@ -113,28 +109,28 @@
/**
* Asynchronously retrieve the Data for a given interest; use the returned
- * ClientObserver to handle the Data when it arrives. For example (with lambdas):
+ * NDNObserver to handle the Data when it arrives. For example (with lambdas):
* <pre><code>
* Client.getDefault().get(face, interest).then((event) -> doSomething(event));
* </code></pre>
*
* If you want to block until the response returns, try something like:
* <pre><code>
- * ClientObserver observer = Client.getDefault().get(face, interest);
- * while(observer.eventCount() == 0){
- * Thread.sleep(50);
- * }
- * doSomething(observer.getFirst());
- * </code></pre>
+ * NDNObserver observer = Client.getDefault().get(face, interest);
+ * while(observer.eventCount() == 0){
+ * Thread.sleep(50);
+ * }
+ * doSomething(observer.getFirst());
+ * </code></pre>
*
* @param face
* @param interest
* @return
*/
- public ClientObserver get(final Face face, final Interest interest) {
+ public NDNObserver get(final Face face, final Interest interest) {
// setup observer
- final ClientObserver observer = new ClientObserver();
- final ClientObservable eventHandler = new ClientObservable();
+ final NDNObserver observer = new NDNObserver();
+ final NDNObservable eventHandler = new NDNObservable();
eventHandler.addObserver(observer);
// setup background thread
@@ -182,151 +178,11 @@
* @param name
* @return
*/
- public ClientObserver get(Face face, Name name) {
+ public NDNObserver get(Face face, Name name) {
return get(face, getDefaultInterest(name));
}
/**
- * Synchronously serve a Data on the given face until one request accesses the
- * data; will return incoming Interest request. E.g.: Interest request =
- * Client.putSync(face, data);
- *
- * @param face
- * @param data
- * @return
- */
- public Interest putSync(Face face, final Data data) {
- // setup event
- long startTime = System.currentTimeMillis();
- final String dataName = data.getName().toUri();
- final ClientEvent event = new ClientEvent();
-
- // setup flags
- ForwardingFlags flags = new ForwardingFlags();
- flags.setCapture(true);
-
- // register the data name on the face
- try {
- face.registerPrefix(data.getName(), new OnInterest() {
- @Override
- public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
- try {
- transport.send(data.wireEncode().buf());
- logger.debug("Sent data: " + dataName);
- event.fromPacket(interest);
- } catch (IOException e) {
- logger.error("Failed to send data for: " + dataName);
- event.fromPacket(e);
- }
- }
- }, new OnRegisterFailed() {
- @Override
- public void onRegisterFailed(Name prefix) {
- event.fromPacket(new Exception("Failed to register name: " + dataName));
- }
- }, flags);
- logger.info("Registered data: " + dataName);
- } catch (IOException e) {
- logger.error("Could not connect to face to register prefix: " + dataName, e);
- event.fromPacket(e);
- } catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Error registering prefix: " + dataName, e);
- event.fromPacket(e);
- }
-
- // process eventCount until one response is sent or error
- while (event.getPacket() == null) {
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
- event.fromPacket(e);
- }
- sleep();
- }
-
- // return
- logger.debug("Request time (ms): " + (event.getTimestamp() - startTime));
- return (event.isSuccess()) ? (Interest) event.getPacket() : null;
- }
-
- /**
- * Asynchronously serve a Data on the given face until an observer stops it.
- * E.g.: ClientObserver observer = Client.put(face, data); // when finished
- * serving the data, stop the background thread observer.stop();
- *
- * @param face
- * @param data
- * @return
- */
- public ClientObserver put(final Face face, final Data data) {
- // setup observer
- final ClientObserver observer = new ClientObserver();
- final ClientObservable eventHandler = new ClientObservable();
- eventHandler.addObserver(observer);
-
- // setup handlers
- final OnInterest interestHandler = new OnInterest() {
- @Override
- public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
- try {
- transport.send(data.wireEncode().buf());
- } catch (IOException e) {
- logger.error("Failed to send data for: " + data.getName().toUri());
- eventHandler.onError(e);
- }
- }
- };
- final OnRegisterFailed failureHandler = new OnRegisterFailed() {
- @Override
- public void onRegisterFailed(Name prefix) {
- logger.error("Failed to register name to put: " + data.getName().toUri());
- eventHandler.onError(new Exception("Failed to register name to put: " + data.getName().toUri()));
- }
- };
- final ForwardingFlags flags = new ForwardingFlags();
- flags.setCapture(true);
-
- // setup background thread
- Thread backgroundThread = new Thread(new Runnable() {
- @Override
- public void run() {
- // register name on the face
- try {
- face.registerPrefix(data.getName(), interestHandler, failureHandler, flags);
- logger.info("Registered data : " + data.getName().toUri());
- } catch (IOException e) {
- logger.error("Could not connect to face to register prefix: " + data.getName().toUri(), e);
- eventHandler.onError(e);
- } catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Error registering prefix: " + data.getName().toUri(), e);
- eventHandler.onError(e);
- }
-
- // process eventCount until a request is received
- while (observer.interestCount() == 0 && observer.errorCount() == 0 && !observer.mustStop()) {
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
- observer.update(eventHandler, new ClientEvent());
- }
- sleep();
- }
- }
- });
- backgroundThread.setName(String.format("Client.put(%s)", data.getName().toUri()));
- backgroundThread.setDaemon(true);
- backgroundThread.start();
-
- return observer;
- }
-
- /**
* Put the current thread to sleep to allow time for IO
*/
protected void sleep() {