forked from Wentao-Xu/HIST
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdataloader.py
78 lines (57 loc) · 2.71 KB
/
dataloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import torch
import numpy as np
class DataLoader:
def __init__(self, df_feature, df_label, df_market_value, df_stock_index, batch_size=800, pin_memory=True, start_index = 0, device=None):
assert len(df_feature) == len(df_label)
self.df_feature = df_feature.values
self.df_label = df_label.values
self.df_market_value = df_market_value
self.df_stock_index = df_stock_index
self.device = device
if pin_memory:
self.df_feature = torch.tensor(self.df_feature, dtype=torch.float, device=device)
self.df_label = torch.tensor(self.df_label, dtype=torch.float, device=device)
self.df_market_value = torch.tensor(self.df_market_value, dtype=torch.float, device=device)
self.df_stock_index = torch.tensor(self.df_stock_index, dtype=torch.long, device=device)
self.index = df_label.index
self.batch_size = batch_size
self.pin_memory = pin_memory
self.start_index = start_index
self.daily_count = df_label.groupby(level=0).size().values
self.daily_index = np.roll(np.cumsum(self.daily_count), 1)
self.daily_index[0] = 0
@property
def batch_length(self):
if self.batch_size <= 0:
return self.daily_length
return len(self.df_label) // self.batch_size
@property
def daily_length(self):
return len(self.daily_count)
def iter_batch(self):
if self.batch_size <= 0:
yield from self.iter_daily_shuffle()
return
indices = np.arange(len(self.df_label))
np.random.shuffle(indices)
for i in range(len(indices))[::self.batch_size]:
if len(indices) - i < self.batch_size:
break
yield i, indices[i:i+self.batch_size] # NOTE: advanced indexing will cause copy
def iter_daily_shuffle(self):
indices = np.arange(len(self.daily_count))
np.random.shuffle(indices)
for i in indices:
yield i, slice(self.daily_index[i], self.daily_index[i] + self.daily_count[i])
def iter_daily(self):
indices = np.arange(len(self.daily_count))
for i in indices:
yield i, slice(self.daily_index[i], self.daily_index[i] + self.daily_count[i])
# for idx, count in zip(self.daily_index, self.daily_count):
# yield slice(idx, idx + count) # NOTE: slice index will not cause copy
def get(self, slc):
outs = self.df_feature[slc], self.df_label[slc][:,0], self.df_market_value[slc], self.df_stock_index[slc]
# outs = self.df_feature[slc], self.df_label[slc]
if not self.pin_memory:
outs = tuple(torch.tensor(x, device=self.device) for x in outs)
return outs + (self.index[slc],)