import asyncio
import pprint

import tda
from tda.auth import easy_client
from tda.client import Client
from tda.streaming import StreamClient

API_KEY = "XXXXXX"
ACCOUNT_ID = "XXXXXX"
REDIRECT = 'https://localhost'


class MyStreamConsumer:
    """Subscribe to equity time-and-sales messages and print them.

    Messages received from the streaming client are placed on a bounded
    asyncio queue; a separate coroutine drains the queue and pretty-prints
    each message. The class keeps the configuration and both clients
    together so the coroutines can share them.
    """

    def __init__(self, api_key, account_id, queue_size=1,
                 credentials_path='./ameritrade-credentials.json'):
        """Store the configuration; no client is created and no I/O is done.

        Args:
            api_key: API key passed to the tda auth helpers.
            account_id: Account id passed to ``StreamClient``.
            queue_size: ``maxsize`` of the internal ``asyncio.Queue``.
            credentials_path: Token file read (and, on first login,
                written) by the tda auth helpers.
                NOTE(review): the original author left a hint to try a
                ``.pickle`` path if the JSON one does not work -- confirm
                which format the installed tda version expects.
        """
        self.api_key = api_key
        self.account_id = account_id
        self.credentials_path = credentials_path
        self.tda_client = None
        self.stream_client = None
        self.symbols = [
            'GOOG', 'GOOGL', 'BP', 'CVS', 'ADBE', 'CRM', 'SNAP', 'AMZN',
            'BABA', 'DIS', 'TWTR', 'M', 'USO', 'AAPL', 'NFLX', 'GE',
            'TSLA', 'F', 'SPY', 'FDX', 'UBER', 'ROKU', 'X', 'FB',
            'BIDU', 'FIT',
        ]

        # Queue between the stream handler (producer) and handle_queue
        # (consumer).
        self.queue = asyncio.Queue(queue_size)
        # Strong reference to the background consumer task started by
        # stream(); None until stream() runs.
        self._queue_task = None

    def initialize(self):
        """Create the HTTP client and the streaming client.

        First tries to build the client from the token file at
        ``self.credentials_path``. If that file does not exist, falls back
        to the interactive login flow driven through a Selenium Chrome
        webdriver, which is given the same path for the token file.
        Finally registers ``handle_timesale_equity`` as the streaming
        client's time-and-sales handler.
        """
        try:
            self.tda_client = tda.auth.client_from_token_file(
                self.credentials_path, self.api_key)
        except FileNotFoundError:
            # Imported lazily so selenium is only needed for a first login.
            from selenium import webdriver

            with webdriver.Chrome() as driver:
                self.tda_client = tda.auth.client_from_login_flow(
                    driver, self.api_key, REDIRECT, self.credentials_path)

        self.stream_client = StreamClient(
            self.tda_client, account_id=self.account_id)

        # The streaming client wants a handler for every service type.
        self.stream_client.add_timesale_equity_handler(
            self.handle_timesale_equity)

    async def stream(self):
        """Log in, subscribe to ``self.symbols`` and process messages forever.

        Raises:
            RuntimeError: if ``initialize()`` has not been called yet.
        """
        if self.stream_client is None:
            raise RuntimeError('initialize() must be called before stream()')

        await self.stream_client.login()  # Log into the streaming service
        await self.stream_client.quality_of_service(StreamClient.QOSLevel.EXPRESS)
        await self.stream_client.timesale_equity_subs(self.symbols)

        # Run handle_queue as an independent task. The task is stored on the
        # instance because the event loop itself only keeps a weak reference;
        # an unreferenced task may be garbage-collected before it finishes.
        self._queue_task = asyncio.create_task(self.handle_queue())

        try:
            # Continuously handle inbound messages.
            while True:
                await self.stream_client.handle_message()
        finally:
            # The message loop only ends via an exception or cancellation;
            # stop the consumer so it does not outlive the stream.
            self._queue_task.cancel()

    async def handle_timesale_equity(self, msg):
        """Put a message from the streaming client on the queue.

        When the queue is already full, the oldest queued message is
        discarded first, so the consumer works on recent data instead of
        falling behind on a backlog.
        """
        # If the queue is full, make room by dropping the oldest entry.
        if self.queue.full():
            await self.queue.get()
        await self.queue.put(msg)

    async def handle_queue(self):
        """Pull messages off the queue forever and pretty-print each one."""
        while True:
            msg = await self.queue.get()
            pprint.pprint(msg)


async def main():
    """Create and initialize the consumer, then run the stream."""
    consumer = MyStreamConsumer(API_KEY, ACCOUNT_ID)
    consumer.initialize()
    await consumer.stream()


if __name__ == '__main__':
    asyncio.run(main())