diff --git a/src/tests/python/ampsave/tests/throughput.py b/src/tests/python/ampsave/tests/throughput.py index e343554..fd88ddb 100644 --- a/src/tests/python/ampsave/tests/throughput.py +++ b/src/tests/python/ampsave/tests/throughput.py @@ -98,6 +98,11 @@ def get_data(data): "bytes": i.bytes, "direction": direction_to_string(i.direction), "tcpreused": params["tcpreused"], + "congestion": ( i.tcpinfo.congestion_type + if i.HasField("tcpinfo") and + i.tcpinfo.HasField("congestion_type") + else + None ), } ) diff --git a/src/tests/throughput/tcpinfo.h b/src/tests/throughput/tcpinfo.h index d1158f5..fcfd45b 100644 --- a/src/tests/throughput/tcpinfo.h +++ b/src/tests/throughput/tcpinfo.h @@ -97,7 +97,7 @@ struct amp_tcp_info { __u32 tcpi_segs_out; /* RFC4898 tcpEStatsPerfSegsOut */ __u32 tcpi_segs_in; /* RFC4898 tcpEStatsPerfSegsIn */ - __u32 tcpi_notsent_bytes; + __u32 tcpi_notsent_bytes; __u32 tcpi_min_rtt; __u32 tcpi_data_segs_in; /* RFC4898 tcpEStatsDataSegsIn */ __u32 tcpi_data_segs_out; /* RFC4898 tcpEStatsDataSegsOut */ diff --git a/src/tests/throughput/throughput.h b/src/tests/throughput/throughput.h index 8946b3b..e4dfda1 100644 --- a/src/tests/throughput/throughput.h +++ b/src/tests/throughput/throughput.h @@ -60,6 +60,7 @@ #define DEFAULT_TEST_DURATION 10 /* iperf default: 10s */ #define MAX_MALLOC 20e6 +#define MAXSAMPLES 1024 /* Number of samples to take of RTTs */ /* * Used as shortcuts for scheduling common tests through the web interface. @@ -109,6 +110,7 @@ struct tcpinfo_result_t { uint32_t rtt; uint32_t rttvar; uint32_t min_rtt; + uint32_t congestion_type; }; /** @@ -135,6 +137,13 @@ struct test_request_t { struct test_request_t *next; }; +/* rtt samples taken while sending; int32_t so an rtt value is not truncated */ +struct rtt_samples_t { + int32_t samples[MAXSAMPLES]; + int rttcount; + int keep_recording; +}; + /* * Global test options that control packet size and timing. 
@@ -180,7 +189,8 @@ int sendStream(int sock_fd, struct test_request_t *test_opts, /* Receive incoming test */ int incomingTest(int sock_fd, struct test_result_t *result); -int writeBuffer(int sock_fd, void *packet, size_t length); +int writeBuffer(int sock_fd, void *packet, size_t length, + struct rtt_samples_t * rtt_samples); int readBuffer(int test_socket); uint64_t timeNanoseconds(void); diff --git a/src/tests/throughput/throughput.proto b/src/tests/throughput/throughput.proto index 165ff05..3a3ee89 100644 --- a/src/tests/throughput/throughput.proto +++ b/src/tests/throughput/throughput.proto @@ -98,6 +98,20 @@ message TCPInfo { optional uint64 sndbuf_limited = 7; /* Minimum round trip time (usec) */ optional uint32 min_rtt = 8; + /** + * Type of congestion inferred from rtt timings. + * Special cases: NULL if not measured, + * TOO_HEALTHY for no packets dropped in range, + * UNHEALTHY if packets dropped too soon. + */ + enum Congestion_Type { + NULL = 0; + SELF = 1; + EXTERNAL = 2; + TOO_HEALTHY = 3; + UNHEALTHY = 4; + } + optional Congestion_Type congestion_type = 9; } diff --git a/src/tests/throughput/throughput_client.c b/src/tests/throughput/throughput_client.c index 1fe7681..6bf5e58 100644 --- a/src/tests/throughput/throughput_client.c +++ b/src/tests/throughput/throughput_client.c @@ -95,7 +95,7 @@ static void printSchedule(struct test_request_t *schedule) { * Note: this uses strtok() and will destroy the input argument * * @param options - A opt_t structure to append to the schedule - * @param request - A string reprensenting the what is to be added to + * @param request - A string representing what is to be added to * the schedule */ static void parseSchedule(struct opt_t *options, char *request) { @@ -181,7 +181,7 @@ static void parseSchedule(struct opt_t *options, char *request) { } } - /* Get the next string and move current foward */ + /* Get the next string and move current forward */ pch = strtok(NULL, ","); current = &(*current)->next; } @@ 
-302,6 +302,7 @@ static struct tcpinfo_result_t *extract_tcpinfo(ProtobufCBinaryData *data) { tcpinfo->busy_time = item->tcpinfo->busy_time; tcpinfo->rwnd_limited = item->tcpinfo->rwnd_limited; tcpinfo->sndbuf_limited = item->tcpinfo->sndbuf_limited; + tcpinfo->congestion_type = item->tcpinfo->congestion_type; } amplet2__throughput__item__free_unpacked(item, NULL); @@ -922,6 +923,29 @@ void print_throughput(amp_test_result_t *result) { 100.0 * item->tcpinfo->sndbuf_limited / item->tcpinfo->busy_time); } + + printf("\tLimiting congestion: "); + switch ( item->tcpinfo->congestion_type ) { + case AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__NULL: + printf("No congestion tests performed\n"); + break; + case AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__SELF: + printf("Self induced congestion from test traffic\n"); + break; + case AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__EXTERNAL: + printf("External congestion from other traffic\n"); + break; + case AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__TOO_HEALTHY: + printf("Can not infer end of slow-start\n"); + break; + case AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__UNHEALTHY: + printf("Less than two packets of slow-start\n"); + break; + default: + printf("Something went wrong\n"); + break; + } + } else { printf("\tNo further TCP information available from sender\n"); } diff --git a/src/tests/throughput/throughput_common.c b/src/tests/throughput/throughput_common.c index 27e5b00..80ed2cd 100644 --- a/src/tests/throughput/throughput_common.c +++ b/src/tests/throughput/throughput_common.c @@ -39,7 +39,7 @@ */ /** - * Common functions used by both the throughtput client and + * Common functions used by both the throughput client and * server * * @author Richard Sanger @@ -226,6 +226,8 @@ Amplet2__Throughput__Item* report_schedule(struct test_request_t *info) { item->tcpinfo->rwnd_limited = info->result->tcpinfo->rwnd_limited; item->tcpinfo->has_sndbuf_limited = 1; item->tcpinfo->sndbuf_limited = 
info->result->tcpinfo->sndbuf_limited; + item->tcpinfo->has_congestion_type = 1; + item->tcpinfo->congestion_type = info->result->tcpinfo->congestion_type; } Log(LOG_DEBUG, "tput result: %" PRIu64 " bytes in %" PRIu64 "ms to %s", @@ -340,12 +342,11 @@ static void addHttpHeaders(void *data, unsigned int size) { } - /* * Query the socket for some more in-depth information about the TCP state - * at the end of the throughput test. + * and optionally wait (wait_finished) for the connection to finish sending. */ -static struct tcpinfo_result_t *get_tcp_info(int sock_fd) { +static struct tcpinfo_result_t *get_tcp_info(int sock_fd, int wait_finished) { struct tcpinfo_result_t *result; struct amp_tcp_info *tcp_info = calloc(1, sizeof(struct amp_tcp_info)); int tcp_info_len = sizeof(struct amp_tcp_info); @@ -362,32 +363,27 @@ static struct tcpinfo_result_t *get_tcp_info(int sock_fd) { /* delay until all bytes have been sent or we've waited long enough */ attempts--; } while ( attempts > 0 && tcp_info->tcpi_notsent_bytes > 0 && - usleep(100000) == 0 ); - - /* - * don't try to parse any results that don't match our expected format - - * new fields get added at any point in the structure so we can't be sure - * that we are getting the correct data. - */ - if ( tcp_info_len != sizeof(struct amp_tcp_info) ) { - free(tcp_info); - return NULL; - } - + wait_finished && usleep(100000) == 0 ); result = calloc(1, sizeof(struct tcpinfo_result_t)); /* - * if the connection isn't app limited then the delivery rate is the most - * recent value rather than the maximum - * TODO why isn't it always app limited when we have no data left to send? 
- * https://github.com/torvalds/linux/commit/eb8329e0a04db0061f714f033b4454 - * https://github.com/torvalds/linux/commit/b9f64820fb226a4e8ab10591f46cec - * https://github.com/torvalds/linux/commit/d7722e8570fc0f1e003cee7cf37694 - */ + * if the connection isn't app limited then the delivery rate is the most + * recent value rather than the maximum + * TODO why isn't it always app limited when we have no data left to send? + * https://github.com/torvalds/linux/commit/eb8329e0a04db0061f714f033b4454 + * https://github.com/torvalds/linux/commit/b9f64820fb226a4e8ab10591f46cec + * https://github.com/torvalds/linux/commit/d7722e8570fc0f1e003cee7cf37694 + */ if ( tcp_info->tcpi_delivery_rate_app_limited ) { result->delivery_rate = tcp_info->tcpi_delivery_rate; } + /* + * The core part of the struct, which holds the values we care about, doesn't + * change; a field that doesn't exist is simply left empty. Reading where each + * one would be therefore gives either its value or nothing - both are valid. + */ + result->total_retrans = tcp_info->tcpi_total_retrans; result->rtt = tcp_info->tcpi_rtt; result->rttvar = tcp_info->tcpi_rttvar; @@ -402,7 +398,105 @@ static struct tcpinfo_result_t *get_tcp_info(int sock_fd) { } -/** +/* + * Decision tree classifier created from data set generated by this program + */ +static int classify(float CoV, float NormRange) { + if ( NormRange <= 0.816066712141037 ) { + if ( CoV <= 0.09541601315140724 ) { + return AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__EXTERNAL; + } else { + if ( NormRange <= 0.6482022404670715 ) { + return AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__EXTERNAL; + } else { + return AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__SELF; + } + } + } else { + if ( CoV <= 0.11838533729314804 ) { + if ( NormRange <= 0.9539760947227478 ) { + return AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__EXTERNAL; + } else { + return AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__SELF; + } + } else { + return AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__SELF; + } + } 
+} + + + +/* + * Infer the kind of congestion that is occurring from the collected round trip + * times of the slow start period by calculating the Normalized range + * and the Coefficient of variation. + */ +static void process_rtt(struct test_result_t *res, + struct rtt_samples_t *rtt_samples) { + + int32_t mean_sum = 0; + int32_t min = INT_MAX; + int32_t max = INT_MIN; + float mean = 0; + float sum = 0; + float NormRange; + float CoV; + + + if ( rtt_samples->rttcount <= 2 ) { + /* + * Insufficient records to say what form of congestion, + * link is too unhealthy to infer congestion reason + */ + res->tcpinfo->congestion_type = + AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__UNHEALTHY; + return; + } else if ( res->tcpinfo->total_retrans == 0 ) { + /* + * somehow managed to not get a single re-transmit, + * link is too healthy to infer congestion reason + * (or even if congestion is occurring at all) + */ + res->tcpinfo->congestion_type = + AMPLET2__THROUGHPUT__TCPINFO__CONGESTION__TYPE__TOO_HEALTHY; + return; + } + + /* Calculate the mean/min/max */ + for ( int i = 0; i < rtt_samples->rttcount; i++ ) { + mean_sum+= rtt_samples->samples[i]; + if ( rtt_samples->samples[i] > max ) max = rtt_samples->samples[i]; + if ( rtt_samples->samples[i] < min ) min = rtt_samples->samples[i]; + } + mean = (float)mean_sum / (float)rtt_samples->rttcount; + + /* Calculate the variance (no square root is taken) */ + for (int i = 0; i < rtt_samples->rttcount; i++ ) { + sum += ((rtt_samples->samples[i] - mean)*(rtt_samples->samples[i] - mean)); + } + sum /= (float)rtt_samples->rttcount; + + /* Normalized range of rtt */ + NormRange = (max - min)/(float)max; + + /* Squared coefficient of variation of rtt (variance / mean^2) */ + CoV = sum/(mean*mean); + + res->tcpinfo->congestion_type = classify(CoV, NormRange); + + /* output dump for training data */ + Log(LOG_DEBUG, "Congestion-Data: CoV:%.10f NormRange:%.10f " + "RTT:count%d type:%d\n", + CoV, + NormRange, + rtt_samples->rttcount, + res->tcpinfo->congestion_type); + +} + + +/* + * Do the 
actual write and ensure the entire buffer is written. * * @param sock_fd @@ -414,13 +508,26 @@ static struct tcpinfo_result_t *get_tcp_info(int sock_fd) { * * @return 0 if successful, -1 if failure. */ -int writeBuffer(int sock_fd, void *data, size_t length) { +int writeBuffer(int sock_fd, void *data, size_t length, + struct rtt_samples_t *rtt_samples) { int result; size_t total_written = 0; do { result = write(sock_fd, data + total_written, length - total_written); + if ( rtt_samples && rtt_samples->keep_recording ) { + struct tcpinfo_result_t* tcpi = get_tcp_info(sock_fd, 0); + /* stop after a retransmit, or once the last free sample slot is reached */ + if ( (tcpi->total_retrans != 0) || + (rtt_samples->rttcount >= MAXSAMPLES - 1) ) { + rtt_samples->keep_recording = 0; + } + rtt_samples->samples[rtt_samples->rttcount] = tcpi->rtt; + rtt_samples->rttcount++; + free(tcpi); + } + if ( result > 0 ) { total_written += result; } @@ -497,6 +604,10 @@ int sendStream(int sock_fd, struct test_request_t *test_opts, int result; fd_set write_set; + struct rtt_samples_t *rtt_samples = calloc(1, sizeof(struct rtt_samples_t)); + rtt_samples->rttcount = 0; + rtt_samples->keep_recording = 1; + /* Make sure the test is valid */ if ( test_opts->bytes == 0 && test_opts->duration == 0 ) { Log(LOG_ERR, "no terminating condition for test"); @@ -587,7 +698,7 @@ int sendStream(int sock_fd, struct test_request_t *test_opts, /* send the data */ if ( (bytes_sent = writeBuffer(sock_fd, packet_out, - bytes_to_send)) < 0 ) { + bytes_to_send, rtt_samples)) < 0 ) { Log(LOG_ERR, "sendStream() could not send data packet\n"); break; } @@ -597,10 +708,11 @@ int sendStream(int sock_fd, struct test_request_t *test_opts, } while ( more ); res->end_ns = timeNanoseconds(); - free(packet_out); - - res->tcpinfo = get_tcp_info(sock_fd); + res->tcpinfo = get_tcp_info(sock_fd, 1); + process_rtt(res, rtt_samples); + free(packet_out); + free(rtt_samples); return 0; }