rdkafka_performance: added latency measurement mode (-l) and offset reporting (-O)

This commit is contained in:
Magnus Edenhill 2014-11-04 23:12:14 +01:00
parent 031e3b7995
commit 65880c390c

View file

@ -54,6 +54,8 @@ static int exit_eof = 0;
static FILE *stats_fp; static FILE *stats_fp;
static int dr_disp_div; static int dr_disp_div;
static int verbosity = 1; static int verbosity = 1;
static int latency_mode = 0;
static int report_offset = 0;
static void stop (int sig) { static void stop (int sig) {
if (!run) if (!run)
@ -65,6 +67,11 @@ static long int msgs_wait_cnt = 0;
static rd_ts_t t_end; static rd_ts_t t_end;
static rd_kafka_t *rk; static rd_kafka_t *rk;
struct avg {
int64_t val;
int cnt;
uint64_t ts_start;
};
static struct { static struct {
rd_ts_t t_start; rd_ts_t t_start;
@ -79,13 +86,27 @@ static struct {
uint64_t tx_err; uint64_t tx_err;
uint64_t avg_rtt; uint64_t avg_rtt;
uint64_t offset; uint64_t offset;
rd_ts_t t_latency; rd_ts_t t_fetch_latency;
rd_ts_t t_last; rd_ts_t t_last;
rd_ts_t t_enobufs_last; rd_ts_t t_enobufs_last;
rd_ts_t t_total; rd_ts_t t_total;
rd_ts_t latency_last;
rd_ts_t latency_lo;
rd_ts_t latency_hi;
rd_ts_t latency_sum;
int latency_cnt;
int64_t last_offset;
} cnt = {}; } cnt = {};
/* Returns wall clock time in microseconds */
uint64_t wall_clock (void) {
struct timeval tv;
gettimeofday(&tv, NULL);
return ((uint64_t)tv.tv_sec * 1000000LLU) +
((uint64_t)tv.tv_usec);
}
static void err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) { static void err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
printf("%% ERROR CALLBACK: %s: %s: %s\n", printf("%% ERROR CALLBACK: %s: %s: %s\n",
rd_kafka_name(rk), rd_kafka_err2str(err), reason); rd_kafka_name(rk), rd_kafka_err2str(err), reason);
@ -93,9 +114,7 @@ static void err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
static void msg_delivered (rd_kafka_t *rk, static void msg_delivered (rd_kafka_t *rk,
void *payload, size_t len, const rd_kafka_message_t *rkmessage, void *opaque) {
int error_code,
void *opaque, void *msg_opaque) {
static rd_ts_t last; static rd_ts_t last;
rd_ts_t now = rd_clock(); rd_ts_t now = rd_clock();
static int msgs; static int msgs;
@ -104,33 +123,39 @@ static void msg_delivered (rd_kafka_t *rk,
msgs_wait_cnt--; msgs_wait_cnt--;
if (error_code) if (rkmessage->err)
cnt.msgs_dr_err++; cnt.msgs_dr_err++;
else { else {
cnt.msgs_dr_ok++; cnt.msgs_dr_ok++;
cnt.bytes_dr_ok += len; cnt.bytes_dr_ok += rkmessage->len;
} }
if ((error_code && if ((rkmessage->err &&
(cnt.msgs_dr_err < 50 || (cnt.msgs_dr_err < 50 ||
!(cnt.msgs_dr_err % (dispintvl / 1000)))) || !(cnt.msgs_dr_err % (dispintvl / 1000)))) ||
!last || msgs_wait_cnt < 5 || !last || msgs_wait_cnt < 5 ||
!(msgs_wait_cnt % dr_disp_div) || !(msgs_wait_cnt % dr_disp_div) ||
(now - last) >= dispintvl * 1000 || (now - last) >= dispintvl * 1000 ||
verbosity >= 3) { verbosity >= 3) {
if (error_code && verbosity >= 2) if (rkmessage->err && verbosity >= 2)
printf("%% Message delivery failed: %s (%li remain)\n", printf("%% Message delivery failed: %s (%li remain)\n",
rd_kafka_err2str(error_code), rd_kafka_err2str(rkmessage->err),
msgs_wait_cnt); msgs_wait_cnt);
else if (verbosity >= 2) else if (verbosity >= 2)
printf("%% Message delivered: %li remain\n", printf("%% Message delivered (offset %"PRId64"): "
msgs_wait_cnt); "%li remain\n",
rkmessage->offset, msgs_wait_cnt);
if (verbosity >= 3 && do_seq) if (verbosity >= 3 && do_seq)
printf(" --> \"%.*s\"\n", (int)len, (char *)payload); printf(" --> \"%.*s\"\n",
(int)rkmessage->len,
(const char *)rkmessage->payload);
last = now; last = now;
} }
if (report_offset)
cnt.last_offset = rkmessage->offset;
if (msgs_wait_cnt == 0 && !forever) { if (msgs_wait_cnt == 0 && !forever) {
if (verbosity >= 2) if (verbosity >= 2)
printf("All messages delivered!\n"); printf("All messages delivered!\n");
@ -143,7 +168,6 @@ static void msg_delivered (rd_kafka_t *rk,
exit_after); exit_after);
exit(0); exit(0);
} }
} }
@ -185,14 +209,35 @@ static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) {
printf("@%"PRId64": %.*s\n", printf("@%"PRId64": %.*s\n",
rkmessage->offset, rkmessage->offset,
(int)rkmessage->len, (char *)rkmessage->payload); (int)rkmessage->len, (char *)rkmessage->payload);
#if 0 /* Future API */
/* We store offset when we're done processing
* the current message. */
rd_kafka_offset_store(rkmessage->rkt, rkmessage->partition,
rd_kafka_offset_next(rkmessage));
#endif
if (latency_mode) {
uint64_t remote_ts, ts;
if (rkmessage->len > 8 &&
!memcmp(rkmessage->payload, "LATENCY:", 8) &&
sscanf(rkmessage->payload, "LATENCY:%"SCNd64,
&remote_ts) == 1) {
ts = wall_clock() - remote_ts;
if (ts > 0 && ts < (1000000 * 60 * 5)) {
if (ts > cnt.latency_hi)
cnt.latency_hi = ts;
if (!cnt.latency_lo || ts < cnt.latency_lo)
cnt.latency_lo = ts;
cnt.latency_last = ts;
cnt.latency_cnt++;
cnt.latency_sum += ts;
} else {
if (verbosity >= 1)
printf("Received latency timestamp is too far off: %"PRId64"us (message offset %"PRId64"): ignored\n",
ts, rkmessage->offset);
}
} else
printf("not a LATENCY: %.*s\n",
(int)rkmessage->len,
(char *)rkmessage->payload);
}
} }
@ -287,12 +332,24 @@ static void print_stats (int mode, int otype, const char *compression) {
rd_ts_t now = rd_clock(); rd_ts_t now = rd_clock();
rd_ts_t t_total; rd_ts_t t_total;
static int rows_written = 0; static int rows_written = 0;
int print_header;
char extra[512];
int extra_of = 0;
*extra = '\0';
#define EXTRA_PRINTF(fmt...) do { \
if (extra_of < sizeof(extra)) \
extra_of += snprintf(extra+extra_of, \
sizeof(extra)-extra_of, fmt); \
} while (0)
if (!(otype & _OTYPE_FORCE) && if (!(otype & _OTYPE_FORCE) &&
(verbosity == 0 || (((otype & _OTYPE_SUMMARY) && verbosity == 0) ||
cnt.t_last + dispintvl > now)) cnt.t_last + dispintvl > now))
return; return;
print_header = !rows_written ||(verbosity > 0 && !(rows_written % 20));
if (cnt.t_end_send) if (cnt.t_end_send)
t_total = cnt.t_end_send - cnt.t_start; t_total = cnt.t_end_send - cnt.t_start;
else if (cnt.t_end) else if (cnt.t_end)
@ -312,7 +369,7 @@ static void print_stats (int mode, int otype, const char *compression) {
rows_written++; \ rows_written++; \
} while (0) } while (0)
if (!(rows_written % 20)) { if (print_header) {
/* First time, print header */ /* First time, print header */
ROW_START(); ROW_START();
COL_HDR("elapsed"); COL_HDR("elapsed");
@ -325,6 +382,8 @@ static void print_stats (int mode, int otype, const char *compression) {
COL_HDR("dr_err"); COL_HDR("dr_err");
COL_HDR("tx_err"); COL_HDR("tx_err");
COL_HDR("outq"); COL_HDR("outq");
if (report_offset)
COL_HDR("offset");
ROW_END(); ROW_END();
} }
@ -341,28 +400,38 @@ static void print_stats (int mode, int otype, const char *compression) {
COL_PR64("dr_err", cnt.msgs_dr_err); COL_PR64("dr_err", cnt.msgs_dr_err);
COL_PR64("tx_err", cnt.tx_err); COL_PR64("tx_err", cnt.tx_err);
COL_PR64("outq", (uint64_t)rd_kafka_outq_len(rk)); COL_PR64("outq", (uint64_t)rd_kafka_outq_len(rk));
if (report_offset)
COL_PR64("offset", (uint64_t)cnt.last_offset);
ROW_END(); ROW_END();
} }
if (otype & _OTYPE_SUMMARY) { if (otype & _OTYPE_SUMMARY) {
printf("%% %"PRIu64" messages produced " printf("%% %"PRIu64" messages produced "
"(%"PRIu64" bytes), " "(%"PRIu64" bytes), "
"%"PRIu64" delivered (%"PRIu64" failed) " "%"PRIu64" delivered "
"(offset %"PRId64", %"PRIu64" failed) "
"in %"PRIu64"ms: %"PRIu64" msgs/s and " "in %"PRIu64"ms: %"PRIu64" msgs/s and "
"%.02f Mb/s, " "%.02f Mb/s, "
"%"PRIu64" produce failures, %i in queue, " "%"PRIu64" produce failures, %i in queue, "
"%s compression\n", "%s compression\n",
cnt.msgs, cnt.bytes, cnt.msgs, cnt.bytes,
cnt.msgs_dr_ok, cnt.msgs_dr_err, cnt.msgs_dr_ok, cnt.last_offset, cnt.msgs_dr_err,
t_total / 1000, t_total / 1000,
((cnt.msgs_dr_ok * 1000000) / t_total), ((cnt.msgs_dr_ok * 1000000) / t_total),
(float)((cnt.bytes_dr_ok) / (float)t_total), (float)((cnt.bytes_dr_ok) / (float)t_total),
cnt.tx_err, rd_kafka_outq_len(rk), compression); cnt.tx_err, rd_kafka_outq_len(rk),
compression);
} }
} else { } else {
float latency_avg = 0.0f;
if (latency_mode && cnt.latency_cnt)
latency_avg = (double)cnt.latency_sum /
(float)cnt.latency_cnt;
if (otype & _OTYPE_TAB) { if (otype & _OTYPE_TAB) {
if (!(rows_written % 20)) { if (print_header) {
/* First time, print header */ /* First time, print header */
ROW_START(); ROW_START();
COL_HDR("elapsed"); COL_HDR("elapsed");
@ -373,6 +442,12 @@ static void print_stats (int mode, int otype, const char *compression) {
COL_HDR("MB/s"); COL_HDR("MB/s");
COL_HDR("rx_err"); COL_HDR("rx_err");
COL_HDR("offset"); COL_HDR("offset");
if (latency_mode) {
COL_HDR("lat_curr");
COL_HDR("lat_avg");
COL_HDR("lat_lo");
COL_HDR("lat_hi");
}
ROW_END(); ROW_END();
} }
@ -387,22 +462,37 @@ static void print_stats (int mode, int otype, const char *compression) {
(float)((cnt.bytes) / (float)t_total)); (float)((cnt.bytes) / (float)t_total));
COL_PR64("rx_err", cnt.msgs_dr_err); COL_PR64("rx_err", cnt.msgs_dr_err);
COL_PR64("offset", cnt.offset); COL_PR64("offset", cnt.offset);
if (latency_mode) {
COL_PRF("lat_curr", cnt.latency_last / 1000.0f);
COL_PRF("lat_avg", latency_avg / 1000.0f);
COL_PRF("lat_lo", cnt.latency_lo / 1000.0f);
COL_PRF("lat_hi", cnt.latency_hi / 1000.0f);
}
ROW_END(); ROW_END();
} }
if (otype & _OTYPE_SUMMARY) { if (otype & _OTYPE_SUMMARY) {
printf("%% %"PRIu64" messages and %"PRIu64" bytes " if (latency_avg >= 1.0f)
"%s in %"PRIu64"ms: %"PRIu64" msgs/s and " extra_of += snprintf(extra+extra_of,
"%.02f Mb/s, " sizeof(extra)-extra_of,
"%s compression\n", ", latency "
"curr/avg/lo/hi "
"%.2f/%.2f/%.2f/%.2fms",
cnt.latency_last / 1000.0f,
latency_avg / 1000.0f,
cnt.latency_lo / 1000.0f,
cnt.latency_hi / 1000.0f)
;
printf("%% %"PRIu64" messages (%"PRIu64" bytes) "
"consumed in %"PRIu64"ms: %"PRIu64" msgs/s "
"(%.02f Mb/s)"
"%s\n",
cnt.msgs, cnt.bytes, cnt.msgs, cnt.bytes,
mode == 'P' ? "produced" : "consumed",
t_total / 1000, t_total / 1000,
((cnt.msgs * 1000000) / t_total), ((cnt.msgs * 1000000) / t_total),
(float)((cnt.bytes) / (float)t_total), (float)((cnt.bytes) / (float)t_total),
compression); extra);
} }
} }
@ -448,7 +538,7 @@ int main (int argc, char **argv) {
/* Kafka configuration */ /* Kafka configuration */
conf = rd_kafka_conf_new(); conf = rd_kafka_conf_new();
rd_kafka_conf_set_error_cb(conf, err_cb); rd_kafka_conf_set_error_cb(conf, err_cb);
rd_kafka_conf_set_dr_cb(conf, msg_delivered); rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);
/* Producer config */ /* Producer config */
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "500000", rd_kafka_conf_set(conf, "queue.buffering.max.messages", "500000",
@ -473,7 +563,7 @@ int main (int argc, char **argv) {
while ((opt = while ((opt =
getopt(argc, argv, getopt(argc, argv,
"PCt:p:b:s:k:c:fi:Dd:m:S:x:" "PCt:p:b:s:k:c:fi:Dd:m:S:x:"
"R:a:z:o:X:B:eT:G:qvIur:")) != -1) { "R:a:z:o:X:B:eT:G:qvIur:lO")) != -1) {
switch (opt) { switch (opt) {
case 'P': case 'P':
case 'C': case 'C':
@ -625,6 +715,7 @@ int main (int argc, char **argv) {
case 'u': case 'u':
otype = _OTYPE_TAB; otype = _OTYPE_TAB;
verbosity--; /* remove some fluff */
break; break;
case 'r': case 'r':
@ -639,6 +730,22 @@ int main (int argc, char **argv) {
rate_sleep = (int)(1000000.0 / dtmp); rate_sleep = (int)(1000000.0 / dtmp);
break; break;
case 'l':
latency_mode = 1;
break;
case 'O':
if (rd_kafka_topic_conf_set(topic_conf,
"produce.offset.report",
"true",
errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
report_offset = 1;
break;
default: default:
goto usage; goto usage;
} }
@ -689,6 +796,11 @@ int main (int argc, char **argv) {
" -v Increase verbosity (default 1)\n" " -v Increase verbosity (default 1)\n"
" -u Output stats in table format\n" " -u Output stats in table format\n"
" -r <rate> Producer msg/s limit\n" " -r <rate> Producer msg/s limit\n"
" -l Latency measurement.\n"
" Needs two matching instances, one\n"
" consumer and one producer, both\n"
" running with the -l switch.\n"
" -O Report produced offset (producer)\n"
"\n" "\n"
" In Consumer mode:\n" " In Consumer mode:\n"
" consumes messages and prints thruput\n" " consumes messages and prints thruput\n"
@ -707,8 +819,9 @@ int main (int argc, char **argv) {
dispintvl *= 1000; /* us */ dispintvl *= 1000; /* us */
printf("%% Using random seed %i, verbosity level %i\n", if (verbosity > 1)
seed, verbosity); printf("%% Using random seed %i, verbosity level %i\n",
seed, verbosity);
srand(seed); srand(seed);
signal(SIGINT, stop); signal(SIGINT, stop);
signal(SIGUSR1, sig_usr1); signal(SIGUSR1, sig_usr1);
@ -740,6 +853,9 @@ int main (int argc, char **argv) {
exit(1); exit(1);
} }
if (latency_mode)
do_seq = 0;
if (stats_intvlstr) { if (stats_intvlstr) {
/* User enabled stats (-T) */ /* User enabled stats (-T) */
@ -768,11 +884,17 @@ int main (int argc, char **argv) {
off_t rof = 0; off_t rof = 0;
size_t plen = strlen(msgpattern); size_t plen = strlen(msgpattern);
if (do_seq) { if (latency_mode) {
if (msgsize < strlen("18446744073709551615: ")+1) msgsize = strlen("LATENCY:") +
msgsize = strlen("18446744073709551615: ")+1; strlen("18446744073709551615 ")+1;
sendflags |= RD_KAFKA_MSG_F_COPY;
} else if (do_seq) {
int minlen = strlen("18446744073709551615 ")+1;
if (msgsize < minlen)
msgsize = minlen;
/* Force duplication of payload */ /* Force duplication of payload */
sendflags |= RD_KAFKA_MSG_F_FREE; sendflags |= RD_KAFKA_MSG_F_FREE;
} }
sbuf = malloc(msgsize); sbuf = malloc(msgsize);
@ -832,9 +954,13 @@ int main (int argc, char **argv) {
continue; continue;
} }
if (do_seq) { if (latency_mode) {
snprintf(sbuf, msgsize-1, "%"PRIu64": ", seq); snprintf(sbuf, msgsize-1,
seq++; "LATENCY:%"PRIu64, wall_clock());
} else if (do_seq) {
snprintf(sbuf,
msgsize-1, "%"PRIu64": ", seq);
seq++;
} }
if (sendflags & RD_KAFKA_MSG_F_FREE) { if (sendflags & RD_KAFKA_MSG_F_FREE) {
@ -987,18 +1113,18 @@ int main (int argc, char **argv) {
rd_kafka_err2str(rd_kafka_errno2err(errno))); rd_kafka_err2str(rd_kafka_errno2err(errno)));
exit(1); exit(1);
} }
cnt.t_start = rd_clock(); cnt.t_start = rd_clock();
while (run && (msgcnt == -1 || msgcnt > cnt.msgs)) { while (run && (msgcnt == -1 || msgcnt > cnt.msgs)) {
/* Consume messages. /* Consume messages.
* A message may either be a real message, or * A message may either be a real message, or
* an error signaling (if rkmessage->err is set). * an error signaling (if rkmessage->err is set).
*/ */
uint64_t latency; uint64_t fetch_latency;
int r; int r;
latency = rd_clock(); fetch_latency = rd_clock();
if (batch_size) { if (batch_size) {
int i; int i;
@ -1022,8 +1148,7 @@ int main (int argc, char **argv) {
NULL); NULL);
} }
cnt.t_latency += rd_clock() - latency; cnt.t_fetch_latency += rd_clock() - fetch_latency;
if (r == -1) if (r == -1)
fprintf(stderr, "%% Error: %s\n", fprintf(stderr, "%% Error: %s\n",
rd_kafka_err2str( rd_kafka_err2str(
@ -1052,9 +1177,9 @@ int main (int argc, char **argv) {
print_stats(mode, otype|_OTYPE_FORCE, compression); print_stats(mode, otype|_OTYPE_FORCE, compression);
if (cnt.t_latency && cnt.msgs) if (cnt.t_fetch_latency && cnt.msgs)
printf("%% Average application fetch latency: %"PRIu64"us\n", printf("%% Average application fetch latency: %"PRIu64"us\n",
cnt.t_latency / cnt.msgs); cnt.t_fetch_latency / cnt.msgs);
if (stats_cmd) { if (stats_cmd) {
pclose(stats_fp); pclose(stats_fp);