blob: b9c51f0c8b831171d41536d68044b5f00606c122 [file] [log] [blame]
Shuo Chen29c77fe2014-03-18 11:29:41 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Alexander Afanasyevc0e26582017-08-13 21:16:49 -04002/*
Davide Pesaventob4966422023-08-10 21:15:32 -04003 * Copyright (c) 2014-2023, Regents of the University of California.
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07004 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Shuo Chen29c77fe2014-03-18 11:29:41 -070018 */
19
20#include "write-handle.hpp"
21
weijia yuan3aa8d2b2018-03-06 15:35:57 -080022#include <ndn-cxx/util/logger.hpp>
weijia yuan82cf9142018-10-21 12:25:02 -070023#include <ndn-cxx/util/random.hpp>
24
Shuo Chen29c77fe2014-03-18 11:29:41 -070025namespace repo {
26
weijia yuan3aa8d2b2018-03-06 15:35:57 -080027NDN_LOG_INIT(repo.WriteHandle);
28
Davide Pesavento11904062022-04-14 22:33:28 -040029const int DEFAULT_CREDIT = 12;
30const time::milliseconds NOEND_TIMEOUT = 10_s;
31const time::milliseconds PROCESS_DELETE_TIME = 10_s;
Shuo Chen29c77fe2014-03-18 11:29:41 -070032
weijia yuan82cf9142018-10-21 12:25:02 -070033WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, ndn::mgmt::Dispatcher& dispatcher,
Davide Pesavento11904062022-04-14 22:33:28 -040034 Scheduler& scheduler, ndn::security::Validator& validator)
weijia yuan82cf9142018-10-21 12:25:02 -070035 : CommandBaseHandle(face, storageHandle, scheduler, validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070036 , m_validator(validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070037{
weijia yuan82cf9142018-10-21 12:25:02 -070038 dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert"),
39 makeAuthorization(),
40 std::bind(&WriteHandle::validateParameters<InsertCommand>, this, _1),
41 std::bind(&WriteHandle::handleInsertCommand, this, _1, _2, _3, _4));
42
43 dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert check"),
44 makeAuthorization(),
45 std::bind(&WriteHandle::validateParameters<InsertCheckCommand>, this, _1),
46 std::bind(&WriteHandle::handleCheckCommand, this, _1, _2, _3, _4));
Shuo Chen29c77fe2014-03-18 11:29:41 -070047}
48
49void
50WriteHandle::deleteProcess(ProcessId processId)
51{
52 m_processes.erase(processId);
53}
54
Shuo Chen29c77fe2014-03-18 11:29:41 -070055void
weijia yuan82cf9142018-10-21 12:25:02 -070056WriteHandle::handleInsertCommand(const Name& prefix, const Interest& interest,
Davide Pesavento11904062022-04-14 22:33:28 -040057 const ndn::mgmt::ControlParameters& params,
weijia yuan82cf9142018-10-21 12:25:02 -070058 const ndn::mgmt::CommandContinuation& done)
Shuo Chen29c77fe2014-03-18 11:29:41 -070059{
Davide Pesavento11904062022-04-14 22:33:28 -040060 auto& repoParam = dynamic_cast<RepoCommandParameter&>(const_cast<ndn::mgmt::ControlParameters&>(params));
Shuo Chen29c77fe2014-03-18 11:29:41 -070061
Davide Pesavento11904062022-04-14 22:33:28 -040062 if (repoParam.hasStartBlockId() || repoParam.hasEndBlockId()) {
63 processSegmentedInsertCommand(interest, repoParam, done);
Shuo Chen29c77fe2014-03-18 11:29:41 -070064 }
65 else {
Davide Pesavento11904062022-04-14 22:33:28 -040066 processSingleInsertCommand(interest, repoParam, done);
Shuo Chen29c77fe2014-03-18 11:29:41 -070067 }
Davide Pesavento11904062022-04-14 22:33:28 -040068 if (repoParam.hasInterestLifetime()) {
69 m_interestLifetime = repoParam.getInterestLifetime();
70 }
Shuo Chen29c77fe2014-03-18 11:29:41 -070071}
72
73void
Alexander Afanasyev42290b22017-03-09 12:58:29 -080074WriteHandle::onData(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -070075{
Shuo Chenc88c87d2014-06-25 20:21:02 +080076 m_validator.validate(data,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080077 std::bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
Davide Pesavento11904062022-04-14 22:33:28 -040078 [] (auto&&, const auto& error) { NDN_LOG_ERROR("Error: " << error); });
Shuo Chenc88c87d2014-06-25 20:21:02 +080079}
80
81void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040082WriteHandle::onDataValidated(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chenc88c87d2014-06-25 20:21:02 +080083{
Shuo Chen29c77fe2014-03-18 11:29:41 -070084 if (m_processes.count(processId) == 0) {
85 return;
86 }
87
88 ProcessInfo& process = m_processes[processId];
89 RepoCommandResponse& response = process.response;
90
91 if (response.getInsertNum() == 0) {
weijia yuan82cf9142018-10-21 12:25:02 -070092 storageHandle.insertData(data);
Shuo Chen29c77fe2014-03-18 11:29:41 -070093 response.setInsertNum(1);
94 }
95
96 deferredDeleteProcess(processId);
97}
98
99void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700100WriteHandle::onTimeout(const Interest& interest, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700101{
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800102 NDN_LOG_DEBUG("Timeout" << std::endl);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700103 m_processes.erase(processId);
104}
105
106void
weijia yuan82cf9142018-10-21 12:25:02 -0700107WriteHandle::processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
108 const ndn::mgmt::CommandContinuation& done)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700109{
weijia yuan82cf9142018-10-21 12:25:02 -0700110 ProcessId processId = ndn::random::generateWord64();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700111
112 ProcessInfo& process = m_processes[processId];
113
114 RepoCommandResponse& response = process.response;
weijia yuan82cf9142018-10-21 12:25:02 -0700115 response.setCode(100);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700116 response.setProcessId(processId);
117 response.setInsertNum(0);
weijia yuan82cf9142018-10-21 12:25:02 -0700118 response.setBody(response.wireEncode());
119 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700120
weijia yuan82cf9142018-10-21 12:25:02 -0700121 response.setCode(300);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700122 Interest fetchInterest(parameter.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700123 fetchInterest.setInterestLifetime(m_interestLifetime);
weijia yuan82cf9142018-10-21 12:25:02 -0700124 face.expressInterest(fetchInterest,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800125 std::bind(&WriteHandle::onData, this, _1, _2, processId),
126 std::bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
127 std::bind(&WriteHandle::onTimeout, this, _1, processId));
128}
129
130void
131WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
132{
133 // use SegmentFetcher to send fetch interest.
134 ProcessInfo& process = m_processes[processId];
135 Name name = parameter.getName();
136 SegmentNo startBlockId = parameter.getStartBlockId();
137
Davide Pesavento11904062022-04-14 22:33:28 -0400138 int initialCredit = DEFAULT_CREDIT;
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800139 if (parameter.hasEndBlockId()) {
Davide Pesavento11904062022-04-14 22:33:28 -0400140 initialCredit = std::min<int>(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800141 }
142 else {
143 // set noEndTimeout timer
Davide Pesavento11904062022-04-14 22:33:28 -0400144 process.noEndTime = time::steady_clock::now() + NOEND_TIMEOUT;
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800145 }
146
147 Name fetchName = name;
Davide Pesavento11904062022-04-14 22:33:28 -0400148 fetchName.appendSegment(startBlockId);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800149 Interest interest(fetchName);
150
Davide Pesaventob4966422023-08-10 21:15:32 -0400151 ndn::SegmentFetcher::Options options;
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800152 options.initCwnd = initialCredit;
153 options.interestLifetime = m_interestLifetime;
Davide Pesaventob4966422023-08-10 21:15:32 -0400154 auto fetcher = ndn::SegmentFetcher::start(face, interest, m_validator, options);
Davide Pesavento11904062022-04-14 22:33:28 -0400155 fetcher->onError.connect([] (uint32_t, const auto& errorMsg) {
156 NDN_LOG_ERROR("Error: " << errorMsg);
157 });
158 fetcher->afterSegmentValidated.connect([this, &fetcher, &processId] (const Data& data) {
159 onSegmentData(*fetcher, data, processId);
160 });
161 fetcher->afterSegmentTimedOut.connect([this, &fetcher, &processId] {
162 onSegmentTimeout(*fetcher, processId);
163 });
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800164}
165
166void
Davide Pesaventob4966422023-08-10 21:15:32 -0400167WriteHandle::onSegmentData(ndn::SegmentFetcher& fetcher, const Data& data, ProcessId processId)
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800168{
169 auto it = m_processes.find(processId);
170 if (it == m_processes.end()) {
171 fetcher.stop();
172 return;
173 }
174
175 RepoCommandResponse& response = it->second.response;
176
177 //insert data
178 if (storageHandle.insertData(data)) {
179 response.setInsertNum(response.getInsertNum() + 1);
180 }
181
182 ProcessInfo& process = m_processes[processId];
183 //read whether notime timeout
184 if (!response.hasEndBlockId()) {
Davide Pesavento11904062022-04-14 22:33:28 -0400185 auto noEndTime = process.noEndTime;
186 auto now = time::steady_clock::now();
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800187
188 if (now > noEndTime) {
189 NDN_LOG_DEBUG("noEndtimeout: " << processId);
190 //StatusCode should be refreshed as 405
191 response.setCode(405);
192 //schedule a delete event
193 deferredDeleteProcess(processId);
194 fetcher.stop();
195 return;
196 }
197 }
198
199 //read whether this process has total ends, if ends, remove control info from the maps
200 if (response.hasEndBlockId()) {
201 uint64_t nSegments = response.getEndBlockId() - response.getStartBlockId() + 1;
202 if (response.getInsertNum() >= nSegments) {
203 //All the data has been inserted, StatusCode is refreshed as 200
204 response.setCode(200);
205 deferredDeleteProcess(processId);
206 fetcher.stop();
207 return;
208 }
209 }
210}
211
212void
Davide Pesaventob4966422023-08-10 21:15:32 -0400213WriteHandle::onSegmentTimeout(ndn::SegmentFetcher& fetcher, ProcessId processId)
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800214{
215 NDN_LOG_DEBUG("SegTimeout");
216 if (m_processes.count(processId) == 0) {
217 fetcher.stop();
218 return;
219 }
Shuo Chen29c77fe2014-03-18 11:29:41 -0700220}
221
222void
weijia yuan82cf9142018-10-21 12:25:02 -0700223WriteHandle::processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
224 const ndn::mgmt::CommandContinuation& done)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700225{
226 if (parameter.hasEndBlockId()) {
227 //normal fetch segment
weijia yuan82cf9142018-10-21 12:25:02 -0700228 SegmentNo startBlockId = parameter.hasStartBlockId() ? parameter.getStartBlockId() : 0;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700229 SegmentNo endBlockId = parameter.getEndBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700230 if (startBlockId > endBlockId) {
weijia yuan82cf9142018-10-21 12:25:02 -0700231 done(negativeReply("Malformed Command", 403));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700232 return;
233 }
234
weijia yuan82cf9142018-10-21 12:25:02 -0700235 ProcessId processId = ndn::random::generateWord64();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700236 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700237 RepoCommandResponse& response = process.response;
weijia yuan82cf9142018-10-21 12:25:02 -0700238 response.setCode(100);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700239 response.setProcessId(processId);
240 response.setInsertNum(0);
241 response.setStartBlockId(startBlockId);
242 response.setEndBlockId(endBlockId);
weijia yuan82cf9142018-10-21 12:25:02 -0700243 response.setBody(response.wireEncode());
244 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700245
246 //300 means data fetching is in progress
weijia yuan82cf9142018-10-21 12:25:02 -0700247 response.setCode(300);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700248
249 segInit(processId, parameter);
250 }
251 else {
252 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
weijia yuan82cf9142018-10-21 12:25:02 -0700253 ProcessId processId = ndn::random::generateWord64();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700254 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700255 RepoCommandResponse& response = process.response;
weijia yuan82cf9142018-10-21 12:25:02 -0700256 response.setCode(100);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700257 response.setProcessId(processId);
258 response.setInsertNum(0);
259 response.setStartBlockId(parameter.getStartBlockId());
weijia yuan82cf9142018-10-21 12:25:02 -0700260 response.setBody(response.wireEncode());
261 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700262
263 //300 means data fetching is in progress
weijia yuan82cf9142018-10-21 12:25:02 -0700264 response.setCode(300);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700265
266 segInit(processId, parameter);
267 }
268}
269
270void
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800271WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
Davide Pesavento11904062022-04-14 22:33:28 -0400272 const ndn::mgmt::ControlParameters& params,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800273 const ndn::mgmt::CommandContinuation& done)
274{
Davide Pesavento11904062022-04-14 22:33:28 -0400275 const auto& repoParameter = dynamic_cast<const RepoCommandParameter&>(params);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800276
277 //check whether this process exists
278 ProcessId processId = repoParameter.getProcessId();
279 if (m_processes.count(processId) == 0) {
280 NDN_LOG_DEBUG("no such processId: " << processId);
281 done(negativeReply("No such this process is in progress", 404));
282 return;
283 }
284
285 ProcessInfo& process = m_processes[processId];
286
287 RepoCommandResponse& response = process.response;
288
289 //Check whether it is single data fetching
290 if (!response.hasStartBlockId() && !response.hasEndBlockId()) {
291 done(response);
292 return;
293 }
294
295 //read if noEndtimeout
296 if (!response.hasEndBlockId()) {
297 extendNoEndTime(process);
298 done(response);
299 return;
300 }
301 else {
302 done(response);
303 }
304}
305
306void
307WriteHandle::deferredDeleteProcess(ProcessId processId)
308{
Davide Pesavento8891c832019-03-20 23:20:35 -0400309 scheduler.schedule(PROCESS_DELETE_TIME, [=] { deleteProcess(processId); });
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800310}
311
312void
Shuo Chen29c77fe2014-03-18 11:29:41 -0700313WriteHandle::extendNoEndTime(ProcessInfo& process)
314{
Davide Pesavento11904062022-04-14 22:33:28 -0400315 auto noEndTime = process.noEndTime;
316 auto now = time::steady_clock::now();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700317 RepoCommandResponse& response = process.response;
318 if (now > noEndTime) {
weijia yuan82cf9142018-10-21 12:25:02 -0700319 response.setCode(405);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700320 return;
321 }
Davide Pesavento8891c832019-03-20 23:20:35 -0400322
Shuo Chen29c77fe2014-03-18 11:29:41 -0700323 //extends noEndTime
Davide Pesavento11904062022-04-14 22:33:28 -0400324 process.noEndTime = time::steady_clock::now() + NOEND_TIMEOUT;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700325}
326
weijia yuan82cf9142018-10-21 12:25:02 -0700327RepoCommandResponse
328WriteHandle::negativeReply(std::string text, int statusCode)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700329{
weijia yuan82cf9142018-10-21 12:25:02 -0700330 RepoCommandResponse response(statusCode, text);
331 response.setBody(response.wireEncode());
weijia yuan82cf9142018-10-21 12:25:02 -0700332 return response;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700333}
334
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800335} // namespace repo