Move Future implementations to subfolder
diff --git a/src/main/java/com/intel/jndn/utils/FutureData.java b/src/main/java/com/intel/jndn/utils/client/FutureData.java
similarity index 83%
rename from src/main/java/com/intel/jndn/utils/FutureData.java
rename to src/main/java/com/intel/jndn/utils/client/FutureData.java
index 0b4e24d..dc6b393 100644
--- a/src/main/java/com/intel/jndn/utils/FutureData.java
+++ b/src/main/java/com/intel/jndn/utils/client/FutureData.java
@@ -11,7 +11,7 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.client;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
@@ -24,23 +24,23 @@
import net.named_data.jndn.encoding.EncodingException;
/**
- * Reference to a Packet that has yet to be returned from the network; see use
- * in WindowBuffer.java. Usage:
+ * Reference to a Packet that has yet to be returned from the network. Usage:
+ *
* <pre><code>
- * FuturePacket futurePacket = new FuturePacket(face);
+ * FutureData futureData = new FutureData(face, interest.getName());
* face.expressInterest(interest, new OnData(){
- * ... futurePacket.resolve(data); ...
+ * ... futureData.resolve(data); ...
* }, new OnTimeout(){
- * ... futurePacket.reject(new TimeoutException());
+ * ... futureData.reject(new TimeoutException());
* });
- * Packet resolvedPacket = futurePacket.get(); // will block and call face.processEvents() until complete
+ * Data resolvedData = futureData.get(); // will block and call face.processEvents() until complete
* </code></pre>
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
public class FutureData implements Future<Data> {
- private final Face face;
+ protected final Face face;
private final Name name;
private Data data;
private boolean cancelled = false;
@@ -58,7 +58,7 @@
}
/**
- * Get the packet interest name
+ * Get the Interest name.
*
* @return
*/
@@ -95,7 +95,7 @@
*/
@Override
public boolean isDone() {
- return data != null || error != null;
+ return data != null || error != null || isCancelled();
}
/**
@@ -125,7 +125,8 @@
*/
@Override
public Data get() throws InterruptedException, ExecutionException {
- while (!isDone() && !isCancelled()) {
+ while (!isDone()) {
+ // process face events
try {
synchronized (face) {
face.processEvents();
@@ -134,15 +135,17 @@
throw new ExecutionException("Failed to retrieve packet.", e);
}
}
+
// case: cancelled
if (cancelled) {
throw new InterruptedException("Interrupted by user.");
}
+
// case: error
if (error != null) {
throw new ExecutionException("Future rejected with error.", error);
}
- // case: packet
+
return data;
}
@@ -160,7 +163,9 @@
public Data get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long interval = TimeUnit.MILLISECONDS.convert(timeout, unit);
long endTime = System.currentTimeMillis() + interval;
- while (!isDone() && !isCancelled() && System.currentTimeMillis() < endTime) {
+ long currentTime = System.currentTimeMillis();
+ while (!isDone() && !isCancelled() && currentTime <= endTime) {
+ // process face events
try {
synchronized (face) {
face.processEvents();
@@ -168,20 +173,25 @@
} catch (EncodingException | IOException e) {
throw new ExecutionException("Failed to retrieve packet.", e);
}
+
+ currentTime = System.currentTimeMillis();
}
- // case: timed out
- if (System.currentTimeMillis() < endTime) {
- throw new TimeoutException("Timed out");
- }
+
// case: cancelled
if (cancelled) {
throw new InterruptedException("Interrupted by user.");
}
+
// case: error
if (error != null) {
throw new ExecutionException("Future rejected with error.", error);
}
- // case: packet
+
+ // case: timed out
+ if (currentTime > endTime) {
+ throw new TimeoutException("Timed out.");
+ }
+
return data;
}
}
diff --git a/src/main/java/com/intel/jndn/utils/client/SegmentedFutureData.java b/src/main/java/com/intel/jndn/utils/client/SegmentedFutureData.java
new file mode 100644
index 0000000..dec33df
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/SegmentedFutureData.java
@@ -0,0 +1,172 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+/**
+ * Represents a list of Packets that have been requested asynchronously and have
+ * yet to be returned from the network. Usage:
+ *
+ * <pre><code>
+ * SegmentedFutureData segmentedFutureData = new SegmentedFutureData(face, name, futureDataList);
+ * Data data = segmentedFutureData.get(); // will block until complete
+ * </code></pre>
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class SegmentedFutureData implements Future<Data> {
+
+ private final Name name;
+ List<Future<Data>> segments;
+ private boolean cancelled = false;
+
+ /**
+ * Constructor
+ *
+ * @param name this will be the name of the returned Data packet, regardless
+ * of suffixes (e.g. segment components) on each segment packet
+ * @param segments
+ */
+ public SegmentedFutureData(Name name, List<Future<Data>> segments) {
+ this.name = name;
+ this.segments = segments;
+ }
+
+ /**
+ * Get the Interest name; this will also be the name of the Data packet
+ * returned from get().
+ *
+ * @return
+ */
+ public Name getName() {
+ return name;
+ }
+
+ /**
+ * Cancel the current request.
+ *
+ * @param mayInterruptIfRunning
+ * @return
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ cancelled = true;
+ return cancelled;
+ }
+
+ /**
+ * Determine if this request is cancelled.
+ *
+ * @return
+ */
+ @Override
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ /**
+ * Determine if the request has completed (successfully or not).
+ *
+ * @return
+ */
+ @Override
+ public boolean isDone() {
+ // check for errors, cancellation
+ if (isCancelled()) {
+ return true;
+ }
+
+ // check each segment for completion
+ for (Future<Data> futureData : segments) {
+ if (!futureData.isDone()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Block until packet is retrieved.
+ *
+ * @return
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ @Override
+ public Data get() throws InterruptedException, ExecutionException {
+ // aggregate bytes
+ ByteArrayOutputStream content = new ByteArrayOutputStream();
+ for (Future<Data> futureData : segments) {
+ try {
+ content.write(futureData.get().getContent().getImmutableArray());
+ } catch (ExecutionException | IOException | InterruptedException e) {
+ throw new ExecutionException("Failed while aggregating retrieved packets.", e);
+ }
+ }
+
+ // build aggregated packet (copy first packet)
+ Data data = new Data(segments.get(0).get());
+ data.setName(getName());
+ data.setContent(new Blob(content.toByteArray()));
+ return data;
+ }
+
+ /**
+ * Block until packet is retrieved or timeout is reached.
+ *
+ * @param timeout
+ * @param unit
+ * @return
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ @Override
+ public Data get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ long interval = TimeUnit.MILLISECONDS.convert(timeout, unit);
+ long endTime = System.currentTimeMillis() + interval;
+
+ // aggregate bytes
+ ByteArrayOutputStream content = new ByteArrayOutputStream();
+ for (Future<Data> futureData : segments) {
+ try {
+ content.write(futureData.get().getContent().getImmutableArray());
+ } catch (ExecutionException | IOException | InterruptedException e) {
+ throw new ExecutionException("Failed while aggregating retrieved packets.", e);
+ }
+
+ // check for timeout
+ if (System.currentTimeMillis() > endTime) {
+ throw new TimeoutException("Timed out.");
+ }
+ }
+
+ // build aggregated packet (copy first packet)
+ Data data = new Data(segments.get(0).get());
+ data.setName(getName());
+ data.setContent(new Blob(content.toByteArray()));
+ return data;
+ }
+}