Skip to content

Commit 8e16b59

Browse files
committed
Use the auto-scaling profiles to controll the auto-scaling
1 parent 949a8ed commit 8e16b59

File tree

8 files changed

+179
-75
lines changed

8 files changed

+179
-75
lines changed

cfg.y

+13-13
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
#include "socket_info.h"
100100
#include "name_alias.h"
101101
#include "ut.h"
102+
#include "pt_scaling.h"
102103
#include "dset.h"
103104
#include "pvar.h"
104105
#include "blacklists.h"
@@ -647,7 +648,6 @@ listen_def_param: ANYCAST {
647648
| USE_AUTO_SCALING_PROFILE ID {
648649
$$=mk_listen_param();
649650
$$->auto_scaling_profile=$2;
650-
auto_scaling_enabled = 1;
651651
}
652652
;
653653

@@ -709,29 +709,29 @@ auto_scale_profile_def:
709709
NUMBER CYCLES_WITHIN NUMBER
710710
SCALE_DOWN_TO NUMBER ON NUMBER MODULO FOR
711711
NUMBER CYCLES {
712-
//if (create_auto_scaling_profile($1,$3,$5,$8,$10,
713-
//$12, $14, $17)<0)
714-
// yyerror("failed to create auto scaling profile");
712+
if (create_auto_scaling_profile($1,$3,$5,$8,$10,
713+
$12, $14, $17,10*$17)<0)
714+
yyerror("failed to create auto scaling profile");
715715
}
716716
| ID SCALE_UP_TO NUMBER ON NUMBER MODULO FOR
717717
NUMBER CYCLES
718718
SCALE_DOWN_TO NUMBER ON NUMBER MODULO FOR
719719
NUMBER CYCLES {
720-
//if (create_auto_scaling_profile($1,$3,$5,$8,$8,
721-
//$12, $14, $17)<0)
722-
// yyerror("failed to create auto scaling profile");
720+
if (create_auto_scaling_profile($1,$3,$5,$8,$8,
721+
$11, $13, $16, 10*$16)<0)
722+
yyerror("failed to create auto scaling profile");
723723
}
724724
| ID SCALE_UP_TO NUMBER ON NUMBER MODULO FOR
725725
NUMBER CYCLES_WITHIN NUMBER {
726-
//if (create_auto_scaling_profile($1,$3,$5,$8,$10,
727-
//0, 0, 0)<0)
728-
// yyerror("failed to create auto scaling profile");
726+
if (create_auto_scaling_profile($1,$3,$5,$8,$10,
727+
0, 0, 0, 0)<0)
728+
yyerror("failed to create auto scaling profile");
729729
}
730730
| ID SCALE_UP_TO NUMBER ON NUMBER MODULO FOR
731731
NUMBER CYCLES {
732-
//if (create_auto_scaling_profile($1,$3,$5,$8,$8,
733-
//0, 0, 0)<0)
734-
// yyerror("failed to create auto scaling profile");
732+
if (create_auto_scaling_profile($1,$3,$5,$8,$8,
733+
0, 0, 0, 0)<0)
734+
yyerror("failed to create auto scaling profile");
735735
}
736736
;
737737

main.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ void handle_sigs(void)
544544
continue;
545545
}
546546
if (pt[i].flags & OSS_PROC_SELFEXIT) {
547-
LM_WARN("process %d/%d did selfexit with "
547+
LM_NOTICE("process %d/%d did selfexit with "
548548
"status %d\n", i, chld, WTERMSIG(chld_status));
549549
reset_process_slot(i);
550550
continue;
@@ -843,7 +843,7 @@ static int main_loop(void)
843843
if (auto_scaling_enabled) {
844844
sleep(1);
845845
if ( (get_uticks()-last_check) >= 1000000) {
846-
check_and_adjust_number_of_workers();
846+
do_workers_auto_scaling();
847847
last_check = get_uticks();
848848
}
849849
} else

net/net_udp.c

+4-5
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ int udp_count_processes(unsigned int *extra)
7474
if (protos[i].id!=PROTO_NONE && is_udp_based_proto(i))
7575
for( si=protos[i].listeners ; si; si=si->next) {
7676
n+=si->workers;
77-
e+=si->workers_max?(si->workers_max-si->workers):0;
77+
e+=si->s_profile?(si->s_profile->max_procs-si->workers):0;
7878
}
7979

8080
if (extra) *extra = e;
@@ -429,10 +429,9 @@ int udp_start_processes(int *chd_rank, int *startup_done)
429429

430430
for(si=protos[p].listeners; si ; si=si->next ) {
431431

432-
if ( auto_scaling_enabled &&
433-
create_process_group( TYPE_UDP, si, si->workers_min,
434-
si->workers_max, fork_dynamic_udp_process,
435-
udp_process_graceful_terminate)!=0)
432+
if ( auto_scaling_enabled && si->s_profile &&
433+
create_process_group( TYPE_UDP, si, si->s_profile,
434+
fork_dynamic_udp_process, udp_process_graceful_terminate)!=0)
436435
LM_ERR("failed to create group of UDP processes for <%.*s>, "
437436
"auto forking will not be possible\n",
438437
si->name.len, si->name.s);

pt_scaling.c

+93-39
Original file line numberDiff line numberDiff line change
@@ -28,60 +28,107 @@
2828
#include "ipc.h"
2929
#include "daemonize.h"
3030

31-
3231
struct process_group {
3332
enum process_type type;
3433
struct socket_info *si_filter;
3534
fork_new_process_f *fork_func;
3635
terminate_process_f *term_func;
37-
unsigned int max_procs;
38-
unsigned int min_procs;
39-
/* some reference to a profile to give us params for fork/rip procs */
36+
struct scaling_profile *prof;
4037
unsigned char history_size;
38+
unsigned char *history_map;
4139
unsigned char history_idx;
4240
unsigned short no_downscale_cycles;
43-
unsigned char *history_map;
4441
struct process_group *next;
4542
};
4643

47-
#define PG_HISTORY_DEFAULT_SIZE 5 /*to be replaced with val from profile*/
48-
#define PG_HIGH_MIN_SCORE 4 /*to be replaced with val from profile*/
49-
#define PG_HLOAD_TRESHOLD 50 /*to be replaced with val from profile*/
50-
#define PG_LLOAD_TRESHOLD 20 /*to be replaced with val from profile*/
44+
static struct process_group *pg_head = NULL;
45+
46+
static struct scaling_profile *profiles_head = NULL;
47+
48+
49+
50+
int create_auto_scaling_profile( char *name,
51+
unsigned int max_procs, unsigned int up_threshold,
52+
unsigned int up_cycles_needed, unsigned int up_cycles_tocheck,
53+
unsigned int min_procs, unsigned int down_threshold,
54+
unsigned int down_cycles_tocheck, unsigned short down_cycles_delay)
55+
{
56+
struct scaling_profile *p;
57+
58+
p = (struct scaling_profile*)pkg_malloc( sizeof(struct scaling_profile) +
59+
strlen(name) + 1 );
60+
if (p==NULL) {
61+
LM_ERR("failed to allocate memory for a new auto-scaling profile\n");
62+
return -1;
63+
}
64+
65+
/* not really need, more to be safe for future expansions */
66+
memset( p, 0, sizeof(struct scaling_profile));
67+
68+
p->max_procs = max_procs;
69+
p->up_threshold = up_threshold;
70+
p->up_cycles_needed = up_cycles_needed;
71+
p->up_cycles_tocheck = up_cycles_tocheck;
72+
p->min_procs = min_procs;
73+
p->down_threshold = down_threshold;
74+
p->down_cycles_tocheck = down_cycles_tocheck;
75+
p->down_cycles_delay = down_cycles_delay;
76+
p->name = (char*)(p+1);
77+
strcpy( p->name, name);
78+
79+
p->next = profiles_head;
80+
profiles_head = p;
81+
82+
return 0;
83+
}
84+
85+
86+
struct scaling_profile *get_scaling_profile(char *name)
87+
{
88+
struct scaling_profile *p;
89+
90+
for ( p=profiles_head ; p ; p=p->next )
91+
if (strcasecmp(name, p->name)==0)
92+
return p;
93+
94+
return NULL;
95+
}
5196

52-
struct process_group *pg_head = NULL;
5397

5498
int create_process_group(enum process_type type,
55-
struct socket_info *si_filter,
56-
unsigned int min_procs, unsigned int max_procs,
57-
fork_new_process_f *f1, terminate_process_f *f2)
99+
struct socket_info *si_filter, struct scaling_profile *prof,
100+
fork_new_process_f *f1, terminate_process_f *f2)
58101
{
59102
struct process_group *pg, *it;
103+
int h_size;
104+
105+
/* how much of a history do we need in order to cover both up and down
106+
* tranzitions ? */
107+
h_size = (prof->up_cycles_tocheck > prof->down_cycles_tocheck) ?
108+
prof->up_cycles_tocheck : prof->down_cycles_tocheck;
60109

61110
pg = (struct process_group*)shm_malloc( sizeof(struct process_group) +
62-
sizeof(char)*PG_HISTORY_DEFAULT_SIZE );
111+
sizeof(char)*h_size );
63112
if (pg==NULL) {
64113
LM_ERR("failed to allocate memory for a new process group\n");
65114
return -1;
66115
}
67-
memset( pg, 0, sizeof(struct process_group) +
68-
sizeof(char)*PG_HISTORY_DEFAULT_SIZE );
116+
memset( pg, 0, sizeof(struct process_group) + sizeof(char)*h_size );
69117

70118
LM_DBG("registering group of processes type %d, socket filter %p, "
71-
"process range [%d,%d]\n", type, si_filter, min_procs, max_procs );
119+
"scaling profile <%s>\n", type, si_filter, prof->name );
72120

73121
pg->type = type;
74122
pg->si_filter = si_filter;
75-
pg->max_procs = max_procs;
76-
pg->min_procs = min_procs;
123+
pg->prof = prof;
77124
pg->fork_func = f1;
78125
pg->term_func = f2;
79126
pg->next = NULL;
80127

81-
pg->history_size = PG_HISTORY_DEFAULT_SIZE;
128+
pg->history_size = h_size;
82129
pg->history_map = (unsigned char*)(pg+1);
83130
pg->history_idx = 0;
84-
pg->no_downscale_cycles = 10*PG_HISTORY_DEFAULT_SIZE;
131+
pg->no_downscale_cycles = pg->prof->down_cycles_delay;
85132

86133
/* add at the end of list, to avoid changing the head of the list due
87134
* forking */
@@ -108,12 +155,12 @@ void rescale_group_history(struct process_group *pg, unsigned int idx,
108155
LM_DBG("rescaling old %d to %d [idx %d]\n",
109156
old, pg->history_map[k], k);
110157

111-
k = k ? (k-1) : (PG_HISTORY_DEFAULT_SIZE-1) ;
158+
k = k ? (k-1) : (pg->history_size-1) ;
112159
} while(k!=idx);
113160
}
114161

