Move to Java 8, with CompletableFutures, etc.
diff --git a/src/main/java/com/intel/jndn/utils/Client.java b/src/main/java/com/intel/jndn/utils/Client.java
index 6b09f9f..711d24c 100644
--- a/src/main/java/com/intel/jndn/utils/Client.java
+++ b/src/main/java/com/intel/jndn/utils/Client.java
@@ -14,10 +14,11 @@
package com.intel.jndn.utils;
import java.io.IOException;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
/**
* Base functionality provided by all NDN clients in this package.
@@ -28,23 +29,53 @@
/**
* Asynchronously request the Data for an Interest. This will send the
- * Interest and return immediately; use futureData.get() to block until the
- * Data returns (see Future) or manage the event processing independently.
+ * Interest and return immediately; with this method, the user is responsible
+ * for calling {@link Face#processEvents()} in order for the
+ * {@link CompletableFuture} to complete.
*
- * @param face
- * @param interest
- * @return
+ * @param face the {@link Face} on which to make the request; call
+ * {@link Face#processEvents()} separately to complete the request
+ * @param interest the {@link Interest} to send over the network
+ * @return a future {@link Data} packet
*/
- public Future<Data> getAsync(Face face, Interest interest);
+ public CompletableFuture<Data> getAsync(Face face, Interest interest);
/**
- * Synchronously retrieve the Data for an Interest; this will block until
- * complete (i.e. either data is received or the interest times out).
+ * Convenience method for calling
+ * {@link #getAsync(net.named_data.jndn.Face, net.named_data.jndn.Interest)}
+ * with a default {@link Interest} packet.
*
- * @param face
- * @param interest
- * @return
- * @throws java.io.IOException
+ * @param face the {@link Face} on which to make the request; call
+ * {@link Face#processEvents()} separately to complete the request
+ * @param name the {@link Name} to wrap inside a default {@link Interest}
+ * @return a future {@link Data} packet
+ */
+ public CompletableFuture<Data> getAsync(Face face, Name name);
+
+ /**
+ * Synchronously retrieve the {@link Data} for an {@link Interest}; this will
+ * block until complete (i.e. either the data is received or the interest
+ * times out).
+ *
+ * @param face the {@link Face} on which to make the request; this method will
+ * call {@link Face#processEvents()} at a configurable interval until complete
+ * or timeout
+ * @param interest the {@link Interest} to send over the network
+ * @return a {@link Data} packet
+ * @throws java.io.IOException if the request fails
*/
public Data getSync(Face face, Interest interest) throws IOException;
+
+ /**
+ * Convenience method for calling
+ * {@link #getSync(net.named_data.jndn.Face, net.named_data.jndn.Interest)}
+ * with a default {@link Interest} packet.
+ *
+ * @param face the {@link Face} on which to make the request; this method will
+ * call {@link Face#processEvents()} until the request completes or times out
+ * @param name the {@link Name} to wrap inside a default {@link Interest}
+ * @return a {@link Data} packet
+ * @throws java.io.IOException if the request fails
+ */
+ public Data getSync(Face face, Name name) throws IOException;
}
diff --git a/src/main/java/com/intel/jndn/utils/InternalFace.java b/src/main/java/com/intel/jndn/utils/InternalFace.java
deleted file mode 100644
index 2850363..0000000
--- a/src/main/java/com/intel/jndn/utils/InternalFace.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils;
-
-/**
- * TODO waiting on Face to become overridable
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class InternalFace {
-
-}
diff --git a/src/main/java/com/intel/jndn/utils/SegmentedClient.java b/src/main/java/com/intel/jndn/utils/SegmentedClient.java
index 28ac66b..d076906 100644
--- a/src/main/java/com/intel/jndn/utils/SegmentedClient.java
+++ b/src/main/java/com/intel/jndn/utils/SegmentedClient.java
@@ -13,20 +13,18 @@
*/
package com.intel.jndn.utils;
-import com.intel.jndn.utils.client.FutureData;
-import com.intel.jndn.utils.client.SegmentedFutureData;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
+import com.intel.jndn.utils.client.SegmentedDataReassembler;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
-import net.named_data.jndn.OnData;
-import net.named_data.jndn.OnTimeout;
import net.named_data.jndn.encoding.EncodingException;
/**
@@ -40,7 +38,7 @@
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class SegmentedClient implements Client {
+public class SegmentedClient extends SimpleClient {
private static SegmentedClient defaultInstance;
private static final Logger logger = Logger.getLogger(SegmentedClient.class.getName());
@@ -58,6 +56,23 @@
}
/**
+ * See {@link SimpleClient#SimpleClient(long, long)}
+ *
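+ * @param sleepTime for synchronous processing, the time to sleep the thread between {@link Face#processEvents()} calls
+ * @param interestLifetime the {@link Interest} lifetime used for default Interests built from a {@link Name}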
+ */
+ public SegmentedClient(long sleepTime, long interestLifetime) {
+ super(sleepTime, interestLifetime);
+ }
+
+ /**
+ * Build a client with default parameters
+ */
+ public SegmentedClient() {
+ super();
+ }
+
+ /**
* Asynchronously send Interest packets for a segmented result; will not
* block, but will wait for the first packet to return before sending
* remaining interests until using the specified FinalBlockId. Will retrieve
@@ -71,43 +86,29 @@
* will contain one FutureData with the failure exception
*/
@Override
- public Future<Data> getAsync(final Face face, Interest interest) {
+ public CompletableFuture<Data> getAsync(final Face face, Interest interest) {
final long firstSegmentId = parseFirstSegmentId(interest);
- final SegmentedFutureData segmentedData = new SegmentedFutureData(face, interest.getName());
- final FutureData firstData = new FutureData(face, interest.getName());
- segmentedData.add(0, firstData);
- // send first interest
- logger.log(Level.FINER, "Sending first interest for: " + interest.getName().toUri());
- try {
- face.expressInterest(interest, new OnData() {
- @Override
- public void onData(Interest interest, Data data) {
+ logger.log(Level.FINER, "Requesting segmented data packets: " + interest.getName().toUri());
+ CompletableFuture<Data> allData = super.getAsync(face, interest).thenCompose(new Function<Data, CompletionStage<Data>>() {
+ @Override
+ public CompletionStage<Data> apply(Data firstSegment) {
+ try {
+ logger.log(Level.FINER, "Received first data packet: " + interest.getName().toUri());
+ long lastSegmentId = parseLastSegmentId(firstSegment);
+ Interest template = new Interest(interest);
+ template.setName(removeSegment(firstSegment.getName()));
+
// request subsequent segments using FinalBlockId and the Interest template
- try {
- long lastSegmentId = parseLastSegmentId(data);
- Interest template = new Interest(interest);
- template.setName(removeSegment(data.getName()));
- requestRemainingSegments(face, segmentedData, template, firstSegmentId + 1, lastSegmentId);
- } catch (EncodingException ex) {
- logger.log(Level.FINER, "No segment ID found in FinalBlockId, assuming first packet is only packet.");
- }
-
- // resolve the first data
- firstData.resolve(data);
+ return requestRemainingSegments(face, template, firstSegment, firstSegmentId + 1, lastSegmentId);
+ } catch (EncodingException ex) {
+ logger.log(Level.FINER, "No segment ID found in FinalBlockId, assuming first packet is only packet.");
+ return CompletableFuture.completedFuture(firstSegment);
}
- }, new OnTimeout() {
- @Override
- public void onTimeout(Interest interest) {
- segmentedData.reject(new TimeoutException());
- }
- });
- } catch (IOException e) {
- logger.log(Level.FINE, "IO failure while sending interest: ", e);
- segmentedData.reject(e);
- }
+ }
+ });
- return segmentedData;
+ return allData;
}
/**
@@ -144,68 +145,18 @@
* @param fromSegment
* @param toSegment
*/
- private void requestRemainingSegments(Face face, SegmentedFutureData segmentedData, Interest template, long fromSegment, long toSegment) {
- // send interests in remaining segments
+ private CompletableFuture<Data> requestRemainingSegments(Face face, Interest template, Data firstSegment, long fromSegment, long toSegment) {
+ List<CompletableFuture<Data>> segments = new ArrayList<>();
+ segments.add(CompletableFuture.completedFuture(firstSegment));
+
for (long i = fromSegment; i <= toSegment; i++) {
Interest segmentedInterest = new Interest(template);
segmentedInterest.getName().appendSegment(i);
- Future<Data> futureData = SimpleClient.getDefault().getAsync(face, segmentedInterest);
- segmentedData.add((int) i, futureData);
+ CompletableFuture<Data> futureData = super.getAsync(face, segmentedInterest);
+ segments.add(futureData);
}
- }
- /**
- * Asynchronously send Interest packets for a segmented result; see {@link #getAsync(net.named_data.jndn.Face, net.named_data.jndn.Interest)
- * } for more information.
- *
- * @param face the {@link Face} on which to retrieve the packets
- * @param name the {@link Name} of the packet to retrieve using a default
- * interest; may optionally end with the segment component of the first
- * segment to retrieve
- * @return an aggregated data packet from all received segments
- */
- public Future<Data> getAsync(Face face, Name name) {
- return getAsync(face, SimpleClient.getDefaultInterest(name));
- }
-
- /**
- * Retrieve a segmented Data packet; see {@link #getAsync(net.named_data.jndn.Face, net.named_data.jndn.Interest)
- * } for more information. This method will block and call
- * {@link Face#processEvents()} until the sent interests timeout or the
- * corresponding data packets are retrieved.
- *
- * @param face
- * @param interest should include either a ChildSelector or an initial segment
- * number
- * @return a Data packet; the name will inherit from the sent Interest, not
- * the returned packets and the content will be a concatenation of all of the
- * packet contents.
- * @throws java.io.IOException
- */
- @Override
- public Data getSync(Face face, Interest interest) throws IOException {
- try {
- return getAsync(face, interest).get();
- } catch (ExecutionException | InterruptedException e) {
- String message = "Failed to retrieve data: " + interest.toUri();
- logger.log(Level.FINE, message, e);
- throw new IOException(message, e);
- }
- }
-
- /**
- * Synchronously retrieve the Data for a Name using a default interest (e.g. 2
- * second timeout). This method will block and call
- * {@link Face#processEvents()} until the sent interests timeout or the
- * corresponding data packets are retrieved.
- *
- * @param face
- * @param name
- * @return
- * @throws java.io.IOException
- */
- public Data getSync(Face face, Name name) throws IOException {
- return getSync(face, SimpleClient.getDefaultInterest(name));
+ return new SegmentedDataReassembler(template.getName(), segments).reassemble();
}
/**
@@ -213,8 +164,8 @@
* NDN naming conventions (see
* http://named-data.net/doc/tech-memos/naming-conventions.pdf).
*
- * @param name
- * @return
+ * @param name the {@link Name} to check
+ * @return true if the {@link Name} ends in a segment component
*/
public static boolean hasSegment(Name name) {
return name.get(-1).getValue().buf().get(0) == 0x00;
diff --git a/src/main/java/com/intel/jndn/utils/SimpleClient.java b/src/main/java/com/intel/jndn/utils/SimpleClient.java
index 7c2bf98..1bf8519 100644
--- a/src/main/java/com/intel/jndn/utils/SimpleClient.java
+++ b/src/main/java/com/intel/jndn/utils/SimpleClient.java
@@ -13,10 +13,9 @@
*/
package com.intel.jndn.utils;
-import com.intel.jndn.utils.client.FutureData;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import net.named_data.jndn.Data;
@@ -26,6 +25,7 @@
import net.named_data.jndn.OnData;
import net.named_data.jndn.OnTimeout;
import java.util.logging.Logger;
+import net.named_data.jndn.encoding.EncodingException;
/**
* Provide a client to simplify information retrieval over the NDN network.
@@ -38,11 +38,13 @@
public static final long DEFAULT_TIMEOUT = 2000;
private static final Logger logger = Logger.getLogger(SimpleClient.class.getName());
private static SimpleClient defaultInstance;
+ private final long sleepTime;
+ private final long interestLifetime;
/**
* Singleton access for simpler client use
*
- * @return
+ * @return a default client
*/
public static SimpleClient getDefault() {
if (defaultInstance == null) {
@@ -52,17 +54,32 @@
}
/**
- * Asynchronously request the Data for an Interest. This will send the
- * Interest and return immediately; use futureData.get() to block until the
- * Data returns (see FutureData) or manage the event processing independently.
+ * Build a simple client
*
- * @param face
- * @param interest
- * @return
+ * @param sleepTime for synchronous processing, the time to sleep the thread
+ * between {@link Face#processEvents()}
+ * @param interestLifetime the {@link Interest} lifetime for default
+ * Interests; see
+ * {@link #getAsync(net.named_data.jndn.Face, net.named_data.jndn.Name)}
+ */
+ public SimpleClient(long sleepTime, long interestLifetime) {
+ this.sleepTime = sleepTime;
+ this.interestLifetime = interestLifetime;
+ }
+
+ /**
+ * Build a simple client using default parameters
+ */
+ public SimpleClient() {
+ this(DEFAULT_SLEEP_TIME, DEFAULT_TIMEOUT);
+ }
+
+ /**
+ * {@inheritDoc}
*/
@Override
- public Future<Data> getAsync(Face face, Interest interest) {
- final FutureData futureData = new FutureData(face, interest.getName());
+ public CompletableFuture<Data> getAsync(Face face, Interest interest) {
+ final CompletableFuture futureData = new CompletableFuture<>();
// send interest
logger.log(Level.FINER, "Sending interest for: " + interest.getName().toUri());
@@ -70,64 +87,64 @@
face.expressInterest(interest, new OnData() {
@Override
public void onData(Interest interest, Data data) {
- futureData.resolve(data);
+ futureData.complete(data);
}
}, new OnTimeout() {
@Override
public void onTimeout(Interest interest) {
- futureData.reject(new TimeoutException());
+ String message = interest.getInterestLifetimeMilliseconds() + "ms timeout exceeded";
+ futureData.completeExceptionally(new TimeoutException(message));
}
});
} catch (IOException e) {
logger.log(Level.FINE, "IO failure while sending interest: ", e);
- futureData.reject(e);
+ futureData.completeExceptionally(e);
}
return futureData;
}
/**
- * Synchronously retrieve the Data for a Name using a default interest (e.g. 2
- * second timeout); this will block until complete (i.e. either data is
- * received or the interest times out).
- *
- * @param face
- * @param name
- * @return
+ * {@inheritDoc}
*/
- public Future<Data> getAsync(Face face, Name name) {
+ @Override
+ public CompletableFuture<Data> getAsync(Face face, Name name) {
return getAsync(face, getDefaultInterest(name));
}
/**
- * Synchronously retrieve the Data for an Interest; this will block until
- * complete (i.e. either data is received or the interest times out).
- *
- * @param face
- * @param interest
- * @return Data packet or null
- * @throws java.io.IOException
+ * {@inheritDoc}
*/
@Override
public Data getSync(Face face, Interest interest) throws IOException {
+ CompletableFuture<Data> future = getAsync(face, interest);
try {
- return getAsync(face, interest).get();
- } catch (ExecutionException | InterruptedException e) {
+ // process events until complete
+ while (!future.isDone()) {
+ synchronized (face) {
+ face.processEvents();
+ }
+
+ if (sleepTime > 0) {
+ try{
+ Thread.sleep(sleepTime);
+ }
+ catch(InterruptedException e){
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ return future.get();
+ } catch (InterruptedException | ExecutionException | EncodingException e) {
logger.log(Level.FINE, "Failed to retrieve data.", e);
throw new IOException("Failed to retrieve data.", e);
}
}
/**
- * Synchronously retrieve the Data for a Name using a default interest (e.g. 2
- * second timeout); this will block until complete (i.e. either data is
- * received or the interest times out).
- *
- * @param face
- * @param name
- * @return
- * @throws java.io.IOException
+ * {@inheritDoc}
*/
+ @Override
public Data getSync(Face face, Name name) throws IOException {
return getSync(face, getDefaultInterest(name));
}
@@ -139,8 +156,8 @@
* @param name
* @return
*/
- public static Interest getDefaultInterest(Name name) {
- Interest interest = new Interest(name, DEFAULT_TIMEOUT);
+ public Interest getDefaultInterest(Name name) {
+ Interest interest = new Interest(name, interestLifetime);
return interest;
}
}
diff --git a/src/main/java/com/intel/jndn/utils/client/FutureData.java b/src/main/java/com/intel/jndn/utils/client/FutureData.java
deleted file mode 100644
index 663db81..0000000
--- a/src/main/java/com/intel/jndn/utils/client/FutureData.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.client;
-
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.OnData;
-
-/**
- * Reference to a Packet that has yet to be returned from the network. Usage:
- *
- * <pre><code>
- * FutureData futureData = new FutureData(face, interest.getName());
- * face.expressInterest(interest, new OnData(){
- * ... futureData.resolve(data); ...
- * }, new OnTimeout(){
- * ... futureData.reject(new TimeoutException());
- * });
- * Data resolvedData = futureData.get(); // will block and call face.processEvents() until complete
- * </code></pre>
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class FutureData extends FutureDataBase {
-
- private Data data;
-
- /**
- * Constructor
- *
- * @param face the {@link Face} to use for processing events
- * @param name the {@link Name} of the interest sent
- */
- public FutureData(Face face, Name name) {
- super(face, name);
- }
-
- /**
- * @return true if the request has completed (successfully or not)
- */
- @Override
- public boolean isDone() {
- return isResolved() || isRejected() || isCancelled();
- }
-
- /**
- * Set the packet when successfully retrieved; unblocks {@link #get()}. Use
- * this method inside an {@link OnData} callback to resolve this future.
- *
- * @param returnedData the {@link Data} returned from the network
- */
- public void resolve(Data returnedData) {
- data = returnedData;
- }
-
- /**
- * @return true if the {@link Data} has returned and been resolved with
- * {@link #resolve(net.named_data.jndn.Data)}.
- */
- public boolean isResolved() {
- return data != null;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Data getData() {
- return data;
- }
-}
diff --git a/src/main/java/com/intel/jndn/utils/client/FutureDataBase.java b/src/main/java/com/intel/jndn/utils/client/FutureDataBase.java
deleted file mode 100644
index a398cb3..0000000
--- a/src/main/java/com/intel/jndn/utils/client/FutureDataBase.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.client;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.OnTimeout;
-import net.named_data.jndn.encoding.EncodingException;
-
-/**
- * Implement common functionality for re-use in NDN futures
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public abstract class FutureDataBase implements Future<Data> {
-
- private final Face face;
- private final Name name;
- private boolean cancelled = false;
- private Throwable error;
-
- /**
- * Constructor
- *
- * @param face the {@link Face} to use for processing events
- * @param name the {@link Name} of the interest sent
- */
- public FutureDataBase(Face face, Name name) {
- this.face = face;
- this.name = new Name(name);
- }
-
- /**
- * Block until packet is retrieved; will call face.processEvents() until the
- * future is resolved or rejected.
- *
- * @return the {@link Data} when the packet is retrieved
- * @throws InterruptedException
- * @throws ExecutionException
- */
- @Override
- public final Data get() throws InterruptedException, ExecutionException {
- while (!isDone()) {
- // process face events
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (EncodingException | IOException e) {
- throw new ExecutionException("Failed to retrieve packet while processing events: " + name.toUri(), e);
- }
- }
-
- // case: cancelled
- if (isCancelled()) {
- throw new InterruptedException("Interrupted by user while retrieving packet: " + name.toUri());
- }
-
- // case: error
- if (isRejected()) {
- throw new ExecutionException("Error while retrieving packet: " + name.toUri(), error);
- }
-
- return getData();
- }
-
- /**
- * Block until packet is retrieved or timeout is reached.
- *
- * @param timeout
- * @param unit
- * @return
- * @throws InterruptedException
- * @throws ExecutionException
- * @throws TimeoutException
- */
- @Override
- public final Data get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- long interval = TimeUnit.MILLISECONDS.convert(timeout, unit);
- long endTime = System.currentTimeMillis() + interval;
- long currentTime = System.currentTimeMillis();
- while (!isDone() && currentTime <= endTime) {
- // process face events
- try {
- synchronized (face) {
- face.processEvents();
- }
- } catch (EncodingException | IOException e) {
- throw new ExecutionException("Failed to retrieve packet while processing events: " + name.toUri(), e);
- }
-
- currentTime = System.currentTimeMillis();
- }
-
- // case: cancelled
- if (isCancelled()) {
- throw new InterruptedException("Interrupted by user while retrieving packet: " + name.toUri());
- }
-
- // case: error
- if (isRejected()) {
- throw new ExecutionException("Error while retrieving packet: " + name.toUri(), error);
- }
-
- // case: timed out
- if (currentTime > endTime) {
- throw new TimeoutException("Timed out while retrieving packet: " + name.toUri());
- }
-
- return getData();
- }
-
- /**
- * @return true if the request has completed (successfully or not)
- */
- @Override
- public boolean isDone() {
- return isRejected() || isCancelled();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public final boolean cancel(boolean mayInterruptIfRunning) {
- cancelled = true;
- return cancelled;
- }
-
- /**
- * @return true if this request is cancelled
- */
- @Override
- public final boolean isCancelled() {
- return cancelled;
- }
-
- /**
- * @return true if the future has been rejected with an error
- */
- public final boolean isRejected() {
- return error != null;
- }
-
- /**
- * Set the exception when request failed; unblocks {@link #get()}. Use this
- * method inside an {@link OnTimeout} callback to resolve this future.
- *
- * @param failureCause the error that causes this future to fail
- */
- public final void reject(Throwable failureCause) {
- error = failureCause;
- }
-
- /**
- * @return the {@link Data} retrieved by this future
- */
- public abstract Data getData() throws InterruptedException, ExecutionException;
-
- /**
- * @return the {@link Name} used for this future; this is currently only used
- * for debugging purposes, not retrieval
- */
- public Name getName() {
- return name;
- }
-}
diff --git a/src/main/java/com/intel/jndn/utils/client/SegmentedDataReassembler.java b/src/main/java/com/intel/jndn/utils/client/SegmentedDataReassembler.java
new file mode 100644
index 0000000..0dfa2a7
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/SegmentedDataReassembler.java
@@ -0,0 +1,97 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client;
+
+import com.intel.jndn.utils.SegmentedClient;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+/**
+ * Re-assembles a list of segmented {@link Data} packets into one aggregate packet.
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class SegmentedDataReassembler {
+ private static final Logger logger = Logger.getLogger(SegmentedDataReassembler.class.getName());
+ private final Name interestName;
+ private final List<CompletableFuture<Data>> segments;
+
+ public SegmentedDataReassembler(Name interestName, List<CompletableFuture<Data>> segments) {
+ this.interestName = interestName;
+ this.segments = segments;
+ }
+
+ public CompletableFuture<Data> reassemble(){
+ CompletableFuture[] segmentArray = segments.toArray(new CompletableFuture[]{});
+ CompletableFuture<Void> allComplete = CompletableFuture.allOf(segmentArray);
+ return allComplete.thenApply(new Function<Void, Data>() {
+ @Override
+ public Data apply(Void t) {
+ try {
+ logger.finer("Re-assembling data for request: " + interestName.toUri());
+ byte[] content = aggregateBytes();
+ Data data = buildAggregatePacket();
+ data.setContent(new Blob(content));
+ return data;
+ } catch (ExecutionException | InterruptedException ex) {
+ logger.log(Level.FINER, "Failed to re-assemble packet for request: " + interestName.toUri(), ex);
+ throw new RuntimeException(ex);
+ }
+ }
+ });
+ }
+
+ /**
+ * @return the array of aggregated bytes for all of the segments retrieved
+ * @throws ExecutionException
+ */
+ private byte[] aggregateBytes() throws ExecutionException {
+ // aggregate bytes
+ ByteArrayOutputStream content = new ByteArrayOutputStream();
+ for (Future<Data> futureData : segments) {
+ try {
+ content.write(futureData.get().getContent().getImmutableArray());
+ } catch (ExecutionException | IOException | InterruptedException e) {
+ throw new ExecutionException("Failed while aggregating retrieved packets: " + interestName.toUri(), e);
+ }
+ }
+ return content.toByteArray();
+ }
+
+ /**
+ * @return an aggregated {@link Data} packet with no content set
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ private Data buildAggregatePacket() throws InterruptedException, ExecutionException {
+ if (segments.isEmpty()) {
+ throw new IllegalStateException("Unable to re-assemble packets; no segments added: " + interestName.toUri());
+ }
+ Data firstData = segments.get(0).get();
+ Data aggregatedData = new Data(firstData);
+ Name shortenedName = SegmentedClient.removeSegment(firstData.getName());
+ aggregatedData.setName(shortenedName);
+ return aggregatedData;
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/SegmentedFutureData.java b/src/main/java/com/intel/jndn/utils/client/SegmentedFutureData.java
deleted file mode 100644
index 4d58ee4..0000000
--- a/src/main/java/com/intel/jndn/utils/client/SegmentedFutureData.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.client;
-
-import com.intel.jndn.utils.SegmentedClient;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.util.Blob;
-
-/**
- * Represents a list of packets that have been requested asynchronously and have
- * yet to be returned from the network. Usage:
- *
- * <pre><code>
- * SegmentedFutureData segmentedFutureData = new SegmentedFutureData(face, name, futureDataList);
- * Data data = segmentedFutureData.get(); // will block until complete
- * </code></pre>
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SegmentedFutureData extends FutureDataBase {
-
- List<Future<Data>> segments = new ArrayList<>();
-
- public SegmentedFutureData(Face face, Name name) {
- super(face, name);
- }
-
- /**
- * Add a future data to the list of segments
- *
- * @param index the numeric index in the list, see
- * {@link List#add(int, java.lang.Object)} for more details.
- * @param futureData the {@link Future} to add
- */
- public void add(int index, Future<Data> futureData) {
- segments.add(index, futureData);
- }
-
- /**
- * Add a future data to the end of the list of segments
- *
- * @param futureData the {@link Future} to add
- */
- public void add(FutureData futureData) {
- segments.add(futureData);
- }
-
- /**
- * @return true if the request has completed (successfully or not)
- */
- @Override
- public boolean isDone() {
- return isRejected() || isCancelled() || allSegmentsDone();
- }
-
- /**
- * @return true if all segments are done
- */
- private boolean allSegmentsDone(){
- for (Future<Data> futureData : segments) {
- if (!futureData.isDone()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Data getData() throws ExecutionException, InterruptedException {
- byte[] content = aggregateBytes();
- Data data = buildAggregatePacket();
- data.setContent(new Blob(content));
- return data;
- }
-
- /**
- * @return the array of aggregated bytes for all of the segments retrieved
- * @throws ExecutionException
- */
- private byte[] aggregateBytes() throws ExecutionException {
- // aggregate bytes
- ByteArrayOutputStream content = new ByteArrayOutputStream();
- for (Future<Data> futureData : segments) {
- try {
- content.write(futureData.get().getContent().getImmutableArray());
- } catch (ExecutionException | IOException | InterruptedException e) {
- throw new ExecutionException("Failed while aggregating retrieved packets: " + getName().toUri(), e);
- }
- }
- return content.toByteArray();
- }
-
- /**
- * @return an aggregated {@link Data} packet with no content set
- * @throws InterruptedException
- * @throws ExecutionException
- */
- private Data buildAggregatePacket() throws InterruptedException, ExecutionException {
- if (segments.isEmpty()) {
- throw new IllegalStateException("Unable to aggregate packets; no segments added with SegmentedFutureData.add().");
- }
- Data firstData = segments.get(0).get();
- Data aggregatedData = new Data(firstData);
- aggregatedData.setName(parseName(firstData));
- return aggregatedData;
- }
-
- /**
- * @return parse the name of the segmented packet from the first packet; this
- * will remove the segment number if it is the last name component
- * @throws InterruptedException
- * @throws ExecutionException
- */
- private Name parseName(Data data) throws InterruptedException, ExecutionException {
- Name firstPacketName = data.getName();
- if (SegmentedClient.hasSegment(firstPacketName)) {
- firstPacketName = firstPacketName.getPrefix(-1);
- }
- return firstPacketName;
- }
-}