diff --git a/.gitignore b/.gitignore index 334bfef..7cd2ca3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ # Prerequisites *.d +# Cache +.cache + # Object files *.o *.ko diff --git a/Makefile b/Makefile index f3132c6..f8e514d 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,8 @@ .PHONY: build clean download-nginx build-nginx configure build-module run test debug -build: clean download-nginx build-nginx +setup: clean download-nginx configure + +build: build-nginx build-module download-nginx: curl https://nginx.org/download/nginx-1.26.3.tar.gz > nginx.tar.gz @@ -9,7 +11,7 @@ download-nginx: configure: cd nginx-1.26.3 && ./configure --prefix=$(PWD)/build --add-dynamic-module=.. -build-nginx: configure +build-nginx: cd nginx-1.26.3 && make && make install build-module: @@ -29,3 +31,9 @@ debug: cd ./reader-go; go build -o debugger main.go; mv ./debugger .. ./debugger one.bin ./debugger two.bin + +log: + go run log_zone/*.go log + +send: + go run log_zone/*.go send diff --git a/config b/config index c35c6ac..6e8998b 100644 --- a/config +++ b/config @@ -3,7 +3,27 @@ ngx_module_type=HTTP ngx_module_name=ngx_http_limit_req_rw_module ngx_module_srcs="$ngx_addon_dir/ngx_http_limit_req_rw_module.c" -CFLAGS="$CFLAGS `pkg-config --cflags 'msgpack-c = 6.1.0'`" -CORE_LIBS="$CORE_LIBS `pkg-config --libs 'msgpack-c = 6.1.0'`" +# Detect platform +OS_NAME="$(uname -s)" + +# Default values +MSGPACK_PKG_NAME="" +MSGPACK_VERSION="" + +# Try to detect msgpack library +if pkg-config --exists 'msgpack-c'; then + MSGPACK_PKG_NAME="msgpack-c" + MSGPACK_VERSION="6.1.0" # Optional: only enforce version if strictly necessary +elif pkg-config --exists 'msgpack'; then + MSGPACK_PKG_NAME="msgpack" + MSGPACK_VERSION="3.1.0" +else + echo "Error: Neither 'msgpack-c' nor 'msgpack' pkg-config package found." + exit 1 +fi + +# Add flags +CFLAGS="$CFLAGS $(pkg-config --cflags "$MSGPACK_PKG_NAME")" +CORE_LIBS="$CORE_LIBS $(pkg-config --libs "$MSGPACK_PKG_NAME")" . auto/module diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0f83b49 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module github.com/tsuru/ngx-http-limit-req-rw-module + +go 1.24.3 + +require ( + github.com/sirupsen/logrus v1.9.3 + github.com/vmihailenco/msgpack/v5 v5.4.1 +) + +require ( + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..5611ca6 --- /dev/null +++ b/go.sum @@ -0,0 +1,19 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/log_zone/logs.go b/log_zone/logs.go new file mode 100644 index 0000000..a08dbc0 --- /dev/null +++ b/log_zone/logs.go @@ -0,0 +1,77 @@ +package main + +import ( + "fmt" + "io" + "net/http" + "time" + + "github.com/sirupsen/logrus" + "github.com/vmihailenco/msgpack/v5" +) + +func logs(zone string) { + for { + zone, err := handleRequest(zone) + if err != nil { + logrus.Error("Error handling request", "error", err) + return + } + fmt.Print("\033[H\033[2J") + logrus.Infof("Zone: %s, RateLimitHeader: %+v, RateLimitEntries: %d", + zone.Name, zone.RateLimitHeader, len(zone.RateLimitEntries)) + for _, entry := range zone.RateLimitEntries { + logrus.Infof("Entry Key: %s, Last: %d, Excess: %d", + entry.Key.String(zone.RateLimitHeader), entry.Last, entry.Excess) + } + fmt.Println("\nPress Ctrl+C to exit") + time.Sleep(2 * time.Second) + } +} + +func handleRequest(zone string) (Zone, error) { + endpoint := fmt.Sprintf("http://localhost:9000/api/%s", zone) + req, err := http.NewRequest(http.MethodGet, endpoint, nil) + if err != nil { + return Zone{}, err + } + + response, err := http.DefaultClient.Do(req) + if err != nil { + return Zone{}, fmt.Errorf("error making request to %s: %w", endpoint, err) + } + defer response.Body.Close() + decoder := msgpack.NewDecoder(response.Body) + var rateLimitHeader RateLimitHeader + rateLimitEntries := []RateLimitEntry{} + log := logrus.New() + if err := decoder.Decode(&rateLimitHeader); err != nil { + if err == io.EOF { + return Zone{ + Name: zone, + RateLimitHeader: rateLimitHeader, + RateLimitEntries: rateLimitEntries, + }, nil + } + log.Error("Error decoding header", "error", err) + return Zone{}, err + } + for { + var message RateLimitEntry + if err := decoder.Decode(&message); err != nil { + if err == io.EOF { + break + } + log.Error("Error decoding entry", "error", err) + return Zone{}, err + } + message.Last = toNonMonotonic(message.Last, rateLimitHeader) + rateLimitEntries = append(rateLimitEntries, message) + } + log.Debug("Received rate limit entries", "zone", zone, "entries", len(rateLimitEntries)) + return Zone{ + Name: zone, + RateLimitHeader: rateLimitHeader, + RateLimitEntries: rateLimitEntries, + }, nil +} diff --git a/log_zone/main.go b/log_zone/main.go new file mode 100644 index 0000000..b3831a4 --- /dev/null +++ b/log_zone/main.go @@ -0,0 +1,64 @@ +package main + +import ( + "net" + "os" +) + +func main() { + arg := os.Args[1:] + zone := "one" + if arg[0] == "log" { + if len(arg) >= 2 { + zone = arg[1] + } + logs(zone) + return + } + if arg[0] == "send" { + if len(arg) >= 2 { + zone = arg[1] + } + send(zone) + } +} + +func toNonMonotonic(last int64, header RateLimitHeader) int64 { + return header.Now - (header.NowMonotonic - last) +} + +type Zone struct { + Name string + RateLimitHeader RateLimitHeader + RateLimitEntries []RateLimitEntry +} + +type RateLimitHeader struct { + Key string + Now int64 + NowMonotonic int64 +} + +type RateLimitEntry struct { + Key Key + Last int64 + Excess int64 +} + +const ( + BinaryRemoteAddress = "$binary_remote_addr" + RemoteAddress = "$remote_addr" +) + +type Key []byte + +func (r Key) String(header RateLimitHeader) string { + switch header.Key { + case BinaryRemoteAddress: + return net.IP(r).String() + case RemoteAddress: + fallthrough + default: + return string(r) + } +} diff --git a/log_zone/send.go b/log_zone/send.go new file mode 100644 index 0000000..effe051 --- /dev/null +++ b/log_zone/send.go @@ -0,0 +1,76 @@ +package main + +import ( + "bytes" + "fmt" + "io" + "net/http" + "time" + + "github.com/sirupsen/logrus" + "github.com/vmihailenco/msgpack/v5" +) + +func send(zone string) { + err := sendRequest( + zone, + RateLimitHeader{ + Key: BinaryRemoteAddress, + Now: time.Now().Unix(), + NowMonotonic: time.Now().UnixNano() / int64(time.Millisecond), + }, []RateLimitEntry{ + {Key([]byte{127, 0, 0, 0}), 7, 99}, + {Key([]byte{127, 0, 0, 1}), 7, 12}, + {Key([]byte{127, 6, 4, 00}), 2, 98}, + {Key([]byte{127, 0, 0, 99}), 30, 300}, + {Key([]byte{10, 0, 0, 1}), 444, 21}, + }) + if err != nil { + logrus.Fatalf("Error sending request: %v", err) + } +} + +func sendRequest(zone string, header RateLimitHeader, entries []RateLimitEntry) error { + var buf bytes.Buffer + encoder := msgpack.NewEncoder(&buf) + var values []interface{} = []interface{}{ + headerToArray(header), + } + for _, entry := range entries { + values = append(values, entryToArray(entry, header)) + } + if err := encoder.Encode(values); err != nil { + return fmt.Errorf("error encoding entries: %w", err) + } + endpoint := fmt.Sprintf("http://localhost:9000/api/%s", zone) + req, err := http.NewRequest(http.MethodPost, endpoint, &buf) + if err != nil { + return fmt.Errorf("error creating request: %w", err) + } + req.Header.Set("Content-Type", "application/x-msgpack") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("error sending request to %s: %w", endpoint, err) + } + fmt.Println(resp.Status) + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + logrus.Infof("response status: %s, body: %s", resp.Status, respBody) + return nil +} + +func headerToArray(header RateLimitHeader) []interface{} { + return []interface{}{ + header.Key, + header.Now, + header.NowMonotonic, + } +} + +func entryToArray(entry RateLimitEntry, header RateLimitHeader) []interface{} { + return []interface{}{ + entry.Key, + entry.Last, + entry.Excess, + } +} diff --git a/nginx.conf b/nginx.conf index 5d2cea3..2b896d1 100644 --- a/nginx.conf +++ b/nginx.conf @@ -1,6 +1,5 @@ worker_processes 1; master_process on; -daemon off; error_log ./error.log debug; diff --git a/ngx_http_limit_req_module.h b/ngx_http_limit_req_module.h index e301d4b..99187e2 100644 --- a/ngx_http_limit_req_module.h +++ b/ngx_http_limit_req_module.h @@ -1,53 +1,50 @@ /* - * Copyright (C) Igor Sysoev - * Copyright (C) Nginx, Inc. - */ +TODO: copyright +*/ #include #include - typedef struct { - ngx_array_t limits; - ngx_uint_t limit_log_level; - ngx_uint_t delay_log_level; - ngx_uint_t status_code; - ngx_flag_t dry_run; +typedef struct { + ngx_array_t limits; + ngx_uint_t limit_log_level; + ngx_uint_t delay_log_level; + ngx_uint_t status_code; + ngx_flag_t dry_run; } ngx_http_limit_req_conf_t; typedef struct { - ngx_shm_zone_t *shm_zone; - /* integer value, 1 corresponds to 0.001 r/s */ - ngx_uint_t burst; - ngx_uint_t delay; + ngx_shm_zone_t *shm_zone; + /* integer value, 1 corresponds to 0.001 r/s */ + ngx_uint_t burst; + ngx_uint_t delay; } ngx_http_limit_req_limit_t; typedef struct { - u_char color; - u_char dummy; - u_short len; - ngx_queue_t queue; - ngx_msec_t last; - /* integer value, 1 corresponds to 0.001 r/s */ - ngx_uint_t excess; - ngx_uint_t count; - u_char data[1]; + u_char color; + u_char dummy; + u_short len; + ngx_queue_t queue; + ngx_msec_t last; + /* integer value, 1 corresponds to 0.001 r/s */ + ngx_uint_t excess; + ngx_uint_t count; + u_char data[1]; } ngx_http_limit_req_node_t; - typedef struct { - ngx_rbtree_t rbtree; - ngx_rbtree_node_t sentinel; - ngx_queue_t queue; + ngx_rbtree_t rbtree; + ngx_rbtree_node_t sentinel; + ngx_queue_t queue; } ngx_http_limit_req_shctx_t; typedef struct { - ngx_http_limit_req_shctx_t *sh; - ngx_slab_pool_t *shpool; - /* integer value, 1 corresponds to 0.001 r/s */ - ngx_uint_t rate; - ngx_http_complex_value_t key; - ngx_http_limit_req_node_t *node; + ngx_http_limit_req_shctx_t *sh; + ngx_slab_pool_t *shpool; + /* integer value, 1 corresponds to 0.001 r/s */ + ngx_uint_t rate; + ngx_http_complex_value_t key; + ngx_http_limit_req_node_t *node; } ngx_http_limit_req_ctx_t; - extern ngx_module_t ngx_http_limit_req_module; diff --git a/ngx_http_limit_req_rw_module.c b/ngx_http_limit_req_rw_module.c index e67c95d..ce31fa3 100644 --- a/ngx_http_limit_req_rw_module.c +++ b/ngx_http_limit_req_rw_module.c @@ -2,35 +2,50 @@ TODO: copyright */ +#include "ngx_http_limit_req_module.h" +#include #include -#include -#include -#include -#include #include #include -#include -#include +#include #include #include -#include -#include #include #include -#include - -#include "ngx_http_limit_req_module.h" const int MAX_NUMBER_OF_RATE_LIMIT_ELEMENTS = 30 * 1000; +typedef struct { + ngx_str_t Key; + uint64_t Last; + uint64_t Excess; +} entities; + +typedef struct { + ngx_str_t Key; // Key of the rate limit zone + uint64_t Now; // Current timestamp in milliseconds + uint64_t NowMonotonic; // Current monotonic timestamp in milliseconds +} header; + +typedef struct { + header *Header; // Header information + entities *Entities; // Array of entities + uint32_t EntitiesSize; // Size of the entities array +} ngx_zone_data_t; + +static ngx_int_t ngx_decode_msg_pack(ngx_http_request_t *r, + ngx_zone_data_t *msg_pack); + static ngx_int_t ngx_http_limit_req_read_handler(ngx_http_request_t *r); +static void ngx_http_limit_req_write_post_handler(ngx_http_request_t *r); +static ngx_int_t ngx_http_limit_req_write_handler(ngx_http_request_t *r); static char *ngx_http_limit_req_rw_handler(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); static void strip_zone_name_from_uri(ngx_str_t *uri, ngx_str_t *zone_name); static ngx_int_t dump_rate_limit_zones(ngx_http_request_t *r, ngx_buf_t *b); -static ngx_int_t dump_req_zone(ngx_pool_t *pool, ngx_buf_t *b, - ngx_str_t *zone_name, ngx_uint_t last_greater_equal); +static ngx_shm_zone_t *find_rate_limit_shm_zone_by_name(ngx_http_request_t *r, + ngx_str_t zone_name); static ngx_int_t dump_req_limits(ngx_pool_t *pool, ngx_shm_zone_t *shm_zone, ngx_buf_t *buf, ngx_uint_t last_greater_equal); static ngx_command_t ngx_http_limit_req_rw_commands[] = { @@ -68,11 +83,263 @@ ngx_module_t ngx_http_limit_req_rw_module = {NGX_MODULE_V1, NGX_MODULE_V1_PADDING}; static ngx_int_t ngx_http_limit_req_handler(ngx_http_request_t *r) { - if (r->method != NGX_HTTP_GET) { - return NGX_HTTP_NOT_ALLOWED; + if (r->method == NGX_HTTP_GET) { + return ngx_http_limit_req_read_handler(r); + } + if (r->method == NGX_HTTP_POST) { + r->request_body_in_single_buf = 1; + return ngx_http_read_client_request_body( + r, ngx_http_limit_req_write_post_handler); + } + return NGX_HTTP_SERVICE_UNAVAILABLE; +} + +static ngx_int_t ngx_decode_msg_pack(ngx_http_request_t *r, + ngx_zone_data_t *ngx_zone_data) { + ngx_chain_t *cl; + size_t len = 0; + u_char *data, *p; + size_t size, deserialized_size; + msgpack_zone mempool; + msgpack_object deserialized; + + if (r->request_body == NULL || r->request_body->bufs == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no request body found"); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + for (cl = r->request_body->bufs; cl; cl = cl->next) { + len += cl->buf->last - cl->buf->pos; + } + + data = ngx_pnalloc(r->pool, len); + if (data == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "failed to allocate memory for request body"); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + p = data; + for (cl = r->request_body->bufs; cl; cl = cl->next) { + size = cl->buf->last - cl->buf->pos; + ngx_memcpy(p, cl->buf->pos, size); + p += size; + } + + msgpack_zone_init(&mempool, 2048); + + msgpack_unpack((char *)data, len, NULL, &mempool, &deserialized); + ngx_log_error( + NGX_LOG_DEBUG, r->connection->log, 0, + "ngx_http_limit_req_rw_module: deserialized type: %d - size: %d", + deserialized.type, deserialized.via.array.size); + if (deserialized.type != MSGPACK_OBJECT_ARRAY) { + msgpack_zone_destroy(&mempool); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + deserialized_size = deserialized.via.array.size; + msgpack_object *items = deserialized.via.array.ptr; + + header *hdr = ngx_pnalloc(r->pool, sizeof(header)); + if (hdr == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "failed to allocate memory for header"); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + if (items->via.array.size >= 1) { + msgpack_object hdrKey = items[0].via.array.ptr[0]; + msgpack_object hdrNow = items[0].via.array.ptr[1]; + msgpack_object hdrNowMonotonic = items[0].via.array.ptr[2]; + hdr->Key.len = hdrKey.via.str.size; + u_char *keyData = ngx_palloc(r->pool, hdrKey.via.str.size); + if (keyData == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "failed to allocate memory for key"); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + ngx_memcpy(keyData, hdrKey.via.str.ptr, hdrKey.via.str.size); + hdr->Key.data = keyData; + hdr->Now = hdrNow.via.u64; + hdr->NowMonotonic = hdrNowMonotonic.via.u64; + } + ngx_zone_data->Header = hdr; + + entities *arr = ngx_pnalloc(r->pool, deserialized_size * sizeof(entities)); + ngx_zone_data->EntitiesSize = deserialized_size - 1; + ngx_zone_data->Entities = arr; + + for (uint32_t i = 1; i < deserialized_size; i++) { + if (items[i].via.array.size != 3) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "invalid number of items in array at index %d", i); + msgpack_zone_destroy(&mempool); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + msgpack_object iItemKey = items[i].via.array.ptr[0]; + msgpack_object iItemLast = items[i].via.array.ptr[1]; + msgpack_object iItemExcess = items[i].via.array.ptr[2]; + + u_char *keyData = ngx_palloc(r->pool, iItemKey.via.str.size); + if (keyData == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "failed to allocate memory for key"); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + ngx_memcpy(keyData, iItemKey.via.str.ptr, iItemKey.via.str.size); + + arr[i - 1].Key.len = iItemKey.via.str.size; + arr[i - 1].Key.data = keyData; + arr[i - 1].Last = iItemLast.via.u64; + arr[i - 1].Excess = iItemExcess.via.u64; + } + msgpack_zone_destroy(&mempool); + return NGX_OK; +} + +static void ngx_http_limit_req_write_post_handler(ngx_http_request_t *r) { + ngx_int_t rc; + + rc = ngx_http_limit_req_write_handler(r); + + if (rc != NGX_OK) { + ngx_http_finalize_request(r, rc); + return; + } + + ngx_str_t response = ngx_string("OK\n"); + r->headers_out.status = NGX_HTTP_OK; + r->headers_out.content_length_n = response.len; + ngx_http_send_header(r); + + ngx_buf_t *b = ngx_create_temp_buf(r->pool, response.len); + ngx_memcpy(b->pos, response.data, response.len); + b->last = b->pos + response.len; + b->last_buf = 1; + + ngx_chain_t out = {.buf = b, .next = NULL}; + ngx_http_output_filter(r, &out); +} + +static ngx_int_t ngx_http_limit_req_write_handler(ngx_http_request_t *r) { + ngx_int_t rc, found; + ngx_zone_data_t *msg_pack = NULL; + ngx_str_t zone_name; + ngx_shm_zone_t *shm_zone; + ngx_http_limit_req_ctx_t *ctx; + ngx_str_t key; + size_t size; + uint32_t hash; + ngx_rbtree_node_t *node, *sentinel; + ngx_http_limit_req_node_t *lr; + + if (r != r->main) { + return NGX_DECLINED; + } + + msg_pack = ngx_pnalloc(r->pool, sizeof(ngx_zone_data_t)); + if (msg_pack == NULL) { + ngx_log_error( + NGX_LOG_ERR, r->connection->log, 0, + "ngx_http_limit_req_rw_module: failed to allocate memory for msg_pack"); + return NGX_HTTP_INTERNAL_SERVER_ERROR; } - return ngx_http_limit_req_read_handler(r); + rc = ngx_decode_msg_pack(r, msg_pack); + if (rc != NGX_OK) { + return rc; + } + + strip_zone_name_from_uri(&r->uri, &zone_name); + shm_zone = find_rate_limit_shm_zone_by_name(r, zone_name); + if (shm_zone == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "ngx_http_limit_req_rw_module: rate limit zone %*s not found", + zone_name); + return NGX_HTTP_NOT_FOUND; + } + + ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, + "ngx_http_limit_req_rw_module: Header Key: %*s", + msg_pack->Header->Key); + for (uint32_t i = 0; i < msg_pack->EntitiesSize; i++) { + ctx = shm_zone->data; + ngx_shmtx_lock(&ctx->shpool->mutex); + + key = msg_pack->Entities[i].Key; + + hash = ngx_crc32_short(key.data, key.len); + + node = ctx->sh->rbtree.root; + sentinel = ctx->sh->rbtree.sentinel; + + found = 0; + while (node != sentinel) { + if (hash < node->key) { + node = node->left; + continue; + } + if (hash > node->key) { + node = node->right; + continue; + } + + /* hash == node->key */ + + lr = (ngx_http_limit_req_node_t *)&node->color; + + rc = ngx_memn2cmp(key.data, lr->data, key.len, lr->len); + + if (rc == 0) { + found = 1; + break; + } + + node = (rc < 0) ? node->left : node->right; + } + + if (found) { + ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, + "ngx_http_limit_req_rw_module: existing node found %ul", + lr->excess); + lr->last = msg_pack->Entities[i].Last; + lr->excess = msg_pack->Entities[i].Excess; + ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, + "ngx_http_limit_req_rw_module: existing node updated %ul", + lr->excess); + } else { + size = offsetof(ngx_rbtree_node_t, color) + + offsetof(ngx_http_limit_req_node_t, data) + key.len; + node = ngx_slab_alloc_locked(ctx->shpool, size); + if (node == NULL) { + ngx_shmtx_unlock(&ctx->shpool->mutex); + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "failed to allocate memory for rate limit node"); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + node->key = hash; + + lr = (ngx_http_limit_req_node_t *)&node->color; + + lr->len = (u_short)key.len; + lr->excess = msg_pack->Entities[i].Excess; + lr->last = msg_pack->Entities[i].Last; + lr->count = 0; + + ngx_memcpy(lr->data, key.data, key.len); + + ngx_rbtree_insert(&ctx->sh->rbtree, node); + + ngx_queue_insert_head(&ctx->sh->queue, &lr->queue); + + ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, + "ngx_http_limit_req_rw_module: new node excess set to %ul", + lr->excess); + } + ngx_shmtx_unlock(&ctx->shpool->mutex); + } + return NGX_OK; } static ngx_int_t ngx_http_limit_req_read_handler(ngx_http_request_t *r) { @@ -82,15 +349,14 @@ static ngx_int_t ngx_http_limit_req_read_handler(ngx_http_request_t *r) { ngx_buf_t *b; ngx_chain_t out; ngx_http_core_loc_conf_t *clcf; - - if (r->method != NGX_HTTP_GET) { - return NGX_HTTP_NOT_ALLOWED; - } + ngx_shm_zone_t *shm_zone; clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); - b = ngx_create_temp_buf(r->pool, 1024 * 1024); if (b == NULL) { + ngx_log_error( + NGX_LOG_ERR, r->connection->log, 0, + "ngx_http_limit_req_rw_module: failed to create temporary buffer"); return NGX_HTTP_INTERNAL_SERVER_ERROR; } @@ -99,35 +365,39 @@ static ngx_int_t ngx_http_limit_req_read_handler(ngx_http_request_t *r) { "ngx_http_limit_req_rw_module: clcf is NULL"); return NGX_HTTP_INTERNAL_SERVER_ERROR; } - ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, - "ngx_http_limit_req_rw_module: Processing request for URI: %V", &r->uri); ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: clcf->name: %V", &clcf->name); - // When request location is /api + "ngx_http_limit_req_rw_module: request URI: %*s", r->uri); + ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, + "ngx_http_limit_req_rw_module: clcf->name: %*s", clcf->name); if (clcf->name.len == r->uri.len) { ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, "ngx_http_limit_req_rw_module: dumping rate limit zones"); - ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: r: %p", r); - ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: r->pool: %p", r->pool); rc = dump_rate_limit_zones(r, b); - // When request location is /api/{zone_name} } else { strip_zone_name_from_uri(&r->uri, &zone_name); - ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "zone name: %.*s", - (int)zone_name.len, zone_name.data); + ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, "zone name: %*s", + zone_name); last_greater_equal_arg.len = 0; last_greater_equal = 0; if (r->args.len) { - if (ngx_http_arg(r, (u_char *) "last_greater_equal", 18, &last_greater_equal_arg) == NGX_OK) { - last_greater_equal = ngx_atoi(last_greater_equal_arg.data, last_greater_equal_arg.len); - if (last_greater_equal == NGX_ERROR || last_greater_equal < 0) { - return NGX_HTTP_BAD_REQUEST; - } + if (ngx_http_arg(r, (u_char *)"last_greater_equal", 18, + &last_greater_equal_arg) == NGX_OK) { + last_greater_equal = + ngx_atoi(last_greater_equal_arg.data, last_greater_equal_arg.len); + if (last_greater_equal == NGX_ERROR || last_greater_equal < 0) { + return NGX_HTTP_BAD_REQUEST; + } } } - rc = dump_req_zone(r->pool, b, &zone_name, (ngx_uint_t) last_greater_equal); + shm_zone = find_rate_limit_shm_zone_by_name(r, zone_name); + if (shm_zone == NULL) { + ngx_log_error( + NGX_LOG_ERR, r->connection->log, 0, + "ngx_http_limit_req_rw_module: rate limit zone %*s not found", + zone_name); + return NGX_HTTP_NOT_FOUND; + } + rc = dump_req_limits(r->pool, shm_zone, b, last_greater_equal); } if (rc != NGX_OK) { @@ -189,9 +459,6 @@ static ngx_int_t dump_rate_limit_zones(ngx_http_request_t *r, ngx_buf_t *buf) { ngx_shm_zone_t *shm_zone; volatile ngx_list_part_t *part; - ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: dump_rate_limit_zones called"); - if (ngx_cycle == NULL) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_limit_req_rw_module: ngx_cycle is NULL"); @@ -200,7 +467,7 @@ static ngx_int_t dump_rate_limit_zones(ngx_http_request_t *r, ngx_buf_t *buf) { msgpack_packer pk; msgpack_packer_init(&pk, buf, msgpack_ngx_buf_write); - zones = ngx_array_create(r->pool, 0, sizeof(ngx_str_t*)); + zones = ngx_array_create(r->pool, 0, sizeof(ngx_str_t *)); if (zones == NULL) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_limit_req_rw_module: failed to create zones array"); @@ -217,21 +484,24 @@ static ngx_int_t dump_rate_limit_zones(ngx_http_request_t *r, ngx_buf_t *buf) { if (i >= part->nelts) { if (part->next == NULL) { ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: part->next is NULL, breaking out of loop"); + "ngx_http_limit_req_rw_module: part->next is NULL, " + "breaking out of loop"); break; } - ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: part->next is not NULL, advancing"); + ngx_log_error( + NGX_LOG_DEBUG, r->connection->log, 0, + "ngx_http_limit_req_rw_module: part->next is not NULL, advancing"); part = part->next; shm_zone = part->elts; i = 0; ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: new part->nelts %d", part->nelts); + "ngx_http_limit_req_rw_module: new part->nelts %d", + part->nelts); } if (shm_zone == NULL) { ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: shm_zone is NULL, continuing"); + "ngx_http_limit_req_rw_module: shm_zone is NULL, skipping"); continue; } @@ -239,12 +509,14 @@ static ngx_int_t dump_rate_limit_zones(ngx_http_request_t *r, ngx_buf_t *buf) { "ngx_http_limit_req_rw_module: comparing shm_zone tag"); if (shm_zone[i].tag != &ngx_http_limit_req_module) { ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: shm_zone tag is not limit_req_module, continuing"); + "ngx_http_limit_req_rw_module: shm_zone tag is not " + "limit_req_module, skipping"); continue; } - ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: pushing new zone struct to array"); + ngx_log_error( + NGX_LOG_DEBUG, r->connection->log, 0, + "ngx_http_limit_req_rw_module: pushing new zone struct to array"); zone_name = ngx_array_push(zones); if (zone_name == NULL) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, @@ -262,13 +534,11 @@ static ngx_int_t dump_rate_limit_zones(ngx_http_request_t *r, ngx_buf_t *buf) { ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, "ngx_http_limit_req_rw_module: packing array of zones"); msgpack_pack_array(&pk, zones->nelts); - ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: packing zone name"); zone_name = zones->elts; for (i = 0; i < zones->nelts; i++) { ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0, - "ngx_http_limit_req_rw_module: packing zone name %.*s", - (int)zone_name[i]->len, zone_name[i]->data); + "ngx_http_limit_req_rw_module: packing zone %*s", + *zone_name[i]); msgpack_pack_str(&pk, zone_name[i]->len); msgpack_pack_str_body(&pk, zone_name[i]->data, zone_name[i]->len); } @@ -276,14 +546,16 @@ static ngx_int_t dump_rate_limit_zones(ngx_http_request_t *r, ngx_buf_t *buf) { return NGX_OK; } -static ngx_int_t dump_req_zone(ngx_pool_t *pool, ngx_buf_t *b, - ngx_str_t *zone_name, ngx_uint_t last_greater_equal) { +static ngx_shm_zone_t *find_rate_limit_shm_zone_by_name(ngx_http_request_t *r, + ngx_str_t zone_name) { ngx_uint_t i; ngx_shm_zone_t *shm_zone; volatile ngx_list_part_t *part; if (ngx_cycle == NULL) { - return NGX_HTTP_INTERNAL_SERVER_ERROR; + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "ngx_http_limit_req_rw_module: ngx_cycle is NULL"); + return NULL; } part = &ngx_cycle->shared_memory.part; @@ -308,19 +580,24 @@ static ngx_int_t dump_req_zone(ngx_pool_t *pool, ngx_buf_t *b, continue; } - if (shm_zone[i].shm.name.len != zone_name->len) { + if (shm_zone[i].shm.name.len != zone_name.len) { continue; } - if (ngx_strncmp(zone_name->data, shm_zone[i].shm.name.data, zone_name->len) == 0) { - return dump_req_limits(pool, &shm_zone[i], b, last_greater_equal); + if (ngx_strncmp(zone_name.data, shm_zone[i].shm.name.data, zone_name.len) == + 0) { + return shm_zone; } } - return NGX_HTTP_NOT_FOUND; + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "ngx_http_limit_req_rw_module: rate limit zone %*s not found", + zone_name); + return NULL; } static ngx_int_t dump_req_limits(ngx_pool_t *pool, ngx_shm_zone_t *shm_zone, - ngx_buf_t *buf, ngx_uint_t last_greater_equal) { + ngx_buf_t *buf, + ngx_uint_t last_greater_equal) { ngx_http_limit_req_ctx_t *ctx; ngx_queue_t *head, *q, *last; ngx_http_limit_req_node_t *lr; @@ -343,10 +620,6 @@ static ngx_int_t dump_req_limits(ngx_pool_t *pool, ngx_shm_zone_t *shm_zone, msgpack_pack_uint64(&pk, now); msgpack_pack_uint64(&pk, now_monotonic); - ngx_log_debug4( - NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0, "shm.name %p -> %.*s - rate: %lu", - ctx, (int)shm_zone->shm.name.len, shm_zone->shm.name.data, ctx->rate); - ngx_shmtx_lock(&ctx->shpool->mutex); if (ngx_queue_empty(&ctx->sh->queue)) {