blob: 9e802120f976e35781d8559a04efa29936389b19 [file] [log] [blame]
Weiqi Shi098f91c2014-07-23 17:41:35 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "watch-handle.hpp"
21
22namespace repo {
23
24static const milliseconds PROCESS_DELETE_TIME(10000);
25static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
26
27WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
28 Scheduler& scheduler, ValidatorConfig& validator)
29 : BaseHandle(face, storageHandle, keyChain, scheduler)
30 , m_validator(validator)
31 , m_interestNum(0)
32 , m_maxInterestNum(0)
33 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
34 , m_watchTimeout(0)
35 , m_startTime(steady_clock::now())
36 , m_size(0)
37{
38}
39
40void
41WatchHandle::deleteProcess(const Name& name)
42{
43 m_processes.erase(name);
44}
45
46// Interest.
47void
48WatchHandle::onInterest(const Name& prefix, const Interest& interest)
49{
50 m_validator.validate(interest,
51 bind(&WatchHandle::onValidated, this, _1, prefix),
52 bind(&WatchHandle::onValidationFailed, this, _1, _2));
53}
54
55void
56WatchHandle::onRegistered(const Name& prefix)
57{
58 getFace().setInterestFilter(Name().append(prefix).append("start"),
59 bind(&WatchHandle::onInterest, this, _1, _2));
60 getFace().setInterestFilter(Name().append(prefix).append("check"),
61 bind(&WatchHandle::onCheckInterest, this, _1, _2));
62 getFace().setInterestFilter(Name().append(prefix).append("stop"),
63 bind(&WatchHandle::onStopInterest, this, _1, _2));
64}
65
66// onRegisterFailed for watch start.
67void
68WatchHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
69{
70 std::cerr << reason << std::endl;
71 throw Error("watch prefix registration failed");
72}
73
74void
75WatchHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
76{
77 RepoCommandParameter parameter;
78 try {
79 extractParameter(*interest, prefix, parameter);
80 }
81 catch (RepoCommandParameter::Error) {
82 negativeReply(*interest, 403);
83 return;
84 }
85
86 processWatchCommand(*interest, parameter);
87}
88
89void WatchHandle::watchStop(const Name& name)
90{
91 m_processes[name].second = false;
92 m_maxInterestNum = 0;
93 m_interestNum = 0;
94 m_startTime = steady_clock::now();
95 m_watchTimeout = milliseconds(0);
96 m_interestLifetime = DEFAULT_INTEREST_LIFETIME;
97 m_size = 0;
98}
99
100void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700101WatchHandle::onValidationFailed(const shared_ptr<const Interest>& interest,
102 const std::string& reason)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700103{
104 std::cerr << reason << std::endl;
105 negativeReply(*interest, 401);
106}
107
108void
109WatchHandle::onData(const Interest& interest, ndn::Data& data, const Name& name)
110{
111 m_validator.validate(data,
112 bind(&WatchHandle::onDataValidated, this, interest, _1, name),
113 bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
114}
115
116void
117WatchHandle::onDataValidated(const Interest& interest, const shared_ptr<const Data>& data,
118 const Name& name)
119{
120 if (!m_processes[name].second) {
121 return;
122 }
123 if (getStorageHandle().insertData(*data)) {
124 m_size++;
125 if (!onRunning(name))
126 return;
127
128 Interest fetchInterest(interest.getName());
129 fetchInterest.setSelectors(interest.getSelectors());
130 fetchInterest.setInterestLifetime(m_interestLifetime);
131 fetchInterest.setChildSelector(1);
132
133 // update selectors
134 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
135 if (data->getName().size() == interest.getName().size()) {
136 fetchInterest.setMinSuffixComponents(2);
137 }
138 else {
139 Exclude exclude;
140 if (!interest.getExclude().empty()) {
141 exclude = interest.getExclude();
142 }
143
144 exclude.excludeBefore(data->getName()[interest.getName().size()]);
145 fetchInterest.setExclude(exclude);
146 }
147
148 ++m_interestNum;
149 getFace().expressInterest(fetchInterest,
150 bind(&WatchHandle::onData, this, _1, _2, name),
151 bind(&WatchHandle::onTimeout, this, _1, name));
152 }
153 else {
154 throw Error("Insert into Repo Failed");
155 }
156 m_processes[name].first.setInsertNum(m_size);
157}
158
159void
160WatchHandle::onDataValidationFailed(const Interest& interest, const shared_ptr<const Data>& data,
161 const std::string& reason, const Name& name)
162{
163 std::cerr << reason << std::endl;
164 if (!m_processes[name].second) {
165 return;
166 }
167 if (!onRunning(name))
168 return;
169
170 Interest fetchInterest(interest.getName());
171 fetchInterest.setSelectors(interest.getSelectors());
172 fetchInterest.setInterestLifetime(m_interestLifetime);
173 fetchInterest.setChildSelector(1);
174
175 // update selectors
176 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
177 if (data->getName().size() == interest.getName().size()) {
178 fetchInterest.setMinSuffixComponents(2);
179 }
180 else {
181 Exclude exclude;
182 if (!interest.getExclude().empty()) {
183 exclude = interest.getExclude();
184 }
185 // Only exclude this data since other data whose names are smaller may be validated and satisfied
186 exclude.excludeBefore(data->getName()[interest.getName().size()]);
187 fetchInterest.setExclude(exclude);
188 }
189
190 ++m_interestNum;
191 getFace().expressInterest(fetchInterest,
192 bind(&WatchHandle::onData, this, _1, _2, name),
193 bind(&WatchHandle::onTimeout, this, _1, name));
194}
195
196void
197WatchHandle::onTimeout(const ndn::Interest& interest, const Name& name)
198{
199 std::cerr << "Timeout" << std::endl;
200 if (!m_processes[name].second) {
201 return;
202 }
203 if (!onRunning(name))
204 return;
205 // selectors do not need to be updated
206 Interest fetchInterest(interest.getName());
207 fetchInterest.setSelectors(interest.getSelectors());
208 fetchInterest.setInterestLifetime(m_interestLifetime);
209 fetchInterest.setChildSelector(1);
210
211 ++m_interestNum;
212 getFace().expressInterest(fetchInterest,
213 bind(&WatchHandle::onData, this, _1, _2, name),
214 bind(&WatchHandle::onTimeout, this, _1, name));
215
216}
217
218void
219WatchHandle::listen(const Name& prefix)
220{
221 Name baseWatchPrefix(prefix);
222 baseWatchPrefix.append("watch");
223 getFace().registerPrefix(baseWatchPrefix,
224 bind(&WatchHandle::onRegistered, this, _1),
225 bind(&WatchHandle::onRegisterFailed, this, _1, _2));
226}
227
228void
229WatchHandle::onStopInterest(const Name& prefix, const Interest& interest)
230{
231 m_validator.validate(interest,
232 bind(&WatchHandle::onStopValidated, this, _1, prefix),
233 bind(&WatchHandle::onStopValidationFailed, this, _1, _2));
234}
235
236void
237WatchHandle::onStopValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
238{
239 RepoCommandParameter parameter;
240 try {
241 extractParameter(*interest, prefix, parameter);
242 }
243 catch (RepoCommandParameter::Error) {
244 negativeReply(*interest, 403);
245 return;
246 }
247
248 watchStop(parameter.getName());
249 negativeReply(*interest, 101);
250}
251
252void
253WatchHandle::onStopValidationFailed(const shared_ptr<const Interest>& interest,
254 const std::string& reason)
255{
256 std::cerr << reason << std::endl;
257 negativeReply(*interest, 401);
258}
259
260void
261WatchHandle::onCheckInterest(const Name& prefix, const Interest& interest)
262{
263 m_validator.validate(interest,
264 bind(&WatchHandle::onCheckValidated, this, _1, prefix),
265 bind(&WatchHandle::onCheckValidationFailed, this, _1, _2));
266}
267
268void
269WatchHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
270{
271 RepoCommandParameter parameter;
272 try {
273 extractParameter(*interest, prefix, parameter);
274 }
275 catch (RepoCommandParameter::Error) {
276 negativeReply(*interest, 403);
277 return;
278 }
279
280 if (!parameter.hasName()) {
281 negativeReply(*interest, 403);
282 return;
283 }
284 //check whether this process exists
285 Name name = parameter.getName();
286 if (m_processes.count(name) == 0) {
287 std::cerr << "no such process name: " << name << std::endl;
288 negativeReply(*interest, 404);
289 return;
290 }
291
292 RepoCommandResponse& response = m_processes[name].first;
293 if (!m_processes[name].second) {
294 response.setStatusCode(101);
295 }
296
297 reply(*interest, response);
298
299}
300
301void
302WatchHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest,
303 const std::string& reason)
304{
305 std::cerr << reason << std::endl;
306 negativeReply(*interest, 401);
307}
308
309void
310WatchHandle::deferredDeleteProcess(const Name& name)
311{
312 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
313 ndn::bind(&WatchHandle::deleteProcess, this, name));
314}
315
316void
317WatchHandle::processWatchCommand(const Interest& interest,
318 RepoCommandParameter& parameter)
319{
320 // if there is no watchTimeout specified, m_watchTimeout will be set as 0 and this handle will run forever
321 if (parameter.hasWatchTimeout()) {
322 m_watchTimeout = parameter.getWatchTimeout();
323 }
324 else {
325 m_watchTimeout = milliseconds(0);
326 }
327
328 // if there is no maxInterestNum specified, m_maxInterestNum will be 0, which means infinity
329 if (parameter.hasMaxInterestNum()) {
330 m_maxInterestNum = parameter.getMaxInterestNum();
331 }
332 else {
333 m_maxInterestNum = 0;
334 }
335
336 if (parameter.hasInterestLifetime()) {
337 m_interestLifetime = parameter.getInterestLifetime();
338 }
339
340 reply(interest, RepoCommandResponse().setStatusCode(100));
341
342 m_processes[parameter.getName()] =
343 std::make_pair(RepoCommandResponse().setStatusCode(300), true);
344 Interest fetchInterest(parameter.getName());
345 if (parameter.hasSelectors()) {
346 fetchInterest.setSelectors(parameter.getSelectors());
347 }
348 fetchInterest.setChildSelector(1);
349 fetchInterest.setInterestLifetime(m_interestLifetime);
350 m_startTime = steady_clock::now();
351 m_interestNum++;
352 getFace().expressInterest(fetchInterest,
353 bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
354 bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
355}
356
357
358void
359WatchHandle::negativeReply(const Interest& interest, int statusCode)
360{
361 RepoCommandResponse response;
362 response.setStatusCode(statusCode);
363 reply(interest, response);
364}
365
366bool
367WatchHandle::onRunning(const Name& name)
368{
369 bool isTimeout = (m_watchTimeout != milliseconds::zero() &&
370 steady_clock::now() - m_startTime > m_watchTimeout);
371 bool isMaxInterest = m_interestNum >= m_maxInterestNum && m_maxInterestNum != 0;
372 if (isTimeout || isMaxInterest) {
373 deferredDeleteProcess(name);
374 watchStop(name);
375 return false;
376 }
377 return true;
378}
379
380} //namespace repo