module nats.parser;

import std.exception: assumeUnique;
import std.string: assumeUTF, representation;

import nats.interface_;

/* Nats protocol parsing implementation */

package:

// Protocol tokens as byte sequences, matched against raw server output.
enum MSG = "MSG".representation;
enum PING = "PING".representation;
enum PONG = "PONG".representation;
enum OK = "+OK".representation;
enum INFO = "INFO".representation;
enum ERR = "-ERR".representation;
enum CRLF = "\r\n".representation;
enum SPACE = " ".representation;
enum TAB = "\t".representation;


/**
 * Parse the protocol line at the start of `response`.
 *
 * Only the protocol line itself is consumed; for `MSG` the payload bytes are
 * left in `response`. A line consisting only of CRLF (such as the one that
 * terminates a `MSG` payload) is reported as `NatsResponse.FRAGMENT` and its
 * two bytes are consumed. `INFO` and `-ERR` store their argument text (without
 * the operation name, the separating whitespace and the CRLF) in `msg.payload`.
 *
 * Params:
 *   response = buffered bytes received from the server
 *   msg      = receives the parsed protocol line
 * Returns: the number of bytes consumed from the start of `response`, or 0
 *   with `msg.type == NatsResponse.FRAGMENT` when no complete line is
 *   buffered yet.
 * Throws: `NatsProtocolException` on an unknown operation or on a `MSG` line
 *   that does not have 3 or 4 arguments; `ConvException` on a non-numeric
 *   sid or payload length.
 */
size_t parseNats(scope const(ubyte)[] response, out Msg msg) @safe
{
    import std.algorithm.iteration: splitter;
    import std.algorithm.searching: findSplitAfter, startsWith;
    import std.conv: to;

    // Arguments of a control line: the bytes after the operation name and the
    // whitespace following it, without the trailing CRLF. `line` starts with
    // the operation name and ends with CRLF, so the slice stays in bounds even
    // when the operation carries no arguments at all.
    static const(ubyte)[] argumentsOf(const(ubyte)[] line, size_t opLength) @safe
    {
        auto args = line[opLength .. $ - CRLF.length];
        while (args.length && (args[0] == ' ' || args[0] == '\t'))
            args = args[1 .. $];
        return args;
    }

    auto fragments = response.findSplitAfter(CRLF);
    if (!fragments)
    {
        // we don't have a full NATS protocol line, only a fragment
        msg.type = NatsResponse.FRAGMENT;
        return 0;
    }
    auto protocolLine = fragments[0];
    if (protocolLine.length == CRLF.length)
    {
        // drop a line consisting only of CRLF
        msg.type = NatsResponse.FRAGMENT;
        return CRLF.length;
    }
    // findSplitAfter keeps the CRLF in the first half: this is the whole line
    immutable consumed = protocolLine.length;

    if (protocolLine.startsWith(MSG))
    {
        // MSG <subject> <sid> [reply-to] <#bytes>
        // The number of arguments -- not the first character of the third
        // one -- tells whether a reply subject is present, because subjects
        // may start with a digit.
        const(char)[][4] args;
        size_t argCount;
        foreach (token; protocolLine[MSG.length .. $].assumeUTF.splitter)
        {
            if (argCount == args.length)
                throw new NatsProtocolException("Too many MSG arguments.");
            args[argCount++] = token;
        }
        switch (argCount)
        {
            case 3:
                msg.type = NatsResponse.MSG;
                msg.length = args[2].to!uint;
                break;
            case 4:
                msg.type = NatsResponse.MSG_REPLY;
                msg.replySubject = args[2];
                msg.length = args[3].to!uint;
                break;
            default:
                throw new NatsProtocolException("Was expecting 3 or 4 MSG arguments.");
        }
        msg.subject = args[0];
        msg.sid = args[1].to!uint;
    }
    else if (protocolLine.startsWith(PONG))
    {
        msg.type = NatsResponse.PONG;
    }
    else if (protocolLine.startsWith(PING))
    {
        msg.type = NatsResponse.PING;
    }
    else if (protocolLine.startsWith(OK))
    {
        msg.type = NatsResponse.OK;
    }
    else if (protocolLine.startsWith(INFO))
    {
        msg.type = NatsResponse.INFO;
        msg.payload = argumentsOf(protocolLine, INFO.length);
    }
    else if (protocolLine.startsWith(ERR))
    {
        msg.type = NatsResponse.ERR;
        msg.payload = argumentsOf(protocolLine, ERR.length);
    }
    else
    {
        version(Have_vibe_core)
        {
            import vibe.core.log;
            logDebug("protocolLine: %s", protocolLine);
        }
        throw new NatsProtocolException("Expected start of a NATS response token.");
    }
    return consumed;
}


