Kannel: Open Source WAP and SMS gateway  svn-r5335
bb_store_redis.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2018 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
64 #include "gw-config.h"
65 
66 #include <unistd.h>
67 #include <sys/types.h>
68 #include <sys/stat.h>
69 #include <fcntl.h>
70 #include <dirent.h>
71 #include <errno.h>
72 
73 #include "gwlib/gwlib.h"
74 #include "msg.h"
75 #include "sms.h"
76 #include "bearerbox.h"
77 #include "bb_store.h"
78 
79 #ifdef HAVE_REDIS
80 #include "gwlib/dbpool.h"
81 
82 /*
83  * Define REDIS_TRACE to get DEBUG level output of the
84  * Redis commands send to the server.
85  */
86 /* #define REDIS_TRACE 1 */
87 
/* Message counter; its value is what store_redis_messages() reports. */
static Counter *counter;
/* List created in store_redis_init() and destroyed in store_redis_shutdown(). */
static List *loaded;

/* Redis connection pool; stays NULL until store_redis_init() has created it. */
static DBPool *pool = NULL;

/* Names read from the 'store-db' configuration group. */
struct store_db_fields {
    Octstr *table;          /* 'table' directive; used as the Redis key in HSETNX/HDEL/HGETALL */
    Octstr *field_uuid;     /* not assigned anywhere in this file; only released on destroy */
    Octstr *field_message;  /* not assigned anywhere in this file; only released on destroy */
};

static struct store_db_fields *fields = NULL;

/*
 * Value of the boolean 'hash' directive. Non-zero selects the code paths
 * that keep every message in its own Redis hash (HMSET/DEL/SCAN).
 */
static int hash = 0;
102 
103 
104 /*
105  * Convert a Msg structure to a Dict hash.
106  * This will assume we handle the msg->sms type only.
107  */
108 /*
109 static Dict *hash_msg_pack(Msg *msg)
110 {
111  Dict *h;
112 
113  gw_assert(msg->type == sms);
114 
115  h = dict_create(32, octstr_destroy_item);
116 
117 #define INTEGER(name) dict_put(h, octstr_imm(#name), octstr_format("%ld", p->name));
118 #define OCTSTR(name) dict_put(h, octstr_imm(#name), octstr_duplicate(p->name));
119 #define UUID(name) { \
120  char id[UUID_STR_LEN + 1]; \
121  uuid_unparse(p->name, id); \
122  dict_put(h, octstr_imm(#name), octstr_create(id)); \
123  }
124 #define VOID(name)
125 #define MSG(type, stmt) \
126  case type: { struct type *p = &msg->type; stmt } break;
127 
128  switch (msg->type) {
129 #include "msg-decl.h"
130  default:
131  panic(0, "Internal error: unknown message type: %d",
132  msg->type);
133  }
134 
135  return h;
136 }
137 */
138 
139 
/*
 * Rebuild a Msg of type sms from a Dict that maps field names to Octstr
 * values.
 *
 * Returns NULL when 'hash' is NULL, otherwise a message freshly created
 * with msg_create(sms). The field list itself comes from "msg-decl.h",
 * which is expanded through the helper macros below:
 *   INTEGER - looked up by name and converted with atol(); left untouched
 *             when the key is absent.
 *   OCTSTR  - assigned a duplicate of the stored value (the lookup result
 *             is passed to octstr_duplicate() without a NULL check;
 *             NOTE(review): presumably octstr_duplicate(NULL) yields NULL
 *             - confirm).
 *   UUID    - parsed with uuid_parse() when the key is present.
 *   VOID    - ignored.
 * The Dict is only read here; it is neither modified nor destroyed.
 */
