| /* |
| * File channel: Publisher.java |
| * |
| * Purpose: Provide the publisher side of a pub-sub API. |
| * |
| * © Copyright Intel Corporation. All rights reserved. |
| * Intel Corporation, 2200 Mission College Boulevard, |
| * Santa Clara, CA 95052-8119, USA |
| */ |
| package com.intel.jndn.utils; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Map.Entry; |
| import net.named_data.jndn.Data; |
| import net.named_data.jndn.Face; |
| import net.named_data.jndn.ForwardingFlags; |
| import net.named_data.jndn.Interest; |
| import net.named_data.jndn.Name; |
| import net.named_data.jndn.OnInterest; |
| import net.named_data.jndn.OnRegisterFailed; |
| import net.named_data.jndn.security.SecurityException; |
| import net.named_data.jndn.transport.Transport; |
| import net.named_data.jndn.util.Blob; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| |
| /** |
| * This implementation requires both ends (publisher and subscriber) to be |
| * routable to each other. When this class receives data to publish (through |
| * publish()), it sends an alert to all subscribers indicating new data is |
| * available to retrieve; subscribers then retrieve this data using the channel |
| * provided in the alert packet. The flow is as follows: - Publisher listens on |
| * a given channel (e.g. /my/channel) - Subscriber also listens on a given |
| * channel (e.g. /my/channel) - Publisher receives a new Foo to publish - |
| * Publisher notifies subscribers of new data (e.g. |
| * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]} - Subscriber |
| * retrieves data (e.g. /my/channel/foo/[timestamp]) |
| * |
| * @author Andrew Brown <andrew.brown@intel.com> |
| */ |
| public class Publisher implements OnInterest, OnRegisterFailed { |
| |
| public static final long DEFAULT_TIMEOUT = 2000; |
| private static final Logger logger = LogManager.getLogger(); |
| protected HashMap<Class, String> types = new HashMap<>(); |
| protected HashMap<Name, PublishedObject> published = new HashMap<>(); |
| protected Face face; |
| protected Name channel; |
| protected boolean registered = true; |
| protected long publishLifetime = -1; // in milliseconds, -1 will persist as long as this instance lives |
| |
| /** |
| * Initialize a publisher on the given channel; published objects will |
| * persist as long as this instance lives. |
| * |
| * @param face |
| * @param channel |
| */ |
| public Publisher(final Face face, final Name channel) { |
| this.face = face; |
| this.channel = channel; |
| try { |
| this.face.registerPrefix(this.channel, this, this, new ForwardingFlags()); |
| } catch (IOException e) { |
| logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e); |
| } catch (SecurityException e) { |
| logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e); |
| } |
| } |
| |
| /** |
| * Initialize a publisher on the given channel; limit the lifetime of |
| * published objects to save memory. |
| * |
| * @param face |
| * @param channel |
| * @param publishLifetime |
| */ |
| public Publisher(final Face face, final Name channel, long publishLifetime) { |
| this(face, channel); |
| this.publishLifetime = publishLifetime; |
| } |
| |
| /** |
| * Add a type and its alias to this publisher |
| * |
| * @param typeAlias |
| * @param type |
| */ |
| public void addType(String typeAlias, Class type) { |
| types.put(type, typeAlias); |
| } |
| |
| |
| /** |
| * Publish the given object by sending an alert to all subscribers |
| * |
| * @param publishedObject |
| */ |
| public void publish(Object publishedObject) { |
| // check if this publisher is registered on the face |
| if (!registered) { |
| logger.error("Publisher failed to register; cannot publish data to: " + channel.toUri()); |
| return; |
| } |
| |
| // check known types |
| if (!types.containsKey(publishedObject.getClass())) { |
| logger.error("Attempted (and failed) to publish unknown type: " + publishedObject.getClass().getName()); |
| return; |
| } |
| |
| // check if old published objects need to be removed |
| clearOldPublished(); |
| |
| // track published objects |
| Name publishedName = new Name(channel).append(types.get(publishedObject.getClass())).appendTimestamp(System.currentTimeMillis()); |
| published.put(publishedName, new PublishedObject(publishedObject)); |
| |
| // send out alert and don't wait for response |
| Client client = new Client(); |
| client.get(face, getAlert(publishedName)); |
| logger.debug("Notified channel of new published object: " + publishedName.toUri()); |
| } |
| |
| /** |
| * Handle incoming requests for published objects; will check the published |
| * list and respond with objects that exactly match the incoming interest. |
| * TODO signing, encryption |
| * |
| * @param prefix |
| * @param interest |
| * @param transport |
| * @param registeredPrefixId |
| */ |
| @Override |
| public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) { |
| // check for old published objects |
| clearOldPublished(); |
| |
| // publish object if available |
| if (published.containsKey(interest.getName())) { |
| try { |
| Data data = new Data(interest.getName()); |
| data.setContent(new Blob("TODO parse object")); |
| if (publishLifetime != -1) { |
| data.getMetaInfo().setFreshnessPeriod(publishLifetime); |
| } |
| // data.setSignature(...); |
| transport.send(data.wireEncode().signedBuf()); |
| logger.debug("Sent data: " + interest.getName().toUri()); |
| } catch (IOException e) { |
| logger.error("Failed to send data: " + interest.getName().toUri()); |
| } |
| } |
| } |
| |
| /** |
| * Handle registration failure; this will stop the publisher from sending |
| * notifications since the routes will not be setup to respond |
| * |
| * @param prefix |
| */ |
| @Override |
| public void onRegisterFailed(Name prefix) { |
| logger.error("Failed to register publisher: " + channel.toUri()); |
| registered = false; |
| } |
| |
| /** |
| * Remove any published objects that have outlived their lifetime; a |
| * lifetime of -1 persists them forever. |
| */ |
| public void clearOldPublished() { |
| if (publishLifetime == -1) { |
| return; |
| } |
| for (Entry<Name, PublishedObject> e : published.entrySet()) { |
| if (System.currentTimeMillis() - e.getValue().publishedOn > publishLifetime) { |
| published.remove(e.getKey()); |
| logger.debug("Removing old published object: " + e.getKey().toUri()); |
| } |
| } |
| } |
| |
| /** |
| * Build an alert Interest to notify subscribers to retrieve data from this |
| * publisher; the Interest will have the form |
| * /this/published/channel/ALERT/[encoded channel to request] |
| * |
| * @param nameToPublish |
| * @return |
| */ |
| protected Interest getAlert(Name nameToPublish) { |
| Name alertName = new Name(channel).append("ALERT").append(nameToPublish.wireEncode()); |
| Interest alert = new Interest(alertName); |
| alert.setMustBeFresh(true); // interest must reach the application subscribers |
| alert.setInterestLifetimeMilliseconds(DEFAULT_TIMEOUT); |
| return alert; |
| } |
| |
| /** |
| * Helper class to track published objects and their timestamps |
| */ |
| private class PublishedObject { |
| |
| Object publishedObject; |
| long publishedOn; |
| |
| public PublishedObject(Object object) { |
| publishedObject = object; |
| } |
| } |
| } |