/**
 * Experimental token-loop parser for a single NATS protocol message.
 *
 * Unlike `parseNats`, a complete `MSG` is consumed together with its payload
 * and the CRLF that terminates the payload, and `msg.payload` is set to the
 * payload bytes. `INFO` and `-ERR` store the text after the operation name
 * and one separator in `msg.payload`.
 *
 * Relies on `msg.type` starting out as `NatsResponse.FRAGMENT` (the value
 * `out` resets it to) to recognise the first token.
 *
 * Params:
 *   response = buffered bytes received from the server
 *   msg      = receives the parsed message
 * Returns: the number of bytes consumed from the start of `response`, or 0
 *   with `msg.type == NatsResponse.FRAGMENT` when `response` does not yet
 *   hold a complete message.
 * Throws: `NatsProtocolException` on an unknown operation, on a malformed
 *   `MSG` argument list or when a `MSG` payload is not followed by CRLF;
 *   `ConvException` on a non-numeric sid or payload length.
 */
size_t parseNatsNew(scope const(ubyte)[] response, out Msg msg) @trusted
{
    import std.algorithm.searching: find, findSplitAfter, findSplitBefore;
    import std.conv: to;

    string[5] token;
    bool wholeLine;
    uint tokenCount;

    auto remaining = response;
    loop: do
    {
        // `find` with several needles stops at the earliest match of any of
        // them and also returns the 1-based index of the needle that matched:
        // 1 for ' ', 2 for CRLF -- which is also the separator's length.
        auto tokenSplitter = remaining.find(' ', CRLF);
        if (!tokenSplitter[1])
        {
            msg.type = NatsResponse.FRAGMENT;
            return 0;
        }
        immutable tokenLength = remaining.length - tokenSplitter[0].length;
        token[tokenCount] = remaining[0 .. tokenLength].assumeUTF.assumeUnique;
        remaining = tokenSplitter[0][tokenSplitter[1] .. $];
        tokenCount++;
        if (tokenSplitter[1] == 2)
        {
            wholeLine = true;
        }
        final switch (msg.type)
        {
            case NatsResponse.FRAGMENT:
                // first token: identify the operation
                switch (token[0])
                {
                    case "MSG":
                        msg.type = NatsResponse.MSG;
                        break;
                    case "PONG":
                        msg.type = NatsResponse.PONG;
                        break;
                    case "PING":
                        msg.type = NatsResponse.PING;
                        break;
                    case "+OK":
                        msg.type = NatsResponse.OK;
                        break;
                    case "INFO":
                        msg.type = NatsResponse.INFO;
                        break;
                    case "-ERR":
                        msg.type = NatsResponse.ERR;
                        break;
                    default:
                        throw new NatsProtocolException("Expected a NATS response token.");
                }
                continue loop;
            case NatsResponse.INFO:
            case NatsResponse.ERR:
            {
                // The argument is the rest of the line (it may contain
                // spaces). The first token ended with a space, so the line is
                // at least "INFO " / "-ERR " long and [5 .. $] is in bounds.
                auto line = response.findSplitBefore(CRLF);
                if (!line)
                {
                    msg.type = NatsResponse.FRAGMENT;
                    return 0;
                }
                msg.payload = line[0][5 .. $];
                return line[0].length + CRLF.length;
            }
            case NatsResponse.MSG:
            case NatsResponse.MSG_REPLY:
                continue loop;
            case NatsResponse.PONG:
            case NatsResponse.PING:
            case NatsResponse.OK:
                break loop;
        }
    } while (!wholeLine && tokenCount < token.length);

    if (msg.type == NatsResponse.MSG)
    {
        // MSG <subject> <sid> [reply-to] <#bytes>: 4 or 5 tokens on one line
        if (!wholeLine || tokenCount < 4)
            throw new NatsProtocolException("Was expecting 3 or 4 MSG arguments.");
        msg.subject = token[1];
        msg.sid = token[2].to!uint;
        if (tokenCount == 4)
            msg.length = token[3].to!uint;
        else
        {
            msg.type = NatsResponse.MSG_REPLY;
            msg.replySubject = token[3];
            msg.length = token[4].to!uint;
        }

        immutable headerLength = response.length - remaining.length;
        // overflow-safe form of `msg.length + CRLF.length > remaining.length`
        if (remaining.length < CRLF.length || msg.length > remaining.length - CRLF.length)
        {
            // header complete, payload not fully buffered yet
            msg = Msg.init;
            msg.type = NatsResponse.FRAGMENT;
            return 0;
        }
        if (remaining[msg.length .. msg.length + CRLF.length] != CRLF)
            throw new NatsProtocolException("Was expecting CRLF after MSG payload.");
        msg.payload = remaining[0 .. msg.length];
        return headerLength + msg.length + CRLF.length;
    }

    // PING / PONG / +OK / argument-less INFO or -ERR: skip whatever is left
    // on the line when the loop stopped before its CRLF.
    if (!wholeLine)
    {
        auto rest = remaining.findSplitAfter(CRLF);
        if (!rest)
        {
            msg.type = NatsResponse.FRAGMENT;
            return 0;
        }
        remaining = rest[1];
    }
    return response.length - remaining.length;
}


