Doppelte-Bilder-bzw-Dateien-finden-und-aufräumen

Aus lugvswiki
Version vom 30. Dezember 2023, 10:37 Uhr von Marc (Diskussion | Beiträge) (Die Seite wurde neu angelegt: „= Doppelte-Bilder-bzw-Dateien-finden-und-aufräumen = #!/usr/bin/env python # Findet doppelte Bilder nach Größe und falls Exif-Daten vorhanden sind, n…“)
(Unterschied) ← Nächstältere Version | Aktuelle Version (Unterschied) | Nächstjüngere Version → (Unterschied)
Zur Navigation springenZur Suche springen

Doppelte-Bilder-bzw-Dateien-finden-und-aufräumen

 #!/usr/bin/env python
 
 # Findet doppelte Bilder nach Größe und falls Exif-Daten vorhanden sind, nach Kamera-Modell / Aufnahmedatum.
 
 # Nehmen wir diesen Test an:
 
 # Verzeichnisse erstellen
 # mkdir -p find_groups/{a,b,c}
 # Inhalte erstellen
 # echo 1 > find_groups/a/A
 # echo 1 > find_groups/b/A
 # echo 1 > find_groups/c/A
 # echo 1 > find_groups/a/B
 # echo 1 > find_groups/b/B
 # echo 1 > find_groups/c/B
 # 
 # echo 2 > find_groups/a/Z
 # echo 2 > find_groups/b/Z
 #
 # Dann werden 2 Gruppen gefunden:
 
 # GRUPPE INDEX=0
 # Verzeichnis 0: find_groups/a
 # Verzeichnis 1: find_groups/b
 # Verzeichnis 2: find_groups/c
 # Dateien A,B
 #
 # GRUPPE INDEX=1
 # Verzeichnis 0: find_groups/a
 # Verzeichnis 1: find_groups/b
 # Dateien Z
 #
 # Dann kann gesagt werden, dass Gruppenindex 0 aus Verzeichnis 1 gelöscht werden kann.
 #
 # Exif und sizes werden in cache.json gespeichert. Auch 1TB an Daten kann so
 # schnell und wiederholend verglichen werden.
 
 
 # Finds duplicate images by file name and size and, if EXIF data is present,
 # by camera model / capture date.
 #
 # Assume this test:
 # 
 # mkdir -p find_groups/{a,b,c}
 # echo 1 > find_groups/a/A
 # echo 1 > find_groups/b/A
 # echo 1 > find_groups/c/A
 # echo 1 > find_groups/a/B
 # echo 1 > find_groups/b/B
 # echo 1 > find_groups/c/B
 # 
 # echo 2 > find_groups/a/Z
 # echo 2 > find_groups/b/Z
 #
 #
 # Then 2 groups will be found:
 #
 # GROUP INDEX=0
 # directory 0: find_groups/a
 # directory 1: find_groups/b
 # directory 2: find_groups/c
 # files A,B
 #
 # GROUP INDEX=1
 
 # directory 0: find_groups/a
 # directory 1: find_groups/b
 # files Z
 #
 # Then you can say delete group index 0 from directory 1
 # 
 # EXIF data and sizes are cached in cache.json, so even terabytes of data can
 # be compared quickly and repeatedly.
 
 import signal
 import sys
 import exifread
 from datetime import datetime
 import json
 from concurrent.futures import ThreadPoolExecutor
 # from typing import Self
 from tqdm import tqdm
 import os
 import traceback
 import json
 from pathlib import Path
 
 def files(path):
     """Return the paths of all files found anywhere below *path*.

     The tree is traversed with ``os.walk``. Each returned entry is the walk
     root joined with the file name, and only entries accepted by
     ``os.path.isfile`` are kept.
     """
     return [
         candidate
         for root, _dirnames, filenames in os.walk(path)
         for candidate in (os.path.join(root, name) for name in filenames)
         if os.path.isfile(candidate)
     ]
 
 
 def get_exif_date_model(image_path):
     """Read capture date and camera model from the EXIF data of *image_path*.

     Returns a two-element list ``[image_path, signature]``. When the file has
     an ``EXIF DateTimeOriginal`` tag the signature is built from the parsed
     capture date and the ``Image Model`` tag; otherwise it is ``"NO-EXIF"``.
     Any exception raised while reading is printed together with its traceback
     and also results in ``"NO-EXIF"``.
     """
     try:
         with open(image_path, 'rb') as handle:
             tags = exifread.process_file(handle, details=False)
             if 'EXIF DateTimeOriginal' not in tags:
                 return [image_path, "NO-EXIF"]
             taken = datetime.strptime(
                 str(tags['EXIF DateTimeOriginal']), '%Y:%m:%d %H:%M:%S'
             )
             camera = tags['Image Model']
             print([taken, camera])
             # NOTE(review): the literal "f" before the model looks like a
             # typo, but the value is stored in the cache, so it is kept as is.
             return [image_path, f"{taken}:f{camera}"]
     except Exception as err:
         print(f"Error reading EXIF data: {err}")
         traceback.print_exc()
     return [image_path, "NO-EXIF"]
 
 def get_size(path):
     """Return ``[path, size]`` with the size reported by ``os.path.getsize``."""
     size_in_bytes = os.path.getsize(path)
     return [path, size_in_bytes]
 
 def fill_cache(cache, paths, f, prefix):
     """Run *f* over *paths* in a thread pool and store the results in *cache*.

     Args:
         cache: Object supporting item assignment (``cache[key] = value``).
         paths: Sequence of file paths; its length sizes the progress bar.
         f: Callable taking one path and returning a ``(file, result)`` pair,
             or a falsy value when there is nothing to store for that path.
         prefix: String prepended to ``file`` to build the cache key.
     """
     # The executor's ``with`` block already shuts the pool down on exit, so no
     # explicit shutdown() call is needed.
     with tqdm(total=len(paths)) as progress, ThreadPoolExecutor() as executor:
         for outcome in executor.map(f, paths):
             # Advance for every processed path, stored or not, so the bar
             # always reaches its total.
             progress.update()
             if not outcome:
                 continue
             file, result = outcome
             cache[f"{prefix}{file}"] = result
 
 
 class UJsonStorage:
     """Dict-like key/value store that is persisted as a JSON file.

     The file is read once in ``__init__`` (a missing file yields an empty
     store). When used as a context manager, the data is written back to the
     file when the ``with`` block is left.
     """

     def __init__(self, file_path):
         """Load *file_path* if it exists; start empty otherwise."""
         self.file_path = file_path
         self.data = {}
         # Read by the SIGTERM handler. Nothing in this class starts a dump
         # thread, so it stays None unless a caller assigns one.
         self.dthread = None
         try:
             with open(file_path, 'r') as file:
                 self.data = json.load(file)
         except FileNotFoundError:
             pass

     def __enter__(self):
         # SIGTERM is turned into SystemExit so that it unwinds through the
         # ``with`` block and __exit__ still writes the data.
         self.set_sigterm_handler()
         return self

     def __exit__(self, exc_type, exc_value, traceback):
         """Write the data to ``file_path``; exceptions are not suppressed."""
         # Write to a temporary file first and swap it in afterwards, so an
         # interrupted write never leaves a truncated cache file behind.
         tmp_path = f"{self.file_path}.tmp"
         with open(tmp_path, 'w') as file:
             json.dump(self.data, file, indent=4)
         os.replace(tmp_path, self.file_path)

     def get_or(self, k, f):
         """Return the value for *k*, computing and storing ``f()`` if absent."""
         if k not in self.data:
             self.data[k] = f()
         return self.data[k]

     def __getitem__(self, item):
         return self.data[item]

     def __setitem__(self, key, value):
         self.data[key] = value

     def __delitem__(self, key):
         del self.data[key]

     def set_sigterm_handler(self):
         """Assigns sigterm_handler for graceful shutdown during dump()"""
         def sigterm_handler(*args, **kwargs):
             # Wait for a dump thread if one was assigned, then exit via
             # SystemExit.
             dthread = getattr(self, "dthread", None)
             if dthread is not None:
                 dthread.join()
             sys.exit(0)
         signal.signal(signal.SIGTERM, sigterm_handler)
 
 
 def main(directories):
     """Find groups of duplicate files below *directories* and delete on request.

     Files count as duplicates when base name, cached size and cached EXIF
     signature all match. Duplicates that live in the same set of directories
     are collected into one group. All groups spanning more than one directory
     are printed, then the user is asked repeatedly which group to clean up
     and from which of its directories the files should be deleted.

     Sizes and EXIF signatures are read through the ``cache.json`` store, so
     only files without a cache entry are inspected.

     Args:
         directories: Iterable of directory paths to scan.
     """
     with UJsonStorage("cache.json") as cache:
         # Overlapping arguments (the same directory twice, a parent plus its
         # child, a symlink plus its target) reach the same file more than
         # once. Keep only the first path per real file; otherwise a file would
         # be reported as a duplicate of itself and its only copy could be
         # deleted.
         seen = set()
         files_ = []
         for directory in directories:
             for path in files(directory):
                 real = os.path.realpath(path)
                 if real in seen:
                     continue
                 seen.add(real)
                 files_.append(path)
         print("files_")
         print(files_)
         print("e")

         def exif_key(file):
             return f"exif:{file}"

         def size_key(file):
             return f"size:{file}"

         exif_missing = [x for x in files_ if exif_key(x) not in cache.data]
         size_missing = [x for x in files_ if size_key(x) not in cache.data]

         print('lese missing exif')
         fill_cache(cache, exif_missing, get_exif_date_model, "exif:")
         print('lese missing size')
         fill_cache(cache, size_missing, get_size, "size:")

         def key(path):
             size = cache[size_key(path)]
             exif = cache[exif_key(path)]
             return f"{os.path.basename(path)}:{size}:{exif}"

         # Duplicate key -> base name plus every directory holding such a file.
         bydirs = {}
         # (directory, base name) -> path exactly as used for the cache keys.
         scanned_path = {}
         for f in files_:
             p = Path(f)
             entry = bydirs.setdefault(
                 key(f), {"basename": p.name, "directories": []}
             )
             entry["directories"].append(str(p.parent))
             scanned_path[(str(p.parent), p.name)] = f

         print(bydirs)

         # Group base names by the sorted tuple of directories they occur in.
         # A tuple key (rather than a joined string) stays correct for any
         # directory name.
         groups = {}
         for v in bydirs.values():
             dirs = tuple(sorted(v["directories"]))
             groups.setdefault(dirs, []).append(v["basename"])

         group_list = [
             {"directory_list": list(dirs), "files": names}
             for dirs, names in groups.items()
             if len(dirs) > 1
         ]
         group_list.sort(key=lambda g: len(g["files"]))

         def print_dirs(g):
             for i, d in enumerate(g["directory_list"]):
                 print("%s: %s " % (i, d))

         def parse_index(text, count):
             """Return *text* as an index below *count*, or None if invalid."""
             try:
                 index = int(text)
             except ValueError:
                 return None
             return index if 0 <= index < count else None

         for i, g in enumerate(group_list):
             print("")
             print("")
             print("GROUP [INDEX=%s] === count: %s" % (i, len(g["files"])))
             print("directories:")
             print_dirs(g)
             print("files:")
             print(g["files"])

         while True:
             try:
                 a = input("delete group [index|Q=quit]: ")
             except EOFError:
                 return
             if a.strip().upper() == "Q":
                 return
             group_index = parse_index(a, len(group_list))
             if group_index is None:
                 print("invalid group index: %s" % a)
                 continue
             group = group_list[group_index]
             if len(group["directory_list"]) < 2:
                 # Never offer to delete the last remaining copy.
                 print("only one copy left in this group, nothing to delete")
                 continue
             print_dirs(group)
             try:
                 b = input("delete files from directory [index]: ")
             except EOFError:
                 return
             dir_index = parse_index(b, len(group["directory_list"]))
             if dir_index is None:
                 print("invalid directory index: %s" % b)
                 continue
             d = group["directory_list"][dir_index]
             for f in group["files"]:
                 x = os.path.join(d, f)
                 print("del %s" % x)
                 os.unlink(x)
                 # Drop the cache entries of the deleted file so a later file
                 # at the same path is inspected again.
                 cached = scanned_path.get((d, f), x)
                 cache.data.pop(exif_key(cached), None)
                 cache.data.pop(size_key(cached), None)
             # The directory no longer holds these files; remove it so it
             # cannot be selected again for this group.
             del group["directory_list"][dir_index]
 
 if __name__ == "__main__":
     # Every command-line argument is passed to main() as a directory to scan.
     directories = sys.argv[1:]
     print(directories)
     main(directories)