from __future__ import (absolute_import, division, print_function)

import argparse
import collections
import errno
import multiprocessing
import os
import shutil
import sys
from argparse import RawTextHelpFormatter

from PIL import Image, ImageFile
from imagehash import average_hash, phash, dhash, whash

ImageFile.LOAD_TRUNCATED_IMAGES = True
def make_sure_path_exists(path):
    """Create directory *path*, including missing parents, if it is absent.

    NOTE(review): the ``def`` line and the ``try`` body were not visible in
    the reviewed source; the function name and the ``os.makedirs`` call are
    reconstructed from the surviving ``except``/``errno`` lines -- confirm.

    Args:
        path: Directory path to create.

    Raises:
        OSError: If creation fails for any reason other than the directory
            already existing, including when *path* exists but is not a
            directory.
    """
    try:
        os.makedirs(path)
    except OSError as exception:
        # EEXIST is only harmless when a directory is already there; a
        # regular file occupying the path must still be reported.
        if exception.errno != errno.EEXIST or not os.path.isdir(path):
            raise
def image_ahash(image_path):
    """Compute the average hash of the image stored at *image_path*.

    Args:
        image_path: Path of the image file to open and hash.

    Returns:
        A ``(image_path, hash)`` tuple. ``hash`` is ``None`` when the file
        could not be opened or hashed.
    """
    try:
        with Image.open(image_path) as image:
            digest = average_hash(image)
    except Exception:
        # Deliberately broad: a single unreadable file is reported back as
        # (path, None) instead of raising out of the hashing call.
        return (image_path, None)
    return (image_path, digest)
def image_phash(image_path):
    """Compute the perceptual hash of the image stored at *image_path*.

    NOTE(review): the hashing statement was not visible in the reviewed
    source; ``phash(image)`` is reconstructed from the function name and the
    file's ``imagehash`` import -- confirm.

    Args:
        image_path: Path of the image file to open and hash.

    Returns:
        A ``(image_path, hash)`` tuple. ``hash`` is ``None`` when the file
        could not be opened or hashed.
    """
    try:
        with Image.open(image_path) as image:
            digest = phash(image)
    except Exception:
        # Deliberately broad: a single unreadable file is reported back as
        # (path, None) instead of raising out of the hashing call.
        return (image_path, None)
    return (image_path, digest)
def image_dhash(image_path):
    """Compute the difference hash of the image stored at *image_path*.

    NOTE(review): the hashing statement was not visible in the reviewed
    source; ``dhash(image)`` is reconstructed from the function name and the
    file's ``imagehash`` import -- confirm.

    Args:
        image_path: Path of the image file to open and hash.

    Returns:
        A ``(image_path, hash)`` tuple. ``hash`` is ``None`` when the file
        could not be opened or hashed.
    """
    try:
        with Image.open(image_path) as image:
            digest = dhash(image)
    except Exception:
        # Deliberately broad: a single unreadable file is reported back as
        # (path, None) instead of raising out of the hashing call.
        return (image_path, None)
    return (image_path, digest)
def image_whash_haar(image_path):
    """Compute the Haar wavelet hash of the image stored at *image_path*.

    NOTE(review): the hashing statement was not visible in the reviewed
    source; ``whash(image, mode='haar')`` is reconstructed from the function
    name and the sibling db4 variant -- confirm.

    Args:
        image_path: Path of the image file to open and hash.

    Returns:
        A ``(image_path, hash)`` tuple. ``hash`` is ``None`` when the file
        could not be opened or hashed.
    """
    try:
        with Image.open(image_path) as image:
            digest = whash(image, mode='haar')
    except Exception:
        # Deliberately broad: a single unreadable file is reported back as
        # (path, None) instead of raising out of the hashing call.
        return (image_path, None)
    return (image_path, digest)
def image_whash_db4(image_path):
    """Compute the db4 wavelet hash of the image stored at *image_path*.

    Args:
        image_path: Path of the image file to open and hash.

    Returns:
        A ``(image_path, hash)`` tuple. ``hash`` is ``None`` when the file
        could not be opened or hashed.
    """
    try:
        with Image.open(image_path) as image:
            digest = whash(image, mode='db4')
    except Exception:
        # Deliberately broad: a single unreadable file is reported back as
        # (path, None) instead of raising out of the hashing call.
        return (image_path, None)
    return (image_path, digest)
