Source code for stream.client

from requests.adapters import HTTPAdapter
from stream import exceptions, serializer
from stream.signing import sign
import logging
import os
import requests
from stream.utils import validate_feed_slug, validate_user_id


logger = logging.getLogger(__name__)


[docs]class StreamClient(object): base_url = 'https://getstream.io/api/' def __init__(self, api_key, api_secret, app_id, version='v1.0', timeout=3.0, base_url=None): ''' Initialize the client with the given api key and secret :param api_key: the api key :param api_secret: the api secret :param app_id: the app id **Example usage**:: import stream # initialize the client client = stream.connect('key', 'secret') # get a feed object feed = client.feed('aggregated:1') # write data to the feed activity_data = {'actor': 1, 'verb': 'tweet', 'object': 1} activity_id = feed.add_activity(activity_data)['id'] activities = feed.get() feed.follow('flat:3') activities = feed.get() feed.unfollow('flat:3') feed.remove_activity(activity_id) ''' self.api_key = api_key self.api_secret = api_secret self.app_id = app_id self.version = version self.timeout = timeout if base_url is not None: self.base_url = base_url if os.environ.get('LOCAL'): self.base_url = 'http://localhost:8000/api/' self.session = requests.Session() # TODO: turn this back on after we verify it doesnt retry on slower requests self.session.mount(self.base_url, HTTPAdapter(max_retries=0))
[docs] def feed(self, feed_slug, user_id): ''' Returns a Feed object :param feed_slug: the slug of the feed :param user_id: the user id ''' from stream.feed import Feed feed_slug = validate_feed_slug(feed_slug) user_id = validate_user_id(user_id) # generate the token feed_id = '%s%s' % (feed_slug, user_id) token = sign(self.api_secret, feed_id) return Feed(self, feed_slug, user_id, token)
[docs] def get_default_params(self): ''' Returns the params with the API key present ''' params = dict(api_key=self.api_key) return params
[docs] def get_full_url(self, relative_url): url = self.base_url + self.version + '/' + relative_url return url
[docs] def get_user_agent(self): from stream import __version__ agent = 'stream-python-client-%s' % __version__ return agent
def _make_request(self, method, relative_url, signature, params=None, data=None): params = params or {} data = data or {} default_params = self.get_default_params() default_params.update(params) headers = {'Authorization': signature} headers['Content-type'] = 'application/json' headers['User-Agent'] = self.get_user_agent() url = self.get_full_url(relative_url) serialized = serializer.dumps(data) response = method(url, data=serialized, headers=headers, params=default_params, timeout=self.timeout) logger.debug('stream api call %s, headers %s data %s', response.url, headers, data) try: parsed_result = serializer.loads(response.text) except ValueError: parsed_result = None if parsed_result.get('exception') or response.status_code >= 500: self.raise_exception(parsed_result, status_code=response.status_code) return parsed_result
[docs] def raise_exception(self, result, status_code): ''' Map the exception code to an exception class and raise it If result.exception and result.detail are available use that Otherwise just raise a generic error ''' from stream.exceptions import get_exception_dict exception_class = exceptions.StreamApiException if result is not None: error_message = result['detail'] exception_fields = result.get('exception_fields') if exception_fields is not None: errors = [] for field, errors in exception_fields.items(): errors.append('Field "%s" errors: %s' % (field, repr(errors))) error_message = '\n'.join(errors) error_code = result.get('code') exception_dict = get_exception_dict() exception_class = exception_dict.get( error_code, exceptions.StreamApiException) else: error_message = 'GetStreamAPI%s' % status_code exception = exception_class(error_message, status_code=status_code) raise exception
[docs] def post(self, *args, **kwargs): ''' Shortcut for make request ''' return self._make_request(self.session.post, *args, **kwargs)
[docs] def get(self, *args, **kwargs): ''' Shortcut for make request ''' return self._make_request(self.session.get, *args, **kwargs)
[docs] def delete(self, *args, **kwargs): ''' Shortcut for make request ''' return self._make_request(self.session.delete, *args, **kwargs)