| /* |
| * File name: Subscriber.java |
| * |
| * Purpose: Provide the subscriber side of a pub-sub API. |
| * |
| * © Copyright Intel Corporation. All rights reserved. |
| * Intel Corporation, 2200 Mission College Boulevard, |
| * Santa Clara, CA 95052-8119, USA |
| */ |
| package com.intel.jndn.utils; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import net.named_data.jndn.Data; |
| import net.named_data.jndn.Face; |
| import net.named_data.jndn.ForwardingFlags; |
| import net.named_data.jndn.Interest; |
| import net.named_data.jndn.Name; |
| import net.named_data.jndn.Name.Component; |
| import net.named_data.jndn.OnInterest; |
| import net.named_data.jndn.OnRegisterFailed; |
| import net.named_data.jndn.encoding.EncodingException; |
| import net.named_data.jndn.transport.Transport; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| |
| /** |
| * This implementation requires both ends (publisher and subscriber) to be |
| * routable to each other. When this class receives data to publish (through |
| * publish()), it sends an alert to all subscribers indicating new data is |
| * available to retrieve; subscribers then retrieve this data using the channel |
| * provided in the alert packet. The flow is as follows: - Publisher listens on |
| * a given channel (e.g. /my/channel) - Subscriber also listens on a given |
| * channel (e.g. /my/channel) - Publisher receives a new Foo to publish - |
| * Publisher notifies subscribers of new data (e.g. |
| * /my/channel/ALERT/{encoded:/my/channel/foo/[timestamp]} - Subscriber |
| * retrieves data (e.g. /my/channel/foo/[timestamp]) |
| * |
| * @author Andrew Brown <andrew.brown@intel.com> |
| */ |
| public class Subscriber implements OnInterest, OnRegisterFailed { |
| |
| public static final long DEFAULT_TIMEOUT = 2000; |
| private static final Logger logger = LogManager.getLogger(); |
| protected HashMap<String, Class> types = new HashMap<>(); |
| protected HashMap<Class, OnPublish> handlers = new HashMap<>(); |
| protected Face face; |
| protected Name channel; |
| |
| /** |
| * Initialize a publisher on the given channel; published objects will persist |
| * as long as this instance lives. |
| * |
| * @param face |
| * @param channel |
| * @param registerOnForwarder set this to false to disable interaction with the forwarder (e.g. as in tests) |
| */ |
| public Subscriber(final Face face, final Name channel, boolean registerOnForwarder) { |
| this.face = face; |
| this.channel = channel; |
| if (registerOnForwarder) { |
| try { |
| this.face.registerPrefix(this.channel, this, this, new ForwardingFlags()); |
| } catch (IOException e) { |
| logger.error("Failed to send prefix registration for: " + this.channel.toUri(), e); |
| } catch (net.named_data.jndn.security.SecurityException e) { |
| logger.error("Failed to configure security correctly for registration: " + this.channel.toUri(), e); |
| } |
| } |
| } |
| |
| /** |
| * Initialize a publisher on the given channel; published objects will persist |
| * as long as this instance lives. |
| * |
| * @param face |
| * @param channel |
| */ |
| public Subscriber(final Face face, final Name channel) { |
| this(face, channel, true); |
| } |
| |
| /** |
| * Add a type and its alias to this publisher |
| * |
| * @param typeAlias |
| * @param type |
| */ |
| public void addType(String typeAlias, Class type) { |
| types.put(typeAlias, type); |
| } |
| |
| /** |
| * Functional interface defining action to take upon receipt of a published |
| * object |
| * |
| * @param <T> |
| */ |
| public interface OnPublish<T> { |
| |
| public void onPublish(T publishedObject); |
| } |
| |
| /** |
| * Register a handler for receipt of a certain published type |
| * |
| * @param <T> |
| * @param type |
| * @param handler |
| */ |
| public <T> void on(Class<T> type, OnPublish<T> handler) { |
| handlers.put(type, handler); |
| } |
| |
| /** |
| * Register a handler for receipt of a certain published type |
| * |
| * @param typeAlias |
| * @param handler |
| */ |
| public void on(String typeAlias, OnPublish handler) { |
| if (types.containsKey(typeAlias)) { |
| handlers.put(types.get(typeAlias), handler); |
| } else { |
| logger.warn("Unrecognized type (no handler registered): " + typeAlias); |
| } |
| } |
| |
| /** |
| * Handle alert notifications for published objects; once the published object |
| * is parsed, this will pass the object to the handle registered in on(). |
| * |
| * @param prefix |
| * @param interest |
| * @param transport |
| * @param registeredPrefixId |
| */ |
| @Override |
| public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) { |
| // check format |
| if (!isAlert(interest)) { |
| logger.warn("Incoming interest was not in ALERT format: " + interest.getName().toUri()); |
| return; |
| } |
| |
| // check signature |
| // TODO |
| // retrieve name |
| Name publishedName = getAlertName(interest); |
| if (publishedName == null) { |
| return; |
| } |
| |
| // retrieve data |
| Client client = new Client(); |
| Data data = client.getSync(face, publishedName); |
| if (data == null) { |
| logger.warn("Faled to retrieve published object: " + publishedName.toUri()); |
| return; |
| } |
| |
| // get handler |
| String typeAlias = getPublishedType(data); |
| if (!types.containsKey(typeAlias)) { |
| logger.warn("Type not found: " + typeAlias); |
| return; |
| } |
| Class type = types.get(typeAlias); |
| if (!handlers.containsKey(type)) { |
| logger.warn("No handler found for type: " + typeAlias); |
| return; |
| } |
| |
| // build object |
| Object publishedObject = null; |
| // TODO parse object |
| |
| // call |
| handlers.get(type).onPublish(type.cast(publishedObject)); |
| } |
| |
| /** |
| * Handle registration failure; |
| * |
| * @param prefix |
| */ |
| @Override |
| public void onRegisterFailed(Name prefix) { |
| logger.error("Failed to register: " + prefix.toUri()); |
| } |
| |
| /** |
| * Check if an incoming interest is an alert notification to retrieve data |
| * |
| * @param interest |
| * @return |
| */ |
| protected boolean isAlert(Interest interest) { |
| Component alertComponent = interest.getName().get(-2); |
| return alertComponent == null || !alertComponent.equals(new Component("ALERT")); |
| } |
| |
| /** |
| * Parse an Interest to retrieve the remote name of the published object to |
| * then request; the Interest must have the form |
| * /this/published/channel/ALERT/[encoded name to request] |
| * |
| * @param interest |
| * @return |
| */ |
| protected Name getAlertName(Interest interest) { |
| try { |
| Name publishedName = new Name(); |
| publishedName.wireDecode(interest.getName().get(-1).getValue()); |
| return publishedName; |
| } catch (EncodingException e) { |
| logger.error("Failed to parse remote published name", e); |
| return null; |
| } |
| } |
| |
| /** |
| * Parse the published type from a Data packet; must be the second to last |
| * component of the name |
| * |
| * @param data |
| * @return |
| */ |
| protected String getPublishedType(Data data) { |
| return data.getName().get(-2).toEscapedString(); |
| } |
| } |