test_postqueue.js (25180B)
/* Any copyright is dedicated to the Public Domain.
 * http://creativecommons.org/publicdomain/zero/1.0/ */

let { PostQueue } = ChromeUtils.importESModule(
  "resource://services-sync/record.sys.mjs"
);

/**
 * Creates a stand-in record for the tests below.
 *
 * @param {number} nbytes - Length of the "x"-filled payload string.
 * @returns {object} An object whose `toJSON()` yields
 *   `{ payload: "xxx..." }` with `nbytes` characters of payload.
 */
function makeRecord(nbytes) {
  return {
    toJSON: () => ({ payload: "x".repeat(nbytes) }),
  };
}

// Note: This is 14 bytes (the length of `{"payload":""}`). Tests make
// assumptions about this (even if it's just in setting
// config.max_request_bytes to a specific value).
makeRecord.nonPayloadOverhead = JSON.stringify(makeRecord(0).toJSON()).length;

/**
 * Gives how many encoded bytes a request with the given payload sizes will be
 * (assuming the records were created by makeRecord).
 *
 * The sizes are *payload* sizes, so with the 14 byte per-record overhead:
 *   requestBytesFor([6]) => 22, requestBytesFor([6, 6]) => 43
 *   requestBytesFor([20]) => 36, requestBytesFor([20, 20]) => 71
 *
 * @param {number[]} recordPayloadByteCounts - Payload size of each record.
 * @returns {number} The computed request size in bytes.
 */
function requestBytesFor(recordPayloadByteCounts) {
  // Start at 1 for the opening "[". Every record then adds its payload, the
  // fixed per-record overhead, and 1 byte for the "," or "]" following it.
  let requestBytes = 1;
  for (const size of recordPayloadByteCounts) {
    requestBytes += size + 1 + makeRecord.nonPayloadOverhead;
  }
  return requestBytes;
}

/**
 * Builds a PostQueue wired to a fake poster that records every post it is
 * asked to make and asserts that the limits in `config` were respected.
 *
 * @param {object} config - Limits handed to the PostQueue; the same values are
 *   used by the fake poster for its assertions. A limit that is missing (or
 *   falsy) is not checked.
 * @param {number} lastModTime - Passed through to the PostQueue constructor.
 * @param {Iterator} responseGenerator - `next().value` is consumed once per
 *   post and returned (wrapped in a resolved promise) as that post's response.
 * @returns {{pq: PostQueue, stats: {posts: object[], batches: object[]}}}
 *   The queue, plus the stats object the poster fills in: one entry in
 *   `stats.posts` per post and one entry in `stats.batches` per batch.
 */
