Skip to content

API reference

Top-level package — re-exports the public API.

pulser_azure

AzureConnection

AzureConnection(resource_id: str | None = None)

Bases: RemoteConnection

Azure Quantum connection bridge.

Parameters:

Name Type Description Default
resource_id str | None

Optional resource ID of the Azure workspace. If the provided resource_id is None (or empty), the value is loaded from the PULSER_AZURE_RESOURCE_ID environment variable.

None
Source code in src/pulser_azure/connection.py
 93
 94
 95
 96
 97
 98
 99
100
def __init__(self, resource_id: str | None = None):
    """Create the Azure Quantum workspace handle used by this connection.

    Args:
        resource_id: Resource ID of the Azure workspace. When it is ``None``
            (or otherwise falsy), the value of the
            ``PULSER_AZURE_RESOURCE_ID`` environment variable is used
            instead.
    """
    # The interactive browser credential is explicitly *not* excluded from
    # the default credential chain.
    azure_credential = DefaultAzureCredential(
        exclude_interactive_browser_credential=False
    )

    # Explicit argument wins; otherwise fall back to the environment.
    workspace_resource_id = resource_id
    if not workspace_resource_id:
        workspace_resource_id = os.getenv("PULSER_AZURE_RESOURCE_ID")

    self._workspace = Workspace(
        resource_id=workspace_resource_id,
        credential=azure_credential,
    )

submit

submit(
    sequence: Sequence,
    wait: bool = False,
    open: bool = False,
    batch_id: str | None = None,
    device_type: DeviceTypeName | None = None,
    backend_configuration: EmulationConfig | None = None,
    **kwargs: Any,
) -> RemoteResults

Submit a job for execution.

Session ownership rule: the caller that opens a session is responsible for closing it.

  • open=True (open_batch): opens a session, returns without submitting jobs. Caller closes via _close_batch on exit.
  • batch_id provided: submits jobs into the caller's session and leaves it open.
  • Neither: opens a session, submits jobs, and closes it before returning.
Source code in src/pulser_azure/connection.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def submit(
    self,
    sequence: Sequence,
    wait: bool = False,
    open: bool = False,
    batch_id: str | None = None,
    device_type: DeviceTypeName | None = None,
    backend_configuration: EmulationConfig | None = None,
    **kwargs: Any,
) -> RemoteResults:
    """Submit a job for execution.

    Session ownership rule: the caller that opens a session is responsible
    for closing it.

    - ``open=True`` (open_batch): opens a session, returns without
      submitting jobs. Caller closes via ``_close_batch`` on exit.
    - ``batch_id`` provided: submits jobs into the caller's session and
      leaves it open.
    - Neither: opens a session, submits jobs, and closes it before
      returning. The session is closed even when submitting or waiting
      raises, so a failed call does not leave a session open.

    Args:
        sequence: The sequence to run. When ``device_type`` is ``None``,
            the name of its device selects the QPU target.
        wait: If ``True``, block until every submitted job has completed.
        open: If ``True``, only open a session and return immediately.
        batch_id: ID of an already open session to submit the jobs into.
        device_type: Emulator device type; when given, it selects the
            emulator target instead of a QPU target.
        backend_configuration: Emulation configuration forwarded to the
            job submission.
        **kwargs: Only ``job_params`` is read; it defaults to an empty
            list and is made JSON compatible before submission.

    Returns:
        A ``RemoteResults`` bound to the session ID and this connection.
        It carries the submitted job IDs unless ``open=True``.

    Raises:
        RuntimeError: If the selected target is not available on the
            workspace.
    """

    # device_type is set only when using RemoteEmulatorBackend
    if device_type is None:
        target_name = _QPU_DEVICE_NAME_TARGET_NAME_MAP[sequence.device.name]
    else:
        target_name = _EMU_DEVICE_NAME_TARGET_NAME_MAP[device_type]

    target: Pasqal | None = self._workspace.get_targets(  # ty: ignore[invalid-assignment]
        name=target_name, provider_id=_PASQAL_PROVIDER_ID
    )

    if not target:
        raise RuntimeError(
            f"The target {target_name} isn't available on your workspace"
        )

    # Converted before any session is opened, so invalid parameters cannot
    # leave a session behind.
    job_params = make_json_compatible(kwargs.get("job_params", []))

    # Context manager use case (eg: with backend.open_batch()): open a new
    # session and return immediately. The caller (the open_batch context
    # manager) owns this session and must close it on __exit__.
    if open:
        batch_id = self._setup_session(target)

        return RemoteResults(batch_id=batch_id, connection=self)

    # Without a caller-provided session, this call opens one and therefore
    # owns it. It is opened outside the ``try`` so that a failure to open
    # does not trigger a close on a session that never existed.
    owns_session = not batch_id
    if owns_session:
        batch_id = self._setup_session(target)

    try:
        if owns_session:
            job_ids = self._submit_jobs(
                target=target,
                sequence=sequence,
                job_params=job_params,
                emulation_config=backend_configuration,
            )
        else:
            # when fetching the target from azure, the session isn't
            # attached to it even if it is still open
            session = self._workspace.get_session(batch_id)
            target.latest_session = session

            job_ids = self._submit_jobs(
                target=target,
                sequence=sequence,
                open_batch=True,
                job_params=job_params,
                emulation_config=backend_configuration,
            )

        if wait:
            for job_id in job_ids:
                job = self._workspace.get_job(job_id)
                job.wait_until_completed()
    finally:
        # Honour the ownership rule on the error path too: a session opened
        # by this call must not outlive it.
        if owns_session:
            self._close_batch(batch_id)

    return RemoteResults(batch_id=batch_id, connection=self, job_ids=job_ids)

fetch_available_devices

fetch_available_devices() -> dict[str, Device]

Fetches the devices available through this connection.

Source code in src/pulser_azure/connection.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def fetch_available_devices(self) -> dict[str, Device]:
    """Fetches the devices available through this connection.

    Returns:
        The devices built from the connection's device specs, keyed by the
        target name that ``_QPU_DEVICE_NAME_TARGET_NAME_MAP`` associates
        with each device name. Devices whose name is not in that map are
        left out.
    """

    # Build every device up front from the "specs" entry of each spec.
    abstract_reprs = (entry["specs"] for entry in self._get_device_specs())
    built_devices = [
        Device.from_abstract_repr(abstract_repr)
        for abstract_repr in abstract_reprs
    ]

    # Only keep real QPU devices, i.e. those with a known target name.
    devices_by_target: dict[str, Device] = {}
    for device in built_devices:
        if device.name not in _QPU_DEVICE_NAME_TARGET_NAME_MAP:
            continue
        target_name = _QPU_DEVICE_NAME_TARGET_NAME_MAP[device.name]
        devices_by_target[target_name] = device

    return devices_by_target

supports_open_batch

supports_open_batch() -> bool

Flag to confirm this class can support creating an open batch.

Source code in src/pulser_azure/connection.py
389
390
391
def supports_open_batch(self) -> bool:
    """Report whether this connection can create an open batch.

    Returns:
        Always ``True``.
    """
    return True