微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python google.appengine.api.urlfetch 模块-make_fetch_call() 实例源码

Python google.appengine.api.urlfetch 模块,make_fetch_call() 实例源码

我们从Python开源项目中,提取了以下7代码示例,用于说明如何使用google.appengine.api.urlfetch.make_fetch_call()

项目:appbackendapi    作者:codesdk    | 项目源码 | 文件源码
def get(self):
        # [START urlfetch-rpc]
        rpc = urlfetch.create_rpc()
        urlfetch.make_fetch_call(rpc, 'http://www.google.com/')

        # ... do other things ...
        try:
            result = rpc.get_result()
            if result.status_code == 200:
                text = result.content
                self.response.write(text)
            else:
                self.response.status_int = result.status_code
                self.response.write('URL returned status code {}'.format(
                    result.status_code))
        except urlfetch.DownloadError:
            self.response.status_int = 500
            self.response.write('Error fetching URL')
        # [END urlfetch-rpc]
项目:theopencorps    作者:theopencorps    | 项目源码 | 文件源码
def request_async(self, resource, **kwargs):
        """
        Convenience for making requests

        Returns ASyncResult object,for which the JSON can
        be retrieved in the future using get_result()
        """
        request_args = self._create_request_args(**kwargs)
        rpc = urlfetch.create_rpc()
        rpc.msg = "%s: %s%s" % (request_args["method"],
                                self._endpoint,
                                resource)
        urlfetch.make_fetch_call(rpc, self._endpoint + resource, **request_args)
        return ASyncResult(rpc, self.log)
项目:theopencorps    作者:theopencorps    | 项目源码 | 文件源码
def request_json(self, valid_codes=(200,), **kwargs):
        """
        Returns a JSON-like object which is actually a future...
        """
        request_args = self._create_request_args(**kwargs)
        rpc = urlfetch.create_rpc()
        rpc.msg = "%s: %s%s" % (request_args["method"], self._endpoint+resource, **request_args)
        return ASyncJSONObject(rpc, self.log, valid_codes=valid_codes)
项目:Utmost_Bot    作者:jiachen247    | 项目源码 | 文件源码
def send_typing(uid):
    data = json.dumps({'chat_id': uid, 'action': 'typing'})
    try:
        rpc = urlfetch.create_rpc()
        urlfetch.make_fetch_call(rpc, url=TELEGRAM_URL_CHAT_ACTION, payload=data,
                                 method=urlfetch.POST, headers=JSON_HEADER)
    except:
        return
项目:instagram-tag-crawler    作者:muik    | 项目源码 | 文件源码
def _fetch_async(self, rpc, tag):
    url = 'https://www.instagram.com/explore/tags/%s/' % tag
    return urlfetch.make_fetch_call(rpc, url)
项目:appbackendapi    作者:codesdk    | 项目源码 | 文件源码
def get(self):
        # [START urlfetch-rpc-callback]
        def handle_result(rpc):
            result = rpc.get_result()
            self.response.write(result.content)
            logging.info('Handling RPC in callback: result {}'.format(result))

        urls = ['http://www.google.com',
                'http://www.github.com',
                'http://www.travis-ci.org']
        rpcs = []
        for url in urls:
            rpc = urlfetch.create_rpc()
            rpc.callback = functools.partial(handle_result, rpc)
            urlfetch.make_fetch_call(rpc, url)
            rpcs.append(rpc)

        # ... do other things ...

        # Finish all RPCs,and let callbacks process the results.

        for rpc in rpcs:
            rpc.wait()

        logging.info('Done waiting for RPCs')
        # [END urlfetch-rpc-callback]
项目:dancedeets-monorepo    作者:mikelAmbert    | 项目源码 | 文件源码
def _create_rpc_for_batch(self, batch_list, use_access_token):
        post_args = {'batch': json.dumps(batch_list)}
        token = self.random_access_token()
        if use_access_token and token:
            post_args["access_token"] = token
        else:
            post_args["access_token"] = '%s|%s' % (facebook.FACEBOOK_CONfig['app_id'], facebook.FACEBOOK_CONfig['secret_key'])
        post_args["include_headers"] = False  # Don't need to see all the caching headers per response
        post_data = None if post_args is None else urls.urlencode(post_args)
        rpc = urlfetch.create_rpc(deadline=DEADLINE)
        urlfetch.make_fetch_call(rpc, "https://graph.facebook.com/", post_data, "POST")
        self.fb_fetches += len(batch_list)
        return rpc, token

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