blob: d7e570a99c1a2483fde6d6f6ed69380c305dce0f [file] [log] [blame]
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -08001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2013 University of California, Los Angeles
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License version 2 as
7 * published by the Free Software Foundation;
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 * Zhenkai Zhu <zhenkai@cs.ucla.edu>
19 * Author: Alexander Afanasyev <alexander.afanasyev@ucla.edu>
20 */
21
22#include "dispatcher.h"
23#include <boost/make_shared.hpp>
24
25using namespace Ccnx;
26using namespace std;
27using namespace boost;
28
// Name prefix that the Dispatcher constructor combines with the shared folder
// name to build the sync prefix.
static const string BROADCAST_DOMAIN ("/ndn/broadcast/chronoshare");

// Limit on the size of one file segment, going by the name; it is not
// referenced in this part of the file (units presumably bytes -- TODO confirm).
static const int MAX_FILE_SEGMENT_SIZE = 1024;
31
Alexander Afanasyevcbda9922013-01-22 11:21:12 -080032Dispatcher::Dispatcher(const filesystem::path &path, const std::string &localUserName, const Ccnx::Name &localPrefix, const std::string &sharedFolder, const filesystem::path &rootDir, Ccnx::CcnxWrapperPtr ccnx, SchedulerPtr scheduler, int poolSize)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080033 : m_ccnx(ccnx)
34 , m_core(NULL)
35 , m_rootDir(rootDir)
36 , m_executor(poolSize)
37 , m_objectManager(ccnx, rootDir)
38 , m_localUserName(localUserName)
39 , m_sharedFolder(sharedFolder)
Zhenkai Zhu1dcbbab2013-01-22 16:03:20 -080040 , m_server(NULL)
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080041{
Alexander Afanasyevcbda9922013-01-22 11:21:12 -080042 m_syncLog = make_shared<SyncLog>(path, localUserName);
Alexander Afanasyeva35756b2013-01-22 16:59:11 -080043 m_actionLog = make_shared<ActionLog>(m_ccnx, path, m_syncLog, sharedFolder);
Alexander Afanasyev1b15cc62013-01-22 17:00:40 -080044 Name syncPrefix = Name(BROADCAST_DOMAIN)(sharedFolder);
Alexander Afanasyevcbda9922013-01-22 11:21:12 -080045
Zhenkai Zhu1dcbbab2013-01-22 16:03:20 -080046 m_server = new ContentServer(m_ccnx, m_actionLog, rootDir);
47 m_server->registerPrefix(localPrefix);
48 m_server->registerPrefix(syncPrefix);
49
Alexander Afanasyevcbda9922013-01-22 11:21:12 -080050 m_core = new SyncCore (m_syncLog, localUserName, localPrefix, syncPrefix,
51 bind(&Dispatcher::syncStateChanged, this, _1), ccnx, scheduler);
Zhenkai Zhu1dcbbab2013-01-22 16:03:20 -080052
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080053}
54
55Dispatcher::~Dispatcher()
56{
57 if (m_core != NULL)
58 {
59 delete m_core;
60 m_core = NULL;
61 }
Zhenkai Zhu1dcbbab2013-01-22 16:03:20 -080062
63 if (m_server != NULL)
64 {
65 delete m_server;
66 m_server = NULL;
67 }
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -080068}
69
70void
71Dispatcher::fileChangedCallback(const filesystem::path &relativeFilePath, ActionType type)
72{
73 Executor::Job job = bind(&Dispatcher::fileChanged, this, relativeFilePath, type);
74 m_executor.execute(job);
75}
76
77void
78Dispatcher::syncStateChangedCallback(const SyncStateMsgPtr &stateMsg)
79{
80 Executor::Job job = bind(&Dispatcher::syncStateChanged, this, stateMsg);
81 m_executor.execute(job);
82}
83
84void
85Dispatcher::actionReceivedCallback(const ActionItemPtr &actionItem)
86{
87 Executor::Job job = bind(&Dispatcher::actionReceived, this, actionItem);
88 m_executor.execute(job);
89}
90
91void
92Dispatcher::fileSegmentReceivedCallback(const Ccnx::Name &name, const Ccnx::Bytes &content)
93{
94 Executor::Job job = bind(&Dispatcher::fileSegmentReceived, this, name, content);
95 m_executor.execute(job);
96}
97
98void
99Dispatcher::fileReadyCallback(const Ccnx::Name &fileNamePrefix)
100{
101 Executor::Job job = bind(&Dispatcher::fileReady, this, fileNamePrefix);
102 m_executor.execute(job);
103}
104
105void
106Dispatcher::fileChanged(const filesystem::path &relativeFilePath, ActionType type)
107{
108
109 switch (type)
110 {
111 case UPDATE:
112 {
Alexander Afanasyevcbda9922013-01-22 11:21:12 -0800113 filesystem::path absolutePath = m_rootDir / relativeFilePath;
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800114 if (filesystem::exists(absolutePath))
115 {
116 HashPtr hash = Hash::FromFileContent(absolutePath);
Alexander Afanasyeva35756b2013-01-22 16:59:11 -0800117 if (m_actionLog->KnownFileState(relativeFilePath.generic_string(), *hash))
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800118 {
119 // the file state is known; i.e. the detected changed file is identical to
120 // the file state kept in FileState table
121 // it is the case that backend puts the file fetched from remote;
122 // we should not publish action for this.
123 }
124 else
125 {
126 uintmax_t fileSize = filesystem::file_size(absolutePath);
Alexander Afanasyeva35756b2013-01-22 16:59:11 -0800127 int seg_num;
128 tie (hash, seg_num) = m_objectManager.localFileToObjects (absolutePath, m_localUserName);
129
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800130 time_t wtime = filesystem::last_write_time(absolutePath);
131 filesystem::file_status stat = filesystem::status(absolutePath);
132 int mode = stat.permissions();
Alexander Afanasyeva35756b2013-01-22 16:59:11 -0800133
134 m_actionLog->AddLocalActionUpdate (relativeFilePath.generic_string(), *hash, wtime, mode, seg_num);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800135 // publish the file
Alexander Afanasyeva35756b2013-01-22 16:59:11 -0800136
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800137 // notify SyncCore to propagate the change
138 m_core->localStateChanged();
139 }
140 break;
141 }
142 else
143 {
144 BOOST_THROW_EXCEPTION (Error::Dispatcher() << error_info_str("Update non exist file: " + absolutePath.string() ));
145 }
146 }
147 case DELETE:
148 {
Alexander Afanasyeva35756b2013-01-22 16:59:11 -0800149 m_actionLog->AddLocalActionDelete (relativeFilePath.generic_string());
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800150 // notify SyncCore to propagate the change
151 m_core->localStateChanged();
152 break;
153 }
154 default:
155 break;
156 }
157}
158
159void
160Dispatcher::syncStateChanged(const SyncStateMsgPtr &stateMsg)
161{
162 int size = stateMsg->state_size();
163 int index = 0;
164 // iterate and fetch the actions
165 while (index < size)
166 {
167 SyncState state = stateMsg->state(index);
168 if (state.has_old_seq() && state.has_seq())
169 {
170 uint64_t oldSeq = state.old_seq();
171 uint64_t newSeq = state.seq();
172 Name userName = state.name();
173 Name locator = state.locator();
174
175 // fetch actions with oldSeq + 1 to newSeq (inclusive)
176 Name actionNameBase(userName);
177 actionNameBase.appendComp("action")
178 .appendComp(m_sharedFolder);
179
180 for (uint64_t seqNo = oldSeq + 1; seqNo <= newSeq; seqNo++)
181 {
182 Name actionName = actionNameBase;
183 actionName.appendComp(seqNo);
184
185 // TODO:
186 // use fetcher to fetch the name with callback "actionRecieved"
187 }
188 }
189 }
190}
191
192void
193Dispatcher::actionReceived(const ActionItemPtr &actionItem)
194{
195 switch (actionItem->action())
196 {
197 case ActionItem::UPDATE:
198 {
Alexander Afanasyevcbda9922013-01-22 11:21:12 -0800199 // @TODO
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800200 // need a function in ActionLog to apply received action, i.e. record remote action in ActionLog
201
202 string hashBytes = actionItem->file_hash();
203 Hash hash(hashBytes.c_str(), hashBytes.size());
204 ostringstream oss;
205 oss << hash;
206 string hashString = oss.str();
207 ObjectDbPtr db = make_shared<ObjectDb>(m_rootDir, hashString);
208 m_objectDbMap[hashString] = db;
209
210 // TODO:
211 // user fetcher to fetch the file with callback "fileSegmentReceived" for segment callback and "fileReady" for file ready callback
212 break;
213 }
214 case ActionItem::DELETE:
215 {
216 string filename = actionItem->filename();
217 // TODO:
Alexander Afanasyevcbda9922013-01-22 11:21:12 -0800218 // m_actionLog->AddRemoteActionDelete(filename);
Zhenkai Zhuc3fd51e2013-01-22 10:45:54 -0800219 break;
220 }
221 default:
222 break;
223 }
224}
225
226void
227Dispatcher::fileSegmentReceived(const Ccnx::Name &name, const Ccnx::Bytes &content)
228{
229 int size = name.size();
230 uint64_t segment = name.getCompAsInt(size - 1);
231 Bytes hashBytes = name.getComp(size - 2);
232 Hash hash(head(hashBytes), hashBytes.size());
233 ostringstream oss;
234 oss << hash;
235 string hashString = oss.str();
236 if (m_objectDbMap.find(hashString) != m_objectDbMap.end())
237 {
238 ObjectDbPtr db = m_objectDbMap[hashString];
239 // get the device name
240 // Name deviceName = name.getPartialName();
241 // db->saveContenObject(deviceName, segment, content);
242 }
243 else
244 {
245 cout << "no db available for this content object: " << name << ", size: " << content.size() << endl;
246 }
247}
248
249void
250Dispatcher::fileReady(const Ccnx::Name &fileNamePrefix)
251{
252 int size = fileNamePrefix.size();
253 Bytes hashBytes = fileNamePrefix.getComp(size - 1);
254 Hash hash(head(hashBytes), hashBytes.size());
255 ostringstream oss;
256 oss << hash;
257 string hashString = oss.str();
258
259 if (m_objectDbMap.find(hashString) != m_objectDbMap.end())
260 {
261 // remove the db handle
262 m_objectDbMap.erase(hashString);
263 }
264 else
265 {
266 cout << "no db available for this file: " << fileNamePrefix << endl;
267 }
268
269 // query the action table to get the path on local file system
270 // m_objectManager.objectsToLocalFile(deviceName, hash, relativeFilePath);
271
272}