static Msg *hash_msg_unpack(Dict *hash)
{
    Msg *msg;
    Octstr *os;

    if (hash == NULL)
        return NULL;

    msg = msg_create(sms);
#define INTEGER(name) \
    if ((os = dict_get(hash, octstr_imm(#name))) != NULL) \
        p->name = atol(octstr_get_cstr(os));
#define OCTSTR(name) p->name = octstr_duplicate(dict_get(hash, octstr_imm(#name)));
#define UUID(name) \
    if ((os = dict_get(hash, octstr_imm(#name))) != NULL) \
        uuid_parse(octstr_get_cstr(os), p->name);
#define VOID(name)
#define MSG(type, stmt) \
    case type: { struct type *p = &msg->type; stmt } break;

    /* msg was just created as sms, so only that case is expected to run */
    switch (msg->type) {
#include "msg-decl.h"
        default:
            panic(0, "Internal error: unknown message type: %d",
                  msg->type);
    }

    return msg;
}
169 
170 
/*
 * Dump hook of the store interface. The Redis backend writes every
 * message immediately, so there is nothing to flush.
 *
 * Returns 0 always.
 */
static int store_redis_dump(void)
{
    /* nothing to do */
    return 0;
}
176 
177 
178 static long store_redis_messages()
179 {
180  return counter ? counter_value(counter) : -1;
181 }
182 
183 
184 static int redis_update(const Octstr *cmd, List *binds)
185 {
186  int res;
187  DBPoolConn *pc;
188 
189 #if defined(REDIS_TRACE)
190  debug("store.redis", 0, "redis cmd: %s", octstr_get_cstr(cmd));
191 #endif
192 
193  pc = dbpool_conn_consume(pool);
194  if (pc == NULL) {
195  error(0, "Database pool got no connection! Redis update failed!");
196  return 0;
197  }
198 
199  res = dbpool_conn_update(pc, cmd, binds);
200 
201  if (res < 0) {
202  error(0, "Store-Redis: Error while updating: command was `%s'",
203  octstr_get_cstr(cmd));
204  }
205 
207 
208  return res;
209 }
210 
211 
212 static int store_redis_add(Octstr *id, Octstr *os)
213 {
214  Octstr *cmd;
215  int rc;
216 
218  cmd = octstr_format("HSETNX %s %s %s",
219  octstr_get_cstr(fields->table),
221  rc = redis_update(cmd, NULL);
222 
223  octstr_destroy(cmd);
224 
225  return rc == 1 ? 0 : -1;
226 }
227 
228 
229 /*
230 static void store_redis_add_hash(Octstr *id, Dict *hash)
231 {
232  List *l, *b;
233  Octstr *cmd, *key, *val;
234 
235  cmd = octstr_create("");
236  b = gwlist_create();
237  gwlist_produce(b, octstr_create("HMSET"));
238  gwlist_produce(b, octstr_duplicate(id));
239  l = dict_keys(hash);
240  while ((key = gwlist_extract_first(l)) != NULL) {
241  if ((val = dict_get(hash, key)) != NULL) {
242  gwlist_produce(b, key);
243  gwlist_produce(b, octstr_duplicate(val));
244  }
245  }
246  gwlist_destroy(l, NULL);
247 
248  redis_update(cmd, b);
249 
250  gwlist_destroy(b, octstr_destroy_item);
251  octstr_destroy(cmd);
252 }
253 */
254 
255 
256 /*
257  * In order to a) speed-up the processing of the bind list in the dbpool_redis.c
258  * module and b) safe space in the redis-server memory, we will only store
259  * values that are set.
260  */
261 static int store_redis_add_msg(Octstr *id, Msg *msg)
262 {
263  List *b;
264  Octstr *cmd;
265  char uuid[UUID_STR_LEN + 1];
266  int rc;
267 
268  cmd = octstr_create("");
269  b = gwlist_create();
270  gwlist_produce(b, octstr_create("HMSET"));
272 
273 #define INTEGER(name) \
274  if (p->name != MSG_PARAM_UNDEFINED) { \
275  gwlist_produce(b, octstr_imm(#name)); \
276  gwlist_produce(b, octstr_format("%ld", p->name)); \
277  }
278 #define OCTSTR(name) \
279  if (p->name != NULL) { \
280  gwlist_produce(b, octstr_imm(#name)); \
281  gwlist_produce(b, octstr_duplicate(p->name)); \
282  }
283 #define UUID(name) \
284  gwlist_produce(b, octstr_imm(#name)); \
285  uuid_unparse(p->name, uuid); \
286  gwlist_produce(b, octstr_create(uuid));
287 #define VOID(name)
288 #define MSG(type, stmt) \
289  case type: { struct type *p = &msg->type; stmt } break;
290 
291  switch (msg->type) {
292 #include "msg-decl.h"
293  default:
294  panic(0, "Internal error: unknown message type: %d",
295  msg->type);
296  break;
297  }
298 
299  rc = redis_update(cmd, b);
300 
302  octstr_destroy(cmd);
303 
304  return rc == 0 ? 0 : -1;
305 }
306 
307 
308 static int store_redis_delete(Octstr *id)
309 {
310  Octstr *cmd;
311  int rc;
312 
313  cmd = octstr_format("HDEL %s %s",
314  octstr_get_cstr(fields->table),
315  octstr_get_cstr(id));
316  rc = redis_update(cmd, NULL);
317 
318  octstr_destroy(cmd);
319 
320  return rc == 1 ? 0 : -1;
321 }
322 
323 
324 static int store_redis_delete_hash(Octstr *id)
325 {
326  Octstr *cmd;
327  int rc;
328 
329  cmd = octstr_format("DEL %s", octstr_get_cstr(id));
330  rc = redis_update(cmd, NULL);
331 
332  octstr_destroy(cmd);
333 
334  return rc == 1 ? 0 : -1;
335 }
336 
337 
338 static struct store_db_fields *store_db_fields_create(CfgGroup *grp)
339 {
340  struct store_db_fields *ret;
341 
342  ret = gw_malloc(sizeof(*ret));
343  gw_assert(ret != NULL);
344  memset(ret, 0, sizeof(*ret));
345 
346  if ((ret->table = cfg_get(grp, octstr_imm("table"))) == NULL) {
347  grp_dump(grp);
348  panic(0, "Directive 'table' is not specified in 'group = store-db' context!");
349  }
350 
351  return ret;
352 }
353 
354 
355 static void store_db_fields_destroy(struct store_db_fields *fields)
356 {
357  /* sanity check */
358  if (fields == NULL)
359  return;
360 
361  octstr_destroy(fields->table);
362  octstr_destroy(fields->field_uuid);
363  octstr_destroy(fields->field_message);
364 
365  gw_free(fields);
366 }
367 
368 
369 static int store_redis_getall(int ignore_err, void(*cb)(Octstr*, void*), void *data)
370 {
371  DBPoolConn *pc;
372  Octstr *cmd;
373  Octstr *os, *key;
374  List *result, *row;
375 
376  cmd = octstr_format("HGETALL %s", octstr_get_cstr(fields->table));
377 
378 #if defined(REDIS_TRACE)
379  debug("store.redis", 0, "redis cmd: %s", octstr_get_cstr(cmd));
380 #endif
381 
382  pc = dbpool_conn_consume(pool);
383  if (pc == NULL) {
384  error(0, "Database pool got no connection! Redis HGETALL failed!");
386  return -1;
387  }
388  if (dbpool_conn_select(pc, cmd, NULL, &result) != 0) {
389  error(0, "Failed to fetch messages from redis with cmd `%s'",
390  octstr_get_cstr(cmd));
391  octstr_destroy(cmd);
393  return -1;
394  }
396  octstr_destroy(cmd);
397 
398  if (gwlist_len(result) == 1 && (row = gwlist_extract_first(result)) != NULL) {
399  while (gwlist_len(row) > 0) {
400  key = gwlist_extract_first(row);
401  os = gwlist_extract_first(row);
402  if (key && os) {
403  debug("store.redis", 0, "Found entry for message ID <%s>", octstr_get_cstr(key));
405  if (os == NULL) {
406  error(0, "Could not base64 decode message ID <%s>", octstr_get_cstr(key));
407  } else {
408  cb(os, data);
409  }
410  }
411  octstr_destroy(os);
412  octstr_destroy(key);
413  }
415  } else {
416  debug("store.redis", 0, "No messages loaded from redis store");
417  }
418  gwlist_destroy(result, NULL);
419 
420  return 0;
421 }
422 
423 
424 static int store_redis_getall_hash(int ignore_err, void(*cb)(Dict*, void*), void *data)
425 {
426  DBPoolConn *pc;
427  Octstr *cmd;
428  Octstr *os, *key, *id;
429  List *result, *row, *result_key, *row_key;
430  Dict *hash;
431  long cursor = 0;
432 
433  pc = dbpool_conn_consume(pool);
434  if (pc == NULL) {
435  error(0, "Database pool got no connection! Redis KEYS failed!");
437  return -1;
438  }
439 
440  do {
441  result = NULL;
442  cmd = octstr_format("SCAN %ld", cursor);
443 
444 #if defined(REDIS_TRACE)
445  debug("store.redis", 0, "redis cmd: %s", octstr_get_cstr(cmd));
446 #endif
447 
448  if (dbpool_conn_select(pc, cmd, NULL, &result) != 0) {
449  error(0, "Failed to fetch messages from redis with cmd `%s'", octstr_get_cstr(cmd));
450  octstr_destroy(cmd);
452  return -1;
453  }
454  octstr_destroy(cmd);
455 
456  if (gwlist_len(result) == 1 && ((row = gwlist_extract_first(result)) != NULL)) {
457  /* first entry is the next cursor */
458  if ((id = gwlist_extract_first(row)) == NULL || octstr_parse_long(&cursor, id, 0, 10) == -1) {
459  error(0, "Failed to parse next cursor from redis `%s'", octstr_get_cstr(id));
461  octstr_destroy(id);
462  /* second element is array of keys */
463  row_key = gwlist_extract_first(row);
465  gwlist_destroy(row, NULL);
466  gwlist_destroy(result, NULL);
467  return -1;
468  }
469  /* second element is array of keys */
470  octstr_destroy(id);
471  row_key = gwlist_extract_first(row);
472  gwlist_destroy(row, NULL);
473  row = row_key; row_key = NULL;
474 
475  /* loop via keys */
476  while ((id = gwlist_extract_first(row)) != NULL) {
477  cmd = octstr_format("HGETALL %s", octstr_get_cstr(id));
478  if (dbpool_conn_select(pc, cmd, NULL, &result_key) != 0) {
479  error(0, "Failed to fetch messages from redis with cmd `%s'", octstr_get_cstr(cmd));
481  octstr_destroy(cmd);
482  octstr_destroy(id);
484  return -1;
485  }
486  octstr_destroy(cmd);
487  octstr_destroy(id);
488 
489  if (gwlist_len(result_key) == 1 && ((row_key = gwlist_extract_first(result_key)) != NULL)) {
490  hash = dict_create(32, octstr_destroy_item);
491  while (gwlist_len(row_key) > 0) {
492  key = gwlist_extract_first(row_key);
493  os = gwlist_extract_first(row_key);
494  if (key && os) {
495  dict_put(hash, key, os);
496  }
497  octstr_destroy(key);
498  }
499  cb(hash, data);
500  dict_destroy(hash);
502  }
503  gwlist_destroy(result_key, NULL);
504  }
506  } else {
507  debug("store.redis", 0, "No messages loaded from redis store");
508  }
509  gwlist_destroy(result, NULL);
510  } while (cursor > 0);
511 
513 
514  return 0;
515 }
516 
517 
/*
 * Context passed through the getall helpers to status_cb() and
 * status_cb_hash(): the per-message callback and its opaque argument.
 */
struct status {
    void(*callback_fn)(Msg* msg, void *data);  /* invoked once per unpacked message */
    void *data;                                /* forwarded unchanged to callback_fn */
};
522 
523 
524 static void status_cb(Octstr *msg_s, void *d)
525 {
526  struct status *data = d;
527  Msg *msg;
528 
529  msg = store_msg_unpack(msg_s);
530  if (msg == NULL)
531  return;
532 
533  data->callback_fn(msg, data->data);
534 
535  msg_destroy(msg);
536 }
537 
538 static void status_cb_hash(Dict *msg_h, void *d)
539 {
540  struct status *data = d;
541  Msg *msg;
542 
543  msg = hash_msg_unpack(msg_h);
544  if (msg == NULL)
545  return;
546 
547  data->callback_fn(msg, data->data);
548 
549  msg_destroy(msg);
550 }
551 
552 
553 static void store_redis_for_each_message(void(*callback_fn)(Msg* msg, void *data), void *data)
554 {
555  struct status d;
556 
557  if (pool == NULL)
558  return;
559 
561  d.data = data;
562 
563  /* ignore error because files may disappear */
564  if (hash)
565  store_redis_getall_hash(1, status_cb_hash, &d);
566  else
567  store_redis_getall(1, status_cb, &d);
568 }
569 
570 
571 static void dispatch(Octstr *msg_s, void *data)
572 {
573  Msg *msg;
574  void (*receive_msg)(Msg*) = data;
575 
576  if (msg_s == NULL)
577  return;
578 
579  msg = store_msg_unpack(msg_s);
580  if (msg != NULL) {
581  receive_msg(msg);
583  } else {
584  error(0, "Could not unpack message from redis store!");
585  }
586 }
587 
588 
589 static void dispatch_hash(Dict *msg_h, void *data)
590 {
591  Msg *msg;
592  void (*receive_msg)(Msg*) = data;
593 
594  if (msg_h == NULL)
595  return;
596 
597  msg = hash_msg_unpack(msg_h);
598  if (msg != NULL) {
599  receive_msg(msg);
601  } else {
602  error(0, "Could not unpack message hash from redis store!");
603  }
604 }
605 
606 
607 static int store_redis_load(void(*receive_msg)(Msg*))
608 {
609  int rc;
610 
611  /* check if we are active */
612  if (pool == NULL)
613  return 0;
614 
615  /* sanity check */
616  if (receive_msg == NULL)
617  return -1;
618 
619  /*
620  * We will use a Dict as an intermediate data structure to re-construct the
621  * Msg struct itself. This is faster, then using pre-processor magic and
622  * then strcmp() on the msg field names.
623  */
624  rc = hash ? store_redis_getall_hash(0, dispatch_hash, receive_msg) :
625  store_redis_getall(0, dispatch, receive_msg);
626 
627  info(0, "Loaded %ld messages from store.", counter_value(counter));
628 
629  /* allow using of storage */
631 
632  return rc;
633 }
634 
635 
636 static int store_redis_save(Msg *msg)
637 {
638  char id[UUID_STR_LEN + 1];
639  Octstr *id_s;
640  int rc = 0;
641 
642  /* always set msg id and timestamp */
643  if (msg_type(msg) == sms && uuid_is_null(msg->sms.id))
644  uuid_generate(msg->sms.id);
645 
646  if (msg_type(msg) == sms && msg->sms.time == MSG_PARAM_UNDEFINED)
647  time(&msg->sms.time);
648 
649  if (pool == NULL)
650  return -1;
651 
652  /* block here if store still not loaded */
654 
655  switch (msg_type(msg)) {
656  case sms:
657  {
658  uuid_unparse(msg->sms.id, id);
659  id_s = octstr_create(id);
660 
661  /* XXX we could use function pointers to avoid iteration checks */
662  if (hash) {
663  rc = store_redis_add_msg(id_s, msg);
664  } else {
665  Octstr *os = store_msg_pack(msg);
666 
667  if (os == NULL) {
668  error(0, "Could not pack message.");
669  return -1;
670  }
671  rc = store_redis_add(id_s, os);
672  octstr_destroy(os);
673  }
674  octstr_destroy(id_s);
675  if (!rc)
677  break;
678  }
679  case ack:
680  {
681  uuid_unparse(msg->ack.id, id);
682  id_s = octstr_create(id);
683  if (hash)
684  rc = store_redis_delete_hash(id_s);
685  else
686  rc = store_redis_delete(id_s);
687  octstr_destroy(id_s);
688  if (!rc)
690  break;
691  }
692  default:
693  return -1;
694  }
695 
696  return rc;
697 }
698 
699 
700 static int store_redis_save_ack(Msg *msg, ack_status_t status)
701 {
702  int ret;
703  Msg *nack = msg_create(ack);
704 
705  nack->ack.nack = status;
706  uuid_copy(nack->ack.id, msg->sms.id);
707  nack->ack.time = msg->sms.time;
708  ret = store_redis_save(nack);
709  msg_destroy(nack);
710 
711  return ret;
712 }
713 
714 
715 static void store_redis_shutdown()
716 {
717  dbpool_destroy(pool);
718  store_db_fields_destroy(fields);
719 
721  gwlist_destroy(loaded, NULL);
722 }
723 
724 
725 int store_redis_init(Cfg *cfg)
726 {
727  CfgGroup *grp;
728  List *grplist;
729  Octstr *redis_host, *redis_pass, *redis_id;
730  long redis_port = 0, redis_database = -1, redis_idle_timeout = -1;
731  Octstr *p = NULL;
732  long pool_size;
733  DBConf *db_conf = NULL;
734 
735  /*
736  * Check for all mandatory directives that specify the field names
737  * of the used Redis key
738  */
739  if (!(grp = cfg_get_single_group(cfg, octstr_imm("store-db"))))
740  panic(0, "Store-Redis: group 'store-db' is not specified!");
741 
742  if (!(redis_id = cfg_get(grp, octstr_imm("id"))))
743  panic(0, "Store-Redis: directive 'id' is not specified!");
744 
745  cfg_get_bool(&hash, grp, octstr_imm("hash"));
746 
747  fields = store_db_fields_create(grp);
748  gw_assert(fields != NULL);
749 
750  /* select corresponding functions */
751  store_messages = store_redis_messages;
752  store_save = store_redis_save;
753  store_save_ack = store_redis_save_ack;
754  store_load = store_redis_load;
755  store_dump = store_redis_dump;
756  store_shutdown = store_redis_shutdown;
757  store_for_each_message = store_redis_for_each_message;
758 
759  /*
760  * Now grab the required information from the 'redis-connection' group
761  * with the id we just obtained.
762  *
763  * We have to loop through all available Redis connection definitions
764  * and search for the one we are looking for.
765  */
766  grplist = cfg_get_multi_group(cfg, octstr_imm("redis-connection"));
767  while (grplist && (grp = gwlist_extract_first(grplist)) != NULL) {
768  p = cfg_get(grp, octstr_imm("id"));
769  if (p != NULL && octstr_compare(p, redis_id) == 0) {
770  octstr_destroy(p);
771  break;
772  }
773  if (p != NULL)
774  octstr_destroy(p);
775  }
776  if (grp == NULL)
777  panic(0, "Connection settings for 'redis-connection' with id '%s' are not specified!",
778  octstr_get_cstr(redis_id));
779 
780  gwlist_destroy(grplist, NULL);
781 
782  if (cfg_get_integer(&pool_size, grp, octstr_imm("max-connections")) == -1 || pool_size == 0)
783  pool_size = 1;
784 
785  if (!(redis_host = cfg_get(grp, octstr_imm("host")))) {
786  grp_dump(grp);
787  panic(0, "Directive 'host' is not specified in 'group = redis-connection' context!");
788  }
789  if (cfg_get_integer(&redis_port, grp, octstr_imm("port")) == -1) {
790  grp_dump(grp);
791  panic(0, "Directive 'port' is not specified in 'group = redis-connection' context!");
792  }
793  redis_pass = cfg_get(grp, octstr_imm("password"));
794  cfg_get_integer(&redis_database, grp, octstr_imm("database"));
795  cfg_get_integer(&redis_idle_timeout, grp, octstr_imm("idle-timeout"));
796 
797  /*
798  * Ok, ready to connect to Redis
799  */
800  db_conf = gw_malloc(sizeof(DBConf));
801  gw_assert(db_conf != NULL);
802 
803  db_conf->redis = gw_malloc(sizeof(RedisConf));
804  gw_assert(db_conf->redis != NULL);
805 
806  db_conf->redis->host = redis_host;
807  db_conf->redis->port = redis_port;
808  db_conf->redis->password = redis_pass;
809  db_conf->redis->database = redis_database;
810  db_conf->redis->idle_timeout = redis_idle_timeout;
811 
812  pool = dbpool_create(DBPOOL_REDIS, db_conf, pool_size);
813  gw_assert(pool != NULL);
814 
815  /*
816  * Panic on failure to connect. Should we just try to reconnect?
817  */
818  if (dbpool_conn_count(pool) == 0)
819  panic(0, "Redis database pool has no connections!");
820 
821  loaded = gwlist_create();
824 
825  octstr_destroy(redis_id);
826 
827  return 0;
828 }
829 
830 #endif
Dict * dict_create(long size_hint, void(*destroy_value)(void *))
Definition: dict.c:192
void error(int err, const char *fmt,...)
Definition: log.c:648
void info(int err, const char *fmt,...)
Definition: log.c:672
long dbpool_conn_count(DBPool *p)
DBPool * dbpool_create(enum db_type db_type, DBConf *conf, unsigned int connections)
gw_assert(wtls_machine->packet_to_send !=NULL)
RedisConf * redis
Definition: dbpool.h:172
void dict_put(Dict *dict, Octstr *key, void *value)
Definition: dict.c:240
void counter_destroy(Counter *counter)
Definition: counter.c:110
void gwlist_produce(List *list, void *item)
Definition: list.c:411
long gwlist_len(List *list)
Definition: list.c:166
int(* store_save_ack)(Msg *msg, ack_status_t status)
Definition: bb_store.c:73
static List * loaded
Definition: bb_store_file.c:99
long(* store_messages)(void)
Definition: bb_store.c:71
static void dispatch(const Octstr *filename, void *data)
msg_type
Definition: msg.h:73
#define cfg_get(grp, varname)
Definition: cfg.h:86
void octstr_binary_to_base64(Octstr *ostr)
Definition: octstr.c:542
void uuid_unparse(const uuid_t uu, char *out)
Definition: gw_uuid.c:562
void uuid_generate(uuid_t out)
Definition: gw_uuid.c:393
#define msg_create(type)
Definition: msg.h:136
unsigned long counter_decrease(Counter *counter)
Definition: counter.c:155
Msg *(* store_msg_unpack)(Octstr *os)
Definition: bb_store.c:78
static Cfg * cfg
Definition: opensmppbox.c:95
int(* store_dump)(void)
Definition: bb_store.c:75
Octstr * password
Definition: dbpool.h:150
#define octstr_get_cstr(ostr)
Definition: octstr.h:233
unsigned long counter_increase(Counter *counter)
Definition: counter.c:123
void(* store_for_each_message)(void(*callback_fn)(Msg *msg, void *data), void *data)
Definition: bb_store.c:79
void dbpool_conn_produce(DBPoolConn *conn)
Octstr * octstr_imm(const char *cstr)
Definition: octstr.c:283
Definition: msg.h:79
Definition: cfg.c:164
Counter * counter_create(void)
Definition: counter.c:94
Definition: gw_uuid.c:62
void * gwlist_extract_first(List *list)
Definition: list.c:305
void grp_dump(CfgGroup *grp)
Definition: cfg.c:811
void gwlist_remove_producer(List *list)
Definition: list.c:401
int uuid_is_null(const uuid_t uu)
Definition: gw_uuid.c:413
Definition: dict.c:116
#define octstr_duplicate(ostr)
Definition: octstr.h:187
List * cfg_get_multi_group(Cfg *cfg, Octstr *name)
Definition: cfg.c:645
void uuid_copy(uuid_t dst, const uuid_t src)
Definition: gw_uuid.c:150
void msg_destroy(Msg *msg)
Definition: msg.c:132
Octstr * octstr_format(const char *fmt,...)
Definition: octstr.c:2464
void octstr_destroy(Octstr *ostr)
Definition: octstr.c:324
#define octstr_create(cstr)
Definition: octstr.h:125
void octstr_destroy_item(void *os)
Definition: octstr.c:336
unsigned long counter_value(Counter *counter)
Definition: counter.c:145
void * data
Octstr *(* store_msg_pack)(Msg *msg)
Definition: bb_store.c:77
void octstr_base64_to_binary(Octstr *ostr)
Definition: octstr.c:663
#define UUID_STR_LEN
Definition: gw_uuid.h:19
long database
Definition: dbpool.h:151
void(* store_shutdown)(void)
Definition: bb_store.c:76
Definition: dbpool.h:164
void dict_destroy(Dict *dict)
Definition: dict.c:215
void dbpool_destroy(DBPool *p)
int dbpool_conn_update(DBPoolConn *conn, const Octstr *sql, List *binds)
int cfg_get_bool(int *n, CfgGroup *grp, Octstr *varname)
Definition: cfg.c:759
Definition: octstr.c:118
void * gwlist_consume(List *list)
Definition: list.c:427
long port
Definition: dbpool.h:149
void debug(const char *place, int err, const char *fmt,...)
Definition: log.c:726
int cfg_get_integer(long *n, CfgGroup *grp, Octstr *varname)
Definition: cfg.c:742
#define panic
Definition: log.h:87
Definition: cfg.c:73
int(* store_load)(void(*receive_msg)(Msg *))
Definition: bb_store.c:74
#define MSG_PARAM_UNDEFINED
Definition: msg.h:71
long octstr_parse_long(long *nump, Octstr *ostr, long pos, int base)
Definition: octstr.c:749
static void status_cb(Msg *msg, void *d)
Definition: bb_store.c:111
int dbpool_conn_select(DBPoolConn *conn, const Octstr *sql, List *binds, List **result)
#define gwlist_create()
Definition: list.h:136
void(* callback_fn)(Msg *msg, void *data)
int(* store_save)(Msg *msg)
Definition: bb_store.c:72
DBPoolConn * dbpool_conn_consume(DBPool *p)
ack_status_t
Definition: msg.h:124
Octstr * host
Definition: dbpool.h:148
CfgGroup * cfg_get_single_group(Cfg *cfg, Octstr *name)
Definition: cfg.c:639
void gwlist_add_producer(List *list)
Definition: list.c:383
Definition: list.c:102
static XMLRPCDocument * msg
Definition: test_xmlrpc.c:86
static Counter * counter
Octstr * status
Definition: bb_store.c:108
int octstr_compare(const Octstr *ostr1, const Octstr *ostr2)
Definition: octstr.c:871
long idle_timeout
Definition: dbpool.h:152
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)
Definition: list.c:145
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.