function makePostQueue(config, lastModTime, responseGenerator) {
  const stats = {
    posts: [],
    batches: [],
  };
  const poster = (data, headers, batch, commit) => {
    let payloadBytes = 0;
    let numRecords = 0;
    for (const record of JSON.parse(data)) {
      if (config.max_record_payload_bytes) {
        less(
          record.payload.length,
          config.max_record_payload_bytes,
          "PostQueue should respect max_record_payload_bytes"
        );
      }
      payloadBytes += record.payload.length;
      ++numRecords;
    }

    const thisPost = {
      nbytes: data.length,
      batch,
      commit,
      payloadBytes,
      numRecords,
    };

    // Only record headers when there are some, so expectations for posts
    // without headers don't need a `headers` key.
    if (headers.length) {
      thisPost.headers = headers;
    }

    // check that we respected the provided limits for the post
    if (config.max_post_records) {
      lessOrEqual(
        numRecords,
        config.max_post_records,
        "PostQueue should respect max_post_records"
      );
    }

    if (config.max_post_bytes) {
      less(
        payloadBytes,
        config.max_post_bytes,
        "PostQueue should respect max_post_bytes"
      );
    }

    if (config.max_request_bytes) {
      less(
        thisPost.nbytes,
        config.max_request_bytes,
        "PostQueue should respect max_request_bytes"
      );
    }

    stats.posts.push(thisPost);

    // Call this now so we can check if there's a batch id in it.
    // Kind of kludgey, but allows us to have the correct batch id even
    // before the next post is made.
    const nextResponse = responseGenerator.next().value;

    // Record info for the batch.

    // `batch` is the string "true" when a new batch is being requested;
    // otherwise it is an existing batch id or null, so compare strictly.
    const requestedNewBatch = batch === "true";

    let curBatch = stats.batches[stats.batches.length - 1];
    // If there's no batch, it committed, or we requested a new one,
    // then we need to start a new one.
    if (!curBatch || requestedNewBatch || curBatch.didCommit) {
      curBatch = {
        posts: 0,
        payloadBytes: 0,
        numRecords: 0,
        didCommit: false,
        batch,
        serverBatch: false,
      };
      if (nextResponse.obj && nextResponse.obj.batch) {
        curBatch.batch = nextResponse.obj.batch;
        curBatch.serverBatch = true;
      }
      stats.batches.push(curBatch);
    }

    // If we provided a batch id, it must be the same as the current batch
    if (batch && !requestedNewBatch) {
      equal(curBatch.batch, batch);
    }

    curBatch.posts += 1;
    curBatch.payloadBytes += payloadBytes;
    curBatch.numRecords += numRecords;
    curBatch.didCommit = commit;

    // if this is an actual server batch (or it's a one-shot batch), check that
    // we respected the provided total limits
    if (commit && (requestedNewBatch || curBatch.serverBatch)) {
      if (config.max_total_records) {
        lessOrEqual(
          curBatch.numRecords,
          config.max_total_records,
          "PostQueue should respect max_total_records"
        );
      }

      if (config.max_total_bytes) {
        less(
          curBatch.payloadBytes,
          config.max_total_bytes,
          "PostQueue should respect max_total_bytes"
        );
      }
    }

    return Promise.resolve(nextResponse);
  };

  const done = () => {};
  const pq = new PostQueue(poster, lastModTime, config, getTestLogger(), done);
  return { pq, stats };
}

add_task(async function test_simple() {
  const config = {
    max_request_bytes: 1000,
    max_record_payload_bytes: 1000,
  };

  const time = 11111111;

  function* responseGenerator() {
    yield {
      success: true,
      status: 200,
      headers: {
        "x-weave-timestamp": time + 100,
        "x-last-modified": time + 100,
      },
    };
  }

  const { pq, stats } = makePostQueue(config, time, responseGenerator());
  await pq.enqueue(makeRecord(10));
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([10]),
      payloadBytes: 10,
      numRecords: 1,
      commit: true, // we don't know if we have batch semantics, so committed.
      headers: [["x-if-unmodified-since", time]],
      batch: "true",
    },
  ]);
  deepEqual(stats.batches, [
    {
      posts: 1,
      payloadBytes: 10,
      numRecords: 1,
      didCommit: true,
      batch: "true",
      serverBatch: false,
    },
  ]);
});

// Test we do the right thing when we need to make multiple posts when there
// are no batch semantics
add_task(async function test_max_request_bytes_no_batch() {
  const config = {
    max_request_bytes: 50,
    max_record_payload_bytes: 50,
  };

  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 200,
      headers: {
        "x-weave-timestamp": time + 100,
        "x-last-modified": time + 100,
      },
    };
    yield {
      success: true,
      status: 200,
      headers: {
        "x-weave-timestamp": time + 200,
        "x-last-modified": time + 200,
      },
    };
  }

  const { pq, stats } = makePostQueue(config, time, responseGenerator());
  // Each encoded record is exactly 20 bytes.
  const payloadSize = 20 - makeRecord.nonPayloadOverhead;
  await pq.enqueue(makeRecord(payloadSize)); // total size now 22 bytes - "[" + record + "]"
  await pq.enqueue(makeRecord(payloadSize)); // total size now 43 bytes - "[" + record + "," + record + "]"
  await pq.enqueue(makeRecord(payloadSize)); // this will exceed our byte limit, so will be in the 2nd POST.
  await pq.flush(true);
  deepEqual(stats.posts, [
    {
      nbytes: 43, // 43 for the first part
      payloadBytes: payloadSize * 2,
      numRecords: 2,
      commit: false,
      headers: [["x-if-unmodified-since", time]],
      batch: "true",
    },
    {
      nbytes: 22,
      payloadBytes: payloadSize,
      numRecords: 1,
      commit: false, // we know we aren't in a batch, so never commit.
      headers: [["x-if-unmodified-since", time + 100]],
      batch: null,
    },
  ]);
  equal(stats.batches.filter(x => x.didCommit).length, 0);
  equal(pq.lastModified, time + 200);
});

