/*
 * File name: Client.java
 *
 * Purpose: Provide a client to simplify information retrieval over the NDN
 * network.
 *
 * © Copyright Intel Corporation. All rights reserved.
 * Intel Corporation, 2200 Mission College Boulevard,
 * Santa Clara, CA 95052-8119, USA
 */
package com.intel.jndn.utils;

import java.io.IOException;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.ForwardingFlags;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.OnData;
import net.named_data.jndn.OnInterest;
import net.named_data.jndn.OnRegisterFailed;
import net.named_data.jndn.OnTimeout;
import net.named_data.jndn.encoding.EncodingException;
import net.named_data.jndn.transport.Transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
27
28/**
29 *
30 * @author Andrew Brown <andrew.brown@intel.com>
31 */
32public class Client {
33
34 public static final long DEFAULT_SLEEP_TIME = 20;
35 public static final long DEFAULT_TIMEOUT = 2000;
36 private static final Logger logger = LogManager.getLogger();
37
38 /**
39 * Synchronously retrieve the Data for an Interest; this will block until
40 * complete (i.e. either data is received or the interest times out).
41 *
42 * @param face
43 * @param interest
44 * @return Data packet or null
45 */
46 public Data getSync(Face face, Interest interest) {
47 // setup event
48 long startTime = System.currentTimeMillis();
49 final ClientObservableEvent event = new ClientObservableEvent();
50
51 // send interest
52 try {
53 face.expressInterest(interest, new OnData() {
54 @Override
55 public void onData(Interest interest, Data data) {
56 event.setTimestamp(System.currentTimeMillis());
57 event.setSuccess(true);
58 event.setPacket(data);
59 }
60 }, new OnTimeout() {
61 @Override
62 public void onTimeout(Interest interest) {
63 event.setTimestamp(System.currentTimeMillis());
64 event.setSuccess(false);
65 event.setPacket(new Object());
66 }
67 });
68 } catch (IOException e) {
69 logger.warn("IO failure while sending interest.", e);
70 return null;
71 }
72
73 // process events until a response is received or timeout
74 while (event.getPacket() == null) {
75 try {
76 synchronized (face) {
77 face.processEvents();
78 }
79 } catch (IOException | EncodingException e) {
80 logger.warn("Failed to process events.", e);
81 return null;
82 }
83 sleep();
84 }
85
86 // return
87 logger.debug("Request time (ms): " + (event.getTimestamp() - startTime));
88 return (event.isSuccess()) ? (Data) event.getPacket() : null;
89 }
90
91 /**
92 * Synchronously retrieve the Data for a Name using a default interest (e.g.
93 * 2 second timeout); this will block until complete (i.e. either data is
94 * received or the interest times out).
95 *
96 * @param face
97 * @param name
98 * @return
99 */
100 public Data getSync(Face face, Name name) {
101 return getSync(face, getDefaultInterest(name));
102 }
103
104 /**
105 * Asynchronously retrieve the Data for a given interest; use the returned
106 * ClientObserver to handle the Data when it arrives. E.g.: Client.get(face,
107 * interest).then((data) -> doSomething(data));
108 *
109 * @param face
110 * @param interest
111 * @return
112 */
113 public ClientObserver get(final Face face, final Interest interest) {
114 // setup observer
115 final ClientObserver observer = new ClientObserver();
116 final ClientObservable eventHandler = new ClientObservable();
117 eventHandler.addObserver(observer);
118
119 // setup background thread
120 Thread backgroundThread = new Thread(new Runnable() {
121 @Override
122 public void run() {
123 // send interest
124 try {
125 face.expressInterest(interest, eventHandler, eventHandler);
126 } catch (IOException e) {
127 logger.warn("IO failure while sending interest.", e);
128 observer.update(eventHandler, new ClientObservableEvent());
129 }
130
131 // process events until a response is received or timeout
132 while (observer.responses() == 0) {
133 try {
134 synchronized (face) {
135 face.processEvents();
136 }
137 } catch (IOException | EncodingException e) {
138 logger.warn("Failed to process events.", e);
139 observer.update(eventHandler, new ClientObservableEvent());
140 }
141 sleep();
142 }
143 }
144 });
145 backgroundThread.setName(String.format("Client.get(%s)", interest.getName().toUri()));
146 backgroundThread.setDaemon(true);
147 backgroundThread.start();
148
149 // return
150 return observer;
151 }
152
153 /**
154 * Asynchronously retrieve the Data for a Name using default Interest
155 * parameters; see get(Face, Interest) for examples.
156 *
157 * @param face
158 * @param name
159 * @return
160 */
161 public ClientObserver get(Face face, Name name) {
162 return get(face, getDefaultInterest(name));
163 }
164
165 /**
166 * Synchronously serve a Data on the given face until one request accesses
167 * the data; will return incoming Interest request. E.g.: Interest request =
168 * Client.putSync(face, data);
169 *
170 * @param face
171 * @param data
172 * @return
173 */
174 public Interest putSync(Face face, final Data data) {
175 // setup event
176 long startTime = System.currentTimeMillis();
177 final String dataName = data.getName().toUri();
178 final ClientObservableEvent event = new ClientObservableEvent();
179
180 // setup flags
181 ForwardingFlags flags = new ForwardingFlags();
182 flags.setCapture(true);
183
184 // register the data name on the face
185 try {
186 face.registerPrefix(data.getName(), new OnInterest() {
187 @Override
188 public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
189 try {
190 transport.send(data.wireEncode().buf());
191 logger.debug("Sent data: " + dataName);
192 event.fromPacket(interest);
193 } catch (IOException e) {
194 logger.error("Failed to send data for: " + dataName);
195 event.fromPacket(e);
196 }
197 }
198 }, new OnRegisterFailed() {
199 @Override
200 public void onRegisterFailed(Name prefix) {
201 event.fromPacket(new Exception("Failed to register name: " + dataName));
202 }
203 }, flags);
204 logger.info("Registered data: " + dataName);
205 } catch (IOException e) {
206 logger.error("Could not connect to face to register prefix: " + dataName, e);
207 event.fromPacket(e);
208 } catch (net.named_data.jndn.security.SecurityException e) {
209 logger.error("Error registering prefix: " + dataName, e);
210 event.fromPacket(e);
211 }
212
213 // process events until one response is sent or error
214 while (event.getPacket() == null) {
215 try {
216 synchronized (face) {
217 face.processEvents();
218 }
219 } catch (IOException | EncodingException e) {
220 logger.warn("Failed to process events.", e);
221 event.fromPacket(e);
222 }
223 sleep();
224 }
225
226 // return
227 logger.debug("Request time (ms): " + (event.getTimestamp() - startTime));
228 return (event.isSuccess()) ? (Interest) event.getPacket() : null;
229 }
230
231 /**
232 * Asynchronously serve a Data on the given face until an observer stops it.
233 * E.g.: ClientObserver observer = Client.put(face, data); // when finished
234 * serving the data, stop the background thread observer.stop();
235 *
236 * @param face
237 * @param data
238 * @return
239 */
240 public ClientObserver put(final Face face, final Data data) {
241 // setup observer
242 final ClientObserver observer = new ClientObserver();
243 final ClientObservable eventHandler = new ClientObservable();
244 eventHandler.addObserver(observer);
245
246 // setup handlers
247 final OnInterest interestHandler = new OnInterest() {
248 @Override
249 public void onInterest(Name prefix, Interest interest, Transport transport, long registeredPrefixId) {
250 try {
251 transport.send(data.wireEncode().buf());
252 } catch (IOException e) {
253 logger.error("Failed to send data for: " + data.getName().toUri());
254 eventHandler.onError(e);
255 }
256 }
257 };
258 final OnRegisterFailed failureHandler = new OnRegisterFailed() {
259 @Override
260 public void onRegisterFailed(Name prefix) {
261 logger.error("Failed to register name to put: " + data.getName().toUri());
262 eventHandler.onError(new Exception("Failed to register name to put: " + data.getName().toUri()));
263 }
264 };
265 final ForwardingFlags flags = new ForwardingFlags();
266 flags.setCapture(true);
267
268 // setup background thread
269 Thread backgroundThread = new Thread(new Runnable() {
270 @Override
271 public void run() {
272 // register name on the face
273 try {
274 face.registerPrefix(data.getName(), interestHandler, failureHandler, flags);
275 logger.info("Registered data : " + data.getName().toUri());
276 } catch (IOException e) {
277 logger.error("Could not connect to face to register prefix: " + data.getName().toUri(), e);
278 eventHandler.onError(e);
279 } catch (net.named_data.jndn.security.SecurityException e) {
280 logger.error("Error registering prefix: " + data.getName().toUri(), e);
281 eventHandler.onError(e);
282 }
283
284 // process events until a request is received
285 while (observer.requests() == 0 && observer.errors() == 0 && !observer.mustStop()) {
286 try {
287 synchronized (face) {
288 face.processEvents();
289 }
290 } catch (IOException | EncodingException e) {
291 logger.warn("Failed to process events.", e);
292 observer.update(eventHandler, new ClientObservableEvent());
293 }
294 sleep();
295 }
296 }
297 });
298 backgroundThread.setName(String.format("Client.put(%s)", data.getName().toUri()));
299 backgroundThread.setDaemon(true);
300 backgroundThread.start();
301
302 return observer;
303 }
304
305 /**
306 * Put the current thread to sleep to allow time for IO
307 */
308 protected void sleep() {
309 try {
310 Thread.currentThread().sleep(DEFAULT_SLEEP_TIME);
311 } catch (InterruptedException e) {
312 logger.error("Event loop interrupted.", e);
313 }
314 }
315
316 /**
317 * Create a default interest for a given Name using some common settings: -
318 * lifetime: 2 seconds
319 *
320 * @param name
321 * @return
322 */
323 public Interest getDefaultInterest(Name name) {
324 Interest interest = new Interest(name, DEFAULT_TIMEOUT);
325 return interest;
326 }
327}