-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
233 lines (186 loc) · 9.36 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import traceback
try:
from os import system
from pythonosc.osc_server import AsyncIOOSCUDPServer
from pythonosc.dispatcher import Dispatcher
from pythonosc.udp_client import SimpleUDPClient
import pywintypes
from win32api import keybd_event
from win32con import VK_MEDIA_PLAY_PAUSE, VK_MEDIA_NEXT_TRACK, VK_MEDIA_PREV_TRACK, KEYEVENTF_EXTENDEDKEY
from pycaw.pycaw import AudioUtilities, ISimpleAudioVolume
from comtypes import CoInitialize, CoUninitialize
import winsdk.windows.media.control as wmc
import asyncio
def splash():
asci = """
_ ______ ________ __ _____ __ _ ____ ______ __ __
| | / / __ \/ ____/ /_ ____ _/ /_/ ___/____ ____ / /_(_) __/_ __/ ____/___ ____ / /__________ / /__ _____
| | / / /_/ / / / __ \/ __ `/ __/\__ \/ __ \/ __ \/ __/ / /_/ / / / / / __ \/ __ \/ __/ ___/ __ \/ / _ \/ ___/
| |/ / _, _/ /___/ / / / /_/ / /_ ___/ / /_/ / /_/ / /_/ / __/ /_/ / /___/ /_/ / / / / /_/ / / /_/ / / __/ /
|___/_/ |_|\____/_/ /_/\__,_/\__//____/ .___/\____/\__/_/_/ \__, /\____/\____/_/ /_/\__/_/ \____/_/\___/_/
/_/ /____/
by Jakhaxz
ed Denis Hik
"""
print(asci)
chatboxState = 1
muteSelf = 0
######## GET SPOTIFY NOW PLAYING ########
def getMuteselfText():
if (muteSelf == 1):
return "[mic off] "
else:
return ""
async def get_media_info():
sessions = await wmc.GlobalSystemMediaTransportControlsSessionManager.request_async()
# This source_app_user_model_id check and if statement is optional
# Use it if you want to only get a certain player/program's media
# (e.g. only chrome.exe's media not any other program's).
# To get the ID, use a breakpoint() to run sessions.get_current_session()
# while the media you want to get is playing.
# Then set TARGET_ID to the string this call returns.
global current_session
current_session = sessions.get_current_session()
if current_session: # there needs to be a media session running
if current_session.source_app_user_model_id.startswith('Spotify'):
info = await current_session.try_get_media_properties_async()
# song_attr[0] != '_' ignores system attributes
info_dict = {song_attr: info.__getattribute__(song_attr) for song_attr in dir(info) if song_attr[0] != '_'}
# converts winrt vector to list
info_dict['genres'] = list(info_dict['genres'])
info_dict["program"] = "Spotify"
return info_dict
if current_session.source_app_user_model_id.startswith('chrome'):
info = await current_session.try_get_media_properties_async()
info_dict = {song_attr: info.__getattribute__(song_attr) for song_attr in dir(info) if
song_attr[0] != '_'}
# converts winrt vector to list
info_dict['genres'] = list(info_dict['genres'])
info_dict["program"] = "Chrome"
return info_dict
if current_session.source_app_user_model_id.startswith('Яндекс Музыка'):
info = await current_session.try_get_media_properties_async()
info_dict = {song_attr: info.__getattribute__(song_attr) for song_attr in dir(info) if
song_attr[0] != '_'}
# converts winrt vector to list
info_dict['genres'] = list(info_dict['genres'])
info_dict["program"] = "Y Music"
return info_dict
# It could be possible to select a program from a list of current
# available ones. I just haven't implemented this here for my use case.
raise Exception('TARGET_PROGRAM is not the current media session')
def mediaIs(state):
if current_session == None:
return False
return int(wmc.GlobalSystemMediaTransportControlsSessionPlaybackStatus[state]) == current_session.get_playback_info().playback_status #get media state enum and compare to current main media session state
######## MEDIA CONTROLS START ########
def pauseTrack(unused_addr, arg):
if arg:
keybd_event(VK_MEDIA_PLAY_PAUSE, 0, KEYEVENTF_EXTENDEDKEY, 0) #Play/Pause
print("Detected Play/Pause")
def nextTrack(unused_addr, arg):
if arg:
keybd_event(VK_MEDIA_NEXT_TRACK, 0, KEYEVENTF_EXTENDEDKEY, 0) #Next Track
print("Detected Next Track")
def previousTrack(unused_addr, arg):
if arg:
keybd_event(VK_MEDIA_PREV_TRACK, 0, KEYEVENTF_EXTENDEDKEY, 0) #Previous Track
print("Detected Previous")
def volSlider(unused_addr, arg):
CoInitialize()
sessions = AudioUtilities.GetAllSessions()
for session in sessions:
volume = session._ctl.QueryInterface(ISimpleAudioVolume)
if session.Process and session.Process.name().startswith("Spotify"):
print("Spotify volume: %s" % str(round(volume.GetMasterVolume(), 2)))
volume.SetMasterVolume(arg, None)
CoUninitialize()
def chatBox(unused_addr, arg):
global chatboxState
if arg == 0:
chatboxState = 0
clearChat()
print("Now Playing Disabled")
else:
chatboxState = 1
print("Now Playing Enabled")
def infoMic(unused_addr, arg):
print("muted!" + str(arg))
global muteSelf
if arg == 0:
muteSelf = 0
else:
muteSelf = 1
######## MEDIA CONTROLS END ########
def clear():
system('cls')
def filter_handler(address, *args):
print(f"{address}: {args}")
dispatcher = Dispatcher()
dispatcher.map("/avatar/parameters/pause-play", pauseTrack)
dispatcher.map("/avatar/parameters/next-track", nextTrack)
dispatcher.map("/avatar/parameters/previous-track", previousTrack)
dispatcher.map("/avatar/parameters/vol-slider", volSlider)
dispatcher.map("/avatar/parameters/now-playing", chatBox)
dispatcher.map("/avatar/parameters/isMuteSelf", infoMic)
serverip = "127.0.0.1"
serverport = 9001
def sendChat(nowPlaying, program):
client.send_message("/chatbox/input", [getMuteselfText() + '[' + program + '] ~ ' + nowPlaying, True])
client.send_message("/avatar/parameters/isPlay", True)
def clearChat():
client.send_message("/chatbox/input", ['' + getMuteselfText(), True])
client.send_message("/avatar/parameters/isPlay", False)
async def loop():
global current_media_info
while True:
if chatboxState == 1:
if mediaIs("PLAYING") == True:
current_media_info = await get_media_info()
title = current_media_info.get('title')
artist = current_media_info.get('artist')
program = current_media_info.get('program')
nowPlaying = "%s - %s" % (title, artist)
clear()
splash()
print("[" + program + "] Now Playing: %s" % (nowPlaying))
sendChat(nowPlaying, program)
await asyncio.sleep(2)
else:
clear()
splash()
print("Nothing is playing")
clearChat()
try:
current_media_info = await get_media_info()
except:
print("Searching for an open Spotify.exe process...")
await asyncio.sleep(5)
continue
await asyncio.sleep(2)
else:
await asyncio.sleep(2)
async def init_main():
global client
global current_media_info
server = AsyncIOOSCUDPServer((serverip, serverport), dispatcher, asyncio.get_event_loop())
transport, protocol = await server.create_serve_endpoint() # Create datagram endpoint and start serving
clientip = "127.0.0.1"
clientport = 9000
client = SimpleUDPClient(clientip, clientport)
while True:
try:
current_media_info = await get_media_info()
break
except:
print("Searching for an open Spotify.exe process...")
await asyncio.sleep(5)
splash()
await loop() # Enter main loop of program
transport.close() # Clean up serve endpoint
asyncio.run(init_main())
except:
f = open('error.log', 'w')
e = traceback.format_exc()
f.write(str(e))
f.close()