boost::asio的http client应用笔记

1 踩过的坑

1.1 io_service

boost::asio::io_service::run()会一直运行到没有任务为止,如果中途调用stop(),则所有等待中的任务会立刻执行。解决方案是用run_one(),即

while (keep_running)
    io_service_.run_one();

keep_running是个bool值,要stop io_service的时候直接置false即可。

1.2 deadline_timer

在调用async_wait()后,无论是调用deadline_timer::cancel(),还是这个deadline_timer被析构掉,handler都会被触发。当然,这个在文档里是有写的。规避野指针的办法有两个,一是传入的handler由shared_ptr持有,二是再封装一层。后者适用于handler的生命周期无法由自身控制的情况,示例代码请看http client一节的TimerHolder类。

1.3 async*

这个其实和deadline_timer::async_wait()差不多,async_read、async_read_until等带async_前缀的函数,只要中途被停止(例如调用ip::tcp::socket::close()),Handler都会被执行并传入一个代表aborted的boost::system::error_code。

1.4 ip::tcp::socket

  1. 官方的example过于简单,反而迷惑人了。HTTP协议说来也算复杂,例如chunked encoding还得自己解析。
  2. 从ip::tcp::resolver得到的可能是多个IP,如果把返回的迭代器直接交给async_connect,那么很可能出错,因为这些IP里可能有不合理的地址,比如可能返回全0的地址。解决办法参考http client代码的DoResolveAndConnect()函数。
  3. socket的read可能会读到额外的数据,这个文档里有写。

2. http client的应用

封装成了C++类。这是单线程的实现(io_service是同一线程下run的),同步地调用socket函数并用deadline_timer来异步返回数据会更容易控制。
不细说了,请看代码。

// header
#include <assert.h>
#include <string>
#include "boost/asio.hpp"
#include "boost/make_shared.hpp"

// Drives one HTTP request/response exchange over a boost::asio TCP socket.
//
// The socket operations themselves (resolve, connect, write, read) are the
// synchronous asio calls; a zero-delay deadline_timer is used between steps so
// that the next step runs from the io_service loop instead of recursing.
// Progress and results are reported through a Delegate.
//
// The request and delegate are held as raw pointers and are not deleted by
// this class; the response object created while reading the header is owned
// by the transaction and released in Cancel().
class HttpTransaction {
 public:
  explicit HttpTransaction(boost::asio::io_service& io);
  // Virtual because the class exposes virtual methods and may therefore be
  // destroyed through a pointer to a base type.
  virtual ~HttpTransaction();

  // Receiver of transaction events. A delegate is allowed to delete the
  // transaction from inside a callback.
  class Delegate {
   public:
    virtual ~Delegate() {}

    // Called once the response header has been read and parsed.
    virtual void OnResponseReceived(HttpTransaction* transaction,
        const HttpResponse& response) = 0;

    // Called for each piece of body data. |data| points into the
    // transaction's internal buffer and is consumed after the call returns.
    virtual void OnDataReceived(HttpTransaction* transaction,
        const char* data, size_t length) = 0;

    // Called when the transaction has nothing more to deliver.
    virtual void OnFinished(HttpTransaction* transaction) = 0;

    // Called on failure. |error_code| is either a boost::system error value
    // or one of the ad-hoc codes used by the implementation.
    virtual void OnError(HttpTransaction* transaction, int error_code) = 0;
  };

  // Starts the transaction. Must be called at most once; the request must
  // carry a "host" header. |*out_error_code| is set to 0; failures are
  // reported through Delegate::OnError().
  virtual void Start(int* out_error_code);

  // Stops the transaction: cancels the pending timer, closes the socket and
  // releases the response object.
  virtual void Cancel();

  virtual HttpRequest* request() const { return http_request_; }
  virtual void set_request(HttpRequest* request) { http_request_ = request; }

  virtual Delegate* delegate() const { return delegate_; }
  virtual void set_delegate(Delegate* delegate) { delegate_ = delegate; }

 private:
  // Steps of the state machine executed by DoLoop().
  enum State {
    STATE_NONE,
    STATE_CONNECT,
    STATE_SEND_REQUEST,
    STATE_READ_HEADER,
    STATE_READ_BODY,
    STATE_READ_CHUNK_SIZE,
    STATE_READ_CHUNK_DATA,
    STATE_READ_UNTIL_EOF,
    STATE_CALL_ON_FINISHED,
  };

  // Runs the handler for |next_state_| and keeps going for as long as the
  // handler returns true.
  void DoLoop();

