btrbk: add configuration option "stream_compress"

- add sophisticated stream compression in run_cmd
- add special "compress" cmd_pipe item
- add special "redirect" cmd_pipe item:
  use shell redirection instead of troublesome "dd of=".
- disable ssh_compression if stream_compression is set
pull/106/merge
Axel Burri 2016-05-11 20:15:46 +02:00
parent 99ff88f82a
commit a019d8a3e3
1 changed files with 184 additions and 79 deletions

249
btrbk
View File

@ -66,10 +66,15 @@ my $timestamp_postfix_match = qr/\.(?<YYYY>[0-9]{4})(?<MM>[0-9]{2})(?<DD>[0-9]{2
my $raw_postfix_match = qr/--(?<received_uuid>$uuid_match)(\@(?<parent_uuid>$uuid_match))?\.btrfs?(\.(?<compress>(gz|bz2|xz)))?(\.(?<encrypt>gpg))?(\.(?<incomplete>part))?/; # matches ".btrfs_<received_uuid>[@<parent_uuid>][.gz|bz2|xz][.gpg][.part]" my $raw_postfix_match = qr/--(?<received_uuid>$uuid_match)(\@(?<parent_uuid>$uuid_match))?\.btrfs?(\.(?<compress>(gz|bz2|xz)))?(\.(?<encrypt>gpg))?(\.(?<incomplete>part))?/; # matches ".btrfs_<received_uuid>[@<parent_uuid>][.gz|bz2|xz][.gpg][.part]"
my $group_match = qr/[a-zA-Z0-9_:-]+/; my $group_match = qr/[a-zA-Z0-9_:-]+/;
my $ssh_cipher_match = qr/[a-z0-9][a-z0-9@.-]+/; my $ssh_cipher_match = qr/[a-z0-9][a-z0-9@.-]+/;
my $safe_cmd_match = qr/[0-9a-zA-Z_@=\+\-\.\/]+/; # $file_match plus '=': good enough for our purpose my $safe_cmd_match = $file_match; # good enough
my %day_of_week_map = ( sunday => 0, monday => 1, tuesday => 2, wednesday => 3, thursday => 4, friday => 5, saturday => 6 ); my %day_of_week_map = ( sunday => 0, monday => 1, tuesday => 2, wednesday => 3, thursday => 4, friday => 5, saturday => 6 );
my @syslog_facilities = qw( user mail daemon auth lpr news cron authpriv local0 local1 local2 local3 local4 local5 local6 local7 ); my @syslog_facilities = qw( user mail daemon auth lpr news cron authpriv local0 local1 local2 local3 local4 local5 local6 local7 );
my %compression = (
gzip => { name => 'gzip' , format => 'gz', compress_cmd => [ 'gzip', '-c' ], decompress_cmd => [ 'gzip', '-d', '-c' ], level_min => 1, level_max => 9 },
bzip2 => { name => 'bzip2', format => 'bz2', compress_cmd => [ 'bzip2', '-c' ], decompress_cmd => [ 'bzip2', '-d', '-c' ], level_min => 1, level_max => 9 },
xz => { name => 'xz' , format => 'xz', compress_cmd => [ 'xz', '-c' ], decompress_cmd => [ 'xz', '-d', '-c' ], level_min => 0, level_max => 9, threads => '-T ' },
);
my %config_options = ( my %config_options = (
# NOTE: the parser always maps "no" to undef # NOTE: the parser always maps "no" to undef
@ -98,6 +103,10 @@ my %config_options = (
transaction_syslog => { default => undef, accept => \@syslog_facilities }, transaction_syslog => { default => undef, accept => \@syslog_facilities },
lockfile => { default => undef, accept_file => { absolute => 1 }, context => [ "root" ] }, lockfile => { default => undef, accept_file => { absolute => 1 }, context => [ "root" ] },
stream_compress => { default => undef, accept => [ "no", (keys %compression) ] },
stream_compress_level => { default => "default", accept => [ "default" ], accept_numeric => 1 },
stream_compress_threads => { default => "default", accept => [ "default" ], accept_numeric => 1 },
raw_target_compress => { default => undef, accept => [ "no", "gzip", "bzip2", "xz" ] }, raw_target_compress => { default => undef, accept => [ "no", "gzip", "bzip2", "xz" ] },
raw_target_compress_level => { default => "default", accept => [ "default" ], accept_numeric => 1 }, raw_target_compress_level => { default => "default", accept => [ "default" ], accept_numeric => 1 },
raw_target_compress_threads => { default => "default", accept => [ "default" ], accept_numeric => 1 }, raw_target_compress_threads => { default => "default", accept => [ "default" ], accept_numeric => 1 },
@ -448,53 +457,161 @@ sub safe_cmd($)
return undef; return undef;
} }
} }
return 1; return join(' ', @$aref);
} }
sub compress_cmd($;$)
{
my $def = shift;
my $decompress = shift;
return undef unless(defined($def));
my $cc = $compression{$def->{key}};
my @cmd = $decompress ? @{$cc->{decompress_cmd}} : @{$cc->{compress_cmd}};
if((not $decompress) && defined($def->{level}) && ($def->{level} ne "default")) {
my $level = $def->{level};
if($level < $cc->{level_min}) {
WARN_ONCE "Compression level capped to minimum for '$cc->{name}': $cc->{level_min}";
$level = $cc->{level_min};
}
if($level > $cc->{level_max}) {
WARN_ONCE "Compression level capped to maximum for '$cc->{name}': $cc->{level_max}";
$level = $cc->{level_max};
}
push @cmd, '-' . $level;
}
if(defined($def->{threads}) && ($def->{threads} ne "default")) {
my $thread_opt = $cc->{threads};
if($thread_opt) {
push @cmd, $thread_opt . $def->{threads};
}
else {
WARN_ONCE "Threading is not supported for '$cc->{name}', ignoring";
}
}
return join(' ', @cmd);
}
sub decompress_cmd($)
{
return compress_cmd($_[0], 1);
}
sub run_cmd(@) sub run_cmd(@)
{ {
# shell-based implementation. # shell-based implementation.
# this needs some redirection magic for filter_stderr to work. # this needs some redirection magic for filter_stderr to work.
# NOTE: multiple filters are not supported! # NOTE: multiple filters are not supported!
my @commands = (ref($_[0]) eq "HASH") ? @_ : { @_ }; my @cmd_pipe = (ref($_[0]) eq "HASH") ? @_ : { @_ };
die unless(scalar(@commands)); die unless(scalar(@cmd_pipe));
$err = ""; $err = "";
my $destructive = 0; my $destructive = 0;
my $catch_stderr = 0; my $catch_stderr = 0;
my $filter_stderr = undef; my $filter_stderr = undef;
foreach (@commands) { my @cmd_text;
$_->{rsh} //= []; my $compressed = undef;
$_->{cmd} = [ @{$_->{rsh}}, @{$_->{cmd}} ]; foreach my $href (@cmd_pipe)
return undef unless(safe_cmd($_->{cmd}));
$_->{cmd_text} = join(' ', map { "'$_'" } @{$_->{cmd}});
$catch_stderr = 1 if($_->{catch_stderr});
$filter_stderr = $_->{filter_stderr} if($_->{filter_stderr}); # NOTE: last filter wins!
$destructive = 1 unless($_->{non_destructive});
}
my $cmd_print = join(' | ', map { $_->{cmd_text} } @commands);
my $cmd = $cmd_print;
if($catch_stderr) {
if(scalar(@commands) == 1) {
# no pipes, simply redirect stderr to stdout
$cmd .= ' 2>&1';
}
else
{ {
# pipe chain is more complicated, result is something like this: $catch_stderr = 1 if($href->{catch_stderr});
$filter_stderr = $href->{filter_stderr} if($href->{filter_stderr}); # NOTE: last filter wins!
$destructive = 1 unless($href->{non_destructive});
if($href->{compress}) {
if($compressed && ($compression{$compressed->{key}}->{format} ne $compression{$href->{compress}->{key}}->{format})) {
push @cmd_text, { text => decompress_cmd($compressed) };
$compressed = undef;
}
unless($compressed) {
push @cmd_text, { text => compress_cmd($href->{compress}) };
$compressed = $href->{compress};
}
}
elsif($href->{redirect}) {
my $file = safe_cmd([ $href->{redirect} ]);
return undef unless(defined($file));
if($href->{rsh}) {
my $rsh_text = safe_cmd($href->{rsh});
return undef unless(defined($rsh_text));
push @cmd_text, { text => $rsh_text . " 'cat > " . $file . "'" };
}
else {
push @cmd_text, { redirect => $file };
}
}
elsif($href->{cmd}) {
my $sc = safe_cmd($href->{cmd});
return undef unless(defined($sc));
if($href->{rsh}) {
my $rsh_text = safe_cmd($href->{rsh});
return undef unless(defined($rsh_text));
if($href->{rsh_compress_in}) {
if($compressed && ((not $href->{compressed_ok}) ||
($compression{$compressed->{key}}->{format} ne $compression{$href->{rsh_compress_in}->{key}}->{format})))
{
push @cmd_text, { text => decompress_cmd($compressed) };
$compressed = undef;
}
unless($compressed) {
push @cmd_text, { text => compress_cmd($href->{rsh_compress_in}) };
$compressed = $href->{rsh_compress_in};
}
unless($href->{compressed_ok}) {
$sc = decompress_cmd($href->{rsh_compress_in}) . ' | ' . $sc;
$compressed = undef;
}
}
if($href->{rsh_compress_out}) {
$sc .= ' | ' . compress_cmd($href->{rsh_compress_out});
$compressed = $href->{rsh_compress_out};
}
$sc = $rsh_text . " '" . $sc . "'";
}
else {
if($compressed && (not ($href->{compressed_ok}))) {
push @cmd_text, { text => decompress_cmd($compressed) };
$compressed = undef;
}
}
push @cmd_text, { text => $sc,
catch_stderr => $href->{catch_stderr} };
}
}
# cmd result is something like this:
# { btrfs send <src> 2>&3 | pv | btrfs receive <dst> 2>&3 ; } 3>&1 # { btrfs send <src> 2>&3 | pv | btrfs receive <dst> 2>&3 ; } 3>&1
$cmd = "{ "; my $cmd_print = "";
my $cmd = "{ ";
my $pipe = ""; my $pipe = "";
foreach (@commands) { foreach (@cmd_text) {
$cmd .= $pipe . $_->{cmd_text}; if($_->{redirect}) {
die unless($pipe);
$cmd_print .= ' > ' . $_->{redirect};
$cmd .= ' > ' . $_->{redirect};
$pipe = undef; # this dies if it is not last command
} else {
$cmd_print .= $pipe . $_->{text};
$cmd .= $pipe . $_->{text};
$cmd .= ' 2>&3' if($_->{catch_stderr}); $cmd .= ' 2>&3' if($_->{catch_stderr});
$pipe = ' | '; $pipe = ' | ';
} }
}
$cmd .= ' ; } 3>&1'; $cmd .= ' ; } 3>&1';
if($catch_stderr) {
if(scalar(@cmd_text) == 1) {
# no pipes, simply redirect stderr to stdout
$cmd = $cmd_print . ' 2>&1';
} }
} }
else {
$cmd = $cmd_print;
}
# hide redirection magic from debug output # hide redirection magic from debug output
if($dryrun && $destructive) { if($dryrun && $destructive) {
@ -536,13 +653,13 @@ sub add_pv_command($@)
if($opts{show_progress}) { if($opts{show_progress}) {
if($rate_limit) { if($rate_limit) {
push @$cmd_pipe, { cmd => [ 'pv', '-trab', '-L', $rate_limit ] }; push @$cmd_pipe, { cmd => [ 'pv', '-trab', '-L', $rate_limit ], compressed_ok => 1 };
} else { } else {
push @$cmd_pipe, { cmd => [ 'pv', '-trab' ] }; push @$cmd_pipe, { cmd => [ 'pv', '-trab' ], compressed_ok => 1 };
} }
} }
elsif($rate_limit) { elsif($rate_limit) {
push @$cmd_pipe, { cmd => [ 'pv', '-q', '-L', $rate_limit ] }; push @$cmd_pipe, { cmd => [ 'pv', '-q', '-L', $rate_limit ], compressed_ok => 1 };
} }
} }
@ -967,15 +1084,17 @@ sub btrfs_send_receive($$$$;@)
my @cmd_pipe; my @cmd_pipe;
push @cmd_pipe, { push @cmd_pipe, {
cmd => [ qw(btrfs send), @send_options, $snapshot_path ], cmd => [ qw(btrfs send), @send_options, $snapshot_path ],
rsh => vinfo_rsh($snapshot), rsh => vinfo_rsh($snapshot, disable_compression => config_compress_hash($snapshot, "stream_compress")),
rsh_compress_out => config_compress_hash($snapshot, "stream_compress"),
name => "btrfs send", name => "btrfs send",
catch_stderr => 1, # hack for shell-based run_cmd() catch_stderr => 1, # hack for shell-based run_cmd()
}; };
add_pv_command(\@cmd_pipe, show_progress => $show_progress, rate_limit => $opts{rate_limit}); add_pv_command(\@cmd_pipe, show_progress => $show_progress, rate_limit => $opts{rate_limit});
push @cmd_pipe, { push @cmd_pipe, {
cmd => [ qw(btrfs receive), @receive_options, $target_path . '/' ], cmd => [ qw(btrfs receive), @receive_options, $target_path . '/' ],
rsh => vinfo_rsh($target), rsh => vinfo_rsh($target, disable_compression => config_compress_hash($target, "stream_compress")),
name => "btrfs receive", name => "btrfs receive",
rsh_compress_in => config_compress_hash($target, "stream_compress"),
catch_stderr => 1, # hack for shell-based run_cmd() catch_stderr => 1, # hack for shell-based run_cmd()
filter_stderr => sub { $err = $_; $_ = undef } filter_stderr => sub { $err = $_; $_ = undef }
}; };
@ -1051,11 +1170,6 @@ sub btrfs_send_to_file($$$$;@)
$target_filename .= '@' . $parent_uuid if($parent_uuid); $target_filename .= '@' . $parent_uuid if($parent_uuid);
$target_filename .= ".btrfs"; $target_filename .= ".btrfs";
my %compress = ( gzip => { name => 'gzip' , cmd => [ 'gzip' ], postfix => '.gz', level_min => 1, level_max => 9 },
bzip2 => { name => 'bzip2', cmd => [ 'bzip2' ], postfix => '.bz2', level_min => 1, level_max => 9 },
xz => { name => 'xz' , cmd => [ 'xz' ], postfix => '.xz', level_min => 0, level_max => 9, threads => '--threads=' },
);
my @send_options; my @send_options;
push(@send_options, '-v') if($loglevel >= 3); push(@send_options, '-v') if($loglevel >= 3);
push(@send_options, '-p', $parent_path) if($parent_path); push(@send_options, '-p', $parent_path) if($parent_path);
@ -1063,38 +1177,14 @@ sub btrfs_send_to_file($$$$;@)
my @cmd_pipe; my @cmd_pipe;
push @cmd_pipe, { push @cmd_pipe, {
cmd => [ qw(btrfs send), @send_options, $source_path ], cmd => [ qw(btrfs send), @send_options, $source_path ],
rsh => vinfo_rsh($source), rsh => vinfo_rsh($source, disable_compression => config_compress_hash($source, "stream_compress")),
name => "btrfs send", name => "btrfs send",
rsh_compress_out => config_compress_hash($source, "stream_compress"),
}; };
add_pv_command(\@cmd_pipe, show_progress => $show_progress, rate_limit => $opts{rate_limit}); add_pv_command(\@cmd_pipe, show_progress => $show_progress, rate_limit => $opts{rate_limit});
if($opts{compress}) { if($opts{compress}) {
die unless($compress{$opts{compress}}); $target_filename .= '.' . $compression{$opts{compress}->{key}}->{format};
$target_filename .= $compress{$opts{compress}}->{postfix}; push @cmd_pipe, { compress => $opts{compress} };
my $compress_cmd = $compress{$opts{compress}}->{cmd};
if(defined($opts{compress_level}) && ($opts{compress_level} ne "default")) {
my $compress_level = $opts{compress_level};
if($compress_level < $compress{$opts{compress}}->{level_min}) {
WARN "Compression level (raw_target_compress_level) capped to minimum for '$opts{compress}': $compress{$opts{compress}}->{level_min}";
$compress_level = $compress{$opts{compress}}->{level_min};
}
if($compress_level > $compress{$opts{compress}}->{level_max}) {
WARN "Compression level (raw_target_compress_level) capped to maximum for '$opts{compress}': $compress{$opts{compress}}->{level_max}";
$compress_level = $compress{$opts{compress}}->{level_max};
}
push @$compress_cmd, '-' . $compress_level;
}
if(defined($opts{compress_threads}) && ($opts{compress_threads} ne "default")) {
my $thread_opt = $compress{$opts{compress}}->{threads};
if($thread_opt) {
push @$compress_cmd, $thread_opt . $opts{compress_threads};
}
else {
WARN "Threading (raw_target_compress_threads) is not supported for '$opts{compress}', ignoring";
}
}
push @cmd_pipe, { cmd => $compress_cmd,
name => $compress{$opts{compress}}->{name}
};
} }
if($opts{encrypt}) { if($opts{encrypt}) {
die unless($opts{encrypt}->{type} eq "gpg"); die unless($opts{encrypt}->{type} eq "gpg");
@ -1105,12 +1195,13 @@ sub btrfs_send_to_file($$$$;@)
push @cmd_pipe, { push @cmd_pipe, {
cmd => [ 'gpg', @gpg_options, '--encrypt' ], cmd => [ 'gpg', @gpg_options, '--encrypt' ],
name => 'gpg', name => 'gpg',
compressed_ok => 1,
}; };
} }
push @cmd_pipe, { push @cmd_pipe, {
cmd => [ 'dd', 'status=none', "of=${target_path}/${target_filename}.part" ], redirect => "${target_path}/${target_filename}.part",
rsh => vinfo_rsh($target), rsh => vinfo_rsh($target),
name => 'dd', compressed_ok => 1,
}; };
my $vol_received = vinfo_child($target, $target_filename); my $vol_received = vinfo_child($target, $target_filename);
@ -1130,7 +1221,8 @@ sub btrfs_send_to_file($$$$;@)
my $ret = run_cmd(@cmd_pipe); my $ret = run_cmd(@cmd_pipe);
if(defined($ret)) { if(defined($ret)) {
# Test target file for "exists and size > 0" after writing, # Test target file for "exists and size > 0" after writing,
# as we can not rely on the exit status of 'dd' # as we can not rely on the exit status of the command pipe,
# and the shell command always creates the target file.
DEBUG "Testing target file (non-zero size): $target->{PRINT}.part"; DEBUG "Testing target file (non-zero size): $target->{PRINT}.part";
$ret = run_cmd({ $ret = run_cmd({
cmd => ['test', '-s', "${target_path}/${target_filename}.part"], cmd => ['test', '-s', "${target_path}/${target_filename}.part"],
@ -1541,9 +1633,10 @@ sub vinfo_child($$;$)
} }
sub vinfo_rsh($) sub vinfo_rsh($;@)
{ {
my $vinfo = shift || die; my $vinfo = shift || die;
my %opts = @_;
my $host = $vinfo->{HOST}; my $host = $vinfo->{HOST};
return undef unless(defined($host)); return undef unless(defined($host));
@ -1553,7 +1646,7 @@ sub vinfo_rsh($)
my $ssh_port = config_key($config, "ssh_port"); my $ssh_port = config_key($config, "ssh_port");
my $ssh_user = config_key($config, "ssh_user"); my $ssh_user = config_key($config, "ssh_user");
my $ssh_identity = config_key($config, "ssh_identity"); my $ssh_identity = config_key($config, "ssh_identity");
my $ssh_compression = config_key($config, "ssh_compression"); my $ssh_compression = $opts{disable_compression} ? undef : config_key($config, "ssh_compression");
my $ssh_cipher_spec = config_key($config, "ssh_cipher_spec") // "default"; my $ssh_cipher_spec = config_key($config, "ssh_cipher_spec") // "default";
my @ssh_options; my @ssh_options;
push(@ssh_options, '-p', $ssh_port) if($ssh_port ne "default"); push(@ssh_options, '-p', $ssh_port) if($ssh_port ne "default");
@ -2228,6 +2321,20 @@ sub config_preserve_hash($$)
} }
sub config_compress_hash($$)
{
my $config = shift || die;
my $config_key = shift || die;
my $compress_key = config_key($config, $config_key);
return undef unless($compress_key);
return {
key => $compress_key,
level => config_key($config, $config_key . "_level"),
threads => config_key($config, $config_key . "_threads"),
};
}
sub config_dump_keys($;@) sub config_dump_keys($;@)
{ {
my $config = shift || die; my $config = shift || die;
@ -2671,9 +2778,7 @@ sub macro_send_receive(@)
} }
} }
$ret = btrfs_send_to_file($source, $target, $parent, \$vol_received, $ret = btrfs_send_to_file($source, $target, $parent, \$vol_received,
compress => config_key($config_target, "raw_target_compress"), compress => config_compress_hash($config_target, "raw_target_compress"),
compress_level => config_key($config_target, "raw_target_compress_level"),
compress_threads => config_key($config_target, "raw_target_compress_threads"),
encrypt => $encrypt, encrypt => $encrypt,
rate_limit => config_key($config_target, "rate_limit"), rate_limit => config_key($config_target, "rate_limit"),
); );