blob: 87eed37d046c87761f93ff0074b363ec6295c548 [file] [log] [blame]
Yingdi Yud45777b2014-10-16 23:54:11 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2013, Regents of the University of California
4 * Yingdi Yu
5 *
6 * BSD license, See the LICENSE file for more information
7 *
8 * Author: Yingdi Yu <yingdi@cs.ucla.edu>
9 */
10
11#include "chat-dialog-backend.hpp"
12
13#ifndef Q_MOC_RUN
14#include <ndn-cxx/util/io.hpp>
15#include "logging.h"
16#endif
17
18
19INIT_LOGGER("ChatDialogBackend");
20
21namespace chronos {
22
23static const time::milliseconds FRESHNESS_PERIOD(60000);
24static const time::seconds HELLO_INTERVAL(60);
25static const uint8_t ROUTING_HINT_SEPARATOR[2] = {0xF0, 0x2E}; // %F0.
26
27ChatDialogBackend::ChatDialogBackend(const Name& chatroomPrefix,
28 const Name& userChatPrefix,
29 const Name& routingPrefix,
30 const std::string& chatroomName,
31 const std::string& nick,
32 QObject* parent)
33 : QThread(parent)
34 , m_localRoutingPrefix(routingPrefix)
35 , m_chatroomPrefix(chatroomPrefix)
36 , m_userChatPrefix(userChatPrefix)
37 , m_chatroomName(chatroomName)
38 , m_nick(nick)
39 , m_scheduler(m_face.getIoService())
40{
41 updatePrefixes();
42}
43
44
45ChatDialogBackend::~ChatDialogBackend()
46{
47}
48
49// protected methods:
50void
51ChatDialogBackend::run()
52{
53 initializeSync();
54
55 m_face.processEvents();
56
57 std::cerr << "Bye!" << std::endl;
58}
59
60// private methods:
61void
62ChatDialogBackend::initializeSync()
63{
64 QMutexLocker locker(&mutex);
65
66 // if a SyncSocket is running, turn it off
67 if (static_cast<bool>(m_sock)) {
68 if (m_joined)
69 sendLeave();
70 m_sock.reset();
71
72 usleep(100000);
73 }
74
75 // create a new SyncSocket
76 m_sock = make_shared<chronosync::Socket>(m_chatroomPrefix,
77 m_routableUserChatPrefix,
78 ref(m_face),
79 bind(&ChatDialogBackend::processSyncUpdate, this, _1));
80
81 // schedule a new join event
82 m_scheduler.scheduleEvent(time::milliseconds(600),
83 bind(&ChatDialogBackend::sendJoin, this));
84
85 // cancel existing hello event if it exists
86 if (static_cast<bool>(m_helloEventId)) {
87 m_scheduler.cancelEvent(m_helloEventId);
88 m_helloEventId.reset();
89 }
90}
91
92void
93ChatDialogBackend::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates)
94{
95 _LOG_DEBUG("<<< processing Tree Update");
96
97 if (updates.empty()) {
98 return;
99 }
100
101 std::vector<NodeInfo> nodeInfos;
102
103
104 for (int i = 0; i < updates.size(); i++) {
105 // update roster
106 if (m_roster.find(updates[i].session) == m_roster.end()) {
107 m_roster[updates[i].session].sessionPrefix = updates[i].session;
108 m_roster[updates[i].session].hasNick = false;
109 }
110
111 // fetch missing chat data
112 if (updates[i].high - updates[i].low < 3) {
113 for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++seq) {
114 m_sock->fetchData(updates[i].session, seq,
115 bind(&ChatDialogBackend::processChatData, this, _1, true),
116 2);
117 _LOG_DEBUG("<<< Fetching " << updates[i].session << "/" << seq);
118 }
119 }
120 else {
121 // There are too many msgs to fetch, let's just fetch the latest one
122 m_sock->fetchData(updates[i].session, updates[i].high,
123 bind(&ChatDialogBackend::processChatData, this, _1, false),
124 2);
125 }
126
127 // prepare notification to frontend
128 NodeInfo nodeInfo;
129 nodeInfo.sessionPrefix = QString::fromStdString(updates[i].session.toUri());
130 nodeInfo.seqNo = updates[i].high;
131 nodeInfos.push_back(nodeInfo);
132 }
133
134 // reflect the changes on GUI
135 emit syncTreeUpdated(nodeInfos,
136 QString::fromStdString(getHexEncodedDigest(m_sock->getRootDigest())));
137}
138
139void
140ChatDialogBackend::processChatData(const ndn::shared_ptr<const ndn::Data>& data, bool needDisplay)
141{
142 SyncDemo::ChatMessage msg;
143
144 if (!msg.ParseFromArray(data->getContent().value(), data->getContent().value_size())) {
145 _LOG_DEBUG("Errrrr.. Can not parse msg with name: " <<
146 data->getName() << ". what is happening?");
147 // nasty stuff: as a remedy, we'll form some standard msg for inparsable msgs
148 msg.set_from("inconnu");
149 msg.set_type(SyncDemo::ChatMessage::OTHER);
150 return;
151 }
152
153 Name remoteSessionPrefix = data->getName().getPrefix(-1);
154
155 if (msg.type() == SyncDemo::ChatMessage::LEAVE) {
156 BackendRoster::iterator it = m_roster.find(remoteSessionPrefix);
157
158 if (it != m_roster.end()) {
159 // cancel timeout event
160 if (static_cast<bool>(it->second.timeoutEventId))
161 m_scheduler.cancelEvent(it->second.timeoutEventId);
162
163 // notify frontend to remove the remote session (node)
164 emit sessionRemoved(QString::fromStdString(remoteSessionPrefix.toUri()),
165 QString::fromStdString(msg.from()),
166 msg.timestamp());
167
168 // remove roster entry
169 m_roster.erase(remoteSessionPrefix);
170 }
171 }
172 else {
173 BackendRoster::iterator it = m_roster.find(remoteSessionPrefix);
174
175 if (it == m_roster.end()) {
176 // Should not happen
177 BOOST_ASSERT(false);
178 }
179
180 // If we haven't got any message from this session yet.
181 if (m_roster[remoteSessionPrefix].hasNick == false) {
182 m_roster[remoteSessionPrefix].userNick = msg.from();
183 m_roster[remoteSessionPrefix].hasNick = true;
184 emit sessionAdded(QString::fromStdString(remoteSessionPrefix.toUri()),
185 QString::fromStdString(msg.from()),
186 msg.timestamp());
187 }
188
189 // If we get a new nick for an existing session, update it.
190 if (m_roster[remoteSessionPrefix].userNick != msg.from()) {
191 m_roster[remoteSessionPrefix].userNick = msg.from();
192 emit nickUpdated(QString::fromStdString(remoteSessionPrefix.toUri()),
193 QString::fromStdString(msg.from()));
194 }
195
196 // If a timeout event has been scheduled, cancel it.
197 if (static_cast<bool>(it->second.timeoutEventId))
198 m_scheduler.cancelEvent(it->second.timeoutEventId);
199
200 // (Re)schedule another timeout event after 3 HELLO_INTERVAL;
201 it->second.timeoutEventId =
202 m_scheduler.scheduleEvent(HELLO_INTERVAL * 3,
203 bind(&ChatDialogBackend::remoteSessionTimeout,
204 this, remoteSessionPrefix));
205
206 // If chat message, notify the frontend
207 if (msg.type() == SyncDemo::ChatMessage::CHAT)
208 emit chatMessageReceived(QString::fromStdString(msg.from()),
209 QString::fromStdString(msg.data()),
210 msg.timestamp());
211
212 // Notify frontend to plot notification on DigestTree.
213 emit messageReceived(QString::fromStdString(remoteSessionPrefix.toUri()));
214 }
215}
216
217void
218ChatDialogBackend::remoteSessionTimeout(const Name& sessionPrefix)
219{
220 time_t timestamp =
221 static_cast<time_t>(time::toUnixTimestamp(time::system_clock::now()).count() / 1000);
222
223 // notify frontend
224 emit sessionRemoved(QString::fromStdString(sessionPrefix.toUri()),
225 QString::fromStdString(m_roster[sessionPrefix].userNick),
226 timestamp);
227
228 // remove roster entry
229 m_roster.erase(sessionPrefix);
230}
231
232void
233ChatDialogBackend::sendMsg(SyncDemo::ChatMessage& msg)
234{
235 // send msg
236 ndn::OBufferStream os;
237 msg.SerializeToOstream(&os);
238
239 if (!msg.IsInitialized()) {
240 _LOG_DEBUG("Errrrr.. msg was not probally initialized " << __FILE__ <<
241 ":" << __LINE__ << ". what is happening?");
242 abort();
243 }
244
245 uint64_t nextSequence = m_sock->getLogic().getSeqNo() + 1;
246
247 m_sock->publishData(os.buf()->buf(), os.buf()->size(), FRESHNESS_PERIOD);
248
249 std::vector<NodeInfo> nodeInfos;
250 NodeInfo nodeInfo = {QString::fromStdString(m_routableUserChatPrefix.toUri()),
251 nextSequence};
252 nodeInfos.push_back(nodeInfo);
253
254 emit syncTreeUpdated(nodeInfos,
255 QString::fromStdString(getHexEncodedDigest(m_sock->getRootDigest())));
256}
257
258void
259ChatDialogBackend::sendJoin()
260{
261 m_joined = true;
262
263 SyncDemo::ChatMessage msg;
264 prepareControlMessage(msg, SyncDemo::ChatMessage::JOIN);
265 sendMsg(msg);
266
267 m_helloEventId = m_scheduler.scheduleEvent(HELLO_INTERVAL,
268 bind(&ChatDialogBackend::sendHello, this));
269
270 emit sessionAdded(QString::fromStdString(m_routableUserChatPrefix.toUri()),
271 QString::fromStdString(msg.from()),
272 msg.timestamp());
273}
274
275void
276ChatDialogBackend::sendHello()
277{
278 SyncDemo::ChatMessage msg;
279 prepareControlMessage(msg, SyncDemo::ChatMessage::HELLO);
280 sendMsg(msg);
281
282 m_helloEventId = m_scheduler.scheduleEvent(HELLO_INTERVAL,
283 bind(&ChatDialogBackend::sendHello, this));
284}
285
286void
287ChatDialogBackend::sendLeave()
288{
289 SyncDemo::ChatMessage msg;
290 prepareControlMessage(msg, SyncDemo::ChatMessage::LEAVE);
291 sendMsg(msg);
292
293 usleep(5000);
294 m_joined = false;
295}
296
297void
298ChatDialogBackend::prepareControlMessage(SyncDemo::ChatMessage& msg,
299 SyncDemo::ChatMessage::ChatMessageType type)
300{
301 msg.set_from(m_nick);
302 msg.set_to(m_chatroomName);
303 int32_t seconds =
304 static_cast<int32_t>(time::toUnixTimestamp(time::system_clock::now()).count() / 1000);
305 msg.set_timestamp(seconds);
306 msg.set_type(type);
307}
308
309void
310ChatDialogBackend::prepareChatMessage(const QString& text,
311 time_t timestamp,
312 SyncDemo::ChatMessage &msg)
313{
314 msg.set_from(m_nick);
315 msg.set_to(m_chatroomName);
316 msg.set_data(text.toStdString());
317 msg.set_timestamp(timestamp);
318 msg.set_type(SyncDemo::ChatMessage::CHAT);
319}
320
321void
322ChatDialogBackend::updatePrefixes()
323{
324 m_routableUserChatPrefix.clear();
325
326 if (m_localRoutingPrefix.isPrefixOf(m_userChatPrefix))
327 m_routableUserChatPrefix = m_userChatPrefix;
328 else
329 m_routableUserChatPrefix.append(m_localRoutingPrefix)
330 .append(ROUTING_HINT_SEPARATOR, 2)
331 .append(m_userChatPrefix);
332
333 emit chatPrefixChanged(m_routableUserChatPrefix);
334}
335
336std::string
337ChatDialogBackend::getHexEncodedDigest(ndn::ConstBufferPtr digest)
338{
339 std::stringstream os;
340
341 CryptoPP::StringSource(digest->buf(), digest->size(), true,
342 new CryptoPP::HexEncoder(new CryptoPP::FileSink(os), false));
343 return os.str();
344}
345
346
347// public slots:
348void
349ChatDialogBackend::sendChatMessage(QString text, time_t timestamp)
350{
351 SyncDemo::ChatMessage msg;
352 prepareChatMessage(text, timestamp, msg);
353 sendMsg(msg);
354
355 emit chatMessageReceived(QString::fromStdString(msg.from()),
356 QString::fromStdString(msg.data()),
357 msg.timestamp());
358}
359
360void
361ChatDialogBackend::updateRoutingPrefix(const QString& localRoutingPrefix)
362{
363 Name newLocalRoutingPrefix(localRoutingPrefix.toStdString());
364
365 if (!newLocalRoutingPrefix.empty() && newLocalRoutingPrefix != m_localRoutingPrefix) {
366 // Update localPrefix
367 m_localRoutingPrefix = newLocalRoutingPrefix;
368
369 updatePrefixes();
370
371 initializeSync();
372 }
373}
374
375void
376ChatDialogBackend::shutdown()
377{
378 m_face.getIoService().stop();
379}
380
381} // namespace chronos
382
383#if WAF
384#include "chat-dialog-backend.moc"
385// #include "chat-dialog-backend.cpp.moc"
386#endif