blob: 91eaac4a0306869b9bb9dab4b3930485cf0316df [file] [log] [blame]
Weiqi Shi098f91c2014-07-23 17:41:35 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyev42290b22017-03-09 12:58:29 -08003 * Copyright (c) 2014-2017, Regents of the University of California.
Weiqi Shi098f91c2014-07-23 17:41:35 -07004 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "watch-handle.hpp"
21
22namespace repo {
23
24static const milliseconds PROCESS_DELETE_TIME(10000);
25static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
26
27WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
Junxiao Shi047a6fb2017-06-08 16:16:05 +000028 Scheduler& scheduler, Validator& validator)
Weiqi Shi098f91c2014-07-23 17:41:35 -070029 : BaseHandle(face, storageHandle, keyChain, scheduler)
30 , m_validator(validator)
31 , m_interestNum(0)
32 , m_maxInterestNum(0)
33 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
34 , m_watchTimeout(0)
35 , m_startTime(steady_clock::now())
36 , m_size(0)
37{
38}
39
40void
41WatchHandle::deleteProcess(const Name& name)
42{
43 m_processes.erase(name);
44}
45
46// Interest.
47void
48WatchHandle::onInterest(const Name& prefix, const Interest& interest)
49{
50 m_validator.validate(interest,
51 bind(&WatchHandle::onValidated, this, _1, prefix),
52 bind(&WatchHandle::onValidationFailed, this, _1, _2));
53}
54
55void
Weiqi Shi098f91c2014-07-23 17:41:35 -070056WatchHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
57{
58 RepoCommandParameter parameter;
59 try {
60 extractParameter(*interest, prefix, parameter);
61 }
62 catch (RepoCommandParameter::Error) {
63 negativeReply(*interest, 403);
64 return;
65 }
66
67 processWatchCommand(*interest, parameter);
68}
69
70void WatchHandle::watchStop(const Name& name)
71{
72 m_processes[name].second = false;
73 m_maxInterestNum = 0;
74 m_interestNum = 0;
75 m_startTime = steady_clock::now();
76 m_watchTimeout = milliseconds(0);
77 m_interestLifetime = DEFAULT_INTEREST_LIFETIME;
78 m_size = 0;
79}
80
81void
Wentao Shanga8f3c402014-10-30 14:03:27 -070082WatchHandle::onValidationFailed(const shared_ptr<const Interest>& interest,
83 const std::string& reason)
Weiqi Shi098f91c2014-07-23 17:41:35 -070084{
85 std::cerr << reason << std::endl;
86 negativeReply(*interest, 401);
87}
88
89void
Alexander Afanasyev42290b22017-03-09 12:58:29 -080090WatchHandle::onData(const Interest& interest, const ndn::Data& data, const Name& name)
Weiqi Shi098f91c2014-07-23 17:41:35 -070091{
92 m_validator.validate(data,
93 bind(&WatchHandle::onDataValidated, this, interest, _1, name),
94 bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
95}
96
97void
98WatchHandle::onDataValidated(const Interest& interest, const shared_ptr<const Data>& data,
99 const Name& name)
100{
101 if (!m_processes[name].second) {
102 return;
103 }
104 if (getStorageHandle().insertData(*data)) {
105 m_size++;
106 if (!onRunning(name))
107 return;
108
109 Interest fetchInterest(interest.getName());
110 fetchInterest.setSelectors(interest.getSelectors());
111 fetchInterest.setInterestLifetime(m_interestLifetime);
112 fetchInterest.setChildSelector(1);
113
114 // update selectors
115 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
116 if (data->getName().size() == interest.getName().size()) {
117 fetchInterest.setMinSuffixComponents(2);
118 }
119 else {
120 Exclude exclude;
121 if (!interest.getExclude().empty()) {
122 exclude = interest.getExclude();
123 }
124
125 exclude.excludeBefore(data->getName()[interest.getName().size()]);
126 fetchInterest.setExclude(exclude);
127 }
128
129 ++m_interestNum;
130 getFace().expressInterest(fetchInterest,
131 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800132 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700133 bind(&WatchHandle::onTimeout, this, _1, name));
134 }
135 else {
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800136 BOOST_THROW_EXCEPTION(Error("Insert into Repo Failed"));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700137 }
138 m_processes[name].first.setInsertNum(m_size);
139}
140
141void
142WatchHandle::onDataValidationFailed(const Interest& interest, const shared_ptr<const Data>& data,
143 const std::string& reason, const Name& name)
144{
145 std::cerr << reason << std::endl;
146 if (!m_processes[name].second) {
147 return;
148 }
149 if (!onRunning(name))
150 return;
151
152 Interest fetchInterest(interest.getName());
153 fetchInterest.setSelectors(interest.getSelectors());
154 fetchInterest.setInterestLifetime(m_interestLifetime);
155 fetchInterest.setChildSelector(1);
156
157 // update selectors
158 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
159 if (data->getName().size() == interest.getName().size()) {
160 fetchInterest.setMinSuffixComponents(2);
161 }
162 else {
163 Exclude exclude;
164 if (!interest.getExclude().empty()) {
165 exclude = interest.getExclude();
166 }
167 // Only exclude this data since other data whose names are smaller may be validated and satisfied
168 exclude.excludeBefore(data->getName()[interest.getName().size()]);
169 fetchInterest.setExclude(exclude);
170 }
171
172 ++m_interestNum;
173 getFace().expressInterest(fetchInterest,
174 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800175 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700176 bind(&WatchHandle::onTimeout, this, _1, name));
177}
178
179void
180WatchHandle::onTimeout(const ndn::Interest& interest, const Name& name)
181{
182 std::cerr << "Timeout" << std::endl;
183 if (!m_processes[name].second) {
184 return;
185 }
186 if (!onRunning(name))
187 return;
188 // selectors do not need to be updated
189 Interest fetchInterest(interest.getName());
190 fetchInterest.setSelectors(interest.getSelectors());
191 fetchInterest.setInterestLifetime(m_interestLifetime);
192 fetchInterest.setChildSelector(1);
193
194 ++m_interestNum;
195 getFace().expressInterest(fetchInterest,
196 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800197 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700198 bind(&WatchHandle::onTimeout, this, _1, name));
199
200}
201
202void
203WatchHandle::listen(const Name& prefix)
204{
Junxiao Shi2b7b8312017-06-16 03:43:24 +0000205 getFace().setInterestFilter(Name(prefix).append("watch").append("start"),
206 bind(&WatchHandle::onInterest, this, _1, _2));
207 getFace().setInterestFilter(Name(prefix).append("watch").append("check"),
208 bind(&WatchHandle::onCheckInterest, this, _1, _2));
209 getFace().setInterestFilter(Name(prefix).append("watch").append("stop"),
210 bind(&WatchHandle::onStopInterest, this, _1, _2));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700211}
212
213void
214WatchHandle::onStopInterest(const Name& prefix, const Interest& interest)
215{
216 m_validator.validate(interest,
217 bind(&WatchHandle::onStopValidated, this, _1, prefix),
218 bind(&WatchHandle::onStopValidationFailed, this, _1, _2));
219}
220
221void
222WatchHandle::onStopValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
223{
224 RepoCommandParameter parameter;
225 try {
226 extractParameter(*interest, prefix, parameter);
227 }
228 catch (RepoCommandParameter::Error) {
229 negativeReply(*interest, 403);
230 return;
231 }
232
233 watchStop(parameter.getName());
234 negativeReply(*interest, 101);
235}
236
237void
238WatchHandle::onStopValidationFailed(const shared_ptr<const Interest>& interest,
239 const std::string& reason)
240{
241 std::cerr << reason << std::endl;
242 negativeReply(*interest, 401);
243}
244
245void
246WatchHandle::onCheckInterest(const Name& prefix, const Interest& interest)
247{
248 m_validator.validate(interest,
249 bind(&WatchHandle::onCheckValidated, this, _1, prefix),
250 bind(&WatchHandle::onCheckValidationFailed, this, _1, _2));
251}
252
253void
254WatchHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
255{
256 RepoCommandParameter parameter;
257 try {
258 extractParameter(*interest, prefix, parameter);
259 }
260 catch (RepoCommandParameter::Error) {
261 negativeReply(*interest, 403);
262 return;
263 }
264
265 if (!parameter.hasName()) {
266 negativeReply(*interest, 403);
267 return;
268 }
269 //check whether this process exists
270 Name name = parameter.getName();
271 if (m_processes.count(name) == 0) {
272 std::cerr << "no such process name: " << name << std::endl;
273 negativeReply(*interest, 404);
274 return;
275 }
276
277 RepoCommandResponse& response = m_processes[name].first;
278 if (!m_processes[name].second) {
279 response.setStatusCode(101);
280 }
281
282 reply(*interest, response);
283
284}
285
286void
287WatchHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest,
288 const std::string& reason)
289{
290 std::cerr << reason << std::endl;
291 negativeReply(*interest, 401);
292}
293
294void
295WatchHandle::deferredDeleteProcess(const Name& name)
296{
297 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800298 bind(&WatchHandle::deleteProcess, this, name));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700299}
300
301void
302WatchHandle::processWatchCommand(const Interest& interest,
303 RepoCommandParameter& parameter)
304{
305 // if there is no watchTimeout specified, m_watchTimeout will be set as 0 and this handle will run forever
306 if (parameter.hasWatchTimeout()) {
307 m_watchTimeout = parameter.getWatchTimeout();
308 }
309 else {
310 m_watchTimeout = milliseconds(0);
311 }
312
313 // if there is no maxInterestNum specified, m_maxInterestNum will be 0, which means infinity
314 if (parameter.hasMaxInterestNum()) {
315 m_maxInterestNum = parameter.getMaxInterestNum();
316 }
317 else {
318 m_maxInterestNum = 0;
319 }
320
321 if (parameter.hasInterestLifetime()) {
322 m_interestLifetime = parameter.getInterestLifetime();
323 }
324
325 reply(interest, RepoCommandResponse().setStatusCode(100));
326
327 m_processes[parameter.getName()] =
328 std::make_pair(RepoCommandResponse().setStatusCode(300), true);
329 Interest fetchInterest(parameter.getName());
330 if (parameter.hasSelectors()) {
331 fetchInterest.setSelectors(parameter.getSelectors());
332 }
333 fetchInterest.setChildSelector(1);
334 fetchInterest.setInterestLifetime(m_interestLifetime);
335 m_startTime = steady_clock::now();
336 m_interestNum++;
337 getFace().expressInterest(fetchInterest,
338 bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800339 bind(&WatchHandle::onTimeout, this, _1, parameter.getName()), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700340 bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
341}
342
343
344void
345WatchHandle::negativeReply(const Interest& interest, int statusCode)
346{
347 RepoCommandResponse response;
348 response.setStatusCode(statusCode);
349 reply(interest, response);
350}
351
352bool
353WatchHandle::onRunning(const Name& name)
354{
355 bool isTimeout = (m_watchTimeout != milliseconds::zero() &&
356 steady_clock::now() - m_startTime > m_watchTimeout);
357 bool isMaxInterest = m_interestNum >= m_maxInterestNum && m_maxInterestNum != 0;
358 if (isTimeout || isMaxInterest) {
359 deferredDeleteProcess(name);
360 watchStop(name);
361 return false;
362 }
363 return true;
364}
365
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800366} // namespace repo