Source code for pyppeteer.frame_manager

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Frame Manager module."""

import asyncio
from collections import OrderedDict
import logging
from types import SimpleNamespace
from typing import Any, Awaitable, Dict, Generator, List, Optional, Union
from typing import TYPE_CHECKING

from pyee import EventEmitter

from pyppeteer import helper
from pyppeteer.connection import CDPSession
from pyppeteer.element_handle import ElementHandle
from pyppeteer.errors import NetworkError
from pyppeteer.execution_context import ExecutionContext, JSHandle
from pyppeteer.errors import ElementHandleError, PageError, TimeoutError
from pyppeteer.util import merge_dict

if TYPE_CHECKING:
    from typing import Set  # noqa: F401

logger = logging.getLogger(__name__)


class FrameManager(EventEmitter):
    """FrameManager class."""

    Events = SimpleNamespace(
        FrameAttached='frameattached',
        FrameNavigated='framenavigated',
        FrameDetached='framedetached',
        LifecycleEvent='lifecycleevent',
        FrameNavigatedWithinDocument='framenavigatedwithindocument',
    )

    def __init__(self, client: CDPSession, frameTree: Dict, page: Any) -> None:
        """Make new frame manager."""
        super().__init__()
        self._client = client
        self._page = page
        self._frames: OrderedDict[str, Frame] = OrderedDict()
        self._mainFrame: Optional[Frame] = None
        self._contextIdToContext: Dict[str, ExecutionContext] = dict()

        client.on('Page.frameAttached',
                  lambda event: self._onFrameAttached(
                      event.get('frameId', ''), event.get('parentFrameId', ''))
                  )
        client.on('Page.frameNavigated',
                  lambda event: self._onFrameNavigated(event.get('frame')))
        client.on('Page.navigatedWithinDocument',
                  lambda event: self._onFrameNavigatedWithinDocument(
                      event.get('frameId'), event.get('url')
                  ))
        client.on('Page.frameDetached',
                  lambda event: self._onFrameDetached(event.get('frameId')))
        client.on('Page.frameStoppedLoading',
                  lambda event: self._onFrameStoppedLoading(
                      event.get('frameId')
                  ))
        client.on('Runtime.executionContextCreated',
                  lambda event: self._onExecutionContextCreated(
                      event.get('context')))
        client.on('Runtime.executionContextDestroyed',
                  lambda event: self._onExecutionContextDestroyed(
                      event.get('executionContextId')))
        client.on('Runtime.executionContextsCleared',
                  lambda event: self._onExecutionContextsCleared())
        client.on('Page.lifecycleEvent',
                  lambda event: self._onLifecycleEvent(event))

        self._handleFrameTree(frameTree)

    def _onLifecycleEvent(self, event: Dict) -> None:
        frame = self._frames.get(event['frameId'])
        if not frame:
            return
        frame._onLifecycleEvent(event['loaderId'], event['name'])
        self.emit(FrameManager.Events.LifecycleEvent, frame)

    def _onFrameStoppedLoading(self, frameId: str) -> None:
        frame = self._frames.get(frameId)
        if not frame:
            return
        frame._onLoadingStopped()
        self.emit(FrameManager.Events.LifecycleEvent, frame)

    def _handleFrameTree(self, frameTree: Dict) -> None:
        frame = frameTree['frame']
        if 'parentId' in frame:
            self._onFrameAttached(
                frame['id'],
                frame['parentId'],
            )
        self._onFrameNavigated(frame)
        if 'childFrames' not in frameTree:
            return
        for child in frameTree['childFrames']:
            self._handleFrameTree(child)

    @property
    def mainFrame(self) -> Optional['Frame']:
        """Return main frame."""
        return self._mainFrame

    def frames(self) -> List['Frame']:
        """Return all frames."""
        return list(self._frames.values())

    def frame(self, frameId: str) -> Optional['Frame']:
        """Return :class:`Frame` of ``frameId``."""
        return self._frames.get(frameId)

    def _onFrameAttached(self, frameId: str, parentFrameId: str) -> None:
        if frameId in self._frames:
            return
        parentFrame = self._frames.get(parentFrameId)
        frame = Frame(self._client, parentFrame, frameId)
        self._frames[frameId] = frame
        self.emit(FrameManager.Events.FrameAttached, frame)

    def _onFrameNavigated(self, framePayload: dict) -> None:
        isMainFrame = not framePayload.get('parentId')
        if isMainFrame:
            frame = self._mainFrame
        else:
            frame = self._frames.get(framePayload.get('id', ''))
        if not (isMainFrame or frame):
            raise PageError('We either navigate top level or have old version '
                            'of the navigated frame')

        # Detach all child frames first.
        if frame:
            for child in frame.childFrames:
                self._removeFramesRecursively(child)

        # Update or create main frame.
        _id = framePayload.get('id', '')
        if isMainFrame:
            if frame:
                # Update frame id to retain frame identity on cross-process navigation.  # noqa: E501
                self._frames.pop(frame._id, None)
                frame._id = _id
            else:
                # Initial main frame navigation.
                frame = Frame(self._client, None, _id)
            self._frames[_id] = frame
            self._mainFrame = frame

        # Update frame payload.
        frame._navigated(framePayload)  # type: ignore
        self.emit(FrameManager.Events.FrameNavigated, frame)

    def _onFrameNavigatedWithinDocument(self, frameId: str, url: str) -> None:
        frame = self._frames.get(frameId)
        if not frame:
            return
        frame._navigatedWithinDocument(url)
        self.emit(FrameManager.Events.FrameNavigatedWithinDocument, frame)
        self.emit(FrameManager.Events.FrameNavigated, frame)

    def _onFrameDetached(self, frameId: str) -> None:
        frame = self._frames.get(frameId)
        if frame:
            self._removeFramesRecursively(frame)

    def _onExecutionContextCreated(self, contextPayload: Dict) -> None:
        if (contextPayload.get('auxData') and
                contextPayload['auxData']['isDefault']):
            frameId = contextPayload['auxData']['frameId']
        else:
            frameId = None

        frame = self._frames.get(frameId) if frameId else None

        context = ExecutionContext(
            self._client,
            contextPayload,
            lambda obj: self.createJSHandle(contextPayload['id'], obj),
            frame,
        )
        self._contextIdToContext[contextPayload['id']] = context

        if frame:
            frame._setDefaultContext(context)

    def _removeContext(self, context: ExecutionContext) -> None:
        frame = self._frames[context._frameId] if context._frameId else None
        if frame and context._isDefault:
            frame._setDefaultContext(None)

    def _onExecutionContextDestroyed(self, executionContextId: str) -> None:
        context = self._contextIdToContext.get(executionContextId)
        if not context:
            return
        del self._contextIdToContext[executionContextId]
        self._removeContext(context)

    def _onExecutionContextsCleared(self) -> None:
        for context in self._contextIdToContext.values():
            self._removeContext(context)
        self._contextIdToContext.clear()

    def createJSHandle(self, contextId: str, remoteObject: Dict = None
                       ) -> JSHandle:
        """Create JS handle associated to the context id and remote object."""
        if remoteObject is None:
            remoteObject = dict()
        context = self._contextIdToContext.get(contextId)
        if not context:
            raise ElementHandleError(f'missing context with id = {contextId}')
        if remoteObject.get('subtype') == 'node':
            return ElementHandle(context, self._client, remoteObject,
                                 self._page, self)
        return JSHandle(context, self._client, remoteObject)

    def _removeFramesRecursively(self, frame: 'Frame') -> None:
        for child in frame.childFrames:
            self._removeFramesRecursively(child)
        frame._detach()
        self._frames.pop(frame._id, None)
        self.emit(FrameManager.Events.FrameDetached, frame)


[docs]class Frame(object): """Frame class. Frame objects can be obtained via :attr:`pyppeteer.page.Page.mainFrame`. """ def __init__(self, client: CDPSession, parentFrame: Optional['Frame'], frameId: str) -> None: self._client = client self._parentFrame = parentFrame self._url = '' self._detached = False self._id = frameId self._documentPromise: Optional[ElementHandle] = None self._contextResolveCallback = lambda _: None self._setDefaultContext(None) self._waitTasks: Set[WaitTask] = set() # maybe list self._loaderId = '' self._lifecycleEvents: Set[str] = set() self._childFrames: Set[Frame] = set() # maybe list if self._parentFrame: self._parentFrame._childFrames.add(self) def _setDefaultContext(self, context: Optional[ExecutionContext]) -> None: if context is not None: self._contextResolveCallback(context) # type: ignore self._contextResolveCallback = lambda _: None for waitTask in self._waitTasks: self._client._loop.create_task(waitTask.rerun()) else: self._documentPromise = None self._contextPromise = self._client._loop.create_future() self._contextResolveCallback = ( lambda _context: self._contextPromise.set_result(_context) )
[docs] async def executionContext(self) -> Optional[ExecutionContext]: """Return execution context of this frame. Return :class:`~pyppeteer.execution_context.ExecutionContext` associated to this frame. """ return await self._contextPromise
[docs] async def evaluateHandle(self, pageFunction: str, *args: Any) -> JSHandle: """Execute function on this frame. Details see :meth:`pyppeteer.page.Page.evaluateHandle`. """ context = await self.executionContext() if context is None: raise PageError('this frame has no context.') return await context.evaluateHandle(pageFunction, *args)
[docs] async def evaluate(self, pageFunction: str, *args: Any, force_expr: bool = False) -> Any: """Evaluate pageFunction on this frame. Details see :meth:`pyppeteer.page.Page.evaluate`. """ context = await self.executionContext() if context is None: raise ElementHandleError('ExecutionContext is None.') return await context.evaluate( pageFunction, *args, force_expr=force_expr)
[docs] async def querySelector(self, selector: str) -> Optional[ElementHandle]: """Get element which matches `selector` string. Details see :meth:`pyppeteer.page.Page.querySelector`. """ document = await self._document() value = await document.querySelector(selector) return value
async def _document(self) -> ElementHandle: if self._documentPromise: return self._documentPromise context = await self.executionContext() if context is None: raise PageError('No context exists.') document = (await context.evaluateHandle('document')).asElement() self._documentPromise = document if document is None: raise PageError('Could not find `document`.') return document
[docs] async def xpath(self, expression: str) -> List[ElementHandle]: """Evaluate the XPath expression. If there are no such elements in this frame, return an empty list. :arg str expression: XPath string to be evaluated. """ document = await self._document() value = await document.xpath(expression) return value
[docs] async def querySelectorEval(self, selector: str, pageFunction: str, *args: Any) -> Any: """Execute function on element which matches selector. Details see :meth:`pyppeteer.page.Page.querySelectorEval`. """ document = await self._document() return await document.querySelectorEval(selector, pageFunction, *args)
[docs] async def querySelectorAllEval(self, selector: str, pageFunction: str, *args: Any) -> Optional[Dict]: """Execute function on all elements which matches selector. Details see :meth:`pyppeteer.page.Page.querySelectorAllEval`. """ document = await self._document() value = await document.JJeval(selector, pageFunction, *args) return value
[docs] async def querySelectorAll(self, selector: str) -> List[ElementHandle]: """Get all elements which matches `selector`. Details see :meth:`pyppeteer.page.Page.querySelectorAll`. """ document = await self._document() value = await document.querySelectorAll(selector) return value
#: Alias to :meth:`querySelector` J = querySelector #: Alias to :meth:`xpath` Jx = xpath #: Alias to :meth:`querySelectorEval` Jeval = querySelectorEval #: Alias to :meth:`querySelectorAll` JJ = querySelectorAll #: Alias to :meth:`querySelectorAllEval` JJeval = querySelectorAllEval
[docs] async def content(self) -> str: """Get the whole HTML contents of the page.""" return await self.evaluate(''' () => { let retVal = ''; if (document.doctype) retVal = new XMLSerializer().serializeToString(document.doctype); if (document.documentElement) retVal += document.documentElement.outerHTML; return retVal; } '''.strip())
[docs] async def setContent(self, html: str) -> None: """Set content to this page.""" func = ''' function(html) { document.open(); document.write(html); document.close(); } ''' await self.evaluate(func, html)
@property def name(self) -> str: """Get frame name.""" return self.__dict__.get('_name', '') @property def url(self) -> str: """Get url of the frame.""" return self._url @property def parentFrame(self) -> Optional['Frame']: """Get parent frame. If this frame is main frame or detached frame, return ``None``. """ return self._parentFrame @property def childFrames(self) -> List['Frame']: """Get child frames.""" return list(self._childFrames)
[docs] def isDetached(self) -> bool: """Return ``True`` if this frame is detached. Otherwise return ``False``. """ return self._detached
[docs] async def injectFile(self, filePath: str) -> str: """[Deprecated] Inject file to the frame.""" logger.warning('`injectFile` method is deprecated.' ' Use `addScriptTag` method instead.') with open(filePath) as f: contents = f.read() contents += '/* # sourceURL= {} */'.format(filePath.replace('\n', '')) return await self.evaluate(contents)
[docs] async def addScriptTag(self, options: Dict) -> ElementHandle: # noqa: C901 """Add script tag to this frame. Details see :meth:`pyppeteer.page.Page.addScriptTag`. """ context = await self.executionContext() if context is None: raise ElementHandleError('ExecutionContext is None.') addScriptUrl = ''' async function addScriptUrl(url, type) { const script = document.createElement('script'); script.src = url; if (type) script.type = type; const promise = new Promise((res, rej) => { script.onload = res; script.onerror = rej; }); document.head.appendChild(script); await promise; return script; }''' addScriptContent = ''' function addScriptContent(content, type = 'text/javascript') { const script = document.createElement('script'); script.type = type; script.text = content; let error = null; script.onerror = e => error = e; document.head.appendChild(script); if (error) throw error; return script; }''' if isinstance(options.get('url'), str): url = options['url'] args = [addScriptUrl, url] if 'type' in options: args.append(options['type']) try: return (await context.evaluateHandle(*args) # type: ignore ).asElement() except ElementHandleError as e: raise PageError(f'Loading script from {url} failed') from e if isinstance(options.get('path'), str): with open(options['path']) as f: contents = f.read() contents = contents + '//# sourceURL={}'.format( options['path'].replace('\n', '')) args = [addScriptContent, contents] if 'type' in options: args.append(options['type']) return (await context.evaluateHandle(*args) # type: ignore ).asElement() if isinstance(options.get('content'), str): args = [addScriptContent, options['content']] if 'type' in options: args.append(options['type']) return (await context.evaluateHandle(*args) # type: ignore ).asElement() raise ValueError( 'Provide an object with a `url`, `path` or `content` property')
[docs] async def addStyleTag(self, options: Dict) -> ElementHandle: """Add style tag to this frame. Details see :meth:`pyppeteer.page.Page.addStyleTag`. """ context = await self.executionContext() if context is None: raise ElementHandleError('ExecutionContext is None.') addStyleUrl = ''' async function (url) { const link = document.createElement('link'); link.rel = 'stylesheet'; link.href = url; const promise = new Promise((res, rej) => { link.onload = res; link.onerror = rej; }); document.head.appendChild(link); await promise; return link; }''' addStyleContent = ''' async function (content) { const style = document.createElement('style'); style.type = 'text/css'; style.appendChild(document.createTextNode(content)); const promise = new Promise((res, rej) => { style.onload = res; style.onerror = rej; }); document.head.appendChild(style); await promise; return style; }''' if isinstance(options.get('url'), str): url = options['url'] try: return (await context.evaluateHandle( # type: ignore addStyleUrl, url)).asElement() except ElementHandleError as e: raise PageError(f'Loading style from {url} failed') from e if isinstance(options.get('path'), str): with open(options['path']) as f: contents = f.read() contents = contents + '/*# sourceURL={}*/'.format( options['path'].replace('\n', '')) return (await context.evaluateHandle( # type: ignore addStyleContent, contents)).asElement() if isinstance(options.get('content'), str): return (await context.evaluateHandle( # type: ignore addStyleContent, options['content'])).asElement() raise ValueError( 'Provide an object with a `url`, `path` or `content` property')
[docs] async def click(self, selector: str, options: dict = None, **kwargs: Any ) -> None: """Click element which matches ``selector``. Details see :meth:`pyppeteer.page.Page.click`. """ options = merge_dict(options, kwargs) handle = await self.J(selector) if not handle: raise PageError('No node found for selector: ' + selector) await handle.click(options) await handle.dispose()
[docs] async def focus(self, selector: str) -> None: """Focus element which matches ``selector``. Details see :meth:`pyppeteer.page.Page.focus`. """ handle = await self.J(selector) if not handle: raise PageError('No node found for selector: ' + selector) await self.evaluate('element => element.focus()', handle) await handle.dispose()
[docs] async def hover(self, selector: str) -> None: """Mouse hover the element which matches ``selector``. Details see :meth:`pyppeteer.page.Page.hover`. """ handle = await self.J(selector) if not handle: raise PageError('No node found for selector: ' + selector) await handle.hover() await handle.dispose()
[docs] async def select(self, selector: str, *values: str) -> List[str]: """Select options and return selected values. Details see :meth:`pyppeteer.page.Page.select`. """ for value in values: if not isinstance(value, str): raise TypeError( 'Values must be string. ' f'Found {value} of type {type(value)}' ) return await self.querySelectorEval( # type: ignore selector, ''' (element, values) => { if (element.nodeName.toLowerCase() !== 'select') throw new Error('Element is not a <select> element.'); const options = Array.from(element.options); element.value = undefined; for (const option of options) { option.selected = values.includes(option.value); if (option.selected && !element.multiple) break; } element.dispatchEvent(new Event('input', { 'bubbles': true })); element.dispatchEvent(new Event('change', { 'bubbles': true })); return options.filter(option => option.selected).map(options => options.value) } ''', values) # noqa: E501
[docs] async def tap(self, selector: str) -> None: """Tap the element which matches the ``selector``. Details see :meth:`pyppeteer.page.Page.tap`. """ handle = await self.J(selector) if not handle: raise PageError('No node found for selector: ' + selector) await handle.tap() await handle.dispose()
[docs] async def type(self, selector: str, text: str, options: dict = None, **kwargs: Any) -> None: """Type ``text`` on the element which matches ``selector``. Details see :meth:`pyppeteer.page.Page.type`. """ options = merge_dict(options, kwargs) handle = await self.querySelector(selector) if handle is None: raise PageError('Cannot find {} on this page'.format(selector)) await handle.type(text, options) await handle.dispose()
[docs] def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float], options: dict = None, *args: Any, **kwargs: Any ) -> Union[Awaitable, 'WaitTask']: """Wait until `selectorOrFunctionOrTimeout`. Details see :meth:`pyppeteer.page.Page.waitFor`. """ options = merge_dict(options, kwargs) if isinstance(selectorOrFunctionOrTimeout, (int, float)): fut: Awaitable[None] = self._client._loop.create_task( asyncio.sleep(selectorOrFunctionOrTimeout / 1000)) return fut if not isinstance(selectorOrFunctionOrTimeout, str): fut = self._client._loop.create_future() fut.set_exception(TypeError( 'Unsupported target type: ' + str(type(selectorOrFunctionOrTimeout)) )) return fut if args or helper.is_jsfunc(selectorOrFunctionOrTimeout): return self.waitForFunction( selectorOrFunctionOrTimeout, options, *args) if selectorOrFunctionOrTimeout.startswith('//'): return self.waitForXPath(selectorOrFunctionOrTimeout, options) return self.waitForSelector(selectorOrFunctionOrTimeout, options)
[docs] def waitForSelector(self, selector: str, options: dict = None, **kwargs: Any) -> 'WaitTask': """Wait until element which matches ``selector`` appears on page. Details see :meth:`pyppeteer.page.Page.waitForSelector`. """ options = merge_dict(options, kwargs) return self._waitForSelectorOrXPath(selector, False, options)
[docs] def waitForXPath(self, xpath: str, options: dict = None, **kwargs: Any) -> 'WaitTask': """Wait until element which matches ``xpath`` appears on page. Details see :meth:`pyppeteer.page.Page.waitForXPath`. """ options = merge_dict(options, kwargs) return self._waitForSelectorOrXPath(xpath, True, options)
[docs] def waitForFunction(self, pageFunction: str, options: dict = None, *args: Any, **kwargs: Any) -> 'WaitTask': """Wait until the function completes. Details see :meth:`pyppeteer.page.Page.waitForFunction`. """ options = merge_dict(options, kwargs) timeout = options.get('timeout', 30000) # msec polling = options.get('polling', 'raf') return WaitTask(self, pageFunction, 'function', polling, timeout, self._client._loop, *args)
def _waitForSelectorOrXPath(self, selectorOrXPath: str, isXPath: bool, options: dict = None, **kwargs: Any ) -> 'WaitTask': options = merge_dict(options, kwargs) timeout = options.get('timeout', 30000) waitForVisible = bool(options.get('visible')) waitForHidden = bool(options.get('hidden')) polling = 'raf' if waitForHidden or waitForVisible else 'mutation' predicate = ''' (selectorOrXPath, isXPath, waitForVisible, waitForHidden) => { const node = isXPath ? document.evaluate(selectorOrXPath, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue : document.querySelector(selectorOrXPath); if (!node) return waitForHidden; if (!waitForVisible && !waitForHidden) return node; const element = /** @type {Element} */ (node.nodeType === Node.TEXT_NODE ? node.parentElement : node); const style = window.getComputedStyle(element); const isVisible = style && style.visibility !== 'hidden' && hasVisibleBoundingBox(); const success = (waitForVisible === isVisible || waitForHidden === !isVisible) return success ? node : null function hasVisibleBoundingBox() { const rect = element.getBoundingClientRect(); return !!(rect.top || rect.bottom || rect.width || rect.height); } } ''' # noqa: E501 return WaitTask( self, predicate, f'{"XPath" if isXPath else "selector"} "{selectorOrXPath}"', polling, timeout, self._client._loop, selectorOrXPath, isXPath, waitForVisible, waitForHidden, )
[docs] async def title(self) -> str: """Get title of the frame.""" return await self.evaluate('() => document.title')
def _navigated(self, framePayload: dict) -> None: self._name = framePayload.get('name', '') self._navigationURL = framePayload.get('url', '') self._url = framePayload.get('url', '') def _navigatedWithinDocument(self, url: str) -> None: self._url = url def _onLifecycleEvent(self, loaderId: str, name: str) -> None: if name == 'init': self._loaderId = loaderId self._lifecycleEvents.clear() else: self._lifecycleEvents.add(name) def _onLoadingStopped(self) -> None: self._lifecycleEvents.add('DOMContentLoaded') self._lifecycleEvents.add('load') def _detach(self) -> None: for waitTask in self._waitTasks: waitTask.terminate( PageError('waitForFunction failed: frame got detached.')) self._detached = True if self._parentFrame: self._parentFrame._childFrames.remove(self) self._parentFrame = None
class WaitTask(object): """WaitTask class. Instance of this class is awaitable. """ def __init__(self, frame: Frame, predicateBody: str, # noqa: C901 title: str, polling: Union[str, int], timeout: float, loop: asyncio.AbstractEventLoop, *args: Any) -> None: if isinstance(polling, str): if polling not in ['raf', 'mutation']: raise ValueError(f'Unknown polling: {polling}') elif isinstance(polling, (int, float)): if polling <= 0: raise ValueError( f'Cannot poll with non-positive interval: {polling}' ) else: raise ValueError(f'Unknown polling option: {polling}') self._frame = frame self._polling = polling self._timeout = timeout self._loop = loop if args or helper.is_jsfunc(predicateBody): self._predicateBody = f'return ({predicateBody})(...args)' else: self._predicateBody = f'return {predicateBody}' self._args = args self._runCount = 0 self._terminated = False self._timeoutError = False frame._waitTasks.add(self) self.promise = self._loop.create_future() async def timer(timeout: Union[int, float]) -> None: await asyncio.sleep(timeout / 1000) self._timeoutError = True self.terminate(TimeoutError( f'Waiting for {title} failed: timeout {timeout}ms exceeds.' )) if timeout: self._timeoutTimer = self._loop.create_task(timer(self._timeout)) self._runningTask = self._loop.create_task(self.rerun()) def __await__(self) -> Generator: """Make this class **awaitable**.""" result = yield from self.promise if isinstance(result, Exception): raise result return result def terminate(self, error: Exception) -> None: """Terminate this task.""" self._terminated = True if not self.promise.done(): self.promise.set_result(error) self._cleanup() async def rerun(self) -> None: # noqa: C901 """Start polling.""" runCount = self._runCount = self._runCount + 1 success: Optional[JSHandle] = None error = None try: context = await self._frame.executionContext() if context is None: raise PageError('No execution context.') success = await context.evaluateHandle( waitForPredicatePageFunction, self._predicateBody, self._polling, self._timeout, *self._args, ) except Exception as e: error = e if self.promise.done(): return if self._terminated or runCount != self._runCount: if success: await success.dispose() return # Add try/except referring to puppeteer. try: if not error and success and ( await self._frame.evaluate('s => !s', success)): await success.dispose() return except NetworkError: if success is not None: await success.dispose() return # page is navigated and context is destroyed. # Try again in the new execution context. if (isinstance(error, NetworkError) and 'Execution context was destroyed' in error.args[0]): return # Try again in the new execution context. if (isinstance(error, NetworkError) and 'Cannot find context with specified id' in error.args[0]): return if error: self.promise.set_exception(error) else: self.promise.set_result(success) self._cleanup() def _cleanup(self) -> None: if self._timeout and not self._timeoutError: self._timeoutTimer.cancel() self._frame._waitTasks.remove(self) waitForPredicatePageFunction = """ async function waitForPredicatePageFunction(predicateBody, polling, timeout, ...args) { const predicate = new Function('...args', predicateBody); let timedOut = false; if (timeout) setTimeout(() => timedOut = true, timeout); if (polling === 'raf') return await pollRaf(); if (polling === 'mutation') return await pollMutation(); if (typeof polling === 'number') return await pollInterval(polling); /** * @return {!Promise<*>} */ function pollMutation() { const success = predicate.apply(null, args); if (success) return Promise.resolve(success); let fulfill; const result = new Promise(x => fulfill = x); const observer = new MutationObserver(mutations => { if (timedOut) { observer.disconnect(); fulfill(); } const success = predicate.apply(null, args); if (success) { observer.disconnect(); fulfill(success); } }); observer.observe(document, { childList: true, subtree: true, attributes: true }); return result; } /** * @return {!Promise<*>} */ function pollRaf() { let fulfill; const result = new Promise(x => fulfill = x); onRaf(); return result; function onRaf() { if (timedOut) { fulfill(); return; } const success = predicate.apply(null, args); if (success) fulfill(success); else requestAnimationFrame(onRaf); } } /** * @param {number} pollInterval * @return {!Promise<*>} */ function pollInterval(pollInterval) { let fulfill; const result = new Promise(x => fulfill = x); onTimeout(); return result; function onTimeout() { if (timedOut) { fulfill(); return; } const success = predicate.apply(null, args); if (success) fulfill(success); else setTimeout(onTimeout, pollInterval); } } } """ # noqa: E501