peer_channel.cc (10635B)
1 /* 2 * Copyright 2011 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "examples/peerconnection/server/peer_channel.h" 12 13 #include <algorithm> 14 #include <cstdio> 15 #include <cstdlib> 16 #include <ctime> 17 #include <iterator> 18 #include <string> 19 20 #include "absl/strings/str_cat.h" 21 #include "absl/strings/string_view.h" 22 #include "examples/peerconnection/server/data_socket.h" 23 #include "rtc_base/checks.h" 24 25 // Set to the peer id of the originator when messages are being 26 // exchanged between peers, but set to the id of the receiving peer 27 // itself when notifications are sent from the server about the state 28 // of other peers. 29 // 30 // WORKAROUND: Since support for CORS varies greatly from one browser to the 31 // next, we don't use a custom name for our peer-id header (originally it was 32 // "X-Peer-Id: "). Instead, we use a "simple header", "Pragma" which should 33 // always be exposed to CORS requests. There is a special CORS header devoted 34 // to exposing proprietary headers (Access-Control-Expose-Headers), however 35 // at this point it is not working correctly in some popular browsers. 36 static const char kPeerIdHeader[] = "Pragma: "; 37 38 static const char* kRequestPaths[] = { 39 "/wait", 40 "/sign_out", 41 "/message", 42 }; 43 44 enum RequestPathIndex { 45 kWait, 46 kSignOut, 47 kMessage, 48 }; 49 50 const size_t kMaxNameLength = 512; 51 52 // 53 // ChannelMember 54 // 55 56 int ChannelMember::s_member_id_ = 0; 57 58 ChannelMember::ChannelMember(DataSocket* socket) 59 : waiting_socket_(nullptr), 60 id_(++s_member_id_), 61 connected_(true), 62 timestamp_(time(nullptr)) { 63 RTC_DCHECK(socket); 64 RTC_DCHECK_EQ(socket->method(), DataSocket::GET); 65 RTC_DCHECK(socket->PathEquals("/sign_in")); 66 name_ = socket->request_arguments(); 67 if (name_.empty()) 68 name_ = "peer_" + absl::StrCat(id_); 69 else if (name_.length() > kMaxNameLength) 70 name_.resize(kMaxNameLength); 71 72 std::replace(name_.begin(), name_.end(), ',', '_'); 73 } 74 75 ChannelMember::~ChannelMember() {} 76 77 bool ChannelMember::is_wait_request(DataSocket* ds) const { 78 return ds && ds->PathEquals(kRequestPaths[kWait]); 79 } 80 81 bool ChannelMember::TimedOut() { 82 return waiting_socket_ == nullptr && (time(nullptr) - timestamp_) > 30; 83 } 84 85 std::string ChannelMember::GetPeerIdHeader() const { 86 return kPeerIdHeader + absl::StrCat(id_) + "\r\n"; 87 } 88 89 bool ChannelMember::NotifyOfOtherMember(const ChannelMember& other) { 90 RTC_DCHECK_NE(&other, this); 91 QueueResponse("200 OK", "text/plain", GetPeerIdHeader(), other.GetEntry()); 92 return true; 93 } 94 95 // Returns a string in the form "name,id,connected\n". 96 std::string ChannelMember::GetEntry() const { 97 RTC_DCHECK(name_.length() <= kMaxNameLength); 98 99 // name, 11-digit int, 1-digit bool, newline, null 100 char entry[kMaxNameLength + 15]; 101 snprintf(entry, sizeof(entry), "%s,%d,%d\n", 102 name_.substr(0, kMaxNameLength).c_str(), id_, connected_); 103 return entry; 104 } 105 106 void ChannelMember::ForwardRequestToPeer(DataSocket* ds, ChannelMember* peer) { 107 RTC_DCHECK(peer); 108 RTC_DCHECK(ds); 109 110 std::string extra_headers(GetPeerIdHeader()); 111 112 if (peer == this) { 113 ds->Send("200 OK", true, ds->content_type(), extra_headers, ds->data()); 114 } else { 115 printf("Client %s sending to %s\n", name_.c_str(), peer->name().c_str()); 116 peer->QueueResponse("200 OK", ds->content_type(), extra_headers, 117 ds->data()); 118 ds->Send("200 OK", true, "text/plain", "", ""); 119 } 120 } 121 122 void ChannelMember::OnClosing(DataSocket* ds) { 123 if (ds == waiting_socket_) { 124 waiting_socket_ = nullptr; 125 timestamp_ = time(nullptr); 126 } 127 } 128 129 void ChannelMember::QueueResponse(const std::string& status, 130 const std::string& content_type, 131 const std::string& extra_headers, 132 const std::string& data) { 133 if (waiting_socket_) { 134 RTC_DCHECK(queue_.empty()); 135 RTC_DCHECK_EQ(waiting_socket_->method(), DataSocket::GET); 136 bool ok = 137 waiting_socket_->Send(status, true, content_type, extra_headers, data); 138 if (!ok) { 139 printf("Failed to deliver data to waiting socket\n"); 140 } 141 waiting_socket_ = nullptr; 142 timestamp_ = time(nullptr); 143 } else { 144 QueuedResponse qr; 145 qr.status = status; 146 qr.content_type = content_type; 147 qr.extra_headers = extra_headers; 148 qr.data = data; 149 queue_.push(qr); 150 } 151 } 152 153 void ChannelMember::SetWaitingSocket(DataSocket* ds) { 154 RTC_DCHECK_EQ(ds->method(), DataSocket::GET); 155 if (ds && !queue_.empty()) { 156 RTC_DCHECK(!waiting_socket_); 157 const QueuedResponse& response = queue_.front(); 158 ds->Send(response.status, true, response.content_type, 159 response.extra_headers, response.data); 160 queue_.pop(); 161 } else { 162 waiting_socket_ = ds; 163 } 164 } 165 166 // 167 // PeerChannel 168 // 169 170 // static 171 bool PeerChannel::IsPeerConnection(const DataSocket* ds) { 172 RTC_DCHECK(ds); 173 return (ds->method() == DataSocket::POST && ds->content_length() > 0) || 174 (ds->method() == DataSocket::GET && ds->PathEquals("/sign_in")); 175 } 176 177 ChannelMember* PeerChannel::Lookup(DataSocket* ds) const { 178 RTC_DCHECK(ds); 179 180 if (ds->method() != DataSocket::GET && ds->method() != DataSocket::POST) 181 return nullptr; 182 183 size_t i = 0; 184 for (; i < std::size(kRequestPaths); ++i) { 185 if (ds->PathEquals(kRequestPaths[i])) 186 break; 187 } 188 189 if (i == std::size(kRequestPaths)) 190 return nullptr; 191 192 std::string args(ds->request_arguments()); 193 static constexpr absl::string_view kPeerId = "peer_id="; 194 size_t found = args.find(kPeerId); 195 if (found == std::string::npos) 196 return nullptr; 197 198 int id = atoi(&args[found + kPeerId.size()]); 199 Members::const_iterator iter = members_.begin(); 200 for (; iter != members_.end(); ++iter) { 201 if (id == (*iter)->id()) { 202 if (i == kWait) 203 (*iter)->SetWaitingSocket(ds); 204 if (i == kSignOut) 205 (*iter)->set_disconnected(); 206 return *iter; 207 } 208 } 209 210 return nullptr; 211 } 212 213 ChannelMember* PeerChannel::IsTargetedRequest(const DataSocket* ds) const { 214 RTC_DCHECK(ds); 215 // Regardless of GET or POST, we look for the peer_id parameter 216 // only in the request_path. 217 const std::string& path = ds->request_path(); 218 size_t args = path.find('?'); 219 if (args == std::string::npos) 220 return nullptr; 221 size_t found; 222 static constexpr absl::string_view kTargetPeerIdParam = "to="; 223 do { 224 found = path.find(kTargetPeerIdParam, args); 225 if (found == std::string::npos) 226 return nullptr; 227 if (found == (args + 1) || path[found - 1] == '&') { 228 found += kTargetPeerIdParam.size(); 229 break; 230 } 231 args = found + kTargetPeerIdParam.size(); 232 } while (true); 233 int id = atoi(&path[found]); 234 Members::const_iterator i = members_.begin(); 235 for (; i != members_.end(); ++i) { 236 if ((*i)->id() == id) { 237 return *i; 238 } 239 } 240 return nullptr; 241 } 242 243 bool PeerChannel::AddMember(DataSocket* ds) { 244 RTC_DCHECK(IsPeerConnection(ds)); 245 ChannelMember* new_guy = new ChannelMember(ds); 246 Members failures; 247 BroadcastChangedState(*new_guy, &failures); 248 HandleDeliveryFailures(&failures); 249 members_.push_back(new_guy); 250 251 printf("New member added (total=%zu): %s\n", members_.size(), 252 new_guy->name().c_str()); 253 254 // Let the newly connected peer know about other members of the channel. 255 std::string content_type; 256 std::string response = BuildResponseForNewMember(*new_guy, &content_type); 257 ds->Send("200 Added", true, content_type, new_guy->GetPeerIdHeader(), 258 response); 259 return true; 260 } 261 262 void PeerChannel::CloseAll() { 263 Members::const_iterator i = members_.begin(); 264 for (; i != members_.end(); ++i) { 265 (*i)->QueueResponse("200 OK", "text/plain", "", "Server shutting down"); 266 } 267 DeleteAll(); 268 } 269 270 void PeerChannel::OnClosing(DataSocket* ds) { 271 for (Members::iterator i = members_.begin(); i != members_.end(); ++i) { 272 ChannelMember* m = (*i); 273 m->OnClosing(ds); 274 if (!m->connected()) { 275 i = members_.erase(i); 276 Members failures; 277 BroadcastChangedState(*m, &failures); 278 HandleDeliveryFailures(&failures); 279 delete m; 280 if (i == members_.end()) 281 break; 282 } 283 } 284 printf("Total connected: %zu\n", members_.size()); 285 } 286 287 void PeerChannel::CheckForTimeout() { 288 for (Members::iterator i = members_.begin(); i != members_.end(); ++i) { 289 ChannelMember* m = (*i); 290 if (m->TimedOut()) { 291 printf("Timeout: %s\n", m->name().c_str()); 292 m->set_disconnected(); 293 i = members_.erase(i); 294 Members failures; 295 BroadcastChangedState(*m, &failures); 296 HandleDeliveryFailures(&failures); 297 delete m; 298 if (i == members_.end()) 299 break; 300 } 301 } 302 } 303 304 void PeerChannel::DeleteAll() { 305 for (Members::iterator i = members_.begin(); i != members_.end(); ++i) 306 delete (*i); 307 members_.clear(); 308 } 309 310 void PeerChannel::BroadcastChangedState(const ChannelMember& member, 311 Members* delivery_failures) { 312 // This function should be called prior to DataSocket::Close(). 313 RTC_DCHECK(delivery_failures); 314 315 if (!member.connected()) { 316 printf("Member disconnected: %s\n", member.name().c_str()); 317 } 318 319 Members::iterator i = members_.begin(); 320 for (; i != members_.end(); ++i) { 321 if (&member != (*i)) { 322 if (!(*i)->NotifyOfOtherMember(member)) { 323 (*i)->set_disconnected(); 324 delivery_failures->push_back(*i); 325 i = members_.erase(i); 326 if (i == members_.end()) 327 break; 328 } 329 } 330 } 331 } 332 333 void PeerChannel::HandleDeliveryFailures(Members* failures) { 334 RTC_DCHECK(failures); 335 336 while (!failures->empty()) { 337 Members::iterator i = failures->begin(); 338 ChannelMember* member = *i; 339 RTC_DCHECK(!member->connected()); 340 failures->erase(i); 341 BroadcastChangedState(*member, failures); 342 delete member; 343 } 344 } 345 346 // Builds a simple list of "name,id\n" entries for each member. 347 std::string PeerChannel::BuildResponseForNewMember(const ChannelMember& member, 348 std::string* content_type) { 349 RTC_DCHECK(content_type); 350 351 *content_type = "text/plain"; 352 // The peer itself will always be the first entry. 353 std::string response(member.GetEntry()); 354 for (Members::iterator i = members_.begin(); i != members_.end(); ++i) { 355 if (member.id() != (*i)->id()) { 356 RTC_DCHECK((*i)->connected()); 357 response += (*i)->GetEntry(); 358 } 359 } 360 361 return response; 362 }