add_task(async function test_max_record_payload_bytes_no_batch() {
  const config = {
    max_request_bytes: 100,
    max_record_payload_bytes: 50,
  };

  const time = 11111111;

  function* responseGenerator() {
    yield {
      success: true,
      status: 200,
      headers: {
        "x-weave-timestamp": time + 100,
        "x-last-modified": time + 100,
      },
    };
  }

  const { pq, stats } = makePostQueue(config, time, responseGenerator());
  // Should trigger when the record's payload really is too large to fit
  const { enqueued } = await pq.enqueue(makeRecord(51));
  ok(!enqueued);
  // Shouldn't trigger when only the encoded record (payload plus overhead)
  // reaches the limit: the payload here is 36 bytes, the encoded record 50.
  ok(
    (await pq.enqueue(makeRecord(50 - makeRecord.nonPayloadOverhead))).enqueued
  ); // total size now 52 bytes - "[" + record + "]"
  ok(
    (await pq.enqueue(makeRecord(46 - makeRecord.nonPayloadOverhead))).enqueued
  ); // total size now 99 bytes - "[" + record0 + "," + record1 + "]"

  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: 99,
      payloadBytes: 50 + 46 - makeRecord.nonPayloadOverhead * 2,
      numRecords: 2,
      commit: true, // we don't know if we have batch semantics, so committed.
      batch: "true",
      headers: [["x-if-unmodified-since", time]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 1,
      payloadBytes: 50 + 46 - makeRecord.nonPayloadOverhead * 2,
      numRecords: 2,
      didCommit: true,
      batch: "true",
      serverBatch: false,
    },
  ]);

  equal(pq.lastModified, time + 100);
});

// Batch tests.

// Test making a single post when batch semantics are in place.

add_task(async function test_single_batch() {
  const config = {
    max_post_bytes: 1000,
    max_post_records: 100,
    max_total_records: 200,
    max_record_payload_bytes: 1000,
  };
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: { "x-last-modified": time, "x-weave-timestamp": time + 100 },
    };
  }

  const { pq, stats } = makePostQueue(config, time, responseGenerator());
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([10]),
      numRecords: 1,
      payloadBytes: 10,
      commit: true, // we don't know if we have batch semantics, so committed.
      batch: "true",
      headers: [["x-if-unmodified-since", time]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 1,
      payloadBytes: 10,
      numRecords: 1,
      didCommit: true,
      batch: 1234,
      serverBatch: true,
    },
  ]);
});

