btrbk: use mbuffer instead of pv; add stream_buffer_sink framework

Add run_cmd option stream_buffer_sink, which handles stream_buffer,
rate_limit as well as --progress.

For rate limiting, run "mbuffer" (on the target host) in combination
with stream_buffer and --progress, instead of running "pv" (on the
source host).

Reasons:

 - mbuffer limits the read rate: For remote targets, we want a stream
   buffer in front of the rsh command pipe, before decompression.
 - For local targets, this can be combined with --process.
 - Combined stream_buffer and rate_limit: less commands in pipe.

Further changes:

 - always set mbuffer -v1 option (never show warnings)
 - restrict raw_target_block_size to "kmgKMG": compatibility to
   stream_compress and rate_limit options, simplicity.
 - use mbuffer blocksize option where applicable
pull/293/head
Axel Burri 2019-07-28 15:04:23 +02:00
parent c7a8d0bb11
commit 9dc717c701
1 changed files with 46 additions and 51 deletions

97
btrbk
View File

@ -94,8 +94,8 @@ my %config_options = (
ssh_user => { default => "root", accept_regexp => qr/^[a-z_][a-z0-9_-]*$/ },
ssh_compression => { default => undef, accept => [ "yes", "no" ] },
ssh_cipher_spec => { default => "default", accept_regexp => qr/^$ssh_cipher_match(,$ssh_cipher_match)*$/ },
rate_limit => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgtKMGT]?$/, require_bin => 'pv' },
stream_buffer => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgKMG%]?$/ }, # NOTE: requires 'mbuffer' command on target
rate_limit => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgtKMGT]?$/ }, # NOTE: requires 'mbuffer' command on target
stream_buffer => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgKMG%]?$/ }, # NOTE: requires 'mbuffer' command on target
transaction_log => { default => undef, accept => [ "no" ], accept_file => { absolute => 1 }, context => [ "root" ] },
transaction_syslog => { default => undef, accept => [ "no", @syslog_facilities ], context => [ "root" ] },
lockfile => { default => undef, accept => [ "no" ], accept_file => { absolute => 1 }, context => [ "root" ] },
@ -108,7 +108,7 @@ my %config_options = (
raw_target_compress_level => { default => "default", accept => [ "default" ], accept_numeric => 1 },
raw_target_compress_threads => { default => "default", accept => [ "default" ], accept_numeric => 1 },
raw_target_encrypt => { default => undef, accept => [ "no", "gpg", "openssl_enc" ] },
raw_target_block_size => { default => "128K", accept_regexp => qr/^[0-9]+(kB|k|K|KiB|MB|M|MiB)?$/ },
raw_target_block_size => { default => "128K", accept_regexp => qr/^[0-9]+[kmgKMG]?$/ },
raw_target_split => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+([kmgtpezyKMGTPEZY][bB]?)?$/ },
gpg_keyring => { default => undef, accept_file => { absolute => 1 } },
gpg_recipient => { default => undef, accept_regexp => qr/^[0-9a-zA-Z_@\+\-\.]+$/ },
@ -557,10 +557,24 @@ sub check_exe($)
return 0;
}
sub rate_limit_cmd($)
sub stream_buffer_cmd_text(@)
{
my $rate = shift;
return "pv -q -L " . lc($rate);
my %opts = @_;
my $rate = $opts{rate_limit};
my $bufsize = $opts{stream_buffer};
my $blocksize = $opts{blocksize};
my $progress = $opts{show_progress};
return undef unless($rate || $bufsize || $progress);
# NOTE: mbuffer takes defaults from /etc/mbuffer.rc
my @cmd = ( "mbuffer" );
push @cmd, ( "-v1" ); # disable warnings (they arrive asynchronously and cant be cought)
push @cmd, "-q" unless($progress);
push @cmd, ( "-s", $blocksize ) if($blocksize); # defaults to 10k
push @cmd, ( "-m", lc($bufsize) ) if($bufsize); # b,k,M,G,% (default: 2%)
push @cmd, ( "-r", lc($rate) ) if($rate); # maximum read rate: b,k,M,G
return join(' ', @cmd);
}
sub compress_cmd_text($;$)
@ -600,13 +614,6 @@ sub decompress_cmd_text($)
return compress_cmd_text($_[0], 1);
}
sub stream_buffer_cmd($)
{
my $bufsize = shift;
return "mbuffer -q -m " . lc($bufsize);
}
sub _assemble_cmd($;$)
{
my $cmd_pipe = shift;
@ -621,7 +628,7 @@ sub _assemble_cmd($;$)
}
# cmd result is something like this:
# { btrfs send <src> 2>&3 | pv | btrfs receive <dst> 2>&3 ; } 3>&1
# { btrfs send <src> 2>&3 | mbuffer | btrfs receive <dst> 2>&3 ; } 3>&1
my $pipe = "";
$cmd = "{ " if($catch_stderr);
foreach (@$cmd_pipe) {
@ -720,29 +727,27 @@ sub run_cmd(@)
}
}
if($href->{rsh_rate_limit_in}) {
push @cmd_pipe, { cmd_text => rate_limit_cmd($href->{rsh_rate_limit_in}) };
}
if($href->{stream_buffer}) {
unshift @rsh_cmd_pipe, { cmd_text => stream_buffer_cmd($href->{stream_buffer}) };
}
if($compressed && (not ($href->{compressed_ok}))) {
unshift @rsh_cmd_pipe, { cmd_text => decompress_cmd_text($compressed) };
$compressed = undef;
}
if($href->{stream_buffer_sink}) {
if(my $cmd_text = stream_buffer_cmd_text(%{$href->{stream_buffer_sink}})) {
unshift @rsh_cmd_pipe, { cmd_text => $cmd_text }; # in front of decompress
}
if($show_progress) {
# add an additional stream buffer (with default options) to cmd_pipe for show_progress
push @cmd_pipe, { cmd_text => stream_buffer_cmd_text( show_progress => 1 ) };
}
}
if($href->{rsh_compress_out}) {
die if($href->{redirect_to_file});
push @rsh_cmd_pipe, { cmd_text => compress_cmd_text($href->{rsh_compress_out}) };
$compressed = $href->{rsh_compress_out};
}
if($href->{rsh_rate_limit_out}) {
push @rsh_cmd_pipe, { cmd_text => rate_limit_cmd($href->{rsh_rate_limit_out}) };
}
if((scalar(@rsh_cmd_pipe) == 1) && ($rsh_cmd_pipe[0]->{redirect_to_file})) {
# NOTE: direct redirection in ssh command does not work: "ssh '> outfile'"
# we need to assemble: "ssh 'cat > outfile'"
@ -754,15 +759,15 @@ sub run_cmd(@)
$href->{cmd_text} = $rsh_text . " '" . _assemble_cmd(\@rsh_cmd_pipe) . "'";
}
else {
if($href->{stream_buffer_sink}) {
my $cmd_text = stream_buffer_cmd_text(%{$href->{stream_buffer_sink}}, show_progress => $show_progress);
push @cmd_pipe, { cmd_text => $cmd_text } if($cmd_text);
}
if($compressed && (not ($href->{compressed_ok}))) {
push @cmd_pipe, { cmd_text => decompress_cmd_text($compressed) };
$compressed = undef;
}
if($href->{stream_buffer}) {
push @cmd_pipe, { cmd_text => stream_buffer_cmd($href->{stream_buffer}) };
}
}
push @cmd_pipe, $href;
}
@ -824,15 +829,6 @@ sub run_cmd(@)
}
sub add_progress_command($)
{
my $cmd_pipe = shift || die;
if($show_progress) {
push @$cmd_pipe, { cmd => [ 'pv', '-trab' ], compressed_ok => 1 };
}
}
sub btrfs_filesystem_show($)
{
my $vol = shift || die;
@ -1444,19 +1440,17 @@ sub btrfs_send_receive($$;$$$)
push @cmd_pipe, {
cmd => vinfo_cmd($snapshot, "btrfs send", @send_options, { unsafe => $snapshot_path } ),
rsh => vinfo_rsh($snapshot, disable_compression => config_compress_hash($snapshot, "stream_compress")),
rsh_compress_out => config_compress_hash($snapshot, "stream_compress"),
rsh_rate_limit_out => config_key($snapshot, "rate_limit"),
name => "btrfs send",
rsh_compress_out => config_compress_hash($snapshot, "stream_compress"),
catch_stderr => 1, # hack for shell-based run_cmd()
};
add_progress_command(\@cmd_pipe);
push @cmd_pipe, {
cmd => vinfo_cmd($target, "btrfs receive", @receive_options, { unsafe => $target_path . '/' } ),
rsh => vinfo_rsh($target, disable_compression => config_compress_hash($target, "stream_compress")),
name => "btrfs receive",
rsh_compress_in => config_compress_hash($target, "stream_compress"),
rsh_rate_limit_in => config_key($target, "rate_limit"),
stream_buffer => config_key($target, "stream_buffer"),
stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"),
rate_limit => config_key($target, "rate_limit") },
catch_stderr => 1, # hack for shell-based run_cmd()
filter_stderr => sub { $err = $_; $_ = undef }
};
@ -1596,9 +1590,7 @@ sub btrfs_send_to_file($$$;$$)
rsh => vinfo_rsh($source, disable_compression => $compress || config_compress_hash($source, "stream_compress")),
name => "btrfs send",
rsh_compress_out => $compress || config_compress_hash($source, "stream_compress"),
rsh_rate_limit_out => config_key($source, "rate_limit"),
};
add_progress_command(\@cmd_pipe);
if($compress) {
$raw_info{compress} = $compression{$compress->{key}}->{format} if($compress);
$target_filename .= '.' . $compression{$compress->{key}}->{format};
@ -1732,7 +1724,8 @@ sub btrfs_send_to_file($$$;$$)
check_unsafe => [ { unsafe => "${target_path}/${target_filename}.split_" } ],
rsh => vinfo_rsh($target, disable_compression => $compress || config_compress_hash($target, "stream_compress")),
rsh_compress_in => $compress || config_compress_hash($target, "stream_compress"),
rsh_rate_limit_in => config_key($target, "rate_limit"),
stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"),
rate_limit => config_key($target, "rate_limit") },
compressed_ok => ($compress ? 1 : 0),
}
}
@ -1752,7 +1745,9 @@ sub btrfs_send_to_file($$$;$$)
#redirect_to_file => { unsafe => "${target_path}/${target_filename}" }, # alternative (use shell redirection), less overhead on local filesystems (barely measurable):
rsh => vinfo_rsh($target, disable_compression => $compress || config_compress_hash($target, "stream_compress")),
rsh_compress_in => $compress || config_compress_hash($target, "stream_compress"),
rsh_rate_limit_in => config_key($target, "rate_limit"),
stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"),
rate_limit => config_key($target, "rate_limit"),
blocksize => config_key($target, "raw_target_block_size") },
compressed_ok => ($compress ? 1 : 0),
};
}
@ -4877,8 +4872,8 @@ MAIN:
}
# check command line options
if($show_progress && (not check_exe('pv'))) {
WARN 'Found option "--progress", but required executable "pv" does not exist on your system. Please install "pv".';
if($show_progress && (not check_exe('mbuffer'))) {
WARN 'Found option "--progress", but required executable "mbuffer" does not exist on your system. Please install "mbuffer".';
$show_progress = 0;
}
my ($action_run, $action_usage, $action_resolve, $action_diff, $action_origin, $action_config_print, $action_list, $action_clean, $action_archive);