Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame^] | 1 | /* |
| 2 | * File name: Publisher.java |
| 3 | * |
| 4 | * Purpose: Provide the publisher side of a pub-sub API. |
| 5 | * |
| 6 | * © Copyright Intel Corporation. All rights reserved. |
| 7 | * Intel Corporation, 2200 Mission College Boulevard, |
| 8 | * Santa Clara, CA 95052-8119, USA |
| 9 | */ |
| 10 | package com.intel.jndn.utils; |
| 11 | |
| 12 | import java.io.IOException; |
| 13 | import java.util.HashMap; |
| 14 | import java.util.Map.Entry; |
| 15 | import net.named_data.jndn.Data; |
| 16 | import net.named_data.jndn.Face; |
| 17 | import net.named_data.jndn.ForwardingFlags; |
| 18 | import net.named_data.jndn.Interest; |
| 19 | import net.named_data.jndn.Name; |
| 20 | import net.named_data.jndn.OnInterest; |
| 21 | import net.named_data.jndn.OnRegisterFailed; |
| 22 | import net.named_data.jndn.security.SecurityException; |
| 23 | import net.named_data.jndn.transport.Transport; |
| 24 | import net.named_data.jndn.util.Blob; |
| 25 | import org.apache.logging.log4j.LogManager; |
| 26 | import org.apache.logging.log4j.Logger; |
| 27 | |
| 28 | /** |
| 29 | * This implementation requires both ends (publisher and subscriber) to be |
| 30 | * routable to each other. When this class receives data to publish (through |
| 31 | * publish()), it sends an alert to all subscribers indicating new data is |
| 32 | * available to retrieve; subscribers then retrieve this data using the channel |
| 33 | * provided in the alert packet. The flow is as follows: - Publisher listens on |
| 34 | * a given channel (e.g. /my/channel) - Subscriber also listens on a given |
| 35 | * channel (e.g. /my/channel) - Publisher receives a new Foo to publish - |
| 36 | * Publisher notifies subscribers of new data (e.g. |
| 37 | * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]} - Subscriber |
| 38 | * retrieves data (e.g. /my/channel/foo/[timestamp]) |
| 39 | * |
| 40 | * @author Andrew Brown <andrew.brown@intel.com> |
| 41 | */ |
| 42 | public class Publisher implements OnInterest, OnRegisterFailed { |
| 43 | |
| 44 | public static final long DEFAULT_TIMEOUT = 2000; |
| 45 | private static final Logger logger = LogManager.getLogger(); |
| 46 | protected HashMap<Class, String> types = new HashMap<>(); |
| 47 | protected HashMap<Name, PublishedObject> published = new HashMap<>(); |
| 48 | protected Face face; |
| 49 | protected Name channel; |
| 50 | protected boolean registered = true; |
| 51 | protected long publishLifetime = -1; // in milliseconds, -1 will persist as long as this instance lives |
| 52 | |
| 53 | /** |
| 54 | * Initialize a publisher on the given channel; published objects will |
| 55 | * persist as long as this instance lives. |
| 56 | * |
| 57 | * @param face |
| 58 | * @param channel |
| 59 | */ |
| 60 | public Publisher(final Face face, final Name channel) { |
| 61 | this.face = face; |
| 62 | this.channel = channel; |
| 63 | try { |
| 64 | this.face.registerPrefix(this.channel, this, this, new ForwardingFlags()); |
| 65 | } catch (IOException e) { |
| 66 | logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e); |
| 67 | } catch (SecurityException e) { |
| 68 | logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e); |
| 69 | } |
| 70 | } |
| 71 | |
| 72 | /** |
| 73 | * Initialize a publisher on the given channel; limit the lifetime of |
| 74 | * published objects to save memory. |
| 75 | * |
| 76 | * @param face |
| 77 | * @param channel |
| 78 | * @param publishLifetime |
| 79 | */ |
| 80 | public Publisher(final Face face, final Name channel, long publishLifetime) { |
| 81 | this(face, channel); |
| 82 | this.publishLifetime = publishLifetime; |
| 83 | } |
| 84 | |
| 85 | /** |
| 86 | * Add a type and its alias to this publisher |
| 87 | * |
| 88 | * @param typeAlias |
| 89 | * @param type |
| 90 | */ |
| 91 | public void addType(String typeAlias, Class type) { |
| 92 | types.put(type, typeAlias); |
| 93 | } |
| 94 | |
| 95 | |
| 96 | /** |
| 97 | * Publish the given object by sending an alert to all subscribers |
| 98 | * |
| 99 | * @param publishedObject |
| 100 | */ |
| 101 | public void publish(Object publishedObject) { |
| 102 | // check if this publisher is registered on the face |
| 103 | if (!registered) { |
| 104 | logger.error("Publisher failed to register; cannot publish data to: " + channel.toUri()); |
| 105 | return; |
| 106 | } |
| 107 | |
| 108 | // check known types |
| 109 | if (!types.containsKey(publishedObject.getClass())) { |
| 110 | logger.error("Attempted (and failed) to publish unknown type: " + publishedObject.getClass().getName()); |
| 111 | return; |
| 112 | } |
| 113 | |
| 114 | // check if old published objects need to be removed |
| 115 | clearOldPublished(); |
| 116 | |
| 117 | // track published objects |
| 118 | Name publishedName = new Name(channel).append(types.get(publishedObject.getClass())).appendTimestamp(System.currentTimeMillis()); |
| 119 | published.put(publishedName, new PublishedObject(publishedObject)); |
| 120 | |
| 121 | // send out alert and don't wait for response |
| 122 | Client client = new Client(); |
| 123 | client.get(face, getAlert(publishedName)); |
| 124 | logger.debug("Notified channel of new published object: " + publishedName.toUri()); |
| 125 | } |
| 126 | |
| 127 | /** |
| 128 | * Handle incoming requests for published objects; will check the published |
| 129 | * list and respond with objects that exactly match the incoming interest. |
| 130 | * TODO signing, encryption |
| 131 | * |
| 132 | * @param prefix |
| 133 | * @param interest |
| 134 | * @param transport |
| 135 | * @param registeredPrefixId |
| 136 | */ |
| 137 | @Override |
| 138 | public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) { |
| 139 | // check for old published objects |
| 140 | clearOldPublished(); |
| 141 | |
| 142 | // publish object if available |
| 143 | if (published.containsKey(interest.getName())) { |
| 144 | try { |
| 145 | Data data = new Data(interest.getName()); |
| 146 | data.setContent(new Blob("TODO parse object")); |
| 147 | if (publishLifetime != -1) { |
| 148 | data.getMetaInfo().setFreshnessPeriod(publishLifetime); |
| 149 | } |
| 150 | // data.setSignature(...); |
| 151 | transport.send(data.wireEncode().signedBuf()); |
| 152 | logger.debug("Sent data: " + interest.getName().toUri()); |
| 153 | } catch (IOException e) { |
| 154 | logger.error("Failed to send data: " + interest.getName().toUri()); |
| 155 | } |
| 156 | } |
| 157 | } |
| 158 | |
| 159 | /** |
| 160 | * Handle registration failure; this will stop the publisher from sending |
| 161 | * notifications since the routes will not be setup to respond |
| 162 | * |
| 163 | * @param prefix |
| 164 | */ |
| 165 | @Override |
| 166 | public void onRegisterFailed(Name prefix) { |
| 167 | logger.error("Failed to register publisher: " + channel.toUri()); |
| 168 | registered = false; |
| 169 | } |
| 170 | |
| 171 | /** |
| 172 | * Remove any published objects that have outlived their lifetime; a |
| 173 | * lifetime of -1 persists them forever. |
| 174 | */ |
| 175 | public void clearOldPublished() { |
| 176 | if (publishLifetime == -1) { |
| 177 | return; |
| 178 | } |
| 179 | for (Entry<Name, PublishedObject> e : published.entrySet()) { |
| 180 | if (System.currentTimeMillis() - e.getValue().publishedOn > publishLifetime) { |
| 181 | published.remove(e.getKey()); |
| 182 | logger.debug("Removing old published object: " + e.getKey().toUri()); |
| 183 | } |
| 184 | } |
| 185 | } |
| 186 | |
| 187 | /** |
| 188 | * Build an alert Interest to notify subscribers to retrieve data from this |
| 189 | * publisher; the Interest will have the form |
| 190 | * /this/published/channel/ALERT/[encoded channel to request] |
| 191 | * |
| 192 | * @param nameToPublish |
| 193 | * @return |
| 194 | */ |
| 195 | protected Interest getAlert(Name nameToPublish) { |
| 196 | Name alertName = new Name(channel).append("ALERT").append(nameToPublish.wireEncode()); |
| 197 | Interest alert = new Interest(alertName); |
| 198 | alert.setMustBeFresh(true); // interest must reach the application subscribers |
| 199 | alert.setInterestLifetimeMilliseconds(DEFAULT_TIMEOUT); |
| 200 | return alert; |
| 201 | } |
| 202 | |
| 203 | /** |
| 204 | * Helper class to track published objects and their timestamps |
| 205 | */ |
| 206 | private class PublishedObject { |
| 207 | |
| 208 | Object publishedObject; |
| 209 | long publishedOn; |
| 210 | |
| 211 | public PublishedObject(Object object) { |
| 212 | publishedObject = object; |
| 213 | } |
| 214 | } |
| 215 | } |