// Test we do the right thing when we need to make multiple posts due to
// max_post_bytes when there are batch semantics in place.
add_task(async function test_max_post_bytes_batch() {
  const time = 11111111;
  const limits = {
    max_post_bytes: 50,
    max_post_records: 4,
    max_total_bytes: 5000,
    max_total_records: 100,
    max_record_payload_bytes: 50,
    max_request_bytes: 4000,
  };

  // Every response in this test reports server batch 1234.
  const batchResponse = (lastModified, timestamp) => ({
    success: true,
    status: 202,
    obj: { batch: 1234 },
    headers: {
      "x-last-modified": lastModified,
      "x-weave-timestamp": timestamp,
    },
  });

  function* responseGenerator() {
    yield batchResponse(time, time + 100);
    yield batchResponse(time + 200, time + 200);
  }

  const { pq, stats } = makePostQueue(limits, time, responseGenerator());

  // Running payload total for the current post after each enqueue:
  //   1st: 20
  //   2nd: 40
  //   3rd: 60 would overflow, so the first two are posted and this starts
  //        the next post at 20.
  for (let i = 0; i < 3; i++) {
    const { enqueued } = await pq.enqueue(makeRecord(20));
    ok(enqueued);
  }
  await pq.flush(true);

  const expectedHeaders = [["x-if-unmodified-since", time]];
  const expectedPosts = [
    {
      batch: "true",
      commit: false,
      numRecords: 2,
      payloadBytes: 40,
      nbytes: requestBytesFor([20, 20]),
      headers: expectedHeaders,
    },
    {
      batch: 1234,
      commit: true,
      numRecords: 1,
      payloadBytes: 20,
      nbytes: requestBytesFor([20]),
      headers: expectedHeaders,
    },
  ];
  deepEqual(stats.posts, expectedPosts);

  const expectedBatches = [
    {
      batch: 1234,
      serverBatch: true,
      didCommit: true,
      posts: 2,
      numRecords: 3,
      payloadBytes: 60,
    },
  ];
  deepEqual(stats.batches, expectedBatches);

  equal(pq.lastModified, time + 200);
});

// Test we do the right thing when we need to make multiple posts due to
// max_request_bytes when there are batch semantics in place.
add_task(async function test_max_request_bytes_batch() {
  const time = 11111111;
  const limits = {
    max_post_bytes: 60,
    max_post_records: 40,
    max_total_bytes: 5000,
    max_total_records: 100,
    max_record_payload_bytes: 500,
    max_request_bytes: 100,
  };

  // Every response in this test reports server batch 1234.
  const batchResponse = (lastModified, timestamp) => ({
    success: true,
    status: 202,
    obj: { batch: 1234 },
    headers: {
      "x-last-modified": lastModified,
      "x-weave-timestamp": timestamp,
    },
  });

  function* responseGenerator() {
    yield batchResponse(time, time + 100);
    yield batchResponse(time + 200, time + 200);
  }

  const { pq, stats } = makePostQueue(limits, time, responseGenerator());

  // Sizes after each enqueue of a 10 byte payload:
  //   1st: post: 10, request: 26 (10 + 14 + 2)
  //   2nd: post: 20, request: 51 (10 + 14 + 1) * 2 + 1
  //   3rd: post: 30, request: 76 (10 + 14 + 1) * 3 + 1
  //   4th: would be post: 40 (fine), request: 101, so we should post first.
  for (let i = 0; i < 4; i++) {
    const { enqueued } = await pq.enqueue(makeRecord(10));
    ok(enqueued);
  }
  await pq.flush(true);

  const expectedHeaders = [["x-if-unmodified-since", time]];
  const expectedPosts = [
    {
      batch: "true",
      commit: false,
      numRecords: 3,
      payloadBytes: 30,
      nbytes: requestBytesFor([10, 10, 10]),
      headers: expectedHeaders,
    },
    {
      batch: 1234,
      commit: true,
      numRecords: 1,
      payloadBytes: 10,
      nbytes: requestBytesFor([10]),
      headers: expectedHeaders,
    },
  ];
  deepEqual(stats.posts, expectedPosts);

  const expectedBatches = [
    {
      batch: 1234,
      serverBatch: true,
      didCommit: true,
      posts: 2,
      numRecords: 4,
      payloadBytes: 40,
    },
  ];
  deepEqual(stats.batches, expectedBatches);

  equal(pq.lastModified, time + 200);
});

