Changeset 35138
- Timestamp:
- 08/31/09 09:17:59 (4 years ago)
- Location:
- platform/mysql/mycached/trunk
- Files:
-
- 3 modified
-
mycached.cc (modified) (17 diffs)
-
mycached.hh (modified) (7 diffs)
-
testclient.cc (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
platform/mysql/mycached/trunk/mycached.cc
r35136 r35138 77 77 78 78 inline void 79 escape_n_append(string& json, const char ch)79 json_escape_n_append(conn_t& conn, const char ch) 80 80 { 81 81 switch (ch) { 82 #define MAP(val, sym) case val: json += sym; break82 #define MAP(val, sym) case val: conn.res_push(sym, 2); break 83 83 MAP('"', "\\\""); 84 84 MAP('\\', "\\\\"); … … 92 92 default: 93 93 if ((unsigned char)ch < 0x20 || ch == 0x7f) { 94 json += "\\u00";95 json.push_back("0123456789abcdef"[(unsigned char)ch >> 4]);96 json.push_back("0123456789abcdef"[ch & 0xf]);94 conn.res_push("\\u00", 3); 95 conn.res_push("0123456789abcdef"[(unsigned char)ch >> 4]); 96 conn.res_push("0123456789abcdef"[ch & 0xf]); 97 97 } else { 98 json.push_back(ch);98 conn.res_push(ch); 99 99 } 100 100 break; … … 103 103 104 104 static void 105 escape_n_append(string& json, const char* first, const char* last)106 { 107 json.push_back('"');105 json_escape_n_append(conn_t& conn, const char* first, const char* last) 106 { 107 conn.res_push('"'); 108 108 for (const char* p = first; p != last; ++p) { 109 escape_n_append(json, *p);110 } 111 json.push_back('"');109 json_escape_n_append(conn, *p); 110 } 111 conn.res_push('"'); 112 112 } 113 113 114 114 static void 115 escape_n_append(string& json, const char* s)116 { 117 json.push_back('"');115 json_escape_n_append(conn_t& conn, const char* s) 116 { 117 conn.res_push('"'); 118 118 for (; *s != '\0'; ++s) { 119 escape_n_append(json, *s);120 } 121 json.push_back('"');119 json_escape_n_append(conn, *s); 120 } 121 conn.res_push('"'); 122 122 } 123 123 … … 190 190 191 191 void 192 fetcher_t::fetch( string& response)192 fetcher_t::fetch(conn_t& conn) 193 193 { 194 194 // init indexes … … 221 221 HA_READ_KEY_EXACT) 222 222 == 0) { 223 (*ki->response_builder)(*ki, table, response); 223 // prepare header with dummy size 224 conn.res_push("VALUE ", 6); 225 range_string entire_key(ki->entire_key()); 226 conn.res_push(entire_key.first_, entire_key.last_ - entire_key.first_); 227 conn.res_push(" 0 4292967296\r\n", sizeof(" 0 4292967296\r\n") - 1); 228 // build body 229 size_t body_from = conn.res_.size_; 230 (*ki->response_builder)(*ki, table, conn); 231 // rewrite size 232 sprintf(conn.res_.buf_ + body_from - 12, "%10u", 233 (unsigned)(conn.res_.size_ - body_from)); 234 conn.res_.buf_[body_from - 2] = '\r'; 235 // add cr-lf 236 conn.res_push("\r\n", 2); 224 237 } 225 238 } … … 247 260 248 261 void 249 fetcher_t::build_nlv_response(const key_t& key, TABLE* table, string& response)262 fetcher_t::build_nlv_response(const key_t& key, TABLE* table, conn_t& conn) 250 263 { 251 264 char buf[769], szbuf[16]; 252 265 253 // build name-length-value list254 string nlv;255 266 for (Field** field = table->field; *field != NULL; ++field) { 256 nlv += (*field)->field_name;267 conn.res_push((*field)->field_name, strlen((*field)->field_name)); 257 268 if ((*field)->is_null()) { 258 nlv += ":null:";269 conn.res_push(":null:", 6); 259 270 } else { 260 271 String tmp(buf, sizeof(buf), &my_charset_bin); … … 263 274 } 264 275 sprintf(szbuf, ":%u:", tmp.length()); 265 nlv += szbuf; 266 nlv.insert(nlv.end(), tmp.ptr(), (const char*)tmp.ptr() + tmp.length()); 267 } 268 } 269 // build response line 270 response += "VALUE "; 271 range_string entire_key(key.entire_key()); 272 response.insert(response.end(), entire_key.first_, entire_key.last_); 273 sprintf(buf, " 0 %u\r\n", (unsigned)nlv.size()); 274 response += buf; 275 // append json to response line 276 response += nlv; 277 response += "\r\n"; 278 } 279 280 void 281 fetcher_t::build_json_response(const key_t& key, TABLE* table, string& response) 276 conn.res_push(szbuf, strlen(szbuf)); 277 conn.res_push(tmp.ptr(), tmp.length()); 278 } 279 } 280 } 281 282 void 283 fetcher_t::build_json_response(const key_t& key, TABLE* table, conn_t& conn) 282 284 { 283 285 char buf[769]; 284 286 285 287 // build json 286 string json("{");288 conn.res_push('{'); 287 289 for (Field** field = table->field; *field != NULL; ++field) { 288 escape_n_append(json, (*field)->field_name);290 json_escape_n_append(conn, (*field)->field_name); 289 291 if ((*field)->is_null()) { 290 json += ":null";292 conn.res_push(":null", 5); 291 293 } else { 292 json.push_back(':');294 conn.res_push(':'); 293 295 String tmp(buf, sizeof(buf), &my_charset_utf8_bin); 294 296 if (! (*field)->val_str(&tmp)) { … … 299 301 case MYSQL_TYPE_LONG: case MYSQL_TYPE_LONGLONG: case MYSQL_TYPE_FLOAT: 300 302 case MYSQL_TYPE_DOUBLE: 301 json.insert(json.end(), (const char*)tmp.ptr(), 302 (const char*)tmp.ptr() + tmp.length()); 303 conn.res_push((const char*)tmp.ptr(), tmp.length()); 303 304 break; 304 305 default: 305 escape_n_append(json, (const char*)tmp.ptr(),306 (const char*)tmp.ptr() + tmp.length());306 json_escape_n_append(conn, (const char*)tmp.ptr(), 307 (const char*)tmp.ptr() + tmp.length()); 307 308 break; 308 309 } 309 310 } 310 json.push_back(','); 311 } 312 json.end()[-1] = '}'; 313 // build response line 314 response += "VALUE "; 315 range_string entire_key(key.entire_key()); 316 response.insert(response.end(), entire_key.first_, entire_key.last_); 317 sprintf(buf, " 0 %u\r\n", (unsigned)json.size()); 318 response += buf; 319 // append json to response line 320 response += json; 321 response += "\r\n"; 311 conn.res_push(','); 312 } 313 conn.res_.buf_[conn.res_.size_ - 1] = '}'; 322 314 } 323 315 … … 639 631 EXPECT('\n'); 640 632 goto PARSE_COMPLETE; 641 } else if (*req == '\ r') {633 } else if (*req == '\n') { 642 634 ++req; 643 635 goto PARSE_COMPLETE; … … 663 655 if (*req == ':') { 664 656 ++req; 657 CHECK_EOF(); 658 switch (*req) { 659 case 'j': 660 ++req; 661 EXPECT('s'); 662 EXPECT('o'); 663 EXPECT('n'); 664 keys.back().response_builder = &fetcher_t::build_json_response; 665 break; 665 666 #if MSGPACK 666 if (*req == 'm') {667 case 'm': 667 668 ++req; 668 669 EXPECT('s'); … … 673 674 EXPECT('k'); 674 675 keys.back().response_builder = &fetcher_t::build_msgpack_response; 675 } else { 676 break; 676 677 #endif 677 EXPECT('j'); 678 EXPECT('s'); 679 EXPECT('o'); 680 EXPECT('n'); 681 keys.back().response_builder = &fetcher_t::build_json_response; 682 #if MSGPACK 683 } 684 #endif 678 default: 679 return 0; 680 } 685 681 } else { 686 682 keys.back().response_builder = &fetcher_t::build_nlv_response; … … 708 704 conn_t::build_send_response(const vector<key_t>& keys) 709 705 { 710 assert(res_buf_.empty()); 706 assert(res_.buf_ == NULL); 707 708 res_prepare(); 711 709 712 710 if (! keys.empty()) { 713 711 // fetch data from db 714 fetcher_t(thd_->thd_, keys).fetch(res_buf_); 715 } 716 res_buf_ += "END\r\n"; 717 res_off_ = 0; 712 fetcher_t(thd_->thd_, keys).fetch(*this); 713 } 714 res_push("END\r\n", 5); 718 715 719 716 // try to send 720 ssize_t r = write(sock_, res_ buf_.c_str(), res_buf_.size());717 ssize_t r = write(sock_, res_.buf_, res_.size_); 721 718 if (IO_IS_FATAL(r)) { 722 719 return false; 723 720 } else if (r > 0) { 724 if ((size_t)r == res_ buf_.size()) {725 res_ buf_.clear();721 if ((size_t)r == res_.size_) { 722 res_clear(); 726 723 } else { 727 res_ off_ = r;724 res_.off_ = r; 728 725 } 729 726 } … … 756 753 } 757 754 req_off_ -= req_size; 758 if ( ! res_buf_.empty()) {755 if (res_.buf_ != NULL) { 759 756 return true; 760 757 } … … 768 765 picoev_set_timeout(loop, sock_, TIMEOUT_SECS); 769 766 770 if ( ! res_buf_.empty()) {767 if (res_.buf_ != NULL) { 771 768 // write response 772 ssize_t r 773 = write(sock_, res_buf_.c_str() + res_off_, res_buf_.size() - res_off_); 769 ssize_t r = write(sock_, res_.buf_ + res_.off_, res_.size_ - res_.off_); 774 770 if (r <= 0) { 775 771 if (IO_IS_FATAL(r)) { … … 778 774 return; 779 775 } 780 res_ off_ += r;781 if (res_ off_ != res_buf_.size()) {776 res_.off_ += r; 777 if (res_.off_ != res_.size_) { 782 778 return; 783 779 } 784 res_buf_.clear(); 785 res_off_ = 0; 780 res_clear(); 786 781 // flows down 787 782 } … … 792 787 goto CLOSE_CONN; 793 788 } 794 if (res_ buf_.empty()) {789 if (res_.buf_ == NULL) { 795 790 if (req_buf_ == NULL) { 796 791 if ((req_buf_ = thd_->steal_buf()) == NULL) { … … 827 822 828 823 // update state and return 829 if (req_buf_ != NULL && req_off_ == 0 && res_buf_.empty()) {824 if (req_buf_ != NULL && req_off_ == 0) { 830 825 thd_->return_buf(req_buf_); 831 826 req_buf_ = NULL; 832 827 } 833 picoev_set_events(loop, sock_, res_buf_.empty() ? PICOEV_READ : PICOEV_WRITE); 828 picoev_set_events(loop, sock_, 829 res_.buf_ == NULL ? PICOEV_READ : PICOEV_WRITE); 834 830 return; 835 831 -
platform/mysql/mycached/trunk/mycached.hh
r35134 r35138 48 48 error_t(const std::string& what) : std::domain_error(what) {} 49 49 }; 50 51 struct packet_too_long_t : public error_t { 52 packet_too_long_t() : error_t("packet too long") {} 53 }; 54 55 class conn_t; 50 56 51 57 struct range_string { … … 79 85 } 80 86 }; 81 87 82 88 inline 83 89 bool … … 116 122 uchar key_value_[8]; 117 123 void (*response_builder)(const key_t& key, TABLE* table, 118 std::string& response);124 conn_t& conn); 119 125 range_string entire_key() const { 120 126 return range_string(db_.first_, key_str_.last_); … … 134 140 fetcher_t(THD* thd, const std::vector<key_t>& keys); 135 141 ~fetcher_t(); 136 void fetch( std::string& response);142 void fetch(conn_t& conn); 137 143 void register_table(const range_string& db, const range_string& table); 138 144 TABLE_LIST* get_table(const range_string& db, const range_string& table, … … 152 158 } 153 159 static void build_nlv_response(const key_t& key, TABLE* table, 154 std::string& response);160 conn_t& conn); 155 161 static void build_json_response(const key_t& key, TABLE* table, 156 std::string& response);162 conn_t& conn); 157 163 #ifdef MSGPACK 158 164 static void build_msgpack_response(const key_t& key, TABLE* table, 159 std::string& response);165 conn_t& conn); 160 166 #endif 161 167 }; … … 186 192 thread_t* thd_; 187 193 int sock_; 188 size_t res_off_;189 std::string res_buf_;190 194 size_t req_off_; 191 195 char* req_buf_; 196 struct { 197 char* buf_; 198 size_t off_, size_; 199 } res_; 192 200 conn_t(thread_t* thd, int sock) 193 : thd_(thd), sock_(sock), res_off_(0), res_buf_(), req_off_(0), 194 req_buf_(NULL) { 201 : thd_(thd), sock_(sock), req_off_(0), req_buf_(NULL) { 202 res_.buf_ = NULL; 203 res_.off_ = res_.size_ = 0; 195 204 picoev_add(thd_->loop_, sock_, PICOEV_READ, TIMEOUT_SECS, handle_conn, 196 205 this); … … 200 209 thd_->return_buf(req_buf_); 201 210 } 211 if (res_.buf_ != NULL) { 212 thd_->return_buf(res_.buf_); 213 } 202 214 picoev_del(thd_->loop_, sock_); 203 215 close(sock_); 216 } 217 void res_prepare() { 218 assert(res_.buf_ == NULL); 219 res_.buf_ = thd_->steal_buf(); 220 res_.off_ = res_.size_ = 0; 221 } 222 void res_clear() { 223 assert(res_.buf_ != NULL); 224 thd_->return_buf(res_.buf_); 225 res_.buf_ = NULL; 226 res_.off_ = res_.size_ = 0; 227 } 228 void res_push(int ch) { 229 if (res_.size_ + 1 > MAX_PACKET_SIZE) { 230 throw packet_too_long_t(); 231 } 232 res_.buf_[res_.size_++] = ch; 233 } 234 void res_push(const char* p, size_t s) { 235 if (res_.size_ + s > MAX_PACKET_SIZE) { 236 throw packet_too_long_t(); 237 } 238 memcpy(res_.buf_ + res_.size_, p, s); 239 res_.size_ += s; 204 240 } 205 241 // all cleanups done in handle_conn -
platform/mysql/mycached/trunk/testclient.cc
r35067 r35138 109 109 explen += 8; 110 110 } 111 if (len !=(ssize_t)explen) {111 if (len < (ssize_t)explen) { 112 112 fprintf(stderr, "unexpected response for test.t1.%d, got:\n", fd); 113 113 fwrite(buf, 1, len, stderr);
![(please configure the [header_logo] section in trac.ini)](/share/chrome/site/your_project_logo.png)