Refactor, add documentation; fix timeouts
diff --git a/src/main/java/com/intel/jndn/utils/Client.java b/src/main/java/com/intel/jndn/utils/Client.java
index 269fe5c..3425950 100644
--- a/src/main/java/com/intel/jndn/utils/Client.java
+++ b/src/main/java/com/intel/jndn/utils/Client.java
@@ -26,6 +26,7 @@
import org.apache.logging.log4j.Logger;
/**
+ * Provide a client to simplify information retrieval over the NDN network.
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
@@ -34,6 +35,19 @@
public static final long DEFAULT_SLEEP_TIME = 20;
public static final long DEFAULT_TIMEOUT = 2000;
private static final Logger logger = LogManager.getLogger();
+ private static Client defaultInstance;
+
+ /**
+ * Singleton access for simpler client use
+ *
+ * @return
+ */
+ public static Client getDefault() {
+ if (defaultInstance == null) {
+ defaultInstance = new Client();
+ }
+ return defaultInstance;
+ }
/**
* Synchronously retrieve the Data for an Interest; this will block until
@@ -46,23 +60,19 @@
public Data getSync(Face face, Interest interest) {
// setup event
long startTime = System.currentTimeMillis();
- final ClientObservableEvent event = new ClientObservableEvent(); // this event is used without observer/observables for speed; just serves as a final reference into the callbacks
+ final ClientEvent event = new ClientEvent(); // this event is used without observer/observables for speed; just serves as a final reference into the callbacks
// send interest
try {
face.expressInterest(interest, new OnData() {
@Override
public void onData(Interest interest, Data data) {
- event.setTimestamp(System.currentTimeMillis());
- event.setSuccess(true);
- event.setPacket(data);
+ event.fromPacket(data);
}
}, new OnTimeout() {
@Override
public void onTimeout(Interest interest) {
- event.setTimestamp(System.currentTimeMillis());
- event.setSuccess(false);
- event.setPacket(new Object());
+ event.fromPacket(new Exception("Interest timed out: " + interest.getName().toUri()));
}
});
} catch (IOException e) {
@@ -70,7 +80,7 @@
return null;
}
- // process events until a response is received or timeout
+ // process eventCount until a response is received or timeout
while (event.getPacket() == null) {
try {
synchronized (face) {
@@ -103,8 +113,19 @@
/**
* Asynchronously retrieve the Data for a given interest; use the returned
- * ClientObserver to handle the Data when it arrives. E.g.: Client.get(face,
- * interest).then((data) -> doSomething(data));
+ * ClientObserver to handle the Data when it arrives. For example (with lambdas):
+ * <pre><code>
+ * Client.getDefault().get(face, interest).then((event) -> doSomething(event));
+ * </code></pre>
+ *
+ * If you want to block until the response returns, try something like:
+ * <pre><code>
+ * ClientObserver observer = Client.getDefault().get(face, interest);
+ * while(observer.eventCount() == 0){
+ * Thread.sleep(50);
+ * }
+ * doSomething(observer.getFirst());
+ * </code></pre>
*
* @param face
* @param interest
@@ -128,8 +149,8 @@
eventHandler.notify(e);
}
- // process events until a response is received or timeout
- while (observer.responses() == 0 && observer.errors() == 0 && !observer.mustStop()) {
+ // process eventCount until a response is received or timeout
+ while (observer.dataCount() == 0 && observer.errorCount() == 0 && !observer.mustStop()) {
try {
synchronized (face) {
face.processEvents();
@@ -140,7 +161,7 @@
}
sleep();
}
-
+
// finished
logger.trace("Received response; stopping thread.");
}
@@ -178,7 +199,7 @@
// setup event
long startTime = System.currentTimeMillis();
final String dataName = data.getName().toUri();
- final ClientObservableEvent event = new ClientObservableEvent();
+ final ClientEvent event = new ClientEvent();
// setup flags
ForwardingFlags flags = new ForwardingFlags();
@@ -213,7 +234,7 @@
event.fromPacket(e);
}
- // process events until one response is sent or error
+ // process eventCount until one response is sent or error
while (event.getPacket() == null) {
try {
synchronized (face) {
@@ -284,15 +305,15 @@
eventHandler.onError(e);
}
- // process events until a request is received
- while (observer.requests() == 0 && observer.errors() == 0 && !observer.mustStop()) {
+ // process eventCount until a request is received
+ while (observer.interestCount() == 0 && observer.errorCount() == 0 && !observer.mustStop()) {
try {
synchronized (face) {
face.processEvents();
}
} catch (IOException | EncodingException e) {
logger.warn("Failed to process events.", e);
- observer.update(eventHandler, new ClientObservableEvent());
+ observer.update(eventHandler, new ClientEvent());
}
sleep();
}
diff --git a/src/main/java/com/intel/jndn/utils/ClientEvent.java b/src/main/java/com/intel/jndn/utils/ClientEvent.java
new file mode 100644
index 0000000..a125286
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/ClientEvent.java
@@ -0,0 +1,77 @@
+/*
+ * File name: ClientEvent.java
+ *
+ * Purpose: Signals a Client event for observers to act on
+ *
+ * © Copyright Intel Corporation. All rights reserved.
+ * Intel Corporation, 2200 Mission College Boulevard,
+ * Santa Clara, CA 95052-8119, USA
+ */
+package com.intel.jndn.utils;
+
+/**
+ * Signals a Client event for observers to act on
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class ClientEvent<T> {
+
+ protected boolean success;
+ protected long timestamp;
+ protected T packet;
+
+ /**
+ * Constructor
+ */
+ public ClientEvent() {
+ timestamp = System.currentTimeMillis();
+ success = false; // an event without a packet is a failed event
+ }
+
+ /**
+ * Constructor
+ *
+ * @param packet
+ */
+ public ClientEvent(T packet) {
+ fromPacket(packet);
+ }
+
+ /**
+ * Build this event from a passed packet; the event is considered a failure
+ * if the packet is any type of Exception
+ *
+ * @param packet
+ */
+ public final void fromPacket(T packet) {
+ this.timestamp = System.currentTimeMillis();
+ this.success = !Exception.class.isInstance(packet);
+ this.packet = packet;
+ }
+
+ /**
+ * Retrieve success status
+ *
+ * @return
+ */
+ public boolean isSuccess() {
+ return success;
+ }
+
+ /**
+ * Retrieve event timestamp
+ *
+ * @return
+ */
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Retrieve event packet
+ *
+ * @return
+ */
+ public T getPacket() {
+ return packet;
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/ClientObservable.java b/src/main/java/com/intel/jndn/utils/ClientObservable.java
index ce6beda..0a4b341 100644
--- a/src/main/java/com/intel/jndn/utils/ClientObservable.java
+++ b/src/main/java/com/intel/jndn/utils/ClientObservable.java
@@ -26,7 +26,7 @@
*/
public class ClientObservable extends Observable implements OnData, OnTimeout, OnInterest {
- protected List<ClientObservableEvent> events = new ArrayList<>();
+ protected List<ClientEvent> events = new ArrayList<>();
protected List<Interest> incomingInterestPackets = new ArrayList<>();
protected List<Data> incomingDataPackets;
@@ -38,7 +38,7 @@
*/
public <T> void notify(T packet) {
setChanged();
- notifyObservers(new ClientObservableEvent(packet));
+ notifyObservers(new ClientEvent(packet));
}
/**
@@ -68,7 +68,7 @@
*/
@Override
public void onTimeout(Interest interest) {
- notify(interest);
+ notify(new Exception("Interest timed out: " + interest.getName().toUri()));
}
/**
diff --git a/src/main/java/com/intel/jndn/utils/ClientObservableEvent.java b/src/main/java/com/intel/jndn/utils/ClientObservableEvent.java
deleted file mode 100644
index af2a614..0000000
--- a/src/main/java/com/intel/jndn/utils/ClientObservableEvent.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * File name: ClientObservableEvent.java
- *
- * Purpose:
- *
- * © Copyright Intel Corporation. All rights reserved.
- * Intel Corporation, 2200 Mission College Boulevard,
- * Santa Clara, CA 95052-8119, USA
- */
-package com.intel.jndn.utils;
-
-/**
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class ClientObservableEvent<T> {
-
- boolean success;
- long timestamp;
- T packet;
-
- public ClientObservableEvent(T packet) {
- fromPacket(packet);
- }
-
- public ClientObservableEvent() {
- timestamp = System.currentTimeMillis();
- success = false;
- }
-
- public final void fromPacket(T packet_) {
- timestamp = System.currentTimeMillis();
- success = !Exception.class.isInstance(packet_);
- packet = packet_;
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public void setSuccess(boolean success) {
- this.success = success;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- public T getPacket() {
- return packet;
- }
-
- public void setPacket(T packet) {
- this.packet = packet;
- }
-
-}
diff --git a/src/main/java/com/intel/jndn/utils/ClientObserver.java b/src/main/java/com/intel/jndn/utils/ClientObserver.java
index 0ed4e11..9bfdd7e 100644
--- a/src/main/java/com/intel/jndn/utils/ClientObserver.java
+++ b/src/main/java/com/intel/jndn/utils/ClientObserver.java
@@ -1,7 +1,7 @@
/*
* File name: ClientObserver.java
*
- * Purpose:
+ * Purpose: Track asynchronous events from Client and provide simplified API
*
* © Copyright Intel Corporation. All rights reserved.
* Intel Corporation, 2200 Mission College Boulevard,
@@ -23,9 +23,9 @@
*/
public class ClientObserver implements Observer {
- protected List<ClientObservableEvent> events = new ArrayList<>();
+ protected List<ClientEvent> events = new ArrayList<>();
protected long timestamp;
- protected OnData then;
+ protected OnEvent then;
protected boolean stopThread;
/**
@@ -43,26 +43,62 @@
*/
@Override
public void update(Observable o, Object arg) {
- ClientObservableEvent event = (ClientObservableEvent) arg;
+ ClientEvent event = (ClientEvent) arg;
events.add(event);
// call onData callbacks
if (Data.class.isInstance(event.packet) && then != null) {
- then.onData(null, (Data) event.packet);
+ then.onEvent(event);
}
}
/**
- * Register a handler for data packets
+ * Register a handler for events
*
- * @param handler
+ * @param callback
* @return
*/
- public ClientObserver then(OnData handler) {
- then = handler;
+ public ClientObserver then(OnEvent callback) {
+ then = callback;
return this;
}
/**
+ * Count the number of eventCount observed
+ *
+ * @return
+ */
+ public int eventCount() {
+ return events.size();
+ }
+
+ /**
+ * Count the number of interest packets observed (received or sent)
+ *
+ * @return
+ */
+ public int interestCount() {
+ return count(Interest.class);
+ }
+
+ /**
+ * Count the number of Data packets observed
+ *
+ * @return
+ */
+ public int dataCount() {
+ return count(Data.class);
+ }
+
+ /**
+ * Count the number of errors observed
+ *
+ * @return
+ */
+ public int errorCount() {
+ return count(Exception.class);
+ }
+
+ /**
* Count the number of observed packets by type
*
* @param type
@@ -70,7 +106,7 @@
*/
public int count(Class type) {
int count = 0;
- for (ClientObservableEvent event : events) {
+ for (ClientEvent event : events) {
if (type.isInstance(event.packet)) {
count++;
}
@@ -79,34 +115,8 @@
}
/**
- * Count the number of interest packets observed (received or sent)
- *
- * @return
- */
- public int requests() {
- return count(Interest.class);
- }
-
- /**
- * Count the number of Data packets observed
- *
- * @return
- */
- public int responses() {
- return count(Data.class);
- }
-
- /**
- * Count the number of errors
- *
- * @return
- */
- public int errors() {
- return count(Exception.class);
- }
-
- /**
- * Get time since observer start
+ * Calculate time elapsed since observer started observing until this method
+ * is called
*
* @return
*/
@@ -118,11 +128,20 @@
}
/**
- * Retrieve first event
+ * Retrieve a list of observed events
*
* @return event or null
*/
- public ClientObservableEvent getFirst() {
+ public List<ClientEvent> getEvents() {
+ return events;
+ }
+
+ /**
+ * Retrieve the first event
+ *
+ * @return event or null
+ */
+ public ClientEvent getFirst() {
if (events.size() > 0) {
return events.get(0);
}
@@ -130,11 +149,11 @@
}
/**
- * Retrieve last event
+ * Retrieve the last event
*
* @return event or null
*/
- public ClientObservableEvent getLast() {
+ public ClientEvent getLast() {
if (events.size() > 0) {
return events.get(events.size() - 1);
}
@@ -142,14 +161,16 @@
}
/**
- * Stop the current Client thread
+ * Stop the current Client thread; used by asynchronous Client methods to
+ * stop the request/response thread
*/
public void stop() {
stopThread = true;
}
/**
- * Check the current stop status
+ * Check the current stop status; used by asynchronous Client methods to
+ * stop the request/response thread
*
* @return
*/
diff --git a/src/main/java/com/intel/jndn/utils/OnEvent.java b/src/main/java/com/intel/jndn/utils/OnEvent.java
new file mode 100644
index 0000000..e9c7f67
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/OnEvent.java
@@ -0,0 +1,20 @@
+/*
+ * File name: OnEvent.java
+ *
+ * Purpose: Interface for registering generic event handlers
+ *
+ * © Copyright Intel Corporation. All rights reserved.
+ * Intel Corporation, 2200 Mission College Boulevard,
+ * Santa Clara, CA 95052-8119, USA
+ */
+package com.intel.jndn.utils;
+
+/**
+ * Interface for registering generic event handlers. E.g.: observer.then((event)
+ * => doSomething(event.getFirst()));
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface OnEvent {
+ public void onEvent(ClientEvent event);
+}
diff --git a/src/test/java/com/intel/jndn/utils/ClientTest.java b/src/test/java/com/intel/jndn/utils/ClientTest.java
index 787a395..a294817 100644
--- a/src/test/java/com/intel/jndn/utils/ClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/ClientTest.java
@@ -20,7 +20,7 @@
import org.apache.logging.log4j.Logger;
/**
- *
+ * Test Client.java
* @author Andrew Brown <andrew.brown@intel.com>
*/
public class ClientTest {
@@ -40,14 +40,14 @@
Face face = new Face(transport, null);
// setup return data
- Data response = new Data(new Name("/a/b/c"));
+ Data response = new Data(new Name("/test/sync"));
response.setContent(new Blob("..."));
transport.respondWith(response);
// retrieve data
- logger.info("Client expressing interest synchronously: /a/b/c");
+ logger.info("Client expressing interest synchronously: /test/sync");
Client client = new Client();
- Data data = client.getSync(face, new Name("/a/b/c"));
+ Data data = client.getSync(face, new Name("/test/sync"));
assertEquals(new Blob("...").buf(), data.getContent().buf());
}
@@ -63,20 +63,77 @@
Face face = new Face(transport, null);
// setup return data
- Data response = new Data(new Name("/a/b/c"));
+ Data response = new Data(new Name("/test/async"));
response.setContent(new Blob("..."));
transport.respondWith(response);
// retrieve data
- logger.info("Client expressing interest asynchronously: /a/b/c");
+ logger.info("Client expressing interest asynchronously: /test/async");
Client client = new Client();
- ClientObserver observer = client.get(face, new Name("/a/b/c"));
+ ClientObserver observer = client.get(face, new Name("/test/async"));
// wait
- while (observer.responses() == 0) {
+ while (observer.eventCount() == 0) {
Thread.sleep(10);
}
+ assertEquals(1, observer.eventCount());
+ assertEquals(1, observer.dataCount());
Data data = (Data) observer.getFirst().getPacket();
assertEquals(new Blob("...").buf(), data.getContent().buf());
}
+
+ /**
+ * Test that asynchronous client times out correctly
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testTimeout() throws InterruptedException {
+ // setup face
+ MockTransport transport = new MockTransport();
+ Face face = new Face(transport, null);
+
+ // retrieve non-existent data, should timeout
+ logger.info("Client expressing interest asynchronously: /test/timeout");
+ ClientObserver observer = Client.getDefault().get(face, new Name("/test/timeout"));
+
+ // wait
+ while (observer.errorCount() == 0) {
+ Thread.sleep(100);
+ }
+ Exception e = (Exception) observer.getFirst().getPacket();
+ assertEquals(1, observer.errorCount());
+ }
+
+ /**
+ * Test that callback is called on event
+ * @throws InterruptedException
+ */
+ @Test
+ public void testCallback() throws InterruptedException {
+ // setup face
+ MockTransport transport = new MockTransport();
+ Face face = new Face(transport, null);
+
+ // setup return data
+ Data response = new Data(new Name("/test/callback"));
+ response.setContent(new Blob("..."));
+ transport.respondWith(response);
+
+ // retrieve non-existent data, should timeout
+ logger.info("Client expressing interest asynchronously: /test/callback");
+ ClientObserver observer = Client.getDefault().get(face, new Name("/test/callback"));
+ observer.then(new OnEvent(){
+ @Override
+ public void onEvent(ClientEvent event) {
+ assertEquals(new Blob("...").buf(), ((Data) event.getPacket()).getContent().buf());
+ }
+ });
+
+ // wait
+ while (observer.eventCount() == 0) {
+ Thread.sleep(100);
+ }
+ assertEquals(1, observer.eventCount());
+ }
}