blob: 2be8636e04063c20b353be24f3dbc3b2aad82f10 [file] [log] [blame]
Yingdi Yuf7ede412014-08-30 20:37:52 -07001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2012-2014 University of California, Los Angeles
4 *
5 * This file is part of ChronoSync, synchronization library for distributed realtime
6 * applications for NDN.
7 *
8 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation, either
10 * version 3 of the License, or (at your option) any later version.
11 *
12 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
20 * @author Chaoyi Bian <bcy@pku.edu.cn>
21 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
22 * @author Yingdi Yu <yingdi@cs.ucla.edu>
23 */
24
25#ifndef CHRONOSYNC_LOGIC_HPP
26#define CHRONOSYNC_LOGIC_HPP
27
28#include "boost-header.h"
29#include <memory>
30#include <map>
31
32#include <ndn-cxx/face.hpp>
33#include <ndn-cxx/util/scheduler.hpp>
34#include <ndn-cxx/security/key-chain.hpp>
35
36#include "interest-table.hpp"
37#include "diff-state-container.hpp"
38
39namespace chronosync {
40
41/**
42 * @brief The missing sequence numbers for a session
43 *
44 * This class is used to notify the clients of Logic
45 * the details of state changes.
46 *
47 * Instances of this class is usually used as elements of some containers
48 * such as std::vector, thus it is copyable.
49 */
50class MissingDataInfo
51{
52public:
53 /// @brief session name
54 Name session;
55 /// @brief the lowest one of missing sequence numbers
56 SeqNo low;
57 /// @brief the highest one of missing sequence numbers
58 SeqNo high;
59};
60
61/**
62 * @brief The callback function to handle state updates
63 *
64 * The parameter is a set of MissingDataInfo, of which each corresponds to
65 * a session that has changed its state.
66 */
67typedef function<void(const std::vector<MissingDataInfo>&)> UpdateCallback;
68
69/**
70 * @brief Logic of ChronoSync
71 */
72class Logic : noncopyable
73{
74public:
75 static const time::steady_clock::Duration DEFAULT_RESET_TIMER;
76 static const time::steady_clock::Duration DEFAULT_CANCEL_RESET_TIMER;
77 static const time::milliseconds DEFAULT_RESET_INTEREST_LIFETIME;
78 static const time::milliseconds DEFAULT_SYNC_INTEREST_LIFETIME;
79 static const time::milliseconds DEFAULT_SYNC_REPLY_FRESHNESS;
80
81 /**
82 * @brief Constructor
83 *
84 * @param syncPrefix The prefix of the sync group
85 * @param userPrefix The prefix of the user who owns the session
86 * @param onUpdate The callback function to handle state updates
87 * @param resetTimer The timer to periodically send Reset Interest
88 * @param syncReplyFreshness The FreshnessPeriod of sync reply
89 * @param resetInterestLifetime The lifetime of sync interest
90 * @param resetInterestLifetime The lifetime of Reset Interest
91 * @param cancelResetTimer The timer to exit from Reset state
92 */
93 Logic(ndn::Face& face,
94 const Name& syncPrefix,
95 const Name& userPrefix,
96 const UpdateCallback& onUpdate,
97 const time::steady_clock::Duration& resetTimer = DEFAULT_RESET_TIMER,
98 const time::steady_clock::Duration& cancelResetTimer = DEFAULT_CANCEL_RESET_TIMER,
99 const time::milliseconds& resetInterestLifetime = DEFAULT_RESET_INTEREST_LIFETIME,
100 const time::milliseconds& syncInterestLifetime = DEFAULT_SYNC_INTEREST_LIFETIME,
101 const time::milliseconds& syncReplyFreshness = DEFAULT_SYNC_REPLY_FRESHNESS);
102
103 ~Logic();
104
105 /// @brief Reset the sync tree (and restart synchronization again)
106 void
107 reset();
108
109 /**
110 * @brief Set user prefix
111 *
112 * This method will also change the session name and trigger reset.
113 *
114 * @param userPrefix The prefix of user.
115 */
116 void
117 setUserPrefix(const Name& userPrefix);
118
119 /// @brief Get the name of the local session.
120 const Name&
121 getSessionName() const
122 {
123 return m_sessionName;
124 }
125
126 /// @brief Get current seqNo of the local session.
127 const SeqNo&
128 getSeqNo() const
129 {
130 return m_seqNo;
131 }
132
133 /**
134 * @brief Update the seqNo of the local session
135 *
136 * The method updates the existing seqNo with the supplied seqNo.
137 *
138 * @param seq The new seqNo.
139 */
140 void
141 updateSeqNo(const SeqNo& seq);
142
143 /// @brief Get root digest of current sync tree
144 ndn::ConstBufferPtr
145 getRootDigest() const;
146
147 /// @brief Get the name of all sessions
148 std::set<Name>
149 getSessionNames() const;
150
151PUBLIC_WITH_TESTS_ELSE_PRIVATE:
152 void
153 printState(std::ostream& os) const;
154
155 ndn::Scheduler&
156 getScheduler()
157 {
158 return m_scheduler;
159 }
160
161 State&
162 getState()
163 {
164 return m_state;
165 }
166
167private:
168 /**
169 * @brief Callback to handle Sync Interest
170 *
171 * This method checks whether an incoming interest is a normal one or a reset
172 * and dispatches the incoming interest to corresponding processing methods.
173 *
174 * @param prefix The prefix of the sync group.
175 * @param interest The incoming sync interest.
176 */
177 void
178 onSyncInterest(const Name& prefix, const Interest& interest);
179
180 /**
181 * @brief Callback to handle Sync prefix registration failure
182 *
183 * This method does nothing for now.
184 *
185 * @param prefix The prefix of the sync group.
186 * @param msg The error message.
187 */
188 void
189 onSyncRegisterFailed(const Name& prefix, const std::string& msg);
190
191 /**
192 * @brief Callback to handle Sync Reply
193 *
194 * This method calls validator to validate Sync Reply.
195 * For now, validation is disabled, Logic::onSyncDataValidated is called
196 * directly.
197 *
198 * @param interest The Sync Interest
199 * @param data The reply to the Sync Interest
200 */
201 void
202 onSyncData(const Interest& interest, Data& data);
203
204 /**
205 * @brief Callback to handle reply to Reset Interest.
206 *
207 * This method does nothing, since reply to Reset Interest is not useful for now.
208 *
209 * @param interest The Reset Interest
210 * @param data The reply to the Reset Interest
211 */
212 void
213 onResetData(const Interest& interest, Data& data);
214
215 /**
216 * @brief Callback to handle Sync Interest timeout.
217 *
218 * This method does nothing, since Logic per se handles timeout explicitly.
219 *
220 * @param interest The Sync Interest
221 */
222 void
223 onSyncTimeout(const Interest& interest);
224
225 /**
226 * @brief Callback to invalid Sync Reply.
227 *
228 * This method does nothing but drops the invalid reply.
229 *
230 * @param data The invalid Sync Reply
231 */
232 void
233 onSyncDataValidationFailed(const shared_ptr<const Data>& data);
234
235 /**
236 * @brief Callback to valid Sync Reply.
237 *
238 * This method simply passes the valid reply to processSyncData.
239 *
240 * @param data The valid Sync Reply.
241 */
242 void
243 onSyncDataValidated(const shared_ptr<const Data>& data);
244
245 /**
246 * @brief Process normal Sync Interest
247 *
248 * This method extracts the digest from the incoming Sync Interest,
249 * compares it against current local digest, and process the Sync
250 * Interest according to the comparison result. See docs/design.rst
251 * for more details.
252 *
253 * @param interest The incoming interest
254 * @param isTimedProcessing True if the interest needs an immediate reply,
255 * otherwise hold the interest for a while before
256 * making a reply (to avoid unnecessary recovery)
257 */
258 void
259 processSyncInterest(const shared_ptr<const Interest>& interest,
260 bool isTimedProcessing = false);
261
262 /**
263 * @brief Process reset Sync Interest
264 *
265 * This method simply call Logic::reset()
266 *
267 * @param interest The incoming interest.
268 */
269 void
270 processResetInterest(const Interest& interest);
271
272 /**
273 * @brief Process Sync Reply.
274 *
275 * This method extracts state update information from Sync Reply and applies
276 * it to the Sync Tree and re-express Sync Interest.
277 *
278 * @param name The data name of the Sync Reply.
279 * @param digest The digest in the data name.
280 * @param syncReplyBlock The content of the Sync Reply.
281 */
282 void
283 processSyncData(const Name& name,
284 ndn::ConstBufferPtr digest,
285 const Block& syncReplyBlock);
286
287 /**
288 * @brief Insert state diff into log
289 *
290 * @param diff The diff .
291 * @param previousRoot The root digest before state changes.
292 */
293 void
294 insertToDiffLog(DiffStatePtr diff,
295 ndn::ConstBufferPtr previousRoot);
296
297 /**
298 * @brief Reply to all pending Sync Interests with a particular commit (or diff)
299 *
300 * @param commit The diff.
301 */
302 void
303 satisfyPendingSyncInterests(ConstDiffStatePtr commit);
304
305 /// @brief Helper method to send normal Sync Interest
306 void
307 sendSyncInterest();
308
309 /// @brief Helper method to send reset Sync Interest
310 void
311 sendResetInterest();
312
313 /// @brief Helper method to send Sync Reply
314 void
315 sendSyncData(const Name& name, const State& state);
316
317 /**
318 * @brief Unset reset status
319 *
320 * By invoking this method, one can add its own state into the Sync Tree, thus
321 * jumping out of the reset status
322 */
323 void
324 cancelReset();
325
326 void
327 printDigest(ndn::ConstBufferPtr digest);
328
329private:
330
331 static const ndn::ConstBufferPtr EMPTY_DIGEST;
332 static const ndn::name::Component RESET_COMPONENT;
333
334 // Communication
335 ndn::Face& m_face;
336 Name m_syncPrefix;
337 const ndn::RegisteredPrefixId* m_syncRegisteredPrefixId;
338 Name m_syncReset;
339 Name m_userPrefix;
340
341 // State
342 Name m_sessionName;
343 SeqNo m_seqNo;
344 State m_state;
345 DiffStateContainer m_log;
346 InterestTable m_interestTable;
347 Name m_outstandingInterestName;
348 const ndn::PendingInterestId* m_outstandingInterestId;
349 bool m_isInReset;
350 bool m_needPeriodReset;
351
352 // Callback
353 UpdateCallback m_onUpdate;
354
355 // Event
356 ndn::Scheduler m_scheduler;
357 ndn::EventId m_delayedInterestProcessingId;
358 ndn::EventId m_reexpressingInterestId;
359 ndn::EventId m_resetInterestId;
360
361 // Timer
362 boost::mt19937 m_randomGenerator;
363 boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_rangeUniformRandom;
364 boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_reexpressionJitter;
365 /// @brief Timer to send next reset 0 for no reset
366 time::steady_clock::Duration m_resetTimer;
367 /// @brief Timer to cancel reset state
368 time::steady_clock::Duration m_cancelResetTimer;
369 /// @brief Lifetime of reset interest
370 time::milliseconds m_resetInterestLifetime;
371 /// @brief Lifetime of sync interest
372 time::milliseconds m_syncInterestLifetime;
373 /// @brief FreshnessPeriod of SyncReply
374 time::milliseconds m_syncReplyFreshness;
375
376 // Others
377 ndn::KeyChain m_keyChain;
378
379#ifdef _DEBUG
380 int m_instanceId;
381 static int m_instanceCounter;
382#endif
383};
384
385
386} // namespace chronosync
387
388#endif // CHRONOSYNC_LOGIC_HPP