blob: bee036f5745fde74d92230358f91ce1a411df5a2 [file] [log] [blame]
Weiqi Shi098f91c2014-07-23 17:41:35 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Alexander Afanasyevc0e26582017-08-13 21:16:49 -04002/*
Alexander Afanasyev42290b22017-03-09 12:58:29 -08003 * Copyright (c) 2014-2017, Regents of the University of California.
Weiqi Shi098f91c2014-07-23 17:41:35 -07004 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "watch-handle.hpp"
21
22namespace repo {
23
24static const milliseconds PROCESS_DELETE_TIME(10000);
25static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
26
27WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
Junxiao Shi047a6fb2017-06-08 16:16:05 +000028 Scheduler& scheduler, Validator& validator)
Weiqi Shi098f91c2014-07-23 17:41:35 -070029 : BaseHandle(face, storageHandle, keyChain, scheduler)
30 , m_validator(validator)
31 , m_interestNum(0)
32 , m_maxInterestNum(0)
33 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
34 , m_watchTimeout(0)
35 , m_startTime(steady_clock::now())
36 , m_size(0)
37{
38}
39
40void
41WatchHandle::deleteProcess(const Name& name)
42{
43 m_processes.erase(name);
44}
45
46// Interest.
47void
48WatchHandle::onInterest(const Name& prefix, const Interest& interest)
49{
50 m_validator.validate(interest,
51 bind(&WatchHandle::onValidated, this, _1, prefix),
52 bind(&WatchHandle::onValidationFailed, this, _1, _2));
53}
54
55void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040056WatchHandle::onValidated(const Interest& interest, const Name& prefix)
Weiqi Shi098f91c2014-07-23 17:41:35 -070057{
58 RepoCommandParameter parameter;
59 try {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040060 extractParameter(interest, prefix, parameter);
Weiqi Shi098f91c2014-07-23 17:41:35 -070061 }
62 catch (RepoCommandParameter::Error) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040063 negativeReply(interest, 403);
Weiqi Shi098f91c2014-07-23 17:41:35 -070064 return;
65 }
66
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040067 processWatchCommand(interest, parameter);
Weiqi Shi098f91c2014-07-23 17:41:35 -070068}
69
70void WatchHandle::watchStop(const Name& name)
71{
72 m_processes[name].second = false;
73 m_maxInterestNum = 0;
74 m_interestNum = 0;
75 m_startTime = steady_clock::now();
76 m_watchTimeout = milliseconds(0);
77 m_interestLifetime = DEFAULT_INTEREST_LIFETIME;
78 m_size = 0;
79}
80
81void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040082WatchHandle::onValidationFailed(const Interest& interest, const ValidationError& error)
Weiqi Shi098f91c2014-07-23 17:41:35 -070083{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040084 std::cerr << error << std::endl;
85 negativeReply(interest, 401);
Weiqi Shi098f91c2014-07-23 17:41:35 -070086}
87
88void
Alexander Afanasyev42290b22017-03-09 12:58:29 -080089WatchHandle::onData(const Interest& interest, const ndn::Data& data, const Name& name)
Weiqi Shi098f91c2014-07-23 17:41:35 -070090{
91 m_validator.validate(data,
92 bind(&WatchHandle::onDataValidated, this, interest, _1, name),
93 bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
94}
95
96void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040097WatchHandle::onDataValidated(const Interest& interest, const Data& data, const Name& name)
Weiqi Shi098f91c2014-07-23 17:41:35 -070098{
99 if (!m_processes[name].second) {
100 return;
101 }
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400102 if (getStorageHandle().insertData(data)) {
Weiqi Shi098f91c2014-07-23 17:41:35 -0700103 m_size++;
104 if (!onRunning(name))
105 return;
106
107 Interest fetchInterest(interest.getName());
108 fetchInterest.setSelectors(interest.getSelectors());
109 fetchInterest.setInterestLifetime(m_interestLifetime);
110 fetchInterest.setChildSelector(1);
111
112 // update selectors
113 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400114 if (data.getName().size() == interest.getName().size()) {
Weiqi Shi098f91c2014-07-23 17:41:35 -0700115 fetchInterest.setMinSuffixComponents(2);
116 }
117 else {
118 Exclude exclude;
119 if (!interest.getExclude().empty()) {
120 exclude = interest.getExclude();
121 }
122
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400123 exclude.excludeBefore(data.getName()[interest.getName().size()]);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700124 fetchInterest.setExclude(exclude);
125 }
126
127 ++m_interestNum;
128 getFace().expressInterest(fetchInterest,
129 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800130 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700131 bind(&WatchHandle::onTimeout, this, _1, name));
132 }
133 else {
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800134 BOOST_THROW_EXCEPTION(Error("Insert into Repo Failed"));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700135 }
136 m_processes[name].first.setInsertNum(m_size);
137}
138
139void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400140WatchHandle::onDataValidationFailed(const Interest& interest, const Data& data,
141 const ValidationError& error, const Name& name)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700142{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400143 std::cerr << error << std::endl;
Weiqi Shi098f91c2014-07-23 17:41:35 -0700144 if (!m_processes[name].second) {
145 return;
146 }
147 if (!onRunning(name))
148 return;
149
150 Interest fetchInterest(interest.getName());
151 fetchInterest.setSelectors(interest.getSelectors());
152 fetchInterest.setInterestLifetime(m_interestLifetime);
153 fetchInterest.setChildSelector(1);
154
155 // update selectors
156 // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400157 if (data.getName().size() == interest.getName().size()) {
Weiqi Shi098f91c2014-07-23 17:41:35 -0700158 fetchInterest.setMinSuffixComponents(2);
159 }
160 else {
161 Exclude exclude;
162 if (!interest.getExclude().empty()) {
163 exclude = interest.getExclude();
164 }
165 // Only exclude this data since other data whose names are smaller may be validated and satisfied
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400166 exclude.excludeBefore(data.getName()[interest.getName().size()]);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700167 fetchInterest.setExclude(exclude);
168 }
169
170 ++m_interestNum;
171 getFace().expressInterest(fetchInterest,
172 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800173 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700174 bind(&WatchHandle::onTimeout, this, _1, name));
175}
176
177void
178WatchHandle::onTimeout(const ndn::Interest& interest, const Name& name)
179{
180 std::cerr << "Timeout" << std::endl;
181 if (!m_processes[name].second) {
182 return;
183 }
184 if (!onRunning(name))
185 return;
186 // selectors do not need to be updated
187 Interest fetchInterest(interest.getName());
188 fetchInterest.setSelectors(interest.getSelectors());
189 fetchInterest.setInterestLifetime(m_interestLifetime);
190 fetchInterest.setChildSelector(1);
191
192 ++m_interestNum;
193 getFace().expressInterest(fetchInterest,
194 bind(&WatchHandle::onData, this, _1, _2, name),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800195 bind(&WatchHandle::onTimeout, this, _1, name), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700196 bind(&WatchHandle::onTimeout, this, _1, name));
197
198}
199
200void
201WatchHandle::listen(const Name& prefix)
202{
Junxiao Shi2b7b8312017-06-16 03:43:24 +0000203 getFace().setInterestFilter(Name(prefix).append("watch").append("start"),
204 bind(&WatchHandle::onInterest, this, _1, _2));
205 getFace().setInterestFilter(Name(prefix).append("watch").append("check"),
206 bind(&WatchHandle::onCheckInterest, this, _1, _2));
207 getFace().setInterestFilter(Name(prefix).append("watch").append("stop"),
208 bind(&WatchHandle::onStopInterest, this, _1, _2));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700209}
210
211void
212WatchHandle::onStopInterest(const Name& prefix, const Interest& interest)
213{
214 m_validator.validate(interest,
215 bind(&WatchHandle::onStopValidated, this, _1, prefix),
216 bind(&WatchHandle::onStopValidationFailed, this, _1, _2));
217}
218
219void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400220WatchHandle::onStopValidated(const Interest& interest, const Name& prefix)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700221{
222 RepoCommandParameter parameter;
223 try {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400224 extractParameter(interest, prefix, parameter);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700225 }
226 catch (RepoCommandParameter::Error) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400227 negativeReply(interest, 403);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700228 return;
229 }
230
231 watchStop(parameter.getName());
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400232 negativeReply(interest, 101);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700233}
234
235void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400236WatchHandle::onStopValidationFailed(const Interest& interest, const ValidationError& error)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700237{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400238 std::cerr << error << std::endl;
239 negativeReply(interest, 401);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700240}
241
242void
243WatchHandle::onCheckInterest(const Name& prefix, const Interest& interest)
244{
245 m_validator.validate(interest,
246 bind(&WatchHandle::onCheckValidated, this, _1, prefix),
247 bind(&WatchHandle::onCheckValidationFailed, this, _1, _2));
248}
249
250void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400251WatchHandle::onCheckValidated(const Interest& interest, const Name& prefix)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700252{
253 RepoCommandParameter parameter;
254 try {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400255 extractParameter(interest, prefix, parameter);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700256 }
257 catch (RepoCommandParameter::Error) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400258 negativeReply(interest, 403);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700259 return;
260 }
261
262 if (!parameter.hasName()) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400263 negativeReply(interest, 403);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700264 return;
265 }
266 //check whether this process exists
267 Name name = parameter.getName();
268 if (m_processes.count(name) == 0) {
269 std::cerr << "no such process name: " << name << std::endl;
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400270 negativeReply(interest, 404);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700271 return;
272 }
273
274 RepoCommandResponse& response = m_processes[name].first;
275 if (!m_processes[name].second) {
276 response.setStatusCode(101);
277 }
278
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400279 reply(interest, response);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700280
281}
282
283void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400284WatchHandle::onCheckValidationFailed(const Interest& interest, const ValidationError& error)
Weiqi Shi098f91c2014-07-23 17:41:35 -0700285{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400286 std::cerr << error << std::endl;
287 negativeReply(interest, 401);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700288}
289
290void
291WatchHandle::deferredDeleteProcess(const Name& name)
292{
293 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800294 bind(&WatchHandle::deleteProcess, this, name));
Weiqi Shi098f91c2014-07-23 17:41:35 -0700295}
296
297void
298WatchHandle::processWatchCommand(const Interest& interest,
299 RepoCommandParameter& parameter)
300{
301 // if there is no watchTimeout specified, m_watchTimeout will be set as 0 and this handle will run forever
302 if (parameter.hasWatchTimeout()) {
303 m_watchTimeout = parameter.getWatchTimeout();
304 }
305 else {
306 m_watchTimeout = milliseconds(0);
307 }
308
309 // if there is no maxInterestNum specified, m_maxInterestNum will be 0, which means infinity
310 if (parameter.hasMaxInterestNum()) {
311 m_maxInterestNum = parameter.getMaxInterestNum();
312 }
313 else {
314 m_maxInterestNum = 0;
315 }
316
317 if (parameter.hasInterestLifetime()) {
318 m_interestLifetime = parameter.getInterestLifetime();
319 }
320
321 reply(interest, RepoCommandResponse().setStatusCode(100));
322
323 m_processes[parameter.getName()] =
324 std::make_pair(RepoCommandResponse().setStatusCode(300), true);
325 Interest fetchInterest(parameter.getName());
326 if (parameter.hasSelectors()) {
327 fetchInterest.setSelectors(parameter.getSelectors());
328 }
329 fetchInterest.setChildSelector(1);
330 fetchInterest.setInterestLifetime(m_interestLifetime);
331 m_startTime = steady_clock::now();
332 m_interestNum++;
333 getFace().expressInterest(fetchInterest,
334 bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800335 bind(&WatchHandle::onTimeout, this, _1, parameter.getName()), // Nack
Weiqi Shi098f91c2014-07-23 17:41:35 -0700336 bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
337}
338
339
340void
341WatchHandle::negativeReply(const Interest& interest, int statusCode)
342{
343 RepoCommandResponse response;
344 response.setStatusCode(statusCode);
345 reply(interest, response);
346}
347
348bool
349WatchHandle::onRunning(const Name& name)
350{
351 bool isTimeout = (m_watchTimeout != milliseconds::zero() &&
352 steady_clock::now() - m_startTime > m_watchTimeout);
353 bool isMaxInterest = m_interestNum >= m_maxInterestNum && m_maxInterestNum != 0;
354 if (isTimeout || isMaxInterest) {
355 deferredDeleteProcess(name);
356 watchStop(name);
357 return false;
358 }
359 return true;
360}
361
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800362} // namespace repo