-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhungarian.py
169 lines (143 loc) · 5.97 KB
/
hungarian.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import numpy as np
class HungarianAlg(object):
    '''
    Holds the working state for one run of the Hungarian assignment method.

    Attributes set by the constructor:
        O            the cost matrix exactly as it was passed in (used to price the final assignment)
        C            a working copy of the cost matrix that the step functions modify
        n, m         number of rows and columns of the matrix that was passed in
        row_covered  boolean flag per row, used by the step functions
        col_covered  boolean flag per column, used by the step functions
        marked       integer matrix where 1 flags a selected zero
        solution     None until solve() succeeds, then the assignment dictionary
        minimum_cost None until solve() succeeds, then the total cost of that assignment
    '''

    def __init__(self, cost_matrix):
        '''
        Stores the original matrix, a private working copy and the helper structures.

        cost_matrix must expose ``copy()`` and a two-dimensional ``shape``
        (for instance a numpy array).
        '''
        self.O = cost_matrix
        # A bare copy() is accepted by numpy arrays (which reject a ``deep``
        # keyword) and is still a deep copy for pandas objects by default.
        self.C = cost_matrix.copy()
        self.n, self.m = self.C.shape
        self.row_covered = np.zeros(self.n, dtype=bool)
        self.col_covered = np.zeros(self.m, dtype=bool)
        self.marked = np.zeros((self.n, self.m), dtype=int)
        # Defined up front so print_results() can tell "not solved" apart
        # from a missing attribute.
        self.solution = None
        self.minimum_cost = None

    def _clear_covers(self):
        '''
        Resets every row and column cover flag, as covers change meaning from one step to another.
        '''
        self.row_covered[:] = False
        self.col_covered[:] = False

    def _clear_marks(self):
        '''
        Removes every mark so a new selection of zeroes can be tried.
        '''
        self.marked[:, :] = 0

    def solve(self):
        '''
        Runs the step functions until one of them returns a result tuple.

        The first step is _step0 when the matrix is not square and _step1 otherwise.
        Each step receives this object and returns either the next step to call or a
        tuple (found, cost, assignment). When found is truthy the assignment is saved
        to self.solution and the cost to self.minimum_cost.

        Returns the first element of the result tuple.
        '''
        step = _step1 if self.n == self.m else _step0
        while not isinstance(step, tuple):
            step = step(self)
        found, cost, assignment = step[0], step[1], step[2]
        if found:
            self.solution = assignment
            self.minimum_cost = cost
        return found

    def print_results(self):
        '''
        Prints one line per entry of self.solution followed by the total cost.

        Raises Exception when no solution is stored.
        '''
        if self.solution is None:
            raise Exception("No solution was computed yet or there is no solution. Run the solve method or try another cost matrix.")
        for k, v in self.solution.items():
            print("For {} is assignes {}".format(v, k))
        print("The final total cost was {}".format(self.minimum_cost))
def _step0(state):
    '''
    Pads the working matrix with zeroes until it is square.

    Extra rows are appended at the bottom and extra columns at the right. The cover
    flags and the mark matrix are rebuilt to match the padded shape. state.n and
    state.m are left untouched.

    Returns the next step, _step1.
    '''
    side = max(state.n, state.m)
    extra_rows = side - state.n
    extra_cols = side - state.m
    pad_widths = ((0, extra_rows), (0, extra_cols))
    state.C = np.pad(state.C, pad_widths, 'constant', constant_values=0)

    # Helper structures must follow the new (padded) dimensions.
    n_rows, n_cols = state.C.shape[0], state.C.shape[1]
    state.row_covered = np.zeros(n_rows, dtype=bool)
    state.col_covered = np.zeros(n_cols, dtype=bool)
    state.marked = np.zeros((n_rows, n_cols), dtype=int)
    return _step1
def _step1(state):
    '''
    Subtracts from every cell the minimum value found in that cell's row.

    Returns the next step, _step2.
    '''
    # axis=1 reduces across the columns, giving one minimum per row.
    row_minima = np.min(state.C, axis=1)
    state.C = state.C - row_minima[:, np.newaxis]
    return _step2
def _step2(state):
    '''
    Subtracts from every cell the minimum value found in that cell's column.

    Returns the next step, _step3.
    '''
    # axis=0 reduces across the rows, giving one minimum per column.
    column_minima = np.min(state.C, axis=0)
    state.C = state.C - column_minima[np.newaxis, :]
    return _step3
def _augment_from(start_row, is_zero, row_to_col, col_to_row):
    '''
    Tries to enlarge the current selection of zeroes by one, starting from an unmatched row.

    A breadth-first search alternates between "row -> any column with a zero in that row"
    and "matched column -> the row it is matched with". When a column without a match is
    reached, the pairs along the path are flipped in place in row_to_col / col_to_row.

    Returns True if the selection grew by one pair, False otherwise.
    '''
    n_cols = len(col_to_row)
    reached_from = [-1] * n_cols  # row through which each column was first reached
    queue = [start_row]
    head = 0
    while head < len(queue):
        row = queue[head]
        head += 1
        for col in range(n_cols):
            if not is_zero[row][col] or reached_from[col] != -1:
                continue
            reached_from[col] = row
            if col_to_row[col] != -1:
                # Column already taken: keep searching from the row that owns it.
                queue.append(col_to_row[col])
                continue
            # Free column found: walk back to start_row, re-pairing as we go.
            while col != -1:
                row = reached_from[col]
                previous_col = row_to_col[row]
                row_to_col[row] = col
                col_to_row[col] = row
                col = previous_col
            return True
    return False