// Test we do the right thing when the batch bytes limit is exceeded.
add_task(async function test_max_total_bytes_batch() {
  const time0 = 11111111;
  const time1 = 22222222;
  const limits = {
    max_post_bytes: 50,
    max_post_records: 20,
    max_total_bytes: 70,
    max_total_records: 100,
    max_record_payload_bytes: 50,
    max_request_bytes: 500,
  };

  // Builds a 202 response that reports the given server batch id.
  const batchResponse = (batch, lastModified, timestamp) => ({
    success: true,
    status: 202,
    obj: { batch },
    headers: {
      "x-last-modified": lastModified,
      "x-weave-timestamp": timestamp,
    },
  });

  function* responseGenerator() {
    yield batchResponse(1234, time0, time0 + 100);
    yield batchResponse(1234, time1, time1);
    yield batchResponse(5678, time1, time1 + 100);
    yield batchResponse(5678, time1 + 200, time1 + 200);
  }

  const { pq, stats } = makePostQueue(limits, time0, responseGenerator());

  // Payload totals after each enqueue:
  //   1st: post: 20, batch: 20
  //   2nd: post: 40, batch: 40
  //   3rd: post: 20, batch: 60 - exceeds our POST byte limit, so will be in
  //        the 2nd POST - but still in the first batch.
  //   4th: post: 20, batch: 20 - exceeds our batch byte limit, so will be in
  //        a new batch.
  //   5th: post: 40, batch: 40
  //   6th: post: 20, batch: 60 - exceeds the POST byte limit, so will be in
  //        the 4th post, part of the 2nd batch.
  for (let i = 0; i < 6; i++) {
    const { enqueued } = await pq.enqueue(makeRecord(20));
    ok(enqueued);
  }
  await pq.flush(true);

  // Each batch is expected to look the same: a 2 record post that opens the
  // batch, then a 1 record post that commits it.
  const expectedPostsFor = (batchId, unmodifiedSince) => [
    {
      batch: "true",
      commit: false,
      numRecords: 2,
      payloadBytes: 40,
      nbytes: requestBytesFor([20, 20]),
      headers: [["x-if-unmodified-since", unmodifiedSince]],
    },
    {
      batch: batchId,
      commit: true,
      numRecords: 1,
      payloadBytes: 20,
      nbytes: requestBytesFor([20]),
      headers: [["x-if-unmodified-since", unmodifiedSince]],
    },
  ];
  deepEqual(stats.posts, [
    ...expectedPostsFor(1234, time0),
    ...expectedPostsFor(5678, time1),
  ]);

  const expectedBatchFor = batchId => ({
    batch: batchId,
    serverBatch: true,
    didCommit: true,
    posts: 2,
    numRecords: 3,
    payloadBytes: 60,
  });
  deepEqual(stats.batches, [expectedBatchFor(1234), expectedBatchFor(5678)]);

  equal(pq.lastModified, time1 + 200);
});

// Test we split up the posts when we exceed the record limit when batch semantics
// are in place.
add_task(async function test_max_post_records_batch() {
  const time = 11111111;
  const limits = {
    max_post_bytes: 1000,
    max_post_records: 2,
    max_total_bytes: 5000,
    max_total_records: 100,
    max_record_payload_bytes: 1000,
    max_request_bytes: 1000,
  };

  // Every response in this test reports server batch 1234.
  const batchResponse = (lastModified, timestamp) => ({
    success: true,
    status: 202,
    obj: { batch: 1234 },
    headers: {
      "x-last-modified": lastModified,
      "x-weave-timestamp": timestamp,
    },
  });

  function* responseGenerator() {
    yield batchResponse(time, time + 100);
    yield batchResponse(time + 200, time + 200);
  }

  const { pq, stats } = makePostQueue(limits, time, responseGenerator());

  // The first two records fill a post; the third will exceed the record
  // limit of 2, so will be in the 2nd post.
  for (let i = 0; i < 3; i++) {
    const { enqueued } = await pq.enqueue(makeRecord(20));
    ok(enqueued);
  }

  await pq.flush(true);

  const expectedHeaders = [["x-if-unmodified-since", time]];
  const expectedPosts = [
    {
      batch: "true",
      commit: false,
      numRecords: 2,
      payloadBytes: 40,
      nbytes: requestBytesFor([20, 20]),
      headers: expectedHeaders,
    },
    {
      batch: 1234,
      commit: true,
      numRecords: 1,
      payloadBytes: 20,
      nbytes: requestBytesFor([20]),
      headers: expectedHeaders,
    },
  ];
  deepEqual(stats.posts, expectedPosts);

  const expectedBatches = [
    {
      batch: 1234,
      serverBatch: true,
      didCommit: true,
      posts: 2,
      numRecords: 3,
      payloadBytes: 60,
    },
  ];
  deepEqual(stats.batches, expectedBatches);

  equal(pq.lastModified, time + 200);
});

