1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
|
/* Licensed under GPLv3+ - see LICENSE file for details */
#include <ccan/lbalance/lbalance.h>
#include <ccan/tlist/tlist.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include <stdlib.h>
/* Define struct tlist_lbalance_task: the typed list head (see ccan/tlist)
 * whose entries are struct lbalance_task. */
TLIST_TYPE(lbalance_task, struct lbalance_task);
/* Accumulated work-rate samples for one slot of the lbalance stats array
 * (the array is indexed by the number of tasks that were running). */
struct stats {
/* How many samples do we have for this number of tasks? */
unsigned int num_stats;
/* Sum of the work rates of those samples (divide by num_stats to get
 * the mean work rate). */
float work_rate;
};
/* State of one load balancer. */
struct lbalance {
/* Tasks currently running (created by lbalance_task_new and not yet
 * freed), and how many entries that list holds. */
struct tlist_lbalance_task tasks;
unsigned int num_tasks;
/* We figured out how many we want to run. */
unsigned int target;
/* We need to recalc once a report comes in via lbalance_task_free. */
bool target_uptodate;
/* Integral of how many tasks were running so far: tasks_sum accumulates
 * (microseconds elapsed) * (tasks running), and prev_tasks_time is the
 * instant up to which it has been accumulated. */
struct timeval prev_tasks_time;
float tasks_sum;
/* For differential rusage: the RUSAGE_CHILDREN snapshot taken at creation
 * or at the most recent lbalance_task_free. */
struct rusage prev_usage;
/* How many stats we have collected (we invalidate old ones). */
unsigned int total_stats;
/* Array of stats, indexed by number of tasks we were running.
 * max_stats is the number of entries allocated in the array. */
unsigned int max_stats;
struct stats *stats;
};
/* One running task being tracked by a load balancer. */
struct lbalance_task {
/* The balancer this task belongs to. */
struct lbalance *lb;
/* Linkage in lb->tasks. */
struct list_node list;
/* The time this task started */
struct timeval start;
/* Value of lb->tasks_sum when this task started; the difference at free
 * time is the task-time integral over this task's lifetime. */
float tasks_sum_start;
};
/* Allocate and initialise a load balancer with no tasks and no statistics.
 *
 * The initial target is a guess: the number of online CPUs if sysconf() can
 * report it, otherwise the number of configured CPUs, and never less than 2.
 * It stays in force until the first lbalance_task_free() supplies real data.
 *
 * Returns the new balancer (release it with lbalance_free()), or NULL if
 * memory allocation fails.
 */
struct lbalance *lbalance_new(void)
{
	struct lbalance *lb = malloc(sizeof *lb);
	long ncpus = -1;

	if (!lb)
		return NULL;

	tlist_init(&lb->tasks);
	lb->num_tasks = 0;

	/* Baselines for the task-time integral and for differential rusage. */
	gettimeofday(&lb->prev_tasks_time, NULL);
	lb->tasks_sum = 0.0;
	getrusage(RUSAGE_CHILDREN, &lb->prev_usage);

	/* Only slot 0 exists for now; lbalance_task_new() grows the array. */
	lb->max_stats = 1;
	lb->stats = malloc(sizeof(lb->stats[0]) * lb->max_stats);
	if (!lb->stats) {
		free(lb);
		return NULL;
	}
	lb->stats[0].num_stats = 0;
	lb->stats[0].work_rate = 0.0;
	lb->total_stats = 0;

	/* Start with # CPUS as a guess.  The two queries are chained rather
	 * than alternatives, so a failing ONLN query still falls back to
	 * CONF when both names are available. */
#ifdef _SC_NPROCESSORS_ONLN
	ncpus = sysconf(_SC_NPROCESSORS_ONLN);
#endif
#ifdef _SC_NPROCESSORS_CONF
	if (ncpus < 1)
		ncpus = sysconf(_SC_NPROCESSORS_CONF);
#endif
	/* Otherwise (or on a uniprocessor), two is a good number. */
	if (ncpus < 2)
		lb->target = 2;
	else
		lb->target = (unsigned int)ncpus;
	lb->target_uptodate = true;

	return lb;
}
/* Return (recent - old) in microseconds, as a float. */
static float timeval_sub(struct timeval recent, struct timeval old)
{
	time_t secs = recent.tv_sec - old.tv_sec;
	long usecs = (long)recent.tv_usec - (long)old.tv_usec;
	float total;

	/* Borrow one second if the microsecond part went negative. */
	if (usecs < 0) {
		usecs += 1000000;
		secs--;
	}

	total = usecs;
	total += (float)secs * 1000000;
	return total;
}
/* Bring the task-time integral up to *now: lb->num_tasks tasks were running
 * for the whole interval since lb->prev_tasks_time. */
