Merge pull request #3 from 01org/pubsub
Initial publish/subscribe implementation
diff --git a/pom.xml b/pom.xml
index 9ef8dbd..19093d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.intel.jndn.utils</groupId>
<artifactId>jndn-utils</artifactId>
- <version>1.0.4</version>
+ <version>1.1.0</version>
<name>jndn-utils</name>
<description>Collection of tools to simplify synchronous and asynchronous data transfer over the NDN network
</description>
@@ -58,6 +58,12 @@
<version>1.1.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.10.19</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<properties>
diff --git a/src/main/java/com/intel/jndn/utils/Cancellation.java b/src/main/java/com/intel/jndn/utils/Cancellation.java
new file mode 100644
index 0000000..1a8e29f
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Cancellation.java
@@ -0,0 +1,35 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+/**
+ * Represents a token used for canceling some ongoing action. This approach was chosen as it is lighter and more
+ * flexible than a cancellable {@link java.util.concurrent.Future}; perhaps it could be replaced by {@link
+ * AutoCloseable} although this seems to imply something else (however, the language support is a benefit, TODO
+ * re-look at this).
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface Cancellation {
+ /**
+ * Shortcut for a no-op, already-cancelled cancellation
+ */
+ Cancellation CANCELLED = () -> {/* do nothing */};
+
+ /**
+ * Cancel the action referenced by this token
+ */
+ void cancel();
+}
diff --git a/src/main/java/com/intel/jndn/utils/Client.java b/src/main/java/com/intel/jndn/utils/Client.java
index 711d24c..e38d38d 100644
--- a/src/main/java/com/intel/jndn/utils/Client.java
+++ b/src/main/java/com/intel/jndn/utils/Client.java
@@ -13,17 +13,18 @@
*/
package com.intel.jndn.utils;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
/**
* Base functionality provided by all NDN clients in this package.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface Client {
@@ -38,7 +39,7 @@
* @param interest the {@link Interest} to send over the network
* @return a future {@link Data} packet
*/
- public CompletableFuture<Data> getAsync(Face face, Interest interest);
+ CompletableFuture<Data> getAsync(Face face, Interest interest);
/**
* Convenience method for calling
@@ -50,7 +51,7 @@
* @param name the {@link Name} to wrap inside a default {@link Interest}
* @return a future {@link Data} packet
*/
- public CompletableFuture<Data> getAsync(Face face, Name name);
+ CompletableFuture<Data> getAsync(Face face, Name name);
/**
* Synchronously retrieve the {@link Data} for an {@link Interest}; this will
@@ -64,7 +65,7 @@
* @return a {@link Data} packet
* @throws java.io.IOException if the request fails
*/
- public Data getSync(Face face, Interest interest) throws IOException;
+ Data getSync(Face face, Interest interest) throws IOException;
/**
* Convenience method for calling
@@ -77,5 +78,5 @@
* @return a {@link Data} packet
* @throws java.io.IOException if the request fails
*/
- public Data getSync(Face face, Name name) throws IOException;
+ Data getSync(Face face, Name name) throws IOException;
}
diff --git a/src/main/java/com/intel/jndn/utils/ContentStore.java b/src/main/java/com/intel/jndn/utils/ContentStore.java
new file mode 100644
index 0000000..e6ad68c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/ContentStore.java
@@ -0,0 +1,95 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Represents a named content store of NDN data which allows querying by name or interest; TODO merge with Repository
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface ContentStore {
+ /**
+ * Store some content under a name
+ *
+ * @param name the name of the content
+ * @param content the bytes of data
+ */
+ void put(Name name, Blob content);
+
+ /**
+ * Check if the content exists
+ *
+ * @param name the name of the content
+ * @return true if the content exists
+ */
+ boolean has(Name name);
+
+ /**
+ * Check if the content exists
+ *
+ * @param interest a selector-enabled interest to search for content in the store
+ * @return true if the content exists
+ */
+ boolean has(Interest interest);
+
+ /**
+ * Retrieve the content by name
+ *
+ * @param name the name of the content
+ * @return the content if it exists
+ */
+ Optional<Blob> get(Name name);
+
+ /**
+ * Retrieve the content by name
+ *
+ * @param interest the name of the content
+ * @return the content if it exists
+ */
+ Optional<Blob> get(Interest interest);
+
+ /**
+ * Write the stored content to the face as Data packets. If no content exists for the given name, this method should
+ * have no effect.
+ *
+ * @param face the face to write to
+ * @param name the name of the data to write
+ * @throws IOException if the writing fails
+ */
+ void push(Face face, Name name) throws IOException;
+
+ /**
+ * Write the stored content to the face as Data packets. If no content exists for the given name, this method should
+ * have no effect.
+ *
+ * @param face the face to write to
+ * @param interest the interest identifying of the data to write
+ * @throws IOException if the writing fails
+ */
+ void push(Face face, Interest interest) throws IOException;
+
+ /**
+ * Remove all stored content
+ */
+ void clear();
+}
diff --git a/src/main/java/com/intel/jndn/utils/NameTree.java b/src/main/java/com/intel/jndn/utils/NameTree.java
new file mode 100644
index 0000000..a07c791
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/NameTree.java
@@ -0,0 +1,84 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Represent a tree of nodes, branching based on their component values
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface NameTree<T> {
+ /**
+ * @return the full name leading from the root of the tree to this node
+ */
+ Name fullName();
+
+ /**
+ * @return the last component identifying this node; e.g. if the {@link #fullName()} is {@code /a/b/c} for this node,
+ * this method must return {@code c}
+ */
+ Name.Component lastComponent();
+
+ /**
+ * @return the optional content stored at this location in the tree
+ */
+ Optional<T> content();
+
+ /**
+ * @return the children of this node or an empty collection
+ */
+ Collection<NameTree<T>> children();
+
+ /**
+ * @return the parent of this node; note that calling this on the root node will return {@code null}
+ */
+ NameTree<T> parent();
+
+ /**
+ * @param name the full name leading to the location in the tree; if intervening nodes do not yet exist, they will be
+ * created
+ * @param content the content to store at this location; this may overwrite previously existing content, much like a
+ * {@link java.util.Map}
+ * @return a reference to the node inserted
+ */
+ NameTree<T> insert(Name name, T content);
+
+ /**
+ * @param query the name to use as a path through the tree
+ * @return an optional node; if there is no node at the end of the query, the {@link Optional} will be empty
+ */
+ Optional<NameTree<T>> find(Name query);
+
+ /**
+ * @param name the name to use as a path through the tree
+ * @return the removed node or an empty {@link Optional} if the node was not found
+ */
+ Optional<NameTree<T>> delete(Name name);
+
+ /**
+ * @return the count of all nodes in the tree below this one that are non-empty (e.g. have some content)
+ */
+ int count();
+
+ /**
+ * Remove all nodes beneath this one; will have no effect on a leaf node
+ */
+ void clear();
+}
diff --git a/src/main/java/com/intel/jndn/utils/On.java b/src/main/java/com/intel/jndn/utils/On.java
new file mode 100644
index 0000000..96a84d2
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/On.java
@@ -0,0 +1,28 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+/**
+ * Generic callback API for receiving some signal T when an event is fired.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+@FunctionalInterface
+public interface On<T> {
+ /**
+ * @param event the event object describing the occurrence
+ */
+ void on(T event);
+}
diff --git a/src/main/java/com/intel/jndn/utils/PendingInterestTable.java b/src/main/java/com/intel/jndn/utils/PendingInterestTable.java
new file mode 100644
index 0000000..3e49f11
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/PendingInterestTable.java
@@ -0,0 +1,46 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+
+/**
+ * Represent a pending interest table; the jndn {@link net.named_data.jndn.impl.PendingInterestTable} depended on
+ * several patterns such as List output parameters for collecting output and did not allow querying without extracting
+ * (see {@link #has(Interest)}).
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface PendingInterestTable {
+ /**
+ * @param interest the PIT entry to add
+ */
+ void add(Interest interest);
+
+ /**
+ * @param interest the incoming interest to match against
+ * @return true if the interest matches an entry already in the PIT
+ */
+ boolean has(Interest interest);
+
+ /**
+ * @param name the name to match against
+ * @return the PIT entries matching a name, removing them from the PIT
+ */
+ Collection<Interest> extract(Name name);
+}
diff --git a/src/main/java/com/intel/jndn/utils/ProcessingStage.java b/src/main/java/com/intel/jndn/utils/ProcessingStage.java
index 68b2649..d665c84 100644
--- a/src/main/java/com/intel/jndn/utils/ProcessingStage.java
+++ b/src/main/java/com/intel/jndn/utils/ProcessingStage.java
@@ -18,7 +18,7 @@
* processing stage may convert a data packet with unencrypted content to one
* with encrypted content.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface ProcessingStage<T, Y> {
@@ -30,5 +30,5 @@
* may be a new object)
* @throws Exception if the processing fails
*/
- public Y process(T input) throws Exception;
+ Y process(T input) throws Exception;
}
diff --git a/src/main/java/com/intel/jndn/utils/Publisher.java b/src/main/java/com/intel/jndn/utils/Publisher.java
new file mode 100644
index 0000000..4a93084
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Publisher.java
@@ -0,0 +1,30 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface Publisher extends AutoCloseable {
+ /**
+ * @param message a binary blob to publish to a topic
+ * @throws IOException if the publication fails
+ */
+ void publish(Blob message) throws IOException;
+}
diff --git a/src/main/java/com/intel/jndn/utils/Repository.java b/src/main/java/com/intel/jndn/utils/Repository.java
index fcc0172..6e314f4 100644
--- a/src/main/java/com/intel/jndn/utils/Repository.java
+++ b/src/main/java/com/intel/jndn/utils/Repository.java
@@ -20,7 +20,7 @@
/**
* Define API for storing and retrieving NDN packets
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface Repository {
@@ -29,7 +29,7 @@
*
* @param data a {@link Data} packet
*/
- public void put(Data data);
+ void put(Data data);
/**
* Retrieve a {@link Data} packet in the repository; this method should
@@ -39,7 +39,7 @@
* @return a {@link Data} packet
* @throws DataNotFoundException if the packet is not found
*/
- public Data get(Interest interest) throws DataNotFoundException;
+ Data get(Interest interest) throws DataNotFoundException;
/**
* Check if this repository can satisfy the {@link Interest} with a
@@ -49,10 +49,10 @@
* @param interest the {@link Interest} to attempt to satisfy
* @return true if a {@link Data} exists that satisfies the {@link Interest}
*/
- public boolean satisfies(Interest interest);
+ boolean satisfies(Interest interest);
/**
* Remove all stale {@link Data} packets from the repository.
*/
- public void cleanup();
+ void cleanup();
}
diff --git a/src/main/java/com/intel/jndn/utils/Server.java b/src/main/java/com/intel/jndn/utils/Server.java
index 197a52a..422074c 100644
--- a/src/main/java/com/intel/jndn/utils/Server.java
+++ b/src/main/java/com/intel/jndn/utils/Server.java
@@ -13,12 +13,13 @@
*/
package com.intel.jndn.utils;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Name;
import net.named_data.jndn.OnInterestCallback;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
/**
* Base interface for defining a server; see descendant interfaces for different
* modes of serving packets. This class extends {@link Runnable} expecting
@@ -26,20 +27,20 @@
* {@link Runnable#run()} block, thus allowing different ways to manage servers
* (e.g. single-thread vs {@link ScheduledThreadPoolExecutor}.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface Server extends Runnable, OnInterestCallback {
/**
* @return the {@link Name} prefix this server is serving on.
*/
- public Name getPrefix();
+ Name getPrefix();
/**
* @return the registered prefix ID of the server on the {@link Face} or -1 if
* unregistered
*/
- public long getRegisteredPrefixId();
+ long getRegisteredPrefixId();
/**
* Add a stage to the server pipeline. Each stage should be processed once the
@@ -49,5 +50,5 @@
*
* @param stage a Data-to-Data processing stage
*/
- public void addPostProcessingStage(ProcessingStage<Data, Data> stage);
+ void addPostProcessingStage(ProcessingStage<Data, Data> stage);
}
diff --git a/src/main/java/com/intel/jndn/utils/Subscriber.java b/src/main/java/com/intel/jndn/utils/Subscriber.java
new file mode 100644
index 0000000..2830cbc
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Subscriber.java
@@ -0,0 +1,35 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public interface Subscriber extends AutoCloseable {
+ /**
+ * @return the currently known publishers
+ */
+ Set<Long> knownPublishers();
+
+ /**
+ * Open the subscriber transport mechanisms and retrieve publisher messages
+ *
+ * @throws IOException if the subscriber fails during network access
+ */
+ void open() throws IOException;
+}
diff --git a/src/main/java/com/intel/jndn/utils/Topic.java b/src/main/java/com/intel/jndn/utils/Topic.java
new file mode 100644
index 0000000..8816e5d
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/Topic.java
@@ -0,0 +1,68 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+import com.intel.jndn.utils.On;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import com.intel.jndn.utils.pubsub.PubSubFactory;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+
+/**
+ * A publish-subscrib topic; see the {@code pubsub} package for implementation details
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public final class Topic {
+ private final Name name;
+
+ /**
+ * @param name the NDN name of the topic
+ */
+ public Topic(Name name) {
+ this.name = name;
+ }
+
+ /**
+ * @return the NDN name of the topic; the return is wrapped so that the original value is immutable and the return
+ * type can be modified
+ */
+ public Name name() {
+ return new Name(name);
+ }
+
+ /**
+ * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+ * @param onMessage callback fired when a message is received
+ * @param onError callback fired when an error happens after subscription
+ * @return an open subscriber
+ * @throws IOException if the subscription fails
+ */
+ public Subscriber subscribe(Face face, On<Blob> onMessage, On<Exception> onError) throws IOException {
+ return PubSubFactory.newSubscriber(face, name, onMessage, onError);
+ }
+
+ /**
+ * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+ * @return a factory-built publisher
+ */
+ public Publisher newPublisher(Face face) {
+ return PubSubFactory.newPublisher(face, name);
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/DataStream.java b/src/main/java/com/intel/jndn/utils/client/DataStream.java
index 7f33870..f558ab4 100644
--- a/src/main/java/com/intel/jndn/utils/client/DataStream.java
+++ b/src/main/java/com/intel/jndn/utils/client/DataStream.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -22,54 +22,56 @@
* Define a stream of {@link Data} packets as they are retrieved from the
* network and provide methods for observing stream events
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * TODO merge with Observable
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface DataStream extends OnData, OnTimeout, OnComplete, OnException {
/**
* @return true if the stream is complete
*/
- public boolean isComplete();
+ boolean isComplete();
/**
* @return the current list of packets retrieved; this may change as more
* packets are added
*/
- public Data[] list();
+ Data[] list();
/**
* @return an assembled packet containing the concatenated bytes of all
* received packets
* @throws StreamException if assembly fails
*/
- public Data assemble() throws StreamException;
+ Data assemble() throws StreamException;
/**
* Watch all {@link OnData} events
*
* @param onData the callback fired
*/
- public void observe(OnData onData);
+ void observe(OnData onData);
/**
* Watch all {@link OnComplete} events
*
* @param onComplete the callback fired
*/
- public void observe(OnComplete onComplete);
+ void observe(OnComplete onComplete);
/**
* Watch all {@link OnException} events
*
* @param onException the callback fired
*/
- public void observe(OnException onException);
+ void observe(OnException onException);
/**
* Watch all {@link OnTimeout} events
*
* @param onTimeout the callback fired
*/
- public void observe(OnTimeout onTimeout);
+ void observe(OnTimeout onTimeout);
}
diff --git a/src/main/java/com/intel/jndn/utils/client/OnComplete.java b/src/main/java/com/intel/jndn/utils/client/OnComplete.java
index 6cbbe86..5bd6811 100644
--- a/src/main/java/com/intel/jndn/utils/client/OnComplete.java
+++ b/src/main/java/com/intel/jndn/utils/client/OnComplete.java
@@ -16,12 +16,12 @@
/**
* Callback fired when an activity is finished.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface OnComplete {
/**
* Called when an activity is finished
*/
- public void onComplete();
+ void onComplete();
}
diff --git a/src/main/java/com/intel/jndn/utils/client/OnException.java b/src/main/java/com/intel/jndn/utils/client/OnException.java
index d4c440d..976a874 100644
--- a/src/main/java/com/intel/jndn/utils/client/OnException.java
+++ b/src/main/java/com/intel/jndn/utils/client/OnException.java
@@ -17,7 +17,7 @@
* Callback fired when an activity throws an exception; this is necessary because
* some actions are performed asynchronously (e.g. in a thread pool) and exceptions
* could otherwise be lost.
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface OnException {
@@ -25,5 +25,5 @@
* Called when an activity throws an exception
* @param exception the exception thrown
*/
- public void onException(Exception exception);
+ void onException(Exception exception);
}
diff --git a/src/main/java/com/intel/jndn/utils/client/RetryClient.java b/src/main/java/com/intel/jndn/utils/client/RetryClient.java
index 9a5dacb..432db0b 100644
--- a/src/main/java/com/intel/jndn/utils/client/RetryClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/RetryClient.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -11,18 +11,21 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
+
package com.intel.jndn.utils.client;
-import java.io.IOException;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.OnData;
import net.named_data.jndn.OnTimeout;
+import java.io.IOException;
+
/**
- * Define a client that can retry {@link Interest} packets on timeout.
+ * Define a client that can retry {@link Interest} packets on timeout. TODO should return Cancellation so that users
+ * can stop the process
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface RetryClient {
@@ -40,5 +43,5 @@
* @param onTimeout the application's failure callback
* @throws IOException when the client cannot perform the necessary network IO
*/
- public void retry(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException;
+ void retry(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException;
}
diff --git a/src/main/java/com/intel/jndn/utils/client/SegmentationType.java b/src/main/java/com/intel/jndn/utils/client/SegmentationType.java
index 166d81f..5d7dd88 100644
--- a/src/main/java/com/intel/jndn/utils/client/SegmentationType.java
+++ b/src/main/java/com/intel/jndn/utils/client/SegmentationType.java
@@ -17,14 +17,14 @@
* Documents known partition types from
* http://named-data.net/doc/tech-memos/naming-conventions.pdf
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public enum SegmentationType {
SEGMENT((byte) 0x00),
BYTE_OFFSET((byte) 0xFB);
- private byte marker;
+ private final byte marker;
SegmentationType(byte marker) {
this.marker = marker;
diff --git a/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java b/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java
index d866675..6155398 100644
--- a/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/SegmentedClient.java
@@ -13,15 +13,16 @@
*/
package com.intel.jndn.utils.client;
-import java.io.IOException;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
+import java.io.IOException;
+
/**
* Define a client that can retrieve segmented packets into a
* {@link DataStream}.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface SegmentedClient {
@@ -35,5 +36,5 @@
* @return a data stream of packets returned
* @throws IOException if the initial request fails
*/
- public DataStream getSegmentsAsync(Face face, Interest interest) throws IOException;
+ DataStream getSegmentsAsync(Face face, Interest interest) throws IOException;
}
diff --git a/src/main/java/com/intel/jndn/utils/client/StreamingClient.java b/src/main/java/com/intel/jndn/utils/client/StreamingClient.java
index 3ab5fb6..9e75840 100644
--- a/src/main/java/com/intel/jndn/utils/client/StreamingClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/StreamingClient.java
@@ -13,16 +13,17 @@
*/
package com.intel.jndn.utils.client;
-import java.io.IOException;
-import java.io.InputStream;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
+import java.io.IOException;
+import java.io.InputStream;
+
/**
* Define a client that can stream content bytes that are partitioned over
* multiple packets.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface StreamingClient {
@@ -39,5 +40,5 @@
* @return a stream of content bytes
* @throws IOException if the stream setup fails
*/
- public InputStream getStreamAsync(Face face, Interest interest, SegmentationType partitionMarker) throws IOException;
+ InputStream getStreamAsync(Face face, Interest interest, SegmentationType partitionMarker) throws IOException;
}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java b/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java
index 2659656..d1cdd51 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/AdvancedClient.java
@@ -13,51 +13,40 @@
*/
package com.intel.jndn.utils.client.impl;
+import com.intel.jndn.utils.client.DataStream;
import com.intel.jndn.utils.client.OnComplete;
import com.intel.jndn.utils.client.OnException;
import com.intel.jndn.utils.client.RetryClient;
-import com.intel.jndn.utils.client.SegmentedClient;
-import com.intel.jndn.utils.client.DataStream;
import com.intel.jndn.utils.client.SegmentationType;
+import com.intel.jndn.utils.client.SegmentedClient;
import com.intel.jndn.utils.client.StreamingClient;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.OnTimeout;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
/**
* Implementation of a client that can handle segmented data, retries after
* failed requests, and streaming of data packets.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class AdvancedClient extends SimpleClient implements SegmentedClient, StreamingClient {
- private static final Logger logger = Logger.getLogger(AdvancedClient.class.getName());
public static final int DEFAULT_MAX_RETRIES = 3;
+ private static final Logger logger = Logger.getLogger(AdvancedClient.class.getName());
private static AdvancedClient defaultInstance;
private final SegmentedClient segmentedClient;
private final RetryClient retryClient;
private final StreamingClient streamingClient;
/**
- * Singleton access for simpler client use
- *
- * @return a default client
- */
- public static AdvancedClient getDefault() {
- if (defaultInstance == null) {
- defaultInstance = new AdvancedClient();
- }
- return defaultInstance;
- }
-
- /**
* Build an advanced client
*
* @param sleepTime for synchronous processing, the time to sleep the thread
@@ -90,6 +79,18 @@
}
/**
+ * Singleton access for simpler client use
+ *
+ * @return a default client
+ */
+ public static AdvancedClient getDefault() {
+ if (defaultInstance == null) {
+ defaultInstance = new AdvancedClient();
+ }
+ return defaultInstance;
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/BackoffRetryClient.java b/src/main/java/com/intel/jndn/utils/client/impl/BackoffRetryClient.java
new file mode 100644
index 0000000..ca8357f
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/client/impl/BackoffRetryClient.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.client.impl;
+
+import com.intel.jndn.utils.client.RetryClient;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.OnData;
+import net.named_data.jndn.OnTimeout;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BackoffRetryClient implements RetryClient {
+
+ private static final Logger LOGGER = Logger.getLogger(BackoffRetryClient.class.getName());
+ private final double cutoffLifetime;
+ private final int backoffFactor;
+
+ public BackoffRetryClient(double cutoffLifetime, int backoffFactor) {
+ this.cutoffLifetime = cutoffLifetime;
+ this.backoffFactor = backoffFactor;
+ }
+
+ @Override
+ public void retry(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException {
+ retryInterest(face, interest, onData, onTimeout);
+ }
+
+ private void retryInterest(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException {
+ double newLifetime = interest.getInterestLifetimeMilliseconds() * backoffFactor;
+ if (newLifetime < cutoffLifetime) {
+ interest.setInterestLifetimeMilliseconds(newLifetime);
+ resend(face, interest, onData, onTimeout);
+ } else {
+ onTimeout.onTimeout(interest);
+ }
+ }
+
+ private void resend(Face face, Interest interest, OnData onData, OnTimeout onTimeout) throws IOException {
+ LOGGER.log(Level.INFO, "Resending interest with {0}ms lifetime: {1}", new Object[]{interest.getInterestLifetimeMilliseconds(), interest.getName()});
+ face.expressInterest(interest, onData, timedOutInterest -> {
+ try {
+ retryInterest(face, timedOutInterest, onData, onTimeout);
+ } catch (IOException e) {
+ onTimeout.onTimeout(interest);
+ }
+ });
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java b/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java
index c1ca4fe..be642fc 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DataAssembler.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -13,18 +13,19 @@
*/
package com.intel.jndn.utils.client.impl;
-import com.intel.jndn.utils.client.SegmentedClient;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
+import com.intel.jndn.utils.impl.SegmentationHelper;
import net.named_data.jndn.Data;
import net.named_data.jndn.util.Blob;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
/**
* Internal class for assembling a list of {@link Data} packets into one large
* data packet; this implementation will use all properties of the first packet
* in the list and concatenate the content bytes of all packets in order.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
class DataAssembler {
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java b/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java
index 21f1c0e..cb90496 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DefaultRetryClient.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -14,23 +14,24 @@
package com.intel.jndn.utils.client.impl;
import com.intel.jndn.utils.client.RetryClient;
-import java.io.IOException;
-import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.OnData;
import net.named_data.jndn.OnTimeout;
+import java.io.IOException;
+import java.util.logging.Logger;
+
/**
* Default implementation of {@link RetryClient}; on request failure, this class
* immediately retries the request until a maximum number of retries is reached.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class DefaultRetryClient implements RetryClient {
- private static final Logger logger = Logger.getLogger(DefaultRetryClient.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(DefaultRetryClient.class.getName());
private final int numRetriesAllowed;
private volatile int totalRetries = 0;
@@ -61,8 +62,8 @@
* @param context the current request context
* @throws IOException when the client cannot perform the necessary network IO
*/
- synchronized private void retryInterest(RetryContext context) throws IOException {
- logger.info("Retrying interest: " + context.interest.toUri());
+ private synchronized void retryInterest(RetryContext context) throws IOException {
+ LOGGER.info("Retrying interest: " + context.interest.toUri());
context.face.expressInterest(context.interest, context, context);
totalRetries++;
}
@@ -70,7 +71,7 @@
/**
* @return the total number of retries logged by this client
*/
- public int totalRetries() {
+ int totalRetries() {
return totalRetries;
}
@@ -80,20 +81,20 @@
*/
private class RetryContext implements OnData, OnTimeout {
- public int numFailures = 0;
- public final Face face;
- public final Interest interest;
- public final OnData applicationOnData;
- public final OnTimeout applicationOnTimeout;
+ final Face face;
+ final Interest interest;
+ final OnData applicationOnData;
+ final OnTimeout applicationOnTimeout;
+ int numFailures = 0;
- public RetryContext(Face face, Interest interest, OnData applicationOnData, OnTimeout applicationOnTimeout) {
+ RetryContext(Face face, Interest interest, OnData applicationOnData, OnTimeout applicationOnTimeout) {
this.face = face;
this.interest = interest;
this.applicationOnData = applicationOnData;
this.applicationOnTimeout = applicationOnTimeout;
}
- public boolean shouldRetry() {
+ boolean shouldRetry() {
return numFailures < numRetriesAllowed;
}
@@ -105,7 +106,7 @@
@Override
public void onTimeout(Interest interest) {
numFailures++;
- logger.finest("Request failed, count " + numFailures + ": " + interest.toUri());
+ LOGGER.finest("Request failed, count " + numFailures + ": " + interest.toUri());
if (shouldRetry()) {
try {
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java b/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java
index 83e0baa..74b7144 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClient.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -13,10 +13,9 @@
*/
package com.intel.jndn.utils.client.impl;
-import com.intel.jndn.utils.client.SegmentedClient;
import com.intel.jndn.utils.client.DataStream;
-import java.io.IOException;
-import java.util.logging.Logger;
+import com.intel.jndn.utils.client.SegmentedClient;
+import com.intel.jndn.utils.impl.SegmentationHelper;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
@@ -24,18 +23,21 @@
import net.named_data.jndn.Name.Component;
import net.named_data.jndn.OnData;
+import java.io.IOException;
+import java.util.logging.Logger;
+
/**
* Retrieve segments one by one until the FinalBlockId indicates an end segment;
* then request remaining packets. This class currently only handles segmented
* data (not yet byte-offset segmented data).
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class DefaultSegmentedClient implements SegmentedClient {
private static final Logger logger = Logger.getLogger(DefaultSegmentedClient.class.getName());
private static DefaultSegmentedClient defaultInstance;
- private byte marker = 0x00;
+ private final byte marker = 0x00;
/**
* Singleton access for simpler client use
@@ -67,6 +69,26 @@
}
/**
+ * Replace the final component of an interest name with a segmented component;
+ * if the interest name does not have a segmented component, this will add
+ * one.
+ *
+ * @param interest the request
+ * @param segmentNumber a segment number
+ * @param marker a marker to use for segmenting the packet
+ * @return a segmented interest (a copy of the passed interest)
+ */
+ protected Interest replaceFinalComponent(Interest interest, long segmentNumber, byte marker) {
+ Interest copied = new Interest(interest);
+ Component lastComponent = Component.fromNumberWithMarker(segmentNumber, marker);
+ Name newName = (SegmentationHelper.isSegmented(copied.getName(), marker))
+ ? copied.getName().getPrefix(-1)
+ : new Name(copied.getName());
+ copied.setName(newName.append(lastComponent));
+ return copied;
+ }
+
+ /**
* Helper class to track the last requested segment for a given request
* context and request follow-on packets
*/
@@ -126,24 +148,4 @@
}
}
- /**
- * Replace the final component of an interest name with a segmented component;
- * if the interest name does not have a segmented component, this will add
- * one.
- *
- * @param interest the request
- * @param segmentNumber a segment number
- * @param marker a marker to use for segmenting the packet
- * @return a segmented interest (a copy of the passed interest)
- */
- protected Interest replaceFinalComponent(Interest interest, long segmentNumber, byte marker) {
- Interest copied = new Interest(interest);
- Component lastComponent = Component.fromNumberWithMarker(segmentNumber, marker);
- Name newName = (SegmentationHelper.isSegmented(copied.getName(), marker))
- ? copied.getName().getPrefix(-1)
- : new Name(copied.getName());
- copied.setName(newName.append(lastComponent));
- return copied;
- }
-
}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java b/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
index 55fa0f0..ce5db19 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
@@ -18,28 +18,27 @@
import com.intel.jndn.utils.client.SegmentationType;
import com.intel.jndn.utils.client.SegmentedClient;
import com.intel.jndn.utils.client.StreamingClient;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.OnData;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.OnData;
/**
* Default implementation of {@link StreamingClient}; uses a segmented client to
* retrieve packets asynchronously and pipes them to a stream as they are
* received.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class DefaultStreamingClient implements StreamingClient {
-
- private static final Logger logger = Logger.getLogger(DefaultStreamingClient.class.getName());
- private SegmentedClient client;
+ private static final Logger LOGGER = Logger.getLogger(DefaultStreamingClient.class.getName());
/**
* {@inheritDoc}
@@ -50,13 +49,14 @@
}
/**
- *
- * @param face
- * @param interest
- * @param partitionMarker
- * @param onException
- * @return
- * @throws IOException
+ * @param face the {@link Face} on which to make the request; call {@link Face#processEvents()} separately to complete
+ * the request
+ * @param interest the {@link Interest} to send over the network
+ * @param partitionMarker the byte marker identifying how the data packets are partitioned (e.g. segmentation, see
+ * http://named-data.net/doc/tech-memos/naming-conventions.pdf)
+ * @param onException callback fired if a failure occurs during streaming
+ * @return a stream of content bytes
+ * @throws IOException if the stream setup fails
*/
public InputStream getStreamAsync(Face face, Interest interest, SegmentationType partitionMarker, OnException onException) throws IOException {
SegmentedClient client = DefaultSegmentedClient.getDefault();
@@ -64,11 +64,10 @@
}
/**
- *
- * @param onDataStream
- * @param onException
- * @return
- * @throws IOException
+ * @param onDataStream the data stream of incoming data packets
+ * @param onException callback fired if a failure occurs during streaming
+ * @return a stream of content bytes
+ * @throws IOException if the stream setup fails
*/
public InputStream getStreamAsync(DataStream onDataStream, OnException onException) throws IOException {
PipedInputStream in = new PipedInputStream();
@@ -97,7 +96,7 @@
@Override
public void onException(Exception exception) {
- logger.log(Level.SEVERE, "Streaming failed", exception);
+ LOGGER.log(Level.SEVERE, "Streaming failed", exception);
}
}
}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/SegmentationHelper.java b/src/main/java/com/intel/jndn/utils/client/impl/SegmentationHelper.java
deleted file mode 100644
index 7a051ee..0000000
--- a/src/main/java/com/intel/jndn/utils/client/impl/SegmentationHelper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.client.impl;
-
-import net.named_data.jndn.Name;
-import net.named_data.jndn.encoding.EncodingException;
-
-/**
- * Helper methods for dealing with segmented packets (see
- * http://named-data.net/doc/tech-memos/naming-conventions.pdf).
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SegmentationHelper {
-
- /**
- * Determine if a name is segmented, i.e. if it ends with the correct marker
- * type.
- *
- * @param name the name of a packet
- * @param marker the marker type (the initial byte of the component)
- * @return true if the name is segmented
- */
- public static boolean isSegmented(Name name, byte marker) {
- return name.size() > 0 ? name.get(-1).getValue().buf().get(0) == marker : false;
- }
-
- /**
- * Retrieve the segment number from the last component of a name.
- *
- * @param name the name of a packet
- * @param marker the marker type (the initial byte of the component)
- * @return the segment number
- * @throws EncodingException if the name does not have a final component of
- * the correct marker type
- */
- public static long parseSegment(Name name, byte marker) throws EncodingException {
- if (name.size() == 0) {
- throw new EncodingException("No components to parse.");
- }
- return name.get(-1).toNumberWithMarker(marker);
- }
-
- /**
- * Remove a segment component from the end of a name
- *
- * @param name the name of a packet
- * @param marker the marker type (the initial byte of the component)
- * @return the new name with the segment component removed or a copy of the
- * name if no segment component was present
- */
- public static Name removeSegment(Name name, byte marker) {
- return isSegmented(name, marker) ? name.getPrefix(-1) : new Name(name);
- }
-}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java b/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java
index 41fd86a..43f6839 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/SegmentedDataStream.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -13,9 +13,17 @@
*/
package com.intel.jndn.utils.client.impl;
+import com.intel.jndn.utils.client.DataStream;
import com.intel.jndn.utils.client.OnComplete;
import com.intel.jndn.utils.client.OnException;
-import com.intel.jndn.utils.client.DataStream;
+import com.intel.jndn.utils.impl.SegmentationHelper;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnData;
+import net.named_data.jndn.OnTimeout;
+import net.named_data.jndn.encoding.EncodingException;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -23,12 +31,6 @@
import java.util.Map;
import java.util.logging.Logger;
import java.util.stream.Collectors;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Interest;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.OnData;
-import net.named_data.jndn.OnTimeout;
-import net.named_data.jndn.encoding.EncodingException;
/**
* As packets are received, they are mapped by their last component's segment
@@ -40,7 +42,7 @@
* data is received; if data is received out of order, the callbacks will not be
* fired until adjoining packets are received.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SegmentedDataStream implements DataStream {
@@ -48,8 +50,8 @@
private final byte PARTITION_MARKER = 0x00;
private volatile long current = -1;
private volatile long end = Long.MAX_VALUE;
- private Map<Long, Data> packets = new HashMap<>();
- private List<Object> observers = new ArrayList<>();
+ private final Map<Long, Data> packets = new HashMap<>();
+ private final List<Object> observers = new ArrayList<>();
private Exception exception;
@Override
@@ -84,7 +86,6 @@
throw new StreamException(exception);
}
- Object o = new LinkedList();
return new DataAssembler(list(), PARTITION_MARKER).assemble();
}
@@ -146,7 +147,7 @@
current++;
assert (packets.containsKey(current));
Data retrieved = packets.get(current);
- observersOfType(OnData.class).stream().forEach((OnData cb) -> {
+ observersOfType(OnData.class).forEach((OnData cb) -> {
cb.onData(interest, retrieved);
});
} while (hasNextPacket());
@@ -172,14 +173,14 @@
@Override
public synchronized void onComplete() {
- observersOfType(OnComplete.class).stream().forEach((OnComplete cb) -> {
+ observersOfType(OnComplete.class).forEach((OnComplete cb) -> {
cb.onComplete();
});
}
@Override
public synchronized void onTimeout(Interest interest) {
- observersOfType(OnTimeout.class).stream().forEach((OnTimeout cb) -> {
+ observersOfType(OnTimeout.class).forEach((OnTimeout cb) -> {
cb.onTimeout(interest);
});
}
@@ -188,7 +189,7 @@
public synchronized void onException(Exception exception) {
this.exception = exception;
- observersOfType(OnException.class).stream().forEach((OnException cb) -> {
+ observersOfType(OnException.class).forEach((OnException cb) -> {
cb.onException(exception);
});
}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java b/src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java
index 9e2e594..5ebf05a 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/SimpleClient.java
@@ -14,24 +14,25 @@
package com.intel.jndn.utils.client.impl;
import com.intel.jndn.utils.Client;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.OnData;
import net.named_data.jndn.OnTimeout;
-import java.util.logging.Logger;
import net.named_data.jndn.encoding.EncodingException;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
/**
* Provide a client to simplify information retrieval over the NDN network.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SimpleClient implements Client {
@@ -43,18 +44,6 @@
private final long interestLifetime;
/**
- * Singleton access for simpler client use
- *
- * @return a default client
- */
- public static SimpleClient getDefault() {
- if (defaultInstance == null) {
- defaultInstance = new SimpleClient();
- }
- return defaultInstance;
- }
-
- /**
* Build a simple client
*
* @param sleepTime for synchronous processing, the time to sleep the thread
@@ -76,6 +65,18 @@
}
/**
+ * Singleton access for simpler client use
+ *
+ * @return a default client
+ */
+ public static SimpleClient getDefault() {
+ if (defaultInstance == null) {
+ defaultInstance = new SimpleClient();
+ }
+ return defaultInstance;
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
@@ -159,7 +160,6 @@
* @return a default interest for the given name
*/
public Interest getDefaultInterest(Name name) {
- Interest interest = new Interest(name, interestLifetime);
- return interest;
+ return new Interest(name, interestLifetime);
}
}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/StreamException.java b/src/main/java/com/intel/jndn/utils/client/impl/StreamException.java
index 59ad55a..7cf5c6f 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/StreamException.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/StreamException.java
@@ -15,7 +15,7 @@
/**
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class StreamException extends Exception {
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
new file mode 100644
index 0000000..5ed94cb
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTable.java
@@ -0,0 +1,68 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.PendingInterestTable;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+
+import java.util.Collection;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ * TODO use NameTree for storage? or Set and override Interest equals()
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryPendingInterestTable implements PendingInterestTable {
+ private static final Logger LOGGER = Logger.getLogger(BoundedInMemoryPendingInterestTable.class.getName());
+ private final BoundedLinkedMap<Name, Interest> table;
+
+ public BoundedInMemoryPendingInterestTable(int maxSize) {
+ this.table = new BoundedLinkedMap<>(maxSize);
+ }
+
+ @Override
+ public void add(Interest interest) {
+ LOGGER.log(Level.INFO, "Adding pending interest: {0}", interest.toUri());
+ table.put(interest.getName(), interest);
+ }
+
+ @Override
+ public boolean has(Interest interest) {
+ if (interest.getChildSelector() != -1) {
+ for (Name name : table.keySet()) {
+ // TODO this logic must be more complex; must match selectors as well
+ if (interest.matchesName(name)) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return has(interest.getName());
+ }
+ }
+
+ public boolean has(Name name) {
+ return table.containsKey(name);
+ }
+
+ @Override
+ public Collection<Interest> extract(Name name) {
+ return table.values().stream().filter(i -> i.matchesName(name)).collect(Collectors.toList());
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java b/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
new file mode 100644
index 0000000..6102a9b
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
@@ -0,0 +1,159 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Linked hash map exposing the earliest and latest entries added; it is bounded to a configurable size to save memory.
+ * When benchmarked against CircularBufferCache, this class had slower insertions but faster reads and was therefore
+ * retained for use.
+ * <p>
+ * It limits the amount of memory by replacing the oldest added element; this involves overriding the LinkedHashMap
+ * implementation's removeEldestEntry() method; see the <a href="https://docs.oracle.com/javase/8/docs/api/java/util/LinkedHashMap.html#removeEldestEntry-java.util.Map.Entry-">Javadoc
+ * entry</a> for more information.
+ * <p>
+ * Additionally, we implemented Josh Bloch's item 16 of Effective Java so that calls are forwarded to the underlying
+ * LinkedHashMap. This allows us to decorate with some custom behavior and synchronize as we need.
+ * <p>
+ * This class is coarsely thread-safe; every public method is synchronized for one-at-a-time access to the underlying
+ * map.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedLinkedMap<K, V> implements Map<K, V> {
+ private final LinkedHashMap<K, V> map;
+ private K latest;
+
+ /**
+ * @param maxSize the maximum allowed number of records to store
+ */
+ public BoundedLinkedMap(int maxSize) {
+ this.map = new LinkedHashMap<K, V>(maxSize) {
+ @Override
+ public boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ return size() > maxSize;
+ }
+ };
+ }
+
+ /**
+ * @return the earliest key added to this set or null if none are added
+ */
+ public synchronized K earliest() {
+ for (K key : map.keySet()) {
+ return key; // the LinkedHashMap guarantees iteration in order of insertion
+ }
+ return null;
+ }
+
+ /**
+ * @return the latest key added to this set or null if none are added
+ */
+ public synchronized K latest() {
+ return latest;
+ }
+
+ @Override
+ public synchronized V put(K key, V value) {
+ latest = key;
+ return map.put(key, value);
+ }
+
+ @Override
+ public synchronized int size() {
+ return map.size();
+ }
+
+ @Override
+ public synchronized boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public synchronized boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public synchronized boolean containsValue(Object value) {
+ return map.containsValue(value);
+ }
+
+ @Override
+ public synchronized V get(Object key) {
+ return map.get(key);
+ }
+
+
+ @Override
+ public synchronized V remove(Object key) {
+ V value = map.remove(key);
+ if (key == latest) {
+ latest = findLatest(map);
+ }
+ return value;
+ }
+
+ @Override
+ public synchronized void putAll(Map<? extends K, ? extends V> m) {
+ map.putAll(m);
+ latest = findLatest(map);
+ }
+
+ @Override
+ public synchronized void clear() {
+ map.clear();
+ latest = null;
+ }
+
+ @Override
+ public synchronized Set<K> keySet() {
+ return map.keySet();
+ }
+
+ @Override
+ public synchronized Collection<V> values() {
+ return map.values();
+ }
+
+ @Override
+ public synchronized Set<java.util.Map.Entry<K, V>> entrySet() {
+ return map.entrySet();
+ }
+
+ @Override
+ public String toString() {
+ return map.toString();
+ }
+
+ /**
+ * To find the latest key in a LinkedHashMap, iterate and return the last one found. The LinkedHashMap guarantees
+ * iteration in order of insertion
+ *
+ * @param m the map to inspect
+ * @return the latest key added to the map
+ */
+ private K findLatest(LinkedHashMap<K, V> m) {
+ K newLatest = null;
+ for (K key : m.keySet()) {
+ newLatest = key;
+ }
+ return newLatest;
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java b/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java
new file mode 100644
index 0000000..048f304
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/DefaultNameTree.java
@@ -0,0 +1,154 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.NameTree;
+import net.named_data.jndn.Name;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * TODO need a way to bound the size
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class DefaultNameTree<T> implements NameTree<T> {
+ private final DefaultNameTree<T> parent;
+ private final Map<Name.Component, DefaultNameTree<T>> children = new HashMap<>();
+ private Name.Component component;
+ private T content;
+
+ DefaultNameTree(DefaultNameTree<T> parent) {
+ this.parent = parent;
+ }
+
+ public static <T> NameTree<T> newRootTree() {
+ return new DefaultNameTree<>(null);
+ }
+
+ @Override
+ public Optional<T> content() {
+ return Optional.ofNullable(content);
+ }
+
+ @Override
+ public Name fullName() {
+ ArrayList<Name.Component> components = new ArrayList<>();
+ NameTree<T> self = this;
+ while (self.lastComponent() != null) {
+ components.add(self.lastComponent());
+ self = self.parent();
+ }
+ Collections.reverse(components);
+ return new Name(components);
+ }
+
+ @Override
+ public Name.Component lastComponent() {
+ return component;
+ }
+
+ @Override
+ public Collection<NameTree<T>> children() {
+ List<NameTree<T>> c = new ArrayList<>(children.size());
+ for (NameTree<T> nt : children.values()) {
+ c.add(nt);
+ }
+ return c;
+ }
+
+ @Override
+ public NameTree<T> parent() {
+ return parent;
+ }
+
+ @Override
+ public Optional<NameTree<T>> find(Name name) {
+ if (name.size() == 0) {
+ return Optional.of(this);
+ } else {
+ Name.Component first = name.get(0);
+ DefaultNameTree<T> child = children.get(first);
+ if (child == null) {
+ return Optional.empty();
+ } else {
+ return child.find(name.size() > 1 ? name.getSubName(1) : new Name());
+ }
+ }
+ }
+
+ @Override
+ public NameTree<T> insert(Name name, T content) {
+ Name.Component first = name.get(0);
+
+ DefaultNameTree<T> child = children.get(first);
+ if (child == null) {
+ child = new DefaultNameTree<>(this);
+ child.component = first;
+ children.put(first, child);
+ }
+
+ if (name.size() == 1) {
+ child.content = content;
+ return child;
+ } else {
+ return child.insert(name.getSubName(1), content);
+ }
+ }
+
+ @Override
+ public Optional<NameTree<T>> delete(Name name) {
+ if (name.size() == 0) {
+ return Optional.of(parent.deleteChild(this.component));
+ } else {
+ Name.Component first = name.get(0);
+ DefaultNameTree<T> child = children.get(first);
+ if (child == null) {
+ return Optional.empty();
+ } else {
+ if (children.size() == 1) {
+ children.remove(first);
+ }
+ return child.delete(name.getSubName(1));
+ }
+ }
+ }
+
+ private NameTree<T> deleteChild(Name.Component component) {
+ return children.remove(component);
+ }
+
+ @Override
+ public int count() {
+ throw new UnsupportedOperationException("Breadth-first search?");
+ }
+
+ @Override
+ public void clear() {
+ children.clear();
+ }
+
+ @Override
+ public String toString() {
+ String c = (component == null) ? null : component.toEscapedString();
+ return "DefaultNameTree{" + c + ": " + content + '}';
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java b/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java
new file mode 100644
index 0000000..713442c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/InMemoryContentStore.java
@@ -0,0 +1,141 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.ContentStore;
+import com.intel.jndn.utils.NameTree;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.logging.Logger;
+
+/**
+ * TODO must bound the size of the content store
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class InMemoryContentStore implements ContentStore {
+ private static final Logger LOGGER = Logger.getLogger(InMemoryContentStore.class.getName());
+ private final NameTree<Blob> store;
+ private final Data template;
+
+ public InMemoryContentStore(int freshnessMs) {
+ this.template = new Data();
+ this.template.getMetaInfo().setFreshnessPeriod(freshnessMs);
+ this.store = DefaultNameTree.newRootTree();
+ }
+
+ @Override
+ public void put(Name name, Blob data) {
+ store.insert(name, data);
+ }
+
+ @Override
+ public Optional<Blob> get(Interest interest) {
+ Optional<NameTree<Blob>> leaf = getWithSelectors(interest);
+ return leaf.isPresent() ? leaf.get().content() : Optional.empty();
+ }
+
+ private Optional<NameTree<Blob>> getWithSelectors(Interest interest) {
+ Optional<NameTree<Blob>> possibleBlob = store.find(interest.getName());
+ if (possibleBlob.isPresent() && hasSelectors(interest)) {
+ List<NameTree<Blob>> children = new ArrayList<>(possibleBlob.get().children());
+ if (children.isEmpty()) {
+ return Optional.empty();
+ } else if (children.size() == 1) {
+ return Optional.of(children.get(0));
+ } else if (isRightMost(interest)) {
+ Collections.sort(children, (a, b) -> b.lastComponent().compare(a.lastComponent()));
+ return Optional.of(children.get(0));
+ } else if (isLeftMost(interest)) {
+ Collections.sort(children, (a, b) -> a.lastComponent().compare(b.lastComponent()));
+ return Optional.of(children.get(0));
+ } else {
+ // TODO max/min suffix components and excludes
+ LOGGER.warning("The interest requested has selectors not yet implemented; returning empty content");
+ return Optional.empty();
+ }
+ }
+ return possibleBlob;
+ }
+
+ private static boolean hasSelectors(Interest interest) {
+ return interest.getChildSelector() != -1 || interest.getExclude().size() > 0;
+ }
+
+ private static boolean isRightMost(Interest interest) {
+ return interest.getChildSelector() == Interest.CHILD_SELECTOR_RIGHT;
+ }
+
+ private static boolean isLeftMost(Interest interest) {
+ return interest.getChildSelector() == Interest.CHILD_SELECTOR_LEFT;
+ }
+
+ @Override
+ public boolean has(Name name) {
+ return store.find(name).isPresent();
+ }
+
+ @Override
+ public boolean has(Interest interest) {
+ return get(interest).isPresent();
+ }
+
+ @Override
+ public Optional<Blob> get(Name name) {
+ Optional<NameTree<Blob>> tree = store.find(name);
+ return tree.isPresent() ? tree.get().content() : Optional.empty();
+ }
+
+ @Override
+ public void push(Face face, Name name) throws IOException {
+ Optional<Blob> blob = get(name);
+ if (blob.isPresent()) {
+ Data t = new Data(template);
+ t.setName(name);
+ ByteArrayInputStream b = new ByteArrayInputStream(blob.get().getImmutableArray());
+ for (Data d : SegmentationHelper.segment(t, b)) {
+ face.putData(d);
+ }
+ }
+ }
+
+ @Override
+ public void push(Face face, Interest interest) throws IOException {
+ Optional<NameTree<Blob>> leaf = getWithSelectors(interest);
+ if (leaf.isPresent() && leaf.get().content().isPresent()) {
+ Data t = new Data(template);
+ t.setName(leaf.get().fullName());
+ ByteArrayInputStream b = new ByteArrayInputStream(leaf.get().content().get().getImmutableArray());
+ for (Data d : SegmentationHelper.segment(t, b)) {
+ face.putData(d);
+ }
+ }
+ }
+
+ @Override
+ public void clear() {
+ store.clear();
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java b/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java
new file mode 100644
index 0000000..1ddef62
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/impl/SegmentationHelper.java
@@ -0,0 +1,149 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.util.Blob;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper methods for reading and writing segmented NDN packets. See <a
+ * href="http://named-data.net/doc/tech-memos/naming-conventions.pdf">NDN Naming Conventions</a> for information used
+ * in this class
+ * <p>
+ * For segmentation of streams: the current use of the default segment size of 4096
+ * (only for {@link #segment(net.named_data.jndn.Data, java.io.InputStream)} is based on several assumptions: NDN packet
+ * size was limited to 8000 at the time this was written and the signature size is unknown.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class SegmentationHelper {
+
+ public static final int DEFAULT_SEGMENT_SIZE = 4096;
+ private static final byte NDN_SEGMENT_MARKER = 0x00;
+
+ private SegmentationHelper() {
+ // do not instantiate this class
+ }
+
+ /**
+ * Determine if a name is segmented, i.e. if it ends with the correct marker type.
+ *
+ * @param name the name of a packet
+ * @param marker the marker type (the initial byte of the component)
+ * @return true if the name is segmented
+ */
+ public static boolean isSegmented(Name name, byte marker) {
+ return name.size() > 0 && name.get(-1).getValue().buf().get(0) == marker;
+ }
+
+ /**
+ * Retrieve the segment number from the last component of a name.
+ *
+ * @param name the name of a packet
+ * @param marker the marker type (the initial byte of the component)
+ * @return the segment number
+ * @throws EncodingException if the name does not have a final component of the correct marker type
+ */
+ public static long parseSegment(Name name, byte marker) throws EncodingException {
+ if (name.size() == 0) {
+ throw new EncodingException("No components to parse.");
+ }
+ return name.get(-1).toNumberWithMarker(marker);
+ }
+
+ /**
+ * Remove a segment component from the end of a name
+ *
+ * @param name the name of a packet
+ * @param marker the marker type (the initial byte of the component)
+ * @return the new name with the segment component removed or a copy of the name if no segment component was present
+ */
+ public static Name removeSegment(Name name, byte marker) {
+ return isSegmented(name, marker) ? name.getPrefix(-1) : new Name(name);
+ }
+
+ /**
+ * Segment a stream of bytes into a list of Data packets; this must read all
+ * the bytes first in order to determine the end segment for FinalBlockId.
+ *
+ * @param template the {@link Data} packet to use for the segment {@link Name}, {@link net.named_data.jndn.MetaInfo},
+ * etc.
+ * @param bytes an {@link InputStream} to the bytes to segment
+ * @return a list of segmented {@link Data} packets
+ * @throws IOException if the stream fails
+ */
+ public static List<Data> segment(Data template, InputStream bytes) throws IOException {
+ return segment(template, bytes, DEFAULT_SEGMENT_SIZE);
+ }
+
+ /**
+ * Segment a stream of bytes into a list of Data packets; this must read all the bytes first in order to determine the
+ * end segment for FinalBlockId. TODO this could be smarter and only add segments if necessary
+ *
+ * @param template the {@link Data} packet to use for the segment {@link Name}, {@link net.named_data.jndn.MetaInfo},
+ * etc.
+ * @param bytes an {@link InputStream} to the bytes to segment
+ * @return a list of segmented {@link Data} packets
+ * @throws IOException if the stream fails
+ */
+ public static List<Data> segment(Data template, InputStream bytes, int segmentSize) throws IOException {
+ List<Data> segments = new ArrayList<>();
+ byte[] readBytes = readAll(bytes);
+ int numBytes = readBytes.length;
+ int numPackets = (int) Math.ceil((double) numBytes / segmentSize);
+ ByteBuffer buffer = ByteBuffer.wrap(readBytes, 0, numBytes);
+ Name.Component lastSegment = Name.Component.fromNumberWithMarker((long) numPackets - 1, NDN_SEGMENT_MARKER);
+
+ for (int i = 0; i < numPackets; i++) {
+ Data segment = new Data(template);
+ segment.getName().appendSegment(i);
+ segment.getMetaInfo().setFinalBlockId(lastSegment);
+ byte[] content = new byte[Math.min(segmentSize, buffer.remaining())];
+ buffer.get(content);
+ segment.setContent(new Blob(content));
+ segments.add(segment);
+ }
+
+ return segments;
+ }
+
+ /**
+ * Read all of the bytes in an input stream.
+ *
+ * @param bytes the {@link InputStream} of bytes to read
+ * @return an array of all bytes retrieved from the stream
+ * @throws IOException if the stream fails
+ */
+ public static byte[] readAll(InputStream bytes) throws IOException {
+ ByteArrayOutputStream builder = new ByteArrayOutputStream();
+ int read = bytes.read();
+ while (read != -1) {
+ builder.write(read);
+ read = bytes.read();
+ }
+ builder.flush();
+ bytes.close();
+ return builder.toByteArray();
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java b/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
index 21862fa..5befb6d 100644
--- a/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
+++ b/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
@@ -14,15 +14,16 @@
package com.intel.jndn.utils.processing.impl;
import com.intel.jndn.utils.ProcessingStage;
-import java.io.ByteArrayOutputStream;
-import java.util.zip.GZIPOutputStream;
import net.named_data.jndn.Data;
import net.named_data.jndn.util.Blob;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPOutputStream;
+
/**
* Sample stage for compressing {@link Data} content using GZIP
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class CompressionStage implements ProcessingStage<Data, Data> {
diff --git a/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java b/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
index 3125db8..d9eb897 100644
--- a/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
+++ b/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
@@ -20,10 +20,9 @@
import net.named_data.jndn.security.SecurityException;
/**
- * As a part of a {@link com.intel.jndn.utils.ServerIn} pipeline, this stage
- * will sign a {@link Data} packet.
+ * As a part of a server pipeline, this stage will sign a {@link Data} packet.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SigningStage implements ProcessingStage<Data, Data> {
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
new file mode 100644
index 0000000..363a9ce
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/AnnouncementService.java
@@ -0,0 +1,63 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.Cancellation;
+import com.intel.jndn.utils.On;
+
+import java.io.IOException;
+
+/**
+ * API for entering and exiting a group and tracking the accompanying announcements.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+interface AnnouncementService {
+ /**
+ * Announce entrance into the group
+ *
+ * @param id the ID of the agent
+ * @throws IOException if the announcement fails
+ */
+ void announceEntrance(long id) throws IOException;
+
+ /**
+ * Announce exit from the group
+ *
+ * @param id the ID of the agent
+ * @throws IOException if the announcement fails
+ */
+ void announceExit(long id) throws IOException;
+
+ /**
+ * Discover all previously announced entrances; the implementation is expected to be driven by network IO
+ * @param onFound callback fired when a group member is discovered
+ * @param onComplete callback fired when all group members have been discovered
+ * @param onError callback fired if errors occur during discovery
+ * @return a cancellation token for stopping discovery
+ * @throws IOException if the network fails during discovery
+ */
+ Cancellation discoverExistingAnnouncements(On<Long> onFound, On<Void> onComplete, On<Exception> onError) throws IOException;
+
+ /**
+ * Observe any new entrances/exits; the implementation is expected to be driven by network IO
+ * @param onAdded callback fired when a group member announces its entry into the group, see {@link #announceEntrance(long)}
+ * @param onRemoved callback fired when a group member announces its exit from the group, see {@link #announceExit(long)}
+ * @param onError callback fired if errors are encountered while processing announcements
+ * @return a cancellation token for stopping the observer
+ * @throws IOException if the network fails while setting up the observer
+ */
+ Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws IOException;
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
new file mode 100644
index 0000000..ccb058c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnAnnouncementService.java
@@ -0,0 +1,210 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.Cancellation;
+import com.intel.jndn.utils.On;
+import com.intel.jndn.utils.client.impl.BackoffRetryClient;
+import net.named_data.jndn.Exclude;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.InterestFilter;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnInterestCallback;
+import net.named_data.jndn.OnRegisterFailed;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.security.SecurityException;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.intel.jndn.utils.Cancellation.CANCELLED;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class NdnAnnouncementService implements AnnouncementService {
+ private static final Logger LOGGER = Logger.getLogger(NdnAnnouncementService.class.getName());
+ private static final double STARTING_DISCOVERY_LIFETIME = 100.0;
+ private static final double MAX_DISCOVERY_LIFETIME = 45000.0;
+ private final Face face;
+ private final Name topicPrefix;
+ private final Name broadcastPrefix;
+ private final Set<Long> known = new HashSet<>();
+ private final BackoffRetryClient client;
+ private boolean stopped = false;
+
+ private NdnAnnouncementService(Face face, Name broadcastPrefix, Name topicPrefix) {
+ this.face = face;
+ this.topicPrefix = topicPrefix;
+ this.broadcastPrefix = new Name(broadcastPrefix).append(topicPrefix);
+ this.client = new BackoffRetryClient(MAX_DISCOVERY_LIFETIME, 2);
+ }
+
+ NdnAnnouncementService(Face face, Name topicPrefix) {
+ this(face, PubSubNamespace.DEFAULT_BROADCAST_PREFIX, topicPrefix);
+ }
+
+ @Override
+ public void announceEntrance(long id) throws IOException {
+ LOGGER.log(Level.INFO, "Announcing publisher entrance: {0} to {1}", new Object[]{id, broadcastPrefix});
+ Name name = PubSubNamespace.toAnnouncement(broadcastPrefix, id, PubSubNamespace.Announcement.ENTRANCE);
+ Interest interest = new Interest(name);
+ face.expressInterest(interest, null);
+ }
+
+ @Override
+ public void announceExit(long id) throws IOException {
+ LOGGER.log(Level.INFO, "Announcing publisher exit: {0} from {1}", new Object[]{id, broadcastPrefix});
+ Name name = PubSubNamespace.toAnnouncement(broadcastPrefix, id, PubSubNamespace.Announcement.EXIT);
+ Interest interest = new Interest(name);
+ face.expressInterest(interest, null);
+ }
+
+ @Override
+ public Cancellation discoverExistingAnnouncements(On<Long> onFound, On<Void> onComplete, On<Exception> onError) throws IOException {
+ LOGGER.log(Level.INFO, "Discover existing publishers: {0}", topicPrefix);
+ if (onFound == null) {
+ return CANCELLED;
+ }
+
+ for (long id : known) {
+ LOGGER.info("Passing known publisher: " + id);
+ onFound.on(id);
+ }
+
+ discover(client, onFound, onComplete, onError);
+ return () -> stopped = true;
+ }
+
+ // TODO need special namespace for discovery
+ private Interest discover(BackoffRetryClient client, On<Long> onFound, On<Void> onComplete, On<Exception> onError) throws IOException {
+ Interest interest = new Interest(topicPrefix);
+ interest.setInterestLifetimeMilliseconds(STARTING_DISCOVERY_LIFETIME);
+ interest.setExclude(excludeKnownPublishers());
+ client.retry(face, interest, (interest1, data) -> {
+ if (stopped) {
+ return;
+ }
+
+ LOGGER.log(Level.INFO, "Received discovery data ({0} bytes): {1}", new Object[]{data.getContent().size(), data.getName()});
+ found(data.getName(), onFound, onError);
+
+ // TODO instead of inspecting name should look at content and examine for multiple publishers
+ try {
+ discover(client, onFound, onComplete, onError); // recursion
+ } catch (IOException e) {
+ LOGGER.log(Level.SEVERE, "Failed while discovering publishers, aborting: {0}", new Object[]{broadcastPrefix, e});
+ onError.on(e);
+ // TODO re-call discover here?
+ }
+ }, interest2 -> {
+ // TODO recall client here, we should never be done
+ });
+ return interest;
+ }
+
+ private Exclude excludeKnownPublishers() {
+ Exclude exclude = new Exclude();
+ for (long pid : known) {
+ exclude.appendComponent(PubSubNamespace.toPublisherComponent(pid));
+ }
+ return exclude;
+ }
+
+ private void found(Name publisherName, On<Long> onFound, On<Exception> onError) {
+ try {
+ found(PubSubNamespace.parsePublisher(publisherName), onFound);
+ } catch (EncodingException e) {
+ LOGGER.log(Level.SEVERE, "Failed to parse new publisher name, ignoring: {0}", publisherName);
+ onError.on(e);
+ }
+ }
+
+ private void found(long publisherId, On<Long> onFound) {
+ LOGGER.log(Level.INFO, "Found new publisher: {0}", publisherId);
+ known.add(publisherId);
+ onFound.on(publisherId);
+ }
+
+ @Override
+ public Cancellation observeNewAnnouncements(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) throws IOException {
+ LOGGER.log(Level.INFO, "Observing new announcements: {0}", broadcastPrefix);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ OnRegistration onRegistration = new OnRegistration(future);
+ OnAnnouncement onAnnouncement = new OnAnnouncement(onAdded, onRemoved, onError);
+
+ try {
+ long registeredPrefix = face.registerPrefix(broadcastPrefix, onAnnouncement, (OnRegisterFailed) onRegistration, onRegistration);
+ return () -> face.removeRegisteredPrefix(registeredPrefix);
+ } catch (SecurityException e) {
+ throw new IOException("Failed while using transport security key chain", e);
+ }
+ }
+
+ private class OnAnnouncement implements OnInterestCallback {
+ private final On<Long> onAdded;
+ private final On<Long> onRemoved;
+ private final On<Exception> onError;
+
+ OnAnnouncement(On<Long> onAdded, On<Long> onRemoved, On<Exception> onError) {
+ this.onAdded = onAdded;
+ this.onRemoved = onRemoved;
+ this.onError = onError;
+ }
+
+ @Override
+ public void onInterest(Name name, Interest interest, Face face, long l, InterestFilter interestFilter) {
+ LOGGER.log(Level.INFO, "Received announcement: {0}", interest.toUri());
+ try {
+ long publisherId = PubSubNamespace.parsePublisher(interest.getName());
+ PubSubNamespace.Announcement announcement = PubSubNamespace.parseAnnouncement(interest.getName());
+ switch (announcement) {
+ case ENTRANCE:
+ add(publisherId);
+ break;
+ case EXIT:
+ remove(publisherId);
+ break;
+ default:
+ LOGGER.warning("Unknown announcement action, ignoring: " + interest.toUri());
+ }
+ } catch (EncodingException e) {
+ LOGGER.log(Level.SEVERE, "Failed to decode announcement: " + interest.toUri(), e);
+ onError.on(e);
+ }
+ }
+
+ private void remove(long publisherId) {
+ LOGGER.info("Publisher leaving topic: " + publisherId);
+ known.remove(publisherId);
+ if (onRemoved != null) {
+ onRemoved.on(publisherId);
+ }
+ }
+
+ private void add(long publisherId) {
+ LOGGER.info("Publisher entering topic: " + publisherId);
+ known.add(publisherId);
+ if (onAdded != null) {
+ onAdded.on(publisherId);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
new file mode 100644
index 0000000..adb069b
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
@@ -0,0 +1,158 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.ContentStore;
+import com.intel.jndn.utils.PendingInterestTable;
+import com.intel.jndn.utils.Publisher;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.InterestFilter;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnInterestCallback;
+import net.named_data.jndn.OnRegisterFailed;
+import net.named_data.jndn.security.SecurityException;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * TODO look at thread safety
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class NdnPublisher implements Publisher, OnInterestCallback {
+ private static final Logger LOGGER = Logger.getLogger(NdnPublisher.class.getName());
+ private static final int ATTRIBUTES_FRESHNESS_PERIOD = 2000;
+ private final Face face;
+ private final Name prefix;
+ private final AnnouncementService announcementService;
+ private final PendingInterestTable pendingInterestTable;
+ private final ContentStore contentStore;
+ private final long publisherId;
+ private final AtomicLong latestMessageId = new AtomicLong(0);
+ private long registrationId;
+ private boolean opened = false;
+
+ NdnPublisher(Face face, Name prefix, long publisherId, AnnouncementService announcementService, PendingInterestTable pendingInterestTable, ContentStore contentStore) {
+ this.face = face;
+ this.prefix = PubSubNamespace.toPublisherName(prefix, publisherId);
+ this.publisherId = publisherId;
+ this.announcementService = announcementService;
+ this.pendingInterestTable = pendingInterestTable;
+ this.contentStore = contentStore;
+ }
+
+ synchronized void open() throws IOException {
+ opened = true;
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ OnRegistration onRegistration = new OnRegistration(future);
+
+ try {
+ registrationId = face.registerPrefix(prefix, this, (OnRegisterFailed) onRegistration, onRegistration);
+ // assumes face.processEvents is driven concurrently elsewhere
+ future.get(10, TimeUnit.SECONDS);
+ announcementService.announceEntrance(publisherId);
+ } catch (IOException | SecurityException | InterruptedException | ExecutionException | TimeoutException e) {
+ throw new IOException("Failed to register NDN prefix; pub-sub IO will be impossible", e);
+ }
+ }
+
+ /**
+ * TODO this should not clear content store or remove registered prefix; do that in the future to allow subscribers
+ * to retrieve still-alive messages
+ *
+ * @throws IOException if the group exit announcement fails
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (opened) {
+ face.removeRegisteredPrefix(registrationId);
+ contentStore.clear();
+ announcementService.announceExit(publisherId);
+ }
+ }
+
+ @Override
+ public void publish(Blob message) throws IOException {
+ if (!opened) {
+ open();
+ }
+
+ long id = latestMessageId.getAndIncrement();
+ Name name = PubSubNamespace.toMessageName(prefix, id);
+
+ contentStore.put(name, PubSubNamespace.toResponse(null, message));
+ LOGGER.log(Level.INFO, "Published message {0} to content store: {1}", new Object[]{id, name});
+
+ if (pendingInterestTable.has(new Interest(name))) {
+ sendContent(face, name);
+ // TODO extract satisfied interests
+ }
+ }
+
+ @Override
+ public void onInterest(Name name, Interest interest, Face face, long registrationId, InterestFilter interestFilter) {
+ LOGGER.log(Level.INFO, "Client requesting message: {0}", interest.toUri());
+ if (isAttributesRequest(name, interest)) {
+ sendAttributes(face, name);
+ } else {
+ if (contentStore.has(interest)) {
+ sendContent(face, interest);
+ } else {
+ pendingInterestTable.add(interest);
+ }
+ }
+ }
+
+ private static boolean isAttributesRequest(Name name, Interest interest) {
+ return name.equals(interest.getName()) && interest.getChildSelector() == -1;
+ }
+
+ private void sendContent(Face face, Name name) {
+ try {
+ contentStore.push(face, name);
+ } catch (IOException e) {
+ LOGGER.log(Level.SEVERE, "Failed to publish message, aborting: {0}", new Object[]{name, e});
+ }
+ }
+
+ private void sendContent(Face face, Interest interest) {
+ try {
+ contentStore.push(face, interest);
+ } catch (IOException e) {
+ LOGGER.log(Level.SEVERE, "Failed to publish message, aborting: {0}", new Object[]{interest.getName(), e});
+ }
+ }
+
+ private static void sendAttributes(Face face, Name publisherName) {
+ Data data = new Data(publisherName);
+ data.setContent(new Blob("[attributes here]"));
+ data.getMetaInfo().setFreshnessPeriod(ATTRIBUTES_FRESHNESS_PERIOD);
+ try {
+ face.putData(data);
+ } catch (IOException e) {
+ LOGGER.log(Level.SEVERE, "Failed to publish attributes for publisher: " + publisherName, e);
+ }
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
new file mode 100644
index 0000000..51aee3a
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnSubscriber.java
@@ -0,0 +1,186 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.Cancellation;
+import com.intel.jndn.utils.Client;
+import com.intel.jndn.utils.On;
+import com.intel.jndn.utils.Subscriber;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * TODO look at thread safety
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class NdnSubscriber implements Subscriber {
+ private static final Logger LOGGER = Logger.getLogger(NdnSubscriber.class.getName());
+ private final Face face;
+ private final Name prefix;
+ private final On<Blob> onMessage;
+ private final On<Exception> onError;
+ private final AnnouncementService announcementService;
+ private final Client client;
+ private final Map<Long, Subscription> subscriptions = new ConcurrentHashMap<>();
+ private Cancellation newAnnouncementCancellation;
+ private Cancellation existingAnnouncementsCancellation;
+
+ NdnSubscriber(Face face, Name prefix, On<Blob> onMessage, On<Exception> onError, AnnouncementService announcementService, Client client) {
+ this.face = face;
+ this.prefix = prefix;
+ this.onMessage = onMessage;
+ this.onError = onError;
+ this.announcementService = announcementService;
+ this.client = client;
+ }
+
+ @Override
+ public Set<Long> knownPublishers() {
+ return subscriptions.keySet();
+ }
+
+ @Override
+ public void open() throws IOException {
+ LOGGER.log(Level.INFO, "Starting subscriber: {0}", prefix);
+ existingAnnouncementsCancellation = announcementService.discoverExistingAnnouncements(this::addPublisher, null, e -> close());
+ newAnnouncementCancellation = announcementService.observeNewAnnouncements(this::addPublisher, this::removePublisher, e -> close());
+ }
+
+ void addPublisher(long publisherId) {
+ if (subscriptions.containsKey(publisherId)) {
+ LOGGER.log(Level.WARNING, "Duplicate publisher ID {} received from announcement service; this should not happen and will be ignored", publisherId);
+ } else {
+ Subscription subscription = new Subscription(publisherId);
+ subscriptions.put(publisherId, subscription);
+ subscription.subscribe();
+ }
+ }
+
+ void removePublisher(long publisherId) {
+ Subscription removed = subscriptions.remove(publisherId);
+ removed.cancel();
+ }
+
+ @Override
+ public void close() {
+ LOGGER.log(Level.INFO, "Stopping subscriber, knows of {0} publishers: {1} ", new Object[]{subscriptions.size(), subscriptions});
+
+ if (newAnnouncementCancellation != null) {
+ newAnnouncementCancellation.cancel();
+ }
+
+ if (existingAnnouncementsCancellation != null) {
+ existingAnnouncementsCancellation.cancel();
+ }
+
+ for (Subscription c : subscriptions.values()) {
+ c.cancel();
+ }
+ }
+
+ private class Subscription implements Cancellation {
+ final long publisherId;
+ long messageId;
+ CompletableFuture<Data> currentRequest;
+
+ Subscription(long publisherId) {
+ this.publisherId = publisherId;
+ }
+
+ @Override
+ public synchronized void cancel() {
+ if (currentRequest != null) {
+ currentRequest.cancel(true);
+ }
+ }
+
+ synchronized void subscribe() {
+ // would prefer this to be getAsync(on<>, on<>)?
+ currentRequest = client.getAsync(face, buildLatestInterest(publisherId));
+ currentRequest.handle(this::handleResponse);
+ }
+
+ private Interest buildLatestInterest(long publisherId) {
+ Name name = PubSubNamespace.toPublisherName(prefix, publisherId);
+ Interest interest = new Interest(name); // TODO ms lifetime
+ interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
+ return interest;
+ }
+
+ synchronized void next(long publisherId, long messageId) {
+ currentRequest = client.getAsync(face, buildNextInterest(publisherId, messageId));
+ currentRequest.handle(this::handleResponse);
+ }
+
+ private Interest buildNextInterest(long publisherId, long messageId) {
+ Name name = PubSubNamespace.toMessageName(prefix, publisherId, messageId + 1);
+ return new Interest(name); // TODO ms lifetime
+ }
+
+ private Void handleResponse(Data data, Throwable throwable) {
+ if (throwable != null) {
+ onError.on((Exception) throwable); // TODO avoid cast?
+ } else {
+ try {
+ Response response = PubSubNamespace.parseResponse(data);
+ this.messageId = response.messageId();
+ onMessage.on(response.content()); // TODO buffer and catch exceptions
+ } catch (EncodingException e) {
+ onError.on(e);
+ }
+
+ // TODO catchup and loop detection (if above fails next() is called again with same params)
+ next(this.publisherId, this.messageId);
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Subscription subscription = (Subscription) o;
+ return publisherId == subscription.publisherId;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (publisherId ^ (publisherId >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "Subscription{" + "publisherId=" + publisherId + '}';
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java b/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
new file mode 100644
index 0000000..7b5a160
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/OnRegistration.java
@@ -0,0 +1,49 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnRegisterFailed;
+import net.named_data.jndn.OnRegisterSuccess;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Logger;
+
+/**
+ * Helper for tracking and logging prefix registrations; TODO replace or remove this in the future
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class OnRegistration implements OnRegisterFailed, OnRegisterSuccess {
+ private static final Logger LOGGER = Logger.getLogger(OnRegistration.class.getName());
+ private final CompletableFuture<Void> future;
+
+ OnRegistration(CompletableFuture<Void> future) {
+ this.future = future;
+ }
+
+ @Override
+ public void onRegisterSuccess(Name name, long l) {
+ LOGGER.info("Registered prefix: " + name);
+ future.complete(null);
+ }
+
+ @Override
+ public void onRegisterFailed(Name name) {
+ String message = "Failed to register prefix: " + name;
+ LOGGER.severe(message);
+ future.completeExceptionally(new RegistrationFailureException(message));
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
new file mode 100644
index 0000000..c0caddc
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.On;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Subscriber;
+import com.intel.jndn.utils.client.impl.AdvancedClient;
+import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;
+import com.intel.jndn.utils.impl.InMemoryContentStore;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Assemble the necessary elements for building the {@link Publisher} and {@link Subscriber} implementations
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class PubSubFactory {
+
+ private PubSubFactory() {
+ // do not instantiate this factory
+ }
+
+ /**
+ * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+ * @param prefix the NDN namespace under which messages are published
+ * @param onMessage callback fired when a message is received
+ * @param onError callback fired when an error happens after subscription
+ * @return an open, publisher-discovering, message-retrieving subscriber
+ * @throws IOException if the initial subscription or announcement IO fails
+ */
+ public static Subscriber newSubscriber(Face face, Name prefix, On<Blob> onMessage, On<Exception> onError) throws IOException {
+ NdnAnnouncementService announcementService = new NdnAnnouncementService(face, prefix);
+ AdvancedClient client = new AdvancedClient();
+ NdnSubscriber subscriber = new NdnSubscriber(face, prefix, onMessage, onError, announcementService, client);
+ subscriber.open();
+ return subscriber;
+ }
+
+ /**
+ * @param face the face to use for network IO; must be driven externally (e.g. {@link Face#processEvents()})
+ * @param prefix the NDN namespace under which messages are published
+ * @return a group-announcing, unopened subscriber (it will automatically open on first publish)
+ */
+ public static Publisher newPublisher(Face face, Name prefix) {
+ long publisherId = Math.abs(new Random().nextLong());
+ return new NdnPublisher(face, prefix, publisherId, new NdnAnnouncementService(face, prefix), new BoundedInMemoryPendingInterestTable(1024), new InMemoryContentStore(2000));
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
new file mode 100644
index 0000000..53c8cbc
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubNamespace.java
@@ -0,0 +1,139 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.encoding.tlv.TlvDecoder;
+import net.named_data.jndn.encoding.tlv.TlvEncoder;
+import net.named_data.jndn.util.Blob;
+
+import java.util.Collections;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class PubSubNamespace {
+ static final int MESSAGE_ID_MARKER = 42;
+ static final int PUBLISHER_ID_MARKER = 43;
+ static final int ANNOUNCEMENT_ACTION_MARKER = 44;
+
+ static final int MESSAGE_MARKER = 100;
+
+ static final int MESSAGE_ATTRIBUTES_MARKER = 101;
+ static final int MESSAGE_CONTENT_MARKER = 102;
+
+ static final Name DEFAULT_BROADCAST_PREFIX = new Name("/ndn/broadcast");
+ public static final int MESSAGE_PAYLOAD_INITIAL_CAPACITY = 4096;
+
+ private PubSubNamespace() {
+ // do not instantiate this class
+ }
+
+ static Name.Component toPublisherComponent(long publisherId) {
+ return Name.Component.fromNumberWithMarker(publisherId, PubSubNamespace.PUBLISHER_ID_MARKER);
+ }
+
+ static Name toPublisherName(Name prefix, long publisherId) {
+ return new Name(prefix).append(toPublisherComponent(publisherId));
+ }
+
+ static long parsePublisher(Name name) throws EncodingException {
+ return findMarkerFromEnd(name, PUBLISHER_ID_MARKER);
+ }
+
+ static Name toAnnouncement(Name prefix, long publisherId, Announcement announcement) {
+ Name.Component announcementComponent = Name.Component.fromNumberWithMarker(announcement.id, PubSubNamespace.ANNOUNCEMENT_ACTION_MARKER);
+ return toPublisherName(prefix, publisherId).append(announcementComponent);
+ }
+
+ static Announcement parseAnnouncement(Name name) throws EncodingException {
+ return Announcement.from((int) name.get(-1).toNumberWithMarker(ANNOUNCEMENT_ACTION_MARKER));
+ }
+
+ static Name toMessageName(Name prefix, long publisherId, long messageId) {
+ Name.Component messageComponent = Name.Component.fromNumberWithMarker(messageId, PubSubNamespace.MESSAGE_ID_MARKER);
+ return toPublisherName(prefix, publisherId).append(messageComponent);
+ }
+
+ static Name toMessageName(Name publisherPrefix, long messageId) {
+ Name.Component messageComponent = Name.Component.fromNumberWithMarker(messageId, PubSubNamespace.MESSAGE_ID_MARKER);
+ return new Name(publisherPrefix).append(messageComponent);
+ }
+
+ static long parseMessageId(Name name) throws EncodingException {
+ return findMarkerFromEnd(name, MESSAGE_ID_MARKER);
+ }
+
+ static Blob toResponse(Blob attributes, Blob content) {
+ TlvEncoder encoder = new TlvEncoder(MESSAGE_PAYLOAD_INITIAL_CAPACITY);
+ int initialLength = encoder.getLength();
+
+ encoder.writeBlobTlv(MESSAGE_CONTENT_MARKER, content.buf()); // encode backwards, see Tlv0_1_1WireFormat.java
+ if(attributes != null && attributes.size() > 0){
+ encoder.writeBlobTlv(MESSAGE_ATTRIBUTES_MARKER, content.buf());
+ }
+ encoder.writeTypeAndLength(MESSAGE_MARKER, encoder.getLength() - initialLength);
+
+ return new Blob(encoder.getOutput(), false);
+ }
+
+ static Response parseResponse(Data data) throws EncodingException {
+ long messageId = parseMessageId(data.getName());
+ TlvDecoder decoder = new TlvDecoder(data.getContent().buf());
+ int endOffset = decoder.readNestedTlvsStart(MESSAGE_MARKER);
+ Blob attributes = new Blob(decoder.readOptionalBlobTlv(MESSAGE_ATTRIBUTES_MARKER, endOffset), true);
+ Blob content = new Blob(decoder.readBlobTlv(MESSAGE_CONTENT_MARKER), true);
+ return new Response(data.getName(), messageId, Collections.singletonMap("*", attributes), content);
+ }
+
+ private static long findMarkerFromEnd(Name name, int marker) throws EncodingException {
+ for (int i = name.size() - 1; i >= 0; i--) {
+ try {
+ return name.get(i).toNumberWithMarker(marker);
+ } catch (EncodingException e) {
+ // do nothing
+ }
+ }
+ throw new EncodingException("Failed to find marker " + marker + " in name: " + name.toUri());
+ }
+
+ enum Announcement {
+ ENTRANCE(0),
+ EXIT(1);
+
+ private final int id;
+
+ Announcement(int id) {
+ this.id = id;
+ }
+
+ static Announcement from(int value) {
+ for (Announcement a : Announcement.values()) {
+ if (a.id == value) return a;
+ }
+ return null;
+ }
+
+ int value() {
+ return id;
+ }
+
+ public int getValue() {
+ return id;
+ }
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/RegistrationFailureException.java b/src/main/java/com/intel/jndn/utils/pubsub/RegistrationFailureException.java
new file mode 100644
index 0000000..aa7919d
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/RegistrationFailureException.java
@@ -0,0 +1,28 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class RegistrationFailureException extends Exception {
+ RegistrationFailureException(String message) {
+ super(message);
+ }
+
+ public RegistrationFailureException(Exception e) {
+ super(e);
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/Response.java b/src/main/java/com/intel/jndn/utils/pubsub/Response.java
new file mode 100644
index 0000000..29aa1cd
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/Response.java
@@ -0,0 +1,56 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+
+import java.util.Map;
+
+/**
+ * Represent a publisher response to a subscriber request for a message. Note that the {@link #attributes} act as a type
+ * of header optionally prepended to the message content. The intent of this is to allow publishers to inform
+ * subscribers of publisher-side state changes without requiring a separate mechanism (with accompanying complexity and
+ * overhead); the full set of publisher state attributes should be served elsewhere, not in this class.
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+class Response {
+ private final Name name;
+ private final long messageId;
+ private final Map<String, Object> attributes;
+ private final Blob content;
+
+ Response(Name name, long messageId, Map<String, Object> attributes, Blob content) {
+ this.name = name;
+ this.messageId = messageId;
+ this.attributes = attributes;
+ this.content = content;
+ }
+
+ /**
+ * @return the message ID of the published message
+ */
+ long messageId() {
+ return messageId;
+ }
+
+ /**
+ * @return the content of the message
+ */
+ Blob content() {
+ return content;
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/package.html b/src/main/java/com/intel/jndn/utils/pubsub/package.html
new file mode 100644
index 0000000..562c542
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/pubsub/package.html
@@ -0,0 +1,26 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
+<html>
+<body>
+<p>See [need link here] for algorithm details.</p>
+
+<p>The NDN pub-sub algorithm is designed using the following criteria:</p>
+<ul>
+ <li>subscriber-driven</li>
+ <li>lightweight</li>
+ <li>avoid unnecessary IO</li>
+</ul>
+
+<p>It makes the following guarantees:</p>
+<ul>
+ <li>each publisher will order its own messages by the order in which they were published; however, the mechanism
+ for this ordering (i.e. a monotonically-increasing ID) will not maintain the guarantee across multiple
+ publishers (in this case, an application-level mechanism, like a time-synchronized timestamp, must be used;
+ alternatively, try ChronoSync)
+ </li>
+ <li>publishers will provide the messages published on a topic for a specified lifetime; even after being closed,
+ the publisher must maintain a registered prefix and respond to subscriber requests for data
+ </li>
+ <li>subscribers will eventually discover all routable publishers</li>
+</ul>
+</body>
+</html>
\ No newline at end of file
diff --git a/src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java b/src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java
index eba700a..df0659f 100644
--- a/src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java
+++ b/src/main/java/com/intel/jndn/utils/repository/impl/DataNotFoundException.java
@@ -14,9 +14,9 @@
package com.intel.jndn.utils.repository.impl;
/**
- * Thrown when a {@link Repository} cannot retrieve a stored packet.
+ * Thrown when a {@link com.intel.jndn.utils.Repository} cannot retrieve a stored packet.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class DataNotFoundException extends Exception {
diff --git a/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java b/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
index e40890d..bb259c1 100644
--- a/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
+++ b/src/main/java/com/intel/jndn/utils/repository/impl/ForLoopRepository.java
@@ -26,11 +26,11 @@
* {@link net.named_data.jndn.util.MemoryContentCache} and borrows the matching
* logic from there.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class ForLoopRepository implements Repository {
- private List<Record> storage = new ArrayList<>();
+ private final List<Record> storage = new ArrayList<>();
/**
* {@inheritDoc}
@@ -95,7 +95,7 @@
* components (e.g. c); if the Data is not longer than the Interest, return an
* empty component.
*/
- private Name.Component getNextComponentAfterLastInterestComponent(Data content, Interest interest) {
+ private static Name.Component getNextComponentAfterLastInterestComponent(Data content, Interest interest) {
if (content.getName().size() > interest.getName().size()) {
return content.getName().get(interest.getName().size());
} else {
@@ -132,11 +132,7 @@
* record is fresh
*/
private boolean hasAcceptableFreshness(Interest interest, Record record) {
- if (!interest.getMustBeFresh()) {
- return true;
- } else {
- return isFresh(record);
- }
+ return !interest.getMustBeFresh() || isFresh(record);
}
/**
@@ -171,10 +167,10 @@
*/
private class Record {
- public final Data data;
- public final long addedAt;
+ final Data data;
+ final long addedAt;
- public Record(Data data) {
+ Record(Data data) {
this.data = data;
this.addedAt = System.currentTimeMillis();
}
diff --git a/src/main/java/com/intel/jndn/utils/server/DynamicServer.java b/src/main/java/com/intel/jndn/utils/server/DynamicServer.java
index 60caf35..f8432ca 100644
--- a/src/main/java/com/intel/jndn/utils/server/DynamicServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/DynamicServer.java
@@ -14,27 +14,27 @@
package com.intel.jndn.utils.server;
import com.intel.jndn.utils.Server;
-import com.intel.jndn.utils.server.RespondWithData;
+
import java.io.IOException;
/**
- * Defines the API for a {@link Server} producing {@link Data} packets
- * dynamically; in other words, when an {@link Interest} arrives, this server
+ * Defines the API for a {@link Server} producing data packets
+ * dynamically; in other words, when an interest arrives, this server
* will run a callback to determine what packet to send back. As good practice,
* keep callback methods as short as possible.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface DynamicServer extends Server {
/**
- * Set the callback method to run when an {@link Interest} packet is passed to
- * this server. This method should either return a {@link Data} packet that
+ * Set the callback method to run when an interest packet is passed to
+ * this server. This method should either return a data packet that
* satisfies the Interest or throw an Exception to avoid sending. Calling this
* method a second time should replace the callback.
*
* @param callback the callback instance
* @throws java.io.IOException if the server fails to register a prefix
*/
- public void respondUsing(RespondWithData callback) throws IOException;
+ void respondUsing(RespondWithData callback) throws IOException;
}
diff --git a/src/main/java/com/intel/jndn/utils/server/RepositoryServer.java b/src/main/java/com/intel/jndn/utils/server/RepositoryServer.java
index 09ebc23..a0db7e9 100644
--- a/src/main/java/com/intel/jndn/utils/server/RepositoryServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/RepositoryServer.java
@@ -14,15 +14,16 @@
package com.intel.jndn.utils.server;
import com.intel.jndn.utils.Server;
-import java.io.IOException;
import net.named_data.jndn.Data;
+import java.io.IOException;
+
/**
* Defines the API for a {@link Server} producing {@link Data} packets and
* storing them until they are requested; this server corresponds closely to use
* cases such as: cache, file system, web server.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface RepositoryServer extends Server {
@@ -32,10 +33,10 @@
* @param data the {@link Data} packet to store and serve
* @throws IOException if the underlying server fails to store the packet
*/
- public void serve(Data data) throws IOException;
+ void serve(Data data) throws IOException;
/**
* Clean up stale {@link Data} packets from the underlying content store.
*/
- public void cleanup();
+ void cleanup();
}
diff --git a/src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java b/src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java
index fe7f74a..d83e1d6 100644
--- a/src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java
+++ b/src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java
@@ -20,9 +20,9 @@
/**
* Functional interface for serving data from Server.on()
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface RespondWithBlob {
- public Blob onInterest(Name prefix, Interest interest) throws Exception;
+ Blob onInterest(Name prefix, Interest interest) throws Exception;
}
diff --git a/src/main/java/com/intel/jndn/utils/server/RespondWithData.java b/src/main/java/com/intel/jndn/utils/server/RespondWithData.java
index 95d0e6a..556d1d6 100644
--- a/src/main/java/com/intel/jndn/utils/server/RespondWithData.java
+++ b/src/main/java/com/intel/jndn/utils/server/RespondWithData.java
@@ -20,9 +20,9 @@
/**
* Functional interface for serving data from Server.on()
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public interface RespondWithData {
- public Data onInterest(Name prefix, Interest interest) throws Exception;
+ Data onInterest(Name prefix, Interest interest) throws Exception;
}
diff --git a/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java b/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java
index 6be4280..38a4fde 100644
--- a/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServer.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -13,17 +13,10 @@
*/
package com.intel.jndn.utils.server.impl;
-import com.intel.jndn.utils.server.RepositoryServer;
-import com.intel.jndn.utils.repository.impl.ForLoopRepository;
import com.intel.jndn.utils.Repository;
-import com.intel.jndn.utils.server.impl.SegmentedServerHelper;
-import com.intel.jndn.utils.server.impl.ServerBaseImpl;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import com.intel.jndn.utils.impl.SegmentationHelper;
+import com.intel.jndn.utils.repository.impl.ForLoopRepository;
+import com.intel.jndn.utils.server.RepositoryServer;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
@@ -31,11 +24,18 @@
import net.named_data.jndn.Name;
import net.named_data.jndn.encoding.EncodingException;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
/**
* Implementation of a {@link RepositoryServer} that segments packets stored in
* its repository.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SegmentedServer extends ServerBaseImpl implements RepositoryServer {
@@ -58,9 +58,9 @@
register();
}
- if (data.getContent().size() >= SegmentedServerHelper.DEFAULT_SEGMENT_SIZE) {
+ if (data.getContent().size() >= SegmentationHelper.DEFAULT_SEGMENT_SIZE) {
InputStream stream = new ByteArrayInputStream(data.getContent().getImmutableArray());
- List<Data> segments = SegmentedServerHelper.segment(data, stream);
+ List<Data> segments = SegmentationHelper.segment(data, stream);
for (Data segment : segments) {
logger.fine("Adding segment: " + segment.getName().toUri());
repository.put(segment);
diff --git a/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServerHelper.java b/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServerHelper.java
deleted file mode 100644
index 78e3c27..0000000
--- a/src/main/java/com/intel/jndn/utils/server/impl/SegmentedServerHelper.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.server.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.util.Blob;
-
-/**
- * Helper for segmenting an input stream into a list of Data packets. Current
- * use of the default segment size of 4096 (only for
- * {@link #segment(net.named_data.jndn.Data, java.io.InputStream)} is based on
- * several assumptions: NDN packet size was limited to 8000 at the time this was
- * written and signature size is unknown.
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SegmentedServerHelper {
-
- public static final int DEFAULT_SEGMENT_SIZE = 4096;
-
- /**
- * Segment a stream of bytes into a list of Data packets; this must read all
- * the bytes first in order to determine the end segment for FinalBlockId.
- *
- * @param template the {@link Data} packet to use for the segment {@link Name},
- * {@link net.named_data.jndn.MetaInfo}, etc.
- * @param bytes an {@link InputStream} to the bytes to segment
- * @return a list of segmented {@link Data} packets
- * @throws IOException if the stream fails
- */
- public static List<Data> segment(Data template, InputStream bytes) throws IOException {
- return segment(template, bytes, DEFAULT_SEGMENT_SIZE);
- }
-
- /**
- * Segment a stream of bytes into a list of Data packets; this must read all
- * the bytes first in order to determine the end segment for FinalBlockId.
- *
- * @param template the {@link Data} packet to use for the segment {@link Name},
- * {@link net.named_data.jndn.MetaInfo}, etc.
- * @param bytes an {@link InputStream} to the bytes to segment
- * @return a list of segmented {@link Data} packets
- * @throws IOException if the stream fails
- */
- public static List<Data> segment(Data template, InputStream bytes, int segmentSize) throws IOException {
- List<Data> segments = new ArrayList<>();
- byte[] buffer_ = readAll(bytes);
- int numBytes = buffer_.length;
- int numPackets = (int) Math.ceil((double) numBytes / segmentSize);
- ByteBuffer buffer = ByteBuffer.wrap(buffer_, 0, numBytes);
- Name.Component lastSegment = Name.Component.fromNumberWithMarker(numPackets - 1, 0x00);
-
- for (int i = 0; i < numPackets; i++) {
- Data segment = new Data(template);
- segment.getName().appendSegment(i);
- segment.getMetaInfo().setFinalBlockId(lastSegment);
- byte[] content = new byte[Math.min(segmentSize, buffer.remaining())];
- buffer.get(content);
- segment.setContent(new Blob(content));
- segments.add(segment);
- }
-
- return segments;
- }
-
- /**
- * Read all of the bytes in an input stream.
- *
- * @param bytes the {@link InputStream} of bytes to read
- * @return an array of all bytes retrieved from the stream
- * @throws IOException if the stream fails
- */
- public static byte[] readAll(InputStream bytes) throws IOException {
- ByteArrayOutputStream builder = new ByteArrayOutputStream();
- int read = bytes.read();
- while (read != -1) {
- builder.write(read);
- read = bytes.read();
- }
- builder.flush();
- bytes.close();
- return builder.toByteArray();
- }
-}
diff --git a/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java b/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
index 6110005..64130bd 100644
--- a/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
@@ -15,11 +15,6 @@
import com.intel.jndn.utils.ProcessingStage;
import com.intel.jndn.utils.Server;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.ForwardingFlags;
@@ -27,10 +22,16 @@
import net.named_data.jndn.OnRegisterFailed;
import net.named_data.jndn.encoding.EncodingException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
/**
* Base implementation for a {@link Server}.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public abstract class ServerBaseImpl implements Server {
diff --git a/src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java b/src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java
index cee7d7f..1c1a643 100644
--- a/src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/SimpleServer.java
@@ -14,14 +14,8 @@
package com.intel.jndn.utils.server.impl;
import com.intel.jndn.utils.server.DynamicServer;
-import com.intel.jndn.utils.server.RespondWithData;
-import com.intel.jndn.utils.server.RespondWithBlob;
import com.intel.jndn.utils.server.RespondWithBlob;
import com.intel.jndn.utils.server.RespondWithData;
-import com.intel.jndn.utils.server.impl.ServerBaseImpl;
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
@@ -30,11 +24,15 @@
import net.named_data.jndn.OnInterest;
import net.named_data.jndn.util.Blob;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
/**
* Implementation of a {@link DynamicServer} that wraps the {@link OnInterest}
* callback with some encoding and pipeline support.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SimpleServer extends ServerBaseImpl implements DynamicServer {
diff --git a/src/test/java/com/intel/jndn/utils/TestHelper.java b/src/test/java/com/intel/jndn/utils/TestHelper.java
index a73aba4..72a96d7 100644
--- a/src/test/java/com/intel/jndn/utils/TestHelper.java
+++ b/src/test/java/com/intel/jndn/utils/TestHelper.java
@@ -14,6 +14,15 @@
package com.intel.jndn.utils;
import com.intel.jndn.mock.MockKeyChain;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.security.KeyChain;
+import net.named_data.jndn.security.SecurityException;
+import net.named_data.jndn.transport.UdpTransport;
+import net.named_data.jndn.util.Blob;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -26,19 +35,11 @@
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Face;
-import net.named_data.jndn.Name;
-import net.named_data.jndn.encoding.EncodingException;
-import net.named_data.jndn.security.KeyChain;
-import net.named_data.jndn.security.SecurityException;
-import net.named_data.jndn.transport.UdpTransport;
-import net.named_data.jndn.util.Blob;
/**
* Collect assorted methods to help with testing
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class TestHelper {
@@ -104,7 +105,7 @@
public ScheduledExecutorService executor;
public KeyChain keyChain;
- public List<Face> faces = new ArrayList<>();
+ public final List<Face> faces = new ArrayList<>();
}
public static class EventProcessor implements Runnable {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java
index 45572a7..d778a40 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientFileTestIT.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -15,16 +15,7 @@
import com.intel.jndn.utils.Client;
import com.intel.jndn.utils.TestHelper;
-import com.intel.jndn.utils.server.impl.SegmentedServerHelper;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
+import com.intel.jndn.utils.impl.SegmentationHelper;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
@@ -34,11 +25,21 @@
import net.named_data.jndn.security.SecurityException;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
/**
* Test AdvancedClient.java; requires a hostname to an NFD accepting a generated
* key to register prefixes, e.g. mvn test -Dnfd.ip=10.10.10.1
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class AdvancedClientFileTestIT {
@@ -68,7 +69,7 @@
long startTime = System.currentTimeMillis();
AdvancedClient client = new AdvancedClient();
List<CompletableFuture<Data>> requests = expressInterests(client, consumer, PREFIX, NUM_MESSAGES);
- List<Data> datas = requests.stream().map((f) -> TestHelper.retrieve(f)).collect(Collectors.toList());
+ requests.stream().map((f) -> TestHelper.retrieve(f)).collect(Collectors.toList());
long endTime = System.currentTimeMillis();
logger.info(String.format("Transfered %d bytes in %d ms", MESSAGE_SIZE_BYTES * NUM_MESSAGES, endTime - startTime));
@@ -106,7 +107,7 @@
data.getMetaInfo().setFreshnessPeriod(0);
try {
- for (Data segment : SegmentedServerHelper.segment(data, bytes, segmentSize)) {
+ for (Data segment : SegmentationHelper.segment(data, bytes, segmentSize)) {
logger.log(Level.INFO, "Put data: " + segment.getName().toUri());
face.putData(segment);
}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java
index 6a99599..b09ae6f 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientStressTestIT.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -15,16 +15,7 @@
import com.intel.jndn.utils.Client;
import com.intel.jndn.utils.TestHelper;
-import com.intel.jndn.utils.server.impl.SegmentedServerHelper;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
+import com.intel.jndn.utils.impl.SegmentationHelper;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
@@ -34,11 +25,21 @@
import net.named_data.jndn.security.SecurityException;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
/**
* Test AdvancedClient.java; requires a hostname to an NFD accepting a generated
* key to register prefixes, e.g. mvn test -Dnfd.ip=10.10.10.1
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class AdvancedClientStressTestIT {
@@ -68,7 +69,7 @@
long startTime = System.currentTimeMillis();
AdvancedClient client = new AdvancedClient();
List<CompletableFuture<Data>> requests = expressInterests(client, consumer, PREFIX, NUM_MESSAGES);
- List<Data> datas = requests.stream().map((f) -> TestHelper.retrieve(f)).collect(Collectors.toList());
+ requests.stream().map((f) -> TestHelper.retrieve(f)).collect(Collectors.toList());
long endTime = System.currentTimeMillis();
logger.info(String.format("Transfered %d bytes in %d ms", MESSAGE_SIZE_BYTES * NUM_MESSAGES, endTime - startTime));
@@ -106,7 +107,7 @@
data.getMetaInfo().setFreshnessPeriod(0);
try {
- for (Data segment : SegmentedServerHelper.segment(data, bytes, segmentSize)) {
+ for (Data segment : SegmentationHelper.segment(data, bytes, segmentSize)) {
logger.log(Level.INFO, "Put data: " + segment.getName().toUri());
face.putData(segment);
}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java
index 3ee3bb8..2462f23 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/AdvancedClientTest.java
@@ -1,6 +1,6 @@
/*
* jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
@@ -17,6 +17,7 @@
import com.intel.jndn.mock.MockForwarder;
import com.intel.jndn.utils.TestHelper;
import com.intel.jndn.utils.client.SegmentationType;
+import com.intel.jndn.utils.impl.SegmentationHelper;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
@@ -70,7 +71,7 @@
Face face = forwarder.connect();
face.registerPrefix(name, new OnInterestCallback() {
- private int max = 9;
+ private final int max = 9;
@Override
public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/BackoffRetryClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/BackoffRetryClientTest.java
new file mode 100644
index 0000000..f070911
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/client/impl/BackoffRetryClientTest.java
@@ -0,0 +1,65 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.client.impl;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnTimeout;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static junit.framework.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BackoffRetryClientTest {
+
+ private BackoffRetryClient instance;
+
+ @Before
+ public void before() {
+ instance = new BackoffRetryClient(10, 2);
+ }
+
+ @Test
+ public void retry() throws Exception {
+ Face face = mock(Face.class);
+ ArgumentCaptor<Interest> interestCaptor = ArgumentCaptor.forClass(Interest.class);
+ ArgumentCaptor<OnTimeout> onTimeoutCaptor = ArgumentCaptor.forClass(OnTimeout.class);
+ when(face.expressInterest(interestCaptor.capture(), any(), onTimeoutCaptor.capture())).then(new Answer<Long>() {
+ @Override
+ public Long answer(InvocationOnMock invocation) throws Throwable {
+ onTimeoutCaptor.getValue().onTimeout(interestCaptor.getValue());
+ return -1L;
+ }
+ });
+ Interest interest = new Interest(new Name("/backoff/test"), 1);
+ AtomicInteger timeouts = new AtomicInteger();
+
+ instance.retry(face, interest, null, interest1 -> timeouts.incrementAndGet());
+
+ assertEquals(1, timeouts.get());
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java
index 39b6586..5d15a89 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DataAssemblerTest.java
@@ -14,16 +14,18 @@
package com.intel.jndn.utils.client.impl;
import com.intel.jndn.utils.TestHelper;
-import java.util.concurrent.ExecutionException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Name;
-import static org.junit.Assert.*;
import org.junit.Test;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+
/**
* Test that segment data packets are re-assembled correctly.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class DataAssemblerTest {
@@ -71,6 +73,6 @@
byte marker = 0x00;
DataAssembler instance = new DataAssembler(packets, marker);
- Data reassembled = instance.assemble();
+ instance.assemble();
}
}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java
index f529b58..c5f93b2 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DefaultRetryClientTest.java
@@ -34,9 +34,9 @@
private static final double INTEREST_LIFETIME_MS = 1.0;
private DefaultRetryClient client;
- private Name name = new Name("/test/retry/client");
- private Interest interest = new Interest(name, INTEREST_LIFETIME_MS);
- private TestCounter counter = new TestCounter();
+ private final Name name = new Name("/test/retry/client");
+ private final Interest interest = new Interest(name, INTEREST_LIFETIME_MS);
+ private final TestCounter counter = new TestCounter();
@Before
public void before() throws Exception {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java
index 05c85f5..e4082c3 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DefaultSegmentedClientTest.java
@@ -34,7 +34,7 @@
*/
public class DefaultSegmentedClientTest {
- private DefaultSegmentedClient instance = new DefaultSegmentedClient();
+ private final DefaultSegmentedClient instance = new DefaultSegmentedClient();
@Test
public void testGetSegmentsAsync() throws Exception {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java
index aaf43df..7497f72 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/DefaultStreamingClientTest.java
@@ -15,18 +15,21 @@
import com.intel.jndn.utils.TestHelper;
import com.intel.jndn.utils.client.OnException;
-import java.io.InputStream;
import net.named_data.jndn.Name;
import org.junit.Test;
-import static org.junit.Assert.*;
+
+import java.io.InputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class DefaultStreamingClientTest {
- DefaultStreamingClient instance = new DefaultStreamingClient();
+ private final DefaultStreamingClient instance = new DefaultStreamingClient();
@Test
public void testGetAsyncStream() throws Exception {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java b/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java
index 2f165b9..a8203a8 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/SegmentedDataStreamTest.java
@@ -14,24 +14,26 @@
package com.intel.jndn.utils.client.impl;
import com.intel.jndn.utils.TestHelper;
-import java.util.ArrayList;
-import java.util.stream.IntStream;
import net.named_data.jndn.Data;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.Name.Component;
import net.named_data.jndn.encoding.EncodingException;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.stream.IntStream;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import org.junit.Test;
/**
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SegmentedDataStreamTest {
- SegmentedDataStream instance = new SegmentedDataStream();
+ private final SegmentedDataStream instance = new SegmentedDataStream();
@Test
public void testAddingSequentialData() throws StreamException {
@@ -131,7 +133,7 @@
assertEquals(7, instance.list().length);
}
- protected void addPacketToInstance(long i) {
+ private void addPacketToInstance(long i) {
Name name = new Name().appendSegment(i);
instance.onData(new Interest(name), new Data(name));
}
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java
index 4e529be..fc5ac4b 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTest.java
@@ -36,7 +36,7 @@
/**
* Test SimpleClient.java
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SimpleClientTest {
diff --git a/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java
index f49525b..67f5ce7 100644
--- a/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java
+++ b/src/test/java/com/intel/jndn/utils/client/impl/SimpleClientTestIT.java
@@ -13,14 +13,7 @@
*/
package com.intel.jndn.utils.client.impl;
-import com.intel.jndn.utils.client.impl.SimpleClient;
import com.intel.jndn.utils.TestHelper;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
@@ -30,24 +23,31 @@
import net.named_data.jndn.security.SecurityException;
import net.named_data.jndn.util.Blob;
import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import org.junit.Test;
/**
* Test SimpleClient.java; requires a hostname to an NFD accepting a generated
* key to register prefixes, e.g. mvn test -Dnfd.ip=10.10.10.1
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SimpleClientTestIT {
private static final Logger logger = Logger.getLogger(SimpleClientTestIT.class.getName());
private static final Name PREFIX_RETRIEVE = new Name("/test/simple-client/retrieve-one").append(TestHelper.buildRandomString(10));
private static final Name PREFIX_RETRIEVE_MULTIPLE = new Name("/test/simple-client/retrieve-multiple").append(TestHelper.buildRandomString(10));
-
- SimpleClient instance;
private final TestHelper.NdnEnvironment environment;
+ private SimpleClient instance;
public SimpleClientTestIT() throws SecurityException {
String ip = System.getProperty("nfd.ip");
@@ -116,7 +116,7 @@
private class DataServer implements OnInterestCallback {
- private Data data;
+ private final Data data;
public DataServer(Data data) {
this.data = data;
diff --git a/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java
new file mode 100644
index 0000000..739e215
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/BoundedInMemoryPendingInterestTableTest.java
@@ -0,0 +1,66 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedInMemoryPendingInterestTableTest {
+
+ private BoundedInMemoryPendingInterestTable instance;
+
+ @Before
+ public void before() {
+ instance = new BoundedInMemoryPendingInterestTable(5);
+ }
+
+ @Test
+ public void add() throws Exception {
+ Name name = new Name("/a/b/c");
+ instance.add(new Interest(name));
+ assertTrue(instance.has(name));
+ }
+
+ @Test
+ public void has() throws Exception {
+ Name name = new Name("/a/b/c");
+ instance.add(new Interest(name));
+
+ Interest interest = new Interest(new Name("/a/b"));
+ interest.setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
+ assertTrue(instance.has(interest));
+ }
+
+ @Test
+ public void extract() throws Exception {
+ instance.add(new Interest(new Name("/a")));
+ instance.add(new Interest(new Name("/a/b")));
+ instance.add(new Interest(new Name("/a/b/c")));
+
+ Collection<Interest> extracted = instance.extract(new Name("/a/b"));
+ assertEquals(2, extracted.size()); // TODO not sure about this...
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java b/src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java
new file mode 100644
index 0000000..d4ca544
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/BoundedLinkedMapTest.java
@@ -0,0 +1,177 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.function.Consumer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test BoundedLinkedMapTest
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class BoundedLinkedMapTest {
+ private static final Logger LOGGER = Logger.getLogger(BoundedLinkedMapTest.class.getName());
+ private BoundedLinkedMap<String, Object> instance;
+
+ @Before
+ public void beforeTest() {
+ instance = new BoundedLinkedMap<>(2);
+ }
+
+ @Test
+ public void testUsage() {
+ Object object0 = new Object();
+ Object object1 = new Object();
+ Object object2 = new Object();
+
+ instance.put("0", object0);
+ assertEquals(1, instance.size());
+
+ instance.put("1", object1);
+ assertEquals(2, instance.size());
+
+ instance.put("2", object2);
+ assertEquals(2, instance.size());
+
+ assertNull(instance.get("0"));
+ assertEquals("2", instance.latest());
+ }
+
+ @Test
+ public void testEarliestLatest() {
+ assertNull(instance.earliest());
+ assertNull(instance.latest());
+
+ instance.put(".", new Object());
+ assertEquals(instance.earliest(), instance.latest());
+
+ instance.put("..", new Object());
+ assertEquals(".", instance.earliest());
+ assertEquals("..", instance.latest());
+
+ instance.put("...", new Object());
+ assertEquals("..", instance.earliest());
+ assertEquals("...", instance.latest());
+ }
+
+ @Test
+ public void testIsEmpty() {
+ assertTrue(instance.isEmpty());
+ instance.put("...", new Object());
+ assertFalse(instance.isEmpty());
+ }
+
+ @Test
+ public void testContainsKey() {
+ assertFalse(instance.containsKey("..."));
+ instance.put("...", new Object());
+ assertTrue(instance.containsKey("..."));
+ }
+
+ @Test
+ public void testContainsValue() {
+ Object o = new Object();
+ assertFalse(instance.containsValue(o));
+ instance.put("...", o);
+ assertTrue(instance.containsValue(o));
+ }
+
+ @Test
+ public void testRemove() {
+ Object o = new Object();
+ String key = "...";
+
+ instance.put(key, o);
+ assertTrue(instance.containsKey(key));
+ assertTrue(instance.containsValue(o));
+
+ instance.remove(key);
+ assertFalse(instance.containsKey(key));
+ assertFalse(instance.containsValue(o));
+ assertEquals(0, instance.size());
+ }
+
+ @Test
+ public void testPutAll() {
+ HashMap<String, Object> map = new HashMap<>();
+ map.put("1", new Object());
+ map.put("2", new Object());
+ map.put("99", new Object());
+
+ instance.putAll(map);
+ LOGGER.log(Level.FINE, "Map passed to putAll(): {0}", map.toString());
+ LOGGER.log(Level.FINE, "Resulting bounded map after putAll(): {0}", instance.toString());
+
+ assertEquals(2, instance.size()); // note: this is not 3 because the max size is bounded
+ assertEquals("2", instance.latest()); // note: put all doesn't do the FIFO replacement
+ }
+
+ @Test
+ public void testClear() {
+ instance.put("...", new Object());
+
+ instance.clear();
+
+ assertEquals(0, instance.size());
+ assertNull(instance.get("..."));
+ assertNull(instance.latest());
+ assertNull(instance.earliest());
+ }
+
+ @Test
+ public void testConversions() {
+ instance.put("...", new Object());
+
+ assertEquals(1, instance.keySet().size());
+ assertEquals(1, instance.values().size());
+ assertEquals(1, instance.entrySet().size());
+ }
+
+ @Test
+ public void testPerformanceAgainstArrayList() {
+ int numMessages = 10000;
+ BoundedLinkedMap<Integer, Object> map = new BoundedLinkedMap<>(numMessages);
+ ArrayList<Object> list = new ArrayList<>(numMessages);
+
+ long mapPutTime = measure(numMessages, i -> map.put(i, new Object()));
+ long listPutTime = measure(numMessages, i -> list.add(i, new Object()));
+ LOGGER.log(Level.FINE, "Custom map put has overhead of {0}% versus list put", toPercent((mapPutTime - listPutTime) / (double) listPutTime));
+
+ long mapGetTime = measure(numMessages, map::get);
+ long listGetTime = measure(numMessages, list::get);
+ LOGGER.log(Level.FINE, "Custom map get has overhead of {0}% versus list get", toPercent((mapGetTime - listGetTime) / (double) listPutTime));
+ }
+
+ private long measure(int numTimes, Consumer<Integer> work) {
+ long start = System.nanoTime();
+ for (int i = 0; i < numTimes; i++) {
+ work.accept(i);
+ }
+ return System.nanoTime() - start;
+ }
+
+ private double toPercent(double number) {
+ return Math.round(number * 100);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/DefaultNameTreeTest.java b/src/test/java/com/intel/jndn/utils/impl/DefaultNameTreeTest.java
new file mode 100644
index 0000000..9af62c9
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/DefaultNameTreeTest.java
@@ -0,0 +1,91 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.utils.NameTree;
+import net.named_data.jndn.Name;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class DefaultNameTreeTest {
+
+ private NameTree<String> instance;
+
+ @Before
+ public void setUp() throws Exception {
+ instance = DefaultNameTree.newRootTree();
+ instance.insert(new Name("/a/b/c"), ".");
+ instance.insert(new Name("/a/b/d"), "..");
+ instance.insert(new Name("/a/e"), "...");
+ }
+
+ @Test
+ public void content() throws Exception {
+ // TODO
+ }
+
+ @Test
+ public void retrieveName() throws Exception {
+ assertEquals("/a", instance.find(new Name("/a")).get().fullName().toString());
+ assertEquals("/a/b", instance.find(new Name("/a/b")).get().fullName().toString());
+ assertEquals("/a/b/c", instance.find(new Name("/a/b/c")).get().fullName().toString());
+ }
+
+ @Test
+ public void children() throws Exception {
+ // TODO
+ }
+
+ @Test
+ public void parent() throws Exception {
+ assertNull(instance.parent());
+ }
+
+ @Test
+ public void find() throws Exception {
+ assertEquals(2, instance.find(new Name("/a/b")).get().children().size());
+ }
+
+ @Test
+ public void findDeep() throws Exception {
+ instance.insert(new Name("/a/e/x/y/z"), "...");
+ assertEquals(2, instance.find(new Name("/a")).get().children().size());
+ assertEquals(1, instance.find(new Name("/a/e")).get().children().size());
+ assertEquals(1, instance.find(new Name("/a/e/x")).get().children().size());
+ assertEquals(1, instance.find(new Name("/a/e/x/y")).get().children().size());
+ assertEquals(0, instance.find(new Name("/a/e/x/y/z")).get().children().size());
+ }
+
+ @Test
+ public void insert() throws Exception {
+ // TODO
+ }
+
+ @Test
+ public void delete() throws Exception {
+ // TODO
+ }
+
+ @Test
+ public void count() throws Exception {
+ // TODO
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/InMemoryContentStoreTest.java b/src/test/java/com/intel/jndn/utils/impl/InMemoryContentStoreTest.java
new file mode 100644
index 0000000..449af17
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/InMemoryContentStoreTest.java
@@ -0,0 +1,92 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import com.intel.jndn.mock.MockFace;
+import com.intel.jndn.utils.ContentStore;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class InMemoryContentStoreTest {
+ private final ContentStore instance = new InMemoryContentStore(1000);
+
+ @Test
+ public void basicUsage() throws Exception {
+ instance.put(new Name("/a"), new Blob("."));
+
+ assertTrue(instance.has(new Name("/a")));
+ assertArrayEquals(".".getBytes(), instance.get(new Name("/a")).get().getImmutableArray());
+ }
+
+ @Ignore // TODO need bounds on NameTree
+ @Test
+ public void replacement() throws Exception {
+ instance.put(new Name("/a"), new Blob("."));
+ instance.put(new Name("/a/b"), new Blob(".."));
+ instance.put(new Name("/a/b/c"), new Blob("..."));
+ instance.put(new Name("/a/b/c/d"), new Blob("...."));
+ instance.put(new Name("/a/b/c/d/e"), new Blob("...."));
+ assertTrue(instance.has(new Name("/a")));
+
+ instance.put(new Name("/replace/oldest"), new Blob("."));
+
+ assertFalse(instance.has(new Name("/a")));
+ assertTrue(instance.has(new Name("/replace/oldest")));
+ }
+
+ @Test
+ public void push() throws Exception {
+ MockFace face = new MockFace();
+ Name name = new Name("/a");
+ instance.put(name, new Blob("."));
+
+ instance.push(face, name);
+
+ assertEquals(1, face.sentData.size());
+ assertEquals(name.appendSegment(0), face.sentData.get(0).getName()); // TODO this should probably be smarter and avoid appending segments if not needed
+ }
+
+ @Test
+ public void clear() throws Exception {
+ instance.put(new Name("/a"), new Blob("."));
+ assertTrue(instance.has(new Name("/a")));
+
+ instance.clear();
+
+ assertFalse(instance.has(new Name("/a")));
+ }
+
+ @Test
+ public void retrieveWithSelectors() throws Exception {
+ instance.put(new Name("/a/1"), new Blob("."));
+ instance.put(new Name("/a/2"), new Blob(".."));
+ instance.put(new Name("/a/3"), new Blob("..."));
+
+ Interest interest1 = new Interest(new Name("/a")).setChildSelector(Interest.CHILD_SELECTOR_RIGHT);
+ assertTrue(instance.has(interest1));
+ assertEquals("...", instance.get(interest1).get().toString());
+
+ Interest interest2 = new Interest(new Name("/a")).setChildSelector(Interest.CHILD_SELECTOR_LEFT);
+ assertEquals(".", instance.get(interest2).get().toString());
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/impl/SegmentationHelperTest.java b/src/test/java/com/intel/jndn/utils/impl/SegmentationHelperTest.java
new file mode 100644
index 0000000..f9d5384
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/impl/SegmentationHelperTest.java
@@ -0,0 +1,112 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.impl;
+
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test the SegmentationHelper
+ *
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class SegmentationHelperTest {
+
+ private static final byte MARKER = 0x0F;
+
+ @Test
+ public void testSegmentation() throws Exception {
+ final Data template = new Data(new Name("/segmented/data"));
+ final InputStream content = new ByteArrayInputStream("0123456789".getBytes());
+ List<Data> segments = SegmentationHelper.segment(template, content, 1);
+ assertEquals(10, segments.size());
+
+ // test first packet
+ assertEquals(0, segments.get(0).getName().get(-1).toSegment());
+ assertEquals(9, segments.get(0).getMetaInfo().getFinalBlockId().toSegment());
+ assertEquals("0", segments.get(0).getContent().toString());
+
+ // test last packet
+ assertEquals(9, segments.get(9).getName().get(-1).toSegment());
+ assertEquals(9, segments.get(9).getMetaInfo().getFinalBlockId().toSegment());
+ assertEquals("9", segments.get(9).getContent().toString());
+ }
+
+ @Test
+ public void testSegmentationDifferentSizes() throws Exception {
+ final Data template = new Data(new Name("/segmented/data"));
+
+ // size 2
+ final InputStream content2 = new ByteArrayInputStream("0123456789".getBytes());
+ List<Data> segments2 = SegmentationHelper.segment(template, content2, 2);
+ assertEquals(5, segments2.size());
+ assertEquals("89", segments2.get(4).getContent().toString());
+
+ // size 3
+ final InputStream content3 = new ByteArrayInputStream("0123456789".getBytes());
+ List<Data> segments3 = SegmentationHelper.segment(template, content3, 3);
+ assertEquals(4, segments3.size());
+ assertEquals("9", segments3.get(3).getContent().toString());
+
+ // size 4
+ final InputStream content4 = new ByteArrayInputStream("0123456789".getBytes());
+ List<Data> segments4 = SegmentationHelper.segment(template, content4, 4);
+ assertEquals(3, segments4.size());
+ assertEquals("89", segments4.get(2).getContent().toString());
+ }
+
+ @Test
+ public void isSegmented() {
+ Name.Component component = Name.Component.fromNumberWithMarker(42, MARKER);
+ assertTrue(SegmentationHelper.isSegmented(new Name("/segmented/data").append(component), MARKER));
+ }
+
+ @Test
+ public void isNotSegmented() {
+ assertFalse(SegmentationHelper.isSegmented(new Name("/segmented/data"), MARKER));
+ }
+
+ @Test
+ public void parseSegment() throws Exception {
+ Name.Component component = Name.Component.fromNumberWithMarker(42, MARKER);
+ assertEquals(42, SegmentationHelper.parseSegment(new Name("/segmented/data").append(component), MARKER));
+ }
+
+ @Test(expected = EncodingException.class)
+ public void parseMissingSegment() throws Exception {
+ SegmentationHelper.parseSegment(new Name("/segmented/data"), MARKER);
+ }
+
+ @Test
+ public void removeSegment() throws Exception {
+ Name unsegmentedName = new Name("/name");
+ Name segmentedName = new Name(unsegmentedName).append(Name.Component.fromNumberWithMarker(42, MARKER));
+ assertEquals(unsegmentedName, SegmentationHelper.removeSegment(segmentedName, MARKER));
+ }
+
+ @Test
+ public void removeNoSegment() throws Exception {
+ Name unsegmentedName = new Name("/unsegmented/name");
+ assertEquals(unsegmentedName, SegmentationHelper.removeSegment(unsegmentedName, MARKER));
+ }
+}
diff --git a/src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java b/src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java
index 763c361..495e307 100644
--- a/src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java
+++ b/src/test/java/com/intel/jndn/utils/processing/impl/CompressionStageTest.java
@@ -14,17 +14,19 @@
package com.intel.jndn.utils.processing.impl;
import com.intel.jndn.utils.client.impl.AdvancedClient;
-import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Name;
import net.named_data.jndn.util.Blob;
import org.junit.Test;
-import static org.junit.Assert.*;
+
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertTrue;
/**
* Test {@link CompressionStage}.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class CompressionStageTest {
diff --git a/src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java b/src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java
index 107e6c1..e118fe2 100644
--- a/src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java
+++ b/src/test/java/com/intel/jndn/utils/processing/impl/SigningStageTest.java
@@ -20,16 +20,17 @@
import net.named_data.jndn.security.SecurityException;
import net.named_data.jndn.util.Blob;
import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertTrue;
/**
* Test {@link SigningStage}.
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SigningStageTest {
- KeyChain keyChain;
- SigningStage instance;
+ private KeyChain keyChain;
+ private SigningStage instance;
public SigningStageTest() throws SecurityException {
keyChain = MockKeyChain.configure(new Name("/test/signer"));
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java
new file mode 100644
index 0000000..65bd063
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnAnnouncementServiceTest.java
@@ -0,0 +1,74 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnData;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnAnnouncementServiceTest {
+
+ private NdnAnnouncementService instance;
+ private Face face;
+
+ @Before
+ public void before() {
+ face = mock(Face.class);
+ instance = new NdnAnnouncementService(face, new Name("/topic/prefix"));
+ }
+
+ @Test
+ public void announceEntrance() throws Exception {
+ instance.announceEntrance(42);
+
+ ArgumentCaptor<Interest> interest = ArgumentCaptor.forClass(Interest.class);
+ verify(face, times(1)).expressInterest(interest.capture(), any(OnData.class));
+ assertEquals(42, PubSubNamespace.parsePublisher(interest.getValue().getName()));
+ assertEquals(PubSubNamespace.Announcement.ENTRANCE, PubSubNamespace.parseAnnouncement(interest.getValue().getName()));
+ }
+
+ @Test
+ public void announceExit() throws Exception {
+ instance.announceExit(42);
+
+ ArgumentCaptor<Interest> interest = ArgumentCaptor.forClass(Interest.class);
+ verify(face, times(1)).expressInterest(interest.capture(), any(OnData.class));
+ assertEquals(42, PubSubNamespace.parsePublisher(interest.getValue().getName()));
+ assertEquals(PubSubNamespace.Announcement.EXIT, PubSubNamespace.parseAnnouncement(interest.getValue().getName()));
+ }
+
+ @Test
+ public void discoverExistingAnnouncements() throws Exception {
+
+ }
+
+ @Test
+ public void observeNewAnnouncements() throws Exception {
+
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java
new file mode 100644
index 0000000..acd18c1
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnPublisherTest.java
@@ -0,0 +1,166 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.mock.MeasurableFace;
+import com.intel.jndn.mock.MockForwarder;
+import com.intel.jndn.utils.impl.InMemoryContentStore;
+import com.intel.jndn.utils.impl.BoundedInMemoryPendingInterestTable;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.util.Blob;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnPublisherTest {
+ private static final Logger LOGGER = Logger.getLogger(NdnPublisherTest.class.getName());
+ private static final Name PUBLISHER_PREFIX = new Name("/publisher");
+ private static final long PUBLISHER_ID = new Random().nextLong();
+ private NdnPublisher instance;
+ private Face face;
+ private MockForwarder forwarder;
+
+ @Before
+ public void before() throws Exception {
+ forwarder = new MockForwarder();
+ face = forwarder.connect();
+ NdnAnnouncementService announcementService = new NdnAnnouncementService(face, PUBLISHER_PREFIX);
+ BoundedInMemoryPendingInterestTable pendingInterestTable = new BoundedInMemoryPendingInterestTable(1024);
+ InMemoryContentStore contentStore = new InMemoryContentStore(2000);
+ instance = new NdnPublisher(face, PUBLISHER_PREFIX, PUBLISHER_ID, announcementService, pendingInterestTable, contentStore);
+
+ driveFace(face); // TODO preferably do this in MockForwarder; causes failures if run in certain orders
+ }
+
+ @Test
+ public void basicUsage() throws Exception {
+ instance.open();
+ assertEquals(1, numSentInterests()); // doesn't count prefix registration, only announcement
+
+ instance.publish(new Blob("..."));
+
+ instance.close();
+ assertEquals(2, numSentInterests());
+ assertEquals(0, numReceivedDatas());
+ }
+
+ @Test
+ public void closeWithoutOpen() throws Exception {
+ instance.close();
+
+ assertEquals(0, numSentInterests()); // if the publisher has not been opened, it will not announce an exit
+ assertEquals(0, numReceivedDatas());
+ }
+
+ @Test
+ public void publish() throws Exception {
+ instance.open();
+ assertEquals(1, numSentInterests());
+
+ instance.publish(new Blob("..."));
+ assertEquals(0, numSentDatas());
+ assertEquals(1, numSentInterests()); // none more than from opening
+ }
+
+ @Test
+ public void publishWithPendingInterest() throws Exception {
+ instance.open();
+ assertEquals(1, numSentInterests());
+
+ CountDownLatch latch = new CountDownLatch(1);
+ Face client = forwarder.connect();
+ client.expressInterest(new Interest(PubSubNamespace.toMessageName(PUBLISHER_PREFIX, PUBLISHER_ID, 0)), (interest, data) -> latch.countDown());
+
+ client.processEvents(); // TODO remove somehow
+ face.processEvents();
+
+ instance.publish(new Blob("..."));
+ assertEquals(1, numSentDatas());
+ assertEquals(1, numSentInterests()); // none more than from opening
+
+ client.processEvents(); // TODO remove somehow
+ face.processEvents();
+
+ latch.await(1, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void publishAndRespondToNewInterest() throws Exception {
+ Face client = forwarder.connect();
+ instance.open();
+ assertEquals(1, numSentInterests());
+
+ instance.publish(new Blob("..."));
+ assertEquals(0, numSentDatas());
+ assertEquals(1, numSentInterests()); // none more than from opening
+
+ client.processEvents(); // TODO remove somehow
+ face.processEvents();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.expressInterest(new Interest(PubSubNamespace.toMessageName(PUBLISHER_PREFIX, PUBLISHER_ID, 0)), (interest, data) -> latch.countDown());
+
+ client.processEvents(); // TODO remove somehow
+ face.processEvents();
+
+ client.processEvents(); // TODO remove somehow
+ face.processEvents();
+
+ latch.await(1, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
+
+ private int numSentDatas() {
+ return ((MeasurableFace) face).sentDatas().size();
+ }
+
+ private int numReceivedDatas() {
+ return ((MeasurableFace) face).receivedDatas().size();
+ }
+
+ private int numSentInterests() {
+ return ((MeasurableFace) face).sentInterests().size();
+ }
+
+ private void driveFace(Face face) {
+ ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
+ pool.schedule(() -> {
+ try {
+ face.processEvents();
+ } catch (IOException | EncodingException e) {
+ LOGGER.log(Level.SEVERE, "Failed to process face events", e);
+ fail(e.getMessage());
+ }
+ }, 50, TimeUnit.MILLISECONDS);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java b/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java
new file mode 100644
index 0000000..5370f29
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/NdnSubscriberTest.java
@@ -0,0 +1,92 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.utils.Client;
+import com.intel.jndn.utils.On;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class NdnSubscriberTest {
+
+ private static final Name TOPIC_NAME = new Name("/subscriber");
+ private NdnSubscriber instance;
+ private AnnouncementService announcementService;
+
+ @Before
+ public void before() {
+ Face face = mock(Face.class);
+ announcementService = mock(AnnouncementService.class);
+ Client client = mock(Client.class);
+ when(client.getAsync(any(Face.class), any(Interest.class))).thenReturn(new CompletableFuture<>());
+ instance = new NdnSubscriber(face, TOPIC_NAME, null, null, announcementService, client);
+ }
+
+ @Test
+ public void basicUsage() throws Exception {
+ instance.open();
+
+ // subscriber is driven by IO from announcements and publisher messages
+
+ instance.close();
+ }
+
+ @Test
+ public void knownPublishersUsingCallbacks() throws Exception {
+ ArgumentCaptor<On<Long>> existingPublishersSignal = ArgumentCaptor.forClass((Class) On.class);
+ ArgumentCaptor<On<Long>> newPublishersSignal = ArgumentCaptor.forClass((Class) On.class);
+ when(announcementService.discoverExistingAnnouncements(existingPublishersSignal.capture(), any(), any())).thenReturn(null);
+ when(announcementService.observeNewAnnouncements(newPublishersSignal.capture(), any(), any())).thenReturn(null);
+ instance.open();
+
+ assertEquals(0, instance.knownPublishers().size());
+
+ existingPublishersSignal.getValue().on(42L);
+ assertEquals(1, instance.knownPublishers().size());
+
+ newPublishersSignal.getValue().on(42L);
+ assertEquals(1, instance.knownPublishers().size());
+
+ existingPublishersSignal.getValue().on(99L);
+ assertEquals(2, instance.knownPublishers().size());
+ }
+
+ @Test
+ public void knownPublishersUsingMethods() throws Exception {
+ instance.addPublisher(99);
+
+ assertEquals(1, instance.knownPublishers().size());
+ assertTrue(instance.knownPublishers().contains(99L));
+
+ instance.removePublisher(99);
+
+ assertEquals(0, instance.knownPublishers().size());
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/pubsub/PubSubTestIT.java b/src/test/java/com/intel/jndn/utils/pubsub/PubSubTestIT.java
new file mode 100644
index 0000000..77c93f6
--- /dev/null
+++ b/src/test/java/com/intel/jndn/utils/pubsub/PubSubTestIT.java
@@ -0,0 +1,84 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils.pubsub;
+
+import com.intel.jndn.mock.MockKeyChain;
+import com.intel.jndn.utils.Publisher;
+import com.intel.jndn.utils.Topic;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.ThreadPoolFace;
+import net.named_data.jndn.security.KeyChain;
+import net.named_data.jndn.security.SecurityException;
+import net.named_data.jndn.transport.AsyncTcpTransport;
+import net.named_data.jndn.util.Blob;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class PubSubTestIT {
+ private static final Logger LOGGER = Logger.getLogger(PubSubTestIT.class.getName());
+ private Face pubFace;
+ private Face subFace;
+
+ @Before
+ public void before() throws Exception {
+ String hostname = System.getProperty("nfd.ip");
+ LOGGER.info("Testing on NFD at: " + hostname);
+ pubFace = connect(hostname);
+ subFace = connect(hostname);
+ }
+
+ @Test
+ public void basicUsage() throws Exception {
+ Topic topic = new Topic(new Name("/pub/sub/topic"));
+ CountDownLatch latch = new CountDownLatch(1);
+
+ topic.subscribe(pubFace, b -> latch.countDown(), e -> fail(e.getMessage()));
+
+ Publisher publisher = topic.newPublisher(subFace);
+ publisher.publish(new Blob("."));
+ publisher.publish(new Blob(".."));
+ publisher.publish(new Blob("..."));
+
+ latch.await(20, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
+
+ private Face connect(String hostName) throws SecurityException {
+ ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
+ AsyncTcpTransport transport = new AsyncTcpTransport(pool);
+ AsyncTcpTransport.ConnectionInfo connectionInfo = new AsyncTcpTransport.ConnectionInfo(hostName, 6363, true);
+ ThreadPoolFace face = new ThreadPoolFace(pool, transport, connectionInfo);
+
+ Name signatureName = new Name("/topic/test/it").appendVersion(new Random().nextLong()); // note that using the same signature name seemed to cause registration failures
+ KeyChain keyChain = MockKeyChain.configure(signatureName);
+ face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName());
+
+ return face;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java b/src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java
index 5691b8b..d73604e 100644
--- a/src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java
+++ b/src/test/java/com/intel/jndn/utils/repository/impl/ForLoopRepositoryTest.java
@@ -16,7 +16,7 @@
/**
* Test {@link ForLoopRepository}.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class ForLoopRepositoryTest extends RepositoryTest {
diff --git a/src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java b/src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java
index fac32f9..e12e2da 100644
--- a/src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java
+++ b/src/test/java/com/intel/jndn/utils/repository/impl/RepoHelper.java
@@ -21,7 +21,7 @@
/**
* Helper methods for testing.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class RepoHelper {
diff --git a/src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java b/src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java
index d981f91..abe0480 100644
--- a/src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java
+++ b/src/test/java/com/intel/jndn/utils/repository/impl/RepositoryTest.java
@@ -14,17 +14,18 @@
package com.intel.jndn.utils.repository.impl;
import com.intel.jndn.utils.Repository;
-import static com.intel.jndn.utils.repository.impl.RepoHelper.*;
import net.named_data.jndn.Data;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import org.junit.Test;
+
+import static com.intel.jndn.utils.repository.impl.RepoHelper.*;
import static org.junit.Assert.*;
/**
* Extend this in descendant tests.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public abstract class RepositoryTest {
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerHelperTest.java b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerHelperTest.java
deleted file mode 100644
index b93e1a8..0000000
--- a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerHelperTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms and conditions of the GNU Lesser General Public License,
- * version 3, as published by the Free Software Foundation.
- *
- * This program is distributed in the hope it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
- * more details.
- */
-package com.intel.jndn.utils.server.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.util.List;
-import net.named_data.jndn.Data;
-import net.named_data.jndn.Name;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-/**
- * Test the SegmentedServerHelper
- *
- * @author Andrew Brown <andrew.brown@intel.com>
- */
-public class SegmentedServerHelperTest {
-
- /**
- * Test of segment method, of class SegmentedServerHelper.
- *
- * @throws java.lang.Exception
- */
- @Test
- public void testSegmentation() throws Exception {
- final Data template = new Data(new Name("/segmented/data"));
- final InputStream content = new ByteArrayInputStream("0123456789".getBytes());
- List<Data> segments = SegmentedServerHelper.segment(template, content, 1);
- assertEquals(10, segments.size());
-
- // test first packet
- assertEquals(0, segments.get(0).getName().get(-1).toSegment());
- assertEquals(9, segments.get(0).getMetaInfo().getFinalBlockId().toSegment());
- assertEquals("0", segments.get(0).getContent().toString());
-
- // test last packet
- assertEquals(9, segments.get(9).getName().get(-1).toSegment());
- assertEquals(9, segments.get(9).getMetaInfo().getFinalBlockId().toSegment());
- assertEquals("9", segments.get(9).getContent().toString());
- }
-
- /**
- * Test of segment method, of class SegmentedServerHelper.
- *
- * @throws java.lang.Exception
- */
- @Test
- public void testSegmentationDifferentSizes() throws Exception {
- final Data template = new Data(new Name("/segmented/data"));
-
- // size 2
- final InputStream content2 = new ByteArrayInputStream("0123456789".getBytes());
- List<Data> segments2 = SegmentedServerHelper.segment(template, content2, 2);
- assertEquals(5, segments2.size());
- assertEquals("89", segments2.get(4).getContent().toString());
-
- // size 3
- final InputStream content3 = new ByteArrayInputStream("0123456789".getBytes());
- List<Data> segments3 = SegmentedServerHelper.segment(template, content3, 3);
- assertEquals(4, segments3.size());
- assertEquals("9", segments3.get(3).getContent().toString());
-
- // size 4
- final InputStream content4 = new ByteArrayInputStream("0123456789".getBytes());
- List<Data> segments4 = SegmentedServerHelper.segment(template, content4, 4);
- assertEquals(3, segments4.size());
- assertEquals("89", segments4.get(2).getContent().toString());
- }
-}
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java
index fd6b516..c21d40b 100644
--- a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTest.java
@@ -15,26 +15,27 @@
import com.intel.jndn.mock.MockForwarder;
import com.intel.jndn.utils.client.impl.AdvancedClient;
-import com.intel.jndn.mock.MockFace;
-import java.io.IOException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.util.Blob;
-import static org.junit.Assert.*;
-
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
/**
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SegmentedServerTest {
- Face face;
- SegmentedServer instance;
+ private Face face;
+ private SegmentedServer instance;
@Before
public void before() throws Exception {
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java
index a3409a3..8eb991f 100644
--- a/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/SegmentedServerTestIT.java
@@ -16,8 +16,6 @@
import com.intel.jndn.mock.MockKeyChain;
import com.intel.jndn.utils.TestHelper;
import com.intel.jndn.utils.client.impl.AdvancedClient;
-import java.io.IOException;
-import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
@@ -26,23 +24,28 @@
import net.named_data.jndn.security.KeyChain;
import net.named_data.jndn.security.SecurityException;
import net.named_data.jndn.util.Blob;
-import static org.junit.Assert.*;
import org.junit.Test;
+import java.io.IOException;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
/**
* Test the {@link SegmentedServer} on an actual NFD; to run this, pass the NFD
* IP/host name as a parameter like <code>-Dnfd.ip=10.1.1.1</code>.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SegmentedServerTestIT {
+ private static final int DATA_SIZE_BYTES = 10000;
private static final Logger logger = Logger.getLogger(SegmentedServerTestIT.class.getName());
private static final Name PREFIX = new Name("/test/for/segmented-server");
- protected static final int DATA_SIZE_BYTES = 10000;
- Face face;
- SegmentedServer instance;
- private String ip;
+ private final Face face;
+ private final SegmentedServer instance;
+ private final String ip;
public SegmentedServerTestIT() throws SecurityException {
this.ip = System.getProperty("nfd.ip");
@@ -95,7 +98,7 @@
logger.info("Retrieved data: " + retrieved.getName().toUri());
}
- Data buildDataPacket(Name name) {
+ private Data buildDataPacket(Name name) {
Data data = new Data(new Name(name).appendSegment(0));
data.setContent(new Blob(TestHelper.buildRandomBytes(DATA_SIZE_BYTES)));
data.getMetaInfo().setFreshnessPeriod(30000);
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java b/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
index bd054cc..346c570 100644
--- a/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
@@ -13,10 +13,8 @@
*/
package com.intel.jndn.utils.server.impl;
-import com.intel.jndn.utils.ProcessingStage;
import com.intel.jndn.mock.MockFace;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.intel.jndn.utils.ProcessingStage;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
@@ -24,17 +22,21 @@
import net.named_data.jndn.Name;
import org.junit.Before;
import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import static org.junit.Assert.*;
/**
* Test base server implementation.
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class ServerBaseImplTest {
- Face face;
- ServerBaseImpl instance;
+ private Face face;
+ private ServerBaseImpl instance;
@Before
public void before() throws Exception {
@@ -42,18 +44,6 @@
instance = new ServerBaseImplImpl(face, new Name("/test/base"));
}
- public class ServerBaseImplImpl extends ServerBaseImpl {
-
- public ServerBaseImplImpl(Face face, Name prefix) {
- super(face, prefix);
- }
-
- @Override
- public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
- }
- }
-
/**
* Test of getPrefix method, of class ServerBaseImpl.
*/
@@ -61,7 +51,7 @@
public void testGetPrefix() {
assertNotNull(instance.getPrefix());
}
-
+
/**
* Test of getRegisteredPrefixId method, of class ServerBaseImpl
*/
@@ -106,4 +96,16 @@
assertTrue(instance.isRegistered());
executor.shutdownNow();
}
+
+ public class ServerBaseImplImpl extends ServerBaseImpl {
+
+ public ServerBaseImplImpl(Face face, Name prefix) {
+ super(face, prefix);
+ }
+
+ @Override
+ public void onInterest(Name prefix, Interest interest, Face face, long interestFilterId, InterestFilter filter) {
+ throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ }
+ }
}
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java b/src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java
index 4ce40ae..0b05ff1 100644
--- a/src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/SimpleServerTest.java
@@ -14,32 +14,32 @@
package com.intel.jndn.utils.server.impl;
import com.intel.jndn.mock.MeasurableFace;
-import com.intel.jndn.mock.MockFace;
import com.intel.jndn.mock.MockForwarder;
import com.intel.jndn.utils.server.RespondWithBlob;
import com.intel.jndn.utils.server.RespondWithData;
-import java.io.IOException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
-import net.named_data.jndn.OnData;
import net.named_data.jndn.encoding.EncodingException;
import net.named_data.jndn.util.Blob;
-import static org.junit.Assert.*;
-
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
/**
* Test {@link SimpleServer}
*
- * @author Andrew Brown <andrew.brown@intel.com>
+ * @author Andrew Brown, andrew.brown@intel.com
*/
public class SimpleServerTest {
- Face face;
- SimpleServer instance;
+ private Face face;
+ private SimpleServer instance;
@Before
public void before() throws Exception {