blob: 7c4e41010ec118e9dad396851f00941259049bf4 [file] [log] [blame]
akmhoque66e66182014-02-21 17:56:03 -06001/* -*- Mode: C32++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2012 University of California, Los Angeles
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License version 2 as
7 * published by the Free Software Foundation;
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 * Author: Yingdi Yu <yingdi@cs.ucla.edu>
19 */
20
21#include "sync-socket.h"
22#include "sync-logging.h"
23
24using namespace std;
25using namespace ndn;
26
27INIT_LOGGER ("SyncSocket");
28
29namespace Sync {
30
31using ndn::shared_ptr;
Yingdi Yu40cd1c32014-04-17 15:02:17 -070032using ndn::make_shared;
akmhoque66e66182014-02-21 17:56:03 -060033
Yingdi Yu40cd1c32014-04-17 15:02:17 -070034SyncSocket::SyncSocket (const Name &syncPrefix,
akmhoque66e66182014-02-21 17:56:03 -060035 shared_ptr<Validator> validator,
36 shared_ptr<Face> face,
Yingdi Yu40cd1c32014-04-17 15:02:17 -070037 NewDataCallback dataCallback,
akmhoque66e66182014-02-21 17:56:03 -060038 RemoveCallback rmCallback )
39 : m_newDataCallback(dataCallback)
40 , m_validator(validator)
41 , m_keyChain(new KeyChain())
42 , m_face(face)
43 , m_ioService(face->ioService())
44 , m_syncLogic (syncPrefix,
45 validator,
46 face,
47 bind(&SyncSocket::passCallback, this, _1),
48 rmCallback)
49{}
50
51SyncSocket::~SyncSocket()
52{
53}
54
Yingdi Yu40cd1c32014-04-17 15:02:17 -070055bool
akmhoque66e66182014-02-21 17:56:03 -060056SyncSocket::publishData(const Name &prefix, uint64_t session, const char *buf, size_t len, int freshness,uint64_t seq)
57{
58 shared_ptr<Data> data = make_shared<Data>();
59 data->setContent(reinterpret_cast<const uint8_t*>(buf), len);
akmhoque05d5fcf2014-04-15 14:58:45 -050060 data->setFreshnessPeriod(time::seconds(freshness));
akmhoque66e66182014-02-21 17:56:03 -060061
62 m_ioService->post(bind(&SyncSocket::publishDataInternal, this, data, prefix, session,seq));
63
Yingdi Yu40cd1c32014-04-17 15:02:17 -070064 return true;
akmhoque66e66182014-02-21 17:56:03 -060065}
66
67void
68SyncSocket::publishDataInternal(shared_ptr<Data> data, const Name &prefix, uint64_t session,uint64_t seq)
69{
70 uint64_t sequence = seq;
71 Name dataName = prefix;
72 dataName.append(boost::lexical_cast<string>(session)).append(boost::lexical_cast<string>(sequence));
73 data->setName(dataName);
74
Yingdi Yu40cd1c32014-04-17 15:02:17 -070075 m_keyChain->sign(*data);
akmhoque66e66182014-02-21 17:56:03 -060076 m_face->put(*data);
77
78 SeqNo s(session, sequence + 1);
79
80 m_sequenceLog[prefix] = s;
81 m_syncLogic.addLocalNames (prefix, session, sequence);
82}
83
Yingdi Yu40cd1c32014-04-17 15:02:17 -070084void
akmhoque66e66182014-02-21 17:56:03 -060085SyncSocket::fetchData(const Name &prefix, const SeqNo &seq, const OnDataValidated& onValidated, int retry)
86{
87 Name interestName = prefix;
88 interestName.append(boost::lexical_cast<string>(seq.getSession())).append(boost::lexical_cast<string>(seq.getSeq()));
89
90 const OnDataValidationFailed& onValidationFailed = bind(&SyncSocket::onDataValidationFailed, this, _1);
Yingdi Yu40cd1c32014-04-17 15:02:17 -070091
akmhoque66e66182014-02-21 17:56:03 -060092 ndn::Interest interest(interestName);
93 interest.setMustBeFresh(true);
Yingdi Yu40cd1c32014-04-17 15:02:17 -070094 m_face->expressInterest(interest,
95 bind(&SyncSocket::onData, this, _1, _2, onValidated, onValidationFailed),
akmhoque66e66182014-02-21 17:56:03 -060096 bind(&SyncSocket::onDataTimeout, this, _1, retry, onValidated, onValidationFailed));
97
98}
99
100void
101SyncSocket::onData(const ndn::Interest& interest, Data& data,
102 const OnDataValidated& onValidated,
103 const OnDataValidationFailed& onValidationFailed)
104{
105 m_validator->validate(data, onValidated, onValidationFailed);
106}
107
108void
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700109SyncSocket::onDataTimeout(const ndn::Interest& interest,
akmhoque66e66182014-02-21 17:56:03 -0600110 int retry,
111 const OnDataValidated& onValidated,
112 const OnDataValidationFailed& onValidationFailed)
113{
114 if(retry > 0)
115 {
116 m_face->expressInterest(interest,
117 bind(&SyncSocket::onData,
118 this,
119 _1,
120 _2,
121 onValidated,
122 onValidationFailed),
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700123 bind(&SyncSocket::onDataTimeout,
akmhoque66e66182014-02-21 17:56:03 -0600124 this,
125 _1,
126 retry - 1,
127 onValidated,
128 onValidationFailed));
Yingdi Yu40cd1c32014-04-17 15:02:17 -0700129
akmhoque66e66182014-02-21 17:56:03 -0600130 }
131 else
132 _LOG_DEBUG("interest eventually time out!");
133}
134
135void
136SyncSocket::onDataValidationFailed(const shared_ptr<const Data>& data)
137{
138 _LOG_DEBUG("data cannot be verified!");
139}
140
141
142uint64_t
143SyncSocket::getNextSeq (const Name &prefix, uint64_t session)
144{
145 SequenceLog::iterator i = m_sequenceLog.find (prefix);
146
147 if (i != m_sequenceLog.end ())
148 {
149 SeqNo s = i->second;
150 if (s.getSession() == session)
151 return s.getSeq();
152 }
153 return 0;
154}
155
156}//Sync