static void update_tasks_sum(struct lbalance *lb,
			     const struct timeval *now)
{
	float elapsed_usec = timeval_sub(*now, lb->prev_tasks_time);

	lb->tasks_sum += elapsed_usec * lb->num_tasks;
	lb->prev_tasks_time = *now;
}
struct lbalance_task *lbalance_task_new(struct lbalance *lb)
{
struct lbalance_task *task = malloc(sizeof *task);
if (!task)
return NULL;
if (lb->num_tasks + 1 == lb->max_stats) {
struct stats *s = realloc(lb->stats,
sizeof(*s) * (lb->max_stats + 1));
if (!s) {
free(task);
return NULL;
}
lb->stats = s;
lb->stats[lb->max_stats].num_stats = 0;
lb->stats[lb->max_stats].work_rate = 0.0;
lb->max_stats++;
}
task->lb = lb;
gettimeofday(&task->start, NULL);
/* Record that we ran num_tasks up until now. */
update_tasks_sum(lb, &task->start);
task->tasks_sum_start = lb->tasks_sum;
tlist_add_tail(&lb->tasks, task, list);
lb->num_tasks++;
return task;
}
/* We slowly erase old stats, once we have enough. */
static void degrade_stats(struct lbalance *lb)
{
unsigned int i;
if (lb->total_stats < lb->max_stats * 16)
return;
#if 0
fprintf(stderr, ".");
#endif
for (i = 0; i < lb->max_stats; i++) {
struct stats *s = &lb->stats[i];
unsigned int stats_lost = (s->num_stats + 1) / 2;
s->work_rate *= (float)(s->num_stats - stats_lost)
/ s->num_stats;
s->num_stats -= stats_lost;
lb->total_stats -= stats_lost;
if (s->num_stats == 0)
s->work_rate = 0.0;
}
}
static void add_to_stats(struct lbalance *lb,
unsigned int num_tasks,
float work_rate)
{
#if 0
fprintf(stderr, "With %.2f running, work rate was %.5f\n",
num_tasks, work_rate);
#endif
assert(num_tasks >= 1);
assert(num_tasks < lb->max_stats);
lb->stats[num_tasks].num_stats++;
lb->stats[num_tasks].work_rate += work_rate;
lb->total_stats++;
lb->target_uptodate = false;
}
void lbalance_task_free(struct lbalance_task *task,
const struct rusage *usage)
{
float work_done, duration;
unsigned int num_tasks;
struct timeval now;
struct rusage ru;
gettimeofday(&now, NULL);
duration = timeval_sub(now, task->start);
getrusage(RUSAGE_CHILDREN, &ru);
if (usage) {
work_done = usage->ru_utime.tv_usec + usage->ru_stime.tv_usec
+ (usage->ru_utime.tv_sec + usage->ru_stime.tv_sec)
* 1000000;
} else {
/* Take difference in rusage as rusage of that task. */
work_done = timeval_sub(ru.ru_utime,
task->lb->prev_usage.ru_utime)
+ timeval_sub(ru.ru_stime,
task->lb->prev_usage.ru_utime);
}
/* Update previous usage. */
task->lb->prev_usage = ru;
/* Record that we ran num_tasks up until now. */
update_tasks_sum(task->lb, &now);
/* So, on average, how many tasks were running during this time? */
num_tasks = (task->lb->tasks_sum - task->tasks_sum_start)
/ duration + 0.5;
/* Record the work rate for that many tasks. */
add_to_stats(task->lb, num_tasks, work_done / duration);
/* We throw away old stats. */
degrade_stats(task->lb);
/* We need to recalculate the target. */
task->lb->target_uptodate = false;
/* Remove this task. */
tlist_del_from(&task->lb->tasks, task, list);
task->lb->num_tasks--;
free(task);
}
/* We look for the point where the work rate starts to drop. Say you have
* 4 cpus, we'd expect the work rate for 5 processes to drop 20%.
*
* If we're within 1/4 of that ideal ratio, we assume it's still
* optimal. Any drop of more than 1/2 is interpreted as the point we
* are overloaded. */
static unsigned int best_target(const struct lbalance *lb)
{
unsigned int i, found_drop = 0;
float best_f_max = -1.0, cliff = -1.0;
#if 0
for (i = 1; i < lb->max_stats; i++) {
printf("%u: %f (%u)\n", i,
lb->stats[i].work_rate / lb->stats[i].num_stats,
lb->stats[i].num_stats);
}
#endif
for (i = 1; i < lb->max_stats; i++) {
float f;
if (!lb->stats[i].num_stats)
f = 0;
else
f = lb->stats[i].work_rate / lb->stats[i].num_stats;
if (f > best_f_max) {
#if 0
printf("Best is %i\n", i);
#endif
best_f_max = f - (f / (i + 1)) / 4;
cliff = f - (f / (i + 1)) / 2;
found_drop = 0;
} else if (!found_drop && f < cliff) {
#if 0
printf("Found drop at %i\n", i);
#endif
found_drop = i;
}
}
if (found_drop) {
return found_drop - 1;
}
return i - 1;
}
static unsigned int calculate_target(struct lbalance *lb)
{
unsigned int target;
target = best_target(lb);
/* Jitter if the adjacent ones are unknown. */
if (target >= lb->max_stats || lb->stats[target].num_stats == 0)
return target;
if (target + 1 == lb->max_stats || lb->stats[target+1].num_stats == 0)
return target + 1;
if (target > 1 && lb->stats[target-1].num_stats == 0)
return target - 1;
return target;
}
/* Return how many tasks the caller should currently aim to run.  The value
 * is cached and only recomputed after new statistics have arrived. */
unsigned lbalance_target(struct lbalance *lb)
{
	if (lb->target_uptodate)
		return lb->target;

	lb->target = calculate_target(lb);
	lb->target_uptodate = true;
	return lb->target;
}
/* Destroy a load balancer.  Any tasks still registered with it are unlinked
 * and freed (without recording statistics), then the stats array and the
 * balancer itself are released. */
void lbalance_free(struct lbalance *lb)
{
	for (;;) {
		struct lbalance_task *t = tlist_top(&lb->tasks, list);

		if (!t)
			break;

		assert(t->lb == lb);
		tlist_del_from(&lb->tasks, t, list);
		lb->num_tasks -= 1;
		free(t);
	}

	assert(lb->num_tasks == 0);
	free(lb->stats);
	free(lb);
}
|