| 312 | | int conn_t::listen_sock_ = -1; |
| | 321 | thread_t::thread_t(THD* thd, int listen_sock) |
| | 322 | : thd_(thd), listen_sock_(listen_sock), loop_(NULL) |
| | 323 | { |
| | 324 | loop_ = picoev_create_loop(TIMEOUT_SECS); |
| | 325 | picoev_add(loop_, listen_sock_, PICOEV_READ, 0, accept_conn, this); |
| | 326 | memset(buf_pool_, 0, sizeof(buf_pool_)); |
| | 327 | } |
| | 328 | |
| | 329 | thread_t::~thread_t() |
| | 330 | { |
| | 331 | picoev_del(loop_, listen_sock_); |
| | 332 | close(listen_sock_); |
| | 333 | picoev_destroy_loop(loop_); |
| | 334 | for (int i = 0; i < NUM_POOLED_BUFS_PER_THREAD; ++i) { |
| | 335 | free((void*)((long)buf_pool_[i] & ~(PICOEV_PAGE_SIZE - 1))); |
| | 336 | } |
| | 337 | delete thd_; |
| | 338 | } |
| | 339 | |
| | 340 | void |
| | 341 | thread_t::accept_conn() |
| | 342 | { |
| | 343 | int sock = accept(listen_sock_, NULL, NULL); |
| | 344 | if (sock != -1) { |
| | 345 | if (sock >= MAX_FDS) { |
| | 346 | SEND_SERVER_ERROR(sock, "too many connections"); |
| | 347 | close(sock); |
| | 348 | return; |
| | 349 | } |
| | 350 | setup_sock(sock); |
| | 351 | new conn_t(this, sock); |
| | 352 | } |
| | 353 | } |
| | 354 | |
| | 355 | void |
| | 356 | thread_t::event_loop() |
| | 357 | { |
| | 358 | setup_thd(&thd_); |
| | 359 | |
| | 360 | // main loop |
| | 361 | while (cac_mutex_t<mycached_server_info>::lockref(server_info_)->listen_sock_ |
| | 362 | != -1) { |
| | 363 | picoev_loop_once(loop_, 1); |
| | 364 | } |
| | 365 | |
| | 366 | // close all sockets |
| | 367 | for (int fd = -1; |
| | 368 | (fd = picoev_next_fd(loop_, fd)) != -1; |
| | 369 | ) { |
| | 370 | if (fd != listen_sock_) { |
| | 371 | delete reinterpret_cast<conn_t*>(picoev.fds[fd].cb_arg); |
| | 372 | } |
| | 373 | } |
| | 374 | } |
| | 375 | |
| | 376 | void |
| | 377 | thread_t::accept_conn(picoev_loop* loop, int fd, int, void* cb_arg) |
| | 378 | { |
| | 379 | thread_t* thd = reinterpret_cast<thread_t*>(cb_arg); |
| | 380 | thd->accept_conn(); |
| | 381 | } |
| | 382 | |
| | 383 | void* |
| | 384 | thread_t::event_loop(void* _thd) |
| | 385 | { |
| | 386 | thread_t* thd = (thread_t*)_thd; |
| | 387 | thd->event_loop(); |
| | 388 | delete thd; |
| | 389 | return NULL; |
| | 390 | } |
| | 391 | |
| | 392 | bool |
| | 393 | thread_t::start_server(unsigned host, unsigned short port) |
| | 394 | { |
| | 395 | cac_mutex_t<mycached_server_info>::lockref server_info(server_info_); |
| | 396 | |
| | 397 | if (server_info->listen_sock_ != -1) { |
| | 398 | sql_print_error("mycached is already running"); |
| | 399 | return false; |
| | 400 | } |
| | 401 | |
| | 402 | server_info->listen_sock_ = socket(AF_INET, SOCK_STREAM, 0); |
| | 403 | assert(server_info->listen_sock_ != -1); |
| | 404 | int on = 1; |
| | 405 | setsockopt(server_info->listen_sock_, SOL_SOCKET, SO_REUSEADDR, &on, |
| | 406 | sizeof(on)); |
| | 407 | sockaddr_in listen_addr; |
| | 408 | listen_addr.sin_family = AF_INET; |
| | 409 | listen_addr.sin_port = htons(port); |
| | 410 | listen_addr.sin_addr.s_addr = htonl(host); |
| | 411 | if (bind(server_info->listen_sock_, (sockaddr*)&listen_addr, |
| | 412 | sizeof(listen_addr)) |
| | 413 | != 0 |
| | 414 | || listen(server_info->listen_sock_, 128) != 0) { |
| | 415 | close(server_info->listen_sock_); |
| | 416 | server_info->listen_sock_ = -1; |
| | 417 | sql_print_error("could not listen to port\n"); |
| | 418 | return false; |
| | 419 | } |
| | 420 | |
| | 421 | if (picoev.max_fd == 0) { |
| | 422 | picoev_init(MAX_FDS); |
| | 423 | } |
| | 424 | |
| | 425 | for (int i = 0; i < NUM_THREADS; ++i) { |
| | 426 | // dup socket for per-thread accept(2) |
| | 427 | int sock = dup(server_info->listen_sock_); |
| | 428 | assert(sock != -1); |
| | 429 | setup_sock(sock); |
| | 430 | pthread_create(server_info->worker_threads_ + i, NULL, event_loop, |
| | 431 | new thread_t(new THD(), sock)); |
| | 432 | } |
| | 433 | |
| | 434 | return true; |
| | 435 | } |
| | 436 | |
| | 437 | bool |
| | 438 | thread_t::stop_server() |
| | 439 | { |
| | 440 | { // close global listen sock |
| | 441 | cac_mutex_t<mycached_server_info>::lockref server_info(server_info_); |
| | 442 | if (server_info->listen_sock_ == -1) { |
| | 443 | return false; |
| | 444 | } |
| | 445 | close(server_info->listen_sock_); |
| | 446 | server_info->listen_sock_ = -1; |
| | 447 | } |
| | 448 | |
| | 449 | // wait for all threads to terminate |
| | 450 | for (int i = 0; i < NUM_THREADS; ++i) { |
| | 451 | pthread_t tid |
| | 452 | = cac_mutex_t<mycached_server_info>::lockref(server_info_) |
| | 453 | ->worker_threads_[i]; |
| | 454 | pthread_join(tid, NULL); |
| | 455 | } |
| | 456 | |
| | 457 | return true; |
| | 458 | } |
| 541 | | |
| 542 | | void |
| 543 | | conn_t::accept_conn(picoev_loop* loop, int fd, int, void* cb_arg) |
| 544 | | { |
| 545 | | thread_t* thd = reinterpret_cast<thread_t*>(cb_arg); |
| 546 | | int sock = accept(fd, NULL, NULL); |
| 547 | | if (sock == -1) { |
| 548 | | return; |
| 549 | | } |
| 550 | | if (sock >= MAX_FDS) { |
| 551 | | SEND_SERVER_ERROR(sock, "too many connections"); |
| 552 | | close(sock); |
| 553 | | return; |
| 554 | | } |
| 555 | | setup_sock(sock); |
| 556 | | new conn_t(thd, loop, sock); |
| 557 | | } |
| 558 | | |
| 559 | | void* |
| 560 | | conn_t::event_loop(void* _thd) |
| 561 | | { |
| 562 | | thread_t thd((THD*)_thd); |
| 563 | | setup_thd(&thd.thd_); |
| 564 | | |
| 565 | | picoev_loop* loop = picoev_create_loop(TIMEOUT_SECS); |
| 566 | | // use a dup socket instead since it is not possible to share a single |
| 567 | | // file descriptor with multiple threads using picoev |
| 568 | | int sock = dup(listen_sock_); |
| 569 | | assert(sock != -1); |
| 570 | | setup_sock(sock); |
| 571 | | picoev_add(loop, sock, PICOEV_READ, 0, accept_conn, &thd); |
| 572 | | while (1) { |
| 573 | | picoev_loop_once(loop, 60); |
| 574 | | } |
| 575 | | picoev_destroy_loop(loop); |
| 576 | | |
| 577 | | delete thd.thd_; |
| 578 | | return NULL; |
| 579 | | } |
| 580 | | |
| 581 | | bool |
| 582 | | conn_t::start_server(unsigned host, unsigned short port) |
| 583 | | { |
| 584 | | if (listen_sock_ != -1) { |
| 585 | | sql_print_error("mycached is already running"); |
| 586 | | return false; |
| 587 | | } |
| 588 | | |
| 589 | | listen_sock_ = socket(AF_INET, SOCK_STREAM, 0); |
| 590 | | assert(listen_sock_ != -1); |
| 591 | | int on = 1; |
| 592 | | setsockopt(listen_sock_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); |
| 593 | | sockaddr_in listen_addr; |
| 594 | | listen_addr.sin_family = AF_INET; |
| 595 | | listen_addr.sin_port = htons(port); |
| 596 | | listen_addr.sin_addr.s_addr = htonl(host); |
| 597 | | if (bind(listen_sock_, (sockaddr*)&listen_addr, sizeof(listen_addr)) != 0 |
| 598 | | || listen(listen_sock_, 128) != 0) { |
| 599 | | close(listen_sock_); |
| 600 | | listen_sock_ = -1; |
| 601 | | sql_print_error("could not listen to port\n"); |
| 602 | | return false; |
| 603 | | } |
| 604 | | |
| 605 | | if (picoev.max_fd == 0) { |
| 606 | | picoev_init(MAX_FDS); |
| 607 | | } |
| 608 | | for (int i = 0; i < NUM_THREADS; ++i) { |
| 609 | | pthread_t tid; |
| 610 | | pthread_create(&tid, NULL, event_loop, new THD()); |
| 611 | | pthread_detach(tid); |
| 612 | | } |
| 613 | | |
| 614 | | return true; |
| 615 | | } |