Source code for pyppeteer.tracing

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Tracing module."""

from pathlib import Path
from typing import Any

from pyppeteer.connection import CDPSession
from pyppeteer.util import merge_dict


[docs]class Tracing(object): """Tracing class. You can use :meth:`start` and :meth:`stop` to create a trace file which can be opened in Chrome DevTools or `timeline viewer <https://chromedevtools.github.io/timeline-viewer/>`_. .. code:: await page.tracing.start({'path': 'trace.json'}) await page.goto('https://www.google.com') await page.tracing.stop() """ def __init__(self, client: CDPSession) -> None: self._client = client self._recording = False self._path = ''
[docs] async def start(self, options: dict = None, **kwargs: Any) -> None: """Start tracing. Only one trace can be active at a time per browser. This method accepts the following options: * ``path`` (str): A path to write the trace file to. * ``screenshots`` (bool): Capture screenshots in the trace. * ``categories`` (List[str]): Specify custom categories to use instead of default. """ options = merge_dict(options, kwargs) defaultCategories = [ '-*', 'devtools.timeline', 'v8.execute', 'disabled-by-default-devtools.timeline', 'disabled-by-default-devtools.timeline.frame', 'toplevel', 'blink.console', 'blink.user_timing', 'latencyInfo', 'disabled-by-default-devtools.timeline.stack', 'disabled-by-default-v8.cpu_profiler', 'disabled-by-default-v8.cpu_profiler.hires', ] categoriesArray = options.get('categories', defaultCategories) if 'screenshots' in options: categoriesArray.append('disabled-by-default-devtools.screenshot') self._path = options.get('path', '') self._recording = True await self._client.send('Tracing.start', { 'transferMode': 'ReturnAsStream', 'categories': ','.join(categoriesArray), })
[docs] async def stop(self) -> str: """Stop tracing. :return: trace data as string. """ contentPromise = self._client._loop.create_future() self._client.once( 'Tracing.tracingComplete', lambda event: self._client._loop.create_task( self._readStream(event.get('stream'), self._path) ).add_done_callback( lambda fut: contentPromise.set_result(fut.result()) ) ) await self._client.send('Tracing.end') self._recording = False return await contentPromise
async def _readStream(self, handle: str, path: str) -> str: # might be better to return as bytes eof = False bufs = [] while not eof: response = await self._client.send('IO.read', {'handle': handle}) eof = response.get('eof', False) bufs.append(response.get('data', '')) await self._client.send('IO.close', {'handle': handle}) result = ''.join(bufs) if path: file = Path(path) with file.open('w') as f: f.write(result) return result