blob: 2e0169c05be999fe949a56ece08c3d9aad486a7a [file] [log] [blame]
akmhoque66e66182014-02-21 17:56:03 -06001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2012 University of California, Los Angeles
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License version 2 as
7 * published by the Free Software Foundation;
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
19 * Chaoyi Bian <bcy@pku.edu.cn>
akmhoque157b0a42014-05-13 00:26:37 -050020 * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
akmhoque66e66182014-02-21 17:56:03 -060021 * Yingdi Yu <yingdi@cs.ucla.edu>
22 */
23
24#include "sync-logic.h"
25#include "sync-diff-leaf.h"
26#include "sync-full-leaf.h"
27#include "sync-logging.h"
28#include "sync-state.h"
29
30#include <boost/foreach.hpp>
31#include <boost/lexical_cast.hpp>
32#include <vector>
33
34using namespace std;
Vince Lehman0a7da612014-10-29 14:39:29 -050035
36using ndn::Data;
37using ndn::EventId;
38using ndn::KeyChain;
39using ndn::Face;
40using ndn::Name;
41using ndn::OnDataValidated;
42using ndn::OnDataValidationFailed;
43using ndn::Validator;
akmhoque66e66182014-02-21 17:56:03 -060044
45INIT_LOGGER ("SyncLogic");
46
47
48#ifdef _DEBUG
49#define _LOG_DEBUG_ID(v) _LOG_DEBUG(m_instanceId << " " << v)
50#else
51#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
52#endif
53
54#define GET_RANDOM(var) var ()
55
56#define TIME_SECONDS_WITH_JITTER(sec) \
akmhoque157b0a42014-05-13 00:26:37 -050057 (ndn::time::seconds(sec) + ndn::time::milliseconds(GET_RANDOM (m_reexpressionJitter)))
akmhoque66e66182014-02-21 17:56:03 -060058
59#define TIME_MILLISECONDS_WITH_JITTER(ms) \
akmhoque157b0a42014-05-13 00:26:37 -050060 (ndn::time::seconds(ms) + ndn::time::milliseconds(GET_RANDOM (m_reexpressionJitter)))
akmhoque66e66182014-02-21 17:56:03 -060061
62namespace Sync {
63
64using ndn::shared_ptr;
65
66int SyncLogic::m_instanceCounter = 0;
67
Yingdi Yu40cd1c32014-04-17 15:02:17 -070068const int SyncLogic::m_syncResponseFreshness = 1000; // MUST BE dividable by 1000!!!
69const int SyncLogic::m_syncInterestReexpress = 4; // seconds
70
akmhoque66e66182014-02-21 17:56:03 -060071SyncLogic::SyncLogic (const Name& syncPrefix,
Yingdi Yu40cd1c32014-04-17 15:02:17 -070072 shared_ptr<Validator> validator,
akmhoque66e66182014-02-21 17:56:03 -060073 shared_ptr<Face> face,
74 LogicUpdateCallback onUpdate,
75 LogicRemoveCallback onRemove)
76 : m_state (new FullState)
akmhoque157b0a42014-05-13 00:26:37 -050077 , m_syncInterestTable (face->getIoService(), ndn::time::seconds(m_syncInterestReexpress))
akmhoque66e66182014-02-21 17:56:03 -060078 , m_syncPrefix (syncPrefix)
79 , m_onUpdate (onUpdate)
80 , m_onRemove (onRemove)
81 , m_perBranch (false)
82 , m_validator(validator)
83 , m_keyChain(new KeyChain())
84 , m_face(face)
akmhoque157b0a42014-05-13 00:26:37 -050085 , m_scheduler(face->getIoService())
akmhoque66e66182014-02-21 17:56:03 -060086 , m_randomGenerator (static_cast<unsigned int> (std::time (0)))
87 , m_rangeUniformRandom (m_randomGenerator, boost::uniform_int<> (200,1000))
88 , m_reexpressionJitter (m_randomGenerator, boost::uniform_int<> (100,500))
89 , m_recoveryRetransmissionInterval (m_defaultRecoveryRetransmitInterval)
Yingdi Yu40cd1c32014-04-17 15:02:17 -070090{
91 m_syncRegisteredPrefixId = m_face->setInterestFilter (m_syncPrefix,
akmhoque157b0a42014-05-13 00:26:37 -050092 bind(&SyncLogic::onSyncInterest,
93 this, _1, _2),
94 bind(&SyncLogic::onSyncRegisterSucceed,
95 this, _1),
96 bind(&SyncLogic::onSyncRegisterFailed,
97 this, _1, _2));
Yingdi Yu40cd1c32014-04-17 15:02:17 -070098
akmhoque66e66182014-02-21 17:56:03 -060099
akmhoque157b0a42014-05-13 00:26:37 -0500100 m_reexpressingInterestId = m_scheduler.scheduleEvent (ndn::time::seconds (0),
akmhoque66e66182014-02-21 17:56:03 -0600101 bind (&SyncLogic::sendSyncInterest, this));
102
103 m_instanceId = string("Instance " + boost::lexical_cast<string>(m_instanceCounter++) + " ");
104}
105
106SyncLogic::SyncLogic (const Name& syncPrefix,
107 shared_ptr<Validator> validator,
108 shared_ptr<Face> face,
109 LogicPerBranchCallback onUpdateBranch)
110 : m_state (new FullState)
akmhoque157b0a42014-05-13 00:26:37 -0500111 , m_syncInterestTable (face->getIoService(), ndn::time::seconds (m_syncInterestReexpress))
akmhoque66e66182014-02-21 17:56:03 -0600112 , m_syncPrefix (syncPrefix)
113 , m_onUpdateBranch (onUpdateBranch)
114 , m_perBranch(true)
115 , m_validator(validator)
116 , m_keyChain(new KeyChain())
117 , m_face(face)
akmhoque157b0a42014-05-13 00:26:37 -0500118 , m_scheduler(face->getIoService())
akmhoque66e66182014-02-21 17:56:03 -0600119 , m_randomGenerator (static_cast<unsigned int> (std::time (0)))
120 , m_rangeUniformRandom (m_randomGenerator, boost::uniform_int<> (200,1000))
121 , m_reexpressionJitter (m_randomGenerator, boost::uniform_int<> (100,500))
122 , m_recoveryRetransmissionInterval (m_defaultRecoveryRetransmitInterval)
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700123{
124 m_syncRegisteredPrefixId = m_face->setInterestFilter (m_syncPrefix,
akmhoque157b0a42014-05-13 00:26:37 -0500125 bind(&SyncLogic::onSyncInterest,
126 this, _1, _2),
127 bind(&SyncLogic::onSyncRegisterSucceed,
128 this, _1),
129 bind(&SyncLogic::onSyncRegisterFailed,
130 this, _1, _2));
akmhoque66e66182014-02-21 17:56:03 -0600131
akmhoque157b0a42014-05-13 00:26:37 -0500132 m_reexpressingInterestId = m_scheduler.scheduleEvent (ndn::time::seconds (0),
akmhoque66e66182014-02-21 17:56:03 -0600133 bind (&SyncLogic::sendSyncInterest, this));
134}
135
136SyncLogic::~SyncLogic ()
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700137{
138 m_face->unsetInterestFilter(m_syncRegisteredPrefixId);
akmhoque66e66182014-02-21 17:56:03 -0600139 m_scheduler.cancelEvent (m_reexpressingInterestId);
140 m_scheduler.cancelEvent (m_delayedInterestProcessingId);
141}
142
143/**
144 * Two types of intersts
145 *
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700146 * Normal name: .../<hash>
akmhoque66e66182014-02-21 17:56:03 -0600147 * Recovery name: .../recovery/<hash>
148 */
Vince Lehman0a7da612014-10-29 14:39:29 -0500149tuple<DigestConstPtr, std::string>
akmhoque66e66182014-02-21 17:56:03 -0600150SyncLogic::convertNameToDigestAndType (const Name &name)
151{
152 BOOST_ASSERT (m_syncPrefix.isPrefixOf(name));
153
154 int nameLengthDiff = name.size() - m_syncPrefix.size();
155 BOOST_ASSERT (nameLengthDiff > 0);
156 BOOST_ASSERT (nameLengthDiff < 3);
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700157
akmhoque157b0a42014-05-13 00:26:37 -0500158 string hash = name.get(-1).toUri();
akmhoque66e66182014-02-21 17:56:03 -0600159 string interestType;
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700160
akmhoque66e66182014-02-21 17:56:03 -0600161 if(nameLengthDiff == 1)
162 interestType = "normal";
163 else
akmhoque157b0a42014-05-13 00:26:37 -0500164 interestType = name.get(-2).toUri();
akmhoque66e66182014-02-21 17:56:03 -0600165
166 _LOG_DEBUG_ID (hash << ", " << interestType);
167
Vince Lehman0a7da612014-10-29 14:39:29 -0500168 DigestPtr digest = make_shared<Digest> ();
akmhoque66e66182014-02-21 17:56:03 -0600169 istringstream is (hash);
170 is >> *digest;
171
Vince Lehman0a7da612014-10-29 14:39:29 -0500172 return make_tuple(digest, interestType);
akmhoque66e66182014-02-21 17:56:03 -0600173}
174
175void
176SyncLogic::onSyncInterest (const Name& prefix, const ndn::Interest& interest)
177{
178 Name name = interest.getName();
179
180 _LOG_DEBUG_ID("respondSyncInterest: " << name);
181
182 try
183 {
184 _LOG_DEBUG_ID ("<< I " << name);
185
186 DigestConstPtr digest;
187 string type;
188 tie (digest, type) = convertNameToDigestAndType (name);
189
190 if (type == "normal") // kind of ineffective...
191 {
192 processSyncInterest (name, digest);
193 }
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700194 else if (type == "recovery")
akmhoque66e66182014-02-21 17:56:03 -0600195 {
196 processSyncRecoveryInterest (name, digest);
197 }
198 }
199 catch (Error::DigestCalculationError &e)
200 {
201 _LOG_DEBUG_ID ("Something fishy happened...");
202 // log error. ignoring it for now, later we should log it
203 return ;
204 }
205}
206
207void
akmhoque157b0a42014-05-13 00:26:37 -0500208SyncLogic::onSyncRegisterSucceed(const Name& prefix)
209{
210 _LOG_DEBUG_ID("Sync prefix registration succeeded! " << prefix);
211}
212
213void
akmhoque66e66182014-02-21 17:56:03 -0600214SyncLogic::onSyncRegisterFailed(const Name& prefix, const string& msg)
215{
216 _LOG_DEBUG_ID("Sync prefix registration failed! " << msg);
217}
218
219void
220SyncLogic::onSyncData(const ndn::Interest& interest, Data& data)
221{
222 OnDataValidated onValidated = bind(&SyncLogic::onSyncDataValidated, this, _1);
223 OnDataValidationFailed onValidationFailed = bind(&SyncLogic::onSyncDataValidationFailed, this, _1);
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700224 m_validator->validate(data, onValidated, onValidationFailed);
akmhoque66e66182014-02-21 17:56:03 -0600225}
226
227void
228SyncLogic::onSyncTimeout(const ndn::Interest& interest)
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700229{
230 // It is OK. Others will handle the time out situation.
akmhoque66e66182014-02-21 17:56:03 -0600231}
232
233void
234SyncLogic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
235{
236 _LOG_DEBUG_ID("Sync data cannot be verified!");
237}
238
239void
240SyncLogic::onSyncDataValidated(const shared_ptr<const Data>& data)
241{
242 Name name = data->getName();
243 const char* wireData = (const char*)data->getContent().value();
244 size_t len = data->getContent().value_size();
245
246 try
247 {
248 _LOG_DEBUG_ID ("<< D " << name);
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700249
akmhoque66e66182014-02-21 17:56:03 -0600250 DigestConstPtr digest;
251 string type;
252 tie (digest, type) = convertNameToDigestAndType (name);
253
254 if (type == "normal")
255 {
256 processSyncData (name, digest, wireData, len);
257 }
258 else
259 {
260 // timer is always restarted when we schedule recovery
261 m_scheduler.cancelEvent (m_reexpressingRecoveryInterestId);
262 processSyncData (name, digest, wireData, len);
263 }
264 }
265 catch (Error::DigestCalculationError &e)
266 {
267 _LOG_DEBUG_ID ("Something fishy happened...");
268 // log error. ignoring it for now, later we should log it
269 return;
270 }
271}
272
273void
akmhoque157b0a42014-05-13 00:26:37 -0500274SyncLogic::processSyncInterest (const Name &name, DigestConstPtr digest,
275 bool timedProcessing/*=false*/)
akmhoque66e66182014-02-21 17:56:03 -0600276{
277 _LOG_DEBUG_ID("processSyncInterest");
278 DigestConstPtr rootDigest;
279 {
280 rootDigest = m_state->getDigest();
281 }
282
283 // Special case when state is not empty and we have received request with zero-root digest
284 if (digest->isZero () && !rootDigest->isZero ())
285 {
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700286
akmhoque66e66182014-02-21 17:56:03 -0600287 SyncStateMsg ssm;
288 {
289 ssm << (*m_state);
290 }
291 sendSyncData (name, digest, ssm);
292 return;
293 }
294
295 if (*rootDigest == *digest)
296 {
297 _LOG_DEBUG_ID ("processSyncInterest (): Same state. Adding to PIT");
298 m_syncInterestTable.insert (digest, name.toUri(), false);
299 return;
300 }
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700301
akmhoque66e66182014-02-21 17:56:03 -0600302 DiffStateContainer::iterator stateInDiffLog = m_log.find (digest);
303
304 if (stateInDiffLog != m_log.end ())
305 {
306 DiffStateConstPtr stateDiff = (*stateInDiffLog)->diff ();
307
308 sendSyncData (name, digest, stateDiff);
309 return;
310 }
311
312 if (!timedProcessing)
313 {
314 bool exists = m_syncInterestTable.insert (digest, name.toUri(), true);
315 if (exists) // somebody else replied, so restart random-game timer
316 {
akmhoque157b0a42014-05-13 00:26:37 -0500317 _LOG_DEBUG_ID("Unknown digest, but somebody may have already replied, " <<
318 "so restart our timer");
akmhoque66e66182014-02-21 17:56:03 -0600319 m_scheduler.cancelEvent (m_delayedInterestProcessingId);
320 }
321
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700322 uint32_t waitDelay = GET_RANDOM (m_rangeUniformRandom);
akmhoque157b0a42014-05-13 00:26:37 -0500323 _LOG_DEBUG_ID("Digest is not in the log. Schedule processing after small delay: " <<
324 ndn::time::milliseconds (waitDelay));
akmhoque66e66182014-02-21 17:56:03 -0600325
akmhoque157b0a42014-05-13 00:26:37 -0500326 m_delayedInterestProcessingId =
327 m_scheduler.scheduleEvent(ndn::time::milliseconds (waitDelay),
328 bind (&SyncLogic::processSyncInterest, this, name, digest, true));
akmhoque66e66182014-02-21 17:56:03 -0600329 }
330 else
331 {
akmhoque157b0a42014-05-13 00:26:37 -0500332 _LOG_DEBUG_ID(" (timed processing)");
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700333
akmhoque66e66182014-02-21 17:56:03 -0600334 m_recoveryRetransmissionInterval = m_defaultRecoveryRetransmitInterval;
335 sendSyncRecoveryInterests (digest);
336 }
337}
338
339void
akmhoque157b0a42014-05-13 00:26:37 -0500340SyncLogic::processSyncData (const Name &name, DigestConstPtr digest,
341 const char *wireData, size_t len)
akmhoque66e66182014-02-21 17:56:03 -0600342{
Vince Lehman0a7da612014-10-29 14:39:29 -0500343 DiffStatePtr diffLog = make_shared<DiffState> ();
akmhoque66e66182014-02-21 17:56:03 -0600344 bool ownInterestSatisfied = false;
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700345
akmhoque66e66182014-02-21 17:56:03 -0600346 try
347 {
348
349 m_syncInterestTable.remove (name.toUri()); // Remove satisfied interest from PIT
350
351 ownInterestSatisfied = (name == m_outstandingInterestName);
352
353 DiffState diff;
354 SyncStateMsg msg;
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700355 if (!msg.ParseFromArray(wireData, len) || !msg.IsInitialized())
akmhoque66e66182014-02-21 17:56:03 -0600356 {
357 //Throw
358 BOOST_THROW_EXCEPTION (Error::SyncStateMsgDecodingFailure () );
359 }
360 msg >> diff;
361
362 vector<MissingDataInfo> v;
363 BOOST_FOREACH (LeafConstPtr leaf, diff.getLeaves().get<ordered>())
364 {
Vince Lehman0a7da612014-10-29 14:39:29 -0500365 DiffLeafConstPtr diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
akmhoque66e66182014-02-21 17:56:03 -0600366 BOOST_ASSERT (diffLeaf != 0);
367
368 NameInfoConstPtr info = diffLeaf->getInfo();
369 if (diffLeaf->getOperation() == UPDATE)
370 {
371 SeqNo seq = diffLeaf->getSeq();
372
373 bool inserted = false;
374 bool updated = false;
375 SeqNo oldSeq;
376 {
Vince Lehman0a7da612014-10-29 14:39:29 -0500377 tie (inserted, updated, oldSeq) = m_state->update (info, seq);
akmhoque66e66182014-02-21 17:56:03 -0600378 }
379
380 if (inserted || updated)
381 {
382 diffLog->update (info, seq);
383 if (!oldSeq.isValid())
384 {
385 oldSeq = SeqNo(seq.getSession(), 0);
386 }
387 else
388 {
389 ++oldSeq;
390 }
391 // there is no need for application to process update on forwarder node
392 if (info->toString() != forwarderPrefix)
393 {
394 MissingDataInfo mdi = {info->toString(), oldSeq, seq};
395 {
396 ostringstream interestName;
akmhoque157b0a42014-05-13 00:26:37 -0500397 interestName << mdi.prefix <<
398 "/" << mdi.high.getSession() <<
399 "/" << mdi.high.getSeq();
akmhoque66e66182014-02-21 17:56:03 -0600400 _LOG_DEBUG_ID("+++++++++++++++ " + interestName.str());
401 }
402 if (m_perBranch)
403 {
404 ostringstream interestName;
akmhoque157b0a42014-05-13 00:26:37 -0500405 interestName << mdi.prefix <<
406 "/" << mdi.high.getSession() <<
407 "/" << mdi.high.getSeq();
akmhoque66e66182014-02-21 17:56:03 -0600408 m_onUpdateBranch(interestName.str());
409 }
410 else
411 {
412 v.push_back(mdi);
413 }
414 }
415 }
416 }
417 else if (diffLeaf->getOperation() == REMOVE)
418 {
419 if (m_state->remove (info))
420 {
421 diffLog->remove (info);
422 if (!m_perBranch)
423 {
424 m_onRemove (info->toString ());
425 }
426 }
427 }
428 else
429 {
430 }
431 }
432
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700433 if (!v.empty())
akmhoque66e66182014-02-21 17:56:03 -0600434 {
435 if (!m_perBranch)
436 {
437 m_onUpdate(v);
438 }
439 }
440
441 insertToDiffLog (diffLog);
442 }
443 catch (Error::SyncStateMsgDecodingFailure &e)
444 {
445 _LOG_DEBUG_ID ("Something really fishy happened during state decoding " <<
446 diagnostic_information (e));
447 diffLog.reset ();
448 // don't do anything
449 }
450
451 if ((diffLog != 0 && diffLog->getLeaves ().size () > 0) ||
452 ownInterestSatisfied)
453 {
454 _LOG_DEBUG_ID(" +++++++++++++++ state changed!!!");
455 // Do it only if everything went fine and state changed
456
457 // this is kind of wrong
akmhoque157b0a42014-05-13 00:26:37 -0500458 // satisfyPendingSyncInterests (diffLog); // if there are interests in PIT,
459 // // there is a point to satisfy them using new state
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700460
akmhoque66e66182014-02-21 17:56:03 -0600461 // if state has changed, then it is safe to express a new interest
akmhoque157b0a42014-05-13 00:26:37 -0500462 ndn::time::system_clock::Duration after =
463 ndn::time::milliseconds(GET_RANDOM (m_reexpressionJitter));
akmhoque66e66182014-02-21 17:56:03 -0600464 // cout << "------------ reexpress interest after: " << after << endl;
465 EventId eventId = m_scheduler.scheduleEvent (after,
466 bind (&SyncLogic::sendSyncInterest, this));
467
468 m_scheduler.cancelEvent (m_reexpressingInterestId);
469 m_reexpressingInterestId = eventId;
470 }
471}
472
473void
474SyncLogic::processSyncRecoveryInterest (const Name &name, DigestConstPtr digest)
475{
476 _LOG_DEBUG_ID("processSyncRecoveryInterest");
477 DiffStateContainer::iterator stateInDiffLog = m_log.find (digest);
478
479 if (stateInDiffLog == m_log.end ())
480 {
481 _LOG_DEBUG_ID ("Could not find " << *digest << " in digest log");
482 return;
483 }
484
485 SyncStateMsg ssm;
486 {
487 ssm << (*m_state);
488 }
489 sendSyncData (name, digest, ssm);
490}
491
492void
493SyncLogic::satisfyPendingSyncInterests (DiffStateConstPtr diffLog)
494{
Vince Lehman0a7da612014-10-29 14:39:29 -0500495 DiffStatePtr fullStateLog = make_shared<DiffState> ();
akmhoque66e66182014-02-21 17:56:03 -0600496 {
497 BOOST_FOREACH (LeafConstPtr leaf, m_state->getLeaves ()/*.get<timed> ()*/)
498 {
499 fullStateLog->update (leaf->getInfo (), leaf->getSeq ());
500 /// @todo Impose limit on how many state info should be send out
501 }
502 }
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700503
akmhoque66e66182014-02-21 17:56:03 -0600504 try
505 {
506 uint32_t counter = 0;
507 while (m_syncInterestTable.size () > 0)
508 {
509 Sync::Interest interest = m_syncInterestTable.pop ();
510
511 if (!interest.m_unknown)
512 {
513 _LOG_DEBUG_ID (">> D " << interest.m_name);
514 sendSyncData (interest.m_name, interest.m_digest, diffLog);
515 }
516 else
517 {
518 _LOG_DEBUG_ID (">> D (unknown)" << interest.m_name);
519 sendSyncData (interest.m_name, interest.m_digest, fullStateLog);
520 }
521 counter ++;
522 }
523 _LOG_DEBUG_ID ("Satisfied " << counter << " pending interests");
524 }
525 catch (Error::InterestTableIsEmpty &e)
526 {
527 // ok. not really an error
528 }
529}
530
531void
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700532SyncLogic::insertToDiffLog (DiffStatePtr diffLog)
akmhoque66e66182014-02-21 17:56:03 -0600533{
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700534 diffLog->setDigest (m_state->getDigest());
akmhoque66e66182014-02-21 17:56:03 -0600535 if (m_log.size () > 0)
536 {
537 m_log.get<sequenced> ().front ()->setNext (diffLog);
538 }
akmhoque157b0a42014-05-13 00:26:37 -0500539 m_log.erase (m_state->getDigest()); // remove diff state with the same digest.
540 // next pointers are still valid
akmhoque66e66182014-02-21 17:56:03 -0600541 /// @todo Optimization
542 m_log.get<sequenced> ().push_front (diffLog);
543}
544
545void
546SyncLogic::addLocalNames (const Name &prefix, uint64_t session, uint64_t seq)
547{
548 DiffStatePtr diff;
549 {
550 //cout << "Add local names" <<endl;
551 NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix.toUri());
552
553 _LOG_DEBUG_ID ("addLocalNames (): old state " << *m_state->getDigest ());
554
555 SeqNo seqN (session, seq);
556 m_state->update(info, seqN);
557
558 _LOG_DEBUG_ID ("addLocalNames (): new state " << *m_state->getDigest ());
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700559
Vince Lehman0a7da612014-10-29 14:39:29 -0500560 diff = make_shared<DiffState>();
akmhoque66e66182014-02-21 17:56:03 -0600561 diff->update(info, seqN);
562 insertToDiffLog (diff);
563 }
564
565 // _LOG_DEBUG_ID ("PIT size: " << m_syncInterestTable.size ());
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700566 satisfyPendingSyncInterests (diff);
akmhoque66e66182014-02-21 17:56:03 -0600567}
568
569void
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700570SyncLogic::remove(const Name &prefix)
akmhoque66e66182014-02-21 17:56:03 -0600571{
572 DiffStatePtr diff;
573 {
574 NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix.toUri());
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700575 m_state->remove(info);
akmhoque66e66182014-02-21 17:56:03 -0600576
577 // increment the sequence number for the forwarder node
578 NameInfoConstPtr forwarderInfo = StdNameInfo::FindOrCreate(forwarderPrefix);
579
580 LeafContainer::iterator item = m_state->getLeaves ().find (forwarderInfo);
581 SeqNo seqNo (0);
582 if (item != m_state->getLeaves ().end ())
583 {
584 seqNo = (*item)->getSeq ();
585 ++seqNo;
586 }
587 m_state->update (forwarderInfo, seqNo);
588
Vince Lehman0a7da612014-10-29 14:39:29 -0500589 diff = make_shared<DiffState>();
akmhoque66e66182014-02-21 17:56:03 -0600590 diff->remove(info);
591 diff->update(forwarderInfo, seqNo);
592
593 insertToDiffLog (diff);
594 }
595
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700596 satisfyPendingSyncInterests (diff);
akmhoque66e66182014-02-21 17:56:03 -0600597}
598
599void
600SyncLogic::sendSyncInterest ()
601{
602 _LOG_DEBUG_ID("sendSyncInterest");
603
604 {
605 m_outstandingInterestName = m_syncPrefix;
606 ostringstream os;
607 os << *m_state->getDigest();
608 m_outstandingInterestName.append(os.str());
609 _LOG_DEBUG_ID (">> I " << m_outstandingInterestName);
610 }
611
612 _LOG_DEBUG_ID("sendSyncInterest: " << m_outstandingInterestName);
613
akmhoque157b0a42014-05-13 00:26:37 -0500614 EventId eventId =
615 m_scheduler.scheduleEvent(ndn::time::seconds(m_syncInterestReexpress) +
616 ndn::time::milliseconds(GET_RANDOM(m_reexpressionJitter)),
617 bind (&SyncLogic::sendSyncInterest, this));
akmhoque66e66182014-02-21 17:56:03 -0600618 m_scheduler.cancelEvent (m_reexpressingInterestId);
619 m_reexpressingInterestId = eventId;
620
621 ndn::Interest interest(m_outstandingInterestName);
622 interest.setMustBeFresh(true);
623
624 m_face->expressInterest(interest,
625 bind(&SyncLogic::onSyncData, this, _1, _2),
626 bind(&SyncLogic::onSyncTimeout, this, _1));
627}
628
629void
630SyncLogic::sendSyncRecoveryInterests (DigestConstPtr digest)
631{
632 ostringstream os;
633 os << *digest;
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700634
akmhoque66e66182014-02-21 17:56:03 -0600635 Name interestName = m_syncPrefix;
636 interestName.append("recovery").append(os.str());
637
akmhoque157b0a42014-05-13 00:26:37 -0500638 ndn::time::system_clock::Duration nextRetransmission =
639 ndn::time::milliseconds(m_recoveryRetransmissionInterval + GET_RANDOM (m_reexpressionJitter));
akmhoque66e66182014-02-21 17:56:03 -0600640
641 m_recoveryRetransmissionInterval <<= 1;
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700642
akmhoque66e66182014-02-21 17:56:03 -0600643 m_scheduler.cancelEvent (m_reexpressingRecoveryInterestId);
644 if (m_recoveryRetransmissionInterval < 100*1000) // <100 seconds
akmhoque157b0a42014-05-13 00:26:37 -0500645 m_reexpressingRecoveryInterestId =
646 m_scheduler.scheduleEvent (nextRetransmission,
647 bind (&SyncLogic::sendSyncRecoveryInterests, this, digest));
akmhoque66e66182014-02-21 17:56:03 -0600648
649 ndn::Interest interest(interestName);
650 interest.setMustBeFresh(true);
651
652 m_face->expressInterest(interest,
653 bind(&SyncLogic::onSyncData, this, _1, _2),
654 bind(&SyncLogic::onSyncTimeout, this, _1));
655}
656
657
658void
659SyncLogic::sendSyncData (const Name &name, DigestConstPtr digest, StateConstPtr state)
660{
661 SyncStateMsg msg;
662 msg << (*state);
663 sendSyncData(name, digest, msg);
664}
665
666// pass in state msg instead of state, so that there is no need to lock the state until
667// this function returns
668void
669SyncLogic::sendSyncData (const Name &name, DigestConstPtr digest, SyncStateMsg &ssm)
670{
671 _LOG_DEBUG_ID (">> D " << name);
672 int size = ssm.ByteSize();
673 char *wireData = new char[size];
674 ssm.SerializeToArray(wireData, size);
675
akmhoque69c9aa92014-07-23 15:15:05 -0500676 //Data syncData(name);
677 ndn::shared_ptr<ndn::Data> syncData = ndn::make_shared<ndn::Data>();
678 syncData->setName(name);
679 syncData->setContent(reinterpret_cast<const uint8_t*>(wireData), size);
680 syncData->setFreshnessPeriod(ndn::time::seconds(m_syncResponseFreshness));
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700681
akmhoque69c9aa92014-07-23 15:15:05 -0500682 m_keyChain->sign(*syncData);
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700683
akmhoque69c9aa92014-07-23 15:15:05 -0500684 m_face->put(*syncData);
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700685
akmhoque66e66182014-02-21 17:56:03 -0600686 delete []wireData;
687
688 // checking if our own interest got satisfied
689 bool satisfiedOwnInterest = false;
690 {
691 satisfiedOwnInterest = (m_outstandingInterestName == name);
692 }
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700693
akmhoque66e66182014-02-21 17:56:03 -0600694 if (satisfiedOwnInterest)
695 {
696 _LOG_DEBUG_ID ("Satisfied our own Interest. Re-expressing (hopefully with a new digest)");
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700697
akmhoque157b0a42014-05-13 00:26:37 -0500698 ndn::time::system_clock::Duration after =
699 ndn::time::milliseconds(GET_RANDOM(m_reexpressionJitter));
akmhoque66e66182014-02-21 17:56:03 -0600700 // cout << "------------ reexpress interest after: " << after << endl;
701 EventId eventId = m_scheduler.scheduleEvent (after,
702 bind (&SyncLogic::sendSyncInterest, this));
703 m_scheduler.cancelEvent (m_reexpressingInterestId);
704 m_reexpressingInterestId = eventId;
705 }
706}
707
708string
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700709SyncLogic::getRootDigest()
akmhoque66e66182014-02-21 17:56:03 -0600710{
711 ostringstream os;
712 os << *m_state->getDigest();
713 return os.str();
714}
715
716size_t
717SyncLogic::getNumberOfBranches () const
718{
719 return m_state->getLeaves ().size ();
720}
721
722void
723SyncLogic::printState () const
724{
Vince Lehman0a7da612014-10-29 14:39:29 -0500725 BOOST_FOREACH (const shared_ptr<Sync::Leaf> leaf, m_state->getLeaves ())
akmhoque66e66182014-02-21 17:56:03 -0600726 {
727 std::cout << *leaf << std::endl;
728 }
729}
730
731std::map<std::string, bool>
732SyncLogic::getBranchPrefixes() const
733{
734 std::map<std::string, bool> m;
735
Vince Lehman0a7da612014-10-29 14:39:29 -0500736 BOOST_FOREACH (const shared_ptr<Sync::Leaf> leaf, m_state->getLeaves ())
akmhoque66e66182014-02-21 17:56:03 -0600737 {
738 std::string prefix = leaf->getInfo()->toString();
739 // do not return forwarder prefix
740 if (prefix != forwarderPrefix)
741 {
742 m.insert(pair<std::string, bool>(prefix, false));
743 }
744 }
745
746 return m;
747}
748
749}