def _step3(state):
    '''
    Selects as many independent zeroes as possible and covers all zeroes with the fewest lines.

    1. Zeroes are selected greedily, column by column, and the selection is then completed
       with augmenting paths so that it is as large as possible. It is written to state.marked.
    2. Rows without a selected zero are marked; columns with a zero in a marked row are
       marked; rows with a selected zero in a marked column are marked; this is repeated
       until nothing changes. Covered lines are the unmarked rows and the marked columns,
       stored in state.row_covered and state.col_covered.

    The marking is propagated to closure on purpose: stopping after a single pass can leave
    zeroes uncovered, in which case step 4 would subtract 0 forever.

    Returns the result of _check_for_solution when the number of lines equals the number of
    rows of the working matrix, otherwise the next step, _step4.
    '''
    n_rows, n_cols = state.C.shape[0], state.C.shape[1]
    is_zero = [[state.C[i][j] == 0 for j in range(n_cols)] for i in range(n_rows)]

    row_to_col = [-1] * n_rows
    col_to_row = [-1] * n_cols

    # Greedy seed: first free zero of each column, scanning rows top to bottom.
    for j in range(n_cols):
        for i in range(n_rows):
            if row_to_col[i] == -1 and is_zero[i][j]:
                row_to_col[i] = j
                col_to_row[j] = i
                break

    # The greedy seed may be smaller than the best possible selection; fix that.
    for i in range(n_rows):
        if row_to_col[i] == -1:
            _augment_from(i, is_zero, row_to_col, col_to_row)

    state._clear_marks()
    for i, j in enumerate(row_to_col):
        if j != -1:
            state.marked[i][j] = 1

    row_marked = np.zeros(n_rows, dtype=bool)
    col_marked = np.zeros(n_cols, dtype=bool)
    pending = [i for i in range(n_rows) if row_to_col[i] == -1]
    for i in pending:
        row_marked[i] = True
    while pending:
        i = pending.pop()
        for j in range(n_cols):
            if col_marked[j] or not is_zero[i][j]:
                continue
            col_marked[j] = True
            k = col_to_row[j]
            if k != -1 and not row_marked[k]:
                row_marked[k] = True
                # Newly marked rows must have their own zeroes examined too.
                pending.append(k)

    state.row_covered = np.logical_not(row_marked)
    state.col_covered = col_marked
    num_lines = np.sum(state.row_covered) + np.sum(state.col_covered)
    if num_lines == state.C.shape[0]:
        return _check_for_solution(state)
    return _step4
def _step4(state):
    '''
    Adjusts the working matrix using the current row and column covers.

    The smallest value among cells that are covered by neither a row nor a column is
    subtracted from every such cell and added to every cell covered by both a row and a
    column. Covers and marks are then cleared.

    Returns the next step, _step3.
    '''
    height, width = state.C.shape[0], state.C.shape[1]

    # First pass: smallest value that no line covers.
    delta = np.inf
    for r, c in np.ndindex(height, width):
        if state.row_covered[r] or state.col_covered[c]:
            continue
        candidate = state.C[r][c]
        if candidate < delta:
            delta = candidate

    # Second pass: shift uncovered cells down and doubly covered cells up.
    for r, c in np.ndindex(height, width):
        row_hit = state.row_covered[r]
        col_hit = state.col_covered[c]
        if row_hit and col_hit:
            state.C[r][c] += delta
        elif not (row_hit or col_hit):
            state.C[r][c] -= delta

    state._clear_covers()
    state._clear_marks()
    return _step3
def _check_for_solution(state):
    '''
    Builds an assignment from the marked zeroes and reports whether it is complete.

    Any zero of the working matrix that is covered by neither a row nor a column is first
    marked (and its row and column covered). Marks that fall inside the first state.n rows
    and state.m columns are then collected, and their costs are read from state.O.
    Covers are cleared before returning.

    Returns a tuple (found, cost, sol):
        found  True when the number of collected pairs equals min(state.n, state.m), i.e.
               every row or every column of the original matrix received a partner. With
               fewer rows than columns at most state.n pairs can exist, so comparing against
               state.m alone could never succeed.
        cost   sum of state.O[i][j] over the collected pairs
        sol    dictionary mapping column index j to row index i
    '''
    n_rows, n_cols = state.C.shape[0], state.C.shape[1]
    for j in range(n_cols):
        for i in range(n_rows):
            if (not state.row_covered[i]
                    and not state.col_covered[j]
                    and state.C[i][j] == 0):
                state.marked[i][j] = 1
                state.row_covered[i] = True
                state.col_covered[j] = True

    sol = {}
    cost = 0
    # Only the first n rows and m columns are read, so marks outside them are ignored.
    for i in range(state.n):
        for j in range(state.m):
            if state.marked[i][j] == 1:
                sol[j] = i
                cost = cost + state.O[i][j]

    state._clear_covers()
    return len(sol) == min(state.n, state.m), cost, sol