blob: cae79a2e6c4fa70622b4aaa8d3cf9aef5d994592 [file] [log] [blame]
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyev1cf5c432017-01-13 23:22:15 -08003 * Copyright (c) 2013-2017, Regents of the University of California.
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -08004 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08005 * This file is part of ChronoShare, a decentralized file sharing application over NDN.
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -08006 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08007 * ChronoShare is free software: you can redistribute it and/or modify it under the terms
8 * of the GNU General Public License as published by the Free Software Foundation, either
9 * version 3 of the License, or (at your option) any later version.
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080010 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080011 * ChronoShare is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080014 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080015 * You should have received copies of the GNU General Public License along with
16 * ChronoShare, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * See AUTHORS.md for complete list of ChronoShare authors and contributors.
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080019 */
20
Alexander Afanasyevf4cde4e2016-12-25 13:42:57 -080021#include "dispatcher.hpp"
Alexander Afanasyevf4cde4e2016-12-25 13:42:57 -080022#include "fetch-task-db.hpp"
Lijing Wang8e56d082016-12-25 14:45:23 -080023#include "core/logging.hpp"
24
25#include <ndn-cxx/util/digest.hpp>
26#include <ndn-cxx/util/string-helper.hpp>
Alexander Afanasyevf9978f82013-01-23 16:30:31 -080027
Alexander Afanasyevf9978f82013-01-23 16:30:31 -080028#include <boost/lexical_cast.hpp>
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080029
Lijing Wang8e56d082016-12-25 14:45:23 -080030namespace ndn {
31namespace chronoshare {
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080032
Alexander Afanasyev1cf5c432017-01-13 23:22:15 -080033_LOG_INIT(Dispatcher);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080034
Lijing Wang8e56d082016-12-25 14:45:23 -080035namespace fs = boost::filesystem;
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -080036
Lijing Wang8e56d082016-12-25 14:45:23 -080037static const name::Component CHRONOSHARE_APP = name::Component("chronoshare");
38static const Name BROADCAST_DOMAIN = "/ndn/multicast";
39
40static const time::seconds DEFAULT_SYNC_INTEREST_INTERVAL = time::seconds(10);
Yukai Tu729b2892017-02-09 21:15:48 -080041static const time::seconds DEFAULT_AUTO_DISCOVERY_INTERVAL = time::seconds(60);
Alexander Afanasyevf9978f82013-01-23 16:30:31 -080042
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080043Dispatcher::Dispatcher(const std::string& localUserName, const std::string& sharedFolder,
Yukai Tu729b2892017-02-09 21:15:48 -080044 const fs::path& rootDir, Face& face)
Lijing Wang8e56d082016-12-25 14:45:23 -080045 : m_face(face)
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080046 , m_rootDir(rootDir)
Lijing Wang8e56d082016-12-25 14:45:23 -080047 , m_ioService(face.getIoService())
Yukai Tu729b2892017-02-09 21:15:48 -080048 , m_scheduler(m_ioService)
49 , m_autoDiscovery(m_scheduler)
Lijing Wang8e56d082016-12-25 14:45:23 -080050 , m_objectManager(face, m_keyChain, rootDir, CHRONOSHARE_APP.toUri())
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080051 , m_localUserName(localUserName)
52 , m_sharedFolder(sharedFolder)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080053{
Zhenkai Zhu3290b8e2013-01-24 15:25:48 -080054 m_syncLog = make_shared<SyncLog>(m_rootDir, localUserName);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080055 m_actionLog =
Lijing Wang8e56d082016-12-25 14:45:23 -080056 make_shared<ActionLog>(m_face, m_rootDir, m_syncLog, sharedFolder, CHRONOSHARE_APP,
57 // bind(&Dispatcher::Did_ActionLog_ActionApply_AddOrModify, this, _1, _2, _3, _4, _5, _6, _7),
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080058 ActionLog::OnFileAddedOrChangedCallback(), // don't really need this callback
59 bind(&Dispatcher::Did_ActionLog_ActionApply_Delete, this, _1));
60 m_fileState = m_actionLog->GetFileState();
Alexander Afanasyevd6364ef2013-02-06 13:13:07 -080061
Lijing Wang8e56d082016-12-25 14:45:23 -080062 Name syncPrefix = Name(BROADCAST_DOMAIN);
63 syncPrefix.append(CHRONOSHARE_APP);
64 syncPrefix.append(sharedFolder);
Alexander Afanasyevcbda9922013-01-22 11:21:12 -080065
Lijing Wang8e56d082016-12-25 14:45:23 -080066 m_server = make_unique<ContentServer>(m_face, m_actionLog, rootDir, m_localUserName,
67 m_sharedFolder, CHRONOSHARE_APP, m_keyChain);
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -080068 m_server->registerPrefix(Name("/"));
Zhenkai Zhu3f64d2d2013-01-25 14:34:59 -080069 m_server->registerPrefix(Name(BROADCAST_DOMAIN));
Zhenkai Zhu1dcbbab2013-01-22 16:03:20 -080070
Lijing Wangb95c6a52016-12-25 14:45:17 -080071 m_stateServer = make_unique<StateServer>(m_face, m_actionLog, rootDir, m_localUserName, m_sharedFolder,
72 CHRONOSHARE_APP, m_objectManager, m_keyChain);
Alexander Afanasyev026eaf32013-02-23 16:37:14 -080073
Lijing Wang8e56d082016-12-25 14:45:23 -080074 m_core = make_unique<SyncCore>(m_face, m_syncLog, localUserName, Name("/"), syncPrefix,
75 bind(&Dispatcher::Did_SyncLog_StateChange, this, _1),
76 DEFAULT_SYNC_INTEREST_INTERVAL);
Zhenkai Zhu1dcbbab2013-01-22 16:03:20 -080077
Zhenkai Zhuda686882013-01-29 22:32:24 -080078 FetchTaskDbPtr actionTaskDb = make_shared<FetchTaskDb>(m_rootDir, "action");
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080079 m_actionFetcher =
Lijing Wang8e56d082016-12-25 14:45:23 -080080 make_shared<FetchManager>(m_face, bind(&SyncLog::LookupLocator, &*m_syncLog, _1),
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080081 Name(BROADCAST_DOMAIN), // no appname suffix now
82 3,
83 bind(&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4),
84 FetchManager::FinishCallback(), actionTaskDb);
Zhenkai Zhub8c49af2013-01-29 16:03:56 -080085
Zhenkai Zhuda686882013-01-29 22:32:24 -080086 FetchTaskDbPtr fileTaskDb = make_shared<FetchTaskDb>(m_rootDir, "file");
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080087 m_fileFetcher =
Lijing Wang8e56d082016-12-25 14:45:23 -080088 make_shared<FetchManager>(m_face, bind(&SyncLog::LookupLocator, &*m_syncLog, _1),
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080089 Name(BROADCAST_DOMAIN), // no appname suffix now
90 3, bind(&Dispatcher::Did_FetchManager_FileSegmentFetch, this, _1, _2,
91 _3, _4),
92 bind(&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2),
93 fileTaskDb);
Zhenkai Zhub8c49af2013-01-29 16:03:56 -080094
Yukai Tu729b2892017-02-09 21:15:48 -080095 _LOG_DEBUG("registering prefix discovery in Dispatcher");
96 m_autoDiscovery = m_scheduler.scheduleEvent(DEFAULT_AUTO_DISCOVERY_INTERVAL,
97 bind(&Dispatcher::DiscoverPrefix, this));
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080098}
99
100Dispatcher::~Dispatcher()
101{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800102 _LOG_DEBUG("Enter destructor of dispatcher");
Yukai Tu729b2892017-02-09 21:15:48 -0800103}
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800104
Yukai Tu729b2892017-02-09 21:15:48 -0800105void
106Dispatcher::DiscoverPrefix()
107{
108 _LOG_DEBUG("Query for local prefix");
109 Interest interest(Name("/localhop/nfd/rib/routable-prefixes"));
110 m_face.expressInterest(interest,
111 bind(&Dispatcher::Handle_Prefix_Discovery, this, _1, _2),
112 bind([this] {m_autoDiscovery = m_scheduler.scheduleEvent(DEFAULT_AUTO_DISCOVERY_INTERVAL,
113 bind(&Dispatcher::DiscoverPrefix, this));
114 _LOG_DEBUG("query routable-prefixes, nack received"); }),
115 bind([this] {m_autoDiscovery = m_scheduler.scheduleEvent(DEFAULT_AUTO_DISCOVERY_INTERVAL,
116 bind(&Dispatcher::DiscoverPrefix, this));
117 _LOG_DEBUG("query routable-prefixes interest timeout"); }));
118}
119
120void
121Dispatcher::Handle_Prefix_Discovery(const Interest& interest, const Data& data)
122{
123 const Block& content = data.getContent();
124 content.parse();
125
126 Name prefix;
127
128 for (auto& element : content.elements()) {
129 prefix = Name(element);
130 break;
Alexander Afanasyev026eaf32013-02-23 16:37:14 -0800131 }
Yukai Tu729b2892017-02-09 21:15:48 -0800132
133 _LOG_DEBUG("local prefix:" << prefix << prefix.empty());
134 if(prefix.empty()) {
135 _LOG_DEBUG("Local prefix is empty");
136 return;
137 }
138
139 _LOG_DEBUG("local prefix:" << prefix << "discovered");
140 Did_LocalPrefix_Updated(prefix);
141
142 m_autoDiscovery = m_scheduler.scheduleEvent(DEFAULT_AUTO_DISCOVERY_INTERVAL,
143 bind(&Dispatcher::DiscoverPrefix, this));
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800144}
145
Alexander Afanasyev758f51b2013-01-24 13:48:18 -0800146void
Lijing Wang8e56d082016-12-25 14:45:23 -0800147Dispatcher::Did_LocalPrefix_Updated(const Name& forwardingHint)
Alexander Afanasyev758f51b2013-01-24 13:48:18 -0800148{
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800149 Name effectiveForwardingHint;
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800150 if (m_localUserName.size() >= forwardingHint.size() &&
Lijing Wang8e56d082016-12-25 14:45:23 -0800151 m_localUserName.getSubName(0, forwardingHint.size()) == forwardingHint) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800152 effectiveForwardingHint = Name("/"); // "directly" accesible
153 }
154 else {
155 effectiveForwardingHint = forwardingHint;
156 }
Alexander Afanasyev758f51b2013-01-24 13:48:18 -0800157
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800158 Name oldLocalPrefix = m_syncLog->LookupLocalLocator();
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800159
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800160 if (oldLocalPrefix == effectiveForwardingHint) {
161 _LOG_DEBUG(
162 "Got notification about prefix change from " << oldLocalPrefix << " to: " << forwardingHint
163 << ", but effective prefix didn't change");
164 return;
165 }
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800166
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800167 if (effectiveForwardingHint == Name("/") || effectiveForwardingHint == Name(BROADCAST_DOMAIN)) {
168 _LOG_DEBUG("Basic effective prefix [" << effectiveForwardingHint
169 << "]. Updating local prefix, but don't reregister");
170 m_syncLog->UpdateLocalLocator(effectiveForwardingHint);
171 return;
172 }
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800173
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800174 _LOG_DEBUG("LocalPrefix changed from: " << oldLocalPrefix << " to: " << effectiveForwardingHint);
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800175
176 m_server->registerPrefix(effectiveForwardingHint);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800177 m_syncLog->UpdateLocalLocator(effectiveForwardingHint);
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800178
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800179 if (oldLocalPrefix == Name("/") || oldLocalPrefix == Name(BROADCAST_DOMAIN)) {
180 _LOG_DEBUG("Don't deregister basic prefix: " << oldLocalPrefix);
181 return;
182 }
Alexander Afanasyev758f51b2013-01-24 13:48:18 -0800183 m_server->deregisterPrefix(oldLocalPrefix);
184}
185
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800186/////////////////////////////////////////////////////////////////////////////////////////////////////
187/////////////////////////////////////////////////////////////////////////////////////////////////////
188/////////////////////////////////////////////////////////////////////////////////////////////////////
189
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800190void
Lijing Wang8e56d082016-12-25 14:45:23 -0800191Dispatcher::Did_LocalFile_AddOrModify(const fs::path& relativeFilePath)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800192{
Lijing Wang8e56d082016-12-25 14:45:23 -0800193 m_ioService.post(bind(&Dispatcher::Did_LocalFile_AddOrModify_Execute, this, relativeFilePath));
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800194}
195
196void
Lijing Wang8e56d082016-12-25 14:45:23 -0800197Dispatcher::Did_LocalFile_AddOrModify_Execute(fs::path relativeFilePath)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800198{
Zhenkai Zhufaee2d42013-01-24 17:47:13 -0800199 _LOG_DEBUG(m_localUserName << " calls LocalFile_AddOrModify_Execute");
Lijing Wang8e56d082016-12-25 14:45:23 -0800200 fs::path absolutePath = m_rootDir / relativeFilePath;
201 _LOG_DEBUG("relativeFilePath : " << relativeFilePath);
202 _LOG_DEBUG("absolutePath : " << absolutePath);
203 if (!fs::exists(absolutePath)) {
204 // BOOST_THROW_EXCEPTION(Error::Dispatcher() << error_info_str("Update non exist file: " +
205 // absolutePath.string() ));
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800206 _LOG_DEBUG("Update non exist file: " << absolutePath.string());
207 return;
208 }
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800209
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800210 FileItemPtr currentFile = m_fileState->LookupFile(relativeFilePath.generic_string());
Lijing Wang8e56d082016-12-25 14:45:23 -0800211
212 if (currentFile) {
213 fs::ifstream input(absolutePath);
214 if (*util::Sha256(input).computeDigest() ==
215 Buffer(currentFile->file_hash().c_str(), currentFile->file_hash().size())
216 // The following two are commented out to prevent front end from reporting intermediate files
217 // should enable it if there is other way to prevent this
218 // && last_write_time(absolutePath) == currentFile->mtime()
219 // && status(absolutePath).permissions() ==
220 // static_cast<fs::perms>(currentFile->mode())
221 ) {
222 _LOG_ERROR("Got notification about the same file [" << relativeFilePath << "]");
223 return;
224 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800225 }
226
227 if (currentFile && !currentFile->is_complete()) {
228 _LOG_ERROR("Got notification about incomplete file [" << relativeFilePath << "]");
229 return;
230 }
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800231
232 int seg_num;
Lijing Wang8e56d082016-12-25 14:45:23 -0800233 ConstBufferPtr hash;
234 _LOG_DEBUG("absolutePath: " << absolutePath << " m_localUserName: " << m_localUserName);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800235 tie(hash, seg_num) = m_objectManager.localFileToObjects(absolutePath, m_localUserName);
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800236
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800237 try {
Lijing Wang8e56d082016-12-25 14:45:23 -0800238 m_actionLog->AddLocalActionUpdate(relativeFilePath.generic_string(), *hash,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800239 last_write_time(absolutePath),
Alexander Afanasyevf8ff5e12013-07-11 13:57:32 -0700240#if BOOST_VERSION >= 104900
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800241 status(absolutePath).permissions(),
Alexander Afanasyevf8ff5e12013-07-11 13:57:32 -0700242#else
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800243 0,
Alexander Afanasyevf8ff5e12013-07-11 13:57:32 -0700244#endif
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800245 seg_num);
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800246
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800247 // notify SyncCore to propagate the change
248 m_core->localStateChangedDelayed();
249 }
Yukai Tu729b2892017-02-09 21:15:48 -0800250 catch (const fs::filesystem_error& error) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800251 _LOG_ERROR("File operations failed on [" << relativeFilePath << "](ignoring)");
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800252 }
Lijing Wang8e56d082016-12-25 14:45:23 -0800253
254 _LOG_DEBUG("LocalFile_AddOrModify_Execute Finished!");
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800255}
256
257void
Lijing Wang8e56d082016-12-25 14:45:23 -0800258Dispatcher::Did_LocalFile_Delete(const fs::path& relativeFilePath)
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800259{
Lijing Wang8e56d082016-12-25 14:45:23 -0800260 m_ioService.post(bind(&Dispatcher::Did_LocalFile_Delete_Execute, this, relativeFilePath));
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800261}
262
263void
Lijing Wang8e56d082016-12-25 14:45:23 -0800264Dispatcher::Did_LocalFile_Delete_Execute(fs::path relativeFilePath)
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800265{
Lijing Wang8e56d082016-12-25 14:45:23 -0800266 fs::path absolutePath = m_rootDir / relativeFilePath;
267 if (fs::exists(absolutePath)) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800268 _LOG_ERROR("DELETE command, but file still exists: " << absolutePath.string());
269 return;
270 }
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800271
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800272 m_actionLog->AddLocalActionDelete(relativeFilePath.generic_string());
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800273 // notify SyncCore to propagate the change
Alexander Afanasyeva2fabcf2013-02-05 11:26:37 -0800274 m_core->localStateChangedDelayed();
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800275}
276
277/////////////////////////////////////////////////////////////////////////////////////////////////////
278/////////////////////////////////////////////////////////////////////////////////////////////////////
279/////////////////////////////////////////////////////////////////////////////////////////////////////
280
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800281void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800282Dispatcher::Did_SyncLog_StateChange(SyncStateMsgPtr stateMsg)
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800283{
Lijing Wang8e56d082016-12-25 14:45:23 -0800284 m_ioService.post(bind(&Dispatcher::Did_SyncLog_StateChange_Execute, this, stateMsg));
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800285}
286
287void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800288Dispatcher::Did_SyncLog_StateChange_Execute(SyncStateMsgPtr stateMsg)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800289{
290 int size = stateMsg->state_size();
291 int index = 0;
292 // iterate and fetch the actions
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800293 for (; index < size; index++) {
294 SyncState state = stateMsg->state(index);
295 if (state.has_old_seq() && state.has_seq()) {
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800296 uint64_t oldSeq = state.old_seq();
297 uint64_t newSeq = state.seq();
Lijing Wang8e56d082016-12-25 14:45:23 -0800298 Name userName(Block((const unsigned char*)state.name().c_str(), state.name().size()));
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800299
Lijing Wang8e56d082016-12-25 14:45:23 -0800300 // fetch actions with oldSeq + 1 to newSeq(inclusive)
301 Name actionNameBase = Name("/");
302 actionNameBase.append(userName).append(CHRONOSHARE_APP).append("action").append(m_sharedFolder);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800303 m_actionFetcher->Enqueue(userName, actionNameBase, std::max<uint64_t>(oldSeq + 1, 1), newSeq,
304 FetchManager::PRIORITY_HIGH);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800305 }
306 }
307}
308
309void
Lijing Wang8e56d082016-12-25 14:45:23 -0800310Dispatcher::Did_FetchManager_ActionFetch(const Name& deviceName, const Name& actionBaseName,
311 uint32_t seqno, shared_ptr<Data> actionData)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800312{
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800313 /// @todo Errors and exception checking
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800314 _LOG_DEBUG("Received action deviceName: " << deviceName << ", actionBaseName: " << actionBaseName
315 << ", seqno: "
316 << seqno);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800317
Lijing Wang8e56d082016-12-25 14:45:23 -0800318 ActionItemPtr action = m_actionLog->AddRemoteAction(deviceName, seqno, actionData);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800319 if (!action) {
320 _LOG_ERROR("AddRemoteAction did not insert action, ignoring");
321 return;
322 }
Lijing Wang8e56d082016-12-25 14:45:23 -0800323 // trigger may invoke Did_ActionLog_ActionApply_Delete or Did_ActionLog_ActionApply_AddOrModify
324 // callbacks
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800325
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800326 if (action->action() == ActionItem::UPDATE) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800327 ConstBufferPtr hash =
328 make_shared<Buffer>(action->file_hash().c_str(), action->file_hash().size());
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800329
Lijing Wang8e56d082016-12-25 14:45:23 -0800330 Name fileNameBase = Name("/");
331 fileNameBase.append(deviceName).append(CHRONOSHARE_APP).append("file");
332 // fileNameBase.append(name::Component(hash));
333 fileNameBase.appendImplicitSha256Digest(hash);
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800334
Lijing Wang8e56d082016-12-25 14:45:23 -0800335 std::string hashStr = toHex(*hash);
336 if (ObjectDb::doesExist(m_rootDir / ".chronoshare", deviceName, hashStr)) {
337 _LOG_DEBUG("File already exists in the database. No need to refetch, just directly applying "
338 "the action");
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800339 Did_FetchManager_FileFetchComplete(deviceName, fileNameBase);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800340 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800341 else {
Lijing Wang8e56d082016-12-25 14:45:23 -0800342 if (m_objectDbMap.find(*hash) == m_objectDbMap.end()) {
343 _LOG_DEBUG("create ObjectDb for " << toHex(*hash));
344 m_objectDbMap[*hash] = make_shared<ObjectDb>(m_rootDir / ".chronoshare", hashStr);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800345 }
346
347 m_fileFetcher->Enqueue(deviceName, fileNameBase, 0, action->seg_num() - 1,
348 FetchManager::PRIORITY_NORMAL);
349 }
350 }
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800351}
352
353void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800354Dispatcher::Did_ActionLog_ActionApply_Delete(const std::string& filename)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800355{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800356 _LOG_DEBUG("Action to delete " << filename);
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800357
Lijing Wang8e56d082016-12-25 14:45:23 -0800358 fs::path absolutePath = m_rootDir / filename;
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800359 try {
Lijing Wang8e56d082016-12-25 14:45:23 -0800360 if (fs::exists(absolutePath)) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800361 // need some protection from local detection of removal
362 remove(absolutePath);
Zhenkai Zhudd4359b2013-02-24 11:07:34 -0800363
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800364 // hack to remove empty parent dirs
Lijing Wang8e56d082016-12-25 14:45:23 -0800365 fs::path parentPath = absolutePath.parent_path();
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800366 while (parentPath > m_rootDir) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800367 if (fs::is_empty(parentPath)) {
368 fs::remove(parentPath);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800369 parentPath = parentPath.parent_path();
370 }
371 else {
372 break;
Zhenkai Zhudd4359b2013-02-24 11:07:34 -0800373 }
374 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800375 }
Zhenkai Zhudd4359b2013-02-24 11:07:34 -0800376 // don't exist
377 }
Yukai Tu729b2892017-02-09 21:15:48 -0800378 catch (const fs::filesystem_error& error) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800379 _LOG_ERROR("File operations failed when removing [" << absolutePath << "](ignoring)");
Zhenkai Zhudd4359b2013-02-24 11:07:34 -0800380 }
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800381}
382
383void
Lijing Wang8e56d082016-12-25 14:45:23 -0800384Dispatcher::Did_FetchManager_FileSegmentFetch(const Name& deviceName,
385 const Name& fileSegmentBaseName,
386 uint32_t segment,
387 shared_ptr<Data>
388 fileSegmentData)
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800389{
Lijing Wang8e56d082016-12-25 14:45:23 -0800390 m_ioService.post(bind(&Dispatcher::Did_FetchManager_FileSegmentFetch_Execute, this, deviceName,
391 fileSegmentBaseName, segment, fileSegmentData));
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800392}
393
394void
Lijing Wang8e56d082016-12-25 14:45:23 -0800395Dispatcher::Did_FetchManager_FileSegmentFetch_Execute(Name deviceName,
396 Name fileSegmentBaseName,
397 uint32_t segment,
398 shared_ptr<Data> fileSegmentData)
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800399{
Alexander Afanasyev4d086752013-02-07 13:06:04 -0800400 // fileSegmentBaseName: /<device_name>/<appname>/file/<hash>
401
Lijing Wang8e56d082016-12-25 14:45:23 -0800402 Buffer hash(fileSegmentBaseName.get(-1).value(), fileSegmentBaseName.get(-1).value_size());
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800403
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800404 _LOG_DEBUG("Received segment deviceName: " << deviceName << ", segmentBaseName: " << fileSegmentBaseName
405 << ", segment: "
406 << segment);
Alexander Afanasyev28ca3ed2013-01-24 23:17:15 -0800407
Lijing Wang8e56d082016-12-25 14:45:23 -0800408 // _LOG_DEBUG("Looking up objectdb for " << hash);
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800409
Lijing Wang8e56d082016-12-25 14:45:23 -0800410 std::map<Buffer, shared_ptr<ObjectDb>>::iterator db = m_objectDbMap.find(hash);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800411 if (db != m_objectDbMap.end()) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800412 db->second->saveContentObject(deviceName, segment, *fileSegmentData);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800413 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800414 else {
415 _LOG_ERROR("no db available for this content object: " << fileSegmentBaseName << ", size: "
Lijing Wang8e56d082016-12-25 14:45:23 -0800416 << fileSegmentData->getContent().size());
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800417 }
Alexander Afanasyev17507ba2013-01-24 23:47:34 -0800418
Lijing Wang8e56d082016-12-25 14:45:23 -0800419 // ObjectDb objectDb(m_rootDir / ".chronoshare", lexical_cast<string>(hash));
420 // objectDb.saveContentObject(deviceName, segment, *fileSegmentData);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800421}
422
423void
Lijing Wang8e56d082016-12-25 14:45:23 -0800424Dispatcher::Did_FetchManager_FileFetchComplete(const Name& deviceName, const Name& fileBaseName)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800425{
Lijing Wang8e56d082016-12-25 14:45:23 -0800426 m_ioService.post(
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800427 bind(&Dispatcher::Did_FetchManager_FileFetchComplete_Execute, this, deviceName, fileBaseName));
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800428}
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800429
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800430void
Lijing Wang8e56d082016-12-25 14:45:23 -0800431Dispatcher::Did_FetchManager_FileFetchComplete_Execute(Name deviceName, Name fileBaseName)
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800432{
Alexander Afanasyev4d086752013-02-07 13:06:04 -0800433 // fileBaseName: /<device_name>/<appname>/file/<hash>
434
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800435 _LOG_DEBUG("Finished fetching " << deviceName << ", fileBaseName: " << fileBaseName);
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800436
Lijing Wang8e56d082016-12-25 14:45:23 -0800437 Buffer hash(fileBaseName.get(-1).value(), fileBaseName.get(-1).value_size());
438
439 _LOG_DEBUG("Extracted hash: " << toHex(hash));
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800440
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800441 if (m_objectDbMap.find(hash) != m_objectDbMap.end()) {
Alexander Afanasyev1807e8d2013-01-24 23:37:32 -0800442 // remove the db handle
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800443 m_objectDbMap.erase(hash); // to commit write
Alexander Afanasyev1807e8d2013-01-24 23:37:32 -0800444 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800445 else {
Lijing Wang8e56d082016-12-25 14:45:23 -0800446 _LOG_ERROR("no db available for this file: " << toHex(hash));
Alexander Afanasyev1807e8d2013-01-24 23:37:32 -0800447 }
448
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800449 FileItemsPtr filesToAssemble = m_fileState->LookupFilesForHash(hash);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800450
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800451 for (FileItems::iterator file = filesToAssemble->begin(); file != filesToAssemble->end(); file++) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800452 fs::path filePath = m_rootDir / file->filename();
Yingdi Yuadb54eb2013-08-15 10:28:28 -0700453
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800454 try {
Lijing Wang8e56d082016-12-25 14:45:23 -0800455 if (fs::exists(filePath) && fs::last_write_time(filePath) == file->mtime() &&
456 fs::status(filePath).permissions() == static_cast<fs::perms>(file->mode())) {
457 fs::ifstream input(filePath, std::ios::in | std::ios::binary);
458 if (*util::Sha256(input).computeDigest() == hash) {
459 _LOG_DEBUG("Asking to assemble a file, but file already exists on a filesystem");
460 continue;
461 }
Zhenkai Zhuf3a9fa62013-01-31 17:23:09 -0800462 }
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800463 }
Yukai Tu729b2892017-02-09 21:15:48 -0800464 catch (const fs::filesystem_error& error) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800465 _LOG_ERROR("File operations failed on [" << filePath << "](ignoring)");
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800466 }
467
Lijing Wang8e56d082016-12-25 14:45:23 -0800468 if (ObjectDb::doesExist(m_rootDir / ".chronoshare", deviceName, toHex(hash))) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800469 bool ok = m_objectManager.objectsToLocalFile(deviceName, hash, filePath);
470 if (ok) {
471 last_write_time(filePath, file->mtime());
472#if BOOST_VERSION >= 104900
Lijing Wang8e56d082016-12-25 14:45:23 -0800473 permissions(filePath, static_cast<fs::perms>(file->mode()));
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800474#endif
475
476 m_fileState->SetFileComplete(file->filename());
477 }
478 else {
479 _LOG_ERROR("Notified about complete fetch, but file cannot be restored from the database: ["
480 << filePath
481 << "]");
482 }
483 }
484 else {
485 _LOG_ERROR(filePath << " supposed to have all segments, but not");
486 // should abort for debugging
487 }
488 }
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800489}
Alexander Afanasyev0a30a0c2013-01-29 17:25:42 -0800490
Lijing Wang8e56d082016-12-25 14:45:23 -0800491} // namespace chronoshare
492} // namespace ndn