diff --git a/COPYING b/COPYING new file mode 100644 index 0000000..13a7fd0 --- /dev/null +++ b/COPYING @@ -0,0 +1,25 @@ +BSD 2-Clause License + +Copyright (c) 2020, Tampere University +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..1aa5b3b --- /dev/null +++ b/Makefile @@ -0,0 +1,73 @@ +.PHONY: all clean + +CXX = g++ +CXXFLAGS = -Wall -Wextra -O2 -std=c++11 -g + +uvgrtp_receiver: + $(CXX) $(CXXFLAGS) -o uvgrtp/receiver uvgrtp/receiver.cc util/util.cc -luvgrtp -lkvazaar -lpthread + +uvgrtp_sender: + $(CXX) $(CXXFLAGS) -o uvgrtp/sender uvgrtp/sender.cc util/util.cc -luvgrtp -lkvazaar -lpthread + +uvgrtp_latency_sender: + $(CXX) $(CXXFLAGS) -o uvgrtp/latency_sender uvgrtp/latency_sender.cc util/util.cc -luvgrtp -lkvazaar -lpthread + +uvgrtp_latency_receiver: + $(CXX) $(CXXFLAGS) -o uvgrtp/latency_receiver uvgrtp/latency_receiver.cc util/util.cc -luvgrtp -lkvazaar -lpthread + +ffmpeg_sender: + $(CXX) $(CXXFLAGS) -Wno-unused -Wno-deprecated-declarations -Wno-unused-result -o ffmpeg/sender \ + ffmpeg/sender.cc util/util.cc -lkvazaar `pkg-config --libs libavformat` -lpthread + +ffmpeg_receiver: + $(CXX) $(CXXFLAGS) -Wno-unused -Wno-deprecated-declarations -Wno-unused-result -o ffmpeg/receiver \ + ffmpeg/receiver.cc util/util.cc -lkvazaar -lavformat -lavcodec -lswscale -lz -lavutil -lpthread + +ffmpeg_latency_sender: + $(CXX) $(CXXFLAGS) -Wno-unused -Wno-deprecated-declarations -Wno-unused-result -o ffmpeg/latency_sender \ + ffmpeg/latency_sender.cc util/util.cc -lkvazaar `pkg-config --libs libavformat` -lpthread + +ffmpeg_latency_receiver: + $(CXX) $(CXXFLAGS) -Wno-unused -Wno-deprecated-declarations -Wno-unused-result -o ffmpeg/latency_receiver \ + ffmpeg/latency_receiver.cc util/util.cc -lkvazaar -lavformat -lavcodec -lswscale -lz -lavutil -lpthread + +live555_sender: + $(CXX) $(CXXFLAGS) live555/sender.cc live555/source.cc util/util.cc -o live555/sender \ + -I /usr/local/include/liveMedia \ + -I /usr/local/include/groupsock \ + -I /usr/local/include/BasicUsageEnvironment \ + -I /usr/local/include/UsageEnvironment \ + -lpthread -lliveMedia -lgroupsock -lBasicUsageEnvironment \ + -lUsageEnvironment -lcrypto -lssl + +live555_receiver: + $(CXX) $(CXXFLAGS) live555/receiver.cc live555/sink.cc util/util.cc -o 
live555/receiver \ + -I /usr/local/include/liveMedia \ + -I /usr/local/include/groupsock \ + -I /usr/local/include/BasicUsageEnvironment \ + -I /usr/local/include/UsageEnvironment \ + -lkvazaar -lpthread -lliveMedia -lgroupsock -lBasicUsageEnvironment \ + -lUsageEnvironment -lcrypto -lssl + +live555_latency_sender: + $(CXX) $(CXXFLAGS) live555/latency_sender.cc util/util.cc -o live555/latency_sender \ + -I /usr/local/include/liveMedia \ + -I /usr/local/include/groupsock \ + -I /usr/local/include/BasicUsageEnvironment \ + -I /usr/local/include/UsageEnvironment \ + -lkvazaar -lpthread -lliveMedia -lgroupsock -lBasicUsageEnvironment \ + -lUsageEnvironment -lcrypto -lssl + +live555_latency_receiver: + $(CXX) $(CXXFLAGS) live555/latency_receiver.cc -o live555/latency_receiver \ + -I /usr/local/include/liveMedia \ + -I /usr/local/include/groupsock \ + -I /usr/local/include/BasicUsageEnvironment \ + -I /usr/local/include/UsageEnvironment \ + -lkvazaar -lpthread -lliveMedia -lgroupsock -lBasicUsageEnvironment \ + -lUsageEnvironment -lcrypto -lssl + +clean: + rm -f uvgrtp/receiver uvgrtp/sender uvgrtp/latency_sender uvgrtp/latency_receiver \ + ffmpeg/receiver ffmpeg/sender ffmpeg/latency_sender ffmpeg/latency_receiver \ + live555/receiver live555/sender live555/latency diff --git a/README.md b/README.md new file mode 100644 index 0000000..27e9ead --- /dev/null +++ b/README.md @@ -0,0 +1,126 @@ +# RTP Benchmarks + +This repository contains the code used to benchmark uvgRTP against LIVE555 and FFmpeg. + +The directories [uvgrtp](uvgrtp), [ffmpeg](ffmpeg), and [live555](live555) contain the C++ implementations of the RTP (latency) senders and receivers. + +The script benchmark.pl automates the benchmark runs; its usage is described below. + +The script parse.pl parses the output of the benchmark runs. + +The repository also contains [udperf.c](udperf.c), which measures raw network throughput; it was used for the paper to determine the upper limit for UDP traffic with 1500-byte Ethernet frames. + +## Running the benchmarks + +### Example 1 + +Benchmark uvgRTP's send goodput. The benchmark is run with 8 different thread settings, +starting with 8 threads, then 7, then 6, and so on. + +Each thread configuration tests all FPS values in the range 30-480, and each FPS value +is tested 20 times.
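+ +The FPS values for the sweep are built by benchmark.pl roughly as in the simplified, self-contained extract below: the value is doubled from `--start` to `--end`, unless `--step` selects a fixed increment or `--fps` gives an explicit comma-separated list. + +``` +# Simplified extract of how benchmark.pl builds the FPS list +my ($start, $end, $step) = (30, 480, 0); +my @fps_vals; +for (my $i = $start; $i <= $end; ) { +    push @fps_vals, $i; +    $i = $step ? $i + $step : $i * 2; +} +print "@fps_vals\n";   # 30 60 120 240 480 +```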
+With the default doubling, the tested values are therefore 30, 60, 120, 240, and 480. + +Each FPS value for each thread configuration produces one log file. + +Sender +``` +./benchmark.pl \ + --lib uvgrtp \ + --role send \ + --addr 127.0.0.1 \ + --port 9999 \ + --threads 3 \ + --start 30 \ + --end 480 \ + --iter 20 +``` + +Receiver +``` +./benchmark.pl \ + --lib uvgrtp \ + --role recv \ + --addr 127.0.0.1 \ + --port 9999 \ + --threads 3 \ + --start 30 \ + --end 480 \ + --iter 20 +``` + +### Example 2 + +Benchmark uvgRTP's send goodput using netcat. + +Sender +``` +./benchmark.pl \ + --lib uvgrtp \ + --role send \ + --use-nc \ + --addr 127.0.0.1 \ + --port 9999 \ + --threads 3 \ + --start 30 \ + --end 60 +``` + +Receiver +``` +./benchmark.pl \ + --lib uvgrtp \ + --role recv \ + --use-nc \ + --addr 127.0.0.1 \ + --port 9999 \ + --threads 3 \ + --start 30 \ + --end 60 +``` + +## Parsing the benchmark results + +If the log file name matches the pattern `.*(send|recv).*(\d+)threads.*(\d+)fps.*(\d+)iter.*`, you don't +have to provide `--role`, `--threads`, or `--iter`. + +### Parsing one output file + +#### Parse one benchmark, generic file name + +``` +./parse.pl \ + --lib ffmpeg \ + --role recv \ + --path log_file \ + --threads 3 \ + --iter 20 +``` + +#### Parse one benchmark, output file generated by benchmark.pl + +``` +./parse.pl --path results/uvgrtp/send_results_4threads_240fps_10iter +``` + +### Parsing multiple output files + +NB: path must point to a directory! + +NB2: `--iter` needn't be provided if the file names follow the pattern defined above + +#### Find best configuration + +Find the best configurations for maximizing single-thread performance and total performance +where frame loss is less than 2% + +``` +./parse.pl --path results/uvgrtp/all --iter 10 --parse=best --frame-loss=2 +``` + +#### Output goodput/frame loss values to a CSV file + +``` +./parse.pl --path results/uvgrtp/all --parse=csv +``` diff --git a/benchmark.pl b/benchmark.pl new file mode 100755 index 0000000..3037a3e --- /dev/null +++ b/benchmark.pl @@ -0,0 +1,242 @@ +#!/usr/bin/env perl + +use warnings; +use strict; +use IO::Socket; +use IO::Socket::INET; +use Getopt::Long; + +$| = 1; # autoflush + +my $DEFAULT_ADDR = "10.21.25.200"; +my $DEFAULT_PORT = 9999; + +sub clamp { + my ($start, $end) = @_; + my @clamped = (0, 0); + + $clamped[0] = $start < 30 ? 30 : $start; + $clamped[1] = $end > 5000 ? 5000 : $end; + + return @clamped; +} + +sub mk_ssock { + my $s = IO::Socket::INET->new( + LocalAddr => $_[0], + LocalPort => $_[1], + Proto => "tcp", + Type => SOCK_STREAM, + Listen => 1, + ) or die "Couldn't connect to $_[0]:$_[1]: $@\n"; + + return $s; +} + +sub mk_rsock { + my $s = IO::Socket::INET->new( + PeerAddr => $_[0], + PeerPort => $_[1], + Proto => "tcp", + Type => SOCK_STREAM, + Timeout => 1, + ) or die "Couldn't connect to $_[0]:$_[1]: $@\n"; + + return $s; +} + +sub send_benchmark { + my ($lib, $addr, $port, $iter, $threads, $gen_recv, $mode, $e, @fps_vals) = @_; + my ($socket, $remote, $data); + my @execs = split ",", $e; + + $socket = mk_ssock($addr, $port); + $remote = $socket->accept(); + + foreach (@execs) { + my $exec = $_; + foreach ((1 .. $threads)) { + my $thread = $_; + foreach (@fps_vals) { + my $fps = $_; + my $logname = "send_results_$thread" . "threads_$fps". "fps_$iter" . "iter_$exec"; + + for ((1 ..
$iter)) { + $remote->recv($data, 16); + system ("time ./$lib/$exec $addr $thread $fps $mode >> $lib/results/$logname 2>&1"); + $remote->send("end") if $gen_recv; + } + } + } + } +} + +sub recv_benchmark { + my ($lib, $addr, $port, $iter, $threads, $e, @fps_vals) = @_; + my $socket = mk_rsock($addr, $port); + my @execs = split ",", $e; + + foreach (@execs) { + my $exec = $_; + foreach ((1 .. $threads)) { + my $thread = $_; + foreach (@fps_vals) { + my $logname = "recv_results_$thread" . "threads_$_". "fps_$iter" . "iter_$exec"; + for ((1 .. $iter)) { + $socket->send("start"); + system ("time ./$lib/receiver $addr $thread >> $lib/results/$logname 2>&1"); + } + } + } + } +} + +# use netcat to capture the stream +sub recv_generic { + my ($lib, $addr, $port, $iter, $threads, @fps_vals) = @_; + # my ($sfps, $efps) = clamp($start, $end); + my $socket = mk_rsock($addr, $port); + my $ports = ""; + + # spawn N netcats using gnu parallel, send message to sender to start sending, + # wait for message from sender that all the packets have been sent, sleep a tiny bit + # move receiver output from separate files to one common file and proceed to next iteration + $ports .= (8888 + $_ * 2) . " " for ((0 .. $threads - 1)); + + while ($threads ne 0) { + foreach (@fps_vals) { + my $logname = "recv_results_$threads" . "threads_$_". "fps"; + system "parallel --files nc -kluvw 0 $addr ::: $ports &"; + $socket->send("start"); + $socket->recv(my $data, 16); + sleep 1; + system "killall nc"; + + open my $fhz, '>>', "$lib/results/$logname"; + opendir my $dir, "/tmp"; + + foreach my $of (grep (/par.+\.par/i, readdir $dir)) { + print $fhz -s "/tmp/$of"; + print $fhz "\n"; + unlink "/tmp/$of"; + } + closedir $dir; + } + + $threads--; + } +} + +sub lat_send { + my ($lib, $addr, $port) = @_; + my ($socket, $remote, $data); + + $socket = mk_ssock($addr, $port); + $remote = $socket->accept(); + + for ((1 .. 100)) { + $remote->recv($data, 16); + system ("./$lib/latency_sender >> $lib/results/latencies 2>&1"); + } +} + +sub lat_recv { + my ($lib, $addr, $port) = @_; + my $socket = mk_rsock($addr, $port); + + for ((1 .. 100)) { + $socket->send("start"); + system ("./$lib/latency_receiver 2>&1 >/dev/null"); + sleep 2; + } +} + +# TODO explain every parameter +sub print_help { + print "usage (benchmark):\n ./benchmark.pl \n" + . "\t--lib \n" + . "\t--role \n" + . "\t--addr \n" + . "\t--port \n" + . "\t--threads <# of threads>\n" + . "\t--mode \n" + . "\t--start \n" + . "\t--end \n\n"; + + print "usage (latency):\n ./benchmark.pl \n" + . "\t--latency\n" + . "\t--role \n" + . "\t--addr \n" + . "\t--port \n" + . 
"\t--lib \n\n" and exit; +} + +GetOptions( + "lib|l=s" => \(my $lib = ""), + "role|r=s" => \(my $role = ""), + "addr|a=s" => \(my $addr = ""), + "port|p=i" => \(my $port = 0), + "iter|i=i" => \(my $iter = 10), + "threads|t=i" => \(my $threads = 1), + "start|s=f" => \(my $start = 0), + "end|e=f" => \(my $end = 0), + "step=i" => \(my $step = 0), + "use-nc" => \(my $nc = 0), + "fps=s" => \(my $fps = ""), + "latency" => \(my $lat = 0), + "mode=s" => \(my $mode = "best-effort"), + "exec=s" => \(my $exec = "default"), + "help" => \(my $help = 0) +) or die "failed to parse command line!\n"; + +$port = $DEFAULT_PORT if !$port; +$addr = $DEFAULT_ADDR if !$addr; + +print_help() if $help or !$lib; +print_help() if ((!$start or !$end) and !$fps) and !$lat; +print_help() if not grep /$mode/, ("strict", "best-effort"); + +die "not implemented\n" if !grep (/$lib/, ("uvgrtp", "ffmpeg", "live555")); +my @fps_vals = (); + +if (!$lat) { + if ($fps) { + @fps_vals = split ",", $fps; + } else { + ($start, $end) = clamp($start, $end); + for (my $i = $start; $i <= $end; ) { + push @fps_vals, $i; + + if ($step) { $i += $step; } + else { $i *= 2; } + } + } +} + +if ($role eq "send") { + if ($lat) { + system "make $lib" . "_latency_sender"; + lat_send($lib, $addr, $port); + } else { + if ($exec eq "default") { + system "make $lib" . "_sender"; + $exec = "sender"; + } + send_benchmark($lib, $addr, $port, $iter, $threads, $nc, $mode, $exec, @fps_vals); + } +} elsif ($role eq "recv" ) { + if ($lat) { + system "make $lib" . "_latency_receiver"; + lat_recv($lib, $addr, $port); + } elsif (!$nc) { + if ($exec eq "default") { + system "make $lib" . "_receiver"; + $exec = "receiver"; + } + recv_benchmark($lib, $addr, $port, $iter, $threads, $exec, @fps_vals); + } else { + recv_generic($lib, $addr, $port, $iter, $threads, @fps_vals); + } +} else { + print "invalid role: '$role'\n" and exit; +} diff --git a/ffmpeg/latency_receiver.cc b/ffmpeg/latency_receiver.cc new file mode 100644 index 0000000..86e9b31 --- /dev/null +++ b/ffmpeg/latency_receiver.cc @@ -0,0 +1,208 @@ +extern "C" { +#include +#include + +#include +#include +#include +#include +#include +#include +#include +} + +#include +#include +#include +#include + +extern void *get_mem(int argc, char **argv, size_t& len); + +#define WIDTH 3840 +#define HEIGHT 2160 +#define FPS 200 +#define SLEEP 8 + +std::chrono::high_resolution_clock::time_point fs, fe; +std::atomic received(false); + +struct ffmpeg_ctx { + AVFormatContext *sender; + AVFormatContext *receiver; +}; + +static ffmpeg_ctx *init_ffmpeg(const char *ip) +{ + avcodec_register_all(); + av_register_all(); + avformat_network_init(); + + av_log_set_level(AV_LOG_PANIC); + + ffmpeg_ctx *ctx = new ffmpeg_ctx; + enum AVCodecID codec_id = AV_CODEC_ID_H265; + int i, ret, x, y, got_output; + AVCodecContext *c = NULL; + AVCodec *codec; + AVFrame *frame; + AVPacket pkt; + + codec = avcodec_find_encoder(codec_id); + c = avcodec_alloc_context3(codec); + + c->width = HEIGHT; + c->height = WIDTH; + c->time_base.num = 1; + c->time_base.den = FPS; + c->pix_fmt = AV_PIX_FMT_YUV420P; + c->codec_type = AVMEDIA_TYPE_VIDEO; + c->flags = AV_CODEC_FLAG_GLOBAL_HEADER; + + avcodec_open2(c, codec, NULL); + + frame = av_frame_alloc(); + frame->format = c->pix_fmt; + frame->width = c->width; + frame->height = c->height; + ret = av_image_alloc(frame->data, frame->linesize, c->width, c->height, + c->pix_fmt, 32); + + AVOutputFormat *fmt = av_guess_format("rtp", NULL, NULL); + + ret = avformat_alloc_output_context2(&ctx->sender, fmt, fmt->name, 
"rtp://10.21.25.200:8889"); + + avio_open(&ctx->sender->pb, ctx->sender->filename, AVIO_FLAG_WRITE); + + struct AVStream* stream = avformat_new_stream(ctx->sender, codec); + stream->codecpar->width = WIDTH; + stream->codecpar->height = HEIGHT; + stream->codecpar->codec_id = AV_CODEC_ID_HEVC; + stream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO; + stream->time_base.num = 1; + stream->time_base.den = FPS; + + char buf[256]; + AVDictionary *d_s = NULL; + AVDictionary *d_r = NULL; + + snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000); + av_dict_set(&d_s, "buffer_size", buf, 32); + + /* Flush the underlying I/O stream after each packet. + * + * Default is -1 (auto), which means that the underlying protocol will decide, + * 1 enables it, and has the effect of reducing the latency, + * 0 disables it and may increase IO throughput in some cases. */ + snprintf(buf, sizeof(buf), "%d", 1); + av_dict_set(&d_s, "flush_packets", NULL, 32); + + /* Set maximum buffering duration for interleaving. The duration is expressed in microseconds, + * and defaults to 10000000 (10 seconds). + * + * To ensure all the streams are interleaved correctly, libavformat will wait until it has + * at least one packet for each stream before actually writing any packets to the output file. + * When some streams are "sparse" (i.e. there are large gaps between successive packets), + * this can result in excessive buffering. + * + * This field specifies the maximum difference between the timestamps of the first and + * the last packet in the muxing queue, above which libavformat will output a packet regardless of + * whether it has queued a packet for all the streams. + * + * If set to 0, libavformat will continue buffering packets until it has a packet for each stream, + * regardless of the maximum timestamp difference between the buffered packets. */ + snprintf(buf, sizeof(buf), "%d", 1000); + av_dict_set(&d_s, "max_interleave_delta", buf, 32); + + /* avioflags flags (input/output) + * + * Possible values: + * ‘direct’ + * Reduce buffering. */ + snprintf(buf, sizeof(buf), "direct"); + av_dict_set(&d_s, "avioflags", buf, 32); + + (void)avformat_write_header(ctx->sender, &d_s); + + /* When sender has been initialized, initialize receiver */ + ctx->receiver = avformat_alloc_context(); + int video_stream_index; + + av_dict_set(&d_r, "protocol_whitelist", "file,udp,rtp", 0); + + /* input buffer size */ + snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000); + av_dict_set(&d_r, "buffer_size", buf, 32); + + /* avioflags flags (input/output) + * + * Possible values: + * ‘direct’ + * Reduce buffering. */ + snprintf(buf, sizeof(buf), "direct"); + av_dict_set(&d_r, "avioflags", buf, 32); + + /* Reduce the latency introduced by buffering during initial input streams analysis. */ + av_dict_set(&d_r, "nobuffer", NULL, 32); + + /* Set probing size in bytes, i.e. the size of the data to analyze to get stream information. + * + * A higher value will enable detecting more information in case it is dispersed into the stream, + * but will increase latency. Must be an integer not lesser than 32. It is 5000000 by default. */ + snprintf(buf, sizeof(buf), "%d", 32); + av_dict_set(&d_r, "probesize", buf, 32); + + /* Set number of frames used to probe fps. 
*/ + snprintf(buf, sizeof(buf), "%d", 2); + av_dict_set(&d_r, "fpsprobesize", buf, 32); + + ctx->receiver->flags = AVFMT_FLAG_NONBLOCK; + + if (!strcmp(ip, "127.0.0.1")) + snprintf(buf, sizeof(buf), "ffmpeg/sdp/localhost/hevc_0.sdp"); + else + snprintf(buf, sizeof(buf), "ffmpeg/sdp/lan/hevc_0.sdp"); + + if (avformat_open_input(&ctx->receiver, buf, NULL, &d_r) != 0) { + fprintf(stderr, "nothing found!\n"); + return NULL; + } + + if (avformat_find_stream_info(ctx->receiver, NULL) < 0) { + fprintf(stderr, "stream info not found!\n"); + return NULL; + } + + /* search video stream */ + for (size_t i = 0; i < ctx->receiver->nb_streams; i++) { + if (ctx->receiver->streams[i]->codec->codec_type == AVMEDIA_TYPE_VIDEO) + video_stream_index = i; + } + + return ctx; +} + +static int receiver(void) +{ + AVPacket pkt; + ffmpeg_ctx *ctx; + std::string addr("10.21.25.200"); + + if (!(ctx = init_ffmpeg(addr.c_str()))) + return EXIT_FAILURE; + + av_init_packet(&pkt); + av_read_play(ctx->receiver); + + while (av_read_frame(ctx->receiver, &pkt) >= 0) { + av_write_frame(ctx->sender, &pkt); + } + + return EXIT_SUCCESS; +} + +int main(int argc, char **argv) +{ + (void)argc, (void)argv; + + return receiver(); +} diff --git a/ffmpeg/latency_sender.cc b/ffmpeg/latency_sender.cc new file mode 100644 index 0000000..8360d55 --- /dev/null +++ b/ffmpeg/latency_sender.cc @@ -0,0 +1,319 @@ +extern "C" { +#include +#include + +#include +#include +#include +#include +#include +#include +#include +} + +#include +#include +#include +#include +#include +#include + +using namespace std::chrono; + +extern void *get_mem(int argc, char **argv, size_t& len); + +#define WIDTH 3840 +#define HEIGHT 2160 +#define FPS 30 +#define SLEEP 8 + +std::chrono::high_resolution_clock::time_point fs, fe; +std::atomic ready(false); +uint64_t ff_key = 0; + +static std::unordered_map timestamps; +static std::deque timestamps2; + +high_resolution_clock::time_point start2; + +struct ffmpeg_ctx { + AVFormatContext *sender; + AVFormatContext *receiver; +}; + +static ffmpeg_ctx *init_ffmpeg(const char *ip) +{ + avcodec_register_all(); + av_register_all(); + avformat_network_init(); + + av_log_set_level(AV_LOG_PANIC); + + ffmpeg_ctx *ctx = new ffmpeg_ctx; + enum AVCodecID codec_id = AV_CODEC_ID_H265; + int i, ret, x, y, got_output; + AVCodecContext *c = NULL; + AVCodec *codec; + AVFrame *frame; + AVPacket pkt; + + codec = avcodec_find_encoder(codec_id); + c = avcodec_alloc_context3(codec); + + c->width = HEIGHT; + c->height = WIDTH; + c->time_base.num = 1; + c->time_base.den = FPS; + c->pix_fmt = AV_PIX_FMT_YUV420P; + c->codec_type = AVMEDIA_TYPE_VIDEO; + c->flags = AV_CODEC_FLAG_GLOBAL_HEADER; + + avcodec_open2(c, codec, NULL); + + frame = av_frame_alloc(); + frame->format = c->pix_fmt; + frame->width = c->width; + frame->height = c->height; + ret = av_image_alloc(frame->data, frame->linesize, c->width, c->height, + c->pix_fmt, 32); + + AVOutputFormat *fmt = av_guess_format("rtp", NULL, NULL); + + ret = avformat_alloc_output_context2(&ctx->sender, fmt, fmt->name, "rtp://10.21.25.2:8888"); + + avio_open(&ctx->sender->pb, ctx->sender->filename, AVIO_FLAG_WRITE); + + struct AVStream* stream = avformat_new_stream(ctx->sender, codec); + stream->codecpar->width = WIDTH; + stream->codecpar->height = HEIGHT; + stream->codecpar->codec_id = AV_CODEC_ID_HEVC; + stream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO; + stream->time_base.num = 1; + stream->time_base.den = FPS; + + char buf[256]; + AVDictionary *d_s = NULL; + AVDictionary *d_r = NULL; + + snprintf(buf, 
sizeof(buf), "%d", 40 * 1000 * 1000); + av_dict_set(&d_s, "buffer_size", buf, 32); + +#if 1 + /* Flush the underlying I/O stream after each packet. + * + * Default is -1 (auto), which means that the underlying protocol will decide, + * 1 enables it, and has the effect of reducing the latency, + * 0 disables it and may increase IO throughput in some cases. */ + snprintf(buf, sizeof(buf), "%d", 1); + av_dict_set(&d_s, "flush_packets", NULL, 32); + + /* Set maximum buffering duration for interleaving. The duration is expressed in microseconds, + * and defaults to 10000000 (10 seconds). + * + * To ensure all the streams are interleaved correctly, libavformat will wait until it has + * at least one packet for each stream before actually writing any packets to the output file. + * When some streams are "sparse" (i.e. there are large gaps between successive packets), + * this can result in excessive buffering. + * + * This field specifies the maximum difference between the timestamps of the first and + * the last packet in the muxing queue, above which libavformat will output a packet regardless of + * whether it has queued a packet for all the streams. + * + * If set to 0, libavformat will continue buffering packets until it has a packet for each stream, + * regardless of the maximum timestamp difference between the buffered packets. */ + snprintf(buf, sizeof(buf), "%d", 1000); + av_dict_set(&d_s, "max_interleave_delta", buf, 32); + + /* avioflags flags (input/output) + * + * Possible values: + * ‘direct’ + * Reduce buffering. */ + snprintf(buf, sizeof(buf), "direct"); + av_dict_set(&d_s, "avioflags", buf, 32); +#endif + + (void)avformat_write_header(ctx->sender, &d_s); + + /* When sender has been initialized, initialize receiver */ + ctx->receiver = avformat_alloc_context(); + int video_stream_index; + + av_dict_set(&d_r, "protocol_whitelist", "file,udp,rtp", 0); + + /* input buffer size */ + snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000); + av_dict_set(&d_r, "buffer_size", buf, 32); + +#if 1 + /* avioflags flags (input/output) + * + * Possible values: + * ‘direct’ + * Reduce buffering. */ + snprintf(buf, sizeof(buf), "direct"); + av_dict_set(&d_r, "avioflags", buf, 32); + + /* Reduce the latency introduced by buffering during initial input streams analysis. */ + av_dict_set(&d_r, "nobuffer", NULL, 32); + + /* Set probing size in bytes, i.e. the size of the data to analyze to get stream information. + * + * A higher value will enable detecting more information in case it is dispersed into the stream, + * but will increase latency. Must be an integer not lesser than 32. It is 5000000 by default. */ + snprintf(buf, sizeof(buf), "%d", 32); + av_dict_set(&d_r, "probesize", buf, 32); + + /* Set number of frames used to probe fps. 
*/ + snprintf(buf, sizeof(buf), "%d", 2); + av_dict_set(&d_r, "fpsprobesize", buf, 32); +#endif + + ctx->receiver->flags = AVFMT_FLAG_NONBLOCK; + + if (!strcmp(ip, "127.0.0.1")) + snprintf(buf, sizeof(buf), "ffmpeg/sdp/localhost/lat_hevc.sdp"); + else + snprintf(buf, sizeof(buf), "ffmpeg/sdp/lan/lat_hevc.sdp"); + + if (avformat_open_input(&ctx->receiver, buf, NULL, &d_r) != 0) { + fprintf(stderr, "nothing found!\n"); + return NULL; + } + + for (size_t i = 0; i < ctx->receiver->nb_streams; i++) { + if (ctx->receiver->streams[i]->codec->codec_type == AVMEDIA_TYPE_VIDEO) + video_stream_index = i; + } + + return ctx; +} + +static void receiver(ffmpeg_ctx *ctx) +{ + uint64_t key = 0; + uint64_t diff = 0; + + uint64_t frame_total = 0; + uint64_t intra_total = 0; + uint64_t inter_total = 0; + + uint64_t frames = 0; + uint64_t intras = 0; + uint64_t inters = 0; + + AVPacket packet; + av_init_packet(&packet); + + std::chrono::high_resolution_clock::time_point start; + start = std::chrono::high_resolution_clock::now(); + + /* start reading packets from stream */ + av_read_play(ctx->receiver); + + while (av_read_frame(ctx->receiver, &packet) >= 0) { + key = packet.size - 1; + + if (!frames) + key = ff_key; + + auto diff = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - timestamps2.front() + ).count(); + timestamps2.pop_front(); + + if (((packet.data[3] >> 1) & 0x3f) == 19) + intra_total += (diff / 1000), intras++; + else if (((packet.data[3] >> 1) & 0x3f) == 1) + inter_total += (diff / 1000), inters++; + + if (++frames < 596) + frame_total += (diff / 1000); + else + break; + + timestamps.erase(key); + + av_free_packet(&packet); + av_init_packet(&packet); + } + + fprintf(stderr, "%zu: intra %lf, inter %lf, avg %lf\n", + frames, + intra_total / (float)intras, + inter_total / (float)inters, + frame_total / (float)frames + ); + + ready = true; +} + +static int sender(void) +{ + size_t len = 0; + void *mem = get_mem(0, NULL, len); + + std::string addr("10.21.25.2"); + ffmpeg_ctx *ctx = init_ffmpeg(addr.c_str()); + + (void)new std::thread(receiver, ctx); + + uint64_t chunk_size = 0; + uint64_t diff = 0; + uint64_t counter = 0; + uint64_t total = 0; + uint64_t current = 0; + uint64_t key = 0; + uint64_t period = (uint64_t)((1000 / (float)FPS) * 1000); + + AVPacket pkt; + std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); + + for (size_t i = 0; i < len; ) { + memcpy(&chunk_size, (uint8_t *)mem + i, sizeof(uint64_t)); + + /* Start code lookup/merging of small packets causes the incoming frame size + * to differ quite significantly from "chunk_size" */ + if (!i) + ff_key = chunk_size; + + i += sizeof(uint64_t); + + if (timestamps.find(chunk_size) != timestamps.end()) { + fprintf(stderr, "cannot use %zu for key!\n", chunk_size); + continue; + } + + timestamps[chunk_size] = std::chrono::high_resolution_clock::now(); + timestamps2.push_back(std::chrono::high_resolution_clock::now()); + + av_init_packet(&pkt); + pkt.data = (uint8_t *)mem + i; + pkt.size = chunk_size; + + av_interleaved_write_frame(ctx->sender, &pkt); + av_packet_unref(&pkt); + + auto runtime = (uint64_t)std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + ).count(); + + if (runtime < current * period) + std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime)); + + current++; + i += chunk_size; + } + + while (!ready.load()) + ; + + return 0; +} + +int main(int argc, char **argv) +{ + (void)argc, (void)argv; + + return 
sender(); +} diff --git a/ffmpeg/receiver.cc b/ffmpeg/receiver.cc new file mode 100644 index 0000000..f1f41f0 --- /dev/null +++ b/ffmpeg/receiver.cc @@ -0,0 +1,151 @@ +extern "C" { +#include +#include +#include +#include +} + +#include +#include +#include +#include + +struct thread_info { + size_t pkts; + size_t bytes; + std::chrono::high_resolution_clock::time_point start; +} *thread_info; + +std::atomic nready(0); + +void thread_func(char *addr, int thread_num) +{ + AVFormatContext *format_ctx = avformat_alloc_context(); + AVCodecContext *codec_ctx = NULL; + int video_stream_index = 0; + + /* register everything */ + av_register_all(); + avformat_network_init(); + + av_log_set_level(AV_LOG_PANIC); + + /* open rtsp */ + AVDictionary *d = NULL; + av_dict_set(&d, "protocol_whitelist", "file,udp,rtp", 0); + + char buf[256]; + + /* input buffer size */ + snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000); + av_dict_set(&d, "buffer_size", buf, 32); + +#if 1 + snprintf(buf, sizeof(buf), "%d", 10000000); + av_dict_set(&d, "max_delay", buf, 32); + + snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000); + av_dict_set(&d, "recv_buffer_size", buf, 32); + + snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000); + av_dict_set(&d, "rcvbuf", buf, 32); + + /* avioflags flags (input/output) + * + * Possible values: + * ‘direct’ + * Reduce buffering. */ + snprintf(buf, sizeof(buf), "direct"); + av_dict_set(&d, "avioflags", buf, 32); + + /* Reduce the latency introduced by buffering during initial input streams analysis. */ + av_dict_set(&d, "nobuffer", NULL, 32); + + /* Set probing size in bytes, i.e. the size of the data to analyze to get stream information. + * + * A higher value will enable detecting more information in case it is dispersed into the stream, + * but will increase latency. Must be an integer not lesser than 32. It is 5000000 by default. */ + snprintf(buf, sizeof(buf), "%d", 32); + av_dict_set(&d, "probesize", buf, 32); + + /* Set number of frames used to probe fps. 
*/ + snprintf(buf, sizeof(buf), "%d", 2); + av_dict_set(&d, "fpsprobesize", buf, 32); +#endif + + if (!strcmp(addr, "127.0.0.1")) + snprintf(buf, sizeof(buf), "ffmpeg/sdp/localhost/hevc_%d.sdp", thread_num / 2); + else + snprintf(buf, sizeof(buf), "ffmpeg/sdp/lan/hevc_%d.sdp", thread_num / 2); + + if (avformat_open_input(&format_ctx, buf, NULL, &d)) { + fprintf(stderr, "failed to open input file\n"); + nready++; + return; + } + + if (avformat_find_stream_info(format_ctx, NULL) < 0) { + fprintf(stderr, "failed to find stream info!\n"); + nready++; + return; + } + + for (size_t i = 0; i < format_ctx->nb_streams; i++) { + if (format_ctx->streams[i]->codec->codec_type == AVMEDIA_TYPE_VIDEO) + video_stream_index = i; + } + + size_t pkts = 0; + size_t size = 0; + AVPacket packet; + av_init_packet(&packet); + + std::chrono::high_resolution_clock::time_point start, last; + start = std::chrono::high_resolution_clock::now(); + + /* start reading packets from stream */ + av_read_play(format_ctx); + + while (av_read_frame(format_ctx, &packet) >= 0) { + if (packet.stream_index == video_stream_index) + size += packet.size; + + av_free_packet(&packet); + av_init_packet(&packet); + + if (++pkts == 598) + break; + else + last = std::chrono::high_resolution_clock::now(); + } + + if (pkts == 598) { + fprintf(stderr, "%zu %zu %zu\n", size, pkts, + std::chrono::duration_cast(last - start).count() + ); + } else { + fprintf(stderr, "discard %zu %zu %zu\n", size, pkts, + std::chrono::duration_cast(last - start).count() + ); + } + + av_read_pause(format_ctx); + nready++; +} + +int main(int argc, char **argv) +{ + if (argc != 3) { + fprintf(stderr, "usage: ./%s \n", __FILE__); + return -1; + } + + int nthreads = atoi(argv[2]); + thread_info = (struct thread_info *)calloc(nthreads, sizeof(*thread_info)); + + for (int i = 0; i < nthreads; ++i) + new std::thread(thread_func, argv[1], i * 2); + + while (nready.load() != nthreads) + std::this_thread::sleep_for(std::chrono::milliseconds(20)); +} diff --git a/ffmpeg/sdp/lan/hevc_0.sdp b/ffmpeg/sdp/lan/hevc_0.sdp new file mode 100644 index 0000000..50ee827 --- /dev/null +++ b/ffmpeg/sdp/lan/hevc_0.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 10.21.25.200 +s=No Name +c=IN IP4 10.21.25.200 +t=0 0 +m=video 8888 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/lan/hevc_1.sdp b/ffmpeg/sdp/lan/hevc_1.sdp new file mode 100644 index 0000000..3fde411 --- /dev/null +++ b/ffmpeg/sdp/lan/hevc_1.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 10.21.25.200 +s=No Name +c=IN IP4 10.21.25.200 +t=0 0 +m=video 8890 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/lan/hevc_2.sdp b/ffmpeg/sdp/lan/hevc_2.sdp new file mode 100644 index 0000000..3bb6f2e --- /dev/null +++ b/ffmpeg/sdp/lan/hevc_2.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 10.21.25.200 +s=No Name +c=IN IP4 10.21.25.200 +t=0 0 +m=video 8892 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/lan/hevc_3.sdp b/ffmpeg/sdp/lan/hevc_3.sdp new file mode 100644 index 0000000..5478b47 --- /dev/null +++ b/ffmpeg/sdp/lan/hevc_3.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 10.21.25.200 +s=No Name +c=IN IP4 10.21.25.200 +t=0 0 +m=video 8894 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/lan/hevc_4.sdp b/ffmpeg/sdp/lan/hevc_4.sdp new file mode 100644 index 0000000..af1f35b --- /dev/null +++ b/ffmpeg/sdp/lan/hevc_4.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 10.21.25.200 +s=No Name +c=IN IP4 10.21.25.200 +t=0 0 +m=video 8896 RTP/AVP 96 +a=rtpmap:96 H265/90000 
+a=recvonly diff --git a/ffmpeg/sdp/lan/hevc_5.sdp b/ffmpeg/sdp/lan/hevc_5.sdp new file mode 100644 index 0000000..47d9012 --- /dev/null +++ b/ffmpeg/sdp/lan/hevc_5.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 10.21.25.200 +s=No Name +c=IN IP4 10.21.25.200 +t=0 0 +m=video 8898 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/lan/hevc_6.sdp b/ffmpeg/sdp/lan/hevc_6.sdp new file mode 100644 index 0000000..98778d2 --- /dev/null +++ b/ffmpeg/sdp/lan/hevc_6.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 10.21.25.200 +s=No Name +c=IN IP4 10.21.25.200 +t=0 0 +m=video 8900 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/lan/hevc_7.sdp b/ffmpeg/sdp/lan/hevc_7.sdp new file mode 100644 index 0000000..1970553 --- /dev/null +++ b/ffmpeg/sdp/lan/hevc_7.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 10.21.25.200 +s=No Name +c=IN IP4 10.21.25.200 +t=0 0 +m=video 8902 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/lan/lat_hevc.sdp b/ffmpeg/sdp/lan/lat_hevc.sdp new file mode 100644 index 0000000..e20b56c --- /dev/null +++ b/ffmpeg/sdp/lan/lat_hevc.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 10.21.25.200 +s=No Name +c=IN IP4 10.21.25.200 +t=0 0 +m=video 8889 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/localhost/hevc_0.sdp b/ffmpeg/sdp/localhost/hevc_0.sdp new file mode 100644 index 0000000..0634b97 --- /dev/null +++ b/ffmpeg/sdp/localhost/hevc_0.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 127.0.0.1 +s=No Name +c=IN IP4 127.0.0.1 +t=0 0 +m=video 8888 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/localhost/hevc_1.sdp b/ffmpeg/sdp/localhost/hevc_1.sdp new file mode 100644 index 0000000..d7f1407 --- /dev/null +++ b/ffmpeg/sdp/localhost/hevc_1.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 127.0.0.1 +s=No Name +c=IN IP4 127.0.0.1 +t=0 0 +m=video 8890 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/localhost/hevc_2.sdp b/ffmpeg/sdp/localhost/hevc_2.sdp new file mode 100644 index 0000000..8b30095 --- /dev/null +++ b/ffmpeg/sdp/localhost/hevc_2.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 127.0.0.1 +s=No Name +c=IN IP4 127.0.0.1 +t=0 0 +m=video 8892 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/localhost/hevc_3.sdp b/ffmpeg/sdp/localhost/hevc_3.sdp new file mode 100644 index 0000000..5f66a36 --- /dev/null +++ b/ffmpeg/sdp/localhost/hevc_3.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 127.0.0.1 +s=No Name +c=IN IP4 127.0.0.1 +t=0 0 +m=video 8894 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/localhost/hevc_4.sdp b/ffmpeg/sdp/localhost/hevc_4.sdp new file mode 100644 index 0000000..85e2ac7 --- /dev/null +++ b/ffmpeg/sdp/localhost/hevc_4.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 127.0.0.1 +s=No Name +c=IN IP4 127.0.0.1 +t=0 0 +m=video 8896 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/localhost/hevc_5.sdp b/ffmpeg/sdp/localhost/hevc_5.sdp new file mode 100644 index 0000000..e691ca6 --- /dev/null +++ b/ffmpeg/sdp/localhost/hevc_5.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 127.0.0.1 +s=No Name +c=IN IP4 127.0.0.1 +t=0 0 +m=video 8898 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/localhost/hevc_6.sdp b/ffmpeg/sdp/localhost/hevc_6.sdp new file mode 100644 index 0000000..9ed0d1d --- /dev/null +++ b/ffmpeg/sdp/localhost/hevc_6.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 127.0.0.1 +s=No Name +c=IN IP4 127.0.0.1 +t=0 0 +m=video 8900 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git 
a/ffmpeg/sdp/localhost/hevc_7.sdp b/ffmpeg/sdp/localhost/hevc_7.sdp new file mode 100644 index 0000000..9167d33 --- /dev/null +++ b/ffmpeg/sdp/localhost/hevc_7.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 127.0.0.1 +s=No Name +c=IN IP4 127.0.0.1 +t=0 0 +m=video 8902 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sdp/localhost/lat_hevc.sdp b/ffmpeg/sdp/localhost/lat_hevc.sdp new file mode 100644 index 0000000..a618f89 --- /dev/null +++ b/ffmpeg/sdp/localhost/lat_hevc.sdp @@ -0,0 +1,8 @@ +v=0 +o=user 0 0 IN IP4 127.0.0.1 +s=No Name +c=IN IP4 127.0.0.1 +t=0 0 +m=video 8889 RTP/AVP 96 +a=rtpmap:96 H265/90000 +a=recvonly diff --git a/ffmpeg/sender.cc b/ffmpeg/sender.cc new file mode 100644 index 0000000..af647d9 --- /dev/null +++ b/ffmpeg/sender.cc @@ -0,0 +1,159 @@ +extern "C" { +#include +#include + +#include +#include +#include +#include +#include +#include +#include +} +#include +#include +#include + +extern void *get_mem(int argc, char **argv, size_t& len); + +#define WIDTH 3840 +#define HEIGHT 2160 + +std::atomic nready(0); + +void thread_func(void *mem, size_t len, char *addr_, int thread_num, double fps, bool strict) +{ + char addr[64] = { 0 }; + enum AVCodecID codec_id = AV_CODEC_ID_H265; + AVCodec *codec; + AVCodecContext *c = NULL; + int i, ret, x, y, got_output; + AVFrame *frame; + AVPacket pkt; + + codec = avcodec_find_encoder(codec_id); + c = avcodec_alloc_context3(codec); + + av_log_set_level(AV_LOG_PANIC); + + c->width = HEIGHT; + c->height = WIDTH; + c->time_base.num = 1; + c->time_base.den = fps; + c->pix_fmt = AV_PIX_FMT_YUV420P; + c->codec_type = AVMEDIA_TYPE_VIDEO; + c->flags = AV_CODEC_FLAG_GLOBAL_HEADER; + + avcodec_open2(c, codec, NULL); + + frame = av_frame_alloc(); + frame->format = c->pix_fmt; + frame->width = c->width; + frame->height = c->height; + ret = av_image_alloc(frame->data, frame->linesize, c->width, c->height, + c->pix_fmt, 32); + + AVFormatContext* avfctx; + AVOutputFormat* fmt = av_guess_format("rtp", NULL, NULL); + + snprintf(addr, 64, "rtp://10.21.25.2:%d", 8888 + thread_num); + ret = avformat_alloc_output_context2(&avfctx, fmt, fmt->name, addr); + + avio_open(&avfctx->pb, avfctx->filename, AVIO_FLAG_WRITE); + + struct AVStream* stream = avformat_new_stream(avfctx, codec); + /* stream->codecpar->bit_rate = 400000; */ + stream->codecpar->width = WIDTH; + stream->codecpar->height = HEIGHT; + stream->codecpar->codec_id = AV_CODEC_ID_HEVC; + stream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO; + stream->time_base.num = 1; + stream->time_base.den = fps; + + (void)avformat_write_header(avfctx, NULL); + + uint64_t chunk_size, total_size; + uint64_t fpt_ms = 0; + uint64_t fsize = 0; + uint32_t frames = 0; + uint64_t diff = 0; + uint64_t current = 0; + uint64_t period = (uint64_t)((1000 / (float)fps) * 1000); + + std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); + + for (size_t rounds = 0; rounds < 1; ++rounds) { + for (size_t i = 0; i < len; ) { + memcpy(&chunk_size, (uint8_t *)mem + i, sizeof(uint64_t)); + + i += sizeof(uint64_t); + total_size += chunk_size; + + av_init_packet(&pkt); + pkt.data = (uint8_t *)mem + i; + pkt.size = chunk_size; + + av_interleaved_write_frame(avfctx, &pkt); + av_packet_unref(&pkt); + + auto runtime = (uint64_t)std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + ).count(); + + if (runtime < current * period) + std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime)); + + frames++; + current++; + i += chunk_size; + 
fsize += chunk_size; + } + } + diff = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + ).count(); + + fprintf(stderr, "%lu bytes, %lu kB, %lu MB took %lu ms %lu s\n", + fsize, fsize / 1000, fsize / 1000 / 1000, + diff, diff / 1000 + ); + +end: + nready++; + + avcodec_close(c); + av_free(c); + av_freep(&frame->data[0]); + av_frame_free(&frame); +} + +int main(int argc, char **argv) +{ + if (argc != 5) { + fprintf(stderr, "usage: ./%s \n", __FILE__); + return -1; + } + + avcodec_register_all(); + av_register_all(); + avformat_network_init(); + + size_t len = 0; + void *mem = get_mem(0, NULL, len); + int nthreads = atoi(argv[2]); + bool strict = !strcmp(argv[4], "strict"); + std::thread **threads = (std::thread **)malloc(sizeof(std::thread *) * nthreads); + + for (int i = 0; i < nthreads; ++i) + threads[i] = new std::thread(thread_func, mem, len, argv[1], i * 2, atof(argv[3]), strict); + + while (nready.load() != nthreads) + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + for (int i = 0; i < nthreads; ++i) { + threads[i]->join(); + delete threads[i]; + } + free(threads); + +} diff --git a/live555/latency.cc b/live555/latency.cc new file mode 100644 index 0000000..5c353ec --- /dev/null +++ b/live555/latency.cc @@ -0,0 +1,88 @@ +#include +#include +#include +#include +#include +#include "latsink.hh" +#include "latsource.hh" +#include "sink.hh" +#include "source.hh" + +static int receiver(char *addr) +{ + (void)addr; + + TaskScheduler *scheduler = BasicTaskScheduler::createNew(); + UsageEnvironment *env = BasicUsageEnvironment::createNew(*scheduler); + + Port rtpPort(8888); + struct in_addr dst_addr; + dst_addr.s_addr = our_inet_addr("0.0.0.0"); + Groupsock rtpGroupsock(*env, dst_addr, rtpPort, 255); + + OutPacketBuffer::maxSize = 40 * 1000 * 1000; + + RTPSource *source = H265VideoRTPSource::createNew(*env, &rtpGroupsock, 96); + RTPLatencySink *sink = new RTPLatencySink(*env); + + sink->startPlaying(*source, nullptr, nullptr); + env->taskScheduler().doEventLoop(); + + return 0; +} + +static int sender(char *addr) +{ + (void)addr; + + std::mutex lat_mtx; + H265VideoStreamDiscreteFramer *framer; + H265LatencyFramedSource *framedSource; + TaskScheduler *scheduler; + UsageEnvironment *env; + RTPSink *videoSink; + RTPSource *source; + RTPSink_ *sink; + + scheduler = BasicTaskScheduler::createNew(); + env = BasicUsageEnvironment::createNew(*scheduler); + + OutPacketBuffer::maxSize = 40 * 1000 * 1000; + + Port send_port(8889); + struct in_addr dst_addr; + dst_addr.s_addr = our_inet_addr("127.0.0.1"); + + Port recv_port(8888); + struct in_addr src_addr; + src_addr.s_addr = our_inet_addr("127.0.0.1"); + + Groupsock send_socket(*env, dst_addr, send_port, 255); + Groupsock recv_socket(*env, src_addr, recv_port, 255); + + /* sender */ + videoSink = H265VideoRTPSink::createNew(*env, &send_socket, 96); + framedSource = H265LatencyFramedSource::createNew(*env, lat_mtx); + framer = H265VideoStreamDiscreteFramer::createNew(*env, framedSource); + + /* receiver */ + source = H265VideoRTPSource::createNew(*env, &recv_socket, 96); + sink = new RTPSink_(*env); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + videoSink->startPlaying(*framer, NULL, videoSink); + sink->startPlaying(*source, nullptr, nullptr); + env->taskScheduler().doEventLoop(); + + return 0; +} + +int main(int argc, char **argv) +{ + if (argc != 3) { + fprintf(stderr, "usage: ./%s \n", __FILE__); + exit(EXIT_FAILURE); + } + + return !strcmp(argv[1], "send") ? 
sender(argv[2]) : receiver(argv[2]); +} diff --git a/live555/latency_receiver.cc b/live555/latency_receiver.cc new file mode 100644 index 0000000..5313a79 --- /dev/null +++ b/live555/latency_receiver.cc @@ -0,0 +1,255 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "latsink.hh" +#include "source.hh" + +#define BUFFER_SIZE 40 * 1000 * 1000 + +EventTriggerId H265FramedSource::eventTriggerId = 0; +unsigned H265FramedSource::referenceCount = 0; +H265FramedSource *framedSource; + +static size_t frames = 0; +static size_t bytes = 0; + +static uint8_t *nal_ptr = nullptr; +static size_t nal_size = 0; + +static std::mutex lat_mtx; +static std::chrono::high_resolution_clock::time_point start; +static std::chrono::high_resolution_clock::time_point last; + +static uint8_t *buf; +static size_t offset = 0; +/* static size_t bytes = 0; */ +static uint64_t current = 0; +static uint64_t period = 0; + +std::chrono::high_resolution_clock::time_point s_tmr, e_tmr; + +static void thread_func(void) +{ + unsigned prev_frames = UINT_MAX; + + while (true) { + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + + if (prev_frames == frames) + break; + + prev_frames = frames; + } + + fprintf(stderr, "%zu %zu %lu\n", bytes, frames, + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + ).count() + ); + + exit(EXIT_FAILURE); +} + +RTPLatencySink::RTPLatencySink(UsageEnvironment& env): + MediaSink(env) +{ + fReceiveBuffer = new uint8_t[BUFFER_SIZE]; +} + +RTPLatencySink::~RTPLatencySink() +{ +} + +void RTPLatencySink::uninit() +{ + stopPlaying(); + delete fReceiveBuffer; +} + +void RTPLatencySink::afterGettingFrame( + void *clientData, + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds +) +{ + ((RTPLatencySink *)clientData)->afterGettingFrame( + frameSize, + numTruncatedBytes, + presentationTime, + durationInMicroseconds + ); +} + +void RTPLatencySink::afterGettingFrame( + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds +) +{ + (void)frameSize, (void)numTruncatedBytes; + (void)presentationTime, (void)durationInMicroseconds; + + /* start loop that monitors activity and if there has been + * no activity for 2s (same as uvgRTP) the receiver is stopped) */ + if (!frames) + (void)new std::thread(thread_func); + + //fprintf(stderr, "got frame %zu %zu\n", frames + 1, frameSize); + + nal_ptr = fReceiveBuffer; + nal_size = frameSize; + + lat_mtx.unlock(); + framedSource->deliver_frame(); + + if (++frames == 602) { + fprintf(stderr, "%zu %zu %lu\n", bytes, frames, + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + ).count() + ); + exit(EXIT_SUCCESS); + } + + continuePlaying(); +} + +void RTPLatencySink::process() +{ +} + +Boolean RTPLatencySink::continuePlaying() +{ + if (!fSource) + return False; + + fSource->getNextFrame( + fReceiveBuffer, + BUFFER_SIZE, + afterGettingFrame, + this, + onSourceClosure, + this + ); + + return True; +} + +H265FramedSource *H265FramedSource::createNew(UsageEnvironment& env, unsigned fps) +{ + return new H265FramedSource(env, fps); +} + +H265FramedSource::H265FramedSource(UsageEnvironment& env, unsigned fps): + FramedSource(env), + fps_(fps) +{ + period = (uint64_t)((1000 / fps) * 1000); + + if (!eventTriggerId) + eventTriggerId = envir().taskScheduler().createEventTrigger(deliverFrame0); +} + +void H265FramedSource::deliver_frame() 
+{ + deliverFrame(); +} + +H265FramedSource::~H265FramedSource() +{ + if (!--referenceCount) { + envir().taskScheduler().deleteEventTrigger(eventTriggerId); + eventTriggerId = 0; + } +} + +void H265FramedSource::doGetNextFrame() +{ + deliverFrame(); +} + +void H265FramedSource::deliverFrame0(void *clientData) +{ + ((H265FramedSource *)clientData)->deliverFrame(); +} + +void H265FramedSource::deliverFrame() +{ + if (!isCurrentlyAwaitingData()) + return; + + if (!lat_mtx.try_lock()) + return; + + uint8_t *newFrameDataStart = nal_ptr; + unsigned newFrameSize = nal_size; + + bytes += newFrameSize; + + if (newFrameSize > fMaxSize) { + fFrameSize = fMaxSize; + fNumTruncatedBytes = newFrameSize - fMaxSize; + } else { + fFrameSize = newFrameSize; + } + + fDurationInMicroseconds = 0; + memmove(fTo, newFrameDataStart, fFrameSize); + + FramedSource::afterGetting(this); +} + +static int receiver(void) +{ + H265VideoStreamDiscreteFramer *framer; + TaskScheduler *scheduler; + UsageEnvironment *env; + RTPLatencySink *sink; + struct in_addr addr; + RTPSink *videoSink; + RTPSource *source; + + scheduler = BasicTaskScheduler::createNew(); + env = BasicUsageEnvironment::createNew(*scheduler); + + OutPacketBuffer::maxSize = 40 * 1000 * 1000; + lat_mtx.lock(); + + /* receiver */ + addr.s_addr = our_inet_addr("0.0.0.0"); + Groupsock recv_sock(*env, addr, Port(8888), 255); + + source = H265VideoRTPSource::createNew(*env, &recv_sock, 96); + sink = new RTPLatencySink(*env); + + /* sender */ + addr.s_addr = our_inet_addr("10.21.25.200"); + Groupsock send_socket(*env, addr, Port(8889), 255); + + framedSource = H265FramedSource::createNew(*env, 30); + framer = H265VideoStreamDiscreteFramer::createNew(*env, framedSource); + videoSink = H265VideoRTPSink::createNew(*env, &send_socket, 96); + + videoSink->startPlaying(*framer, NULL, videoSink); + sink->startPlaying(*source, nullptr, nullptr); + env->taskScheduler().doEventLoop(); + + return 0; +} + +int main(int argc, char **argv) +{ + (void)argc, (void)argv; + + return receiver(); +} diff --git a/live555/latency_sender.cc b/live555/latency_sender.cc new file mode 100644 index 0000000..046dfaa --- /dev/null +++ b/live555/latency_sender.cc @@ -0,0 +1,392 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "latsource.hh" +#include "sink.hh" + +using namespace std::chrono; + +#define BUFFER_SIZE 40 * 1000 * 1000 + +#define KEY(frame, size) ((((frame) % 64) << 26) + (size)) + +EventTriggerId H265LatencyFramedSource::eventTriggerId = 0; +unsigned H265LatencyFramedSource::referenceCount = 0; + +extern void *get_mem(int, char **, size_t&); +extern int get_next_frame_start(uint8_t *, uint32_t, uint32_t, uint8_t&); + +static uint64_t current = 0; +static uint64_t period = 0; +static bool initialized = false; + +static size_t frames = 0; +static size_t nintras = 0; +static size_t ninters = 0; + +static size_t intra_total = 0; +static size_t inter_total = 0; +static size_t frame_total = 0; + +static std::mutex lat_mtx; +static std::queue> nals; +static high_resolution_clock::time_point s_tmr, start; + +typedef std::pair finfo; +static std::unordered_map timestamps; + +static const uint8_t *ff_avc_find_startcode_internal(const uint8_t *p, const uint8_t *end) +{ + const uint8_t *a = p + 4 - ((intptr_t)p & 3); + + for (end -= 3; p < a && p < end; p++) { + if (p[0] == 0 && p[1] == 0 && p[2] == 1) + return p; + } + + for (end -= 3; p < end; p += 4) { + uint32_t x = *(const uint32_t*)p; +// if ((x - 0x01000100) & 
(~x) & 0x80008000) // little endian +// if ((x - 0x00010001) & (~x) & 0x00800080) // big endian + if ((x - 0x01010101) & (~x) & 0x80808080) { // generic + if (p[1] == 0) { + if (p[0] == 0 && p[2] == 1) + return p; + if (p[2] == 0 && p[3] == 1) + return p+1; + } + if (p[3] == 0) { + if (p[2] == 0 && p[4] == 1) + return p+2; + if (p[4] == 0 && p[5] == 1) + return p+3; + } + } + } + + for (end += 3; p < end; p++) { + if (p[0] == 0 && p[1] == 0 && p[2] == 1) + return p; + } + + return end + 3; +} + +const uint8_t *ff_avc_find_startcode(const uint8_t *p, const uint8_t *end) +{ + const uint8_t *out= ff_avc_find_startcode_internal(p, end); + if (p < out && out < end && !out[-1]) out--; + return out; +} + +static std::pair find_next_nal(void) +{ + static size_t len = 0; + static uint8_t *p = NULL; + static uint8_t *end = NULL; + static uint8_t *nal_start = NULL; + static uint8_t *nal_end = NULL; + + if (!p) { + p = (uint8_t *)get_mem(0, NULL, len); + end = p + len; + len = 0; + + nal_start = (uint8_t *)ff_avc_find_startcode(p, end); + } + + while (nal_start < end && !*(nal_start++)) + ; + + if (nal_start == end) + return std::make_pair(0, nullptr); + + nal_end = (uint8_t *)ff_avc_find_startcode(nal_start, end); + auto ret = std::make_pair((size_t)(nal_end - nal_start), (uint8_t *)nal_start); + len += 4 + nal_end - nal_start; + nal_start = nal_end; + + return ret; +} + +H265LatencyFramedSource *H265LatencyFramedSource::createNew(UsageEnvironment& env) +{ + return new H265LatencyFramedSource(env); +} + +H265LatencyFramedSource::H265LatencyFramedSource(UsageEnvironment& env): + FramedSource(env) +{ + period = (uint64_t)((1000 / (float)30) * 1000); + + if (!eventTriggerId) + eventTriggerId = envir().taskScheduler().createEventTrigger(deliverFrame0); +} + +H265LatencyFramedSource::~H265LatencyFramedSource() +{ + if (!--referenceCount) { + envir().taskScheduler().deleteEventTrigger(eventTriggerId); + eventTriggerId = 0; + } +} + +void H265LatencyFramedSource::doGetNextFrame() +{ + if (!initialized) { + s_tmr = std::chrono::high_resolution_clock::now(); + initialized = true; + } + + deliverFrame(); +} + +void H265LatencyFramedSource::deliverFrame0(void *clientData) +{ + ((H265LatencyFramedSource *)clientData)->deliverFrame(); +} + +void H265LatencyFramedSource::deliverFrame() +{ + if (!isCurrentlyAwaitingData()) + return; + + auto nal = find_next_nal(); + + if (!nal.first || !nal.second) + return; + + uint64_t runtime = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - s_tmr + ).count(); + + if (runtime < current * period) + std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime)); + + /* try to hold fps for intra/inter frames only */ + if (nal.first > 1500) + ++current; + + /* Start timer for the frame + * RTP sink will calculate the time difference once the frame is received */ + /* printf("send frame %u, size %u\n", current, nal.first); */ + + uint64_t key = nal.first; + + if (timestamps.find(key) != timestamps.end()) { + fprintf(stderr, "cannot use size as timestamp for this frame!\n"); + exit(EXIT_FAILURE); + } + timestamps[key].first = std::chrono::high_resolution_clock::now(); + timestamps[key].second = nal.first; + + uint8_t *newFrameDataStart = nal.second; + unsigned newFrameSize = nal.first; + + if (newFrameSize > fMaxSize) { + fFrameSize = fMaxSize; + fNumTruncatedBytes = newFrameSize - fMaxSize; + } else { + fFrameSize = newFrameSize; + } + + fDurationInMicroseconds = 0; + memmove(fTo, newFrameDataStart, fFrameSize); + + 
FramedSource::afterGetting(this); +} + +static void thread_func(void) +{ + unsigned prev_frames = UINT_MAX; + + while (true) { + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); + + if (prev_frames == frames) { + /* fprintf(stderr, "frame lost\n"); */ + break; + } + + prev_frames = frames; + } + + fprintf(stderr, "%zu: intra %lf, inter %lf, avg %lf\n", + frames, + intra_total / (float)nintras, + inter_total / (float)ninters, + frame_total / (float)frames + ); + + exit(EXIT_FAILURE); +} + +RTPSink_::RTPSink_(UsageEnvironment& env): + MediaSink(env) +{ + fReceiveBuffer = new uint8_t[BUFFER_SIZE]; +} + +RTPSink_::~RTPSink_() +{ +} + +void RTPSink_::uninit() +{ + stopPlaying(); + delete fReceiveBuffer; +} + +void RTPSink_::afterGettingFrame( + void *clientData, + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds +) +{ + ((RTPSink_ *)clientData)->afterGettingFrame( + frameSize, + numTruncatedBytes, + presentationTime, + durationInMicroseconds + ); +} + +void RTPSink_::afterGettingFrame( + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds +) +{ + (void)frameSize, (void)numTruncatedBytes; + (void)presentationTime, (void)durationInMicroseconds; + + /* start loop that monitors activity and if there has been + * no activity for 2s (same as uvgRTP) the receiver is stopped) */ + if (!frames) + (void)new std::thread(thread_func); + + uint64_t diff; + uint8_t nal_type; + + uint64_t key = frameSize; + /* printf("recv frame %zu, size %u\n", frames + 1, frameSize); */ + + if (timestamps.find(key) == timestamps.end()) { + printf("frame %zu,%zu not found from set!\n", frames + 1, key); + exit(EXIT_FAILURE); + } + + if (timestamps[key].second != frameSize) { + printf("frame size mismatch (%zu vs %zu)\n", timestamps[key].second, frameSize); + exit(EXIT_FAILURE); + } + + diff = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - timestamps[key].first + ).count(); + timestamps.erase(key); + + nal_type = (fReceiveBuffer[0] >> 1) & 0x3f; + + if (nal_type == 19 || nal_type == 1) { + if (nal_type == 19) + nintras++, intra_total += (diff / 1000); + else + ninters++, inter_total += (diff / 1000); + frame_total += (diff / 1000); + } + + if (++frames == 601) { + fprintf(stderr, "%zu: intra %lf, inter %lf, avg %lf\n", + frames, + intra_total / (float)nintras, + inter_total / (float)ninters, + frame_total / (float)frames + ); + exit(EXIT_SUCCESS); + } + + continuePlaying(); +} + +Boolean RTPSink_::continuePlaying() +{ + if (!fSource) + return False; + + fSource->getNextFrame( + fReceiveBuffer, + BUFFER_SIZE, + afterGettingFrame, + this, + onSourceClosure, + this + ); + + return True; +} + +static int sender(char *addr) +{ + (void)addr; + + H265VideoStreamDiscreteFramer *framer; + H265LatencyFramedSource *framedSource; + TaskScheduler *scheduler; + UsageEnvironment *env; + RTPSink *videoSink; + RTPSource *source; + RTPSink_ *sink; + + scheduler = BasicTaskScheduler::createNew(); + env = BasicUsageEnvironment::createNew(*scheduler); + + OutPacketBuffer::maxSize = 40 * 1000 * 1000; + + Port send_port(8888); + struct in_addr dst_addr; + dst_addr.s_addr = our_inet_addr("10.21.25.2"); + + Port recv_port(8889); + struct in_addr src_addr; + src_addr.s_addr = our_inet_addr("0.0.0.0"); + + Groupsock send_socket(*env, dst_addr, send_port, 255); + Groupsock recv_socket(*env, src_addr, recv_port, 255); + + /* sender */ + videoSink = 
H265VideoRTPSink::createNew(*env, &send_socket, 96); + framedSource = H265LatencyFramedSource::createNew(*env); + framer = H265VideoStreamDiscreteFramer::createNew(*env, framedSource); + + /* receiver */ + source = H265VideoRTPSource::createNew(*env, &recv_socket, 96); + sink = new RTPSink_(*env); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + videoSink->startPlaying(*framer, NULL, videoSink); + sink->startPlaying(*source, nullptr, nullptr); + env->taskScheduler().doEventLoop(); + + return 0; +} + +int main(int argc, char **argv) +{ + (void)argc, (void)argv; + + return sender(argv[2]); +} diff --git a/live555/latsink.cc b/live555/latsink.cc new file mode 100644 index 0000000..c1ab603 --- /dev/null +++ b/live555/latsink.cc @@ -0,0 +1,116 @@ +#include +#include +#include +#include +#include "latsink.hh" + +#define BUFFER_SIZE 40 * 1000 * 1000 + +static size_t frames = 0; +static size_t bytes = 0; +static std::chrono::high_resolution_clock::time_point start; +static std::chrono::high_resolution_clock::time_point last; + +static void thread_func(void) +{ + unsigned prev_frames = UINT_MAX; + + while (true) { + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + + if (prev_frames == frames) + break; + + prev_frames = frames; + } + + fprintf(stderr, "%zu %zu %lu\n", bytes, frames, + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + ).count() + ); + + exit(EXIT_FAILURE); +} + +RTPLatencySink::RTPLatencySink(UsageEnvironment& env): + MediaSink(env) +{ + fReceiveBuffer = new uint8_t[BUFFER_SIZE]; +} + +RTPLatencySink::~RTPLatencySink() +{ +} + +void RTPLatencySink::uninit() +{ + stopPlaying(); + delete fReceiveBuffer; +} + +void RTPLatencySink::afterGettingFrame( + void *clientData, + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds +) +{ + ((RTPLatencySink *)clientData)->afterGettingFrame( + frameSize, + numTruncatedBytes, + presentationTime, + durationInMicroseconds + ); +} + +void RTPLatencySink::afterGettingFrame( + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds +) +{ + (void)frameSize, (void)numTruncatedBytes; + (void)presentationTime, (void)durationInMicroseconds; + + /* start loop that monitors activity and if there has been + * no activity for 2s (same as uvgRTP) the receiver is stopped) */ + if (!frames) + (void)new std::thread(thread_func); + + fprintf(stderr, "got frame\n"); + + if (++frames == 601) { + fprintf(stderr, "%zu %zu %lu\n", bytes, frames, + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + ).count() + ); + exit(EXIT_SUCCESS); + } + + continuePlaying(); +} + +void RTPLatencySink::process() +{ +} + +Boolean RTPLatencySink::continuePlaying() +{ + if (!fSource) + return False; + + fSource->getNextFrame( + fReceiveBuffer, + BUFFER_SIZE, + afterGettingFrame, + this, + onSourceClosure, + this + ); + + return True; +} diff --git a/live555/latsink.hh b/live555/latsink.hh new file mode 100644 index 0000000..edd060b --- /dev/null +++ b/live555/latsink.hh @@ -0,0 +1,32 @@ +#include + +class RTPLatencySink : public MediaSink +{ + public: + RTPLatencySink(UsageEnvironment& env); + virtual ~RTPLatencySink(); + + void uninit(); + + static void afterGettingFrame( + void *clientData, + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds + ); + + void afterGettingFrame( + unsigned frameSize, + 
unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds + ); + + protected: + void process(); + + private: + virtual Boolean continuePlaying(); + uint8_t *fReceiveBuffer; +}; diff --git a/live555/latsource.cc b/live555/latsource.cc new file mode 100644 index 0000000..24ab9f9 --- /dev/null +++ b/live555/latsource.cc @@ -0,0 +1,185 @@ +#include +#include +#include "latsource.hh" +#include +#include +#include +#include + +EventTriggerId H265LatencyFramedSource::eventTriggerId = 0; +unsigned H265LatencyFramedSource::referenceCount = 0; + +extern void *get_mem(int, char **, size_t&); +extern int get_next_frame_start(uint8_t *, uint32_t, uint32_t, uint8_t&); + +static uint8_t *buf; +static size_t offset = 0; +static size_t bytes = 0; +static uint64_t current = 0; +static uint64_t period = 0; +static bool initialized = false; + +std::mutex delivery_mtx; +std::queue> nals; +std::chrono::high_resolution_clock::time_point s_tmr, e_tmr; + +static const uint8_t *ff_avc_find_startcode_internal(const uint8_t *p, const uint8_t *end) +{ + const uint8_t *a = p + 4 - ((intptr_t)p & 3); + + for (end -= 3; p < a && p < end; p++) { + if (p[0] == 0 && p[1] == 0 && p[2] == 1) + return p; + } + + for (end -= 3; p < end; p += 4) { + uint32_t x = *(const uint32_t*)p; +// if ((x - 0x01000100) & (~x) & 0x80008000) // little endian +// if ((x - 0x00010001) & (~x) & 0x00800080) // big endian + if ((x - 0x01010101) & (~x) & 0x80808080) { // generic + if (p[1] == 0) { + if (p[0] == 0 && p[2] == 1) + return p; + if (p[2] == 0 && p[3] == 1) + return p+1; + } + if (p[3] == 0) { + if (p[2] == 0 && p[4] == 1) + return p+2; + if (p[4] == 0 && p[5] == 1) + return p+3; + } + } + } + + for (end += 3; p < end; p++) { + if (p[0] == 0 && p[1] == 0 && p[2] == 1) + return p; + } + + return end + 3; +} + +const uint8_t *ff_avc_find_startcode(const uint8_t *p, const uint8_t *end) +{ + const uint8_t *out= ff_avc_find_startcode_internal(p, end); + if (p < out && out < end && !out[-1]) out--; + return out; +} + +static std::pair find_next_nal(void) +{ + static size_t len = 0; + static uint8_t *p = NULL; + static uint8_t *end = NULL; + static uint8_t *nal_start = NULL; + static uint8_t *nal_end = NULL; + + if (!p) { + p = (uint8_t *)get_mem(0, NULL, len); + end = p + len; + len = 0; + + nal_start = (uint8_t *)ff_avc_find_startcode(p, end); + } + + while (nal_start < end && !*(nal_start++)) + ; + + if (nal_start == end) + return std::make_pair(0, nullptr); + + nal_end = (uint8_t *)ff_avc_find_startcode(nal_start, end); + auto ret = std::make_pair((size_t)(nal_end - nal_start), (uint8_t *)nal_start); + len += 4 + nal_end - nal_start; + nal_start = nal_end; + + return ret; +} + +H265LatencyFramedSource *H265LatencyFramedSource::createNew(UsageEnvironment& env, std::mutex& lat_mtx) +{ + return new H265LatencyFramedSource(env, lat_mtx); +} + +H265LatencyFramedSource::H265LatencyFramedSource(UsageEnvironment& env, std::mutex& lat_mtx): + FramedSource(env), + mtx_(lat_mtx) +{ + period = (uint64_t)((1000 / (float)30) * 1000); + + if (!eventTriggerId) + eventTriggerId = envir().taskScheduler().createEventTrigger(deliverFrame0); +} + +H265LatencyFramedSource::~H265LatencyFramedSource() +{ + if (!--referenceCount) { + envir().taskScheduler().deleteEventTrigger(eventTriggerId); + eventTriggerId = 0; + } +} + +void H265LatencyFramedSource::doGetNextFrame() +{ + if (!initialized) { + s_tmr = std::chrono::high_resolution_clock::now(); + initialized = true; + } + + deliverFrame(); +} + +void 
H265LatencyFramedSource::deliverFrame0(void *clientData) +{ + ((H265LatencyFramedSource *)clientData)->deliverFrame(); +} + +void H265LatencyFramedSource::deliverFrame() +{ + if (!isCurrentlyAwaitingData()) + return; + + mtx_.lock(); + fprintf(stderr, "send frame\n"); + + auto nal = find_next_nal(); + + if (!nal.first || !nal.second) { + e_tmr = std::chrono::high_resolution_clock::now(); + uint64_t diff = (uint64_t)std::chrono::duration_cast(e_tmr - s_tmr).count(); + fprintf(stderr, "%lu bytes, %lu kB, %lu MB took %lu ms %lu s\n", + bytes, bytes / 1000, bytes / 1000 / 1000, + diff, diff / 1000 + ); + exit(EXIT_SUCCESS); + } + + uint64_t runtime = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - s_tmr + ).count(); + + if (runtime < current * period) + std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime)); + + /* try to hold fps for intra/inter frames only */ + if (nal.first > 1500) + ++current; + + uint8_t *newFrameDataStart = nal.second; + unsigned newFrameSize = nal.first; + + bytes += newFrameSize; + + if (newFrameSize > fMaxSize) { + fFrameSize = fMaxSize; + fNumTruncatedBytes = newFrameSize - fMaxSize; + } else { + fFrameSize = newFrameSize; + } + + fDurationInMicroseconds = 0; + memmove(fTo, newFrameDataStart, fFrameSize); + + FramedSource::afterGetting(this); +} diff --git a/live555/latsource.hh b/live555/latsource.hh new file mode 100644 index 0000000..b24c496 --- /dev/null +++ b/live555/latsource.hh @@ -0,0 +1,23 @@ +#ifndef __h265_framed_source_h__ +#define __h265_framed_source_h__ + +#include + +class H265LatencyFramedSource : public FramedSource { + public: + static H265LatencyFramedSource *createNew(UsageEnvironment& env); + static EventTriggerId eventTriggerId; + + protected: + H265LatencyFramedSource(UsageEnvironment& env); + virtual ~H265LatencyFramedSource(); + + private: + void deliverFrame(); + virtual void doGetNextFrame(); + static void deliverFrame0(void *clientData); + + static unsigned referenceCount; +}; + +#endif /* __h265_framed_source_h__ */ diff --git a/live555/receiver.cc b/live555/receiver.cc new file mode 100644 index 0000000..9d78c49 --- /dev/null +++ b/live555/receiver.cc @@ -0,0 +1,25 @@ +#include +#include +#include +#include "sink.hh" + +int main(int argc, char **argv) +{ + (void)argc, (void)argv; + + TaskScheduler *scheduler = BasicTaskScheduler::createNew(); + UsageEnvironment *env = BasicUsageEnvironment::createNew(*scheduler); + + Port rtpPort(8888); + struct in_addr dst_addr; + dst_addr.s_addr = our_inet_addr("0.0.0.0"); + Groupsock rtpGroupsock(*env, dst_addr, rtpPort, 255); + + OutPacketBuffer::maxSize = 40 * 1000 * 1000; + + RTPSource *source = H265VideoRTPSource::createNew(*env, &rtpGroupsock, 96); + RTPSink_ *sink = new RTPSink_(*env); + + sink->startPlaying(*source, nullptr, nullptr); + env->taskScheduler().doEventLoop(); +} diff --git a/live555/sender.cc b/live555/sender.cc new file mode 100644 index 0000000..ce1fb42 --- /dev/null +++ b/live555/sender.cc @@ -0,0 +1,37 @@ +#include +#include +#include +#include "source.hh" +#include "H265VideoStreamDiscreteFramer.hh" + +int main(int argc, char **argv) +{ + if (argc != 5) { + fprintf(stderr, "usage: ./%s <# of threads> \n", __FILE__); + return -1; + } + + H265VideoStreamDiscreteFramer *framer; + H265FramedSource *framedSource; + TaskScheduler *scheduler; + UsageEnvironment *env; + RTPSink *videoSink; + + scheduler = BasicTaskScheduler::createNew(); + env = BasicUsageEnvironment::createNew(*scheduler); + + framedSource = 
H265FramedSource::createNew(*env, atoi(argv[3])); + framer = H265VideoStreamDiscreteFramer::createNew(*env, framedSource); + + Port rtpPort(8888); + struct in_addr dst_addr; + dst_addr.s_addr = our_inet_addr("10.21.25.2"); + + Groupsock rtpGroupsock(*env, dst_addr, rtpPort, 255); + + OutPacketBuffer::maxSize = 40 * 1000 * 1000; + videoSink = H265VideoRTPSink::createNew(*env, &rtpGroupsock, 96); + + videoSink->startPlaying(*framer, NULL, videoSink); + env->taskScheduler().doEventLoop(); +} diff --git a/live555/sink.cc b/live555/sink.cc new file mode 100644 index 0000000..b22cadd --- /dev/null +++ b/live555/sink.cc @@ -0,0 +1,114 @@ +#include +#include +#include +#include +#include "sink.hh" + +#define BUFFER_SIZE 1600000 + +size_t frames = 0; +size_t bytes = 0; +std::chrono::high_resolution_clock::time_point start; +std::chrono::high_resolution_clock::time_point last; + +static void thread_func(void) +{ + unsigned prev_frames = UINT_MAX; + + while (true) { + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + + if (prev_frames == frames) + break; + + prev_frames = frames; + } + + fprintf(stderr, "%zu %zu %lu\n", bytes, frames, + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + ).count() + ); + + exit(EXIT_FAILURE); +} + +RTPSink_::RTPSink_(UsageEnvironment& env): + MediaSink(env) +{ + fReceiveBuffer = new uint8_t[BUFFER_SIZE]; +} + +RTPSink_::~RTPSink_() +{ +} + +void RTPSink_::uninit() +{ + stopPlaying(); + delete fReceiveBuffer; +} + +void RTPSink_::afterGettingFrame( + void *clientData, + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds +) +{ + ((RTPSink_ *)clientData)->afterGettingFrame( + frameSize, + numTruncatedBytes, + presentationTime, + durationInMicroseconds + ); +} + +void RTPSink_::afterGettingFrame( + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds +) +{ + (void)frameSize, (void)numTruncatedBytes; + (void)presentationTime, (void)durationInMicroseconds; + + /* start loop that monitors activity and if there has been + * no activity for 2s (same as uvgRTP) the receiver is stopped) */ + if (!frames) + (void)new std::thread(thread_func); + + if (++frames == 601) { + fprintf(stderr, "%zu %zu %lu\n", bytes, frames, + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + ).count() + ); + exit(EXIT_SUCCESS); + } + + continuePlaying(); +} + +void RTPSink_::process() +{ +} + +Boolean RTPSink_::continuePlaying() +{ + if (!fSource) + return False; + + fSource->getNextFrame( + fReceiveBuffer, + BUFFER_SIZE, + afterGettingFrame, + this, + onSourceClosure, + this + ); + + return True; +} diff --git a/live555/sink.hh b/live555/sink.hh new file mode 100644 index 0000000..ca8b436 --- /dev/null +++ b/live555/sink.hh @@ -0,0 +1,32 @@ +#include + +class RTPSink_ : public MediaSink +{ + public: + RTPSink_(UsageEnvironment& env); + virtual ~RTPSink_(); + + void uninit(); + + static void afterGettingFrame( + void *clientData, + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds + ); + + void afterGettingFrame( + unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds + ); + + protected: + void process(); + + private: + virtual Boolean continuePlaying(); + uint8_t *fReceiveBuffer; +}; diff --git a/live555/source.cc b/live555/source.cc new file mode 100644 index 
0000000..21660c1 --- /dev/null +++ b/live555/source.cc @@ -0,0 +1,186 @@ +#include +#include +#include "source.hh" +#include +#include +#include +#include + +EventTriggerId H265FramedSource::eventTriggerId = 0; +unsigned H265FramedSource::referenceCount = 0; + +extern void *get_mem(int, char **, size_t&); +extern int get_next_frame_start(uint8_t *, uint32_t, uint32_t, uint8_t&); + +uint8_t *buf; +size_t offset = 0; +size_t bytes = 0; +uint64_t current = 0; +uint64_t period = 0; +bool initialized = false; + +std::mutex delivery_mtx; +std::queue> nals; +std::chrono::high_resolution_clock::time_point s_tmr, e_tmr; + +static const uint8_t *ff_avc_find_startcode_internal(const uint8_t *p, const uint8_t *end) +{ + const uint8_t *a = p + 4 - ((intptr_t)p & 3); + + for (end -= 3; p < a && p < end; p++) { + if (p[0] == 0 && p[1] == 0 && p[2] == 1) + return p; + } + + for (end -= 3; p < end; p += 4) { + uint32_t x = *(const uint32_t*)p; +// if ((x - 0x01000100) & (~x) & 0x80008000) // little endian +// if ((x - 0x00010001) & (~x) & 0x00800080) // big endian + if ((x - 0x01010101) & (~x) & 0x80808080) { // generic + if (p[1] == 0) { + if (p[0] == 0 && p[2] == 1) + return p; + if (p[2] == 0 && p[3] == 1) + return p+1; + } + if (p[3] == 0) { + if (p[2] == 0 && p[4] == 1) + return p+2; + if (p[4] == 0 && p[5] == 1) + return p+3; + } + } + } + + for (end += 3; p < end; p++) { + if (p[0] == 0 && p[1] == 0 && p[2] == 1) + return p; + } + + return end + 3; +} + +const uint8_t *ff_avc_find_startcode(const uint8_t *p, const uint8_t *end) +{ + const uint8_t *out= ff_avc_find_startcode_internal(p, end); + if (p < out && out < end && !out[-1]) out--; + return out; +} + +static std::pair find_next_nal(void) +{ + static size_t len = 0; + static uint8_t *p = NULL; + static uint8_t *end = NULL; + static uint8_t *nal_start = NULL; + static uint8_t *nal_end = NULL; + + if (!p) { + p = (uint8_t *)get_mem(0, NULL, len); + end = p + len; + len = 0; + + nal_start = (uint8_t *)ff_avc_find_startcode(p, end); + } + + while (nal_start < end && !*(nal_start++)) + ; + + if (nal_start == end) + return std::make_pair(0, nullptr); + + nal_end = (uint8_t *)ff_avc_find_startcode(nal_start, end); + auto ret = std::make_pair((size_t)(nal_end - nal_start), (uint8_t *)nal_start); + len += 4 + nal_end - nal_start; + nal_start = nal_end; + + return ret; +} + +H265FramedSource *H265FramedSource::createNew(UsageEnvironment& env, unsigned fps) +{ + return new H265FramedSource(env, fps); +} + +H265FramedSource::H265FramedSource(UsageEnvironment& env, unsigned fps): + FramedSource(env), + fps_(fps) +{ + period = (uint64_t)((1000 / (float)fps) * 1000); + + if (!eventTriggerId) + eventTriggerId = envir().taskScheduler().createEventTrigger(deliverFrame0); +} + +H265FramedSource::~H265FramedSource() +{ + if (!--referenceCount) { + envir().taskScheduler().deleteEventTrigger(eventTriggerId); + eventTriggerId = 0; + } +} + +void H265FramedSource::doGetNextFrame() +{ + if (!initialized) { + s_tmr = std::chrono::high_resolution_clock::now(); + initialized = true; + } + + deliverFrame(); +} + +void H265FramedSource::deliverFrame0(void *clientData) +{ + ((H265FramedSource *)clientData)->deliverFrame(); +} + +void H265FramedSource::deliverFrame() +{ + if (!isCurrentlyAwaitingData()) + return; + + delivery_mtx.lock(); + + auto nal = find_next_nal(); + + if (!nal.first || !nal.second) { + e_tmr = std::chrono::high_resolution_clock::now(); + uint64_t diff = (uint64_t)std::chrono::duration_cast(e_tmr - s_tmr).count(); + fprintf(stderr, "%lu bytes, %lu kB, %lu 
MB took %lu ms %lu s\n", + bytes, bytes / 1000, bytes / 1000 / 1000, + diff, diff / 1000 + ); + exit(EXIT_SUCCESS); + } + + uint64_t runtime = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - s_tmr + ).count(); + + if (runtime < current * period) + std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime)); + + /* try to hold fps for intra/inter frames only */ + if (nal.first > 1500) + ++current; + + uint8_t *newFrameDataStart = nal.second; + unsigned newFrameSize = nal.first; + + bytes += newFrameSize; + + if (newFrameSize > fMaxSize) { + fFrameSize = fMaxSize; + fNumTruncatedBytes = newFrameSize - fMaxSize; + } else { + fFrameSize = newFrameSize; + } + + fDurationInMicroseconds = 0; + memmove(fTo, newFrameDataStart, fFrameSize); + + delivery_mtx.unlock(); + + FramedSource::afterGetting(this); +} diff --git a/live555/source.hh b/live555/source.hh new file mode 100644 index 0000000..137f0e2 --- /dev/null +++ b/live555/source.hh @@ -0,0 +1,36 @@ +#ifndef __h265_framed_source_h__ +#define __h265_framed_source_h__ + +#include + +class H265FramedSource: public FramedSource { +public: + static H265FramedSource *createNew(UsageEnvironment& env, unsigned fps); + +public: + static EventTriggerId eventTriggerId; + // Note that this is defined here to be a static class variable, because this code is intended to illustrate how to + // encapsulate a *single* device - not a set of devices. + // You can, however, redefine this to be a non-static member variable. + void deliver_frame(); + +protected: + H265FramedSource(UsageEnvironment& env, unsigned fps); + // called only by createNew(), or by subclass constructors + virtual ~H265FramedSource(); + +private: + // redefined virtual functions: + virtual void doGetNextFrame(); + //virtual void doStopGettingFrames(); // optional + +private: + static void deliverFrame0(void* clientData); + void deliverFrame(); + +private: + static unsigned referenceCount; // used to count how many instances of this class currently exist + unsigned fps_; +}; + +#endif /* __h265_framed_source_h__ */ diff --git a/parse.pl b/parse.pl new file mode 100755 index 0000000..5b25648 --- /dev/null +++ b/parse.pl @@ -0,0 +1,427 @@ +#!/usr/bin/env perl + +use warnings; +use strict; +use Getopt::Long; +use Cwd qw(realpath); + +my $TOTAL_FRAMES_UVGRTP = 602; +my $TOTAL_FRAMES_LIVE555 = 601; +my $TOTAL_FRAMES_FFMPEG = 598; +my $TOTAL_BYTES = 411410113; + +# open the file, validate it and return file handle to caller +sub open_file { + my ($path, $expect) = @_; + my $lines = 0; + + open(my $fh, '<', $path) or die "failed to open file: $path"; + $lines++ while (<$fh>); + + # if ($lines != $expect) { + # return undef; + # } + + seek $fh, 0, 0; + return $fh; +} + +sub goodput { + if ($_[2] eq "mbit") { return ($_[0] / 1000 / 1000) / $_[1] * 8 * 1000; } + elsif ($_[2] eq "mb") { return ($_[0] / 1000 / 1000) / $_[1] * 1000; } + else { return ($_[0] / 1000 / 1000) / $_[1] * 8; } +} + +sub get_frame_count { + return ($_[0] eq "uvgrtp") ? $TOTAL_FRAMES_UVGRTP : + ($_[0] eq "ffmpeg") ? 
$TOTAL_FRAMES_FFMPEG : $TOTAL_FRAMES_LIVE555; +} + +sub parse_send { + my ($lib, $iter, $threads, $path, $unit) = @_; + + my ($t_usr, $t_sys, $t_cpu, $t_total, $t_time); + my ($t_sgp, $t_tgp, $fh); + + if ($lib eq "uvgrtp") { + my $e = ($iter * ($threads + 2)); + $fh = open_file($path, $e); + return if not defined $fh; + } else { + open $fh, '<', $path or die "failed to open file\n"; + } + + # each iteration parses one benchmark run + # and each benchmark run can have 1..N entries, one for each thread + START: while (my $line = <$fh>) { + my $rt_avg = 0; + my $rb_avg = 0; + + next if index ($line, "kB") == -1 or index ($line, "MB") == -1; + + # for multiple threads there are two numbers: + # - single thread performance + # -> for each thread, calculate the speed at which the data was sent, + # sum all those together and divide by the number of threads + # + # - total performance + # -> (amount of data * number of threads) / total time spent + # + for (my $i = 0; $i < $threads; $i++) { + next START if grep /terminated|corrupt/, $line; + my @nums = $line =~ /(\d+)/g; + $rt_avg += $nums[3]; + $line = <$fh>; + } + $rt_avg /= $threads; + + next START if grep /terminated|corrupt/, $line; + $line = <$fh> if grep /flush|Command/, $line; + my ($usr, $sys, $total, $cpu) = ($line =~ m/(\d+\.\d+)user\s(\d+\.\d+)system\s0:(\d+.\d+)elapsed\s(\d+)%CPU/); + + # discard line about inputs, outputs and pagefaults + $line = <$fh>; + + # update total + $t_usr += $usr; + $t_sys += $sys; + $t_cpu += $cpu; + $t_total += $total; + $t_sgp += goodput($TOTAL_BYTES, $rt_avg, $unit); + } + + $t_sgp = $t_sgp / $iter; + $t_tgp = ($threads > 1) ? goodput($TOTAL_BYTES * $threads, $t_total / $iter, $unit) : $t_sgp; + + close $fh; + return ($path, $t_usr / $iter, $t_sys / $iter, $t_cpu / $iter, $t_total / $iter, $t_sgp, $t_tgp); +} + +sub parse_recv { + my ($lib, $iter, $threads, $path, $unit) = @_; + my ($t_usr, $t_sys, $t_cpu, $t_total, $tb_avg, $tf_avg, $tt_avg, $fh); + my $tf = get_frame_count($lib); + + if ($lib eq "uvgrtp") { + my $e = ($iter * ($threads + 2)); + $fh = open_file($path, $e); + } else { + open $fh, '<', $path or die "failed to open file $path\n"; + } + + # each iteration parses one benchmark run + while (my $line = <$fh>) { + my ($a_f, $a_b, $a_t) = (0) x 3; + + # make sure this is a line produced by the benchmarking script before proceeding + if ($lib eq "ffmpeg") { + my @nums = $line =~ /(\d+)/g; + next if $#nums != 2 or grep /jitter/, $line; + } + + # calculate avg bytes/frames/time + for (my $i = 0; $i < $threads; $i++) { + my @nums = $line =~ /(\d+)/g; + + $a_b += $nums[0]; + $a_f += $nums[1]; + $a_t += $nums[2]; + + $line = <$fh>; + } + + $tf_avg += ($a_f / $threads); + $tb_avg += ($a_b / $threads); + $tt_avg += ($a_t / $threads); + + my ($usr, $sys, $total, $cpu) = ($line =~ m/(\d+\.\d+)user\s(\d+\.\d+)system\s0:(\d+.\d+)elapsed\s(\d+)%CPU/); + + # discard line about inputs, outputs and pagefaults + $line = <$fh>; + + # update total + $t_usr += $usr; + $t_sys += $sys; + $t_cpu += $cpu; + $t_total += $total; + } + + my $bytes = 100 * (($tb_avg / $iter) / $TOTAL_BYTES); + my $frames = 100 * (($tf_avg / $iter) / $tf); + my $gp = goodput(($TOTAL_BYTES * ($bytes / 100), ($tt_avg / $iter)), $unit); + + close $fh; + return ($path, $t_usr / $iter, $t_sys / $iter, $t_cpu / $iter, $t_total / $iter, $frames, $bytes, $gp); +} + +sub print_recv { + my ($path, $usr, $sys, $cpu, $total, $a_f, $a_b, $a_t) = parse_recv(@_); + + if (defined $path) { + print "$path: \n"; + print "\tuser: $usr \n"; + print "\tsystem: 
$sys \n"; + print "\tcpu: $cpu \n"; + print "\ttotal: $total\n"; + print "\tavg frames: $a_f\n"; + print "\tavg bytes: $a_b\n"; + print "\trecv goodput: $a_t\n"; + } +} + +sub print_send { + my ($path, $usr, $sys, $cpu, $total, $sgp, $tgp) = parse_send(@_); + + if (defined $path) { + print "$path: \n"; + print "\tuser: $usr\n"; + print "\tsystem: $sys\n"; + print "\tcpu: $cpu\n"; + print "\ttotal: $total\n"; + print "\tgoodput, single: $sgp\n"; + print "\tgoodput, total: $tgp\n"; + } +} + +sub parse_csv { + my ($lib, $iter, $path, $unit) = @_; + my ($threads, $fps, $ofps, $fiter, %a) = (0) x 4; + opendir my $dir, realpath($path); + + foreach my $fh (grep /(recv|send)/, readdir $dir) { + ($threads, $ofps, $fiter) = ($fh =~ /(\d+)threads_(\d+)fps_(\d+)iter/g); + $iter = $fiter if $fiter; + print "unable to determine iter, skipping file $fh\n" and next if !$iter; + $fps = sprintf("%05d", $ofps); + my @values; + + if (grep /recv/, $fh) { + @values = parse_recv($lib, $iter, $threads, realpath($path) . "/" . $fh, $unit); + shift @values; + + if (not exists $a{"$threads $fps"}) { + $a{"$threads $fps"} = join(" ", @values); + } else { + $a{"$threads $fps"} = join(" ", @values) . " " . $a{"$threads $fps"}; + } + + } else { + @values = parse_send($lib, $iter, $threads, realpath($path) . "/" . $fh, $unit); + shift @values; + + if (not exists $a{"$threads $fps"}) { + $a{"$threads $fps"} = join(" ", @values) . " $ofps"; + } else { + $a{"$threads $fps"} = $a{"$threads $fps"} . " " . join(" ", @values) . " $ofps"; + } + } + } + + my $c_key = 0; + open my $cfh, '>', "$lib.csv" or die "failed to open file: $lib.csv"; + my (@r_u, @r_s, @r_c, @r_t, @r_f, @r_b, @r_m) = () x 7; + my (@s_u, @s_s, @s_c, @s_t, @s_sg, @s_tg, @s_f) = () x 7; + + foreach my $key (sort(keys %a)) { + my $spz = (split " ", $key)[0]; + + if ($spz != $c_key){ + if ($spz ne 1) { + print $cfh "recv usr;" . join(";", @r_u) . "\n"; + print $cfh "recv sys;" . join(";", @r_s) . "\n"; + print $cfh "recv cpu;" . join(";", @r_c) . "\n"; + print $cfh "recv total;" . join(";", @r_t) . "\n"; + print $cfh "frames received;". join(";", @r_f) . "\n"; + print $cfh "bytes received;" . join(";", @r_b) . "\n"; + print $cfh "time estimate;" . join(";", @r_m) . "\n"; + print $cfh "send usr;" . join(";", @s_u) . "\n"; + print $cfh "send sys;" . join(";", @s_s) . "\n"; + print $cfh "send cpu;" . join(";", @s_c) . "\n"; + print $cfh "send total;" . join(";", @s_t) . "\n"; + print $cfh "single goodput;" . join(";", @s_sg) . "\n"; + print $cfh "total goodput;" . join(";", @s_tg) . "\n"; + print $cfh "fps;" . join(";", @s_f) . "\n\n"; + } + + print $cfh "$spz threads;\n"; + $c_key = $spz; + (@r_f, @r_b, @r_m, @r_c, @r_u, @r_s, @r_t) = () x 7; + (@s_c, @s_u, @s_s, @s_t, @s_sg, @s_tg, @s_f) = () x 7; + } + + my @comp = split " ", $a{$key}; + push @r_u, $comp[0]; push @r_s, $comp[1]; push @r_c, $comp[2]; + push @r_t, $comp[3]; push @r_f, $comp[4]; push @r_b, $comp[5]; + push @r_m, $comp[6]; push @s_u, $comp[7]; push @s_s, $comp[8]; + push @s_c, $comp[9]; push @s_t, $comp[10]; push @s_sg, $comp[11]; + push @s_tg, $comp[12]; push @s_f, $comp[13]; + } + + print $cfh "recv usr;" . join(";", @r_u) . "\n"; + print $cfh "recv sys;" . join(";", @r_s) . "\n"; + print $cfh "recv cpu;" . join(";", @r_c) . "\n"; + print $cfh "recv total;" . join(";", @r_t) . "\n"; + print $cfh "frames received;". join(";", @r_f) . "\n"; + print $cfh "bytes received;" . join(";", @r_b) . "\n"; + print $cfh "recv goodput;" . join(";", @r_m) . "\n"; + print $cfh "send usr;" . join(";", @s_u) . 
"\n"; + print $cfh "send sys;" . join(";", @s_s) . "\n"; + print $cfh "send cpu;" . join(";", @s_c) . "\n"; + print $cfh "send total;" . join(";", @s_t) . "\n"; + print $cfh "single goodput;" . join(";", @s_sg) . "\n"; + print $cfh "total goodput;" . join(";", @s_tg) . "\n"; + print $cfh "fps;" . join(";", @s_f) . "\n"; + + close $cfh; +} + +sub parse { + my ($lib, $iter, $path, $pkt_loss, $frame_loss, $type, $unit) = @_; + my ($tgp, $tgp_k, $sgp, $sgp_k, $threads, $fps, $fiter, %a) = (0) x 7; + opendir my $dir, realpath($path); + + foreach my $fh (grep /recv/, readdir $dir) { + ($threads, $fps, $fiter) = ($fh =~ /(\d+)threads_(\d+)fps_(\d+)iter/g); + $iter = $fiter if $fiter; + print "unable to determine iter, skipping file $fh\n" and next if !$iter; + + my @values = parse_recv($lib, $iter, $threads, realpath($path) . "/" . $fh, $unit); + + if (100.0 - $values[5] <= $frame_loss and 100.0 - $values[6] <= $pkt_loss) { + $a{"$threads $fps"} = $path; + } + } + + rewinddir $dir; + + foreach my $fh (grep /send/, readdir $dir) { + ($threads, $fps, $fiter) = ($fh =~ /(\d+)threads_(\d+)fps_(\d+)iter/g); + $iter = $fiter if $fiter; + print "unable to determine iter, skipping file $fh\n" and next if !$iter; + + my @values = parse_send($lib, $iter, $threads, realpath($path) . "/" . $fh, $unit); + + if (exists $a{"$threads $fps"}) { + if ($type eq "best") { + if ($values[5] > $sgp) { + $sgp = $values[5]; + $sgp_k = $fh; + } + + if ($values[6] > $tgp) { + $tgp = $values[6]; + $tgp_k = $fh; + } + } else { + print "$fh: $values[5] $values[6]\n" if exists $a{"$threads $fps"}; + } + } + } + + closedir $dir; + exit if $type eq "all"; + + if ($sgp_k) { + print "best goodput, single thread: $sgp_k\n"; + ($threads, $fps) = ($sgp_k =~ /(\d+)threads_(\d+)/g); + print_send($lib, $iter, $threads, realpath($path) . "/" . $sgp_k, $unit); + } else { + print "nothing found for single best goodput\n"; + } + + if ($tgp_k) { + print "\nbest goodput, total: $tgp_k\n"; + ($threads, $fps) = ($tgp_k =~ /(\d+)threads_(\d+)/g); + print_send($lib, $iter, $threads, realpath($path) . "/" . $tgp_k, $unit); + } else { + print "nothing found for total best goodput\n"; + } +} + +sub parse_latency { + my ($lib, $iter, $path, $unit) = @_; + my ($ts, $avg, $intra, $inter, $cnt) = (0) x 5; + + open my $fh, '<', $path or die "failed to open file $path\n"; + + # each iteration parses one benchmark run + while (my $line = <$fh>) { + my @nums = ($line =~ m/(\d+).*intra\s(\d+\.\d+).*inter\s(\d+\.\d+).*avg\s(\d+\.\d+)/); + + $frames += $nums[0]; + $intra += $nums[1]; + $inter += $nums[2]; + $avg += $nums[3]; + $cnt += 1; + } + + $intra /= $cnt; + $inter /= $cnt; + $avg /= $cnt; + $frames /= get_frame_count($lib); + + print "$frames: intra $intra, inter $inter, avg $avg\n"; +} + +sub print_help { + print "usage (one file, send/recv):\n ./parse.pl \n" + . "\t--lib \n" + . "\t--role \n" + . "\t--unit (defaults to mb)\n" + . "\t--path \n" + . "\t--iter <# of iterations>)\n" + . "\t--threads <# of threads used in the benchmark> (defaults to 1)\n\n"; + + print "usage (latency):\n ./parse.pl \n" + . "\t--unit (defaults to mb)\n" + . "\t--path \n" + . "\t--parse latency\n\n"; + + print "usage (directory):\n ./parse.pl \n" + . "\t--parse \n" + . "\t--lib \n" + . "\t--iter <# of iterations>)\n" + . "\t--unit (defaults to mb)\n" + . "\t--packet-loss (optional)\n" + . "\t--frame-loss (optional)\n" + . 
"\t--path \n" and exit; +} + +GetOptions( + "lib|l=s" => \(my $lib = ""), + "role|r=s" => \(my $role = ""), + "path|p=s" => \(my $path = ""), + "threadst|=i" => \(my $threads = 0), + "iter|i=i" => \(my $iter = 0), + "parse|s=s" => \(my $parse = ""), + "packet-loss|p=f" => \(my $pkt_loss = 100.0), + "frame-loss|f=f" => \(my $frame_loss = 100.0), + "unit=s" => \(my $unit = "mb"), + "help" => \(my $help = 0) +) or die "failed to parse command line!\n"; + +$lib = $1 if (!$lib and $path =~ m/.*(uvgrtp|ffmpeg|live555).*/i); +$role = $1 if (!$role and $path =~ m/.*(recv|send).*/i); +$threads = $1 if (!$threads and $path =~ m/.*_(\d+)threads.*/i); +$iter = $1 if (!$iter and $path =~ m/.*_(\d+)iter.*/i); + +print_help() if $help or (!$lib and $parse ne "latency"); +print_help() if !$iter and !$parse; +print_help() if !$parse and (!$role or !$threads); +print_help() if !grep /$unit/, ("mb", "mbit", "gbit"); + +die "not implemented\n" if !grep (/$lib/, ("uvgrtp", "ffmpeg", "live555")); + +if ($parse eq "best" or $parse eq "all") { + parse($lib, $iter, $path, $pkt_loss, $frame_loss, $parse, $unit); +} elsif ($parse eq "csv") { + parse_csv($lib, $iter, $path, $unit); +} elsif ($parse eq "latency") { + parse_latency($lib, $iter, $path, $unit); +} elsif ($role eq "send") { + print_send($lib, $iter, $threads, $path, $unit); +} elsif ($role eq "recv") { + print_recv($lib, $iter, $threads, $path, $unit); +} else { + die "unknown option!\n"; +} diff --git a/udperf.c b/udperf.c new file mode 100644 index 0000000..51c3af6 --- /dev/null +++ b/udperf.c @@ -0,0 +1,181 @@ +/* + * compile: gcc udperf.c + * start server: ./a.out -s + * start client: ./a.out -a 127.0.0.1 + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef PORT +# define PORT 8888 +#endif +#ifndef PKT_LEN +# define PKT_LEN 1458 +#endif +#ifndef MAX_ROUNDS +# define MAX_ROUNDS 10 +#endif +#ifndef MAX_PKTS +# define MAX_PKTS 350000 +#endif + +static inline float diff_ms(struct timespec s, struct timespec e) +{ + return ((float)((e.tv_sec - s.tv_sec) * (long)1e9 + e.tv_nsec - s.tv_nsec)) / 1000 / 1000; +} + +static int usage(void) +{ + fprintf(stderr, "server: ./%s -s\n", __FILE__); + fprintf(stderr, "client: ./%s -a \n", __FILE__); + + return -1; +} + +static int server(void) +{ + int s_u, s_t, s_n; + struct sockaddr_in sa_u, sa_t; + + /* initialize server udp socket */ + memset(&sa_u, 0, sizeof(sa_u)); + + sa_u.sin_family = AF_INET; + sa_u.sin_port = htons(PORT); + sa_u.sin_addr.s_addr = htonl(INADDR_ANY); + + s_u = socket(AF_INET, SOCK_DGRAM, 0); + + (void)bind(s_u, (struct sockaddr *)&sa_u, sizeof(sa_u)); + + /* initialize server tcp socket */ + memset(&sa_t, 0, sizeof(sa_t)); + + sa_t.sin_family = AF_INET; + sa_t.sin_port = htons(PORT + 1); + sa_t.sin_addr.s_addr = htonl(INADDR_ANY); + + s_t = socket(AF_INET, SOCK_STREAM, 0); + (void)setsockopt(s_t, SOL_SOCKET, SO_REUSEADDR, &(int){ 1 }, sizeof(int)); + (void)bind(s_t, (struct sockaddr *)&sa_t, sizeof(sa_t)); + + (void)listen(s_t, 1); + s_n = accept(s_t, (struct sockaddr *)&sa_t, &(socklen_t){ sizeof(sa_t) }); + + /* receive packets from remote and once select() timeouts, + * send how many packets were received */ + uint8_t buffer[PKT_LEN]; + + fd_set read_fs; + FD_ZERO(&read_fs); + + for (int i = 0, npkts = 0; i < MAX_ROUNDS; ++i, npkts = 0) { + while (npkts != MAX_PKTS) { + FD_SET(s_u, &read_fs); + + if (!select(s_u + 1, &read_fs, NULL, NULL, &(struct timeval){ 2, 0 })) + break; + + while (recv(s_u, buffer, PKT_LEN, MSG_DONTWAIT) > 0) 
+ ++npkts; + } + (void)send(s_n, &npkts, sizeof(int), 0); + } + + return 0; +} + +static int client(char *server_addr) +{ + float total = 0; + struct timespec start, end; + uint8_t data[PKT_LEN] = { 0 }; + struct sockaddr_in sa_t, sa_u; + int s_u, s_t, runtime, pkts, bytes; + + if (!server_addr) + return usage(); + + /* initialize client udp socket */ + memset(&sa_u, 0, sizeof(sa_u)); + + sa_u.sin_family = AF_INET; + sa_u.sin_port = htons(PORT); + + (void)inet_pton(AF_INET, server_addr, &sa_u.sin_addr); + s_u = socket(AF_INET, SOCK_DGRAM, 0); + + /* initialize client tcp socket */ + memset(&sa_t, 0, sizeof(sa_t)); + + sa_t.sin_family = AF_INET; + sa_t.sin_port = htons(PORT + 1); + + (void)inet_pton(AF_INET, server_addr, &sa_t.sin_addr); + s_t = socket(AF_INET, SOCK_STREAM, 0); + + (void)connect(s_t, (struct sockaddr *)&sa_t, sizeof(sa_t)); + + for (int i = 0; i < MAX_ROUNDS; ++i) { + + clock_gettime(CLOCK_MONOTONIC, &start); + + for (int k = 0; k < MAX_PKTS; ++k) + (void)sendto(s_u, data, PKT_LEN, 0, (struct sockaddr *)&sa_u, sizeof(sa_u)); + + clock_gettime(CLOCK_MONOTONIC, &end); + + runtime = diff_ms(start, end); + bytes = MAX_PKTS * PKT_LEN; + + /* read how many packets the server received */ + (void)recv(s_t, &pkts, sizeof(int), 0); + + fprintf(stderr, "%.2lf Gb/s, %.2lf Mb/s, %d MB transferred, %.2f%% packets received\n", + (float)bytes * 8 / 1000 / 1000 / 1000 / runtime * 1000, + (float)bytes * 8 / 1000 / 1000 / runtime * 1000, + bytes / 1000 / 1000, + ((float)pkts / MAX_PKTS) * 100 + ); + + total += (float)bytes * 8 / 1000 / 1000 / 1000 / runtime * 1000; + } + + fprintf(stderr, "\naverage goodput: %.2lf Gb/s\n", (float)total / MAX_ROUNDS); + + return 0; +} + +int main(int argc, char **argv) +{ + char *cvalue = NULL; + int srv = 0; + int c; + + opterr = 0; + + while ((c = getopt(argc, argv, "sa:")) != -1) { + switch (c) { + case 'a': + cvalue = optarg; + break; + + case 's': + srv = 1; + break; + + default: + return usage(); + } + } + + return srv ? 
server() : client(cvalue); +} diff --git a/util/live555_util.cc b/util/live555_util.cc new file mode 100644 index 0000000..07cf232 --- /dev/null +++ b/util/live555_util.cc @@ -0,0 +1,199 @@ +#include +#include +#include + +#include "live555_util.hh" + +#define MAX_WRITE_SIZE 1444 + +extern int get_next_frame_start(uint8_t *data, uint32_t offset, uint32_t data_len, uint8_t& start_len); + +FramedSourceCustom::FramedSourceCustom(UsageEnvironment *env) + :FramedSource(*env) +{ + len_ = 0; + off_ = 0; + chunk_ptr_ = 0; + afterEvent_ = envir().taskScheduler().createEventTrigger((TaskFunc*)FramedSource::afterGetting); +} + +FramedSourceCustom::~FramedSourceCustom() +{ +} + +void FramedSourceCustom::doGetNextFrame() +{ + if (isCurrentlyAwaitingData()) + sendFrame(); +} + +void FramedSourceCustom::doStopGettingFrames() +{ + noMoreTasks_ = true; +} + +void FramedSourceCustom::splitIntoNals() +{ + uint8_t start_len; + int32_t prev_offset = 0; + int offset = get_next_frame_start((uint8_t *)c_chunk_, 0, c_chunk_len_, start_len); + prev_offset = offset; + + while (offset != -1) { + offset = get_next_frame_start((uint8_t *)c_chunk_, offset, c_chunk_len_, start_len); + + if (offset > 4 && offset != -1) { + nals_.push(std::make_pair(offset - prev_offset - start_len, &c_chunk_[prev_offset])); + prev_offset = offset; + } + } + + if (prev_offset == -1) + prev_offset = 0; + + nals_.push(std::make_pair(c_chunk_len_ - prev_offset, &c_chunk_[prev_offset])); +} + +void FramedSourceCustom::sendFrame() +{ + /* initialization is not ready but scheduler + * is already asking for frames, send empty frame */ + if (len_ == 0 && off_ == 0) { + fFrameSize = 0; + envir().taskScheduler().triggerEvent(afterEvent_, this); + return; + } + + if (c_chunk_ == nullptr) { + fpt_start_ = std::chrono::high_resolution_clock::now(); + + if (chunks_.empty()) { + printStats(); + } + + /* TODO: framer */ + + auto cinfo = chunks_.front(); + chunks_.pop(); + + c_chunk_ = cinfo.second; + c_chunk_len_ = cinfo.first; + c_chunk_off_ = 0; + + splitIntoNals(); + } + + if (c_nal_ == nullptr) { + auto ninfo = nals_.front(); + nals_.pop(); + + c_nal_ = ninfo.second; + c_nal_len_ = ninfo.first; + c_nal_off_ = 0; + } + + void *send_ptr = nullptr; + size_t send_len = 0; + size_t send_off = 0; + + if (c_nal_len_ < MAX_WRITE_SIZE) { + send_len = c_nal_len_; + send_ptr = c_nal_; + send_off = 0; + } else { + int left = c_nal_len_ - c_nal_off_; + + if (left < MAX_WRITE_SIZE) + send_len = left; + else + send_len = MAX_WRITE_SIZE; + + send_ptr = c_nal_; + send_off = c_nal_off_; + } + + memcpy(fTo, (uint8_t *)send_ptr + send_off, send_len); + fFrameSize = send_len; + afterGetting(this); + + /* check if we need to change chunk or nal unit */ + bool nal_written_fully = (c_nal_len_ <= c_nal_off_ + send_len); + + if (nal_written_fully && nals_.empty()) { + c_chunk_ = nullptr; + + n_calls_++; + fpt_end_ = std::chrono::high_resolution_clock::now(); + diff_total_ += std::chrono::duration_cast(fpt_end_ - fpt_start_).count(); + } else { + if (!nal_written_fully) { + c_nal_off_ += send_len; + } else { + c_nal_ = nullptr; + } + } +} + +void FramedSourceCustom::printStats() +{ + *stop_ = 1; + end_ = std::chrono::high_resolution_clock::now(); + + uint64_t diff = (uint64_t)std::chrono::duration_cast(end_ - start_).count(); + + fprintf(stderr, "%lu bytes, %lu kB, %lu MB took %lu ms %lu s\n", + total_size_, total_size_ / 1000, total_size_ / 1000 / 1000, + diff, diff / 1000 + ); + + fprintf(stderr, "n calls %u\n", n_calls_); + fprintf(stderr, "avg processing time of frame: 
%lu\n", diff_total_ / n_calls_); +} + +void FramedSourceCustom::startFramedSource(void *mem, size_t len, char *stop_rtp) +{ + mem_ = mem; + len_ = len; + off_ = 0; + n_calls_ = 0; + diff_total_ = 0; + stop_ = stop_rtp; + + c_chunk_ = nullptr; + c_chunk_len_ = 0; + c_chunk_off_ = 0; + total_size_ = 0; + + uint64_t chunk_size = 0; + + for (size_t i = 0, k = 0; i < len && k < 3000; k++) { + memcpy(&chunk_size, (uint8_t *)mem + i, sizeof(uint64_t)); + + i += sizeof(uint64_t); + + chunks_.push(std::make_pair(chunk_size, (uint8_t *)mem_ + i)); + + i += chunk_size; + total_size_ += chunk_size; + } + + start_ = std::chrono::high_resolution_clock::now(); +} + +void createConnection( + UsageEnvironment *env, + Connection& connection, + std::string sess_addr, + std::string ip_addr, + uint16_t portNum +) +{ + sockaddr_in addr, sess; + inet_pton(AF_INET, ip_addr.c_str(), &addr.sin_addr); + inet_pton(AF_INET, sess_addr.c_str(), &sess.sin_addr); + + connection.rtpPort = new Port(0); + connection.rtpGroupsock = new Groupsock(*env, sess.sin_addr, addr.sin_addr, *connection.rtpPort); + connection.rtpGroupsock->changeDestinationParameters(addr.sin_addr, portNum, 255); +} + diff --git a/util/live555_util.hh b/util/live555_util.hh new file mode 100644 index 0000000..e5b8c8f --- /dev/null +++ b/util/live555_util.hh @@ -0,0 +1,78 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +class FramedSourceCustom : public FramedSource +{ +public: + FramedSourceCustom(UsageEnvironment *env); + ~FramedSourceCustom(); + + void startFramedSource(void *mem, size_t len, char *stop_rtp); + virtual void doGetNextFrame(); + +protected: + virtual void doStopGettingFrames(); + +private: + void sendFrame(); + void printStats(); + void splitIntoNals(); + + EventTriggerId afterEvent_; + + bool separateInput_; + bool ending_; + bool removeStartCodes_; + + char *stop_; + + int n_calls_; + uint64_t diff_total_; + + uint64_t total_size_; + + void *mem_; + int len_; + int off_; + + uint8_t *c_nal_; + size_t c_nal_len_; + size_t c_nal_off_; + std::queue> nals_; + + uint8_t *c_chunk_; + size_t c_chunk_len_; + size_t c_chunk_off_; + + int chunk_ptr_; + std::queue> chunks_; + std::mutex mutex_; + + TaskToken currentTask_; + std::chrono::high_resolution_clock::time_point start_, end_, fpt_end_, fpt_start_; + bool noMoreTasks_; +}; + +struct Connection { + Port *rtpPort; + Port *rtcpPort; + Groupsock *rtpGroupsock; + Groupsock *rtcpGroupsock; +}; + +void createConnection(UsageEnvironment *env, Connection& conn, std::string sess, std::string ip, uint16_t port); diff --git a/util/util.cc b/util/util.cc new file mode 100644 index 0000000..73d0ca3 --- /dev/null +++ b/util/util.cc @@ -0,0 +1,172 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +int kvazaar_encode(char *input, char *output); + +void *get_mem(int argc, char **argv, size_t& len) +{ + char *input = NULL; + char *output = NULL; + + if (argc != 3) { + input = (char *)"util/video.raw"; + output = (char *)"util/out.hevc"; + } else { + input = argv[1]; + output = argv[2]; + } + + if (access(output, F_OK) == -1) { + (void)kvazaar_encode(input, output); + } + + int fd = open(output, O_RDONLY, 0); + + struct stat st; + stat(output, &st); + len = st.st_size; + + void *mem = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_POPULATE, fd, 0); + + madvise(mem, len, 
MADV_SEQUENTIAL | MADV_WILLNEED); + + return mem; +} + +int get_next_frame_start(uint8_t *data, uint32_t offset, uint32_t data_len, uint8_t& start_len) +{ + uint8_t zeros = 0; + uint32_t pos = 0; + + while (offset + pos < data_len) { + if (zeros >= 2 && data[offset + pos] == 1) { + start_len = zeros + 1; + return offset + pos + 1; + } + + if (data[offset + pos] == 0) + zeros++; + else + zeros = 0; + + pos++; + } + + return -1; +} + +int kvazaar_encode(char *input, char *output) +{ + FILE *inputFile = fopen(input, "r"); + FILE *outputFile = fopen(output, "w"); + + int width = 3840; + int height = 2160; + + kvz_encoder* enc = NULL; + const kvz_api * const api = kvz_api_get(8); + kvz_config* config = api->config_alloc(); + api->config_init(config); + api->config_parse(config, "preset", "ultrafast"); + config->width = width; + config->height = height; + config->hash = kvz_hash::KVZ_HASH_NONE; + config->intra_period = 64; + config->qp = 21; + config->framerate_num = 120; + config->framerate_denom = 1; + + enc = api->encoder_open(config); + if (!enc) { + fprintf(stderr, "Failed to open encoder.\n"); + return EXIT_FAILURE; + } + + kvz_picture *img_in[16]; + for (uint32_t i = 0; i < 16; ++i) { + img_in[i] = api->picture_alloc_csp(KVZ_CSP_420, width, height); + } + + uint8_t inputCounter = 0; + uint8_t outputCounter = 0; + bool done = false; + /* int r = 0; */ + + while (!done) { + kvz_data_chunk* chunks_out = NULL; + kvz_picture *img_rec = NULL; + kvz_picture *img_src = NULL; + uint32_t len_out = 0; + kvz_frame_info info_out; + + if (!fread(img_in[inputCounter]->y, width*height, 1, inputFile)) { + done = true; + continue; + } + if (!fread(img_in[inputCounter]->u, width*height>>2, 1, inputFile)) { + done = true; + continue; + } + if (!fread(img_in[inputCounter]->v, width*height>>2, 1, inputFile)) { + done = true; + continue; + } + + if (!api->encoder_encode(enc, + img_in[inputCounter], + &chunks_out, &len_out, &img_rec, &img_src, &info_out)) + { + fprintf(stderr, "Failed to encode image.\n"); + for (uint32_t i = 0; i < 16; i++) { + api->picture_free(img_in[i]); + } + return EXIT_FAILURE; + } + inputCounter = (inputCounter + 1) % 16; + + + if (chunks_out == NULL && img_in == NULL) { + // We are done since there is no more input and output left. + goto cleanup; + } + + if (chunks_out != NULL) { + uint64_t written = 0; + + // Write data into the output file. 
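+ // Each frame is stored as a uint64_t byte count followed by the encoded chunk data;
+ // the sender programs read this prefix back when splitting the memory-mapped file into frames.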
+ for (kvz_data_chunk *chunk = chunks_out; chunk != NULL; chunk = chunk->next) { + written += chunk->len; + } + + fprintf(stderr, "write chunk size: %lu\n", written); + fwrite(&written, sizeof(uint64_t), 1, outputFile); + for (kvz_data_chunk *chunk = chunks_out; chunk != NULL; chunk = chunk->next) { + fwrite(chunk->data, chunk->len, 1, outputFile); + } + + outputCounter = (outputCounter + 1) % 16; + + /* if (++r > 5) */ + /* goto cleanup; */ + } + } + +cleanup: + fclose(inputFile); + fclose(outputFile); + + return 0; +} diff --git a/uvgrtp/latency_receiver.cc b/uvgrtp/latency_receiver.cc new file mode 100644 index 0000000..90fad41 --- /dev/null +++ b/uvgrtp/latency_receiver.cc @@ -0,0 +1,41 @@ +#include +#include +#include +#include + +size_t nframes = 0; + +void hook_receiver(void *arg, uvg_rtp::frame::rtp_frame *frame) +{ + auto hevc = (uvg_rtp::media_stream *)arg; + hevc->push_frame(frame->payload, frame->payload_len, 0); + nframes++; +} + +int receiver(void) +{ + uvg_rtp::context rtp_ctx; + std::string addr("10.21.25.200"); + + auto sess = rtp_ctx.create_session(addr); + auto hevc = sess->create_stream( + 8889, + 8888, + RTP_FORMAT_HEVC, + RCE_SYSTEM_CALL_DISPATCHER + ); + + hevc->install_receive_hook(hevc, hook_receiver); + + while (nframes != 602) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + return 0; +} + +int main(int argc, char **argv) +{ + (void)argc, (void)argv; + + return receiver(); +} diff --git a/uvgrtp/latency_sender.cc b/uvgrtp/latency_sender.cc new file mode 100644 index 0000000..a6ab014 --- /dev/null +++ b/uvgrtp/latency_sender.cc @@ -0,0 +1,110 @@ +#include +#include +#include +#include + +extern void *get_mem(int argc, char **argv, size_t& len); + +std::chrono::high_resolution_clock::time_point start2; + +size_t frames = 0; +size_t ninters = 0; +size_t nintras = 0; + +size_t total = 0; +size_t total_intra = 0; +size_t total_inter = 0; + +static void hook_sender(void *arg, uvg_rtp::frame::rtp_frame *frame) +{ + (void)arg, (void)frame; + + if (frame) { + + uint64_t diff = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start2 + ).count(); + + switch ((frame->payload[0] >> 1) & 0x3f) { + case 19: + total += (diff / 1000); + total_intra += (diff / 1000); + nintras++, frames++; + break; + + case 1: + total += (diff / 1000); + total_inter += (diff / 1000); + ninters++, frames++; + break; + } + } +} + +static int sender(void) +{ + size_t len = 0; + void *mem = get_mem(0, NULL, len); + uint64_t csize = 0; + uint64_t diff = 0; + uint64_t current = 0; + uint64_t chunk_size = 0; + uint64_t period = (uint64_t)((1000 / (float)30) * 1000); + rtp_error_t ret = RTP_OK; + std::string addr("10.21.25.2"); + + uvg_rtp::context rtp_ctx; + + auto sess = rtp_ctx.create_session(addr); + auto hevc = sess->create_stream( + 8888, + 8889, + RTP_FORMAT_HEVC, + RCE_SYSTEM_CALL_DISPATCHER + ); + + hevc->install_receive_hook(nullptr, hook_sender); + + std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); + + for (int rounds = 0; rounds < 1; ++rounds) { + for (size_t offset = 0, k = 0; offset < len; ++k) { + memcpy(&chunk_size, (uint8_t *)mem + offset, sizeof(uint64_t)); + + offset += sizeof(uint64_t); + + start2 = std::chrono::high_resolution_clock::now(); + if ((ret = hevc->push_frame((uint8_t *)mem + offset, chunk_size, 0)) != RTP_OK) { + fprintf(stderr, "push_frame() failed!\n"); + for (;;); + } + + auto runtime = (uint64_t)std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + 
).count(); + + if (runtime < current * period) + std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime)); + + current += 1; + offset += chunk_size; + } + } + rtp_ctx.destroy_session(sess); + + fprintf(stderr, "%zu: intra %lf, inter %lf, avg %lf\n", + frames, + total_intra / (float)nintras, + total_inter / (float)ninters, + total / (float)frames + ); + + return 0; +} + +int main(int argc, char **argv) +{ + (void)argc, (void)argv; + + return sender(); +} diff --git a/uvgrtp/receiver.cc b/uvgrtp/receiver.cc new file mode 100644 index 0000000..736e3e0 --- /dev/null +++ b/uvgrtp/receiver.cc @@ -0,0 +1,89 @@ +#include +#include +#include +#include + +struct thread_info { + size_t pkts; + size_t bytes; + std::chrono::high_resolution_clock::time_point start; + std::chrono::high_resolution_clock::time_point last; +} *thread_info; + +std::atomic nready(0); + +void hook(void *arg, uvg_rtp::frame::rtp_frame *frame) +{ + int tid = *(int *)arg; + + if (thread_info[tid].pkts == 0) + thread_info[tid].start = std::chrono::high_resolution_clock::now(); + + /* receiver returns NULL to indicate that it has not received a frame in 10s + * and the sender has likely stopped sending frames long time ago so the benchmark + * can proceed to next run and ma*/ + if (!frame) { + fprintf(stderr, "discard %zu %zu %lu\n", thread_info[tid].bytes, thread_info[tid].pkts, + std::chrono::duration_cast( + thread_info[tid].last - thread_info[tid].start + ).count() + ); + nready++; + while (1) + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + + thread_info[tid].last = std::chrono::high_resolution_clock::now(); + thread_info[tid].bytes += frame->payload_len; + + (void)uvg_rtp::frame::dealloc_frame(frame); + + if (++thread_info[tid].pkts == 602) { + fprintf(stderr, "%zu %zu %lu\n", thread_info[tid].bytes, thread_info[tid].pkts, + std::chrono::duration_cast( + thread_info[tid].last - thread_info[tid].start + ).count() + ); + nready++; + } +} + +void thread_func(char *addr, int thread_num) +{ + std::string addr_("10.21.25.200"); + uvg_rtp::context rtp_ctx; + + auto sess = rtp_ctx.create_session(addr_); + auto hevc = sess->create_stream( + 8888 + thread_num, + 8889 + thread_num, + RTP_FORMAT_HEVC, + 0 + ); + + int tid = thread_num / 2; + + hevc->install_receive_hook(&tid, hook); + + for (;;) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + rtp_ctx.destroy_session(sess); +} + +int main(int argc, char **argv) +{ + if (argc != 3) { + fprintf(stderr, "usage: ./%s \n", __FILE__); + return -1; + } + + int nthreads = atoi(argv[2]); + thread_info = (struct thread_info *)calloc(nthreads, sizeof(*thread_info)); + + for (int i = 0; i < nthreads; ++i) + new std::thread(thread_func, argv[1], i * 2); + + while (nready.load() != nthreads) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); +} diff --git a/uvgrtp/sender.cc b/uvgrtp/sender.cc new file mode 100644 index 0000000..ead9714 --- /dev/null +++ b/uvgrtp/sender.cc @@ -0,0 +1,97 @@ +#include +#include +#include +#include + +#define MAX(a, b) (((int)(a) > (int)(b)) ? 
(int)(a) : (int)(b)) + +extern void *get_mem(int argc, char **argv, size_t& len); + +std::atomic nready(0); + +void thread_func(void *mem, size_t len, char *addr, int thread_num, double fps, bool strict) +{ + size_t bytes_sent = 0; + uint64_t chunk_size = 0; + uint64_t total_size = 0; + uint64_t diff = 0; + uint64_t current = 0; + uint64_t period = (uint64_t)((1000 / (float)fps) * 1000); + rtp_error_t ret = RTP_OK; + std::string addr_("10.21.25.2"); + + uvg_rtp::context rtp_ctx; + + auto sess = rtp_ctx.create_session(addr_); + auto hevc = sess->create_stream( + 8889 + thread_num, + 8888 + thread_num, + RTP_FORMAT_HEVC, + RCE_SYSTEM_CALL_DISPATCHER + ); + + std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); + + for (int rounds = 0; rounds < 1; ++rounds) { + for (size_t offset = 0, k = 0; offset < len; ++k) { + memcpy(&chunk_size, (uint8_t *)mem + offset, sizeof(uint64_t)); + + offset += sizeof(uint64_t); + total_size += chunk_size; + + if ((ret = hevc->push_frame((uint8_t *)mem + offset, chunk_size, 0)) != RTP_OK) { + fprintf(stderr, "push_frame() failed!\n"); + for (;;); + } + + auto runtime = (uint64_t)std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start + ).count(); + + if (runtime < current * period) + std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime)); + + current += 1; + offset += chunk_size; + bytes_sent += chunk_size; + } + } + rtp_ctx.destroy_session(sess); + + auto end = std::chrono::high_resolution_clock::now(); + diff = std::chrono::duration_cast(end - start).count(); + + fprintf(stderr, "%lu bytes, %lu kB, %lu MB took %lu ms %lu s\n", + bytes_sent, bytes_sent / 1000, bytes_sent / 1000 / 1000, + diff, diff / 1000 + ); + +end: + nready++; +} + +int main(int argc, char **argv) +{ + if (argc != 5) { + fprintf(stderr, "usage: ./%s \n", __FILE__); + return -1; + } + + size_t len = 0; + void *mem = get_mem(0, NULL, len); + int nthreads = atoi(argv[2]); + bool strict = !strcmp(argv[4], "strict"); + std::thread **threads = (std::thread **)malloc(sizeof(std::thread *) * nthreads); + + for (int i = 0; i < nthreads; ++i) + threads[i] = new std::thread(thread_func, mem, len, argv[1], i * 2, atof(argv[3]), strict); + + while (nready.load() != nthreads) + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + for (int i = 0; i < nthreads; ++i) { + threads[i]->join(); + delete threads[i]; + } + free(threads); +}