DataChannelUsrsctp.cpp (54757B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 5 * You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #if !defined(__Userspace_os_Windows) 8 # include <arpa/inet.h> 9 #endif 10 // usrsctp.h expects to have errno definitions prior to its inclusion. 11 #include <errno.h> 12 13 #define SCTP_DEBUG 1 14 #define SCTP_STDINT_INCLUDE <stdint.h> 15 16 #ifdef _MSC_VER 17 // Disable "warning C4200: nonstandard extension used : zero-sized array in 18 // struct/union" 19 // ...which the third-party file usrsctp.h runs afoul of. 20 # pragma warning(push) 21 # pragma warning(disable : 4200) 22 #endif 23 24 #include "usrsctp.h" 25 26 #ifdef _MSC_VER 27 # pragma warning(pop) 28 #endif 29 30 #include "mozilla/media/MediaUtils.h" 31 #ifdef MOZ_PEERCONNECTION 32 # include "transport/runnable_utils.h" 33 #endif 34 35 #include "DataChannelUsrsctp.h" 36 #include "DataChannelLog.h" 37 38 namespace mozilla { 39 40 static LazyLogModule gSCTPLog("usrsctp"); 41 42 #define SCTP_LOG(args) \ 43 MOZ_LOG(mozilla::gSCTPLog, mozilla::LogLevel::Debug, args) 44 45 static void debug_printf(const char* format, ...) { 46 va_list ap; 47 char buffer[1024]; 48 49 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { 50 va_start(ap, format); 51 #ifdef _WIN32 52 if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) { 53 #else 54 if (VsprintfLiteral(buffer, format, ap) > 0) { 55 #endif 56 SCTP_LOG(("%s", buffer)); 57 } 58 va_end(ap); 59 } 60 } 61 62 class DataChannelRegistry { 63 public: 64 static uintptr_t Register(DataChannelConnectionUsrsctp* aConnection) { 65 StaticMutexAutoLock lock(sInstanceMutex); 66 uintptr_t result = EnsureInstance()->RegisterImpl(aConnection); 67 DC_DEBUG( 68 ("Registering connection %p as ulp %p", aConnection, (void*)result)); 69 return result; 70 } 71 72 static void Deregister(uintptr_t aId) { 73 std::unique_ptr<DataChannelRegistry> maybeTrash; 74 75 { 76 StaticMutexAutoLock lock(sInstanceMutex); 77 DC_DEBUG(("Deregistering connection ulp = %p", (void*)aId)); 78 if (NS_WARN_IF(!Instance())) { 79 return; 80 } 81 Instance()->DeregisterImpl(aId); 82 if (Instance()->Empty()) { 83 // Unset singleton inside mutex lock, but don't call Shutdown until we 84 // unlock, since that involves calling into libusrsctp, which invites 85 // deadlock. 86 maybeTrash = std::move(Instance()); 87 } 88 } 89 } 90 91 static RefPtr<DataChannelConnectionUsrsctp> Lookup(uintptr_t aId) { 92 StaticMutexAutoLock lock(sInstanceMutex); 93 if (NS_WARN_IF(!Instance())) { 94 return nullptr; 95 } 96 return Instance()->LookupImpl(aId); 97 } 98 99 virtual ~DataChannelRegistry() { 100 MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread()); 101 102 if (NS_WARN_IF(!mConnections.empty())) { 103 MOZ_DIAGNOSTIC_CRASH("mConnections not empty"); 104 mConnections.clear(); 105 } 106 107 MOZ_DIAGNOSTIC_ASSERT(!Instance()); 108 DeinitUsrSctp(); 109 } 110 111 private: 112 // This is a singleton class, so don't let just anyone create one of these 113 DataChannelRegistry() { 114 MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread()); 115 mShutdownBlocker = media::ShutdownBlockingTicket::Create( 116 u"DataChannelRegistry::mShutdownBlocker"_ns, 117 NS_LITERAL_STRING_FROM_CSTRING(__FILE__), __LINE__); 118 MOZ_DIAGNOSTIC_ASSERT(!Instance()); 119 InitUsrSctp(); 120 } 121 122 static std::unique_ptr<DataChannelRegistry>& Instance() { 123 static std::unique_ptr<DataChannelRegistry> sRegistry; 124 return sRegistry; 125 } 126 127 static std::unique_ptr<DataChannelRegistry>& EnsureInstance() { 128 MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread()); 129 if (!Instance()) { 130 Instance().reset(new DataChannelRegistry()); 131 } 132 return Instance(); 133 } 134 135 uintptr_t RegisterImpl(DataChannelConnectionUsrsctp* aConnection) { 136 MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread()); 137 mConnections.emplace(mNextId, aConnection); 138 return mNextId++; 139 } 140 141 void DeregisterImpl(uintptr_t aId) { 142 MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread()); 143 size_t removed = mConnections.erase(aId); 144 (void)removed; 145 MOZ_DIAGNOSTIC_ASSERT(removed); 146 } 147 148 bool Empty() const { return mConnections.empty(); } 149 150 RefPtr<DataChannelConnectionUsrsctp> LookupImpl(uintptr_t aId) { 151 auto it = mConnections.find(aId); 152 if (NS_WARN_IF(it == mConnections.end())) { 153 DC_DEBUG(("Can't find connection ulp %p", (void*)aId)); 154 return nullptr; 155 } 156 return it->second; 157 } 158 159 static int SendSctpPacket(void* addr, void* buffer, size_t length, 160 uint8_t tos, uint8_t set_df) { 161 uintptr_t id = reinterpret_cast<uintptr_t>(addr); 162 RefPtr<DataChannelConnectionUsrsctp> connection = 163 DataChannelRegistry::Lookup(id); 164 if (NS_WARN_IF(!connection) || connection->InShutdown()) { 165 return 0; 166 } 167 return connection->SendSctpPacket(static_cast<uint8_t*>(buffer), length); 168 } 169 170 void InitUsrSctp() { 171 MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread()); 172 #ifndef MOZ_PEERCONNECTION 173 MOZ_CRASH("Trying to use SCTP/DTLS without dom/media/webrtc/transport"); 174 #endif 175 176 DC_DEBUG(("Calling usrsctp_init %p", this)); 177 178 MOZ_DIAGNOSTIC_ASSERT(!sInitted); 179 usrsctp_init(0, DataChannelRegistry::SendSctpPacket, debug_printf); 180 sInitted = true; 181 182 // Set logging to SCTP:LogLevel::Debug to get SCTP debugs 183 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { 184 usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL); 185 } 186 187 // Do not send ABORTs in response to INITs (1). 188 // Do not send ABORTs for received Out of the Blue packets (2). 189 usrsctp_sysctl_set_sctp_blackhole(2); 190 191 // Disable the Explicit Congestion Notification extension (currently not 192 // supported by the Firefox code) 193 usrsctp_sysctl_set_sctp_ecn_enable(0); 194 195 // Enable interleaving messages for different streams (incoming) 196 // See: https://tools.ietf.org/html/rfc6458#section-8.1.20 197 usrsctp_sysctl_set_sctp_default_frag_interleave(2); 198 199 // Disabling authentication and dynamic address reconfiguration as neither 200 // of them are used for data channel and only result in additional code 201 // paths being used. 202 usrsctp_sysctl_set_sctp_asconf_enable(0); 203 usrsctp_sysctl_set_sctp_auth_enable(0); 204 205 // Disable this redundant limit. rwnd is what ought to be used for this 206 usrsctp_sysctl_set_sctp_max_chunks_on_queue( 207 std::numeric_limits<uint32_t>::max()); 208 } 209 210 void DeinitUsrSctp() { 211 MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread()); 212 MOZ_DIAGNOSTIC_ASSERT(sInitted); 213 DC_DEBUG(("Calling usrsctp_finish %p", this)); 214 usrsctp_finish(); 215 sInitted = false; 216 } 217 218 uintptr_t mNextId = 1; 219 std::map<uintptr_t, RefPtr<DataChannelConnectionUsrsctp>> mConnections; 220 UniquePtr<media::ShutdownBlockingTicket> mShutdownBlocker; 221 static StaticMutex sInstanceMutex MOZ_UNANNOTATED; 222 static bool sInitted; 223 }; 224 225 bool DataChannelRegistry::sInitted = false; 226 227 StaticMutex DataChannelRegistry::sInstanceMutex; 228 229 static int receive_cb(struct socket* sock, union sctp_sockstore addr, 230 void* data, size_t datalen, struct sctp_rcvinfo rcv, 231 int flags, void* ulp_info) { 232 DC_DEBUG(("In receive_cb, ulp_info=%p", ulp_info)); 233 uintptr_t id = reinterpret_cast<uintptr_t>(ulp_info); 234 RefPtr<DataChannelConnectionUsrsctp> connection = 235 DataChannelRegistry::Lookup(id); 236 if (!connection) { 237 // Unfortunately, we can get callbacks after calling 238 // usrsctp_close(socket), so we need to simply ignore them if we've 239 // already killed the DataChannelConnection object 240 DC_DEBUG(( 241 "Ignoring receive callback for terminated Connection ulp=%p, %zu bytes", 242 ulp_info, datalen)); 243 return 0; 244 } 245 return connection->ReceiveCallback(sock, data, datalen, rcv, flags); 246 } 247 248 static RefPtr<DataChannelConnectionUsrsctp> GetConnectionFromSocket( 249 struct socket* sock) { 250 struct sockaddr* addrs = nullptr; 251 int naddrs = usrsctp_getladdrs(sock, 0, &addrs); 252 if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) { 253 return nullptr; 254 } 255 // usrsctp_getladdrs() returns the addresses bound to this socket, which 256 // contains the SctpDataMediaChannel* as sconn_addr. Read the pointer, 257 // then free the list of addresses once we have the pointer. We only open 258 // AF_CONN sockets, and they should all have the sconn_addr set to the 259 // pointer that created them, so [0] is as good as any other. 260 struct sockaddr_conn* sconn = 261 reinterpret_cast<struct sockaddr_conn*>(&addrs[0]); 262 uintptr_t id = reinterpret_cast<uintptr_t>(sconn->sconn_addr); 263 RefPtr<DataChannelConnectionUsrsctp> connection = 264 DataChannelRegistry::Lookup(id); 265 usrsctp_freeladdrs(addrs); 266 267 return connection; 268 } 269 270 // Called when the buffer empties to the threshold value. This is called 271 // from OnSctpPacketReceived() through the sctp stack. 272 int DataChannelConnectionUsrsctp::OnThresholdEvent(struct socket* sock, 273 uint32_t sb_free, 274 void* ulp_info) { 275 RefPtr<DataChannelConnectionUsrsctp> connection = 276 GetConnectionFromSocket(sock); 277 if (connection) { 278 connection->SendDeferredMessages(); 279 } else { 280 DC_ERROR(("Can't find connection for socket %p", sock)); 281 } 282 return 0; 283 } 284 285 DataChannelConnectionUsrsctp::~DataChannelConnectionUsrsctp() { 286 MOZ_ASSERT(!mSocket); 287 } 288 289 void DataChannelConnectionUsrsctp::Destroy() { 290 // Though it's probably ok to do this and close the sockets; 291 // if we really want it to do true clean shutdowns it can 292 // create a dependant Internal object that would remain around 293 // until the network shut down the association or timed out. 294 MOZ_ASSERT(NS_IsMainThread()); 295 DataChannelConnection::Destroy(); 296 297 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED 298 auto self = DataChannelRegistry::Lookup(mId); 299 MOZ_DIAGNOSTIC_ASSERT(self); 300 MOZ_DIAGNOSTIC_ASSERT(this == self.get()); 301 #endif 302 // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed, 303 // the usrsctp_close() calls can move back here (and just proxy the 304 // disconnect_all()) 305 RUN_ON_THREAD(mSTS, 306 WrapRunnable(RefPtr<DataChannelConnectionUsrsctp>(this), 307 &DataChannelConnectionUsrsctp::DestroyOnSTS), 308 NS_DISPATCH_NORMAL); 309 310 // All existing callbacks have refs to DataChannelConnection - however, 311 // we need to handle their destroying the object off mainthread/STS 312 313 // nsDOMDataChannel objects have refs to DataChannels that have refs to us 314 } 315 316 void DataChannelConnectionUsrsctp::DestroyOnSTS() { 317 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 318 319 if (mSocket) usrsctp_close(mSocket); 320 mSocket = nullptr; 321 322 usrsctp_deregister_address(reinterpret_cast<void*>(mId)); 323 DC_DEBUG( 324 ("Deregistered %p from the SCTP stack.", reinterpret_cast<void*>(mId))); 325 326 // We do this at the very last because it might tear down usrsctp, and we 327 // don't want that to happen before the usrsctp_close call above 328 Dispatch(NS_NewRunnableFunction( 329 "DataChannelConnection::Destroy", 330 [this, self = RefPtr<DataChannelConnection>(this)]() { 331 DataChannelRegistry::Deregister(mId); 332 })); 333 } 334 335 DataChannelConnectionUsrsctp::DataChannelConnectionUsrsctp( 336 DataChannelConnection::DataConnectionListener* aListener, 337 nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler) 338 : DataChannelConnection(aListener, aTarget, aHandler) {} 339 340 bool DataChannelConnectionUsrsctp::Init(const uint16_t aLocalPort, 341 const uint16_t aNumStreams) { 342 MOZ_ASSERT(NS_IsMainThread()); 343 344 struct sctp_initmsg initmsg = {}; 345 struct sctp_assoc_value av = {}; 346 struct sctp_event event = {}; 347 socklen_t len; 348 349 uint16_t event_types[] = { 350 SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE, 351 SCTP_REMOTE_ERROR, SCTP_SHUTDOWN_EVENT, 352 SCTP_ADAPTATION_INDICATION, SCTP_PARTIAL_DELIVERY_EVENT, 353 SCTP_SEND_FAILED_EVENT, SCTP_STREAM_RESET_EVENT, 354 SCTP_STREAM_CHANGE_EVENT}; 355 356 mId = DataChannelRegistry::Register(this); 357 358 socklen_t buf_size = 1024 * 1024; 359 360 // Open sctp with a callback 361 if ((mSocket = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, receive_cb, 362 &DataChannelConnectionUsrsctp::OnThresholdEvent, 363 usrsctp_sysctl_get_sctp_sendspace() / 2, 364 reinterpret_cast<void*>(mId))) == nullptr) { 365 goto error_cleanup; 366 } 367 368 if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_RCVBUF, (const void*)&buf_size, 369 sizeof(buf_size)) < 0) { 370 DC_ERROR(("Couldn't change receive buffer size on SCTP socket")); 371 goto error_cleanup; 372 } 373 if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_SNDBUF, (const void*)&buf_size, 374 sizeof(buf_size)) < 0) { 375 DC_ERROR(("Couldn't change send buffer size on SCTP socket")); 376 goto error_cleanup; 377 } 378 379 // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking 380 // in associations for normal IO 381 if (usrsctp_set_non_blocking(mSocket, 1) < 0) { 382 DC_ERROR(("Couldn't set non_blocking on SCTP socket")); 383 // We can't handle connect() safely if it will block, not that this will 384 // even happen. 385 goto error_cleanup; 386 } 387 388 // Make sure when we close the socket, make sure it doesn't call us back 389 // again! This would cause it try to use an invalid DataChannelConnection 390 // pointer 391 struct linger l; 392 l.l_onoff = 1; 393 l.l_linger = 0; 394 if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, (const void*)&l, 395 (socklen_t)sizeof(struct linger)) < 0) { 396 DC_ERROR(("Couldn't set SO_LINGER on SCTP socket")); 397 // unsafe to allow it to continue if this fails 398 goto error_cleanup; 399 } 400 401 // XXX Consider disabling this when we add proper SDP negotiation. 402 // We may want to leave enabled for supporting 'cloning' of SDP offers, which 403 // implies re-use of the same pseudo-port number, or forcing a renegotiation. 404 { 405 const int option_value = 1; 406 if (usrsctp_setsockopt(mSocket, IPPROTO_SCTP, SCTP_REUSE_PORT, 407 (const void*)&option_value, 408 (socklen_t)sizeof(option_value)) < 0) { 409 DC_WARN(("Couldn't set SCTP_REUSE_PORT on SCTP socket")); 410 } 411 if (usrsctp_setsockopt(mSocket, IPPROTO_SCTP, SCTP_NODELAY, 412 (const void*)&option_value, 413 (socklen_t)sizeof(option_value)) < 0) { 414 DC_WARN(("Couldn't set SCTP_NODELAY on SCTP socket")); 415 } 416 } 417 418 // Set explicit EOR 419 { 420 const int option_value = 1; 421 if (usrsctp_setsockopt(mSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR, 422 (const void*)&option_value, 423 (socklen_t)sizeof(option_value)) < 0) { 424 DC_ERROR(("*** failed to enable explicit EOR mode %d", errno)); 425 goto error_cleanup; 426 } 427 } 428 429 // Enable ndata 430 av.assoc_id = SCTP_FUTURE_ASSOC; 431 av.assoc_value = 1; 432 if (usrsctp_setsockopt(mSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, 433 &av, (socklen_t)sizeof(struct sctp_assoc_value)) < 0) { 434 DC_ERROR(("*** failed enable ndata errno %d", errno)); 435 goto error_cleanup; 436 } 437 438 av.assoc_id = SCTP_ALL_ASSOC; 439 av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ; 440 if (usrsctp_setsockopt(mSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, 441 (socklen_t)sizeof(struct sctp_assoc_value)) < 0) { 442 DC_ERROR(("*** failed enable stream reset errno %d", errno)); 443 goto error_cleanup; 444 } 445 446 /* Enable the events of interest. */ 447 event.se_assoc_id = SCTP_ALL_ASSOC; 448 event.se_on = 1; 449 for (unsigned short event_type : event_types) { 450 event.se_type = event_type; 451 if (usrsctp_setsockopt(mSocket, IPPROTO_SCTP, SCTP_EVENT, &event, 452 sizeof(event)) < 0) { 453 DC_ERROR(("*** failed setsockopt SCTP_EVENT errno %d", errno)); 454 goto error_cleanup; 455 } 456 } 457 458 len = sizeof(initmsg); 459 if (usrsctp_getsockopt(mSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 460 0) { 461 DC_ERROR(("*** failed getsockopt SCTP_INITMSG")); 462 goto error_cleanup; 463 } 464 DC_DEBUG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams, 465 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); 466 initmsg.sinit_num_ostreams = aNumStreams; 467 initmsg.sinit_max_instreams = MAX_NUM_STREAMS; 468 if (usrsctp_setsockopt(mSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, 469 (socklen_t)sizeof(initmsg)) < 0) { 470 DC_ERROR(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); 471 goto error_cleanup; 472 } 473 474 mSTS->Dispatch( 475 NS_NewRunnableFunction("DataChannelConnection::Init", [id = mId]() { 476 usrsctp_register_address(reinterpret_cast<void*>(id)); 477 DC_DEBUG(("Registered %p within the SCTP stack.", 478 reinterpret_cast<void*>(id))); 479 })); 480 481 return true; 482 483 error_cleanup: 484 usrsctp_close(mSocket); 485 mSocket = nullptr; 486 DataChannelRegistry::Deregister(mId); 487 return false; 488 } 489 490 void DataChannelConnectionUsrsctp::OnTransportReady() { 491 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 492 DC_DEBUG(("dtls open")); 493 if (mSctpConfigured) { 494 // mSocket could have been closed by an error or for some other reason, 495 // don't open an opportunity to reinit. 496 return; 497 } 498 499 mSctpConfigured = true; 500 501 struct sockaddr_conn addr = {}; 502 addr.sconn_family = AF_CONN; 503 #if defined(__Userspace_os_Darwin) 504 addr.sconn_len = sizeof(addr); 505 #endif 506 addr.sconn_port = htons(mLocalPort); 507 addr.sconn_addr = reinterpret_cast<void*>(mId); 508 509 DC_DEBUG(("Calling usrsctp_bind")); 510 int r = usrsctp_bind(mSocket, reinterpret_cast<struct sockaddr*>(&addr), 511 sizeof(addr)); 512 if (r < 0) { 513 DC_ERROR(("usrsctp_bind failed: %d", r)); 514 } else { 515 // This is the remote addr 516 addr.sconn_port = htons(mRemotePort); 517 DC_DEBUG(("Calling usrsctp_connect")); 518 r = usrsctp_connect(mSocket, reinterpret_cast<struct sockaddr*>(&addr), 519 sizeof(addr)); 520 if (r >= 0 || errno == EINPROGRESS) { 521 struct sctp_paddrparams paddrparams = {}; 522 socklen_t opt_len; 523 524 memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn)); 525 opt_len = (socklen_t)sizeof(struct sctp_paddrparams); 526 r = usrsctp_getsockopt(mSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, 527 &paddrparams, &opt_len); 528 if (r < 0) { 529 DC_ERROR(("usrsctp_getsockopt failed: %d", r)); 530 } else { 531 // This field is misnamed. |spp_pathmtu| represents the maximum 532 // _payload_ size in libusrsctp. So: 533 // 1280 (a reasonable IPV6 MTU according to RFC 8831) 534 // -12 (sctp header) 535 // -24 (GCM sipher) 536 // -13 (DTLS record header) 537 // -8 (UDP header) 538 // -4 (TURN ChannelData) 539 // -40 (IPV6 header) 540 // = 1179 541 // We could further restrict this, because RFC 8831 suggests a starting 542 // IPV4 path MTU of 1200, which would lead to a value of 1115. 543 // I suspect that in practice the path MTU for IPV4 is substantially 544 // larger than 1200. 545 paddrparams.spp_pathmtu = 1179; 546 paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE; 547 paddrparams.spp_flags |= SPP_PMTUD_DISABLE; 548 opt_len = (socklen_t)sizeof(struct sctp_paddrparams); 549 r = usrsctp_setsockopt(mSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, 550 &paddrparams, opt_len); 551 if (r < 0) { 552 DC_ERROR(("usrsctp_getsockopt failed: %d", r)); 553 } else { 554 DC_ERROR(("usrsctp: PMTUD disabled, MTU set to %u", 555 paddrparams.spp_pathmtu)); 556 } 557 } 558 } 559 if (r < 0) { 560 if (errno == EINPROGRESS) { 561 // non-blocking 562 return; 563 } 564 DC_ERROR(("usrsctp_connect failed: %d", errno)); 565 SetState(DataChannelConnectionState::Closed); 566 } else { 567 // We fire ON_CONNECTION via SCTP_COMM_UP when we get that 568 return; 569 } 570 } 571 } 572 573 void DataChannelConnectionUsrsctp::OnSctpPacketReceived( 574 const MediaPacket& packet) { 575 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 576 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { 577 char* buf; 578 579 if ((buf = usrsctp_dumppacket((void*)packet.data(), packet.len(), 580 SCTP_DUMP_INBOUND)) != nullptr) { 581 SCTP_LOG(("%s", buf)); 582 usrsctp_freedumpbuffer(buf); 583 } 584 } 585 // Pass the data to SCTP 586 usrsctp_conninput(reinterpret_cast<void*>(mId), packet.data(), packet.len(), 587 0); 588 } 589 590 int DataChannelConnectionUsrsctp::SendSctpPacket(const uint8_t* buffer, 591 size_t length) { 592 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { 593 char* buf; 594 595 if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != 596 nullptr) { 597 SCTP_LOG(("%s", buf)); 598 usrsctp_freedumpbuffer(buf); 599 } 600 } 601 602 std::unique_ptr<MediaPacket> packet(new MediaPacket); 603 packet->SetType(MediaPacket::SCTP); 604 packet->Copy(static_cast<const uint8_t*>(buffer), length); 605 606 SendPacket(std::move(packet)); 607 return 0; // cheat! Packets can always be dropped later anyways 608 } 609 610 uint32_t DataChannelConnectionUsrsctp::UpdateCurrentStreamIndex() { 611 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 612 RefPtr<DataChannel> channel = mChannels.GetNextChannel(mCurrentStream); 613 if (!channel) { 614 mCurrentStream = 0; 615 } else { 616 mCurrentStream = channel->mStream; 617 } 618 return mCurrentStream; 619 } 620 621 uint32_t DataChannelConnectionUsrsctp::GetCurrentStreamIndex() { 622 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 623 if (!mChannels.Get(mCurrentStream)) { 624 // The stream muse have been removed, reset 625 DC_DEBUG(("Reset mCurrentChannel")); 626 mCurrentStream = 0; 627 } 628 return mCurrentStream; 629 } 630 631 bool DataChannelConnectionUsrsctp::RaiseStreamLimitTo(uint16_t aNewLimit) { 632 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 633 if (GetState() == DataChannelConnectionState::Closed) { 634 // Smile and nod, could end up here via a dispatch 635 return true; 636 } 637 638 if (mNegotiatedIdLimit == MAX_NUM_STREAMS) { 639 // We're already maxed out! 640 return false; 641 } 642 643 if (aNewLimit <= mNegotiatedIdLimit) { 644 // We already have enough 645 return true; 646 } 647 648 if (aNewLimit > MAX_NUM_STREAMS) { 649 // Hard cap: if someone calls again asking for this much, we'll return 650 // false above 651 aNewLimit = MAX_NUM_STREAMS; 652 } 653 654 struct sctp_status status = {}; 655 socklen_t len = (socklen_t)sizeof(struct sctp_status); 656 if (usrsctp_getsockopt(mSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 657 0) { 658 DC_ERROR(("***failed: getsockopt SCTP_STATUS")); 659 return false; 660 } 661 const uint16_t outStreamsNeeded = 662 aNewLimit - mNegotiatedIdLimit; // number to add 663 664 // Note: if multiple channel opens happen when we don't have enough space, 665 // we'll call RaiseStreamLimitTo() multiple times 666 struct sctp_add_streams sas = {}; 667 sas.sas_instrms = 0; 668 sas.sas_outstrms = outStreamsNeeded; /* XXX error handling */ 669 // Doesn't block, we get an event when it succeeds or fails 670 if (usrsctp_setsockopt(mSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas, 671 (socklen_t)sizeof(struct sctp_add_streams)) < 0) { 672 if (errno == EALREADY) { 673 // Uhhhh, ok? 674 DC_DEBUG(("Already have %u output streams", outStreamsNeeded)); 675 return true; 676 } 677 678 DC_ERROR(("***failed: setsockopt ADD errno=%d", errno)); 679 return false; 680 } 681 DC_DEBUG(("Requested %u more streams", outStreamsNeeded)); 682 // We add to mNegotiatedIdLimit when we get a SCTP_STREAM_CHANGE_EVENT and the 683 // values are larger than mNegotiatedIdLimit 684 return true; 685 } 686 687 void DataChannelConnectionUsrsctp::SendDeferredMessages() { 688 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 689 RefPtr<DataChannel> channel; // we may null out the refs to this 690 691 DC_DEBUG(("SendDeferredMessages called, pending type: %s", 692 ToString(mPendingType))); 693 if (mPendingType == PendingType::None) { 694 return; 695 } 696 697 // Send pending control messages 698 // Note: If ndata is not active, check if DCEP messages are currently 699 // outstanding. These need to 700 // be sent first before other streams can be used for sending. 701 if (!mBufferedControl.IsEmpty() && 702 (mSendInterleaved || mPendingType == PendingType::Dcep)) { 703 if (SendBufferedMessages(mBufferedControl, nullptr)) { 704 return; 705 } 706 707 // Note: There may or may not be pending data messages 708 mPendingType = PendingType::Data; 709 } 710 711 bool blocked = false; 712 uint32_t i = GetCurrentStreamIndex(); 713 uint32_t end = i; 714 do { 715 channel = mChannels.Get(i); 716 if (!channel) { 717 continue; 718 } 719 720 // Should already be cleared if closing/closed 721 if (channel->mBufferedData.IsEmpty()) { 722 i = UpdateCurrentStreamIndex(); 723 continue; 724 } 725 726 // Send buffered data messages 727 // Warning: This will fail in case ndata is inactive and a previously 728 // deallocated data channel has not been closed properly. If you 729 // ever see that no messages can be sent on any channel, this is 730 // likely the cause (an explicit EOR message partially sent whose 731 // remaining chunks are still being waited for). 732 size_t written = 0; 733 blocked = SendBufferedMessages(channel->mBufferedData, &written); 734 if (written) { 735 channel->DecrementBufferedAmount(written); 736 } 737 738 // Update current stream index 739 // Note: If ndata is not active, the outstanding data messages on this 740 // stream need to be sent first before other streams can be used for 741 // sending. 742 if (mSendInterleaved || !blocked) { 743 i = UpdateCurrentStreamIndex(); 744 } 745 } while (!blocked && i != end); 746 747 if (!blocked) { 748 mPendingType = 749 mBufferedControl.IsEmpty() ? PendingType::None : PendingType::Dcep; 750 } 751 } 752 753 // buffer MUST have at least one item! 754 // returns if we're still blocked (true) 755 bool DataChannelConnectionUsrsctp::SendBufferedMessages( 756 nsTArray<OutgoingMsg>& buffer, size_t* aWritten) { 757 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 758 do { 759 // Re-send message 760 const int error = SendMsgInternal(buffer[0], aWritten); 761 switch (error) { 762 case 0: 763 buffer.RemoveElementAt(0); 764 break; 765 case EAGAIN: 766 #if (EAGAIN != EWOULDBLOCK) 767 case EWOULDBLOCK: 768 #endif 769 return true; 770 default: 771 buffer.RemoveElementAt(0); 772 DC_ERROR(("error on sending: %d", error)); 773 break; 774 } 775 } while (!buffer.IsEmpty()); 776 777 return false; 778 } 779 780 // NOTE: the updated spec from the IETF says we should set in-order until we 781 // receive an ACK. That would make this code moot. Keep it for now for 782 // backwards compatibility. 783 void DataChannelConnectionUsrsctp::OnStreamOpen(uint16_t stream) { 784 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 785 786 nsTArray<UniquePtr<QueuedDataMessage>> temp; 787 std::swap(temp, mQueuedData); 788 789 temp.RemoveElementsBy([stream, this](const auto& dataItem) { 790 const bool match = dataItem->mStream == stream; 791 if (match) { 792 DC_DEBUG(("Delivering queued data for stream %u, length %zu", stream, 793 dataItem->mData.Length())); 794 // Deliver the queued data 795 HandleDataMessageChunk( 796 dataItem->mData.Elements(), dataItem->mData.Length(), dataItem->mPpid, 797 dataItem->mStream, dataItem->mMessageId, dataItem->mFlags); 798 } 799 return match; 800 }); 801 802 std::swap(temp, mQueuedData); 803 } 804 805 bool DataChannelConnectionUsrsctp::HasQueuedData(uint16_t aStream) const { 806 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 807 for (const auto& data : mQueuedData) { 808 if (data->mStream == aStream) { 809 return true; 810 } 811 } 812 return false; 813 } 814 815 void DataChannelConnectionUsrsctp::HandleDataMessageChunk( 816 const void* data, size_t length, uint32_t ppid, uint16_t stream, 817 uint16_t messageId, int flags) { 818 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 819 DC_DEBUG(("%s: stream %u, length %zu, ppid %u, message-id %u", __func__, 820 stream, length, ppid, messageId)); 821 822 RefPtr<DataChannel> channel = FindChannelByStream(stream); 823 824 // XXX A closed channel may trip this... check 825 // NOTE: the updated spec from the IETF says we should set in-order until we 826 // receive an ACK. That would make this code moot. Keep it for now for 827 // backwards compatibility. 828 if (!channel || HasQueuedData(stream)) { 829 // In the updated 0-RTT open case, the sender can send data immediately 830 // after Open, and doesn't set the in-order bit (since we don't have a 831 // response or ack). Also, with external negotiation, data can come in 832 // before we're told about the external negotiation. We need to buffer 833 // data until either a) Open comes in, if the ordering get messed up, 834 // or b) the app tells us this channel was externally negotiated. When 835 // these occur, we deliver the data. 836 837 // Since this is rare and non-performance, keep a single list of queued 838 // data messages to deliver once the channel opens. 839 DC_DEBUG(("Queuing data for stream %u, length %zu", stream, length)); 840 // Copies data 841 mQueuedData.AppendElement( 842 new QueuedDataMessage(stream, ppid, messageId, flags, 843 static_cast<const uint8_t*>(data), length)); 844 return; 845 } 846 847 const char* type = (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL || 848 ppid == DATA_CHANNEL_PPID_DOMSTRING || 849 ppid == DATA_CHANNEL_PPID_DOMSTRING_EMPTY) 850 ? "string" 851 : "binary"; 852 853 auto it = channel->mRecvBuffers.find(messageId); 854 if (it != channel->mRecvBuffers.end()) { 855 IncomingMsg& msg(it->second); 856 if (!ReassembleMessageChunk(msg, data, length, ppid, stream)) { 857 FinishClose_s(channel); 858 return; 859 } 860 861 if (flags & MSG_EOR) { 862 DC_DEBUG( 863 ("%s: last chunk of multi-chunk %s message, id %u, " 864 "stream %u, length %zu", 865 __func__, type, messageId, stream, length)); 866 HandleDataMessage(std::move(msg)); 867 channel->mRecvBuffers.erase(messageId); 868 } else { 869 DC_DEBUG( 870 ("%s: middle chunk of multi-chunk %s message, id %u, " 871 "stream %u, length %zu", 872 __func__, type, messageId, stream, length)); 873 } 874 return; 875 } 876 877 IncomingMsg msg(ppid, stream); 878 if (!ReassembleMessageChunk(msg, data, length, ppid, stream)) { 879 FinishClose_s(channel); 880 return; 881 } 882 883 if (flags & MSG_EOR) { 884 DC_DEBUG( 885 ("%s: single-chunk %s message, id %u, stream %u, " 886 "length %zu", 887 __func__, type, messageId, stream, length)); 888 HandleDataMessage(std::move(msg)); 889 } else { 890 DC_DEBUG( 891 ("%s: first chunk of multi-chunk %s message, id %u, " 892 "stream %u, length %zu", 893 __func__, type, messageId, stream, length)); 894 channel->mRecvBuffers.insert({messageId, std::move(msg)}); 895 } 896 } 897 898 // A sane endpoint should not be fragmenting DCEP, but I think it is allowed 899 // technically? Use the same chunk reassembly logic that we use for DATA. 900 void DataChannelConnectionUsrsctp::HandleDCEPMessageChunk(const void* buffer, 901 size_t length, 902 uint32_t ppid, 903 uint16_t stream, 904 int flags) { 905 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 906 907 if (!mRecvBuffer.isSome()) { 908 mRecvBuffer = Some(IncomingMsg(ppid, stream)); 909 } 910 911 if (!ReassembleMessageChunk(*mRecvBuffer, buffer, length, ppid, stream)) { 912 CloseAll_s(); 913 return; 914 } 915 916 if (!(flags & MSG_EOR)) { 917 DC_DEBUG(("%s: No EOR, waiting for more chunks", __func__)); 918 return; 919 } 920 921 DC_DEBUG(("%s: EOR, handling", __func__)); 922 // Last chunk, ready to go. 923 HandleDCEPMessage(std::move(*mRecvBuffer)); 924 mRecvBuffer = Nothing(); 925 } 926 927 void DataChannelConnectionUsrsctp::HandleMessageChunk( 928 const void* buffer, size_t length, uint32_t ppid, uint16_t stream, 929 uint16_t messageId, int flags) { 930 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 931 932 switch (ppid) { 933 case DATA_CHANNEL_PPID_CONTROL: 934 DC_DEBUG(("%s: Got DCEP message size %zu", __func__, length)); 935 HandleDCEPMessageChunk(buffer, length, ppid, stream, flags); 936 break; 937 case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL: 938 case DATA_CHANNEL_PPID_DOMSTRING: 939 case DATA_CHANNEL_PPID_DOMSTRING_EMPTY: 940 case DATA_CHANNEL_PPID_BINARY_PARTIAL: 941 case DATA_CHANNEL_PPID_BINARY: 942 case DATA_CHANNEL_PPID_BINARY_EMPTY: 943 HandleDataMessageChunk(buffer, length, ppid, stream, messageId, flags); 944 break; 945 default: 946 DC_ERROR(( 947 "Unhandled message of length %zu PPID %u on stream %u received (%s).", 948 length, ppid, stream, (flags & MSG_EOR) ? "complete" : "partial")); 949 break; 950 } 951 } 952 953 void DataChannelConnectionUsrsctp::HandleAssociationChangeEvent( 954 const struct sctp_assoc_change* sac) { 955 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 956 957 uint32_t i, n; 958 DataChannelConnectionState state = GetState(); 959 switch (sac->sac_state) { 960 case SCTP_COMM_UP: 961 DC_DEBUG(("Association change: SCTP_COMM_UP")); 962 if (state == DataChannelConnectionState::Connecting) { 963 SetState(DataChannelConnectionState::Open); 964 965 DC_DEBUG(("Negotiated number of incoming streams: %" PRIu16, 966 sac->sac_inbound_streams)); 967 DC_DEBUG(("Negotiated number of outgoing streams: %" PRIu16, 968 sac->sac_outbound_streams)); 969 mNegotiatedIdLimit = std::max( 970 mNegotiatedIdLimit, 971 std::max(sac->sac_outbound_streams, sac->sac_inbound_streams)); 972 973 DC_DEBUG(("DTLS connect() succeeded! Entering connected mode")); 974 975 // Open any streams pending... 976 ProcessQueuedOpens(); 977 978 } else if (state == DataChannelConnectionState::Open) { 979 DC_DEBUG(("DataConnection Already OPEN")); 980 } else { 981 DC_ERROR(("Unexpected state: %s", ToString(state))); 982 } 983 break; 984 case SCTP_COMM_LOST: 985 DC_DEBUG(("Association change: SCTP_COMM_LOST")); 986 // This association is toast, so also close all the channels 987 CloseAll_s(); 988 break; 989 case SCTP_RESTART: 990 DC_DEBUG(("Association change: SCTP_RESTART")); 991 break; 992 case SCTP_SHUTDOWN_COMP: 993 DC_DEBUG(("Association change: SCTP_SHUTDOWN_COMP")); 994 CloseAll_s(); 995 break; 996 case SCTP_CANT_STR_ASSOC: 997 DC_DEBUG(("Association change: SCTP_CANT_STR_ASSOC")); 998 break; 999 default: 1000 DC_DEBUG(("Association change: UNKNOWN")); 1001 break; 1002 } 1003 DC_DEBUG(("Association change: streams (in/out) = (%u/%u)", 1004 sac->sac_inbound_streams, sac->sac_outbound_streams)); 1005 1006 if (NS_WARN_IF(!sac)) { 1007 return; 1008 } 1009 1010 n = sac->sac_length - sizeof(*sac); 1011 if ((sac->sac_state == SCTP_COMM_UP) || (sac->sac_state == SCTP_RESTART)) { 1012 if (n > 0) { 1013 for (i = 0; i < n; ++i) { 1014 switch (sac->sac_info[i]) { 1015 case SCTP_ASSOC_SUPPORTS_PR: 1016 DC_DEBUG(("Supports: PR")); 1017 break; 1018 case SCTP_ASSOC_SUPPORTS_AUTH: 1019 DC_DEBUG(("Supports: AUTH")); 1020 break; 1021 case SCTP_ASSOC_SUPPORTS_ASCONF: 1022 DC_DEBUG(("Supports: ASCONF")); 1023 break; 1024 case SCTP_ASSOC_SUPPORTS_MULTIBUF: 1025 DC_DEBUG(("Supports: MULTIBUF")); 1026 break; 1027 case SCTP_ASSOC_SUPPORTS_RE_CONFIG: 1028 DC_DEBUG(("Supports: RE-CONFIG")); 1029 break; 1030 #if defined(SCTP_ASSOC_SUPPORTS_INTERLEAVING) 1031 case SCTP_ASSOC_SUPPORTS_INTERLEAVING: 1032 DC_DEBUG(("Supports: NDATA")); 1033 // TODO: This should probably be set earlier above in 'case 1034 // SCTP_COMM_UP' but we also need this for 'SCTP_RESTART'. 1035 mSendInterleaved = true; 1036 break; 1037 #endif 1038 default: 1039 DC_ERROR(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i])); 1040 break; 1041 } 1042 } 1043 } 1044 } else if (((sac->sac_state == SCTP_COMM_LOST) || 1045 (sac->sac_state == SCTP_CANT_STR_ASSOC)) && 1046 (n > 0)) { 1047 DC_DEBUG(("Association: ABORT =")); 1048 for (i = 0; i < n; ++i) { 1049 DC_DEBUG((" 0x%02x", sac->sac_info[i])); 1050 } 1051 } 1052 if ((sac->sac_state == SCTP_CANT_STR_ASSOC) || 1053 (sac->sac_state == SCTP_SHUTDOWN_COMP) || 1054 (sac->sac_state == SCTP_COMM_LOST)) { 1055 return; 1056 } 1057 } 1058 1059 void DataChannelConnectionUsrsctp::HandlePeerAddressChangeEvent( 1060 const struct sctp_paddr_change* spc) { 1061 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1062 const char* addr = ""; 1063 #if !defined(__Userspace_os_Windows) 1064 char addr_buf[INET6_ADDRSTRLEN]; 1065 struct sockaddr_in* sin; 1066 struct sockaddr_in6* sin6; 1067 #endif 1068 1069 switch (spc->spc_aaddr.ss_family) { 1070 case AF_INET: 1071 #if !defined(__Userspace_os_Windows) 1072 sin = (struct sockaddr_in*)&spc->spc_aaddr; 1073 addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN); 1074 #endif 1075 break; 1076 case AF_INET6: 1077 #if !defined(__Userspace_os_Windows) 1078 sin6 = (struct sockaddr_in6*)&spc->spc_aaddr; 1079 addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN); 1080 #endif 1081 break; 1082 case AF_CONN: 1083 addr = "DTLS connection"; 1084 break; 1085 default: 1086 break; 1087 } 1088 DC_DEBUG(("Peer address %s is now ", addr)); 1089 switch (spc->spc_state) { 1090 case SCTP_ADDR_AVAILABLE: 1091 DC_DEBUG(("SCTP_ADDR_AVAILABLE")); 1092 break; 1093 case SCTP_ADDR_UNREACHABLE: 1094 DC_DEBUG(("SCTP_ADDR_UNREACHABLE")); 1095 break; 1096 case SCTP_ADDR_REMOVED: 1097 DC_DEBUG(("SCTP_ADDR_REMOVED")); 1098 break; 1099 case SCTP_ADDR_ADDED: 1100 DC_DEBUG(("SCTP_ADDR_ADDED")); 1101 break; 1102 case SCTP_ADDR_MADE_PRIM: 1103 DC_DEBUG(("SCTP_ADDR_MADE_PRIM")); 1104 break; 1105 case SCTP_ADDR_CONFIRMED: 1106 DC_DEBUG(("SCTP_ADDR_CONFIRMED")); 1107 break; 1108 default: 1109 DC_ERROR(("UNKNOWN SCP STATE")); 1110 break; 1111 } 1112 if (spc->spc_error) { 1113 DC_ERROR((" (error = 0x%08x).\n", spc->spc_error)); 1114 } 1115 } 1116 1117 void DataChannelConnectionUsrsctp::HandleRemoteErrorEvent( 1118 const struct sctp_remote_error* sre) { 1119 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1120 size_t i, n; 1121 1122 n = sre->sre_length - sizeof(struct sctp_remote_error); 1123 DC_WARN(("Remote Error (error = 0x%04x): ", sre->sre_error)); 1124 for (i = 0; i < n; ++i) { 1125 DC_WARN((" 0x%02x", sre->sre_data[i])); 1126 } 1127 } 1128 1129 void DataChannelConnectionUsrsctp::HandleShutdownEvent( 1130 const struct sctp_shutdown_event* sse) { 1131 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1132 DC_DEBUG(("Shutdown event.")); 1133 /* XXX: notify all channels. */ 1134 // Attempts to actually send anything will fail 1135 } 1136 1137 void DataChannelConnectionUsrsctp::HandleAdaptationIndication( 1138 const struct sctp_adaptation_event* sai) { 1139 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1140 DC_DEBUG(("Adaptation indication: %x.", sai->sai_adaptation_ind)); 1141 } 1142 1143 void DataChannelConnectionUsrsctp::HandlePartialDeliveryEvent( 1144 const struct sctp_pdapi_event* spde) { 1145 // Note: Be aware that stream and sequence number being u32 instead of u16 is 1146 // a bug in the SCTP API. This may change in the future. 1147 1148 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1149 DC_DEBUG(("Partial delivery event: ")); 1150 switch (spde->pdapi_indication) { 1151 case SCTP_PARTIAL_DELIVERY_ABORTED: 1152 DC_DEBUG(("delivery aborted ")); 1153 break; 1154 default: 1155 DC_ERROR(("??? ")); 1156 break; 1157 } 1158 DC_DEBUG(("(flags = %x), stream = %" PRIu32 ", sn = %" PRIu32, 1159 spde->pdapi_flags, spde->pdapi_stream, spde->pdapi_seq)); 1160 1161 // Validate stream ID 1162 if (spde->pdapi_stream >= UINT16_MAX) { 1163 DC_ERROR(("Invalid stream id in partial delivery event: %" PRIu32 "\n", 1164 spde->pdapi_stream)); 1165 return; 1166 } 1167 1168 // Find channel and reset buffer 1169 RefPtr<DataChannel> channel = 1170 FindChannelByStream((uint16_t)spde->pdapi_stream); 1171 if (channel) { 1172 auto it = channel->mRecvBuffers.find(spde->pdapi_seq); 1173 if (it != channel->mRecvBuffers.end()) { 1174 DC_WARN(("Abort partially delivered message of %zu bytes\n", 1175 it->second.GetLength())); 1176 channel->mRecvBuffers.erase(it); 1177 } else { 1178 // Uhhh, ok? 1179 DC_WARN( 1180 ("Abort partially delivered message that we've never seen any " 1181 "of? What?")); 1182 } 1183 } 1184 } 1185 1186 void DataChannelConnectionUsrsctp::HandleSendFailedEvent( 1187 const struct sctp_send_failed_event* ssfe) { 1188 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1189 size_t i, n; 1190 1191 if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) { 1192 DC_DEBUG(("Unsent ")); 1193 } 1194 if (ssfe->ssfe_flags & SCTP_DATA_SENT) { 1195 DC_DEBUG(("Sent ")); 1196 } 1197 if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) { 1198 DC_DEBUG(("(flags = %x) ", ssfe->ssfe_flags)); 1199 } 1200 #ifdef XP_WIN 1201 # define PRIPPID "lu" 1202 #else 1203 # define PRIPPID "u" 1204 #endif 1205 DC_DEBUG(("message with PPID = %" PRIPPID 1206 ", SID = %d, flags: 0x%04x due to error = 0x%08x", 1207 ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid, 1208 ssfe->ssfe_info.snd_flags, ssfe->ssfe_error)); 1209 #undef PRIPPID 1210 n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event); 1211 for (i = 0; i < n; ++i) { 1212 DC_DEBUG((" 0x%02x", ssfe->ssfe_data[i])); 1213 } 1214 } 1215 1216 bool DataChannelConnectionUsrsctp::ResetStreams(nsTArray<uint16_t>& aStreams) { 1217 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1218 1219 DC_DEBUG(("%s %p: Sending outgoing stream reset for %zu streams", __func__, 1220 this, aStreams.Length())); 1221 if (aStreams.IsEmpty()) { 1222 DC_DEBUG(("No streams to reset")); 1223 return false; 1224 } 1225 const size_t len = 1226 sizeof(sctp_reset_streams) + (aStreams.Length()) * sizeof(uint16_t); 1227 struct sctp_reset_streams* srs = static_cast<struct sctp_reset_streams*>( 1228 moz_xmalloc(len)); // infallible malloc 1229 memset(srs, 0, len); 1230 srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; 1231 srs->srs_number_streams = aStreams.Length(); 1232 for (size_t i = 0; i < aStreams.Length(); ++i) { 1233 srs->srs_stream_list[i] = aStreams[i]; 1234 } 1235 if (usrsctp_setsockopt(mSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, 1236 (socklen_t)len) < 0) { 1237 DC_ERROR(("***failed: setsockopt RESET, errno %d", errno)); 1238 // if errno == EALREADY, this is normal - we can't send another reset 1239 // with one pending. 1240 // When we get an incoming reset (which may be a response to our 1241 // outstanding one), see if we have any pending outgoing resets and 1242 // send them 1243 } else { 1244 aStreams.Clear(); 1245 } 1246 free(srs); 1247 return aStreams.Length() == 0; 1248 } 1249 1250 void DataChannelConnectionUsrsctp::HandleStreamResetEvent( 1251 const struct sctp_stream_reset_event* strrst) { 1252 std::vector<uint16_t> streamsReset; 1253 1254 if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && 1255 !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { 1256 size_t n = 1257 (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / 1258 sizeof(uint16_t); 1259 for (size_t i = 0; i < n; ++i) { 1260 streamsReset.push_back(strrst->strreset_stream_list[i]); 1261 } 1262 } 1263 1264 if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { 1265 OnStreamsReset(std::move(streamsReset)); 1266 } else if (strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) { 1267 OnStreamsResetComplete(std::move(streamsReset)); 1268 } 1269 } 1270 1271 void DataChannelConnectionUsrsctp::HandleStreamChangeEvent( 1272 const struct sctp_stream_change_event* strchg) { 1273 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1274 if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) { 1275 DC_ERROR(("*** Failed increasing number of streams from %u (%u/%u)", 1276 mNegotiatedIdLimit, strchg->strchange_instrms, 1277 strchg->strchange_outstrms)); 1278 // XXX FIX! notify pending opens of failure 1279 return; 1280 } 1281 if (strchg->strchange_instrms > mNegotiatedIdLimit) { 1282 DC_DEBUG(("Other side increased streams from %u to %u", mNegotiatedIdLimit, 1283 strchg->strchange_instrms)); 1284 } 1285 uint16_t old_limit = mNegotiatedIdLimit; 1286 uint16_t new_limit = 1287 std::min((uint16_t)MAX_NUM_STREAMS, 1288 std::max(strchg->strchange_outstrms, strchg->strchange_instrms)); 1289 if (new_limit > mNegotiatedIdLimit) { 1290 DC_DEBUG(("Increasing number of streams from %u to %u - adding %u (in: %u)", 1291 old_limit, new_limit, new_limit - old_limit, 1292 strchg->strchange_instrms)); 1293 // make sure both are the same length 1294 mNegotiatedIdLimit = new_limit; 1295 DC_DEBUG(("New length = %u (was %u)", mNegotiatedIdLimit, old_limit)); 1296 // Re-process any channels waiting for streams. 1297 // Linear search, but we don't increase channels often and 1298 // the array would only get long in case of an app error normally 1299 1300 // Make sure we request enough streams if there's a big jump in streams 1301 // Could make a more complex API for OpenXxxFinish() and avoid this loop 1302 auto channels = mChannels.GetAll(); 1303 size_t num_needed = 1304 channels.Length() ? (channels.LastElement()->mStream + 1) : 0; 1305 Maybe<uint16_t> num_desired; 1306 MOZ_ASSERT(num_needed != INVALID_STREAM); 1307 if (num_needed > new_limit) { 1308 // Round up to a multiple of 16, or cap out 1309 num_desired = 1310 Some(std::min(16 * (num_needed / 16 + 1), (size_t)MAX_NUM_STREAMS)); 1311 DC_DEBUG(("Not enough new streams, asking for %u", *num_desired)); 1312 } else if (strchg->strchange_outstrms < strchg->strchange_instrms) { 1313 num_desired = Some(strchg->strchange_instrms); 1314 DC_DEBUG(("Requesting %u output streams to match partner", *num_desired)); 1315 } 1316 1317 if (num_desired.isSome()) { 1318 RaiseStreamLimitTo(*num_desired); 1319 } 1320 1321 ProcessQueuedOpens(); 1322 } 1323 // else probably not a change in # of streams 1324 1325 if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) || 1326 (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) { 1327 // Other side denied our request. Need to AnnounceClosed some stuff. 1328 for (auto& channel : mChannels.GetAll()) { 1329 if (channel->mStream >= mNegotiatedIdLimit) { 1330 /* XXX: Signal to the other end. */ 1331 FinishClose_s(channel); 1332 // maybe fire onError (bug 843625) 1333 } 1334 } 1335 } 1336 } 1337 1338 void DataChannelConnectionUsrsctp::HandleNotification( 1339 const union sctp_notification* notif, size_t n) { 1340 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1341 if (notif->sn_header.sn_length != (uint32_t)n) { 1342 return; 1343 } 1344 switch (notif->sn_header.sn_type) { 1345 case SCTP_ASSOC_CHANGE: 1346 HandleAssociationChangeEvent(&(notif->sn_assoc_change)); 1347 break; 1348 case SCTP_PEER_ADDR_CHANGE: 1349 HandlePeerAddressChangeEvent(&(notif->sn_paddr_change)); 1350 break; 1351 case SCTP_REMOTE_ERROR: 1352 HandleRemoteErrorEvent(&(notif->sn_remote_error)); 1353 break; 1354 case SCTP_SHUTDOWN_EVENT: 1355 HandleShutdownEvent(&(notif->sn_shutdown_event)); 1356 break; 1357 case SCTP_ADAPTATION_INDICATION: 1358 HandleAdaptationIndication(&(notif->sn_adaptation_event)); 1359 break; 1360 case SCTP_AUTHENTICATION_EVENT: 1361 DC_DEBUG(("SCTP_AUTHENTICATION_EVENT")); 1362 break; 1363 case SCTP_SENDER_DRY_EVENT: 1364 // DC_DEBUG(("SCTP_SENDER_DRY_EVENT")); 1365 break; 1366 case SCTP_NOTIFICATIONS_STOPPED_EVENT: 1367 DC_DEBUG(("SCTP_NOTIFICATIONS_STOPPED_EVENT")); 1368 break; 1369 case SCTP_PARTIAL_DELIVERY_EVENT: 1370 HandlePartialDeliveryEvent(&(notif->sn_pdapi_event)); 1371 break; 1372 case SCTP_SEND_FAILED_EVENT: 1373 HandleSendFailedEvent(&(notif->sn_send_failed_event)); 1374 break; 1375 case SCTP_STREAM_RESET_EVENT: 1376 HandleStreamResetEvent(&(notif->sn_strreset_event)); 1377 break; 1378 case SCTP_ASSOC_RESET_EVENT: 1379 DC_DEBUG(("SCTP_ASSOC_RESET_EVENT")); 1380 break; 1381 case SCTP_STREAM_CHANGE_EVENT: 1382 HandleStreamChangeEvent(&(notif->sn_strchange_event)); 1383 break; 1384 default: 1385 DC_ERROR(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type)); 1386 break; 1387 } 1388 } 1389 1390 int DataChannelConnectionUsrsctp::ReceiveCallback(struct socket* sock, 1391 void* data, size_t datalen, 1392 struct sctp_rcvinfo rcv, 1393 int flags) { 1394 MOZ_ASSERT(!NS_IsMainThread()); 1395 DC_DEBUG(("In ReceiveCallback")); 1396 1397 mSTS->Dispatch(NS_NewRunnableFunction( 1398 "DataChannelConnection::ReceiveCallback", 1399 [data, datalen, rcv, flags, this, 1400 self = RefPtr<DataChannelConnection>(this)]() mutable { 1401 if (!data) { 1402 DC_DEBUG(("ReceiveCallback: SCTP has finished shutting down")); 1403 } else { 1404 if (flags & MSG_NOTIFICATION) { 1405 HandleNotification(static_cast<union sctp_notification*>(data), 1406 datalen); 1407 } else { 1408 // NOTE: When interleaved mode is in use, rcv.rcv_ssn holds the 1409 // message id instead of the stream sequence number, based on a read 1410 // of the usrsctp code. 1411 HandleMessageChunk(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid, 1412 rcv.rcv_ssn, flags); 1413 } 1414 // sctp allocates 'data' with malloc(), and expects the receiver to 1415 // free it. 1416 // It would be nice if it were possible to eliminate a copy by passing 1417 // ownership here, but because DATA messages end up in an nsCString, 1418 // and ncCString requires null termination (which usrsctp does not 1419 // do), we _have_ to make a copy somewhere. That might as well be 1420 // here. The downstream code can avoid further copies in whatever way 1421 // makes sense. 1422 free(data); 1423 } 1424 })); 1425 1426 // usrsctp defines the callback as returning an int, but doesn't use it 1427 return 1; 1428 } 1429 1430 // Returns a POSIX error code directly instead of setting errno. 1431 int DataChannelConnectionUsrsctp::SendMsgInternal(OutgoingMsg& msg, 1432 size_t* aWritten) { 1433 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1434 1435 struct sctp_sendv_spa info = {}; 1436 // General flags 1437 info.sendv_flags = SCTP_SEND_SNDINFO_VALID; 1438 1439 // Set stream identifier and protocol identifier 1440 info.sendv_sndinfo.snd_sid = msg.GetMetadata().mStreamId; 1441 info.sendv_sndinfo.snd_ppid = htonl(msg.GetMetadata().mPpid); 1442 1443 if (msg.GetMetadata().mUnordered) { 1444 info.sendv_sndinfo.snd_flags |= SCTP_UNORDERED; 1445 } 1446 1447 // Partial reliability policy, lifetime and rtx are mutually exclusive 1448 msg.GetMetadata().mMaxLifetimeMs.apply([&](auto value) { 1449 info.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL; 1450 info.sendv_prinfo.pr_value = value; 1451 info.sendv_flags |= SCTP_SEND_PRINFO_VALID; 1452 }); 1453 1454 msg.GetMetadata().mMaxRetransmissions.apply([&](auto value) { 1455 info.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX; 1456 info.sendv_prinfo.pr_value = value; 1457 info.sendv_flags |= SCTP_SEND_PRINFO_VALID; 1458 }); 1459 1460 // Send until buffer is empty 1461 Span<const uint8_t> chunk = msg.GetRemainingData(); 1462 do { 1463 if (chunk.Length() <= DATA_CHANNEL_MAX_BINARY_FRAGMENT) { 1464 // Last chunk! 1465 info.sendv_sndinfo.snd_flags |= SCTP_EOR; 1466 } else { 1467 chunk = chunk.To(DATA_CHANNEL_MAX_BINARY_FRAGMENT); 1468 } 1469 1470 // Send (or try at least) 1471 // SCTP will return EMSGSIZE if the message is bigger than the buffer 1472 // size (or EAGAIN if there isn't space). However, we can avoid EMSGSIZE 1473 // by carefully crafting small enough message chunks. 1474 const ssize_t writtenOrError = usrsctp_sendv( 1475 mSocket, chunk.Elements(), chunk.Length(), nullptr, 0, (void*)&info, 1476 (socklen_t)sizeof(struct sctp_sendv_spa), SCTP_SENDV_SPA, 0); 1477 1478 if (writtenOrError < 0) { 1479 return errno; 1480 } 1481 1482 const size_t written = writtenOrError; 1483 1484 if (aWritten && 1485 msg.GetMetadata().mPpid != DATA_CHANNEL_PPID_DOMSTRING_EMPTY && 1486 msg.GetMetadata().mPpid != DATA_CHANNEL_PPID_BINARY_EMPTY) { 1487 *aWritten += written; 1488 } 1489 DC_DEBUG(("Sent buffer (written=%zu, len=%zu, left=%zu)", written, 1490 chunk.Length(), msg.GetRemainingData().Length() - written)); 1491 1492 // TODO: Remove once resolved 1493 // (https://github.com/sctplab/usrsctp/issues/132) 1494 if (written == 0) { 1495 DC_ERROR(("@tuexen: usrsctp_sendv returned 0")); 1496 return EAGAIN; 1497 } 1498 1499 // Update buffer position 1500 msg.Advance(written); 1501 1502 // If not all bytes have been written, this obviously means that usrsctp's 1503 // buffer is full and we need to try again later. 1504 if (written < chunk.Length()) { 1505 return EAGAIN; 1506 } 1507 1508 chunk = msg.GetRemainingData(); 1509 } while (chunk.Length() > 0); 1510 1511 return 0; 1512 } 1513 1514 // Returns a POSIX error code directly instead of setting errno. 1515 int DataChannelConnectionUsrsctp::SendMsgInternalOrBuffer( 1516 nsTArray<OutgoingMsg>& buffer, OutgoingMsg&& msg, bool* buffered, 1517 size_t* aWritten) { 1518 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1519 NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!"); 1520 1521 int error = 0; 1522 bool need_buffering = false; 1523 1524 if (buffer.IsEmpty() && 1525 (mSendInterleaved || mPendingType == PendingType::None)) { 1526 error = SendMsgInternal(msg, aWritten); 1527 switch (error) { 1528 case 0: 1529 break; 1530 case EAGAIN: 1531 #if (EAGAIN != EWOULDBLOCK) 1532 case EWOULDBLOCK: 1533 #endif 1534 need_buffering = true; 1535 break; 1536 default: 1537 DC_ERROR(("error %d on sending", error)); 1538 break; 1539 } 1540 } else { 1541 need_buffering = true; 1542 } 1543 1544 if (need_buffering) { 1545 // queue data for resend! And queue any further data for the stream until 1546 // it is... 1547 buffer.EmplaceBack(std::move(msg)); 1548 DC_DEBUG(("Queued %zu buffers (left=%zu, total=%zu)", buffer.Length(), 1549 buffer.LastElement().GetLength(), msg.GetLength())); 1550 if (buffered) { 1551 *buffered = true; 1552 } 1553 return 0; 1554 } 1555 1556 if (buffered) { 1557 *buffered = false; 1558 } 1559 return error; 1560 } 1561 1562 int DataChannelConnectionUsrsctp::SendMessage(DataChannel& aChannel, 1563 OutgoingMsg&& aMsg) { 1564 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 1565 bool buffered; 1566 if (aMsg.GetMetadata().mPpid == DATA_CHANNEL_PPID_CONTROL) { 1567 int error = SendMsgInternalOrBuffer(mBufferedControl, std::move(aMsg), 1568 &buffered, nullptr); 1569 // Set pending type (if buffered) 1570 if (!error && buffered && mPendingType == PendingType::None) { 1571 mPendingType = PendingType::Dcep; 1572 } 1573 return error; 1574 } 1575 1576 size_t written = 0; 1577 if (const int error = SendMsgInternalOrBuffer( 1578 aChannel.mBufferedData, std::move(aMsg), &buffered, &written); 1579 error) { 1580 return error; 1581 } 1582 1583 if (written && 1584 aMsg.GetMetadata().mPpid != DATA_CHANNEL_PPID_DOMSTRING_EMPTY && 1585 aMsg.GetMetadata().mPpid != DATA_CHANNEL_PPID_BINARY_EMPTY) { 1586 aChannel.DecrementBufferedAmount(written); 1587 } 1588 1589 // Set pending type and stream index (if buffered) 1590 if (buffered && mPendingType == PendingType::None) { 1591 mPendingType = PendingType::Data; 1592 mCurrentStream = aChannel.mStream; 1593 } 1594 1595 return 0; 1596 } 1597 1598 } // namespace mozilla