/*
 * File name: Publisher.java
 *
 * Purpose: Provide the publisher side of a pub-sub API.
 *
 * © Copyright Intel Corporation. All rights reserved.
 * Intel Corporation, 2200 Mission College Boulevard,
 * Santa Clara, CA 95052-8119, USA
 */
package com.intel.jndn.utils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.ForwardingFlags;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.OnInterest;
import net.named_data.jndn.OnRegisterFailed;
import net.named_data.jndn.security.SecurityException;
import net.named_data.jndn.transport.Transport;
import net.named_data.jndn.util.Blob;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
| 27 | |
/**
 * This implementation requires both ends (publisher and subscriber) to be
 * routable to each other. When this class receives data to publish (through
 * publish()), it sends an alert to all subscribers indicating new data is
 * available to retrieve; subscribers then retrieve this data using the channel
 * provided in the alert packet. The flow is as follows:
 * <ul>
 * <li>Publisher listens on a given channel (e.g. /my/channel)</li>
 * <li>Subscriber also listens on a given channel (e.g. /my/channel)</li>
 * <li>Publisher receives a new Foo to publish</li>
 * <li>Publisher notifies subscribers of new data (e.g.
 * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]})</li>
 * <li>Subscriber retrieves data (e.g. /my/channel/foo/[timestamp])</li>
 * </ul>
 *
 * @author Andrew Brown <andrew.brown@intel.com>
 */
