web socket RFC6455 connection --asio C++11
#ifndef __APP_WEBSOCKET_CONNECTION_H__ #define __APP_WEBSOCKET_CONNECTION_H__ #include <asio.hpp> #include "tcp_connection.hpp" class websocket_connection : public tcp_connection { public: websocket_connection( const std::tr1::shared_ptr<asio::ip::tcp::socket> & s ); std::tr1::shared_ptr<asio::ip::tcp::socket> get_socket() ; //握手应答后调用该方法 void start_read(); ~websocket_connection(); //buf为payload数据,write方法内部自动加上web socket frame的头部. void write( const std::tr1::shared_ptr<buffer> & buf); //服务器接受到连接时,调用该方法执行握手过程 void start_handshake(); //设置接收到消息的回调方法 void set_message_handler(const OnMessage & on_message); protected: //读取web socket frame 的最小长度值 void read_min_head(); //读取剩余的数据、在一个比较大的包中,可能会出现多次调用该方法才读到一个完整的包 void read_leave_data( int leave_len ); private: shared_ptr<buffer> read_buf; OnMessage on_message_; }; #endif
websocket_connection::websocket_connection( const std::tr1::shared_ptr<asio::ip::tcp::socket> & s ):tcp_connection(s){ read_buf = get_buffer(8192); read_buf->size(8192); } std::tr1::shared_ptr<asio::ip::tcp::socket> websocket_connection:: get_socket() { return socket_;} void websocket_connection:: start_read(){ //从包头读取 read_min_head(); } websocket_connection::~websocket_connection(){ LOG(ERROR)<<"~websocket_connection"; } void websocket_connection::write( const std::tr1::shared_ptr<buffer> & buf){ websocket_frame frame; frame.fin = true; frame.mask = true;//使用mask、某些浏览器版本较低不能使用 frame.opcode = 0x02;//发送的是二进制数据 buf->offset(4); frame.payload = buf; frame.package_size(); if( frame.payload_len == 127 ){ LOG(ERROR)<<"ignore big message.."<<buf->capacity(); return; } shared_ptr<buffer> frame_buf = frame.package(); tcp_connection::write(frame_buf); } void websocket_connection::read_min_head(){ auto buf = asio::buffer( read_buf->data(),websocket_frame::fix_min_len); auto self(shared_from_this()); auto read_handler = [self,this](std::error_code ec, std::size_t length){ if( ec ){ LOG(ERROR)<<"read_min_head error:"<<ec.message(); if( on_error_ ){ on_error_(ec.message()); } }else{ read_buf->size(length); websocket_frame frame; int leave_len = frame.unpakcage(read_buf); //读取需要解析一个frame所需要的剩余数据 read_leave_data(leave_len); } }; //asio 异步读取指定长度的数据,读取成功后回调read_handler方法. asio::async_read(*socket_, buf,read_handler); } void websocket_connection::read_leave_data( int leave_len ){ if( read_buf->length() + leave_len > read_buf->capacity() ){ if( on_error_ ){ on_error_("data to big..."); } return; } auto buf = asio::buffer( read_buf->data()+read_buf->length(),leave_len); auto self( shared_from_this()); auto read_handler =[self,this](std::error_code ec, std::size_t length){ if( ec ){ if( on_error_ ){ on_error_(ec.message()); } }else{ read_buf->size(read_buf->length() + length); websocket_frame frame; int leave_len = frame.unpakcage(read_buf); if( leave_len == 0 ){ //读取到一个完整的freame了... if( on_message_ ){ on_message_(frame.payload); } read_buf->size(0); start_read(); }else{ //如果不够、则继续读取剩余的字节. read_leave_data(leave_len); } } }; asio::async_read(*socket_, buf,read_handler); } void websocket_connection::start_handshake(){ shared_ptr<buffer> buf = get_buffer(512); auto asio_buf = asio::buffer( buf->data(), 512); auto self(shared_from_this()); socket_->async_read_some( asio_buf, [self,this,buf]( const asio::error_code& error, std::size_t bytes_transferred ){ if( error ){ if( on_error_ ){ on_error_(error.message()); } recycle_buffer(buf); return; } std::string data((const char*)buf->data(),bytes_transferred); recycle_buffer(buf); std::string key="Sec-WebSocket-Key:"; auto pos = data.find(key); auto posEnd = data.find("\r\n",pos); auto value = data.substr(pos + key.length(),posEnd - (pos + key.length())); std::string sha1Src = trim(value); sha1Src += std::string("258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); unsigned char sha1out[20]; sha1((const unsigned char *)sha1Src.c_str(),sha1Src.length(),sha1out); std::vector<unsigned char> data64; for( auto c: sha1out) data64.push_back(c); std::ostringstream os_rsp; os_rsp<<"HTTP/1.1 101 Switching Protocols\r\n" <<"Upgrade: websocket\r\n" <<"Connection: Upgrade\r\n" <<"Sec-WebSocket-Accept: "<<base64Encode(data64)<<"=\r\n" <<"\r\n"; std::string rsp = os_rsp.str(); socket_->async_send(asio::buffer(rsp),[self,this](const asio::error_code& ec, std::size_t bytes_transferred){ }); start_read();//握手应答后,启动对web socke frame的读 }); } void websocket_connection::set_message_handler(const OnMessage & on_message){ on_message_ = on_message; }
void init_service_admin_websocket( asio::io_service & io){ static std::once_flag init_flag; std::call_once( init_flag,[&io]{ auto new_connected_handler = []( const std::tr1::shared_ptr<asio::ip::tcp::socket> & s) {//新连接到来时的回调方法 shared_ptr<websocket_connection> websocket( new websocket_connection(s)); //设置错误发生时的回调方法 auto on_error =[websocket](const std::string & error ){//主要清理资源 LOG(ERROR)<<error<<" "<<websocket->name(); sessions::clear_session(websocket); websocket.reset(); }; websocket->set_error_handler(on_error); std::tr1::weak_ptr<websocket_connection> wpsocket(websocket); //设置接收到消息时的回调方法,websocket frame的payload auto on_message = [wpsocket]( const std::tr1::shared_ptr<buffer> & buf) { //消息分发 }; websocket->set_message_handler(on_message); websocket->start_handshake();//启动握手 }; unsigned int port = 80; tcp_server * srv (new tcp_server(io, port , new_connected_handler)); srv->start_accept(); }); }
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。