Kannel: Open Source WAP and SMS gateway  $Revision: 5037 $
dbpool_redis.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2018 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * dbpool_redis.c - implement REDIS operations for generic database connection pool
59  *
60  * In the redis_[update|select]() functions no NULL values are allowed in the
61  * binds list, otherwise we may get value corruption when getting back values
62  * from redis.
63  *
64  * Toby Phipps <toby.phipps at nexmedia.com.sg>, 2011 Initial version.
65  * Stipe Tolj <stolj at kannel.org>, 2013, 2015
66  */
67 
68 #ifdef HAVE_REDIS
69 #include <hiredis.h>
70 
71 /*
72  * Define REDIS_DEBUG to get DEBUG level output of the
73  * Redis commands sent to the server.
74  */
75 /* #define REDIS_DEBUG 1 */
76 
77 #define REDIS_DEFAULT_PORT 6379
78 
79 
80 static void *redis_open_conn(const DBConf *db_conf)
81 {
82  redisContext *redis = NULL;
83  RedisConf *conf = db_conf->redis; /* make compiler happy */
84  redisReply *reply = NULL;
85  Octstr *os, *line;
86  List *lines;
87  long delimiter;
88 
89  /* sanity check */
90  if (conf == NULL)
91  return NULL;
92 
93  struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
94  redis = redisConnectWithTimeout(octstr_get_cstr(conf->host), conf->port, timeout);
95  if (redis->err) {
96  error(0, "REDIS: can not connect to server!");
97  error(0, "REDIS: %s", redis->errstr);
98  goto failed;
99  }
100 
101  info(0, "REDIS: Connected to server at %s:%ld.",
102  octstr_get_cstr(conf->host), conf->port);
103 
104  if (conf->password != NULL) {
105  reply = redisCommand(redis, "AUTH %s", octstr_get_cstr(conf->password));
106  if (strncmp("OK", reply->str, 2) != 0) {
107  error(0, "REDIS: Password authentication failed!");
108  goto failed;
109  }
110  freeReplyObject(reply);
111  }
112 
113  if (conf->idle_timeout != -1) {
114  reply = redisCommand(redis, "CONFIG SET TIMEOUT %ld", conf->idle_timeout);
115  if (strncmp("OK", reply->str, 2) != 0)
116  warning(0, "REDIS: CONFIG SET TIMEOUT %ld failed - could not set timeout",
117  conf->idle_timeout);
118  else
119  info(0, "REDIS: Set idle timeout to %ld seconds", conf->idle_timeout);
120  freeReplyObject(reply);
121  }
122 
123  if (conf->database != -1) {
124  reply = redisCommand(redis,"SELECT %ld", conf->database);
125  if (strncmp("OK", reply->str, 2) != 0)
126  error(0,"REDIS: SELECT %ld failed - could not select database", conf->database);
127  else
128  info(0,"REDIS: Selected database %ld", conf->database);
129  freeReplyObject(reply);
130  }
131 
132  reply = redisCommand(redis, "INFO");
133  if (reply->type != REDIS_REPLY_STRING) {
134  error(0, "REDIS: INFO command to get version failed!");
135  goto failed;
136  }
137 
138  os = octstr_create(reply->str);
139 
140 #if defined(REDIS_DEBUG)
141  debug("dbpool.redis",0,"Received REDIS_REPLY_STRING for INFO cmd");
142  /* octstr_dump(os, 0); */
143 #endif
144 
145  lines = octstr_split(os, octstr_imm("\n"));
146  octstr_destroy(os);
147  os = NULL;
148 
149  while ((line = gwlist_extract_first(lines)) != NULL) {
150  Octstr *key, *value;
151 
152  /* comment line */
153  if (octstr_get_char(line, 0) == '#') {
154  octstr_destroy(line);
155  continue;
156  }
157  delimiter = octstr_search_char(line, ':', 0);
158  key = octstr_copy(line, 0, delimiter);
159  octstr_strip_blanks(key);
160  value = octstr_copy(line, delimiter + 1, octstr_len(line));
161  octstr_strip_blanks(value);
162  if (octstr_str_compare(key, "redis_version") == 0) {
163  os = octstr_duplicate(value);
164  octstr_destroy(key);
165  octstr_destroy(value);
166  octstr_destroy(line);
167  break;
168  }
169  octstr_destroy(key);
170  octstr_destroy(value);
171  octstr_destroy(line);
172  }
174 
175  if (os == NULL) {
176  error(0, "REDIS: Could not parse version from INFO output!");
177  goto failed;
178  }
179 
180  info(0, "REDIS: server version %s.", octstr_get_cstr(os));
181  octstr_destroy(os);
182 
183  freeReplyObject(reply);
184  return redis;
185 
186 failed:
187  if (reply != NULL)
188  freeReplyObject(reply);
189  if (redis != NULL)
190  redisFree(redis);
191 
192  return NULL;
193 }
194 
195 
196 static void redis_close_conn(void *conn)
197 {
198  if (conn == NULL)
199  return;
200 
201  redisFree((redisContext*) conn);
202 }
203 
204 
205 static int redis_check_conn(void *conn)
206 {
207  redisReply *reply;
208 
209  if (conn == NULL)
210  return -1;
211 
212  reply = redisCommand(conn, "PING");
213  if (reply != NULL) {
214  if (strcmp(reply->str,"PONG") == 0) {
215  freeReplyObject(reply);
216  return 0;
217  }
218  }
219 
220  error(0, "REDIS: server connection check failed!");
221  error(0, "REDIS: %s", ((redisContext*)conn)->errstr);
222  if (reply != NULL)
223  freeReplyObject(reply);
224  return -1;
225 }
226 
227 
228 static int redis_select(void *conn, const Octstr *sql, List *binds, List **res)
229 {
230  redisReply *reply;
231  long i, binds_len;
232  List *row;
233  Octstr *temp = NULL;
234  const char **argv;
235 
236  /* bind parameters if any */
237  binds_len = gwlist_len(binds);
238 
239  if (binds_len > 0) {
240 #if defined(REDIS_DEBUG)
241  Octstr *os = octstr_create("");;
242 #endif
243 
244  argv = gw_malloc(sizeof(*argv) * binds_len);
245  for (i = 0; i < binds_len; i++) {
246  argv[i] = (char*)octstr_get_cstr(gwlist_get(binds, i));
247 #if defined(REDIS_DEBUG)
248  octstr_format_append(os, "\"%s\" ", argv[i]);
249 #endif
250  }
251 
252 #if defined(REDIS_DEBUG)
253  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(os));
254  octstr_destroy(os);
255 #endif
256 
257  /* execute statement */
258  reply = redisCommandArgv(conn, binds_len, argv, NULL);
259 
260  gw_free(argv);
261 
262  } else {
263 
264 #if defined(REDIS_DEBUG)
265  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(sql));
266 #endif
267 
268  /* execute statement */
269  reply = redisCommand(conn, octstr_get_cstr(sql));
270  }
271 
272  if (reply == NULL)
273  return -1;
274 
275  /* evaluate reply */
276  switch (reply->type) {
277  case REDIS_REPLY_ERROR:
278 #if defined(REDIS_DEBUG)
279  debug("dbpool.redis",0,"Received REDIS_REPLY_ERROR");
280 #endif
281  error(0, "REDIS: redisCommand() failed: `%s'", reply->str);
282  break;
283  case REDIS_REPLY_NIL:
284 #if defined(REDIS_DEBUG)
285  debug("dbpool.redis",0,"Received REDIS_REPLY_NIL");
286 #endif
287  break;
288  case REDIS_REPLY_STATUS:
289 #if defined(REDIS_DEBUG)
290  debug("dbpool.redis",0,"Received REDIS_REPLY_STATUS");
291 #endif
292  break;
293 
294  case REDIS_REPLY_STRING:
295 #if defined(REDIS_DEBUG)
296  debug("dbpool.redis",0,"Received REDIS_REPLY_STRING");
297 #endif
298  *res = gwlist_create();
299  row = gwlist_create();
300  temp = octstr_create_from_data(reply->str, reply->len);
301  gwlist_append(row, temp);
302  gwlist_produce(*res, row);
303  freeReplyObject(reply);
304  return 0;
305  break;
306 
307  case REDIS_REPLY_INTEGER:
308 #if defined(REDIS_DEBUG)
309  debug("dbpool.redis",0,"Received REDIS_REPLY_INTEGER");
310 #endif
311  *res = gwlist_create();
312  row = gwlist_create();
313  temp = octstr_format("%ld", reply->integer);
314  gwlist_append(row, temp);
315  gwlist_produce(*res, row);
316  freeReplyObject(reply);
317  return 0;
318  break;
319 
320  case REDIS_REPLY_ARRAY:
321 #if defined(REDIS_DEBUG)
322  debug("dbpool.redis",0,"Received REDIS_REPLY_ARRAY");
323 #endif
324  *res = gwlist_create();
325  row = gwlist_create();
326  for (i = 0; i < reply->elements; i++) {
327  if (reply->element[i]->type == REDIS_REPLY_NIL ||
328  reply->element[i]->str == NULL || reply->element[i]->len == 0) {
329  gwlist_produce(row, octstr_imm(""));
330  continue;
331  }
332  temp = octstr_create_from_data(reply->element[i]->str, reply->element[i]->len);
333 #if defined(REDIS_DEBUG)
334  debug("dbpool.redis",0,"Received REDIS_REPLY_ARRAY[%ld]: %s", i, octstr_get_cstr(temp));
335 #endif
336  gwlist_append(row, temp);
337  }
338  gwlist_produce(*res, row);
339  freeReplyObject(reply);
340  return 0;
341  break;
342 
343  default:
344 #if defined(REDIS_DEBUG)
345  error(0,"REDIS: Received unknown Redis reply type %d", reply->type);
346 #endif
347  break;
348  }
349 
350  freeReplyObject(reply);
351 
352  return -1;
353 }
354 
355 
356 static int redis_update(void *conn, const Octstr *sql, List *binds)
357 {
358  long i, binds_len;
359  int ret;
360  redisReply *reply;
361  const char **argv;
362 
363  /* bind parameters if any */
364  binds_len = gwlist_len(binds);
365 
366  if (binds_len > 0) {
367 #if defined(REDIS_DEBUG)
368  Octstr *os = octstr_create("");;
369 #endif
370 
371  argv = gw_malloc(sizeof(*argv) * binds_len);
372  for (i = 0; i < binds_len; i++) {
373  argv[i] = (char*)octstr_get_cstr(gwlist_get(binds, i));
374 #if defined(REDIS_DEBUG)
375  octstr_format_append(os, "\"%s\" ", argv[i]);
376 #endif
377  }
378 
379 #if defined(REDIS_DEBUG)
380  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(os));
381  octstr_destroy(os);
382 #endif
383 
384  /* execute statement */
385  reply = redisCommandArgv(conn, binds_len, argv, NULL);
386 
387  gw_free(argv);
388 
389  } else {
390 
391 #if defined(REDIS_DEBUG)
392  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(sql));
393 #endif
394 
395  /* execute statement */
396  reply = redisCommand(conn, octstr_get_cstr(sql));
397  }
398 
399  if (reply == NULL)
400  return -1;
401 
402  /* evaluate reply */
403  switch (reply->type) {
404  case REDIS_REPLY_ERROR:
405 #if defined(REDIS_DEBUG)
406  debug("dbpool.redis",0,"Received REDIS_REPLY_ERROR");
407 #endif
408  error(0, "REDIS: redisCommand() failed: `%s'", reply->str);
409  break;
410  case REDIS_REPLY_STATUS:
411  /* Some Redis commands (e.g. WATCH) return a boolean status */
412 #if defined(REDIS_DEBUG)
413  debug("dbpool.redis",0,"Received REDIS_REPLY_STATUS: %s", reply->str);
414 #endif
415  if (strcmp(reply->str, "OK") == 0) {
416  freeReplyObject(reply);
417  return 0;
418  }
419  break;
420  case REDIS_REPLY_INTEGER:
421  /* Other commands (e.g. DEL) return an integer indicating
422  * the number of keys affected */
423 #if defined(REDIS_DEBUG)
424  debug("dbpool.redis",0,"Received REDIS_REPLY_INTEGER: %qi", reply->integer);
425 #endif
426  /*
427  * Note: Redis returns a long long. Casting it to an int here could
428  * cause precision loss, however as we're returning an update status,
429  * this should only ever be used to return a count of keys
430  * deleted/updated, and this will almost invariably be 1.
431  */
432  ret = (int)reply->integer;
433  freeReplyObject(reply);
434  return ret;
435  break;
436  case REDIS_REPLY_ARRAY:
437  /* The EXEC command returns an array of replies
438  * when executed successfully */
439 #if defined(REDIS_DEBUG)
440  debug("dbpool.redis",0,"Received REDIS_REPLY_ARRAY");
441 #endif
442  freeReplyObject(reply);
443  /* For now, we only support EXEC commands with an array
444  * return and in that case, all is well */
445  return 0;
446  break;
447  case REDIS_REPLY_NIL:
448  /* Finally, the EXEC command can return a NULL
449  * if it fails (e.g. due to a WATCH triggering */
450 #if defined(REDIS_DEBUG)
451  debug("dbpool.redis",0,"Received REDIS_REPLY_NIL");
452 #endif
453  break;
454  default:
455 #if defined(REDIS_DEBUG)
456  debug("dbpool.redis",0,"Received unknown Redis reply %d", reply->type);
457 #endif
458  break;
459  }
460 
461  freeReplyObject(reply);
462 
463  return -1;
464 }
465 
466 
467 static void redis_conf_destroy(DBConf *db_conf)
468 {
469  RedisConf *conf = db_conf->redis;
470 
471  octstr_destroy(conf->host);
472  octstr_destroy(conf->password);
473 
474  gw_free(conf);
475  gw_free(db_conf);
476 }
477 
478 
479 static struct db_ops redis_ops = {
480  .open = redis_open_conn,
481  .close = redis_close_conn,
482  .check = redis_check_conn,
483  .select = redis_select,
484  .update = redis_update,
485  .conf_destroy = redis_conf_destroy
486 };
487 
488 #endif /* HAVE_REDIS */
void error(int err, const char *fmt,...)
Definition: log.c:648
static Octstr * delimiter
Definition: test_ppg.c:104
void info(int err, const char *fmt,...)
Definition: log.c:672
RedisConf * redis
Definition: dbpool.h:172
void gwlist_append(List *list, void *item)
Definition: list.c:179
void gwlist_produce(List *list, void *item)
Definition: list.c:411
long gwlist_len(List *list)
Definition: list.c:166
void * gwlist_get(List *list, long pos)
Definition: list.c:292
void octstr_strip_blanks(Octstr *text)
Definition: octstr.c:1344
Octstr * password
Definition: dbpool.h:150
#define octstr_get_cstr(ostr)
Definition: octstr.h:233
#define octstr_copy(ostr, from, len)
Definition: octstr.h:178
long octstr_search_char(const Octstr *ostr, int ch, long pos)
Definition: octstr.c:1010
static List * lines
Definition: mtbatch.c:88
Octstr * octstr_imm(const char *cstr)
Definition: octstr.c:281
void * gwlist_extract_first(List *list)
Definition: list.c:305
#define octstr_duplicate(ostr)
Definition: octstr.h:187
void warning(int err, const char *fmt,...)
Definition: log.c:660
Octstr * octstr_format(const char *fmt,...)
Definition: octstr.c:2462
void octstr_destroy(Octstr *ostr)
Definition: octstr.c:322
#define octstr_create(cstr)
Definition: octstr.h:125
void octstr_destroy_item(void *os)
Definition: octstr.c:334
long database
Definition: dbpool.h:151
Definition: dbpool.h:164
long octstr_len(const Octstr *ostr)
Definition: octstr.c:340
Definition: octstr.c:118
long port
Definition: dbpool.h:149
void debug(const char *place, int err, const char *fmt,...)
Definition: log.c:726
int octstr_str_compare(const Octstr *ostr, const char *str)
Definition: octstr.c:971
void octstr_format_append(Octstr *os, const char *fmt,...)
Definition: octstr.c:2505
#define gwlist_create()
Definition: list.h:136
Octstr * host
Definition: dbpool.h:148
int octstr_get_char(const Octstr *ostr, long pos)
Definition: octstr.c:404
#define octstr_create_from_data(data, len)
Definition: octstr.h:134
List * octstr_split(const Octstr *os, const Octstr *sep)
Definition: octstr.c:1638
Definition: list.c:102
void *(* open)(const DBConf *conf)
Definition: dbpool_p.h:73
static void reply(HTTPClient *c, List *push_headers)
long idle_timeout
Definition: dbpool.h:152
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)
Definition: list.c:145
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.