| /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
| /* |
| * Copyright (c) 2014-2018, Regents of the University of California. |
| * |
| * This file is part of NDN repo-ng (Next generation of NDN repository). |
| * See AUTHORS.md for complete list of repo-ng authors and contributors. |
| * |
| * repo-ng is free software: you can redistribute it and/or modify it under the terms |
| * of the GNU General Public License as published by the Free Software Foundation, |
| * either version 3 of the License, or (at your option) any later version. |
| * |
| * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; |
| * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR |
| * PURPOSE. See the GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License along with |
| * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>. |
| */ |
| |
| #include "write-handle.hpp" |
| |
| namespace repo { |
| |
| static const int RETRY_TIMEOUT = 3; |
| static const int DEFAULT_CREDIT = 12; |
| static const milliseconds NOEND_TIMEOUT(10000); |
| static const milliseconds PROCESS_DELETE_TIME(10000); |
| static const milliseconds DEFAULT_INTEREST_LIFETIME(4000); |
| |
| WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain, |
| Scheduler& scheduler, |
| Validator& validator) |
| : BaseHandle(face, storageHandle, keyChain, scheduler) |
| , m_validator(validator) |
| , m_retryTime(RETRY_TIMEOUT) |
| , m_credit(DEFAULT_CREDIT) |
| , m_noEndTimeout(NOEND_TIMEOUT) |
| , m_interestLifetime(DEFAULT_INTEREST_LIFETIME) |
| { |
| } |
| |
| void |
| WriteHandle::deleteProcess(ProcessId processId) |
| { |
| m_processes.erase(processId); |
| } |
| |
| // Interest. |
| void |
| WriteHandle::onInterest(const Name& prefix, const Interest& interest) |
| { |
| m_validator.validate(interest, |
| bind(&WriteHandle::onValidated, this, _1, prefix), |
| bind(&WriteHandle::onValidationFailed, this, _1, _2)); |
| } |
| |
| void |
| WriteHandle::onValidated(const Interest& interest, const Name& prefix) |
| { |
| RepoCommandParameter parameter; |
| try { |
| extractParameter(interest, prefix, parameter); |
| } |
| catch (const RepoCommandParameter::Error&) { |
| negativeReply(interest, 403); |
| return; |
| } |
| |
| if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) { |
| if (parameter.hasSelectors()) { |
| negativeReply(interest, 402); |
| return; |
| } |
| processSegmentedInsertCommand(interest, parameter); |
| } |
| else { |
| processSingleInsertCommand(interest, parameter); |
| } |
| if (parameter.hasInterestLifetime()) |
| m_interestLifetime = parameter.getInterestLifetime(); |
| } |
| |
| void |
| WriteHandle::onValidationFailed(const Interest& interest, const ValidationError& error) |
| { |
| std::cerr << error << std::endl; |
| negativeReply(interest, 401); |
| } |
| |
| void |
| WriteHandle::onData(const Interest& interest, const Data& data, ProcessId processId) |
| { |
| m_validator.validate(data, |
| bind(&WriteHandle::onDataValidated, this, interest, _1, processId), |
| bind(&WriteHandle::onDataValidationFailed, this, _1, _2)); |
| } |
| |
| void |
| WriteHandle::onDataValidated(const Interest& interest, const Data& data, ProcessId processId) |
| { |
| if (m_processes.count(processId) == 0) { |
| return; |
| } |
| |
| ProcessInfo& process = m_processes[processId]; |
| RepoCommandResponse& response = process.response; |
| |
| if (response.getInsertNum() == 0) { |
| getStorageHandle().insertData(data); |
| // getStorageHandle().insertEntry(data); |
| // getStoreIndex().insert(data); |
| response.setInsertNum(1); |
| } |
| |
| deferredDeleteProcess(processId); |
| } |
| |
| void |
| WriteHandle::onDataValidationFailed(const Data& data, const ValidationError& error) |
| { |
| std::cerr << error << std::endl; |
| } |
| |
| void |
| WriteHandle::onSegmentData(const Interest& interest, const Data& data, ProcessId processId) |
| { |
| m_validator.validate(data, |
| bind(&WriteHandle::onSegmentDataValidated, this, interest, _1, processId), |
| bind(&WriteHandle::onDataValidationFailed, this, _1, _2)); |
| } |
| |
| void |
| WriteHandle::onSegmentDataValidated(const Interest& interest, const Data& data, ProcessId processId) |
| { |
| auto it = m_processes.find(processId); |
| if (it == m_processes.end()) { |
| return; |
| } |
| RepoCommandResponse& response = it->second.response; |
| |
| //refresh endBlockId |
| auto finalBlock = data.getFinalBlock(); |
| if (finalBlock && finalBlock->isSegment()) { |
| auto finalSeg = finalBlock->toSegment(); |
| if (response.hasEndBlockId()) { |
| if (finalSeg < response.getEndBlockId()) { |
| response.setEndBlockId(finalSeg); |
| } |
| } |
| else { |
| response.setEndBlockId(finalSeg); |
| } |
| } |
| |
| //insert data |
| if (getStorageHandle().insertData(data)) { |
| response.setInsertNum(response.getInsertNum() + 1); |
| } |
| |
| onSegmentDataControl(processId, interest); |
| } |
| |
| void |
| WriteHandle::onTimeout(const Interest& interest, ProcessId processId) |
| { |
| std::cerr << "Timeout" << std::endl; |
| m_processes.erase(processId); |
| } |
| |
| void |
| WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId) |
| { |
| std::cerr << "SegTimeout" << std::endl; |
| |
| onSegmentTimeoutControl(processId, interest); |
| } |
| |
| void |
| WriteHandle::listen(const Name& prefix) |
| { |
| getFace().setInterestFilter(Name(prefix).append("insert"), |
| bind(&WriteHandle::onInterest, this, _1, _2)); |
| getFace().setInterestFilter(Name(prefix).append("insert check"), |
| bind(&WriteHandle::onCheckInterest, this, _1, _2)); |
| } |
| |
| void |
| WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter) |
| { |
| ProcessInfo& process = m_processes[processId]; |
| process.credit = 0; |
| |
| map<SegmentNo, int>& processRetry = process.retryCounts; |
| |
| Name name = parameter.getName(); |
| SegmentNo startBlockId = parameter.getStartBlockId(); |
| |
| uint64_t initialCredit = m_credit; |
| |
| if (parameter.hasEndBlockId()) { |
| initialCredit = |
| std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1); |
| } |
| else { |
| // set noEndTimeout timer |
| process.noEndTime = ndn::time::steady_clock::now() + |
| m_noEndTimeout; |
| } |
| process.credit = initialCredit; |
| SegmentNo segment = startBlockId; |
| |
| for (; segment < startBlockId + initialCredit; ++segment) { |
| Name fetchName = name; |
| fetchName.appendSegment(segment); |
| Interest interest(fetchName); |
| interest.setInterestLifetime(m_interestLifetime); |
| getFace().expressInterest(interest, |
| bind(&WriteHandle::onSegmentData, this, _1, _2, processId), |
| bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack |
| bind(&WriteHandle::onSegmentTimeout, this, _1, processId)); |
| process.credit--; |
| processRetry[segment] = 0; |
| } |
| |
| queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue; |
| |
| process.nextSegment = segment; |
| nextSegmentQueue.push(segment); |
| } |
| |
| void |
| WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest) |
| { |
| if (m_processes.count(processId) == 0) { |
| return; |
| } |
| ProcessInfo& process = m_processes[processId]; |
| RepoCommandResponse& response = process.response; |
| int& processCredit = process.credit; |
| //onSegmentDataControl is called when a data returns. |
| //When data returns, processCredit++ |
| processCredit++; |
| SegmentNo& nextSegment = process.nextSegment; |
| queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue; |
| map<SegmentNo, int>& retryCounts = process.retryCounts; |
| |
| //read whether notime timeout |
| if (!response.hasEndBlockId()) { |
| |
| ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime; |
| ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now(); |
| |
| if (now > noEndTime) { |
| std::cerr << "noEndtimeout: " << processId << std::endl; |
| //m_processes.erase(processId); |
| //StatusCode should be refreshed as 405 |
| response.setStatusCode(405); |
| //schedule a delete event |
| deferredDeleteProcess(processId); |
| return; |
| } |
| } |
| |
| //read whether this process has total ends, if ends, remove control info from the maps |
| if (response.hasEndBlockId()) { |
| uint64_t nSegments = |
| response.getEndBlockId() - response.getStartBlockId() + 1; |
| if (response.getInsertNum() >= nSegments) { |
| //m_processes.erase(processId); |
| //All the data has been inserted, StatusCode is refreshed as 200 |
| response.setStatusCode(200); |
| deferredDeleteProcess(processId); |
| return; |
| } |
| } |
| |
| //check whether there is any credit |
| if (processCredit == 0) |
| return; |
| |
| |
| //check whether sent queue empty |
| if (nextSegmentQueue.empty()) { |
| //do not do anything |
| return; |
| } |
| |
| //pop the queue |
| SegmentNo sendingSegment = nextSegmentQueue.front(); |
| nextSegmentQueue.pop(); |
| |
| //check whether sendingSegment exceeds |
| if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) { |
| //do not do anything |
| return; |
| } |
| |
| //read whether this is retransmitted data; |
| SegmentNo fetchedSegment = |
| interest.getName().get(interest.getName().size() - 1).toSegment(); |
| |
| BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0); |
| |
| //find this fetched data, remove it from this map |
| //rit->second.erase(oit); |
| retryCounts.erase(fetchedSegment); |
| //express the interest of the top of the queue |
| Name fetchName(interest.getName().getPrefix(-1)); |
| fetchName.appendSegment(sendingSegment); |
| Interest fetchInterest(fetchName); |
| fetchInterest.setInterestLifetime(m_interestLifetime); |
| getFace().expressInterest(fetchInterest, |
| bind(&WriteHandle::onSegmentData, this, _1, _2, processId), |
| bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack |
| bind(&WriteHandle::onSegmentTimeout, this, _1, processId)); |
| //When an interest is expressed, processCredit-- |
| processCredit--; |
| if (retryCounts.count(sendingSegment) == 0) { |
| //not found |
| retryCounts[sendingSegment] = 0; |
| } |
| else { |
| //found |
| retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1; |
| } |
| //increase the next seg and put it into the queue |
| if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) { |
| nextSegment++; |
| nextSegmentQueue.push(nextSegment); |
| } |
| } |
| |
| void |
| WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest) |
| { |
| if (m_processes.count(processId) == 0) { |
| return; |
| } |
| ProcessInfo& process = m_processes[processId]; |
| // RepoCommandResponse& response = process.response; |
| // SegmentNo& nextSegment = process.nextSegment; |
| // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue; |
| map<SegmentNo, int>& retryCounts = process.retryCounts; |
| |
| SegmentNo timeoutSegment = interest.getName().get(-1).toSegment(); |
| |
| std::cerr << "timeoutSegment: " << timeoutSegment << std::endl; |
| |
| BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0); |
| |
| //read the retry time. If retry out of time, fail the process. if not, plus |
| int& retryTime = retryCounts[timeoutSegment]; |
| if (retryTime >= m_retryTime) { |
| //fail this process |
| std::cerr << "Retry timeout: " << processId << std::endl; |
| m_processes.erase(processId); |
| return; |
| } |
| else { |
| //Reput it in the queue, retryTime++ |
| retryTime++; |
| Interest retryInterest(interest.getName()); |
| retryInterest.setInterestLifetime(m_interestLifetime); |
| getFace().expressInterest(retryInterest, |
| bind(&WriteHandle::onSegmentData, this, _1, _2, processId), |
| bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack |
| bind(&WriteHandle::onSegmentTimeout, this, _1, processId)); |
| } |
| |
| } |
| |
| void |
| WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest) |
| { |
| m_validator.validate(interest, |
| bind(&WriteHandle::onCheckValidated, this, _1, prefix), |
| bind(&WriteHandle::onCheckValidationFailed, this, _1, _2)); |
| |
| } |
| |
| void |
| WriteHandle::onCheckValidated(const Interest& interest, const Name& prefix) |
| { |
| RepoCommandParameter parameter; |
| try { |
| extractParameter(interest, prefix, parameter); |
| } |
| catch (const RepoCommandParameter::Error&) { |
| negativeReply(interest, 403); |
| return; |
| } |
| |
| if (!parameter.hasProcessId()) { |
| negativeReply(interest, 403); |
| return; |
| } |
| //check whether this process exists |
| ProcessId processId = parameter.getProcessId(); |
| if (m_processes.count(processId) == 0) { |
| std::cerr << "no such processId: " << processId << std::endl; |
| negativeReply(interest, 404); |
| return; |
| } |
| |
| ProcessInfo& process = m_processes[processId]; |
| |
| RepoCommandResponse& response = process.response; |
| |
| //Check whether it is single data fetching |
| if (!response.hasStartBlockId() && |
| !response.hasEndBlockId()) { |
| reply(interest, response); |
| return; |
| } |
| |
| //read if noEndtimeout |
| if (!response.hasEndBlockId()) { |
| extendNoEndTime(process); |
| reply(interest, response); |
| return; |
| } |
| else { |
| reply(interest, response); |
| } |
| } |
| |
| void |
| WriteHandle::onCheckValidationFailed(const Interest& interest, const ValidationError& error) |
| { |
| std::cerr << error << std::endl; |
| negativeReply(interest, 401); |
| } |
| |
| void |
| WriteHandle::deferredDeleteProcess(ProcessId processId) |
| { |
| getScheduler().scheduleEvent(PROCESS_DELETE_TIME, |
| bind(&WriteHandle::deleteProcess, this, processId)); |
| } |
| |
| void |
| WriteHandle::processSingleInsertCommand(const Interest& interest, |
| RepoCommandParameter& parameter) |
| { |
| ProcessId processId = generateProcessId(); |
| |
| ProcessInfo& process = m_processes[processId]; |
| |
| RepoCommandResponse& response = process.response; |
| response.setStatusCode(100); |
| response.setProcessId(processId); |
| response.setInsertNum(0); |
| |
| reply(interest, response); |
| |
| response.setStatusCode(300); |
| |
| Interest fetchInterest(parameter.getName()); |
| fetchInterest.setInterestLifetime(m_interestLifetime); |
| if (parameter.hasSelectors()) { |
| fetchInterest.setSelectors(parameter.getSelectors()); |
| } |
| getFace().expressInterest(fetchInterest, |
| bind(&WriteHandle::onData, this, _1, _2, processId), |
| bind(&WriteHandle::onTimeout, this, _1, processId), // Nack |
| bind(&WriteHandle::onTimeout, this, _1, processId)); |
| } |
| |
| void |
| WriteHandle::processSegmentedInsertCommand(const Interest& interest, |
| RepoCommandParameter& parameter) |
| { |
| if (parameter.hasEndBlockId()) { |
| //normal fetch segment |
| if (!parameter.hasStartBlockId()) { |
| parameter.setStartBlockId(0); |
| } |
| |
| SegmentNo startBlockId = parameter.getStartBlockId(); |
| SegmentNo endBlockId = parameter.getEndBlockId(); |
| if (startBlockId > endBlockId) { |
| negativeReply(interest, 403); |
| return; |
| } |
| |
| ProcessId processId = generateProcessId(); |
| ProcessInfo& process = m_processes[processId]; |
| RepoCommandResponse& response = process.response; |
| response.setStatusCode(100); |
| response.setProcessId(processId); |
| response.setInsertNum(0); |
| response.setStartBlockId(startBlockId); |
| response.setEndBlockId(endBlockId); |
| |
| reply(interest, response); |
| |
| //300 means data fetching is in progress |
| response.setStatusCode(300); |
| |
| segInit(processId, parameter); |
| } |
| else { |
| //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop |
| ProcessId processId = generateProcessId(); |
| ProcessInfo& process = m_processes[processId]; |
| RepoCommandResponse& response = process.response; |
| response.setStatusCode(100); |
| response.setProcessId(processId); |
| response.setInsertNum(0); |
| response.setStartBlockId(parameter.getStartBlockId()); |
| reply(interest, response); |
| |
| //300 means data fetching is in progress |
| response.setStatusCode(300); |
| |
| segInit(processId, parameter); |
| } |
| } |
| |
| void |
| WriteHandle::extendNoEndTime(ProcessInfo& process) |
| { |
| ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime; |
| ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now(); |
| RepoCommandResponse& response = process.response; |
| if (now > noEndTime) { |
| response.setStatusCode(405); |
| return; |
| } |
| //extends noEndTime |
| process.noEndTime = |
| ndn::time::steady_clock::now() + m_noEndTimeout; |
| |
| } |
| |
| void |
| WriteHandle::negativeReply(const Interest& interest, int statusCode) |
| { |
| RepoCommandResponse response; |
| response.setStatusCode(statusCode); |
| reply(interest, response); |
| } |
| |
| } // namespace repo |