Switch to FutureData; add SegmentedClient
diff --git a/src/main/java/com/intel/jndn/utils/Client.java b/src/main/java/com/intel/jndn/utils/Client.java
index 39c4ada..0ded051 100644
--- a/src/main/java/com/intel/jndn/utils/Client.java
+++ b/src/main/java/com/intel/jndn/utils/Client.java
@@ -11,15 +11,15 @@
package com.intel.jndn.utils;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.OnData;
import net.named_data.jndn.OnTimeout;
-import net.named_data.jndn.encoding.EncodingException;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import java.util.logging.Logger;
/**
* Provide a client to simplify information retrieval over the NDN network.
@@ -30,7 +30,7 @@
public static final long DEFAULT_SLEEP_TIME = 20;
public static final long DEFAULT_TIMEOUT = 2000;
- private static final Logger logger = LogManager.getLogger();
+ private static final Logger logger = Logger.getLogger(Client.class.getName());
private static Client defaultInstance;
/**
@@ -46,6 +46,52 @@
}
/**
+ * Asynchronously request the Data for an Interest. This will send the
+ * Interest and return immediately; use futureData.get() to block until the
+ * Data returns (see FutureData) or manage the event processing independently.
+ *
+ * @param face
+ * @param interest
+ * @return
+ */
+ public FutureData getAsync(Face face, Interest interest) {
+ final FutureData futureData = new FutureData(face, interest.getName());
+
+ // send interest
+ try {
+ face.expressInterest(interest, new OnData() {
+ @Override
+ public void onData(Interest interest, Data data) {
+ futureData.resolve(data);
+ }
+ }, new OnTimeout() {
+ @Override
+ public void onTimeout(Interest interest) {
+ futureData.reject(new TimeoutException());
+ }
+ });
+ } catch (IOException e) {
+ logger.warning("IO failure while sending interest: " + e);
+ futureData.reject(e);
+ }
+
+ return futureData;
+ }
+
+ /**
+ * Synchronously retrieve the Data for a Name using a default interest (e.g. 2
+ * second timeout); this will block until complete (i.e. either data is
+ * received or the interest times out).
+ *
+ * @param face
+ * @param name
+ * @return
+ */
+ public FutureData getAsync(Face face, Name name) {
+ return getAsync(face, getDefaultInterest(name));
+ }
+
+ /**
* Synchronously retrieve the Data for an Interest; this will block until
* complete (i.e. either data is received or the interest times out).
*
@@ -56,42 +102,19 @@
public Data getSync(Face face, Interest interest) {
// setup event
long startTime = System.currentTimeMillis();
- final NDNEvent event = new NDNEvent(); // this event is used without observer/observables for speed; just serves as a final reference into the callbacks
- // send interest
- try {
- face.expressInterest(interest, new OnData() {
- @Override
- public void onData(Interest interest, Data data) {
- event.fromPacket(data);
- }
- }, new OnTimeout() {
- @Override
- public void onTimeout(Interest interest) {
- event.fromPacket(new Exception("Interest timed out: " + interest.getName().toUri()));
- }
- });
- } catch (IOException e) {
- logger.warn("IO failure while sending interest.", e);
- return null;
- }
+ // get future data
+ FutureData futureData = getAsync(face, interest);
// process eventCount until a response is received or timeout
- while (event.getPacket() == null) {
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
- return null;
- }
- sleep();
+ try {
+ Data data = futureData.get();
+ logger.fine("Request time (ms): " + (System.currentTimeMillis() - startTime));
+ return data;
+ } catch (ExecutionException | InterruptedException e) {
+ logger.warning("Failed to retrieve data: " + e);
+ return null;
}
-
- // return
- logger.debug("Request time (ms): " + (event.getTimestamp() - startTime));
- return (event.isSuccess()) ? (Data) event.getPacket() : null;
}
/**
@@ -108,99 +131,13 @@
}
/**
- * Asynchronously retrieve the Data for a given interest; use the returned
- NDNObserver to handle the Data when it arrives. For example (with lambdas):
- * <pre><code>
- * Client.getDefault().get(face, interest).then((event) -> doSomething(event));
- * </code></pre>
- *
- * If you want to block until the response returns, try something like:
- * <pre><code>
- NDNObserver observer = Client.getDefault().get(face, interest);
- while(observer.eventCount() == 0){
- Thread.sleep(50);
- }
- doSomething(observer.getFirst());
- </code></pre>
- *
- * @param face
- * @param interest
- * @return
- */
- public NDNObserver get(final Face face, final Interest interest) {
- // setup observer
- final NDNObserver observer = new NDNObserver();
- final NDNObservable eventHandler = new NDNObservable();
- eventHandler.addObserver(observer);
-
- // setup background thread
- Thread backgroundThread = new Thread(new Runnable() {
- @Override
- public void run() {
- // send interest
- try {
- face.expressInterest(interest, eventHandler, eventHandler);
- } catch (IOException e) {
- logger.warn("IO failure while sending interest.", e);
- eventHandler.notify(e);
- }
-
- // process eventCount until a response is received or timeout
- while (observer.dataCount() == 0 && observer.errorCount() == 0 && !observer.mustStop()) {
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
- eventHandler.notify(e);
- }
- sleep();
- }
-
- // finished
- logger.trace("Received response; stopping thread.");
- }
- });
- backgroundThread.setName(String.format("Client.get(%s)", interest.getName().toUri()));
- backgroundThread.setDaemon(true);
- backgroundThread.start();
-
- // return
- return observer;
- }
-
- /**
- * Asynchronously retrieve the Data for a Name using default Interest
- * parameters; see get(Face, Interest) for examples.
- *
- * @param face
- * @param name
- * @return
- */
- public NDNObserver get(Face face, Name name) {
- return get(face, getDefaultInterest(name));
- }
-
- /**
- * Put the current thread to sleep to allow time for IO
- */
- protected void sleep() {
- try {
- Thread.currentThread().sleep(DEFAULT_SLEEP_TIME);
- } catch (InterruptedException e) {
- logger.error("Event loop interrupted.", e);
- }
- }
-
- /**
* Create a default interest for a given Name using some common settings: -
* lifetime: 2 seconds
*
* @param name
* @return
*/
- public Interest getDefaultInterest(Name name) {
+ public static Interest getDefaultInterest(Name name) {
Interest interest = new Interest(name, DEFAULT_TIMEOUT);
return interest;
}