  // State handlers. Each returns true when DoLoop() should immediately run
  // the next state, and false when it should stop (an error was reported, the
  // transaction finished, or the next step was scheduled on the timer).
  bool DoResolveAndConnect();
  bool DoSendRequest();
  bool DoReadHeader();
  bool DoReadChunkSize();
  bool DoReadChunkData();
  bool DoReadBody();
  bool DoReadUntilEof();
  bool DoCallOnFinished();

  // Hands the first |size| buffered bytes to the delegate and then consumes
  // |size| + |additional_consume_size| bytes from the buffer.
  void CallOnDataReceived(size_t size, size_t additional_consume_size = 0);

  HttpRequest* http_request_;    // Not owned.
  Delegate* delegate_;           // Not owned.
  HttpResponse* http_response_;  // Owned; deleted in Cancel().

  boost::asio::ip::tcp::resolver resolver_;
  boost::asio::ip::tcp::socket socket_;
  // Shared so that the buffer can outlive the transaction while a delegate
  // callback is still using it.
  boost::shared_ptr<boost::asio::streambuf> stream_buf_;

  // Number of body (or current chunk) bytes still expected.
  size_t pending_read_size_;

  State next_state_;

  bool started_;

  class TimerHolder;
  boost::shared_ptr<TimerHolder> timer_holder_;
};

#####################################################################

// implementation
#include <string.h>
#include <assert.h>
#include <algorithm>
#include "boost/bind.hpp"
#include "http_transaction.h"
#include "http_request.h"
#include "http_response_impl.h"
#include "util/url.h"
#include "util/logging.h"

#define kLogTagHttpTrans "[HttpTrans]"

// TimerHolder is needed because a deadline_timer invokes its handler even
// when the timer is cancelled or destroyed. The handler is therefore bound to
// a shared_ptr of the holder (not to the transaction), so it stays valid after
// the transaction is gone, and the back pointer is cleared on cancellation.
class HttpTransaction::TimerHolder {
 public:
  TimerHolder(HttpTransaction* trans,
              boost::asio::io_service& io)  // NOLINT
      : trans_(trans),
        timer_(io) {}

  // Arranges for HttpTransaction::DoLoop() to run from the io_service loop as
  // soon as possible (zero-delay timer). Does nothing once the transaction
  // has been cancelled.
  void Schedule() {
    // |trans_| is cleared by OnTransactionCancelled(); dereferencing it below
    // would crash.
    if (!trans_)
      return;
    timer_.expires_from_now(boost::posix_time::microseconds(0));
    // Bind the transaction's shared_ptr to this holder so the handler keeps
    // the holder alive until it has run.
    timer_.async_wait(boost::bind(&TimerHolder::OnTimer, trans_->timer_holder_,
                                  boost::asio::placeholders::error));
  }

  // Detaches from the transaction and cancels any pending wait. Uses the
  // non-throwing cancel() overload because this is reached from the
  // transaction's destructor.
  void OnTransactionCancelled() {
    trans_ = NULL;
    boost::system::error_code ignored;
    timer_.cancel(ignored);
  }

 private:
  // Timer handler: resumes the state machine unless the wait was aborted or
  // the transaction has been cancelled in the meantime.
  void OnTimer(const boost::system::error_code& err) {
    if (!err && trans_) {
      trans_->DoLoop();
    }
  }

  HttpTransaction* trans_;  // Not owned; NULL after cancellation.
  boost::asio::deadline_timer timer_;
};

// Creates an idle transaction bound to |io|. The resolver, socket and timer
// all use the same io_service. No request or delegate is set yet and nothing
// touches the network until Start() is called.
HttpTransaction::HttpTransaction(boost::asio::io_service& io)
    : http_request_(NULL),
      delegate_(NULL),
      http_response_(NULL),
      resolver_(io),
      socket_(io),
      stream_buf_(new boost::asio::streambuf),
      pending_read_size_(0),
      next_state_(STATE_NONE),
      started_(false),
      timer_holder_(new TimerHolder(this, io)) {
}

// Releases everything the transaction holds by cancelling it.
HttpTransaction::~HttpTransaction() {
  // Inside a destructor a virtual call always resolves to this class's own
  // implementation; the qualification just makes that explicit.
  HttpTransaction::Cancel();
}

void HttpTransaction::Start(int* out_error_code) {
  assert(!started_);
  assert(http_request_->HasHeader("host"));
  started_ = true;
  next_state_ = STATE_CONNECT;
  DoLoop();
  *out_error_code = 0;
}

void HttpTransaction::Cancel() {
  next_state_ = STATE_NONE;
  timer_holder_->OnTransactionCancelled();
  socket_.close();
  if (http_response_) {
    delete http_response_;
    http_response_ = NULL;
  }
}

