blob: e39b86b1f191b311b53fdb451fb48c53eec04b92 [file] [log] [blame]
Zhenkai Zhu8d935c82012-03-06 10:44:12 -08001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2012 University of California, Los Angeles
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License version 2 as
7 * published by the Free Software Foundation;
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
19 * 卞超轶 Chaoyi Bian <bcy@pku.edu.cn>
20 * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
21 */
Chaoyi Bian44fff0c2012-03-07 21:07:22 -080022
Chaoyi Bian11f294f2012-03-08 14:28:06 -080023#include "sync-logic.h"
Chaoyi Bian89ee2dc2012-03-09 14:06:01 -080024#include "sync-diff-leaf.h"
25#include "sync-full-leaf.h"
26#include <boost/make_shared.hpp>
27#include <boost/foreach.hpp>
Alexander Afanasyev387ac952012-03-11 23:49:27 -070028#include <boost/lexical_cast.hpp>
29#include <boost/date_time/posix_time/posix_time.hpp>
Zhenkai Zhua5d06d72012-03-09 15:16:24 -080030#include <vector>
Chaoyi Bian44fff0c2012-03-07 21:07:22 -080031
32using namespace std;
33using namespace boost;
34
35namespace Sync
36{
37
Alexander Afanasyev387ac952012-03-11 23:49:27 -070038const boost::posix_time::time_duration SyncLogic::m_delayedCheckTime = boost::posix_time::seconds (4.0);
39
40
Alexander Afanasyev750d1872012-03-12 15:33:56 -070041SyncLogic::SyncLogic (const std::string &syncPrefix,
42 LogicUpdateCallback onUpdate,
43 LogicRemoveCallback onRemove,
Alexander Afanasyevc1030192012-03-08 22:21:28 -080044 CcnxWrapperPtr ccnxHandle)
45 : m_syncPrefix (syncPrefix)
Alexander Afanasyev750d1872012-03-12 15:33:56 -070046 , m_onUpdate (onUpdate)
47 , m_onRemove (onRemove)
Alexander Afanasyevc1030192012-03-08 22:21:28 -080048 , m_ccnxHandle (ccnxHandle)
Alexander Afanasyev387ac952012-03-11 23:49:27 -070049 , m_delayedCheckThreadRunning (true)
Chaoyi Bian44fff0c2012-03-07 21:07:22 -080050{
Chaoyi Bian89ee2dc2012-03-09 14:06:01 -080051 srandom(time(NULL));
Alexander Afanasyev387ac952012-03-11 23:49:27 -070052 m_ccnxHandle->setInterestFilter (syncPrefix,
53 bind (&SyncLogic::respondSyncInterest, this, _1));
54
55 m_delayedCheckThread = thread (&SyncLogic::delayedChecksLoop, this);
Chaoyi Bian44fff0c2012-03-07 21:07:22 -080056}
57
Alexander Afanasyevc1030192012-03-08 22:21:28 -080058SyncLogic::~SyncLogic ()
Chaoyi Bian44fff0c2012-03-07 21:07:22 -080059{
Alexander Afanasyev387ac952012-03-11 23:49:27 -070060 m_delayedCheckThreadRunning = false;
61 // cout << "Requested stop" << this_thread::get_id () << endl;
62 m_delayedCheckThread.interrupt ();
63 m_delayedCheckThread.join ();
64}
Chaoyi Bian44fff0c2012-03-07 21:07:22 -080065
Alexander Afanasyev387ac952012-03-11 23:49:27 -070066void
67SyncLogic::delayedChecksLoop ()
68{
69 while (m_delayedCheckThreadRunning)
70 {
71 try
72 {
73 DelayedChecksList::value_type tuple;
74
75 {
76 unique_lock<mutex> lock (m_listChecksMutex);
77 while (m_delayedCheckThreadRunning && m_listChecks.size () == 0)
78 {
79 m_listChecksCondition.wait (lock);
80 // cout << "Got something" << endl;
81 }
82
83 if (m_listChecks.size () == 0) continue;
84
85 tuple = m_listChecks.front ();
86 m_listChecks.pop_front ();
87 // cout << "pop" << endl;
88 // release the mutex
89 }
90
91 // waiting and calling
92
93 // cout << "Duration: " << tuple.get<0> () - get_system_time () << endl;
94 this_thread::sleep (tuple.get<0> ());
95
96 if (!m_delayedCheckThreadRunning) continue;
97 tuple.get<1> () (); // call the scheduled function
98 }
99 catch (thread_interrupted e)
100 {
101 // cout << "interrupted: " << this_thread::get_id () << endl;
102 // do nothing
103 }
104 }
105 // cout << "Exited...\n";
106}
107
108
109
110void
111SyncLogic::respondSyncInterest (const string &interest)
112{
113 string hash = interest.substr(interest.find_last_of("/") + 1);
114 DigestPtr digest = make_shared<Digest> ();
115 try
116 {
117 istringstream is (hash);
118 is >> *digest;
119 }
120 catch (Error::DigestCalculationError &e)
121 {
122 // log error. ignoring it for now, later we should log it
123 return;
124 }
125
126 processSyncInterest (digest, interest);
127}
128
129void
130SyncLogic::processSyncInterest (DigestConstPtr digest, const std::string &interestName, bool timedProcessing/*=false*/)
131{
132 // cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700133 recursive_mutex::scoped_lock lock (m_stateMutex);
Alexander Afanasyev387ac952012-03-11 23:49:27 -0700134
135 if (*m_state.getDigest() == *digest)
136 {
137 m_syncInterestTable.insert (interestName);
138 return;
139 }
140
141 DiffStateContainer::iterator stateInDiffLog = m_log.find (digest);
142
143 if (stateInDiffLog != m_log.end ())
144 {
145 m_ccnxHandle->publishData (interestName,
146 lexical_cast<string> (*(*stateInDiffLog)->diff ()),
147 m_syncResponseFreshness);
148 return;
149 }
150
151 if (!timedProcessing)
152 {
153 {
154 // Alex: Should we ignore interests if interest with the same digest is already in the wait queue?
155
156 lock_guard<mutex> lock (m_listChecksMutex);
157 system_time delay = get_system_time () + m_delayedCheckTime;
158 // do we need randomization??
159 // delay += boost::ptime::milliseconds (rand() % 80 + 20);
160
161 m_listChecks.push_back (make_tuple (delay,
162 bind (&SyncLogic::processSyncInterest, this, digest, interestName, true))
163 );
164 }
165 m_listChecksCondition.notify_one ();
166 }
167 else
168 {
169 m_ccnxHandle->publishData (interestName + "/state",
170 lexical_cast<string> (m_state),
171 m_syncResponseFreshness);
172 }
Chaoyi Bian44fff0c2012-03-07 21:07:22 -0800173}
174
Alexander Afanasyevc1030192012-03-08 22:21:28 -0800175void
176SyncLogic::processSyncData (const string &name, const string &dataBuffer)
Chaoyi Bian44fff0c2012-03-07 21:07:22 -0800177{
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700178 DiffStatePtr diffLog = make_shared<DiffState> ();
Alexander Afanasyeve4e2bf72012-03-12 12:44:54 -0700179
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700180 try
181 {
182 recursive_mutex::scoped_lock lock (m_stateMutex);
183
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700184 string last = name.substr(name.find_last_of("/") + 1);
185 istringstream ss (dataBuffer);
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700186
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700187 if (last == "state")
188 {
189 FullState full;
190 ss >> full;
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700191 BOOST_FOREACH (LeafConstPtr leaf, full.getLeaves()) // order doesn't matter
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700192 {
193 NameInfoConstPtr info = leaf->getInfo ();
194 SeqNo seq = leaf->getSeq ();
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700195
196 bool inserted = false;
197 bool updated = false;
198 SeqNo oldSeq;
199 tie (inserted, updated, oldSeq) = m_state.update (info, seq);
Chaoyi Bian89ee2dc2012-03-09 14:06:01 -0800200
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700201 if (updated)
202 {
203 diffLog->update (info, seq);
204 m_onUpdate (info->toString (), seq.getSeq(), oldSeq);
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700205 }
206 }
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700207 }
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700208 else
209 {
210 DiffState diff;
211 ss >> diff;
212 BOOST_FOREACH (LeafConstPtr leaf, diff.getLeaves().get<ordered>())
213 {
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700214 DiffLeafConstPtr diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
215 BOOST_ASSERT (diffLeaf != 0);
216
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700217 NameInfoConstPtr info = diffLeaf->getInfo();
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700218 if (diffLeaf->getOperation() == UPDATE)
219 {
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700220 SeqNo seq = diffLeaf->getSeq();
Chaoyi Bian89ee2dc2012-03-09 14:06:01 -0800221
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700222 bool inserted = false;
223 bool updated = false;
224 SeqNo oldSeq;
225 tie (inserted, updated, oldSeq) = m_state.update (info, seq);
Chaoyi Bian89ee2dc2012-03-09 14:06:01 -0800226
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700227 if (updated)
228 {
229 diffLog->update (info, seq);
230 m_onUpdate (info->toString (), seq.getSeq(), oldSeq);
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700231 }
232 }
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700233 else if (diffLeaf->getOperation() == REMOVE)
234 {
235 if (m_state.remove (info))
236 {
237 diffLog->remove (info);
238 m_onRemove (info->toString ());
239 }
240 }
241 else
242 {
243 BOOST_ASSERT (false); // just in case
244 }
245 }
246 }
247
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700248 diffLog->setDigest(m_state.getDigest());
Alexander Afanasyeva36b7512012-03-12 16:03:03 -0700249 if (m_log.size () > 0)
250 {
251 m_log.get<sequenced> ().front ()->setNext (diffLog);
252 }
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700253 m_log.insert (diffLog);
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700254 }
255 catch (Error::SyncXmlDecodingFailure &e)
256 {
257 // log error
258 return;
259 }
260
261 if (diffLog->getLeaves ().size () > 0)
262 {
Alexander Afanasyev04fd8a82012-03-12 15:40:48 -0700263 sendSyncInterest();
264 }
Chaoyi Bian44fff0c2012-03-07 21:07:22 -0800265}
266
Alexander Afanasyevc1030192012-03-08 22:21:28 -0800267void
Alexander Afanasyevd91497c2012-03-12 15:39:30 -0700268SyncLogic::processPendingSyncInterests(DiffStatePtr &diff)
Chaoyi Bian44fff0c2012-03-07 21:07:22 -0800269{
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700270 recursive_mutex::scoped_lock lock (m_stateMutex);
Chaoyi Bian44fff0c2012-03-07 21:07:22 -0800271 diff->setDigest(m_state.getDigest());
272 m_log.insert(diff);
273
Alexander Afanasyevc1030192012-03-08 22:21:28 -0800274 vector<string> pis = m_syncInterestTable.fetchAll ();
Alexander Afanasyevc1030192012-03-08 22:21:28 -0800275 stringstream ss;
Chaoyi Bian44fff0c2012-03-07 21:07:22 -0800276 ss << *diff;
Alexander Afanasyev172d2b72012-03-08 23:43:39 -0800277 for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
278 {
Chaoyi Bian89ee2dc2012-03-09 14:06:01 -0800279 m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
Alexander Afanasyev172d2b72012-03-08 23:43:39 -0800280 }
281}
Chaoyi Bian44fff0c2012-03-07 21:07:22 -0800282
283void
Zhenkai Zhu0efa37b2012-03-12 13:54:12 -0700284SyncLogic::addLocalNames (const string &prefix, uint32_t session, uint32_t seq)
285{
Alexander Afanasyevd91497c2012-03-12 15:39:30 -0700286 recursive_mutex::scoped_lock lock (m_stateMutex);
Zhenkai Zhu0efa37b2012-03-12 13:54:12 -0700287 NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
288 SeqNo seqN(session, seq);
289 m_state.update(info, seqN);
290
291 DiffStatePtr diff = make_shared<DiffState>();
292 diff->update(info, seqN);
293
Alexander Afanasyevd91497c2012-03-12 15:39:30 -0700294 processPendingSyncInterests(diff);
Zhenkai Zhu0efa37b2012-03-12 13:54:12 -0700295}
296
297void
Alexander Afanasyevd91497c2012-03-12 15:39:30 -0700298SyncLogic::remove(const string &prefix)
299{
300 recursive_mutex::scoped_lock lock (m_stateMutex);
Zhenkai Zhu0efa37b2012-03-12 13:54:12 -0700301 NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
Alexander Afanasyevd91497c2012-03-12 15:39:30 -0700302 m_state.remove(info);
Zhenkai Zhu0efa37b2012-03-12 13:54:12 -0700303
304 DiffStatePtr diff = make_shared<DiffState>();
Alexander Afanasyevd91497c2012-03-12 15:39:30 -0700305 diff->remove(info);
Zhenkai Zhu0efa37b2012-03-12 13:54:12 -0700306
Alexander Afanasyevd91497c2012-03-12 15:39:30 -0700307 processPendingSyncInterests(diff);
Zhenkai Zhu0efa37b2012-03-12 13:54:12 -0700308}
309
310void
Zhenkai Zhu8d935c82012-03-06 10:44:12 -0800311SyncLogic::sendSyncInterest ()
312{
Alexander Afanasyev750d1872012-03-12 15:33:56 -0700313 recursive_mutex::scoped_lock lock (m_stateMutex);
314
Zhenkai Zhu8d935c82012-03-06 10:44:12 -0800315 ostringstream os;
316 os << m_syncPrefix << "/" << m_state.getDigest();
317
318 m_ccnxHandle->sendInterest (os.str (),
319 bind (&SyncLogic::processSyncData, this, _1, _2));
320}
321
322}