Re-factor and upgrade to support Java 8 CompletableFutures as well as AdvancedClient capabilities
The squashed commits include:
Move clients to separate package; fix SimpleClient integration test
Move common test functionality to TestHelper
Add SegmentedClient integration test
Reduce logging messages; add documentation
Bump version; remove unnecessary logging configuration
doclint turns javadoc warnings into errors; ignore for now
see http://blog.joda.org/2014/02/turning-off-doclint-in-jdk-8-javadoc.html
Add snapshot repository for deploying snapshots
Bump major version due to platform change (Java 7 to Java 8)
Add sync retrieval integration test
Re-factor locations of classes: all interfaces in top level folder, change name of PipelineStage to ProcessingStage
In progress commit, tests broken
Refactor, WIP
Tweak synchronization, processing of segmented packets
Move tests around
Fix TestHelper bug
Complete tests
WIP
Add streaming test
Fix segmented server integration test
Finish refactoring
Update POM and README
Update documentation
diff --git a/README.md b/README.md
index e37853d..eecff9e 100644
--- a/README.md
+++ b/README.md
@@ -12,14 +12,25 @@
</dependency>
```
+## Build
+
+To build, run `mvn install` in the cloned directory. Additionally, you may want
+to run integration tests by running the `nfd-integration-tests` profile with
+a running NFD instance (see [pom.xml](https://github.com/01org/jndn-utils/blob/master/pom.xml) for more details);
+
## Use
-Use `SimpleClient` or `SegmentedClient` to retrieve data from the network. For example:
+
+This library provides `Client`, `Server` and `Repository` interfaces. These
+are implemented in their respective `impl` packages and require Java 8.
+
+Use a `SimpleClient` or `AdvancedClient` (provides segmentation, retries, and streaming)
+ to retrieve data from the network. For example:
```
// retrieve a single Data packet synchronously, will block until complete
-Data singleData = Client.getDefault().getSync(face, name);
+Data singleData = SimpleClient.getDefault().getSync(face, name);
-// retrieve a segmented Data packet (i.e. with a last Component containing a segment number and a valid FinalBlockId) by name
-Data segmentedData = SegmentedClient.getDefault().getSync(face, name);
+// retrieve segmented Data packets (i.e. with a last Component containing a segment number and a valid FinalBlockId) by name
+CompletableFuture<Data> segmentedData = AdvancedClient.getDefault().getAsync(face, name);
```
Use `SimpleServer` or `SegmentedServer` to serve data on the network. For example:
@@ -32,10 +43,19 @@
server.addPipelineStage(new SigningStage(keyChain, signingCertificateName));
```
-For full API, see the [Javadoc](http://01org.github.io/jndn-utils/).
+For the full API, see the [Javadoc](http://01org.github.io/jndn-utils/).
+
+## Logging
+
+`jndn-utils` uses Java's default logging utilities (see http://docs.oracle.com/javase/7/docs/api/java/util/logging/package-summary.html). Most messages are logged with the FINER or FINEST status; one way to change this is to add a `logging.properties` file in the classpath with the following lines:
+```
+handlers=java.util.logging.ConsoleHandler
+.level=FINEST
+java.util.logging.ConsoleHandler.level=FINEST
+```
## License
-Copyright © 2015, Intel Corporation.
+Copyright © 2015, Intel Corporation.
This program is free software; you can redistribute it and/or modify it under the terms and conditions of the GNU Lesser General Public License, version 3, as published by the Free Software Foundation.
diff --git a/nbactions.xml b/nbactions.xml
new file mode 100644
index 0000000..b9f0c40
--- /dev/null
+++ b/nbactions.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<actions>
+ <action>
+ <actionName>rebuild</actionName>
+ <packagings>
+ <packaging>*</packaging>
+ </packagings>
+ <goals>
+ <goal>clean</goal>
+ <goal>install</goal>
+ </goals>
+ <properties>
+ <nfd.ip>ndn-lab2.jf.intel.com</nfd.ip>
+ </properties>
+ </action>
+ <action>
+ <actionName>test</actionName>
+ <packagings>
+ <packaging>*</packaging>
+ </packagings>
+ <goals>
+ <goal>test</goal>
+
+ </goals>
+ <properties>
+ <nfd.ip>ndn-lab2.jf.intel.com</nfd.ip>
+ </properties>
+ </action>
+ <action>
+ <actionName>test.single</actionName>
+ <packagings>
+ <packaging>*</packaging>
+ </packagings>
+ <goals>
+ <goal>test-compile</goal>
+ <goal>surefire:test</goal>
+ </goals>
+ <properties>
+ <test>${packageClassName}</test>
+ <nfd.ip>ndn-lab2.jf.intel.com</nfd.ip>
+ </properties>
+ </action>
+ <action>
+ <actionName>build-with-dependencies</actionName>
+ <reactor>also-make</reactor>
+ <packagings>
+ <packaging>*</packaging>
+ </packagings>
+ <goals>
+ <goal>install</goal>
+ </goals>
+ <properties>
+ <nfd.ip>ndn-lab2.jf.intel.com</nfd.ip>
+ </properties>
+ </action>
+ <action>
+ <actionName>build</actionName>
+ <packagings>
+ <packaging>*</packaging>
+ </packagings>
+ <goals>
+ <goal>install</goal>
+
+ </goals>
+ <properties>
+
+ <nfd.ip>ndn-lab2.jf.intel.com</nfd.ip>
+ </properties>
+ </action>
+ <action>
+ <actionName>debug.test.single</actionName>
+ <packagings>
+ <packaging>*</packaging>
+ </packagings>
+ <goals>
+ <goal>test-compile</goal>
+ <goal>surefire:test</goal>
+ </goals>
+ <properties>
+ <test>${packageClassName}</test>
+ <forkMode>once</forkMode>
+ <nfd.ip>ndn-lab2.jf.intel.com</nfd.ip>
+ <maven.surefire.debug>-Xdebug -Xrunjdwp:transport=dt_socket,server=n,address=${jpda.address}</maven.surefire.debug>
+ <jpda.listen>true</jpda.listen>
+ </properties>
+ </action>
+ <action>
+ <actionName>CUSTOM-Run NFD Integration Tests</actionName>
+ <displayName>Run NFD Integration Tests</displayName>
+ <goals>
+ <goal>verify</goal>
+ </goals>
+ <properties>
+ <nfd.ip>ndn-lab2.jf.intel.com</nfd.ip>
+ </properties>
+ <activatedProfiles>
+ <activatedProfile>nfd-integration-tests</activatedProfile>
+ </activatedProfiles>
+ </action>
+
+ </actions>
diff --git a/pom.xml b/pom.xml
index bf6ff98..5e4b3b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.intel.jndn.utils</groupId>
<artifactId>jndn-utils</artifactId>
- <version>0.10.2</version>
+ <version>1.0.0</version>
<name>jndn-utils</name>
<description>Collection of tools to simplify synchronous and asynchronous data transfer over the NDN network</description>
<url>https://github.com/01org/jndn-utils</url>
@@ -20,6 +20,12 @@
<url>http://github.com/andrewsbrown</url>
</developer>
</developers>
+ <distributionManagement>
+ <snapshotRepository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </snapshotRepository>
+ </distributionManagement>
<scm>
<url>https://github.com/01org/jndn-utils</url>
<connection>scm:git:git@github.com:01org/jndn-utils.git</connection>
@@ -75,11 +81,13 @@
</plugins>
</build>
<profiles>
+
+ <!-- Use OSSRH-directed plugins (see http://central.sonatype.org/pages/apache-maven.html)
+ for deployment to Maven central; e.g. `mvn deploy -P ossrh`-->
<profile>
<id>ossrh</id>
<build>
<plugins>
- <!-- OSSRH-directed plugins (see http://central.sonatype.org/pages/apache-maven.html) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
@@ -96,7 +104,10 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
- <version>2.10.1</version>
+ <version>2.10.2</version>
+ <configuration>
+ <additionalparam>-Xdoclint:none</additionalparam>
+ </configuration>
<executions>
<execution>
<id>attach-javadocs</id>
@@ -135,8 +146,40 @@
</plugins>
</build>
</profile>
+
+ <!-- Use failsafe to run integration tests with an NFD instance; e.g.
+ `mvn verify -Pnfd-integration-tests -Dnfd.ip=10.0.0.1; these are not run by default
+ due to the external NFD dependency -->
+ <profile>
+ <id>nfd-integration-tests</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.18.1</version>
+ <executions>
+ <execution>
+ <id>runIntegrationTests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>integration-test</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>verifyIntegrationTests</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
- <!-- run `mvn clean javadoc:javadoc scm-publish:publish-scm -P docs` to push
+ <!-- Run `mvn clean javadoc:javadoc scm-publish:publish-scm -P docs` to push
Javadoc to Github -->
<profile>
<id>docs</id>
@@ -146,6 +189,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.2</version>
+ <configuration>
+ <additionalparam>-Xdoclint:none</additionalparam>
+ </configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/src/main/java/com/intel/jndn/utils/ProcessingStage.java b/src/main/java/com/intel/jndn/utils/ProcessingStage.java
new file mode 100644
index 0000000..68b2649
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/ProcessingStage.java
@@ -0,0 +1,34 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils;
+
+/**
+ * Provide a generic API for processing Interest and Data packets; e.g. a data
+ * processing stage may convert a data packet with unencrypted content to one
+ * with encrypted content.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface ProcessingStage<T, Y> {
+
+ /**
+ * Process the input object.
+ *
+ * @param input the object to be processed
+ * @return a processed object (this may be the same instance as the input or
+ * may be a new object)
+ * @throws Exception if the processing fails
+ */
+ public Y process(T input) throws Exception;
+}
diff --git a/src/main/java/com/intel/jndn/utils/repository/Repository.java b/src/main/java/com/intel/jndn/utils/Repository.java
similarity index 94%
rename from src/main/java/com/intel/jndn/utils/repository/Repository.java
rename to src/main/java/com/intel/jndn/utils/Repository.java
index ab9a127..fcc0172 100644
--- a/src/main/java/com/intel/jndn/utils/repository/Repository.java
+++ b/src/main/java/com/intel/jndn/utils/Repository.java
@@ -11,8 +11,9 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.repository;
+package com.intel.jndn.utils;
+import com.intel.jndn.utils.repository.impl.DataNotFoundException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Interest;
diff --git a/src/main/java/com/intel/jndn/utils/SegmentedClient.java b/src/main/java/com/intel/jndn/utils/SegmentedClient.java
deleted file mode 100644
index d076906..0000000
--- a/src/main/java/com/intel/jndn/utils/SegmentedClient.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils;
-
-import com.intel.jndn.utils.client.SegmentedDataReassembler;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Function;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.encoding.EncodingException;
-
-/**
- * Provide a client to simplify retrieving segmented Data packets over the NDN
- * network. This class expects the Data producer to follow the NDN naming
- * conventions (see http://named-data.net/doc/tech-memos/naming-conventions.pdf)
- * and produce Data packets with a valid segment as the last component of their
- * name; additionally, at least the first packet should set the FinalBlockId of
- * the packet's MetaInfo (see
- * http://named-data.net/doc/ndn-tlv/data.html#finalblockid).
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SegmentedClient extends SimpleClient {
-
- private static SegmentedClient defaultInstance;
- private static final Logger logger = Logger.getLogger(SegmentedClient.class.getName());
-
- /**
- * Singleton access for simpler client use.
- *
- * @return
- */
- public static SegmentedClient getDefault() {
- if (defaultInstance == null) {
- defaultInstance = new SegmentedClient();
- }
- return defaultInstance;
- }
-
- /**
- * See {@link SimpleClient#SimpleClient(long, long)}
- *
- * @param sleepTime
- * @param interestLifetime
- */
- public SegmentedClient(long sleepTime, long interestLifetime) {
- super(sleepTime, interestLifetime);
- }
-
- /**
- * Build a client with default parameters
- */
- public SegmentedClient() {
- super();
- }
-
- /**
- * Asynchronously send Interest packets for a segmented result; will not
- * block, but will wait for the first packet to return before sending
- * remaining interests until using the specified FinalBlockId. Will retrieve
- * non-segmented packets as well.
- *
- * @param face the {@link Face} on which to retrieve the packets
- * @param interest should include either a ChildSelector or an initial segment
- * number; the initial segment number will be cut off in the de-segmented
- * packet.
- * @return a list of FutureData packets; if the first segment fails, the list
- * will contain one FutureData with the failure exception
- */
- @Override
- public CompletableFuture<Data> getAsync(final Face face, Interest interest) {
- final long firstSegmentId = parseFirstSegmentId(interest);
-
- logger.log(Level.FINER, "Requesting segmented data packets: " + interest.getName().toUri());
- CompletableFuture<Data> allData = super.getAsync(face, interest).thenCompose(new Function<Data, CompletionStage<Data>>() {
- @Override
- public CompletionStage<Data> apply(Data firstSegment) {
- try {
- logger.log(Level.FINER, "Received first data packet: " + interest.getName().toUri());
- long lastSegmentId = parseLastSegmentId(firstSegment);
- Interest template = new Interest(interest);
- template.setName(removeSegment(firstSegment.getName()));
-
- // request subsequent segments using FinalBlockId and the Interest template
- return requestRemainingSegments(face, template, firstSegment, firstSegmentId + 1, lastSegmentId);
- } catch (EncodingException ex) {
- logger.log(Level.FINER, "No segment ID found in FinalBlockId, assuming first packet is only packet.");
- return CompletableFuture.completedFuture(firstSegment);
- }
- }
- });
-
- return allData;
- }
-
- /**
- * @param interest the request {@link Interest}
- * @return the first segment the interest is requesting, or 0 if none found
- */
- private long parseFirstSegmentId(Interest interest) {
- try {
- return interest.getName().get(-1).toSegment();
- } catch (EncodingException e) {
- if (interest.getChildSelector() == -1) {
- logger.log(Level.WARNING, "No child selector set for a segmented Interest; this may result in incorrect retrieval.");
- // allow this interest to pass without a segment marker since it may still succeed
- }
- return 0;
- }
- }
-
- /**
- * @param firstData the first returned {@link Data}
- * @return the last segment number available as specified in the FinalBlockId
- * @throws EncodingException
- */
- private long parseLastSegmentId(Data firstData) throws EncodingException {
- return firstData.getMetaInfo().getFinalBlockId().toSegment();
- }
-
- /**
- * Send interests for remaining segments; adding them to the segmented future
- *
- * @param face
- * @param segmentedData
- * @param template
- * @param fromSegment
- * @param toSegment
- */
- private CompletableFuture<Data> requestRemainingSegments(Face face, Interest template, Data firstSegment, long fromSegment, long toSegment) {
- List<CompletableFuture<Data>> segments = new ArrayList<>();
- segments.add(CompletableFuture.completedFuture(firstSegment));
-
- for (long i = fromSegment; i <= toSegment; i++) {
- Interest segmentedInterest = new Interest(template);
- segmentedInterest.getName().appendSegment(i);
- CompletableFuture<Data> futureData = super.getAsync(face, segmentedInterest);
- segments.add(futureData);
- }
-
- return new SegmentedDataReassembler(template.getName(), segments).reassemble();
- }
-
- /**
- * Check if a name ends in a segment component; uses marker value found in the
- * NDN naming conventions (see
- * http://named-data.net/doc/tech-memos/naming-conventions.pdf).
- *
- * @param name the {@link Name} to check
- * @return true if the {@link Name} ends in a segment component
- */
- public static boolean hasSegment(Name name) {
- return name.get(-1).getValue().buf().get(0) == 0x00;
- }
-
- /**
- * @param name the {@link Name} to check
- * @return a new instance of {@link Name} with no segment component appended
- */
- public static Name removeSegment(Name name) {
- return hasSegment(name) ? name.getPrefix(-1) : new Name(name);
- }
-}
diff --git a/src/main/java/com/intel/jndn/utils/server/Server.java b/src/main/java/com/intel/jndn/utils/Server.java
similarity index 91%
rename from src/main/java/com/intel/jndn/utils/server/Server.java
rename to src/main/java/com/intel/jndn/utils/Server.java
index bcf7f3c..197a52a 100644
--- a/src/main/java/com/intel/jndn/utils/server/Server.java
+++ b/src/main/java/com/intel/jndn/utils/Server.java
@@ -11,7 +11,7 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.server;
+package com.intel.jndn.utils;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import net.named_data.jndn.Data;
@@ -47,7 +47,7 @@
* has produced a packet); also, stages should be processed in the order they
* are added.
*
- * @param pipelineStage a Data-to-Data processing stage
+ * @param stage a Data-to-Data processing stage
*/
- public void addPipelineStage(PipelineStage<Data, Data> pipelineStage);
+ public void addPostProcessingStage(ProcessingStage<Data, Data> stage);
}
diff --git a/src/main/java/com/intel/jndn/utils/client/DataStream.java b/src/main/java/com/intel/jndn/utils/client/DataStream.java
new file mode 100644
index 0000000..7f33870
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/DataStream.java
@@ -0,0 +1,75 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client;
+
+import com.intel.jndn.utils.client.impl.StreamException;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.OnData;
+import net.named_data.jndn.OnTimeout;
+
+/**
+ * Define a stream of {@link Data} packets as they are retrieved from the
+ * network and provide methods for observing stream events
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface DataStream extends OnData, OnTimeout, OnComplete, OnException {
+
+ /**
+ * @return true if the stream is complete
+ */
+ public boolean isComplete();
+
+ /**
+ * @return the current list of packets retrieved; this may change as more
+ * packets are added
+ */
+ public Data[] list();
+
+ /**
+ * @return an assembled packet containing the concatenated bytes of all
+ * received packets
+ * @throws StreamException if assembly fails
+ */
+ public Data assemble() throws StreamException;
+
+ /**
+ * Watch all {@link OnData} events
+ *
+ * @param onData the callback fired
+ */
+ public void observe(OnData onData);
+
+ /**
+ * Watch all {@link OnComplete} events
+ *
+ * @param onComplete the callback fired
+ */
+ public void observe(OnComplete onComplete);
+
+ /**
+ * Watch all {@link OnException} events
+ *
+ * @param onException the callback fired
+ */
+ public void observe(OnException onException);
+
+ /**
+ * Watch all {@link OnTimeout} events
+ *
+ * @param onTimeout the callback fired
+ */
+ public void observe(OnTimeout onTimeout);
+
+}
diff --git a/src/main/java/com/intel/jndn/utils/repository/DataNotFoundException.java b/src/main/java/com/intel/jndn/utils/client/OnComplete.java
similarity index 74%
copy from src/main/java/com/intel/jndn/utils/repository/DataNotFoundException.java
copy to src/main/java/com/intel/jndn/utils/client/OnComplete.java
index 732cc9c..6cbbe86 100644
--- a/src/main/java/com/intel/jndn/utils/repository/DataNotFoundException.java
+++ b/src/main/java/com/intel/jndn/utils/client/OnComplete.java
@@ -11,13 +11,17 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.repository;
+package com.intel.jndn.utils.client;
/**
- * Thrown when a {@link Repository} cannot retrieve a stored packet.
+ * Callback fired when an activity is finished.
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class DataNotFoundException extends Exception {
+public interface OnComplete {
+ /**
+ * Called when an activity is finished
+ */
+ public void onComplete();
}
diff --git a/src/main/java/com/intel/jndn/utils/client/OnException.java b/src/main/java/com/intel/jndn/utils/client/OnException.java
new file mode 100644
index 0000000..d4c440d
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/OnException.java
@@ -0,0 +1,29 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client;
+
+/**
+ * Callback fired when an activity throws an exception; this is necessary because
+ * some actions are performed asynchronously (e.g. in a thread pool) and exceptions
+ * could otherwise be lost.
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface OnException {
+
+ /**
+ * Called when an activity throws an exception
+ * @param exception the exception thrown
+ */
+ public void onException(Exception exception);
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/RetryClient.java b/src/main/java/com/intel/jndn/utils/client/RetryClient.java
new file mode 100644
index 0000000..9a5dacb
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/RetryClient.java
@@ -0,0 +1,44 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client;
+
+import java.io.IOException;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.OnData;
+import net.named_data.jndn.OnTimeout;
+
+/**
+ * Define a client that can retry {@link Interest} packets on timeout.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface RetryClient {
+
+ /**
+ * Retry the given interest according to some implementation-specific
+ * strategy; the implementor must correctly call the given callbacks to pass
+ * success or failure states up to the application (e.g. if after retrying,
+ * the client gives up, the application's OnTimeout should be called). Note
+ * that an interest passed to this method may timeout long after it's lifetime
+ * due to the implementation's retry strategy.
+ *
+ * @param face the {@link Face} on which to retry requests
+ * @param interest the {@link Interest} to retry
+ * @param onData the application's success callback
+ * @param onTimeout the application's failure callback
+ * @throws IOException when the client cannot perform the necessary network IO
+ */
+ public void retry(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException;
+}
diff --git a/src/test/java/com/intel/jndn/utils/repository/ForLoopRepositoryTest.java b/src/main/java/com/intel/jndn/utils/client/SegmentationType.java
similarity index 62%
copy from src/test/java/com/intel/jndn/utils/repository/ForLoopRepositoryTest.java
copy to src/main/java/com/intel/jndn/utils/client/SegmentationType.java
index 1a73a01..166d81f 100644
--- a/src/test/java/com/intel/jndn/utils/repository/ForLoopRepositoryTest.java
+++ b/src/main/java/com/intel/jndn/utils/client/SegmentationType.java
@@ -11,16 +11,26 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.repository;
+package com.intel.jndn.utils.client;
/**
- * Test {@link ForLoopRepository}.
+ * Documents known partition types from
+ * http://named-data.net/doc/tech-memos/naming-conventions.pdf
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class ForLoopRepositoryTest extends RepositoryTest {
+public enum SegmentationType {
- public ForLoopRepositoryTest() {
- instance = new ForLoopRepository();
+ SEGMENT((byte) 0x00),
+ BYTE_OFFSET((byte) 0xFB);
+
+ private byte marker;
+
+ SegmentationType(byte marker) {
+ this.marker = marker;
+ }
+
+ public byte value() {
+ return marker;
}
}
diff --git a/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java b/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java
new file mode 100644
index 0000000..d866675
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java
@@ -0,0 +1,39 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client;
+
+import java.io.IOException;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+
+/**
+ * Define a client that can retrieve segmented packets into a
+ * {@link DataStream}.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface SegmentedClient {
+
+ /**
+ * Asynchronously request packets until a FinalBlockId is reached. With this
+ * method, the user is responsible for calling {@link Face#processEvents()} in
+ * order for the data to be retrieved.
+ *
+ * @param face the {@link Face} on which to retry requests
+ * @param interest the {@link Interest} to retry
+ * @return a data stream of packets returned
+ * @throws IOException if the initial request fails
+ */
+ public DataStream getSegmentsAsync(Face face, Interest interest) throws IOException;
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/SegmentedDataReassembler.java b/src/main/java/com/intel/jndn/utils/client/SegmentedDataReassembler.java
deleted file mode 100644
index 0dfa2a7..0000000
--- a/src/main/java/com/intel/jndn/utils/client/SegmentedDataReassembler.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.client;
-
-import com.intel.jndn.utils.SegmentedClient;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.function.Function;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.util.Blob;
-
-/**
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SegmentedDataReassembler {
- private static final Logger logger = Logger.getLogger(SegmentedDataReassembler.class.getName());
- private final Name interestName;
- private final List<CompletableFuture<Data>> segments;
-
- public SegmentedDataReassembler(Name interestName, List<CompletableFuture<Data>> segments) {
- this.interestName = interestName;
- this.segments = segments;
- }
-
- public CompletableFuture<Data> reassemble(){
- CompletableFuture[] segmentArray = segments.toArray(new CompletableFuture[]{});
- CompletableFuture<Void> allComplete = CompletableFuture.allOf(segmentArray);
- return allComplete.thenApply(new Function<Void, Data>() {
- @Override
- public Data apply(Void t) {
- try {
- logger.finer("Re-assembling data for request: " + interestName.toUri());
- byte[] content = aggregateBytes();
- Data data = buildAggregatePacket();
- data.setContent(new Blob(content));
- return data;
- } catch (ExecutionException | InterruptedException ex) {
- logger.log(Level.FINER, "Failed to re-assemble packet for request: " + interestName.toUri(), ex);
- throw new RuntimeException(ex);
- }
- }
- });
- }
-
- /**
- * @return the array of aggregated bytes for all of the segments retrieved
- * @throws ExecutionException
- */
- private byte[] aggregateBytes() throws ExecutionException {
- // aggregate bytes
- ByteArrayOutputStream content = new ByteArrayOutputStream();
- for (Future<Data> futureData : segments) {
- try {
- content.write(futureData.get().getContent().getImmutableArray());
- } catch (ExecutionException | IOException | InterruptedException e) {
- throw new ExecutionException("Failed while aggregating retrieved packets: " + interestName.toUri(), e);
- }
- }
- return content.toByteArray();
- }
-
- /**
- * @return an aggregated {@link Data} packet with no content set
- * @throws InterruptedException
- * @throws ExecutionException
- */
- private Data buildAggregatePacket() throws InterruptedException, ExecutionException {
- if (segments.isEmpty()) {
- throw new IllegalStateException("Unable to re-assemble packets; no segments added: " + interestName.toUri());
- }
- Data firstData = segments.get(0).get();
- Data aggregatedData = new Data(firstData);
- Name shortenedName = SegmentedClient.removeSegment(firstData.getName());
- aggregatedData.setName(shortenedName);
- return aggregatedData;
- }
-}
diff --git a/src/main/java/com/intel/jndn/utils/client/StreamingClient.java b/src/main/java/com/intel/jndn/utils/client/StreamingClient.java
new file mode 100644
index 0000000..3ab5fb6
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/StreamingClient.java
@@ -0,0 +1,43 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+
+/**
+ * Define a client that can stream content bytes that are partitioned over
+ * multiple packets.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface StreamingClient {
+
+ /**
+ * Asynchronously request a packet over the network and retrieve the
+ * partitioned content bytes as an input stream
+ *
+ * @param face the {@link Face} on which to make the request; call
+ * {@link Face#processEvents()} separately to complete the request
+ * @param interest the {@link Interest} to send over the network
+ * @param partitionMarker the byte marker identifying how the data packets are
+ * partitioned (e.g. segmentation, see
+ * http://named-data.net/doc/tech-memos/naming-conventions.pdf)
+ * @return a stream of content bytes
+ * @throws IOException if the stream setup fails
+ */
+ public InputStream getStreamAsync(Face face, Interest interest, SegmentationType partitionMarker) throws IOException;
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java b/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java
new file mode 100644
index 0000000..be1e876
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java
@@ -0,0 +1,169 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.client.OnComplete;
+import com.intel.jndn.utils.client.OnException;
+import com.intel.jndn.utils.client.RetryClient;
+import com.intel.jndn.utils.client.SegmentedClient;
+import com.intel.jndn.utils.client.DataStream;
+import com.intel.jndn.utils.client.SegmentationType;
+import com.intel.jndn.utils.client.StreamingClient;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.OnTimeout;
+
+/**
+ * Implementation of a client that can handle segmented data, retries after
+ * failed requests, and streaming of data packets.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class AdvancedClient extends SimpleClient implements SegmentedClient, StreamingClient {
+
+ private static final Logger logger = Logger.getLogger(AdvancedClient.class.getName());
+ public static final int DEFAULT_MAX_RETRIES = 3;
+ private static AdvancedClient defaultInstance;
+ private final SegmentedClient segmentedClient;
+ private final RetryClient retryClient;
+ private final StreamingClient streamingClient;
+
+ /**
+ * Singleton access for simpler client use
+ *
+ * @return a default client
+ */
+ public static AdvancedClient getDefault() {
+ if (defaultInstance == null) {
+ defaultInstance = new AdvancedClient();
+ }
+ return defaultInstance;
+ }
+
+ /**
+ * Build an advanced client
+ *
+ * @param sleepTime for synchronous processing, the time to sleep the thread
+ * between {@link Face#processEvents()}
+ * @param interestLifetime the {@link Interest} lifetime for default
+ * Interests; see
+ * {@link #getAsync(net.named_data.jndn.Face, net.named_data.jndn.Name)}
+ * @param segmentedClient the {@link SegmentedClient} to use for segmented
+ * data
+ * @param retryClient the {@link RetryClient} to use for retrying failed
+ * packets
+ * @param streamingClient the {@link StreamingClient} to use for segmented
+ * data
+ */
+ public AdvancedClient(long sleepTime, long interestLifetime, SegmentedClient segmentedClient, RetryClient retryClient, StreamingClient streamingClient) {
+ super(sleepTime, interestLifetime);
+ this.segmentedClient = segmentedClient;
+ this.retryClient = retryClient;
+ this.streamingClient = streamingClient;
+ }
+
+ /**
+ * Build an advanced client using default parameters
+ */
+ public AdvancedClient() {
+ super();
+ this.segmentedClient = new DefaultSegmentedClient();
+ this.retryClient = new DefaultRetryClient(DEFAULT_MAX_RETRIES);
+ this.streamingClient = new DefaultStreamingClient();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public CompletableFuture<Data> getAsync(Face face, Interest interest) {
+ CompletableFuture<Data> future = new CompletableFuture<>();
+
+ try {
+ DataStream stream = getSegmentsAsync(face, interest);
+
+ stream.observe(new OnException() {
+ public void onException(Exception exception) {
+ future.completeExceptionally(exception);
+ }
+ });
+
+ stream.observe(new OnComplete() {
+ public void onComplete() {
+ try {
+ future.complete(stream.assemble());
+ } catch (StreamException ex) {
+ stream.onException(ex);
+ }
+ }
+ });
+ } catch (IOException ex) {
+ future.completeExceptionally(ex);
+ }
+
+ return future;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public DataStream getSegmentsAsync(Face face, Interest interest) throws IOException {
+ DataStream stream = segmentedClient.getSegmentsAsync(face, interest);
+ stream.observe(new RetryHandler(face, stream));
+ return stream;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public InputStream getStreamAsync(Face face, Interest interest, SegmentationType partitionMarker) throws IOException {
+ return streamingClient.getStreamAsync(face, interest, partitionMarker);
+ }
+
+ /**
+ * Helper class for calling the retry client on failed requests
+ */
+ private class RetryHandler implements OnTimeout {
+ private final Face face;
+ private final DataStream stream;
+
+ private RetryHandler(Face face, DataStream stream) {
+ this.face = face;
+ this.stream = stream;
+ }
+
+ @Override
+ public void onTimeout(Interest failedInterest) {
+ logger.info("Timeout: " + failedInterest.toUri());
+ try {
+ retryClient.retry(face, failedInterest, stream, new OnTimeout() {
+ @Override
+ public void onTimeout(Interest interest) {
+ stream.onException(new TimeoutException("Interest timed out despite retries: " + interest.toUri()));
+ }
+ });
+ } catch (IOException ex) {
+ stream.onException(ex);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java b/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java
new file mode 100644
index 0000000..c1ca4fe
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java
@@ -0,0 +1,79 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.client.SegmentedClient;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.util.Blob;
+
+/**
+ * Internal class for assembling a list of {@link Data} packets into one large
+ * data packet; this implementation will use all properties of the first packet
+ * in the list and concatenate the content bytes of all packets in order.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+class DataAssembler {
+
+ private final Data[] packets;
+ private final byte marker;
+
+ public DataAssembler(Data[] packets, byte marker) {
+ this.packets = packets;
+ this.marker = marker;
+ }
+
+ /**
+ * @return a combined packet based on the properties of the first packet and
+ * the concatenated bytes of the entire list of packets
+ */
+ Data assemble() {
+ if (packets.length == 0) {
+ throw new IllegalStateException("No packets to assemble.");
+ }
+
+ // build from first returned packet
+ Data combined = new Data(packets[0]);
+ combined.setName(SegmentationHelper.removeSegment(combined.getName(), marker));
+ try {
+ combined.setContent(assembleBlob());
+ } catch (IOException ex) {
+ throw new IllegalStateException(ex);
+ }
+
+ return combined;
+ }
+
+ /**
+ * @return the concatenated bytes
+ * @throws IOException if a stream fails
+ */
+ private Blob assembleBlob() throws IOException {
+ if (packets.length == 0) {
+ return new Blob();
+ }
+
+ if (packets.length == 1) {
+ return packets[0].getContent();
+ }
+
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ for (Data packet : packets) {
+ stream.write(packet.getContent().getImmutableArray());
+ }
+ return new Blob(stream.toByteArray());
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java b/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java
new file mode 100644
index 0000000..f984ac2
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java
@@ -0,0 +1,121 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.client.RetryClient;
+import java.io.IOException;
+import java.util.logging.Logger;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.OnData;
+import net.named_data.jndn.OnTimeout;
+
+/**
+ * Default implementation of {@link RetryClient}; on request failure, this class
+ * immediately retries the request until a maximum number of retries is reached.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class DefaultRetryClient implements RetryClient {
+
+ private static final Logger logger = Logger.getLogger(DefaultRetryClient.class.getName());
+ private final int numRetriesAllowed;
+ private int totalRetries = 0;
+
+ public DefaultRetryClient(int numRetriesAllowed) {
+ this.numRetriesAllowed = numRetriesAllowed;
+ }
+
+ /**
+ * On timeout, retry the request until the maximum number of allowed retries
+ * is reached.
+ *
+ * @param face the {@link Face} on which to retry requests
+ * @param interest the {@link Interest} to retry
+ * @param onData the application's success callback
+ * @param onTimeout the application's failure callback
+ * @throws IOException when the client cannot perform the necessary network IO
+ */
+ @Override
+ public void retry(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException {
+ RetryContext context = new RetryContext(face, interest, onData, onTimeout);
+ retryInterest(context);
+ }
+
+ /**
+ * Synchronized helper method to prevent multiple threads from mashing
+ * totalRetries
+ *
+ * @param context the current request context
+ * @throws IOException when the client cannot perform the necessary network IO
+ */
+ synchronized private void retryInterest(RetryContext context) throws IOException {
+ logger.info("Retrying interest: " + context.interest.toUri());
+ context.face.expressInterest(context.interest, context, context);
+ totalRetries++;
+ }
+
+ /**
+ * @return the total number of retries logged by this client
+ */
+ public int totalRetries() {
+ return totalRetries;
+ }
+
+ /**
+ * Helper class to separate out each request context; this allows the client
+ * to accept multiple concurrent retry operations
+ */
+ private class RetryContext implements OnData, OnTimeout {
+
+ public int numFailures = 0;
+ public final Face face;
+ public final Interest interest;
+ public final OnData applicationOnData;
+ public final OnTimeout applicationOnTimeout;
+
+ public RetryContext(Face face, Interest interest, OnData applicationOnData, OnTimeout applicationOnTimeout) {
+ this.face = face;
+ this.interest = interest;
+ this.applicationOnData = applicationOnData;
+ this.applicationOnTimeout = applicationOnTimeout;
+ }
+
+ public boolean shouldRetry() {
+ return numFailures < numRetriesAllowed;
+ }
+
+ @Override
+ public void onData(Interest interest, Data data) {
+ applicationOnData.onData(interest, data);
+ }
+
+ @Override
+ public void onTimeout(Interest interest) {
+ numFailures++;
+ logger.finest("Request failed, count " + numFailures + ": " + interest.toUri());
+
+ if (shouldRetry()) {
+ try {
+ retryInterest(this);
+ } catch (IOException ex) {
+ applicationOnTimeout.onTimeout(interest);
+ }
+ } else {
+ applicationOnTimeout.onTimeout(interest);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java b/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java
new file mode 100644
index 0000000..83e0baa
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java
@@ -0,0 +1,149 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.client.SegmentedClient;
+import com.intel.jndn.utils.client.DataStream;
+import java.io.IOException;
+import java.util.logging.Logger;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.Name.Component;
+import net.named_data.jndn.OnData;
+
+/**
+ * Retrieve segments one by one until the FinalBlockId indicates an end segment;
+ * then request remaining packets. This class currently only handles segmented
+ * data (not yet byte-offset segmented data).
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class DefaultSegmentedClient implements SegmentedClient {
+
+ private static final Logger logger = Logger.getLogger(DefaultSegmentedClient.class.getName());
+ private static DefaultSegmentedClient defaultInstance;
+ private byte marker = 0x00;
+
+ /**
+ * Singleton access for simpler client use
+ *
+ * @return a default client
+ */
+ public static DefaultSegmentedClient getDefault() {
+ if (defaultInstance == null) {
+ defaultInstance = new DefaultSegmentedClient();
+ }
+ return defaultInstance;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public DataStream getSegmentsAsync(Face face, Interest interest) throws IOException {
+ SegmentedDataStream stream = new SegmentedDataStream();
+
+ // once more packets are received, request more
+ stream.observe(new SegmentationContext(stream, face));
+
+ // request first packet
+ logger.info("Interest requested: " + interest.toUri());
+ face.expressInterest(interest, stream, stream);
+
+ return stream;
+ }
+
+ /**
+ * Helper class to track the last requested segment for a given request
+ * context and request follow-on packets
+ */
+ private class SegmentationContext implements OnData {
+
+ private final SegmentedDataStream stream;
+ private final Face face;
+ private long lastRequestedSegment;
+
+ public SegmentationContext(SegmentedDataStream stream, Face face) {
+ this.stream = stream;
+ this.face = face;
+ }
+
+ private synchronized void setLastRequestedSegment(long segmentNumber) {
+ lastRequestedSegment = segmentNumber;
+ }
+
+ @Override
+ public void onData(Interest interest, Data data) {
+ try {
+ if (stream.current() >= lastRequestedSegment) {
+ Interest dataBasedName = new Interest(interest).setName(data.getName());
+ if (stream.hasEnd()) {
+ if (stream.current() < stream.end()) {
+ requestRemainingSegments(face, dataBasedName, stream);
+ }
+ } else {
+ requestNext(face, dataBasedName, stream);
+ }
+ }
+ } catch (IOException e) {
+ stream.onException(e);
+ }
+ }
+
+ private void requestRemainingSegments(Face face, Interest interest, SegmentedDataStream stream) throws IOException {
+ long from = stream.current() + 1;
+ long to = stream.end();
+ logger.info("Requesting remaining segments: from #" + from + " to #" + to);
+
+ for (long segmentNumber = stream.current() + 1; segmentNumber <= stream.end(); segmentNumber++) {
+ request(face, interest, stream, segmentNumber, marker);
+ }
+ }
+
+ private void requestNext(Face face, Interest interest, SegmentedDataStream stream) throws IOException {
+ long segmentNumber = stream.current() + 1;
+ request(face, interest, stream, segmentNumber, marker);
+ }
+
+ private void request(Face face, Interest interest, DataStream stream, long segmentNumber, byte marker) throws IOException {
+ Interest copiedInterest = replaceFinalComponent(interest, segmentNumber, marker);
+ face.expressInterest(copiedInterest, stream, stream);
+ logger.info("Interest sent: " + copiedInterest.toUri());
+ setLastRequestedSegment(segmentNumber);
+ }
+ }
+
+ /**
+ * Replace the final component of an interest name with a segmented component;
+ * if the interest name does not have a segmented component, this will add
+ * one.
+ *
+ * @param interest the request
+ * @param segmentNumber a segment number
+ * @param marker a marker to use for segmenting the packet
+ * @return a segmented interest (a copy of the passed interest)
+ */
+ protected Interest replaceFinalComponent(Interest interest, long segmentNumber, byte marker) {
+ Interest copied = new Interest(interest);
+ Component lastComponent = Component.fromNumberWithMarker(segmentNumber, marker);
+ Name newName = (SegmentationHelper.isSegmented(copied.getName(), marker))
+ ? copied.getName().getPrefix(-1)
+ : new Name(copied.getName());
+ copied.setName(newName.append(lastComponent));
+ return copied;
+ }
+
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java b/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
new file mode 100644
index 0000000..55fa0f0
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
@@ -0,0 +1,103 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.client.DataStream;
+import com.intel.jndn.utils.client.OnException;
+import com.intel.jndn.utils.client.SegmentationType;
+import com.intel.jndn.utils.client.SegmentedClient;
+import com.intel.jndn.utils.client.StreamingClient;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.OnData;
+
+/**
+ * Default implementation of {@link StreamingClient}; uses a segmented client to
+ * retrieve packets asynchronously and pipes them to a stream as they are
+ * received.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class DefaultStreamingClient implements StreamingClient {
+
+ private static final Logger logger = Logger.getLogger(DefaultStreamingClient.class.getName());
+ private SegmentedClient client;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public InputStream getStreamAsync(Face face, Interest interest, SegmentationType partitionMarker) throws IOException {
+ return getStreamAsync(face, interest, partitionMarker, new DefaultOnException());
+ }
+
+ /**
+ *
+ * @param face
+ * @param interest
+ * @param partitionMarker
+ * @param onException
+ * @return
+ * @throws IOException
+ */
+ public InputStream getStreamAsync(Face face, Interest interest, SegmentationType partitionMarker, OnException onException) throws IOException {
+ SegmentedClient client = DefaultSegmentedClient.getDefault();
+ return getStreamAsync(client.getSegmentsAsync(face, interest), onException);
+ }
+
+ /**
+ *
+ * @param onDataStream
+ * @param onException
+ * @return
+ * @throws IOException
+ */
+ public InputStream getStreamAsync(DataStream onDataStream, OnException onException) throws IOException {
+ PipedInputStream in = new PipedInputStream();
+ PipedOutputStream out = new PipedOutputStream(in);
+
+ onDataStream.observe(onException);
+
+ onDataStream.observe(new OnData() {
+ @Override
+ public void onData(Interest interest, Data data) {
+ try {
+ out.write(data.getContent().getImmutableArray());
+ } catch (IOException ex) {
+ onDataStream.onException(ex);
+ }
+ }
+ });
+
+ return in;
+ }
+
+ /**
+ * Helper for logging exceptions
+ */
+ private class DefaultOnException implements OnException {
+
+ @Override
+ public void onException(Exception exception) {
+ logger.log(Level.SEVERE, "Streaming failed", exception);
+ }
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/SegmentationHelper.java b/src/main/java/com/intel/jndn/utils/client/impl/SegmentationHelper.java
new file mode 100644
index 0000000..7a051ee
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/impl/SegmentationHelper.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+
+/**
+ * Helper methods for dealing with segmented packets (see
+ * http://named-data.net/doc/tech-memos/naming-conventions.pdf).
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class SegmentationHelper {
+
+ /**
+ * Determine if a name is segmented, i.e. if it ends with the correct marker
+ * type.
+ *
+ * @param name the name of a packet
+ * @param marker the marker type (the initial byte of the component)
+ * @return true if the name is segmented
+ */
+ public static boolean isSegmented(Name name, byte marker) {
+ return name.size() > 0 ? name.get(-1).getValue().buf().get(0) == marker : false;
+ }
+
+ /**
+ * Retrieve the segment number from the last component of a name.
+ *
+ * @param name the name of a packet
+ * @param marker the marker type (the initial byte of the component)
+ * @return the segment number
+ * @throws EncodingException if the name does not have a final component of
+ * the correct marker type
+ */
+ public static long parseSegment(Name name, byte marker) throws EncodingException {
+ if (name.size() == 0) {
+ throw new EncodingException("No components to parse.");
+ }
+ return name.get(-1).toNumberWithMarker(marker);
+ }
+
+ /**
+ * Remove a segment component from the end of a name
+ *
+ * @param name the name of a packet
+ * @param marker the marker type (the initial byte of the component)
+ * @return the new name with the segment component removed or a copy of the
+ * name if no segment component was present
+ */
+ public static Name removeSegment(Name name, byte marker) {
+ return isSegmented(name, marker) ? name.getPrefix(-1) : new Name(name);
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java b/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java
new file mode 100644
index 0000000..41fd86a
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java
@@ -0,0 +1,199 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.client.OnComplete;
+import com.intel.jndn.utils.client.OnException;
+import com.intel.jndn.utils.client.DataStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnData;
+import net.named_data.jndn.OnTimeout;
+import net.named_data.jndn.encoding.EncodingException;
+
+/**
+ * As packets are received, they are mapped by their last component's segment
+ * marker; if the last component is not parseable, an exception is thrown and
+ * processing completes. The exception to this is if the first packet returned
+ * does not have a segment marker as the last component; in this case, the
+ * packet is assumed to be the only packet returned and is placed as the first
+ * and only packet to assemble. Observers may register callbacks to watch when
+ * data is received; if data is received out of order, the callbacks will not be
+ * fired until adjoining packets are received.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class SegmentedDataStream implements DataStream {
+
+ private static final Logger logger = Logger.getLogger(SegmentedDataStream.class.getName());
+ private final byte PARTITION_MARKER = 0x00;
+ private volatile long current = -1;
+ private volatile long end = Long.MAX_VALUE;
+ private Map<Long, Data> packets = new HashMap<>();
+ private List<Object> observers = new ArrayList<>();
+ private Exception exception;
+
+ @Override
+ public boolean isComplete() {
+ return current == end || isCompletedExceptionally();
+ }
+
+ public boolean isCompletedExceptionally() {
+ return exception != null;
+ }
+
+ public boolean hasEnd() {
+ return end != Long.MAX_VALUE;
+ }
+
+ public long end() {
+ return end;
+ }
+
+ public long current() {
+ return current;
+ }
+
+ @Override
+ public Data[] list() {
+ return packets.values().toArray(new Data[]{});
+ }
+
+ @Override
+ public Data assemble() throws StreamException {
+ if (isCompletedExceptionally()) {
+ throw new StreamException(exception);
+ }
+
+ Object o = new LinkedList();
+ return new DataAssembler(list(), PARTITION_MARKER).assemble();
+ }
+
+ @Override
+ public void observe(OnData onData) {
+ observers.add(onData);
+ }
+
+ @Override
+ public void observe(OnComplete onComplete) {
+ observers.add(onComplete);
+ }
+
+ @Override
+ public void observe(OnException onException) {
+ observers.add(onException);
+ }
+
+ @Override
+ public void observe(OnTimeout onTimeout) {
+ observers.add(onTimeout);
+ }
+
+ @Override
+ public synchronized void onData(Interest interest, Data data) {
+ logger.info("Data received: " + data.getName().toUri());
+ long id;
+
+ // no segment component
+ if (!SegmentationHelper.isSegmented(data.getName(), PARTITION_MARKER) && packets.size() == 0) {
+ id = 0;
+ packets.put(id, data);
+
+ // mark processing complete if the first packet has no segment component
+ end = 0;
+ } // with segment component
+ else {
+ Name.Component lastComponent = data.getName().get(-1);
+ try {
+ id = lastComponent.toNumberWithMarker(PARTITION_MARKER);
+ packets.put(id, data);
+ } catch (EncodingException ex) {
+ onException(ex);
+ return;
+ }
+ }
+
+ if (hasFinalBlockId(data)) {
+ try {
+ end = data.getMetaInfo().getFinalBlockId().toNumberWithMarker(PARTITION_MARKER);
+ } catch (EncodingException ex) {
+ onException(ex);
+ }
+ }
+
+ // call data observers
+ if (isNextPacket(id)) {
+ do {
+ current++;
+ assert (packets.containsKey(current));
+ Data retrieved = packets.get(current);
+ observersOfType(OnData.class).stream().forEach((OnData cb) -> {
+ cb.onData(interest, retrieved);
+ });
+ } while (hasNextPacket());
+ }
+
+ // call completion observers
+ if (isComplete()) {
+ onComplete();
+ }
+ }
+
+ private boolean hasFinalBlockId(Data data) {
+ return data.getMetaInfo().getFinalBlockId().getValue().size() > 0;
+ }
+
+ private boolean isNextPacket(long id) {
+ return current + 1 == id;
+ }
+
+ private boolean hasNextPacket() {
+ return packets.containsKey(current + 1);
+ }
+
+ @Override
+ public synchronized void onComplete() {
+ observersOfType(OnComplete.class).stream().forEach((OnComplete cb) -> {
+ cb.onComplete();
+ });
+ }
+
+ @Override
+ public synchronized void onTimeout(Interest interest) {
+ observersOfType(OnTimeout.class).stream().forEach((OnTimeout cb) -> {
+ cb.onTimeout(interest);
+ });
+ }
+
+ @Override
+ public synchronized void onException(Exception exception) {
+ this.exception = exception;
+
+ observersOfType(OnException.class).stream().forEach((OnException cb) -> {
+ cb.onException(exception);
+ });
+ }
+
+ private <T> List<T> observersOfType(Class<T> type) {
+ return observers.stream().filter((Object o) -> type.isAssignableFrom(o.getClass())).map((o) -> (T) o).collect(Collectors.toList());
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/SimpleClient.java b/src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java
similarity index 90%
rename from src/main/java/com/intel/jndn/utils/SimpleClient.java
rename to src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java
index 1bf8519..9e2e594 100644
--- a/src/main/java/com/intel/jndn/utils/SimpleClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java
@@ -11,8 +11,9 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.client.impl;
+import com.intel.jndn.utils.Client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -87,6 +88,7 @@
face.expressInterest(interest, new OnData() {
@Override
public void onData(Interest interest, Data data) {
+ logger.log(Level.FINER, "Retrieved data: " + data.getName().toUri());
futureData.complete(data);
}
}, new OnTimeout() {
@@ -118,18 +120,18 @@
@Override
public Data getSync(Face face, Interest interest) throws IOException {
CompletableFuture<Data> future = getAsync(face, interest);
+
try {
// process events until complete
while (!future.isDone()) {
synchronized (face) {
face.processEvents();
}
-
+
if (sleepTime > 0) {
- try{
+ try {
Thread.sleep(sleepTime);
- }
- catch(InterruptedException e){
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@@ -150,11 +152,11 @@
}
/**
- * Create a default interest for a given Name using some common settings: -
- * lifetime: 2 seconds
+ * Create a default interest for a given {@link Name} using the client's
+ * passed settings (see {@link #SimpleClient(long, long)})
*
- * @param name
- * @return
+ * @param name the {@link Name} of the data to retrieve
+ * @return a default interest for the given name
*/
public Interest getDefaultInterest(Name name) {
Interest interest = new Interest(name, interestLifetime);
diff --git a/src/main/java/com/intel/jndn/utils/repository/DataNotFoundException.java b/src/main/java/com/intel/jndn/utils/client/impl/StreamException.java
similarity index 71%
copy from src/main/java/com/intel/jndn/utils/repository/DataNotFoundException.java
copy to src/main/java/com/intel/jndn/utils/client/impl/StreamException.java
index 732cc9c..59ad55a 100644
--- a/src/main/java/com/intel/jndn/utils/repository/DataNotFoundException.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/StreamException.java
@@ -11,13 +11,20 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.repository;
+package com.intel.jndn.utils.client.impl;
/**
- * Thrown when a {@link Repository} cannot retrieve a stored packet.
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class DataNotFoundException extends Exception {
+public class StreamException extends Exception {
+ public StreamException(Exception exception) {
+ super(exception);
+ }
+
+ public StreamException(String message){
+ super(message);
+ }
+
}
diff --git a/src/main/java/com/intel/jndn/utils/server/pipeline/CompressionStage.java b/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
similarity index 89%
rename from src/main/java/com/intel/jndn/utils/server/pipeline/CompressionStage.java
rename to src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
index 687bde0..21862fa 100644
--- a/src/main/java/com/intel/jndn/utils/server/pipeline/CompressionStage.java
+++ b/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
@@ -11,9 +11,9 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.server.pipeline;
+package com.intel.jndn.utils.processing.impl;
-import com.intel.jndn.utils.server.PipelineStage;
+import com.intel.jndn.utils.ProcessingStage;
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;
import net.named_data.jndn.Data;
@@ -24,7 +24,7 @@
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class CompressionStage implements PipelineStage<Data, Data> {
+public class CompressionStage implements ProcessingStage<Data, Data> {
/**
* Compress and replace the {@link Data} content. Note: this stage will return
diff --git a/src/main/java/com/intel/jndn/utils/server/pipeline/SigningStage.java b/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
similarity index 92%
rename from src/main/java/com/intel/jndn/utils/server/pipeline/SigningStage.java
rename to src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
index 5296d89..3125db8 100644
--- a/src/main/java/com/intel/jndn/utils/server/pipeline/SigningStage.java
+++ b/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
@@ -11,9 +11,9 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.server.pipeline;
+package com.intel.jndn.utils.processing.impl;
-import com.intel.jndn.utils.server.PipelineStage;
+import com.intel.jndn.utils.ProcessingStage;
import net.named_data.jndn.Data;
import net.named_data.jndn.Name;
import net.named_data.jndn.security.KeyChain;
@@ -25,7 +25,7 @@
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class SigningStage implements PipelineStage<Data, Data> {
+public class SigningStage implements ProcessingStage<Data, Data> {
private final KeyChain keyChain;
private final Name certificateName;
diff --git a/src/main/java/com/intel/jndn/utils/repository/DataNotFoundException.java b/src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java
similarity index 93%
rename from src/main/java/com/intel/jndn/utils/repository/DataNotFoundException.java
rename to src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java
index 732cc9c..eba700a 100644
--- a/src/main/java/com/intel/jndn/utils/repository/DataNotFoundException.java
+++ b/src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java
@@ -11,7 +11,7 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.repository;
+package com.intel.jndn.utils.repository.impl;
/**
* Thrown when a {@link Repository} cannot retrieve a stored packet.
diff --git a/src/main/java/com/intel/jndn/utils/repository/ForLoopRepository.java b/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
similarity index 97%
rename from src/main/java/com/intel/jndn/utils/repository/ForLoopRepository.java
rename to src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
index 80fedb1..e40890d 100644
--- a/src/main/java/com/intel/jndn/utils/repository/ForLoopRepository.java
+++ b/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
@@ -11,8 +11,9 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.repository;
+package com.intel.jndn.utils.repository.impl;
+import com.intel.jndn.utils.Repository;
import java.util.ArrayList;
import java.util.List;
import net.named_data.jndn.Data;
@@ -23,7 +24,7 @@
* Store {@link Data} packets in a linked list and iterate over the list to find
* the best match; this is a subset of the functionality provided in
* {@link net.named_data.jndn.util.MemoryContentCache} and borrows the matching
- * logic from there. Code for removing stale packets is not yet implemented.
+ * logic from there.
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
diff --git a/src/main/java/com/intel/jndn/utils/DynamicServer.java b/src/main/java/com/intel/jndn/utils/server/DynamicServer.java
similarity index 95%
rename from src/main/java/com/intel/jndn/utils/DynamicServer.java
rename to src/main/java/com/intel/jndn/utils/server/DynamicServer.java
index 8206e27..60caf35 100644
--- a/src/main/java/com/intel/jndn/utils/DynamicServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/DynamicServer.java
@@ -11,10 +11,10 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.server;
+import com.intel.jndn.utils.Server;
import com.intel.jndn.utils.server.RespondWithData;
-import com.intel.jndn.utils.server.Server;
import java.io.IOException;
/**
diff --git a/src/main/java/com/intel/jndn/utils/server/PipelineStage.java b/src/main/java/com/intel/jndn/utils/server/PipelineStage.java
deleted file mode 100644
index 15d23f0..0000000
--- a/src/main/java/com/intel/jndn/utils/server/PipelineStage.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.server;
-
-/**
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public interface PipelineStage<T, Y> {
- public Y process(T context) throws Exception;
-// public void setNextStage(PipelineStage<Y, ?> nextStage);
-// public PipelineStage<Y, ?> getNextStage();
-}
diff --git a/src/main/java/com/intel/jndn/utils/RepositoryServer.java b/src/main/java/com/intel/jndn/utils/server/RepositoryServer.java
similarity index 94%
rename from src/main/java/com/intel/jndn/utils/RepositoryServer.java
rename to src/main/java/com/intel/jndn/utils/server/RepositoryServer.java
index 88219d8..09ebc23 100644
--- a/src/main/java/com/intel/jndn/utils/RepositoryServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/RepositoryServer.java
@@ -11,9 +11,9 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.server;
-import com.intel.jndn.utils.server.Server;
+import com.intel.jndn.utils.Server;
import java.io.IOException;
import net.named_data.jndn.Data;
diff --git a/src/main/java/com/intel/jndn/utils/SegmentedServer.java b/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java
similarity index 89%
rename from src/main/java/com/intel/jndn/utils/SegmentedServer.java
rename to src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java
index 875ca87..6be4280 100644
--- a/src/main/java/com/intel/jndn/utils/SegmentedServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java
@@ -11,12 +11,13 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.server.impl;
-import com.intel.jndn.utils.repository.ForLoopRepository;
-import com.intel.jndn.utils.repository.Repository;
-import com.intel.jndn.utils.server.SegmentedServerHelper;
-import com.intel.jndn.utils.server.ServerBaseImpl;
+import com.intel.jndn.utils.server.RepositoryServer;
+import com.intel.jndn.utils.repository.impl.ForLoopRepository;
+import com.intel.jndn.utils.Repository;
+import com.intel.jndn.utils.server.impl.SegmentedServerHelper;
+import com.intel.jndn.utils.server.impl.ServerBaseImpl;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
diff --git a/src/main/java/com/intel/jndn/utils/server/SegmentedServerHelper.java b/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServerHelper.java
similarity index 98%
rename from src/main/java/com/intel/jndn/utils/server/SegmentedServerHelper.java
rename to src/main/java/com/intel/jndn/utils/server/impl/SegmentedServerHelper.java
index 2e1f92c..78e3c27 100644
--- a/src/main/java/com/intel/jndn/utils/server/SegmentedServerHelper.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServerHelper.java
@@ -11,7 +11,7 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.server;
+package com.intel.jndn.utils.server.impl;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
diff --git a/src/main/java/com/intel/jndn/utils/server/ServerBaseImpl.java b/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
similarity index 90%
rename from src/main/java/com/intel/jndn/utils/server/ServerBaseImpl.java
rename to src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
index b69b455..6110005 100644
--- a/src/main/java/com/intel/jndn/utils/server/ServerBaseImpl.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
@@ -11,8 +11,10 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.server;
+package com.intel.jndn.utils.server.impl;
+import com.intel.jndn.utils.ProcessingStage;
+import com.intel.jndn.utils.Server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -36,7 +38,7 @@
private static final Logger logger = Logger.getLogger(ServerBaseImpl.class.getName());
private final Face face;
private final Name prefix;
- private final List<PipelineStage> pipeline = new ArrayList<>();
+ private final List<ProcessingStage> pipeline = new ArrayList<>();
private long registeredPrefixId = UNREGISTERED;
/**
@@ -89,7 +91,7 @@
logger.log(Level.SEVERE, "Failed to register prefix: " + prefix.toUri());
}
}, new ForwardingFlags());
- logger.log(Level.FINER, "Registered a new prefix: " + prefix.toUri());
+ logger.log(Level.FINER, "Registered a new prefix: " + prefix.toUri());
} catch (net.named_data.jndn.security.SecurityException e) {
throw new IOException("Failed to communicate to face due to security error", e);
}
@@ -99,13 +101,13 @@
* {@inheritDoc}
*/
@Override
- public void addPipelineStage(PipelineStage<Data, Data> pipelineStage) {
+ public void addPostProcessingStage(ProcessingStage<Data, Data> pipelineStage) {
pipeline.add(pipelineStage);
}
/**
* Process the {@link Data} before sending it; this runs the packet through
- * each registered {@link PipelineStage} in order.
+ * each registered {@link ProcessingStage} in order.
*
* @param data the {@link Data} to process
* @return a processed {@link Data} packet; no guarantee as to whether it is
@@ -113,7 +115,7 @@
* @throws Exception if a pipeline stage fails
*/
public Data processPipeline(Data data) throws Exception {
- for (PipelineStage<Data, Data> stage : pipeline) {
+ for (ProcessingStage<Data, Data> stage : pipeline) {
data = stage.process(data);
}
return data;
diff --git a/src/main/java/com/intel/jndn/utils/SimpleServer.java b/src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java
similarity index 90%
rename from src/main/java/com/intel/jndn/utils/SimpleServer.java
rename to src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java
index b983fa3..cee7d7f 100644
--- a/src/main/java/com/intel/jndn/utils/SimpleServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java
@@ -11,13 +11,15 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.server.impl;
+import com.intel.jndn.utils.server.DynamicServer;
import com.intel.jndn.utils.server.RespondWithData;
import com.intel.jndn.utils.server.RespondWithBlob;
-import com.intel.jndn.utils.server.ServerBaseImpl;
+import com.intel.jndn.utils.server.RespondWithBlob;
+import com.intel.jndn.utils.server.RespondWithData;
+import com.intel.jndn.utils.server.impl.ServerBaseImpl;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.named_data.jndn.Data;
@@ -25,7 +27,7 @@
import net.named_data.jndn.Interest;
import net.named_data.jndn.InterestFilter;
import net.named_data.jndn.Name;
-import net.named_data.jndn.transport.Transport;
+import net.named_data.jndn.OnInterest;
import net.named_data.jndn.util.Blob;
/**
diff --git a/src/test/java/com/intel/jndn/utils/SimpleClientTestIT.java b/src/test/java/com/intel/jndn/utils/SimpleClientTestIT.java
deleted file mode 100644
index b60eed8..0000000
--- a/src/test/java/com/intel/jndn/utils/SimpleClientTestIT.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils;
-
-import com.intel.jndn.mock.MockKeyChain;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.InterestFilter;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.OnInterestCallback;
-import net.named_data.jndn.encoding.EncodingException;
-import net.named_data.jndn.security.KeyChain;
-import net.named_data.jndn.security.SecurityException;
-import net.named_data.jndn.util.Blob;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test SimpleClient.java; requires a hostname to an NFD accepting a generated
- * key to register prefixes, e.g. mvn test -Dnfd.ip=10.10.10.1
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SimpleClientTestIT {
-
- private static final Logger logger = Logger.getLogger(SegmentedServerTestIT.class.getName());
- private static final Name PREFIX = new Name("/test/for/simple-client");
-
- Face face;
- SimpleClient instance;
- String ip;
- ScheduledExecutorService pool;
-
- public SimpleClientTestIT() throws SecurityException {
- this.ip = System.getProperty("nfd.ip");
- this.face = new Face(ip);
- this.instance = SimpleClient.getDefault();
- this.pool = Executors.newScheduledThreadPool(2);
-
- KeyChain mockKeyChain = MockKeyChain.configure(new Name("/test/server"));
- face.setCommandSigningInfo(mockKeyChain, mockKeyChain.getDefaultCertificateName());
- pool.scheduleAtFixedRate(new EventProcessor(face), 0, 10, TimeUnit.MILLISECONDS);
- }
-
- @Test
- public void testCompletableFuture() throws Exception {
- Data servedData = new Data();
- servedData.setContent(new Blob("....."));
- face.registerPrefix(PREFIX, new OnInterestCallback() {
- @Override
- public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
- servedData.setName(interest.getName());
- try {
- face.putData(servedData);
- } catch (IOException ex) {
- logger.log(Level.SEVERE, "Failed to put data.", ex);
- }
- }
- }, null);
-
- CompletableFuture<Data> future = instance.getAsync(face, PREFIX);
- Data retrievedData = future.get(200, TimeUnit.MILLISECONDS);
-
- Assert.assertEquals(servedData.getContent().toString(), retrievedData.getContent().toString());
- }
-
- private class EventProcessor implements Runnable {
-
- private final Face face;
-
- public EventProcessor(Face face) {
- this.face = face;
- }
-
- @Override
- public void run() {
- try {
- face.processEvents();
- } catch (IOException | EncodingException ex) {
- logger.log(Level.SEVERE, null, ex);
- }
- }
- }
-}
diff --git a/src/test/java/com/intel/jndn/utils/TestHelper.java b/src/test/java/com/intel/jndn/utils/TestHelper.java
index 8784a8d..a73aba4 100644
--- a/src/test/java/com/intel/jndn/utils/TestHelper.java
+++ b/src/test/java/com/intel/jndn/utils/TestHelper.java
@@ -13,20 +13,43 @@
*/
package com.intel.jndn.utils;
+import com.intel.jndn.mock.MockKeyChain;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.security.KeyChain;
+import net.named_data.jndn.security.SecurityException;
+import net.named_data.jndn.transport.UdpTransport;
import net.named_data.jndn.util.Blob;
/**
+ * Collect assorted methods to help with testing
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
public class TestHelper {
-
+
+ public static Data retrieve(CompletableFuture<Data> future) {
+ try {
+ return future.get(4000, TimeUnit.MILLISECONDS);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
public static List<CompletableFuture<Data>> buildFutureSegments(Name name, int from, int to) {
return buildSegments(name, from, to).stream()
.map((d) -> CompletableFuture.completedFuture(d))
@@ -34,7 +57,7 @@
}
public static List<Data> buildSegments(Name name, int from, int to) {
- return IntStream.range(0, 10).boxed()
+ return IntStream.range(from, to).boxed()
.map((i) -> buildData(new Name(name).appendSegment(i), i.toString(), to - 1))
.collect(Collectors.toList());
}
@@ -45,10 +68,64 @@
return data;
}
-
- public static Data buildData(Name name, String content, int finalBlockId){
+
+ public static Data buildData(Name name, String content, int finalBlockId) {
Data data = buildData(name, content);
data.getMetaInfo().setFinalBlockId(Name.Component.fromNumberWithMarker(finalBlockId, 0x00));
return data;
}
+
+ public static NdnEnvironment buildTestEnvironment(String host, int numFaces) throws SecurityException {
+ NdnEnvironment environment = new NdnEnvironment();
+ environment.executor = Executors.newScheduledThreadPool(numFaces);
+ environment.keyChain = MockKeyChain.configure(new Name("/test/identity").append(buildRandomString(10)));
+
+ for (int i = 0; i < numFaces; i++) {
+ Face face = new Face(new UdpTransport(), new UdpTransport.ConnectionInfo(host));
+ face.setCommandSigningInfo(environment.keyChain, environment.keyChain.getDefaultCertificateName());
+ environment.executor.scheduleAtFixedRate(new EventProcessor(face), 0, 20, TimeUnit.MILLISECONDS);
+ environment.faces.add(i, face);
+ }
+
+ return environment;
+ }
+
+ public static String buildRandomString(int length) {
+ return new String(buildRandomBytes(length));
+ }
+
+ public static byte[] buildRandomBytes(int length){
+ byte[] bytes = new byte[length];
+ new Random().nextBytes(bytes);
+ return bytes;
+ }
+
+ public static class NdnEnvironment {
+
+ public ScheduledExecutorService executor;
+ public KeyChain keyChain;
+ public List<Face> faces = new ArrayList<>();
+ }
+
+ public static class EventProcessor implements Runnable {
+
+ private final Face face;
+
+ public EventProcessor(Face face) {
+ this.face = face;
+ }
+
+ @Override
+ public void run() {
+ try {
+ face.processEvents();
+ } catch (IOException | EncodingException ex) {
+ Logger.getLogger(TestHelper.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ }
+
+ public static class TestCounter{
+ public int count = 0;
+ }
}
diff --git a/src/test/java/com/intel/jndn/utils/client/SegmentedDataReassemblerTest.java b/src/test/java/com/intel/jndn/utils/client/SegmentedDataReassemblerTest.java
deleted file mode 100644
index 881b6af..0000000
--- a/src/test/java/com/intel/jndn/utils/client/SegmentedDataReassemblerTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.client;
-
-import com.intel.jndn.utils.TestHelper;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Name;
-import static org.junit.Assert.*;
-import org.junit.Test;
-
-/**
- * Test that segment data packets are re-assembled correctly.
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SegmentedDataReassemblerTest {
-
- @Test
- public void testReassemble() throws InterruptedException, ExecutionException {
- Name name = new Name("/data/re-assembly");
- List<CompletableFuture<Data>> segments = TestHelper.buildFutureSegments(name, 0, 10);
- SegmentedDataReassembler instance = new SegmentedDataReassembler(name, segments);
-
- CompletableFuture<Data> future = instance.reassemble();
- assertTrue(future.isDone());
- assertEquals(name.toUri(), future.get().getName().toUri()); // tests name-shortening logic
- assertEquals("0123456789", future.get().getContent().toString());
- }
-}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java
new file mode 100644
index 0000000..45572a7
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java
@@ -0,0 +1,118 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.Client;
+import com.intel.jndn.utils.TestHelper;
+import com.intel.jndn.utils.server.impl.SegmentedServerHelper;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.InterestFilter;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnInterestCallback;
+import net.named_data.jndn.security.SecurityException;
+import org.junit.Test;
+
+/**
+ * Test AdvancedClient.java; requires a hostname to an NFD accepting a generated
+ * key to register prefixes, e.g. mvn test -Dnfd.ip=10.10.10.1
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class AdvancedClientFileTestIT {
+
+ private static final Logger logger = Logger.getLogger(AdvancedClientFileTestIT.class.getName());
+ private static final Name PREFIX = new Name("/test/advanced-client").append(TestHelper.buildRandomString(10));
+ private static final int NUM_MESSAGES = 100;
+ private static final int MESSAGE_SIZE_BYTES = 10000;
+ private static final int SEGMENT_SIZE_BYTES = 5000;
+ private final TestHelper.NdnEnvironment environment;
+
+ public AdvancedClientFileTestIT() throws SecurityException {
+ String ip = System.getProperty("nfd.ip");
+ logger.info("Testing on NFD at: " + ip);
+ environment = TestHelper.buildTestEnvironment(ip, 2);
+ }
+
+ @Test
+ public void stressTest() throws Exception {
+ Face producer = environment.faces.get(0);
+ Face consumer = environment.faces.get(1);
+
+ producer.registerPrefix(PREFIX, new AdvancedDataServer(MESSAGE_SIZE_BYTES, SEGMENT_SIZE_BYTES), null);
+
+ // TODO this must be here until the prefix registration callback is complete
+ Thread.sleep(500);
+
+ long startTime = System.currentTimeMillis();
+ AdvancedClient client = new AdvancedClient();
+ List<CompletableFuture<Data>> requests = expressInterests(client, consumer, PREFIX, NUM_MESSAGES);
+ List<Data> datas = requests.stream().map((f) -> TestHelper.retrieve(f)).collect(Collectors.toList());
+ long endTime = System.currentTimeMillis();
+
+ logger.info(String.format("Transfered %d bytes in %d ms", MESSAGE_SIZE_BYTES * NUM_MESSAGES, endTime - startTime));
+ }
+
+ private List<CompletableFuture<Data>> expressInterests(Client client, Face face, Name name, int count) {
+ return IntStream.range(0, count).boxed().map((i) -> client.getAsync(face, name)).collect(Collectors.toList());
+ }
+
+ private class AdvancedDataServer implements OnInterestCallback {
+
+ private final int contentSize;
+ private final int segmentSize;
+
+ public AdvancedDataServer(int contentSize, int segmentSize) {
+ this.contentSize = contentSize;
+ this.segmentSize = segmentSize;
+ }
+
+ @Override
+ public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
+ // ignore segmented requests, the data has already been put below
+ if (SegmentationHelper.isSegmented(interest.getName(), (byte) 0x00)) {
+ return;
+ }
+
+ // randomly ignore 10% of requests
+ if (new Random().nextInt(10) < 2){
+ logger.info("Skipping an interest: " + interest.toUri());
+ return;
+ }
+
+ ByteArrayInputStream bytes = new ByteArrayInputStream(TestHelper.buildRandomString(contentSize).getBytes());
+ Data data = TestHelper.buildData(interest.getName(), "");
+ data.getMetaInfo().setFreshnessPeriod(0);
+
+ try {
+ for (Data segment : SegmentedServerHelper.segment(data, bytes, segmentSize)) {
+ logger.log(Level.INFO, "Put data: " + segment.getName().toUri());
+ face.putData(segment);
+ }
+ } catch (IOException ex) {
+ logger.log(Level.SEVERE, "Failed to put data.", ex);
+ }
+ }
+ }
+}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java
new file mode 100644
index 0000000..6a99599
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java
@@ -0,0 +1,118 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.Client;
+import com.intel.jndn.utils.TestHelper;
+import com.intel.jndn.utils.server.impl.SegmentedServerHelper;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.InterestFilter;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnInterestCallback;
+import net.named_data.jndn.security.SecurityException;
+import org.junit.Test;
+
+/**
+ * Test AdvancedClient.java; requires a hostname to an NFD accepting a generated
+ * key to register prefixes, e.g. mvn test -Dnfd.ip=10.10.10.1
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class AdvancedClientStressTestIT {
+
+ private static final Logger logger = Logger.getLogger(AdvancedClientStressTestIT.class.getName());
+ private static final Name PREFIX = new Name("/test/advanced-client").append(TestHelper.buildRandomString(10));
+ private static final int NUM_MESSAGES = 100;
+ private static final int MESSAGE_SIZE_BYTES = 10000;
+ private static final int SEGMENT_SIZE_BYTES = 5000;
+ private final TestHelper.NdnEnvironment environment;
+
+ public AdvancedClientStressTestIT() throws SecurityException {
+ String ip = System.getProperty("nfd.ip");
+ logger.info("Testing on NFD at: " + ip);
+ environment = TestHelper.buildTestEnvironment(ip, 2);
+ }
+
+ @Test
+ public void stressTest() throws Exception {
+ Face producer = environment.faces.get(0);
+ Face consumer = environment.faces.get(1);
+
+ producer.registerPrefix(PREFIX, new RandomDataServer(MESSAGE_SIZE_BYTES, SEGMENT_SIZE_BYTES), null);
+
+ // TODO this must be here until the prefix registration callback is complete
+ Thread.sleep(500);
+
+ long startTime = System.currentTimeMillis();
+ AdvancedClient client = new AdvancedClient();
+ List<CompletableFuture<Data>> requests = expressInterests(client, consumer, PREFIX, NUM_MESSAGES);
+ List<Data> datas = requests.stream().map((f) -> TestHelper.retrieve(f)).collect(Collectors.toList());
+ long endTime = System.currentTimeMillis();
+
+ logger.info(String.format("Transfered %d bytes in %d ms", MESSAGE_SIZE_BYTES * NUM_MESSAGES, endTime - startTime));
+ }
+
+ private List<CompletableFuture<Data>> expressInterests(Client client, Face face, Name name, int count) {
+ return IntStream.range(0, count).boxed().map((i) -> client.getAsync(face, name)).collect(Collectors.toList());
+ }
+
+ private class RandomDataServer implements OnInterestCallback {
+
+ private final int contentSize;
+ private final int segmentSize;
+
+ public RandomDataServer(int contentSize, int segmentSize) {
+ this.contentSize = contentSize;
+ this.segmentSize = segmentSize;
+ }
+
+ @Override
+ public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
+ // ignore segmented requests, the data has already been put below
+ if (SegmentationHelper.isSegmented(interest.getName(), (byte) 0x00)) {
+ return;
+ }
+
+ // randomly ignore 10% of requests
+ if (new Random().nextInt(10) < 2){
+ logger.info("Skipping an interest: " + interest.toUri());
+ return;
+ }
+
+ ByteArrayInputStream bytes = new ByteArrayInputStream(TestHelper.buildRandomString(contentSize).getBytes());
+ Data data = TestHelper.buildData(interest.getName(), "");
+ data.getMetaInfo().setFreshnessPeriod(0);
+
+ try {
+ for (Data segment : SegmentedServerHelper.segment(data, bytes, segmentSize)) {
+ logger.log(Level.INFO, "Put data: " + segment.getName().toUri());
+ face.putData(segment);
+ }
+ } catch (IOException ex) {
+ logger.log(Level.SEVERE, "Failed to put data.", ex);
+ }
+ }
+ }
+}
diff --git a/src/test/java/com/intel/jndn/utils/SegmentedClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java
similarity index 67%
rename from src/test/java/com/intel/jndn/utils/SegmentedClientTest.java
rename to src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java
index 2ee0ae4..67058d6 100644
--- a/src/test/java/com/intel/jndn/utils/SegmentedClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java
@@ -11,47 +11,52 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.client.impl;
import com.intel.jndn.mock.MockFace;
+import com.intel.jndn.utils.TestHelper;
+import com.intel.jndn.utils.client.SegmentationType;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
-import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.InterestFilter;
import net.named_data.jndn.Name;
-import net.named_data.jndn.Name.Component;
import net.named_data.jndn.OnInterestCallback;
import net.named_data.jndn.util.Blob;
-import org.junit.Test;
import static org.junit.Assert.*;
-import org.junit.Before;
+import org.junit.Test;
/**
- * Test SegmentedClient functionality.
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public class SegmentedClientTest {
+public class AdvancedClientTest {
- private static final Logger logger = Logger.getLogger(SimpleClient.class.getName());
- private SegmentedClient instance;
- private MockFace face;
+ MockFace face = new MockFace();
+ AdvancedClient instance = new AdvancedClient();
- @Before
- public void beforeTest() {
- face = new MockFace();
- instance = new SegmentedClient(1, 100); // warning: setting an interest
- // lifetime that is too low will cause the getSync() tests to fail due to
- // Thread.sleep()
+ @Test
+ public void testRetries() throws Exception {
+ Name name = new Name("/test/advanced/client");
+ Interest interest = new Interest(name, 1);
+
+ CompletableFuture<Data> future = instance.getAsync(face, interest);
+
+ while (!future.isDone()) {
+ face.processEvents();
+ }
+
+ assertTrue(future.isCompletedExceptionally());
}
@Test
public void testGetSync() throws Exception {
Name name = new Name("/segmented/data");
+
face.registerPrefix(name, new OnInterestCallback() {
private int count = 0;
private int max = 9;
@@ -59,10 +64,10 @@
@Override
public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
Data data = new Data(interest.getName());
- if (!SegmentedClient.hasSegment(data.getName())) {
+ if (!SegmentationHelper.isSegmented(data.getName(), SegmentationType.SEGMENT.value())) {
data.getName().appendSegment(0);
}
- data.getMetaInfo().setFinalBlockId(Component.fromNumberWithMarker(max, 0x00));
+ data.getMetaInfo().setFinalBlockId(Name.Component.fromNumberWithMarker(max, 0x00));
data.setContent(new Blob("."));
try {
face.putData(data);
@@ -72,35 +77,11 @@
}
}, null);
- logger.info("Client retrieving segments synchronously: " + name.toUri());
Data data = instance.getSync(face, new Name(name).appendSegment(0));
assertEquals(10, data.getContent().size());
}
-
- /**
- * Test that a failed request fails with an exception.
- *
- * @throws java.lang.Exception
- */
-// @Test(expected = ExecutionException.class)
-// public void testFailureToRetrieve() throws Exception {
-// // retrieve non-existent data, should timeout
-// logger.info("Client retrieving segments asynchronously: /test/no-data");
-// Future<Data> futureData = instance.getAsync(face, new Name("/test/no-data"));
-// face.processEvents();
-// futureData.get();
-// }
-
-// /**
-// * Test that a sync failed request fails with an exception.
-// */
-// @Test(expected = IOException.class)
-// public void testSyncFailureToRetrieve() throws IOException {
-// logger.info("Client retrieving segments synchronously: /test/no-data");
-// instance.getSync(face, new Name("/test/no-data"));
-// }
-
- /**
+
+ /**
* Verify that Data returned with a different Name than the Interest is still
* segmented correctly.
*
@@ -116,7 +97,6 @@
Name name = new Name("/a/b");
face.addResponse(name, segments.get(0));
- logger.info("Client retrieving segments synchronously: " + name.toUri());
Data data = instance.getSync(face, name);
assertNotNull(data);
assertEquals("/a/b/c/d", data.getName().toUri());
@@ -134,7 +114,6 @@
Data data = TestHelper.buildData(name, "", 0);
face.addResponse(name, data);
- logger.info("Client retrieving segments asynchronously: /test/no-content");
Future<Data> result = instance.getAsync(face, name);
face.processEvents();
assertEquals("/test/no-content", result.get().getName().toUri());
@@ -159,7 +138,6 @@
data1.getMetaInfo().setFinalBlockId(Name.Component.fromNumberWithMarker(1, 0x00));
face.addResponse(data2.getName(), data2);
- logger.info("Client retrieving segments asynchronously: /test/content-length");
Future<Data> result = instance.getAsync(face, new Name("/test/content-length").appendSegment(0));
face.processEvents();
face.processEvents();
@@ -167,8 +145,8 @@
}
/**
- * If a Data packet does not have a FinalBlockId, the SegmentedClient should
- * just return the packet.
+ * If a Data packet does not have a FinalBlockId, the AdvancedClient should
+ just return the packet.
*
* @throws Exception
*/
@@ -179,7 +157,6 @@
data.setContent(new Blob("1"));
face.addResponse(name, data);
- logger.info("Client retrieving segments asynchronously: /test/no-final-block-id");
Future<Data> result = instance.getAsync(face, name);
face.processEvents();
assertEquals("/test/no-final-block-id", result.get().getName().toUri());
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java
new file mode 100644
index 0000000..39b6586
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java
@@ -0,0 +1,76 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.TestHelper;
+import java.util.concurrent.ExecutionException;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Name;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test that segment data packets are re-assembled correctly.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class DataAssemblerTest {
+
+ @Test
+ public void testReassembleMany() throws InterruptedException, ExecutionException {
+ Name name = new Name("/data/assembly");
+ Data[] packets = TestHelper.buildSegments(name, 0, 10).toArray(new Data[]{});
+ byte marker = 0x00;
+ DataAssembler instance = new DataAssembler(packets, marker);
+
+ Data reassembled = instance.assemble();
+ assertEquals(name.toUri(), reassembled.getName().toUri()); // tests name-shortening logic
+ assertEquals("0123456789", reassembled.getContent().toString());
+ }
+
+ @Test
+ public void testReassembleOne() throws InterruptedException, ExecutionException {
+ Name name = new Name("/data/assembly");
+ Data onlyPacket = TestHelper.buildData(name, "...");
+ Data[] packets = new Data[]{onlyPacket};
+ byte marker = 0x00;
+ DataAssembler instance = new DataAssembler(packets, marker);
+
+ Data reassembled = instance.assemble();
+ assertEquals(name.toUri(), reassembled.getName().toUri()); // tests name-shortening logic
+ assertEquals("...", reassembled.getContent().toString());
+ }
+
+ @Test
+ public void testReassembleNoContent() throws InterruptedException, ExecutionException {
+ Name name = new Name("/data/assembly");
+ Data onlyPacket = TestHelper.buildData(name, "");
+ Data[] packets = new Data[]{onlyPacket};
+ byte marker = 0x00;
+ DataAssembler instance = new DataAssembler(packets, marker);
+
+ Data reassembled = instance.assemble();
+ assertEquals(name.toUri(), reassembled.getName().toUri()); // tests name-shortening logic
+ assertEquals("", reassembled.getContent().toString());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testReassembleNone() throws InterruptedException, ExecutionException {
+ Data[] packets = new Data[]{};
+ byte marker = 0x00;
+ DataAssembler instance = new DataAssembler(packets, marker);
+
+ Data reassembled = instance.assemble();
+ }
+}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java
new file mode 100644
index 0000000..aa0549a
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java
@@ -0,0 +1,74 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.mock.MockFace;
+import com.intel.jndn.utils.TestHelper.TestCounter;
+import java.io.IOException;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnData;
+import net.named_data.jndn.OnTimeout;
+import net.named_data.jndn.encoding.EncodingException;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test DefaultRetryClient
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class DefaultRetryClientTest {
+
+ DefaultRetryClient client = new DefaultRetryClient(3);
+ MockFace face = new MockFace();
+ Name name = new Name("/test/retry/client");
+ Interest interest = new Interest(name, 0.0);
+ TestCounter counter = new TestCounter();
+
+ @Test
+ public void testRetry() throws Exception {
+
+ client.retry(face, interest, new OnData() {
+ @Override
+ public void onData(Interest interest, Data data) {
+ counter.count++;
+ }
+ }, new OnTimeout() {
+ @Override
+ public void onTimeout(Interest interest) {
+ fail("Should not timeout.");
+ }
+ });
+ assertEquals(1, client.totalRetries());
+
+ timeoutAndVerifyRetry(2);
+ timeoutAndVerifyRetry(3);
+ respondToRetryAttempt();
+ }
+
+ private void timeoutAndVerifyRetry(int retryCount) throws Exception {
+ Thread.sleep(1);
+ face.processEvents();
+ assertEquals(retryCount, client.totalRetries());
+ assertEquals(0, counter.count);
+ }
+
+ protected void respondToRetryAttempt() throws IOException, EncodingException {
+ face.getTransport().respondWith(new Data(name));
+ face.processEvents();
+ assertEquals(1, counter.count);
+ }
+}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java
new file mode 100644
index 0000000..2d65724
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java
@@ -0,0 +1,94 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.mock.MockFace;
+import com.intel.jndn.utils.TestHelper;
+import com.intel.jndn.utils.TestHelper.TestCounter;
+import com.intel.jndn.utils.client.DataStream;
+import java.util.Random;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ * Test DefaultSegmentedClient
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class DefaultSegmentedClientTest {
+
+ DefaultSegmentedClient instance = new DefaultSegmentedClient();
+
+ @Test
+ public void testGetSegmentsAsync() throws Exception {
+ MockFace face = new MockFace();
+ Name name = new Name("/test/segmented/client");
+ Interest interest = new Interest(name);
+ DataStream stream = instance.getSegmentsAsync(face, interest);
+
+ TestCounter counter = new TestCounter();
+ stream.observe((i, d) -> {
+ counter.count++;
+ });
+
+ for (Data segment : TestHelper.buildSegments(name, 0, 5)) {
+ stream.onData(interest, segment);
+ }
+
+ assertEquals(5, counter.count);
+ assertEquals("01234", stream.assemble().getContent().toString());
+ }
+
+ @Test
+ public void testReplacingFinalComponents() throws Exception {
+ long segmentNumber = 99;
+
+ Name name1 = new Name("/a/b/c");
+ Interest interest1 = new Interest(name1);
+ Interest copied1 = instance.replaceFinalComponent(interest1, segmentNumber, (byte) 0x00);
+ assertEquals(segmentNumber, copied1.getName().get(-1).toSegment());
+
+ Name name2 = new Name("/a/b/c").appendSegment(17);
+ Interest interest2 = new Interest(name2);
+ Interest copied2 = instance.replaceFinalComponent(interest2, segmentNumber, (byte) 0x00);
+ assertEquals(segmentNumber, copied2.getName().get(-1).toSegment());
+
+ assertEquals(copied1.toUri(), copied2.toUri());
+ }
+
+ @Test
+ public void verifyThatSegmentsAreRetrievedOnlyOnce() throws Exception {
+ MockFace face = new MockFace();
+ Name name = new Name("/test/segmented/client");
+ Interest interest = new Interest(name);
+ DataStream stream = instance.getSegmentsAsync(face, interest);
+
+ TestCounter counter = new TestCounter();
+ stream.observe((i, d) -> {
+ counter.count++;
+ });
+
+ for (Data segment : TestHelper.buildSegments(name, 0, 5)) {
+ face.putData(segment);
+ face.processEvents();
+ }
+
+ assertEquals(5, counter.count);
+ assertEquals(5, face.getTransport().getSentInterestPackets().size());
+ assertEquals("01234", stream.assemble().getContent().toString());
+ }
+}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java
new file mode 100644
index 0000000..aaf43df
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java
@@ -0,0 +1,48 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.TestHelper;
+import com.intel.jndn.utils.client.OnException;
+import java.io.InputStream;
+import net.named_data.jndn.Name;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class DefaultStreamingClientTest {
+
+ DefaultStreamingClient instance = new DefaultStreamingClient();
+
+ @Test
+ public void testGetAsyncStream() throws Exception {
+ SegmentedDataStream in = new SegmentedDataStream();
+ InputStream out = instance.getStreamAsync(in, new OnException() {
+ @Override
+ public void onException(Exception exception) {
+ fail("Streaming failed: " + exception);
+ }
+ });
+
+ Name name = new Name("/test/streaming/client");
+ for (int i = 0; i < 10; i++) {
+ in.onData(null, TestHelper.buildData(new Name(name).appendSegment(i), "."));
+ int c = out.read();
+ assertEquals((int) '.', c);
+ }
+ }
+}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java b/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java
new file mode 100644
index 0000000..2f165b9
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java
@@ -0,0 +1,138 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.TestHelper;
+import java.util.ArrayList;
+import java.util.stream.IntStream;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.Name.Component;
+import net.named_data.jndn.encoding.EncodingException;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class SegmentedDataStreamTest {
+
+ SegmentedDataStream instance = new SegmentedDataStream();
+
+ @Test
+ public void testAddingSequentialData() throws StreamException {
+ Name name = new Name("/test/segmented/data/stream");
+ Interest interest = new Interest(name);
+
+ instance.onData(interest, TestHelper.buildData(new Name(name).appendSegment(0), "."));
+ instance.onData(interest, TestHelper.buildData(new Name(name).appendSegment(1), "."));
+ instance.onData(interest, TestHelper.buildData(new Name(name).appendSegment(2), "."));
+
+ assertEquals(3, instance.list().length);
+ assertEquals("...", instance.assemble().getContent().toString());
+ }
+
+ @Test
+ public void testAddingUnorderedData() throws StreamException {
+ Name name = new Name("/test/segmented/data/stream");
+ Interest interest = new Interest(name);
+ ArrayList<Long> segments = new ArrayList<>();
+
+ instance.observe((i, d) -> {
+ try {
+ segments.add(d.getName().get(-1).toSegment());
+ } catch (EncodingException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+
+ instance.onData(interest, TestHelper.buildData(new Name(name).appendSegment(1), "."));
+ instance.onData(interest, TestHelper.buildData(new Name(name).appendSegment(0), "."));
+ instance.onData(interest, TestHelper.buildData(new Name(name).appendSegment(2), "."));
+
+ assertEquals(3, instance.list().length);
+ assertEquals("...", instance.assemble().getContent().toString());
+ assertArrayEquals(new Long[]{(long) 0, (long) 1, (long) 2}, segments.toArray(new Long[]{}));
+ }
+
+ @Test
+ public void testAddingOneData() throws StreamException {
+ Name name = new Name("/test/segmented/data/stream");
+ Interest interest = new Interest(name);
+
+ instance.onData(interest, TestHelper.buildData(new Name(name), "."));
+
+ assertEquals(1, instance.list().length);
+ assertEquals(".", instance.assemble().getContent().toString());
+ assertEquals(0, instance.current());
+ assertEquals(0, instance.end());
+ }
+
+ @Test
+ public void testFinalBlockId(){
+ Name name = new Name("/test/final/block/id");
+ Interest interest = new Interest(name);
+ Data data = TestHelper.buildData(name, ".");
+ data.getName().appendSegment(0);
+ data.getMetaInfo().setFinalBlockId(Component.fromNumberWithMarker(10, 0x00));
+
+ instance.onData(interest, data);
+
+ assertEquals(10, instance.end());
+ assertEquals(0, instance.current());
+ }
+
+ @Test
+ public void testOrderedPackets() {
+ int end = 10;
+ IntStream.range(0, end).forEach((i) -> {
+ addPacketToInstance(i);
+ assertEquals(i, instance.current());
+ });
+
+ assertEquals(end, instance.list().length);
+ }
+
+ @Test
+ public void testUnorderedPackets() {
+ addPacketToInstance(1);
+ assertEquals(-1, instance.current());
+
+ addPacketToInstance(0);
+ assertEquals(1, instance.current());
+
+ addPacketToInstance(3);
+ assertEquals(1, instance.current());
+
+ addPacketToInstance(5);
+ assertEquals(1, instance.current());
+
+ addPacketToInstance(4);
+ assertEquals(1, instance.current());
+
+ addPacketToInstance(2);
+ assertEquals(5, instance.current());
+
+ addPacketToInstance(99);
+ assertEquals(7, instance.list().length);
+ }
+
+ protected void addPacketToInstance(long i) {
+ Name name = new Name().appendSegment(i);
+ instance.onData(new Interest(name), new Data(name));
+ }
+}
diff --git a/src/test/java/com/intel/jndn/utils/SimpleClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java
similarity index 96%
rename from src/test/java/com/intel/jndn/utils/SimpleClientTest.java
rename to src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java
index 35883a9..41e461c 100644
--- a/src/test/java/com/intel/jndn/utils/SimpleClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java
@@ -11,8 +11,9 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.client.impl;
+import com.intel.jndn.utils.client.impl.SimpleClient;
import com.intel.jndn.mock.MockFace;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -76,7 +77,7 @@
SimpleClient client = new SimpleClient();
Future<Data> futureData = client.getAsync(face, new Name("/test/async"));
assertTrue(!futureData.isDone());
-
+
// process events to retrieve data
face.processEvents();
assertTrue(futureData.isDone());
@@ -97,14 +98,13 @@
// wait for NDN timeout
Thread.sleep(2);
face.processEvents();
-
+
// verify that the client is completing the future with a TimeoutException
assertTrue(futureData.isDone());
assertTrue(futureData.isCompletedExceptionally());
- try{
+ try {
futureData.get();
- }
- catch(ExecutionException e){
+ } catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
}
}
@@ -112,11 +112,11 @@
@Test(expected = Exception.class)
public void testAsyncFailureToRetrieve() throws Exception {
Face face = new MockFace();
-
+
logger.info("Client expressing interest asynchronously: /test/no-data");
Interest interest = new Interest(new Name("/test/no-data"), 10);
Future future = SimpleClient.getDefault().getAsync(face, interest);
-
+
face.processEvents();
future.get(15, TimeUnit.MILLISECONDS);
}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java
new file mode 100644
index 0000000..f49525b
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java
@@ -0,0 +1,135 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.client.impl.SimpleClient;
+import com.intel.jndn.utils.TestHelper;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.InterestFilter;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnInterestCallback;
+import net.named_data.jndn.security.SecurityException;
+import net.named_data.jndn.util.Blob;
+import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import org.junit.Test;
+
+/**
+ * Test SimpleClient.java; requires a hostname to an NFD accepting a generated
+ * key to register prefixes, e.g. mvn test -Dnfd.ip=10.10.10.1
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class SimpleClientTestIT {
+
+ private static final Logger logger = Logger.getLogger(SimpleClientTestIT.class.getName());
+ private static final Name PREFIX_RETRIEVE = new Name("/test/simple-client/retrieve-one").append(TestHelper.buildRandomString(10));
+ private static final Name PREFIX_RETRIEVE_MULTIPLE = new Name("/test/simple-client/retrieve-multiple").append(TestHelper.buildRandomString(10));
+
+ SimpleClient instance;
+ private final TestHelper.NdnEnvironment environment;
+
+ public SimpleClientTestIT() throws SecurityException {
+ String ip = System.getProperty("nfd.ip");
+ environment = TestHelper.buildTestEnvironment(ip, 2);
+ instance = SimpleClient.getDefault();
+ }
+
+ @Test
+ public void testRetrieval() throws Exception {
+ // setup faces
+ Face producerFace = environment.faces.get(0);
+ Face consumerFace = environment.faces.get(1);
+
+ // setup server
+ Data servedData = new Data();
+ servedData.setContent(new Blob("....."));
+ servedData.getMetaInfo().setFreshnessPeriod(0);
+ producerFace.registerPrefix(PREFIX_RETRIEVE, new DataServer(servedData), null);
+
+ // TODO this must be here until the prefix registration callback is complete
+ Thread.sleep(500);
+
+ // send interest
+ CompletableFuture<Data> future = instance.getAsync(consumerFace, PREFIX_RETRIEVE);
+ Data retrievedData = future.get();
+
+ // verify
+ Assert.assertEquals(servedData.getContent().toString(), retrievedData.getContent().toString());
+ }
+
+ @Test
+ public void testSyncRetrieval() throws Exception {
+ String ip = System.getProperty("nfd.ip");
+ Face face = new Face(ip);
+ Interest interest = new Interest(new Name("/localhop/nfd/rib")); // this query should return some data if NFD is running locally
+ interest.setInterestLifetimeMilliseconds(2000);
+ Data data = SimpleClient.getDefault().getSync(face, interest);
+ assertNotNull(data);
+ }
+
+ @Test
+ public void testMultipleRetrieval() throws Exception {
+ int numInterests = 100;
+
+ // setup faces
+ Face producerFace = environment.faces.get(0);
+ Face consumerFace = environment.faces.get(1);
+
+ // setup server
+ Data servedData = new Data();
+ servedData.setContent(new Blob("0123456789"));
+ servedData.getMetaInfo().setFreshnessPeriod(0);
+ producerFace.registerPrefix(PREFIX_RETRIEVE_MULTIPLE, new DataServer(servedData), null);
+
+ // this must be here until the prefix registration callback is complete
+ Thread.sleep(500);
+
+ // request all packets
+ Stream<CompletableFuture<Data>> futures = IntStream.range(0, numInterests)
+ .boxed().map((i) -> instance.getAsync(consumerFace, PREFIX_RETRIEVE_MULTIPLE));
+
+ // check all returned packets
+ futures.map((f) -> TestHelper.retrieve(f))
+ .forEach((d) -> assertEquals(servedData.getContent().toString(), d.getContent().toString()));
+ }
+
+ private class DataServer implements OnInterestCallback {
+
+ private Data data;
+
+ public DataServer(Data data) {
+ this.data = data;
+ }
+
+ @Override
+ public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
+ data.setName(interest.getName());
+ try {
+ face.putData(data);
+ } catch (IOException ex) {
+ logger.log(Level.SEVERE, "Failed to put data.", ex);
+ }
+ }
+ }
+}
diff --git a/src/test/java/com/intel/jndn/utils/server/pipeline/CompressionStageTest.java b/src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java
similarity index 88%
rename from src/test/java/com/intel/jndn/utils/server/pipeline/CompressionStageTest.java
rename to src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java
index 12fcd28..763c361 100644
--- a/src/test/java/com/intel/jndn/utils/server/pipeline/CompressionStageTest.java
+++ b/src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java
@@ -11,9 +11,9 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.server.pipeline;
+package com.intel.jndn.utils.processing.impl;
-import com.intel.jndn.utils.SegmentedClient;
+import com.intel.jndn.utils.client.impl.AdvancedClient;
import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Name;
@@ -28,7 +28,7 @@
*/
public class CompressionStageTest {
- private static final Logger logger = Logger.getLogger(SegmentedClient.class.getName());
+ private static final Logger logger = Logger.getLogger(AdvancedClient.class.getName());
/**
* Test of process method, of class CompressionStage.
diff --git a/src/test/java/com/intel/jndn/utils/server/pipeline/SigningStageTest.java b/src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java
similarity index 97%
rename from src/test/java/com/intel/jndn/utils/server/pipeline/SigningStageTest.java
rename to src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java
index 4f10ab4..107e6c1 100644
--- a/src/test/java/com/intel/jndn/utils/server/pipeline/SigningStageTest.java
+++ b/src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java
@@ -11,7 +11,7 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.server.pipeline;
+package com.intel.jndn.utils.processing.impl;
import com.intel.jndn.mock.MockKeyChain;
import net.named_data.jndn.Data;
diff --git a/src/test/java/com/intel/jndn/utils/repository/ForLoopRepositoryTest.java b/src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java
similarity index 94%
rename from src/test/java/com/intel/jndn/utils/repository/ForLoopRepositoryTest.java
rename to src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java
index 1a73a01..5691b8b 100644
--- a/src/test/java/com/intel/jndn/utils/repository/ForLoopRepositoryTest.java
+++ b/src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java
@@ -11,7 +11,7 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.repository;
+package com.intel.jndn.utils.repository.impl;
/**
* Test {@link ForLoopRepository}.
diff --git a/src/test/java/com/intel/jndn/utils/repository/RepoHelper.java b/src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java
similarity index 96%
rename from src/test/java/com/intel/jndn/utils/repository/RepoHelper.java
rename to src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java
index 7a624b0..fac32f9 100644
--- a/src/test/java/com/intel/jndn/utils/repository/RepoHelper.java
+++ b/src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java
@@ -11,7 +11,7 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.repository;
+package com.intel.jndn.utils.repository.impl;
import net.named_data.jndn.Data;
import net.named_data.jndn.Interest;
diff --git a/src/test/java/com/intel/jndn/utils/repository/RepositoryTest.java b/src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java
similarity index 96%
rename from src/test/java/com/intel/jndn/utils/repository/RepositoryTest.java
rename to src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java
index f39e028..d981f91 100644
--- a/src/test/java/com/intel/jndn/utils/repository/RepositoryTest.java
+++ b/src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java
@@ -11,9 +11,10 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.repository;
+package com.intel.jndn.utils.repository.impl;
-import static com.intel.jndn.utils.repository.RepoHelper.*;
+import com.intel.jndn.utils.Repository;
+import static com.intel.jndn.utils.repository.impl.RepoHelper.*;
import net.named_data.jndn.Data;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
diff --git a/src/test/java/com/intel/jndn/utils/server/SegmentedServerHelperTest.java b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerHelperTest.java
similarity index 98%
rename from src/test/java/com/intel/jndn/utils/server/SegmentedServerHelperTest.java
rename to src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerHelperTest.java
index db4a8eb..b93e1a8 100644
--- a/src/test/java/com/intel/jndn/utils/server/SegmentedServerHelperTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerHelperTest.java
@@ -11,7 +11,7 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.server;
+package com.intel.jndn.utils.server.impl;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
diff --git a/src/test/java/com/intel/jndn/utils/SegmentedServerTest.java b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java
similarity index 86%
rename from src/test/java/com/intel/jndn/utils/SegmentedServerTest.java
rename to src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java
index b6d271b..71f505e 100644
--- a/src/test/java/com/intel/jndn/utils/SegmentedServerTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java
@@ -11,8 +11,9 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.server.impl;
+import com.intel.jndn.utils.client.impl.AdvancedClient;
import com.intel.jndn.mock.MockFace;
import java.io.IOException;
import net.named_data.jndn.Data;
@@ -38,7 +39,7 @@
@Test
public void testAddPipelineStage() {
- instance.addPipelineStage(null);
+ instance.addPostProcessingStage(null);
}
@Test
@@ -53,7 +54,7 @@
Data in = new Data(new Name("/test/prefix/serve"));
in.setContent(new Blob("1234"));
instance.serve(in);
- Data out = SegmentedClient.getDefault().getSync(face, new Name("/test/prefix/serve"));
+ Data out = AdvancedClient.getDefault().getSync(face, new Name("/test/prefix/serve"));
assertEquals(in.getContent(), out.getContent());
assertEquals(in.getName().toUri(), out.getName().toUri());
assertEquals("1234", out.getContent().toString());
@@ -66,12 +67,12 @@
instance.serve(in);
Thread.sleep(10);
- Data out = SegmentedClient.getDefault().getSync(face, new Name("/test"));
+ Data out = AdvancedClient.getDefault().getSync(face, new Name("/test"));
assertNotNull(out);
assertEquals(in.getName(), out.getName());
instance.cleanup();
- SegmentedClient.getDefault().getSync(face, new Name("/test"));
+ AdvancedClient.getDefault().getSync(face, new Name("/test"));
}
@Test
@@ -86,7 +87,7 @@
Interest interest = new Interest(new Name("/test/prefix/a/b"))
.setChildSelector(Interest.CHILD_SELECTOR_RIGHT).setInterestLifetimeMilliseconds(100);
- Data out = SegmentedClient.getDefault().getSync(face, interest);
+ Data out = AdvancedClient.getDefault().getSync(face, interest);
assertNotNull(out);
assertEquals("/test/prefix/a/b/c/1", out.getName().toUri());
diff --git a/src/test/java/com/intel/jndn/utils/SegmentedServerTestIT.java b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java
similarity index 77%
rename from src/test/java/com/intel/jndn/utils/SegmentedServerTestIT.java
rename to src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java
index 49d054f..a3409a3 100644
--- a/src/test/java/com/intel/jndn/utils/SegmentedServerTestIT.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java
@@ -11,9 +11,11 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.server.impl;
import com.intel.jndn.mock.MockKeyChain;
+import com.intel.jndn.utils.TestHelper;
+import com.intel.jndn.utils.client.impl.AdvancedClient;
import java.io.IOException;
import java.util.logging.Logger;
import net.named_data.jndn.Data;
@@ -36,24 +38,23 @@
public class SegmentedServerTestIT {
private static final Logger logger = Logger.getLogger(SegmentedServerTestIT.class.getName());
- Name prefix;
+ private static final Name PREFIX = new Name("/test/for/segmented-server");
+ protected static final int DATA_SIZE_BYTES = 10000;
Face face;
SegmentedServer instance;
private String ip;
public SegmentedServerTestIT() throws SecurityException {
this.ip = System.getProperty("nfd.ip");
- this.prefix = new Name("/test/for/segmented-server");
this.face = new Face(ip);
- this.instance = new SegmentedServer(face, prefix);
+ this.instance = new SegmentedServer(face, PREFIX);
KeyChain mockKeyChain = MockKeyChain.configure(new Name("/test/server"));
face.setCommandSigningInfo(mockKeyChain, mockKeyChain.getDefaultCertificateName());
}
@Test
public void testRegisterAndRetrieval() throws Exception {
- final Name name1 = new Name(prefix).append("1");
- final Name name2 = new Name(prefix).append("2");
+ final Name dataName = new Name(PREFIX).append("1");
// why a new thread? The server will only operate if someone turns the crank,
// i.e. someone calls face.processEvents() every so often.
@@ -61,8 +62,7 @@
@Override
public void run() {
try {
- instance.serve(buildDataPacket(name1));
- instance.serve(buildDataPacket(name2));
+ instance.serve(buildDataPacket(dataName));
} catch (IOException ex) {
logger.info("Failed to serve data.");
}
@@ -77,27 +77,27 @@
}
}).start();
- // why wait? we want to make sure the thread is running and that the prefix
+ // TODO why wait? we want to make sure the thread is running and that the prefix
// registration has succeeded on the NFD before we send interests
Thread.sleep(1000);
// why a different face? because we don't want the abover face.processEvents()
// to interfere with the SimpleClient's processEvents().
- logger.info("Retrieving data: " + prefix.toUri());
- Interest interest = new Interest(new Name(prefix));
+ logger.info("Retrieving data: " + dataName.toUri());
+ Interest interest = new Interest(dataName);
interest.setInterestLifetimeMilliseconds(2000);
interest.setMustBeFresh(true);
interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
-
- Data retrieved = SegmentedClient.getDefault().getSync(new Face(ip), interest);
+
+ Data retrieved = AdvancedClient.getDefault().getSync(new Face(ip), interest);
assertNotNull(retrieved);
- assertEquals("...", retrieved.getContent().toString());
+ assertEquals(DATA_SIZE_BYTES, retrieved.getContent().size());
logger.info("Retrieved data: " + retrieved.getName().toUri());
}
Data buildDataPacket(Name name) {
Data data = new Data(new Name(name).appendSegment(0));
- data.setContent(new Blob("..."));
+ data.setContent(new Blob(TestHelper.buildRandomBytes(DATA_SIZE_BYTES)));
data.getMetaInfo().setFreshnessPeriod(30000);
return data;
}
diff --git a/src/test/java/com/intel/jndn/utils/server/ServerBaseImplTest.java b/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
similarity index 90%
rename from src/test/java/com/intel/jndn/utils/server/ServerBaseImplTest.java
rename to src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
index f0ecea0..767a021 100644
--- a/src/test/java/com/intel/jndn/utils/server/ServerBaseImplTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
@@ -11,8 +11,9 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils.server;
+package com.intel.jndn.utils.server.impl;
+import com.intel.jndn.utils.ProcessingStage;
import com.intel.jndn.mock.MockFace;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -72,17 +73,17 @@
}
/**
- * Test of addPipelineStage method, of class ServerBaseImpl.
+ * Test of addProcessingStage method, of class ServerBaseImpl.
*/
@Test(expected = Exception.class)
public void testPipeline() throws Exception {
- PipelineStage<Data, Data> pipelineStage = new PipelineStage<Data, Data>() {
+ ProcessingStage<Data, Data> pipelineStage = new ProcessingStage<Data, Data>() {
@Override
public Data process(Data context) throws Exception {
throw new Exception("Test exceptions with this");
}
};
- instance.addPipelineStage(pipelineStage);
+ instance.addPostProcessingStage(pipelineStage);
instance.processPipeline(new Data());
}
diff --git a/src/test/java/com/intel/jndn/utils/SimpleServerTest.java b/src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java
similarity index 96%
rename from src/test/java/com/intel/jndn/utils/SimpleServerTest.java
rename to src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java
index 6910059..e3766b2 100644
--- a/src/test/java/com/intel/jndn/utils/SimpleServerTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java
@@ -11,11 +11,11 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.server.impl;
-import com.intel.jndn.utils.server.RespondWithData;
import com.intel.jndn.mock.MockFace;
import com.intel.jndn.utils.server.RespondWithBlob;
+import com.intel.jndn.utils.server.RespondWithData;
import java.io.IOException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Interest;
@@ -43,7 +43,7 @@
@Test
public void testAddPipelineStage() {
- instance.addPipelineStage(null);
+ instance.addPostProcessingStage(null);
}
@Test