blob: 4486532cff7aa03fd50bff1e4a02a7aeefe776b1 [file] [log] [blame]
Zhenkai Zhu8224bb62013-01-06 15:58:02 -08001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2013 University of California, Los Angeles
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License version 2 as
7 * published by the Free Software Foundation;
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 * Zhenkai Zhu <zhenkai@cs.ucla.edu>
19 * Author: Alexander Afanasyev <alexander.afanasyev@ucla.edu>
20 */
21
22#include "sync-core.h"
23
Zhenkai Zhue851b952013-01-13 22:29:57 -080024const string SyncCore::RECOVER = "RECOVER";
25const double SyncCore::WAIT = 0.2;
26const double SyncCore::RANDOM_PERCENT = 0.5;
27
28SyncCore::SyncCore(const string &path, const Name &userName, const Name &localPrefix, const Name &syncPrefix, const StateMsgCallback &callback, CcnxWrapperPtr handle, SchedulerPtr scheduler)
29 : m_log(path, userName.toString())
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080030 , m_localPrefix(localPrefix)
31 , m_syncPrefix(syncPrefix)
32 , m_stateMsgCallback(callback)
Zhenkai Zhu8224bb62013-01-06 15:58:02 -080033 , m_handle(handle)
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080034 , m_scheduler(scheduler)
Zhenkai Zhu8224bb62013-01-06 15:58:02 -080035{
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080036 m_rootHash = m_log.RememberStateInStateLog();
Zhenkai Zhue851b952013-01-13 22:29:57 -080037 m_syncClosure = new Closure(0, boost::bind(&SyncCore::handleSyncData, this, _1, _2), boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1));
38 m_recoverClosure = new Closure(0, boost::bind(&SyncCore::handleRecoverData, this, _1, _2), boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1));
39 m_handle->setInterestFilter(m_syncPrefix, boost::bind(&SyncCore::handleInterest, this, _1));
40 m_log.initYP(m_yp);
Zhenkai Zhue29616f2013-01-14 15:40:57 -080041 //m_scheduler->start();
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080042 sendSyncInterest();
43}
44
45SyncCore::~SyncCore()
46{
Zhenkai Zhue29616f2013-01-14 15:40:57 -080047 //m_scheduler->shutdown();
Zhenkai Zhue851b952013-01-13 22:29:57 -080048 if (m_syncClosure != 0)
49 {
50 delete m_syncClosure;
51 m_syncClosure = 0;
52 }
53
54 if (m_recoverClosure != 0)
55 {
56 delete m_recoverClosure;
57 m_recoverClosure = 0;
58 }
59}
60
61Name
62SyncCore::yp(const Name &deviceName)
63{
64 Name locator;
65 ReadLock(m_ypMutex);
66 if (m_yp.find(deviceName) != m_yp.end())
67 {
68 locator = m_yp[deviceName];
69 }
70 return locator;
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080071}
72
73void
74SyncCore::updateLocalPrefix(const Name &localPrefix)
75{
76 m_localPrefix = localPrefix;
77 // optionally, we can have a sync action to announce the new prefix
78 // we are not doing this for now
79}
80
81void
Zhenkai Zhue851b952013-01-13 22:29:57 -080082SyncCore::updateLocalState(sqlite3_int64 seqno)
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080083{
Zhenkai Zhue851b952013-01-13 22:29:57 -080084 m_log.UpdateDeviceSeqNo(m_userName, seqno);
85 // choose to update locator everytime
86 m_log.UpdateLocator(m_userName, m_localPrefix);
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080087 HashPtr oldHash = m_rootHash;
88 m_rootHash = m_log.RememberStateInStateLog();
89
Zhenkai Zhu05de64a2013-01-14 15:48:23 -080090 cout << "Old hash: " << *oldHash << endl;
91 cout << "New hash: " << *m_rootHash << endl;
92
Zhenkai Zhue851b952013-01-13 22:29:57 -080093 SyncStateMsgPtr msg = m_log.FindStateDifferences(*oldHash, *m_rootHash);
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080094
95 // reply sync Interest with oldHash as last component
96 Name syncName = constructSyncName(oldHash);
97 Bytes syncData;
98 msgToBytes(msg, syncData);
99 m_handle->publishData(syncName, syncData, FRESHNESS);
100
101 // no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
102 // this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
103 ostringstream ss;
Zhenkai Zhue851b952013-01-13 22:29:57 -0800104 ss << *m_rootHash;
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800105 TaskPtr task(new OneTimeTask(boost::bind(&SyncCore::sendSyncInterest, this), ss.str(), m_scheduler, 0.05));
106 m_scheduler->addTask(task);
107}
108
109void
Zhenkai Zhue851b952013-01-13 22:29:57 -0800110SyncCore::handleInterest(const Name &name)
111{
112 int size = name.size();
113 int prefixSize = m_syncPrefix.size();
114 if (size == prefixSize + 1)
115 {
116 // this is normal sync interest
117 handleSyncInterest(name);
118 }
119 else if (size == prefixSize + 2 && name.getCompAsString(m_syncPrefix.size()) == RECOVER)
120 {
121 // this is recovery interest
122 handleRecoverInterest(name);
123 }
124}
125
126void
127SyncCore::handleRecoverInterest(const Name &name)
128{
129 Bytes hashBytes = name.getComp(name.size() - 1);
130 // this is the hash unkonwn to the sender of the interest
131 Hash hash(head(hashBytes), hashBytes.size());
132 if (m_log.LookupSyncLog(hash) > 0)
133 {
134 // we know the hash, should reply everything
135 SyncStateMsgPtr msg = m_log.FindStateDifferences(*(Hash::Origin), *m_rootHash);
136 Bytes syncData;
137 msgToBytes(msg, syncData);
138 m_handle->publishData(name, syncData, FRESHNESS);
139 }
140 else
141 {
142 // we don't recognize this hash, can not help
143 }
144}
145
146void
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800147SyncCore::handleSyncInterest(const Name &name)
148{
Zhenkai Zhue851b952013-01-13 22:29:57 -0800149 Bytes hashBytes = name.getComp(name.size() - 1);
150 HashPtr hash(new Hash(head(hashBytes), hashBytes.size()));
151 if (*hash == *m_rootHash)
152 {
153 // we have the same hash; nothing needs to be done
154 return;
155 }
156 else if (m_log.LookupSyncLog(*hash) > 0)
157 {
158 // we know something more
Zhenkai Zhue29616f2013-01-14 15:40:57 -0800159 cout << "SyncInterest: " << name << endl;
160 cout << "hash: " << *hash << endl;
161 cout << "root hash: " << *m_rootHash << endl;
Zhenkai Zhue851b952013-01-13 22:29:57 -0800162 SyncStateMsgPtr msg = m_log.FindStateDifferences(*hash, *m_rootHash);
163
164 Bytes syncData;
165 msgToBytes(msg, syncData);
166 m_handle->publishData(name, syncData, FRESHNESS);
167 }
168 else
169 {
170 // we don't recognize the hash, send recover Interest if still don't know the hash after a randomized wait period
171 ostringstream ss;
172 ss << *hash;
173 IntervalGeneratorPtr generator(new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP));
174 TaskPtr task(new PeriodicTask(boost::bind(&SyncCore::recover, this, hash), ss.str(), m_scheduler, generator, 1));
175 m_scheduler->addTask(task);
176 }
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800177}
178
179Closure::TimeoutCallbackReturnValue
180SyncCore::handleSyncInterestTimeout(const Name &name)
181{
182 // sendInterestInterest with the current root hash;
183 sendSyncInterest();
Zhenkai Zhue851b952013-01-13 22:29:57 -0800184 return Closure::RESULT_OK;
185}
186
187Closure::TimeoutCallbackReturnValue
188SyncCore::handleRecoverInterestTimeout(const Name &name)
189{
190 // We do not re-express recovery interest for now
191 // if difference is not resolved, the sync interest will trigger
192 // recovery anyway; so it's not so important to have recovery interest
193 // re-expressed
194 return Closure::RESULT_OK;
195}
196
197void
198SyncCore::handleRecoverData(const Name &name, const Bytes &content)
199{
200 handleStateData(content);
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800201}
202
203void
204SyncCore::handleSyncData(const Name &name, const Bytes &content)
205{
Zhenkai Zhue851b952013-01-13 22:29:57 -0800206 // suppress recover in interest - data out of order case
207 handleStateData(content);
208
209 // resume outstanding sync interest
210 sendSyncInterest();
211}
212
213void
214SyncCore::handleStateData(const Bytes &content)
215{
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800216 SyncStateMsgPtr msg(new SyncStateMsg);
217 bool success = msg->ParseFromArray(head(content), content.size());
218 if(!success)
219 {
220 // ignore misformed SyncData
Zhenkai Zhue851b952013-01-13 22:29:57 -0800221 cerr << "Misformed SyncData"<< endl;
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800222 return;
223 }
224
225 int size = msg->state_size();
226 int index = 0;
227 while (index < size)
228 {
229 SyncState state = msg->state(index);
Zhenkai Zhue851b952013-01-13 22:29:57 -0800230 string devStr = state.name();
231 Name deviceName((const unsigned char *)devStr.c_str(), devStr.size());
232 if (state.type() == SyncState::UPDATE)
233 {
234 sqlite3_int64 seqno = state.seq();
235 m_log.UpdateDeviceSeqNo(deviceName, seqno);
236 if (state.has_locator())
237 {
238 string locStr = state.locator();
239 Name locatorName((const unsigned char *)locStr.c_str(), locStr.size());
240 m_log.UpdateLocator(deviceName, locatorName);
241 WriteLock(m_ypMutex);
242 m_yp[deviceName] = locatorName;
243 }
244 }
245 else
246 {
247 deregister(deviceName);
248 }
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800249 index++;
250 }
251
Zhenkai Zhue851b952013-01-13 22:29:57 -0800252 // find the actuall difference and invoke callback on the actual difference
253 HashPtr oldHash = m_rootHash;
254 m_rootHash = m_log.RememberStateInStateLog();
255 SyncStateMsgPtr diff = m_log.FindStateDifferences(*oldHash, *m_rootHash);
256 m_stateMsgCallback(diff);
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800257}
258
259void
260SyncCore::sendSyncInterest()
261{
262 Name syncInterest = constructSyncName(m_rootHash);
Zhenkai Zhue851b952013-01-13 22:29:57 -0800263 m_handle->sendInterest(syncInterest, m_syncClosure);
264}
265
266void
267SyncCore::recover(const HashPtr &hash)
268{
269 if (!(*hash == *m_rootHash) && m_log.LookupSyncLog(*hash) <= 0)
270 {
271 // unfortunately we still don't recognize this hash
272 Bytes bytes;
273 readRaw(bytes, (const unsigned char *)hash->GetHash(), hash->GetHashBytes());
274 Name recoverInterest = m_syncPrefix;
275 recoverInterest.appendComp(RECOVER);
276 // append the unknown hash
277 recoverInterest.appendComp(bytes);
278 m_handle->sendInterest(recoverInterest, m_recoverClosure);
279 }
280 else
281 {
282 // we already learned the hash; cheers!
283 }
284}
285
286void
287SyncCore::deregister(const Name &name)
288{
289 // Do nothing for now
290 // TODO: handle deregistering
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800291}
292
293Name
294SyncCore::constructSyncName(const HashPtr &hash)
295{
296 Bytes bytes;
Zhenkai Zhue851b952013-01-13 22:29:57 -0800297 readRaw(bytes, (const unsigned char*)hash->GetHash(), hash->GetHashBytes());
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800298 Name syncName = m_syncPrefix;
Zhenkai Zhue851b952013-01-13 22:29:57 -0800299 syncName.appendComp(bytes);
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800300 return syncName;
301}
302
303void
304SyncCore::msgToBytes(const SyncStateMsgPtr &msg, Bytes &bytes)
305{
Zhenkai Zhue851b952013-01-13 22:29:57 -0800306 int size = msg->ByteSize();
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800307 bytes.resize(size);
Zhenkai Zhue851b952013-01-13 22:29:57 -0800308 msg->SerializeToArray(head(bytes), size);
Zhenkai Zhu8224bb62013-01-06 15:58:02 -0800309}