blob: df6d37587962f30623cd2ff172d71894e4f88fb7 [file] [log] [blame]
Andrew Brown3f2521a2015-01-17 22:10:15 -08001/*
2 * File name: Subscriber.java
3 *
4 * Purpose: Provide the subscriber side of a pub-sub API.
5 *
6 * © Copyright Intel Corporation. All rights reserved.
7 * Intel Corporation, 2200 Mission College Boulevard,
8 * Santa Clara, CA 95052-8119, USA
9 */
10package com.intel.jndn.utils;
11
12import java.io.IOException;
13import java.util.HashMap;
14import net.named_data.jndn.Data;
15import net.named_data.jndn.Face;
16import net.named_data.jndn.ForwardingFlags;
17import net.named_data.jndn.Interest;
18import net.named_data.jndn.Name;
19import net.named_data.jndn.Name.Component;
20import net.named_data.jndn.OnInterest;
21import net.named_data.jndn.OnRegisterFailed;
22import net.named_data.jndn.encoding.EncodingException;
23import net.named_data.jndn.transport.Transport;
24import org.apache.logging.log4j.LogManager;
25import org.apache.logging.log4j.Logger;
26
27/**
28 * This implementation requires both ends (publisher and subscriber) to be
29 * routable to each other. When this class receives data to publish (through
30 * publish()), it sends an alert to all subscribers indicating new data is
31 * available to retrieve; subscribers then retrieve this data using the channel
32 * provided in the alert packet. The flow is as follows: - Publisher listens on
33 * a given channel (e.g. /my/channel) - Subscriber also listens on a given
34 * channel (e.g. /my/channel) - Publisher receives a new Foo to publish -
35 * Publisher notifies subscribers of new data (e.g.
36 * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]} - Subscriber
37 * retrieves data (e.g. /my/channel/foo/[timestamp])
38 *
39 * @author Andrew Brown <andrew.brown@intel.com>
40 */
41public class Subscriber implements OnInterest, OnRegisterFailed {
42
Andrew Brown7b1daf32015-01-19 16:36:01 -080043 public static final long DEFAULT_TIMEOUT = 2000;
44 private static final Logger logger = LogManager.getLogger();
45 protected HashMap<String, Class> types = new HashMap<>();
46 protected HashMap<Class, OnPublish> handlers = new HashMap<>();
47 protected Face face;
48 protected Name channel;
Andrew Brown3f2521a2015-01-17 22:10:15 -080049
Andrew Brown7b1daf32015-01-19 16:36:01 -080050 /**
51 * Initialize a publisher on the given channel; published objects will persist
52 * as long as this instance lives.
53 *
54 * @param face
55 * @param channel
56 * @param registerOnForwarder set this to false to disable interaction with the forwarder (e.g. as in tests)
57 */
58 public Subscriber(final Face face, final Name channel, boolean registerOnForwarder) {
59 this.face = face;
60 this.channel = channel;
61 if (registerOnForwarder) {
62 try {
63 this.face.registerPrefix(this.channel, this, this, new ForwardingFlags());
64 } catch (IOException e) {
65 logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e);
66 } catch (net.named_data.jndn.security.SecurityException e) {
67 logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e);
68 }
69 }
70 }
Andrew Brown3f2521a2015-01-17 22:10:15 -080071
Andrew Brown7b1daf32015-01-19 16:36:01 -080072 /**
73 * Initialize a publisher on the given channel; published objects will persist
74 * as long as this instance lives.
75 *
76 * @param face
77 * @param channel
78 */
79 public Subscriber(final Face face, final Name channel) {
80 this(face, channel, true);
81 }
Andrew Brown3f2521a2015-01-17 22:10:15 -080082
Andrew Brown7b1daf32015-01-19 16:36:01 -080083 /**
84 * Add a type and its alias to this publisher
85 *
86 * @param typeAlias
87 * @param type
88 */
89 public void addType(String typeAlias, Class type) {
90 types.put(typeAlias, type);
91 }
Andrew Brown3f2521a2015-01-17 22:10:15 -080092
Andrew Brown7b1daf32015-01-19 16:36:01 -080093 /**
94 * Functional interface defining action to take upon receipt of a published
95 * object
96 *
97 * @param <T>
98 */
99 public interface OnPublish<T> {
Andrew Brown3f2521a2015-01-17 22:10:15 -0800100
Andrew Brown7b1daf32015-01-19 16:36:01 -0800101 public void onPublish(T publishedObject);
102 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800103
Andrew Brown7b1daf32015-01-19 16:36:01 -0800104 /**
105 * Register a handler for receipt of a certain published type
106 *
107 * @param <T>
108 * @param type
109 * @param handler
110 */
111 public <T> void on(Class<T> type, OnPublish<T> handler) {
112 handlers.put(type, handler);
113 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800114
Andrew Brown7b1daf32015-01-19 16:36:01 -0800115 /**
116 * Register a handler for receipt of a certain published type
117 *
118 * @param typeAlias
119 * @param handler
120 */
121 public void on(String typeAlias, OnPublish handler) {
122 if (types.containsKey(typeAlias)) {
123 handlers.put(types.get(typeAlias), handler);
124 } else {
125 logger.warn("Unrecognized type (no handler registered): " + typeAlias);
126 }
127 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800128
Andrew Brown7b1daf32015-01-19 16:36:01 -0800129 /**
130 * Handle alert notifications for published objects; once the published object
131 * is parsed, this will pass the object to the handle registered in on().
132 *
133 * @param prefix
134 * @param interest
135 * @param transport
136 * @param registeredPrefixId
137 */
138 @Override
139 public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
140 // check format
141 if (!isAlert(interest)) {
142 logger.warn("Incoming interest was not in ALERT format: " + interest.getName().toUri());
143 return;
144 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800145
Andrew Brown7b1daf32015-01-19 16:36:01 -0800146 // check signature
147 // TODO
148 // retrieve name
149 Name publishedName = getAlertName(interest);
150 if (publishedName == null) {
151 return;
152 }
153
154 // retrieve data
155 Client client = new Client();
156 Data data = client.getSync(face, publishedName);
157 if (data == null) {
158 logger.warn("Faled to retrieve published object: " + publishedName.toUri());
159 return;
160 }
161
162 // get handler
163 String typeAlias = getPublishedType(data);
164 if (!types.containsKey(typeAlias)) {
165 logger.warn("Type not found: " + typeAlias);
166 return;
167 }
168 Class type = types.get(typeAlias);
169 if (!handlers.containsKey(type)) {
170 logger.warn("No handler found for type: " + typeAlias);
171 return;
172 }
173
174 // build object
175 Object publishedObject = null;
Andrew Brown3f2521a2015-01-17 22:10:15 -0800176 // TODO parse object
177
Andrew Brown7b1daf32015-01-19 16:36:01 -0800178 // call
179 handlers.get(type).onPublish(type.cast(publishedObject));
180 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800181
Andrew Brown7b1daf32015-01-19 16:36:01 -0800182 /**
183 * Handle registration failure;
184 *
185 * @param prefix
186 */
187 @Override
188 public void onRegisterFailed(Name prefix) {
189 logger.error("Failed to register: " + prefix.toUri());
190 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800191
Andrew Brown7b1daf32015-01-19 16:36:01 -0800192 /**
193 * Check if an incoming interest is an alert notification to retrieve data
194 *
195 * @param interest
196 * @return
197 */
198 protected boolean isAlert(Interest interest) {
199 Component alertComponent = interest.getName().get(-2);
200 return alertComponent == null || !alertComponent.equals(new Component("ALERT"));
201 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800202
Andrew Brown7b1daf32015-01-19 16:36:01 -0800203 /**
204 * Parse an Interest to retrieve the remote name of the published object to
205 * then request; the Interest must have the form
206 * /this/published/channel/ALERT/[encoded name to request]
207 *
208 * @param interest
209 * @return
210 */
211 protected Name getAlertName(Interest interest) {
212 try {
213 Name publishedName = new Name();
214 publishedName.wireDecode(interest.getName().get(-1).getValue());
215 return publishedName;
216 } catch (EncodingException e) {
217 logger.error("Failed to parse remote published name", e);
218 return null;
219 }
220 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800221
Andrew Brown7b1daf32015-01-19 16:36:01 -0800222 /**
223 * Parse the published type from a Data packet; must be the second to last
224 * component of the name
225 *
226 * @param data
227 * @return
228 */
229 protected String getPublishedType(Data data) {
230 return data.getName().get(-2).toEscapedString();
231 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800232}