btrbk: fix and cleanup stream compression

pull/106/merge
Axel Burri 2016-08-21 12:57:15 +02:00
parent 8f7d3e3c3d
commit 96f0d4c175
2 changed files with 84 additions and 74 deletions

View File

@ -1,7 +1,5 @@
btrbk-current btrbk-current
* KNOWN BUG: for raw targets, the send stream is not decompressed if
stream_compress is set and raw_target_compress is not set.
* MIGRATION * MIGRATION
- update ssh_filter_btrbk.sh on remote hosts if "stream_compress" - update ssh_filter_btrbk.sh on remote hosts if "stream_compress"
is enabled. Also add "--compress" option to ssh_filter_btrbk.sh is enabled. Also add "--compress" option to ssh_filter_btrbk.sh

152
btrbk
View File

@ -506,122 +506,133 @@ sub decompress_cmd($)
} }
sub _assemble_cmd($;$)
{
my $cmd_pipe = shift;
my $catch_stderr = shift;
my $cmd = "";
# simple single-command
if(scalar(@$cmd_pipe) == 1) {
$cmd = $cmd_pipe->[0]->{safe_cmd_text};
$cmd .= ' 2>&1' if($catch_stderr && $cmd_pipe->[0]->{catch_stderr});
return $cmd;
}
# cmd result is something like this:
# { btrfs send <src> 2>&3 | pv | btrfs receive <dst> 2>&3 ; } 3>&1
my $pipe = "";
$cmd = "{ " if($catch_stderr);
foreach (@$cmd_pipe) {
if($_->{safe_cmd_text} =~ /^>/) {
die unless($pipe);
$cmd .= ' ' . $_->{safe_cmd_text};
$pipe = undef; # this dies if it is not last command
} else {
$cmd .= $pipe . $_->{safe_cmd_text};
$cmd .= ' 2>&3' if($catch_stderr && $_->{catch_stderr});
$pipe = ' | ';
}
}
$cmd .= ' ; } 3>&1' if($catch_stderr);
return $cmd;
}
sub run_cmd(@) sub run_cmd(@)
{ {
# shell-based implementation. # shell-based implementation.
# this needs some redirection magic for filter_stderr to work. # this needs some redirection magic for filter_stderr to work.
# NOTE: multiple filters are not supported! # NOTE: multiple filters are not supported!
my @cmd_pipe = (ref($_[0]) eq "HASH") ? @_ : { @_ }; my @cmd_pipe_in = (ref($_[0]) eq "HASH") ? @_ : { @_ };
die unless(scalar(@cmd_pipe)); die unless(scalar(@cmd_pipe_in));
$err = ""; $err = "";
my $destructive = 0; my $destructive = 0;
my $catch_stderr = 0; my $catch_stderr = 0;
my $filter_stderr = undef; my $filter_stderr = undef;
my @cmd_text; my @cmd_pipe;
my $compressed = undef; my $compressed = undef;
foreach my $href (@cmd_pipe)
foreach my $href (@cmd_pipe_in)
{ {
die if(defined($href->{safe_cmd_text}));
$catch_stderr = 1 if($href->{catch_stderr}); $catch_stderr = 1 if($href->{catch_stderr});
$filter_stderr = $href->{filter_stderr} if($href->{filter_stderr}); # NOTE: last filter wins! $filter_stderr = $href->{filter_stderr} if($href->{filter_stderr}); # NOTE: last filter wins!
$destructive = 1 unless($href->{non_destructive}); $destructive = 1 unless($href->{non_destructive});
if($href->{compress}) { if($href->{compress}) {
if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$href->{compress}->{key}}->{format})) { if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$href->{compress}->{key}}->{format})) {
push @cmd_text, { text => decompress_cmd($compressed) }; push @cmd_pipe, { safe_cmd_text => decompress_cmd($compressed) };
$compressed = undef; $compressed = undef;
} }
unless($compressed) { unless($compressed) {
push @cmd_text, { text => compress_cmd($href->{compress}) }; push @cmd_pipe, { safe_cmd_text => compress_cmd($href->{compress}) };
$compressed = $href->{compress}; $compressed = $href->{compress};
} }
} }
elsif($href->{redirect}) {
my $file = safe_cmd([ $href->{redirect} ]);
return undef unless(defined($file));
if($href->{rsh}) {
my $rsh_text = safe_cmd($href->{rsh});
return undef unless(defined($rsh_text));
push @cmd_text, { text => $rsh_text . " 'cat > " . $file . "'" };
}
else { else {
push @cmd_text, { redirect => $file }; if($href->{redirect_to_file}) {
} my $file = safe_cmd([ $href->{redirect_to_file} ]);
return undef unless(defined($file));
$href->{safe_cmd_text} = "> $file";
} }
elsif($href->{cmd}) { elsif($href->{cmd}) {
my $sc = safe_cmd($href->{cmd}); $href->{safe_cmd_text} = safe_cmd($href->{cmd});
return undef unless(defined($sc)); }
return undef unless(defined($href->{safe_cmd_text}));
if($href->{rsh}) { if($href->{rsh}) {
my $rsh_text = safe_cmd($href->{rsh}); my @rsh_cmd_pipe = ( $href );
return undef unless(defined($rsh_text));
if($href->{rsh_compress_in}) { if($href->{rsh_compress_in}) {
if($compressed && ((not $href->{compressed_ok}) || if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$href->{rsh_compress_in}->{key}}->{format}))
($compression{$compressed->{key}}->{format} ne $compression{$href->{rsh_compress_in}->{key}}->{format})))
{ {
push @cmd_text, { text => decompress_cmd($compressed) }; push @cmd_pipe, { safe_cmd_text => decompress_cmd($compressed) };
$compressed = undef; $compressed = undef;
} }
unless($compressed) { unless($compressed) {
push @cmd_text, { text => compress_cmd($href->{rsh_compress_in}) }; push @cmd_pipe, { safe_cmd_text => compress_cmd($href->{rsh_compress_in}) };
$compressed = $href->{rsh_compress_in}; $compressed = $href->{rsh_compress_in};
} }
unless($href->{compressed_ok}) { }
$sc = decompress_cmd($href->{rsh_compress_in}) . ' | ' . $sc;
if($compressed && (not ($href->{compressed_ok}))) {
unshift @rsh_cmd_pipe, { safe_cmd_text => decompress_cmd($compressed) };
$compressed = undef; $compressed = undef;
} }
}
if($href->{rsh_compress_out}) { if($href->{rsh_compress_out}) {
$sc .= ' | ' . compress_cmd($href->{rsh_compress_out}); die if($href->{redirect_to_file});
push @rsh_cmd_pipe, { safe_cmd_text => compress_cmd($href->{rsh_compress_out}) };
$compressed = $href->{rsh_compress_out}; $compressed = $href->{rsh_compress_out};
} }
$sc = $rsh_text . " '" . $sc . "'";
if((scalar(@rsh_cmd_pipe) == 1) && ($rsh_cmd_pipe[0]->{redirect_to_file})) {
# NOTE: direct redirection in ssh command does not work: "ssh '> outfile'"
# we need to assemble: "ssh 'cat > outfile'"
unshift @rsh_cmd_pipe, { safe_cmd_text => 'cat' };
}
my $rsh_text = safe_cmd($href->{rsh});
return undef unless(defined($rsh_text));
$href->{safe_cmd_text} = $rsh_text . " '" . _assemble_cmd(\@rsh_cmd_pipe) . "'";
} }
else { else {
if($compressed && (not ($href->{compressed_ok}))) { if($compressed && (not ($href->{compressed_ok}))) {
push @cmd_text, { text => decompress_cmd($compressed) }; push @cmd_pipe, { safe_cmd_text => decompress_cmd($compressed) };
$compressed = undef; $compressed = undef;
} }
} }
push @cmd_text, { text => $sc, push @cmd_pipe, $href;
catch_stderr => $href->{catch_stderr} };
} }
} }
# cmd result is something like this: my $cmd = _assemble_cmd(\@cmd_pipe, $catch_stderr);
# { btrfs send <src> 2>&3 | pv | btrfs receive <dst> 2>&3 ; } 3>&1 my $cmd_print = _assemble_cmd(\@cmd_pipe); # hide redirection magic from debug output
my $cmd_print = "";
my $cmd = "{ ";
my $pipe = "";
foreach (@cmd_text) {
if($_->{redirect}) {
die unless($pipe);
$cmd_print .= ' > ' . $_->{redirect};
$cmd .= ' > ' . $_->{redirect};
$pipe = undef; # this dies if it is not last command
} else {
$cmd_print .= $pipe . $_->{text};
$cmd .= $pipe . $_->{text};
$cmd .= ' 2>&3' if($_->{catch_stderr});
$pipe = ' | ';
}
}
$cmd .= ' ; } 3>&1';
if($catch_stderr) {
if(scalar(@cmd_text) == 1) {
# no pipes, simply redirect stderr to stdout
$cmd = $cmd_print . ' 2>&1';
}
}
else {
$cmd = $cmd_print;
}
# hide redirection magic from debug output
if($dryrun && $destructive) { if($dryrun && $destructive) {
DEBUG "### (dryrun) $cmd_print"; DEBUG "### (dryrun) $cmd_print";
return ""; return "";
@ -1233,14 +1244,14 @@ sub btrfs_send_to_file($$$$;@)
my @cmd_pipe; my @cmd_pipe;
push @cmd_pipe, { push @cmd_pipe, {
cmd => [ qw(btrfs send), @send_options, $source_path ], cmd => [ qw(btrfs send), @send_options, $source_path ],
rsh => vinfo_rsh($source, disable_compression => config_compress_hash($source, "stream_compress")), rsh => vinfo_rsh($source, disable_compression => $opts{compress} || config_compress_hash($source, "stream_compress")),
name => "btrfs send", name => "btrfs send",
rsh_compress_out => config_compress_hash($source, "stream_compress"), rsh_compress_out => $opts{compress} || config_compress_hash($source, "stream_compress"),
}; };
add_pv_command(\@cmd_pipe, show_progress => $show_progress, rate_limit => $opts{rate_limit}); add_pv_command(\@cmd_pipe, show_progress => $show_progress, rate_limit => $opts{rate_limit});
if($opts{compress}) { if($opts{compress}) {
$target_filename .= '.' . $compression{$opts{compress}->{key}}->{format}; $target_filename .= '.' . $compression{$opts{compress}->{key}}->{format};
push @cmd_pipe, { compress => $opts{compress} }; push @cmd_pipe, { compress => $opts{compress} }; # does nothing if already compressed by rsh_compress_out
} }
if($opts{encrypt}) { if($opts{encrypt}) {
die unless($opts{encrypt}->{type} eq "gpg"); die unless($opts{encrypt}->{type} eq "gpg");
@ -1251,13 +1262,14 @@ sub btrfs_send_to_file($$$$;@)
push @cmd_pipe, { push @cmd_pipe, {
cmd => [ 'gpg', @gpg_options, '--encrypt' ], cmd => [ 'gpg', @gpg_options, '--encrypt' ],
name => 'gpg', name => 'gpg',
compressed_ok => 1, compressed_ok => ($opts{compress} ? 1 : 0),
}; };
} }
push @cmd_pipe, { push @cmd_pipe, {
redirect => "${target_path}/${target_filename}.part", redirect_to_file => "${target_path}/${target_filename}.part",
rsh => vinfo_rsh($target), rsh => vinfo_rsh($target, disable_compression => $opts{compress} || config_compress_hash($target, "stream_compress")),
compressed_ok => 1, rsh_compress_in => $opts{compress} || config_compress_hash($target, "stream_compress"),
compressed_ok => ($opts{compress} ? 1 : 0),
}; };
my $vol_received = vinfo_child($target, $target_filename); my $vol_received = vinfo_child($target, $target_filename);