Source code for robin_stocks.tda.authentication

import pickle
from datetime import datetime, timedelta
from pathlib import Path

from cryptography.fernet import Fernet
from robin_stocks.tda.globals import DATA_DIR_NAME, PICKLE_NAME
from robin_stocks.tda.helper import (request_data, set_login_state,
from robin_stocks.tda.urls import URLS

[docs]def login_first_time(encryption_passcode, client_id, authorization_token, refresh_token): """ Stores log in information in a pickle file on the computer. After being used once, user can call login() to automatically read in information from pickle file and refresh authorization tokens when needed. :param encryption_passcode: Encryption key created by generate_encryption_passcode(). :type encryption_passcode: str :param client_id: The Consumer Key for the API account. :type client_id: str :param authorization_token: The authorization code returned from post request to :type authorization_token: str :param refresh_token: The refresh code returned from post request to :type refresh_token: str """ if type(encryption_passcode) is str: encryption_passcode = encryption_passcode.encode() cipher_suite = Fernet(encryption_passcode) # Create necessary folders and paths for pickle file as defined in globals. data_dir = Path.home().joinpath(DATA_DIR_NAME) if not data_dir.exists(): data_dir.mkdir(parents=True) pickle_path = data_dir.joinpath(PICKLE_NAME) if not pickle_path.exists(): Path.touch(pickle_path) # Write information to the file. with"wb") as pickle_file: pickle.dump( { 'authorization_token': cipher_suite.encrypt(authorization_token.encode()), 'refresh_token': cipher_suite.encrypt(refresh_token.encode()), 'client_id': cipher_suite.encrypt(client_id.encode()), 'authorization_timestamp':, 'refresh_timestamp': }, pickle_file)
[docs]def login(encryption_passcode): """ Set the authorization token so the API can be used. Gets a new authorization token every 30 minutes using the refresh token. Gets a new refresh token every 60 days. :param encryption_passcode: Encryption key created by generate_encryption_passcode(). :type encryption_passcode: str """ if type(encryption_passcode) is str: encryption_passcode = encryption_passcode.encode() cipher_suite = Fernet(encryption_passcode) # Check that file exists before trying to read from it. data_dir = Path.home().joinpath(DATA_DIR_NAME) pickle_path = data_dir.joinpath(PICKLE_NAME) if not pickle_path.exists(): raise FileExistsError( "Please Call login_first_time() to create pickle file.") # Read the information from the pickle file. with"rb") as pickle_file: pickle_data = pickle.load(pickle_file) access_token = cipher_suite.decrypt(pickle_data['authorization_token']).decode() refresh_token = cipher_suite.decrypt(pickle_data['refresh_token']).decode() client_id = cipher_suite.decrypt(pickle_data['client_id']).decode() authorization_timestamp = pickle_data['authorization_timestamp'] refresh_timestamp = pickle_data['refresh_timestamp'] # Authorization tokens expire after 30 mins. Refresh tokens expire after 90 days, # but you need to request a fresh authorization and refresh token before it expires. authorization_delta = timedelta(seconds=1800) refresh_delta = timedelta(days=60) url = URLS.oauth() # If it has been longer than 60 days. Get a new refresh and authorization token. # Else if it has been longer than 30 minutes, get only a new authorization token. if ( - refresh_timestamp > refresh_delta): payload = { "grant_type": "refresh_token", "access_type": "offline", "refresh_token": refresh_token, "client_id": client_id } data, _ = request_data(url, payload, True) if "access_token" not in data and "refresh_token" not in data: raise ValueError( "Refresh token is no longer valid. Call login_first_time() to get a new refresh token.") access_token = data["access_token"] refresh_token = data["refresh_token"] with"wb") as pickle_file: pickle.dump( { 'authorization_token': cipher_suite.encrypt(access_token.encode()), 'refresh_token': cipher_suite.encrypt(refresh_token.encode()), 'client_id': cipher_suite.encrypt(client_id.encode()), 'authorization_timestamp':, 'refresh_timestamp': }, pickle_file) elif ( - authorization_timestamp > authorization_delta): payload = { "grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": client_id } data, _ = request_data(url, payload, True) if "access_token" not in data: raise ValueError( "Refresh token is no longer valid. Call login_first_time() to get a new refresh token.") access_token = data["access_token"] # Write new data to file. Do not replace the refresh timestamp. with"wb") as pickle_file: pickle.dump( { 'authorization_token': cipher_suite.encrypt(access_token.encode()), 'refresh_token': cipher_suite.encrypt(refresh_token.encode()), 'client_id': cipher_suite.encrypt(client_id.encode()), 'authorization_timestamp':, 'refresh_timestamp': refresh_timestamp }, pickle_file) # Store authorization token in session information to be used with API calls. auth_token = "Bearer {0}".format(access_token) update_session("Authorization", auth_token) update_session("apikey", client_id) set_login_state(True) return auth_token
[docs]def generate_encryption_passcode(): """ Returns an encryption key to be used for logging in. :returns: Returns a byte object to be used with cryptography. """ return Fernet.generate_key().decode()