blob: 862f001ccb8864e441b9930b9902a906838bca66 [file] [log] [blame]
Andrew Brown3f2521a2015-01-17 22:10:15 -08001/*
2 * File channel: Publisher.java
3 *
4 * Purpose: Provide the publisher side of a pub-sub API.
5 *
6 * © Copyright Intel Corporation. All rights reserved.
7 * Intel Corporation, 2200 Mission College Boulevard,
8 * Santa Clara, CA 95052-8119, USA
9 */
10package com.intel.jndn.utils;
11
12import java.io.IOException;
13import java.util.HashMap;
14import java.util.Map.Entry;
15import net.named_data.jndn.Data;
16import net.named_data.jndn.Face;
17import net.named_data.jndn.ForwardingFlags;
18import net.named_data.jndn.Interest;
19import net.named_data.jndn.Name;
20import net.named_data.jndn.OnInterest;
21import net.named_data.jndn.OnRegisterFailed;
22import net.named_data.jndn.security.SecurityException;
23import net.named_data.jndn.transport.Transport;
24import net.named_data.jndn.util.Blob;
25import org.apache.logging.log4j.LogManager;
26import org.apache.logging.log4j.Logger;
27
28/**
29 * This implementation requires both ends (publisher and subscriber) to be
30 * routable to each other. When this class receives data to publish (through
31 * publish()), it sends an alert to all subscribers indicating new data is
32 * available to retrieve; subscribers then retrieve this data using the channel
33 * provided in the alert packet. The flow is as follows: - Publisher listens on
34 * a given channel (e.g. /my/channel) - Subscriber also listens on a given
35 * channel (e.g. /my/channel) - Publisher receives a new Foo to publish -
36 * Publisher notifies subscribers of new data (e.g.
37 * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]} - Subscriber
38 * retrieves data (e.g. /my/channel/foo/[timestamp])
39 *
40 * @author Andrew Brown <andrew.brown@intel.com>
41 */
42public class Publisher implements OnInterest, OnRegisterFailed {
43
44 public static final long DEFAULT_TIMEOUT = 2000;
45 private static final Logger logger = LogManager.getLogger();
46 protected HashMap<Class, String> types = new HashMap<>();
47 protected HashMap<Name, PublishedObject> published = new HashMap<>();
48 protected Face face;
49 protected Name channel;
50 protected boolean registered = true;
51 protected long publishLifetime = -1; // in milliseconds, -1 will persist as long as this instance lives
52
53 /**
54 * Initialize a publisher on the given channel; published objects will
55 * persist as long as this instance lives.
56 *
57 * @param face
58 * @param channel
59 */
60 public Publisher(final Face face, final Name channel) {
61 this.face = face;
62 this.channel = channel;
63 try {
64 this.face.registerPrefix(this.channel, this, this, new ForwardingFlags());
65 } catch (IOException e) {
66 logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e);
67 } catch (SecurityException e) {
68 logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e);
69 }
70 }
71
72 /**
73 * Initialize a publisher on the given channel; limit the lifetime of
74 * published objects to save memory.
75 *
76 * @param face
77 * @param channel
78 * @param publishLifetime
79 */
80 public Publisher(final Face face, final Name channel, long publishLifetime) {
81 this(face, channel);
82 this.publishLifetime = publishLifetime;
83 }
84
85 /**
86 * Add a type and its alias to this publisher
87 *
88 * @param typeAlias
89 * @param type
90 */
91 public void addType(String typeAlias, Class type) {
92 types.put(type, typeAlias);
93 }
94
95
96 /**
97 * Publish the given object by sending an alert to all subscribers
98 *
99 * @param publishedObject
100 */
101 public void publish(Object publishedObject) {
102 // check if this publisher is registered on the face
103 if (!registered) {
104 logger.error("Publisher failed to register; cannot publish data to: " + channel.toUri());
105 return;
106 }
107
108 // check known types
109 if (!types.containsKey(publishedObject.getClass())) {
110 logger.error("Attempted (and failed) to publish unknown type: " + publishedObject.getClass().getName());
111 return;
112 }
113
114 // check if old published objects need to be removed
115 clearOldPublished();
116
117 // track published objects
118 Name publishedName = new Name(channel).append(types.get(publishedObject.getClass())).appendTimestamp(System.currentTimeMillis());
119 published.put(publishedName, new PublishedObject(publishedObject));
120
121 // send out alert and don't wait for response
122 Client client = new Client();
123 client.get(face, getAlert(publishedName));
124 logger.debug("Notified channel of new published object: " + publishedName.toUri());
125 }
126
127 /**
128 * Handle incoming requests for published objects; will check the published
129 * list and respond with objects that exactly match the incoming interest.
130 * TODO signing, encryption
131 *
132 * @param prefix
133 * @param interest
134 * @param transport
135 * @param registeredPrefixId
136 */
137 @Override
138 public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
139 // check for old published objects
140 clearOldPublished();
141
142 // publish object if available
143 if (published.containsKey(interest.getName())) {
144 try {
145 Data data = new Data(interest.getName());
146 data.setContent(new Blob("TODO parse object"));
147 if (publishLifetime != -1) {
148 data.getMetaInfo().setFreshnessPeriod(publishLifetime);
149 }
150 // data.setSignature(...);
151 transport.send(data.wireEncode().signedBuf());
152 logger.debug("Sent data: " + interest.getName().toUri());
153 } catch (IOException e) {
154 logger.error("Failed to send data: " + interest.getName().toUri());
155 }
156 }
157 }
158
159 /**
160 * Handle registration failure; this will stop the publisher from sending
161 * notifications since the routes will not be setup to respond
162 *
163 * @param prefix
164 */
165 @Override
166 public void onRegisterFailed(Name prefix) {
167 logger.error("Failed to register publisher: " + channel.toUri());
168 registered = false;
169 }
170
171 /**
172 * Remove any published objects that have outlived their lifetime; a
173 * lifetime of -1 persists them forever.
174 */
175 public void clearOldPublished() {
176 if (publishLifetime == -1) {
177 return;
178 }
179 for (Entry<Name, PublishedObject> e : published.entrySet()) {
180 if (System.currentTimeMillis() - e.getValue().publishedOn > publishLifetime) {
181 published.remove(e.getKey());
182 logger.debug("Removing old published object: " + e.getKey().toUri());
183 }
184 }
185 }
186
187 /**
188 * Build an alert Interest to notify subscribers to retrieve data from this
189 * publisher; the Interest will have the form
190 * /this/published/channel/ALERT/[encoded channel to request]
191 *
192 * @param nameToPublish
193 * @return
194 */
195 protected Interest getAlert(Name nameToPublish) {
196 Name alertName = new Name(channel).append("ALERT").append(nameToPublish.wireEncode());
197 Interest alert = new Interest(alertName);
198 alert.setMustBeFresh(true); // interest must reach the application subscribers
199 alert.setInterestLifetimeMilliseconds(DEFAULT_TIMEOUT);
200 return alert;
201 }
202
203 /**
204 * Helper class to track published objects and their timestamps
205 */
206 private class PublishedObject {
207
208 Object publishedObject;
209 long publishedOn;
210
211 public PublishedObject(Object object) {
212 publishedObject = object;
213 }
214 }
215}