Fix Klocwork issues (#5)

* Fix issue 14: insecure Random
* Fix issue 18: incorrect synchronization
* Fix issue 21: exception too generic
* Fix issue 23: stream not closed
* Fix issue 26: compare not using .equals
diff --git a/src/main/java/com/intel/jndn/utils/ProcessingStage.java b/src/main/java/com/intel/jndn/utils/ProcessingStage.java
index d665c84..5bb2a0e 100644
--- a/src/main/java/com/intel/jndn/utils/ProcessingStage.java
+++ b/src/main/java/com/intel/jndn/utils/ProcessingStage.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -28,7 +28,7 @@
    * @param input the object to be processed
    * @return a processed object (this may be the same instance as the input or
    * may be a new object)
-   * @throws Exception if the processing fails
+   * @throws ProcessingStageException thrown by the processing stage to signal failure
    */
-  Y process(T input) throws Exception;
+  Y process(T input) throws ProcessingStageException;
 }
diff --git a/src/main/java/com/intel/jndn/utils/ProcessingStageException.java b/src/main/java/com/intel/jndn/utils/ProcessingStageException.java
new file mode 100644
index 0000000..d72e8d9
--- /dev/null
+++ b/src/main/java/com/intel/jndn/utils/ProcessingStageException.java
@@ -0,0 +1,32 @@
+/*
+ * jndn-utils
+ * Copyright (c) 2016, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 3, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
+ * more details.
+ */
+
+package com.intel.jndn.utils;
+
+/**
+ * @author Andrew Brown, andrew.brown@intel.com
+ */
+public class ProcessingStageException extends Exception {
+  public ProcessingStageException(String message) {
+    super(message);
+  }
+
+  public ProcessingStageException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ProcessingStageException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java b/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
index ce5db19..40588a5 100644
--- a/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
+++ b/src/main/java/com/intel/jndn/utils/client/impl/DefaultStreamingClient.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -86,7 +86,31 @@
       }
     });
 
-    return in;
+    return new DoublePipeInputStream(in, out);
+  }
+
+  /**
+   * Helper for closing both ends of the pipe on close
+   */
+  private class DoublePipeInputStream extends InputStream {
+    private final PipedInputStream in;
+    private final PipedOutputStream out;
+
+    public DoublePipeInputStream(PipedInputStream in, PipedOutputStream out) {
+      this.in = in;
+      this.out = out;
+    }
+
+    @Override
+    public int read() throws IOException {
+      return in.read();
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+      out.close();
+    }
   }
 
   /**
diff --git a/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java b/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
index 6102a9b..5a61c12 100644
--- a/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
+++ b/src/main/java/com/intel/jndn/utils/impl/BoundedLinkedMap.java
@@ -104,7 +104,7 @@
   @Override
   public synchronized V remove(Object key) {
     V value = map.remove(key);
-    if (key == latest) {
+    if (key.equals(latest)) {
       latest = findLatest(map);
     }
     return value;
diff --git a/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java b/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
index 5befb6d..b66819d 100644
--- a/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
+++ b/src/main/java/com/intel/jndn/utils/processing/impl/CompressionStage.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -14,10 +14,12 @@
 package com.intel.jndn.utils.processing.impl;
 
 import com.intel.jndn.utils.ProcessingStage;
+import com.intel.jndn.utils.ProcessingStageException;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.util.Blob;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.zip.GZIPOutputStream;
 
 /**
@@ -33,14 +35,17 @@
    *
    * @param context the {@link Data} packet
    * @return the same packet but with GZIP-compressed content
-   * @throws Exception if compression fails
+   * @throws ProcessingStageException if compression fails
    */
   @Override
