blob: 34259502e484a81d4d75389755d1949b9158a487 [file] [log] [blame]
/*
 * File name: Client.java
 *
 * Purpose: Provide a client to simplify information retrieval over the NDN
 * network.
 *
 * © Copyright Intel Corporation. All rights reserved.
 * Intel Corporation, 2200 Mission College Boulevard,
 * Santa Clara, CA 95052-8119, USA
 */
11package com.intel.jndn.utils;
12
13import java.io.IOException;
14import net.named_data.jndn.Data;
15import net.named_data.jndn.Face;
16import net.named_data.jndn.ForwardingFlags;
17import net.named_data.jndn.Interest;
18import net.named_data.jndn.Name;
19import net.named_data.jndn.OnData;
20import net.named_data.jndn.OnInterest;
21import net.named_data.jndn.OnRegisterFailed;
22import net.named_data.jndn.OnTimeout;
23import net.named_data.jndn.encoding.EncodingException;
24import net.named_data.jndn.transport.Transport;
25import org.apache.logging.log4j.LogManager;
26import org.apache.logging.log4j.Logger;
27
28/**
Andrew Brown070dc892015-01-21 09:55:12 -080029 * Provide a client to simplify information retrieval over the NDN network.
Andrew Brown3f2521a2015-01-17 22:10:15 -080030 *
31 * @author Andrew Brown <andrew.brown@intel.com>
32 */
33public class Client {
34
Andrew Brown7b1daf32015-01-19 16:36:01 -080035 public static final long DEFAULT_SLEEP_TIME = 20;
36 public static final long DEFAULT_TIMEOUT = 2000;
37 private static final Logger logger = LogManager.getLogger();
Andrew Brown070dc892015-01-21 09:55:12 -080038 private static Client defaultInstance;
39
40 /**
41 * Singleton access for simpler client use
42 *
43 * @return
44 */
45 public static Client getDefault() {
46 if (defaultInstance == null) {
47 defaultInstance = new Client();
48 }
49 return defaultInstance;
50 }
Andrew Brown3f2521a2015-01-17 22:10:15 -080051
Andrew Brown7b1daf32015-01-19 16:36:01 -080052 /**
53 * Synchronously retrieve the Data for an Interest; this will block until
54 * complete (i.e. either data is received or the interest times out).
55 *
56 * @param face
57 * @param interest
58 * @return Data packet or null
59 */
60 public Data getSync(Face face, Interest interest) {
61 // setup event
62 long startTime = System.currentTimeMillis();
Andrew Brown070dc892015-01-21 09:55:12 -080063 final ClientEvent event = new ClientEvent(); // this event is used without observer/observables for speed; just serves as a final reference into the callbacks
Andrew Brown3f2521a2015-01-17 22:10:15 -080064
Andrew Brown7b1daf32015-01-19 16:36:01 -080065 // send interest
66 try {
67 face.expressInterest(interest, new OnData() {
68 @Override
69 public void onData(Interest interest, Data data) {
Andrew Brown070dc892015-01-21 09:55:12 -080070 event.fromPacket(data);
Andrew Brown7b1daf32015-01-19 16:36:01 -080071 }
72 }, new OnTimeout() {
73 @Override
74 public void onTimeout(Interest interest) {
Andrew Brown070dc892015-01-21 09:55:12 -080075 event.fromPacket(new Exception("Interest timed out: " + interest.getName().toUri()));
Andrew Brown7b1daf32015-01-19 16:36:01 -080076 }
77 });
78 } catch (IOException e) {
79 logger.warn("IO failure while sending interest.", e);
80 return null;
81 }
Andrew Brown3f2521a2015-01-17 22:10:15 -080082
Andrew Brown070dc892015-01-21 09:55:12 -080083 // process eventCount until a response is received or timeout
Andrew Brown7b1daf32015-01-19 16:36:01 -080084 while (event.getPacket() == null) {
85 try {
86 synchronized (face) {
87 face.processEvents();
88 }
89 } catch (IOException | EncodingException e) {
90 logger.warn("Failed to process events.", e);
91 return null;
92 }
93 sleep();
94 }
Andrew Brown3f2521a2015-01-17 22:10:15 -080095
Andrew Brown7b1daf32015-01-19 16:36:01 -080096 // return
97 logger.debug("Request time (ms): " + (event.getTimestamp() - startTime));
98 return (event.isSuccess()) ? (Data) event.getPacket() : null;
99 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800100
Andrew Brown7b1daf32015-01-19 16:36:01 -0800101 /**
102 * Synchronously retrieve the Data for a Name using a default interest (e.g. 2
103 * second timeout); this will block until complete (i.e. either data is
104 * received or the interest times out).
105 *
106 * @param face
107 * @param name
108 * @return
109 */
110 public Data getSync(Face face, Name name) {
111 return getSync(face, getDefaultInterest(name));
112 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800113
Andrew Brown7b1daf32015-01-19 16:36:01 -0800114 /**
115 * Asynchronously retrieve the Data for a given interest; use the returned
Andrew Brown070dc892015-01-21 09:55:12 -0800116 * ClientObserver to handle the Data when it arrives. For example (with lambdas):
117 * <pre><code>
118 * Client.getDefault().get(face, interest).then((event) -> doSomething(event));
119 * </code></pre>
120 *
121 * If you want to block until the response returns, try something like:
122 * <pre><code>
123 * ClientObserver observer = Client.getDefault().get(face, interest);
124 * while(observer.eventCount() == 0){
125 * Thread.sleep(50);
126 * }
127 * doSomething(observer.getFirst());
128 * </code></pre>
Andrew Brown7b1daf32015-01-19 16:36:01 -0800129 *
130 * @param face
131 * @param interest
132 * @return
133 */
134 public ClientObserver get(final Face face, final Interest interest) {
135 // setup observer
136 final ClientObserver observer = new ClientObserver();
137 final ClientObservable eventHandler = new ClientObservable();
138 eventHandler.addObserver(observer);
Andrew Brown3f2521a2015-01-17 22:10:15 -0800139
Andrew Brown7b1daf32015-01-19 16:36:01 -0800140 // setup background thread
141 Thread backgroundThread = new Thread(new Runnable() {
142 @Override
143 public void run() {
144 // send interest
145 try {
146 face.expressInterest(interest, eventHandler, eventHandler);
147 } catch (IOException e) {
148 logger.warn("IO failure while sending interest.", e);
149 eventHandler.notify(e);
150 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800151
Andrew Brown070dc892015-01-21 09:55:12 -0800152 // process eventCount until a response is received or timeout
153 while (observer.dataCount() == 0 && observer.errorCount() == 0 && !observer.mustStop()) {
Andrew Brown7b1daf32015-01-19 16:36:01 -0800154 try {
155 synchronized (face) {
156 face.processEvents();
157 }
158 } catch (IOException | EncodingException e) {
159 logger.warn("Failed to process events.", e);
160 eventHandler.notify(e);
161 }
162 sleep();
163 }
Andrew Brown070dc892015-01-21 09:55:12 -0800164
Andrew Brown7b1daf32015-01-19 16:36:01 -0800165 // finished
166 logger.trace("Received response; stopping thread.");
167 }
168 });
169 backgroundThread.setName(String.format("Client.get(%s)", interest.getName().toUri()));
170 backgroundThread.setDaemon(true);
171 backgroundThread.start();
Andrew Brown3f2521a2015-01-17 22:10:15 -0800172
Andrew Brown7b1daf32015-01-19 16:36:01 -0800173 // return
174 return observer;
175 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800176
Andrew Brown7b1daf32015-01-19 16:36:01 -0800177 /**
178 * Asynchronously retrieve the Data for a Name using default Interest
179 * parameters; see get(Face, Interest) for examples.
180 *
181 * @param face
182 * @param name
183 * @return
184 */
185 public ClientObserver get(Face face, Name name) {
186 return get(face, getDefaultInterest(name));
187 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800188
Andrew Brown7b1daf32015-01-19 16:36:01 -0800189 /**
190 * Synchronously serve a Data on the given face until one request accesses the
191 * data; will return incoming Interest request. E.g.: Interest request =
192 * Client.putSync(face, data);
193 *
194 * @param face
195 * @param data
196 * @return
197 */
198 public Interest putSync(Face face, final Data data) {
199 // setup event
200 long startTime = System.currentTimeMillis();
201 final String dataName = data.getName().toUri();
Andrew Brown070dc892015-01-21 09:55:12 -0800202 final ClientEvent event = new ClientEvent();
Andrew Brown3f2521a2015-01-17 22:10:15 -0800203
Andrew Brown7b1daf32015-01-19 16:36:01 -0800204 // setup flags
205 ForwardingFlags flags = new ForwardingFlags();
206 flags.setCapture(true);
Andrew Brown3f2521a2015-01-17 22:10:15 -0800207
Andrew Brown7b1daf32015-01-19 16:36:01 -0800208 // register the data name on the face
209 try {
210 face.registerPrefix(data.getName(), new OnInterest() {
211 @Override
212 public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
213 try {
214 transport.send(data.wireEncode().buf());
215 logger.debug("Sent data: " + dataName);
216 event.fromPacket(interest);
217 } catch (IOException e) {
218 logger.error("Failed to send data for: " + dataName);
219 event.fromPacket(e);
220 }
221 }
222 }, new OnRegisterFailed() {
223 @Override
224 public void onRegisterFailed(Name prefix) {
225 event.fromPacket(new Exception("Failed to register name: " + dataName));
226 }
227 }, flags);
228 logger.info("Registered data: " + dataName);
229 } catch (IOException e) {
230 logger.error("Could not connect to face to register prefix: " + dataName, e);
231 event.fromPacket(e);
232 } catch (net.named_data.jndn.security.SecurityException e) {
233 logger.error("Error registering prefix: " + dataName, e);
234 event.fromPacket(e);
235 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800236
Andrew Brown070dc892015-01-21 09:55:12 -0800237 // process eventCount until one response is sent or error
Andrew Brown7b1daf32015-01-19 16:36:01 -0800238 while (event.getPacket() == null) {
239 try {
240 synchronized (face) {
241 face.processEvents();
242 }
243 } catch (IOException | EncodingException e) {
244 logger.warn("Failed to process events.", e);
245 event.fromPacket(e);
246 }
247 sleep();
248 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800249
Andrew Brown7b1daf32015-01-19 16:36:01 -0800250 // return
251 logger.debug("Request time (ms): " + (event.getTimestamp() - startTime));
252 return (event.isSuccess()) ? (Interest) event.getPacket() : null;
253 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800254
Andrew Brown7b1daf32015-01-19 16:36:01 -0800255 /**
256 * Asynchronously serve a Data on the given face until an observer stops it.
257 * E.g.: ClientObserver observer = Client.put(face, data); // when finished
258 * serving the data, stop the background thread observer.stop();
259 *
260 * @param face
261 * @param data
262 * @return
263 */
264 public ClientObserver put(final Face face, final Data data) {
265 // setup observer
266 final ClientObserver observer = new ClientObserver();
267 final ClientObservable eventHandler = new ClientObservable();
268 eventHandler.addObserver(observer);
Andrew Brown3f2521a2015-01-17 22:10:15 -0800269
Andrew Brown7b1daf32015-01-19 16:36:01 -0800270 // setup handlers
271 final OnInterest interestHandler = new OnInterest() {
272 @Override
273 public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
274 try {
275 transport.send(data.wireEncode().buf());
276 } catch (IOException e) {
277 logger.error("Failed to send data for: " + data.getName().toUri());
278 eventHandler.onError(e);
279 }
280 }
281 };
282 final OnRegisterFailed failureHandler = new OnRegisterFailed() {
283 @Override
284 public void onRegisterFailed(Name prefix) {
285 logger.error("Failed to register name to put: " + data.getName().toUri());
286 eventHandler.onError(new Exception("Failed to register name to put: " + data.getName().toUri()));
287 }
288 };
289 final ForwardingFlags flags = new ForwardingFlags();
290 flags.setCapture(true);
Andrew Brown3f2521a2015-01-17 22:10:15 -0800291
Andrew Brown7b1daf32015-01-19 16:36:01 -0800292 // setup background thread
293 Thread backgroundThread = new Thread(new Runnable() {
294 @Override
295 public void run() {
296 // register name on the face
297 try {
298 face.registerPrefix(data.getName(), interestHandler, failureHandler, flags);
299 logger.info("Registered data : " + data.getName().toUri());
300 } catch (IOException e) {
301 logger.error("Could not connect to face to register prefix: " + data.getName().toUri(), e);
302 eventHandler.onError(e);
303 } catch (net.named_data.jndn.security.SecurityException e) {
304 logger.error("Error registering prefix: " + data.getName().toUri(), e);
305 eventHandler.onError(e);
306 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800307
Andrew Brown070dc892015-01-21 09:55:12 -0800308 // process eventCount until a request is received
309 while (observer.interestCount() == 0 && observer.errorCount() == 0 && !observer.mustStop()) {
Andrew Brown7b1daf32015-01-19 16:36:01 -0800310 try {
311 synchronized (face) {
312 face.processEvents();
313 }
314 } catch (IOException | EncodingException e) {
315 logger.warn("Failed to process events.", e);
Andrew Brown070dc892015-01-21 09:55:12 -0800316 observer.update(eventHandler, new ClientEvent());
Andrew Brown7b1daf32015-01-19 16:36:01 -0800317 }
318 sleep();
319 }
320 }
321 });
322 backgroundThread.setName(String.format("Client.put(%s)", data.getName().toUri()));
323 backgroundThread.setDaemon(true);
324 backgroundThread.start();
Andrew Brown3f2521a2015-01-17 22:10:15 -0800325
Andrew Brown7b1daf32015-01-19 16:36:01 -0800326 return observer;
327 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800328
Andrew Brown7b1daf32015-01-19 16:36:01 -0800329 /**
330 * Put the current thread to sleep to allow time for IO
331 */
332 protected void sleep() {
333 try {
334 Thread.currentThread().sleep(DEFAULT_SLEEP_TIME);
335 } catch (InterruptedException e) {
336 logger.error("Event loop interrupted.", e);
337 }
338 }
339
340 /**
341 * Create a default interest for a given Name using some common settings: -
342 * lifetime: 2 seconds
343 *
344 * @param name
345 * @return
346 */
347 public Interest getDefaultInterest(Name name) {
348 Interest interest = new Interest(name, DEFAULT_TIMEOUT);
349 return interest;
350 }
Andrew Brown3f2521a2015-01-17 22:10:15 -0800351}