"""Contains all functions for the purpose of logging in and out to Robinhood."""
import getpass
import os
import pickle
import random
from robin_stocks.robinhood.helper import *
from robin_stocks.robinhood.urls import *
def generate_device_token():
"""This function will generate a token used when loggin on.
:returns: A string representing the token.
"""
rands = []
for i in range(0, 16):
r = random.random()
rand = 4294967296.0 * r
rands.append((int(rand) >> ((3 & i) << 3)) & 255)
hexa = []
for i in range(0, 256):
hexa.append(str(hex(i+256)).lstrip("0x").rstrip("L")[1:])
id = ""
for i in range(0, 16):
id += hexa[rands[i]]
if (i == 3) or (i == 5) or (i == 7) or (i == 9):
id += "-"
return(id)
def respond_to_challenge(challenge_id, sms_code):
"""This function will post to the challenge url.
:param challenge_id: The challenge id.
:type challenge_id: str
:param sms_code: The sms code.
:type sms_code: str
:returns: The response from requests.
"""
url = challenge_url(challenge_id)
payload = {
'response': sms_code
}
return(request_post(url, payload))
[docs]def login(username=None, password=None, expiresIn=86400, scope='internal', by_sms=True, store_session=True, mfa_code=None, pickle_name=""):
"""This function will effectively log the user into robinhood by getting an
authentication token and saving it to the session header. By default, it
will store the authentication token in a pickle file and load that value
on subsequent logins.
:param username: The username for your robinhood account, usually your email.
Not required if credentials are already cached and valid.
:type username: Optional[str]
:param password: The password for your robinhood account. Not required if
credentials are already cached and valid.
:type password: Optional[str]
:param expiresIn: The time until your login session expires. This is in seconds.
:type expiresIn: Optional[int]
:param scope: Specifies the scope of the authentication.
:type scope: Optional[str]
:param by_sms: Specifies whether to send an email(False) or an sms(True)
:type by_sms: Optional[boolean]
:param store_session: Specifies whether to save the log in authorization
for future log ins.
:type store_session: Optional[boolean]
:param mfa_code: MFA token if enabled.
:type mfa_code: Optional[str]
:param pickle_name: Allows users to name Pickle token file in order to switch
between different accounts without having to re-login every time.
:returns: A dictionary with log in information. The 'access_token' keyword contains the access token, and the 'detail' keyword \
contains information on whether the access token was generated or loaded from pickle file.
"""
device_token = generate_device_token()
home_dir = os.path.expanduser("~")
data_dir = os.path.join(home_dir, ".tokens")
if not os.path.exists(data_dir):
os.makedirs(data_dir)
creds_file = "robinhood" + pickle_name + ".pickle"
pickle_path = os.path.join(data_dir, creds_file)
# Challenge type is used if not logging in with two-factor authentication.
if by_sms:
challenge_type = "sms"
else:
challenge_type = "email"
url = login_url()
payload = {
'client_id': 'c82SH0WZOsabOXGP2sxqcj34FxkvfnWRZBKlBjFS',
'expires_in': expiresIn,
'grant_type': 'password',
'password': password,
'scope': scope,
'username': username,
'challenge_type': challenge_type,
'device_token': device_token
}
if mfa_code:
payload['mfa_code'] = mfa_code
# If authentication has been stored in pickle file then load it. Stops login server from being pinged so much.
if os.path.isfile(pickle_path):
# If store_session has been set to false then delete the pickle file, otherwise try to load it.
# Loading pickle file will fail if the acess_token has expired.
if store_session:
try:
with open(pickle_path, 'rb') as f:
pickle_data = pickle.load(f)
access_token = pickle_data['access_token']
token_type = pickle_data['token_type']
refresh_token = pickle_data['refresh_token']
# Set device_token to be the original device token when first logged in.
pickle_device_token = pickle_data['device_token']
payload['device_token'] = pickle_device_token
# Set login status to True in order to try and get account info.
set_login_state(True)
update_session(
'Authorization', '{0} {1}'.format(token_type, access_token))
# Try to load account profile to check that authorization token is still valid.
res = request_get(
positions_url(), 'pagination', {'nonzero': 'true'}, jsonify_data=False)
# Raises exception is response code is not 200.
res.raise_for_status()
return({'access_token': access_token, 'token_type': token_type,
'expires_in': expiresIn, 'scope': scope, 'detail': 'logged in using authentication in {0}'.format(creds_file),
'backup_code': None, 'refresh_token': refresh_token})
except:
print(
"ERROR: There was an issue loading pickle file. Authentication may be expired - logging in normally.", file=get_output())
set_login_state(False)
update_session('Authorization', None)
else:
os.remove(pickle_path)
# Try to log in normally.
if not username:
username = input("Robinhood username: ")
payload['username'] = username
if not password:
password = getpass.getpass("Robinhood password: ")
payload['password'] = password
data = request_post(url, payload)
# Handle case where mfa or challenge is required.
if data:
if 'mfa_required' in data:
mfa_token = input("Please type in the MFA code: ")
payload['mfa_code'] = mfa_token
res = request_post(url, payload, jsonify_data=False)
while (res.status_code != 200):
mfa_token = input(
"That MFA code was not correct. Please type in another MFA code: ")
payload['mfa_code'] = mfa_token
res = request_post(url, payload, jsonify_data=False)
data = res.json()
elif 'challenge' in data:
challenge_id = data['challenge']['id']
sms_code = input('Enter Robinhood code for validation: ')
res = respond_to_challenge(challenge_id, sms_code)
while 'challenge' in res and res['challenge']['remaining_attempts'] > 0:
sms_code = input('That code was not correct. {0} tries remaining. Please type in another code: '.format(
res['challenge']['remaining_attempts']))
res = respond_to_challenge(challenge_id, sms_code)
update_session(
'X-ROBINHOOD-CHALLENGE-RESPONSE-ID', challenge_id)
data = request_post(url, payload)
# Update Session data with authorization or raise exception with the information present in data.
if 'access_token' in data:
token = '{0} {1}'.format(data['token_type'], data['access_token'])
update_session('Authorization', token)
set_login_state(True)
data['detail'] = "logged in with brand new authentication code."
if store_session:
with open(pickle_path, 'wb') as f:
pickle.dump({'token_type': data['token_type'],
'access_token': data['access_token'],
'refresh_token': data['refresh_token'],
'device_token': payload['device_token']}, f)
else:
raise Exception(data['detail'])
else:
raise Exception('Error: Trouble connecting to robinhood API. Check internet connection.')
return(data)
[docs]@login_required
def logout():
"""Removes authorization from the session header.
:returns: None
"""
set_login_state(False)
update_session('Authorization', None)