Skip to content

Commit d176c62

Browse files
committed
ipc: add new set of synchronous pipes
These new pipe set for each process can be used to synchronously send a command from one process to the other. When using these pipes, make sure that there's no other job that can overlap, otherwise the receiver will not be albe to handle it
1 parent 139bb4c commit d176c62

File tree

4 files changed

+77
-0
lines changed

4 files changed

+77
-0
lines changed

ipc.c

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ int create_ipc_pipes( int proc_no )
106106
i, errno, strerror(errno));
107107
return -1;
108108
}
109+
110+
if (pipe(pt[i].ipc_sync_pipe_holder)<0) {
111+
LM_ERR("failed to create IPC sync pipe for process %d, err %d/%s\n",
112+
i, errno, strerror(errno));
113+
return -1;
114+
}
109115
}
110116
return 0;
111117
}
@@ -193,6 +199,39 @@ int ipc_dispatch_rpc( ipc_rpc_f *rpc, void *param)
193199
return __ipc_send_job(ipc_shared_pipe[1], ipc_rpc_type, rpc, param);
194200
}
195201

202+
int ipc_send_sync_reply(int dst_proc, void *param)
203+
{
204+
int n;
205+
206+
again:
207+
n = write(IPC_FD_SYNC_WRITE(dst_proc), &param, sizeof(param));
208+
if (n<0) {
209+
if (errno==EINTR)
210+
goto again;
211+
LM_ERR("sending sync rpc %d[%s]\n", errno, strerror(errno));
212+
return -1;
213+
}
214+
return 0;
215+
}
216+
217+
int ipc_recv_sync_reply(void **param)
218+
{
219+
void *ret;
220+
int n;
221+
222+
again:
223+
n = read(IPC_FD_SYNC_READ_SELF, &ret, sizeof(ret));
224+
if (n < sizeof(*ret)) {
225+
if (errno == EINTR)
226+
goto again;
227+
/* if we got here, it's definitely an error, because the socket is
228+
* blocking, so we can't read partial messages */
229+
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
230+
return -1;
231+
}
232+
*param = ret;
233+
return 0;
234+
}
196235

197236
void ipc_handle_job(int fd)
198237
{

ipc.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ extern int ipc_shared_fd_read;
3232
#define IPC_FD_WRITE(_proc_no) pt[_proc_no].ipc_pipe[1]
3333
#define IPC_FD_READ_SELF IPC_FD_READ(process_no)
3434
#define IPC_FD_READ_SHARED ipc_shared_fd_read
35+
#define IPC_FD_SYNC_READ(_proc_no) pt[_proc_no].ipc_sync_pipe[0]
36+
#define IPC_FD_SYNC_WRITE(_proc_no) pt[_proc_no].ipc_sync_pipe[1]
37+
#define IPC_FD_SYNC_READ_SELF IPC_FD_SYNC_READ(process_no)
3538

3639
/* prototype of IPC handler - function called by the IPC engine
3740
* when the a job with the correspoding type was received */
@@ -68,6 +71,25 @@ int ipc_send_job(int dst_proc, ipc_handler_type type, void *payload);
6871
int ipc_send_rpc(int dst_proc, ipc_rpc_f *rpc, void *param);
6972

7073

74+
/*
75+
* Send a synchronous message to a specific "dst_proc" process
76+
* Use this command when you are sure that the "dst_proc" is waiting only for
77+
* this specific "response", and cannot overlap with a different task
78+
*
79+
* Return: 0 on success, -1 on failure
80+
*/
81+
int ipc_send_sync_reply(int dst_proc, void *param);
82+
83+
84+
/*
85+
* Wait for a message sent by a different process synchronously using the
86+
* ipc_send_sync_reply() function.
87+
*
88+
* Return: 0 on success, -1 on failure
89+
*/
90+
int ipc_recv_sync_reply(void **param);
91+
92+
7193
/*
7294
* Push a job for the next available OpenSIPS worker and quickly return
7395
*

pt.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ int init_multi_proc_support(void)
8181
pt[i].unix_sock = -1;
8282
pt[i].pid = -1;
8383
pt[i].ipc_pipe[0] = pt[i].ipc_pipe[1] = -1;
84+
pt[i].ipc_sync_pipe[0] = pt[i].ipc_sync_pipe[1] = -1;
8485
}
8586

8687
/* create the load-related stats (initially marked as hidden */
@@ -199,6 +200,7 @@ void reset_process_slot( int p_id )
199200
pt[p_id].flags = 0;
200201

201202
pt[p_id].ipc_pipe[0] = pt[p_id].ipc_pipe[1] = -1;
203+
pt[p_id].ipc_sync_pipe[0] = pt[p_id].ipc_sync_pipe[1] = -1;
202204
pt[p_id].unix_sock = -1;
203205

204206
pt[p_id].log_level = pt[p_id].default_log_level = 0; /*not really needed*/
@@ -260,12 +262,16 @@ int internal_fork(char *proc_desc, unsigned int flags,
260262
/* advertise no IPC to the rest of the procs */
261263
pt[new_idx].ipc_pipe[0] = -1;
262264
pt[new_idx].ipc_pipe[1] = -1;
265+
pt[new_idx].ipc_sync_pipe[0] = -1;
266+
pt[new_idx].ipc_sync_pipe[1] = -1;
263267
/* NOTE: the IPC fds will remain open in the other processes,
264268
* but they will not be known */
265269
} else {
266270
/* activate the IPC pipes */
267271
pt[new_idx].ipc_pipe[0]=pt[new_idx].ipc_pipe_holder[0];
268272
pt[new_idx].ipc_pipe[1]=pt[new_idx].ipc_pipe_holder[1];
273+
pt[new_idx].ipc_sync_pipe[0]=pt[new_idx].ipc_sync_pipe_holder[0];
274+
pt[new_idx].ipc_sync_pipe[1]=pt[new_idx].ipc_sync_pipe_holder[1];
269275
}
270276

271277
pt[new_idx].pid = 0;
@@ -388,6 +394,8 @@ void dynamic_process_final_exit(void)
388394
/* prevent any more IPC */
389395
pt[process_no].ipc_pipe[0] = -1;
390396
pt[process_no].ipc_pipe[1] = -1;
397+
pt[process_no].ipc_sync_pipe[0] = -1;
398+
pt[process_no].ipc_sync_pipe[1] = -1;
391399

392400
/* clear the per-process connection from the DB queues */
393401
ql_force_process_disconnect(process_no);

pt.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ struct process_table {
5656
* does not exist */
5757
int ipc_pipe_holder[2];
5858

59+
/* pipe used by the process to receive a synchronoys job
60+
* this pipe should only be used by a process to synchronously receive a
61+
* message after he knows that some other process will send it for sure,
62+
* and there's no other job that can overlap in the meantime */
63+
int ipc_sync_pipe[2];
64+
/* same as above, but holder for non-existing processes */
65+
int ipc_sync_pipe_holder[2];
66+
5967
/* holder for the unixsocks used by TCP layer for inter-proc communication;
6068
* used when the corresponding process does not exist */
6169
int tcp_socks_holder[2];

0 commit comments

Comments
 (0)