Changeset 13812 for lang/sql

Show
Ignore:
Timestamp:
06/13/08 15:35:17 (5 years ago)
Author:
kazuho
Message:

support paging, implement maxid cache

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • lang/sql/mysql_timeline/trunk/timeline.cc

    r13684 r13812  
     1#include <algorithm> 
    12#include "mysql_direct_access.hpp" 
    23 
    3 #define MAX_ROWS 20 
     4#ifndef MAX_ROWS 
     5# define MAX_ROWS 20 
     6#endif 
     7 
     8#ifndef CACHE_MAX_MESSAGE_IDS 
     9# define CACHE_MAX_MESSAGE_IDS 0 
     10#endif 
     11 
     12#ifndef MAX_MESSAGE_IDS_INITIAL_CAPACITY 
     13# define MAX_MESSAGE_IDS_INITIAL_CAPACITY 4096 
     14#endif 
     15 
     16using namespace std; 
     17 
     18/* per user cache for keeping max. message id 
     19 * i.e. max_message_ids[$uid] == select max(id) from message where user_id=$uid 
     20 *  
     21 * should be fullfilled on server startup by calling: 
     22 *  SELECT set_max_message_id(user_id,max(id)) FROM message GROUP BY user_id 
     23 */ 
     24class max_message_ids_t { 
     25   
     26protected: 
     27  unsigned *vect; 
     28  size_t vect_capacity; 
     29  pthread_rwlock_t rwlock; 
     30   
     31public: 
     32  max_message_ids_t() 
     33  : vect((unsigned*)malloc(sizeof(unsigned) 
     34                           * MAX_MESSAGE_IDS_INITIAL_CAPACITY)), 
     35    vect_capacity(MAX_MESSAGE_IDS_INITIAL_CAPACITY) 
     36  { 
     37    assert(vect != NULL); 
     38    fill(vect, vect + vect_capacity, 0); 
     39    pthread_rwlockattr_t attr; 
     40    pthread_rwlockattr_init(&attr); 
     41    // switch to writer-preferred lock on linux 
     42#ifdef PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP 
     43    pthread_rwlockattr_setkind_np(&attr, 
     44                                  PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); 
     45#endif 
     46    pthread_rwlock_init(&rwlock, &attr); 
     47    pthread_rwlockattr_destroy(&attr); 
     48  } 
     49  ~max_message_ids_t() { 
     50    free(vect); 
     51    vect = NULL; 
     52    pthread_rwlock_destroy(&rwlock); 
     53  } 
     54  void lock_reader() { 
     55    pthread_rwlock_rdlock(&rwlock); 
     56  } 
     57  void unlock_reader() { 
     58    pthread_rwlock_unlock(&rwlock); 
     59  } 
     60  unsigned get(unsigned user_id) { 
     61    return user_id < vect_capacity ? vect[user_id] : 0; 
     62  } 
     63  void set(unsigned user_id, unsigned message_id) { 
     64    pthread_rwlock_rdlock(&rwlock); 
     65    if (user_id < vect_capacity) { 
     66      vect[user_id] = message_id; 
     67      pthread_rwlock_unlock(&rwlock); 
     68      return; 
     69    } else { 
     70      pthread_rwlock_unlock(&rwlock); 
     71      pthread_rwlock_wrlock(&rwlock); 
     72      if (vect_capacity <= user_id) { 
     73        size_t old_capacity = vect_capacity; 
     74        do { 
     75        vect_capacity *= 2; 
     76        } while (vect_capacity <= user_id); 
     77        vect = (unsigned*)realloc(vect, sizeof(unsigned) * vect_capacity); 
     78        assert(vect != NULL); 
     79        fill(vect + old_capacity, vect + vect_capacity, 0); 
     80      } 
     81      vect[user_id] = message_id; 
     82      pthread_rwlock_unlock(&rwlock); 
     83    } 
     84  } 
     85}; 
     86 
     87static max_message_ids_t max_message_ids; 
     88 
     89 
     90template <typename Elem> struct sorted_list_t { 
     91   
     92  Elem list[MAX_ROWS]; /* top MAX_ROWS, sorted in desc. order */ 
     93  int cnt; 
     94   
     95  sorted_list_t() : cnt(0) {} 
     96  bool test_add(const Elem& item) { 
     97    int keep; 
     98    for (keep = cnt - 1; keep >= 0; keep--) { 
     99      if (item <= list[keep]) { 
     100        break; 
     101      } 
     102    } 
     103    if (keep == MAX_ROWS - 1) { 
     104      return false; 
     105    } 
     106    if (cnt < MAX_ROWS) { 
     107      cnt++; 
     108    } 
     109    for (int i = cnt - 2; i > keep; i--) { 
     110      list[i + 1] = list[i]; 
     111    } 
     112    list[keep + 1] = item; 
     113    return true; 
     114  } 
     115}; 
     116 
     117struct uid_maxid_t { 
     118  unsigned user_id; 
     119  unsigned max_id; 
     120  uid_maxid_t() {} 
     121  uid_maxid_t(unsigned u, unsigned m) : user_id(u), max_id(m) {} 
     122}; 
     123 
     124bool operator<=(const uid_maxid_t& x, const uid_maxid_t& y) { 
     125  return x.max_id <= y.max_id; 
     126} 
    4127 
    5128struct timeline_t : public mysql_direct_access<timeline_t> { 
    6129   
    7   unsigned id_list[MAX_ROWS]; /* top MAX_ROWS, sorted in desc. order */ 
    8   int id_cnt; 
    9    
    10   timeline_t() : mysql_direct_access<timeline_t>(), id_cnt(0) {} 
    11    
    12130  static int type_init(UDF_INIT *initid, UDF_ARGS *args, char *message) { 
    13     if (args->arg_count != 1) { 
    14       strcpy(message, "timeline(user_id)"); 
     131    switch (args->arg_count) { 
     132    case 2: 
     133      args->arg_type[1] = INT_RESULT; 
     134      args->maybe_null[1] = 0; 
     135      // don't break 
     136    case 1: 
     137      args->arg_type[0] = INT_RESULT; 
     138      args->maybe_null[0] = 0; 
     139      break; 
     140    default: 
     141      strcpy(message, "timeline(user_id[,max_message_id])"); 
    15142      return 1; 
    16143    } 
    17     args->arg_type[0] = INT_RESULT; 
    18     args->maybe_null[0] = 0; 
    19144    initid->maybe_null = 0; 
    20145    return 0; 
     
    28153  } 
    29154   
    30   int do_task() { 
    31      
    32     unsigned user_id = *(long long*)args->args[0]; 
    33      
    34     TABLE *message_tbl = get_table(0); 
    35     Field *message_id_fld = get_field(message_tbl, "id"); 
     155  TABLE *message_tbl; 
     156  Field *message_id_fld; 
     157  Field *message_user_id_fld; 
     158  KEY *message_key; 
     159  TABLE *follower_tbl; 
     160  Field *follower_user_id_fld; 
     161  Field *follower_follower_id_fld; 
     162  KEY *follower_key; 
     163  sorted_list_t<unsigned> message_ids; 
     164   
     165  void setup() { 
     166    message_tbl = get_table(0); 
     167    message_id_fld = get_field(message_tbl, "id"); 
    36168    assert(message_id_fld != NULL); 
    37     Field *message_user_id_fld = get_field(message_tbl, "user_id"); 
     169    message_user_id_fld = get_field(message_tbl, "user_id"); 
    38170    assert(message_user_id_fld != NULL); 
    39171    message_tbl->clear_column_bitmaps(); 
    40172    bitmap_set_bit(message_tbl->read_set, message_id_fld->field_index); 
    41173    bitmap_set_bit(message_tbl->read_set, message_user_id_fld->field_index); 
    42     KEY *message_key = index_init(message_tbl, "user_id_id", true); 
     174    message_key = index_init(message_tbl, "user_id_id", true); 
    43175    assert(message_key != NULL); 
    44     uchar* message_keybuff = (uchar*)alloca(message_key->key_length);     
    45     memset(message_keybuff, 0, message_key->key_length); 
    46      
    47     TABLE *follower_tbl = get_table(1); 
    48     Field *follower_user_id_fld = get_field(follower_tbl, "user_id"); 
     176     
     177    follower_tbl = get_table(1); 
     178    follower_user_id_fld = get_field(follower_tbl, "user_id"); 
    49179    assert(follower_user_id_fld != NULL); 
    50     Field *follower_follower_id_fld = get_field(follower_tbl, "follower_id"); 
     180    follower_follower_id_fld = get_field(follower_tbl, "follower_id"); 
    51181    assert(follower_follower_id_fld != NULL); 
    52182    follower_tbl->use_all_columns(); 
    53     KEY *follower_key = index_init(follower_tbl, "PRIMARY", false); 
     183    follower_key = index_init(follower_tbl, "PRIMARY", false); 
    54184    assert(follower_key != NULL); 
    55     uchar* follower_keybuff = (uchar*)alloca(follower_key->key_length); 
    56     memset(follower_keybuff, 0, follower_key->key_length); 
     185  } 
     186   
     187  void per_user_search(unsigned follower_id, unsigned max_id) { 
     188    uchar *keybuff = (uchar*)alloca(message_key->key_length); 
     189     
     190    if (max_id == 0) { 
     191      int4store(keybuff, follower_id); 
     192      if (message_tbl->file->index_read_map(message_tbl->record[0], keybuff, 1, 
     193                                            HA_READ_PREFIX_LAST) 
     194          != 0) { 
     195        return; 
     196      } 
     197    } else { 
     198      int4store(keybuff, follower_id); 
     199      int4store(keybuff + 4, max_id); 
     200      if (message_tbl->file->index_read_map(message_tbl->record[0], keybuff, 3, 
     201                                            HA_READ_BEFORE_KEY) 
     202          != 0 || 
     203          message_user_id_fld->val_int() != follower_id) { 
     204        return; 
     205      } 
     206    } 
     207    do { 
     208      if (! message_ids.test_add(message_id_fld->val_int())) { 
     209        break; 
     210      } 
     211    } while (message_tbl->file->index_prev(message_tbl->record[0]) == 0 
     212             && message_user_id_fld->val_int() == follower_id); 
     213  } 
     214   
     215  int do_task() { 
     216     
     217    unsigned user_id = *(long long*)args->args[0]; 
     218    unsigned max_id = args->arg_count >= 2 ? *(long long*)args->args[1] : 0; 
     219     
     220    setup(); 
    57221     
    58222    /* main */ 
    59     int4store(follower_keybuff, user_id); 
    60     if (follower_tbl->file->index_read_map(follower_tbl->record[0], 
    61                                            follower_keybuff, 1, HA_READ_PREFIX) 
     223    uchar *keybuff = (uchar*)alloca(follower_key->key_length); 
     224    memset(keybuff, 0, follower_key->key_length); 
     225    int4store(keybuff, user_id); 
     226    if (follower_tbl->file->index_read_map(follower_tbl->record[0], keybuff, 
     227                                           1, HA_READ_PREFIX) 
    62228        == 0) { 
    63       do { 
    64         unsigned follower_id = follower_follower_id_fld->val_int(); 
    65         int4store(message_keybuff, follower_id); 
    66         if (message_tbl->file->index_read_map(message_tbl->record[0], 
    67                                               message_keybuff, 1, 
    68                                               HA_READ_PREFIX_LAST) 
    69             == 0) { 
    70           do { 
    71             if (! test_add_id(message_id_fld->val_int())) { 
    72               break; 
    73             } 
    74           } while (message_tbl->file->index_prev(message_tbl->record[0]) == 0 
    75                    && message_user_id_fld->val_int() == follower_id); 
     229      if (CACHE_MAX_MESSAGE_IDS && max_id == 0) { 
     230        sorted_list_t<uid_maxid_t> uid_maxid_list; 
     231        max_message_ids.lock_reader(); 
     232        do { 
     233          unsigned fid = follower_follower_id_fld->val_int(); 
     234          uid_maxid_list.test_add(uid_maxid_t(fid, max_message_ids.get(fid))); 
     235        } while (follower_tbl->file->index_next(follower_tbl->record[0]) == 0 
     236                 && follower_user_id_fld->val_int() == user_id); 
     237        max_message_ids.unlock_reader(); 
     238        for (int i = 0; 
     239             i < uid_maxid_list.cnt && uid_maxid_list.list[i].max_id != 0; 
     240             i++) { 
     241          per_user_search(uid_maxid_list.list[i].user_id, 0); 
    76242        } 
    77       } while (follower_tbl->file->index_next(follower_tbl->record[0]) == 0 
    78                && follower_user_id_fld->val_int() == user_id); 
     243      } else { 
     244        do { 
     245          unsigned fid = follower_follower_id_fld->val_int(); 
     246          per_user_search(fid, max_id); 
     247        } while (follower_tbl->file->index_next(follower_tbl->record[0]) == 0 
     248                 && follower_user_id_fld->val_int() == user_id); 
     249      } 
    79250    } 
    80251    follower_tbl->file->ha_index_end(); 
     
    87258    out_tt->use_all_columns(); 
    88259    out_tt->file->ha_delete_all_rows(); 
    89     out_tt->file->ha_start_bulk_insert(id_cnt); 
     260    out_tt->file->ha_start_bulk_insert(message_ids.cnt); 
    90261    memset(out_tt->record[0], 0, out_tt->s->null_bytes); 
    91     for (int i = 0; i < id_cnt; i++) { 
    92       out_tt_id_fld->store(id_list[i], true); 
     262    for (int i = 0; i < message_ids.cnt; i++) { 
     263      out_tt_id_fld->store(message_ids.list[i], true); 
    93264      out_tt->file->ha_write_row(out_tt->record[0]); 
    94265    } 
     
    97268    return 0; 
    98269  } 
    99  
    100   bool test_add_id(unsigned id) { 
    101     int keep; 
    102     for (keep = id_cnt - 1; keep >= 0; keep--) { 
    103       if (id <= id_list[keep]) { 
    104         break; 
    105       } 
    106     } 
    107     if (keep == MAX_ROWS - 1) { 
    108       return false; 
    109     } 
    110     if (id_cnt < MAX_ROWS) { 
    111       id_cnt++; 
    112     } 
    113     for (int i = id_cnt - 2; i > keep; i--) { 
    114       id_list[i + 1] = id_list[i]; 
    115     } 
    116     id_list[keep + 1] = id; 
    117     return true; 
    118   } 
    119270}; 
    120271 
    121272MYSQL_DIRECT_ACCESS_DECLARE_AS_INT_UDF(timeline_t, timeline); 
     273 
     274extern "C" my_bool timeline_set_maxid_init(UDF_INIT *initid, UDF_ARGS *args, 
     275                                       char *message) 
     276{ 
     277  if (args->arg_count != 2) { 
     278    strcpy(message, "usage: timeline_set_maxid(user_id,max_id)"); 
     279    return 1; 
     280  } 
     281  args->arg_type[0] = INT_RESULT; 
     282  args->maybe_null[0] = 0; 
     283  args->arg_type[1] = INT_RESULT; 
     284  args->maybe_null[1] = 0; 
     285  initid->maybe_null = 0; 
     286  return 0; 
     287} 
     288 
     289extern "C" void timeline_set_maxid_deinit(UDF_INIT *initid) 
     290{ 
     291} 
     292 
     293extern "C" long long timeline_set_maxid(UDF_INIT *initid, UDF_ARGS *args, 
     294                                        char *is_null, char *error) 
     295{ 
     296  max_message_ids.set(*(long long*)args->args[0], *(long long*)args->args[1]); 
     297  return 0; 
     298}