btrbk: add stream_buffer_remote, rate_limit_remote; run stream_buffer only on local host

Change sematics of stream_buffer: If set, run on local host only. If a
remote stream buffer is required, use stream_buffer_remote /
rate_limit_remote.
pull/293/head
Axel Burri 2019-07-29 21:59:03 +02:00
parent 73108d2309
commit 0435f32619
1 changed files with 156 additions and 108 deletions

264
btrbk
View File

@ -94,12 +94,14 @@ my %config_options = (
ssh_user => { default => "root", accept_regexp => qr/^[a-z_][a-z0-9_-]*$/ },
ssh_compression => { default => undef, accept => [ "yes", "no" ] },
ssh_cipher_spec => { default => "default", accept_regexp => qr/^$ssh_cipher_match(,$ssh_cipher_match)*$/ },
rate_limit => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgtKMGT]?$/ }, # NOTE: requires 'mbuffer' command on target
stream_buffer => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgKMG%]?$/ }, # NOTE: requires 'mbuffer' command on target
transaction_log => { default => undef, accept => [ "no" ], accept_file => { absolute => 1 }, context => [ "root" ] },
transaction_syslog => { default => undef, accept => [ "no", @syslog_facilities ], context => [ "root" ] },
lockfile => { default => undef, accept => [ "no" ], accept_file => { absolute => 1 }, context => [ "root" ] },
rate_limit => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgtKMGT]?$/, require_bin => 'mbuffer' },
rate_limit_remote => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgtKMGT]?$/ }, # NOTE: requires 'mbuffer' command on remote hosts
stream_buffer => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgKMG%]?$/, require_bin => 'mbuffer' },
stream_buffer_remote => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgKMG%]?$/ }, # NOTE: requires 'mbuffer' command on remote hosts
stream_compress => { default => undef, accept => [ "no", (keys %compression) ] },
stream_compress_level => { default => "default", accept => [ "default" ], accept_numeric => 1 },
stream_compress_threads => { default => "default", accept => [ "default" ], accept_numeric => 1 },
@ -564,31 +566,33 @@ sub check_exe($)
return 0;
}
sub stream_buffer_cmd_text(@)
sub stream_buffer_cmd_text($)
{
my %opts = @_;
my $rate = $opts{rate_limit};
my $bufsize = $opts{stream_buffer};
my $blocksize = $opts{blocksize};
my $progress = $opts{show_progress};
my $opts = shift;
my $rl_in = $opts->{rate_limit_in} // $opts->{rate_limit}; # maximum read rate: b,k,M,G
my $rl_out = $opts->{rate_limit_out}; # maximum write rate: b,k,M,G
my $bufsize = $opts->{stream_buffer}; # b,k,M,G,% (default: 2%)
my $blocksize = $opts->{blocksize}; # defaults to 10k
my $progress = $opts->{show_progress};
return undef unless($rate || $bufsize || $progress);
# return empty array if mbuffer is not needed
return () unless($rl_in || $rl_out || $bufsize || $progress);
# NOTE: mbuffer takes defaults from /etc/mbuffer.rc
my @cmd = ( "mbuffer" );
push @cmd, ( "-v1" ); # disable warnings (they arrive asynchronously and cant be cought)
push @cmd, ( "-v", "1" ); # disable warnings (they arrive asynchronously and cant be cought)
push @cmd, "-q" unless($progress);
push @cmd, ( "-s", $blocksize ) if($blocksize); # defaults to 10k
push @cmd, ( "-m", lc($bufsize) ) if($bufsize); # b,k,M,G,% (default: 2%)
push @cmd, ( "-r", lc($rate) ) if($rate); # maximum read rate: b,k,M,G
return join(' ', @cmd);
push @cmd, ( "-s", $blocksize ) if($blocksize);
push @cmd, ( "-m", lc($bufsize) ) if($bufsize);
push @cmd, ( "-r", lc($rl_in) ) if($rl_in);
push @cmd, ( "-R", lc($rl_out) ) if($rl_out);
return { cmd_text => join(' ', @cmd) };
}
sub compress_cmd_text($;$)
{
my $def = shift;
my $def = shift // die;
my $decompress = shift;
return undef unless(defined($def));
my $cc = $compression{$def->{key}};
my @cmd = $decompress ? @{$cc->{decompress_cmd}} : @{$cc->{compress_cmd}};
@ -613,7 +617,7 @@ sub compress_cmd_text($;$)
WARN_ONCE "Threading is not supported for '$cc->{name}', ignoring";
}
}
return join(' ', @cmd);
return { cmd_text => join(' ', @cmd) };
}
sub decompress_cmd_text($)
@ -687,6 +691,10 @@ sub run_cmd(@)
my @cmd_pipe;
my @unsafe_cmd;
my $compressed = undef;
my $stream_options = $cmd_pipe_in[0]->{stream_options} // {};
$cmd_pipe_in[0]->{stream_source} = 1;
$cmd_pipe_in[-1]->{stream_sink} = 1;
foreach my $href (@cmd_pipe_in)
{
@ -700,84 +708,92 @@ sub run_cmd(@)
_safe_cmd($href->{check_unsafe}, \@unsafe_cmd);
}
if($href->{compress}) {
if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$href->{compress}->{key}}->{format})) {
push @cmd_pipe, { cmd_text => decompress_cmd_text($compressed) };
if($href->{redirect_to_file}) {
die unless($href->{stream_sink});
$href->{cmd_text} = _safe_cmd([ '>', $href->{redirect_to_file} ], \@unsafe_cmd);
}
elsif($href->{compress_stdin}) {
# does nothing if already compressed correctly by stream_compress
if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$href->{compress_stdin}->{key}}->{format})) {
# re-compress with different algorithm
push @cmd_pipe, decompress_cmd_text($compressed);
$compressed = undef;
}
unless($compressed) {
push @cmd_pipe, { cmd_text => compress_cmd_text($href->{compress}) };
$compressed = $href->{compress};
push @cmd_pipe, compress_cmd_text($href->{compress_stdin});
$compressed = $href->{compress_stdin};
}
next;
}
elsif($href->{cmd}) {
$href->{cmd_text} = _safe_cmd($href->{cmd}, \@unsafe_cmd);
}
return undef unless(defined($href->{cmd_text}));
my @rsh_compress_in;
my @rsh_compress_out;
my @decompress_in;
# input stream compression: local, in front of rsh_cmd_pipe
if($href->{rsh} && $stream_options->{stream_compress} && (not $href->{stream_source})) {
if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$stream_options->{stream_compress}->{key}}->{format})) {
# re-compress with different algorithm, should be avoided!
push @rsh_compress_in, decompress_cmd_text($compressed);
$compressed = undef;
}
if(not $compressed) {
$compressed = $stream_options->{stream_compress};
push @rsh_compress_in, compress_cmd_text($compressed);
}
}
else {
if($href->{redirect_to_file}) {
$href->{cmd_text} = _safe_cmd([ '>', $href->{redirect_to_file} ], \@unsafe_cmd);
}
elsif($href->{cmd}) {
$href->{cmd_text} = _safe_cmd($href->{cmd}, \@unsafe_cmd);
}
return undef unless(defined($href->{cmd_text}));
if($href->{rsh}) {
my @rsh_cmd_pipe = ( $href );
if($href->{rsh_compress_in}) {
if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$href->{rsh_compress_in}->{key}}->{format}))
{
push @cmd_pipe, { cmd_text => decompress_cmd_text($compressed) };
$compressed = undef;
}
unless($compressed) {
push @cmd_pipe, { cmd_text => compress_cmd_text($href->{rsh_compress_in}) };
$compressed = $href->{rsh_compress_in};
}
}
if($compressed && (not ($href->{compressed_ok}))) {
unshift @rsh_cmd_pipe, { cmd_text => decompress_cmd_text($compressed) };
$compressed = undef;
}
if($href->{stream_buffer_sink}) {
if(my $cmd_text = stream_buffer_cmd_text(%{$href->{stream_buffer_sink}})) {
unshift @rsh_cmd_pipe, { cmd_text => $cmd_text }; # in front of decompress
}
if($show_progress) {
# add an additional stream buffer (with default options) to cmd_pipe for show_progress
push @cmd_pipe, { cmd_text => stream_buffer_cmd_text( show_progress => 1 ) };
}
}
if($href->{rsh_compress_out}) {
die if($href->{redirect_to_file});
push @rsh_cmd_pipe, { cmd_text => compress_cmd_text($href->{rsh_compress_out}) };
$compressed = $href->{rsh_compress_out};
}
if((scalar(@rsh_cmd_pipe) == 1) && ($rsh_cmd_pipe[0]->{redirect_to_file})) {
# NOTE: direct redirection in ssh command does not work: "ssh '> outfile'"
# we need to assemble: "ssh 'cat > outfile'"
unshift @rsh_cmd_pipe, { cmd_text => 'cat' };
}
my $rsh_text = _safe_cmd($href->{rsh}, \@unsafe_cmd);
return undef unless(defined($rsh_text));
$href->{cmd_text} = $rsh_text . " '" . _assemble_cmd(\@rsh_cmd_pipe) . "'";
}
else {
if($href->{stream_buffer_sink}) {
my $cmd_text = stream_buffer_cmd_text(%{$href->{stream_buffer_sink}}, show_progress => $show_progress);
push @cmd_pipe, { cmd_text => $cmd_text } if($cmd_text);
}
if($compressed && (not ($href->{compressed_ok}))) {
push @cmd_pipe, { cmd_text => decompress_cmd_text($compressed) };
$compressed = undef;
}
}
push @cmd_pipe, $href;
if($compressed && (not ($href->{compressed_ok}))) {
push @decompress_in, decompress_cmd_text($compressed);
$compressed = undef;
}
# output stream compression: remote, at end of rsh_cmd_pipe
if($href->{rsh} && $stream_options->{stream_compress} && (not $href->{stream_sink}) && (not $compressed)) {
$compressed = $stream_options->{stream_compress};
push @rsh_compress_out, compress_cmd_text($compressed);
}
if($href->{rsh}) {
# honor stream_buffer_remote, rate_limit_remote for stream source / sink
my @rsh_stream_buffer_in = $href->{stream_sink} ? stream_buffer_cmd_text($stream_options->{rsh_sink}) : ();
my @rsh_stream_buffer_out = $href->{stream_source} ? stream_buffer_cmd_text($stream_options->{rsh_source}) : ();
my @rsh_cmd_pipe = (
@decompress_in,
@rsh_stream_buffer_in,
$href,
@rsh_stream_buffer_out,
@rsh_compress_out,
);
@decompress_in = ();
# fixup redirect_to_file
if((scalar(@rsh_cmd_pipe) == 1) && ($rsh_cmd_pipe[0]->{redirect_to_file})) {
# NOTE: direct redirection in ssh command does not work: "ssh '> outfile'"
# we need to assemble: "ssh 'cat > outfile'"
unshift @rsh_cmd_pipe, { cmd_text => 'cat' };
}
my $rsh_text = _safe_cmd($href->{rsh}, \@unsafe_cmd);
return undef unless(defined($rsh_text));
$href->{cmd_text} = $rsh_text . " '" . _assemble_cmd(\@rsh_cmd_pipe) . "'";
}
# local stream_buffer, rate_limit and show_progress in front of stream sink
my @stream_buffer_in = $href->{stream_sink} ? stream_buffer_cmd_text($stream_options->{local_sink}) : ();
push @cmd_pipe, (
@decompress_in, # empty if rsh
@stream_buffer_in,
@rsh_compress_in, # empty if not rsh
$href, # command or rsh_cmd_pipe
);
}
my $cmd = _assemble_cmd(\@cmd_pipe, $catch_stderr);
@ -1437,6 +1453,8 @@ sub btrfs_send_receive($$;$$$)
INFO "[send/receive] parent: $parent->{PRINT}" if($parent);
INFO "[send/receive] clone-src: $_->{PRINT}" foreach(@$clone_src);
my $stream_options = config_stream_hash($snapshot, $target);
my @send_options;
my @receive_options;
push(@send_options, '-p', { unsafe => $parent_path} ) if($parent_path);
@ -1447,18 +1465,16 @@ sub btrfs_send_receive($$;$$$)
my @cmd_pipe;
push @cmd_pipe, {
cmd => vinfo_cmd($snapshot, "btrfs send", @send_options, { unsafe => $snapshot_path } ),
rsh => vinfo_rsh($snapshot, disable_compression => config_compress_hash($snapshot, "stream_compress")),
rsh => vinfo_rsh($snapshot, disable_compression => $stream_options->{stream_compress}),
name => "btrfs send",
rsh_compress_out => config_compress_hash($snapshot, "stream_compress"),
stream_options => $stream_options,
catch_stderr => 1, # hack for shell-based run_cmd()
};
push @cmd_pipe, {
cmd => vinfo_cmd($target, "btrfs receive", @receive_options, { unsafe => $target_path . '/' } ),
rsh => vinfo_rsh($target, disable_compression => config_compress_hash($target, "stream_compress")),
rsh => vinfo_rsh($target, disable_compression => $stream_options->{stream_compress}),
name => "btrfs receive",
rsh_compress_in => config_compress_hash($target, "stream_compress"),
stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"),
rate_limit => config_key($target, "rate_limit") },
catch_stderr => 1, # hack for shell-based run_cmd()
filter_stderr => sub { $err = $_; $_ = undef }
};
@ -1587,22 +1603,27 @@ sub btrfs_send_to_file($$$;$$)
my $compress = config_compress_hash($target, "raw_target_compress");
my $encrypt = config_encrypt_hash($target, "raw_target_encrypt");
my $split = config_key($target, "raw_target_split");
my $stream_options = config_stream_hash($source, $target);
# make sure we dont re-compress, override "stream_compress" with "raw_target_compress"
$stream_options->{stream_compress} = $compress if($compress);
my @send_options;
push(@send_options, '-v') if($loglevel >= 3);
push(@send_options, '-p', $parent_path) if($parent_path);
#push(@send_options, '-v') if($loglevel >= 3);
my @cmd_pipe;
push @cmd_pipe, {
cmd => vinfo_cmd($source, "btrfs send", @send_options, { unsafe => $source_path } ),
rsh => vinfo_rsh($source, disable_compression => $compress || config_compress_hash($source, "stream_compress")),
rsh => vinfo_rsh($source, disable_compression => $stream_options->{stream_compress}),
name => "btrfs send",
rsh_compress_out => $compress || config_compress_hash($source, "stream_compress"),
stream_options => $stream_options,
};
if($compress) {
$raw_info{compress} = $compression{$compress->{key}}->{format} if($compress);
$raw_info{compress} = $compression{$compress->{key}}->{format};
$target_filename .= '.' . $compression{$compress->{key}}->{format};
push @cmd_pipe, { compress => $compress }; # does nothing if already compressed by rsh_compress_out
push @cmd_pipe, { compress_stdin => $compress }; # does nothing if already compressed by stream_compress
}
if($encrypt) {
$target_filename .= ($encrypt->{type} eq "gpg") ? '.gpg' : '.encrypted';
@ -1730,10 +1751,7 @@ sub btrfs_send_to_file($$$;$$)
push @cmd_pipe, {
cmd => [ 'split', '-b', uc($split), '-', "${target_path}/${target_filename}.split_" ],
check_unsafe => [ { unsafe => "${target_path}/${target_filename}.split_" } ],
rsh => vinfo_rsh($target, disable_compression => $compress || config_compress_hash($target, "stream_compress")),
rsh_compress_in => $compress || config_compress_hash($target, "stream_compress"),
stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"),
rate_limit => config_key($target, "rate_limit") },
rsh => vinfo_rsh($target, disable_compression => $stream_options->{stream_compress}),
compressed_ok => ($compress ? 1 : 0),
}
}
@ -1751,11 +1769,7 @@ sub btrfs_send_to_file($$$;$$)
cmd => [ 'dd', 'status=none', 'bs=' . config_key($target, "raw_target_block_size"), "of=${target_path}/${target_filename}" ],
check_unsafe => [ { unsafe => "${target_path}/${target_filename}" } ],
#redirect_to_file => { unsafe => "${target_path}/${target_filename}" }, # alternative (use shell redirection), less overhead on local filesystems (barely measurable):
rsh => vinfo_rsh($target, disable_compression => $compress || config_compress_hash($target, "stream_compress")),
rsh_compress_in => $compress || config_compress_hash($target, "stream_compress"),
stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"),
rate_limit => config_key($target, "rate_limit"),
blocksize => config_key($target, "raw_target_block_size") },
rsh => vinfo_rsh($target, disable_compression => $stream_options->{stream_compress}),
compressed_ok => ($compress ? 1 : 0),
};
}
@ -3550,6 +3564,40 @@ sub config_compress_hash($$)
}
sub config_stream_hash($$)
{
my $source = shift || die;
my $target = shift || die;
return {
stream_compress => config_compress_hash($target, "stream_compress"),
# for remote source, limits read rate of ssh stream output after decompress
# for remote target, limits read rate of "btrfs send"
# for both local, limits read rate of "btrfs send"
# for raw targets, limits read rate of "btrfs send | xz" (raw_target_compress)
local_sink => {
stream_buffer => config_key($target, "stream_buffer"),
rate_limit => config_key($target, "rate_limit"),
show_progress => $show_progress,
},
# limits read rate of "btrfs send"
rsh_source => { # limit read rate after "btrfs send", before compression
stream_buffer => config_key($source, "stream_buffer_remote"),
rate_limit => config_key($source, "rate_limit_remote"),
#rate_limit_out => config_key($source, "rate_limit_remote"), # limit write rate
},
# limits read rate of ssh stream output
rsh_sink => {
stream_buffer => config_key($target, "stream_buffer_remote"),
rate_limit => config_key($target, "rate_limit_remote"),
#rate_limit_in => config_key($target, "rate_limit_remote"),
},
};
}
sub config_encrypt_hash($$)
{
my $config = shift || die;