xrootd
XrdClStream.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef __XRD_CL_STREAM_HH__
20#define __XRD_CL_STREAM_HH__
21
22#include "XrdCl/XrdClPoller.hh"
23#include "XrdCl/XrdClStatus.hh"
24#include "XrdCl/XrdClURL.hh"
28#include "XrdCl/XrdClInQueue.hh"
29#include "XrdCl/XrdClUtils.hh"
30
32#include "XrdNet/XrdNetAddr.hh"
33#include <list>
34#include <vector>
35#include <functional>
36#include <memory>
37
38namespace XrdCl
39{
40 class Message;
41 class Channel;
42 class TransportHandler;
43 class TaskManager;
44 struct SubStreamData;
45
46 //----------------------------------------------------------------------------
48 //----------------------------------------------------------------------------
49 class Stream
50 {
51 public:
52 //------------------------------------------------------------------------
54 //------------------------------------------------------------------------
56 {
60 Error = 3
61 };
62
63 //------------------------------------------------------------------------
65 //------------------------------------------------------------------------
66 Stream( const URL *url, const URL &prefer = URL() );
67
68 //------------------------------------------------------------------------
70 //------------------------------------------------------------------------
72
73 //------------------------------------------------------------------------
75 //------------------------------------------------------------------------
77
78 //------------------------------------------------------------------------
80 //------------------------------------------------------------------------
82 MsgHandler *handler,
83 bool stateful,
84 time_t expires );
85
86 //------------------------------------------------------------------------
88 //------------------------------------------------------------------------
89 void SetTransport( TransportHandler *transport )
90 {
91 pTransport = transport;
92 }
93
94 //------------------------------------------------------------------------
96 //------------------------------------------------------------------------
97 void SetPoller( Poller *poller )
98 {
99 pPoller = poller;
100 }
101
102 //------------------------------------------------------------------------
104 //------------------------------------------------------------------------
105 void SetIncomingQueue( InQueue *incomingQueue )
106 {
107 pIncomingQueue = incomingQueue;
108 }
109
110 //------------------------------------------------------------------------
112 //------------------------------------------------------------------------
113 void SetChannelData( AnyObject *channelData )
114 {
115 pChannelData = channelData;
116 }
117
118 //------------------------------------------------------------------------
120 //------------------------------------------------------------------------
121 void SetTaskManager( TaskManager *taskManager )
122 {
123 pTaskManager = taskManager;
124 }
125
126 //------------------------------------------------------------------------
128 //------------------------------------------------------------------------
129 void SetJobManager( JobManager *jobManager )
130 {
131 pJobManager = jobManager;
132 }
133
134 //------------------------------------------------------------------------
138 //------------------------------------------------------------------------
140
141 //------------------------------------------------------------------------
143 //------------------------------------------------------------------------
144 void Disconnect( bool force = false );
145
146 //------------------------------------------------------------------------
149 //------------------------------------------------------------------------
150 void Tick( time_t now );
151
152 //------------------------------------------------------------------------
154 //------------------------------------------------------------------------
155 const URL *GetURL() const
156 {
157 return pUrl;
158 }
159
160 //------------------------------------------------------------------------
162 //------------------------------------------------------------------------
164
165 //------------------------------------------------------------------------
167 //------------------------------------------------------------------------
168 const std::string &GetName() const
169 {
170 return pStreamName;
171 }
172
173 //------------------------------------------------------------------------
175 //------------------------------------------------------------------------
176 void DisableIfEmpty( uint16_t subStream );
177
178 //------------------------------------------------------------------------
180 //------------------------------------------------------------------------
181 void OnIncoming( uint16_t subStream,
182 std::shared_ptr<Message> msg,
183 uint32_t bytesReceived );
184
185 //------------------------------------------------------------------------
186 // Call when one of the sockets is ready to accept a new message
187 //------------------------------------------------------------------------
188 std::pair<Message *, MsgHandler *>
189 OnReadyToWrite( uint16_t subStream );
190
191 //------------------------------------------------------------------------
192 // Call when a message is written to the socket
193 //------------------------------------------------------------------------
194 void OnMessageSent( uint16_t subStream,
195 Message *msg,
196 uint32_t bytesSent );
197
198 //------------------------------------------------------------------------
200 //------------------------------------------------------------------------
201 void OnConnect( uint16_t subStream );
202
203 //------------------------------------------------------------------------
205 //------------------------------------------------------------------------
206 void OnConnectError( uint16_t subStream, XRootDStatus status );
207
208 //------------------------------------------------------------------------
210 //------------------------------------------------------------------------
211 void OnError( uint16_t subStream, XRootDStatus status );
212
213 //------------------------------------------------------------------------
215 //------------------------------------------------------------------------
216 void ForceError( XRootDStatus status );
217
218 //------------------------------------------------------------------------
220 //------------------------------------------------------------------------
221 void OnReadTimeout( uint16_t subStream );
222
223 //------------------------------------------------------------------------
225 //------------------------------------------------------------------------
226 void OnWriteTimeout( uint16_t subStream );
227
228 //------------------------------------------------------------------------
230 //------------------------------------------------------------------------
232
233 //------------------------------------------------------------------------
235 //------------------------------------------------------------------------
237
238 //------------------------------------------------------------------------
247 //------------------------------------------------------------------------
249 InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream );
250
251 //------------------------------------------------------------------------
255 //------------------------------------------------------------------------
256 uint16_t InspectStatusRsp( uint16_t stream, MsgHandler *&incHandler );
257
258 //------------------------------------------------------------------------
260 //------------------------------------------------------------------------
261 void SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
262 {
263 XrdSysMutexHelper scopedLock( pMutex );
264 pOnDataConnJob = onConnJob;
265 }
266
267 //------------------------------------------------------------------------
270 //------------------------------------------------------------------------
271 bool CanCollapse( const URL &url );
272
273 //------------------------------------------------------------------------
275 //------------------------------------------------------------------------
276 Status Query( uint16_t query, AnyObject &result );
277
278 private:
279
280 //------------------------------------------------------------------------
282 //------------------------------------------------------------------------
283 static bool IsPartial( Message &msg );
284
285 //------------------------------------------------------------------------
287 //------------------------------------------------------------------------
288 inline static bool HasNetAddr( const XrdNetAddr &addr,
289 std::vector<XrdNetAddr> &addresses )
290 {
291 auto itr = addresses.begin();
292 for( ; itr != addresses.end() ; ++itr )
293 {
294 if( itr->Same( &addr ) ) return true;
295 }
296
297 return false;
298 }
299
300 //------------------------------------------------------------------------
301 // Job handling the incoming messages
302 //------------------------------------------------------------------------
303 class HandleIncMsgJob: public Job
304 {
305 public:
306 HandleIncMsgJob( MsgHandler *handler ): pHandler( handler ) {};
307 virtual ~HandleIncMsgJob() {};
308 virtual void Run( void* )
309 {
310 pHandler->Process();
311 delete this;
312 }
313 private:
315 };
316
317 //------------------------------------------------------------------------
319 //------------------------------------------------------------------------
320 void OnFatalError( uint16_t subStream,
321 XRootDStatus status,
322 XrdSysMutexHelper &lock );
323
324 //------------------------------------------------------------------------
326 //------------------------------------------------------------------------
328
329 //------------------------------------------------------------------------
331 //------------------------------------------------------------------------
333
334 typedef std::vector<SubStreamData*> SubStreamList;
335
336 //------------------------------------------------------------------------
337 // Data members
338 //------------------------------------------------------------------------
339 const URL *pUrl;
341 std::string pStreamName;
357 std::vector<XrdNetAddr> pAddresses;
360 uint64_t pSessionId;
361
362 //------------------------------------------------------------------------
363 // Monitoring info
364 //------------------------------------------------------------------------
367 uint64_t pBytesSent;
369
370 //------------------------------------------------------------------------
371 // Data stream on-connect handler
372 //------------------------------------------------------------------------
373 std::shared_ptr<Job> pOnDataConnJob;
374 };
375}
376
377#endif // __XRD_CL_STREAM_HH__
Definition: XrdClAnyObject.hh:33
Channel event handler.
Definition: XrdClPostMasterInterfaces.hh:210
A helper for handling channel event handlers.
Definition: XrdClChannelHandlerList.hh:34
A synchronize queue for incoming data.
Definition: XrdClInQueue.hh:37
A synchronized queue.
Definition: XrdClJobManager.hh:51
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:34
The message representation used throughout the system.
Definition: XrdClMessage.hh:30
Message handler.
Definition: XrdClPostMasterInterfaces.hh:51
virtual void Process()
Definition: XrdClPostMasterInterfaces.hh:125
Interface for socket pollers.
Definition: XrdClPoller.hh:87
Definition: XrdClStream.hh:304
virtual void Run(void *)
The job logic.
Definition: XrdClStream.hh:308
MsgHandler * pHandler
Definition: XrdClStream.hh:314
virtual ~HandleIncMsgJob()
Definition: XrdClStream.hh:307
HandleIncMsgJob(MsgHandler *handler)
Definition: XrdClStream.hh:306
Stream.
Definition: XrdClStream.hh:50
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
static bool HasNetAddr(const XrdNetAddr &addr, std::vector< XrdNetAddr > &addresses)
Check if addresses contains given address.
Definition: XrdClStream.hh:288
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:89
StreamStatus
Status of the stream.
Definition: XrdClStream.hh:56
@ Disconnected
Not connected.
Definition: XrdClStream.hh:57
@ Error
Broken.
Definition: XrdClStream.hh:60
@ Connected
Connected.
Definition: XrdClStream.hh:58
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:59
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:105
timeval pConnectionStarted
Definition: XrdClStream.hh:365
std::vector< SubStreamData * > SubStreamList
Definition: XrdClStream.hh:334
bool CanCollapse(const URL &url)
InQueue * pIncomingQueue
Definition: XrdClStream.hh:347
XrdSysRecMutex pMutex
Definition: XrdClStream.hh:346
Utils::AddressType pAddressType
Definition: XrdClStream.hh:358
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:97
void ForceConnect()
Force connection.
XRootDStatus pLastFatalError
Definition: XrdClStream.hh:350
time_t pConnectionInitTime
Definition: XrdClStream.hh:354
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:121
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
Definition: XrdClStream.hh:261
Poller * pPoller
Definition: XrdClStream.hh:343
TaskManager * pTaskManager
Definition: XrdClStream.hh:344
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:129
Status Query(uint16_t query, AnyObject &result)
Query the stream.
uint32_t pLastStreamError
Definition: XrdClStream.hh:349
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus RequestClose(Message &resp)
Send close after an open request timed out.
uint16_t pConnectionRetry
Definition: XrdClStream.hh:353
void MonitorDisconnection(XRootDStatus status)
Inform the monitoring about disconnection.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
const std::string & GetName() const
Return stream name.
Definition: XrdClStream.hh:168
uint64_t pBytesReceived
Definition: XrdClStream.hh:368
void Tick(time_t now)
void ForceError(XRootDStatus status)
Force error.
AnyObject * pChannelData
Definition: XrdClStream.hh:348
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
std::vector< XrdNetAddr > pAddresses
Definition: XrdClStream.hh:357
~Stream()
Destructor.
const URL * pUrl
Definition: XrdClStream.hh:339
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
const URL pPrefer
Definition: XrdClStream.hh:340
JobManager * pJobManager
Definition: XrdClStream.hh:345
uint16_t pConnectionCount
Definition: XrdClStream.hh:352
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
void OnReadTimeout(uint16_t subStream)
On read timeout.
std::string pStreamName
Definition: XrdClStream.hh:341
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
SubStreamList pSubStreams
Definition: XrdClStream.hh:356
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
ChannelHandlerList pChannelEvHandlers
Definition: XrdClStream.hh:359
uint64_t pBytesSent
Definition: XrdClStream.hh:367
uint16_t pConnectionWindow
Definition: XrdClStream.hh:355
TransportHandler * pTransport
Definition: XrdClStream.hh:342
uint16_t pStreamErrorWindow
Definition: XrdClStream.hh:351
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:155
timeval pConnectionDone
Definition: XrdClStream.hh:366
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
void OnFatalError(uint16_t subStream, XRootDStatus status, XrdSysMutexHelper &lock)
On fatal error - unlocks the stream.
std::shared_ptr< Job > pOnDataConnJob
Definition: XrdClStream.hh:373
static bool IsPartial(Message &msg)
Check if message is a partial response.
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:113
uint64_t pSessionId
Definition: XrdClStream.hh:360
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void OnWriteTimeout(uint16_t subStream)
On write timeout.
Definition: XrdClTaskManager.hh:76
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:310
URL representation.
Definition: XrdClURL.hh:31
AddressType
Address type.
Definition: XrdClUtils.hh:98
Request status.
Definition: XrdClXRootDResponses.hh:219
Definition: XrdNetAddr.hh:42
Definition: XrdSysPthread.hh:263
Definition: XrdSysPthread.hh:242
Definition: XrdClAction.hh:34
Definition: XrdClPostMasterInterfaces.hh:269
Procedure execution status.
Definition: XrdClStatus.hh:114