blob: 15a82fa8407a14068c2389665a9168df7dfa6006 [file] [log] [blame]
Weiqi Shi098f91c2014-07-23 17:41:35 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "watch-handle.hpp"
21
22namespace repo {
23
24static const milliseconds PROCESS_DELETE_TIME(10000);
25static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
26
27WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
28 Scheduler& scheduler, ValidatorConfig& validator)
29 : BaseHandle(face, storageHandle, keyChain, scheduler)
30 , m_validator(validator)
31 , m_interestNum(0)
32 , m_maxInterestNum(0)
33 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
34 , m_watchTimeout(0)
35 , m_startTime(steady_clock::now())
36 , m_size(0)
37{
38}
39
40void
41WatchHandle::deleteProcess(const Name& name)
42{
43 m_processes.erase(name);
44}
45
46// Interest.
47void
48WatchHandle::onInterest(const Name& prefix, const Interest& interest)
49{
50 m_validator.validate(interest,
51 bind(&WatchHandle::onValidated, this, _1, prefix),
52 bind(&WatchHandle::onValidationFailed, this, _1, _2));
53}
54
55void
56WatchHandle::onRegistered(const Name& prefix)
57{
58 getFace().setInterestFilter(Name().append(prefix).append("start"),
59 bind(&WatchHandle::onInterest, this, _1, _2));
60 getFace().setInterestFilter(Name().append(prefix).append("check"),
61 bind(&WatchHandle::onCheckInterest, this, _1, _2));
62 getFace().setInterestFilter(Name().append(prefix).append("stop"),
63 bind(&WatchHandle::onStopInterest, this, _1, _2));
64}
65
66// onRegisterFailed for watch start.
67void
68WatchHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
69{
70 std::cerr << reason << std::endl;
71 throw Error("watch prefix registration failed");
72}
73
74void
75WatchHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
76{
77 RepoCommandParameter parameter;
78 try {
79 extractParameter(*interest, prefix, parameter);
80 }
81 catch (RepoCommandParameter::Error) {
82 negativeReply(*interest, 403);
83 return;
84 }
85
86 processWatchCommand(*interest, parameter);
87}
88
89void WatchHandle::watchStop(const Name& name)
90{
91 m_processes[name].second = false;
92 m_maxInterestNum = 0;
93 m_interestNum = 0;
94 m_startTime = steady_clock::now();
95 m_watchTimeout = milliseconds(0);
96 m_interestLifetime = DEFAULT_INTEREST_LIFETIME;
97 m_size = 0;
98}
99
100void
101WatchHandle::onValidationFailed(const shared_ptr<const Interest>& interest, const string& reason)
102{
103 std::cerr << reason << std::endl;
104 negativeReply(*interest, 401);
105}
106
107void
108WatchHandle::onData(const Interest& interest, ndn::Data& data, const Name& name)
109{
110 m_validator.validate(data,
111 bind(&WatchHandle::onDataValidated, this, interest, _1, name),
112 bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
113}
114
115void
116WatchHandle::onDataValidated(const Interest& interest, const shared_ptr<const Data>& data,
117 const Name& name)
118{
119 if (!m_processes[name].second) {
120 return;
121 }
122 if (getStorageHandle().insertData(*data)) {
123 m_size++;
124 if (!onRunning(name))
125 return;
126
127 Interest fetchInterest(interest.getName());
128 fetchInterest.setSelectors(interest.getSelectors());
129 fetchInterest.setInterestLifetime(m_interestLifetime);
130 fetchInterest.setChildSelector(1);
131
132 // update selectors
133 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
134 if (data->getName().size() == interest.getName().size()) {
135 fetchInterest.setMinSuffixComponents(2);
136 }
137 else {
138 Exclude exclude;
139 if (!interest.getExclude().empty()) {
140 exclude = interest.getExclude();
141 }
142
143 exclude.excludeBefore(data->getName()[interest.getName().size()]);
144 fetchInterest.setExclude(exclude);
145 }
146
147 ++m_interestNum;
148 getFace().expressInterest(fetchInterest,
149 bind(&WatchHandle::onData, this, _1, _2, name),
150 bind(&WatchHandle::onTimeout, this, _1, name));
151 }
152 else {
153 throw Error("Insert into Repo Failed");
154 }
155 m_processes[name].first.setInsertNum(m_size);
156}
157
158void
159WatchHandle::onDataValidationFailed(const Interest& interest, const shared_ptr<const Data>& data,
160 const std::string& reason, const Name& name)
161{
162 std::cerr << reason << std::endl;
163 if (!m_processes[name].second) {
164 return;
165 }
166 if (!onRunning(name))
167 return;
168
169 Interest fetchInterest(interest.getName());
170 fetchInterest.setSelectors(interest.getSelectors());
171 fetchInterest.setInterestLifetime(m_interestLifetime);
172 fetchInterest.setChildSelector(1);
173
174 // update selectors
175 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
176 if (data->getName().size() == interest.getName().size()) {
177 fetchInterest.setMinSuffixComponents(2);
178 }
179 else {
180 Exclude exclude;
181 if (!interest.getExclude().empty()) {
182 exclude = interest.getExclude();
183 }
184 // Only exclude this data since other data whose names are smaller may be validated and satisfied
185 exclude.excludeBefore(data->getName()[interest.getName().size()]);
186 fetchInterest.setExclude(exclude);
187 }
188
189 ++m_interestNum;
190 getFace().expressInterest(fetchInterest,
191 bind(&WatchHandle::onData, this, _1, _2, name),
192 bind(&WatchHandle::onTimeout, this, _1, name));
193}
194
195void
196WatchHandle::onTimeout(const ndn::Interest& interest, const Name& name)
197{
198 std::cerr << "Timeout" << std::endl;
199 if (!m_processes[name].second) {
200 return;
201 }
202 if (!onRunning(name))
203 return;
204 // selectors do not need to be updated
205 Interest fetchInterest(interest.getName());
206 fetchInterest.setSelectors(interest.getSelectors());
207 fetchInterest.setInterestLifetime(m_interestLifetime);
208 fetchInterest.setChildSelector(1);
209
210 ++m_interestNum;
211 getFace().expressInterest(fetchInterest,
212 bind(&WatchHandle::onData, this, _1, _2, name),
213 bind(&WatchHandle::onTimeout, this, _1, name));
214
215}
216
217void
218WatchHandle::listen(const Name& prefix)
219{
220 Name baseWatchPrefix(prefix);
221 baseWatchPrefix.append("watch");
222 getFace().registerPrefix(baseWatchPrefix,
223 bind(&WatchHandle::onRegistered, this, _1),
224 bind(&WatchHandle::onRegisterFailed, this, _1, _2));
225}
226
227void
228WatchHandle::onStopInterest(const Name& prefix, const Interest& interest)
229{
230 m_validator.validate(interest,
231 bind(&WatchHandle::onStopValidated, this, _1, prefix),
232 bind(&WatchHandle::onStopValidationFailed, this, _1, _2));
233}
234
235void
236WatchHandle::onStopValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
237{
238 RepoCommandParameter parameter;
239 try {
240 extractParameter(*interest, prefix, parameter);
241 }
242 catch (RepoCommandParameter::Error) {
243 negativeReply(*interest, 403);
244 return;
245 }
246
247 watchStop(parameter.getName());
248 negativeReply(*interest, 101);
249}
250
251void
252WatchHandle::onStopValidationFailed(const shared_ptr<const Interest>& interest,
253 const std::string& reason)
254{
255 std::cerr << reason << std::endl;
256 negativeReply(*interest, 401);
257}
258
259void
260WatchHandle::onCheckInterest(const Name& prefix, const Interest& interest)
261{
262 m_validator.validate(interest,
263 bind(&WatchHandle::onCheckValidated, this, _1, prefix),
264 bind(&WatchHandle::onCheckValidationFailed, this, _1, _2));
265}
266
267void
268WatchHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
269{
270 RepoCommandParameter parameter;
271 try {
272 extractParameter(*interest, prefix, parameter);
273 }
274 catch (RepoCommandParameter::Error) {
275 negativeReply(*interest, 403);
276 return;
277 }
278
279 if (!parameter.hasName()) {
280 negativeReply(*interest, 403);
281 return;
282 }
283 //check whether this process exists
284 Name name = parameter.getName();
285 if (m_processes.count(name) == 0) {
286 std::cerr << "no such process name: " << name << std::endl;
287 negativeReply(*interest, 404);
288 return;
289 }
290
291 RepoCommandResponse& response = m_processes[name].first;
292 if (!m_processes[name].second) {
293 response.setStatusCode(101);
294 }
295
296 reply(*interest, response);
297
298}
299
300void
301WatchHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest,
302 const std::string& reason)
303{
304 std::cerr << reason << std::endl;
305 negativeReply(*interest, 401);
306}
307
308void
309WatchHandle::deferredDeleteProcess(const Name& name)
310{
311 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
312 ndn::bind(&WatchHandle::deleteProcess, this, name));
313}
314
315void
316WatchHandle::processWatchCommand(const Interest& interest,
317 RepoCommandParameter& parameter)
318{
319 // if there is no watchTimeout specified, m_watchTimeout will be set as 0 and this handle will run forever
320 if (parameter.hasWatchTimeout()) {
321 m_watchTimeout = parameter.getWatchTimeout();
322 }
323 else {
324 m_watchTimeout = milliseconds(0);
325 }
326
327 // if there is no maxInterestNum specified, m_maxInterestNum will be 0, which means infinity
328 if (parameter.hasMaxInterestNum()) {
329 m_maxInterestNum = parameter.getMaxInterestNum();
330 }
331 else {
332 m_maxInterestNum = 0;
333 }
334
335 if (parameter.hasInterestLifetime()) {
336 m_interestLifetime = parameter.getInterestLifetime();
337 }
338
339 reply(interest, RepoCommandResponse().setStatusCode(100));
340
341 m_processes[parameter.getName()] =
342 std::make_pair(RepoCommandResponse().setStatusCode(300), true);
343 Interest fetchInterest(parameter.getName());
344 if (parameter.hasSelectors()) {
345 fetchInterest.setSelectors(parameter.getSelectors());
346 }
347 fetchInterest.setChildSelector(1);
348 fetchInterest.setInterestLifetime(m_interestLifetime);
349 m_startTime = steady_clock::now();
350 m_interestNum++;
351 getFace().expressInterest(fetchInterest,
352 bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
353 bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
354}
355
356
357void
358WatchHandle::negativeReply(const Interest& interest, int statusCode)
359{
360 RepoCommandResponse response;
361 response.setStatusCode(statusCode);
362 reply(interest, response);
363}
364
365bool
366WatchHandle::onRunning(const Name& name)
367{
368 bool isTimeout = (m_watchTimeout != milliseconds::zero() &&
369 steady_clock::now() - m_startTime > m_watchTimeout);
370 bool isMaxInterest = m_interestNum >= m_maxInterestNum && m_maxInterestNum != 0;
371 if (isTimeout || isMaxInterest) {
372 deferredDeleteProcess(name);
373 watchStop(name);
374 return false;
375 }
376 return true;
377}
378
379} //namespace repo