// Test we do the right thing when the batch record limit is exceeded.
add_task(async function test_max_records_batch() {
  const time0 = 11111111;
  const time1 = 22222222;
  const limits = {
    max_post_bytes: 1000,
    max_post_records: 3,
    max_total_bytes: 10000,
    max_total_records: 5,
    max_record_payload_bytes: 1000,
    max_request_bytes: 10000,
  };

  // Builds a 202 response that reports the given server batch id.
  const batchResponse = (batch, lastModified, timestamp) => ({
    success: true,
    status: 202,
    obj: { batch },
    headers: {
      "x-last-modified": lastModified,
      "x-weave-timestamp": timestamp,
    },
  });

  function* responseGenerator() {
    yield batchResponse(1234, time0, time0 + 100);
    yield batchResponse(1234, time1, time1);
    yield batchResponse(5678, time1, time1 + 100);
    yield batchResponse(5678, time1 + 200, time1 + 200);
  }

  const { pq, stats } = makePostQueue(limits, time0, responseGenerator());

  // Nine records in total; the expectations below have them posted in
  // groups of 3 + 2 (first batch) and 3 + 1 (second batch).
  for (let i = 0; i < 9; i++) {
    const { enqueued } = await pq.enqueue(makeRecord(20));
    ok(enqueued);
  }

  await pq.flush(true);

  const headersAt = when => [["x-if-unmodified-since", when]];
  deepEqual(stats.posts, [
    {
      // 3 records
      batch: "true",
      commit: false,
      numRecords: 3,
      payloadBytes: 60,
      nbytes: requestBytesFor([20, 20, 20]),
      headers: headersAt(time0),
    },
    {
      // 2 records -- end batch1
      batch: 1234,
      commit: true,
      numRecords: 2,
      payloadBytes: 40,
      nbytes: requestBytesFor([20, 20]),
      headers: headersAt(time0),
    },
    {
      // 3 records
      batch: "true",
      commit: false,
      numRecords: 3,
      payloadBytes: 60,
      nbytes: requestBytesFor([20, 20, 20]),
      headers: headersAt(time1),
    },
    {
      // 1 record -- end batch2
      batch: 5678,
      commit: true,
      numRecords: 1,
      payloadBytes: 20,
      nbytes: requestBytesFor([20]),
      headers: headersAt(time1),
    },
  ]);

  deepEqual(stats.batches, [
    {
      batch: 1234,
      serverBatch: true,
      didCommit: true,
      posts: 2,
      numRecords: 5,
      payloadBytes: 100,
    },
    {
      batch: 5678,
      serverBatch: true,
      didCommit: true,
      posts: 2,
      numRecords: 4,
      payloadBytes: 80,
    },
  ]);

  equal(pq.lastModified, time1 + 200);
});

