blob: 23e8cf64d39ff20f765aa1c18cdeaebe56c8a01c [file] [log] [blame]
/*
 * File name: Subscriber.java
 *
 * Purpose: Provide the subscriber side of a pub-sub API.
 *
 * © Copyright Intel Corporation. All rights reserved.
 * Intel Corporation, 2200 Mission College Boulevard,
 * Santa Clara, CA 95052-8119, USA
 */
10package com.intel.jndn.utils;
11
12import java.io.IOException;
13import java.util.HashMap;
14import net.named_data.jndn.Data;
15import net.named_data.jndn.Face;
16import net.named_data.jndn.ForwardingFlags;
17import net.named_data.jndn.Interest;
18import net.named_data.jndn.Name;
19import net.named_data.jndn.Name.Component;
20import net.named_data.jndn.OnInterest;
21import net.named_data.jndn.OnRegisterFailed;
22import net.named_data.jndn.encoding.EncodingException;
23import net.named_data.jndn.transport.Transport;
24import org.apache.logging.log4j.LogManager;
25import org.apache.logging.log4j.Logger;
26
27/**
28 * This implementation requires both ends (publisher and subscriber) to be
29 * routable to each other. When this class receives data to publish (through
30 * publish()), it sends an alert to all subscribers indicating new data is
31 * available to retrieve; subscribers then retrieve this data using the channel
32 * provided in the alert packet. The flow is as follows: - Publisher listens on
33 * a given channel (e.g. /my/channel) - Subscriber also listens on a given
34 * channel (e.g. /my/channel) - Publisher receives a new Foo to publish -
35 * Publisher notifies subscribers of new data (e.g.
36 * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]} - Subscriber
37 * retrieves data (e.g. /my/channel/foo/[timestamp])
38 *
39 * @author Andrew Brown <andrew.brown@intel.com>
40 */
41public class Subscriber implements OnInterest, OnRegisterFailed {
42
43 public static final long DEFAULT_TIMEOUT = 2000;
44 private static final Logger logger = LogManager.getLogger();
45 protected HashMap<String, Class> types = new HashMap<>();
46 protected HashMap<Class, OnPublish> handlers = new HashMap<>();
47 protected Face face;
48 protected Name channel;
49
50 /**
51 * Initialize a publisher on the given channel; published objects will
52 * persist as long as this instance lives.
53 *
54 * @param face
55 * @param channel
56 */
57 public Subscriber(final Face face, final Name channel) {
58 this.face = face;
59 this.channel = channel;
60 try {
61 this.face.registerPrefix(this.channel, this, this, new ForwardingFlags());
62 } catch (IOException e) {
63 logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e);
64 } catch (net.named_data.jndn.security.SecurityException e) {
65 logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e);
66 }
67 }
68
69 /**
70 * Add a type and its alias to this publisher
71 *
72 * @param typeAlias
73 * @param type
74 */
75 public void addType(String typeAlias, Class type) {
76 types.put(typeAlias, type);
77 }
78
79
80 /**
81 * Functional interface defining action to take upon receipt of a published
82 * object
83 *
84 * @param <T>
85 */
86 public interface OnPublish<T>{
87 public void onPublish(T publishedObject);
88 }
89
90 /**
91 * Register a handler for receipt of a certain published type
92 *
93 * @param type
94 * @param handler
95 */
96 public void on(Class type, OnPublish handler) {
97 handlers.put(type, handler);
98 }
99
100 /**
101 * Register a handler for receipt of a certain published type
102 *
103 * @param typeAlias
104 * @param handler
105 */
106 public void on(String typeAlias, OnPublish handler) {
107 if (types.containsKey(typeAlias)) {
108 handlers.put(types.get(typeAlias), handler);
109 } else {
110 logger.warn("Unrecognized type (no handler registered): " + typeAlias);
111 }
112 }
113
114 /**
115 * Handle alert notifications for published objects; once the published
116 * object is parsed, this will pass the object to the handle registered
117 * in on().
118 *
119 * @param prefix
120 * @param interest
121 * @param transport
122 * @param registeredPrefixId
123 */
124 @Override
125 public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
126 // check format
127 if (!isAlert(interest)) {
128 logger.warn("Incoming interest was not in ALERT format: " + interest.getName().toUri());
129 return;
130 }
131
132 // check signature
133 // TODO
134 // retrieve name
135 Name publishedName = getAlertName(interest);
136 if (publishedName == null) {
137 return;
138 }
139
140 // retrieve data
141 Client client = new Client();
142 Data data = client.getSync(face, publishedName);
143 if (data == null) {
144 logger.warn("Faled to retrieve published object: " + publishedName.toUri());
145 return;
146 }
147
148 // get handler
149 String typeAlias = getPublishedType(data);
150 if (!types.containsKey(typeAlias)) {
151 logger.warn("Type not found: " + typeAlias);
152 return;
153 }
154 Class type = types.get(typeAlias);
155 if (!handlers.containsKey(type)) {
156 logger.warn("No handler found for type: " + typeAlias);
157 return;
158 }
159
160 // build object
161 Object publishedObject = null;
162 // TODO parse object
163
164 // call
165 handlers.get(type).onPublish(publishedObject);
166 }
167
168 /**
169 * Handle registration failure;
170 *
171 * @param prefix
172 */
173 @Override
174 public void onRegisterFailed(Name prefix) {
175 logger.error("Failed to register: " + prefix.toUri());
176 }
177
178 /**
179 * Check if an incoming interest is an alert notification to retrieve data
180 *
181 * @param interest
182 * @return
183 */
184 protected boolean isAlert(Interest interest) {
185 Component alertComponent = interest.getName().get(-2);
186 return alertComponent == null || !alertComponent.equals(new Component("ALERT"));
187 }
188
189 /**
190 * Parse an Interest to retrieve the remote name of the published object to
191 * then request; the Interest must have the form
192 * /this/published/channel/ALERT/[encoded name to request]
193 *
194 * @param interest
195 * @return
196 */
197 protected Name getAlertName(Interest interest) {
198 try {
199 Name publishedName = new Name();
200 publishedName.wireDecode(interest.getName().get(-1).getValue());
201 return publishedName;
202 } catch (EncodingException e) {
203 logger.error("Failed to parse remote published name", e);
204 return null;
205 }
206 }
207
208 /**
209 * Parse the published type from a Data packet; must be the second to last
210 * component of the name
211 *
212 * @param data
213 * @return
214 */
215 protected String getPublishedType(Data data) {
216 return data.getName().get(-2).toEscapedString();
217 }
218}