Patch-Source: https://github.com/slact/nchan/commit/cc705a948cf6797f669b64649605f007f6f26a42
--
From cc705a948cf6797f669b64649605f007f6f26a42 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?F=C3=A1bio=20Urquiza?=
Date: Mon, 20 Mar 2023 10:31:33 -0300
Subject: [PATCH] Fix redis connection race condition on many workers (#666)

diff --git a/src/nchan_module.c b/src/nchan_module.c
index ad6f3534..95b51521 100644
--- a/src/nchan_module.c
+++ b/src/nchan_module.c
@@ -39,7 +39,6 @@ int nchan_redis_stats_enabled = 0;
 
 static void nchan_publisher_body_handler(ngx_http_request_t *r);
-static void nchan_publisher_unavailable_body_handler(ngx_http_request_t *r);
 
 //#define DEBUG_LEVEL NGX_LOG_WARN
 //#define DEBUG_LEVEL NGX_LOG_DEBUG
 
@@ -746,18 +745,6 @@ ngx_int_t nchan_pubsub_handler(ngx_http_request_t *r) {
     return NGX_OK;
   }
   
-  if(cf->redis.enabled && !nchan_store_redis_ready(cf)) {
-    //using redis, and it's not ready yet
-    if(r->method == NGX_HTTP_POST || r->method == NGX_HTTP_PUT) {
-      //discard request body before responding
-      nchan_http_publisher_handler(r, nchan_publisher_unavailable_body_handler);
-    }
-    else {
-      nchan_respond_status(r, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0);
-    }
-    return NGX_OK;
-  }
-  
   if(cf->pub.websocket || cf->pub.http) {
     char *err;
     if(!nchan_parse_message_buffer_config(r, cf, &err)) {
@@ -841,6 +828,13 @@ ngx_int_t nchan_pubsub_handler(ngx_http_request_t *r) {
 #if FAKESHARD
       memstore_sub_debug_start();
 #endif
+      
+      if(cf->redis.enabled && ngx_process_slot == memstore_channel_owner(channel_id) && !nchan_store_redis_ready(cf)) {
+        //using redis, and it's not ready yet
+        nchan_respond_status(r, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0);
+        return NGX_OK;
+      }
+      
       if((msg_id = nchan_subscriber_get_msg_id(r)) == NULL) {
         goto bad_msgid;
       }
@@ -1249,11 +1243,6 @@ static ngx_int_t nchan_publisher_body_authorize_handler(ngx_http_request_t *r, v
   return NGX_OK;
 }
 
-static void nchan_publisher_unavailable_body_handler(ngx_http_request_t *r) {
-  nchan_http_finalize_request(r, NGX_HTTP_SERVICE_UNAVAILABLE);
-  return;
-}
-
 static void nchan_publisher_body_handler(ngx_http_request_t *r) {
   ngx_str_t *channel_id;
   nchan_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
diff --git a/src/store/memory/ipc-handlers.c b/src/store/memory/ipc-handlers.c
index 55df2fd3..bee7c1d9 100644
--- a/src/store/memory/ipc-handlers.c
+++ b/src/store/memory/ipc-handlers.c
@@ -47,6 +47,8 @@
   L(benchmark_finish_reply) \
   L(redis_stats_request) \
   L(redis_stats_reply) \
+  L(redis_conn_ready) \
+  L(redis_conn_ready_reply) \
@@ -1205,6 +1207,35 @@ static void receive_redis_stats_reply(ngx_int_t sender, redis_stats_request_data
   shm_free(nchan_store_memory_shmem, data->stats);
 }
 
+////////// REDIS CONN READY DATA ////////////////
+typedef struct {
+  ngx_int_t         ready;
+  nchan_loc_conf_t *cf;
+  callback_pt       callback;
+  void             *privdata;
+} redis_conn_ready_data_t;
+
+ngx_int_t memstore_ipc_send_redis_conn_ready(ngx_int_t dst, nchan_loc_conf_t *cf, callback_pt callback, void* privdata) {
+  DBG("send redis_conn_ready to %i", dst);
+  redis_conn_ready_data_t   data;
+  DEBUG_MEMZERO(&data);
+  data.ready = 0;
+  data.cf = cf;
+  data.callback = callback;
+  data.privdata = privdata;
+  
+  return ipc_cmd(redis_conn_ready, dst, &data);
+}
+
+static void receive_redis_conn_ready(ngx_int_t sender, redis_conn_ready_data_t *d) {
+  DBG("received redis_conn_ready request for privdata %p", d->privdata);
+  d->ready = nchan_store_redis_ready(d->cf);
+  ipc_cmd(redis_conn_ready_reply, sender, d);
+}
+
+static void receive_redis_conn_ready_reply(ngx_int_t sender, redis_conn_ready_data_t *d) {
+  d->callback(d->ready, NULL, d->privdata);
+}
 
 #define MAKE_ipc_cmd_handler(val) [offsetof(ipc_handlers_t, val)/sizeof(ipc_handler_pt)] = (ipc_handler_pt )receive_ ## val,
 static ipc_handler_pt ipc_cmd_handler[] = {
diff --git a/src/store/memory/ipc-handlers.h b/src/store/memory/ipc-handlers.h
index e22c4a0d..7fd9c19c 100644
--- a/src/store/memory/ipc-handlers.h
+++ b/src/store/memory/ipc-handlers.h
@@ -26,3 +26,5 @@
 ngx_int_t memstore_ipc_broadcast_benchmark_finish(void);
 ngx_int_t memstore_ipc_broadcast_benchmark_abort(void);
 ngx_int_t memstore_ipc_broadcast_redis_stats_request(void *nodeset, callback_pt cb, void *pd);
+
+ngx_int_t memstore_ipc_send_redis_conn_ready(ngx_int_t dst, nchan_loc_conf_t *cf, callback_pt callback, void* privdata);
\ No newline at end of file
diff --git a/src/store/memory/memstore.c b/src/store/memory/memstore.c
index f8b4ee8b..e1e1c7a8 100755
--- a/src/store/memory/memstore.c
+++ b/src/store/memory/memstore.c
@@ -2079,12 +2079,13 @@ static void subscribe_data_free(subscribe_data_t *d) {
 #define SUB_CHANNEL_NOTSURE 2
 
 static ngx_int_t nchan_store_subscribe_channel_existence_check_callback(ngx_int_t channel_status, void* _, subscribe_data_t *d);
-static ngx_int_t nchan_store_subscribe_continued(ngx_int_t channel_status, void* _, subscribe_data_t *d);
+static ngx_int_t nchan_store_subscribe_stage2(ngx_int_t continue_subscription, void* _, subscribe_data_t *d);
+static ngx_int_t nchan_store_subscribe_stage3(ngx_int_t channel_status, void* _, subscribe_data_t *d);
 
 static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub) {
   ngx_int_t owner = memstore_channel_owner(channel_id);
   subscribe_data_t *d = subscribe_data_alloc(sub->cf->redis.enabled ? -1 : owner);
-  
+
   assert(d != NULL);
   
   d->channel_owner = owner;
@@ -2095,24 +2096,51 @@ static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub)
   d->channel_exists = 0;
   d->group_channel_limit_pass = 0;
   d->msg_id = sub->last_msgid;
-  
-  if(sub->cf->subscribe_only_existing_channel || sub->cf->max_channel_subscribers > 0) {
+  
+  if(sub->cf->redis.enabled && memstore_slot() != owner) {
+    ngx_int_t rc;
     sub->fn->reserve(sub);
     d->reserved = 1;
-    if(memstore_slot() != owner) {
-      ngx_int_t rc;
-      rc = memstore_ipc_send_channel_existence_check(owner, channel_id, sub->cf, (callback_pt )nchan_store_subscribe_channel_existence_check_callback, d);
-      if(rc == NGX_DECLINED) { // out of memory
-        nchan_store_subscribe_channel_existence_check_callback(SUB_CHANNEL_UNAUTHORIZED, NULL, d);
-        return NGX_ERROR;
+    rc = memstore_ipc_send_redis_conn_ready(d->channel_owner, sub->cf, (callback_pt)nchan_store_subscribe_stage2, d);
+    if(rc == NGX_DECLINED) { // out of memory
+      nchan_store_subscribe_stage2(0, NULL, d);
+      return NGX_ERROR;
+    }
+  } else {
+    return nchan_store_subscribe_stage2(1, NULL, d);
+  }
+  return NGX_OK;
+}
+
+static ngx_int_t nchan_store_subscribe_stage2(ngx_int_t continue_subscription, void* _, subscribe_data_t *d) {
+  if(continue_subscription) {
+    if(d->sub->cf->subscribe_only_existing_channel || d->sub->cf->max_channel_subscribers > 0) {
+      if(!d->reserved) {
+        d->sub->fn->reserve(d->sub);
+        d->reserved = 1;
+      }
+      if(memstore_slot() != d->channel_owner) {
+        ngx_int_t rc;
+        rc = memstore_ipc_send_channel_existence_check(d->channel_owner, d->channel_id, d->sub->cf, (callback_pt )nchan_store_subscribe_channel_existence_check_callback, d);
+        if(rc == NGX_DECLINED) { // out of memory
+          nchan_store_subscribe_channel_existence_check_callback(SUB_CHANNEL_UNAUTHORIZED, NULL, d);
+          return NGX_ERROR;
+        }
+      }
+      else {
+        return nchan_store_subscribe_stage3(SUB_CHANNEL_NOTSURE, NULL, d);
       }
     }
     else {
-      return nchan_store_subscribe_continued(SUB_CHANNEL_NOTSURE, NULL, d);
+      return nchan_store_subscribe_stage3(SUB_CHANNEL_AUTHORIZED, NULL, d);
     }
-  }
-  else {
-    return nchan_store_subscribe_continued(SUB_CHANNEL_AUTHORIZED, NULL, d);
+  } else {
+    //using redis, and it's not ready yet
+    if(d->sub->fn->release(d->sub, 0) == NGX_OK) {
+      d->reserved = 0;
+      nchan_respond_status(d->sub->request, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0);
+    }
+    subscribe_data_free(d);
   }
   return NGX_OK;
 }
@@ -2120,7 +2148,7 @@ static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub)
 static ngx_int_t nchan_store_subscribe_channel_existence_check_callback(ngx_int_t channel_status, void* _, subscribe_data_t *d) {
   if(d->sub->fn->release(d->sub, 0) == NGX_OK) {
     d->reserved = 0;
-    return nchan_store_subscribe_continued(channel_status, _, d);
+    return nchan_store_subscribe_stage3(channel_status, _, d);
   }
   else {//don't go any further, the sub has been deleted
     subscribe_data_free(d);
@@ -2143,7 +2171,7 @@ static ngx_int_t redis_subscribe_channel_existence_callback(ngx_int_t status, vo
     /*
     else if (cf->max_channel_subscribers > 0) {
       // don't check this anymore -- a total subscribers count check is less
-      // useful as a per-instance check, which is handled in nchan_store_subscribe_continued
+      // useful as a per-instance check, which is handled in nchan_store_subscribe_stage3
      // shared total subscriber count check can be re-enabled with another config setting
       channel_status = channel->subscribers >= cf->max_channel_subscribers ? SUB_CHANNEL_UNAUTHORIZED : SUB_CHANNEL_AUTHORIZED;
     }
@@ -2152,7 +2180,7 @@ static ngx_int_t redis_subscribe_channel_existence_callback(ngx_int_t status, vo
       channel_status = SUB_CHANNEL_AUTHORIZED;
     }
     
-    nchan_store_subscribe_continued(channel_status, NULL, data);
+    nchan_store_subscribe_stage3(channel_status, NULL, data);
   }
   else {
     //error!!
@@ -2196,7 +2224,7 @@ static ngx_int_t group_subscribe_channel_limit_reached(ngx_int_t rc, nchan_chann
   if(d->sub->status != DEAD) {
     if(chaninfo) {
       //ok, channel already exists.
-      nchan_store_subscribe_continued(SUB_CHANNEL_AUTHORIZED, NULL, d);
+      nchan_store_subscribe_stage3(SUB_CHANNEL_AUTHORIZED, NULL, d);
     }
     else {
       //nope. no channel, no subscribing.
@@ -2219,14 +2247,14 @@ static ngx_int_t group_subscribe_channel_limit_check(ngx_int_t _, nchan_group_t
   if(shm_group) {
     if(!shm_group->limit.channels || (shm_group->channels < shm_group->limit.channels)) {
       d->group_channel_limit_pass = 1;
-      rc = nchan_store_subscribe_continued(SUB_CHANNEL_AUTHORIZED, NULL, d);
+      rc = nchan_store_subscribe_stage3(SUB_CHANNEL_AUTHORIZED, NULL, d);
     }
     else if (shm_group->limit.channels && shm_group->channels == shm_group->limit.channels){
       //no new channels!
       rc = nchan_store_find_channel(d->channel_id, d->sub->cf, (callback_pt )group_subscribe_channel_limit_reached, d);
     }
     else {
-      rc = nchan_store_subscribe_continued(SUB_CHANNEL_UNAUTHORIZED, NULL, d);
+      rc = nchan_store_subscribe_stage3(SUB_CHANNEL_UNAUTHORIZED, NULL, d);
     }
   }
 
@@ -2246,7 +2274,7 @@ static ngx_int_t group_subscribe_channel_limit_check(ngx_int_t _, nchan_group_t
   return rc;
 }
 
-static ngx_int_t nchan_store_subscribe_continued(ngx_int_t channel_status, void* _, subscribe_data_t *d) {
+static ngx_int_t nchan_store_subscribe_stage3(ngx_int_t channel_status, void* _, subscribe_data_t *d) {
   memstore_channel_head_t *chanhead = NULL;
   int retry_null_chanhead = 1;
   //store_message_t *chmsg;
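
The patch above replaces the per-worker nchan_store_redis_ready() check with an IPC round-trip to the channel-owner worker: only the owner's Redis connection state decides whether a subscribe proceeds or is answered with 503. The sketch below is a minimal, self-contained model of that control flow, not nchan code; every name in it (send_redis_conn_ready, subscribe_stage2, owner_redis_ready) is an illustrative stand-in for the patched functions, and the asynchronous IPC reply is collapsed into a direct callback invocation.

/* Minimal model (assumed names, not nchan code) of the patched subscribe path:
 * the subscribing worker asks the channel-owner worker whether its Redis
 * connection is ready, and the decision arrives through a callback. */
#include <stdio.h>

/* mirrors the shape of nchan's callback_pt: (code, reply-data, private-data) */
typedef int (*callback_pt)(int rc, void *reply, void *privdata);

static int owner_redis_ready = 0;   /* stand-in for nchan_store_redis_ready() on the owner worker */

/* stand-in for memstore_ipc_send_redis_conn_ready(); in nchan this is an async
 * IPC round-trip (redis_conn_ready / redis_conn_ready_reply), here the reply
 * is delivered by invoking the callback directly */
static int send_redis_conn_ready(callback_pt cb, void *privdata) {
  return cb(owner_redis_ready, NULL, privdata);
}

/* stand-in for nchan_store_subscribe_stage2(): either continue the subscription
 * or release the subscriber and answer 503 Service Unavailable */
static int subscribe_stage2(int continue_subscription, void *reply, void *privdata) {
  const char *channel = privdata;
  (void) reply;
  if(continue_subscription) {
    printf("subscribe %s: owner's Redis is ready, continuing to stage 3\n", channel);
  }
  else {
    printf("subscribe %s: owner's Redis not ready, responding 503\n", channel);
  }
  return 0;
}

int main(void) {
  send_redis_conn_ready(subscribe_stage2, "/chan-a");  /* owner not ready yet -> 503     */
  owner_redis_ready = 1;
  send_redis_conn_ready(subscribe_stage2, "/chan-b");  /* owner ready -> continue        */
  return 0;
}

In the real module the reply arrives asynchronously via ipc_cmd(redis_conn_ready_reply, ...), which is why the continuation lives in the separate nchan_store_subscribe_stage2() callback rather than inline in nchan_store_subscribe(), and why the subscriber is reserved before the IPC call and released when the reply comes back.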