Changeset 35045 for platform/mysql

Show
Ignore:
Timestamp:
08/25/09 18:13:43 (4 years ago)
Author:
kazuho
Message:

add mycached_stop()

Location:
platform/mysql/mycached/trunk
Files:
5 added
4 modified

Legend:

Unmodified
Added
Removed
  • platform/mysql/mycached/trunk/mycached.cc

    r35039 r35045  
    2121#include <sys/uio.h> 
    2222} 
     23#include "cac/cac_mutex.h" 
    2324#include "mycached.hh" 
    2425 
     
    3031#define SEND_CLIENT_ERROR(sock, es) send_error(sock, "CLIENT_ERROR ", es) 
    3132#define SEND_SERVER_ERROR(sock, es) send_error(sock, "SERVER_ERROR ", es) 
     33 
     34struct mycached_server_info { 
     35  int listen_sock_; 
     36  pthread_t worker_threads_[NUM_THREADS]; 
     37  mycached_server_info() : listen_sock_(-1) {} 
     38}; 
     39 
     40static cac_mutex_t<mycached_server_info> server_info_(NULL); 
    3241 
    3342inline void setup_thd(THD** thd) { 
     
    310319} 
    311320 
    312 int conn_t::listen_sock_ = -1; 
     321thread_t::thread_t(THD* thd, int listen_sock) 
     322  : thd_(thd), listen_sock_(listen_sock), loop_(NULL) 
     323{ 
     324  loop_ = picoev_create_loop(TIMEOUT_SECS); 
     325  picoev_add(loop_, listen_sock_, PICOEV_READ, 0, accept_conn, this); 
     326  memset(buf_pool_, 0, sizeof(buf_pool_)); 
     327} 
     328 
     329thread_t::~thread_t() 
     330{ 
     331  picoev_del(loop_, listen_sock_); 
     332  close(listen_sock_); 
     333  picoev_destroy_loop(loop_); 
     334  for (int i = 0; i < NUM_POOLED_BUFS_PER_THREAD; ++i) { 
     335    free((void*)((long)buf_pool_[i] & ~(PICOEV_PAGE_SIZE - 1))); 
     336  } 
     337  delete thd_; 
     338} 
     339 
     340void 
     341thread_t::accept_conn() 
     342{ 
     343  int sock = accept(listen_sock_, NULL, NULL); 
     344  if (sock != -1) { 
     345    if (sock >= MAX_FDS) { 
     346      SEND_SERVER_ERROR(sock, "too many connections"); 
     347      close(sock); 
     348      return; 
     349    } 
     350    setup_sock(sock); 
     351    new conn_t(this, sock); 
     352  } 
     353} 
     354 
     355void 
     356thread_t::event_loop() 
     357{ 
     358  setup_thd(&thd_); 
     359 
     360  // main loop 
     361  while (cac_mutex_t<mycached_server_info>::lockref(server_info_)->listen_sock_ 
     362         != -1) { 
     363    picoev_loop_once(loop_, 1); 
     364  } 
     365   
     366  // close all sockets 
     367  for (int fd = -1; 
     368       (fd = picoev_next_fd(loop_, fd)) != -1; 
     369       ) { 
     370    if (fd != listen_sock_) { 
     371      delete reinterpret_cast<conn_t*>(picoev.fds[fd].cb_arg); 
     372    } 
     373  } 
     374} 
     375 
     376void 
     377thread_t::accept_conn(picoev_loop* loop, int fd, int, void* cb_arg) 
     378{ 
     379  thread_t* thd = reinterpret_cast<thread_t*>(cb_arg); 
     380  thd->accept_conn(); 
     381} 
     382 
     383void* 
     384thread_t::event_loop(void* _thd) 
     385{ 
     386  thread_t* thd = (thread_t*)_thd; 
     387  thd->event_loop(); 
     388  delete thd; 
     389  return NULL; 
     390} 
     391 
     392bool 
     393thread_t::start_server(unsigned host, unsigned short port) 
     394{ 
     395  cac_mutex_t<mycached_server_info>::lockref server_info(server_info_); 
     396   
     397  if (server_info->listen_sock_ != -1) { 
     398    sql_print_error("mycached is already running"); 
     399    return false; 
     400  } 
     401   
     402  server_info->listen_sock_ = socket(AF_INET, SOCK_STREAM, 0); 
     403  assert(server_info->listen_sock_ != -1); 
     404  int on = 1; 
     405  setsockopt(server_info->listen_sock_, SOL_SOCKET, SO_REUSEADDR, &on, 
     406             sizeof(on)); 
     407  sockaddr_in listen_addr; 
     408  listen_addr.sin_family = AF_INET; 
     409  listen_addr.sin_port = htons(port); 
     410  listen_addr.sin_addr.s_addr = htonl(host); 
     411  if (bind(server_info->listen_sock_, (sockaddr*)&listen_addr, 
     412           sizeof(listen_addr)) 
     413      != 0 
     414      || listen(server_info->listen_sock_, 128) != 0) { 
     415    close(server_info->listen_sock_); 
     416    server_info->listen_sock_ = -1; 
     417    sql_print_error("could not listen to port\n"); 
     418    return false; 
     419  } 
     420   
     421  if (picoev.max_fd == 0) { 
     422    picoev_init(MAX_FDS); 
     423  } 
     424   
     425  for (int i = 0; i < NUM_THREADS; ++i) { 
     426    // dup socket for per-thread accept(2) 
     427    int sock = dup(server_info->listen_sock_); 
     428    assert(sock != -1); 
     429    setup_sock(sock); 
     430    pthread_create(server_info->worker_threads_ + i, NULL, event_loop, 
     431                   new thread_t(new THD(), sock)); 
     432  } 
     433   
     434  return true; 
     435} 
     436 
     437bool 
     438thread_t::stop_server() 
     439{ 
     440  { // close global listen sock 
     441    cac_mutex_t<mycached_server_info>::lockref server_info(server_info_); 
     442    if (server_info->listen_sock_ == -1) { 
     443      return false; 
     444    } 
     445    close(server_info->listen_sock_); 
     446    server_info->listen_sock_ = -1; 
     447  } 
     448   
     449  // wait for all threads to terminate 
     450  for (int i = 0; i < NUM_THREADS; ++i) { 
     451    pthread_t tid 
     452      = cac_mutex_t<mycached_server_info>::lockref(server_info_) 
     453      ->worker_threads_[i]; 
     454    pthread_join(tid, NULL); 
     455  } 
     456   
     457  return true; 
     458} 
    313459 
    314460size_t 
     
    523669   
    524670 CLOSE_CONN: 
    525   // all cleanups should be done here 
    526   if (req_buf_ != NULL) { 
    527     thd_->return_buf(req_buf_); 
    528     req_buf_ = NULL; 
    529   } 
    530   picoev_del(loop, sock_); 
    531   close(sock_); 
    532671  delete this; 
    533672} 
     
    539678  self->handle_conn(loop); 
    540679} 
    541  
    542 void 
    543 conn_t::accept_conn(picoev_loop* loop, int fd, int, void* cb_arg) 
    544 { 
    545   thread_t* thd = reinterpret_cast<thread_t*>(cb_arg); 
    546   int sock = accept(fd, NULL, NULL); 
    547   if (sock == -1) { 
    548     return; 
    549   } 
    550   if (sock >= MAX_FDS) { 
    551     SEND_SERVER_ERROR(sock, "too many connections"); 
    552     close(sock); 
    553     return; 
    554   } 
    555   setup_sock(sock); 
    556   new conn_t(thd, loop, sock); 
    557 } 
    558  
    559 void* 
    560 conn_t::event_loop(void* _thd) 
    561 { 
    562   thread_t thd((THD*)_thd); 
    563   setup_thd(&thd.thd_); 
    564  
    565   picoev_loop* loop = picoev_create_loop(TIMEOUT_SECS); 
    566   // use a dup socket instead since it is not possible to share a single 
    567   // file descriptor with multiple threads using picoev 
    568   int sock = dup(listen_sock_); 
    569   assert(sock != -1); 
    570   setup_sock(sock); 
    571   picoev_add(loop, sock, PICOEV_READ, 0, accept_conn, &thd); 
    572   while (1) { 
    573     picoev_loop_once(loop, 60); 
    574   } 
    575   picoev_destroy_loop(loop); 
    576    
    577   delete thd.thd_; 
    578   return NULL; 
    579 } 
    580  
    581 bool 
    582 conn_t::start_server(unsigned host, unsigned short port) 
    583 { 
    584   if (listen_sock_ != -1) { 
    585     sql_print_error("mycached is already running"); 
    586     return false; 
    587   } 
    588    
    589   listen_sock_ = socket(AF_INET, SOCK_STREAM, 0); 
    590   assert(listen_sock_ != -1); 
    591   int on = 1; 
    592   setsockopt(listen_sock_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); 
    593   sockaddr_in listen_addr; 
    594   listen_addr.sin_family = AF_INET; 
    595   listen_addr.sin_port = htons(port); 
    596   listen_addr.sin_addr.s_addr = htonl(host); 
    597   if (bind(listen_sock_, (sockaddr*)&listen_addr, sizeof(listen_addr)) != 0 
    598       || listen(listen_sock_, 128) != 0) { 
    599     close(listen_sock_); 
    600     listen_sock_ = -1; 
    601     sql_print_error("could not listen to port\n"); 
    602     return false; 
    603   } 
    604    
    605   if (picoev.max_fd == 0) { 
    606     picoev_init(MAX_FDS); 
    607   } 
    608   for (int i = 0; i < NUM_THREADS; ++i) { 
    609     pthread_t tid; 
    610     pthread_create(&tid, NULL, event_loop, new THD()); 
    611     pthread_detach(tid); 
    612   } 
    613    
    614   return true; 
    615 } 
  • platform/mysql/mycached/trunk/mycached.hh

    r35042 r35045  
    156156  struct thread_t { 
    157157    THD* thd_; 
     158    int listen_sock_; // per thread socket 
     159    picoev_loop* loop_; 
    158160    char* buf_pool_[NUM_POOLED_BUFS_PER_THREAD]; 
    159     thread_t(THD* thd) : thd_(thd) { 
    160       memset(buf_pool_, 0, sizeof(buf_pool_)); 
    161     } 
    162     ~thread_t() { 
    163       for (int i = 0; i < NUM_POOLED_BUFS_PER_THREAD; ++i) { 
    164         free((void*)((long)buf_pool_[i] & ~(PICOEV_PAGE_SIZE - 1))); 
    165       } 
    166     } 
     161    thread_t(THD* thd, int listen_sock); 
     162    ~thread_t(); 
     163    void accept_conn(); 
     164    void event_loop(); 
    167165    char* steal_buf() { 
    168166      char* r = buf_pool_[0]; 
     
    185183      } 
    186184    } 
     185    static void accept_conn(picoev_loop* loop, int fd, int, void* cb_arg); 
     186    static void* event_loop(void* _thd); 
     187    static bool start_server(unsigned host, unsigned short port); 
     188    static bool stop_server(); 
    187189  }; 
    188190   
     
    199201    size_t req_off_; 
    200202    char* req_buf_; 
    201     static int listen_sock_; 
    202     conn_t(thread_t* thd, picoev_loop* loop, int sock) 
     203    conn_t(thread_t* thd, int sock) 
    203204      : thd_(thd), sock_(sock), res_off_(0), res_buf_(), req_off_(0), 
    204205        req_buf_(NULL) { 
    205       picoev_add(loop, sock_, PICOEV_READ, TIMEOUT_SECS, handle_conn, this); 
     206      picoev_add(thd_->loop_, sock_, PICOEV_READ, TIMEOUT_SECS, handle_conn, 
     207                 this); 
     208    } 
     209    ~conn_t() { 
     210      if (req_buf_ != NULL) { 
     211        thd_->return_buf(req_buf_); 
     212      } 
     213      picoev_del(thd_->loop_, sock_); 
     214      close(sock_); 
    206215    } 
    207216    // all cleanups done in handle_conn 
     
    226235    } 
    227236    static void handle_conn(picoev_loop* loop, int, int, void* cb_arg); 
    228     static void accept_conn(picoev_loop* loop, int fd, int, void* cb_arg); 
    229     static void* event_loop(void* _thd); 
    230     static bool start_server(unsigned host, unsigned short port); 
    231237  }; 
    232238 
  • platform/mysql/mycached/trunk/mycached_as_udf.cc

    r35019 r35045  
    6767mycached_start(UDF_INIT* initid, UDF_ARGS* args, char* is_null, char* error) 
    6868{ 
    69   return mycached::conn_t::start_server(*(long long*)args->args[0], 
    70                                         *(long long*)args->args[1]); 
     69  return mycached::thread_t::start_server(*(long long*)args->args[0], 
     70                                          *(long long*)args->args[1]); 
    7171} 
     72 
     73extern "C" 
     74my_bool 
     75mycached_stop_init(UDF_INIT* initid, UDF_ARGS* args, char* message) 
     76{ 
     77  if (args->arg_count != 0) { 
     78    strcpy(message, "mycached_stop(): invalid arguments"); 
     79    return 1; 
     80  } 
     81  initid->maybe_null = 0; 
     82  return 0; 
     83} 
     84 
     85extern "C" 
     86void 
     87mycached_stop_deinit(UDF_INIT* initid) 
     88{ 
     89} 
     90 
     91extern "C" 
     92long long 
     93mycached_stop(UDF_INIT* initid, UDF_ARGS* args, char* is_null, char* error) 
     94{ 
     95  return mycached::thread_t::stop_server(); 
     96} 
  • platform/mysql/mycached/trunk/picoev.h

    r35035 r35045  
    274274    } 
    275275    return 0; 
     276  } 
     277   
     278  /* function to iterate registered information. To start iteration, set curfd 
     279     to -1 and call the function until -1 is returned */ 
     280  PICOEV_INLINE 
     281  int picoev_next_fd(picoev_loop* loop, int curfd) { 
     282    if (curfd != -1) { 
     283      assert(PICOEV_IS_INITED_AND_FD_IN_RANGE(curfd)); 
     284    } 
     285    while (++curfd < picoev.max_fd) { 
     286      if (loop->loop_id == picoev.fds[curfd].loop_id) { 
     287        return curfd; 
     288      } 
     289    } 
     290    return -1; 
    276291  } 
    277292