blob: 0c5ee66831ca830f99885f55ae506c52ff186e6f [file] [log] [blame]
Shuo Chen29c77fe2014-03-18 11:29:41 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Regents of the University of California.
4 * See COPYING for copyright and distribution information.
5 */
6
7#ifndef REPO_NDN_HANDLE_WRITE_HANDLE_HPP
8#define REPO_NDN_HANDLE_WRITE_HANDLE_HPP
9
10#include "ndn-handle-common.hpp"
11#include "base-handle.hpp"
12#include <queue>
13
14namespace repo {
15
16using std::map;
17using std::pair;
18using std::queue;
19
20/**
21* @brief WriteHandle provides basic credit based congestion control.
22*
23* First repo sends interests of credit number and then credit will be 0.
24*
25* If a data comes, credit++ and sends a interest then credit--.
26*
27* If the interest timeout, repo will retry and send interest in retrytimes.
28*
29* If one interest timeout beyond retrytimes, the fetching process will terminate.
30*
31* Another case is that if command will insert segmented data without EndBlockId.
32*
33* The repo will keep fetching data in noendTimeout time.
34*
35* If data returns with FinalBlockId, this detecting timeout process will terminate.
36*
37* If client sends a insert check command, the noendTimeout timer will be set to 0.
38*
39* If repo cannot get FinalBlockId in noendTimeout time, the fetching process will terminate.
40*/
41class WriteHandle : public BaseHandle
42{
43
44public:
45 class Error : public BaseHandle::Error
46 {
47 public:
48 explicit
49 Error(const std::string& what)
50 : BaseHandle::Error(what)
51 {
52 }
53 };
54
55
56public:
57 WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
58 Scheduler& scheduler, CommandInterestValidator& validator);
59
60 virtual void
61 listen(const Name& prefix);
62
63private:
64 /**
65 * @brief Information of insert process including variables for response
66 * and credit based flow control
67 */
68 struct ProcessInfo
69 {
70 //ProcessId id;
71 RepoCommandResponse response;
72 queue<SegmentNo> nextSegmentQueue; ///< queue of waiting segment
73 /// to be sent when having credits
74 SegmentNo nextSegment; ///< last segment put into the nextSegmentQueue
75 map<SegmentNo, int> retryCounts; ///< to store retrying times of timeout segment
76 int credit; ///< congestion control credits of process
77
78 /**
79 * @brief the latest time point at which EndBlockId must be determined
80 *
81 * Segmented fetch process will terminate if EndBlockId cannot be
82 * determined before this time point.
83 * It is initialized to now()+noEndTimeout when segmented fetch process begins,
84 * and reset to now()+noEndTimeout each time an insert status check command is processed.
85 */
86 ndn::time::steady_clock::TimePoint noEndTime;
87 };
88
89private: // insert command
90 /**
91 * @brief handle insert commands
92 */
93 void
94 onInterest(const Name& prefix, const Interest& interest);
95
96 void
97 onValidated(const shared_ptr<const Interest>& interest, const Name& prefix);
98
99 void
100 onValidationFailed(const shared_ptr<const Interest>& interest);
101
102 /**
103 * @brief insert command prefix register failed
104 */
105 void
106 onRegisterFailed(const Name& prefix, const std::string& reason);
107
108private: // single data fetching
109 /**
110 * @brief fetch one data
111 */
112 void
113 onData(const Interest& interest, Data& data, ProcessId processId);
114
115 /**
116 * @brief handle when fetching one data timeout
117 */
118 void
119 onTimeout(const Interest& interest, ProcessId processId);
120
121 void
122 processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter);
123
124private: // segmented data fetching
125 /**
126 * @brief fetch segmented data
127 */
128 void
129 onSegmentData(const Interest& interest, Data& data, ProcessId processId);
130
131 /**
132 * @brief Timeout when fetching segmented data. Data can be fetched RETRY_TIMEOUT times.
133 */
134 void
135 onSegmentTimeout(const Interest& interest, ProcessId processId);
136
137 /**
138 * @brief initiate fetching segmented data
139 */
140 void
141 segInit(ProcessId processId, const RepoCommandParameter& parameter);
142
143 /**
144 * @brief control for sending interests in function onSegmentData()
145 */
146 void
147 onSegmentDataControl(ProcessId processId, const Interest& interest);
148
149 /**
150 * @brief control for sending interest in function onSegmentTimeout
151 */
152 void
153 onSegmentTimeoutControl(ProcessId processId, const Interest& interest);
154
155 void
156 processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter);
157
158 /**
159 * @brief extends noEndTime of process if not noEndTimeout, set StatusCode 405
160 *
161 * called by onCheckValidated() if there is no EndBlockId. If not noEndTimeout,
162 * extends noEndTime of process. If noEndTimeout, set status code 405 as noEndTimeout.
163 */
164 void
165 extendNoEndTime(ProcessInfo& process);
166
167private: // insert state check command
168 /**
169 * @brief handle insert check command
170 */
171 void
172 onCheckInterest(const Name& prefix, const Interest& interest);
173
174 /**
175 * @brief insert check command prefix register failed
176 */
177 void
178 onCheckRegisterFailed(const Name& prefix, const std::string& reason);
179
180 void
181 onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix);
182
183 void
184 onCheckValidationFailed(const shared_ptr<const Interest>& interest);
185
186private:
187 void
188 deleteProcess(ProcessId processId);
189
190 /**
191 * @brief schedule a event to delete the process
192 */
193 void
194 deferredDeleteProcess(ProcessId processId);
195
196 void
197 negativeReply(const Interest& interest, int statusCode);
198
199private:
200
201 CommandInterestValidator& m_validator;
202
203 map<ProcessId, ProcessInfo> m_processes;
204
205 int m_retryTime;
206 int m_credit;
207 ndn::time::milliseconds m_noEndTimeout;
208};
209
210} // namespace repo
211
212#endif // REPO_NDN_HANDLE_WRITE_HANDLE_HPP