// Runs the state machine: takes |next_state_|, resets it to STATE_NONE, and
// dispatches to the matching handler. Continues while the handler returns
// true (meaning it set a new |next_state_| to run immediately).
//
// A handler that returns false may have invoked the delegate, which is
// allowed to delete this object; the loop condition therefore only reads the
// local |rv| and never a member.
void HttpTransaction::DoLoop() {
  bool rv = false;
  do {
    State state = next_state_;
    next_state_ = STATE_NONE;
    switch (state) {
      case STATE_CONNECT:
        rv = DoResolveAndConnect();
        break;
      case STATE_SEND_REQUEST:
        rv = DoSendRequest();
        break;
      case STATE_READ_HEADER:
        rv = DoReadHeader();
        break;
      case STATE_READ_BODY:
        rv = DoReadBody();
        break;
      case STATE_READ_UNTIL_EOF:
        rv = DoReadUntilEof();
        break;
      case STATE_READ_CHUNK_SIZE:
        rv = DoReadChunkSize();
        break;
      case STATE_READ_CHUNK_DATA:
        rv = DoReadChunkData();
        break;
      case STATE_CALL_ON_FINISHED:
        rv = DoCallOnFinished();
        break;
      default:
        assert(0);
        // With NDEBUG the assert vanishes; make sure an unexpected state
        // stops the loop instead of reusing the previous |rv|.
        rv = false;
        break;
    }
  } while (rv);
}

bool HttpTransaction::DoResolveAndConnect() {
  URL url(http_request_->url());
  // TODO(liuhx): if url is ip address.
  boost::asio::ip::tcp::resolver::query query(
      url.host(), url.port() == 0 ? url.protocol() : url.port_string());
  boost::system::error_code err;
  boost::asio::ip::tcp::resolver::iterator it = resolver_.resolve(query, err);
  boost::asio::ip::tcp::resolver::iterator end;  // Default is end.
  if (err || it == end) {
    LOG_DEBUG(kLogTagHttpTrans, "resolve error:%s", err.message().c_str());
    delegate_->OnError(this, err.value());
    return false;
  }

  do {
    LOG_INFO(kLogTagHttpTrans, "dns result:%s",
             it->endpoint().address().to_string().c_str());
    // "unspecified" means address is "0.0.0.0". It may appear on some machines
    // running Apache. Please google it for more detail.
    if (!it->endpoint().address().is_unspecified()) {
      socket_.close();
      LOG_INFO(kLogTagHttpTrans, "connecting:%s",
               it->endpoint().address().to_string().c_str());
      socket_.connect(*it, err);
      if (!err)
        break;
    }
    ++it;
  } while (it != end);

  if (err) {
    LOG_DEBUG(kLogTagHttpTrans, "connect error:%s", err.message().c_str());
    delegate_->OnError(this, err.value());
    return false;
  }

  next_state_ = STATE_SEND_REQUEST;
  return true;
}

bool HttpTransaction::DoSendRequest() {
  URL url(http_request_->url());
  std::ostream request_stream(stream_buf_.get());
  request_stream << http_request_->method() << " " << url.path()
                 << " HTTP/1.0\r\n";
  const char* name;
  const char* value;
  void* iter = NULL;
  while (http_request_->EnumerateHeaderLines(&iter, &name, &value))
    request_stream << name << ": " << value << "\r\n";
  request_stream << "\r\n";
  size_t size = 0;
  if (http_request_->body(&value, &size))
    request_stream.write(value, size);

  boost::system::error_code err;
  // boost::asio::write() consumes |stream_buf_|, no need to do it ourselves.
  boost::asio::write(socket_, *stream_buf_, err);
  if (err) {
    LOG_DEBUG(kLogTagHttpTrans, "send request error:%s", err.message().c_str());
    delegate_->OnError(this, err.value());
  } else {
    next_state_ = STATE_READ_HEADER;
    timer_holder_->Schedule();
  }
  return false;
}