-  public Data process(Data context) throws Exception {
+  public Data process(Data context) throws ProcessingStageException {
     ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
     try (GZIPOutputStream stream = new GZIPOutputStream(buffer)) {
       stream.write(context.getContent().getImmutableArray(), 0, context.getContent().size());
       stream.close();
+    } catch (IOException e) {
+      throw new ProcessingStageException(e);
     }
 
     context.setContent(new Blob(buffer.toByteArray()));
diff --git a/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java b/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
index d9eb897..0ba6d5d 100644
--- a/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
+++ b/src/main/java/com/intel/jndn/utils/processing/impl/SigningStage.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -14,6 +14,7 @@
 package com.intel.jndn.utils.processing.impl;
 
 import com.intel.jndn.utils.ProcessingStage;
+import com.intel.jndn.utils.ProcessingStageException;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Name;
 import net.named_data.jndn.security.KeyChain;
@@ -57,11 +58,15 @@
    *
    * @param context the data packet to sign
    * @return the signed data packet
-   * @throws Exception if signing fails
+   * @throws ProcessingStageException if signing fails
    */
   @Override
-  public Data process(Data context) throws Exception {
-    keyChain.sign(context, certificateName);
+  public Data process(Data context) throws ProcessingStageException {
+    try {
+      keyChain.sign(context, certificateName);
+    } catch (SecurityException e) {
+      throw new ProcessingStageException(e);
+    }
     return context;
   }
 }
diff --git a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
index adb069b..3e6abdf 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/NdnPublisher.java
@@ -63,7 +63,32 @@
     this.contentStore = contentStore;

   }

 

+  private static boolean isAttributesRequest(Name name, Interest interest) {

+    return name.equals(interest.getName()) && interest.getChildSelector() == -1;

+  }

+

+  private static void sendAttributes(Face face, Name publisherName) {

+    Data data = new Data(publisherName);

+    data.setContent(new Blob("[attributes here]"));

+    data.getMetaInfo().setFreshnessPeriod(ATTRIBUTES_FRESHNESS_PERIOD);

+    try {

+      face.putData(data);

+    } catch (IOException e) {

+      LOGGER.log(Level.SEVERE, "Failed to publish attributes for publisher: " + publisherName, e);

+    }

+  }

+

+  /**

+   * Open the publisher, registering prefixes and announcing group entry. If the publisher is already open, this method

+   * immediately returns

+   *

+   * @throws IOException if prefix registration or group announcement fails

+   */

   synchronized void open() throws IOException {

+    if (opened) {

+      return;

+    }

+

     opened = true;

     CompletableFuture<Void> future = new CompletableFuture<>();

     OnRegistration onRegistration = new OnRegistration(future);

@@ -95,9 +120,7 @@
 

   @Override

   public void publish(Blob message) throws IOException {

-    if (!opened) {

-      open();

-    }

+    open(); // will immediately return if already open

 

     long id = latestMessageId.getAndIncrement();

     Name name = PubSubNamespace.toMessageName(prefix, id);

@@ -125,10 +148,6 @@
     }

   }

 

-  private static boolean isAttributesRequest(Name name, Interest interest) {

-    return name.equals(interest.getName()) && interest.getChildSelector() == -1;

-  }

-

   private void sendContent(Face face, Name name) {

     try {

       contentStore.push(face, name);

@@ -144,15 +163,4 @@
       LOGGER.log(Level.SEVERE, "Failed to publish message, aborting: {0}", new Object[]{interest.getName(), e});

     }

   }

-

-  private static void sendAttributes(Face face, Name publisherName) {

-    Data data = new Data(publisherName);

-    data.setContent(new Blob("[attributes here]"));

-    data.getMetaInfo().setFreshnessPeriod(ATTRIBUTES_FRESHNESS_PERIOD);

-    try {

-      face.putData(data);

-    } catch (IOException e) {

-      LOGGER.log(Level.SEVERE, "Failed to publish attributes for publisher: " + publisherName, e);

-    }

-  }

 }

diff --git a/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java b/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
index c0caddc..7be87a8 100644
--- a/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
+++ b/src/main/java/com/intel/jndn/utils/pubsub/PubSubFactory.java
@@ -25,7 +25,7 @@
 import net.named_data.jndn.util.Blob;
 
 import java.io.IOException;
-import java.util.Random;
+import java.security.SecureRandom;
 
 /**
  * Assemble the necessary elements for building the {@link Publisher} and {@link Subscriber} implementations
@@ -60,7 +60,7 @@
    * @return a group-announcing, unopened subscriber (it will automatically open on first publish)
    */
   public static Publisher newPublisher(Face face, Name prefix) {
-    long publisherId = Math.abs(new Random().nextLong());
+    long publisherId = Math.abs(new SecureRandom().nextLong());
     return new NdnPublisher(face, prefix, publisherId, new NdnAnnouncementService(face, prefix), new BoundedInMemoryPendingInterestTable(1024), new InMemoryContentStore(2000));
   }
 }
diff --git a/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java b/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
index 64130bd..66f11ad 100644
--- a/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
+++ b/src/main/java/com/intel/jndn/utils/server/impl/ServerBaseImpl.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -14,6 +14,7 @@
 package com.intel.jndn.utils.server.impl;
 
 import com.intel.jndn.utils.ProcessingStage;
+import com.intel.jndn.utils.ProcessingStageException;
 import com.intel.jndn.utils.Server;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
@@ -113,9 +114,9 @@
    * @param data the {@link Data} to process
    * @return a processed {@link Data} packet; no guarantee as to whether it is
    * the same instance as passed in as a parameter (and likely not).
-   * @throws Exception if a pipeline stage fails
+   * @throws ProcessingStageException if a pipeline stage fails
    */
-  public Data processPipeline(Data data) throws Exception {
+  public Data processPipeline(Data data) throws ProcessingStageException {
     for (ProcessingStage<Data, Data> stage : pipeline) {
       data = stage.process(data);
     }
diff --git a/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java b/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
index 346c570..728f899 100644
--- a/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
+++ b/src/test/java/com/intel/jndn/utils/server/impl/ServerBaseImplTest.java
@@ -1,6 +1,6 @@
 /*
  * jndn-utils
- * Copyright (c) 2015, Intel Corporation.
+ * Copyright (c) 2016, Intel Corporation.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms and conditions of the GNU Lesser General Public License,
@@ -15,6 +15,7 @@
 
 import com.intel.jndn.mock.MockFace;
 import com.intel.jndn.utils.ProcessingStage;
+import com.intel.jndn.utils.ProcessingStageException;
 import net.named_data.jndn.Data;
 import net.named_data.jndn.Face;
 import net.named_data.jndn.Interest;
@@ -55,7 +56,7 @@
   /**
    * Test of getRegisteredPrefixId method, of class ServerBaseImpl
    */
-  public void testGetRegisteredPrefixId(){
+  public void testGetRegisteredPrefixId() {
     assertEquals(ServerBaseImpl.UNREGISTERED, instance.getRegisteredPrefixId());
   }
 
@@ -76,8 +77,8 @@
   public void testPipeline() throws Exception {
     ProcessingStage<Data, Data> pipelineStage = new ProcessingStage<Data, Data>() {
       @Override
-      public Data process(Data context) throws Exception {
-        throw new Exception("Test exceptions with this");
+      public Data process(Data context) throws ProcessingStageException {
+        throw new ProcessingStageException("Test exceptions with this");
       }
     };
     instance.addPostProcessingStage(pipelineStage);