blob: 680d24be9683bd16015b0bf9c248812c561131b4 [file] [log] [blame]
Yingdi Yud45777b2014-10-16 23:54:11 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2013, Regents of the University of California
4 * Yingdi Yu
5 *
6 * BSD license, See the LICENSE file for more information
7 *
8 * Author: Yingdi Yu <yingdi@cs.ucla.edu>
9 */
10
11#include "chat-dialog-backend.hpp"
12
13#ifndef Q_MOC_RUN
14#include <ndn-cxx/util/io.hpp>
15#include "logging.h"
16#endif
17
18
19INIT_LOGGER("ChatDialogBackend");
20
21namespace chronos {
22
23static const time::milliseconds FRESHNESS_PERIOD(60000);
24static const time::seconds HELLO_INTERVAL(60);
25static const uint8_t ROUTING_HINT_SEPARATOR[2] = {0xF0, 0x2E}; // %F0.
26
27ChatDialogBackend::ChatDialogBackend(const Name& chatroomPrefix,
28 const Name& userChatPrefix,
29 const Name& routingPrefix,
30 const std::string& chatroomName,
31 const std::string& nick,
32 QObject* parent)
33 : QThread(parent)
34 , m_localRoutingPrefix(routingPrefix)
35 , m_chatroomPrefix(chatroomPrefix)
36 , m_userChatPrefix(userChatPrefix)
37 , m_chatroomName(chatroomName)
38 , m_nick(nick)
39 , m_scheduler(m_face.getIoService())
40{
41 updatePrefixes();
42}
43
44
45ChatDialogBackend::~ChatDialogBackend()
46{
47}
48
49// protected methods:
50void
51ChatDialogBackend::run()
52{
53 initializeSync();
54
55 m_face.processEvents();
56
57 std::cerr << "Bye!" << std::endl;
58}
59
60// private methods:
61void
62ChatDialogBackend::initializeSync()
63{
Yingdi Yud45777b2014-10-16 23:54:11 -070064 // if a SyncSocket is running, turn it off
65 if (static_cast<bool>(m_sock)) {
66 if (m_joined)
67 sendLeave();
68 m_sock.reset();
69
70 usleep(100000);
71 }
72
73 // create a new SyncSocket
74 m_sock = make_shared<chronosync::Socket>(m_chatroomPrefix,
75 m_routableUserChatPrefix,
76 ref(m_face),
77 bind(&ChatDialogBackend::processSyncUpdate, this, _1));
78
79 // schedule a new join event
80 m_scheduler.scheduleEvent(time::milliseconds(600),
81 bind(&ChatDialogBackend::sendJoin, this));
82
83 // cancel existing hello event if it exists
84 if (static_cast<bool>(m_helloEventId)) {
85 m_scheduler.cancelEvent(m_helloEventId);
86 m_helloEventId.reset();
87 }
88}
89
90void
91ChatDialogBackend::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates)
92{
93 _LOG_DEBUG("<<< processing Tree Update");
94
95 if (updates.empty()) {
96 return;
97 }
98
99 std::vector<NodeInfo> nodeInfos;
100
101
102 for (int i = 0; i < updates.size(); i++) {
103 // update roster
104 if (m_roster.find(updates[i].session) == m_roster.end()) {
105 m_roster[updates[i].session].sessionPrefix = updates[i].session;
106 m_roster[updates[i].session].hasNick = false;
107 }
108
109 // fetch missing chat data
110 if (updates[i].high - updates[i].low < 3) {
111 for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++seq) {
112 m_sock->fetchData(updates[i].session, seq,
113 bind(&ChatDialogBackend::processChatData, this, _1, true),
114 2);
115 _LOG_DEBUG("<<< Fetching " << updates[i].session << "/" << seq);
116 }
117 }
118 else {
119 // There are too many msgs to fetch, let's just fetch the latest one
120 m_sock->fetchData(updates[i].session, updates[i].high,
121 bind(&ChatDialogBackend::processChatData, this, _1, false),
122 2);
123 }
124
125 // prepare notification to frontend
126 NodeInfo nodeInfo;
127 nodeInfo.sessionPrefix = QString::fromStdString(updates[i].session.toUri());
128 nodeInfo.seqNo = updates[i].high;
129 nodeInfos.push_back(nodeInfo);
130 }
131
132 // reflect the changes on GUI
133 emit syncTreeUpdated(nodeInfos,
134 QString::fromStdString(getHexEncodedDigest(m_sock->getRootDigest())));
135}
136
137void
138ChatDialogBackend::processChatData(const ndn::shared_ptr<const ndn::Data>& data, bool needDisplay)
139{
140 SyncDemo::ChatMessage msg;
141
142 if (!msg.ParseFromArray(data->getContent().value(), data->getContent().value_size())) {
143 _LOG_DEBUG("Errrrr.. Can not parse msg with name: " <<
144 data->getName() << ". what is happening?");
145 // nasty stuff: as a remedy, we'll form some standard msg for inparsable msgs
146 msg.set_from("inconnu");
147 msg.set_type(SyncDemo::ChatMessage::OTHER);
148 return;
149 }
150
151 Name remoteSessionPrefix = data->getName().getPrefix(-1);
152
153 if (msg.type() == SyncDemo::ChatMessage::LEAVE) {
154 BackendRoster::iterator it = m_roster.find(remoteSessionPrefix);
155
156 if (it != m_roster.end()) {
157 // cancel timeout event
158 if (static_cast<bool>(it->second.timeoutEventId))
159 m_scheduler.cancelEvent(it->second.timeoutEventId);
160
161 // notify frontend to remove the remote session (node)
162 emit sessionRemoved(QString::fromStdString(remoteSessionPrefix.toUri()),
163 QString::fromStdString(msg.from()),
164 msg.timestamp());
165
166 // remove roster entry
167 m_roster.erase(remoteSessionPrefix);
168 }
169 }
170 else {
171 BackendRoster::iterator it = m_roster.find(remoteSessionPrefix);
172
173 if (it == m_roster.end()) {
174 // Should not happen
175 BOOST_ASSERT(false);
176 }
177
178 // If we haven't got any message from this session yet.
179 if (m_roster[remoteSessionPrefix].hasNick == false) {
180 m_roster[remoteSessionPrefix].userNick = msg.from();
181 m_roster[remoteSessionPrefix].hasNick = true;
182 emit sessionAdded(QString::fromStdString(remoteSessionPrefix.toUri()),
183 QString::fromStdString(msg.from()),
184 msg.timestamp());
185 }
186
187 // If we get a new nick for an existing session, update it.
188 if (m_roster[remoteSessionPrefix].userNick != msg.from()) {
189 m_roster[remoteSessionPrefix].userNick = msg.from();
190 emit nickUpdated(QString::fromStdString(remoteSessionPrefix.toUri()),
191 QString::fromStdString(msg.from()));
192 }
193
194 // If a timeout event has been scheduled, cancel it.
195 if (static_cast<bool>(it->second.timeoutEventId))
196 m_scheduler.cancelEvent(it->second.timeoutEventId);
197
198 // (Re)schedule another timeout event after 3 HELLO_INTERVAL;
199 it->second.timeoutEventId =
200 m_scheduler.scheduleEvent(HELLO_INTERVAL * 3,
201 bind(&ChatDialogBackend::remoteSessionTimeout,
202 this, remoteSessionPrefix));
203
204 // If chat message, notify the frontend
205 if (msg.type() == SyncDemo::ChatMessage::CHAT)
206 emit chatMessageReceived(QString::fromStdString(msg.from()),
207 QString::fromStdString(msg.data()),
208 msg.timestamp());
209
210 // Notify frontend to plot notification on DigestTree.
211 emit messageReceived(QString::fromStdString(remoteSessionPrefix.toUri()));
212 }
213}
214
215void
216ChatDialogBackend::remoteSessionTimeout(const Name& sessionPrefix)
217{
218 time_t timestamp =
219 static_cast<time_t>(time::toUnixTimestamp(time::system_clock::now()).count() / 1000);
220
221 // notify frontend
222 emit sessionRemoved(QString::fromStdString(sessionPrefix.toUri()),
223 QString::fromStdString(m_roster[sessionPrefix].userNick),
224 timestamp);
225
226 // remove roster entry
227 m_roster.erase(sessionPrefix);
228}
229
230void
231ChatDialogBackend::sendMsg(SyncDemo::ChatMessage& msg)
232{
233 // send msg
234 ndn::OBufferStream os;
235 msg.SerializeToOstream(&os);
236
237 if (!msg.IsInitialized()) {
238 _LOG_DEBUG("Errrrr.. msg was not probally initialized " << __FILE__ <<
239 ":" << __LINE__ << ". what is happening?");
240 abort();
241 }
242
243 uint64_t nextSequence = m_sock->getLogic().getSeqNo() + 1;
244
245 m_sock->publishData(os.buf()->buf(), os.buf()->size(), FRESHNESS_PERIOD);
246
247 std::vector<NodeInfo> nodeInfos;
248 NodeInfo nodeInfo = {QString::fromStdString(m_routableUserChatPrefix.toUri()),
249 nextSequence};
250 nodeInfos.push_back(nodeInfo);
251
252 emit syncTreeUpdated(nodeInfos,
253 QString::fromStdString(getHexEncodedDigest(m_sock->getRootDigest())));
254}
255
256void
257ChatDialogBackend::sendJoin()
258{
259 m_joined = true;
260
261 SyncDemo::ChatMessage msg;
262 prepareControlMessage(msg, SyncDemo::ChatMessage::JOIN);
263 sendMsg(msg);
264
265 m_helloEventId = m_scheduler.scheduleEvent(HELLO_INTERVAL,
266 bind(&ChatDialogBackend::sendHello, this));
267
268 emit sessionAdded(QString::fromStdString(m_routableUserChatPrefix.toUri()),
269 QString::fromStdString(msg.from()),
270 msg.timestamp());
271}
272
273void
274ChatDialogBackend::sendHello()
275{
276 SyncDemo::ChatMessage msg;
277 prepareControlMessage(msg, SyncDemo::ChatMessage::HELLO);
278 sendMsg(msg);
279
280 m_helloEventId = m_scheduler.scheduleEvent(HELLO_INTERVAL,
281 bind(&ChatDialogBackend::sendHello, this));
282}
283
284void
285ChatDialogBackend::sendLeave()
286{
287 SyncDemo::ChatMessage msg;
288 prepareControlMessage(msg, SyncDemo::ChatMessage::LEAVE);
289 sendMsg(msg);
290
291 usleep(5000);
292 m_joined = false;
293}
294
295void
296ChatDialogBackend::prepareControlMessage(SyncDemo::ChatMessage& msg,
297 SyncDemo::ChatMessage::ChatMessageType type)
298{
299 msg.set_from(m_nick);
300 msg.set_to(m_chatroomName);
301 int32_t seconds =
302 static_cast<int32_t>(time::toUnixTimestamp(time::system_clock::now()).count() / 1000);
303 msg.set_timestamp(seconds);
304 msg.set_type(type);
305}
306
307void
308ChatDialogBackend::prepareChatMessage(const QString& text,
309 time_t timestamp,
310 SyncDemo::ChatMessage &msg)
311{
312 msg.set_from(m_nick);
313 msg.set_to(m_chatroomName);
314 msg.set_data(text.toStdString());
315 msg.set_timestamp(timestamp);
316 msg.set_type(SyncDemo::ChatMessage::CHAT);
317}
318
319void
320ChatDialogBackend::updatePrefixes()
321{
322 m_routableUserChatPrefix.clear();
323
324 if (m_localRoutingPrefix.isPrefixOf(m_userChatPrefix))
325 m_routableUserChatPrefix = m_userChatPrefix;
326 else
327 m_routableUserChatPrefix.append(m_localRoutingPrefix)
328 .append(ROUTING_HINT_SEPARATOR, 2)
329 .append(m_userChatPrefix);
330
331 emit chatPrefixChanged(m_routableUserChatPrefix);
332}
333
334std::string
335ChatDialogBackend::getHexEncodedDigest(ndn::ConstBufferPtr digest)
336{
337 std::stringstream os;
338
339 CryptoPP::StringSource(digest->buf(), digest->size(), true,
340 new CryptoPP::HexEncoder(new CryptoPP::FileSink(os), false));
341 return os.str();
342}
343
344
345// public slots:
346void
347ChatDialogBackend::sendChatMessage(QString text, time_t timestamp)
348{
349 SyncDemo::ChatMessage msg;
350 prepareChatMessage(text, timestamp, msg);
351 sendMsg(msg);
352
353 emit chatMessageReceived(QString::fromStdString(msg.from()),
354 QString::fromStdString(msg.data()),
355 msg.timestamp());
356}
357
358void
359ChatDialogBackend::updateRoutingPrefix(const QString& localRoutingPrefix)
360{
361 Name newLocalRoutingPrefix(localRoutingPrefix.toStdString());
362
363 if (!newLocalRoutingPrefix.empty() && newLocalRoutingPrefix != m_localRoutingPrefix) {
364 // Update localPrefix
365 m_localRoutingPrefix = newLocalRoutingPrefix;
366
367 updatePrefixes();
368
369 initializeSync();
370 }
371}
372
373void
374ChatDialogBackend::shutdown()
375{
Yingdi Yu2c9e7712014-10-20 11:55:05 -0700376 if (static_cast<bool>(m_sock)) {
377 if (m_joined)
378 sendLeave();
379 m_sock.reset();
380
381 usleep(100000);
382 }
383
Yingdi Yud45777b2014-10-16 23:54:11 -0700384 m_face.getIoService().stop();
385}
386
387} // namespace chronos
388
389#if WAF
390#include "chat-dialog-backend.moc"
391// #include "chat-dialog-backend.cpp.moc"
392#endif