diff --git a/train/federated_attribute_attack.py b/train/federated_attribute_attack.py
index fc61bf0..e942702 100644
--- a/train/federated_attribute_attack.py
+++ b/train/federated_attribute_attack.py
@@ -1,21 +1,26 @@
 import torch
 import torch.multiprocessing
 from torch.utils.data import DataLoader
-from pytorch_lightning.callbacks.early_stopping import EarlyStopping
-from pytorch_lightning.callbacks import ModelCheckpoint
-from pytorch_lightning.loggers import MLFlowLogger
-from pytorch_lightning import seed_everything
-import pytorch_lightning as pl
+from torch import optim
+from torch.optim.lr_scheduler import ReduceLROnPlateau
+from copy import deepcopy
+
 from sklearn.model_selection import train_test_split
 from pathlib import Path
 import pandas as pd
 import numpy as np
+import torch.nn as nn
 import sys, os, shutil, pickle, argparse, pdb
 sys.path.append(os.path.join(str(Path(os.path.realpath(__file__)).parents[1]), 'model'))
+sys.path.append(os.path.join(str(Path(os.path.realpath(__file__)).parents[1]), 'utils'))
+
+from training_tools import EarlyStopping, seed_worker, result_summary
 from attack_model import attack_model
 
 # some general mapping for this script
 gender_dict = {'F': 0, 'M': 1}
 leak_layer_dict = {'full': ['w0', 'b0', 'w1', 'b1', 'w2', 'b2'],
@@ -31,28 +36,45 @@
     def __len__(self):
         return len(self.dict_keys)
 
     def __getitem__(self, idx):
-
         data_file_str = self.dict_keys[idx]
         gender = gender_dict[self.data_dict[data_file_str]['gender']]
-
         tmp_data = (self.data_dict[data_file_str][weight_name] - weight_norm_mean_dict[weight_name]) / (weight_norm_std_dict[weight_name] + 0.00001)
         weights = torch.from_numpy(np.ascontiguousarray(tmp_data))
         tmp_data = (self.data_dict[data_file_str][bias_name] - weight_norm_mean_dict[bias_name]) / (weight_norm_std_dict[bias_name] + 0.00001)
         bias = torch.from_numpy(np.ascontiguousarray(tmp_data))
-
         return weights, bias, gender
 
-class AttackDataModule(pl.LightningDataModule):
-    def __init__(self, train, val):
-        super().__init__()
-        self.train = train
-        self.val = val
-
-    def train_dataloader(self):
-        return DataLoader(self.train, batch_size=20, num_workers=0, shuffle=True)
+def run_one_epoch(model, data_loader, optimizer, scheduler, loss_func, epoch, mode='train'):
+    model.train() if mode == 'train' else model.eval()
+    step_outputs = []
+
+    for batch_idx, data_batch in enumerate(data_loader):
+        weights, bias, y = data_batch
+        weights, bias, y = weights.to(device), bias.to(device), y.to(device)
+        logits = model(weights.float().unsqueeze(dim=1), bias.float())
+        loss = loss_func(logits, y)
+
+        predictions = np.argmax(logits.detach().cpu().numpy(), axis=1)
+        pred_list = [predictions[pred_idx] for pred_idx in range(len(predictions))]
+        truth_list = [y.detach().cpu().numpy()[pred_idx] for pred_idx in range(len(predictions))]
+        step_outputs.append({'loss': loss.item(), 'pred': pred_list, 'truth': truth_list})
+
+        # backpropagate and update the attack model only in train mode
+        if mode == 'train':
+            optimizer.zero_grad()
+            loss.backward()
+            optimizer.step()
+        del data_batch, logits, loss
+        torch.cuda.empty_cache()
+    result_dict = result_summary(step_outputs, mode, epoch)
+
+    # in validate mode, step the LR scheduler on the mean validation loss
+    if mode == 'validate':
+        mean_loss = np.mean(result_dict['loss'])
+        scheduler.step(mean_loss)
+    return result_dict
 
-    def val_dataloader(self):
-        return DataLoader(self.val, batch_size=20, num_workers=0, shuffle=False)
 
 if __name__ == '__main__':
 
@@ -77,13 +99,16 @@
     parser.add_argument('--save_dir', default='/media/data/projects/speech-privacy')
     args = parser.parse_args()
 
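+    # fix the random seeds so the shadow-attack training splits are reproducible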
-    seed_everything(8, workers=True)
+    seed_worker(8)
     device = torch.device("cuda:"+str(args.device)) if torch.cuda.is_available() else "cpu"
     if torch.cuda.is_available(): print('GPU available, use GPU')
 
     model_setting_str = 'local_epoch_'+str(args.local_epochs) if args.model_type == 'fed_avg' else 'local_epoch_1'
     model_setting_str += '_dropout_' + str(args.dropout).replace('.', '')
     model_setting_str += '_lr_' + str(args.learning_rate)[2:]
+
+    torch.cuda.empty_cache()
+    torch.multiprocessing.set_sharing_strategy('file_system')
 
     # 1. normalization tmp computations
     weight_norm_mean_dict, weight_norm_std_dict = {}, {}
@@ -110,12 +135,11 @@
             for speaker_id in adv_gradient_dict:
                 data_key = str(shadow_idx)+'_'+str(epoch)+'_'+speaker_id
                 gradients = adv_gradient_dict[speaker_id]['gradient']
-                gender = adv_gradient_dict[speaker_id]['gender']
                 shadow_training_sample_size += 1
 
                 # calculate running stats for computing std and mean
                 shadow_data_dict[data_key] = {}
-                shadow_data_dict[data_key]['gender'] = gender
+                shadow_data_dict[data_key]['gender'] = adv_gradient_dict[speaker_id]['gender']
                 shadow_data_dict[data_key][weight_name] = gradients[weight_idx]
                 shadow_data_dict[data_key][bias_name] = gradients[bias_idx]
                 for layer_name in leak_layer_dict[args.leak_layer]:
@@ -133,29 +157,53 @@
     train_key_list, validate_key_list = train_test_split(list(shadow_data_dict.keys()), test_size=0.2, random_state=0)
 
     model = attack_model(args.leak_layer, args.feature_type)
     model = model.to(device)
+    optimizer = optim.Adam(model.parameters(), lr=float(args.model_learning_rate), weight_decay=1e-04, betas=(0.9, 0.98), eps=1e-9)
+    scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.2, verbose=True, min_lr=1e-6)
 
     # 2.2 define data loader
     dataset_train = WeightDataGenerator(train_key_list, shadow_data_dict)
     dataset_valid = WeightDataGenerator(validate_key_list, shadow_data_dict)
-    data_module = AttackDataModule(dataset_train, dataset_valid)
-
+    train_loader = DataLoader(dataset_train, batch_size=20, num_workers=0, shuffle=True)
+    validation_loader = DataLoader(dataset_valid, batch_size=20, num_workers=0, shuffle=False)
+
     # 2.3 initialize the early_stopping object
-    early_stopping = EarlyStopping(monitor="val_loss", mode='min', patience=5, stopping_threshold=1e-4, check_finite=True)
-
+    early_stopping = EarlyStopping(patience=5, verbose=True)
+    loss = nn.NLLLoss().to(device)
+
     # 2.4 log saving path
     attack_model_result_path = Path(os.path.realpath(__file__)).parents[1].joinpath('results', 'attack', args.leak_layer, args.model_type, args.feature_type, model_setting_str)
     log_path = Path.joinpath(attack_model_result_path, 'log_private_' + str(args.dataset))
     if log_path.exists(): shutil.rmtree(log_path)
     Path.mkdir(log_path, parents=True, exist_ok=True)
-    mlf_logger = MLFlowLogger(experiment_name="ser", save_dir=str(log_path))
-
-    checkpoint_callback = ModelCheckpoint(monitor="val_acc_epoch", mode="max",
-                                          dirpath=str(attack_model_result_path),
-                                          filename='private_' + str(args.dataset) + '_model')
 
-    # 2.5 training using pytorch lighting framework
-    trainer = pl.Trainer(logger=mlf_logger, gpus=1, callbacks=[checkpoint_callback, early_stopping], max_epochs=50)
-    trainer.fit(model, data_module)
+    # 2.5 training attack model
+    result_dict, best_val_dict = {}, {}
+    for epoch in range(30):
+        # run one training epoch followed by validation
+        train_result = run_one_epoch(model, train_loader, optimizer, scheduler, loss, epoch, mode='train')
+        validate_result = run_one_epoch(model, validation_loader, optimizer, scheduler, loss, epoch, mode='validate')
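+        # track the best attack model by validation UAR once past the burn-in epochs (epoch > 10)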
+
+        # save the results for later
+        result_dict[epoch] = {}
+        result_dict[epoch]['train'], result_dict[epoch]['validate'] = train_result, validate_result
+
+        if len(best_val_dict) == 0: best_val_dict, best_epoch = validate_result, epoch
+        if validate_result['uar'] > best_val_dict['uar'] and epoch > 10:
+            best_val_dict, best_epoch = validate_result, epoch
+            best_model = deepcopy(model.state_dict())
+
+        # early_stopping needs the validation loss to check if it has decreased,
+        # and if it has, it will make a checkpoint of the current model
+        if epoch > 10: early_stopping(validate_result['loss'], model)
+
+        print('best epoch %d, best final acc %.2f, best final uar %.2f' % (best_epoch, best_val_dict['acc']*100, best_val_dict['uar']*100))
+        print(best_val_dict['conf'])
+
+        if early_stopping.early_stop and epoch > 10:
+            print("Early stopping")
+            break
 
     # 3. we evaluate the attacker performance on service provider training
     save_result_df = pd.DataFrame()
 
     # 3.1 we perform 5 fold evaluation, since we also train the private data 5 times
@@ -179,14 +227,12 @@
             test_data_dict[data_key][bias_name] = gradients[bias_idx]
 
     dataset_test = WeightDataGenerator(list(test_data_dict.keys()), test_data_dict)
-    dataloader_test = DataLoader(dataset_test, batch_size=20, num_workers=1, shuffle=False)
-
-    # model.freeze()
-    # trainer.test(test_dataloaders=data_module.train_dataloader())
-    result_dict = trainer.test(dataloaders=dataloader_test, ckpt_path='best')
-    row_df['acc'], row_df['uar'] = result_dict[0]['test_acc_epoch'], result_dict[0]['test_uar_epoch']
+    test_loader = DataLoader(dataset_test, batch_size=20, num_workers=0, shuffle=False)
+    test_result = run_one_epoch(model, test_loader, optimizer, scheduler, loss, best_epoch, mode='test')
+
+    row_df['acc'], row_df['uar'] = test_result['acc'], test_result['uar']
     save_result_df = pd.concat([save_result_df, row_df])
-    del dataset_test, dataloader_test
+    del dataset_test, test_loader
 
     row_df = pd.DataFrame(index=['average'])
     row_df['acc'], row_df['uar'] = np.mean(save_result_df['acc']), np.mean(save_result_df['uar'])
diff --git a/train/federated_ser_classifier.py b/train/federated_ser_classifier.py
index c0731d1..592bd1a 100644
--- a/train/federated_ser_classifier.py
+++ b/train/federated_ser_classifier.py
@@ -4,6 +4,7 @@
 import argparse, logging
 import torch.multiprocessing
 from torch.utils.data import DataLoader
+from pytorch_lightning import seed_everything
 import numpy as np
 from pathlib import Path
@@ -11,20 +12,11 @@
 import copy, time, pickle, shutil, sys, os, pdb
 from copy import deepcopy
 
-sys.path.append(os.path.join(os.path.abspath(os.path.curdir), '..', 'model'))
-sys.path.append(os.path.join(os.path.abspath(os.path.curdir), 'model'))
-
-from baseline_models import dnn_classifier
+sys.path.append(os.path.join(str(Path(os.path.realpath(__file__)).parents[1]), 'model'))
 
-from update import average_weights, average_gradients
-from pytorch_lightning import seed_everything
-from pytorch_lightning.callbacks.early_stopping import EarlyStopping
-from pytorch_lightning.loggers import WandbLogger
-from pytorch_lightning.callbacks import ModelCheckpoint
-from pytorch_lightning.loggers import MLFlowLogger
-import pytorch_lightning as pl
+from dnn_models import dnn_classifier
+from update import average_weights, average_gradients, local_trainer
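+# local_trainer (train/update.py) wraps per-client weight updates, gradient sharing, and inference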
 
-pl.utilities.distributed.log.setLevel(logging.ERROR)
 
 # define label mapping
 emo_dict = {'neu': 0, 'hap': 1, 'sad': 2, 'ang': 3}
@@ -44,68 +36,95 @@ def save_result(save_index, acc, uar, best_epoch, dataset):
 class DatasetGenerator():
-    def __init__(self, dataset, idxs):
+    def __init__(self, dataset):
         self.dataset = dataset
-        self.idxs = idxs
 
     def __len__(self):
-        return len(self.idxs)
+        return len(self.dataset['dataset'])
 
     def __getitem__(self, item):
-        data = self.dataset[self.idxs[item]]['data']
-        label = emo_dict[self.dataset[self.idxs[item]]['label']]
-        dataset_str = self.dataset[self.idxs[item]]['dataset']
-
-        return data, label, dataset_str
-
+        data = self.dataset['data'][item]
+        label = self.dataset['label'][item]
+        dataset_str = self.dataset['dataset'][item]
+        return torch.tensor(data), torch.tensor(int(label)), dataset_str
 
 def read_data_dict_by_client(dataset_list, fold_idx):
     return_train_dict, return_test_dict = {}, {}
+    dataset_label_list = []
 
     # prepare the data for the training
     for dataset in dataset_list:
         with open(preprocess_path.joinpath(dataset, 'fold'+str(int(fold_idx+1)), 'training_'+args.norm+'.pkl'), 'rb') as f:
             train_dict = pickle.load(f)
         with open(preprocess_path.joinpath(dataset, 'fold'+str(int(fold_idx+1)), 'test_'+args.norm+'.pkl'), 'rb') as f:
             test_dict = pickle.load(f)
-
         for tmp_dict in [train_dict, test_dict]:
             for key in tmp_dict:
                 tmp_dict[key]['dataset'] = dataset
 
         # test set will be the same
-        for key in test_dict: return_test_dict[key] = test_dict[key].copy()
+        x_test, y_test = np.zeros([len(test_dict), feature_len_dict[args.feature_type]]), np.zeros([len(test_dict)])
+        for key_idx, key in enumerate(list(test_dict.keys())):
+            x_test[key_idx], y_test[key_idx] = test_dict[key]['data'], int(emo_dict[test_dict[key]['label']])
+            dataset_label_list.append(test_dict[key]['dataset'])
+        if len(return_test_dict) == 0:
+            return_test_dict['data'], return_test_dict['label'] = x_test, y_test
+            return_test_dict['dataset'] = dataset_label_list
+        else:
+            return_test_dict['data'] = np.append(return_test_dict['data'], x_test, axis=0)
+            return_test_dict['label'] = np.append(return_test_dict['label'], y_test, axis=0)
+            return_test_dict['dataset'] = dataset_label_list
+
         # we remake the data dict per speaker for the ease of local training
-        speaker_data_dict = {}
+        train_speaker_data_dict = {}
         for key in train_dict:
             speaker_id = str(train_dict[key]['speaker_id'])
-            if speaker_id not in speaker_data_dict:
-                speaker_data_dict[speaker_id] = []
-            speaker_data_dict[speaker_id].append(key)
-
+            if speaker_id not in train_speaker_data_dict: train_speaker_data_dict[speaker_id] = []
+            train_speaker_data_dict[speaker_id].append(key)
+
        # in the federated setting, the validation data comes from each local client,
        # so we combine the train_dict and validate_dict of the centralized setting
        # and then choose a certain amount of data per client as the local validation set
         if dataset == 'crema-d':
-            for key in train_dict:
-                speaker_id = str(train_dict[key]['speaker_id'])
-                if speaker_id not in return_train_dict: return_train_dict[speaker_id] = {}
-                return_train_dict[speaker_id][key] = train_dict[key].copy()
+            for speaker_id in train_speaker_data_dict:
+                speaker_data_key_list = train_speaker_data_dict[speaker_id]
+                x, y = np.zeros([len(speaker_data_key_list), feature_len_dict[args.feature_type]]), np.zeros([len(speaker_data_key_list)])
+                dataset_list = []
+
+                for idx, data_key in enumerate(speaker_data_key_list):
+                    x[idx], y[idx] = train_dict[data_key]['data'], int(emo_dict[train_dict[data_key]['label']])
+                    dataset_list.append(train_dict[data_key]['dataset'])
+
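+                # one flat feature/label matrix per client; every utterance of a
+                # speaker shares the same gender, so the last data_key suffices below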
+                return_train_dict[speaker_id] = {}
+                return_train_dict[speaker_id]['data'] = x
+                return_train_dict[speaker_id]['label'] = y
+                return_train_dict[speaker_id]['gender'] = train_dict[data_key]['gender']
+                return_train_dict[speaker_id]['dataset'] = dataset_list
         else:
             # we want to divide speaker data if the dataset is iemocap or msp-improv to increase client size
-            for speaker_id in speaker_data_dict:
+            for speaker_id in train_speaker_data_dict:
                 # in iemocap and msp-improv
                 # we split each speaker's data into 10 parts in order to create more clients
-                idx_array = np.random.permutation(len(speaker_data_dict[speaker_id]))
-                key_list = speaker_data_dict[speaker_id]
+                idx_array = np.random.permutation(len(train_speaker_data_dict[speaker_id]))
+                speaker_data_key_list = train_speaker_data_dict[speaker_id]
                 split_array = np.array_split(idx_array, 10)
 
                 for split_idx in range(len(split_array)):
                     # we randomly pick 10% of data
                     idxs_train = split_array[split_idx]
-                    for idx in idxs_train:
-                        key = key_list[idx]
-                        if speaker_id+'_'+str(split_idx) not in return_train_dict: return_train_dict[speaker_id+'_'+str(split_idx)] = {}
-                        return_train_dict[speaker_id+'_'+str(split_idx)][key] = train_dict[key].copy()
+
+                    x, y = np.zeros([len(idxs_train), feature_len_dict[args.feature_type]]), np.zeros([len(idxs_train)])
+                    dataset_list = []
+                    for idx, key_idx in enumerate(idxs_train):
+                        data_key = speaker_data_key_list[key_idx]
+                        x[idx], y[idx] = train_dict[data_key]['data'], int(emo_dict[train_dict[data_key]['label']])
+                        dataset_list.append(train_dict[data_key]['dataset'])
+
+                    return_train_dict[speaker_id+'_'+str(split_idx)] = {}
+                    return_train_dict[speaker_id+'_'+str(split_idx)]['data'] = x
+                    return_train_dict[speaker_id+'_'+str(split_idx)]['label'] = y
+                    return_train_dict[speaker_id+'_'+str(split_idx)]['gender'] = train_dict[data_key]['gender']
+                    return_train_dict[speaker_id+'_'+str(split_idx)]['dataset'] = dataset_list
     return return_train_dict, return_test_dict
 
 if __name__ == '__main__':
 
@@ -124,6 +143,7 @@
     parser.add_argument('--optimizer', default='adam')
     parser.add_argument('--model_type', default='fed_sgd')
     parser.add_argument('--pred', default='emotion')
+    parser.add_argument('--local_dp', default=0.1)
     parser.add_argument('--save_dir', default='/media/data/projects/speech-privacy')
     args = parser.parse_args()
 
@@ -147,12 +167,12 @@
     model_setting_str = 'local_epoch_'+str(args.local_epochs) if args.model_type == 'fed_avg' else 'local_epoch_1'
     model_setting_str += '_dropout_' + str(args.dropout).replace('.', '')
     model_setting_str += '_lr_' + str(args.learning_rate)[2:]
+    # model_setting_str += '_local_dp_' + str(args.local_dp).replace('.', '')
 
     # Read the data per speaker
     train_speaker_dict, test_speaker_dict = read_data_dict_by_client(dataset_list, fold_idx)
-    num_of_speakers = len(train_speaker_dict)
-    speaker_list = list(set(train_speaker_dict.keys()))
-
+    num_of_speakers, speaker_list = len(train_speaker_dict), list(set(train_speaker_dict.keys()))
+
     # We save the utterance keys used for training and validation per speaker (client)
     train_val_idx_dict = {}
     for speaker_id in train_speaker_dict:
@@ -163,9 +183,12 @@
         for idx in idx_array[int(0.8*len(idx_array)):]: train_val_idx_dict[speaker_id]['val'].append(tmp_keys[idx])
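+        # the last 20% of each client's shuffled utterance keys form its local validation set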
 
     # Define the model
-    global_model = dnn_classifier(pred='emotion', input_spec=feature_len_dict[args.feature_type], dropout=float(args.dropout), args=args)
+    global_model = dnn_classifier(pred='emotion', input_spec=feature_len_dict[args.feature_type], dropout=float(args.dropout))
     global_model = global_model.to(device)
     global_weights = global_model.state_dict()
+
+    # define the loss function
+    criterion = nn.NLLLoss().to(device)
 
     # log saving path
     model_result_path = Path(args.save_dir).joinpath('federated_model_params', args.model_type, args.pred, args.feature_type, args.dataset, model_setting_str, save_row_str)
@@ -176,18 +199,9 @@
     log_path = Path.joinpath(model_result_path, 'log')
     if log_path.exists(): shutil.rmtree(log_path)
     Path.mkdir(log_path, parents=True, exist_ok=True)
-    mlf_logger = MLFlowLogger(experiment_name="ser", save_dir=str(log_path))
-
-    # trainer
-    if args.model_type == 'fed_avg':
-        trainer = pl.Trainer(logger=mlf_logger, gpus=1, weights_summary=None,
-                             progress_bar_refresh_rate=0, max_epochs=1)
-    else:
-        trainer = pl.Trainer(logger=mlf_logger, gpus=1, weights_summary=None,
-                             progress_bar_refresh_rate=0, max_epochs=1, limit_train_batches=1)
 
     # test loader
-    dataset_test = DatasetGenerator(test_speaker_dict, list(test_speaker_dict.keys()))
+    dataset_test = DatasetGenerator(test_speaker_dict)
     test_dataloaders = DataLoader(dataset_test, batch_size=20, num_workers=0, shuffle=False)
 
     # Training steps
@@ -206,23 +220,20 @@
             print('speaker id %s' % (speaker_id))
 
             # 1.1 Local training
-            local_model = copy.deepcopy(global_model)
-            dataset_train = DatasetGenerator(train_speaker_dict[speaker_id], train_val_idx_dict[speaker_id]['train'])
+            dataset_train = DatasetGenerator(train_speaker_dict[speaker_id])
             train_dataloaders = DataLoader(dataset_train, batch_size=20, num_workers=0, shuffle=True)
-            trainer.fit(local_model, train_dataloaders=train_dataloaders)
-
-            # read params to save
-            train_sample_size = int(trainer.callback_metrics['train_size'])
-            local_losses.append(float(trainer.callback_metrics['train_loss']))
-            local_num_sampels.append(train_sample_size)
+            trainer = local_trainer(args, device, criterion, args.model_type, train_dataloaders)
 
+            # read shared updates: parameters in fed_avg and gradients for fed_sgd
             if args.model_type == 'fed_avg':
-                local_update = copy.deepcopy(local_model.state_dict())
-                local_updates.append(local_update)
+                local_update, train_result = trainer.update_weights(model=copy.deepcopy(global_model))
             else:
-                local_update = []
-                for param in local_model.gradient: local_update.append(param)
-                local_updates.append(local_update)
+                local_update, train_result = trainer.update_gradients(model=copy.deepcopy(global_model))
+            local_updates.append(copy.deepcopy(local_update))
+
+            # read params to save
+            local_losses.append(train_result['loss'])
+            local_num_sampels.append(train_result['num_samples'])
 
             # 1.2 calculate and save the raw gradients or pseudo gradients
             gradients = []
@@ -232,8 +243,8 @@
                 original_model = copy.deepcopy(global_model).state_dict()
 
                 # calculate how many updates per local epoch
-                local_update_per_epoch = int(train_sample_size / int(args.batch_size)) + 1
-
+                local_update_per_epoch = int(train_result['num_samples'] / int(args.batch_size)) + 1
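+                # approximate number of optimizer steps per local epoch; used below when
+                # converting the fed_avg weight delta into a per-step pseudo gradient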
+
                 for key in original_model:
                     original_params = original_model[key].detach().clone().cpu().numpy()
                     update_params = local_update[key].detach().clone().cpu().numpy()
@@ -250,8 +261,8 @@
             tmp_key = list(train_speaker_dict[speaker_id].keys())[0]
             gradient_hist_dict[speaker_id] = {}
             gradient_hist_dict[speaker_id]['gradient'] = gradients
-            gradient_hist_dict[speaker_id]['gender'] = train_speaker_dict[speaker_id][tmp_key]['gender']
-            del local_model
+            gradient_hist_dict[speaker_id]['gender'] = train_speaker_dict[speaker_id]['gender']
+            del trainer
 
         # 1.4 dump the gradients for the later usage
         f = open(str(model_result_path.joinpath('gradient_hist_'+str(epoch)+'.pkl')), "wb")
@@ -283,17 +294,18 @@
         # 3.1 Iterate each client at the current global round, calculate the performance
         for idx in idxs_speakers:
             speaker_id = speaker_list[idx]
-
-            dataset_validation = DatasetGenerator(train_speaker_dict[speaker_id], train_val_idx_dict[speaker_id]['val'])
+            dataset_validation = DatasetGenerator(train_speaker_dict[speaker_id])
             val_dataloaders = DataLoader(dataset_validation, batch_size=20, num_workers=0, shuffle=False)
-            val_result_dict = trainer.validate(copy.deepcopy(global_model), dataloaders=val_dataloaders, verbose=False)
+
+            trainer = local_trainer(args, device, criterion, args.model_type, val_dataloaders)
+            local_val_result = trainer.inference(copy.deepcopy(global_model))
 
             # save validation accuracy, uar, and loss
-            local_num_sampels.append(int(val_result_dict[0]['val_size']))
-            validation_acc.append(float(val_result_dict[0]['val_acc']))
-            validation_uar.append(float(val_result_dict[0]['val_uar']))
-            validation_loss.append(float(val_result_dict[0]['val_loss']))
-            del val_dataloaders
+            local_num_sampels.append(local_val_result['num_samples'])
+            validation_acc.append(local_val_result['acc'])
+            validation_uar.append(local_val_result['uar'])
+            validation_loss.append(local_val_result['loss'])
+            del val_dataloaders, trainer
 
         # 3.2 Re-calculate weighted performance scores
         validate_result = {}
@@ -302,42 +314,40 @@
         for acc_idx in range(len(validation_acc)):
             weighted_acc += validation_acc[acc_idx] * (local_num_sampels[acc_idx] / total_num_samples)
             weighted_rec += validation_uar[acc_idx] * (local_num_sampels[acc_idx] / total_num_samples)
-        validate_result['val_acc'], validate_result['val_uar'] = weighted_acc, weighted_rec
+        validate_result['acc'], validate_result['uar'] = weighted_acc, weighted_rec
         validate_result['loss'] = np.mean(validation_loss)
         print('| Global Round validation : {} | \tacc: {:.2f}% | \tuar: {:.2f}% | \tLoss: {:.6f}\n'.format(
             epoch, weighted_acc*100, weighted_rec*100, np.mean(validation_loss)))
 
         # 4. Perform the test on holdout set
-        test_result_dict = trainer.test(copy.deepcopy(global_model), dataloaders=test_dataloaders, verbose=False)
-        test_result_dict[0]['conf'] = trainer.model.test_conf
-
+        trainer = local_trainer(args, device, criterion, args.model_type, test_dataloaders)
+        test_result = trainer.inference(copy.deepcopy(global_model))
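+        # inference() returns acc, uar, loss, and the confusion matrix over the holdout loader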
+
         # 5. Save the results for later
         result_dict[epoch] = {}
         result_dict[epoch]['train'] = {}
         result_dict[epoch]['train']['loss'] = sum(local_losses) / len(local_losses)
-        result_dict[epoch]['validate'], result_dict[epoch]['test'] = validate_result, test_result_dict[0]
-
-        if validate_result['val_uar'] > best_score and epoch > 100:
-            best_val_acc, best_score = validate_result['val_acc'], validate_result['val_uar']
-            final_acc, final_recall = test_result_dict[0]['test_acc'], test_result_dict[0]['test_uar']
-            final_confusion = test_result_dict[0]['conf']
-            best_epoch, best_dict = epoch, test_result_dict[0].copy()
-            best_model = deepcopy(global_model.state_dict())
+        result_dict[epoch]['validate'] = validate_result
+        result_dict[epoch]['test'] = test_result
+
+        if epoch == 0: best_epoch, best_val_dict = 0, validate_result
+        if validate_result['uar'] > best_val_dict['uar'] and epoch > 100:
+            # Save best model and training history
+            best_epoch, best_val_dict, best_test_dict = epoch, validate_result, test_result
+            torch.save(deepcopy(global_model.state_dict()), str(model_result_path.joinpath('model.pt')))
 
         if epoch > 100:
             # log results
-            print('best epoch %d, best final acc %.2f, best val acc %.2f' % (best_epoch, final_acc*100, best_val_acc*100))
-            print('best epoch %d, best final rec %.2f, best val rec %.2f' % (best_epoch, final_recall*100, best_score*100))
-            print(best_dict['conf'])
+            print('best epoch %d, best final acc %.2f, best val acc %.2f' % (best_epoch, best_test_dict['acc']*100, best_val_dict['acc']*100))
+            print('best epoch %d, best final rec %.2f, best val rec %.2f' % (best_epoch, best_test_dict['uar']*100, best_val_dict['uar']*100))
+            print(best_test_dict['conf'])
 
             # Performance save code
-            row_df = save_result(save_row_str, best_dict['test_acc'], best_dict['test_uar'], best_epoch, args.dataset)
+            row_df = save_result(save_row_str, best_test_dict['acc'], best_test_dict['uar'], best_epoch, args.dataset)
             save_result_df = pd.concat([save_result_df, row_df])
             save_result_df.to_csv(str(model_result_csv_path.joinpath('private_'+ str(args.dataset) + '.csv')))
 
-            # Save best model and training history
-            torch.save(best_model, str(model_result_path.joinpath('model.pt')))
             f = open(str(model_result_path.joinpath('results.pkl')), "wb")
             pickle.dump(result_dict, f)
             f.close()
diff --git a/train/update.py b/train/update.py
index ac12f11..e5bab67 100644
--- a/train/update.py
+++ b/train/update.py
@@ -1,14 +1,11 @@
 import pandas as pd
-import torch
 from torch import nn
 from torch.utils import data
 from torch.utils.data import DataLoader, Dataset
-from torch.optim.lr_scheduler import ReduceLROnPlateau
-import copy
+import copy, pdb, time, warnings, torch
 import numpy as np
-import time
-import warnings
-import pdb
+from sklearn.metrics import accuracy_score, recall_score
+from sklearn.metrics import confusion_matrix
 
 warnings.filterwarnings('ignore')
 
@@ -39,4 +36,102 @@ def average_gradients(g, num_samples_list):
     for layer_idx in range(len(g[0])):
         for client_idx in range(1, len(g)):
             g_avg[layer_idx] += torch.div(g[client_idx][layer_idx]*num_samples_list[client_idx], total_num_samples)
-    return g_avg
\ No newline at end of file
+    return g_avg
+
+
+def result_summary(step_outputs):
+    loss_list, y_true, y_pred = [], [], []
+    for step in range(len(step_outputs)):
+        for idx in range(len(step_outputs[step]['pred'])):
+            y_true.append(step_outputs[step]['truth'][idx])
+            y_pred.append(step_outputs[step]['pred'][idx])
+        loss_list.append(step_outputs[step]['loss'])
+
+    result_dict = {}
+    acc_score = accuracy_score(y_true, y_pred)
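+    # UAR (unweighted average recall) is the macro-averaged recall over the emotion classes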
+    rec_score = recall_score(y_true, y_pred, average='macro')
+    confusion_matrix_arr = np.round(confusion_matrix(y_true, y_pred, normalize='true')*100, decimals=2)
+
+    result_dict['acc'] = acc_score
+    result_dict['uar'] = rec_score
+    result_dict['conf'] = confusion_matrix_arr
+    result_dict['loss'] = np.mean(loss_list)
+    result_dict['num_samples'] = len(y_pred)
+    return result_dict
+
+
+class local_trainer(object):
+    def __init__(self, args, device, criterion, model_type, dataloader):
+        self.args = args
+        self.device = device
+        self.criterion = criterion
+        self.model_type = model_type
+        self.dataloader = dataloader
+
+    def update_weights(self, model):
+        # Set mode to train model
+        model.train()
+        step_outputs = []
+        optimizer = torch.optim.Adam(model.parameters(), lr=float(self.args.learning_rate), weight_decay=1e-04, betas=(0.9, 0.98), eps=1e-9)
+
+        for iter in range(int(self.args.local_epochs)):
+            for batch_idx, batch_data in enumerate(self.dataloader):
+                x, y, dataset = batch_data
+                x, y = x.to(self.device), y.to(self.device)
+
+                model.zero_grad()
+                optimizer.zero_grad()
+                logits = model(x.float())
+                loss = self.criterion(logits, y)
+                loss.backward()
+                optimizer.step()
+
+                predictions = np.argmax(logits.detach().cpu().numpy(), axis=1)
+                pred_list = [predictions[pred_idx] for pred_idx in range(len(predictions))]
+                truth_list = [y.detach().cpu().numpy()[pred_idx] for pred_idx in range(len(predictions))]
+                step_outputs.append({'loss': loss.item(), 'pred': pred_list, 'truth': truth_list})
+
+        result_dict = result_summary(step_outputs)
+        return model.state_dict(), result_dict
+
+    def update_gradients(self, model):
+        # Set mode to train model
+        model.train()
+        step_outputs = []
+        # for fed_sgd, only the gradient of the first batch is shared per round
+        for batch_idx, batch_data in enumerate(self.dataloader):
+            if batch_idx == 0:
+                x, y, dataset = batch_data
+                x, y = x.to(self.device), y.to(self.device)
+
+                model.zero_grad()
+                logits = model(x.float())
+                loss = self.criterion(logits, y)
+                loss.backward()
+                grads = [param.grad.detach().clone() for param in model.parameters()]
+
+                predictions = np.argmax(logits.detach().cpu().numpy(), axis=1)
+                pred_list = [predictions[pred_idx] for pred_idx in range(len(predictions))]
+                truth_list = [y.detach().cpu().numpy()[pred_idx] for pred_idx in range(len(predictions))]
+                step_outputs.append({'loss': loss.item(), 'pred': pred_list, 'truth': truth_list})
+        result_dict = result_summary(step_outputs)
+        return grads, result_dict
+
+    def inference(self, model):
+        model.eval()
+        step_outputs = []
+
+        for batch_idx, batch_data in enumerate(self.dataloader):
+            x, y, dataset = batch_data
+            x, y = x.to(self.device), y.to(self.device)
+
+            logits = model(x.float())
+            loss = self.criterion(logits, y)
+
+            predictions = np.argmax(logits.detach().cpu().numpy(), axis=1)
+            pred_list = [predictions[pred_idx] for pred_idx in range(len(predictions))]
+            truth_list = [y.detach().cpu().numpy()[pred_idx] for pred_idx in range(len(predictions))]
+            step_outputs.append({'loss': loss.item(), 'pred': pred_list, 'truth': truth_list})
+        result_dict = result_summary(step_outputs)
+        return result_dict
+
diff --git a/utils/training_tools.py b/utils/training_tools.py
index e48f2a1..69f13c0 100644
--- a/utils/training_tools.py
+++ b/utils/training_tools.py
@@ -76,46 +76,27 @@ def save_checkpoint(self, val_loss, model):
         self.val_loss_min = val_loss
 
 
-def ReturnResultDict(truth_dict, predict_dict, dataset, pred, mode='test', loss=None, epoch=None):
+def result_summary(step_outputs, mode, epoch):
+    loss_list, y_true, y_pred = [], [], []
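+    # flatten the per-batch step outputs into epoch-level prediction and label lists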
+    for step in range(len(step_outputs)):
+        for idx in range(len(step_outputs[step]['pred'])):
+            y_true.append(step_outputs[step]['truth'][idx])
+            y_pred.append(step_outputs[step]['pred'][idx])
+        loss_list.append(step_outputs[step]['loss'])
+
     result_dict = {}
-    result_dict[dataset] = {}
-    result_dict[dataset]['acc'] = {}
-    result_dict[dataset]['rec'] = {}
-    result_dict[dataset]['loss'] = {}
-    result_dict[dataset]['conf'] = {}
+    acc_score = accuracy_score(y_true, y_pred)
+    rec_score = recall_score(y_true, y_pred, average='macro')
+    confusion_matrix_arr = np.round(confusion_matrix(y_true, y_pred, normalize='true')*100, decimals=2)
 
-    acc_score = accuracy_score(truth_dict[dataset], predict_dict[dataset])
-    rec_score = recall_score(truth_dict[dataset], predict_dict[dataset], average='macro')
-    confusion_matrix_arr = np.round(confusion_matrix(truth_dict[dataset], predict_dict[dataset], normalize='true')*100, decimals=2)
+    result_dict['acc'] = acc_score
+    result_dict['uar'] = rec_score
+    result_dict['conf'] = confusion_matrix_arr
+    result_dict['loss'] = np.mean(loss_list)
+    result_dict['num_samples'] = len(y_pred)
 
-    print('Total %s, %s accuracy %.3f / recall %.3f after %d' % (dataset, mode, acc_score, rec_score, epoch))
+    print('%s accuracy %.3f / recall %.3f / loss %.3f after %d' % (mode, acc_score, rec_score, np.mean(loss_list), epoch))
     print(confusion_matrix_arr)
-
-    result_dict[dataset]['acc'][pred] = acc_score
-    result_dict[dataset]['rec'][pred] = rec_score
-    result_dict[dataset]['conf'][pred] = confusion_matrix_arr
-    result_dict[dataset]['loss'][pred] = loss
-
-    if 'combine' in dataset:
-        tmp_list = ['iemocap', 'crema-d', 'msp-improv'] if dataset == 'combine' else ['iemocap', 'crema-d']
-        for tmp_str in tmp_list:
-            result_dict[tmp_str] = {}
-            result_dict[tmp_str]['acc'] = {}
-            result_dict[tmp_str]['rec'] = {}
-            result_dict[tmp_str]['loss'] = {}
-            result_dict[tmp_str]['conf'] = {}
-
-            acc_score = accuracy_score(truth_dict[tmp_str], predict_dict[tmp_str])
-            rec_score = recall_score(truth_dict[tmp_str], predict_dict[tmp_str], average='macro')
-            confusion_matrix_arr = np.round(confusion_matrix(truth_dict[tmp_str], predict_dict[tmp_str], normalize='true')*100, decimals=2)
-
-            print('%s: total %s accuracy %.3f / recall %.3f after %d' % (tmp_str, mode, acc_score, rec_score, epoch))
-            print(confusion_matrix_arr)
-
-            result_dict[tmp_str]['acc'][pred] = acc_score
-            result_dict[tmp_str]['rec'][pred] = rec_score
-            result_dict[tmp_str]['conf'][pred] = confusion_matrix_arr
-
     return result_dict