From 0435f32619b9a103ffc2dea6f16d759fe8fe3c3e Mon Sep 17 00:00:00 2001 From: Axel Burri Date: Mon, 29 Jul 2019 21:59:03 +0200 Subject: [PATCH] btrbk: add stream_buffer_remote, rate_limit_remote; run stream_buffer only on local host Change sematics of stream_buffer: If set, run on local host only. If a remote stream buffer is required, use stream_buffer_remote / rate_limit_remote. --- btrbk | 264 ++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 156 insertions(+), 108 deletions(-) diff --git a/btrbk b/btrbk index 7a3c364..dae3abb 100755 --- a/btrbk +++ b/btrbk @@ -94,12 +94,14 @@ my %config_options = ( ssh_user => { default => "root", accept_regexp => qr/^[a-z_][a-z0-9_-]*$/ }, ssh_compression => { default => undef, accept => [ "yes", "no" ] }, ssh_cipher_spec => { default => "default", accept_regexp => qr/^$ssh_cipher_match(,$ssh_cipher_match)*$/ }, - rate_limit => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgtKMGT]?$/ }, # NOTE: requires 'mbuffer' command on target - stream_buffer => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgKMG%]?$/ }, # NOTE: requires 'mbuffer' command on target transaction_log => { default => undef, accept => [ "no" ], accept_file => { absolute => 1 }, context => [ "root" ] }, transaction_syslog => { default => undef, accept => [ "no", @syslog_facilities ], context => [ "root" ] }, lockfile => { default => undef, accept => [ "no" ], accept_file => { absolute => 1 }, context => [ "root" ] }, + rate_limit => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgtKMGT]?$/, require_bin => 'mbuffer' }, + rate_limit_remote => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgtKMGT]?$/ }, # NOTE: requires 'mbuffer' command on remote hosts + stream_buffer => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgKMG%]?$/, require_bin => 'mbuffer' }, + stream_buffer_remote => { default => undef, accept => [ "no" ], accept_regexp => qr/^[0-9]+[kmgKMG%]?$/ }, # NOTE: requires 'mbuffer' command on remote hosts stream_compress => { default => undef, accept => [ "no", (keys %compression) ] }, stream_compress_level => { default => "default", accept => [ "default" ], accept_numeric => 1 }, stream_compress_threads => { default => "default", accept => [ "default" ], accept_numeric => 1 }, @@ -564,31 +566,33 @@ sub check_exe($) return 0; } -sub stream_buffer_cmd_text(@) +sub stream_buffer_cmd_text($) { - my %opts = @_; - my $rate = $opts{rate_limit}; - my $bufsize = $opts{stream_buffer}; - my $blocksize = $opts{blocksize}; - my $progress = $opts{show_progress}; + my $opts = shift; + my $rl_in = $opts->{rate_limit_in} // $opts->{rate_limit}; # maximum read rate: b,k,M,G + my $rl_out = $opts->{rate_limit_out}; # maximum write rate: b,k,M,G + my $bufsize = $opts->{stream_buffer}; # b,k,M,G,% (default: 2%) + my $blocksize = $opts->{blocksize}; # defaults to 10k + my $progress = $opts->{show_progress}; - return undef unless($rate || $bufsize || $progress); + # return empty array if mbuffer is not needed + return () unless($rl_in || $rl_out || $bufsize || $progress); # NOTE: mbuffer takes defaults from /etc/mbuffer.rc my @cmd = ( "mbuffer" ); - push @cmd, ( "-v1" ); # disable warnings (they arrive asynchronously and cant be cought) + push @cmd, ( "-v", "1" ); # disable warnings (they arrive asynchronously and cant be cought) push @cmd, "-q" unless($progress); - push @cmd, ( "-s", $blocksize ) if($blocksize); # defaults to 10k - push @cmd, ( "-m", lc($bufsize) ) if($bufsize); # b,k,M,G,% (default: 2%) - push @cmd, ( "-r", lc($rate) ) if($rate); # maximum read rate: b,k,M,G - return join(' ', @cmd); + push @cmd, ( "-s", $blocksize ) if($blocksize); + push @cmd, ( "-m", lc($bufsize) ) if($bufsize); + push @cmd, ( "-r", lc($rl_in) ) if($rl_in); + push @cmd, ( "-R", lc($rl_out) ) if($rl_out); + return { cmd_text => join(' ', @cmd) }; } sub compress_cmd_text($;$) { - my $def = shift; + my $def = shift // die; my $decompress = shift; - return undef unless(defined($def)); my $cc = $compression{$def->{key}}; my @cmd = $decompress ? @{$cc->{decompress_cmd}} : @{$cc->{compress_cmd}}; @@ -613,7 +617,7 @@ sub compress_cmd_text($;$) WARN_ONCE "Threading is not supported for '$cc->{name}', ignoring"; } } - return join(' ', @cmd); + return { cmd_text => join(' ', @cmd) }; } sub decompress_cmd_text($) @@ -687,6 +691,10 @@ sub run_cmd(@) my @cmd_pipe; my @unsafe_cmd; my $compressed = undef; + my $stream_options = $cmd_pipe_in[0]->{stream_options} // {}; + + $cmd_pipe_in[0]->{stream_source} = 1; + $cmd_pipe_in[-1]->{stream_sink} = 1; foreach my $href (@cmd_pipe_in) { @@ -700,84 +708,92 @@ sub run_cmd(@) _safe_cmd($href->{check_unsafe}, \@unsafe_cmd); } - if($href->{compress}) { - if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$href->{compress}->{key}}->{format})) { - push @cmd_pipe, { cmd_text => decompress_cmd_text($compressed) }; + if($href->{redirect_to_file}) { + die unless($href->{stream_sink}); + $href->{cmd_text} = _safe_cmd([ '>', $href->{redirect_to_file} ], \@unsafe_cmd); + } + elsif($href->{compress_stdin}) { + # does nothing if already compressed correctly by stream_compress + if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$href->{compress_stdin}->{key}}->{format})) { + # re-compress with different algorithm + push @cmd_pipe, decompress_cmd_text($compressed); $compressed = undef; } unless($compressed) { - push @cmd_pipe, { cmd_text => compress_cmd_text($href->{compress}) }; - $compressed = $href->{compress}; + push @cmd_pipe, compress_cmd_text($href->{compress_stdin}); + $compressed = $href->{compress_stdin}; + } + + next; + } + elsif($href->{cmd}) { + $href->{cmd_text} = _safe_cmd($href->{cmd}, \@unsafe_cmd); + } + return undef unless(defined($href->{cmd_text})); + + my @rsh_compress_in; + my @rsh_compress_out; + my @decompress_in; + + # input stream compression: local, in front of rsh_cmd_pipe + if($href->{rsh} && $stream_options->{stream_compress} && (not $href->{stream_source})) { + if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$stream_options->{stream_compress}->{key}}->{format})) { + # re-compress with different algorithm, should be avoided! + push @rsh_compress_in, decompress_cmd_text($compressed); + $compressed = undef; + } + if(not $compressed) { + $compressed = $stream_options->{stream_compress}; + push @rsh_compress_in, compress_cmd_text($compressed); } } - else { - if($href->{redirect_to_file}) { - $href->{cmd_text} = _safe_cmd([ '>', $href->{redirect_to_file} ], \@unsafe_cmd); - } - elsif($href->{cmd}) { - $href->{cmd_text} = _safe_cmd($href->{cmd}, \@unsafe_cmd); - } - return undef unless(defined($href->{cmd_text})); - if($href->{rsh}) { - my @rsh_cmd_pipe = ( $href ); - - if($href->{rsh_compress_in}) { - if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$href->{rsh_compress_in}->{key}}->{format})) - { - push @cmd_pipe, { cmd_text => decompress_cmd_text($compressed) }; - $compressed = undef; - } - unless($compressed) { - push @cmd_pipe, { cmd_text => compress_cmd_text($href->{rsh_compress_in}) }; - $compressed = $href->{rsh_compress_in}; - } - } - - if($compressed && (not ($href->{compressed_ok}))) { - unshift @rsh_cmd_pipe, { cmd_text => decompress_cmd_text($compressed) }; - $compressed = undef; - } - - if($href->{stream_buffer_sink}) { - if(my $cmd_text = stream_buffer_cmd_text(%{$href->{stream_buffer_sink}})) { - unshift @rsh_cmd_pipe, { cmd_text => $cmd_text }; # in front of decompress - } - if($show_progress) { - # add an additional stream buffer (with default options) to cmd_pipe for show_progress - push @cmd_pipe, { cmd_text => stream_buffer_cmd_text( show_progress => 1 ) }; - } - } - - if($href->{rsh_compress_out}) { - die if($href->{redirect_to_file}); - push @rsh_cmd_pipe, { cmd_text => compress_cmd_text($href->{rsh_compress_out}) }; - $compressed = $href->{rsh_compress_out}; - } - - if((scalar(@rsh_cmd_pipe) == 1) && ($rsh_cmd_pipe[0]->{redirect_to_file})) { - # NOTE: direct redirection in ssh command does not work: "ssh '> outfile'" - # we need to assemble: "ssh 'cat > outfile'" - unshift @rsh_cmd_pipe, { cmd_text => 'cat' }; - } - - my $rsh_text = _safe_cmd($href->{rsh}, \@unsafe_cmd); - return undef unless(defined($rsh_text)); - $href->{cmd_text} = $rsh_text . " '" . _assemble_cmd(\@rsh_cmd_pipe) . "'"; - } - else { - if($href->{stream_buffer_sink}) { - my $cmd_text = stream_buffer_cmd_text(%{$href->{stream_buffer_sink}}, show_progress => $show_progress); - push @cmd_pipe, { cmd_text => $cmd_text } if($cmd_text); - } - - if($compressed && (not ($href->{compressed_ok}))) { - push @cmd_pipe, { cmd_text => decompress_cmd_text($compressed) }; - $compressed = undef; - } - } - push @cmd_pipe, $href; + if($compressed && (not ($href->{compressed_ok}))) { + push @decompress_in, decompress_cmd_text($compressed); + $compressed = undef; } + + # output stream compression: remote, at end of rsh_cmd_pipe + if($href->{rsh} && $stream_options->{stream_compress} && (not $href->{stream_sink}) && (not $compressed)) { + $compressed = $stream_options->{stream_compress}; + push @rsh_compress_out, compress_cmd_text($compressed); + } + + if($href->{rsh}) { + # honor stream_buffer_remote, rate_limit_remote for stream source / sink + my @rsh_stream_buffer_in = $href->{stream_sink} ? stream_buffer_cmd_text($stream_options->{rsh_sink}) : (); + my @rsh_stream_buffer_out = $href->{stream_source} ? stream_buffer_cmd_text($stream_options->{rsh_source}) : (); + + my @rsh_cmd_pipe = ( + @decompress_in, + @rsh_stream_buffer_in, + $href, + @rsh_stream_buffer_out, + @rsh_compress_out, + ); + @decompress_in = (); + + # fixup redirect_to_file + if((scalar(@rsh_cmd_pipe) == 1) && ($rsh_cmd_pipe[0]->{redirect_to_file})) { + # NOTE: direct redirection in ssh command does not work: "ssh '> outfile'" + # we need to assemble: "ssh 'cat > outfile'" + unshift @rsh_cmd_pipe, { cmd_text => 'cat' }; + } + + my $rsh_text = _safe_cmd($href->{rsh}, \@unsafe_cmd); + return undef unless(defined($rsh_text)); + $href->{cmd_text} = $rsh_text . " '" . _assemble_cmd(\@rsh_cmd_pipe) . "'"; + } + + # local stream_buffer, rate_limit and show_progress in front of stream sink + my @stream_buffer_in = $href->{stream_sink} ? stream_buffer_cmd_text($stream_options->{local_sink}) : (); + + push @cmd_pipe, ( + @decompress_in, # empty if rsh + @stream_buffer_in, + @rsh_compress_in, # empty if not rsh + $href, # command or rsh_cmd_pipe + ); } my $cmd = _assemble_cmd(\@cmd_pipe, $catch_stderr); @@ -1437,6 +1453,8 @@ sub btrfs_send_receive($$;$$$) INFO "[send/receive] parent: $parent->{PRINT}" if($parent); INFO "[send/receive] clone-src: $_->{PRINT}" foreach(@$clone_src); + my $stream_options = config_stream_hash($snapshot, $target); + my @send_options; my @receive_options; push(@send_options, '-p', { unsafe => $parent_path} ) if($parent_path); @@ -1447,18 +1465,16 @@ sub btrfs_send_receive($$;$$$) my @cmd_pipe; push @cmd_pipe, { cmd => vinfo_cmd($snapshot, "btrfs send", @send_options, { unsafe => $snapshot_path } ), - rsh => vinfo_rsh($snapshot, disable_compression => config_compress_hash($snapshot, "stream_compress")), + rsh => vinfo_rsh($snapshot, disable_compression => $stream_options->{stream_compress}), name => "btrfs send", - rsh_compress_out => config_compress_hash($snapshot, "stream_compress"), + stream_options => $stream_options, catch_stderr => 1, # hack for shell-based run_cmd() }; + push @cmd_pipe, { cmd => vinfo_cmd($target, "btrfs receive", @receive_options, { unsafe => $target_path . '/' } ), - rsh => vinfo_rsh($target, disable_compression => config_compress_hash($target, "stream_compress")), + rsh => vinfo_rsh($target, disable_compression => $stream_options->{stream_compress}), name => "btrfs receive", - rsh_compress_in => config_compress_hash($target, "stream_compress"), - stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"), - rate_limit => config_key($target, "rate_limit") }, catch_stderr => 1, # hack for shell-based run_cmd() filter_stderr => sub { $err = $_; $_ = undef } }; @@ -1587,22 +1603,27 @@ sub btrfs_send_to_file($$$;$$) my $compress = config_compress_hash($target, "raw_target_compress"); my $encrypt = config_encrypt_hash($target, "raw_target_encrypt"); my $split = config_key($target, "raw_target_split"); + my $stream_options = config_stream_hash($source, $target); + + # make sure we dont re-compress, override "stream_compress" with "raw_target_compress" + $stream_options->{stream_compress} = $compress if($compress); my @send_options; - push(@send_options, '-v') if($loglevel >= 3); push(@send_options, '-p', $parent_path) if($parent_path); + #push(@send_options, '-v') if($loglevel >= 3); my @cmd_pipe; push @cmd_pipe, { cmd => vinfo_cmd($source, "btrfs send", @send_options, { unsafe => $source_path } ), - rsh => vinfo_rsh($source, disable_compression => $compress || config_compress_hash($source, "stream_compress")), + rsh => vinfo_rsh($source, disable_compression => $stream_options->{stream_compress}), name => "btrfs send", - rsh_compress_out => $compress || config_compress_hash($source, "stream_compress"), + stream_options => $stream_options, }; + if($compress) { - $raw_info{compress} = $compression{$compress->{key}}->{format} if($compress); + $raw_info{compress} = $compression{$compress->{key}}->{format}; $target_filename .= '.' . $compression{$compress->{key}}->{format}; - push @cmd_pipe, { compress => $compress }; # does nothing if already compressed by rsh_compress_out + push @cmd_pipe, { compress_stdin => $compress }; # does nothing if already compressed by stream_compress } if($encrypt) { $target_filename .= ($encrypt->{type} eq "gpg") ? '.gpg' : '.encrypted'; @@ -1730,10 +1751,7 @@ sub btrfs_send_to_file($$$;$$) push @cmd_pipe, { cmd => [ 'split', '-b', uc($split), '-', "${target_path}/${target_filename}.split_" ], check_unsafe => [ { unsafe => "${target_path}/${target_filename}.split_" } ], - rsh => vinfo_rsh($target, disable_compression => $compress || config_compress_hash($target, "stream_compress")), - rsh_compress_in => $compress || config_compress_hash($target, "stream_compress"), - stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"), - rate_limit => config_key($target, "rate_limit") }, + rsh => vinfo_rsh($target, disable_compression => $stream_options->{stream_compress}), compressed_ok => ($compress ? 1 : 0), } } @@ -1751,11 +1769,7 @@ sub btrfs_send_to_file($$$;$$) cmd => [ 'dd', 'status=none', 'bs=' . config_key($target, "raw_target_block_size"), "of=${target_path}/${target_filename}" ], check_unsafe => [ { unsafe => "${target_path}/${target_filename}" } ], #redirect_to_file => { unsafe => "${target_path}/${target_filename}" }, # alternative (use shell redirection), less overhead on local filesystems (barely measurable): - rsh => vinfo_rsh($target, disable_compression => $compress || config_compress_hash($target, "stream_compress")), - rsh_compress_in => $compress || config_compress_hash($target, "stream_compress"), - stream_buffer_sink => { stream_buffer => config_key($target, "stream_buffer"), - rate_limit => config_key($target, "rate_limit"), - blocksize => config_key($target, "raw_target_block_size") }, + rsh => vinfo_rsh($target, disable_compression => $stream_options->{stream_compress}), compressed_ok => ($compress ? 1 : 0), }; } @@ -3550,6 +3564,40 @@ sub config_compress_hash($$) } +sub config_stream_hash($$) +{ + my $source = shift || die; + my $target = shift || die; + return { + stream_compress => config_compress_hash($target, "stream_compress"), + + # for remote source, limits read rate of ssh stream output after decompress + # for remote target, limits read rate of "btrfs send" + # for both local, limits read rate of "btrfs send" + # for raw targets, limits read rate of "btrfs send | xz" (raw_target_compress) + local_sink => { + stream_buffer => config_key($target, "stream_buffer"), + rate_limit => config_key($target, "rate_limit"), + show_progress => $show_progress, + }, + + # limits read rate of "btrfs send" + rsh_source => { # limit read rate after "btrfs send", before compression + stream_buffer => config_key($source, "stream_buffer_remote"), + rate_limit => config_key($source, "rate_limit_remote"), + #rate_limit_out => config_key($source, "rate_limit_remote"), # limit write rate + }, + + # limits read rate of ssh stream output + rsh_sink => { + stream_buffer => config_key($target, "stream_buffer_remote"), + rate_limit => config_key($target, "rate_limit_remote"), + #rate_limit_in => config_key($target, "rate_limit_remote"), + }, + }; +} + + sub config_encrypt_hash($$) { my $config = shift || die;