blob: 0ae79b13c995342f7e262d263f4218f343b2733f [file] [log] [blame]
/*
 * File name: Publisher.java
 *
 * Purpose: Provide the publisher side of a pub-sub API.
 *
 * © Copyright Intel Corporation. All rights reserved.
 * Intel Corporation, 2200 Mission College Boulevard,
 * Santa Clara, CA 95052-8119, USA
 */
package com.intel.jndn.utils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.ForwardingFlags;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.OnInterest;
import net.named_data.jndn.OnRegisterFailed;
import net.named_data.jndn.security.SecurityException;
import net.named_data.jndn.transport.Transport;
import net.named_data.jndn.util.Blob;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
27
28/**
29 * This implementation requires both ends (publisher and subscriber) to be
30 * routable to each other. When this class receives data to publish (through
31 * publish()), it sends an alert to all subscribers indicating new data is
32 * available to retrieve; subscribers then retrieve this data using the channel
33 * provided in the alert packet. The flow is as follows: - Publisher listens on
34 * a given channel (e.g. /my/channel) - Subscriber also listens on a given
35 * channel (e.g. /my/channel) - Publisher receives a new Foo to publish -
36 * Publisher notifies subscribers of new data (e.g.
37 * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]} - Subscriber
38 * retrieves data (e.g. /my/channel/foo/[timestamp])
39 *
40 * @author Andrew Brown <andrew.brown@intel.com>
41 */
42public class Publisher implements OnInterest, OnRegisterFailed {
43
Andrew Brown7b1daf32015-01-19 16:36:01 -080044 public static final long DEFAULT_TIMEOUT = 2000;
45 private static final Logger logger = LogManager.getLogger();
46 protected HashMap<Class, String> types = new HashMap<>();
47 protected HashMap<Name, PublishedObject> published = new HashMap<>();
48 protected Face face;
49 protected Name channel;
50 protected boolean registered = true;
51 protected long publishLifetime = -1; // in milliseconds, -1 will persist as long as this instance lives
Andrew Brown3f2521a2015-01-17 22:10:15 -080052
Andrew Brown7b1daf32015-01-19 16:36:01 -080053 /**
54 * Initialize a publisher on the given channel; published objects will persist
55 * as long as this instance lives.
56 *
57 * @param face
58 * @param channel
59 * @param registerOnForwarder set this to false to disable interaction with the forwarder (e.g. as in tests)
60 */
61 public Publisher(final Face face, final Name channel, boolean registerOnForwarder) {
62 this.face = face;
63 this.channel = channel;
64 if (registerOnForwarder) {
65 try {
66 this.face.registerPrefix(this.channel, this, this, new ForwardingFlags());
67 } catch (IOException e) {
68 logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e);
69 } catch (SecurityException e) {
70 logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e);
71 }
72 }
73 }
Andrew Brown3f2521a2015-01-17 22:10:15 -080074
Andrew Brown7b1daf32015-01-19 16:36:01 -080075 /**
76 * Initialize a publisher on the given channel; published objects will persist
77 * as long as this instance lives.
78 *
79 * @param face
80 * @param channel
81 */
82 public Publisher(final Face face, final Name channel) {
83 this(face, channel, true);
84 }
Andrew Brown3f2521a2015-01-17 22:10:15 -080085
Andrew Brown7b1daf32015-01-19 16:36:01 -080086 /**
87 * Initialize a publisher on the given channel; limit the lifetime of
88 * published objects to save memory.
89 *
90 * @param face
91 * @param channel
92 * @param publishLifetime
93 */
94 public Publisher(final Face face, final Name channel, long publishLifetime) {
95 this(face, channel);
96 this.publishLifetime = publishLifetime;
97 }
Andrew Brown3f2521a2015-01-17 22:10:15 -080098
Andrew Brown7b1daf32015-01-19 16:36:01 -080099 /**
100 * Add a type and its alias to this publisher
101 *
102 * @param typeAlias
103 * @param type
104 */
105 public void addType(String typeAlias, Class type) {
106 types.put(type, typeAlias);
107 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800108
Andrew Brown7b1daf32015-01-19 16:36:01 -0800109 /**
110 * Publish the given object by sending an alert to all subscribers
111 *
112 * @param publishedObject
113 */
114 public void publish(Object publishedObject) {
115 // check if this publisher is registered on the face
116 if (!registered) {
117 logger.error("Publisher failed to register; cannot publish data to: " + channel.toUri());
118 return;
119 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800120
Andrew Brown7b1daf32015-01-19 16:36:01 -0800121 // check known types
122 if (!types.containsKey(publishedObject.getClass())) {
123 logger.error("Attempted (and failed) to publish unknown type: " + publishedObject.getClass().getName());
124 return;
125 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800126
Andrew Brown7b1daf32015-01-19 16:36:01 -0800127 // check if old published objects need to be removed
128 clearOldPublished();
Andrew Brown3f2521a2015-01-17 22:10:15 -0800129
Andrew Brown7b1daf32015-01-19 16:36:01 -0800130 // track published objects
131 Name publishedName = new Name(channel).append(types.get(publishedObject.getClass())).appendTimestamp(System.currentTimeMillis());
132 published.put(publishedName, new PublishedObject(publishedObject));
Andrew Brown3f2521a2015-01-17 22:10:15 -0800133
Andrew Brown7b1daf32015-01-19 16:36:01 -0800134 // send out alert and don't wait for response
135 Client client = new Client();
136 client.get(face, getAlert(publishedName));
137 logger.debug("Notified channel of new published object: " + publishedName.toUri());
138 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800139
Andrew Brown7b1daf32015-01-19 16:36:01 -0800140 /**
141 * Handle incoming requests for published objects; will check the published
142 * list and respond with objects that exactly match the incoming interest.
143 * TODO signing, encryption
144 *
145 * @param prefix
146 * @param interest
147 * @param transport
148 * @param registeredPrefixId
149 */
150 @Override
151 public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
152 // check for old published objects
153 clearOldPublished();
Andrew Brown3f2521a2015-01-17 22:10:15 -0800154
Andrew Brown7b1daf32015-01-19 16:36:01 -0800155 // publish object if available
156 if (published.containsKey(interest.getName())) {
157 try {
158 Data data = new Data(interest.getName());
159 data.setContent(new Blob("TODO parse object"));
160 if (publishLifetime != -1) {
161 data.getMetaInfo().setFreshnessPeriod(publishLifetime);
162 }
163 // data.setSignature(...);
164 transport.send(data.wireEncode().signedBuf());
165 logger.debug("Sent data: " + interest.getName().toUri());
166 } catch (IOException e) {
167 logger.error("Failed to send data: " + interest.getName().toUri());
168 }
169 }
170 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800171
Andrew Brown7b1daf32015-01-19 16:36:01 -0800172 /**
173 * Handle registration failure; this will stop the publisher from sending
174 * notifications since the routes will not be setup to respond
175 *
176 * @param prefix
177 */
178 @Override
179 public void onRegisterFailed(Name prefix) {
180 logger.error("Failed to register publisher: " + channel.toUri());
181 registered = false;
182 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800183
Andrew Brown7b1daf32015-01-19 16:36:01 -0800184 /**
185 * Remove any published objects that have outlived their lifetime; a lifetime
186 * of -1 persists them forever.
187 */
188 public void clearOldPublished() {
189 if (publishLifetime == -1) {
190 return;
191 }
192 for (Entry<Name, PublishedObject> e : published.entrySet()) {
193 if (System.currentTimeMillis() - e.getValue().publishedOn > publishLifetime) {
194 published.remove(e.getKey());
195 logger.debug("Removing old published object: " + e.getKey().toUri());
196 }
197 }
198 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800199
Andrew Brown7b1daf32015-01-19 16:36:01 -0800200 /**
201 * Build an alert Interest to notify subscribers to retrieve data from this
202 * publisher; the Interest will have the form
203 * /this/published/channel/ALERT/[encoded channel to request]
204 *
205 * @param nameToPublish
206 * @return
207 */
208 protected Interest getAlert(Name nameToPublish) {
209 Name alertName = new Name(channel).append("ALERT").append(nameToPublish.wireEncode());
210 Interest alert = new Interest(alertName);
211 alert.setMustBeFresh(true); // interest must reach the application subscribers
212 alert.setInterestLifetimeMilliseconds(DEFAULT_TIMEOUT);
213 return alert;
214 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800215
Andrew Brown7b1daf32015-01-19 16:36:01 -0800216 /**
217 * Helper class to track published objects and their timestamps
218 */
219 private class PublishedObject {
Andrew Brown3f2521a2015-01-17 22:10:15 -0800220
Andrew Brown7b1daf32015-01-19 16:36:01 -0800221 Object publishedObject;
222 long publishedOn;
223
224 public PublishedObject(Object object) {
225 publishedObject = object;
226 }
227 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800228}