From 9dc717c7011b2e1a31512d54569b379518bd5cae Mon Sep 17 00:00:00 2001 From: Axel Burri Date: Sun, 28 Jul 2019 15:04:23 +0200 Subject: [PATCH] btrbk: use mbuffer instead of pv; add stream_buffer_sink framework Add run_cmd option stream_buffer_sink, which handles stream_buffer, rate_limit as well as --progress. For rate limiting, run "mbuffer" (on the target host) in combination with stream_buffer and --progress, instead of running "pv" (on the source host). Reasons: - mbuffer limits the read rate: For remote targets, we want a stream buffer in front of the rsh command pipe, before decompression. - For local targets, this can be combined with --process. - Combined stream_buffer and rate_limit: less commands in pipe. Further changes: - always set mbuffer -v1 option (never show warnings) - restrict raw_target_block_size to "kmgKMG": compatibility to stream_compress and rate_limit options, simplicity. - use mbuffer blocksize option where applicable --- btrbk | 97 ++++++++++++++++++++++++++++------------------------------- 1 file changed, 46 insertions(+), 51 deletions(-) diff --git a/btrbk b/btrbk index edd2310..7cd6278 100755 --- a/btrbk +++ b/btrbk @@ -94,8 +94,8 @@ my %config_options = ( ssh_user => { default => "root", accept_regexp => qr/^[a-z_][a-z0-9_-]*$/ }, ssh_compression => { default => undef, accept => [ "yes", "no" ] }, ssh_cipher_spec => { default => "default", accept_regexp => qr/^$ssh_cipher_match(,$ssh_cipher_match)*$/ }, - rate_limit => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgtKMGT]?$/, require_bin => 'pv' }, - stream_buffer => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgKMG%]?$/ }, # NOTE: requires 'mbuffer' command on target + rate_limit => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgtKMGT]?$/ }, # NOTE: requires 'mbuffer' command on target + stream_buffer => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgKMG%]?$/ }, # NOTE: requires 'mbuffer' command on target transaction_log => { default => undef, accept => [ "no" ], accept_file => { absolute => 1 }, context => [ "root" ] }, transaction_syslog => { default => undef, accept => [ "no", @syslog_facilities ], context => [ "root" ] }, lockfile => { default => undef, accept => [ "no" ], accept_file => { absolute => 1 }, context => [ "root" ] }, @@ -108,7 +108,7 @@ my %config_options = ( raw_target_compress_level => { default => "default", accept => [ "default" ], accept_numeric => 1 }, raw_target_compress_threads => { default => "default", accept => [ "default" ], accept_numeric => 1 }, raw_target_encrypt => { default => undef, accept => [ "no", "gpg", "openssl_enc" ] }, - raw_target_block_size => { default => "128K", accept_regexp => qr/^[0-9]+(kB|k|K|KiB|MB|M|MiB)?$/ }, + raw_target_block_size => { default => "128K", accept_regexp => qr/^[0-9]+[kmgKMG]?$/ }, raw_target_split => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+([kmgtpezyKMGTPEZY][bB]?)?$/ }, gpg_keyring => { default => undef, accept_file => { absolute => 1 } }, gpg_recipient => { default => undef, accept_regexp => qr/^[0-9a-zA-Z_@\+\-\.]+$/ }, @@ -557,10 +557,24 @@ sub check_exe($) return 0; } -sub rate_limit_cmd($) +sub stream_buffer_cmd_text(@) { - my $rate = shift; - return "pv -q -L " . lc($rate); + my %opts = @_; + my $rate = $opts{rate_limit}; + my $bufsize = $opts{stream_buffer}; + my $blocksize = $opts{blocksize}; + my $progress = $opts{show_progress}; + + return undef unless($rate || $bufsize || $progress); + + # NOTE: mbuffer takes defaults from /etc/mbuffer.rc + my @cmd = ( "mbuffer" ); + push @cmd, ( "-v1" ); # disable warnings (they arrive asynchronously and cant be cought) + push @cmd, "-q" unless($progress); + push @cmd, ( "-s", $blocksize ) if($blocksize); # defaults to 10k + push @cmd, ( "-m", lc($bufsize) ) if($bufsize); # b,k,M,G,% (default: 2%) + push @cmd, ( "-r", lc($rate) ) if($rate); # maximum read rate: b,k,M,G + return join(' ', @cmd); } sub compress_cmd_text($;$) @@ -600,13 +614,6 @@ sub decompress_cmd_text($) return compress_cmd_text($_[0], 1); } -sub stream_buffer_cmd($) -{ - my $bufsize = shift; - return "mbuffer -q -m " . lc($bufsize); -} - - sub _assemble_cmd($;$) { my $cmd_pipe = shift; @@ -621,7 +628,7 @@ sub _assemble_cmd($;$) } # cmd result is something like this: - # { btrfs send 2>&3 | pv | btrfs receive 2>&3 ; } 3>&1 + # { btrfs send 2>&3 | mbuffer | btrfs receive 2>&3 ; } 3>&1 my $pipe = ""; $cmd = "{ " if($catch_stderr); foreach (@$cmd_pipe) { @@ -720,29 +727,27 @@ sub run_cmd(@) } } - if($href->{rsh_rate_limit_in}) { - push @cmd_pipe, { cmd_text => rate_limit_cmd($href->{rsh_rate_limit_in}) }; - } - - if($href->{stream_buffer}) { - unshift @rsh_cmd_pipe, { cmd_text => stream_buffer_cmd($href->{stream_buffer}) }; - } - if($compressed && (not ($href->{compressed_ok}))) { unshift @rsh_cmd_pipe, { cmd_text => decompress_cmd_text($compressed) }; $compressed = undef; } + if($href->{stream_buffer_sink}) { + if(my $cmd_text = stream_buffer_cmd_text(%{$href->{stream_buffer_sink}})) { + unshift @rsh_cmd_pipe, { cmd_text => $cmd_text }; # in front of decompress + } + if($show_progress) { + # add an additional stream buffer (with default options) to cmd_pipe for show_progress + push @cmd_pipe, { cmd_text => stream_buffer_cmd_text( show_progress => 1 ) }; + } + } + if($href->{rsh_compress_out}) { die if($href->{redirect_to_file}); push @rsh_cmd_pipe, { cmd_text => compress_cmd_text($href->{rsh_compress_out}) }; $compressed = $href->{rsh_compress_out}; } - if($href->{rsh_rate_limit_out}) { - push @rsh_cmd_pipe, { cmd_text => rate_limit_cmd($href->{rsh_rate_limit_out}) }; - } - if((scalar(@rsh_cmd_pipe) == 1) && ($rsh_cmd_pipe[0]->{redirect_to_file})) { # NOTE: direct redirection in ssh command does not work: "ssh '> outfile'" # we need to assemble: "ssh 'cat > outfile'" @@ -754,15 +759,15 @@ sub run_cmd(@) $href->{cmd_text} = $rsh_text . " '" . _assemble_cmd(\@rsh_cmd_pipe) . "'"; } else { + if($href->{stream_buffer_sink}) { + my $cmd_text = stream_buffer_cmd_text(%{$href->{stream_buffer_sink}}, show_progress => $show_progress); + push @cmd_pipe, { cmd_text => $cmd_text } if($cmd_text); + } + if($compressed && (not ($href->{compressed_ok}))) { push @cmd_pipe, { cmd_text => decompress_cmd_text($compressed) }; $compressed = undef; } - - if($href->{stream_buffer}) { - push @cmd_pipe, { cmd_text => stream_buffer_cmd($href->{stream_buffer}) }; - } - } push @cmd_pipe, $href; } @@ -824,15 +829,6 @@ sub run_cmd(@) } -sub add_progress_command($) -{ - my $cmd_pipe = shift || die; - if($show_progress) { - push @$cmd_pipe, { cmd => [ 'pv', '-trab' ], compressed_ok => 1 }; - } -} - - sub btrfs_filesystem_show($) { my $vol = shift || die; @@ -1444,19 +1440,17 @@ sub btrfs_send_receive($$;$$$) push @cmd_pipe, { cmd => vinfo_cmd($snapshot, "btrfs send", @send_options, { unsafe => $snapshot_path } ), rsh => vinfo_rsh($snapshot, disable_compression => config_compress_hash($snapshot, "stream_compress")), - rsh_compress_out => config_compress_hash($snapshot, "stream_compress"), - rsh_rate_limit_out => config_key($snapshot, "rate_limit"), name => "btrfs send", + rsh_compress_out => config_compress_hash($snapshot, "stream_compress"), catch_stderr => 1, # hack for shell-based run_cmd() }; - add_progress_command(\@cmd_pipe); push @cmd_pipe, { cmd => vinfo_cmd($target, "btrfs receive", @receive_options, { unsafe => $target_path . '/' } ), rsh => vinfo_rsh($target, disable_compression => config_compress_hash($target, "stream_compress")), name => "btrfs receive", rsh_compress_in => config_compress_hash($target, "stream_compress"), - rsh_rate_limit_in => config_key($target, "rate_limit"), - stream_buffer => config_key($target, "stream_buffer"), + stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"), + rate_limit => config_key($target, "rate_limit") }, catch_stderr => 1, # hack for shell-based run_cmd() filter_stderr => sub { $err = $_; $_ = undef } }; @@ -1596,9 +1590,7 @@ sub btrfs_send_to_file($$$;$$) rsh => vinfo_rsh($source, disable_compression => $compress || config_compress_hash($source, "stream_compress")), name => "btrfs send", rsh_compress_out => $compress || config_compress_hash($source, "stream_compress"), - rsh_rate_limit_out => config_key($source, "rate_limit"), }; - add_progress_command(\@cmd_pipe); if($compress) { $raw_info{compress} = $compression{$compress->{key}}->{format} if($compress); $target_filename .= '.' . $compression{$compress->{key}}->{format}; @@ -1732,7 +1724,8 @@ sub btrfs_send_to_file($$$;$$) check_unsafe => [ { unsafe => "${target_path}/${target_filename}.split_" } ], rsh => vinfo_rsh($target, disable_compression => $compress || config_compress_hash($target, "stream_compress")), rsh_compress_in => $compress || config_compress_hash($target, "stream_compress"), - rsh_rate_limit_in => config_key($target, "rate_limit"), + stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"), + rate_limit => config_key($target, "rate_limit") }, compressed_ok => ($compress ? 1 : 0), } } @@ -1752,7 +1745,9 @@ sub btrfs_send_to_file($$$;$$) #redirect_to_file => { unsafe => "${target_path}/${target_filename}" }, # alternative (use shell redirection), less overhead on local filesystems (barely measurable): rsh => vinfo_rsh($target, disable_compression => $compress || config_compress_hash($target, "stream_compress")), rsh_compress_in => $compress || config_compress_hash($target, "stream_compress"), - rsh_rate_limit_in => config_key($target, "rate_limit"), + stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"), + rate_limit => config_key($target, "rate_limit"), + blocksize => config_key($target, "raw_target_block_size") }, compressed_ok => ($compress ? 1 : 0), }; } @@ -4877,8 +4872,8 @@ MAIN: } # check command line options - if($show_progress && (not check_exe('pv'))) { - WARN 'Found option "--progress", but required executable "pv" does not exist on your system. Please install "pv".'; + if($show_progress && (not check_exe('mbuffer'))) { + WARN 'Found option "--progress", but required executable "mbuffer" does not exist on your system. Please install "mbuffer".'; $show_progress = 0; } my ($action_run, $action_usage, $action_resolve, $action_diff, $action_origin, $action_config_print, $action_list, $action_clean, $action_archive);