/**
 * Parse the argument part of a `MSG` protocol line, as used by `parse`.
 *
 * `args` is `<subject> <sid> [reply-to] <#bytes>` without the leading `MSG`
 * token and without the trailing CRLF; fields are separated by runs of spaces
 * or tabs. Three fields yield `NatsResponse.MSG`, four yield
 * `NatsResponse.MSG_REPLY` -- the field count, not the spelling of the reply
 * subject (which may start with '_' or a digit), decides.
 *
 * Throws: `NatsProtocolException` unless there are 3 or 4 fields;
 *   `ConvException` on a non-numeric sid or payload length.
 */
void processMsgArgs(scope const(ubyte)[] args, out Msg msg) @trusted
{
    import std.conv: to;

    const(ubyte)[][4] fields;
    size_t fieldCount;
    size_t fieldStart;
    bool inField;

    // i == args.length acts as one final separator closing the last field
    foreach (i; 0 .. args.length + 1)
    {
        immutable separator = i == args.length || args[i] == ' ' || args[i] == '\t';
        if (!separator)
        {
            if (!inField)
            {
                fieldStart = i;
                inField = true;
            }
        }
        else if (inField)
        {
            if (fieldCount == fields.length)
                throw new NatsProtocolException("Too many MSG arguments.");
            fields[fieldCount++] = args[fieldStart .. i];
            inField = false;
        }
    }

    switch (fieldCount)
    {
        case 3:
            msg.type = NatsResponse.MSG;
            msg.length = fields[2].assumeUTF.to!uint;
            break;
        case 4:
            msg.type = NatsResponse.MSG_REPLY;
            msg.replySubject = fields[2].assumeUTF;
            msg.length = fields[3].assumeUTF.to!uint;
            break;
        default:
            throw new NatsProtocolException("Was expecting 3 or 4 MSG arguments.");
    }
    msg.subject = fields[0].assumeUTF;
    msg.sid = fields[1].assumeUTF.to!uint;
}



