peer_connection_client.cc (15553B)
1 /* 2 * Copyright 2012 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "examples/peerconnection/client/peer_connection_client.h" 12 13 #include <cerrno> 14 #include <cstddef> 15 #include <cstdio> 16 #include <cstdlib> 17 #include <cstring> 18 #include <memory> 19 #include <string> 20 21 #include "api/async_dns_resolver.h" 22 #include "api/task_queue/pending_task_safety_flag.h" 23 #include "api/units/time_delta.h" 24 #include "examples/peerconnection/client/defaults.h" 25 #include "rtc_base/async_dns_resolver.h" 26 #include "rtc_base/checks.h" 27 #include "rtc_base/logging.h" 28 #include "rtc_base/net_helpers.h" 29 #include "rtc_base/socket.h" 30 #include "rtc_base/thread.h" 31 32 namespace { 33 34 // This is our magical hangup signal. 35 constexpr char kByeMessage[] = "BYE"; 36 // Delay between server connection retries, in milliseconds 37 constexpr webrtc::TimeDelta kReconnectDelay = webrtc::TimeDelta::Seconds(2); 38 39 webrtc::Socket* CreateClientSocket(int family) { 40 webrtc::Thread* thread = webrtc::Thread::Current(); 41 RTC_DCHECK(thread != nullptr); 42 return thread->socketserver()->CreateSocket(family, SOCK_STREAM); 43 } 44 45 } // namespace 46 47 PeerConnectionClient::PeerConnectionClient() 48 : callback_(nullptr), 49 resolver_(nullptr), 50 state_(NOT_CONNECTED), 51 my_id_(-1) {} 52 53 PeerConnectionClient::~PeerConnectionClient() = default; 54 55 void PeerConnectionClient::InitSocketSignals() { 56 RTC_DCHECK(control_socket_.get() != nullptr); 57 RTC_DCHECK(hanging_get_.get() != nullptr); 58 control_socket_->SignalCloseEvent.connect(this, 59 &PeerConnectionClient::OnClose); 60 hanging_get_->SignalCloseEvent.connect(this, &PeerConnectionClient::OnClose); 61 control_socket_->SignalConnectEvent.connect(this, 62 &PeerConnectionClient::OnConnect); 63 hanging_get_->SignalConnectEvent.connect( 64 this, &PeerConnectionClient::OnHangingGetConnect); 65 control_socket_->SignalReadEvent.connect(this, &PeerConnectionClient::OnRead); 66 hanging_get_->SignalReadEvent.connect( 67 this, &PeerConnectionClient::OnHangingGetRead); 68 } 69 70 int PeerConnectionClient::id() const { 71 return my_id_; 72 } 73 74 bool PeerConnectionClient::is_connected() const { 75 return my_id_ != -1; 76 } 77 78 const Peers& PeerConnectionClient::peers() const { 79 return peers_; 80 } 81 82 void PeerConnectionClient::RegisterObserver( 83 PeerConnectionClientObserver* callback) { 84 RTC_DCHECK(!callback_); 85 callback_ = callback; 86 } 87 88 void PeerConnectionClient::Connect(const std::string& server, 89 int port, 90 const std::string& client_name) { 91 RTC_DCHECK(!server.empty()); 92 RTC_DCHECK(!client_name.empty()); 93 94 if (state_ != NOT_CONNECTED) { 95 RTC_LOG(LS_WARNING) 96 << "The client must not be connected before you can call Connect()"; 97 callback_->OnServerConnectionFailure(); 98 return; 99 } 100 101 if (server.empty() || client_name.empty()) { 102 callback_->OnServerConnectionFailure(); 103 return; 104 } 105 106 if (port <= 0) 107 port = kDefaultServerPort; 108 109 server_address_.SetIP(server); 110 server_address_.SetPort(port); 111 client_name_ = client_name; 112 113 if (server_address_.IsUnresolvedIP()) { 114 RTC_DCHECK_NE(state_, RESOLVING); 115 RTC_DCHECK(!resolver_); 116 state_ = RESOLVING; 117 resolver_ = std::make_unique<webrtc::AsyncDnsResolver>(); 118 resolver_->Start(server_address_, 119 [this] { OnResolveResult(resolver_->result()); }); 120 } else { 121 DoConnect(); 122 } 123 } 124 125 void PeerConnectionClient::OnResolveResult( 126 const webrtc::AsyncDnsResolverResult& result) { 127 if (result.GetError() != 0) { 128 callback_->OnServerConnectionFailure(); 129 resolver_.reset(); 130 state_ = NOT_CONNECTED; 131 return; 132 } 133 if (!result.GetResolvedAddress(AF_INET, &server_address_)) { 134 callback_->OnServerConnectionFailure(); 135 resolver_.reset(); 136 state_ = NOT_CONNECTED; 137 return; 138 } 139 DoConnect(); 140 } 141 142 void PeerConnectionClient::DoConnect() { 143 control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family())); 144 hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family())); 145 InitSocketSignals(); 146 char buffer[1024]; 147 snprintf(buffer, sizeof(buffer), "GET /sign_in?%s HTTP/1.0\r\n\r\n", 148 client_name_.c_str()); 149 onconnect_data_ = buffer; 150 151 bool ret = ConnectControlSocket(); 152 if (ret) 153 state_ = SIGNING_IN; 154 if (!ret) { 155 callback_->OnServerConnectionFailure(); 156 } 157 } 158 159 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) { 160 if (state_ != CONNECTED) 161 return false; 162 163 RTC_DCHECK(is_connected()); 164 RTC_DCHECK(control_socket_->GetState() == webrtc::Socket::CS_CLOSED); 165 if (!is_connected() || peer_id == -1) 166 return false; 167 168 char headers[1024]; 169 snprintf(headers, sizeof(headers), 170 "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n" 171 "Content-Length: %zu\r\n" 172 "Content-Type: text/plain\r\n" 173 "\r\n", 174 my_id_, peer_id, message.length()); 175 onconnect_data_ = headers; 176 onconnect_data_ += message; 177 return ConnectControlSocket(); 178 } 179 180 bool PeerConnectionClient::SendHangUp(int peer_id) { 181 return SendToPeer(peer_id, kByeMessage); 182 } 183 184 bool PeerConnectionClient::IsSendingMessage() { 185 return state_ == CONNECTED && 186 control_socket_->GetState() != webrtc::Socket::CS_CLOSED; 187 } 188 189 bool PeerConnectionClient::SignOut() { 190 if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT) 191 return true; 192 193 if (hanging_get_->GetState() != webrtc::Socket::CS_CLOSED) 194 hanging_get_->Close(); 195 196 if (control_socket_->GetState() == webrtc::Socket::CS_CLOSED) { 197 state_ = SIGNING_OUT; 198 199 if (my_id_ != -1) { 200 char buffer[1024]; 201 snprintf(buffer, sizeof(buffer), 202 "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_); 203 onconnect_data_ = buffer; 204 return ConnectControlSocket(); 205 } else { 206 // Can occur if the app is closed before we finish connecting. 207 return true; 208 } 209 } else { 210 state_ = SIGNING_OUT_WAITING; 211 } 212 213 return true; 214 } 215 216 void PeerConnectionClient::Close() { 217 control_socket_->Close(); 218 hanging_get_->Close(); 219 onconnect_data_.clear(); 220 peers_.clear(); 221 resolver_.reset(); 222 my_id_ = -1; 223 state_ = NOT_CONNECTED; 224 } 225 226 bool PeerConnectionClient::ConnectControlSocket() { 227 RTC_DCHECK(control_socket_->GetState() == webrtc::Socket::CS_CLOSED); 228 int err = control_socket_->Connect(server_address_); 229 if (err == SOCKET_ERROR) { 230 Close(); 231 return false; 232 } 233 return true; 234 } 235 236 void PeerConnectionClient::OnConnect(webrtc::Socket* socket) { 237 RTC_DCHECK(!onconnect_data_.empty()); 238 size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length()); 239 RTC_DCHECK(sent == onconnect_data_.length()); 240 onconnect_data_.clear(); 241 } 242 243 void PeerConnectionClient::OnHangingGetConnect(webrtc::Socket* socket) { 244 char buffer[1024]; 245 snprintf(buffer, sizeof(buffer), "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", 246 my_id_); 247 int len = static_cast<int>(strlen(buffer)); 248 int sent = socket->Send(buffer, len); 249 RTC_DCHECK(sent == len); 250 } 251 252 void PeerConnectionClient::OnMessageFromPeer(int peer_id, 253 const std::string& message) { 254 if (message.length() == (sizeof(kByeMessage) - 1) && 255 message.compare(kByeMessage) == 0) { 256 callback_->OnPeerDisconnected(peer_id); 257 } else { 258 callback_->OnMessageFromPeer(peer_id, message); 259 } 260 } 261 262 bool PeerConnectionClient::GetHeaderValue(const std::string& data, 263 size_t eoh, 264 const char* header_pattern, 265 size_t* value) { 266 RTC_DCHECK(value != nullptr); 267 size_t found = data.find(header_pattern); 268 if (found != std::string::npos && found < eoh) { 269 *value = atoi(&data[found + strlen(header_pattern)]); 270 return true; 271 } 272 return false; 273 } 274 275 bool PeerConnectionClient::GetHeaderValue(const std::string& data, 276 size_t eoh, 277 const char* header_pattern, 278 std::string* value) { 279 RTC_DCHECK(value != nullptr); 280 size_t found = data.find(header_pattern); 281 if (found != std::string::npos && found < eoh) { 282 size_t begin = found + strlen(header_pattern); 283 size_t end = data.find("\r\n", begin); 284 if (end == std::string::npos) 285 end = eoh; 286 value->assign(data.substr(begin, end - begin)); 287 return true; 288 } 289 return false; 290 } 291 292 bool PeerConnectionClient::ReadIntoBuffer(webrtc::Socket* socket, 293 std::string* data, 294 size_t* content_length) { 295 char buffer[0xffff]; 296 do { 297 int bytes = socket->Recv(buffer, sizeof(buffer), nullptr); 298 if (bytes <= 0) 299 break; 300 data->append(buffer, bytes); 301 } while (true); 302 303 bool ret = false; 304 size_t i = data->find("\r\n\r\n"); 305 if (i != std::string::npos) { 306 RTC_LOG(LS_INFO) << "Headers received"; 307 if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) { 308 size_t total_response_size = (i + 4) + *content_length; 309 if (data->length() >= total_response_size) { 310 ret = true; 311 std::string should_close; 312 const char kConnection[] = "\r\nConnection: "; 313 if (GetHeaderValue(*data, i, kConnection, &should_close) && 314 should_close.compare("close") == 0) { 315 socket->Close(); 316 // Since we closed the socket, there was no notification delivered 317 // to us. Compensate by letting ourselves know. 318 OnClose(socket, 0); 319 } 320 } else { 321 // We haven't received everything. Just continue to accept data. 322 } 323 } else { 324 RTC_LOG(LS_ERROR) << "No content length field specified by the server."; 325 } 326 } 327 return ret; 328 } 329 330 void PeerConnectionClient::OnRead(webrtc::Socket* socket) { 331 size_t content_length = 0; 332 if (ReadIntoBuffer(socket, &control_data_, &content_length)) { 333 size_t peer_id = 0, eoh = 0; 334 bool ok = 335 ParseServerResponse(control_data_, content_length, &peer_id, &eoh); 336 if (ok) { 337 if (my_id_ == -1) { 338 // First response. Let's store our server assigned ID. 339 RTC_DCHECK(state_ == SIGNING_IN); 340 my_id_ = static_cast<int>(peer_id); 341 RTC_DCHECK(my_id_ != -1); 342 343 // The body of the response will be a list of already connected peers. 344 if (content_length) { 345 size_t pos = eoh + 4; 346 while (pos < control_data_.size()) { 347 size_t eol = control_data_.find('\n', pos); 348 if (eol == std::string::npos) 349 break; 350 int id = 0; 351 std::string name; 352 bool connected; 353 if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id, 354 &connected) && 355 id != my_id_) { 356 peers_[id] = name; 357 callback_->OnPeerConnected(id, name); 358 } 359 pos = eol + 1; 360 } 361 } 362 RTC_DCHECK(is_connected()); 363 callback_->OnSignedIn(); 364 } else if (state_ == SIGNING_OUT) { 365 Close(); 366 callback_->OnDisconnected(); 367 } else if (state_ == SIGNING_OUT_WAITING) { 368 SignOut(); 369 } 370 } 371 372 control_data_.clear(); 373 374 if (state_ == SIGNING_IN) { 375 RTC_DCHECK(hanging_get_->GetState() == webrtc::Socket::CS_CLOSED); 376 state_ = CONNECTED; 377 hanging_get_->Connect(server_address_); 378 } 379 } 380 } 381 382 void PeerConnectionClient::OnHangingGetRead(webrtc::Socket* socket) { 383 RTC_LOG(LS_INFO) << __FUNCTION__; 384 size_t content_length = 0; 385 if (ReadIntoBuffer(socket, ¬ification_data_, &content_length)) { 386 size_t peer_id = 0, eoh = 0; 387 bool ok = 388 ParseServerResponse(notification_data_, content_length, &peer_id, &eoh); 389 390 if (ok) { 391 // Store the position where the body begins. 392 size_t pos = eoh + 4; 393 394 if (my_id_ == static_cast<int>(peer_id)) { 395 // A notification about a new member or a member that just 396 // disconnected. 397 int id = 0; 398 std::string name; 399 bool connected = false; 400 if (ParseEntry(notification_data_.substr(pos), &name, &id, 401 &connected)) { 402 if (connected) { 403 peers_[id] = name; 404 callback_->OnPeerConnected(id, name); 405 } else { 406 peers_.erase(id); 407 callback_->OnPeerDisconnected(id); 408 } 409 } 410 } else { 411 OnMessageFromPeer(static_cast<int>(peer_id), 412 notification_data_.substr(pos)); 413 } 414 } 415 416 notification_data_.clear(); 417 } 418 419 if (hanging_get_->GetState() == webrtc::Socket::CS_CLOSED && 420 state_ == CONNECTED) { 421 hanging_get_->Connect(server_address_); 422 } 423 } 424 425 bool PeerConnectionClient::ParseEntry(const std::string& entry, 426 std::string* name, 427 int* id, 428 bool* connected) { 429 RTC_DCHECK(name != nullptr); 430 RTC_DCHECK(id != nullptr); 431 RTC_DCHECK(connected != nullptr); 432 RTC_DCHECK(!entry.empty()); 433 434 *connected = false; 435 size_t separator = entry.find(','); 436 if (separator != std::string::npos) { 437 *id = atoi(&entry[separator + 1]); 438 name->assign(entry.substr(0, separator)); 439 separator = entry.find(',', separator + 1); 440 if (separator != std::string::npos) { 441 *connected = atoi(&entry[separator + 1]) ? true : false; 442 } 443 } 444 return !name->empty(); 445 } 446 447 int PeerConnectionClient::GetResponseStatus(const std::string& response) { 448 int status = -1; 449 size_t pos = response.find(' '); 450 if (pos != std::string::npos) 451 status = atoi(&response[pos + 1]); 452 return status; 453 } 454 455 bool PeerConnectionClient::ParseServerResponse(const std::string& response, 456 size_t content_length, 457 size_t* peer_id, 458 size_t* eoh) { 459 int status = GetResponseStatus(response.c_str()); 460 if (status != 200) { 461 RTC_LOG(LS_ERROR) << "Received error from server"; 462 Close(); 463 callback_->OnDisconnected(); 464 return false; 465 } 466 467 *eoh = response.find("\r\n\r\n"); 468 RTC_DCHECK(*eoh != std::string::npos); 469 if (*eoh == std::string::npos) 470 return false; 471 472 *peer_id = -1; 473 474 // See comment in peer_channel.cc for why we use the Pragma header. 475 GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id); 476 477 return true; 478 } 479 480 void PeerConnectionClient::OnClose(webrtc::Socket* socket, int err) { 481 RTC_LOG(LS_INFO) << __FUNCTION__; 482 483 socket->Close(); 484 485 #ifdef WIN32 486 if (err != WSAECONNREFUSED) { 487 #else 488 if (err != ECONNREFUSED) { 489 #endif 490 if (socket == hanging_get_.get()) { 491 if (state_ == CONNECTED) { 492 hanging_get_->Close(); 493 hanging_get_->Connect(server_address_); 494 } 495 } else { 496 callback_->OnMessageSent(err); 497 } 498 } else { 499 if (socket == control_socket_.get()) { 500 RTC_LOG(LS_WARNING) << "Connection refused; retrying in 2 seconds"; 501 webrtc::Thread::Current()->PostDelayedTask( 502 SafeTask(safety_.flag(), [this] { DoConnect(); }), kReconnectDelay); 503 } else { 504 Close(); 505 callback_->OnDisconnected(); 506 } 507 } 508 }