64 #include "gw-config.h" 67 #include <sys/types.h> 91 static DBPool *pool = NULL;
93 struct store_db_fields {
99 static struct store_db_fields *fields = NULL;
140 static Msg *hash_msg_unpack(
Dict *hash)
149 #define INTEGER(name) \ 150 if ((os = dict_get(hash, octstr_imm(#name))) != NULL) \ 151 p->name = atol(octstr_get_cstr(os)); 152 #define OCTSTR(name) p->name = octstr_duplicate(dict_get(hash, octstr_imm(#name))); 154 if ((os = dict_get(hash, octstr_imm(#name))) != NULL) \ 155 uuid_parse(octstr_get_cstr(os), p->name); 157 #define MSG(type, stmt) \ 158 case type: { struct type *p = &msg->type; stmt } break; 163 panic(0,
"Internal error: unknown message type: %d",
171 static int store_redis_dump()
178 static long store_redis_messages()
184 static int redis_update(
const Octstr *cmd,
List *binds)
189 #if defined(REDIS_TRACE) 195 error(0,
"Database pool got no connection! Redis update failed!");
202 error(0,
"Store-Redis: Error while updating: command was `%s'",
221 rc = redis_update(cmd, NULL);
225 return rc == 1 ? 0 : -1;
273 #define INTEGER(name) \ 274 if (p->name != MSG_PARAM_UNDEFINED) { \ 275 gwlist_produce(b, octstr_imm(#name)); \ 276 gwlist_produce(b, octstr_format("%ld", p->name)); \ 278 #define OCTSTR(name) \ 279 if (p->name != NULL) { \ 280 gwlist_produce(b, octstr_imm(#name)); \ 281 gwlist_produce(b, octstr_duplicate(p->name)); \ 284 gwlist_produce(b, octstr_imm(#name)); \ 285 uuid_unparse(p->name, uuid); \ 286 gwlist_produce(b, octstr_create(uuid)); 288 #define MSG(type, stmt) \ 289 case type: { struct type *p = &msg->type; stmt } break; 294 panic(0,
"Internal error: unknown message type: %d",
299 rc = redis_update(cmd, b);
304 return rc == 0 ? 0 : -1;
308 static int store_redis_delete(
Octstr *
id)
316 rc = redis_update(cmd, NULL);
320 return rc == 1 ? 0 : -1;
324 static int store_redis_delete_hash(
Octstr *
id)
330 rc = redis_update(cmd, NULL);
334 return rc == 1 ? 0 : -1;
338 static struct store_db_fields *store_db_fields_create(
CfgGroup *grp)
340 struct store_db_fields *ret;
342 ret = gw_malloc(
sizeof(*ret));
344 memset(ret, 0,
sizeof(*ret));
348 panic(0,
"Directive 'table' is not specified in 'group = store-db' context!");
355 static void store_db_fields_destroy(
struct store_db_fields *fields)
369 static int store_redis_getall(
int ignore_err,
void(*cb)(
Octstr*,
void*),
void *data)
378 #if defined(REDIS_TRACE) 384 error(0,
"Database pool got no connection! Redis HGETALL failed!");
389 error(0,
"Failed to fetch messages from redis with cmd `%s'",
416 debug(
"store.redis", 0,
"No messages loaded from redis store");
424 static int store_redis_getall_hash(
int ignore_err,
void(*cb)(
Dict*,
void*),
void *data)
429 List *result, *row, *result_key, *row_key;
435 error(0,
"Database pool got no connection! Redis KEYS failed!");
444 #if defined(REDIS_TRACE) 473 row = row_key; row_key = NULL;
507 debug(
"store.redis", 0,
"No messages loaded from redis store");
510 }
while (cursor > 0);
538 static void status_cb_hash(
Dict *msg_h,
void *d)
543 msg = hash_msg_unpack(msg_h);
565 store_redis_getall_hash(1, status_cb_hash, &d);
574 void (*receive_msg)(
Msg*) =
data;
584 error(0,
"Could not unpack message from redis store!");
589 static void dispatch_hash(
Dict *msg_h,
void *
data)
592 void (*receive_msg)(
Msg*) =
data;
597 msg = hash_msg_unpack(msg_h);
602 error(0,
"Could not unpack message hash from redis store!");
607 static int store_redis_load(
void(*receive_msg)(
Msg*))
616 if (receive_msg == NULL)
624 rc = hash ? store_redis_getall_hash(0, dispatch_hash, receive_msg) :
625 store_redis_getall(0,
dispatch, receive_msg);
636 static int store_redis_save(
Msg *
msg)
647 time(&
msg->sms.time);
663 rc = store_redis_add_msg(id_s,
msg);
668 error(0,
"Could not pack message.");
671 rc = store_redis_add(id_s, os);
684 rc = store_redis_delete_hash(id_s);
686 rc = store_redis_delete(id_s);
707 nack->ack.time =
msg->sms.time;
708 ret = store_redis_save(nack);
715 static void store_redis_shutdown()
718 store_db_fields_destroy(fields);
725 int store_redis_init(
Cfg *
cfg)
729 Octstr *redis_host, *redis_pass, *redis_id;
730 long redis_port = 0, redis_database = -1, redis_idle_timeout = -1;
740 panic(0,
"Store-Redis: group 'store-db' is not specified!");
743 panic(0,
"Store-Redis: directive 'id' is not specified!");
747 fields = store_db_fields_create(grp);
777 panic(0,
"Connection settings for 'redis-connection' with id '%s' are not specified!",
787 panic(0,
"Directive 'host' is not specified in 'group = redis-connection' context!");
791 panic(0,
"Directive 'port' is not specified in 'group = redis-connection' context!");
800 db_conf = gw_malloc(
sizeof(
DBConf));
819 panic(0,
"Redis database pool has no connections!");
Dict * dict_create(long size_hint, void(*destroy_value)(void *))
void error(int err, const char *fmt,...)
void info(int err, const char *fmt,...)
long dbpool_conn_count(DBPool *p)
DBPool * dbpool_create(enum db_type db_type, DBConf *conf, unsigned int connections)
gw_assert(wtls_machine->packet_to_send !=NULL)
void dict_put(Dict *dict, Octstr *key, void *value)
void counter_destroy(Counter *counter)
void gwlist_produce(List *list, void *item)
long gwlist_len(List *list)
int(* store_save_ack)(Msg *msg, ack_status_t status)
long(* store_messages)(void)
static void dispatch(const Octstr *filename, void *data)
#define cfg_get(grp, varname)
void octstr_binary_to_base64(Octstr *ostr)
void uuid_unparse(const uuid_t uu, char *out)
void uuid_generate(uuid_t out)
unsigned long counter_decrease(Counter *counter)
Msg *(* store_msg_unpack)(Octstr *os)
#define octstr_get_cstr(ostr)
unsigned long counter_increase(Counter *counter)
void(* store_for_each_message)(void(*callback_fn)(Msg *msg, void *data), void *data)
void dbpool_conn_produce(DBPoolConn *conn)
Octstr * octstr_imm(const char *cstr)
Counter * counter_create(void)
void * gwlist_extract_first(List *list)
void grp_dump(CfgGroup *grp)
void gwlist_remove_producer(List *list)
int uuid_is_null(const uuid_t uu)
#define octstr_duplicate(ostr)
List * cfg_get_multi_group(Cfg *cfg, Octstr *name)
void uuid_copy(uuid_t dst, const uuid_t src)
void msg_destroy(Msg *msg)
Octstr * octstr_format(const char *fmt,...)
void octstr_destroy(Octstr *ostr)
#define octstr_create(cstr)
void octstr_destroy_item(void *os)
unsigned long counter_value(Counter *counter)
Octstr *(* store_msg_pack)(Msg *msg)
void octstr_base64_to_binary(Octstr *ostr)
void(* store_shutdown)(void)
void dict_destroy(Dict *dict)
void dbpool_destroy(DBPool *p)
int dbpool_conn_update(DBPoolConn *conn, const Octstr *sql, List *binds)
int cfg_get_bool(int *n, CfgGroup *grp, Octstr *varname)
void * gwlist_consume(List *list)
void debug(const char *place, int err, const char *fmt,...)
int cfg_get_integer(long *n, CfgGroup *grp, Octstr *varname)
int(* store_load)(void(*receive_msg)(Msg *))
#define MSG_PARAM_UNDEFINED
long octstr_parse_long(long *nump, Octstr *ostr, long pos, int base)
static void status_cb(Msg *msg, void *d)
int dbpool_conn_select(DBPoolConn *conn, const Octstr *sql, List *binds, List **result)
void(* callback_fn)(Msg *msg, void *data)
int(* store_save)(Msg *msg)
DBPoolConn * dbpool_conn_consume(DBPool *p)
CfgGroup * cfg_get_single_group(Cfg *cfg, Octstr *name)
void gwlist_add_producer(List *list)
static XMLRPCDocument * msg
int octstr_compare(const Octstr *ostr1, const Octstr *ostr2)
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)