// Reads the response header (everything up to the first blank line), builds
// the response object from it and decides how the body will be read:
//   - status != 200                  -> no body, finish
//   - "content-length" present       -> read exactly that many bytes
//   - "Transfer-Encoding: chunked"   -> chunked decoding
//   - otherwise                      -> read until the peer closes
//
// Always returns false: the next step is scheduled on the timer before the
// delegate is notified, because the delegate may delete this object from
// inside OnResponseReceived().
bool HttpTransaction::DoReadHeader() {
  boost::system::error_code err;
  boost::asio::read_until(socket_, *stream_buf_, "\r\n\r\n", err);
  if (err) {
    LOG_DEBUG(kLogTagHttpTrans, "read header error:%s", err.message().c_str());
    delegate_->OnError(this, err.value());
    return false;
  }

  // Locate the header terminator directly in the buffer instead of copying
  // the whole buffer into a temporary string first.
  static const char kHeaderEnd[] = "\r\n\r\n";
  const size_t kHeaderEndLength = sizeof(kHeaderEnd) - 1;
  const size_t size = stream_buf_->size();
  const char* data = boost::asio::buffer_cast<const char*>(stream_buf_->data());
  const char* const data_end = data + size;
  const char* const header_end =
      std::search(data, data_end, kHeaderEnd, kHeaderEnd + kHeaderEndLength);
  if (header_end == data_end) {
    LOG_DEBUG(kLogTagHttpTrans,
              "Can not find header end. Maybe TCP data Out-of-Order");
    delegate_->OnError(this, 1234);
    return false;
  }
  const size_t pos = static_cast<size_t>(header_end - data);

  // Release any previous response so re-entering this state cannot leak.
  delete http_response_;
  http_response_ = new HttpResponseImpl(http_request_->url(), data, pos);
  stream_buf_->consume(pos + kHeaderEndLength);  // Skip "\r\n\r\n" as well.
  if (http_response_->status_code() < 100) {
    LOG_DEBUG(kLogTagHttpTrans, "Header invalid");
    delegate_->OnError(this, 2345);
    return false;
  }

  if (http_response_->status_code() != 200) {
    next_state_ = STATE_CALL_ON_FINISHED;
  } else {
    const char* content_length = http_response_->GetHeader("content-length");
    if (content_length) {
      // strtol instead of atoi: a negative value must be rejected rather than
      // wrapped into a huge size_t, and lengths above INT_MAX must not
      // overflow.
      const long length = strtol(content_length, NULL, 10);
      if (length < 0) {
        LOG_DEBUG(kLogTagHttpTrans, "Content-Length invalid");
        delegate_->OnError(this, 2345);
        return false;
      }
      pending_read_size_ = static_cast<size_t>(length);
      next_state_ = STATE_READ_BODY;
    } else {
      const char* encoding = http_response_->GetHeader("Transfer-Encoding");
      bool isChunk = encoding && (std::string(encoding).find("chunked") !=
                                  std::string::npos);
      if (isChunk) {
        next_state_ = STATE_READ_CHUNK_SIZE;
      } else {
        next_state_ = STATE_READ_UNTIL_EOF;
      }
    }
  }
  timer_holder_->Schedule();

  delegate_->OnResponseReceived(this, *http_response_);
  return false;
}

// Delivers body data for a response with a known content length.
//
// Each call hands at most one piece of data to the delegate and reschedules
// itself; when |pending_read_size_| reaches 0 the delegate's OnFinished() is
// called. Always returns false.
bool HttpTransaction::DoReadBody() {
  // If content-length exists, the connection may be keep-alive. We MUST keep
  // counting |pending_read_size_| instead of reading until EOF.
  if (pending_read_size_ == 0) {
    delegate_->OnFinished(this);
    return false;
  }

  while (true) {
    // boost may read additional data beyond the condition in
    // STATE_READ_HEADER, pass left data first if exists.
    size_t size = stream_buf_->size();
    if (size) {
      // Never deliver more than the announced length. Without the clamp a
      // peer sending extra bytes would make |pending_read_size_| underflow in
      // builds where assert() is compiled out.
      size = std::min(size, pending_read_size_);
      next_state_ = STATE_READ_BODY;
      // Schedule before the callback: the delegate may delete this object.
      timer_holder_->Schedule();
      pending_read_size_ -= size;
      CallOnDataReceived(size);
      break;
    } else {
      boost::system::error_code err;
      boost::asio::read(socket_, *stream_buf_,
                        boost::asio::transfer_at_least(1), err);
      if (err) {
        LOG_DEBUG(kLogTagHttpTrans, "read body error:%s",
                  err.message().c_str());
        delegate_->OnError(this, err.value());
        break;
      }
    }
  }

  return false;
}