def is_image(filename):
    """Tell whether *filename* has one of the supported image extensions.

    The comparison ignores case, so ``PHOTO.JPG`` is accepted as well as
    ``photo.jpg``.

    Args:
        filename: File name or path to test.

    Returns:
        True if the name ends with .png, .jpg, .jpeg, .bmp or .gif.
    """
    return filename.lower().endswith((".png", ".jpg", ".jpeg", ".bmp", ".gif"))
if __name__ == '__main__':
    # Command-line entry point: hash every image found under the input
    # folder in a pool of worker processes, group the files whose hashes are
    # equal, and move every group of two or more files to the output folder.
    parser = argparse.ArgumentParser(
        description='Find similar images in the input folder and move them to the output folder',
        formatter_class=RawTextHelpFormatter)

    # Maps each accepted -hm value to the function that computes that hash.
    hash_dict = {
        'ahash': image_ahash,
        'phash': image_phash,
        'dhash': image_dhash,
        'whash-haar': image_whash_haar,
        'whash-db4': image_whash_db4,
    }
    # NOTE(review): only the last two help lines were visible in the reviewed
    # source; the first three are reconstructed -- confirm the wording.
    hash_help = """ahash: Average hash
phash: Perceptual hash
dhash: Difference hash
whash-haar: Haar wavelet hash
whash-db4: Daubechies wavelet hash"""

    # NOTE(review): the default hash method was not visible; 'phash' is an
    # assumption -- confirm.
    parser.add_argument("-hm", type=str,
                        choices=sorted(hash_dict),
                        help=hash_help, default='phash')
    # NOTE(review): help texts of the two positional arguments were not
    # visible; placeholders below -- confirm.
    parser.add_argument('input_folder', type=str,
                        help='folder that is searched for images')
    parser.add_argument('output_folder', type=str,
                        help='folder that receives the similar images')
    parser.add_argument("-now", type=int,
                        help='number of subproccesses for hashing', default=None)
    args = parser.parse_args()

    folder_in = os.path.abspath(args.input_folder)
    folder_out = os.path.abspath(args.output_folder)
    image_hashfunc = hash_dict[args.hm]
    poolsize = args.now  # None lets multiprocessing pick the worker count

    if os.path.exists(folder_out):
        print("output folder exists!")
        print("Delete the folder before proceding")
        # NOTE(review): exit status not visible in the original -- confirm.
        sys.exit(1)

    print("Searching input folder:", folder_in)
    image_filenames = tuple(
        os.path.join(root, name)
        for root, _dirs, names in os.walk(folder_in)
        for name in names
        if is_image(name)
    )
    total = len(image_filenames)
    print("Number of found images:", total)

    # hash -> list of paths sharing that hash; the key None collects the
    # files for which the hash function reported a failure.
    images = collections.defaultdict(list)
    # NOTE(review): progress-bar width was not visible in the original.
    bar_length = 50
    pool = multiprocessing.Pool(poolsize)
    try:
        results = pool.imap_unordered(image_hashfunc, image_filenames, chunksize=100)
        for n, (image_path, image_hash) in enumerate(results):
            images[image_hash].append(image_path)
            percents = round(100.0 * n / float(total), 1)
            filled_len = int(round(bar_length * n / total))
            bar = '=' * filled_len + '-' * (bar_length - filled_len)
            # The trailing \r rewinds the cursor so the next update
            # overwrites the bar in place.
            sys.stdout.write('[%s] %s\r' % (bar, str(percents) + "%"))
            sys.stdout.flush()
    finally:
        # All results have been consumed (or an error occurred), so the
        # workers can be stopped immediately.
        pool.terminate()
        pool.join()
    sys.stdout.write('[%s] %s' % ('=' * bar_length, "100%"))
    print()

    # Are there any files which could not be hashed?  Take them out of the
    # mapping so they are not treated as a group of similar images.
    failed = images.pop(None, [])
    if failed:
        print("Problematic files:")
        for image_path in failed:
            print(image_path)

    # Only hashes shared by at least two files describe similar images.
    groups = [paths for paths in images.values() if len(paths) > 1]
    similar_files = sum(len(paths) for paths in groups)
    print("Number of similar files:", similar_files)

    if groups:
        print("Moving to output folder")
        for paths in groups:
            for img in paths:
                (path, _) = os.path.split(img)
                # Drop everything up to the last ':' and then the first
                # character so the source directory becomes a relative path
                # that os.path.join can place under folder_out.
                to_folder = os.path.join(folder_out, path.split(":")[-1][1:])
                # shutil.move only moves *into* to_folder if it already
                # exists as a directory.
                if not os.path.isdir(to_folder):
                    os.makedirs(to_folder)
                shutil.move(img, to_folder)
    else:
        print("No similar pictures found")