// Test we do the right thing when the limits are met but not exceeded.
add_task(async function test_packed_batch() {
  const time = 11111111;
  const fullPost = [10, 10, 10, 10];
  const limits = {
    max_post_bytes: 41,
    max_post_records: 4,

    max_total_bytes: 81,
    max_total_records: 8,

    max_record_payload_bytes: 20 + makeRecord.nonPayloadOverhead + 1,
    max_request_bytes: requestBytesFor(fullPost) + 1,
  };

  // Every response in this test reports server batch 1234.
  const batchResponse = (lastModified, timestamp) => ({
    success: true,
    status: 202,
    obj: { batch: 1234 },
    headers: {
      "x-last-modified": lastModified,
      "x-weave-timestamp": timestamp,
    },
  });

  function* responseGenerator() {
    yield batchResponse(time, time + 100);
    yield batchResponse(time + 200, time + 200);
  }

  const { pq, stats } = makePostQueue(limits, time, responseGenerator());

  // Eight records: two posts of four records each.
  for (let i = 0; i < 8; i++) {
    const { enqueued } = await pq.enqueue(makeRecord(10));
    ok(enqueued);
  }

  await pq.flush(true);

  const expectedHeaders = [["x-if-unmodified-since", time]];
  deepEqual(stats.posts, [
    {
      batch: "true",
      commit: false,
      numRecords: 4,
      payloadBytes: 40,
      nbytes: requestBytesFor(fullPost),
      headers: expectedHeaders,
    },
    {
      batch: 1234,
      commit: true,
      numRecords: 4,
      payloadBytes: 40,
      nbytes: requestBytesFor(fullPost),
      headers: expectedHeaders,
    },
  ]);

  deepEqual(stats.batches, [
    {
      batch: 1234,
      serverBatch: true,
      didCommit: true,
      posts: 2,
      numRecords: 8,
      payloadBytes: 80,
    },
  ]);

  equal(pq.lastModified, time + 200);
});

// Tests that check that a single record fails to enqueue for the provided config
async function test_enqueue_failure_case(failureLimit, config) {
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: {
        "x-last-modified": time + 100,
        "x-weave-timestamp": time + 100,
      },
    };
  }

  const { pq, stats } = makePostQueue(config, time, responseGenerator());

  // Enqueues a record one byte over the limit and checks that the queue
  // refuses it and reports an error.
  const expectOversizedRejected = async () => {
    const outcome = await pq.enqueue(makeRecord(failureLimit + 1));
    ok(!outcome.enqueued);
    notEqual(outcome.error, undefined);
  };

  // Check on empty postqueue
  await expectOversizedRejected();

  ok((await pq.enqueue(makeRecord(5))).enqueued);

  // check on nonempty postqueue
  await expectOversizedRejected();

  // make sure that we keep working, skipping the bad record entirely
  // (handling the error the queue reported is left up to caller)
  ok((await pq.enqueue(makeRecord(5))).enqueued);

  await pq.flush(true);

  // Only the two 5 byte records should have been posted.
  deepEqual(stats.posts, [
    {
      batch: "true",
      commit: true,
      numRecords: 2,
      payloadBytes: 10,
      nbytes: requestBytesFor([5, 5]),
      headers: [["x-if-unmodified-since", time]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      batch: 1234,
      serverBatch: true,
      didCommit: true,
      posts: 1,
      numRecords: 2,
      payloadBytes: 10,
    },
  ]);

  equal(pq.lastModified, time + 100);
}

add_task(async function test_max_post_bytes_enqueue_failure() {
  const config = {
    max_post_bytes: 50,
    max_post_records: 100,

    max_total_bytes: 5000,
    max_total_records: 100,

    max_record_payload_bytes: 500,
    max_request_bytes: 500,
  };
  await test_enqueue_failure_case(50, config);
});

add_task(async function test_max_request_bytes_enqueue_failure() {
  const config = {
    max_post_bytes: 500,
    max_post_records: 100,

    max_total_bytes: 5000,
    max_total_records: 100,

    max_record_payload_bytes: 500,
    max_request_bytes: 50,
  };
  await test_enqueue_failure_case(50, config);
});

add_task(async function test_max_record_payload_bytes_enqueue_failure() {
  const config = {
    max_post_bytes: 500,
    max_post_records: 100,

    max_total_bytes: 5000,
    max_total_records: 100,

    max_record_payload_bytes: 50,
    max_request_bytes: 500,
  };
  await test_enqueue_failure_case(50, config);
});