blob: 3e77b53e48f536c3f725c7570a573ae8d6ffb212 [file] [log] [blame]
Yingdi Yud45777b2014-10-16 23:54:11 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2013, Regents of the University of California
4 * Yingdi Yu
5 *
6 * BSD license, See the LICENSE file for more information
7 *
8 * Author: Yingdi Yu <yingdi@cs.ucla.edu>
9 */
10
11#include "chat-dialog-backend.hpp"
12
13#ifndef Q_MOC_RUN
14#include <ndn-cxx/util/io.hpp>
15#include "logging.h"
16#endif
17
18
19INIT_LOGGER("ChatDialogBackend");
20
21namespace chronos {
22
23static const time::milliseconds FRESHNESS_PERIOD(60000);
24static const time::seconds HELLO_INTERVAL(60);
25static const uint8_t ROUTING_HINT_SEPARATOR[2] = {0xF0, 0x2E}; // %F0.
26
27ChatDialogBackend::ChatDialogBackend(const Name& chatroomPrefix,
28 const Name& userChatPrefix,
29 const Name& routingPrefix,
30 const std::string& chatroomName,
31 const std::string& nick,
32 QObject* parent)
33 : QThread(parent)
34 , m_localRoutingPrefix(routingPrefix)
35 , m_chatroomPrefix(chatroomPrefix)
36 , m_userChatPrefix(userChatPrefix)
37 , m_chatroomName(chatroomName)
38 , m_nick(nick)
Yingdi Yud45777b2014-10-16 23:54:11 -070039{
40 updatePrefixes();
41}
42
43
44ChatDialogBackend::~ChatDialogBackend()
45{
46}
47
48// protected methods:
49void
50ChatDialogBackend::run()
51{
Yingdi Yu4647f022015-02-01 00:26:38 -080052 bool shouldResume = false;
53 do {
54 initializeSync();
Yingdi Yud45777b2014-10-16 23:54:11 -070055
Yingdi Yu4647f022015-02-01 00:26:38 -080056 if (m_face == nullptr)
57 break;
58
59 m_face->getIoService().run();
60
61 m_mutex.lock();
62 shouldResume = m_shouldResume;
63 m_shouldResume = false;
64 m_mutex.unlock();
65
66 } while (shouldResume);
Yingdi Yud45777b2014-10-16 23:54:11 -070067
68 std::cerr << "Bye!" << std::endl;
69}
70
71// private methods:
72void
73ChatDialogBackend::initializeSync()
74{
Yingdi Yu4647f022015-02-01 00:26:38 -080075 BOOST_ASSERT(m_sock == nullptr);
Yingdi Yud45777b2014-10-16 23:54:11 -070076
Yingdi Yu4647f022015-02-01 00:26:38 -080077 m_face = unique_ptr<ndn::Face>(new ndn::Face);
78 m_scheduler = unique_ptr<ndn::Scheduler>(new ndn::Scheduler(m_face->getIoService()));
Yingdi Yud45777b2014-10-16 23:54:11 -070079
80 // create a new SyncSocket
81 m_sock = make_shared<chronosync::Socket>(m_chatroomPrefix,
82 m_routableUserChatPrefix,
Yingdi Yu4647f022015-02-01 00:26:38 -080083 ref(*m_face),
Yingdi Yud45777b2014-10-16 23:54:11 -070084 bind(&ChatDialogBackend::processSyncUpdate, this, _1));
85
86 // schedule a new join event
Yingdi Yu4647f022015-02-01 00:26:38 -080087 m_scheduler->scheduleEvent(time::milliseconds(600),
88 bind(&ChatDialogBackend::sendJoin, this));
Yingdi Yud45777b2014-10-16 23:54:11 -070089
90 // cancel existing hello event if it exists
Yingdi Yu4647f022015-02-01 00:26:38 -080091 if (m_helloEventId != nullptr) {
92 m_scheduler->cancelEvent(m_helloEventId);
Yingdi Yud45777b2014-10-16 23:54:11 -070093 m_helloEventId.reset();
94 }
95}
96
97void
Yingdi Yu4647f022015-02-01 00:26:38 -080098ChatDialogBackend::close()
99{
100 if (m_joined)
101 sendLeave();
102
103 usleep(100000);
104
105 m_scheduler->cancelAllEvents();
106 m_helloEventId.reset();
107 m_roster.clear();
108 m_sock.reset();
109}
110
111void
Yingdi Yud45777b2014-10-16 23:54:11 -0700112ChatDialogBackend::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates)
113{
114 _LOG_DEBUG("<<< processing Tree Update");
115
116 if (updates.empty()) {
117 return;
118 }
119
120 std::vector<NodeInfo> nodeInfos;
121
122
Yingdi Yu1cc45d92015-02-09 14:19:54 -0800123 for (size_t i = 0; i < updates.size(); i++) {
Yingdi Yud45777b2014-10-16 23:54:11 -0700124 // update roster
125 if (m_roster.find(updates[i].session) == m_roster.end()) {
126 m_roster[updates[i].session].sessionPrefix = updates[i].session;
127 m_roster[updates[i].session].hasNick = false;
128 }
129
130 // fetch missing chat data
131 if (updates[i].high - updates[i].low < 3) {
132 for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++seq) {
133 m_sock->fetchData(updates[i].session, seq,
134 bind(&ChatDialogBackend::processChatData, this, _1, true),
135 2);
136 _LOG_DEBUG("<<< Fetching " << updates[i].session << "/" << seq);
137 }
138 }
139 else {
140 // There are too many msgs to fetch, let's just fetch the latest one
141 m_sock->fetchData(updates[i].session, updates[i].high,
142 bind(&ChatDialogBackend::processChatData, this, _1, false),
143 2);
144 }
145
146 // prepare notification to frontend
147 NodeInfo nodeInfo;
148 nodeInfo.sessionPrefix = QString::fromStdString(updates[i].session.toUri());
149 nodeInfo.seqNo = updates[i].high;
150 nodeInfos.push_back(nodeInfo);
151 }
152
153 // reflect the changes on GUI
154 emit syncTreeUpdated(nodeInfos,
155 QString::fromStdString(getHexEncodedDigest(m_sock->getRootDigest())));
156}
157
158void
159ChatDialogBackend::processChatData(const ndn::shared_ptr<const ndn::Data>& data, bool needDisplay)
160{
161 SyncDemo::ChatMessage msg;
162
163 if (!msg.ParseFromArray(data->getContent().value(), data->getContent().value_size())) {
164 _LOG_DEBUG("Errrrr.. Can not parse msg with name: " <<
165 data->getName() << ". what is happening?");
166 // nasty stuff: as a remedy, we'll form some standard msg for inparsable msgs
167 msg.set_from("inconnu");
168 msg.set_type(SyncDemo::ChatMessage::OTHER);
169 return;
170 }
171
172 Name remoteSessionPrefix = data->getName().getPrefix(-1);
173
174 if (msg.type() == SyncDemo::ChatMessage::LEAVE) {
175 BackendRoster::iterator it = m_roster.find(remoteSessionPrefix);
176
177 if (it != m_roster.end()) {
178 // cancel timeout event
179 if (static_cast<bool>(it->second.timeoutEventId))
Yingdi Yu4647f022015-02-01 00:26:38 -0800180 m_scheduler->cancelEvent(it->second.timeoutEventId);
Yingdi Yud45777b2014-10-16 23:54:11 -0700181
182 // notify frontend to remove the remote session (node)
183 emit sessionRemoved(QString::fromStdString(remoteSessionPrefix.toUri()),
184 QString::fromStdString(msg.from()),
185 msg.timestamp());
186
187 // remove roster entry
188 m_roster.erase(remoteSessionPrefix);
189 }
190 }
191 else {
192 BackendRoster::iterator it = m_roster.find(remoteSessionPrefix);
193
194 if (it == m_roster.end()) {
195 // Should not happen
196 BOOST_ASSERT(false);
197 }
198
199 // If we haven't got any message from this session yet.
200 if (m_roster[remoteSessionPrefix].hasNick == false) {
201 m_roster[remoteSessionPrefix].userNick = msg.from();
202 m_roster[remoteSessionPrefix].hasNick = true;
203 emit sessionAdded(QString::fromStdString(remoteSessionPrefix.toUri()),
204 QString::fromStdString(msg.from()),
205 msg.timestamp());
206 }
207
208 // If we get a new nick for an existing session, update it.
209 if (m_roster[remoteSessionPrefix].userNick != msg.from()) {
210 m_roster[remoteSessionPrefix].userNick = msg.from();
211 emit nickUpdated(QString::fromStdString(remoteSessionPrefix.toUri()),
212 QString::fromStdString(msg.from()));
213 }
214
215 // If a timeout event has been scheduled, cancel it.
216 if (static_cast<bool>(it->second.timeoutEventId))
Yingdi Yu4647f022015-02-01 00:26:38 -0800217 m_scheduler->cancelEvent(it->second.timeoutEventId);
Yingdi Yud45777b2014-10-16 23:54:11 -0700218
219 // (Re)schedule another timeout event after 3 HELLO_INTERVAL;
220 it->second.timeoutEventId =
Yingdi Yu4647f022015-02-01 00:26:38 -0800221 m_scheduler->scheduleEvent(HELLO_INTERVAL * 3,
222 bind(&ChatDialogBackend::remoteSessionTimeout,
223 this, remoteSessionPrefix));
Yingdi Yud45777b2014-10-16 23:54:11 -0700224
225 // If chat message, notify the frontend
226 if (msg.type() == SyncDemo::ChatMessage::CHAT)
227 emit chatMessageReceived(QString::fromStdString(msg.from()),
228 QString::fromStdString(msg.data()),
229 msg.timestamp());
230
231 // Notify frontend to plot notification on DigestTree.
232 emit messageReceived(QString::fromStdString(remoteSessionPrefix.toUri()));
233 }
234}
235
236void
237ChatDialogBackend::remoteSessionTimeout(const Name& sessionPrefix)
238{
239 time_t timestamp =
240 static_cast<time_t>(time::toUnixTimestamp(time::system_clock::now()).count() / 1000);
241
242 // notify frontend
243 emit sessionRemoved(QString::fromStdString(sessionPrefix.toUri()),
244 QString::fromStdString(m_roster[sessionPrefix].userNick),
245 timestamp);
246
247 // remove roster entry
248 m_roster.erase(sessionPrefix);
249}
250
251void
252ChatDialogBackend::sendMsg(SyncDemo::ChatMessage& msg)
253{
254 // send msg
255 ndn::OBufferStream os;
256 msg.SerializeToOstream(&os);
257
258 if (!msg.IsInitialized()) {
259 _LOG_DEBUG("Errrrr.. msg was not probally initialized " << __FILE__ <<
260 ":" << __LINE__ << ". what is happening?");
261 abort();
262 }
263
264 uint64_t nextSequence = m_sock->getLogic().getSeqNo() + 1;
265
266 m_sock->publishData(os.buf()->buf(), os.buf()->size(), FRESHNESS_PERIOD);
267
268 std::vector<NodeInfo> nodeInfos;
269 NodeInfo nodeInfo = {QString::fromStdString(m_routableUserChatPrefix.toUri()),
270 nextSequence};
271 nodeInfos.push_back(nodeInfo);
272
273 emit syncTreeUpdated(nodeInfos,
274 QString::fromStdString(getHexEncodedDigest(m_sock->getRootDigest())));
275}
276
277void
278ChatDialogBackend::sendJoin()
279{
280 m_joined = true;
281
282 SyncDemo::ChatMessage msg;
283 prepareControlMessage(msg, SyncDemo::ChatMessage::JOIN);
284 sendMsg(msg);
285
Yingdi Yu4647f022015-02-01 00:26:38 -0800286 m_helloEventId = m_scheduler->scheduleEvent(HELLO_INTERVAL,
287 bind(&ChatDialogBackend::sendHello, this));
Yingdi Yud45777b2014-10-16 23:54:11 -0700288
289 emit sessionAdded(QString::fromStdString(m_routableUserChatPrefix.toUri()),
290 QString::fromStdString(msg.from()),
291 msg.timestamp());
292}
293
294void
295ChatDialogBackend::sendHello()
296{
297 SyncDemo::ChatMessage msg;
298 prepareControlMessage(msg, SyncDemo::ChatMessage::HELLO);
299 sendMsg(msg);
300
Yingdi Yu4647f022015-02-01 00:26:38 -0800301 m_helloEventId = m_scheduler->scheduleEvent(HELLO_INTERVAL,
302 bind(&ChatDialogBackend::sendHello, this));
Yingdi Yud45777b2014-10-16 23:54:11 -0700303}
304
305void
306ChatDialogBackend::sendLeave()
307{
308 SyncDemo::ChatMessage msg;
309 prepareControlMessage(msg, SyncDemo::ChatMessage::LEAVE);
310 sendMsg(msg);
311
312 usleep(5000);
313 m_joined = false;
314}
315
316void
317ChatDialogBackend::prepareControlMessage(SyncDemo::ChatMessage& msg,
318 SyncDemo::ChatMessage::ChatMessageType type)
319{
320 msg.set_from(m_nick);
321 msg.set_to(m_chatroomName);
322 int32_t seconds =
323 static_cast<int32_t>(time::toUnixTimestamp(time::system_clock::now()).count() / 1000);
324 msg.set_timestamp(seconds);
325 msg.set_type(type);
326}
327
328void
329ChatDialogBackend::prepareChatMessage(const QString& text,
330 time_t timestamp,
331 SyncDemo::ChatMessage &msg)
332{
333 msg.set_from(m_nick);
334 msg.set_to(m_chatroomName);
335 msg.set_data(text.toStdString());
336 msg.set_timestamp(timestamp);
337 msg.set_type(SyncDemo::ChatMessage::CHAT);
338}
339
340void
341ChatDialogBackend::updatePrefixes()
342{
343 m_routableUserChatPrefix.clear();
344
345 if (m_localRoutingPrefix.isPrefixOf(m_userChatPrefix))
346 m_routableUserChatPrefix = m_userChatPrefix;
347 else
348 m_routableUserChatPrefix.append(m_localRoutingPrefix)
349 .append(ROUTING_HINT_SEPARATOR, 2)
350 .append(m_userChatPrefix);
351
352 emit chatPrefixChanged(m_routableUserChatPrefix);
353}
354
355std::string
356ChatDialogBackend::getHexEncodedDigest(ndn::ConstBufferPtr digest)
357{
358 std::stringstream os;
359
360 CryptoPP::StringSource(digest->buf(), digest->size(), true,
361 new CryptoPP::HexEncoder(new CryptoPP::FileSink(os), false));
362 return os.str();
363}
364
365
366// public slots:
367void
368ChatDialogBackend::sendChatMessage(QString text, time_t timestamp)
369{
370 SyncDemo::ChatMessage msg;
371 prepareChatMessage(text, timestamp, msg);
372 sendMsg(msg);
373
374 emit chatMessageReceived(QString::fromStdString(msg.from()),
375 QString::fromStdString(msg.data()),
376 msg.timestamp());
377}
378
379void
380ChatDialogBackend::updateRoutingPrefix(const QString& localRoutingPrefix)
381{
382 Name newLocalRoutingPrefix(localRoutingPrefix.toStdString());
383
384 if (!newLocalRoutingPrefix.empty() && newLocalRoutingPrefix != m_localRoutingPrefix) {
385 // Update localPrefix
386 m_localRoutingPrefix = newLocalRoutingPrefix;
387
388 updatePrefixes();
389
Yingdi Yu4647f022015-02-01 00:26:38 -0800390 m_mutex.lock();
391 m_shouldResume = true;
392 m_mutex.unlock();
393
394 close();
395
396 m_face->getIoService().stop();
Yingdi Yud45777b2014-10-16 23:54:11 -0700397 }
398}
399
400void
401ChatDialogBackend::shutdown()
402{
Yingdi Yu4647f022015-02-01 00:26:38 -0800403 m_mutex.lock();
404 m_shouldResume = false;
405 m_mutex.unlock();
Yingdi Yu2c9e7712014-10-20 11:55:05 -0700406
Yingdi Yu4647f022015-02-01 00:26:38 -0800407 close();
Yingdi Yu2c9e7712014-10-20 11:55:05 -0700408
Yingdi Yu4647f022015-02-01 00:26:38 -0800409 m_face->getIoService().stop();
Yingdi Yud45777b2014-10-16 23:54:11 -0700410}
411
412} // namespace chronos
413
414#if WAF
415#include "chat-dialog-backend.moc"
416// #include "chat-dialog-backend.cpp.moc"
417#endif