Add new Server implementation
diff --git a/src/main/java/com/intel/jndn/utils/DynamicServer.java b/src/main/java/com/intel/jndn/utils/DynamicServer.java
new file mode 100644
index 0000000..8206e27
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/DynamicServer.java
@@ -0,0 +1,40 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils;
+
+import com.intel.jndn.utils.server.RespondWithData;
+import com.intel.jndn.utils.server.Server;
+import java.io.IOException;
+
+/**
+ * Defines the API for a {@link Server} producing {@link Data} packets
+ * dynamically; in other words, when an {@link Interest} arrives, this server
+ * will run a callback to determine what packet to send back. As good practice,
+ * keep callback methods as short as possible.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface DynamicServer extends Server {
+
+ /**
+ * Set the callback method to run when an {@link Interest} packet is passed to
+ * this server. This method should either return a {@link Data} packet that
+ * satisfies the Interest or throw an Exception to avoid sending. Calling this
+ * method a second time should replace the callback.
+ *
+ * @param callback the callback instance
+ * @throws java.io.IOException if the server fails to register a prefix
+ */
+ public void respondUsing(RespondWithData callback) throws IOException;
+}
diff --git a/src/main/java/com/intel/jndn/utils/RepositoryServer.java b/src/main/java/com/intel/jndn/utils/RepositoryServer.java
new file mode 100644
index 0000000..0364eb7
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/RepositoryServer.java
@@ -0,0 +1,38 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils;
+
+import com.intel.jndn.utils.server.Server;
+import java.io.IOException;
+import net.named_data.jndn.Data;
+
+/**
+ * Defines the API for a {@link Server} producing {@link Data} packets and
+ * storing them until they are requested; this server corresponds closely to use
+ * cases such as: cache, file system, web server.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface RepositoryServer extends Server {
+
+ /**
+ * Store a {@link Data} packet in the server's repository until requested. The
+ * task of removing (or retaining) stale packets is not specified here but
+ * left to the implementation.
+ *
+ * @param data the {@link Data} packet to store and serve
+ * @throws IOException if the underlying server fails to store the packet
+ */
+ public void serve(Data data) throws IOException;
+}
diff --git a/src/main/java/com/intel/jndn/utils/SegmentedServer.java b/src/main/java/com/intel/jndn/utils/SegmentedServer.java
new file mode 100644
index 0000000..0daf0af
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/SegmentedServer.java
@@ -0,0 +1,91 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils;
+
+import com.intel.jndn.utils.repository.ForLoopRepository;
+import com.intel.jndn.utils.repository.Repository;
+import com.intel.jndn.utils.server.SegmentedServerHelper;
+import com.intel.jndn.utils.server.ServerBaseImpl;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.encoding.EncodingException;
+import net.named_data.jndn.transport.Transport;
+
+/**
+ * Implementation of a {@link RepositoryServer} that segments packets stored in
+ * its repository.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class SegmentedServer extends ServerBaseImpl implements RepositoryServer {
+
+ private static final Logger logger = Logger.getLogger(SegmentedClient.class.getName());
+ private final Repository repository = new ForLoopRepository();
+
+ /**
+ * {@inheritDoc}
+ */
+ public SegmentedServer(Face face, Name prefix) {
+ super(face, prefix);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void serve(Data data) throws IOException {
+ if (!isRegistered()) {
+ register();
+ }
+
+ InputStream stream = new ByteArrayInputStream(data.getContent().getImmutableArray());
+ List<Data> segments = SegmentedServerHelper.segment(data, stream);
+ for (Data segment : segments) {
+ logger.info("Added segment: " + segment.getName().toUri());
+ repository.put(segment);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
+ if (interest.getChildSelector() == -1) {
+ try {
+ interest.getName().get(-1).toSegment();
+ } catch (EncodingException e) {
+ interest.setChildSelector(Interest.CHILD_SELECTOR_LEFT);
+ }
+ }
+
+ try {
+ Data data = repository.get(interest);
+ data = processPipeline(data);
+ ByteBuffer buffer = data.wireEncode().buf();
+ transport.send(buffer);
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, "Failed to send data for: " + interest.toUri(), e);
+ }
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/SimpleServer.java b/src/main/java/com/intel/jndn/utils/SimpleServer.java
new file mode 100644
index 0000000..0bd4667
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/SimpleServer.java
@@ -0,0 +1,96 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils;
+
+import com.intel.jndn.utils.server.RespondWithData;
+import com.intel.jndn.utils.server.RespondWithBlob;
+import com.intel.jndn.utils.server.ServerBaseImpl;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.Interest;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.transport.Transport;
+import net.named_data.jndn.util.Blob;
+
+/**
+ * Implementation of a {@link DynamicServer} that wraps the {@link OnInterest}
+ * callback with some encoding and pipeline support.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public class SimpleServer extends ServerBaseImpl implements DynamicServer {
+
+ private static final Logger logger = Logger.getLogger(SegmentedClient.class.getName());
+ private RespondWithData callback;
+
+ /**
+ * {@inheritDoc}
+ */
+ public SimpleServer(Face face, Name prefix) {
+ super(face, prefix);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void respondUsing(RespondWithData callback) throws IOException {
+ if (!isRegistered()) {
+ register();
+ }
+ this.callback = callback;
+ }
+
+ /**
+ * Convenience method for responding to an {@link Interest} by returning the
+ * {@link Blob} content only; when an Interest arrives, this method wraps the
+ * returned Blob with a {@link Data} using the exact {@link Name} of the
+ * incoming Interest.
+ *
+ * @param callback the callback function to retrieve content when an
+ * {@link Interest} arrives
+ * @throws java.io.IOException if the server fails to register a prefix
+ */
+ public void respondUsing(final RespondWithBlob callback) throws IOException {
+ RespondWithData dataCallback = new RespondWithData() {
+ @Override
+ public Data onInterest(Name prefix, Interest interest) throws Exception {
+ Data data = new Data(interest.getName());
+ Blob content = callback.onInterest(prefix, interest);
+ data.setContent(content);
+ return data;
+ }
+ };
+ respondUsing(dataCallback);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
+ try {
+ Data data = callback.onInterest(prefix, interest);
+ data = processPipeline(data);
+ ByteBuffer buffer = data.wireEncode().buf();
+ transport.send(buffer);
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, "Failed to send data for: " + interest.toUri(), e);
+ }
+ }
+}
diff --git a/src/main/java/com/intel/jndn/utils/OnServeInterest.java b/src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java
similarity index 79%
copy from src/main/java/com/intel/jndn/utils/OnServeInterest.java
copy to src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java
index 9c67141..fe7f74a 100644
--- a/src/main/java/com/intel/jndn/utils/OnServeInterest.java
+++ b/src/main/java/com/intel/jndn/utils/server/RespondWithBlob.java
@@ -11,18 +11,18 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.server;
-import net.named_data.jndn.Data;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
+import net.named_data.jndn.util.Blob;
/**
* Functional interface for serving data from Server.on()
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public interface OnServeInterest {
+public interface RespondWithBlob {
- public Data onInterest(Name prefix, Interest interest);
+ public Blob onInterest(Name prefix, Interest interest) throws Exception;
}
diff --git a/src/main/java/com/intel/jndn/utils/OnServeInterest.java b/src/main/java/com/intel/jndn/utils/server/RespondWithData.java
similarity index 83%
rename from src/main/java/com/intel/jndn/utils/OnServeInterest.java
rename to src/main/java/com/intel/jndn/utils/server/RespondWithData.java
index 9c67141..95d0e6a 100644
--- a/src/main/java/com/intel/jndn/utils/OnServeInterest.java
+++ b/src/main/java/com/intel/jndn/utils/server/RespondWithData.java
@@ -11,7 +11,7 @@
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*/
-package com.intel.jndn.utils;
+package com.intel.jndn.utils.server;
import net.named_data.jndn.Data;
import net.named_data.jndn.Interest;
@@ -22,7 +22,7 @@
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
-public interface OnServeInterest {
+public interface RespondWithData {
- public Data onInterest(Name prefix, Interest interest);
+ public Data onInterest(Name prefix, Interest interest) throws Exception;
}
diff --git a/src/main/java/com/intel/jndn/utils/server/SegmentedServerHelper.java b/src/main/java/com/intel/jndn/utils/server/SegmentedServerHelper.java
index ce607cf..2e1f92c 100644
--- a/src/main/java/com/intel/jndn/utils/server/SegmentedServerHelper.java
+++ b/src/main/java/com/intel/jndn/utils/server/SegmentedServerHelper.java
@@ -24,7 +24,11 @@
import net.named_data.jndn.util.Blob;
/**
- * Helper for segmenting an input stream into a list of Data packets.
+ * Helper for segmenting an input stream into a list of Data packets. Current
+ * use of the default segment size of 4096 (only for
+ * {@link #segment(net.named_data.jndn.Data, java.io.InputStream)} is based on
+ * several assumptions: NDN packet size was limited to 8000 at the time this was
+ * written and signature size is unknown.
*
* @author Andrew Brown <andrew.brown@intel.com>
*/
@@ -36,10 +40,11 @@
* Segment a stream of bytes into a list of Data packets; this must read all
* the bytes first in order to determine the end segment for FinalBlockId.
*
- * @param template
- * @param bytes
- * @return
- * @throws IOException
+ * @param template the {@link Data} packet to use for the segment {@link Name},
+ * {@link net.named_data.jndn.MetaInfo}, etc.
+ * @param bytes an {@link InputStream} to the bytes to segment
+ * @return a list of segmented {@link Data} packets
+ * @throws IOException if the stream fails
*/
public static List<Data> segment(Data template, InputStream bytes) throws IOException {
return segment(template, bytes, DEFAULT_SEGMENT_SIZE);
@@ -49,11 +54,11 @@
* Segment a stream of bytes into a list of Data packets; this must read all
* the bytes first in order to determine the end segment for FinalBlockId.
*
- * @param template
- * @param bytes
- * @param segmentSize
- * @return
- * @throws IOException
+ * @param template the {@link Data} packet to use for the segment {@link Name},
+ * {@link net.named_data.jndn.MetaInfo}, etc.
+ * @param bytes an {@link InputStream} to the bytes to segment
+ * @return a list of segmented {@link Data} packets
+ * @throws IOException if the stream fails
*/
public static List<Data> segment(Data template, InputStream bytes, int segmentSize) throws IOException {
List<Data> segments = new ArrayList<>();
@@ -77,11 +82,11 @@
}
/**
- * Read all the bytes in an input stream.
+ * Read all of the bytes in an input stream.
*
- * @param bytes
- * @return
- * @throws IOException
+ * @param bytes the {@link InputStream} of bytes to read
+ * @return an array of all bytes retrieved from the stream
+ * @throws IOException if the stream fails
*/
public static byte[] readAll(InputStream bytes) throws IOException {
ByteArrayOutputStream builder = new ByteArrayOutputStream();
diff --git a/src/main/java/com/intel/jndn/utils/server/Server.java b/src/main/java/com/intel/jndn/utils/server/Server.java
new file mode 100644
index 0000000..ba4ff1c
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/server/Server.java
@@ -0,0 +1,46 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.server;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnInterest;
+
+/**
+ * Base interface for defining a server; see descendant interfaces for different
+ * modes of serving packets. This class extends {@link Runnable} expecting
+ * implementing classes to do any necessary event processing in the
+ * {@link Runnable#run()} block, thus allowing different ways to manage servers
+ * (e.g. single-thread vs {@link ScheduledThreadPoolExecutor}.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public interface Server extends Runnable, OnInterest {
+
+ /**
+ * @return the {@link Name} prefix this server is serving on.
+ */
+ public Name getPrefix();
+
+ /**
+ * Add a stage to the server pipeline. Each stage should be processed once the
+ * server has the {@link Data} packet available to send (e.g. after a callback
+ * has produced a packet); also, stages should be processed in the order they
+ * are added.
+ *
+ * @param pipelineStage a Data-to-Data processing stage
+ */
+ public void addPipelineStage(PipelineStage<Data, Data> pipelineStage);
+}
diff --git a/src/main/java/com/intel/jndn/utils/server/ServerBaseImpl.java b/src/main/java/com/intel/jndn/utils/server/ServerBaseImpl.java
new file mode 100644
index 0000000..0d150d6
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/server/ServerBaseImpl.java
@@ -0,0 +1,140 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2015, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ */
+package com.intel.jndn.utils.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.named_data.jndn.Data;
+import net.named_data.jndn.Face;
+import net.named_data.jndn.ForwardingFlags;
+import net.named_data.jndn.Name;
+import net.named_data.jndn.OnRegisterFailed;
+import net.named_data.jndn.encoding.EncodingException;
+
+/**
+ * Base implementation for a {@link Server}.
+ *
+ * @author Andrew Brown <andrew.brown@intel.com>
+ */
+public abstract class ServerBaseImpl implements Server {
+
+ private static final Logger logger = Logger.getLogger(ServerBaseImpl.class.getName());
+ private final Face face;
+ private final Name prefix;
+ private final List<PipelineStage> pipeline = new ArrayList<>();
+ private boolean registered = false;
+
+ /**
+ * Build the base server; register() must run separately and is run
+ * automatically on {@link #run()}.
+ *
+ * @param face a {@link Face} allowing prefix registration (see
+ * {@link Face#setCommandSigningInfo(net.named_data.jndn.security.KeyChain, net.named_data.jndn.Name)}
+ * @param prefix the {@link Name} to register
+ */
+ public ServerBaseImpl(Face face, Name prefix) {
+ this.face = face;
+ this.prefix = prefix;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Name getPrefix() {
+ return prefix;
+ }
+
+ /**
+ * @return true if the server has registered a prefix on the face
+ */
+ public boolean isRegistered() {
+ return registered;
+ }
+
+ /**
+ * Register a prefix for responding to interests.
+ *
+ * @throws java.io.IOException if IO fails
+ */
+ public void register() throws IOException {
+ registered = true;
+ try {
+ face.registerPrefix(prefix, this, new OnRegisterFailed() {
+ @Override
+ public void onRegisterFailed(Name prefix) {
+ registered = false;
+ logger.log(Level.SEVERE, "Failed to register prefix: " + prefix.toUri());
+ }
+ }, new ForwardingFlags());
+ } catch (net.named_data.jndn.security.SecurityException e) {
+ registered = false;
+ throw new IOException("Failed to communicate to face due to security error", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addPipelineStage(PipelineStage<Data, Data> pipelineStage) {
+ pipeline.add(pipelineStage);
+ }
+
+ /**
+ * Process the {@link Data} before sending it; this runs the packet through
+ * each registered {@link PipelineStage} in order.
+ *
+ * @param data the {@link Data} to process
+ * @return a processed {@link Data} packet; no guarantee as to whether it is
+ * the same instance as passed in as a parameter (and likely not).
+ * @throws Exception if a pipeline stage fails
+ */
+ public Data processPipeline(Data data) throws Exception {
+ for (PipelineStage<Data, Data> stage : pipeline) {
+ data = stage.process(data);
+ }
+ return data;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run() {
+ if (!isRegistered()) {
+ try {
+ register();
+ } catch (IOException ex) {
+ throw new RuntimeException("Failed to register prefix, aborting.", ex);
+ }
+ }
+
+ // continuously serve packets
+ while (true) {
+ try {
+ synchronized (face) {
+ face.processEvents();
+ }
+ } catch (IOException ex) {
+ logger.log(Level.SEVERE, "Failed to process events.", ex);
+ } catch (EncodingException ex) {
+ logger.log(Level.SEVERE, "Failed to parse bytes.", ex);
+ }
+ }
+ }
+}