#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Frame Manager module."""
import asyncio
from collections import OrderedDict
import logging
from types import SimpleNamespace
from typing import Any, Awaitable, Dict, Generator, List, Optional, Union
from typing import TYPE_CHECKING
from pyee import EventEmitter
from pyppeteer import helper
from pyppeteer.connection import CDPSession
from pyppeteer.element_handle import ElementHandle
from pyppeteer.errors import NetworkError
from pyppeteer.execution_context import ExecutionContext, JSHandle
from pyppeteer.errors import ElementHandleError, PageError, TimeoutError
from pyppeteer.util import merge_dict
if TYPE_CHECKING:
from typing import Set # noqa: F401
logger = logging.getLogger(__name__)
class FrameManager(EventEmitter):
"""FrameManager class."""
Events = SimpleNamespace(
FrameAttached='frameattached',
FrameNavigated='framenavigated',
FrameDetached='framedetached',
LifecycleEvent='lifecycleevent',
FrameNavigatedWithinDocument='framenavigatedwithindocument',
)
def __init__(self, client: CDPSession, frameTree: Dict, page: Any) -> None:
"""Make new frame manager."""
super().__init__()
self._client = client
self._page = page
self._frames: OrderedDict[str, Frame] = OrderedDict()
self._mainFrame: Optional[Frame] = None
self._contextIdToContext: Dict[str, ExecutionContext] = dict()
client.on('Page.frameAttached',
lambda event: self._onFrameAttached(
event.get('frameId', ''), event.get('parentFrameId', ''))
)
client.on('Page.frameNavigated',
lambda event: self._onFrameNavigated(event.get('frame')))
client.on('Page.navigatedWithinDocument',
lambda event: self._onFrameNavigatedWithinDocument(
event.get('frameId'), event.get('url')
))
client.on('Page.frameDetached',
lambda event: self._onFrameDetached(event.get('frameId')))
client.on('Page.frameStoppedLoading',
lambda event: self._onFrameStoppedLoading(
event.get('frameId')
))
client.on('Runtime.executionContextCreated',
lambda event: self._onExecutionContextCreated(
event.get('context')))
client.on('Runtime.executionContextDestroyed',
lambda event: self._onExecutionContextDestroyed(
event.get('executionContextId')))
client.on('Runtime.executionContextsCleared',
lambda event: self._onExecutionContextsCleared())
client.on('Page.lifecycleEvent',
lambda event: self._onLifecycleEvent(event))
self._handleFrameTree(frameTree)
def _onLifecycleEvent(self, event: Dict) -> None:
frame = self._frames.get(event['frameId'])
if not frame:
return
frame._onLifecycleEvent(event['loaderId'], event['name'])
self.emit(FrameManager.Events.LifecycleEvent, frame)
def _onFrameStoppedLoading(self, frameId: str) -> None:
frame = self._frames.get(frameId)
if not frame:
return
frame._onLoadingStopped()
self.emit(FrameManager.Events.LifecycleEvent, frame)
def _handleFrameTree(self, frameTree: Dict) -> None:
frame = frameTree['frame']
if 'parentId' in frame:
self._onFrameAttached(
frame['id'],
frame['parentId'],
)
self._onFrameNavigated(frame)
if 'childFrames' not in frameTree:
return
for child in frameTree['childFrames']:
self._handleFrameTree(child)
@property
def mainFrame(self) -> Optional['Frame']:
"""Return main frame."""
return self._mainFrame
def frames(self) -> List['Frame']:
"""Return all frames."""
return list(self._frames.values())
def frame(self, frameId: str) -> Optional['Frame']:
"""Return :class:`Frame` of ``frameId``."""
return self._frames.get(frameId)
def _onFrameAttached(self, frameId: str, parentFrameId: str) -> None:
if frameId in self._frames:
return
parentFrame = self._frames.get(parentFrameId)
frame = Frame(self._client, parentFrame, frameId)
self._frames[frameId] = frame
self.emit(FrameManager.Events.FrameAttached, frame)
def _onFrameNavigated(self, framePayload: dict) -> None:
isMainFrame = not framePayload.get('parentId')
if isMainFrame:
frame = self._mainFrame
else:
frame = self._frames.get(framePayload.get('id', ''))
if not (isMainFrame or frame):
raise PageError('We either navigate top level or have old version '
'of the navigated frame')
# Detach all child frames first.
if frame:
for child in frame.childFrames:
self._removeFramesRecursively(child)
# Update or create main frame.
_id = framePayload.get('id', '')
if isMainFrame:
if frame:
# Update frame id to retain frame identity on cross-process navigation. # noqa: E501
self._frames.pop(frame._id, None)
frame._id = _id
else:
# Initial main frame navigation.
frame = Frame(self._client, None, _id)
self._frames[_id] = frame
self._mainFrame = frame
# Update frame payload.
frame._navigated(framePayload) # type: ignore
self.emit(FrameManager.Events.FrameNavigated, frame)
def _onFrameNavigatedWithinDocument(self, frameId: str, url: str) -> None:
frame = self._frames.get(frameId)
if not frame:
return
frame._navigatedWithinDocument(url)
self.emit(FrameManager.Events.FrameNavigatedWithinDocument, frame)
self.emit(FrameManager.Events.FrameNavigated, frame)
def _onFrameDetached(self, frameId: str) -> None:
frame = self._frames.get(frameId)
if frame:
self._removeFramesRecursively(frame)
def _onExecutionContextCreated(self, contextPayload: Dict) -> None:
if (contextPayload.get('auxData') and
contextPayload['auxData']['isDefault']):
frameId = contextPayload['auxData']['frameId']
else:
frameId = None
frame = self._frames.get(frameId) if frameId else None
context = ExecutionContext(
self._client,
contextPayload,
lambda obj: self.createJSHandle(contextPayload['id'], obj),
frame,
)
self._contextIdToContext[contextPayload['id']] = context
if frame:
frame._setDefaultContext(context)
def _removeContext(self, context: ExecutionContext) -> None:
frame = self._frames[context._frameId] if context._frameId else None
if frame and context._isDefault:
frame._setDefaultContext(None)
def _onExecutionContextDestroyed(self, executionContextId: str) -> None:
context = self._contextIdToContext.get(executionContextId)
if not context:
return
del self._contextIdToContext[executionContextId]
self._removeContext(context)
def _onExecutionContextsCleared(self) -> None:
for context in self._contextIdToContext.values():
self._removeContext(context)
self._contextIdToContext.clear()
def createJSHandle(self, contextId: str, remoteObject: Dict = None
) -> JSHandle:
"""Create JS handle associated to the context id and remote object."""
if remoteObject is None:
remoteObject = dict()
context = self._contextIdToContext.get(contextId)
if not context:
raise ElementHandleError(f'missing context with id = {contextId}')
if remoteObject.get('subtype') == 'node':
return ElementHandle(context, self._client, remoteObject,
self._page, self)
return JSHandle(context, self._client, remoteObject)
def _removeFramesRecursively(self, frame: 'Frame') -> None:
for child in frame.childFrames:
self._removeFramesRecursively(child)
frame._detach()
self._frames.pop(frame._id, None)
self.emit(FrameManager.Events.FrameDetached, frame)
[docs]class Frame(object):
"""Frame class.
Frame objects can be obtained via :attr:`pyppeteer.page.Page.mainFrame`.
"""
def __init__(self, client: CDPSession, parentFrame: Optional['Frame'],
frameId: str) -> None:
self._client = client
self._parentFrame = parentFrame
self._url = ''
self._detached = False
self._id = frameId
self._documentPromise: Optional[ElementHandle] = None
self._contextResolveCallback = lambda _: None
self._setDefaultContext(None)
self._waitTasks: Set[WaitTask] = set() # maybe list
self._loaderId = ''
self._lifecycleEvents: Set[str] = set()
self._childFrames: Set[Frame] = set() # maybe list
if self._parentFrame:
self._parentFrame._childFrames.add(self)
def _setDefaultContext(self, context: Optional[ExecutionContext]) -> None:
if context is not None:
self._contextResolveCallback(context) # type: ignore
self._contextResolveCallback = lambda _: None
for waitTask in self._waitTasks:
self._client._loop.create_task(waitTask.rerun())
else:
self._documentPromise = None
self._contextPromise = self._client._loop.create_future()
self._contextResolveCallback = (
lambda _context: self._contextPromise.set_result(_context)
)
[docs] async def executionContext(self) -> Optional[ExecutionContext]:
"""Return execution context of this frame.
Return :class:`~pyppeteer.execution_context.ExecutionContext`
associated to this frame.
"""
return await self._contextPromise
[docs] async def evaluateHandle(self, pageFunction: str, *args: Any) -> JSHandle:
"""Execute function on this frame.
Details see :meth:`pyppeteer.page.Page.evaluateHandle`.
"""
context = await self.executionContext()
if context is None:
raise PageError('this frame has no context.')
return await context.evaluateHandle(pageFunction, *args)
[docs] async def evaluate(self, pageFunction: str, *args: Any,
force_expr: bool = False) -> Any:
"""Evaluate pageFunction on this frame.
Details see :meth:`pyppeteer.page.Page.evaluate`.
"""
context = await self.executionContext()
if context is None:
raise ElementHandleError('ExecutionContext is None.')
return await context.evaluate(
pageFunction, *args, force_expr=force_expr)
[docs] async def querySelector(self, selector: str) -> Optional[ElementHandle]:
"""Get element which matches `selector` string.
Details see :meth:`pyppeteer.page.Page.querySelector`.
"""
document = await self._document()
value = await document.querySelector(selector)
return value
async def _document(self) -> ElementHandle:
if self._documentPromise:
return self._documentPromise
context = await self.executionContext()
if context is None:
raise PageError('No context exists.')
document = (await context.evaluateHandle('document')).asElement()
self._documentPromise = document
if document is None:
raise PageError('Could not find `document`.')
return document
[docs] async def xpath(self, expression: str) -> List[ElementHandle]:
"""Evaluate the XPath expression.
If there are no such elements in this frame, return an empty list.
:arg str expression: XPath string to be evaluated.
"""
document = await self._document()
value = await document.xpath(expression)
return value
[docs] async def querySelectorEval(self, selector: str, pageFunction: str,
*args: Any) -> Any:
"""Execute function on element which matches selector.
Details see :meth:`pyppeteer.page.Page.querySelectorEval`.
"""
document = await self._document()
return await document.querySelectorEval(selector, pageFunction, *args)
[docs] async def querySelectorAllEval(self, selector: str, pageFunction: str,
*args: Any) -> Optional[Dict]:
"""Execute function on all elements which matches selector.
Details see :meth:`pyppeteer.page.Page.querySelectorAllEval`.
"""
document = await self._document()
value = await document.JJeval(selector, pageFunction, *args)
return value
[docs] async def querySelectorAll(self, selector: str) -> List[ElementHandle]:
"""Get all elements which matches `selector`.
Details see :meth:`pyppeteer.page.Page.querySelectorAll`.
"""
document = await self._document()
value = await document.querySelectorAll(selector)
return value
#: Alias to :meth:`querySelector`
J = querySelector
#: Alias to :meth:`xpath`
Jx = xpath
#: Alias to :meth:`querySelectorEval`
Jeval = querySelectorEval
#: Alias to :meth:`querySelectorAll`
JJ = querySelectorAll
#: Alias to :meth:`querySelectorAllEval`
JJeval = querySelectorAllEval
[docs] async def content(self) -> str:
"""Get the whole HTML contents of the page."""
return await self.evaluate('''
() => {
let retVal = '';
if (document.doctype)
retVal = new XMLSerializer().serializeToString(document.doctype);
if (document.documentElement)
retVal += document.documentElement.outerHTML;
return retVal;
}
'''.strip())
[docs] async def setContent(self, html: str) -> None:
"""Set content to this page."""
func = '''
function(html) {
document.open();
document.write(html);
document.close();
}
'''
await self.evaluate(func, html)
@property
def name(self) -> str:
"""Get frame name."""
return self.__dict__.get('_name', '')
@property
def url(self) -> str:
"""Get url of the frame."""
return self._url
@property
def parentFrame(self) -> Optional['Frame']:
"""Get parent frame.
If this frame is main frame or detached frame, return ``None``.
"""
return self._parentFrame
@property
def childFrames(self) -> List['Frame']:
"""Get child frames."""
return list(self._childFrames)
[docs] def isDetached(self) -> bool:
"""Return ``True`` if this frame is detached.
Otherwise return ``False``.
"""
return self._detached
[docs] async def injectFile(self, filePath: str) -> str:
"""[Deprecated] Inject file to the frame."""
logger.warning('`injectFile` method is deprecated.'
' Use `addScriptTag` method instead.')
with open(filePath) as f:
contents = f.read()
contents += '/* # sourceURL= {} */'.format(filePath.replace('\n', ''))
return await self.evaluate(contents)
[docs] async def addScriptTag(self, options: Dict) -> ElementHandle: # noqa: C901
"""Add script tag to this frame.
Details see :meth:`pyppeteer.page.Page.addScriptTag`.
"""
context = await self.executionContext()
if context is None:
raise ElementHandleError('ExecutionContext is None.')
addScriptUrl = '''
async function addScriptUrl(url, type) {
const script = document.createElement('script');
script.src = url;
if (type)
script.type = type;
const promise = new Promise((res, rej) => {
script.onload = res;
script.onerror = rej;
});
document.head.appendChild(script);
await promise;
return script;
}'''
addScriptContent = '''
function addScriptContent(content, type = 'text/javascript') {
const script = document.createElement('script');
script.type = type;
script.text = content;
let error = null;
script.onerror = e => error = e;
document.head.appendChild(script);
if (error)
throw error;
return script;
}'''
if isinstance(options.get('url'), str):
url = options['url']
args = [addScriptUrl, url]
if 'type' in options:
args.append(options['type'])
try:
return (await context.evaluateHandle(*args) # type: ignore
).asElement()
except ElementHandleError as e:
raise PageError(f'Loading script from {url} failed') from e
if isinstance(options.get('path'), str):
with open(options['path']) as f:
contents = f.read()
contents = contents + '//# sourceURL={}'.format(
options['path'].replace('\n', ''))
args = [addScriptContent, contents]
if 'type' in options:
args.append(options['type'])
return (await context.evaluateHandle(*args) # type: ignore
).asElement()
if isinstance(options.get('content'), str):
args = [addScriptContent, options['content']]
if 'type' in options:
args.append(options['type'])
return (await context.evaluateHandle(*args) # type: ignore
).asElement()
raise ValueError(
'Provide an object with a `url`, `path` or `content` property')
[docs] async def addStyleTag(self, options: Dict) -> ElementHandle:
"""Add style tag to this frame.
Details see :meth:`pyppeteer.page.Page.addStyleTag`.
"""
context = await self.executionContext()
if context is None:
raise ElementHandleError('ExecutionContext is None.')
addStyleUrl = '''
async function (url) {
const link = document.createElement('link');
link.rel = 'stylesheet';
link.href = url;
const promise = new Promise((res, rej) => {
link.onload = res;
link.onerror = rej;
});
document.head.appendChild(link);
await promise;
return link;
}'''
addStyleContent = '''
async function (content) {
const style = document.createElement('style');
style.type = 'text/css';
style.appendChild(document.createTextNode(content));
const promise = new Promise((res, rej) => {
style.onload = res;
style.onerror = rej;
});
document.head.appendChild(style);
await promise;
return style;
}'''
if isinstance(options.get('url'), str):
url = options['url']
try:
return (await context.evaluateHandle( # type: ignore
addStyleUrl, url)).asElement()
except ElementHandleError as e:
raise PageError(f'Loading style from {url} failed') from e
if isinstance(options.get('path'), str):
with open(options['path']) as f:
contents = f.read()
contents = contents + '/*# sourceURL={}*/'.format(
options['path'].replace('\n', ''))
return (await context.evaluateHandle( # type: ignore
addStyleContent, contents)).asElement()
if isinstance(options.get('content'), str):
return (await context.evaluateHandle( # type: ignore
addStyleContent, options['content'])).asElement()
raise ValueError(
'Provide an object with a `url`, `path` or `content` property')
[docs] async def click(self, selector: str, options: dict = None, **kwargs: Any
) -> None:
"""Click element which matches ``selector``.
Details see :meth:`pyppeteer.page.Page.click`.
"""
options = merge_dict(options, kwargs)
handle = await self.J(selector)
if not handle:
raise PageError('No node found for selector: ' + selector)
await handle.click(options)
await handle.dispose()
[docs] async def focus(self, selector: str) -> None:
"""Focus element which matches ``selector``.
Details see :meth:`pyppeteer.page.Page.focus`.
"""
handle = await self.J(selector)
if not handle:
raise PageError('No node found for selector: ' + selector)
await self.evaluate('element => element.focus()', handle)
await handle.dispose()
[docs] async def hover(self, selector: str) -> None:
"""Mouse hover the element which matches ``selector``.
Details see :meth:`pyppeteer.page.Page.hover`.
"""
handle = await self.J(selector)
if not handle:
raise PageError('No node found for selector: ' + selector)
await handle.hover()
await handle.dispose()
[docs] async def select(self, selector: str, *values: str) -> List[str]:
"""Select options and return selected values.
Details see :meth:`pyppeteer.page.Page.select`.
"""
for value in values:
if not isinstance(value, str):
raise TypeError(
'Values must be string. '
f'Found {value} of type {type(value)}'
)
return await self.querySelectorEval( # type: ignore
selector, '''
(element, values) => {
if (element.nodeName.toLowerCase() !== 'select')
throw new Error('Element is not a <select> element.');
const options = Array.from(element.options);
element.value = undefined;
for (const option of options) {
option.selected = values.includes(option.value);
if (option.selected && !element.multiple)
break;
}
element.dispatchEvent(new Event('input', { 'bubbles': true }));
element.dispatchEvent(new Event('change', { 'bubbles': true }));
return options.filter(option => option.selected).map(options => options.value)
}
''', values) # noqa: E501
[docs] async def tap(self, selector: str) -> None:
"""Tap the element which matches the ``selector``.
Details see :meth:`pyppeteer.page.Page.tap`.
"""
handle = await self.J(selector)
if not handle:
raise PageError('No node found for selector: ' + selector)
await handle.tap()
await handle.dispose()
[docs] async def type(self, selector: str, text: str, options: dict = None,
**kwargs: Any) -> None:
"""Type ``text`` on the element which matches ``selector``.
Details see :meth:`pyppeteer.page.Page.type`.
"""
options = merge_dict(options, kwargs)
handle = await self.querySelector(selector)
if handle is None:
raise PageError('Cannot find {} on this page'.format(selector))
await handle.type(text, options)
await handle.dispose()
[docs] def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float],
options: dict = None, *args: Any, **kwargs: Any
) -> Union[Awaitable, 'WaitTask']:
"""Wait until `selectorOrFunctionOrTimeout`.
Details see :meth:`pyppeteer.page.Page.waitFor`.
"""
options = merge_dict(options, kwargs)
if isinstance(selectorOrFunctionOrTimeout, (int, float)):
fut: Awaitable[None] = self._client._loop.create_task(
asyncio.sleep(selectorOrFunctionOrTimeout / 1000))
return fut
if not isinstance(selectorOrFunctionOrTimeout, str):
fut = self._client._loop.create_future()
fut.set_exception(TypeError(
'Unsupported target type: ' +
str(type(selectorOrFunctionOrTimeout))
))
return fut
if args or helper.is_jsfunc(selectorOrFunctionOrTimeout):
return self.waitForFunction(
selectorOrFunctionOrTimeout, options, *args)
if selectorOrFunctionOrTimeout.startswith('//'):
return self.waitForXPath(selectorOrFunctionOrTimeout, options)
return self.waitForSelector(selectorOrFunctionOrTimeout, options)
[docs] def waitForSelector(self, selector: str, options: dict = None,
**kwargs: Any) -> 'WaitTask':
"""Wait until element which matches ``selector`` appears on page.
Details see :meth:`pyppeteer.page.Page.waitForSelector`.
"""
options = merge_dict(options, kwargs)
return self._waitForSelectorOrXPath(selector, False, options)
[docs] def waitForXPath(self, xpath: str, options: dict = None,
**kwargs: Any) -> 'WaitTask':
"""Wait until element which matches ``xpath`` appears on page.
Details see :meth:`pyppeteer.page.Page.waitForXPath`.
"""
options = merge_dict(options, kwargs)
return self._waitForSelectorOrXPath(xpath, True, options)
[docs] def waitForFunction(self, pageFunction: str, options: dict = None,
*args: Any, **kwargs: Any) -> 'WaitTask':
"""Wait until the function completes.
Details see :meth:`pyppeteer.page.Page.waitForFunction`.
"""
options = merge_dict(options, kwargs)
timeout = options.get('timeout', 30000) # msec
polling = options.get('polling', 'raf')
return WaitTask(self, pageFunction, 'function', polling, timeout,
self._client._loop, *args)
def _waitForSelectorOrXPath(self, selectorOrXPath: str, isXPath: bool,
options: dict = None, **kwargs: Any
) -> 'WaitTask':
options = merge_dict(options, kwargs)
timeout = options.get('timeout', 30000)
waitForVisible = bool(options.get('visible'))
waitForHidden = bool(options.get('hidden'))
polling = 'raf' if waitForHidden or waitForVisible else 'mutation'
predicate = '''
(selectorOrXPath, isXPath, waitForVisible, waitForHidden) => {
const node = isXPath
? document.evaluate(selectorOrXPath, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue
: document.querySelector(selectorOrXPath);
if (!node)
return waitForHidden;
if (!waitForVisible && !waitForHidden)
return node;
const element = /** @type {Element} */ (node.nodeType === Node.TEXT_NODE ? node.parentElement : node);
const style = window.getComputedStyle(element);
const isVisible = style && style.visibility !== 'hidden' && hasVisibleBoundingBox();
const success = (waitForVisible === isVisible || waitForHidden === !isVisible)
return success ? node : null
function hasVisibleBoundingBox() {
const rect = element.getBoundingClientRect();
return !!(rect.top || rect.bottom || rect.width || rect.height);
}
}
''' # noqa: E501
return WaitTask(
self,
predicate,
f'{"XPath" if isXPath else "selector"} "{selectorOrXPath}"',
polling,
timeout,
self._client._loop,
selectorOrXPath,
isXPath,
waitForVisible,
waitForHidden,
)
[docs] async def title(self) -> str:
"""Get title of the frame."""
return await self.evaluate('() => document.title')
def _navigated(self, framePayload: dict) -> None:
self._name = framePayload.get('name', '')
self._navigationURL = framePayload.get('url', '')
self._url = framePayload.get('url', '')
def _navigatedWithinDocument(self, url: str) -> None:
self._url = url
def _onLifecycleEvent(self, loaderId: str, name: str) -> None:
if name == 'init':
self._loaderId = loaderId
self._lifecycleEvents.clear()
else:
self._lifecycleEvents.add(name)
def _onLoadingStopped(self) -> None:
self._lifecycleEvents.add('DOMContentLoaded')
self._lifecycleEvents.add('load')
def _detach(self) -> None:
for waitTask in self._waitTasks:
waitTask.terminate(
PageError('waitForFunction failed: frame got detached.'))
self._detached = True
if self._parentFrame:
self._parentFrame._childFrames.remove(self)
self._parentFrame = None
class WaitTask(object):
"""WaitTask class.
Instance of this class is awaitable.
"""
def __init__(self, frame: Frame, predicateBody: str, # noqa: C901
title: str, polling: Union[str, int], timeout: float,
loop: asyncio.AbstractEventLoop, *args: Any) -> None:
if isinstance(polling, str):
if polling not in ['raf', 'mutation']:
raise ValueError(f'Unknown polling: {polling}')
elif isinstance(polling, (int, float)):
if polling <= 0:
raise ValueError(
f'Cannot poll with non-positive interval: {polling}'
)
else:
raise ValueError(f'Unknown polling option: {polling}')
self._frame = frame
self._polling = polling
self._timeout = timeout
self._loop = loop
if args or helper.is_jsfunc(predicateBody):
self._predicateBody = f'return ({predicateBody})(...args)'
else:
self._predicateBody = f'return {predicateBody}'
self._args = args
self._runCount = 0
self._terminated = False
self._timeoutError = False
frame._waitTasks.add(self)
self.promise = self._loop.create_future()
async def timer(timeout: Union[int, float]) -> None:
await asyncio.sleep(timeout / 1000)
self._timeoutError = True
self.terminate(TimeoutError(
f'Waiting for {title} failed: timeout {timeout}ms exceeds.'
))
if timeout:
self._timeoutTimer = self._loop.create_task(timer(self._timeout))
self._runningTask = self._loop.create_task(self.rerun())
def __await__(self) -> Generator:
"""Make this class **awaitable**."""
result = yield from self.promise
if isinstance(result, Exception):
raise result
return result
def terminate(self, error: Exception) -> None:
"""Terminate this task."""
self._terminated = True
if not self.promise.done():
self.promise.set_result(error)
self._cleanup()
async def rerun(self) -> None: # noqa: C901
"""Start polling."""
runCount = self._runCount = self._runCount + 1
success: Optional[JSHandle] = None
error = None
try:
context = await self._frame.executionContext()
if context is None:
raise PageError('No execution context.')
success = await context.evaluateHandle(
waitForPredicatePageFunction,
self._predicateBody,
self._polling,
self._timeout,
*self._args,
)
except Exception as e:
error = e
if self.promise.done():
return
if self._terminated or runCount != self._runCount:
if success:
await success.dispose()
return
# Add try/except referring to puppeteer.
try:
if not error and success and (
await self._frame.evaluate('s => !s', success)):
await success.dispose()
return
except NetworkError:
if success is not None:
await success.dispose()
return
# page is navigated and context is destroyed.
# Try again in the new execution context.
if (isinstance(error, NetworkError) and
'Execution context was destroyed' in error.args[0]):
return
# Try again in the new execution context.
if (isinstance(error, NetworkError) and
'Cannot find context with specified id' in error.args[0]):
return
if error:
self.promise.set_exception(error)
else:
self.promise.set_result(success)
self._cleanup()
def _cleanup(self) -> None:
if self._timeout and not self._timeoutError:
self._timeoutTimer.cancel()
self._frame._waitTasks.remove(self)
waitForPredicatePageFunction = """
async function waitForPredicatePageFunction(predicateBody, polling, timeout, ...args) {
const predicate = new Function('...args', predicateBody);
let timedOut = false;
if (timeout)
setTimeout(() => timedOut = true, timeout);
if (polling === 'raf')
return await pollRaf();
if (polling === 'mutation')
return await pollMutation();
if (typeof polling === 'number')
return await pollInterval(polling);
/**
* @return {!Promise<*>}
*/
function pollMutation() {
const success = predicate.apply(null, args);
if (success)
return Promise.resolve(success);
let fulfill;
const result = new Promise(x => fulfill = x);
const observer = new MutationObserver(mutations => {
if (timedOut) {
observer.disconnect();
fulfill();
}
const success = predicate.apply(null, args);
if (success) {
observer.disconnect();
fulfill(success);
}
});
observer.observe(document, {
childList: true,
subtree: true,
attributes: true
});
return result;
}
/**
* @return {!Promise<*>}
*/
function pollRaf() {
let fulfill;
const result = new Promise(x => fulfill = x);
onRaf();
return result;
function onRaf() {
if (timedOut) {
fulfill();
return;
}
const success = predicate.apply(null, args);
if (success)
fulfill(success);
else
requestAnimationFrame(onRaf);
}
}
/**
* @param {number} pollInterval
* @return {!Promise<*>}
*/
function pollInterval(pollInterval) {
let fulfill;
const result = new Promise(x => fulfill = x);
onTimeout();
return result;
function onTimeout() {
if (timedOut) {
fulfill();
return;
}
const success = predicate.apply(null, args);
if (success)
fulfill(success);
else
setTimeout(onTimeout, pollInterval);
}
}
}
""" # noqa: E501