Source code for instapi.client

# Copyright (c) 2017 https://github.com/ping
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT

import logging
import hmac
import hashlib
import uuid
import json
import re
import time
import random
from datetime import datetime
import gzip
from io import BytesIO
import warnings
from socket import timeout, error as SocketError
from ssl import SSLError
from .compat import (
    compat_urllib_parse,
    compat_urllib_error,
    compat_urllib_request,
    compat_urllib_parse_urlparse,
    compat_http_client,
)
from .errors import (
    ErrorHandler,
    ClientError,
    ClientLoginRequiredError,
    ClientCookieExpiredError,
    ClientConnectionError,
)

try:  # Python 3:
    # Not a no-op, we're adding this to the namespace so it can be imported.
    ConnectionError = ConnectionError  # pylint: disable=redefined-builtin
except NameError:  # Python 2:

    class ConnectionError(Exception):
        pass


from .constants import Constants
from .http import ClientCookieJar
from .endpoints import (
    AccountsEndpointsMixin,
    DiscoverEndpointsMixin,
    FeedEndpointsMixin,
    FriendshipsEndpointsMixin,
    LiveEndpointsMixin,
    MediaEndpointsMixin,
    MiscEndpointsMixin,
    LocationsEndpointsMixin,
    TagsEndpointsMixin,
    UsersEndpointsMixin,
    UploadEndpointsMixin,
    UsertagsEndpointsMixin,
    CollectionsEndpointsMixin,
    HighlightsEndpointsMixin,
    IGTVEndpointsMixin,
    InsightsEndpointsMixin,
    AddressBookEndpointMixin,
    ClientDeprecationWarning,
    ClientPendingDeprecationWarning,
    ClientExperimentalWarning,
)

logger = logging.getLogger(__name__)
# Force Client deprecation warnings to always appear
warnings.simplefilter('always', ClientDeprecationWarning)
warnings.simplefilter('always', ClientPendingDeprecationWarning)
warnings.simplefilter('default', ClientExperimentalWarning)


