| 3 | | #define MAX_ROWS 20 |
| | 4 | #ifndef MAX_ROWS |
| | 5 | # define MAX_ROWS 20 |
| | 6 | #endif |
| | 7 | |
| | 8 | #ifndef CACHE_MAX_MESSAGE_IDS |
| | 9 | # define CACHE_MAX_MESSAGE_IDS 0 |
| | 10 | #endif |
| | 11 | |
| | 12 | #ifndef MAX_MESSAGE_IDS_INITIAL_CAPACITY |
| | 13 | # define MAX_MESSAGE_IDS_INITIAL_CAPACITY 4096 |
| | 14 | #endif |
| | 15 | |
| | 16 | using namespace std; |
| | 17 | |
| | 18 | /* per user cache for keeping max. message id |
| | 19 | * i.e. max_message_ids[$uid] == select max(id) from message where user_id=$uid |
| | 20 | * |
| | 21 | * should be fullfilled on server startup by calling: |
| | 22 | * SELECT set_max_message_id(user_id,max(id)) FROM message GROUP BY user_id |
| | 23 | */ |
| | 24 | class max_message_ids_t { |
| | 25 | |
| | 26 | protected: |
| | 27 | unsigned *vect; |
| | 28 | size_t vect_capacity; |
| | 29 | pthread_rwlock_t rwlock; |
| | 30 | |
| | 31 | public: |
| | 32 | max_message_ids_t() |
| | 33 | : vect((unsigned*)malloc(sizeof(unsigned) |
| | 34 | * MAX_MESSAGE_IDS_INITIAL_CAPACITY)), |
| | 35 | vect_capacity(MAX_MESSAGE_IDS_INITIAL_CAPACITY) |
| | 36 | { |
| | 37 | assert(vect != NULL); |
| | 38 | fill(vect, vect + vect_capacity, 0); |
| | 39 | pthread_rwlockattr_t attr; |
| | 40 | pthread_rwlockattr_init(&attr); |
| | 41 | // switch to writer-preferred lock on linux |
| | 42 | #ifdef PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP |
| | 43 | pthread_rwlockattr_setkind_np(&attr, |
| | 44 | PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); |
| | 45 | #endif |
| | 46 | pthread_rwlock_init(&rwlock, &attr); |
| | 47 | pthread_rwlockattr_destroy(&attr); |
| | 48 | } |
| | 49 | ~max_message_ids_t() { |
| | 50 | free(vect); |
| | 51 | vect = NULL; |
| | 52 | pthread_rwlock_destroy(&rwlock); |
| | 53 | } |
| | 54 | void lock_reader() { |
| | 55 | pthread_rwlock_rdlock(&rwlock); |
| | 56 | } |
| | 57 | void unlock_reader() { |
| | 58 | pthread_rwlock_unlock(&rwlock); |
| | 59 | } |
| | 60 | unsigned get(unsigned user_id) { |
| | 61 | return user_id < vect_capacity ? vect[user_id] : 0; |
| | 62 | } |
| | 63 | void set(unsigned user_id, unsigned message_id) { |
| | 64 | pthread_rwlock_rdlock(&rwlock); |
| | 65 | if (user_id < vect_capacity) { |
| | 66 | vect[user_id] = message_id; |
| | 67 | pthread_rwlock_unlock(&rwlock); |
| | 68 | return; |
| | 69 | } else { |
| | 70 | pthread_rwlock_unlock(&rwlock); |
| | 71 | pthread_rwlock_wrlock(&rwlock); |
| | 72 | if (vect_capacity <= user_id) { |
| | 73 | size_t old_capacity = vect_capacity; |
| | 74 | do { |
| | 75 | vect_capacity *= 2; |
| | 76 | } while (vect_capacity <= user_id); |
| | 77 | vect = (unsigned*)realloc(vect, sizeof(unsigned) * vect_capacity); |
| | 78 | assert(vect != NULL); |
| | 79 | fill(vect + old_capacity, vect + vect_capacity, 0); |
| | 80 | } |
| | 81 | vect[user_id] = message_id; |
| | 82 | pthread_rwlock_unlock(&rwlock); |
| | 83 | } |
| | 84 | } |
| | 85 | }; |
| | 86 | |
| | 87 | static max_message_ids_t max_message_ids; |
| | 88 | |
| | 89 | |
| | 90 | template <typename Elem> struct sorted_list_t { |
| | 91 | |
| | 92 | Elem list[MAX_ROWS]; /* top MAX_ROWS, sorted in desc. order */ |
| | 93 | int cnt; |
| | 94 | |
| | 95 | sorted_list_t() : cnt(0) {} |
| | 96 | bool test_add(const Elem& item) { |
| | 97 | int keep; |
| | 98 | for (keep = cnt - 1; keep >= 0; keep--) { |
| | 99 | if (item <= list[keep]) { |
| | 100 | break; |
| | 101 | } |
| | 102 | } |
| | 103 | if (keep == MAX_ROWS - 1) { |
| | 104 | return false; |
| | 105 | } |
| | 106 | if (cnt < MAX_ROWS) { |
| | 107 | cnt++; |
| | 108 | } |
| | 109 | for (int i = cnt - 2; i > keep; i--) { |
| | 110 | list[i + 1] = list[i]; |
| | 111 | } |
| | 112 | list[keep + 1] = item; |
| | 113 | return true; |
| | 114 | } |
| | 115 | }; |
| | 116 | |
| | 117 | struct uid_maxid_t { |
| | 118 | unsigned user_id; |
| | 119 | unsigned max_id; |
| | 120 | uid_maxid_t() {} |
| | 121 | uid_maxid_t(unsigned u, unsigned m) : user_id(u), max_id(m) {} |
| | 122 | }; |
| | 123 | |
| | 124 | bool operator<=(const uid_maxid_t& x, const uid_maxid_t& y) { |
| | 125 | return x.max_id <= y.max_id; |
| | 126 | } |
| 55 | | uchar* follower_keybuff = (uchar*)alloca(follower_key->key_length); |
| 56 | | memset(follower_keybuff, 0, follower_key->key_length); |
| | 185 | } |
| | 186 | |
| | 187 | void per_user_search(unsigned follower_id, unsigned max_id) { |
| | 188 | uchar *keybuff = (uchar*)alloca(message_key->key_length); |
| | 189 | |
| | 190 | if (max_id == 0) { |
| | 191 | int4store(keybuff, follower_id); |
| | 192 | if (message_tbl->file->index_read_map(message_tbl->record[0], keybuff, 1, |
| | 193 | HA_READ_PREFIX_LAST) |
| | 194 | != 0) { |
| | 195 | return; |
| | 196 | } |
| | 197 | } else { |
| | 198 | int4store(keybuff, follower_id); |
| | 199 | int4store(keybuff + 4, max_id); |
| | 200 | if (message_tbl->file->index_read_map(message_tbl->record[0], keybuff, 3, |
| | 201 | HA_READ_BEFORE_KEY) |
| | 202 | != 0 || |
| | 203 | message_user_id_fld->val_int() != follower_id) { |
| | 204 | return; |
| | 205 | } |
| | 206 | } |
| | 207 | do { |
| | 208 | if (! message_ids.test_add(message_id_fld->val_int())) { |
| | 209 | break; |
| | 210 | } |
| | 211 | } while (message_tbl->file->index_prev(message_tbl->record[0]) == 0 |
| | 212 | && message_user_id_fld->val_int() == follower_id); |
| | 213 | } |
| | 214 | |
| | 215 | int do_task() { |
| | 216 | |
| | 217 | unsigned user_id = *(long long*)args->args[0]; |
| | 218 | unsigned max_id = args->arg_count >= 2 ? *(long long*)args->args[1] : 0; |
| | 219 | |
| | 220 | setup(); |
| 63 | | do { |
| 64 | | unsigned follower_id = follower_follower_id_fld->val_int(); |
| 65 | | int4store(message_keybuff, follower_id); |
| 66 | | if (message_tbl->file->index_read_map(message_tbl->record[0], |
| 67 | | message_keybuff, 1, |
| 68 | | HA_READ_PREFIX_LAST) |
| 69 | | == 0) { |
| 70 | | do { |
| 71 | | if (! test_add_id(message_id_fld->val_int())) { |
| 72 | | break; |
| 73 | | } |
| 74 | | } while (message_tbl->file->index_prev(message_tbl->record[0]) == 0 |
| 75 | | && message_user_id_fld->val_int() == follower_id); |
| | 229 | if (CACHE_MAX_MESSAGE_IDS && max_id == 0) { |
| | 230 | sorted_list_t<uid_maxid_t> uid_maxid_list; |
| | 231 | max_message_ids.lock_reader(); |
| | 232 | do { |
| | 233 | unsigned fid = follower_follower_id_fld->val_int(); |
| | 234 | uid_maxid_list.test_add(uid_maxid_t(fid, max_message_ids.get(fid))); |
| | 235 | } while (follower_tbl->file->index_next(follower_tbl->record[0]) == 0 |
| | 236 | && follower_user_id_fld->val_int() == user_id); |
| | 237 | max_message_ids.unlock_reader(); |
| | 238 | for (int i = 0; |
| | 239 | i < uid_maxid_list.cnt && uid_maxid_list.list[i].max_id != 0; |
| | 240 | i++) { |
| | 241 | per_user_search(uid_maxid_list.list[i].user_id, 0); |
| | 273 | |
| | 274 | extern "C" my_bool timeline_set_maxid_init(UDF_INIT *initid, UDF_ARGS *args, |
| | 275 | char *message) |
| | 276 | { |
| | 277 | if (args->arg_count != 2) { |
| | 278 | strcpy(message, "usage: timeline_set_maxid(user_id,max_id)"); |
| | 279 | return 1; |
| | 280 | } |
| | 281 | args->arg_type[0] = INT_RESULT; |
| | 282 | args->maybe_null[0] = 0; |
| | 283 | args->arg_type[1] = INT_RESULT; |
| | 284 | args->maybe_null[1] = 0; |
| | 285 | initid->maybe_null = 0; |
| | 286 | return 0; |
| | 287 | } |
| | 288 | |
| | 289 | extern "C" void timeline_set_maxid_deinit(UDF_INIT *initid) |
| | 290 | { |
| | 291 | } |
| | 292 | |
| | 293 | extern "C" long long timeline_set_maxid(UDF_INIT *initid, UDF_ARGS *args, |
| | 294 | char *is_null, char *error) |
| | 295 | { |
| | 296 | max_message_ids.set(*(long long*)args->args[0], *(long long*)args->args[1]); |
| | 297 | return 0; |
| | 298 | } |