btrbk: unconditionally add clone sources to btrfs-send if necessary

Preferences for parent (and required clone sources):

 1. closest older in snapdir (by btrbk timestamp), related
 2. closest older related (by cgen)
 3. closest newer related (by cgen)
 4. closest older in snapdir (by btrbk timestamp)
 5. closest newer in snapdir (by btrbk timestamp)

Note: prefering 1 over 2 helps keeping parent-chain within droot on
target (assuming that btrfs always uses correlated parent on
btrfs-receive).

This will e.g. add a clone source on "btrbk resume", if both older AND
newer snapshot/backup pairs exists.

Also makes sure that the closest older btrbk snapshot is always added
as clone source, even if another related subvolume has newer cgen.
pull/274/head
Axel Burri 2019-04-09 22:09:12 +02:00
parent 95e25eb2d1
commit c407d41db2
1 changed files with 133 additions and 71 deletions

204
btrbk
View File

@ -1378,31 +1378,26 @@ sub btrfs_send_receive($$;$$$)
my $snapshot = shift || die; my $snapshot = shift || die;
my $target = shift || die; my $target = shift || die;
my $parent = shift; my $parent = shift;
my $clone_src = shift // []; # arrayref of [ vinfo, correlated_target_node ] my $clone_src = shift // [];
my $ret_vol_received = shift; my $ret_vol_received = shift;
my $snapshot_path = $snapshot->{PATH} // die; my $snapshot_path = $snapshot->{PATH} // die;
my $target_path = $target->{PATH} // die; my $target_path = $target->{PATH} // die;
my $parent_path = $parent ? $parent->{PATH} : undef; my $parent_path = $parent ? $parent->{PATH} : undef;
my @clone_src_path;
my $incremental_clones = config_key($target, "incremental_clones");
if(my $cnt = $incremental_clones) {
@clone_src_path = map { --$cnt < 0 ? ( ) : $_->[0]{PATH} } @$clone_src;
}
my $vol_received = vinfo_child($target, $snapshot->{NAME}); my $vol_received = vinfo_child($target, $snapshot->{NAME});
$$ret_vol_received = $vol_received if(ref $ret_vol_received); $$ret_vol_received = $vol_received if(ref $ret_vol_received);
print STDOUT "Creating backup: $vol_received->{PRINT}\n" if($show_progress && (not $dryrun)); print STDOUT "Creating backup: $vol_received->{PRINT}\n" if($show_progress && (not $dryrun));
INFO "[send/receive] target: $vol_received->{PRINT}";
INFO "[send/receive] source: $snapshot->{PRINT}"; INFO "[send/receive] source: $snapshot->{PRINT}";
INFO "[send/receive] parent: $parent->{PRINT}" if($parent); INFO "[send/receive] parent: $parent->{PRINT}" if($parent);
INFO "[send/receive] target: $vol_received->{PRINT}"; INFO "[send/receive] clone-src: $_->{PRINT}" foreach(@$clone_src);
INFO "[send/receive] using " . (scalar @clone_src_path) . " clone sources" if($incremental_clones);
my @send_options; my @send_options;
my @receive_options; my @receive_options;
push(@send_options, '-p', { unsafe => $parent_path} ) if($parent_path); push(@send_options, '-p', { unsafe => $parent_path} ) if($parent_path);
push(@send_options, '-c', { unsafe => $_ } ) foreach(@clone_src_path); push(@send_options, '-c', { unsafe => $_ } ) foreach(map { $_->{PATH} } @$clone_src);
# push(@send_options, '-v') if($loglevel >= 3); # push(@send_options, '-v') if($loglevel >= 3);
# push(@receive_options, '-v') if($loglevel >= 3); # push(@receive_options, '-v') if($loglevel >= 3);
@ -2951,7 +2946,6 @@ sub _correlated_nodes($$)
my $uuid = $src_vol->{node}{uuid}; my $uuid = $src_vol->{node}{uuid};
my $received_uuid = $src_vol->{node}{received_uuid}; my $received_uuid = $src_vol->{node}{received_uuid};
$received_uuid = undef if($received_uuid eq '-'); $received_uuid = undef if($received_uuid eq '-');
TRACE "correlated_nodes: src_vol=\"$src_vol->{PRINT}\", droot=\"$droot->{PRINT}\"";
my $received_uuid_hash = $droot->{node}{TREE_ROOT}{RECEIVED_UUID_HASH}; my $received_uuid_hash = $droot->{node}{TREE_ROOT}{RECEIVED_UUID_HASH};
my $uuid_hash = $droot->{node}{TREE_ROOT}{UUID_HASH}; my $uuid_hash = $droot->{node}{TREE_ROOT}{UUID_HASH};
@ -2965,7 +2959,7 @@ sub _correlated_nodes($$)
} }
@ret = grep($_->{readonly}, @match); @ret = grep($_->{readonly}, @match);
TRACE "correlated_nodes: " . scalar(@ret) . " receive targets in \"$droot->{PRINT}/\" for: $src_vol->{PRINT}"; TRACE "correlated_nodes: droot=\"$droot->{PRINT}/\", src_vol=\"$src_vol->{PRINT}\": " . join(", ", map _fs_path($_),@ret) if($loglevel >= 4);
return @ret; return @ret;
} }
@ -3005,26 +2999,25 @@ sub get_receive_targets($$;@)
# returns best correlated receive target within droot (independent of btrbk name) # returns best correlated receive target within droot (independent of btrbk name)
sub get_best_correlated_target($$;@) sub get_best_correlated($$;@)
{ {
my $droot = shift || die; my $droot = shift || die;
my $src_vol = shift || die; my $src_vol = shift || die;
my %opts = @_; my %opts = @_;
my $filtered_nodes = $opts{push_filtered_nodes}; my $inaccessible_nodes = $opts{push_inaccessible_nodes};
my @correlated = _correlated_nodes($droot, $src_vol); # all matching src_vol, from droot->TREE_ROOT my @correlated = _correlated_nodes($droot, $src_vol); # all matching src_vol, from droot->TREE_ROOT
foreach (@correlated) { foreach (@correlated) {
my $vinfo = vinfo_resolved($_, $droot); # $vinfo is within $resolve_droot my $vinfo = vinfo_resolved($_, $droot); # $vinfo is within $droot
return $vinfo if($vinfo); return [ $src_vol, $vinfo ] if($vinfo);
push @$filtered_nodes, $_ if($filtered_nodes);
} }
if($opts{fallback_all_mountpoints}) { if($opts{fallback_all_mountpoints}) {
foreach (@correlated) { foreach (@correlated) {
my $vinfo = vinfo_resolved_all_mountpoints($_, $droot); # $vinfo is within any mountpoint of filesystem at $droot my $vinfo = vinfo_resolved_all_mountpoints($_, $droot); # $vinfo is within any mountpoint of filesystem at $droot
return $vinfo if($vinfo); return [ $src_vol, $vinfo ] if($vinfo);
push @$filtered_nodes, $_ if($filtered_nodes);
} }
} }
push @$inaccessible_nodes, @correlated if($inaccessible_nodes);
return undef; return undef;
} }
@ -3071,8 +3064,8 @@ sub _push_related_children
# returns all related readonly nodes (by parent_uuid relationship), # returns all related readonly nodes (by parent_uuid relationship),
# sort by absolute cgen delta, favor older # sorted by relation distance.
sub get_related_subvolume_nodes($) sub get_related_readonly_nodes($)
{ {
my $vol = shift // die; my $vol = shift // die;
TRACE "related_nodes: resolving related subvolumes of: $vol->{PATH}"; TRACE "related_nodes: resolving related subvolumes of: $vol->{PATH}";
@ -3100,21 +3093,22 @@ sub get_related_subvolume_nodes($)
} }
WARN "Maximum distance reached, related subvolume search aborted" if($distance >= 256); WARN "Maximum distance reached, related subvolume search aborted" if($distance >= 256);
TRACE "related_nodes: found total=" . scalar(@related_nodes) . " related readonly subvolumes"; TRACE "related_nodes: found total=" . scalar(@related_nodes) . " related readonly subvolumes";
my @sorted = sort { (abs($cgen_ref - $a->{cgen}) <=> abs($cgen_ref - $b->{cgen})) || return \@related_nodes;
($a->{cgen} <=> $b->{cgen}) } @related_nodes;
return \@sorted;
} }
# returns ( parent, first_matching_target_node ) # returns parent, along with clone sources
sub get_best_parent($$$;@) sub get_best_parent($$$;@)
{ {
my $svol = shift // die; my $svol = shift // die;
my $snaproot = shift // die; my $snaproot = shift // die;
my $droot = shift || die; my $droot = shift || die;
my %opts = @_; my %opts = @_;
my $fallback_btrbk_basename = $opts{fallback_btrbk_basename} // 1; # default true, see below my $ret_clone_src = $opts{clone_src};
my $clone_src = $opts{clone_src}; my $ret_clone_src_extra = $opts{clone_src_extra};
my $ret_target_parent_node = $opts{target_parent_node};
TRACE "get_best_parent: resolving best common parent for subvolume: $svol->{PRINT} (droot=$droot->{PRINT})";
# honor incremental_resolve option # honor incremental_resolve option
my $source_incremental_resolve = config_key($svol, "incremental_resolve"); my $source_incremental_resolve = config_key($svol, "incremental_resolve");
@ -3131,26 +3125,37 @@ sub get_best_parent($$$;@)
my $source_fallback_all_mountpoints = ($source_incremental_resolve eq "_all_accessible"); my $source_fallback_all_mountpoints = ($source_incremental_resolve eq "_all_accessible");
my $target_fallback_all_mountpoints = ($target_incremental_resolve eq "_all_accessible"); my $target_fallback_all_mountpoints = ($target_incremental_resolve eq "_all_accessible");
TRACE "get_best_parent: resolving best common parent for subvolume: $svol->{PATH} (droot=$droot->{PRINT})"; my @inaccessible_nodes;
my $all_related_nodes = get_related_subvolume_nodes($svol); my %gbc_opts = ( push_inaccessible_nodes => \@inaccessible_nodes,
fallback_all_mountpoints => $target_fallback_all_mountpoints,
);
# filter candidates # resolve correlated subvolumes by parent_uuid relationship
my @candidate; # candidates for parent, ordered by "best suited" my %c_rel_id; # map id to c_related
foreach (@$all_related_nodes) { my @c_related; # candidates for parent (correlated + related), unsorted
foreach (@{get_related_readonly_nodes($svol)}) {
my $vinfo = vinfo_resolved($_, $resolve_sroot); my $vinfo = vinfo_resolved($_, $resolve_sroot);
if((not $vinfo) && $source_fallback_all_mountpoints) { # related node is not under $resolve_sroot if((not $vinfo) && $source_fallback_all_mountpoints) { # related node is not under $resolve_sroot
$vinfo = vinfo_resolved_all_mountpoints($_, $svol); $vinfo = vinfo_resolved_all_mountpoints($_, $svol);
} }
if($vinfo) { if($vinfo) {
push @candidate, $vinfo; my $correlated = get_best_correlated($droot, $vinfo, %gbc_opts);
push @c_related, $correlated if($correlated);
$c_rel_id{$_->{id}} = $correlated;
} else { } else {
DEBUG "Related subvolume is not accessible within $source_incremental_resolve \"$resolve_sroot->{PRINT}\": " . _fs_path($_); DEBUG "Related subvolume is not accessible within $source_incremental_resolve \"$resolve_sroot->{PRINT}\": " . _fs_path($_);
} }
} }
# sort by cgen
my $cgen_ref = $svol->{node}{readonly} ? $svol->{node}{cgen} : $svol->{node}{gen};
my @c_related_older = sort { ($cgen_ref - $a->[0]{node}{cgen}) <=> ($cgen_ref - $b->[0]{node}{cgen}) }
grep { $_->[0]{node}{cgen} <= $cgen_ref } @c_related;
my @c_related_newer = sort { ($a->[0]{node}{cgen} - $cgen_ref) <=> ($b->[0]{node}{cgen} - $cgen_ref) }
grep { $_->[0]{node}{cgen} > $cgen_ref } @c_related;
# NOTE: get_related_subvolume_nodes() is very sophisticated and # NOTE: While get_related_readonly_nodes() returns deep parent_uuid
# returns all known relations, there is always a chance that # relations, there is always a chance that these relations get
# relations get broken. # broken.
# #
# Consider parent_uuid chain ($svol readonly) # Consider parent_uuid chain ($svol readonly)
# B->A, C->B, delete B: C has no relation to A. # B->A, C->B, delete B: C has no relation to A.
@ -3159,43 +3164,88 @@ sub get_best_parent($$$;@)
# For snapshots (here: S=$svol readwrite) the scenario is different: # For snapshots (here: S=$svol readwrite) the scenario is different:
# A->S, B->S, C->S, delete B: A still has a relation to C. # A->S, B->S, C->S, delete B: A still has a relation to C.
# #
# add subvolumes in same directory matching btrbk file name scheme # resolve correlated subvolumes in same directory matching btrbk file name scheme
if($fallback_btrbk_basename && exists($svol->{node}{BTRBK_BASENAME})) { my (@c_snapdir_older, @c_snapdir_newer);
if(exists($svol->{node}{BTRBK_BASENAME})) {
my $snaproot_btrbk_direct_leaf = vinfo_subvol_list($snaproot, readonly => 1, btrbk_direct_leaf => $svol->{node}{BTRBK_BASENAME}); my $snaproot_btrbk_direct_leaf = vinfo_subvol_list($snaproot, readonly => 1, btrbk_direct_leaf => $svol->{node}{BTRBK_BASENAME});
my @direct_leaf_older = grep { cmp_date($_->{node}{BTRBK_DATE}, $svol->{node}{BTRBK_DATE}) < 0 } @$snaproot_btrbk_direct_leaf; my @sbdl_older = sort { cmp_date($b->{node}{BTRBK_DATE}, $a->{node}{BTRBK_DATE}) }
my @direct_leaf_newer = grep { cmp_date($_->{node}{BTRBK_DATE}, $svol->{node}{BTRBK_DATE}) > 0 } @$snaproot_btrbk_direct_leaf; grep { cmp_date($_->{node}{BTRBK_DATE}, $svol->{node}{BTRBK_DATE}) < 0 } @$snaproot_btrbk_direct_leaf;
push @candidate, sort { cmp_date($b->{node}{BTRBK_DATE}, $a->{node}{BTRBK_DATE}) } @direct_leaf_older; my @sbdl_newer = sort { cmp_date($a->{node}{BTRBK_DATE}, $b->{node}{BTRBK_DATE}) }
push @candidate, sort { cmp_date($a->{node}{BTRBK_DATE}, $b->{node}{BTRBK_DATE}) } @direct_leaf_newer; grep { cmp_date($_->{node}{BTRBK_DATE}, $svol->{node}{BTRBK_DATE}) > 0 } @$snaproot_btrbk_direct_leaf;
TRACE "get_best_parent: subvolume has btrbk naming scheme, add " . scalar(@direct_leaf_older) . " older and " . scalar(@direct_leaf_newer) . " newer (by file suffix) candidates with scheme: $snaproot->{PRINT}/$svol->{node}{BTRBK_BASENAME}.*";
@c_snapdir_older = map { $c_rel_id{$_->{node}{id}} // get_best_correlated($droot, $_, %gbc_opts) // () } @sbdl_older;
@c_snapdir_newer = map { $c_rel_id{$_->{node}{id}} // get_best_correlated($droot, $_, %gbc_opts) // () } @sbdl_newer;
} }
# get correlated receive targets of candidates, return first matching within $resolve_droot if($loglevel >= 4) {
my @resolved; TRACE "get_best_parent: related reference cgen=$svol->{node}{cgen}";
my @filtered_nodes; TRACE "get_best_parent: related older: $_->[0]{PRINT} (cgen=$_->[0]{node}{cgen})" foreach(@c_related_older);
my %uniq; TRACE "get_best_parent: related newer: $_->[0]{PRINT} (cgen=$_->[0]{node}{cgen})" foreach(@c_related_newer);
foreach my $cand (@candidate) { TRACE "get_best_parent: snapdir older: $_->[0]{PRINT}" foreach(@c_snapdir_older);
next if($uniq{$cand->{node}{id}}); TRACE "get_best_parent: snapdir newer: $_->[0]{PRINT}" foreach(@c_snapdir_newer);
my $correlated_target = get_best_correlated_target($resolve_droot, $cand, push_filtered_nodes => \@filtered_nodes, fallback_all_mountpoints => $target_fallback_all_mountpoints);
if($correlated_target) {
TRACE "get_best_parent: common related from root=\"$resolve_droot->{PRINT}\": \"$cand->{PRINT}\", \"$correlated_target->{PRINT}\"";
push @resolved, [ $cand, $correlated_target ];
last unless($clone_src);
}
$uniq{$cand->{node}{id}} = 1;
}
if(scalar @filtered_nodes) {
WARN "Best common parent for \"$svol->{PRINT}\" is not accessible within target $target_incremental_resolve \"$resolve_droot->{PRINT}\", ignoring: " . join(", ", map('"' . _fs_path($_) . '"',@filtered_nodes));
} }
if(scalar @resolved) { if(scalar @inaccessible_nodes) { # populated by get_best_correlated()
DEBUG "Resolved best common parent (" . (scalar @resolved) . " total): " . $resolved[0][0]->{PRINT}; WARN "Best common parent for \"$svol->{PRINT}\" is not accessible within target $target_incremental_resolve \"$resolve_droot->{PRINT}\", ignoring: " . join(", ", map('"' . _fs_path($_) . '"',@inaccessible_nodes));
my $parent = shift @resolved; }
$$clone_src = \@resolved if($clone_src);
return ($parent->[0], $parent->[1]->{node}); # preferences for parent (and required clone sources):
} else { # 1. closest older in snapdir (by btrbk timestamp), related
# 2. closest older related (by cgen)
# 3. closest newer related (by cgen)
# 4. closest older in snapdir (by btrbk timestamp)
# 5. closest newer in snapdir (by btrbk timestamp)
#
my @parent;
if(my $cc = shift @c_related_older) {
push @parent, $cc; # 2. closest older related (by cgen)
DEBUG "Resolved best common parent (closest older parent_uuid relationship): $cc->[0]{PRINT}";
}
if(my $cc = shift @c_related_newer) {
DEBUG ((scalar @parent ? "Adding clone source" : "Resolved best common parent") . " (closest newer parent_uuid relationship): $cc->[0]{PRINT}");
push @parent, $cc; # 3. closest newer related (by cgen)
}
if(my $cc = shift @c_snapdir_older) {
unless(grep { $_->[0]{node}{id} == $cc->[0]{node}{id} } @parent) {
if($c_rel_id{$cc->[0]{node}{id}}) {
DEBUG "Resolved best common parent (closest older btrbk timestamp, with parent_uuid relationship): $cc->[0]{PRINT}";
unshift @parent, $cc; # 1. closest older in snapdir (by btrbk timestamp), related
}
else {
DEBUG ((scalar @parent ? "Adding clone source" : "Resolved best common parent") . " (closest older btrbk timestamp): $cc->[0]{PRINT}");
push @parent, $cc; # 4. closest older in snapdir (by btrbk timestamp)
}
}
}
if(my $cc = shift @c_snapdir_newer) {
unless(grep { $_->[0]{node}{id} == $cc->[0]{node}{id} } @parent) {
DEBUG ((scalar @parent ? "Adding clone source" : "Resolved best common parent") . " (closest newer btrbk timestamp): $cc->[0]{PRINT}");
push @parent, $cc; # 5. closest newer in snapdir (by btrbk timestamp)
}
}
# assemble results
unless(scalar @parent) {
DEBUG("No common parents of \"$svol->{PRINT}\" found in src=\"$resolve_sroot->{PRINT}/\", target=\"$resolve_droot->{PRINT}/\""); DEBUG("No common parents of \"$svol->{PRINT}\" found in src=\"$resolve_sroot->{PRINT}/\", target=\"$resolve_droot->{PRINT}/\"");
return undef; return undef;
} }
my @extra_clones;
foreach my $cc (@c_related_older, @c_related_newer, grep { not exists($c_rel_id{$_->[0]{node}{id}}) } (@c_snapdir_older, @c_snapdir_newer)) {
push @extra_clones, $cc->[0] unless(grep { $_->[0]{node}{id} == $cc->[0]{node}{id} } @parent);
}
DEBUG "Resolved " . (scalar @extra_clones) . " extra clone sources";
if($loglevel >= 4) {
TRACE "get_best_parent: parent,clones: $_->[0]{PRINT}" foreach(@parent);
TRACE "get_best_parent: extra clone : $_->{PRINT}" foreach(@extra_clones);
}
my $ret_parent = shift @parent;
my @clone_src = map { $_->[0] } @parent;
$$ret_clone_src = \@clone_src if($ret_clone_src);
$$ret_clone_src_extra = \@extra_clones if($ret_clone_src_extra);
$$ret_target_parent_node = $ret_parent->[1]{node} if($ret_target_parent_node);
return $ret_parent->[0];
} }
@ -3765,7 +3815,8 @@ sub macro_send_receive(@)
my $source = $info{source} || die; my $source = $info{source} || die;
my $target = $info{target} || die; my $target = $info{target} || die;
my $parent = $info{parent}; my $parent = $info{parent};
my $clone_src = $info{clone_src}; # arrayref of [ vinfo, correlated_target_node ] my @clone_src = @{ $info{clone_src} // [] }; # copy array
my $clone_src_extra = $info{clone_src_extra} // [];
my $config_target = $target->{CONFIG}; my $config_target = $target->{CONFIG};
die unless($config_target->{CONTEXT} eq "target"); die unless($config_target->{CONTEXT} eq "target");
my $target_type = $config_target->{target_type} || die; my $target_type = $config_target->{target_type} || die;
@ -3796,11 +3847,14 @@ sub macro_send_receive(@)
ABORTED($config_target, "No common parent subvolume found, and option \"incremental\" is set to \"strict\""); ABORTED($config_target, "No common parent subvolume found, and option \"incremental\" is set to \"strict\"");
return undef; return undef;
} }
# add extra clone_src if "incremental_clones" is set
my $ic = config_key($target, "incremental_clones");
push @clone_src, map { --$ic < 0 ? () : $_ } @$clone_src_extra if($ic);
} }
else { else {
INFO "Creating full backup..."; INFO "Creating full backup...";
$parent = undef; $parent = undef;
$clone_src = undef; @clone_src = ();
delete $info{parent}; delete $info{parent};
} }
@ -3809,7 +3863,7 @@ sub macro_send_receive(@)
my $raw_info; my $raw_info;
if($target_type eq "send-receive") if($target_type eq "send-receive")
{ {
$ret = btrfs_send_receive($source, $target, $parent, $clone_src, \$vol_received); $ret = btrfs_send_receive($source, $target, $parent, \@clone_src, \$vol_received);
ABORTED($config_target, "Failed to send/receive subvolume") unless($ret); ABORTED($config_target, "Failed to send/receive subvolume") unless($ret);
} }
elsif($target_type eq "raw") elsif($target_type eq "raw")
@ -3967,12 +4021,16 @@ sub macro_archive_target($$$;$)
my $archive_success = 0; my $archive_success = 0;
foreach my $svol (@archive) foreach my $svol (@archive)
{ {
my $clone_src; my ($clone_src, $clone_src_extra, $target_parent_node);
my ($parent, $target_parent_node) = get_best_parent($svol, $sroot, $droot, clone_src => \$clone_src); my $parent = get_best_parent($svol, $sroot, $droot,
clone_src => \$clone_src,
clone_src_extra => \$clone_src_extra,
target_parent_node => \$target_parent_node);
if(macro_send_receive(source => $svol, if(macro_send_receive(source => $svol,
target => $droot, target => $droot,
parent => $parent, # this is <undef> if no suitable parent found parent => $parent, # this is <undef> if no suitable parent found
clone_src => $clone_src, clone_src => $clone_src,
clone_src_extra => $clone_src_extra,
target_parent_node => $target_parent_node, target_parent_node => $target_parent_node,
)) ))
{ {
@ -6097,12 +6155,16 @@ MAIN:
} }
INFO "Creating subvolume backup (send-receive) for: $child->{PRINT}"; INFO "Creating subvolume backup (send-receive) for: $child->{PRINT}";
my $clone_src; my ($clone_src, $clone_src_extra, $target_parent_node);
my ($parent, $target_parent_node) = get_best_parent($child, $snaproot, $droot, clone_src => \$clone_src); my $parent = get_best_parent($child, $snaproot, $droot,
clone_src => \$clone_src,
clone_src_extra => \$clone_src_extra,
target_parent_node => \$target_parent_node);
if(macro_send_receive(source => $child, if(macro_send_receive(source => $child,
target => $droot, target => $droot,
parent => $parent, # this is <undef> if no suitable parent found parent => $parent, # this is <undef> if no suitable parent found
clone_src => $clone_src, clone_src => $clone_src,
clone_src_extra => $clone_src_extra,
target_parent_node => $target_parent_node, target_parent_node => $target_parent_node,
)) ))
{ {