root/platform/mysql/mycached/trunk/mycached.hh @ 35045

Revision 35045, 6.2 kB (checked in by kazuho, 4 years ago)

add mycached_stop()

Line 
1/*
2 * mycached - memcached protocol handler for mysqld
3 *
4 * Copyright (C) 2009 Cybozu Labs, Inc.
5 *
6 * This program is free software; you can redistribute it and/or modify it under
7 * the terms of the GNU General Public License as published by the FreeSoftware
8 * Foundation; either version 2 of the License.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
17 * Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19
20#ifndef mycached_hh
21#define mycached_hh
22
23#include <cassert>
24#include <stdexcept>
25#include <string>
26#include <vector>
27extern "C" {
28#include "picoev.h"
29}
30
31#define MYSQL_SERVER 1
32
33#include "mysql_priv.h"
34#include "sql_select.h"
35
36#define NUM_THREADS 4
37#define MAX_FDS 11000
38#define TIMEOUT_SECS 3600
39#define MAX_PACKET_SIZE 1048576
40#define NUM_POOLED_BUFS_PER_THREAD 4
41
42namespace mycached {
43 
44  struct error_t : public std::domain_error {
45    error_t(const std::string& what) : std::domain_error(what) {}
46  };
47 
48  struct range_string {
49    const char* first_, * last_;
50    range_string() : first_(NULL), last_(NULL) {}
51    range_string(const char* first, const char* last)
52      : first_(first), last_(last) {}
53    size_t size() const { return last_ - first_; }
54    void clear() { first_ = last_ = NULL; }
55    bool to_int(long long& v) const {
56      v = 0;
57      bool is_negative = false;
58      const char* p = first_;
59      if (p != last_ && *p == '-') {
60        is_negative = true;
61        ++p;
62      }
63      if (p == last_) {
64        return false;
65      }
66      do {
67        if (! ('0' <= *p && *p <= '9')) {
68          return false;
69        }
70        v = v * 10 + *p - '0';
71      } while (++p != last_);
72      if (is_negative) {
73        v *= -1;
74      }
75      return v;
76    }
77  };
78
79  inline
80  bool
81  operator==(const range_string& x, const range_string& y)
82  {
83    if (x.size() != y.size()) {
84      return false;
85    }
86    if (x.first_ == y.first_) {
87      return true;
88    }
89    if (x.first_ == NULL || y.first_ == NULL) {
90      return true;
91    }
92    for (const char* xp = x.first_, * yp = y.first_;
93         xp != x.last_;
94         ++xp, ++yp) {
95      if (*xp != *yp) {
96        return false;
97      }
98    }
99    return true;
100  }
101 
102  inline
103  bool
104  operator!=(const range_string& x, const range_string& y)
105  {
106    return ! (x == y);
107  }
108 
109  struct key_t {
110    range_string db_;
111    range_string table_;
112    range_string key_str_;
113    uchar key_value_[8];
114    void (*response_builder)(const key_t& key, TABLE* table,
115                             std::string& response);
116    range_string entire_key() const {
117      return range_string(db_.first_, key_str_.last_);
118    }
119  };
120
121  struct fetcher_t {
122    THD* thd_;
123    Query_tables_list tables_;
124    size_t num_tables_;
125    const std::vector<key_t>& keys_;
126    enum {
127      state_not_locked,
128      state_locked,
129      state_index_is_open
130    } state_;
131    fetcher_t(THD* thd, const std::vector<key_t>& keys);
132    ~fetcher_t();
133    void fetch(std::string& response);
134    void register_table(const range_string& db, const range_string& table);
135    TABLE_LIST* get_table(const range_string& db, const range_string& table,
136                          size_t& idx) {
137      idx = 0;
138      for (TABLE_LIST* tl = tables_.query_tables;
139           tl != NULL;
140           tl = tl->next_global, ++idx) {
141        if (table.size() == tl->table_name_length
142            && strncmp(table.first_, tl->table_name, table.size()) == 0
143            && db.size() == tl->db_length
144            && strncmp(db.first_, tl->db, db.size()) == 0) {
145          return tl;
146        }
147      }
148      return NULL;
149    }
150    static void build_nlv_response(const key_t& key, TABLE* table,
151                                   std::string& response);
152    static void build_json_response(const key_t& key, TABLE* table,
153                                    std::string& response);
154  };
155 
156  struct thread_t {
157    THD* thd_;
158    int listen_sock_; // per thread socket
159    picoev_loop* loop_;
160    char* buf_pool_[NUM_POOLED_BUFS_PER_THREAD];
161    thread_t(THD* thd, int listen_sock);
162    ~thread_t();
163    void accept_conn();
164    void event_loop();
165    char* steal_buf() {
166      char* r = buf_pool_[0];
167      if (r != NULL) {
168        memmove(buf_pool_, buf_pool_ + 1,
169                sizeof(buf_pool_[0]) * (NUM_POOLED_BUFS_PER_THREAD - 1));
170      } else if ((r = (char*)valloc(MAX_PACKET_SIZE + PICOEV_PAGE_SIZE))
171                 != NULL) {
172        r += PICOEV_RND_UP(rand() % PICOEV_PAGE_SIZE, PICOEV_CACHE_LINE_SIZE);
173      }
174      return r;
175    }
176    void return_buf(char* buf) {
177      if (buf_pool_[NUM_POOLED_BUFS_PER_THREAD - 1] != NULL) {
178        free((void*)((long)buf & ~(PICOEV_PAGE_SIZE - 1)));
179      } else {
180        memmove(buf_pool_ + 1, buf_pool_,
181                sizeof(buf_pool_[0]) * (NUM_POOLED_BUFS_PER_THREAD - 1));
182        buf_pool_[0] = buf;
183      }
184    }
185    static void accept_conn(picoev_loop* loop, int fd, int, void* cb_arg);
186    static void* event_loop(void* _thd);
187    static bool start_server(unsigned host, unsigned short port);
188    static bool stop_server();
189  };
190 
191  struct conn_t {
192    enum {
193      rr_invalid_request = 0,
194      rr_too_long,
195      rr_closed
196    };
197    thread_t* thd_;
198    int sock_;
199    size_t res_off_;
200    std::string res_buf_;
201    size_t req_off_;
202    char* req_buf_;
203    conn_t(thread_t* thd, int sock)
204      : thd_(thd), sock_(sock), res_off_(0), res_buf_(), req_off_(0),
205        req_buf_(NULL) {
206      picoev_add(thd_->loop_, sock_, PICOEV_READ, TIMEOUT_SECS, handle_conn,
207                 this);
208    }
209    ~conn_t() {
210      if (req_buf_ != NULL) {
211        thd_->return_buf(req_buf_);
212      }
213      picoev_del(thd_->loop_, sock_);
214      close(sock_);
215    }
216    // all cleanups done in handle_conn
217    size_t parse_request(std::vector<key_t>& keys, bool& is_partial) const;
218    bool build_send_response(const std::vector<key_t>& keys);
219    bool handle_request();
220    void handle_conn(picoev_loop* loop);
221    static const char* get_token(const char* s, const char* s_end,
222                                 range_string& token) {
223      const char* first = s;
224      for (; s != s_end; ++s) {
225        switch (*s) {
226        case '\r': case '\n': case ' ': case '.': case ':':
227          token = range_string(first, s);
228          return s;
229        default:
230          break;
231        }
232      }
233      token.clear();
234      return NULL;
235    }
236    static void handle_conn(picoev_loop* loop, int, int, void* cb_arg);
237  };
238
239}
240
241#endif
Note: See TracBrowser for help on using the browser.