blob: 67e2f73f1026d6197581805a957ee69300eef93a [file] [log] [blame]
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2013 University of California, Los Angeles
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation;
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Zhenkai Zhu <zhenkai@cs.ucla.edu>
 * Author: Alexander Afanasyev <alexander.afanasyev@ucla.edu>
 */
21
22#include "sync-core.h"
23
Zhenkai Zhue851b952013-01-13 22:29:57 -080024const string SyncCore::RECOVER = "RECOVER";
25const double SyncCore::WAIT = 0.2;
26const double SyncCore::RANDOM_PERCENT = 0.5;
27
28SyncCore::SyncCore(const string &path, const Name &userName, const Name &localPrefix, const Name &syncPrefix, const StateMsgCallback &callback, CcnxWrapperPtr handle, SchedulerPtr scheduler)
29 : m_log(path, userName.toString())
Alexander Afanasyeva44a7a22013-01-14 17:37:06 -080030 , m_scheduler(scheduler)
31 , m_stateMsgCallback(callback)
32 , m_userName(userName)
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080033 , m_localPrefix(localPrefix)
34 , m_syncPrefix(syncPrefix)
Zhenkai Zhu8224bb62013-01-06 15:58:02 -080035 , m_handle(handle)
36{
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080037 m_rootHash = m_log.RememberStateInStateLog();
Zhenkai Zhue851b952013-01-13 22:29:57 -080038 m_syncClosure = new Closure(0, boost::bind(&SyncCore::handleSyncData, this, _1, _2), boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1));
39 m_recoverClosure = new Closure(0, boost::bind(&SyncCore::handleRecoverData, this, _1, _2), boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1));
40 m_handle->setInterestFilter(m_syncPrefix, boost::bind(&SyncCore::handleInterest, this, _1));
41 m_log.initYP(m_yp);
Zhenkai Zhue29616f2013-01-14 15:40:57 -080042 //m_scheduler->start();
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080043 sendSyncInterest();
44}
45
46SyncCore::~SyncCore()
47{
Zhenkai Zhue29616f2013-01-14 15:40:57 -080048 //m_scheduler->shutdown();
Zhenkai Zhue851b952013-01-13 22:29:57 -080049 if (m_syncClosure != 0)
50 {
51 delete m_syncClosure;
52 m_syncClosure = 0;
53 }
54
55 if (m_recoverClosure != 0)
56 {
57 delete m_recoverClosure;
58 m_recoverClosure = 0;
59 }
60}
61
62Name
63SyncCore::yp(const Name &deviceName)
64{
65 Name locator;
66 ReadLock(m_ypMutex);
67 if (m_yp.find(deviceName) != m_yp.end())
68 {
69 locator = m_yp[deviceName];
70 }
71 return locator;
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080072}
73
74void
75SyncCore::updateLocalPrefix(const Name &localPrefix)
76{
77 m_localPrefix = localPrefix;
78 // optionally, we can have a sync action to announce the new prefix
79 // we are not doing this for now
80}
81
82void
Zhenkai Zhue851b952013-01-13 22:29:57 -080083SyncCore::updateLocalState(sqlite3_int64 seqno)
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080084{
Alexander Afanasyeva44a7a22013-01-14 17:37:06 -080085 cout << "User name: " << m_userName << endl;
Zhenkai Zhue851b952013-01-13 22:29:57 -080086 m_log.UpdateDeviceSeqNo(m_userName, seqno);
87 // choose to update locator everytime
88 m_log.UpdateLocator(m_userName, m_localPrefix);
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080089 HashPtr oldHash = m_rootHash;
90 m_rootHash = m_log.RememberStateInStateLog();
91
Zhenkai Zhu05de64a2013-01-14 15:48:23 -080092 cout << "Old hash: " << *oldHash << endl;
93 cout << "New hash: " << *m_rootHash << endl;
94
Zhenkai Zhue851b952013-01-13 22:29:57 -080095 SyncStateMsgPtr msg = m_log.FindStateDifferences(*oldHash, *m_rootHash);
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080096
97 // reply sync Interest with oldHash as last component
98 Name syncName = constructSyncName(oldHash);
99 Bytes syncData;
100 msgToBytes(msg, syncData);
101 m_handle->publishData(syncName, syncData, FRESHNESS);
102
103 // no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
104 // this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
105 ostringstream ss;
Zhenkai Zhue851b952013-01-13 22:29:57 -0800106 ss << *m_rootHash;
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800107 TaskPtr task(new OneTimeTask(boost::bind(&SyncCore::sendSyncInterest, this), ss.str(), m_scheduler, 0.05));
108 m_scheduler->addTask(task);
109}
110
111void
Zhenkai Zhue851b952013-01-13 22:29:57 -0800112SyncCore::handleInterest(const Name &name)
113{
114 int size = name.size();
115 int prefixSize = m_syncPrefix.size();
116 if (size == prefixSize + 1)
117 {
118 // this is normal sync interest
119 handleSyncInterest(name);
120 }
121 else if (size == prefixSize + 2 && name.getCompAsString(m_syncPrefix.size()) == RECOVER)
122 {
123 // this is recovery interest
124 handleRecoverInterest(name);
125 }
126}
127
128void
129SyncCore::handleRecoverInterest(const Name &name)
130{
131 Bytes hashBytes = name.getComp(name.size() - 1);
132 // this is the hash unkonwn to the sender of the interest
133 Hash hash(head(hashBytes), hashBytes.size());
134 if (m_log.LookupSyncLog(hash) > 0)
135 {
136 // we know the hash, should reply everything
137 SyncStateMsgPtr msg = m_log.FindStateDifferences(*(Hash::Origin), *m_rootHash);
138 Bytes syncData;
139 msgToBytes(msg, syncData);
140 m_handle->publishData(name, syncData, FRESHNESS);
141 }
142 else
143 {
144 // we don't recognize this hash, can not help
145 }
146}
147
148void
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800149SyncCore::handleSyncInterest(const Name &name)
150{
Zhenkai Zhue851b952013-01-13 22:29:57 -0800151 Bytes hashBytes = name.getComp(name.size() - 1);
152 HashPtr hash(new Hash(head(hashBytes), hashBytes.size()));
153 if (*hash == *m_rootHash)
154 {
155 // we have the same hash; nothing needs to be done
156 return;
157 }
158 else if (m_log.LookupSyncLog(*hash) > 0)
159 {
160 // we know something more
Zhenkai Zhue29616f2013-01-14 15:40:57 -0800161 cout << "SyncInterest: " << name << endl;
162 cout << "hash: " << *hash << endl;
163 cout << "root hash: " << *m_rootHash << endl;
Zhenkai Zhue851b952013-01-13 22:29:57 -0800164 SyncStateMsgPtr msg = m_log.FindStateDifferences(*hash, *m_rootHash);
165
166 Bytes syncData;
167 msgToBytes(msg, syncData);
168 m_handle->publishData(name, syncData, FRESHNESS);
169 }
170 else
171 {
172 // we don't recognize the hash, send recover Interest if still don't know the hash after a randomized wait period
173 ostringstream ss;
174 ss << *hash;
175 IntervalGeneratorPtr generator(new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP));
176 TaskPtr task(new PeriodicTask(boost::bind(&SyncCore::recover, this, hash), ss.str(), m_scheduler, generator, 1));
177 m_scheduler->addTask(task);
178 }
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800179}
180
181Closure::TimeoutCallbackReturnValue
182SyncCore::handleSyncInterestTimeout(const Name &name)
183{
184 // sendInterestInterest with the current root hash;
185 sendSyncInterest();
Zhenkai Zhue851b952013-01-13 22:29:57 -0800186 return Closure::RESULT_OK;
187}
188
189Closure::TimeoutCallbackReturnValue
190SyncCore::handleRecoverInterestTimeout(const Name &name)
191{
192 // We do not re-express recovery interest for now
193 // if difference is not resolved, the sync interest will trigger
194 // recovery anyway; so it's not so important to have recovery interest
195 // re-expressed
196 return Closure::RESULT_OK;
197}
198
199void
200SyncCore::handleRecoverData(const Name &name, const Bytes &content)
201{
202 handleStateData(content);
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800203}
204
205void
206SyncCore::handleSyncData(const Name &name, const Bytes &content)
207{
Zhenkai Zhue851b952013-01-13 22:29:57 -0800208 // suppress recover in interest - data out of order case
209 handleStateData(content);
210
211 // resume outstanding sync interest
212 sendSyncInterest();
213}
214
215void
216SyncCore::handleStateData(const Bytes &content)
217{
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800218 SyncStateMsgPtr msg(new SyncStateMsg);
219 bool success = msg->ParseFromArray(head(content), content.size());
220 if(!success)
221 {
222 // ignore misformed SyncData
Zhenkai Zhue851b952013-01-13 22:29:57 -0800223 cerr << "Misformed SyncData"<< endl;
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800224 return;
225 }
226
227 int size = msg->state_size();
228 int index = 0;
229 while (index < size)
230 {
231 SyncState state = msg->state(index);
Zhenkai Zhue851b952013-01-13 22:29:57 -0800232 string devStr = state.name();
233 Name deviceName((const unsigned char *)devStr.c_str(), devStr.size());
234 if (state.type() == SyncState::UPDATE)
235 {
236 sqlite3_int64 seqno = state.seq();
237 m_log.UpdateDeviceSeqNo(deviceName, seqno);
238 if (state.has_locator())
239 {
240 string locStr = state.locator();
241 Name locatorName((const unsigned char *)locStr.c_str(), locStr.size());
242 m_log.UpdateLocator(deviceName, locatorName);
243 WriteLock(m_ypMutex);
244 m_yp[deviceName] = locatorName;
245 }
246 }
247 else
248 {
249 deregister(deviceName);
250 }
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800251 index++;
252 }
253
Zhenkai Zhue851b952013-01-13 22:29:57 -0800254 // find the actuall difference and invoke callback on the actual difference
255 HashPtr oldHash = m_rootHash;
256 m_rootHash = m_log.RememberStateInStateLog();
257 SyncStateMsgPtr diff = m_log.FindStateDifferences(*oldHash, *m_rootHash);
258 m_stateMsgCallback(diff);
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800259}
260
261void
262SyncCore::sendSyncInterest()
263{
264 Name syncInterest = constructSyncName(m_rootHash);
Zhenkai Zhue851b952013-01-13 22:29:57 -0800265 m_handle->sendInterest(syncInterest, m_syncClosure);
266}
267
268void
269SyncCore::recover(const HashPtr &hash)
270{
271 if (!(*hash == *m_rootHash) && m_log.LookupSyncLog(*hash) <= 0)
272 {
273 // unfortunately we still don't recognize this hash
274 Bytes bytes;
275 readRaw(bytes, (const unsigned char *)hash->GetHash(), hash->GetHashBytes());
276 Name recoverInterest = m_syncPrefix;
277 recoverInterest.appendComp(RECOVER);
278 // append the unknown hash
279 recoverInterest.appendComp(bytes);
280 m_handle->sendInterest(recoverInterest, m_recoverClosure);
281 }
282 else
283 {
284 // we already learned the hash; cheers!
285 }
286}
287
288void
289SyncCore::deregister(const Name &name)
290{
291 // Do nothing for now
292 // TODO: handle deregistering
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800293}
294
295Name
296SyncCore::constructSyncName(const HashPtr &hash)
297{
298 Bytes bytes;
Zhenkai Zhue851b952013-01-13 22:29:57 -0800299 readRaw(bytes, (const unsigned char*)hash->GetHash(), hash->GetHashBytes());
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800300 Name syncName = m_syncPrefix;
Zhenkai Zhue851b952013-01-13 22:29:57 -0800301 syncName.appendComp(bytes);
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800302 return syncName;
303}
304
305void
306SyncCore::msgToBytes(const SyncStateMsgPtr &msg, Bytes &bytes)
307{
Zhenkai Zhue851b952013-01-13 22:29:57 -0800308 int size = msg->ByteSize();
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800309 bytes.resize(size);
Zhenkai Zhue851b952013-01-13 22:29:57 -0800310 msg->SerializeToArray(head(bytes), size);
Zhenkai Zhu8224bb62013-01-06 15:58:02 -0800311}