| 42 | public class Publisher implements OnInterest, OnRegisterFailed { |
| 43 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 44 | public static final long DEFAULT_TIMEOUT = 2000; |
| 45 | private static final Logger logger = LogManager.getLogger(); |
| 46 | protected HashMap<Class, String> types = new HashMap<>(); |
| 47 | protected HashMap<Name, PublishedObject> published = new HashMap<>(); |
| 48 | protected Face face; |
| 49 | protected Name channel; |
| 50 | protected boolean registered = true; |
| 51 | protected long publishLifetime = -1; // in milliseconds, -1 will persist as long as this instance lives |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 52 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 53 | /** |
| 54 | * Initialize a publisher on the given channel; published objects will persist |
| 55 | * as long as this instance lives. |
| 56 | * |
| 57 | * @param face |
| 58 | * @param channel |
| 59 | * @param registerOnForwarder set this to false to disable interaction with the forwarder (e.g. as in tests) |
| 60 | */ |
| 61 | public Publisher(final Face face, final Name channel, boolean registerOnForwarder) { |
| 62 | this.face = face; |
| 63 | this.channel = channel; |
| 64 | if (registerOnForwarder) { |
| 65 | try { |
| 66 | this.face.registerPrefix(this.channel, this, this, new ForwardingFlags()); |
| 67 | } catch (IOException e) { |
| 68 | logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e); |
| 69 | } catch (SecurityException e) { |
| 70 | logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e); |
| 71 | } |
| 72 | } |
| 73 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 74 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 75 | /** |
| 76 | * Initialize a publisher on the given channel; published objects will persist |
| 77 | * as long as this instance lives. |
| 78 | * |
| 79 | * @param face |
| 80 | * @param channel |
| 81 | */ |
| 82 | public Publisher(final Face face, final Name channel) { |
| 83 | this(face, channel, true); |
| 84 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 85 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 86 | /** |
| 87 | * Initialize a publisher on the given channel; limit the lifetime of |
| 88 | * published objects to save memory. |
| 89 | * |
| 90 | * @param face |
| 91 | * @param channel |
| 92 | * @param publishLifetime |
| 93 | */ |
| 94 | public Publisher(final Face face, final Name channel, long publishLifetime) { |
| 95 | this(face, channel); |
| 96 | this.publishLifetime = publishLifetime; |
| 97 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 98 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 99 | /** |
| 100 | * Add a type and its alias to this publisher |
| 101 | * |
| 102 | * @param typeAlias |
| 103 | * @param type |
| 104 | */ |
| 105 | public void addType(String typeAlias, Class type) { |
| 106 | types.put(type, typeAlias); |
| 107 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 108 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 109 | /** |
| 110 | * Publish the given object by sending an alert to all subscribers |
| 111 | * |
| 112 | * @param publishedObject |
| 113 | */ |
| 114 | public void publish(Object publishedObject) { |
| 115 | // check if this publisher is registered on the face |
| 116 | if (!registered) { |
| 117 | logger.error("Publisher failed to register; cannot publish data to: " + channel.toUri()); |
| 118 | return; |
| 119 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 120 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 121 | // check known types |
| 122 | if (!types.containsKey(publishedObject.getClass())) { |
| 123 | logger.error("Attempted (and failed) to publish unknown type: " + publishedObject.getClass().getName()); |
| 124 | return; |
| 125 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 126 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 127 | // check if old published objects need to be removed |
| 128 | clearOldPublished(); |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 129 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 130 | // track published objects |
| 131 | Name publishedName = new Name(channel).append(types.get(publishedObject.getClass())).appendTimestamp(System.currentTimeMillis()); |
| 132 | published.put(publishedName, new PublishedObject(publishedObject)); |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 133 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 134 | // send out alert and don't wait for response |
| 135 | Client client = new Client(); |
| 136 | client.get(face, getAlert(publishedName)); |
| 137 | logger.debug("Notified channel of new published object: " + publishedName.toUri()); |
| 138 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 139 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 140 | /** |
| 141 | * Handle incoming requests for published objects; will check the published |
| 142 | * list and respond with objects that exactly match the incoming interest. |
| 143 | * TODO signing, encryption |
| 144 | * |
| 145 | * @param prefix |
| 146 | * @param interest |
| 147 | * @param transport |
| 148 | * @param registeredPrefixId |
| 149 | */ |
| 150 | @Override |
| 151 | public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) { |
| 152 | // check for old published objects |
| 153 | clearOldPublished(); |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 154 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 155 | // publish object if available |
| 156 | if (published.containsKey(interest.getName())) { |
| 157 | try { |
| 158 | Data data = new Data(interest.getName()); |
| 159 | data.setContent(new Blob("TODO parse object")); |
| 160 | if (publishLifetime != -1) { |
| 161 | data.getMetaInfo().setFreshnessPeriod(publishLifetime); |
| 162 | } |
| 163 | // data.setSignature(...); |
| 164 | transport.send(data.wireEncode().signedBuf()); |
| 165 | logger.debug("Sent data: " + interest.getName().toUri()); |
| 166 | } catch (IOException e) { |
| 167 | logger.error("Failed to send data: " + interest.getName().toUri()); |
| 168 | } |
| 169 | } |
| 170 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 171 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 172 | /** |
| 173 | * Handle registration failure; this will stop the publisher from sending |
| 174 | * notifications since the routes will not be setup to respond |
| 175 | * |
| 176 | * @param prefix |
| 177 | */ |
| 178 | @Override |
| 179 | public void onRegisterFailed(Name prefix) { |
| 180 | logger.error("Failed to register publisher: " + channel.toUri()); |
| 181 | registered = false; |
| 182 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 183 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 184 | /** |
| 185 | * Remove any published objects that have outlived their lifetime; a lifetime |
| 186 | * of -1 persists them forever. |
| 187 | */ |
| 188 | public void clearOldPublished() { |
| 189 | if (publishLifetime == -1) { |
| 190 | return; |
| 191 | } |
| 192 | for (Entry<Name, PublishedObject> e : published.entrySet()) { |
| 193 | if (System.currentTimeMillis() - e.getValue().publishedOn > publishLifetime) { |
| 194 | published.remove(e.getKey()); |
| 195 | logger.debug("Removing old published object: " + e.getKey().toUri()); |
| 196 | } |
| 197 | } |
| 198 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 199 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 200 | /** |
| 201 | * Build an alert Interest to notify subscribers to retrieve data from this |
| 202 | * publisher; the Interest will have the form |
| 203 | * /this/published/channel/ALERT/[encoded channel to request] |
| 204 | * |
| 205 | * @param nameToPublish |
| 206 | * @return |
| 207 | */ |
| 208 | protected Interest getAlert(Name nameToPublish) { |
| 209 | Name alertName = new Name(channel).append("ALERT").append(nameToPublish.wireEncode()); |
| 210 | Interest alert = new Interest(alertName); |
| 211 | alert.setMustBeFresh(true); // interest must reach the application subscribers |
| 212 | alert.setInterestLifetimeMilliseconds(DEFAULT_TIMEOUT); |
| 213 | return alert; |
| 214 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 215 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 216 | /** |
| 217 | * Helper class to track published objects and their timestamps |
| 218 | */ |
| 219 | private class PublishedObject { |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 220 | |
Andrew Brown | 7b1daf3 | 2015-01-19 16:36:01 -0800 | [diff] [blame^] | 221 | Object publishedObject; |
| 222 | long publishedOn; |
| 223 | |
| 224 | public PublishedObject(Object object) { |
| 225 | publishedObject = object; |
| 226 | } |
| 227 | } |
Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame] | 228 | } |