Andrew Brown | 3f2521a | 2015-01-17 22:10:15 -0800 | [diff] [blame^] | 1 | /* |
| 2 | * File name: Subscriber.java |
| 3 | * |
| 4 | * Purpose: Provide the subscriber side of a pub-sub API. |
| 5 | * |
| 6 | * © Copyright Intel Corporation. All rights reserved. |
| 7 | * Intel Corporation, 2200 Mission College Boulevard, |
| 8 | * Santa Clara, CA 95052-8119, USA |
| 9 | */ |
| 10 | package com.intel.jndn.utils; |
| 11 | |
| 12 | import java.io.IOException; |
| 13 | import java.util.HashMap; |
| 14 | import net.named_data.jndn.Data; |
| 15 | import net.named_data.jndn.Face; |
| 16 | import net.named_data.jndn.ForwardingFlags; |
| 17 | import net.named_data.jndn.Interest; |
| 18 | import net.named_data.jndn.Name; |
| 19 | import net.named_data.jndn.Name.Component; |
| 20 | import net.named_data.jndn.OnInterest; |
| 21 | import net.named_data.jndn.OnRegisterFailed; |
| 22 | import net.named_data.jndn.encoding.EncodingException; |
| 23 | import net.named_data.jndn.transport.Transport; |
| 24 | import org.apache.logging.log4j.LogManager; |
| 25 | import org.apache.logging.log4j.Logger; |
| 26 | |
| 27 | /** |
| 28 | * This implementation requires both ends (publisher and subscriber) to be |
| 29 | * routable to each other. When this class receives data to publish (through |
| 30 | * publish()), it sends an alert to all subscribers indicating new data is |
| 31 | * available to retrieve; subscribers then retrieve this data using the channel |
| 32 | * provided in the alert packet. The flow is as follows: - Publisher listens on |
| 33 | * a given channel (e.g. /my/channel) - Subscriber also listens on a given |
| 34 | * channel (e.g. /my/channel) - Publisher receives a new Foo to publish - |
| 35 | * Publisher notifies subscribers of new data (e.g. |
| 36 | * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]} - Subscriber |
| 37 | * retrieves data (e.g. /my/channel/foo/[timestamp]) |
| 38 | * |
| 39 | * @author Andrew Brown <andrew.brown@intel.com> |
| 40 | */ |
| 41 | public class Subscriber implements OnInterest, OnRegisterFailed { |
| 42 | |
| 43 | public static final long DEFAULT_TIMEOUT = 2000; |
| 44 | private static final Logger logger = LogManager.getLogger(); |
| 45 | protected HashMap<String, Class> types = new HashMap<>(); |
| 46 | protected HashMap<Class, OnPublish> handlers = new HashMap<>(); |
| 47 | protected Face face; |
| 48 | protected Name channel; |
| 49 | |
| 50 | /** |
| 51 | * Initialize a publisher on the given channel; published objects will |
| 52 | * persist as long as this instance lives. |
| 53 | * |
| 54 | * @param face |
| 55 | * @param channel |
| 56 | */ |
| 57 | public Subscriber(final Face face, final Name channel) { |
| 58 | this.face = face; |
| 59 | this.channel = channel; |
| 60 | try { |
| 61 | this.face.registerPrefix(this.channel, this, this, new ForwardingFlags()); |
| 62 | } catch (IOException e) { |
| 63 | logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e); |
| 64 | } catch (net.named_data.jndn.security.SecurityException e) { |
| 65 | logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e); |
| 66 | } |
| 67 | } |
| 68 | |
| 69 | /** |
| 70 | * Add a type and its alias to this publisher |
| 71 | * |
| 72 | * @param typeAlias |
| 73 | * @param type |
| 74 | */ |
| 75 | public void addType(String typeAlias, Class type) { |
| 76 | types.put(typeAlias, type); |
| 77 | } |
| 78 | |
| 79 | |
| 80 | /** |
| 81 | * Functional interface defining action to take upon receipt of a published |
| 82 | * object |
| 83 | * |
| 84 | * @param <T> |
| 85 | */ |
| 86 | public interface OnPublish<T>{ |
| 87 | public void onPublish(T publishedObject); |
| 88 | } |
| 89 | |
| 90 | /** |
| 91 | * Register a handler for receipt of a certain published type |
| 92 | * |
| 93 | * @param type |
| 94 | * @param handler |
| 95 | */ |
| 96 | public void on(Class type, OnPublish handler) { |
| 97 | handlers.put(type, handler); |
| 98 | } |
| 99 | |
| 100 | /** |
| 101 | * Register a handler for receipt of a certain published type |
| 102 | * |
| 103 | * @param typeAlias |
| 104 | * @param handler |
| 105 | */ |
| 106 | public void on(String typeAlias, OnPublish handler) { |
| 107 | if (types.containsKey(typeAlias)) { |
| 108 | handlers.put(types.get(typeAlias), handler); |
| 109 | } else { |
| 110 | logger.warn("Unrecognized type (no handler registered): " + typeAlias); |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | /** |
| 115 | * Handle alert notifications for published objects; once the published |
| 116 | * object is parsed, this will pass the object to the handle registered |
| 117 | * in on(). |
| 118 | * |
| 119 | * @param prefix |
| 120 | * @param interest |
| 121 | * @param transport |
| 122 | * @param registeredPrefixId |
| 123 | */ |
| 124 | @Override |
| 125 | public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) { |
| 126 | // check format |
| 127 | if (!isAlert(interest)) { |
| 128 | logger.warn("Incoming interest was not in ALERT format: " + interest.getName().toUri()); |
| 129 | return; |
| 130 | } |
| 131 | |
| 132 | // check signature |
| 133 | // TODO |
| 134 | // retrieve name |
| 135 | Name publishedName = getAlertName(interest); |
| 136 | if (publishedName == null) { |
| 137 | return; |
| 138 | } |
| 139 | |
| 140 | // retrieve data |
| 141 | Client client = new Client(); |
| 142 | Data data = client.getSync(face, publishedName); |
| 143 | if (data == null) { |
| 144 | logger.warn("Faled to retrieve published object: " + publishedName.toUri()); |
| 145 | return; |
| 146 | } |
| 147 | |
| 148 | // get handler |
| 149 | String typeAlias = getPublishedType(data); |
| 150 | if (!types.containsKey(typeAlias)) { |
| 151 | logger.warn("Type not found: " + typeAlias); |
| 152 | return; |
| 153 | } |
| 154 | Class type = types.get(typeAlias); |
| 155 | if (!handlers.containsKey(type)) { |
| 156 | logger.warn("No handler found for type: " + typeAlias); |
| 157 | return; |
| 158 | } |
| 159 | |
| 160 | // build object |
| 161 | Object publishedObject = null; |
| 162 | // TODO parse object |
| 163 | |
| 164 | // call |
| 165 | handlers.get(type).onPublish(publishedObject); |
| 166 | } |
| 167 | |
| 168 | /** |
| 169 | * Handle registration failure; |
| 170 | * |
| 171 | * @param prefix |
| 172 | */ |
| 173 | @Override |
| 174 | public void onRegisterFailed(Name prefix) { |
| 175 | logger.error("Failed to register: " + prefix.toUri()); |
| 176 | } |
| 177 | |
| 178 | /** |
| 179 | * Check if an incoming interest is an alert notification to retrieve data |
| 180 | * |
| 181 | * @param interest |
| 182 | * @return |
| 183 | */ |
| 184 | protected boolean isAlert(Interest interest) { |
| 185 | Component alertComponent = interest.getName().get(-2); |
| 186 | return alertComponent == null || !alertComponent.equals(new Component("ALERT")); |
| 187 | } |
| 188 | |
| 189 | /** |
| 190 | * Parse an Interest to retrieve the remote name of the published object to |
| 191 | * then request; the Interest must have the form |
| 192 | * /this/published/channel/ALERT/[encoded name to request] |
| 193 | * |
| 194 | * @param interest |
| 195 | * @return |
| 196 | */ |
| 197 | protected Name getAlertName(Interest interest) { |
| 198 | try { |
| 199 | Name publishedName = new Name(); |
| 200 | publishedName.wireDecode(interest.getName().get(-1).getValue()); |
| 201 | return publishedName; |
| 202 | } catch (EncodingException e) { |
| 203 | logger.error("Failed to parse remote published name", e); |
| 204 | return null; |
| 205 | } |
| 206 | } |
| 207 | |
| 208 | /** |
| 209 | * Parse the published type from a Data packet; must be the second to last |
| 210 | * component of the name |
| 211 | * |
| 212 | * @param data |
| 213 | * @return |
| 214 | */ |
| 215 | protected String getPublishedType(Data data) { |
| 216 | return data.getName().get(-2).toEscapedString(); |
| 217 | } |
| 218 | } |