-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata.py
More file actions
151 lines (124 loc) · 5.71 KB
/
Copy pathdata.py
File metadata and controls
151 lines (124 loc) · 5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import itertools
import math
import random
import cv2
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split
class ImageScaleDataset(Dataset):
    """Map-style dataset over ``(image, label)`` pairs.

    Args:
        image_class_pairs: Indexable collection whose items unpack as
            ``(image, label)``.
        transform: Optional callable applied to the image only; the label is
            returned unchanged.
    """

    def __init__(self, image_class_pairs, transform=None):
        self.transform = transform
        self.image_class_pairs = image_class_pairs

    def __len__(self):
        """Return the number of stored pairs."""
        return len(self.image_class_pairs)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for ``idx``, transforming the image if configured."""
        image, label = self.image_class_pairs[idx]
        # A falsy transform (e.g. None) leaves the image untouched.
        return (self.transform(image) if self.transform else image), label
class SiameseDataset(Dataset):
    """Map-style dataset over ``(img1, img2, scales)`` triplets.

    Each item yields ``(img1, img2, label, scales)`` where ``label`` is ``1``
    when ``scales[0] == scales[1]`` and ``0`` otherwise.

    Args:
        image_pair_class_triplets: Indexable collection whose items unpack as
            ``(img1, img2, scales)``; ``scales`` must be indexable at 0 and 1.
        transform: Optional callable applied to both images.
    """

    def __init__(self, image_pair_class_triplets, transform=None):
        self.transform = transform
        self.image_pair_class_triplets = image_pair_class_triplets

    def __len__(self):
        """Return the number of stored triplets."""
        return len(self.image_pair_class_triplets)

    def __getitem__(self, idx):
        """Return ``(img1, img2, label, scales)`` for ``idx``."""
        first, second, scales = self.image_pair_class_triplets[idx]
        if self.transform:
            first, second = self.transform(first), self.transform(second)
        same_scale = scales[0] == scales[1]
        return first, second, int(same_scale), scales
def create_image_scale_dataset(image, n_scales, block_size, stride, norm_before=True):
    """Build train/test ``ImageScaleDataset``s of image patches labelled by scale.

    Patches are produced by ``create_data`` and split per class by
    ``split_by_class``.

    Args:
        image: Source image. With ``norm_before=True`` it is first passed
            through ``ToTensor`` + ``Normalize((0.5,), (0.5,))`` and only
            channel 0 of the result is kept (as a NumPy array).
        n_scales: Number of scales, forwarded to ``create_data``.
        block_size: Patch size, forwarded to ``create_data``.
        stride: Patch stride, forwarded to ``create_data``.
        norm_before: If true, normalise the whole image up front and give the
            datasets a plain ``ToTensor`` transform; otherwise the datasets
            apply ``ToTensor`` + ``Normalize`` to every patch on access.

    Returns:
        ``(train_dataset, test_dataset)``.
    """
    normalize = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])

    if norm_before:
        image = normalize(image)[0].numpy()

    patches_by_scale = create_data(image, n_scales, block_size, stride)
    train_pairs, test_pairs = split_by_class(patches_by_scale)

    # Already-normalised patches only need converting to tensors.
    if norm_before:
        train_transform, test_transform = transforms.ToTensor(), transforms.ToTensor()
    else:
        train_transform = test_transform = normalize

    return (
        ImageScaleDataset(train_pairs, train_transform),
        ImageScaleDataset(test_pairs, test_transform),
    )
def create_image_contrastive_dataset(image, n_scales, block_size, stride, max_pairs_per_scale, norm_before=True):
    """Build train/test ``SiameseDataset``s of same-scale and cross-scale patch pairs.

    Patches for every scale come from ``create_data``. For each scale,
    ``min(C(m, 2), max_pairs_per_scale)`` same-scale pairs are drawn at random
    (``m`` being the size of the smallest scale), then roughly the same total
    number of cross-scale pairs is added, spread evenly over all combinations
    of two scales. The pooled pairs are split 80/20 with ``train_test_split``.

    Args:
        image: Source image. With ``norm_before=True`` it is first passed
            through ``ToTensor`` + ``Normalize((0.5,), (0.5,))`` and only
            channel 0 of the result is kept (as a NumPy array).
        n_scales: Number of scales, forwarded to ``create_data``.
        block_size: Patch size, forwarded to ``create_data``.
        stride: Patch stride, forwarded to ``create_data``.
        max_pairs_per_scale: Upper bound on same-scale pairs drawn per scale.
        norm_before: If true, normalise the whole image up front and give the
            datasets a plain ``ToTensor`` transform; otherwise the datasets
            apply ``ToTensor`` + ``Normalize`` to every patch on access.

    Returns:
        ``(train_dataset, test_dataset)``; each item's scale tuple is
        ``(i, i)`` for same-scale pairs and ``(i, j)`` for cross-scale pairs.

    Note:
        When fewer than two scales are available there are no scale
        combinations, so no cross-scale pairs are generated (previously this
        case raised ``ZeroDivisionError``).
    """
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])

    if norm_before:
        image = transform(image)[0].numpy()

    image_scale_data = create_data(image, n_scales, block_size, stride)

    # Same-scale pairs: an equal number per scale, bounded by what the
    # smallest scale can supply and by the caller's cap. Pairs are drawn
    # independently, so the same pair may appear more than once.
    data = []
    min_class_size = min(len(scale_data) for scale_data in image_scale_data.values())
    num_combos = min(math.comb(min_class_size, 2), max_pairs_per_scale)
    print("Data points per same scale:", num_combos)
    for i, xs in image_scale_data.items():
        for _ in range(num_combos):
            x1, x2 = random.sample(xs, 2)
            data.append((x1, x2, (i, i)))

    # Cross-scale pairs: aim for as many as there are same-scale pairs.
    num_same_scale_pairs = len(data)
    diff_scale_combos = list(itertools.combinations(image_scale_data, 2))
    if diff_scale_combos:  # empty when fewer than two scales exist
        num_pairs_per_diff_scale = num_same_scale_pairs // len(diff_scale_combos)
        # Each scale combination contributes k * k pairs (k patches from each
        # side), so k is the integer square root of the per-combination quota.
        samples_per_side = int(math.sqrt(num_pairs_per_diff_scale))
        for i, j in diff_scale_combos:
            for x1 in random.sample(image_scale_data[i], samples_per_side):
                # A fresh sample of the second scale is drawn for every x1.
                for x2 in random.sample(image_scale_data[j], samples_per_side):
                    data.append((x1, x2, (i, j)))

    print("Total data points:", len(data))

    train_dataset, test_dataset = train_test_split(data, test_size=0.2, random_state=None)

    # Already-normalised patches only need converting to tensors.
    if norm_before:
        train_dataset = SiameseDataset(train_dataset, transforms.ToTensor())
        test_dataset = SiameseDataset(test_dataset, transforms.ToTensor())
    else:
        train_dataset = SiameseDataset(train_dataset, transform)
        test_dataset = SiameseDataset(test_dataset, transform)

    return train_dataset, test_dataset
def create_data(image, n_scales, block_size, stride):
    """Extract patches from ``image`` at ``n_scales`` successively halved resolutions.

    Scale ``0`` is the input image; each following scale keeps every second
    row and column of the previous one.

    Args:
        image: Array-like with a ``shape`` and 2-D slicing support.
        n_scales: Number of scales to generate.
        block_size: ``(height, width)`` of a patch; the image at every scale
            must be at least this large (checked with ``assert``).
        stride: Patch stride, forwarded to ``extract_blocks``.

    Returns:
        Dict mapping scale index ``0 .. n_scales - 1`` to the list of patches
        returned by ``extract_blocks`` for that scale.
    """
    blocks_by_scale = {}
    for scale in range(n_scales):
        assert image.shape[0] >= block_size[0] and image.shape[1] >= block_size[1]
        blocks_by_scale[scale] = extract_blocks(image, block_size, stride)
        # Plain 2x decimation is deliberate: area-interpolated resizing
        # (cv2.resize with INTER_AREA) changed the distribution too much.
        image = image[::2, ::2]
    return blocks_by_scale
def extract_blocks(image, block_size, stride):
    """Slice a 2-D image into patches on a regular grid.

    Args:
        image: 2-D array-like whose ``shape`` unpacks as ``(height, width)``.
        block_size: ``(block_height, block_width)`` of each patch.
        stride: Step between patch origins, used for both axes.

    Returns:
        List of ``image[y:y + block_height, x:x + block_width]`` slices in
        row-major order; empty when the image is smaller than a patch.
    """
    height, width = image.shape
    block_h, block_w = block_size
    row_starts = range(0, height - block_h + 1, stride)
    col_starts = range(0, width - block_w + 1, stride)
    return [
        image[top:top + block_h, left:left + block_w]
        for top in row_starts
        for left in col_starts
    ]
def split_by_class(data):
    """Balance classes and split each one 80/20 into train and validation data.

    Every class is randomly down-sampled to the size of the smallest class,
    then split with ``train_test_split(test_size=0.2)``.

    Args:
        data: Dict mapping a class label to an iterable of samples.

    Returns:
        ``(train_data, val_data)``: two lists of ``(sample, label)`` tuples,
        concatenated class by class in the dict's iteration order.
    """
    labelled = {}
    for label, samples in data.items():
        labelled[label] = [(sample, label) for sample in samples]

    smallest = min(map(len, labelled.values()))
    print("Data points per class:", smallest)

    train_data, val_data = [], []
    for pairs in labelled.values():
        balanced = random.sample(pairs, smallest)
        train_part, val_part = train_test_split(balanced, test_size=0.2, random_state=None)
        train_data += train_part
        val_data += val_part
    return train_data, val_data