blob: 105e802de5ab64d1768397fdca40479356483c34 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2018, Regents of the University of California.
 *
 * This file is part of NDN repo-ng (Next generation of NDN repository).
 * See AUTHORS.md for complete list of repo-ng authors and contributors.
 *
 * repo-ng is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
19
20#include "watch-handle.hpp"
21
22namespace repo {
23
weijia yuan82cf9142018-10-21 12:25:02 -070024static const milliseconds PROCESS_DELETE_TIME(10000_ms);
25static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms);
Weiqi Shi098f91c2014-07-23 17:41:35 -070026
weijia yuan82cf9142018-10-21 12:25:02 -070027WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle,
28 ndn::mgmt::Dispatcher& dispatcher, Scheduler& scheduler, Validator& validator)
29 : CommandBaseHandle(face, storageHandle, scheduler, validator)
Weiqi Shi098f91c2014-07-23 17:41:35 -070030 , m_validator(validator)
31 , m_interestNum(0)
32 , m_maxInterestNum(0)
33 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
34 , m_watchTimeout(0)
35 , m_startTime(steady_clock::now())
36 , m_size(0)
37{
weijia yuan82cf9142018-10-21 12:25:02 -070038 dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("start"),
39 makeAuthorization(),
40 std::bind(&WatchHandle::validateParameters<WatchStartCommand>, this, _1),
41 std::bind(&WatchHandle::handleStartCommand, this, _1, _2, _3, _4));
42
43 dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("check"),
44 makeAuthorization(),
45 std::bind(&WatchHandle::validateParameters<WatchCheckCommand>, this, _1),
46 std::bind(&WatchHandle::handleCheckCommand, this, _1, _2, _3, _4));
47
48 dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("stop"),
49 makeAuthorization(),
50 std::bind(&WatchHandle::validateParameters<WatchStopCommand>, this, _1),
51 std::bind(&WatchHandle::handleStopCommand, this, _1, _2, _3, _4));
Weiqi Shi098f91c2014-07-23 17:41:35 -070052}
53
54void
55WatchHandle::deleteProcess(const Name& name)
56{
57 m_processes.erase(name);
58}
59
Weiqi Shi098f91c2014-07-23 17:41:35 -070060void
weijia yuan82cf9142018-10-21 12:25:02 -070061WatchHandle::handleStartCommand(const Name& prefix, const Interest& interest,
62 const ndn::mgmt::ControlParameters& parameter,
63 const ndn::mgmt::CommandContinuation& done)
Weiqi Shi098f91c2014-07-23 17:41:35 -070064{
weijia yuan82cf9142018-10-21 12:25:02 -070065 const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
66 processWatchCommand(interest, repoParameter, done);
Weiqi Shi098f91c2014-07-23 17:41:35 -070067}
68
69void WatchHandle::watchStop(const Name& name)
70{
71 m_processes[name].second = false;
72 m_maxInterestNum = 0;
73 m_interestNum = 0;
74 m_startTime = steady_clock::now();
weijia yuan82cf9142018-10-21 12:25:02 -070075 m_watchTimeout = 0_ms;
Weiqi Shi098f91c2014-07-23 17:41:35 -070076 m_interestLifetime = DEFAULT_INTEREST_LIFETIME;
77 m_size = 0;
78}
79
Weiqi Shi098f91c2014-07-23 17:41:35 -070080
81void
Alexander Afanasyev42290b22017-03-09 12:58:29 -080082WatchHandle::onData(const Interest& interest, const ndn::Data& data, const Name& name)
Weiqi Shi098f91c2014-07-23 17:41:35 -070083{
weijia yuan82cf9142018-10-21 12:25:02 -070084 m_validator.validate(data,
85 bind(&WatchHandle::onDataValidated, this, interest, _1, name),
86 bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
Weiqi Shi098f91c2014-07-23 17:41:35 -070087}
88
89void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040090WatchHandle::onDataValidated(const Interest& interest, const Data& data, const Name& name)
Weiqi Shi098f91c2014-07-23 17:41:35 -070091{
92 if (!m_processes[name].second) {
93 return;
94 }
weijia yuan82cf9142018-10-21 12:25:02 -070095 if (storageHandle.insertData(data)) {
Weiqi Shi098f91c2014-07-23 17:41:35 -070096 m_size++;
97 if (!onRunning(name))
98 return;
99
100 Interest fetchInterest(interest.getName());
101 fetchInterest.setSelectors(interest.getSelectors());
102 fetchInterest.setInterestLifetime(m_interestLifetime);
103 fetchInterest.setChildSelector(1);
104
105 // update selectors
106 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400107 if (data.getName().size() == interest.getName().size()) {
Weiqi Shi098f91c2014-07-23 17:41:35 -0700108 fetchInterest.setMinSuffixComponents(2);
109 }
110 else {
111 Exclude exclude;
112 if (!interest.getExclude().empty()) {
113 exclude = interest.getExclude();
114 }
115
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400116 exclude.excludeBefore(data.getName()[interest.getName().size()]);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700117 fetchInterest.setExclude(exclude);
118 }
119
120 ++m_interestNum;
weijia yuan82cf9142018-10-21 12:25:02 -0700121 face.expressInterest(fetchInterest,
Weiqi Shi098f91c2014-07-23 17:41:35 -0700122 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800123 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700124 bind(&WatchHandle::onTimeout, this, _1, name));
125 }
126 else {
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800127 BOOST_THROW_EXCEPTION(Error("Insert into Repo Failed"));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700128 }
129 m_processes[name].first.setInsertNum(m_size);
130}
131
132void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400133WatchHandle::onDataValidationFailed(const Interest& interest, const Data& data,
134 const ValidationError& error, const Name& name)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700135{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400136 std::cerr << error << std::endl;
Weiqi Shi098f91c2014-07-23 17:41:35 -0700137 if (!m_processes[name].second) {
138 return;
139 }
140 if (!onRunning(name))
141 return;
142
143 Interest fetchInterest(interest.getName());
144 fetchInterest.setSelectors(interest.getSelectors());
145 fetchInterest.setInterestLifetime(m_interestLifetime);
146 fetchInterest.setChildSelector(1);
147
148 // update selectors
149 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400150 if (data.getName().size() == interest.getName().size()) {
Weiqi Shi098f91c2014-07-23 17:41:35 -0700151 fetchInterest.setMinSuffixComponents(2);
152 }
153 else {
154 Exclude exclude;
155 if (!interest.getExclude().empty()) {
156 exclude = interest.getExclude();
157 }
158 // Only exclude this data since other data whose names are smaller may be validated and satisfied
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400159 exclude.excludeBefore(data.getName()[interest.getName().size()]);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700160 fetchInterest.setExclude(exclude);
161 }
162
163 ++m_interestNum;
weijia yuan82cf9142018-10-21 12:25:02 -0700164 face.expressInterest(fetchInterest,
Weiqi Shi098f91c2014-07-23 17:41:35 -0700165 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800166 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700167 bind(&WatchHandle::onTimeout, this, _1, name));
168}
169
170void
171WatchHandle::onTimeout(const ndn::Interest& interest, const Name& name)
172{
173 std::cerr << "Timeout" << std::endl;
174 if (!m_processes[name].second) {
175 return;
176 }
177 if (!onRunning(name))
178 return;
179 // selectors do not need to be updated
180 Interest fetchInterest(interest.getName());
181 fetchInterest.setSelectors(interest.getSelectors());
182 fetchInterest.setInterestLifetime(m_interestLifetime);
183 fetchInterest.setChildSelector(1);
184
185 ++m_interestNum;
weijia yuan82cf9142018-10-21 12:25:02 -0700186 face.expressInterest(fetchInterest,
Weiqi Shi098f91c2014-07-23 17:41:35 -0700187 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800188 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700189 bind(&WatchHandle::onTimeout, this, _1, name));
190
191}
192
193void
weijia yuan82cf9142018-10-21 12:25:02 -0700194WatchHandle::handleStopCommand(const Name& prefix, const Interest& interest,
195 const ndn::mgmt::ControlParameters& parameter,
196 const ndn::mgmt::CommandContinuation& done)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700197{
weijia yuan82cf9142018-10-21 12:25:02 -0700198 const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
199
200 watchStop(repoParameter.getName());
201 std::string text = "Watched Prefix Insertion for prefix (" + prefix.toUri() + ") is stop.";
202 return done(RepoCommandResponse(101, text));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700203}
204
205void
weijia yuan82cf9142018-10-21 12:25:02 -0700206WatchHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
207 const ndn::mgmt::ControlParameters& parameter,
208 const ndn::mgmt::CommandContinuation& done)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700209{
weijia yuan82cf9142018-10-21 12:25:02 -0700210 const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700211
Weiqi Shi098f91c2014-07-23 17:41:35 -0700212 //check whether this process exists
weijia yuan82cf9142018-10-21 12:25:02 -0700213 Name name = repoParameter.getName();
Weiqi Shi098f91c2014-07-23 17:41:35 -0700214 if (m_processes.count(name) == 0) {
215 std::cerr << "no such process name: " << name << std::endl;
weijia yuan82cf9142018-10-21 12:25:02 -0700216 RepoCommandResponse response(404, "No such process is in progress");
217 response.setBody(response.wireEncode());
218 return done(response);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700219 }
220
221 RepoCommandResponse& response = m_processes[name].first;
weijia yuan82cf9142018-10-21 12:25:02 -0700222
223 if (!m_processes[name].second) {
224 response.setCode(101);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700225 }
226
weijia yuan82cf9142018-10-21 12:25:02 -0700227 return done(response);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700228}
229
230void
231WatchHandle::deferredDeleteProcess(const Name& name)
232{
weijia yuan82cf9142018-10-21 12:25:02 -0700233 scheduler.scheduleEvent(PROCESS_DELETE_TIME,
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800234 bind(&WatchHandle::deleteProcess, this, name));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700235}
236
237void
238WatchHandle::processWatchCommand(const Interest& interest,
weijia yuan82cf9142018-10-21 12:25:02 -0700239 const RepoCommandParameter& parameter,
240 const ndn::mgmt::CommandContinuation& done)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700241{
242 // if there is no watchTimeout specified, m_watchTimeout will be set as 0 and this handle will run forever
243 if (parameter.hasWatchTimeout()) {
244 m_watchTimeout = parameter.getWatchTimeout();
245 }
246 else {
weijia yuan82cf9142018-10-21 12:25:02 -0700247 m_watchTimeout = 0_ms;
Weiqi Shi098f91c2014-07-23 17:41:35 -0700248 }
249
250 // if there is no maxInterestNum specified, m_maxInterestNum will be 0, which means infinity
251 if (parameter.hasMaxInterestNum()) {
252 m_maxInterestNum = parameter.getMaxInterestNum();
253 }
254 else {
255 m_maxInterestNum = 0;
256 }
257
258 if (parameter.hasInterestLifetime()) {
259 m_interestLifetime = parameter.getInterestLifetime();
260 }
261
weijia yuan82cf9142018-10-21 12:25:02 -0700262 RepoCommandResponse response(100, "Watching the prefix started.");
263 response.setBody(response.wireEncode());
264 done(response);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700265
266 m_processes[parameter.getName()] =
weijia yuan82cf9142018-10-21 12:25:02 -0700267 std::make_pair(RepoCommandResponse(300, "This watched prefix Insertion is in progress"),
268 true);
269
Weiqi Shi098f91c2014-07-23 17:41:35 -0700270 Interest fetchInterest(parameter.getName());
271 if (parameter.hasSelectors()) {
272 fetchInterest.setSelectors(parameter.getSelectors());
273 }
274 fetchInterest.setChildSelector(1);
275 fetchInterest.setInterestLifetime(m_interestLifetime);
276 m_startTime = steady_clock::now();
277 m_interestNum++;
weijia yuan82cf9142018-10-21 12:25:02 -0700278 face.expressInterest(fetchInterest,
279 bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
280 bind(&WatchHandle::onTimeout, this, _1, parameter.getName()), // Nack
281 bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700282}
283
284bool
285WatchHandle::onRunning(const Name& name)
286{
287 bool isTimeout = (m_watchTimeout != milliseconds::zero() &&
288 steady_clock::now() - m_startTime > m_watchTimeout);
289 bool isMaxInterest = m_interestNum >= m_maxInterestNum && m_maxInterestNum != 0;
290 if (isTimeout || isMaxInterest) {
291 deferredDeleteProcess(name);
292 watchStop(name);
293 return false;
294 }
295 return true;
296}
297
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800298} // namespace repo