[docs]class Client( AccountsEndpointsMixin, DiscoverEndpointsMixin, FeedEndpointsMixin, FriendshipsEndpointsMixin, LiveEndpointsMixin, MediaEndpointsMixin, MiscEndpointsMixin, LocationsEndpointsMixin, TagsEndpointsMixin, UsersEndpointsMixin, UploadEndpointsMixin, UsertagsEndpointsMixin, CollectionsEndpointsMixin, HighlightsEndpointsMixin, IGTVEndpointsMixin, InsightsEndpointsMixin, AddressBookEndpointMixin, object, ): """Main API client class for the private app api.""" API_URL = 'https://i.instagram.com/api/{version!s}/' USER_AGENT = Constants.USER_AGENT IG_SIG_KEY = Constants.IG_SIG_KEY IG_CAPABILITIES = Constants.IG_CAPABILITIES SIG_KEY_VERSION = Constants.SIG_KEY_VERSION APPLICATION_ID = Constants.APPLICATION_ID
[docs] def __init__(self, username, password, **kwargs): """ :param username: Login username :param password: Login password :param kwargs: See below :Keyword Arguments: - **auto_patch**: Patch the api objects to match the public API. Default: False - **drop_incompat_key**: Remove api object keys that is not in the public API. Default: False - **timeout**: Timeout interval in seconds. Default: 15 - **api_url**: Override the default api url base - **cookie**: Saved cookie string from a previous session - **settings**: A dict of settings from a previous session - **on_login**: Callback after successful login - **proxy**: Specify a proxy ex: 'http://127.0.0.1:8888' (ALPHA) - **proxy_handler**: Specify your own proxy handler :return: """ self.username = username self.password = password self.auto_patch = kwargs.pop('auto_patch', False) self.drop_incompat_keys = kwargs.pop('drop_incompat_keys', False) self.api_url = kwargs.pop('api_url', None) or self.API_URL self.timeout = kwargs.pop('timeout', 15) self.on_login = kwargs.pop('on_login', None) self.logger = logger user_settings = kwargs.pop('settings', None) or {} self.uuid = ( kwargs.pop('guid', None) or kwargs.pop('uuid', None) or user_settings.get('uuid') or self.generate_uuid(False) ) self.device_id = ( kwargs.pop('device_id', None) or user_settings.get('device_id') or self.generate_deviceid() ) # application session ID self.session_id = ( kwargs.pop('session_id', None) or user_settings.get('session_id') or self.generate_uuid(False) ) self.signature_key = ( kwargs.pop('signature_key', None) or user_settings.get('signature_key') or self.IG_SIG_KEY ) self.key_version = ( kwargs.pop('key_version', None) or user_settings.get('key_version') or self.SIG_KEY_VERSION ) self.ig_capabilities = ( kwargs.pop('ig_capabilities', None) or user_settings.get('ig_capabilities') or self.IG_CAPABILITIES ) self.application_id = ( kwargs.pop('application_id', None) or user_settings.get('application_id') or self.APPLICATION_ID ) # to maintain backward compat for user_agent kwarg custom_ua = kwargs.pop('user_agent', '') or user_settings.get('user_agent') if custom_ua: self.user_agent = custom_ua else: self.app_version = ( kwargs.pop('app_version', None) or user_settings.get('app_version') or Constants.APP_VERSION ) self.android_release = ( kwargs.pop('android_release', None) or user_settings.get('android_release') or Constants.ANDROID_RELEASE ) self.android_version = int( kwargs.pop('android_version', None) or user_settings.get('android_version') or Constants.ANDROID_VERSION ) self.phone_manufacturer = ( kwargs.pop('phone_manufacturer', None) or user_settings.get('phone_manufacturer') or Constants.PHONE_MANUFACTURER ) self.phone_device = ( kwargs.pop('phone_device', None) or user_settings.get('phone_device') or Constants.PHONE_DEVICE ) self.phone_model = ( kwargs.pop('phone_model', None) or user_settings.get('phone_model') or Constants.PHONE_MODEL ) self.phone_dpi = ( kwargs.pop('phone_dpi', None) or user_settings.get('phone_dpi') or Constants.PHONE_DPI ) self.phone_resolution = ( kwargs.pop('phone_resolution', None) or user_settings.get('phone_resolution') or Constants.PHONE_RESOLUTION ) self.phone_chipset = ( kwargs.pop('phone_chipset', None) or user_settings.get('phone_chipset') or Constants.PHONE_CHIPSET ) self.version_code = ( kwargs.pop('version_code', None) or user_settings.get('version_code') or Constants.VERSION_CODE ) cookie_string = kwargs.pop('cookie', None) or user_settings.get('cookie') cookie_jar = ClientCookieJar(cookie_string=cookie_string) if ( cookie_string and cookie_jar.auth_expires and int(time.time()) >= cookie_jar.auth_expires ): raise ClientCookieExpiredError( 'Cookie expired at {0!s}'.format(cookie_jar.auth_expires) ) cookie_handler = compat_urllib_request.HTTPCookieProcessor(cookie_jar) proxy_handler = kwargs.pop('proxy_handler', None) if not proxy_handler: proxy = kwargs.pop('proxy', None) if proxy: warnings.warn('Proxy support is alpha.', UserWarning) parsed_url = compat_urllib_parse_urlparse(proxy) if parsed_url.netloc and parsed_url.scheme: proxy_address = '{0!s}://{1!s}'.format( parsed_url.scheme, parsed_url.netloc ) proxy_handler = compat_urllib_request.ProxyHandler( {'https': proxy_address} ) else: raise ValueError('Invalid proxy argument: {0!s}'.format(proxy)) handlers = [] if proxy_handler: handlers.append(proxy_handler) # Allow user to override custom ssl context where possible custom_ssl_context = kwargs.pop('custom_ssl_context', None) try: https_handler = compat_urllib_request.HTTPSHandler( context=custom_ssl_context ) except TypeError: # py version < 2.7.9 https_handler = compat_urllib_request.HTTPSHandler() handlers.extend( [compat_urllib_request.HTTPHandler(), https_handler, cookie_handler] ) opener = compat_urllib_request.build_opener(*handlers) opener.cookie_jar = cookie_jar self.opener = opener # ad_id must be initialised after cookie_jar/opener because # it relies on self.authenticated_user_name self.ad_id = ( kwargs.pop('ad_id', None) or user_settings.get('ad_id') or self.generate_adid() ) if ( not cookie_string ): # [TODO] There's probably a better way than to depend on cookie_string if not self.username or not self.password: raise ClientLoginRequiredError('login_required', code=400) self.login() self.logger.debug('USERAGENT: {0!s}'.format(self.user_agent)) super(Client, self).__init__()
@property def settings(self): """Helper property that extracts the settings that you should cache in addition to username and password.""" return { 'uuid': self.uuid, 'device_id': self.device_id, 'ad_id': self.ad_id, 'session_id': self.session_id, 'cookie': self.cookie_jar.dump(), 'created_ts': int(time.time()), } @property def user_agent(self): """Returns the useragent string that the client is currently using.""" return Constants.USER_AGENT_FORMAT.format( **{ 'app_version': self.app_version, 'android_version': self.android_version, 'android_release': self.android_release, 'brand': self.phone_manufacturer, 'device': self.phone_device, 'model': self.phone_model, 'dpi': self.phone_dpi, 'resolution': self.phone_resolution, 'chipset': self.phone_chipset, 'version_code': self.version_code, } ) @user_agent.setter def user_agent(self, value): """Override the useragent string with your own""" mobj = re.search(Constants.USER_AGENT_EXPRESSION, value) if not mobj: raise ValueError( 'User-agent specified does not fit format required: {0!s}'.format( Constants.USER_AGENT_EXPRESSION ) ) self.app_version = mobj.group('app_version') self.android_release = mobj.group('android_release') self.android_version = int(mobj.group('android_version')) self.phone_manufacturer = mobj.group('manufacturer') self.phone_device = mobj.group('device') self.phone_model = mobj.group('model') self.phone_dpi = mobj.group('dpi') self.phone_resolution = mobj.group('resolution') self.phone_chipset = mobj.group('chipset') self.version_code = mobj.group('version_code')
[docs] @staticmethod def generate_useragent(**kwargs): """ Helper method to generate a useragent string based on device parameters :param kwargs: - **app_version** - **android_version** - **android_release** - **brand** - **device** - **model** - **dpi** - **resolution** - **chipset** :return: A compatible user agent string """ return Constants.USER_AGENT_FORMAT.format( **{ 'app_version': kwargs.pop('app_version', None) or Constants.APP_VERSION, 'android_version': int( kwargs.pop('android_version', None) or Constants.ANDROID_VERSION ), 'android_release': kwargs.pop('android_release', None) or Constants.ANDROID_RELEASE, 'brand': kwargs.pop('phone_manufacturer', None) or Constants.PHONE_MANUFACTURER, 'device': kwargs.pop('phone_device', None) or Constants.PHONE_DEVICE, 'model': kwargs.pop('phone_model', None) or Constants.PHONE_MODEL, 'dpi': kwargs.pop('phone_dpi', None) or Constants.PHONE_DPI, 'resolution': kwargs.pop('phone_resolution', None) or Constants.PHONE_RESOLUTION, 'chipset': kwargs.pop('phone_chipset', None) or Constants.PHONE_CHIPSET, 'version_code': kwargs.pop('version_code', None) or Constants.VERSION_CODE, } )
[docs] @staticmethod def validate_useragent(value): """ Helper method to validate a useragent string for format correctness :param value: :return: """ mobj = re.search(Constants.USER_AGENT_EXPRESSION, value) if not mobj: raise ValueError( 'User-agent specified does not fit format required: {0!s}'.format( Constants.USER_AGENT_EXPRESSION ) ) parse_params = { 'app_version': mobj.group('app_version'), 'android_version': int(mobj.group('android_version')), 'android_release': mobj.group('android_release'), 'brand': mobj.group('manufacturer'), 'device': mobj.group('device'), 'model': mobj.group('model'), 'dpi': mobj.group('dpi'), 'resolution': mobj.group('resolution'), 'chipset': mobj.group('chipset'), 'version_code': mobj.group('version_code'), } return { 'user_agent': Constants.USER_AGENT_FORMAT.format(**parse_params), 'parsed_params': parse_params, }
def get_cookie_value(self, key, domain=''): now = int(time.time()) eternity = ( now + 100 * 365 * 24 * 60 * 60 ) # future date for non-expiring cookies if not domain: domain = compat_urllib_parse_urlparse(self.API_URL).netloc for cookie in sorted( self.cookie_jar, key=lambda c: c.expires or eternity, reverse=True ): # don't return expired cookie if cookie.expires and cookie.expires < now: continue # cookie domain may be i.instagram.com or .instagram.com cookie_domain = cookie.domain # simple domain matching if cookie_domain.startswith('.'): cookie_domain = cookie_domain[1:] if not domain.endswith(cookie_domain): continue if cookie.name.lower() == key.lower(): return cookie.value return None @property def csrftoken(self): """The client's current csrf token""" return self.get_cookie_value('csrftoken') @property def token(self): """For compatibility. Equivalent to :meth:`csrftoken`""" return self.csrftoken @property def authenticated_user_id(self): """The current authenticated user id""" return self.get_cookie_value('ds_user_id') @property def authenticated_user_name(self): """The current authenticated user name""" return self.get_cookie_value('ds_user') @property def phone_id(self): """Current phone ID. For use in certain functions.""" return self.generate_uuid(return_hex=False, seed=self.device_id) @property def timezone_offset(self): """Timezone offset in seconds. For use in certain functions.""" return int(round((datetime.now() - datetime.utcnow()).total_seconds())) @property def rank_token(self): if not self.authenticated_user_id: return None return '{0!s}_{1!s}'.format(self.authenticated_user_id, self.uuid) @property def authenticated_params(self): return { '_csrftoken': self.csrftoken, '_uuid': self.uuid, '_uid': self.authenticated_user_id, } @property def cookie_jar(self): """The client's cookiejar instance.""" return self.opener.cookie_jar @property def default_headers(self): return { 'User-Agent': self.user_agent, 'Connection': 'close', 'Accept': '*/*', 'Accept-Language': 'en-US', 'Accept-Encoding': 'gzip, deflate', 'X-IG-Capabilities': self.ig_capabilities, 'X-IG-Connection-Type': 'WIFI', 'X-IG-Connection-Speed': '{0:d}kbps'.format(random.randint(1000, 5000)), 'X-IG-App-ID': self.application_id, 'X-IG-Bandwidth-Speed-KBPS': '-1.000', 'X-IG-Bandwidth-TotalBytes-B': '0', 'X-IG-Bandwidth-TotalTime-MS': '0', 'X-FB-HTTP-Engine': Constants.FB_HTTP_ENGINE, } @property def radio_type(self): """For use in certain endpoints""" return 'wifi-none' def _generate_signature(self, data): """ Generates the signature for a data string :param data: content to be signed :return: """ return hmac.new( self.signature_key.encode('ascii'), data.encode('ascii'), digestmod=hashlib.sha256, ).hexdigest()
[docs] @classmethod def generate_uuid(cls, return_hex=False, seed=None): """ Generate uuid :param return_hex: Return in hex format :param seed: Seed value to generate a consistent uuid :return: """ if seed: m = hashlib.md5() m.update(seed.encode('utf-8')) new_uuid = uuid.UUID(m.hexdigest()) else: new_uuid = uuid.uuid1() if return_hex: return new_uuid.hex return str(new_uuid)
[docs] @classmethod def generate_deviceid(cls, seed=None): """ Generate an android device ID :param seed: Seed value to generate a consistent device ID :return: """ return 'android-{0!s}'.format(cls.generate_uuid(True, seed)[:16])
[docs] def generate_adid(self, seed=None): """ Generate an Advertising ID based on the login username since the Google Ad ID is a personally identifying but resettable ID. :return: """ modified_seed = seed or self.authenticated_user_name or self.username if modified_seed: # Do some trivial mangling of original seed sha2 = hashlib.sha256() sha2.update(modified_seed.encode('utf-8')) modified_seed = sha2.hexdigest() return self.generate_uuid(False, modified_seed)
@staticmethod def _read_response(response): """ Extract the response body from a http response. :param response: :return: """ if response.info().get('Content-Encoding') == 'gzip': buf = BytesIO(response.read()) res = gzip.GzipFile(fileobj=buf).read().decode('utf8') else: res = response.read().decode('utf8') return res def _call_api( self, endpoint, params=None, query=None, return_response=False, unsigned=False, version='v1', ): """ Calls the private api. :param endpoint: endpoint path that should end with '/', example 'discover/explore/' :param params: POST parameters :param query: GET url query parameters :param return_response: return the response instead of the parsed json object :param unsigned: use post params as-is without signing :param version: for the versioned api base url. Default 'v1'. :return: """ url = '{0}{1}'.format(self.api_url.format(version=version), endpoint) if query: url += ( '?' if '?' not in endpoint else '&' ) + compat_urllib_parse.urlencode(query) headers = self.default_headers data = None if params or params == '': headers['Content-type'] = 'application/x-www-form-urlencoded; charset=UTF-8' if params == '': # force post if empty string data = ''.encode('ascii') else: if not unsigned: json_params = json.dumps(params, separators=(',', ':')) hash_sig = self._generate_signature(json_params) post_params = { 'ig_sig_key_version': self.key_version, 'signed_body': hash_sig + '.' + json_params, } else: # direct form post post_params = params data = compat_urllib_parse.urlencode(post_params).encode('ascii') req = compat_urllib_request.Request(url, data, headers=headers) try: self.logger.debug('REQUEST: {0!s} {1!s}'.format(url, req.get_method())) self.logger.debug('DATA: {0!s}'.format(data)) response = self.opener.open(req, timeout=self.timeout) except compat_urllib_error.HTTPError as e: error_response = self._read_response(e) self.logger.debug('RESPONSE: {0:d} {1!s}'.format(e.code, error_response)) ErrorHandler.process(e, error_response) except ( SSLError, timeout, SocketError, compat_urllib_error.URLError, # URLError is base of HTTPError compat_http_client.HTTPException, ConnectionError, ) as connection_error: raise ClientConnectionError( '{} {}'.format( connection_error.__class__.__name__, str(connection_error) ) ) if return_response: return response response_content = self._read_response(response) self.logger.debug( 'RESPONSE: {0:d} {1!s}'.format(response.code, response_content) ) json_response = json.loads(response_content) if json_response.get('message', '') == 'login_required': raise ClientLoginRequiredError( json_response.get('message'), code=response.code, error_response=json.dumps(json_response), ) # not from oembed or an ok response if ( not json_response.get('provider_url') and json_response.get('status', '') != 'ok' and not json_response.get('data', '') ): raise ClientError( json_response.get('message', 'Unknown error'), code=response.code, error_response=json.dumps(json_response), ) return json_response