blob: bf0856e86a05bb09cf821b451f4c050bd7979f87 [file] [log] [blame]
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyev1cf5c432017-01-13 23:22:15 -08003 * Copyright (c) 2013-2017, Regents of the University of California.
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -08004 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08005 * This file is part of ChronoShare, a decentralized file sharing application over NDN.
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -08006 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08007 * ChronoShare is free software: you can redistribute it and/or modify it under the terms
8 * of the GNU General Public License as published by the Free Software Foundation, either
9 * version 3 of the License, or (at your option) any later version.
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080010 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080011 * ChronoShare is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080014 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080015 * You should have received copies of the GNU General Public License along with
16 * ChronoShare, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * See AUTHORS.md for complete list of ChronoShare authors and contributors.
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080019 */
20
Alexander Afanasyevf4cde4e2016-12-25 13:42:57 -080021#include "dispatcher.hpp"
Alexander Afanasyevf4cde4e2016-12-25 13:42:57 -080022#include "fetch-task-db.hpp"
Lijing Wang8e56d082016-12-25 14:45:23 -080023#include "core/logging.hpp"
24
25#include <ndn-cxx/util/digest.hpp>
26#include <ndn-cxx/util/string-helper.hpp>
Alexander Afanasyevf9978f82013-01-23 16:30:31 -080027
Alexander Afanasyevf9978f82013-01-23 16:30:31 -080028#include <boost/lexical_cast.hpp>
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080029
Lijing Wang8e56d082016-12-25 14:45:23 -080030namespace ndn {
31namespace chronoshare {
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080032
Alexander Afanasyev1cf5c432017-01-13 23:22:15 -080033_LOG_INIT(Dispatcher);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080034
Lijing Wang8e56d082016-12-25 14:45:23 -080035namespace fs = boost::filesystem;
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -080036
Lijing Wang8e56d082016-12-25 14:45:23 -080037static const name::Component CHRONOSHARE_APP = name::Component("chronoshare");
38static const Name BROADCAST_DOMAIN = "/ndn/multicast";
39
40static const time::seconds DEFAULT_SYNC_INTEREST_INTERVAL = time::seconds(10);
Alexander Afanasyevf9978f82013-01-23 16:30:31 -080041
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080042Dispatcher::Dispatcher(const std::string& localUserName, const std::string& sharedFolder,
Lijing Wang8e56d082016-12-25 14:45:23 -080043 const fs::path& rootDir, Face& face, bool enablePrefixDiscovery)
44 : m_face(face)
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080045 , m_rootDir(rootDir)
Lijing Wang8e56d082016-12-25 14:45:23 -080046 , m_ioService(face.getIoService())
47 , m_objectManager(face, m_keyChain, rootDir, CHRONOSHARE_APP.toUri())
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080048 , m_localUserName(localUserName)
49 , m_sharedFolder(sharedFolder)
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080050 , m_enablePrefixDiscovery(enablePrefixDiscovery)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080051{
Zhenkai Zhu3290b8e2013-01-24 15:25:48 -080052 m_syncLog = make_shared<SyncLog>(m_rootDir, localUserName);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080053 m_actionLog =
Lijing Wang8e56d082016-12-25 14:45:23 -080054 make_shared<ActionLog>(m_face, m_rootDir, m_syncLog, sharedFolder, CHRONOSHARE_APP,
55 // bind(&Dispatcher::Did_ActionLog_ActionApply_AddOrModify, this, _1, _2, _3, _4, _5, _6, _7),
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080056 ActionLog::OnFileAddedOrChangedCallback(), // don't really need this callback
57 bind(&Dispatcher::Did_ActionLog_ActionApply_Delete, this, _1));
58 m_fileState = m_actionLog->GetFileState();
Alexander Afanasyevd6364ef2013-02-06 13:13:07 -080059
Lijing Wang8e56d082016-12-25 14:45:23 -080060 Name syncPrefix = Name(BROADCAST_DOMAIN);
61 syncPrefix.append(CHRONOSHARE_APP);
62 syncPrefix.append(sharedFolder);
Alexander Afanasyevcbda9922013-01-22 11:21:12 -080063
Lijing Wang8e56d082016-12-25 14:45:23 -080064 m_server = make_unique<ContentServer>(m_face, m_actionLog, rootDir, m_localUserName,
65 m_sharedFolder, CHRONOSHARE_APP, m_keyChain);
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -080066 m_server->registerPrefix(Name("/"));
Zhenkai Zhu3f64d2d2013-01-25 14:34:59 -080067 m_server->registerPrefix(Name(BROADCAST_DOMAIN));
Zhenkai Zhu1dcbbab2013-01-22 16:03:20 -080068
Lijing Wangb95c6a52016-12-25 14:45:17 -080069 m_stateServer = make_unique<StateServer>(m_face, m_actionLog, rootDir, m_localUserName, m_sharedFolder,
70 CHRONOSHARE_APP, m_objectManager, m_keyChain);
Alexander Afanasyev026eaf32013-02-23 16:37:14 -080071
Lijing Wang8e56d082016-12-25 14:45:23 -080072 m_core = make_unique<SyncCore>(m_face, m_syncLog, localUserName, Name("/"), syncPrefix,
73 bind(&Dispatcher::Did_SyncLog_StateChange, this, _1),
74 DEFAULT_SYNC_INTEREST_INTERVAL);
Zhenkai Zhu1dcbbab2013-01-22 16:03:20 -080075
Zhenkai Zhuda686882013-01-29 22:32:24 -080076 FetchTaskDbPtr actionTaskDb = make_shared<FetchTaskDb>(m_rootDir, "action");
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080077 m_actionFetcher =
Lijing Wang8e56d082016-12-25 14:45:23 -080078 make_shared<FetchManager>(m_face, bind(&SyncLog::LookupLocator, &*m_syncLog, _1),
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080079 Name(BROADCAST_DOMAIN), // no appname suffix now
80 3,
81 bind(&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4),
82 FetchManager::FinishCallback(), actionTaskDb);
Zhenkai Zhub8c49af2013-01-29 16:03:56 -080083
Zhenkai Zhuda686882013-01-29 22:32:24 -080084 FetchTaskDbPtr fileTaskDb = make_shared<FetchTaskDb>(m_rootDir, "file");
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080085 m_fileFetcher =
Lijing Wang8e56d082016-12-25 14:45:23 -080086 make_shared<FetchManager>(m_face, bind(&SyncLog::LookupLocator, &*m_syncLog, _1),
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080087 Name(BROADCAST_DOMAIN), // no appname suffix now
88 3, bind(&Dispatcher::Did_FetchManager_FileSegmentFetch, this, _1, _2,
89 _3, _4),
90 bind(&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2),
91 fileTaskDb);
Zhenkai Zhub8c49af2013-01-29 16:03:56 -080092
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080093 if (m_enablePrefixDiscovery) {
Zhenkai Zhu3f64d2d2013-01-25 14:34:59 -080094 _LOG_DEBUG("registering prefix discovery in Dispatcher");
Lijing Wang8e56d082016-12-25 14:45:23 -080095 std::string tag = "dispatcher" + m_localUserName.toUri();
96 // Ccnx::CcnxDiscovery::registerCallback(TaggedFunction(bind(&Dispatcher::Did_LocalPrefix_Updated,
97 // this, _1), tag));
98 // TODO registerCallback...?
99 //
100 // this registerCallback is used when the local prefix changes.
101 // the ndn-cxx library does not have this functionality
102 // thus, the application will need to implement this.
103 // send a data packet and get the local prefix. If they are different, call the callback
104 // function, else do nothing.
Zhenkai Zhufaee2d42013-01-24 17:47:13 -0800105 }
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800106}
107
108Dispatcher::~Dispatcher()
109{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800110 _LOG_DEBUG("Enter destructor of dispatcher");
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800111
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800112 if (m_enablePrefixDiscovery) {
Zhenkai Zhu3f64d2d2013-01-25 14:34:59 -0800113 _LOG_DEBUG("deregistering prefix discovery in Dispatcher");
Lijing Wang8e56d082016-12-25 14:45:23 -0800114 std::string tag = "dispatcher" + m_localUserName.toUri();
115 // TODO
116 // Ccnx::CcnxDiscovery::deregisterCallback(TaggedFunction(bind(&Dispatcher::Did_LocalPrefix_Updated,
117 // this, _1), tag));
Alexander Afanasyev026eaf32013-02-23 16:37:14 -0800118 }
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800119}
120
Alexander Afanasyev758f51b2013-01-24 13:48:18 -0800121void
Lijing Wang8e56d082016-12-25 14:45:23 -0800122Dispatcher::Did_LocalPrefix_Updated(const Name& forwardingHint)
Alexander Afanasyev758f51b2013-01-24 13:48:18 -0800123{
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800124 Name effectiveForwardingHint;
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800125 if (m_localUserName.size() >= forwardingHint.size() &&
Lijing Wang8e56d082016-12-25 14:45:23 -0800126 m_localUserName.getSubName(0, forwardingHint.size()) == forwardingHint) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800127 effectiveForwardingHint = Name("/"); // "directly" accesible
128 }
129 else {
130 effectiveForwardingHint = forwardingHint;
131 }
Alexander Afanasyev758f51b2013-01-24 13:48:18 -0800132
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800133 Name oldLocalPrefix = m_syncLog->LookupLocalLocator();
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800134
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800135 if (oldLocalPrefix == effectiveForwardingHint) {
136 _LOG_DEBUG(
137 "Got notification about prefix change from " << oldLocalPrefix << " to: " << forwardingHint
138 << ", but effective prefix didn't change");
139 return;
140 }
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800141
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800142 if (effectiveForwardingHint == Name("/") || effectiveForwardingHint == Name(BROADCAST_DOMAIN)) {
143 _LOG_DEBUG("Basic effective prefix [" << effectiveForwardingHint
144 << "]. Updating local prefix, but don't reregister");
145 m_syncLog->UpdateLocalLocator(effectiveForwardingHint);
146 return;
147 }
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800148
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800149 _LOG_DEBUG("LocalPrefix changed from: " << oldLocalPrefix << " to: " << effectiveForwardingHint);
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800150
151 m_server->registerPrefix(effectiveForwardingHint);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800152 m_syncLog->UpdateLocalLocator(effectiveForwardingHint);
Alexander Afanasyev7aced752013-02-13 09:57:25 -0800153
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800154 if (oldLocalPrefix == Name("/") || oldLocalPrefix == Name(BROADCAST_DOMAIN)) {
155 _LOG_DEBUG("Don't deregister basic prefix: " << oldLocalPrefix);
156 return;
157 }
Alexander Afanasyev758f51b2013-01-24 13:48:18 -0800158 m_server->deregisterPrefix(oldLocalPrefix);
159}
160
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800161/////////////////////////////////////////////////////////////////////////////////////////////////////
162/////////////////////////////////////////////////////////////////////////////////////////////////////
163/////////////////////////////////////////////////////////////////////////////////////////////////////
164
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800165void
Lijing Wang8e56d082016-12-25 14:45:23 -0800166Dispatcher::Did_LocalFile_AddOrModify(const fs::path& relativeFilePath)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800167{
Lijing Wang8e56d082016-12-25 14:45:23 -0800168 m_ioService.post(bind(&Dispatcher::Did_LocalFile_AddOrModify_Execute, this, relativeFilePath));
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800169}
170
171void
Lijing Wang8e56d082016-12-25 14:45:23 -0800172Dispatcher::Did_LocalFile_AddOrModify_Execute(fs::path relativeFilePath)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800173{
Zhenkai Zhufaee2d42013-01-24 17:47:13 -0800174 _LOG_DEBUG(m_localUserName << " calls LocalFile_AddOrModify_Execute");
Lijing Wang8e56d082016-12-25 14:45:23 -0800175 fs::path absolutePath = m_rootDir / relativeFilePath;
176 _LOG_DEBUG("relativeFilePath : " << relativeFilePath);
177 _LOG_DEBUG("absolutePath : " << absolutePath);
178 if (!fs::exists(absolutePath)) {
179 // BOOST_THROW_EXCEPTION(Error::Dispatcher() << error_info_str("Update non exist file: " +
180 // absolutePath.string() ));
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800181 _LOG_DEBUG("Update non exist file: " << absolutePath.string());
182 return;
183 }
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800184
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800185 FileItemPtr currentFile = m_fileState->LookupFile(relativeFilePath.generic_string());
Lijing Wang8e56d082016-12-25 14:45:23 -0800186
187 if (currentFile) {
188 fs::ifstream input(absolutePath);
189 if (*util::Sha256(input).computeDigest() ==
190 Buffer(currentFile->file_hash().c_str(), currentFile->file_hash().size())
191 // The following two are commented out to prevent front end from reporting intermediate files
192 // should enable it if there is other way to prevent this
193 // && last_write_time(absolutePath) == currentFile->mtime()
194 // && status(absolutePath).permissions() ==
195 // static_cast<fs::perms>(currentFile->mode())
196 ) {
197 _LOG_ERROR("Got notification about the same file [" << relativeFilePath << "]");
198 return;
199 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800200 }
201
202 if (currentFile && !currentFile->is_complete()) {
203 _LOG_ERROR("Got notification about incomplete file [" << relativeFilePath << "]");
204 return;
205 }
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800206
207 int seg_num;
Lijing Wang8e56d082016-12-25 14:45:23 -0800208 ConstBufferPtr hash;
209 _LOG_DEBUG("absolutePath: " << absolutePath << " m_localUserName: " << m_localUserName);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800210 tie(hash, seg_num) = m_objectManager.localFileToObjects(absolutePath, m_localUserName);
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800211
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800212 try {
Lijing Wang8e56d082016-12-25 14:45:23 -0800213 m_actionLog->AddLocalActionUpdate(relativeFilePath.generic_string(), *hash,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800214 last_write_time(absolutePath),
Alexander Afanasyevf8ff5e12013-07-11 13:57:32 -0700215#if BOOST_VERSION >= 104900
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800216 status(absolutePath).permissions(),
Alexander Afanasyevf8ff5e12013-07-11 13:57:32 -0700217#else
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800218 0,
Alexander Afanasyevf8ff5e12013-07-11 13:57:32 -0700219#endif
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800220 seg_num);
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800221
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800222 // notify SyncCore to propagate the change
223 m_core->localStateChangedDelayed();
224 }
Lijing Wang8e56d082016-12-25 14:45:23 -0800225 catch (fs::filesystem_error& error) {
226 _LOG_ERROR("File operations failed on [" << relativeFilePath << "](ignoring)");
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800227 }
Lijing Wang8e56d082016-12-25 14:45:23 -0800228
229 _LOG_DEBUG("LocalFile_AddOrModify_Execute Finished!");
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800230}
231
232void
Lijing Wang8e56d082016-12-25 14:45:23 -0800233Dispatcher::Did_LocalFile_Delete(const fs::path& relativeFilePath)
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800234{
Lijing Wang8e56d082016-12-25 14:45:23 -0800235 m_ioService.post(bind(&Dispatcher::Did_LocalFile_Delete_Execute, this, relativeFilePath));
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800236}
237
238void
Lijing Wang8e56d082016-12-25 14:45:23 -0800239Dispatcher::Did_LocalFile_Delete_Execute(fs::path relativeFilePath)
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800240{
Lijing Wang8e56d082016-12-25 14:45:23 -0800241 fs::path absolutePath = m_rootDir / relativeFilePath;
242 if (fs::exists(absolutePath)) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800243 _LOG_ERROR("DELETE command, but file still exists: " << absolutePath.string());
244 return;
245 }
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800246
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800247 m_actionLog->AddLocalActionDelete(relativeFilePath.generic_string());
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800248 // notify SyncCore to propagate the change
Alexander Afanasyeva2fabcf2013-02-05 11:26:37 -0800249 m_core->localStateChangedDelayed();
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800250}
251
252/////////////////////////////////////////////////////////////////////////////////////////////////////
253/////////////////////////////////////////////////////////////////////////////////////////////////////
254/////////////////////////////////////////////////////////////////////////////////////////////////////
255
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800256void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800257Dispatcher::Did_SyncLog_StateChange(SyncStateMsgPtr stateMsg)
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800258{
Lijing Wang8e56d082016-12-25 14:45:23 -0800259 m_ioService.post(bind(&Dispatcher::Did_SyncLog_StateChange_Execute, this, stateMsg));
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800260}
261
262void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800263Dispatcher::Did_SyncLog_StateChange_Execute(SyncStateMsgPtr stateMsg)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800264{
265 int size = stateMsg->state_size();
266 int index = 0;
267 // iterate and fetch the actions
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800268 for (; index < size; index++) {
269 SyncState state = stateMsg->state(index);
270 if (state.has_old_seq() && state.has_seq()) {
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800271 uint64_t oldSeq = state.old_seq();
272 uint64_t newSeq = state.seq();
Lijing Wang8e56d082016-12-25 14:45:23 -0800273 Name userName(Block((const unsigned char*)state.name().c_str(), state.name().size()));
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800274
Lijing Wang8e56d082016-12-25 14:45:23 -0800275 // fetch actions with oldSeq + 1 to newSeq(inclusive)
276 Name actionNameBase = Name("/");
277 actionNameBase.append(userName).append(CHRONOSHARE_APP).append("action").append(m_sharedFolder);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800278 m_actionFetcher->Enqueue(userName, actionNameBase, std::max<uint64_t>(oldSeq + 1, 1), newSeq,
279 FetchManager::PRIORITY_HIGH);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800280 }
281 }
282}
283
284void
Lijing Wang8e56d082016-12-25 14:45:23 -0800285Dispatcher::Did_FetchManager_ActionFetch(const Name& deviceName, const Name& actionBaseName,
286 uint32_t seqno, shared_ptr<Data> actionData)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800287{
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800288 /// @todo Errors and exception checking
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800289 _LOG_DEBUG("Received action deviceName: " << deviceName << ", actionBaseName: " << actionBaseName
290 << ", seqno: "
291 << seqno);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800292
Lijing Wang8e56d082016-12-25 14:45:23 -0800293 ActionItemPtr action = m_actionLog->AddRemoteAction(deviceName, seqno, actionData);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800294 if (!action) {
295 _LOG_ERROR("AddRemoteAction did not insert action, ignoring");
296 return;
297 }
Lijing Wang8e56d082016-12-25 14:45:23 -0800298 // trigger may invoke Did_ActionLog_ActionApply_Delete or Did_ActionLog_ActionApply_AddOrModify
299 // callbacks
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800300
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800301 if (action->action() == ActionItem::UPDATE) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800302 ConstBufferPtr hash =
303 make_shared<Buffer>(action->file_hash().c_str(), action->file_hash().size());
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800304
Lijing Wang8e56d082016-12-25 14:45:23 -0800305 Name fileNameBase = Name("/");
306 fileNameBase.append(deviceName).append(CHRONOSHARE_APP).append("file");
307 // fileNameBase.append(name::Component(hash));
308 fileNameBase.appendImplicitSha256Digest(hash);
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800309
Lijing Wang8e56d082016-12-25 14:45:23 -0800310 std::string hashStr = toHex(*hash);
311 if (ObjectDb::doesExist(m_rootDir / ".chronoshare", deviceName, hashStr)) {
312 _LOG_DEBUG("File already exists in the database. No need to refetch, just directly applying "
313 "the action");
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800314 Did_FetchManager_FileFetchComplete(deviceName, fileNameBase);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800315 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800316 else {
Lijing Wang8e56d082016-12-25 14:45:23 -0800317 if (m_objectDbMap.find(*hash) == m_objectDbMap.end()) {
318 _LOG_DEBUG("create ObjectDb for " << toHex(*hash));
319 m_objectDbMap[*hash] = make_shared<ObjectDb>(m_rootDir / ".chronoshare", hashStr);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800320 }
321
322 m_fileFetcher->Enqueue(deviceName, fileNameBase, 0, action->seg_num() - 1,
323 FetchManager::PRIORITY_NORMAL);
324 }
325 }
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800326}
327
328void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800329Dispatcher::Did_ActionLog_ActionApply_Delete(const std::string& filename)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800330{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800331 _LOG_DEBUG("Action to delete " << filename);
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800332
Lijing Wang8e56d082016-12-25 14:45:23 -0800333 fs::path absolutePath = m_rootDir / filename;
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800334 try {
Lijing Wang8e56d082016-12-25 14:45:23 -0800335 if (fs::exists(absolutePath)) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800336 // need some protection from local detection of removal
337 remove(absolutePath);
Zhenkai Zhudd4359b2013-02-24 11:07:34 -0800338
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800339 // hack to remove empty parent dirs
Lijing Wang8e56d082016-12-25 14:45:23 -0800340 fs::path parentPath = absolutePath.parent_path();
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800341 while (parentPath > m_rootDir) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800342 if (fs::is_empty(parentPath)) {
343 fs::remove(parentPath);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800344 parentPath = parentPath.parent_path();
345 }
346 else {
347 break;
Zhenkai Zhudd4359b2013-02-24 11:07:34 -0800348 }
349 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800350 }
Zhenkai Zhudd4359b2013-02-24 11:07:34 -0800351 // don't exist
352 }
Lijing Wang8e56d082016-12-25 14:45:23 -0800353 catch (fs::filesystem_error& error) {
354 _LOG_ERROR("File operations failed when removing [" << absolutePath << "](ignoring)");
Zhenkai Zhudd4359b2013-02-24 11:07:34 -0800355 }
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800356}
357
358void
Lijing Wang8e56d082016-12-25 14:45:23 -0800359Dispatcher::Did_FetchManager_FileSegmentFetch(const Name& deviceName,
360 const Name& fileSegmentBaseName,
361 uint32_t segment,
362 shared_ptr<Data>
363 fileSegmentData)
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800364{
Lijing Wang8e56d082016-12-25 14:45:23 -0800365 m_ioService.post(bind(&Dispatcher::Did_FetchManager_FileSegmentFetch_Execute, this, deviceName,
366 fileSegmentBaseName, segment, fileSegmentData));
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800367}
368
369void
Lijing Wang8e56d082016-12-25 14:45:23 -0800370Dispatcher::Did_FetchManager_FileSegmentFetch_Execute(Name deviceName,
371 Name fileSegmentBaseName,
372 uint32_t segment,
373 shared_ptr<Data> fileSegmentData)
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800374{
Alexander Afanasyev4d086752013-02-07 13:06:04 -0800375 // fileSegmentBaseName: /<device_name>/<appname>/file/<hash>
376
Lijing Wang8e56d082016-12-25 14:45:23 -0800377 Buffer hash(fileSegmentBaseName.get(-1).value(), fileSegmentBaseName.get(-1).value_size());
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800378
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800379 _LOG_DEBUG("Received segment deviceName: " << deviceName << ", segmentBaseName: " << fileSegmentBaseName
380 << ", segment: "
381 << segment);
Alexander Afanasyev28ca3ed2013-01-24 23:17:15 -0800382
Lijing Wang8e56d082016-12-25 14:45:23 -0800383 // _LOG_DEBUG("Looking up objectdb for " << hash);
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800384
Lijing Wang8e56d082016-12-25 14:45:23 -0800385 std::map<Buffer, shared_ptr<ObjectDb>>::iterator db = m_objectDbMap.find(hash);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800386 if (db != m_objectDbMap.end()) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800387 db->second->saveContentObject(deviceName, segment, *fileSegmentData);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800388 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800389 else {
390 _LOG_ERROR("no db available for this content object: " << fileSegmentBaseName << ", size: "
Lijing Wang8e56d082016-12-25 14:45:23 -0800391 << fileSegmentData->getContent().size());
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800392 }
Alexander Afanasyev17507ba2013-01-24 23:47:34 -0800393
Lijing Wang8e56d082016-12-25 14:45:23 -0800394 // ObjectDb objectDb(m_rootDir / ".chronoshare", lexical_cast<string>(hash));
395 // objectDb.saveContentObject(deviceName, segment, *fileSegmentData);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800396}
397
398void
Lijing Wang8e56d082016-12-25 14:45:23 -0800399Dispatcher::Did_FetchManager_FileFetchComplete(const Name& deviceName, const Name& fileBaseName)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800400{
Lijing Wang8e56d082016-12-25 14:45:23 -0800401 m_ioService.post(
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800402 bind(&Dispatcher::Did_FetchManager_FileFetchComplete_Execute, this, deviceName, fileBaseName));
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800403}
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800404
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800405void
Lijing Wang8e56d082016-12-25 14:45:23 -0800406Dispatcher::Did_FetchManager_FileFetchComplete_Execute(Name deviceName, Name fileBaseName)
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800407{
Alexander Afanasyev4d086752013-02-07 13:06:04 -0800408 // fileBaseName: /<device_name>/<appname>/file/<hash>
409
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800410 _LOG_DEBUG("Finished fetching " << deviceName << ", fileBaseName: " << fileBaseName);
Alexander Afanasyevf9978f82013-01-23 16:30:31 -0800411
Lijing Wang8e56d082016-12-25 14:45:23 -0800412 Buffer hash(fileBaseName.get(-1).value(), fileBaseName.get(-1).value_size());
413
414 _LOG_DEBUG("Extracted hash: " << toHex(hash));
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800415
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800416 if (m_objectDbMap.find(hash) != m_objectDbMap.end()) {
Alexander Afanasyev1807e8d2013-01-24 23:37:32 -0800417 // remove the db handle
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800418 m_objectDbMap.erase(hash); // to commit write
Alexander Afanasyev1807e8d2013-01-24 23:37:32 -0800419 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800420 else {
Lijing Wang8e56d082016-12-25 14:45:23 -0800421 _LOG_ERROR("no db available for this file: " << toHex(hash));
Alexander Afanasyev1807e8d2013-01-24 23:37:32 -0800422 }
423
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800424 FileItemsPtr filesToAssemble = m_fileState->LookupFilesForHash(hash);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800425
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800426 for (FileItems::iterator file = filesToAssemble->begin(); file != filesToAssemble->end(); file++) {
Lijing Wang8e56d082016-12-25 14:45:23 -0800427 fs::path filePath = m_rootDir / file->filename();
Yingdi Yuadb54eb2013-08-15 10:28:28 -0700428
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800429 try {
Lijing Wang8e56d082016-12-25 14:45:23 -0800430 if (fs::exists(filePath) && fs::last_write_time(filePath) == file->mtime() &&
431 fs::status(filePath).permissions() == static_cast<fs::perms>(file->mode())) {
432 fs::ifstream input(filePath, std::ios::in | std::ios::binary);
433 if (*util::Sha256(input).computeDigest() == hash) {
434 _LOG_DEBUG("Asking to assemble a file, but file already exists on a filesystem");
435 continue;
436 }
Zhenkai Zhuf3a9fa62013-01-31 17:23:09 -0800437 }
Alexander Afanasyevf2c16e02013-01-23 18:08:04 -0800438 }
Lijing Wang8e56d082016-12-25 14:45:23 -0800439 catch (fs::filesystem_error& error) {
440 _LOG_ERROR("File operations failed on [" << filePath << "](ignoring)");
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800441 }
442
Lijing Wang8e56d082016-12-25 14:45:23 -0800443 if (ObjectDb::doesExist(m_rootDir / ".chronoshare", deviceName, toHex(hash))) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800444 bool ok = m_objectManager.objectsToLocalFile(deviceName, hash, filePath);
445 if (ok) {
446 last_write_time(filePath, file->mtime());
447#if BOOST_VERSION >= 104900
Lijing Wang8e56d082016-12-25 14:45:23 -0800448 permissions(filePath, static_cast<fs::perms>(file->mode()));
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800449#endif
450
451 m_fileState->SetFileComplete(file->filename());
452 }
453 else {
454 _LOG_ERROR("Notified about complete fetch, but file cannot be restored from the database: ["
455 << filePath
456 << "]");
457 }
458 }
459 else {
460 _LOG_ERROR(filePath << " supposed to have all segments, but not");
461 // should abort for debugging
462 }
463 }
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800464}
Alexander Afanasyev0a30a0c2013-01-29 17:25:42 -0800465
Lijing Wang8e56d082016-12-25 14:45:23 -0800466} // namespace chronoshare
467} // namespace ndn