| /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
| /* |
| * Copyright (c) 2014-2019, Regents of the University of California. |
| * |
| * This file is part of NDN repo-ng (Next generation of NDN repository). |
| * See AUTHORS.md for complete list of repo-ng authors and contributors. |
| * |
| * repo-ng is free software: you can redistribute it and/or modify it under the terms |
| * of the GNU General Public License as published by the Free Software Foundation, |
| * either version 3 of the License, or (at your option) any later version. |
| * |
| * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; |
| * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR |
| * PURPOSE. See the GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License along with |
| * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>. |
| */ |
| |
| #include "write-handle.hpp" |
| |
| #include <ndn-cxx/util/logger.hpp> |
| #include <ndn-cxx/util/random.hpp> |
| |
| namespace repo { |
| |
| NDN_LOG_INIT(repo.WriteHandle); |
| |
| static const int DEFAULT_CREDIT = 12; |
| static const bool DEFAULT_CANBE_PREFIX = false; |
| static const milliseconds MAX_TIMEOUT(60_s); |
| static const milliseconds NOEND_TIMEOUT(10000_ms); |
| static const milliseconds PROCESS_DELETE_TIME(10000_ms); |
| static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms); |
| |
| WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, ndn::mgmt::Dispatcher& dispatcher, |
| Scheduler& scheduler, Validator& validator) |
| : CommandBaseHandle(face, storageHandle, scheduler, validator) |
| , m_validator(validator) |
| , m_credit(DEFAULT_CREDIT) |
| , m_canBePrefix(DEFAULT_CANBE_PREFIX) |
| , m_maxTimeout(MAX_TIMEOUT) |
| , m_noEndTimeout(NOEND_TIMEOUT) |
| , m_interestLifetime(DEFAULT_INTEREST_LIFETIME) |
| { |
| dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert"), |
| makeAuthorization(), |
| std::bind(&WriteHandle::validateParameters<InsertCommand>, this, _1), |
| std::bind(&WriteHandle::handleInsertCommand, this, _1, _2, _3, _4)); |
| |
| dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert check"), |
| makeAuthorization(), |
| std::bind(&WriteHandle::validateParameters<InsertCheckCommand>, this, _1), |
| std::bind(&WriteHandle::handleCheckCommand, this, _1, _2, _3, _4)); |
| } |
| |
| void |
| WriteHandle::deleteProcess(ProcessId processId) |
| { |
| m_processes.erase(processId); |
| } |
| |
| void |
| WriteHandle::handleInsertCommand(const Name& prefix, const Interest& interest, |
| const ndn::mgmt::ControlParameters& parameter, |
| const ndn::mgmt::CommandContinuation& done) |
| { |
| RepoCommandParameter* repoParameter = |
| dynamic_cast<RepoCommandParameter*>(const_cast<ndn::mgmt::ControlParameters*>(¶meter)); |
| |
| if (repoParameter->hasStartBlockId() || repoParameter->hasEndBlockId()) { |
| processSegmentedInsertCommand(interest, *repoParameter, done); |
| } |
| else { |
| processSingleInsertCommand(interest, *repoParameter, done); |
| } |
| if (repoParameter->hasInterestLifetime()) |
| m_interestLifetime = repoParameter->getInterestLifetime(); |
| } |
| |
| void |
| WriteHandle::onData(const Interest& interest, const Data& data, ProcessId processId) |
| { |
| m_validator.validate(data, |
| std::bind(&WriteHandle::onDataValidated, this, interest, _1, processId), |
| [](const Data& data, const ValidationError& error){NDN_LOG_ERROR("Error: " << error);}); |
| } |
| |
| void |
| WriteHandle::onDataValidated(const Interest& interest, const Data& data, ProcessId processId) |
| { |
| if (m_processes.count(processId) == 0) { |
| return; |
| } |
| |
| ProcessInfo& process = m_processes[processId]; |
| RepoCommandResponse& response = process.response; |
| |
| if (response.getInsertNum() == 0) { |
| storageHandle.insertData(data); |
| response.setInsertNum(1); |
| } |
| |
| deferredDeleteProcess(processId); |
| } |
| |
| void |
| WriteHandle::onTimeout(const Interest& interest, ProcessId processId) |
| { |
| NDN_LOG_DEBUG("Timeout" << std::endl); |
| m_processes.erase(processId); |
| } |
| |
| void |
| WriteHandle::processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter, |
| const ndn::mgmt::CommandContinuation& done) |
| { |
| ProcessId processId = ndn::random::generateWord64(); |
| |
| ProcessInfo& process = m_processes[processId]; |
| |
| RepoCommandResponse& response = process.response; |
| response.setCode(100); |
| response.setProcessId(processId); |
| response.setInsertNum(0); |
| response.setBody(response.wireEncode()); |
| done(response); |
| |
| response.setCode(300); |
| Interest fetchInterest(parameter.getName()); |
| fetchInterest.setCanBePrefix(m_canBePrefix); |
| fetchInterest.setInterestLifetime(m_interestLifetime); |
| face.expressInterest(fetchInterest, |
| std::bind(&WriteHandle::onData, this, _1, _2, processId), |
| std::bind(&WriteHandle::onTimeout, this, _1, processId), // Nack |
| std::bind(&WriteHandle::onTimeout, this, _1, processId)); |
| } |
| |
| void |
| WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter) |
| { |
| // use SegmentFetcher to send fetch interest. |
| ProcessInfo& process = m_processes[processId]; |
| Name name = parameter.getName(); |
| SegmentNo startBlockId = parameter.getStartBlockId(); |
| |
| uint64_t initialCredit = m_credit; |
| |
| if (parameter.hasEndBlockId()) { |
| initialCredit = |
| std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1); |
| } |
| else { |
| // set noEndTimeout timer |
| process.noEndTime = ndn::time::steady_clock::now() + |
| m_noEndTimeout; |
| } |
| |
| Name fetchName = name; |
| SegmentNo segment = startBlockId; |
| fetchName.appendSegment(segment); |
| Interest interest(fetchName); |
| |
| ndn::util::SegmentFetcher::Options options; |
| options.initCwnd = initialCredit; |
| options.interestLifetime = m_interestLifetime; |
| options.maxTimeout = m_maxTimeout; |
| auto fetcher = ndn::util::SegmentFetcher::start(face, interest, m_validator, options); |
| fetcher->onError.connect([] (uint32_t errorCode, const std::string& errorMsg) |
| {NDN_LOG_ERROR("Error: " << errorMsg);}); |
| fetcher->afterSegmentValidated.connect([this, &fetcher, &processId] (const Data& data) |
| {onSegmentData(*fetcher, data, processId);}); |
| fetcher->afterSegmentTimedOut.connect([this, &fetcher, &processId] () |
| {onSegmentTimeout(*fetcher, processId);}); |
| } |
| |
| void |
| WriteHandle::onSegmentData(ndn::util::SegmentFetcher& fetcher, const Data& data, ProcessId processId) |
| { |
| auto it = m_processes.find(processId); |
| if (it == m_processes.end()) { |
| fetcher.stop(); |
| return; |
| } |
| |
| RepoCommandResponse& response = it->second.response; |
| |
| //insert data |
| if (storageHandle.insertData(data)) { |
| response.setInsertNum(response.getInsertNum() + 1); |
| } |
| |
| ProcessInfo& process = m_processes[processId]; |
| //read whether notime timeout |
| if (!response.hasEndBlockId()) { |
| |
| ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime; |
| ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now(); |
| |
| if (now > noEndTime) { |
| NDN_LOG_DEBUG("noEndtimeout: " << processId); |
| //StatusCode should be refreshed as 405 |
| response.setCode(405); |
| //schedule a delete event |
| deferredDeleteProcess(processId); |
| fetcher.stop(); |
| return; |
| } |
| } |
| |
| //read whether this process has total ends, if ends, remove control info from the maps |
| if (response.hasEndBlockId()) { |
| uint64_t nSegments = response.getEndBlockId() - response.getStartBlockId() + 1; |
| if (response.getInsertNum() >= nSegments) { |
| //All the data has been inserted, StatusCode is refreshed as 200 |
| response.setCode(200); |
| deferredDeleteProcess(processId); |
| fetcher.stop(); |
| return; |
| } |
| } |
| } |
| |
| void |
| WriteHandle::onSegmentTimeout(ndn::util::SegmentFetcher& fetcher, ProcessId processId) |
| { |
| NDN_LOG_DEBUG("SegTimeout"); |
| if (m_processes.count(processId) == 0) { |
| fetcher.stop(); |
| return; |
| } |
| } |
| |
| void |
| WriteHandle::processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter, |
| const ndn::mgmt::CommandContinuation& done) |
| { |
| if (parameter.hasEndBlockId()) { |
| //normal fetch segment |
| SegmentNo startBlockId = parameter.hasStartBlockId() ? parameter.getStartBlockId() : 0; |
| SegmentNo endBlockId = parameter.getEndBlockId(); |
| if (startBlockId > endBlockId) { |
| done(negativeReply("Malformed Command", 403)); |
| return; |
| } |
| |
| ProcessId processId = ndn::random::generateWord64(); |
| ProcessInfo& process = m_processes[processId]; |
| RepoCommandResponse& response = process.response; |
| response.setCode(100); |
| response.setProcessId(processId); |
| response.setInsertNum(0); |
| response.setStartBlockId(startBlockId); |
| response.setEndBlockId(endBlockId); |
| response.setBody(response.wireEncode()); |
| done(response); |
| |
| //300 means data fetching is in progress |
| response.setCode(300); |
| |
| segInit(processId, parameter); |
| } |
| else { |
| //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop |
| ProcessId processId = ndn::random::generateWord64(); |
| ProcessInfo& process = m_processes[processId]; |
| RepoCommandResponse& response = process.response; |
| response.setCode(100); |
| response.setProcessId(processId); |
| response.setInsertNum(0); |
| response.setStartBlockId(parameter.getStartBlockId()); |
| response.setBody(response.wireEncode()); |
| done(response); |
| |
| //300 means data fetching is in progress |
| response.setCode(300); |
| |
| segInit(processId, parameter); |
| } |
| } |
| |
| void |
| WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest, |
| const ndn::mgmt::ControlParameters& parameter, |
| const ndn::mgmt::CommandContinuation& done) |
| { |
| const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter); |
| |
| //check whether this process exists |
| ProcessId processId = repoParameter.getProcessId(); |
| if (m_processes.count(processId) == 0) { |
| NDN_LOG_DEBUG("no such processId: " << processId); |
| done(negativeReply("No such this process is in progress", 404)); |
| return; |
| } |
| |
| ProcessInfo& process = m_processes[processId]; |
| |
| RepoCommandResponse& response = process.response; |
| |
| //Check whether it is single data fetching |
| if (!response.hasStartBlockId() && !response.hasEndBlockId()) { |
| done(response); |
| return; |
| } |
| |
| //read if noEndtimeout |
| if (!response.hasEndBlockId()) { |
| extendNoEndTime(process); |
| done(response); |
| return; |
| } |
| else { |
| done(response); |
| } |
| } |
| |
| void |
| WriteHandle::deferredDeleteProcess(ProcessId processId) |
| { |
| scheduler.schedule(PROCESS_DELETE_TIME, [=] { deleteProcess(processId); }); |
| } |
| |
| void |
| WriteHandle::extendNoEndTime(ProcessInfo& process) |
| { |
| auto& noEndTime = process.noEndTime; |
| auto now = ndn::time::steady_clock::now(); |
| RepoCommandResponse& response = process.response; |
| if (now > noEndTime) { |
| response.setCode(405); |
| return; |
| } |
| |
| //extends noEndTime |
| process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout; |
| } |
| |
| RepoCommandResponse |
| WriteHandle::negativeReply(std::string text, int statusCode) |
| { |
| RepoCommandResponse response(statusCode, text); |
| response.setBody(response.wireEncode()); |
| |
| return response; |
| } |
| |
| } // namespace repo |