115162

116-
void check_and_adjust_number_of_workers(void)
163+
void do_workers_auto_scaling(void)
117164
{
118165
struct process_group *pg;
119166
unsigned int i, k, idx;
@@ -142,7 +189,7 @@ void check_and_adjust_number_of_workers(void)
142189
}
143190

144191
/* set the current value */
145-
idx = (pg->history_idx+1)%PG_HISTORY_DEFAULT_SIZE;
192+
idx = (pg->history_idx+1)%pg->history_size;
146193
pg->history_map[idx] = (unsigned char) ( load / procs_no );
147194

148195
LM_DBG("group %d (with %d procs) has average load of %d\n",
@@ -152,20 +199,24 @@ void check_and_adjust_number_of_workers(void)
152199
cnt_over = 0;
153200
cnt_under = 0;
154201
k = idx;
202+
i = 1;
155203
do {
156-
if (pg->history_map[k]>PG_HLOAD_TRESHOLD)
204+
if ( pg->history_map[k] > pg->prof->up_threshold &&
205+
i <= pg->prof->up_cycles_tocheck )
157206
cnt_over++;
158-
else if (pg->history_map[k]<PG_LLOAD_TRESHOLD)
207+
else if ( pg->history_map[k] < pg->prof->down_threshold &&
208+
i <= pg->prof->down_cycles_tocheck )
159209
cnt_under++;
160210

161-
k = k ? (k-1) : (PG_HISTORY_DEFAULT_SIZE-1) ;
211+
i++;
212+
k = k ? (k-1) : (pg->history_size-1) ;
162213
} while(k!=idx);
163214

164215
/* decide what to do */
165-
if (cnt_over>=PG_HIGH_MIN_SCORE) {
166-
if (procs_no<pg->max_procs) {
216+
if ( cnt_over >= pg->prof->up_cycles_needed ) {
217+
if ( procs_no < pg->prof->max_procs ) {
167218
LM_NOTICE("score %d/%d -> forking new proc in group %d "
168-
"(with %d procs)\n", cnt_over, PG_HISTORY_DEFAULT_SIZE,
219+
"(with %d procs)\n", cnt_over, pg->prof->up_cycles_tocheck,
169220
pg->type, procs_no);
170221
/* we need to fork one more process here */
171222
if ( (p_id=pg->fork_func(pg->si_filter))<0 ||
@@ -174,25 +225,28 @@ void check_and_adjust_number_of_workers(void)
174225
"(current %d procs)\n",pg->type,procs_no);
175226
} else {
176227
rescale_group_history( pg, idx, procs_no, +1);
177-
pg->no_downscale_cycles = 10*PG_HISTORY_DEFAULT_SIZE;
228+
pg->no_downscale_cycles = pg->prof->down_cycles_delay;
178229
}
179230
}
180-
} else if (cnt_under==PG_HISTORY_DEFAULT_SIZE) {
181-
if (procs_no>pg->min_procs && procs_no!=1 &&
231+
} else if ( pg->prof->down_cycles_tocheck != 0 &&
232+
cnt_under == pg->prof->down_cycles_tocheck ) {
233+
if ( procs_no > pg->prof->min_procs &&
182234
pg->no_downscale_cycles==0) {
183235
/* try to estimate the load after downscaling */
184236
load = 0;
185237
k = idx;
238+
i = 0;
186239
do {
187240
load += pg->history_map[k];
188-
k = k ? (k-1) : (PG_HISTORY_DEFAULT_SIZE-1) ;
189-
} while(k!=idx);
190-
load = (load*procs_no) / (procs_no-1);
191-
if (load<PG_HLOAD_TRESHOLD) {
241+
k = k ? (k-1) : (pg->history_size-1) ;
242+
} while( k != idx && i <= pg->prof->down_cycles_tocheck );
243+
load = (load*procs_no) /
244+
(pg->prof->down_cycles_tocheck * (procs_no-1));
245+
if ( load < pg->prof->up_threshold ) {
192246
/* down scale one more process here */
193-
LM_DBG("score %d/%d -> ripping one proc from group %d "
247+
LM_NOTICE("score %d/%d -> ripping one proc from group %d "
194248
"(with %d procs), estimated load -> %d\n", cnt_under,
195-
PG_HISTORY_DEFAULT_SIZE, pg->type, procs_no,
249+
pg->prof->down_cycles_tocheck, pg->type, procs_no,
196250
load );
197251
ipc_send_rpc( last_idx_in_pg, pg->term_func, NULL);
198252
}

pt_scaling.h

+39-3
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,51 @@
3131
#include "socket_info.h"
3232
#include "ipc.h"
3333

34+
35+
36+
struct scaling_profile {
37+
/* the name of the profile */
38+
char *name;
39+
40+
/* the maximum number of processes to scale to */
41+
unsigned int max_procs;
42+
/* the load threshold (in percentages) to trigger up scaling */
43+
unsigned int up_threshold;
44+
/* the number of cycles needed to be over the TH in order to up scale */
45+
unsigned int up_cycles_needed;
46+
/* the number of cycles to check for spotting the needed ones */
47+
unsigned int up_cycles_tocheck;
48+
49+
/* the minimum number of processes to scale (if 0 -> no downscale) */
50+
unsigned int min_procs;
51+
/* the load threshold (in percentages) to trigger down scaling */
52+
unsigned int down_threshold;
53+
/* the number of cycles needed to be below the TH in order to down scale */
54+
unsigned int down_cycles_tocheck;
55+
/* the number of cycles to wait before down scaling (after up or start) */
56+
unsigned short down_cycles_delay;
57+
58+
struct scaling_profile *next;
59+
};
60+
61+
62+
int create_auto_scaling_profile( char *name,
63+
unsigned int max_procs, unsigned int up_threshold,
64+
unsigned int up_cycles_needed, unsigned int up_cycles_tocheck,
65+
unsigned int min_procs, unsigned int down_threshold,
66+
unsigned int down_cycles_tocheck, unsigned short down_cycles_delay);
67+
68+
struct scaling_profile *get_scaling_profile(char *name);
69+
70+
3471
typedef int (fork_new_process_f)(void *);
3572
typedef ipc_rpc_f terminate_process_f;
3673

3774
int create_process_group(enum process_type type,
38-
struct socket_info *si_filter,
39-
unsigned int min_procs, unsigned int max_procs,
75+
struct socket_info *si_filter, struct scaling_profile *prof,
4076
fork_new_process_f *f1, terminate_process_f *f2);
4177

42-
void check_and_adjust_number_of_workers(void);
78+
void do_workers_auto_scaling(void);
4379

4480

4581
#endif

0 commit comments

Comments
 (0)