Finalizing sync logic implementation
diff --git a/model/sync-interest-container.h b/model/sync-interest-container.h
index 15bd07c..b998b18 100644
--- a/model/sync-interest-container.h
+++ b/model/sync-interest-container.h
@@ -42,16 +42,18 @@
struct Interest
{
- Interest (DigestConstPtr digest, const std::string &name)
+ Interest (DigestConstPtr digest, const std::string &name, bool unknown=false)
: m_digest (digest)
, m_name (name)
, m_time (TIME_NOW)
+ , m_unknown (unknown)
{
}
DigestConstPtr m_digest;
std::string m_name;
TimeAbsolute m_time;
+ bool m_unknown;
};
/// @cond include_hidden
diff --git a/model/sync-interest-table.cc b/model/sync-interest-table.cc
index 469e4e4..80f8862 100644
--- a/model/sync-interest-table.cc
+++ b/model/sync-interest-table.cc
@@ -48,7 +48,9 @@
expireInterests ();
recursive_mutex::scoped_lock lock (m_mutex);
- BOOST_ASSERT (m_table.size () != 0);
+ if (m_table.size () == 0)
+ BOOST_THROW_EXCEPTION (Error::InterestTableIsEmpty ());
+
Interest ret = *m_table.begin ();
m_table.erase (m_table.begin ());
diff --git a/model/sync-interest-table.h b/model/sync-interest-table.h
index 083f913..2e5451f 100644
--- a/model/sync-interest-table.h
+++ b/model/sync-interest-table.h
@@ -92,6 +92,11 @@
mutable boost::recursive_mutex m_mutex;
};
+namespace Error {
+struct InterestTableIsEmpty : virtual boost::exception, virtual std::exception { };
+}
+
+
} // Sync
#endif // SYNC_INTEREST_TABLE_H
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index 13b66f5..3098b00 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -133,10 +133,12 @@
size_t pos = hash.find ('/');
if (pos != string::npos)
{
- hash = name.substr (pos + 1);
- interestType = name.substr (0, pos);
+ interestType = hash.substr (0, pos);
+ hash = hash.substr (pos + 1);
}
+ _LOG_TRACE (hash << ", " << interestType);
+
DigestPtr digest = make_shared<Digest> ();
istringstream is (hash);
is >> *digest;
@@ -183,7 +185,15 @@
string type;
tie (digest, type) = convertNameToDigestAndType (name);
- processSyncData (name, digest, dataBuffer);
+ if (type == "normal")
+ {
+ processSyncData (name, digest, dataBuffer);
+ }
+ else
+ {
+ m_scheduler.cancel (REEXPRESSING_RECOVERY_INTEREST);
+ processSyncData (name, digest, dataBuffer);
+ }
}
catch (Error::DigestCalculationError &e)
{
@@ -191,7 +201,6 @@
// log error. ignoring it for now, later we should log it
return;
}
-
}
@@ -242,9 +251,8 @@
}
else
{
- BOOST_ASSERT (false); // not implemented yet
-
- // _LOG_TRACE (">> D " << *digest << " (timed processing)");
+ _LOG_TRACE (" (timed processing)");
+ sendSyncRecoveryInterests (digest);
}
}
@@ -307,11 +315,12 @@
// don't do anything
}
- if (diffLog != 0)
+ if (diffLog != 0 && diffLog->getLeaves ().size () > 0)
{
- BOOST_ASSERT (diffLog->getLeaves ().size () > 0);
+ // Do it only if everything went fine and state changed
- satisfyPendingSyncInterests (diffLog); // if there are interests in PIT, there is a point to satisfy them using new state
+ // this is kind of wrong
+ // satisfyPendingSyncInterests (diffLog); // if there are interests in PIT, there is a point to satisfy them using new state
// if state has changed, then it is safe to express a new interest
m_scheduler.cancel (REEXPRESSING_INTEREST);
@@ -328,7 +337,11 @@
DiffStateContainer::iterator stateInDiffLog = m_log.find (digest);
- if (stateInDiffLog == m_log.end ()) return;
+ if (stateInDiffLog == m_log.end ())
+ {
+ _LOG_TRACE ("Could not find " << *digest << " in digest log");
+ return;
+ }
sendSyncData (name, digest, m_state);
}
@@ -336,16 +349,41 @@
void
SyncLogic::satisfyPendingSyncInterests (DiffStateConstPtr diffLog)
{
- uint32_t counter = 0;
- while (m_syncInterestTable.size () > 0)
+ DiffStatePtr fullStateLog = make_shared<DiffState> ();
+ {
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+ BOOST_FOREACH (LeafConstPtr leaf, m_state->getLeaves ()/*.get<timed> ()*/)
+ {
+ fullStateLog->update (leaf->getInfo (), leaf->getSeq ());
+ /// @todo Impose limit on how many state info should be send out
+ }
+ }
+
+ try
{
- Interest interest = m_syncInterestTable.pop ();
-
- _LOG_TRACE (">> D " << interest.m_name);
- sendSyncData (interest.m_name, interest.m_digest, diffLog);
- counter ++;
+ uint32_t counter = 0;
+ while (m_syncInterestTable.size () > 0)
+ {
+ Interest interest = m_syncInterestTable.pop ();
+
+ if (!interest.m_unknown)
+ {
+ _LOG_TRACE (">> D " << interest.m_name);
+ sendSyncData (interest.m_name, interest.m_digest, diffLog);
+ }
+ else
+ {
+ _LOG_TRACE (">> D (unknown)" << interest.m_name);
+ sendSyncData (interest.m_name, interest.m_digest, fullStateLog);
+ }
+ counter ++;
+ }
+ _LOG_DEBUG ("Satisfied " << counter << " pending interests");
}
- _LOG_DEBUG ("Satisfied " << counter << " pending interests");
+ catch (Error::InterestTableIsEmpty &e)
+ {
+ // ok. not really an error
+ }
}
void
@@ -427,8 +465,27 @@
}
void
+SyncLogic::sendSyncRecoveryInterests (DigestConstPtr digest)
+{
+ ostringstream os;
+ os << m_syncPrefix << "/recovery/" << *digest;
+ _LOG_TRACE (">> I " << os.str ());
+
+ // no need for retransmission
+ // m_scheduler.cancel (REEXPRESSING_RECOVERY_INTEREST);
+ // m_scheduler.schedule (TIME_SECONDS_WITH_JITTER(1.0),//TIME_SECONDS_WITH_JITTER (m_syncInterestReexpress),
+ // bind (&SyncLogic::sendSyncRecoveryInterests, this, digest),
+ // REEXPRESSING_RECOVERY_INTEREST);
+
+ m_ccnxHandle->sendInterest (os.str (),
+ bind (&SyncLogic::respondSyncData, this, _1, _2));
+}
+
+
+void
SyncLogic::sendSyncData (const std::string &name, DigestConstPtr digest, StateConstPtr state)
{
+ _LOG_TRACE (">> D " << name);
// sending
m_ccnxHandle->publishData (name,
lexical_cast<string> (*state),
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 53f0d2e..5e38f81 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -142,6 +142,9 @@
sendSyncInterest ();
void
+ sendSyncRecoveryInterests (DigestConstPtr digest);
+
+ void
sendSyncData (const std::string &name,
DigestConstPtr digest, StateConstPtr state);
@@ -182,7 +185,8 @@
enum EventLabels
{
DELAYED_INTEREST_PROCESSING = 1,
- REEXPRESSING_INTEREST = 2
+ REEXPRESSING_INTEREST = 2,
+ REEXPRESSING_RECOVERY_INTEREST = 3
};
#ifdef _DEBUG