blob: d971878e094e950d03fbc472a6c5bbbceef73edf [file] [log] [blame]
Shuo Chen29c77fe2014-03-18 11:29:41 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Alexander Afanasyevc0e26582017-08-13 21:16:49 -04002/*
Davide Pesavento8891c832019-03-20 23:20:35 -04003 * Copyright (c) 2014-2019, Regents of the University of California.
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07004 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Shuo Chen29c77fe2014-03-18 11:29:41 -070018 */
19
20#include "write-handle.hpp"
21
weijia yuan3aa8d2b2018-03-06 15:35:57 -080022#include <ndn-cxx/util/logger.hpp>
weijia yuan82cf9142018-10-21 12:25:02 -070023#include <ndn-cxx/util/random.hpp>
24
Shuo Chen29c77fe2014-03-18 11:29:41 -070025namespace repo {
26
weijia yuan3aa8d2b2018-03-06 15:35:57 -080027NDN_LOG_INIT(repo.WriteHandle);
28
Shuo Chen29c77fe2014-03-18 11:29:41 -070029static const int DEFAULT_CREDIT = 12;
weijia yuan3aa8d2b2018-03-06 15:35:57 -080030static const bool DEFAULT_CANBE_PREFIX = false;
31static const milliseconds MAX_TIMEOUT(60_s);
weijia yuan82cf9142018-10-21 12:25:02 -070032static const milliseconds NOEND_TIMEOUT(10000_ms);
33static const milliseconds PROCESS_DELETE_TIME(10000_ms);
34static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms);
Shuo Chen29c77fe2014-03-18 11:29:41 -070035
weijia yuan82cf9142018-10-21 12:25:02 -070036WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, ndn::mgmt::Dispatcher& dispatcher,
37 Scheduler& scheduler, Validator& validator)
38 : CommandBaseHandle(face, storageHandle, scheduler, validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070039 , m_validator(validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070040 , m_credit(DEFAULT_CREDIT)
weijia yuan3aa8d2b2018-03-06 15:35:57 -080041 , m_canBePrefix(DEFAULT_CANBE_PREFIX)
42 , m_maxTimeout(MAX_TIMEOUT)
Shuo Chen29c77fe2014-03-18 11:29:41 -070043 , m_noEndTimeout(NOEND_TIMEOUT)
Weiqi Shi098f91c2014-07-23 17:41:35 -070044 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
Shuo Chen29c77fe2014-03-18 11:29:41 -070045{
weijia yuan82cf9142018-10-21 12:25:02 -070046 dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert"),
47 makeAuthorization(),
48 std::bind(&WriteHandle::validateParameters<InsertCommand>, this, _1),
49 std::bind(&WriteHandle::handleInsertCommand, this, _1, _2, _3, _4));
50
51 dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert check"),
52 makeAuthorization(),
53 std::bind(&WriteHandle::validateParameters<InsertCheckCommand>, this, _1),
54 std::bind(&WriteHandle::handleCheckCommand, this, _1, _2, _3, _4));
Shuo Chen29c77fe2014-03-18 11:29:41 -070055}
56
57void
58WriteHandle::deleteProcess(ProcessId processId)
59{
60 m_processes.erase(processId);
61}
62
Shuo Chen29c77fe2014-03-18 11:29:41 -070063void
weijia yuan82cf9142018-10-21 12:25:02 -070064WriteHandle::handleInsertCommand(const Name& prefix, const Interest& interest,
65 const ndn::mgmt::ControlParameters& parameter,
66 const ndn::mgmt::CommandContinuation& done)
Shuo Chen29c77fe2014-03-18 11:29:41 -070067{
weijia yuan82cf9142018-10-21 12:25:02 -070068 RepoCommandParameter* repoParameter =
69 dynamic_cast<RepoCommandParameter*>(const_cast<ndn::mgmt::ControlParameters*>(&parameter));
Shuo Chen29c77fe2014-03-18 11:29:41 -070070
weijia yuan82cf9142018-10-21 12:25:02 -070071 if (repoParameter->hasStartBlockId() || repoParameter->hasEndBlockId()) {
weijia yuan82cf9142018-10-21 12:25:02 -070072 processSegmentedInsertCommand(interest, *repoParameter, done);
Shuo Chen29c77fe2014-03-18 11:29:41 -070073 }
74 else {
weijia yuan82cf9142018-10-21 12:25:02 -070075 processSingleInsertCommand(interest, *repoParameter, done);
Shuo Chen29c77fe2014-03-18 11:29:41 -070076 }
weijia yuan82cf9142018-10-21 12:25:02 -070077 if (repoParameter->hasInterestLifetime())
78 m_interestLifetime = repoParameter->getInterestLifetime();
Shuo Chen29c77fe2014-03-18 11:29:41 -070079}
80
81void
Alexander Afanasyev42290b22017-03-09 12:58:29 -080082WriteHandle::onData(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -070083{
Shuo Chenc88c87d2014-06-25 20:21:02 +080084 m_validator.validate(data,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080085 std::bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
86 [](const Data& data, const ValidationError& error){NDN_LOG_ERROR("Error: " << error);});
Shuo Chenc88c87d2014-06-25 20:21:02 +080087}
88
89void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040090WriteHandle::onDataValidated(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chenc88c87d2014-06-25 20:21:02 +080091{
Shuo Chen29c77fe2014-03-18 11:29:41 -070092 if (m_processes.count(processId) == 0) {
93 return;
94 }
95
96 ProcessInfo& process = m_processes[processId];
97 RepoCommandResponse& response = process.response;
98
99 if (response.getInsertNum() == 0) {
weijia yuan82cf9142018-10-21 12:25:02 -0700100 storageHandle.insertData(data);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700101 response.setInsertNum(1);
102 }
103
104 deferredDeleteProcess(processId);
105}
106
107void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700108WriteHandle::onTimeout(const Interest& interest, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700109{
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800110 NDN_LOG_DEBUG("Timeout" << std::endl);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700111 m_processes.erase(processId);
112}
113
114void
weijia yuan82cf9142018-10-21 12:25:02 -0700115WriteHandle::processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
116 const ndn::mgmt::CommandContinuation& done)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700117{
weijia yuan82cf9142018-10-21 12:25:02 -0700118 ProcessId processId = ndn::random::generateWord64();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700119
120 ProcessInfo& process = m_processes[processId];
121
122 RepoCommandResponse& response = process.response;
weijia yuan82cf9142018-10-21 12:25:02 -0700123 response.setCode(100);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700124 response.setProcessId(processId);
125 response.setInsertNum(0);
weijia yuan82cf9142018-10-21 12:25:02 -0700126 response.setBody(response.wireEncode());
127 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700128
weijia yuan82cf9142018-10-21 12:25:02 -0700129 response.setCode(300);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700130 Interest fetchInterest(parameter.getName());
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800131 fetchInterest.setCanBePrefix(m_canBePrefix);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700132 fetchInterest.setInterestLifetime(m_interestLifetime);
weijia yuan82cf9142018-10-21 12:25:02 -0700133 face.expressInterest(fetchInterest,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800134 std::bind(&WriteHandle::onData, this, _1, _2, processId),
135 std::bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
136 std::bind(&WriteHandle::onTimeout, this, _1, processId));
137}
138
139void
140WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
141{
142 // use SegmentFetcher to send fetch interest.
143 ProcessInfo& process = m_processes[processId];
144 Name name = parameter.getName();
145 SegmentNo startBlockId = parameter.getStartBlockId();
146
147 uint64_t initialCredit = m_credit;
148
149 if (parameter.hasEndBlockId()) {
150 initialCredit =
151 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
152 }
153 else {
154 // set noEndTimeout timer
155 process.noEndTime = ndn::time::steady_clock::now() +
156 m_noEndTimeout;
157 }
158
159 Name fetchName = name;
160 SegmentNo segment = startBlockId;
161 fetchName.appendSegment(segment);
162 Interest interest(fetchName);
163
164 ndn::util::SegmentFetcher::Options options;
165 options.initCwnd = initialCredit;
166 options.interestLifetime = m_interestLifetime;
167 options.maxTimeout = m_maxTimeout;
168 auto fetcher = ndn::util::SegmentFetcher::start(face, interest, m_validator, options);
169 fetcher->onError.connect([] (uint32_t errorCode, const std::string& errorMsg)
170 {NDN_LOG_ERROR("Error: " << errorMsg);});
171 fetcher->afterSegmentValidated.connect([this, &fetcher, &processId] (const Data& data)
172 {onSegmentData(*fetcher, data, processId);});
173 fetcher->afterSegmentTimedOut.connect([this, &fetcher, &processId] ()
174 {onSegmentTimeout(*fetcher, processId);});
175}
176
177void
178WriteHandle::onSegmentData(ndn::util::SegmentFetcher& fetcher, const Data& data, ProcessId processId)
179{
180 auto it = m_processes.find(processId);
181 if (it == m_processes.end()) {
182 fetcher.stop();
183 return;
184 }
185
186 RepoCommandResponse& response = it->second.response;
187
188 //insert data
189 if (storageHandle.insertData(data)) {
190 response.setInsertNum(response.getInsertNum() + 1);
191 }
192
193 ProcessInfo& process = m_processes[processId];
194 //read whether notime timeout
195 if (!response.hasEndBlockId()) {
196
197 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
198 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
199
200 if (now > noEndTime) {
201 NDN_LOG_DEBUG("noEndtimeout: " << processId);
202 //StatusCode should be refreshed as 405
203 response.setCode(405);
204 //schedule a delete event
205 deferredDeleteProcess(processId);
206 fetcher.stop();
207 return;
208 }
209 }
210
211 //read whether this process has total ends, if ends, remove control info from the maps
212 if (response.hasEndBlockId()) {
213 uint64_t nSegments = response.getEndBlockId() - response.getStartBlockId() + 1;
214 if (response.getInsertNum() >= nSegments) {
215 //All the data has been inserted, StatusCode is refreshed as 200
216 response.setCode(200);
217 deferredDeleteProcess(processId);
218 fetcher.stop();
219 return;
220 }
221 }
222}
223
224void
225WriteHandle::onSegmentTimeout(ndn::util::SegmentFetcher& fetcher, ProcessId processId)
226{
227 NDN_LOG_DEBUG("SegTimeout");
228 if (m_processes.count(processId) == 0) {
229 fetcher.stop();
230 return;
231 }
Shuo Chen29c77fe2014-03-18 11:29:41 -0700232}
233
234void
weijia yuan82cf9142018-10-21 12:25:02 -0700235WriteHandle::processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
236 const ndn::mgmt::CommandContinuation& done)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700237{
238 if (parameter.hasEndBlockId()) {
239 //normal fetch segment
weijia yuan82cf9142018-10-21 12:25:02 -0700240 SegmentNo startBlockId = parameter.hasStartBlockId() ? parameter.getStartBlockId() : 0;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700241 SegmentNo endBlockId = parameter.getEndBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700242 if (startBlockId > endBlockId) {
weijia yuan82cf9142018-10-21 12:25:02 -0700243 done(negativeReply("Malformed Command", 403));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700244 return;
245 }
246
weijia yuan82cf9142018-10-21 12:25:02 -0700247 ProcessId processId = ndn::random::generateWord64();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700248 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700249 RepoCommandResponse& response = process.response;
weijia yuan82cf9142018-10-21 12:25:02 -0700250 response.setCode(100);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700251 response.setProcessId(processId);
252 response.setInsertNum(0);
253 response.setStartBlockId(startBlockId);
254 response.setEndBlockId(endBlockId);
weijia yuan82cf9142018-10-21 12:25:02 -0700255 response.setBody(response.wireEncode());
256 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700257
258 //300 means data fetching is in progress
weijia yuan82cf9142018-10-21 12:25:02 -0700259 response.setCode(300);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700260
261 segInit(processId, parameter);
262 }
263 else {
264 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
weijia yuan82cf9142018-10-21 12:25:02 -0700265 ProcessId processId = ndn::random::generateWord64();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700266 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700267 RepoCommandResponse& response = process.response;
weijia yuan82cf9142018-10-21 12:25:02 -0700268 response.setCode(100);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700269 response.setProcessId(processId);
270 response.setInsertNum(0);
271 response.setStartBlockId(parameter.getStartBlockId());
weijia yuan82cf9142018-10-21 12:25:02 -0700272 response.setBody(response.wireEncode());
273 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700274
275 //300 means data fetching is in progress
weijia yuan82cf9142018-10-21 12:25:02 -0700276 response.setCode(300);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700277
278 segInit(processId, parameter);
279 }
280}
281
282void
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800283WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
284 const ndn::mgmt::ControlParameters& parameter,
285 const ndn::mgmt::CommandContinuation& done)
286{
287 const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
288
289 //check whether this process exists
290 ProcessId processId = repoParameter.getProcessId();
291 if (m_processes.count(processId) == 0) {
292 NDN_LOG_DEBUG("no such processId: " << processId);
293 done(negativeReply("No such this process is in progress", 404));
294 return;
295 }
296
297 ProcessInfo& process = m_processes[processId];
298
299 RepoCommandResponse& response = process.response;
300
301 //Check whether it is single data fetching
302 if (!response.hasStartBlockId() && !response.hasEndBlockId()) {
303 done(response);
304 return;
305 }
306
307 //read if noEndtimeout
308 if (!response.hasEndBlockId()) {
309 extendNoEndTime(process);
310 done(response);
311 return;
312 }
313 else {
314 done(response);
315 }
316}
317
318void
319WriteHandle::deferredDeleteProcess(ProcessId processId)
320{
Davide Pesavento8891c832019-03-20 23:20:35 -0400321 scheduler.schedule(PROCESS_DELETE_TIME, [=] { deleteProcess(processId); });
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800322}
323
324void
Shuo Chen29c77fe2014-03-18 11:29:41 -0700325WriteHandle::extendNoEndTime(ProcessInfo& process)
326{
Davide Pesavento8891c832019-03-20 23:20:35 -0400327 auto& noEndTime = process.noEndTime;
328 auto now = ndn::time::steady_clock::now();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700329 RepoCommandResponse& response = process.response;
330 if (now > noEndTime) {
weijia yuan82cf9142018-10-21 12:25:02 -0700331 response.setCode(405);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700332 return;
333 }
Davide Pesavento8891c832019-03-20 23:20:35 -0400334
Shuo Chen29c77fe2014-03-18 11:29:41 -0700335 //extends noEndTime
weijia yuan82cf9142018-10-21 12:25:02 -0700336 process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700337}
338
weijia yuan82cf9142018-10-21 12:25:02 -0700339RepoCommandResponse
340WriteHandle::negativeReply(std::string text, int statusCode)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700341{
weijia yuan82cf9142018-10-21 12:25:02 -0700342 RepoCommandResponse response(statusCode, text);
343 response.setBody(response.wireEncode());
344
345 return response;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700346}
347
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800348} // namespace repo