10 #ifndef __PION_TCP_STREAM_HEADER__ 11 #define __PION_TCP_STREAM_HEADER__ 16 #include <boost/bind.hpp> 17 #include <boost/thread/mutex.hpp> 18 #include <boost/thread/condition.hpp> 19 #include <pion/config.hpp> 20 #include <pion/tcp/connection.hpp> 33 :
public std::basic_streambuf<char, std::char_traits<char> >
38 typedef char char_type;
39 typedef std::char_traits<char>::int_type int_type;
40 typedef std::char_traits<char>::off_type off_type;
41 typedef std::char_traits<char>::pos_type pos_type;
42 typedef std::char_traits<char> traits_type;
47 WRITE_BUFFER_SIZE = 8192
57 : m_conn_ptr(conn_ptr), m_read_buf(m_conn_ptr->get_read_buffer().c_array())
69 const bool ssl_flag =
false)
70 : m_conn_ptr(new
connection(io_service, ssl_flag)),
71 m_read_buf(m_conn_ptr->get_read_buffer().c_array())
83 connection::ssl_context_type& ssl_context)
84 : m_conn_ptr(new
connection(io_service, ssl_context)),
85 m_read_buf(m_conn_ptr->get_read_buffer().c_array())
105 setg(m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX);
107 setp(m_write_buf, m_write_buf+(WRITE_BUFFER_SIZE-1));
116 const std::streamsize bytes_to_send = std::streamsize(pptr() - pbase());
117 int_type bytes_sent = 0;
118 if (bytes_to_send > 0) {
119 boost::mutex::scoped_lock async_lock(m_async_mutex);
120 m_bytes_transferred = 0;
121 m_conn_ptr->async_write(boost::asio::buffer(pbase(), bytes_to_send),
122 boost::bind(&stream_buffer::operation_finished,
this,
123 boost::asio::placeholders::error,
124 boost::asio::placeholders::bytes_transferred));
125 m_async_done.wait(async_lock);
126 bytes_sent = m_bytes_transferred;
129 bytes_sent = traits_type::eof();
141 if (gptr() < egptr())
142 return traits_type::to_int_type(*gptr());
145 std::streamsize put_back_num = std::streamsize(gptr() - eback());
146 if (put_back_num > PUT_BACK_MAX)
147 put_back_num = PUT_BACK_MAX;
150 if (put_back_num > 0)
151 memmove(m_read_buf+(PUT_BACK_MAX-put_back_num), gptr()-put_back_num, put_back_num);
156 boost::mutex::scoped_lock async_lock(m_async_mutex);
157 m_bytes_transferred = 0;
158 m_conn_ptr->async_read_some(boost::asio::buffer(m_read_buf+PUT_BACK_MAX,
159 connection::READ_BUFFER_SIZE-PUT_BACK_MAX),
160 boost::bind(&stream_buffer::operation_finished,
this,
161 boost::asio::placeholders::error,
162 boost::asio::placeholders::bytes_transferred));
163 m_async_done.wait(async_lock);
165 return traits_type::eof();
168 setg(m_read_buf+(PUT_BACK_MAX-put_back_num),
169 m_read_buf+PUT_BACK_MAX,
170 m_read_buf+PUT_BACK_MAX+m_bytes_transferred);
173 return traits_type::to_int_type(*gptr());
183 if (! traits_type::eq_int_type(c, traits_type::eof())) {
192 ? traits_type::eof() : traits_type::not_eof(c));
203 virtual std::streamsize
xsputn(
const char_type *s, std::streamsize n) {
204 const std::streamsize bytes_available = std::streamsize(epptr() - pptr());
205 std::streamsize bytes_sent = 0;
206 if (bytes_available >= n) {
208 memcpy(pptr(), s, n);
213 if (bytes_available > 0) {
215 memcpy(pptr(), s, bytes_available);
216 pbump(bytes_available);
221 if ((n-bytes_available) >= (WRITE_BUFFER_SIZE-1)) {
224 boost::mutex::scoped_lock async_lock(m_async_mutex);
225 m_bytes_transferred = 0;
226 m_conn_ptr->async_write(boost::asio::buffer(s+bytes_available,
228 boost::bind(&stream_buffer::operation_finished,
this,
229 boost::asio::placeholders::error,
230 boost::asio::placeholders::bytes_transferred));
231 m_async_done.wait(async_lock);
232 bytes_sent = bytes_available + m_bytes_transferred;
236 memcpy(pbase(), s+bytes_available, n-bytes_available);
237 pbump(n-bytes_available);
252 virtual std::streamsize
xsgetn(char_type *s, std::streamsize n) {
253 std::streamsize bytes_remaining = n;
254 while (bytes_remaining > 0) {
255 const std::streamsize bytes_available = std::streamsize(egptr() - gptr());
256 const std::streamsize bytes_next_read = ((bytes_available >= bytes_remaining)
257 ? bytes_remaining : bytes_available);
259 if (bytes_next_read > 0) {
260 memcpy(s, gptr(), bytes_next_read);
261 gbump(bytes_next_read);
262 bytes_remaining -= bytes_next_read;
263 s += bytes_next_read;
265 if (bytes_remaining > 0) {
267 if (traits_type::eq_int_type(
underflow(), traits_type::eof()))
271 return(n-bytes_remaining);
280 return ((
flush_output() == traits_type::eof()) ? -1 : 0);
287 inline void operation_finished(
const boost::system::error_code& error_code,
288 std::size_t bytes_transferred)
290 boost::mutex::scoped_lock async_lock(m_async_mutex);
291 m_async_error = error_code;
292 m_bytes_transferred = bytes_transferred;
293 m_async_done.notify_one();
298 tcp::connection_ptr m_conn_ptr;
301 boost::mutex m_async_mutex;
304 boost::condition m_async_done;
307 boost::system::error_code m_async_error;
310 std::size_t m_bytes_transferred;
313 char_type * m_read_buf;
316 char_type m_write_buf[WRITE_BUFFER_SIZE];
324 :
public std::basic_iostream<char, std::char_traits<char> >
329 typedef char char_type;
330 typedef std::char_traits<char>::int_type int_type;
331 typedef std::char_traits<char>::off_type off_type;
332 typedef std::char_traits<char>::pos_type pos_type;
333 typedef std::char_traits<char> traits_type;
341 explicit stream(
const tcp::connection_ptr& conn_ptr)
342 :
std::basic_iostream<char,
std::char_traits<char> >(NULL), m_tcp_buf(conn_ptr)
345 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
354 explicit stream(boost::asio::io_service& io_service,
355 const bool ssl_flag =
false)
356 :
std::basic_iostream<char,
std::char_traits<char> >(NULL), m_tcp_buf(io_service, ssl_flag)
359 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
368 stream(boost::asio::io_service& io_service,
369 connection::ssl_context_type& ssl_context)
370 :
std::basic_iostream<char,
std::char_traits<char> >(NULL), m_tcp_buf(io_service, ssl_context)
373 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
384 inline boost::system::error_code
accept(boost::asio::ip::tcp::acceptor& tcp_acceptor)
386 boost::system::error_code ec = m_tcp_buf.get_connection().accept(tcp_acceptor);
387 if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_server();
399 inline boost::system::error_code
connect(boost::asio::ip::tcp::endpoint& tcp_endpoint)
401 boost::system::error_code ec = m_tcp_buf.get_connection().connect(tcp_endpoint);
402 if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_client();
415 inline boost::system::error_code
connect(
const boost::asio::ip::address& remote_addr,
416 const unsigned int remote_port)
418 boost::asio::ip::tcp::endpoint tcp_endpoint(remote_addr, remote_port);
419 boost::system::error_code ec = m_tcp_buf.get_connection().connect(tcp_endpoint);
420 if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_client();
425 inline void close(
void) { m_tcp_buf.get_connection().close(); }
435 inline bool is_open(
void)
const {
return m_tcp_buf.get_connection().is_open(); }
438 inline bool get_ssl_flag(
void)
const {
return m_tcp_buf.get_connection().get_ssl_flag(); }
442 return m_tcp_buf.get_connection().get_remote_ip();
boost::asio::ip::address get_remote_ip(void) const
returns the client's IP address
virtual int_type overflow(int_type c)
int_type flush_output(void)
bool get_ssl_flag(void) const
returns true if the connection is encrypted using SSL
stream(const tcp::connection_ptr &conn_ptr)
boost::system::error_code accept(boost::asio::ip::tcp::acceptor &tcp_acceptor)
stream_buffer(boost::asio::io_service &io_service, const bool ssl_flag=false)
void setup_buffers(void)
sets up the read and write buffers for input and output
bool is_open(void) const
returns true if the connection is currently open
stream(boost::asio::io_service &io_service, const bool ssl_flag=false)
stream_buffer(boost::asio::io_service &io_service, connection::ssl_context_type &ssl_context)
stream(boost::asio::io_service &io_service, connection::ssl_context_type &ssl_context)
const connection & get_connection(void) const
returns a const reference to the current TCP connection
virtual std::streamsize xsputn(const char_type *s, std::streamsize n)
virtual ~stream_buffer()
virtual destructor flushes the write buffer
connection & get_connection(void)
returns a reference to the current TCP connection
boost::system::error_code connect(boost::asio::ip::tcp::endpoint &tcp_endpoint)
void close(void)
closes the tcp connection
virtual int_type underflow(void)
stream_buffer * rdbuf(void)
returns a pointer to the stream buffer in use
virtual int_type sync(void)
virtual std::streamsize xsgetn(char_type *s, std::streamsize n)
boost::system::error_code connect(const boost::asio::ip::address &remote_addr, const unsigned int remote_port)
stream_buffer(const tcp::connection_ptr &conn_ptr)