Skip to content

Commit 3fa6135

Browse files
committed
DISPATCH-1979: Added code to capture the footer of the GRPC message in case it arrives before credit is available to send the message. Also code to create new nghttp2 session when a new connection is opened. This closes #1057.
1 parent 380a53c commit 3fa6135

File tree

4 files changed

+155
-37
lines changed

4 files changed

+155
-37
lines changed

include/qpid/dispatch/message.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
303303
void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content, bool receive_complete);
304304
void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, bool receive_complete);
305305
void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3, bool receive_complete);
306+
void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, qd_composed_field_t *field4, bool receive_complete);
306307

307308
/**
308309
* qd_message_extend

src/adaptors/http2/http2_adaptor.c

Lines changed: 137 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,18 @@ static qdr_http2_adaptor_t *http2_adaptor;
6969

7070
static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context);
7171
static void _http_record_request(qdr_http2_connection_t *conn, qdr_http2_stream_data_t *stream_data);
72+
static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on_shutdown);
73+
74+
static void free_all_connection_streams(qdr_http2_connection_t *http_conn, bool on_shutdown)
75+
{
76+
// Free all the stream data associated with this connection/session.
77+
qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams);
78+
while (stream_data) {
79+
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Freeing stream in free_qdr_http2_connection", stream_data->session_data->conn->conn_id, stream_data->stream_id);
80+
free_http2_stream_data(stream_data, on_shutdown);
81+
stream_data = DEQ_HEAD(http_conn->session_data->streams);
82+
}
83+
}
7284

7385
static void set_stream_data_delivery_flags(qdr_http2_stream_data_t * stream_data, qdr_delivery_t *dlv) {
7486
if (dlv == stream_data->in_dlv) {
@@ -345,12 +357,7 @@ static char *get_address_string(pn_raw_connection_t *pn_raw_conn)
345357
void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdown)
346358
{
347359
// Free all the stream data associated with this connection/session.
348-
qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams);
349-
while (stream_data) {
350-
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Freeing stream in free_qdr_http2_connection", stream_data->session_data->conn->conn_id, stream_data->stream_id);
351-
free_http2_stream_data(stream_data, on_shutdown);
352-
stream_data = DEQ_HEAD(http_conn->session_data->streams);
353-
}
360+
free_all_connection_streams(http_conn, on_shutdown);
354361

355362
if(http_conn->remote_address) {
356363
free(http_conn->remote_address);
@@ -368,9 +375,11 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo
368375

369376
http_conn->context.context = 0;
370377

371-
nghttp2_session_del(http_conn->session_data->session);
378+
if (http_conn->session_data->session)
379+
nghttp2_session_del(http_conn->session_data->session);
372380

373381
free_qdr_http2_session_data_t(http_conn->session_data);
382+
http_conn->session_data = 0;
374383
sys_mutex_lock(http2_adaptor->lock);
375384
DEQ_REMOVE(http2_adaptor->connections, http_conn);
376385
sys_mutex_unlock(http2_adaptor->lock);
@@ -528,14 +537,13 @@ static int snd_data_callback(nghttp2_session *session,
528537
qdr_http2_session_data_t *session_data = conn->session_data;
529538
qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr;
530539

531-
qd_http2_buffer_t *http2_buff = qd_http2_buffer();
532-
DEQ_INSERT_TAIL(session_data->buffs, http2_buff);
533-
// Insert the framehd of length 9 bytes into the buffer
534-
memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
535-
qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
536-
537540
int bytes_sent = 0; // This should not include the header length of 9.
538541
if (length) {
542+
qd_http2_buffer_t *http2_buff = qd_http2_buffer();
543+
DEQ_INSERT_TAIL(session_data->buffs, http2_buff);
544+
// Insert the framehd of length 9 bytes into the buffer
545+
memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
546+
qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
539547
pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send];
540548
qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send);
541549

@@ -574,11 +582,10 @@ static int snd_data_callback(nghttp2_session *session,
574582

575583
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data);
576584

577-
if (length)
585+
if (length) {
578586
assert(bytes_sent == length);
579-
580-
write_buffers(conn);
581-
587+
write_buffers(conn);
588+
}
582589

583590
return 0;
584591

@@ -745,16 +752,37 @@ static bool compose_and_deliver(qdr_http2_connection_t *conn, qdr_http2_stream_d
745752
qd_compose_insert_binary(stream_data->body, 0, 0);
746753
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Inserting empty body data in compose_and_deliver", conn->conn_id, stream_data->stream_id);
747754
}
748-
qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
755+
756+
if (stream_data->footer_properties) {
757+
qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete);
758+
}
759+
else {
760+
qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
761+
}
749762
}
750763
else {
751764
if (stream_data->body) {
752765
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] receive_complete = false and has stream_data->body in compose_and_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity);
753-
qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
766+
if (stream_data->footer_properties) {
767+
qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete);
768+
}
769+
else {
770+
qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
771+
}
754772
stream_data->body_data_added = true;
755773
}
756774
else {
757-
qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete);
775+
776+
if (stream_data->footer_properties) {
777+
//
778+
// The footer has already arrived but there was no body. Insert an empty body
779+
//
780+
stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
781+
qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete);
782+
}
783+
else {
784+
qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete);
785+
}
758786
}
759787
}
760788

@@ -990,8 +1018,15 @@ static int on_frame_recv_callback(nghttp2_session *session,
9901018
}
9911019

9921020
if (stream_data->entire_footer_arrived) {
993-
qdr_delivery_continue(http2_adaptor->core, stream_data->in_dlv, false);
994-
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Entire footer arrived, qdr_delivery_continue "DLV_FMT, conn->conn_id, stream_id, DLV_ARGS(stream_data->in_dlv));
1021+
if (stream_data->in_dlv) {
1022+
qdr_delivery_continue(http2_adaptor->core, stream_data->in_dlv, false);
1023+
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Entire footer arrived, qdr_delivery_continue "DLV_FMT, conn->conn_id, stream_id, DLV_ARGS(stream_data->in_dlv));
1024+
}
1025+
else {
1026+
if (route_delivery(stream_data, receive_complete)) {
1027+
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Entire footer arrived, delivery routed successfully (on_frame_recv_callback)", conn->conn_id, stream_id);
1028+
}
1029+
}
9951030
}
9961031
else {
9971032
//
@@ -1351,6 +1386,25 @@ static int qdr_http_get_credit(void *context, qdr_link_t *link)
13511386
}
13521387

13531388

1389+
ssize_t error_read_callback(nghttp2_session *session,
1390+
int32_t stream_id,
1391+
uint8_t *buf,
1392+
size_t length,
1393+
uint32_t *data_flags,
1394+
nghttp2_data_source *source,
1395+
void *user_data)
1396+
{
1397+
size_t len = 0;
1398+
char *error_msg = (char *) source->ptr;
1399+
if (error_msg) {
1400+
len = strlen(error_msg);
1401+
if (len > 0)
1402+
memcpy(buf, error_msg, len);
1403+
}
1404+
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
1405+
return len;
1406+
}
1407+
13541408
static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
13551409
{
13561410
qdr_http2_stream_data_t* stream_data = qdr_delivery_get_context(dlv);
@@ -1377,31 +1431,49 @@ static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_
13771431
}
13781432

13791433
if (settled) {
1380-
nghttp2_nv hdrs[1];
1434+
nghttp2_nv hdrs[2];
13811435
if (conn->ingress && (disp == PN_RELEASED || disp == PN_MODIFIED || disp == PN_REJECTED)) {
13821436
if (disp == PN_RELEASED || disp == PN_MODIFIED) {
1437+
uint8_t * error_msg = (uint8_t *)"Service Unavailable";
13831438
hdrs[0].name = (uint8_t *)":status";
13841439
hdrs[0].value = (uint8_t *)"503";
13851440
hdrs[0].namelen = 7;
13861441
hdrs[0].valuelen = 3;
13871442
hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
1388-
nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 1, 0);
1443+
1444+
hdrs[1].name = (uint8_t *)":content-type";
1445+
hdrs[1].value = (uint8_t *)"text/plain";
1446+
hdrs[1].namelen = 13;
1447+
hdrs[1].valuelen = 10;
1448+
hdrs[1].flags = NGHTTP2_NV_FLAG_NONE;
1449+
1450+
nghttp2_data_provider data_prd;
1451+
data_prd.read_callback = error_read_callback;
1452+
data_prd.source.ptr = error_msg;
1453+
1454+
nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 2, &data_prd);
1455+
nghttp2_submit_goaway(stream_data->session_data->session, 0, stream_data->stream_id, NGHTTP2_CONNECT_ERROR, error_msg, 19);
13891456
}
13901457
else if (disp == PN_REJECTED) {
1458+
uint8_t * error_msg = (uint8_t *)"Resource Unavailable";
13911459
hdrs[0].name = (uint8_t *)":status";
13921460
hdrs[0].value = (uint8_t *)"400";
13931461
hdrs[0].namelen = 7;
13941462
hdrs[0].valuelen = 3;
13951463
hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
1396-
nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 1, 0);
1397-
}
13981464

1399-
//
1400-
// Send a GOAWAY frame on the client/ingress connection.
1401-
// The GOAWAY frame is used to initiate shutdown of a connection or to signal serious error conditions.
1402-
//
1403-
nghttp2_submit_goaway(stream_data->session_data->session, 0, stream_data->stream_id, NGHTTP2_CONNECT_ERROR, (uint8_t *)"Service Unavailable", 19);
1465+
hdrs[1].name = (uint8_t *)":content-type";
1466+
hdrs[1].value = (uint8_t *)"text/plain";
1467+
hdrs[1].namelen = 13;
1468+
hdrs[1].valuelen = 10;
1469+
hdrs[1].flags = NGHTTP2_NV_FLAG_NONE;
1470+
1471+
nghttp2_data_provider data_prd;
1472+
data_prd.read_callback = error_read_callback;
1473+
data_prd.source.ptr = error_msg;
14041474

1475+
nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 2, &data_prd);
1476+
}
14051477
}
14061478

14071479
if (!conn->ingress && (disp == PN_RELEASED || disp == PN_MODIFIED || disp == PN_REJECTED)) {
@@ -1908,7 +1980,6 @@ static int handle_incoming_http(qdr_http2_connection_t *conn)
19081980
qd_http2_buffer_insert(buf, raw_buff_size);
19091981
count += raw_buff_size;
19101982
DEQ_INSERT_TAIL(buffers, buf);
1911-
//qd_log(http2_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_incoming_http - Inserting qd_http2_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
19121983
}
19131984
}
19141985

@@ -2072,10 +2143,37 @@ static void close_connections(qdr_http2_connection_t* conn)
20722143
qdr_action_enqueue(http2_adaptor->core, action);
20732144
}
20742145

2146+
static void clean_session_data(qdr_http2_connection_t* conn)
2147+
{
2148+
free_all_connection_streams(conn, false);
2149+
2150+
//
2151+
// This closes the nghttp2 session. Next time when a new connection is opened, a new nghttp2 session
2152+
// will be created by calling nghttp2_session_client_new
2153+
//
2154+
nghttp2_session_del(conn->session_data->session);
2155+
conn->session_data->session = 0;
2156+
2157+
//
2158+
// Free all the buffers on this session. This session is closed and any unsent buffers should be freed.
2159+
//
2160+
qd_http2_buffer_t *buf = DEQ_HEAD(conn->session_data->buffs);
2161+
qd_http2_buffer_t *curr_buf = 0;
2162+
while (buf) {
2163+
curr_buf = buf;
2164+
DEQ_REMOVE_HEAD(conn->session_data->buffs);
2165+
buf = DEQ_HEAD(conn->session_data->buffs);
2166+
free_qd_http2_buffer_t(curr_buf);
2167+
}
2168+
}
2169+
20752170

20762171
static void handle_disconnected(qdr_http2_connection_t* conn)
20772172
{
20782173
sys_mutex_lock(qd_server_get_activation_lock(http2_adaptor->core->qd->server));
2174+
2175+
clean_session_data(conn);
2176+
20792177
if (conn->pn_raw_conn) {
20802178
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Setting conn->pn_raw_conn=0", conn->conn_id);
20812179
conn->pn_raw_conn = 0;
@@ -2128,11 +2226,11 @@ static void egress_conn_timer_handler(void *context)
21282226
{
21292227
qdr_http2_connection_t* conn = (qdr_http2_connection_t*) context;
21302228

2131-
qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Running egress_conn_timer_handler", conn->conn_id);
2132-
2133-
if (conn->connection_established)
2229+
if (conn->pn_raw_conn || conn->connection_established)
21342230
return;
21352231

2232+
qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Running egress_conn_timer_handler", conn->conn_id);
2233+
21362234
if (!conn->ingress) {
21372235
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] - Egress_conn_timer_handler - Trying to establishing outbound connection", conn->conn_id);
21382236
http_connector_establish(conn);
@@ -2249,8 +2347,11 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
22492347
send_settings_frame(conn);
22502348
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted Ingress ((PN_RAW_CONNECTION_CONNECTED)) from %s", conn->conn_id, conn->remote_address);
22512349
} else {
2252-
if (!conn->session_data->session)
2350+
if (!conn->session_data->session) {
22532351
nghttp2_session_client_new(&conn->session_data->session, (nghttp2_session_callbacks *)http2_adaptor->callbacks, (void *)conn);
2352+
send_settings_frame(conn);
2353+
conn->client_magic_sent = true;
2354+
}
22542355
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connected Egress (PN_RAW_CONNECTION_CONNECTED)", conn->conn_id);
22552356
conn->connection_established = true;
22562357
create_stream_dispatcher_link(conn);

src/adaptors/http2/http2_adaptor.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ struct qdr_http2_session_data_t {
6464
nghttp2_session *session; // A pointer to the nghttp2s' session object
6565
qd_http2_stream_data_list_t streams; // A session can have many streams.
6666
qd_http2_buffer_list_t buffs; // Buffers for writing
67-
bool max_buffs_in_pool;
6867
};
6968

7069
struct qdr_http2_stream_data_t {

src/message.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2350,6 +2350,23 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com
23502350
DEQ_APPEND(content->buffers, (*field3_buffers));
23512351
}
23522352

2353+
void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, qd_composed_field_t *field4, bool receive_complete)
2354+
{
2355+
qd_message_content_t *content = MSG_CONTENT(msg);
2356+
content->receive_complete = receive_complete;
2357+
qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1);
2358+
qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2);
2359+
qd_buffer_list_t *field3_buffers = qd_compose_buffers(field3);
2360+
qd_buffer_list_t *field4_buffers = qd_compose_buffers(field4);
2361+
2362+
content->buffers = *field1_buffers;
2363+
DEQ_INIT(*field1_buffers);
2364+
DEQ_APPEND(content->buffers, (*field2_buffers));
2365+
DEQ_APPEND(content->buffers, (*field3_buffers));
2366+
DEQ_APPEND(content->buffers, (*field4_buffers));
2367+
2368+
}
2369+
23532370

23542371
int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked)
23552372
{

0 commit comments

Comments
 (0)