Upgrade SegmentedClient to be fully asynchronous (no forced blocking for first packet)
diff --git a/src/main/java/com/intel/jndn/utils/SegmentedClient.java b/src/main/java/com/intel/jndn/utils/SegmentedClient.java
index d9deb3e..9c9d61f 100644
--- a/src/main/java/com/intel/jndn/utils/SegmentedClient.java
+++ b/src/main/java/com/intel/jndn/utils/SegmentedClient.java
@@ -16,16 +16,17 @@
import com.intel.jndn.utils.client.FutureData;
import com.intel.jndn.utils.client.SegmentedFutureData;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
+import net.named_data.jndn.OnData;
+import net.named_data.jndn.OnTimeout;
import net.named_data.jndn.encoding.EncodingException;
/**
@@ -57,11 +58,12 @@
}
/**
- * Asynchronously send Interest packets for a segmented result; will block
- * until the first packet is received and then send remaining interests until
- * the specified FinalBlockId.
+ * Asynchronously send Interest packets for a segmented result; will not
+ * block, but will wait for the first packet to return before sending the
+ * remaining interests, up to the specified FinalBlockId. Will retrieve
+ * non-segmented packets as well.
*
- * @param face
+ * @param face the {@link Face} on which to retrieve the packets
* @param interest should include either a ChildSelector or an initial segment
* number; the initial segment number will be cut off in the de-segmented
* packet.
@@ -69,19 +71,95 @@
* will contain one FutureData with the failure exception
*/
@Override
- public Future<Data> getAsync(Face face, Interest interest) {
- List<Future<Data>> segments = getAsyncList(face, interest);
- return new SegmentedFutureData(interest.getName(), segments);
+  public Future<Data> getAsync(final Face face, Interest interest) {
+    final long firstSegmentId = parseFirstSegmentId(interest);
+    final SegmentedFutureData segmentedData = new SegmentedFutureData(face, interest.getName());
+    final FutureData firstData = new FutureData(face, interest.getName());
+    // register the first segment up front so the aggregate future cannot report
+    // "done" before the first packet has arrived
+    segmentedData.add(0, firstData);
+
+    // send first interest
+    logger.log(Level.FINER, "Sending first interest for: " + interest.getName().toUri());
+    try {
+      face.expressInterest(interest, new OnData() {
+        @Override
+        public void onData(Interest interest, Data data) {
+          firstData.resolve(data);
+          // now request subsequent segments using FinalBlockId and the Interest template
+          try {
+            long lastSegmentId = parseLastSegmentId(data);
+            Interest template = new Interest(interest);
+            template.setName(removeSegment(data.getName()));
+            requestRemainingSegments(face, segmentedData, template, firstSegmentId + 1, lastSegmentId);
+          } catch (EncodingException ex) {
+            // no usable FinalBlockId: treat the packet as non-segmented (a supported
+            // case), so the already-resolved first packet is the complete result
+            logger.log(Level.FINER, "No FinalBlockId, returning first packet: " + interest.toUri(), ex);
+          }
+        }
+      }, new OnTimeout() {
+        @Override
+        public void onTimeout(Interest interest) {
+          // a timeout on the first interest fails the whole aggregate
+          segmentedData.reject(new TimeoutException());
+        }
+      });
+    } catch (IOException e) {
+      logger.log(Level.FINE, "IO failure while sending interest: ", e);
+      segmentedData.reject(e);
+    }
+
+    return segmentedData;
}
/**
- * Asynchronously send Interest packets for a segmented result; will block
- * until the first packet is received and then send remaining interests until
- * the specified FinalBlockId.
+ * @param interest the request {@link Interest}
+ * @return the first segment the interest is requesting, or 0 if none found
+ */
+  private long parseFirstSegmentId(Interest interest) {
+    // default used when the last name component is not a segment marker
+    long firstSegmentId = 0;
+    try {
+      Name.Component lastComponent = interest.getName().get(-1);
+      firstSegmentId = lastComponent.toSegment();
+    } catch (EncodingException notASegment) {
+      boolean childSelectorUnset = interest.getChildSelector() == -1;
+      if (childSelectorUnset) {
+        // warn only: the interest is still allowed through since it may succeed
+        logger.log(Level.WARNING, "No child selector set for a segmented Interest; this may result in incorrect retrieval.");
+      }
+    }
+    return firstSegmentId;
+  }
+
+  /**
+   * Read the number of the last available segment from the FinalBlockId
+   * carried in the MetaInfo of the first returned packet.
+   *
+   * @param firstData the first returned {@link Data}
+   * @return the segment number decoded from the packet's FinalBlockId
+   * @throws EncodingException if the FinalBlockId cannot be decoded as a
+   * segment number
+   */
+  private long parseLastSegmentId(Data firstData) throws EncodingException {
+    Name.Component finalBlockId = firstData.getMetaInfo().getFinalBlockId();
+    return finalBlockId.toSegment();
+  }
+
+ /**
+ * Send interests for remaining segments; adding them to the segmented future
*
* @param face
+ * @param segmentedData
+ * @param template
+ * @param fromSegment
+ * @param toSegment
+ */
+  private void requestRemainingSegments(Face face, SegmentedFutureData segmentedData, Interest template, long fromSegment, long toSegment) {
+    // The caller (getAsync) has already placed the first segment at list index 0,
+    // so follow-on segments are positioned relative to fromSegment instead of by
+    // their absolute segment number; using the absolute number as the index
+    // throws IndexOutOfBoundsException whenever retrieval starts at a non-zero
+    // segment.
+    for (long i = fromSegment; i <= toSegment; i++) {
+      // each follow-on interest copies the template and appends its own segment marker
+      Interest segmentedInterest = new Interest(template);
+      segmentedInterest.getName().appendSegment(i);
+      Future<Data> futureData = SimpleClient.getDefault().getAsync(face, segmentedInterest);
+      segmentedData.add((int) (i - fromSegment + 1), futureData);
+    }
+  }
+
+ /**
+ * Asynchronously send Interest packets for a segmented result; see {@link #getAsync(net.named_data.jndn.Face, net.named_data.jndn.Interest)
+ * } for more information.
+ *
+ * @param face the {@link Face} on which to retrieve the packets
* @param name the {@link Name} of the packet to retrieve using a default
- * interest
+ * interest; may optionally end with the segment component of the first
+ * segment to retrieve
* @return an aggregated data packet from all received segments
*/
public Future<Data> getAsync(Face face, Name name) {
@@ -89,85 +167,10 @@
}
/**
- * Asynchronously send Interest packets for a segmented result; will block
- * until the first packet is received and then send remaining interests until
- * the specified FinalBlockId.
- *
- * @param face
- * @param interest should include either a ChildSelector or an initial segment
- * number
- * @return a list of FutureData packets; if the first segment fails, the list
- * will contain one FutureData with the failure exception
- */
- public List<Future<Data>> getAsyncList(Face face, Interest interest) {
- // get first segment; default 0 or use a specified start segment
- long firstSegment = 0;
- try {
- firstSegment = interest.getName().get(-1).toSegment();
- } catch (EncodingException e) {
- // check for interest selector if no initial segment found
- if (interest.getChildSelector() == -1) {
- logger.log(Level.WARNING, "No child selector set for a segmented Interest; this may result in incorrect retrieval.");
- // allow this interest to pass without a segment marker since it may still succeed
- }
- }
-
- // setup segments
- final List<Future<Data>> segments = new ArrayList<>();
- segments.add(SimpleClient.getDefault().getAsync(face, interest));
-
- // retrieve first packet and find the FinalBlockId
- Data firstData;
- long lastSegment;
- try {
- firstData = segments.get(0).get();
- lastSegment = firstData.getMetaInfo().getFinalBlockId().toSegment();
- } catch (ExecutionException | InterruptedException e) {
- logger.log(Level.FINE, "Failed to retrieve first segment: " + interest.toUri(), e);
- ((FutureData) segments.get(0)).reject(e); // TODO implies knowledge of underlying data structure
- return segments;
- } catch (EncodingException e) {
- logger.log(Level.FINER, "No FinalBlockId, returning first packet: " + interest.toUri());
- return segments;
- }
-
- // set follow-on segment names to match first segment retrieve
- Interest interestCopy = new Interest(interest);
- if (hasSegment(firstData.getName())) {
- interestCopy.setName(firstData.getName().getPrefix(-1)); // cut last segment number if present
- } else {
- logger.log(Level.FINER, "First packet retrieved does not have a segment number, continuing on: " + firstData.getName().toUri());
- interestCopy.setName(firstData.getName());
- }
-
- // send interests in remaining segments
- for (long i = firstSegment + 1; i <= lastSegment; i++) {
- Interest segmentedInterest = new Interest(interestCopy);
- segmentedInterest.getName().appendSegment(i);
- Future<Data> futureData = SimpleClient.getDefault().getAsync(face, segmentedInterest);
- segments.add((int) i, futureData);
- }
-
- return segments;
- }
-
- /**
- * Asynchronously send Interests for a segmented Data packet using a default
- * interest (e.g. 2 second timeout); this will block until complete (i.e.
- * either data is received or the interest times out). See getAsync(Face face)
- * for more information.
- *
- * @param face
- * @param name
- * @return
- */
- public List<Future<Data>> getAsyncList(Face face, Name name) {
- return getAsyncList(face, SimpleClient.getDefaultInterest(name));
- }
-
- /**
- * Retrieve a segmented Data packet; will block until all segments are
- * received and will re-assemble these.
+ * Retrieve a segmented Data packet; see {@link #getAsync(net.named_data.jndn.Face, net.named_data.jndn.Interest)
+ * } for more information. This method will block and call
+ * {@link Face#processEvents()} until the sent interests timeout or the
+ * corresponding data packets are retrieved.
*
* @param face
* @param interest should include either a ChildSelector or an initial segment
@@ -190,9 +193,9 @@
/**
* Synchronously retrieve the Data for a Name using a default interest (e.g. 2
- * second timeout); this will block until complete (i.e. either data is
- * received or the interest times out). See getSync(Face face) for more
- * information.
+ * second timeout). This method will block and call
+ * {@link Face#processEvents()} until the sent interests timeout or the
+ * corresponding data packets are retrieved.
*
* @param face
* @param name
@@ -214,4 +217,12 @@
public static boolean hasSegment(Name name) {
return name.get(-1).getValue().buf().get(0) == 0x00;
}
+
+  /**
+   * Strip a trailing segment component from a name, if one is present.
+   *
+   * @param name the {@link Name} to check
+   * @return a new {@link Name} instance that does not end in a segment
+   * component
+   */
+  public static Name removeSegment(Name name) {
+    if (hasSegment(name)) {
+      // drop the trailing segment component
+      return name.getPrefix(-1);
+    }
+    // nothing to strip; still hand back a fresh copy
+    return new Name(name);
+  }
}
diff --git a/src/main/java/com/intel/jndn/utils/client/FutureData.java b/src/main/java/com/intel/jndn/utils/client/FutureData.java
index 55c001e..663db81 100644
--- a/src/main/java/com/intel/jndn/utils/client/FutureData.java
+++ b/src/main/java/com/intel/jndn/utils/client/FutureData.java
@@ -13,15 +13,10 @@
*/
package com.intel.jndn.utils.client;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Name;
-import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.OnData;
/**
* Reference to a Packet that has yet to be returned from the network. Usage:
@@ -38,160 +33,51 @@
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class FutureData implements Future<Data> {
+public class FutureData extends FutureDataBase {
- protected final Face face;
- private final Name name;
private Data data;
- private boolean cancelled = false;
- private Throwable error;
/**
* Constructor
*
- * @param face
- * @param name
+ * @param face the {@link Face} to use for processing events
+ * @param name the {@link Name} of the interest sent
*/
public FutureData(Face face, Name name) {
- this.face = face;
- this.name = new Name(name);
+ super(face, name);
}
/**
- * Get the Interest name.
- *
- * @return
- */
- public Name getName() {
- return name;
- }
-
- /**
- * Cancel the current request.
- *
- * @param mayInterruptIfRunning
- * @return
- */
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- cancelled = true;
- return cancelled;
- }
-
- /**
- * Determine if this request is cancelled.
- *
- * @return
- */
- @Override
- public boolean isCancelled() {
- return cancelled;
- }
-
- /**
- * Determine if the request has completed (successfully or not).
- *
- * @return
+ * @return true if the request has completed (successfully or not)
*/
@Override
public boolean isDone() {
- return data != null || error != null || isCancelled();
+ return isResolved() || isRejected() || isCancelled();
}
/**
- * Set the packet when successfully retrieved; unblocks get().
+ * Set the packet when successfully retrieved; unblocks {@link #get()}. Use
+ * this method inside an {@link OnData} callback to resolve this future.
*
- * @param d
+ * @param returnedData the {@link Data} returned from the network
*/
- public void resolve(Data d) {
- data = d;
+ public void resolve(Data returnedData) {
+ data = returnedData;
}
/**
- * Set the exception when request failed; unblocks get().
- *
- * @param e
+ * @return true if the {@link Data} has returned and been resolved with
+ * {@link #resolve(net.named_data.jndn.Data)}.
*/
- public void reject(Throwable e) {
- error = e;
+ public boolean isResolved() {
+ return data != null;
}
/**
- * Block until packet is retrieved.
- *
- * @return
- * @throws InterruptedException
- * @throws ExecutionException
+ * {@inheritDoc}
*/
@Override
- public Data get() throws InterruptedException, ExecutionException {
- while (!isDone()) {
- // process face events
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (EncodingException | IOException e) {
- throw new ExecutionException("Failed to retrieve packet while processing events: " + name.toUri(), e);
- }
- }
-
- // case: cancelled
- if (cancelled) {
- throw new InterruptedException("Interrupted by user while retrieving packet: " + name.toUri());
- }
-
- // case: error
- if (error != null) {
- throw new ExecutionException("Error while retrieving packet: " + name.toUri(), error);
- }
-
- return data;
- }
-
- /**
- * Block until packet is retrieved or timeout is reached.
- *
- * @param timeout
- * @param unit
- * @return
- * @throws InterruptedException
- * @throws ExecutionException
- * @throws TimeoutException
- */
- @Override
- public Data get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- long interval = TimeUnit.MILLISECONDS.convert(timeout, unit);
- long endTime = System.currentTimeMillis() + interval;
- long currentTime = System.currentTimeMillis();
- while (!isDone() && !isCancelled() && currentTime <= endTime) {
- // process face events
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (EncodingException | IOException e) {
- throw new ExecutionException("Failed to retrieve packet while processing events: " + name.toUri(), e);
- }
-
- currentTime = System.currentTimeMillis();
- }
-
- // case: cancelled
- if (cancelled) {
- throw new InterruptedException("Interrupted by user while retrieving packet: " + name.toUri());
- }
-
- // case: error
- if (error != null) {
- throw new ExecutionException("Error while retrieving packet: " + name.toUri(), error);
- }
-
- // case: timed out
- if (currentTime > endTime) {
- throw new TimeoutException("Timed out while retrieving packet: " + name.toUri());
- }
-
+ public Data getData() {
return data;
}
}
diff --git a/src/main/java/com/intel/jndn/utils/client/FutureDataBase.java b/src/main/java/com/intel/jndn/utils/client/FutureDataBase.java
new file mode 100644
index 0000000..a398cb3
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/FutureDataBase.java
@@ -0,0 +1,184 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnTimeout;
+import net.named_data.jndn.encoding.EncodingException;
+
+/**
+ * Implement common functionality for re-use in NDN futures: cancellation and
+ * rejection state, plus the blocking {@code get()} loops that repeatedly call
+ * {@link Face#processEvents()} until the future is done. Subclasses supply the
+ * retrieved packet through {@link #getData()} and extend {@link #isDone()}
+ * with their own completion condition.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public abstract class FutureDataBase implements Future<Data> {
+
+  private final Face face;
+  private final Name name;
+  private boolean cancelled = false;
+  private Throwable error;
+
+  /**
+   * Constructor
+   *
+   * @param face the {@link Face} to use for processing events
+   * @param name the {@link Name} of the interest sent; a copy is stored
+   */
+  public FutureDataBase(Face face, Name name) {
+    this.face = face;
+    this.name = new Name(name);
+  }
+
+  /**
+   * Block until packet is retrieved; will call face.processEvents() until the
+   * future is resolved, rejected or cancelled.
+   *
+   * @return the {@link Data} when the packet is retrieved
+   * @throws InterruptedException if the future was cancelled
+   * @throws ExecutionException if the future was rejected or processing face
+   * events failed
+   */
+  @Override
+  public final Data get() throws InterruptedException, ExecutionException {
+    while (!isDone()) {
+      processFaceEvents();
+    }
+    throwIfCancelledOrRejected();
+    return getData();
+  }
+
+  /**
+   * Block until packet is retrieved or timeout is reached; will call
+   * face.processEvents() in the meantime.
+   *
+   * @param timeout the maximum time to wait
+   * @param unit the {@link TimeUnit} of the timeout argument
+   * @return the {@link Data} when the packet is retrieved
+   * @throws InterruptedException if the future was cancelled
+   * @throws ExecutionException if the future was rejected or processing face
+   * events failed
+   * @throws TimeoutException if the future is still not done once the timeout
+   * has elapsed
+   */
+  @Override
+  public final Data get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+    long interval = TimeUnit.MILLISECONDS.convert(timeout, unit);
+    long startTime = System.currentTimeMillis();
+    long endTime = startTime + interval;
+    if (interval > 0 && endTime < startTime) {
+      // the addition overflowed (very large timeout); wait as long as possible
+      endTime = Long.MAX_VALUE;
+    }
+
+    while (!isDone() && System.currentTimeMillis() <= endTime) {
+      processFaceEvents();
+    }
+    throwIfCancelledOrRejected();
+
+    // case: timed out; decided by completion state rather than by the clock so
+    // that a packet arriving during the final processEvents() call is returned
+    if (!isDone()) {
+      throw new TimeoutException("Timed out while retrieving packet: " + name.toUri());
+    }
+
+    return getData();
+  }
+
+  /**
+   * Run one round of face event processing while holding the face's monitor.
+   *
+   * @throws ExecutionException wrapping any encoding or IO failure raised
+   * while processing events
+   */
+  private void processFaceEvents() throws ExecutionException {
+    try {
+      synchronized (face) {
+        face.processEvents();
+      }
+    } catch (EncodingException | IOException e) {
+      throw new ExecutionException("Failed to retrieve packet while processing events: " + name.toUri(), e);
+    }
+  }
+
+  /**
+   * Translate the cancelled and rejected states into the exceptions thrown by
+   * the {@code get()} methods; cancellation takes precedence over rejection.
+   *
+   * @throws InterruptedException if the future was cancelled
+   * @throws ExecutionException if the future was rejected
+   */
+  private void throwIfCancelledOrRejected() throws InterruptedException, ExecutionException {
+    // case: cancelled
+    if (isCancelled()) {
+      throw new InterruptedException("Interrupted by user while retrieving packet: " + name.toUri());
+    }
+
+    // case: error
+    if (isRejected()) {
+      throw new ExecutionException("Error while retrieving packet: " + name.toUri(), error);
+    }
+  }
+
+  /**
+   * @return true if the request has completed (successfully or not)
+   */
+  @Override
+  public boolean isDone() {
+    return isRejected() || isCancelled();
+  }
+
+  /**
+   * Mark this future as cancelled; unblocks {@link #get()}.
+   *
+   * @param mayInterruptIfRunning not used by this implementation
+   * @return always true
+   */
+  @Override
+  public final boolean cancel(boolean mayInterruptIfRunning) {
+    cancelled = true;
+    return cancelled;
+  }
+
+  /**
+   * @return true if this request is cancelled
+   */
+  @Override
+  public final boolean isCancelled() {
+    return cancelled;
+  }
+
+  /**
+   * @return true if the future has been rejected with an error
+   */
+  public final boolean isRejected() {
+    return error != null;
+  }
+
+  /**
+   * Set the exception when request failed; unblocks {@link #get()}. Use this
+   * method inside an {@link OnTimeout} callback to reject this future.
+   *
+   * @param failureCause the error that causes this future to fail
+   */
+  public final void reject(Throwable failureCause) {
+    error = failureCause;
+  }
+
+  /**
+   * @return the {@link Data} retrieved by this future
+   * @throws InterruptedException if the implementation is interrupted while
+   * producing the packet
+   * @throws ExecutionException if the implementation fails to produce the
+   * packet
+   */
+  public abstract Data getData() throws InterruptedException, ExecutionException;
+
+  /**
+   * @return the {@link Name} used for this future; this is currently only used
+   * for debugging purposes, not retrieval
+   */
+  public Name getName() {
+    return name;
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/SegmentedFutureData.java b/src/main/java/com/intel/jndn/utils/client/SegmentedFutureData.java
index 252f72c..4d58ee4 100644
--- a/src/main/java/com/intel/jndn/utils/client/SegmentedFutureData.java
+++ b/src/main/java/com/intel/jndn/utils/client/SegmentedFutureData.java
@@ -16,17 +16,17 @@
import com.intel.jndn.utils.SegmentedClient;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
import net.named_data.jndn.Name;
import net.named_data.jndn.util.Blob;
/**
- * Represents a list of Packets that have been requested asynchronously and have
+ * Represents a list of packets that have been requested asynchronously and have
* yet to be returned from the network. Usage:
*
* <pre><code>
@@ -36,135 +36,106 @@
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class SegmentedFutureData implements Future<Data> {
+public class SegmentedFutureData extends FutureDataBase {
- private final Name interestName;
- List<Future<Data>> segments;
- private boolean cancelled = false;
+ List<Future<Data>> segments = new ArrayList<>();
- /**
- * Constructor
- *
- * @param interestName the {@link Name} of the original interest, for debugging purposes
- * @param segments the list of future segments to retrieve
- */
- public SegmentedFutureData(Name interestName, List<Future<Data>> segments) {
- this.interestName = interestName;
- this.segments = segments;
- }
-
- /**
- * Cancel the current request.
- *
- * @param mayInterruptIfRunning
- * @return
- */
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- cancelled = true;
- return cancelled;
+ public SegmentedFutureData(Face face, Name name) {
+ super(face, name);
}
/**
- * Determine if this request is cancelled.
+ * Add a future data to the list of segments
*
- * @return
+ * @param index the numeric index in the list, see
+ * {@link List#add(int, java.lang.Object)} for more details.
+ * @param futureData the {@link Future} to add
*/
- @Override
- public boolean isCancelled() {
- return cancelled;
+ public void add(int index, Future<Data> futureData) {
+ segments.add(index, futureData);
}
/**
- * Determine if the request has completed (successfully or not).
+ * Add a future data to the end of the list of segments
*
- * @return
+ * @param futureData the {@link Future} to add
+ */
+  public void add(Future<Data> futureData) {
+    // accepts any Future<Data>, matching the indexed add(int, Future<Data>)
+    // overload and the element type of the backing list
+    segments.add(futureData);
+  }
+
+ /**
+ * @return true if the request has completed (successfully or not)
*/
@Override
public boolean isDone() {
- // check for errors, cancellation
- if (isCancelled()) {
- return true;
- }
-
- // check each segment for completion
+ return isRejected() || isCancelled() || allSegmentsDone();
+ }
+
+ /**
+ * @return true if all segments are done
+ */
+ private boolean allSegmentsDone(){
for (Future<Data> futureData : segments) {
if (!futureData.isDone()) {
return false;
}
}
-
return true;
}
/**
- * Block until packet is retrieved.
- *
- * @return
- * @throws InterruptedException
- * @throws ExecutionException
+ * {@inheritDoc}
*/
@Override
- public Data get() throws InterruptedException, ExecutionException {
+ public Data getData() throws ExecutionException, InterruptedException {
+ byte[] content = aggregateBytes();
+ Data data = buildAggregatePacket();
+ data.setContent(new Blob(content));
+ return data;
+ }
+
+ /**
+ * @return the array of aggregated bytes for all of the segments retrieved
+ * @throws ExecutionException
+ */
+ private byte[] aggregateBytes() throws ExecutionException {
// aggregate bytes
ByteArrayOutputStream content = new ByteArrayOutputStream();
for (Future<Data> futureData : segments) {
try {
content.write(futureData.get().getContent().getImmutableArray());
} catch (ExecutionException | IOException | InterruptedException e) {
- throw new ExecutionException("Failed while aggregating retrieved packets: " + interestName.toUri(), e);
+ throw new ExecutionException("Failed while aggregating retrieved packets: " + getName().toUri(), e);
}
}
-
- // build aggregated packet (copy first packet)
- Data firstData = segments.get(0).get();
- Data data = new Data(firstData);
- data.setName(getNameFromFirstData(firstData));
- data.setContent(new Blob(content.toByteArray()));
- return data;
+ return content.toByteArray();
}
/**
- * Block until packet is retrieved or timeout is reached.
- *
- * @param timeout
- * @param unit
- * @return
+ * @return an aggregated {@link Data} packet with no content set
* @throws InterruptedException
* @throws ExecutionException
- * @throws TimeoutException
*/
- @Override
- public Data get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- long interval = TimeUnit.MILLISECONDS.convert(timeout, unit);
- long endTime = System.currentTimeMillis() + interval;
-
- // aggregate bytes
- ByteArrayOutputStream content = new ByteArrayOutputStream();
- for (Future<Data> futureData : segments) {
- try {
- content.write(futureData.get(timeout, unit).getContent().getImmutableArray());
- } catch (ExecutionException | IOException | InterruptedException e) {
- throw new ExecutionException("Failed while aggregating retrieved packets: " + interestName.toUri(), e);
- }
-
- // check for timeout
- if (System.currentTimeMillis() > endTime) {
- throw new TimeoutException("Timed out while retrieving packets: " + interestName.toUri());
- }
+ private Data buildAggregatePacket() throws InterruptedException, ExecutionException {
+ if (segments.isEmpty()) {
+ throw new IllegalStateException("Unable to aggregate packets; no segments added with SegmentedFutureData.add().");
}
-
- // build aggregated packet (copy first packet)
Data firstData = segments.get(0).get();
- Data data = new Data(firstData);
- data.setName(getNameFromFirstData(firstData));
- data.setContent(new Blob(content.toByteArray()));
- return data;
+ Data aggregatedData = new Data(firstData);
+ aggregatedData.setName(parseName(firstData));
+ return aggregatedData;
}
-
- private Name getNameFromFirstData(Data firstPacket) throws InterruptedException, ExecutionException{
- Name firstPacketName = segments.get(0).get().getName();
- if(SegmentedClient.hasSegment(firstPacketName)){
+
+ /**
+ * @return the name of the segmented packet, taken from the first packet with
+ * the segment number removed if it is the last name component
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ private Name parseName(Data data) throws InterruptedException, ExecutionException {
+ Name firstPacketName = data.getName();
+ if (SegmentedClient.hasSegment(firstPacketName)) {
firstPacketName = firstPacketName.getPrefix(-1);
}
return firstPacketName;