Commit 4dafa6b

Backup s3 metadata simultaneously with local data (#192)

* Upload cloud storage metadata in parallel with local data
* Fix bug
* Fix lint
* Check if cloud metadata is old
* Fix comment
* Fix lint
* Delete todo
* Delete todo 2
* Fix python 3.10 pylint
1 parent 7e1f746 commit 4dafa6b

File tree

11 files changed: +135 -45 lines changed


ch_backup/backup/layout.py (+74 -18)

@@ -197,18 +197,28 @@ def upload_data_part(
             raise StorageError(msg) from e

     def upload_cloud_storage_metadata(
-        self, backup_meta: BackupMetadata, disk: Disk, delete_after_upload: bool = False
+        self,
+        backup_meta: BackupMetadata,
+        disk: Disk,
+        table: Table,
+        delete_after_upload: bool = False,
     ) -> bool:
         """
         Upload specified disk metadata files from given directory path as a tarball.
         Returns: whether backed up disk had data.
         """
+        assert table.path_on_disk, f"Table {table} doesn't store data on disk"
+
         backup_name = backup_meta.get_sanitized_name()
         compression = backup_meta.cloud_storage.compressed
         remote_path = _disk_metadata_path(
-            self.get_backup_path(backup_name), disk.name, compression
+            self.get_backup_path(backup_name),
+            table.database,
+            table.name,
+            disk.name,
+            compression,
         )
-        shadow_path = os.path.join(disk.path, "shadow", backup_name)
+        shadow_path = _table_shadow_path(disk.path, backup_name, table.path_on_disk)
         exclude_file_names = ["frozen_metadata.txt"]
         if dir_is_empty(shadow_path, exclude_file_names):
             return False
@@ -219,6 +229,7 @@ def upload_cloud_storage_metadata(
         self._storage_loader.upload_files_tarball_scan(
             dir_path=shadow_path,
             remote_path=remote_path,
+            tar_base_dir=table.path_on_disk,
             exclude_file_names=exclude_file_names,
             is_async=True,
             encryption=backup_meta.cloud_storage.encrypted,
@@ -532,6 +543,28 @@ def check_data_part(self, backup_path: str, part: PartMetadata) -> bool:
             )
             return False

+    def _get_cloud_storage_metadata_remote_paths(
+        self,
+        backup_name: str,
+        source_disk_name: str,
+        compression: bool,
+    ) -> Sequence[str]:
+        # Check if metadata is stored as 'disks/s3.tar.gz' for backwards compatibility
+        old_style_remote_path = _disk_metadata_path(
+            self.get_backup_path(backup_name), None, None, source_disk_name, compression
+        )
+        if self._storage_loader.path_exists(old_style_remote_path):
+            return [old_style_remote_path]
+        return self._storage_loader.list_dir(
+            str(
+                os.path.join(
+                    self.get_backup_path(backup_name), "disks", source_disk_name
+                )
+            ),
+            recursive=True,
+            absolute=True,
+        )
+
     def download_cloud_storage_metadata(
         self, backup_meta: BackupMetadata, disk: Disk, source_disk_name: str
     ) -> None:
@@ -542,22 +575,23 @@ def download_cloud_storage_metadata(
         compression = backup_meta.cloud_storage.compressed
         disk_path = os.path.join(disk.path, "shadow", backup_name)
         os.makedirs(disk_path, exist_ok=True)
-        remote_path = _disk_metadata_path(
-            self.get_backup_path(backup_name), source_disk_name, compression
+        metadata_remote_paths = self._get_cloud_storage_metadata_remote_paths(
+            backup_name, source_disk_name, compression
         )

-        logging.debug(f'Downloading "{disk_path}" files from "{remote_path}"')
-        try:
-            self._storage_loader.download_files(
-                remote_path=remote_path,
-                local_path=disk_path,
-                is_async=True,
-                encryption=backup_meta.cloud_storage.encrypted,
-                compression=compression,
-            )
-        except Exception as e:
-            msg = f'Failed to download tarball file "{remote_path}"'
-            raise StorageError(msg) from e
+        for remote_path in metadata_remote_paths:
+            logging.debug(f'Downloading "{disk_path}" files from "{remote_path}"')
+            try:
+                self._storage_loader.download_files(
+                    remote_path=remote_path,
+                    local_path=disk_path,
+                    is_async=True,
+                    encryption=backup_meta.cloud_storage.encrypted,
+                    compression=compression,
+                )
+            except Exception as e:
+                msg = f'Failed to download tarball file "{remote_path}"'
+                raise StorageError(msg) from e

     def delete_backup(self, backup_name: str) -> None:
         """
@@ -715,17 +749,39 @@ def _part_path(


 def _disk_metadata_path(
-    backup_path: str, disk_name: str, compressed: bool = False
+    backup_path: str,
+    db_name: Optional[str],
+    table_name: Optional[str],
+    disk_name: str,
+    compressed: bool = False,
 ) -> str:
     """
     Returns path to store tarball with cloud storage shadow metadata.
     """
     extension = ".tar"
     if compressed:
         extension += COMPRESSED_EXTENSION
+    if table_name or db_name:
+        assert table_name and db_name
+        return os.path.join(
+            backup_path,
+            "disks",
+            disk_name,
+            _quote(db_name),
+            f"{_quote(table_name)}{extension}",
+        )
     return os.path.join(backup_path, "disks", f"{disk_name}{extension}")


+def _table_shadow_path(
+    disk_path: str, backup_name: str, table_path_on_disk: str
+) -> str:
+    """
+    Returns path to frozen table data on given disk.
+    """
+    return os.path.join(disk_path, "shadow", backup_name, table_path_on_disk)
+
+
 def _quote(value: str) -> str:
     return quote(value, safe="").translate(
         {
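The upshot of these changes is a new remote layout: instead of one disks/<disk>.tar[.gz] per disk, each table now gets its own disks/<disk>/<db>/<table>.tar[.gz], while restore still probes the old path first for backwards compatibility. A minimal sketch of the path scheme, assuming COMPRESSED_EXTENSION is ".gz" and approximating the repository's _quote helper with plain urllib.parse.quote (the real helper additionally translates a few characters):

import os
from typing import Optional
from urllib.parse import quote


def disk_metadata_path(
    backup_path: str,
    db_name: Optional[str],
    table_name: Optional[str],
    disk_name: str,
    compressed: bool = False,
) -> str:
    # Mirrors _disk_metadata_path: per-table tarballs live under
    # disks/<disk>/<db>/<table>.tar[.gz]; the old per-disk layout
    # was a single disks/<disk>.tar[.gz].
    extension = ".tar" + (".gz" if compressed else "")
    if table_name or db_name:
        assert table_name and db_name
        return os.path.join(
            backup_path,
            "disks",
            disk_name,
            quote(db_name, safe=""),
            f"{quote(table_name, safe='')}{extension}",
        )
    return os.path.join(backup_path, "disks", f"{disk_name}{extension}")


# New per-table path:
print(disk_metadata_path("backups/20240101", "db1", "events", "s3", True))
# backups/20240101/disks/s3/db1/events.tar.gz

# Old-style path, still probed on restore for backwards compatibility:
print(disk_metadata_path("backups/20240101", None, None, "s3", True))
# backups/20240101/disks/s3.tar.gz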

ch_backup/backup/metadata/backup_metadata.py (+5 -3)

@@ -254,13 +254,15 @@ def add_database(self, db: Database) -> None:
             "tables": {},
         }

-    def get_tables(self, db_name: str) -> Sequence[TableMetadata]:
+    def get_tables(self, db_name: Optional[str] = None) -> Sequence[TableMetadata]:
         """
         Get tables for the specified database.
         """
         result = []
-        for table_name, raw_metadata in self._databases[db_name]["tables"].items():
-            result.append(TableMetadata.load(db_name, table_name, raw_metadata))
+        databases = [db_name] if db_name else self._databases.keys()
+        for db in databases:
+            for table_name, raw_metadata in self._databases[db]["tables"].items():
+                result.append(TableMetadata.load(db, table_name, raw_metadata))

         return result
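A simplified stand-in for the widened get_tables, using a plain dict and returning qualified names instead of TableMetadata objects, to show the new db_name=None behavior:

from typing import Dict, List, Optional

_databases: Dict[str, dict] = {
    "db1": {"tables": {"events": {}, "users": {}}},
    "db2": {"tables": {"logs": {}}},
}


def get_tables(db_name: Optional[str] = None) -> List[str]:
    # With db_name omitted the lookup now spans every database,
    # instead of raising a KeyError on a missing argument.
    databases = [db_name] if db_name else list(_databases.keys())
    return [
        f"{db}.{table}"
        for db in databases
        for table in _databases[db]["tables"]
    ]


print(get_tables("db1"))  # ['db1.events', 'db1.users']
print(get_tables())       # ['db1.events', 'db1.users', 'db2.logs']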

ch_backup/backup/metadata/cloud_storage_metadata.py (+2 -1)

@@ -38,7 +38,8 @@ def add_disk(self, disk_name: str) -> None:
         """
         Add disk name in backed up disks list.
         """
-        self._disks.append(disk_name)
+        if disk_name not in self._disks:
+            self._disks.append(disk_name)

     @property
     def encrypted(self) -> bool:
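Because metadata upload is now driven per table (see table.py below), the same disk can be reported several times within one backup; a small sketch of why the membership check matters:

class CloudStorageMetadata:
    def __init__(self) -> None:
        self._disks: list = []

    @property
    def disks(self) -> list:
        return self._disks

    def add_disk(self, disk_name: str) -> None:
        # The same disk may now be reported once per table; keep it unique.
        if disk_name not in self._disks:
            self._disks.append(disk_name)


meta = CloudStorageMetadata()
for _ in range(3):  # e.g. three tables frozen on the same s3 disk
    meta.add_disk("s3")
print(meta.disks)  # ['s3']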

ch_backup/clickhouse/disks.py (+1 -1)

@@ -184,7 +184,7 @@ def _create_temporary_disk(

         self._ch_ctl.reload_config()
         source_disk = self._ch_ctl.get_disk(tmp_disk_name)
-        logging.debug(f'Restoring Cloud Storage "shadow" data of disk "{disk_name}"')
+        logging.debug(f'Restoring Cloud Storage "shadow" data of disk "{disk_name}"')
         self._backup_layout.download_cloud_storage_metadata(
             backup_meta, source_disk, disk_name
         )

ch_backup/clickhouse/models.py (+6)

@@ -2,6 +2,7 @@
 ClickHouse resource models.
 """

+import os
 import re
 from types import SimpleNamespace
 from typing import List, Optional, Tuple
@@ -75,6 +76,11 @@ def __init__(
         self.paths_with_disks = self._map_paths_to_disks(disks, data_paths)
         self.metadata_path = metadata_path

+        self.path_on_disk = None
+        if self.paths_with_disks:
+            path, disk = self.paths_with_disks[0]
+            self.path_on_disk = os.path.relpath(path, disk.path)
+
     def _map_paths_to_disks(
         self, disks: List[Disk], data_paths: List[str]
     ) -> List[Tuple[str, Disk]]:
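path_on_disk is simply the table's first data path made relative to its disk root; an illustration with hypothetical ClickHouse paths:

import os

disk_path = "/var/lib/clickhouse/disks/s3"
table_data_path = "/var/lib/clickhouse/disks/s3/store/abc/123e4567-e89b-12d3-a456-426614174000"

# path_on_disk is the table's data path relative to its disk, which later
# doubles as the tarball's base directory and as the subtree of
# shadow/<backup> that belongs to this table.
path_on_disk = os.path.relpath(table_data_path, disk_path)
print(path_on_disk)  # store/abc/123e4567-e89b-12d3-a456-426614174000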

ch_backup/logic/table.py (+16 -15)

@@ -61,6 +61,13 @@ def backup(

         backup_name = context.backup_meta.get_sanitized_name()

+        if context.cloud_conf.get("cloud_storage", {}).get("encryption", True):
+            logging.debug('Cloud Storage "shadow" backup will be encrypted')
+            context.backup_meta.cloud_storage.encrypt()
+        if context.cloud_conf.get("cloud_storage", {}).get("compression", True):
+            logging.debug('Cloud Storage "shadow" backup will be compressed')
+            context.backup_meta.cloud_storage.compress()
+
         for db in databases:
             self._backup(
                 context,
@@ -71,8 +78,6 @@ def backup(
                 freeze_threads,
             )

-        self._backup_cloud_storage_metadata(context)
-
     def _collect_local_metadata_mtime(
         self, context: BackupContext, db: Database, tables: Sequence[str]
     ) -> Dict[str, TableMetadataMtime]:
@@ -153,6 +158,7 @@ def _backup(
                 mtimes,
                 create_statement,
             )
+            self._backup_cloud_storage_metadata(context, table)

         context.backup_layout.wait()
         context.ch_ctl.remove_freezed_data()
@@ -223,28 +229,23 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
         return None

     @staticmethod
-    def _backup_cloud_storage_metadata(context: BackupContext) -> None:
+    def _backup_cloud_storage_metadata(context: BackupContext, table: Table) -> None:
         """
         Backup cloud storage metadata files.
         """
-        if context.cloud_conf.get("cloud_storage", {}).get("encryption", True):
-            logging.debug('Cloud Storage "shadow" backup will be encrypted')
-            context.backup_meta.cloud_storage.encrypt()
-        if context.cloud_conf.get("cloud_storage", {}).get("compression", True):
-            logging.debug('Cloud Storage "shadow" backup will be compressed')
-            context.backup_meta.cloud_storage.compress()
-
-        logging.debug('Backing up Cloud Storage disks "shadow" directory')
-        disks = context.ch_ctl.get_disks()
-        for disk in disks.values():
+        logging.debug(
+            'Backing up Cloud Storage disks "shadow" directory of "{}"."{}"',
+            table.database,
+            table.name,
+        )
+        for _, disk in table.paths_with_disks:
             if disk.type == "s3" and not disk.cache_path:
                 if not context.backup_layout.upload_cloud_storage_metadata(
-                    context.backup_meta, disk
+                    context.backup_meta, disk, table
                 ):
                     logging.debug(f'No data frozen on disk "{disk.name}", skipping')
                     continue
                 context.backup_meta.cloud_storage.add_disk(disk.name)
-        logging.debug("Cloud Storage disks has been backed up ")

     # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
     def restore(
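The scheduling consequence of moving _backup_cloud_storage_metadata into the per-table loop: each table's S3 metadata tarball is enqueued right after that table's parts, so it overlaps with the remaining tables' uploads instead of running as a trailing serial phase. A minimal sketch of the idea with a plain thread pool (the real code rides the project's async pipeline; the function bodies here are illustrative):

from concurrent.futures import ThreadPoolExecutor


def upload_data_part(table: str, part: str) -> None:
    print(f"uploading data part {table}/{part}")


def upload_s3_metadata(table: str) -> None:
    print(f"uploading cloud storage metadata for {table}")


# Metadata upload for each table is queued right after its data parts,
# so it runs concurrently with uploads for the remaining tables.
with ThreadPoolExecutor(max_workers=4) as pool:
    for table in ["db1.events", "db1.users", "db2.logs"]:
        for part in ["part_0", "part_1"]:
            pool.submit(upload_data_part, table, part)
        pool.submit(upload_s3_metadata, table)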

ch_backup/storage/async_pipeline/pipeline_builder.py (+7 -2)

@@ -112,7 +112,10 @@ def build_decompress_stage(self) -> "PipelineBuilder":
         return self

     def build_read_files_tarball_scan_stage(
-        self, dir_path: Path, exclude_file_names: Optional[List[str]] = None
+        self,
+        dir_path: Path,
+        tar_base_dir: Optional[str] = None,
+        exclude_file_names: Optional[List[str]] = None,
     ) -> "PipelineBuilder":
         """
         Build reading files to tarball stage.
@@ -122,7 +125,9 @@ def build_read_files_tarball_scan_stage(

         self.append(
             thread_input(
-                ReadFilesTarballScanStage(stage_config, dir_path, exclude_file_names),
+                ReadFilesTarballScanStage(
+                    stage_config, dir_path, tar_base_dir, exclude_file_names
+                ),
                 maxsize=queue_size,
             )
         )

ch_backup/storage/async_pipeline/pipeline_executor.py (+2)

@@ -90,6 +90,7 @@ def upload_files_tarball_scan(
         encryption: bool,
         delete: bool,
         compression: bool,
+        tar_base_dir: Optional[str] = None,
         exclude_file_names: Optional[List[str]] = None,
         callback: Optional[Callable] = None,
     ) -> None:
@@ -107,6 +108,7 @@ def upload_files_tarball_scan(
             encryption,
             delete_after=delete,
             compression=compression,
+            tar_base_dir=tar_base_dir,
             exclude_file_names=exclude_file_names,
         )
         self._exec_pipeline(job_id, pipeline, is_async, callback)

ch_backup/storage/async_pipeline/pipelines.py (+4 -1)

@@ -67,6 +67,7 @@ def upload_files_tarball_scan_pipeline(
     encrypt: bool,
     delete_after: bool,
     compression: bool,
+    tar_base_dir: Optional[str] = None,
     exclude_file_names: Optional[List[str]] = None,
 ) -> None:
     """
@@ -80,7 +81,9 @@ def upload_files_tarball_scan_pipeline(
         estimated_size = calc_tarball_size_scan(
             base_path, estimated_size, exclude_file_names
         )
-    builder.build_read_files_tarball_scan_stage(base_path, exclude_file_names)
+    builder.build_read_files_tarball_scan_stage(
+        base_path, tar_base_dir, exclude_file_names
+    )
     if compression:
         builder.build_compress_stage()
     if encrypt:
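The three pipeline files above are pure plumbing: tar_base_dir defaults to None at every layer, so existing call sites keep the old tarball layout while upload_cloud_storage_metadata can opt in. A condensed sketch of the pass-through with simplified, illustrative signatures:

from pathlib import Path
from typing import List, Optional


def build_read_files_tarball_scan_stage(
    dir_path: Path,
    tar_base_dir: Optional[str] = None,
    exclude_file_names: Optional[List[str]] = None,
) -> None:
    # Innermost layer: the stage is the only place the option is consumed.
    print(f"stage: dir={dir_path}, tar_base_dir={tar_base_dir}")


def upload_files_tarball_scan(
    dir_path: Path,
    tar_base_dir: Optional[str] = None,
    exclude_file_names: Optional[List[str]] = None,
) -> None:
    # Outer layers only forward the keyword unchanged.
    build_read_files_tarball_scan_stage(dir_path, tar_base_dir, exclude_file_names)


upload_files_tarball_scan(Path("shadow/backup1"))  # old behaviour, no rebase
upload_files_tarball_scan(Path("shadow/backup1"), tar_base_dir="store/abc")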

ch_backup/storage/async_pipeline/stages/filesystem/read_files_tarball_stage.py (+16 -4)

@@ -22,19 +22,26 @@ def __init__(
         self,
         config: dict,
         base_path: Path,
+        tar_base_dir: Optional[str] = None,
     ) -> None:
         self._chunk_size = config["chunk_size"]
         self._base_path = base_path
         self._file_source: Iterable[Any] = []
+        self._tar_base_dir: Optional[str] = tar_base_dir

     def __call__(self) -> Iterator[bytes]:
         """
         Read files and yield them as TAR stream.
         """
         for file_relative_path in self._file_source:
             file_path = self._base_path / file_relative_path
+            file_path_in_tar = (
+                Path(self._tar_base_dir) / file_relative_path
+                if self._tar_base_dir
+                else file_relative_path
+            )

-            yield self.make_tar_header(str(file_relative_path), file_path)
+            yield self.make_tar_header(str(file_path_in_tar), file_path)
             yield from self.read_file_content(file_path)

     def read_file_content(self, file_path: Path) -> Iterator[bytes]:
@@ -70,9 +77,10 @@ def __init__(
         self,
         config: dict,
         base_path: Path,
+        tar_base_dir: Optional[str] = None,
         exclude_file_names: Optional[List[str]] = None,
     ) -> None:
-        super().__init__(config, base_path)
+        super().__init__(config, base_path, tar_base_dir)
         self._file_source = scan_dir_files(self._base_path, exclude_file_names)


@@ -82,7 +90,11 @@ class ReadFilesTarballStage(ReadFilesTarballStageBase):
     """

     def __init__(
-        self, config: dict, base_path: Path, file_relative_paths: List[Path]
+        self,
+        config: dict,
+        base_path: Path,
+        file_relative_paths: List[Path],
+        tar_base_dir: Optional[str] = None,
     ) -> None:
-        super().__init__(config, base_path)
+        super().__init__(config, base_path, tar_base_dir)
         self._file_source = file_relative_paths