/**
 * Byte-level state-machine parser for a single NATS protocol message (a port
 * of the parser in the Go client). Operation names match case-insensitively.
 *
 * A complete `MSG` is consumed together with its payload and the CRLF after
 * it, and `msg.payload` is set to the payload bytes. `INFO` and `-ERR` store
 * their argument text in `msg.payload`.
 *
 * Params:
 *   response = buffered bytes received from the server
 *   msg      = receives the parsed message
 * Returns: the number of bytes consumed from the start of `response` (the
 *   same convention as `parseNats`), or 0 with
 *   `msg.type == NatsResponse.FRAGMENT` when `response` does not yet hold a
 *   complete message.
 * Throws: `NatsProtocolException` on bytes that do not form a NATS server
 *   message; `ConvException` (via `processMsgArgs`) on a non-numeric sid or
 *   payload length.
 */
size_t parse(scope const(ubyte)[] response, out Msg msg) @trusted
{
    CmdState cmd;
    ubyte b;
    size_t start;   // first byte of the current argument, later: bytes consumed
    size_t drop;    // 1 when the argument is followed by '\r' before '\n'

    msgloop:
    for (size_t i; i < response.length; i++)
    {
        b = response[i];
        final switch (cmd)
        {
            case CmdState.OP_START:
                switch (b) {
                    case 'M':
                    case 'm':
                        cmd = CmdState.OP_M;
                        continue;
                    case 'P':
                    case 'p':
                        cmd = CmdState.OP_P;
                        continue;
                    case '+':
                        cmd = CmdState.OP_PLUS;
                        continue;
                    case '-':
                        cmd = CmdState.OP_MINUS;
                        continue;
                    case 'I':
                    case 'i':
                        cmd = CmdState.OP_I;
                        continue;
                    default:
                        throw new NatsProtocolException("Expected start of a NATS response token.");
                }
            case CmdState.OP_M:
                switch (b) {
                    case 'S':
                    case 's':
                        cmd = CmdState.OP_MS;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting MSG token.");
                }
            case CmdState.OP_MS:
                switch (b) {
                    case 'G':
                    case 'g':
                        cmd = CmdState.OP_MSG;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting MSG token.");
                }
            case CmdState.OP_MSG:
                switch (b) {
                    case ' ':
                    case '\t':
                        cmd = CmdState.OP_MSG_SPC;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting whitespace after MSG token.");
                }
            case CmdState.OP_MSG_SPC:
                switch (b) {
                    case ' ':
                    case '\t':
                        continue;
                    default:
                        cmd = CmdState.MSG_ARG;
                        start = i;
                        continue;
                }
            case CmdState.MSG_ARG:
                switch (b) {
                    case '\r':
                        drop = 1;
                        continue;
                    case '\n':
                    {
                        processMsgArgs(response[start .. i - drop], msg);
                        start = i + 1;
                        drop = 0;
                        // The payload and the CRLF terminating it follow the
                        // header line; consume them together with the header.
                        cmd = CmdState.MSG_PAYLOAD;
                        immutable available = response.length - start;
                        // overflow-safe form of `msg.length + CRLF.length > available`
                        if (available < CRLF.length || msg.length > available - CRLF.length)
                            break msgloop; // payload not fully buffered yet
                        immutable payloadEnd = start + msg.length;
                        if (response[payloadEnd .. payloadEnd + CRLF.length] != CRLF)
                            throw new NatsProtocolException("Was expecting CRLF after MSG payload.");
                        msg.payload = response[start .. payloadEnd];
                        start = payloadEnd + CRLF.length;
                        cmd = CmdState.OP_START;
                        break msgloop;
                    }
                    default:
                        continue;
                }
            case CmdState.MSG_PAYLOAD:
            case CmdState.MSG_END:
                // Never the state at the top of an iteration: a completed MSG
                // header either consumes its payload or leaves the loop.
                assert(0, "unreachable NATS parser state");
            case CmdState.OP_PLUS:
                switch (b) {
                    case 'O':
                    case 'o':
                        cmd = CmdState.OP_PLUS_O;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting +OK token.");
                }
            case CmdState.OP_PLUS_O:
                switch (b) {
                    case 'K':
                    case 'k':
                        cmd = CmdState.OP_PLUS_OK;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting +OK token.");
                }
            case CmdState.OP_PLUS_OK:
                switch (b) {
                    case '\r':
                        continue;
                    case '\n':
                        msg.type = NatsResponse.OK;
                        start = i + 1;
                        drop = 0;
                        cmd = CmdState.OP_START;
                        break msgloop;
                    default:
                        throw new NatsProtocolException("Error after +OK token.");
                }
            case CmdState.OP_P:
                switch (b) {
                    case 'I':
                    case 'i':
                        cmd = CmdState.OP_PI;
                        continue;
                    case 'O':
                    case 'o':
                        cmd = CmdState.OP_PO;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting PING or PONG token.");
                }
            case CmdState.OP_PO:
                switch (b) {
                    case 'N':
                    case 'n':
                        cmd = CmdState.OP_PON;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting PONG token.");
                }
            case CmdState.OP_PON:
                switch (b) {
                    case 'G':
                    case 'g':
                        cmd = CmdState.OP_PONG;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting PONG token.");
                }
            case CmdState.OP_PONG:
                switch (b) {
                    case '\r':
                        continue;
                    case '\n':
                        msg.type = NatsResponse.PONG;
                        start = i + 1;
                        drop = 0;
                        cmd = CmdState.OP_START;
                        break msgloop;
                    default:
                        throw new NatsProtocolException("Error after PONG token.");
                }
            case CmdState.OP_PI:
                switch (b) {
                    case 'N':
                    case 'n':
                        cmd = CmdState.OP_PIN;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting PING token.");
                }
            case CmdState.OP_PIN:
                switch (b) {
                    case 'G':
                    case 'g':
                        cmd = CmdState.OP_PING;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting PING token.");
                }
            case CmdState.OP_PING:
                switch (b) {
                    case '\r':
                        continue;
                    case '\n':
                        msg.type = NatsResponse.PING;
                        start = i + 1;
                        drop = 0;
                        cmd = CmdState.OP_START;
                        break msgloop;
                    default:
                        throw new NatsProtocolException("Error after PING token.");
                }
            case CmdState.OP_MINUS:
                switch (b) {
                    case 'E':
                    case 'e':
                        cmd = CmdState.OP_MINUS_E;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting -ERR token.");
                }
            case CmdState.OP_MINUS_E:
                switch (b) {
                    case 'R':
                    case 'r':
                        cmd = CmdState.OP_MINUS_ER;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting -ERR token.");
                }
            case CmdState.OP_MINUS_ER:
                switch (b) {
                    case 'R':
                    case 'r':
                        cmd = CmdState.OP_MINUS_ERR;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting -ERR token.");
                }
            case CmdState.OP_MINUS_ERR:
                switch (b) {
                    case ' ':
                    case '\t':
                        cmd = CmdState.OP_MINUS_ERR_SPC;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting -ERR token.");
                }
            case CmdState.OP_MINUS_ERR_SPC:
                switch (b) {
                    case ' ':
                    case '\t':
                        continue;
                    default:
                        cmd = CmdState.MINUS_ERR_ARG;
                        start = i;
                        continue;
                }
            case CmdState.MINUS_ERR_ARG:
                switch (b) {
                    case '\r':
                        drop = 1;
                        continue;
                    case '\n':
                        msg.type = NatsResponse.ERR;
                        msg.payload = response[start .. i - drop];
                        start = i + 1;
                        drop = 0;
                        cmd = CmdState.OP_START;
                        break msgloop;
                    default:
                        continue;
                }
            case CmdState.OP_I:
                switch (b) {
                    case 'N':
                    case 'n':
                        cmd = CmdState.OP_IN;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting INFO token.");
                }
            case CmdState.OP_IN:
                switch (b) {
                    case 'F':
                    case 'f':
                        cmd = CmdState.OP_INF;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting INFO token.");
                }
            case CmdState.OP_INF:
                switch (b) {
                    case 'O':
                    case 'o':
                        cmd = CmdState.OP_INFO;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting INFO token.");
                }
            case CmdState.OP_INFO:
                switch (b) {
                    case ' ':
                    case '\t':
                        cmd = CmdState.OP_INFO_SPC;
                        continue;
                    default:
                        throw new NatsProtocolException("Was expecting INFO token.");
                }
            case CmdState.OP_INFO_SPC:
                switch (b) {
                    case ' ':
                    case '\t':
                        continue;
                    default:
                        cmd = CmdState.INFO_ARG;
                        start = i;
                        continue;
                }
            case CmdState.INFO_ARG:
                switch (b) {
                    case '\r':
                        drop = 1;
                        continue;
                    case '\n':
                        msg.type = NatsResponse.INFO;
                        msg.payload = response[start .. i - drop];
                        start = i + 1;
                        drop = 0;
                        cmd = CmdState.OP_START;
                        break msgloop;
                    default:
                        continue;
                }
        }
    }
    if (cmd != CmdState.OP_START || start == 0)
    {
        // Ended inside a NATS command (or on empty input): consume nothing so
        // the caller can retry once more bytes have arrived.
        msg = Msg.init;
        msg.type = NatsResponse.FRAGMENT;
        return 0;
    }
    return start;
}

