/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2014-2017, Regents of the University of California.
 *
 * This file is part of NDN repo-ng (Next generation of NDN repository).
 * See AUTHORS.md for complete list of repo-ng authors and contributors.
 *
 * repo-ng is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
19
20#include "watch-handle.hpp"
21
22namespace repo {
23
24static const milliseconds PROCESS_DELETE_TIME(10000);
25static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
26
27WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
28 Scheduler& scheduler, ValidatorConfig& validator)
29 : BaseHandle(face, storageHandle, keyChain, scheduler)
30 , m_validator(validator)
31 , m_interestNum(0)
32 , m_maxInterestNum(0)
33 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
34 , m_watchTimeout(0)
35 , m_startTime(steady_clock::now())
36 , m_size(0)
37{
38}
39
40void
41WatchHandle::deleteProcess(const Name& name)
42{
43 m_processes.erase(name);
44}
45
46// Interest.
47void
48WatchHandle::onInterest(const Name& prefix, const Interest& interest)
49{
50 m_validator.validate(interest,
51 bind(&WatchHandle::onValidated, this, _1, prefix),
52 bind(&WatchHandle::onValidationFailed, this, _1, _2));
53}
54
55void
56WatchHandle::onRegistered(const Name& prefix)
57{
58 getFace().setInterestFilter(Name().append(prefix).append("start"),
59 bind(&WatchHandle::onInterest, this, _1, _2));
60 getFace().setInterestFilter(Name().append(prefix).append("check"),
61 bind(&WatchHandle::onCheckInterest, this, _1, _2));
62 getFace().setInterestFilter(Name().append(prefix).append("stop"),
63 bind(&WatchHandle::onStopInterest, this, _1, _2));
64}
65
66// onRegisterFailed for watch start.
67void
68WatchHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
69{
70 std::cerr << reason << std::endl;
Alexander Afanasyev42290b22017-03-09 12:58:29 -080071 BOOST_THROW_EXCEPTION(Error("watch prefix registration failed"));
Weiqi Shi098f91c2014-07-23 17:41:35 -070072}
73
74void
75WatchHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
76{
77 RepoCommandParameter parameter;
78 try {
79 extractParameter(*interest, prefix, parameter);
80 }
81 catch (RepoCommandParameter::Error) {
82 negativeReply(*interest, 403);
83 return;
84 }
85
86 processWatchCommand(*interest, parameter);
87}
88
89void WatchHandle::watchStop(const Name& name)
90{
91 m_processes[name].second = false;
92 m_maxInterestNum = 0;
93 m_interestNum = 0;
94 m_startTime = steady_clock::now();
95 m_watchTimeout = milliseconds(0);
96 m_interestLifetime = DEFAULT_INTEREST_LIFETIME;
97 m_size = 0;
98}
99
100void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700101WatchHandle::onValidationFailed(const shared_ptr<const Interest>& interest,
102 const std::string& reason)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700103{
104 std::cerr << reason << std::endl;
105 negativeReply(*interest, 401);
106}
107
108void
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800109WatchHandle::onData(const Interest& interest, const ndn::Data& data, const Name& name)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700110{
111 m_validator.validate(data,
112 bind(&WatchHandle::onDataValidated, this, interest, _1, name),
113 bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
114}
115
116void
117WatchHandle::onDataValidated(const Interest& interest, const shared_ptr<const Data>& data,
118 const Name& name)
119{
120 if (!m_processes[name].second) {
121 return;
122 }
123 if (getStorageHandle().insertData(*data)) {
124 m_size++;
125 if (!onRunning(name))
126 return;
127
128 Interest fetchInterest(interest.getName());
129 fetchInterest.setSelectors(interest.getSelectors());
130 fetchInterest.setInterestLifetime(m_interestLifetime);
131 fetchInterest.setChildSelector(1);
132
133 // update selectors
134 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
135 if (data->getName().size() == interest.getName().size()) {
136 fetchInterest.setMinSuffixComponents(2);
137 }
138 else {
139 Exclude exclude;
140 if (!interest.getExclude().empty()) {
141 exclude = interest.getExclude();
142 }
143
144 exclude.excludeBefore(data->getName()[interest.getName().size()]);
145 fetchInterest.setExclude(exclude);
146 }
147
148 ++m_interestNum;
149 getFace().expressInterest(fetchInterest,
150 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800151 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700152 bind(&WatchHandle::onTimeout, this, _1, name));
153 }
154 else {
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800155 BOOST_THROW_EXCEPTION(Error("Insert into Repo Failed"));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700156 }
157 m_processes[name].first.setInsertNum(m_size);
158}
159
160void
161WatchHandle::onDataValidationFailed(const Interest& interest, const shared_ptr<const Data>& data,
162 const std::string& reason, const Name& name)
163{
164 std::cerr << reason << std::endl;
165 if (!m_processes[name].second) {
166 return;
167 }
168 if (!onRunning(name))
169 return;
170
171 Interest fetchInterest(interest.getName());
172 fetchInterest.setSelectors(interest.getSelectors());
173 fetchInterest.setInterestLifetime(m_interestLifetime);
174 fetchInterest.setChildSelector(1);
175
176 // update selectors
177 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
178 if (data->getName().size() == interest.getName().size()) {
179 fetchInterest.setMinSuffixComponents(2);
180 }
181 else {
182 Exclude exclude;
183 if (!interest.getExclude().empty()) {
184 exclude = interest.getExclude();
185 }
186 // Only exclude this data since other data whose names are smaller may be validated and satisfied
187 exclude.excludeBefore(data->getName()[interest.getName().size()]);
188 fetchInterest.setExclude(exclude);
189 }
190
191 ++m_interestNum;
192 getFace().expressInterest(fetchInterest,
193 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800194 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700195 bind(&WatchHandle::onTimeout, this, _1, name));
196}
197
198void
199WatchHandle::onTimeout(const ndn::Interest& interest, const Name& name)
200{
201 std::cerr << "Timeout" << std::endl;
202 if (!m_processes[name].second) {
203 return;
204 }
205 if (!onRunning(name))
206 return;
207 // selectors do not need to be updated
208 Interest fetchInterest(interest.getName());
209 fetchInterest.setSelectors(interest.getSelectors());
210 fetchInterest.setInterestLifetime(m_interestLifetime);
211 fetchInterest.setChildSelector(1);
212
213 ++m_interestNum;
214 getFace().expressInterest(fetchInterest,
215 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800216 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700217 bind(&WatchHandle::onTimeout, this, _1, name));
218
219}
220
221void
222WatchHandle::listen(const Name& prefix)
223{
224 Name baseWatchPrefix(prefix);
225 baseWatchPrefix.append("watch");
226 getFace().registerPrefix(baseWatchPrefix,
227 bind(&WatchHandle::onRegistered, this, _1),
228 bind(&WatchHandle::onRegisterFailed, this, _1, _2));
229}
230
231void
232WatchHandle::onStopInterest(const Name& prefix, const Interest& interest)
233{
234 m_validator.validate(interest,
235 bind(&WatchHandle::onStopValidated, this, _1, prefix),
236 bind(&WatchHandle::onStopValidationFailed, this, _1, _2));
237}
238
239void
240WatchHandle::onStopValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
241{
242 RepoCommandParameter parameter;
243 try {
244 extractParameter(*interest, prefix, parameter);
245 }
246 catch (RepoCommandParameter::Error) {
247 negativeReply(*interest, 403);
248 return;
249 }
250
251 watchStop(parameter.getName());
252 negativeReply(*interest, 101);
253}
254
255void
256WatchHandle::onStopValidationFailed(const shared_ptr<const Interest>& interest,
257 const std::string& reason)
258{
259 std::cerr << reason << std::endl;
260 negativeReply(*interest, 401);
261}
262
263void
264WatchHandle::onCheckInterest(const Name& prefix, const Interest& interest)
265{
266 m_validator.validate(interest,
267 bind(&WatchHandle::onCheckValidated, this, _1, prefix),
268 bind(&WatchHandle::onCheckValidationFailed, this, _1, _2));
269}
270
271void
272WatchHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
273{
274 RepoCommandParameter parameter;
275 try {
276 extractParameter(*interest, prefix, parameter);
277 }
278 catch (RepoCommandParameter::Error) {
279 negativeReply(*interest, 403);
280 return;
281 }
282
283 if (!parameter.hasName()) {
284 negativeReply(*interest, 403);
285 return;
286 }
287 //check whether this process exists
288 Name name = parameter.getName();
289 if (m_processes.count(name) == 0) {
290 std::cerr << "no such process name: " << name << std::endl;
291 negativeReply(*interest, 404);
292 return;
293 }
294
295 RepoCommandResponse& response = m_processes[name].first;
296 if (!m_processes[name].second) {
297 response.setStatusCode(101);
298 }
299
300 reply(*interest, response);
301
302}
303
304void
305WatchHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest,
306 const std::string& reason)
307{
308 std::cerr << reason << std::endl;
309 negativeReply(*interest, 401);
310}
311
312void
313WatchHandle::deferredDeleteProcess(const Name& name)
314{
315 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800316 bind(&WatchHandle::deleteProcess, this, name));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700317}
318
319void
320WatchHandle::processWatchCommand(const Interest& interest,
321 RepoCommandParameter& parameter)
322{
323 // if there is no watchTimeout specified, m_watchTimeout will be set as 0 and this handle will run forever
324 if (parameter.hasWatchTimeout()) {
325 m_watchTimeout = parameter.getWatchTimeout();
326 }
327 else {
328 m_watchTimeout = milliseconds(0);
329 }
330
331 // if there is no maxInterestNum specified, m_maxInterestNum will be 0, which means infinity
332 if (parameter.hasMaxInterestNum()) {
333 m_maxInterestNum = parameter.getMaxInterestNum();
334 }
335 else {
336 m_maxInterestNum = 0;
337 }
338
339 if (parameter.hasInterestLifetime()) {
340 m_interestLifetime = parameter.getInterestLifetime();
341 }
342
343 reply(interest, RepoCommandResponse().setStatusCode(100));
344
345 m_processes[parameter.getName()] =
346 std::make_pair(RepoCommandResponse().setStatusCode(300), true);
347 Interest fetchInterest(parameter.getName());
348 if (parameter.hasSelectors()) {
349 fetchInterest.setSelectors(parameter.getSelectors());
350 }
351 fetchInterest.setChildSelector(1);
352 fetchInterest.setInterestLifetime(m_interestLifetime);
353 m_startTime = steady_clock::now();
354 m_interestNum++;
355 getFace().expressInterest(fetchInterest,
356 bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800357 bind(&WatchHandle::onTimeout, this, _1, parameter.getName()), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700358 bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
359}
360
361
362void
363WatchHandle::negativeReply(const Interest& interest, int statusCode)
364{
365 RepoCommandResponse response;
366 response.setStatusCode(statusCode);
367 reply(interest, response);
368}
369
370bool
371WatchHandle::onRunning(const Name& name)
372{
373 bool isTimeout = (m_watchTimeout != milliseconds::zero() &&
374 steady_clock::now() - m_startTime > m_watchTimeout);
375 bool isMaxInterest = m_interestNum >= m_maxInterestNum && m_maxInterestNum != 0;
376 if (isTimeout || isMaxInterest) {
377 deferredDeleteProcess(name);
378 watchStop(name);
379 return false;
380 }
381 return true;
382}
383
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800384} // namespace repo