// About chunked encoding, refer to http://tools.ietf.org/html/rfc2616#page-25
//
// Reads one chunk-size line (hexadecimal size, optionally followed by
// extensions, terminated by "\r\n") and stores the size in
// |pending_read_size_|.
//
// Returns true with |next_state_| = STATE_READ_CHUNK_DATA when a non-empty
// chunk follows. Returns false after calling OnFinished() for the zero-size
// last chunk, or after calling OnError() on a read or parse failure.
bool HttpTransaction::DoReadChunkSize() {
  while (true) {
    // boost may read additional data beyond the condition, find "\r\n" first.
    size_t size = stream_buf_->size();
    if (size) {
      const char* data =
          boost::asio::buffer_cast<const char*>(stream_buf_->data());
      size_t index = std::string(data, size).find("\r\n");
      if (index == 0) {
        // An empty line here is the "\r\n" that terminates the previous
        // chunk's data and was not yet buffered when that chunk was consumed.
        // Drop it; parsing it would let strtol skip past it (and possibly
        // past the end of the buffer).
        stream_buf_->consume(2);
        continue;
      }
      if (index != std::string::npos) {
        // Parse a NUL-terminated copy of just this line so strtol can never
        // read beyond it.
        const std::string line(data, index);
        char* parse_end = NULL;
        const long chunk_size = strtol(line.c_str(), &parse_end, 16);
        stream_buf_->consume(index + 2);  // +2 = skip "\r\n"
        if (parse_end == line.c_str() || chunk_size < 0) {
          // No hex digits or a negative value: report it instead of silently
          // treating it as the last chunk or as a huge size.
          LOG_DEBUG(kLogTagHttpTrans, "chunk size invalid");
          delegate_->OnError(this, 2345);
          return false;
        }
        pending_read_size_ = static_cast<size_t>(chunk_size);
        if (pending_read_size_ == 0) {
          delegate_->OnFinished(this);
          return false;
        }
        break;
      }
    }

    boost::system::error_code err;
    boost::asio::read_until(socket_, *stream_buf_, "\r\n", err);
    if (err) {
      LOG_DEBUG(kLogTagHttpTrans, "read chunk size error:%s",
                err.message().c_str());
      delegate_->OnError(this, err.value());
      return false;
    }
  }

  next_state_ = STATE_READ_CHUNK_DATA;
  return true;
}

bool HttpTransaction::DoReadChunkData() {
  while (true) {
    size_t size = stream_buf_->size();
    if (size) {
      bool reach_end = size >= pending_read_size_;
      size_t data_size = reach_end ? pending_read_size_ : size;
      pending_read_size_ -= data_size;
      next_state_ = reach_end ? STATE_READ_CHUNK_SIZE : STATE_READ_CHUNK_DATA;
      timer_holder_->Schedule();
      CallOnDataReceived(data_size, reach_end ? 2 : 0);  // 2 = skip "\r\n".
      break;
    } else {
      boost::system::error_code err;
      boost::asio::read_until(socket_, *stream_buf_, "\r\n", err);
      if (err) {
        LOG_DEBUG(kLogTagHttpTrans, "read chunk data error:%s",
                  err.message().c_str());
        delegate_->OnError(this, err.value());
        break;
      }
    }
  }

  return false;
}

bool HttpTransaction::DoReadUntilEof() {
  while (true) {
    size_t size = stream_buf_->size();
    if (size) {
      next_state_ = STATE_READ_UNTIL_EOF;
      timer_holder_->Schedule();
      CallOnDataReceived(size);
      break;
    } else {
      boost::system::error_code err;
      boost::asio::read(socket_, *stream_buf_,
                        boost::asio::transfer_at_least(1), err);
      if (err) {
        if (err == boost::asio::error::eof) {
          delegate_->OnFinished(this);
        } else {
          LOG_DEBUG(kLogTagHttpTrans, "read until eof error:%s",
                    err.message().c_str());
          delegate_->OnError(this, err.value());
        }
        break;
      }
    }
  }

  return false;
}

// State handler for STATE_CALL_ON_FINISHED: tells the delegate that the
// transaction is finished. Returns false so that DoLoop() stops.
bool HttpTransaction::DoCallOnFinished() {
  delegate_->OnFinished(this);
  return false;
}

// Passes the first |size| buffered bytes to the delegate and afterwards drops
// |size| + |additional_consume_size| bytes from the buffer.
void HttpTransaction::CallOnDataReceived(size_t size,
                                         size_t additional_consume_size) {
  // The delegate is allowed to delete this HttpTransaction from inside the
  // callback, so take a local reference to the buffer up front and touch no
  // member once the callback has been made.
  boost::shared_ptr<boost::asio::streambuf> keep_alive(stream_buf_);
  const char* payload =
      boost::asio::buffer_cast<const char*>(keep_alive->data());
  delegate_->OnDataReceived(this, payload, size);
  keep_alive->consume(size + additional_consume_size);
}

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。