unittest {
    import std.exception: assertThrown;

    enum test_msg = "MSG notices 1 12\r\nHello world!\r\n".representation;
    enum test_msg_w_reply = "MSG notices 12 reply 29\r\nHello world - please respond!\r\n".representation;
    enum two_messages = test_msg ~ test_msg_w_reply;
    enum info = `INFO {"server_id":"p5YHW98yUXPd3BTRHoBNAE","version":"1.4.1","proto":1,`.representation
        ~ `"go":"go1.11.5","host":"0.0.0.0","port":4222,"max_payload":1048576,"client_id":12}`.representation
        ~ "\r\n".representation;
    enum info_json = `{"server_id":"p5YHW98yUXPd3BTRHoBNAE","version":"1.4.1","proto":1,`
        ~ `"go":"go1.11.5","host":"0.0.0.0","port":4222,"max_payload":1048576,"client_id":12}`;

    void idiomatic_d()
    {
        Msg msg1, msg2;
        size_t consumed;

        consumed = parseNats(test_msg, msg1);
        assert(consumed == 18);
        assert(msg1.type == NatsResponse.MSG);
        assert(msg1.subject == "notices");
        assert(msg1.sid == 1);
        assert(test_msg[consumed..$] == "Hello world!\r\n");

        consumed = parseNats(test_msg_w_reply, msg2);
        assert(consumed == 25);
        assert(msg2.type == NatsResponse.MSG_REPLY);
        assert(msg2.subject == "notices");
        assert(msg2.replySubject == "reply");
        assert(msg2.sid == 12);
        assert(test_msg_w_reply[consumed..$] == "Hello world - please respond!\r\n");

        msg1 = Msg.init;
        msg2 = Msg.init;
        consumed = parseNats(two_messages, msg1);
        assert(msg1.subject == "notices");
        assert(msg1.length == 12);
        consumed += msg1.length;
        consumed += parseNats(two_messages[consumed .. $], msg2);
        assert(msg2.type == NatsResponse.FRAGMENT);
        msg2 = Msg.init;
        consumed += parseNats(two_messages[consumed .. $], msg2);
        assert(msg2.type == NatsResponse.MSG_REPLY);
        assert(msg2.replySubject == "reply");
        assert(msg2.length == 29);

        msg1 = Msg.init;
        consumed = parseNats(test_msg[0..17], msg1);
        assert(msg1.type == NatsResponse.FRAGMENT);
        assert(consumed == 0);

        msg1 = Msg.init;
        consumed = parseNats(info[0..65], msg1);
        assert(msg1.type == NatsResponse.FRAGMENT);
        assert(consumed == 0);

        msg1 = Msg.init;
        consumed = parseNats(info, msg1);
        assert(msg1.type == NatsResponse.INFO);
        assert(msg1.payloadAsString == info_json);
    }

    void idiomatic_d_new()
    {
        Msg msg1, msg2;
        size_t consumed;

        consumed = parseNatsNew(test_msg, msg1);
        assert(consumed == test_msg.length);
        assert(msg1.type == NatsResponse.MSG);
        assert(msg1.subject == "notices");
        assert(msg1.payload == "Hello world!".representation);
        assert(msg1.sid == 1);

        consumed = parseNatsNew(test_msg_w_reply, msg2);
        assert(consumed == test_msg_w_reply.length);
        assert(msg2.type == NatsResponse.MSG_REPLY);
        assert(msg2.subject == "notices");
        assert(msg2.payload == "Hello world - please respond!".representation);
        assert(msg2.replySubject == "reply");
        assert(msg2.sid == 12);

        consumed = parseNatsNew(two_messages, msg1);
        assert(consumed == test_msg.length);
        assert(msg1.payload == "Hello world!".representation);
        assert(msg1.sid == 1);
        consumed += parseNatsNew(two_messages[consumed .. $], msg2);
        assert(consumed == two_messages.length);
        assert(msg2.payload == "Hello world - please respond!".representation);
        assert(msg2.sid == 12);

        // only part of the header line
        consumed = parseNatsNew(test_msg[0 .. 12], msg1);
        assert(msg1.type == NatsResponse.FRAGMENT);
        assert(consumed == 0);

        // complete header line, partial payload
        consumed = parseNatsNew(test_msg[0 .. 25], msg1);
        assert(msg1.type == NatsResponse.FRAGMENT);
        assert(consumed == 0);

        consumed = parseNatsNew(info[0 .. 65], msg1);
        assert(msg1.type == NatsResponse.FRAGMENT);
        assert(consumed == 0);

        consumed = parseNatsNew(info, msg1);
        assert(consumed == info.length);
        assert(msg1.type == NatsResponse.INFO);
        assert(msg1.payloadAsString == info_json);

        consumed = parseNatsNew("PING\r\n".representation, msg1);
        assert(msg1.type == NatsResponse.PING);
        assert(consumed == 6);

        consumed = parseNatsNew("+OK\r\n".representation, msg1);
        assert(msg1.type == NatsResponse.OK);
        assert(consumed == 5);
    }

    void go_port()
    {
        Msg msg1, msg2;
        size_t consumed;

        consumed = parse(test_msg, msg1);
        assert(consumed == test_msg.length);
        assert(msg1.type == NatsResponse.MSG);
        assert(msg1.subject == "notices");
        assert(msg1.payload == "Hello world!".representation);
        assert(msg1.sid == 1);

        consumed = parse(test_msg_w_reply, msg2);
        assert(consumed == test_msg_w_reply.length);
        assert(msg2.type == NatsResponse.MSG_REPLY);
        assert(msg2.subject == "notices");
        assert(msg2.payload == "Hello world - please respond!".representation);
        assert(msg2.replySubject == "reply");
        assert(msg2.sid == 12);

        consumed = parse(two_messages, msg1);
        assert(consumed == test_msg.length);
        assert(msg1.payload == "Hello world!".representation);
        assert(msg1.sid == 1);
        consumed += parse(two_messages[consumed .. $], msg2);
        assert(consumed == two_messages.length);
        assert(msg2.payload == "Hello world - please respond!".representation);
        assert(msg2.sid == 12);

        // reply subjects need not start with a letter
        consumed = parse("MSG notices 1 _INBOX.x 5\r\nhello\r\n".representation, msg1);
        assert(consumed == 33);
        assert(msg1.type == NatsResponse.MSG_REPLY);
        assert(msg1.replySubject == "_INBOX.x");
        assert(msg1.payload == "hello".representation);

        consumed = parse(test_msg[0 .. 12], msg1);
        assert(msg1.type == NatsResponse.FRAGMENT);
        assert(consumed == 0);

        consumed = parse(test_msg[0 .. 25], msg1);
        assert(msg1.type == NatsResponse.FRAGMENT);
        assert(consumed == 0);

        consumed = parse(info[0 .. 65], msg1);
        assert(msg1.type == NatsResponse.FRAGMENT);
        assert(consumed == 0);

        consumed = parse(info, msg1);
        assert(consumed == info.length);
        assert(msg1.type == NatsResponse.INFO);
        assert(msg1.payloadAsString == info_json);

        consumed = parse("PING\r\n".representation, msg1);
        assert(msg1.type == NatsResponse.PING);
        assert(consumed == 6);
    }

    void edge_cases()
    {
        Msg msg;
        size_t consumed;

        // a reply subject may start with a digit
        consumed = parseNats("MSG notices 1 9inbox 5\r\nhello\r\n".representation, msg);
        assert(consumed == 24);
        assert(msg.type == NatsResponse.MSG_REPLY);
        assert(msg.replySubject == "9inbox");
        assert(msg.length == 5);

        // a control line without arguments must not slice out of bounds
        consumed = parseNats("INFO\r\n".representation, msg);
        assert(consumed == 6);
        assert(msg.type == NatsResponse.INFO);
        assert(msg.payload.length == 0);

        consumed = parseNats("-ERR 'Unknown Protocol Operation'\r\n".representation, msg);
        assert(msg.type == NatsResponse.ERR);
        assert(msg.payloadAsString == "'Unknown Protocol Operation'");

        assertThrown!NatsProtocolException(parseNats("MSG notices\r\n".representation, msg));
        assertThrown!NatsProtocolException(parseNats("MSG a 1 b c 5\r\n".representation, msg));

        processMsgArgs("notices 7 _INBOX.y 3".representation, msg);
        assert(msg.type == NatsResponse.MSG_REPLY);
        assert(msg.subject == "notices");
        assert(msg.sid == 7);
        assert(msg.replySubject == "_INBOX.y");
        assert(msg.length == 3);
        assertThrown!NatsProtocolException(processMsgArgs("notices 7".representation, msg));
    }

    idiomatic_d();
    idiomatic_d_new();
    go_port();
    edge_cases();

    import std.datetime.stopwatch: benchmark;
    import std.stdio: writeln;

    auto results = benchmark!(idiomatic_d, idiomatic_d_new, go_port)(1_000);

    writeln("1000x idiomatic_d parser: ", results[0]);
    writeln("1000x idiomatic_d_new parser: ", results[1]);
    writeln("1000x go_natsparser_port: ", results[2]);
}


/// States of the byte-level state machine in `parse`.
enum CmdState {
    OP_START,
    OP_PLUS,
    OP_PLUS_O,
    OP_PLUS_OK,
    OP_MINUS,
    OP_MINUS_E,
    OP_MINUS_ER,
    OP_MINUS_ERR,
    OP_MINUS_ERR_SPC,
    MINUS_ERR_ARG,
    OP_M,
    OP_MS,
    OP_MSG,
    OP_MSG_SPC,
    MSG_ARG,
    MSG_PAYLOAD,
    MSG_END,
    OP_P,
    OP_PI,
    OP_PIN,
    OP_PING,
    OP_PO,
    OP_PON,
    OP_PONG,
    OP_I,
    OP_IN,
    OP_INF,
    OP_INFO,
    OP_INFO_SPC,
    INFO_ARG
}

/// Argument fields of a `MSG` protocol line.
enum MsgField {
    SUBJECT,
    SID,
    REPLY,
    LENGTH
}