Switch to FutureData; add SegmentedClient
diff --git a/src/main/java/com/intel/jndn/utils/Client.java b/src/main/java/com/intel/jndn/utils/Client.java
index 39c4ada..0ded051 100644
--- a/src/main/java/com/intel/jndn/utils/Client.java
+++ b/src/main/java/com/intel/jndn/utils/Client.java
@@ -11,15 +11,15 @@
package com.intel.jndn.utils;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.OnData;
import net.named_data.jndn.OnTimeout;
-import net.named_data.jndn.encoding.EncodingException;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import java.util.logging.Logger;
/**
* Provide a client to simplify information retrieval over the NDN network.
@@ -30,7 +30,7 @@
public static final long DEFAULT_SLEEP_TIME = 20;
public static final long DEFAULT_TIMEOUT = 2000;
- private static final Logger logger = LogManager.getLogger();
+ private static final Logger logger = Logger.getLogger(Client.class.getName());
private static Client defaultInstance;
/**
@@ -46,6 +46,52 @@
}
/**
+ * Asynchronously request the Data for an Interest. This will send the
+ * Interest and return immediately; use futureData.get() to block until the
+ * Data returns (see FutureData) or manage the event processing independently.
+ *
+ * @param face the face used to express the Interest and given to the FutureData
+ * @param interest the Interest to send; its name is recorded in the FutureData
+ * @return a FutureData resolved with the Data, or rejected on timeout or IO failure
+ */
+ public FutureData getAsync(Face face, Interest interest) {
+ final FutureData futureData = new FutureData(face, interest.getName());
+
+ // send interest; the callbacks resolve or reject the future
+ try {
+ face.expressInterest(interest, new OnData() {
+ @Override
+ public void onData(Interest interest, Data data) {
+ futureData.resolve(data);
+ }
+ }, new OnTimeout() {
+ @Override
+ public void onTimeout(Interest interest) {
+ futureData.reject(new TimeoutException());
+ }
+ });
+ } catch (IOException e) {
+ logger.warning("IO failure while sending interest: " + e);
+ futureData.reject(e);
+ }
+
+ return futureData;
+ }
+
+ /**
+ * Asynchronously request the Data for a Name using a default interest (e.g. 2
+ * second timeout); this sends the Interest and returns immediately, exactly
+ * like getAsync(Face, Interest), to which it delegates.
+ *
+ * @param face the face used to express the Interest
+ * @param name the name to build the default Interest from
+ * @return the FutureData returned by getAsync(Face, Interest)
+ */
+ public FutureData getAsync(Face face, Name name) {
+ return getAsync(face, getDefaultInterest(name));
+ }
+
+ /**
* Synchronously retrieve the Data for an Interest; this will block until
* complete (i.e. either data is received or the interest times out).
*
@@ -56,42 +102,19 @@
public Data getSync(Face face, Interest interest) {
// setup event
long startTime = System.currentTimeMillis();
- final NDNEvent event = new NDNEvent(); // this event is used without observer/observables for speed; just serves as a final reference into the callbacks
- // send interest
- try {
- face.expressInterest(interest, new OnData() {
- @Override
- public void onData(Interest interest, Data data) {
- event.fromPacket(data);
- }
- }, new OnTimeout() {
- @Override
- public void onTimeout(Interest interest) {
- event.fromPacket(new Exception("Interest timed out: " + interest.getName().toUri()));
- }
- });
- } catch (IOException e) {
- logger.warn("IO failure while sending interest.", e);
- return null;
- }
+ // get future data
+ FutureData futureData = getAsync(face, interest);
// process eventCount until a response is received or timeout
- while (event.getPacket() == null) {
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
- return null;
- }
- sleep();
+ try {
+ Data data = futureData.get();
+ logger.fine("Request time (ms): " + (System.currentTimeMillis() - startTime));
+ return data;
+ } catch (ExecutionException | InterruptedException e) {
+ logger.warning("Failed to retrieve data: " + e);
+ return null;
}
-
- // return
- logger.debug("Request time (ms): " + (event.getTimestamp() - startTime));
- return (event.isSuccess()) ? (Data) event.getPacket() : null;
}
/**
@@ -108,99 +131,13 @@
}
/**
- * Asynchronously retrieve the Data for a given interest; use the returned
- NDNObserver to handle the Data when it arrives. For example (with lambdas):
- * <pre><code>
- * Client.getDefault().get(face, interest).then((event) -> doSomething(event));
- * </code></pre>
- *
- * If you want to block until the response returns, try something like:
- * <pre><code>
- NDNObserver observer = Client.getDefault().get(face, interest);
- while(observer.eventCount() == 0){
- Thread.sleep(50);
- }
- doSomething(observer.getFirst());
- </code></pre>
- *
- * @param face
- * @param interest
- * @return
- */
- public NDNObserver get(final Face face, final Interest interest) {
- // setup observer
- final NDNObserver observer = new NDNObserver();
- final NDNObservable eventHandler = new NDNObservable();
- eventHandler.addObserver(observer);
-
- // setup background thread
- Thread backgroundThread = new Thread(new Runnable() {
- @Override
- public void run() {
- // send interest
- try {
- face.expressInterest(interest, eventHandler, eventHandler);
- } catch (IOException e) {
- logger.warn("IO failure while sending interest.", e);
- eventHandler.notify(e);
- }
-
- // process eventCount until a response is received or timeout
- while (observer.dataCount() == 0 && observer.errorCount() == 0 && !observer.mustStop()) {
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
- eventHandler.notify(e);
- }
- sleep();
- }
-
- // finished
- logger.trace("Received response; stopping thread.");
- }
- });
- backgroundThread.setName(String.format("Client.get(%s)", interest.getName().toUri()));
- backgroundThread.setDaemon(true);
- backgroundThread.start();
-
- // return
- return observer;
- }
-
- /**
- * Asynchronously retrieve the Data for a Name using default Interest
- * parameters; see get(Face, Interest) for examples.
- *
- * @param face
- * @param name
- * @return
- */
- public NDNObserver get(Face face, Name name) {
- return get(face, getDefaultInterest(name));
- }
-
- /**
- * Put the current thread to sleep to allow time for IO
- */
- protected void sleep() {
- try {
- Thread.currentThread().sleep(DEFAULT_SLEEP_TIME);
- } catch (InterruptedException e) {
- logger.error("Event loop interrupted.", e);
- }
- }
-
- /**
* Create a default interest for a given Name using some common settings: -
* lifetime: 2 seconds
*
* @param name
* @return
*/
- public Interest getDefaultInterest(Name name) {
+ public static Interest getDefaultInterest(Name name) {
Interest interest = new Interest(name, DEFAULT_TIMEOUT);
return interest;
}
diff --git a/src/main/java/com/intel/jndn/utils/FutureData.java b/src/main/java/com/intel/jndn/utils/FutureData.java
new file mode 100644
index 0000000..5c881ee
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/FutureData.java
@@ -0,0 +1,183 @@
+/*
+ * File name: FutureData.java
+ *
+ * Purpose: Reference to a Data packet that has yet to be returned from the network.
+ *
+ * © Copyright Intel Corporation. All rights reserved.
+ * Intel Corporation, 2200 Mission College Boulevard,
+ * Santa Clara, CA 95052-8119, USA
+ */
+package com.intel.jndn.utils;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+
+/**
+ * Reference to a Data packet that has yet to be returned from the network; see
+ * use in Client.getAsync() and WindowBuffer.java. Usage:
+ * <pre><code>
+ * FutureData futureData = new FutureData(face, interest.getName());
+ * face.expressInterest(interest, new OnData(){
+ * ... futureData.resolve(data); ...
+ * }, new OnTimeout(){
+ * ... futureData.reject(new TimeoutException());
+ * });
+ * Data resolvedData = futureData.get(); // will block and call face.processEvents() until complete
+ * </code></pre>
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class FutureData implements Future<Data> {
+
+ private final Face face;
+ private final Name name;
+ private Data data;
+ private boolean cancelled = false;
+ private Throwable error;
+
+ /**
+ * Constructor
+ *
+ * @param face the face whose events are processed while waiting in get()
+ * @param name the requested name; copied into a new Name
+ */
+ public FutureData(Face face, Name name) {
+ this.face = face;
+ this.name = new Name(name);
+ }
+
+ /**
+ * Get the packet interest name
+ *
+ * @return the copy of the name made at construction
+ */
+ public Name getName() {
+ return name;
+ }
+
+ /**
+ * Cancel the current request; has no effect once data or an error is set.
+ *
+ * @param mayInterruptIfRunning ignored; no thread is interrupted
+ * @return true if the request is cancelled, false if it had already completed
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ cancelled = cancelled || (data == null && error == null);
+ return cancelled;
+ }
+
+ /**
+ * Determine if this request is cancelled.
+ *
+ * @return true if cancel() succeeded on this request
+ */
+ @Override
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ /**
+ * Determine if the request has completed: resolved, rejected or cancelled.
+ *
+ * @return true once data or an error is set, or after cancellation
+ */
+ @Override
+ public boolean isDone() {
+ return data != null || error != null || cancelled;
+ }
+
+ /**
+ * Set the packet when successfully retrieved; unblocks get().
+ *
+ * @param d the retrieved Data
+ */
+ public void resolve(Data d) {
+ data = d;
+ }
+
+ /**
+ * Set the exception when request failed; unblocks get().
+ *
+ * @param e the cause of the failure, rethrown by get() inside an ExecutionException
+ */
+ public void reject(Throwable e) {
+ error = e;
+ }
+
+ /**
+ * Block until packet is retrieved, calling face.processEvents() while waiting.
+ *
+ * @return the resolved Data
+ * @throws InterruptedException if the request was cancelled
+ * @throws ExecutionException if the request was rejected or event processing failed
+ */
+ @Override
+ public Data get() throws InterruptedException, ExecutionException {
+ while (!isDone() && !isCancelled()) {
+ try {
+ synchronized (face) {
+ face.processEvents();
+ }
+ } catch (EncodingException | IOException e) {
+ throw new ExecutionException("Failed to retrieve packet.", e);
+ }
+ }
+ // case: cancelled
+ if (cancelled) {
+ throw new InterruptedException("Interrupted by user.");
+ }
+ // case: error
+ if (error != null) {
+ throw new ExecutionException("Future rejected with error.", error);
+ }
+ // case: packet
+ return data;
+ }
+
+ /**
+ * Block until packet is retrieved or timeout is reached.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the unit of the timeout argument
+ * @return the resolved Data
+ * @throws InterruptedException if the request was cancelled
+ * @throws ExecutionException if the request was rejected or event processing failed
+ * @throws TimeoutException if the wait ended with no data, error or cancellation
+ */
+ @Override
+ public Data get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ long interval = TimeUnit.MILLISECONDS.convert(timeout, unit);
+ long startTime = System.currentTimeMillis(); // compare elapsed time so a huge timeout cannot overflow
+ while (!isDone() && !isCancelled() && System.currentTimeMillis() - startTime < interval) {
+ try {
+ synchronized (face) {
+ face.processEvents();
+ }
+ } catch (EncodingException | IOException e) {
+ throw new ExecutionException("Failed to retrieve packet.", e);
+ }
+ }
+ // case: timed out (the loop ended without data, error or cancellation)
+ if (!isDone() && !isCancelled()) {
+ throw new TimeoutException("Timed out");
+ }
+ // case: cancelled
+ if (cancelled) {
+ throw new InterruptedException("Interrupted by user.");
+ }
+ // case: error
+ if (error != null) {
+ throw new ExecutionException("Future rejected.", error);
+ }
+ // case: packet
+ return data;
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/SegmentedClient.java b/src/main/java/com/intel/jndn/utils/SegmentedClient.java
new file mode 100644
index 0000000..aaa5c55
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/SegmentedClient.java
@@ -0,0 +1,201 @@
+/*
+ * File name: SegmentedClient.java
+ *
+ * Purpose: Provide a client to simplify retrieving segmented Data packets over
+ * the NDN network.
+ *
+ * © Copyright Intel Corporation. All rights reserved.
+ * Intel Corporation, 2200 Mission College Boulevard,
+ * Santa Clara, CA 95052-8119, USA
+ */
+package com.intel.jndn.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Logger;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.util.Blob;
+
+/**
+ * Provide a client to simplify retrieving segmented Data packets over the NDN
+ * network. This class expects the Data producer to follow the NDN naming
+ * conventions (see http://named-data.net/doc/tech-memos/naming-conventions.pdf)
+ * and produce Data packets with a valid segment as the last component of their
+ * name; additionally, at least the first packet should set the FinalBlockId of
+ * the packet's MetaInfo (see
+ * http://named-data.net/doc/ndn-tlv/data.html#finalblockid).
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class SegmentedClient {
+
+ private static SegmentedClient defaultInstance;
+ private static final Logger logger = Logger.getLogger(SegmentedClient.class.getName());
+ private static final int SLEEP_TIME_MS = 10;
+
+ /**
+ * Singleton access for simpler client use
+ *
+ * @return the shared instance, created on the first call
+ */
+ public static SegmentedClient getDefault() {
+ if (defaultInstance == null) {
+ defaultInstance = new SegmentedClient();
+ }
+ return defaultInstance;
+ }
+
+ /**
+ * Asynchronously send Interest packets for a segmented result; will block
+ * until the first packet is received and then send remaining interests until
+ * the specified FinalBlockId.
+ *
+ * @param face the face to send the Interests on
+ * @param interest should include either a ChildSelector or an initial segment
+ * number; a trailing segment component is removed from its name by this call
+ * @return the segment futures in request order, or null if the first segment fails
+ */
+ public List<FutureData> getAsync(Face face, Interest interest) {
+ // get first segment; default 0 or use a specified start segment
+ long firstSegment = 0;
+ boolean specifiedSegment = false;
+ try {
+ firstSegment = interest.getName().get(-1).toSegment();
+ specifiedSegment = true;
+ } catch (EncodingException e) {
+ // check for interest selector if no initial segment found
+ if (interest.getChildSelector() == -1) {
+ logger.warning("No child selector set for a segmented Interest; this may result in incorrect retrieval.");
+ }
+ }
+
+ // setup segments
+ final List<FutureData> segments = new ArrayList<>();
+ segments.add(Client.getDefault().getAsync(face, interest));
+
+ // retrieve first packet to find last segment value
+ long lastSegment;
+ try {
+ lastSegment = segments.get(0).get().getMetaInfo().getFinalBlockId().toSegment();
+ } catch (ExecutionException | InterruptedException | EncodingException e) {
+ logger.severe("Failed to retrieve first segment: " + e);
+ return null;
+ }
+
+ // cut interest segment off
+ if (specifiedSegment) {
+ interest.setName(interest.getName().getPrefix(-1));
+ }
+
+ // send interests in remaining segments
+ for (long i = firstSegment + 1; i <= lastSegment; i++) {
+ Interest segmentedInterest = new Interest(interest);
+ segmentedInterest.getName().appendSegment(i);
+ FutureData futureData = Client.getDefault().getAsync(face, segmentedInterest);
+ segments.add(futureData); // append: list position is relative to firstSegment, not the segment number
+ }
+
+ return segments;
+ }
+
+ /**
+ * Asynchronously send Interests for a segmented Data packet using a default
+ * interest (e.g. 2 second timeout); blocks only until the first segment returns.
+ *
+ * @param face the face to send the Interests on
+ * @param name the name to build the default Interest from
+ * @return the segment futures, or null on failure; see getAsync(Face, Interest)
+ */
+ public List<FutureData> getAsync(Face face, Name name) {
+ return getAsync(face, Client.getDefaultInterest(name));
+ }
+
+ /**
+ * Retrieve a segmented Data packet; will block until all segments are
+ * received and will re-assemble these.
+ *
+ * @param face the face to send the Interests on and to process events with
+ * @param interest should include either a ChildSelector or an initial segment
+ * number
+ * @return a Data named interest.getName() holding all content, or null on failure
+ */
+ public Data getSync(Face face, Interest interest) {
+ List<FutureData> segments = getAsync(face, interest);
+ if (segments == null) {
+ return null; // first segment failed; getAsync() already logged the cause
+ }
+
+ // process events until complete
+ while (!isSegmentListComplete(segments)) {
+ try {
+ face.processEvents();
+ Thread.sleep(SLEEP_TIME_MS);
+ } catch (EncodingException | IOException e) {
+ logger.warning("Failed to retrieve data: " + e);
+ return null;
+ } catch (InterruptedException ex) {
+ // ignore; keep polling until every segment completes
+ }
+ }
+
+ // build final blob
+ ByteArrayOutputStream content = new ByteArrayOutputStream();
+ for (FutureData futureData : segments) {
+ try {
+ content.write(futureData.get().getContent().getImmutableArray());
+ } catch (ExecutionException | IOException | InterruptedException e) {
+ logger.warning("Failed to parse retrieved data: " + e);
+ return null;
+ }
+ }
+ Data data = new Data(interest.getName()); // TODO this name may not be correct; may need to contain additional suffixes
+ data.setContent(new Blob(content.toByteArray()));
+ return data;
+ }
+
+ /**
+ * Synchronously retrieve a segmented Data packet for a Name using a default
+ * interest (e.g. 2 second timeout); see getSync(Face, Interest).
+ *
+ * @param face the face to send the Interests on
+ * @param name the name to build the default Interest from
+ * @return the re-assembled Data, or null on failure
+ */
+ public Data getSync(Face face, Name name) {
+ return getSync(face, Client.getDefaultInterest(name));
+ }
+
+ /**
+ * Check if a name ends in a segment component; uses marker value found in the
+ * NDN naming conventions (see
+ * http://named-data.net/doc/tech-memos/naming-conventions.pdf).
+ *
+ * @param name the name to inspect
+ * @return true if the last component starts with 0x00; false if the name or that component is empty
+ */
+ public static boolean hasSegment(Name name) {
+ return name.size() > 0 && name.get(-1).getValue().size() > 0 && name.get(-1).getValue().buf().get(0) == 0x00;
+ }
+
+ /**
+ * Check if a list of segments have returned from the network.
+ *
+ * @param segments the futures to check
+ * @return true if every future reports isDone()
+ */
+ protected boolean isSegmentListComplete(List<FutureData> segments) {
+ for (FutureData futureData : segments) {
+ if (!futureData.isDone()) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/Server.java b/src/main/java/com/intel/jndn/utils/Server.java
index 2f58ac9..decd07a 100644
--- a/src/main/java/com/intel/jndn/utils/Server.java
+++ b/src/main/java/com/intel/jndn/utils/Server.java
@@ -10,6 +10,9 @@
*/
package com.intel.jndn.utils;
+import com.intel.jndn.utils.event.NDNEvent;
+import com.intel.jndn.utils.event.NDNObservable;
+import com.intel.jndn.utils.event.NDNObserver;
import java.io.IOException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
@@ -21,8 +24,7 @@
import net.named_data.jndn.encoding.EncodingException;
import net.named_data.jndn.security.KeyChain;
import net.named_data.jndn.transport.Transport;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import java.util.logging.Logger;
/**
* Provide a server to simplify serving data over the NDN network. Exposes two
@@ -35,7 +37,7 @@
public static final long DEFAULT_SLEEP_TIME = 20;
public static final long DEFAULT_TIMEOUT = 2000;
- private static final Logger logger = LogManager.getLogger();
+ private static final Logger logger = Logger.getLogger(Server.class.getName());
private static Server defaultInstance;
private KeyChain keyChain;
private Name certificateName;
@@ -99,7 +101,7 @@
try {
keyChain.sign(data, certificateName != null ? certificateName : keyChain.getDefaultCertificateName());
} catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Failed to sign data for: " + dataName, e);
+ logger.severe("Failed to sign data for: " + dataName + e);
event.fromPacket(e);
}
}
@@ -107,10 +109,10 @@
// send packet
try {
transport.send(data.wireEncode().buf());
- logger.debug("Sent data: " + dataName);
+ logger.fine("Sent data: " + dataName);
event.fromPacket(interest);
} catch (IOException e) {
- logger.error("Failed to send data for: " + dataName);
+ logger.severe("Failed to send data for: " + dataName);
event.fromPacket(e);
}
}
@@ -122,10 +124,10 @@
}, flags);
logger.info("Registered data: " + dataName);
} catch (IOException e) {
- logger.error("Could not connect to face to register prefix: " + dataName, e);
+ logger.severe("Could not connect to face to register prefix: " + dataName + e);
event.fromPacket(e);
} catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Error registering prefix: " + dataName, e);
+ logger.severe("Error registering prefix: " + dataName + e);
event.fromPacket(e);
}
@@ -136,14 +138,14 @@
face.processEvents();
}
} catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
+ logger.warning("Failed to process events." + e);
event.fromPacket(e);
}
sleep();
}
// return
- logger.debug("Request time (ms): " + (event.getTimestamp() - startTime));
+ logger.fine("Request time (ms): " + (event.getTimestamp() - startTime));
return (event.isSuccess()) ? (Interest) event.getPacket() : null;
}
@@ -171,7 +173,7 @@
try {
keyChain.sign(data, certificateName != null ? certificateName : keyChain.getDefaultCertificateName());
} catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Failed to sign data for: " + data.getName().toUri(), e);
+ logger.severe("Failed to sign data for: " + data.getName().toUri() + e);
eventHandler.notify(e);
}
}
@@ -180,7 +182,7 @@
try {
transport.send(data.wireEncode().buf());
} catch (IOException e) {
- logger.error("Failed to send data for: " + data.getName().toUri());
+ logger.severe("Failed to send data for: " + data.getName().toUri());
eventHandler.notify(e);
}
}
@@ -190,7 +192,7 @@
final OnRegisterFailed failureHandler = new OnRegisterFailed() {
@Override
public void onRegisterFailed(Name prefix) {
- logger.error("Failed to register name to put: " + data.getName().toUri());
+ logger.severe("Failed to register name to put: " + data.getName().toUri());
eventHandler.notify(new Exception("Failed to register name to put: " + data.getName().toUri()));
}
};
@@ -209,10 +211,10 @@
face.registerPrefix(data.getName(), interestHandler, failureHandler, flags);
logger.info("Registered data : " + data.getName().toUri());
} catch (IOException e) {
- logger.error("Could not connect to face to register prefix: " + data.getName().toUri(), e);
+ logger.severe("Could not connect to face to register prefix: " + data.getName().toUri() + e);
eventHandler.notify(e);
} catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Error registering prefix: " + data.getName().toUri(), e);
+ logger.severe("Error registering prefix: " + data.getName().toUri() + e);
eventHandler.notify(e);
}
@@ -223,7 +225,7 @@
face.processEvents();
}
} catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
+ logger.warning("Failed to process events." + e);
eventHandler.notify(e);
}
sleep();
@@ -268,7 +270,7 @@
try {
keyChain.sign(data, certificateName != null ? certificateName : keyChain.getDefaultCertificateName());
} catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Failed to sign data for: " + interest.getName().toUri(), e);
+ logger.severe("Failed to sign data for: " + interest.getName().toUri() + e);
eventHandler.notify(e);
}
}
@@ -278,7 +280,7 @@
transport.send(data.wireEncode().buf());
eventHandler.notify(data); // notify observers of data sent
} catch (IOException e) {
- logger.error("Failed to send data for: " + interest.getName().toUri());
+ logger.severe("Failed to send data for: " + interest.getName().toUri());
eventHandler.notify(e);
}
}
@@ -288,7 +290,7 @@
final OnRegisterFailed failureHandler = new OnRegisterFailed() {
@Override
public void onRegisterFailed(Name prefix) {
- logger.error("Failed to register name to put: " + prefix.toUri());
+ logger.severe("Failed to register name to put: " + prefix.toUri());
eventHandler.notify(new Exception("Failed to register name to put: " + prefix.toUri()));
}
};
@@ -307,10 +309,10 @@
face.registerPrefix(prefix, interestHandler, failureHandler, flags);
logger.info("Registered data : " + prefix.toUri());
} catch (IOException e) {
- logger.error("Could not connect to face to register prefix: " + prefix.toUri(), e);
+ logger.severe("Could not connect to face to register prefix: " + prefix.toUri() + e);
eventHandler.notify(e);
} catch (net.named_data.jndn.security.SecurityException e) {
- logger.error("Error registering prefix: " + prefix.toUri(), e);
+ logger.severe("Error registering prefix: " + prefix.toUri() + e);
eventHandler.notify(e);
}
@@ -321,7 +323,7 @@
face.processEvents();
}
} catch (IOException | EncodingException e) {
- logger.warn("Failed to process events.", e);
+ logger.warning("Failed to process events." + e);
eventHandler.notify(e);
}
sleep();
@@ -342,7 +344,7 @@
try {
Thread.currentThread().sleep(DEFAULT_SLEEP_TIME);
} catch (InterruptedException e) {
- logger.error("Event loop interrupted.", e);
+ logger.severe("Event loop interrupted." + e);
}
}
diff --git a/src/main/java/com/intel/jndn/utils/NDNEvent.java b/src/main/java/com/intel/jndn/utils/event/NDNEvent.java
similarity index 86%
rename from src/main/java/com/intel/jndn/utils/NDNEvent.java
rename to src/main/java/com/intel/jndn/utils/event/NDNEvent.java
index 5316ae1..c2f7f67 100644
--- a/src/main/java/com/intel/jndn/utils/NDNEvent.java
+++ b/src/main/java/com/intel/jndn/utils/event/NDNEvent.java
@@ -7,10 +7,11 @@
* Intel Corporation, 2200 Mission College Boulevard,
* Santa Clara, CA 95052-8119, USA
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.event;
/**
* Signals an event (from Client or Server) for observers to act on
+ *
* @author Andrew Brown <andrew.brown@intel.com>
*/
public class NDNEvent<T> {
@@ -26,21 +27,21 @@
timestamp = System.currentTimeMillis();
success = false; // an event without a packet is a failed event
}
-
+
/**
* Constructor
- *
- * @param packet
+ *
+ * @param packet
*/
public NDNEvent(T packet) {
fromPacket(packet);
}
/**
- * Build this event from a passed packet; the event is considered a failure
- * if the packet is any type of Exception
- *
- * @param packet
+ * Build this event from a passed packet; the event is considered a failure if
+ * the packet is any type of Exception
+ *
+ * @param packet
*/
public final void fromPacket(T packet) {
this.timestamp = System.currentTimeMillis();
@@ -50,8 +51,8 @@
/**
* Retrieve success status
- *
- * @return
+ *
+ * @return
*/
public boolean isSuccess() {
return success;
@@ -59,8 +60,8 @@
/**
* Retrieve event timestamp
- *
- * @return
+ *
+ * @return
*/
public long getTimestamp() {
return timestamp;
@@ -68,8 +69,8 @@
/**
* Retrieve event packet
- *
- * @return
+ *
+ * @return
*/
public T getPacket() {
return packet;
diff --git a/src/main/java/com/intel/jndn/utils/NDNObservable.java b/src/main/java/com/intel/jndn/utils/event/NDNObservable.java
similarity index 92%
rename from src/main/java/com/intel/jndn/utils/NDNObservable.java
rename to src/main/java/com/intel/jndn/utils/event/NDNObservable.java
index 003fe45..a2c9fdd 100644
--- a/src/main/java/com/intel/jndn/utils/NDNObservable.java
+++ b/src/main/java/com/intel/jndn/utils/event/NDNObservable.java
@@ -7,7 +7,7 @@
* Intel Corporation, 2200 Mission College Boulevard,
* Santa Clara, CA 95052-8119, USA
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.event;
import java.util.ArrayList;
import java.util.List;
@@ -32,7 +32,7 @@
/**
* Generic notification
- *
+ *
* @param <T>
* @param packet
*/
@@ -43,9 +43,9 @@
/**
* Handle data packets
- *
+ *
* @param interest
- * @param data
+ * @param data
*/
@Override
public void onData(Interest interest, Data data) {
@@ -54,8 +54,8 @@
/**
* Handle exceptions
- *
- * @param e
+ *
+ * @param e
*/
public void onError(Exception e) {
notify(e);
@@ -63,8 +63,8 @@
/**
* Handle timeouts
- *
- * @param interest
+ *
+ * @param interest
*/
@Override
public void onTimeout(Interest interest) {
@@ -73,11 +73,11 @@
/**
* Handle incoming interests
- *
+ *
* @param prefix
* @param interest
* @param transport
- * @param registeredPrefixId
+ * @param registeredPrefixId
*/
@Override
public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
@@ -105,11 +105,13 @@
return interest;
}
}
-
+
/**
- * Helper to reference both incoming interest and the transport to send data on
+ * Helper to reference both incoming interest and the transport to send data
+ * on
*/
- class InterestTransportPacket{
+ class InterestTransportPacket {
+
private Interest interest;
private Transport transport;
@@ -125,6 +127,6 @@
public Transport getTransport() {
return transport;
}
-
+
}
}
diff --git a/src/main/java/com/intel/jndn/utils/NDNObserver.java b/src/main/java/com/intel/jndn/utils/event/NDNObserver.java
similarity index 95%
rename from src/main/java/com/intel/jndn/utils/NDNObserver.java
rename to src/main/java/com/intel/jndn/utils/event/NDNObserver.java
index c6d820e..ae0851f 100644
--- a/src/main/java/com/intel/jndn/utils/NDNObserver.java
+++ b/src/main/java/com/intel/jndn/utils/event/NDNObserver.java
@@ -7,7 +7,7 @@
* Intel Corporation, 2200 Mission College Boulevard,
* Santa Clara, CA 95052-8119, USA
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.event;
import java.util.ArrayList;
import java.util.List;
@@ -18,6 +18,7 @@
/**
* Track asynchronous events from Client and Server
+ *
* @author Andrew Brown <andrew.brown@intel.com>
*/
public class NDNObserver implements Observer {
@@ -160,16 +161,16 @@
}
/**
- * Stop the current Client thread; used by asynchronous Client methods to
- * stop the request/response thread
+ * Stop the current Client thread; used by asynchronous Client methods to stop
+ * the request/response thread
*/
public void stop() {
stopThread = true;
}
/**
- * Check the current stop status; used by asynchronous Client methods to
- * stop the request/response thread
+ * Check the current stop status; used by asynchronous Client methods to stop
+ * the request/response thread
*
* @return
*/
diff --git a/src/main/java/com/intel/jndn/utils/OnEvent.java b/src/main/java/com/intel/jndn/utils/event/OnEvent.java
similarity index 92%
rename from src/main/java/com/intel/jndn/utils/OnEvent.java
rename to src/main/java/com/intel/jndn/utils/event/OnEvent.java
index 62bdd56..11b0b86 100644
--- a/src/main/java/com/intel/jndn/utils/OnEvent.java
+++ b/src/main/java/com/intel/jndn/utils/event/OnEvent.java
@@ -7,7 +7,7 @@
* Intel Corporation, 2200 Mission College Boulevard,
* Santa Clara, CA 95052-8119, USA
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.event;
/**
* Interface for registering generic event handlers. E.g.: observer.then((event)
@@ -16,5 +16,6 @@
* @author Andrew Brown <andrew.brown@intel.com>
*/
public interface OnEvent {
+
public void onEvent(NDNEvent event);
}
diff --git a/src/test/java/com/intel/jndn/utils/ClientTest.java b/src/test/java/com/intel/jndn/utils/ClientTest.java
index 88c39cc..4b6d9e7 100644
--- a/src/test/java/com/intel/jndn/utils/ClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/ClientTest.java
@@ -12,12 +12,15 @@
import org.junit.Test;
import static org.junit.Assert.*;
import com.intel.jndn.mock.MockTransport;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Name;
import net.named_data.jndn.util.Blob;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.junit.rules.ExpectedException;
/**
* Test Client.java
@@ -28,7 +31,7 @@
/**
* Setup logging
*/
- private static final Logger logger = LogManager.getLogger();
+ private static final Logger logger = Logger.getLogger(Client.class.getName());
/**
* Test retrieving data synchronously
@@ -57,7 +60,7 @@
* @throws InterruptedException
*/
@Test
- public void testGetAsync() throws InterruptedException {
+ public void testGetAsync() throws InterruptedException, ExecutionException {
// setup face
MockTransport transport = new MockTransport();
Face face = new Face(transport, null);
@@ -70,16 +73,12 @@
// retrieve data
logger.info("Client expressing interest asynchronously: /test/async");
Client client = new Client();
- NDNObserver observer = client.get(face, new Name("/test/async"));
-
- // wait
- while (observer.eventCount() == 0) {
- Thread.sleep(10);
- }
- assertEquals(1, observer.eventCount());
- assertEquals(1, observer.dataCount());
- Data data = (Data) observer.getFirst().getPacket();
- assertEquals(new Blob("...").buf(), data.getContent().buf());
+ FutureData futureData = client.getAsync(face, new Name("/test/async"));
+
+ assertFalse(futureData.isDone()); // nothing has been waited for yet
+ Data data = futureData.get(); // wait for the result once and reuse it below
+ assertTrue(futureData.isDone());
+ assertEquals(new Blob("...").toString(), data.getContent().toString());
}
/**
@@ -88,52 +87,16 @@
* @throws InterruptedException
*/
- @Test
- public void testTimeout() throws InterruptedException {
+ @Test(expected = TimeoutException.class)
+ public void testTimeout() throws InterruptedException, ExecutionException, TimeoutException {
// setup face
MockTransport transport = new MockTransport();
Face face = new Face(transport, null);
// retrieve non-existent data, should timeout
logger.info("Client expressing interest asynchronously: /test/timeout");
- NDNObserver observer = Client.getDefault().get(face, new Name("/test/timeout"));
-
- // wait
- while (observer.errorCount() == 0) {
- Thread.sleep(100);
- }
- Exception e = (Exception) observer.getFirst().getPacket();
- assertEquals(1, observer.errorCount());
- }
-
- /**
- * Test that callback is called on event
- * @throws InterruptedException
- */
- @Test
- public void testCallback() throws InterruptedException {
- // setup face
- MockTransport transport = new MockTransport();
- Face face = new Face(transport, null);
+ FutureData futureData = Client.getDefault().getAsync(face, new Name("/test/timeout"));
- // setup return data
- Data response = new Data(new Name("/test/callback"));
- response.setContent(new Blob("..."));
- transport.respondWith(response);
-
- // retrieve non-existent data, should timeout
- logger.info("Client expressing interest asynchronously: /test/callback");
- NDNObserver observer = Client.getDefault().get(face, new Name("/test/callback"));
- observer.then(new OnEvent(){
- @Override
- public void onEvent(NDNEvent event) {
- assertEquals(new Blob("...").buf(), ((Data) event.getPacket()).getContent().buf());
- }
- });
-
- // wait
- while (observer.eventCount() == 0) {
- Thread.sleep(100);
- }
- assertEquals(1, observer.eventCount());
+ // no data was queued on the transport; JUnit checks the TimeoutException via @Test(expected = ...)
+ futureData.get(50, TimeUnit.MILLISECONDS);
}
}
diff --git a/src/test/java/com/intel/jndn/utils/SegmentedClientTest.java b/src/test/java/com/intel/jndn/utils/SegmentedClientTest.java
new file mode 100644
index 0000000..2db9071
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/SegmentedClientTest.java
@@ -0,0 +1,60 @@
+/*
+ * File name: SegmentedClientTest.java
+ *
+ * Purpose: Test SegmentedClient functionality.
+ *
+ * © Copyright Intel Corporation. All rights reserved.
+ * Intel Corporation, 2200 Mission College Boulevard,
+ * Santa Clara, CA 95052-8119, USA
+ */
+package com.intel.jndn.utils;
+
+import com.intel.jndn.mock.MockFace;
+import java.io.IOException;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.Name.Component;
+import net.named_data.jndn.OnInterest;
+import net.named_data.jndn.transport.Transport;
+import net.named_data.jndn.util.Blob;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test SegmentedClient functionality.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class SegmentedClientTest {
+  /**
+   * Serves one-byte segments under /segmented/data, each naming segment 9 as
+   * the final block, and checks that getSync() hands back 10 bytes of content.
+   */
+  @Test
+  public void testGetSync() throws Exception {
+    MockFace face = new MockFace();
+    face.registerPrefix(new Name("/segmented/data"), new OnInterest() {
+      // number of the last segment, advertised as FinalBlockId in every reply
+      private final int max = 9;
+
+      @Override
+      public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
+        Data data = new Data(interest.getName());
+        if (!SegmentedClient.hasSegment(data.getName())) {
+          data.getName().appendSegment(0); // always reply under a segment name
+        }
+        data.getMetaInfo().setFinalBlockId(Component.fromNumberWithMarker(max, 0x00));
+        data.setContent(new Blob("."));
+        try {
+          transport.send(data.wireEncode().buf());
+        } catch (IOException e) {
+          throw new AssertionError(e); // fail the test but keep the I/O error as the cause
+        }
+      }
+    }, null);
+
+    Data data = SegmentedClient.getDefault().getSync(face, new Name("/segmented/data").appendSegment(0));
+    assertEquals(10, data.getContent().size()); // presumably 1 byte from each of segments 0..9 (TODO confirm)
+  }
+}
diff --git a/src/test/java/com/intel/jndn/utils/ServerTest.java b/src/test/java/com/intel/jndn/utils/ServerTest.java
index a2b1880..6900ed9 100644
--- a/src/test/java/com/intel/jndn/utils/ServerTest.java
+++ b/src/test/java/com/intel/jndn/utils/ServerTest.java
@@ -9,6 +9,7 @@
*/
package com.intel.jndn.utils;
+import com.intel.jndn.utils.event.NDNObserver;
import com.intel.jndn.mock.MockTransport;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;