Bases: BaseSession[ClientRequest, ClientNotification, ClientResult, ServerRequest, ServerNotification]
Source code in src/mcp/client/session.py103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615 | class ClientSession(
BaseSession[
types.ClientRequest,
types.ClientNotification,
types.ClientResult,
types.ServerRequest,
types.ServerNotification,
]
):
def __init__(
self,
read_stream: MemoryObjectReceiveStream[SessionMessage | Exception],
write_stream: MemoryObjectSendStream[SessionMessage],
read_timeout_seconds: timedelta | None = None,
sampling_callback: SamplingFnT | None = None,
elicitation_callback: ElicitationFnT | None = None,
list_roots_callback: ListRootsFnT | None = None,
logging_callback: LoggingFnT | None = None,
message_handler: MessageHandlerFnT | None = None,
client_info: types.Implementation | None = None,
*,
sampling_capabilities: types.SamplingCapability | None = None,
experimental_task_handlers: ExperimentalTaskHandlers | None = None,
) -> None:
super().__init__(
read_stream,
write_stream,
types.ServerRequest,
types.ServerNotification,
read_timeout_seconds=read_timeout_seconds,
)
self._client_info = client_info or DEFAULT_CLIENT_INFO
self._sampling_callback = sampling_callback or _default_sampling_callback
self._sampling_capabilities = sampling_capabilities
self._elicitation_callback = elicitation_callback or _default_elicitation_callback
self._list_roots_callback = list_roots_callback or _default_list_roots_callback
self._logging_callback = logging_callback or _default_logging_callback
self._message_handler = message_handler or _default_message_handler
self._tool_output_schemas: dict[str, dict[str, Any] | None] = {}
self._server_capabilities: types.ServerCapabilities | None = None
self._experimental_features: ExperimentalClientFeatures | None = None
# Experimental: Task handlers (use defaults if not provided)
self._task_handlers = experimental_task_handlers or ExperimentalTaskHandlers()
async def initialize(self) -> types.InitializeResult:
sampling = (
(self._sampling_capabilities or types.SamplingCapability())
if self._sampling_callback is not _default_sampling_callback
else None
)
elicitation = (
types.ElicitationCapability(
form=types.FormElicitationCapability(),
url=types.UrlElicitationCapability(),
)
if self._elicitation_callback is not _default_elicitation_callback
else None
)
roots = (
# TODO: Should this be based on whether we
# _will_ send notifications, or only whether
# they're supported?
types.RootsCapability(listChanged=True)
if self._list_roots_callback is not _default_list_roots_callback
else None
)
result = await self.send_request(
types.ClientRequest(
types.InitializeRequest(
params=types.InitializeRequestParams(
protocolVersion=types.LATEST_PROTOCOL_VERSION,
capabilities=types.ClientCapabilities(
sampling=sampling,
elicitation=elicitation,
experimental=None,
roots=roots,
tasks=self._task_handlers.build_capability(),
),
clientInfo=self._client_info,
),
)
),
types.InitializeResult,
)
if result.protocolVersion not in SUPPORTED_PROTOCOL_VERSIONS:
raise RuntimeError(f"Unsupported protocol version from the server: {result.protocolVersion}")
self._server_capabilities = result.capabilities
await self.send_notification(types.ClientNotification(types.InitializedNotification()))
return result
def get_server_capabilities(self) -> types.ServerCapabilities | None:
"""Return the server capabilities received during initialization.
Returns None if the session has not been initialized yet.
"""
return self._server_capabilities
@property
def experimental(self) -> ExperimentalClientFeatures:
"""Experimental APIs for tasks and other features.
WARNING: These APIs are experimental and may change without notice.
Example:
status = await session.experimental.get_task(task_id)
result = await session.experimental.get_task_result(task_id, CallToolResult)
"""
if self._experimental_features is None:
self._experimental_features = ExperimentalClientFeatures(self)
return self._experimental_features
async def send_ping(self) -> types.EmptyResult:
"""Send a ping request."""
return await self.send_request(
types.ClientRequest(types.PingRequest()),
types.EmptyResult,
)
async def send_progress_notification(
self,
progress_token: str | int,
progress: float,
total: float | None = None,
message: str | None = None,
) -> None:
"""Send a progress notification."""
await self.send_notification(
types.ClientNotification(
types.ProgressNotification(
params=types.ProgressNotificationParams(
progressToken=progress_token,
progress=progress,
total=total,
message=message,
),
),
)
)
async def set_logging_level(self, level: types.LoggingLevel) -> types.EmptyResult:
"""Send a logging/setLevel request."""
return await self.send_request( # pragma: no cover
types.ClientRequest(
types.SetLevelRequest(
params=types.SetLevelRequestParams(level=level),
)
),
types.EmptyResult,
)
@overload
@deprecated("Use list_resources(params=PaginatedRequestParams(...)) instead")
async def list_resources(self, cursor: str | None) -> types.ListResourcesResult: ...
@overload
async def list_resources(self, *, params: types.PaginatedRequestParams | None) -> types.ListResourcesResult: ...
@overload
async def list_resources(self) -> types.ListResourcesResult: ...
async def list_resources(
self,
cursor: str | None = None,
*,
params: types.PaginatedRequestParams | None = None,
) -> types.ListResourcesResult:
"""Send a resources/list request.
Args:
cursor: Simple cursor string for pagination (deprecated, use params instead)
params: Full pagination parameters including cursor and any future fields
"""
if params is not None and cursor is not None:
raise ValueError("Cannot specify both cursor and params")
if params is not None:
request_params = params
elif cursor is not None:
request_params = types.PaginatedRequestParams(cursor=cursor)
else:
request_params = None
return await self.send_request(
types.ClientRequest(types.ListResourcesRequest(params=request_params)),
types.ListResourcesResult,
)
@overload
@deprecated("Use list_resource_templates(params=PaginatedRequestParams(...)) instead")
async def list_resource_templates(self, cursor: str | None) -> types.ListResourceTemplatesResult: ...
@overload
async def list_resource_templates(
self, *, params: types.PaginatedRequestParams | None
) -> types.ListResourceTemplatesResult: ...
@overload
async def list_resource_templates(self) -> types.ListResourceTemplatesResult: ...
async def list_resource_templates(
self,
cursor: str | None = None,
*,
params: types.PaginatedRequestParams | None = None,
) -> types.ListResourceTemplatesResult:
"""Send a resources/templates/list request.
Args:
cursor: Simple cursor string for pagination (deprecated, use params instead)
params: Full pagination parameters including cursor and any future fields
"""
if params is not None and cursor is not None:
raise ValueError("Cannot specify both cursor and params")
if params is not None:
request_params = params
elif cursor is not None:
request_params = types.PaginatedRequestParams(cursor=cursor)
else:
request_params = None
return await self.send_request(
types.ClientRequest(types.ListResourceTemplatesRequest(params=request_params)),
types.ListResourceTemplatesResult,
)
async def read_resource(self, uri: AnyUrl) -> types.ReadResourceResult:
"""Send a resources/read request."""
return await self.send_request(
types.ClientRequest(
types.ReadResourceRequest(
params=types.ReadResourceRequestParams(uri=uri),
)
),
types.ReadResourceResult,
)
async def subscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:
"""Send a resources/subscribe request."""
return await self.send_request( # pragma: no cover
types.ClientRequest(
types.SubscribeRequest(
params=types.SubscribeRequestParams(uri=uri),
)
),
types.EmptyResult,
)
async def unsubscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:
"""Send a resources/unsubscribe request."""
return await self.send_request( # pragma: no cover
types.ClientRequest(
types.UnsubscribeRequest(
params=types.UnsubscribeRequestParams(uri=uri),
)
),
types.EmptyResult,
)
async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
read_timeout_seconds: timedelta | None = None,
progress_callback: ProgressFnT | None = None,
*,
meta: dict[str, Any] | None = None,
) -> types.CallToolResult:
"""Send a tools/call request with optional progress callback support."""
_meta: types.RequestParams.Meta | None = None
if meta is not None:
_meta = types.RequestParams.Meta(**meta)
result = await self.send_request(
types.ClientRequest(
types.CallToolRequest(
params=types.CallToolRequestParams(name=name, arguments=arguments, _meta=_meta),
)
),
types.CallToolResult,
request_read_timeout_seconds=read_timeout_seconds,
progress_callback=progress_callback,
)
if not result.isError:
await self._validate_tool_result(name, result)
return result
async def _validate_tool_result(self, name: str, result: types.CallToolResult) -> None:
"""Validate the structured content of a tool result against its output schema."""
if name not in self._tool_output_schemas:
# refresh output schema cache
await self.list_tools()
output_schema = None
if name in self._tool_output_schemas:
output_schema = self._tool_output_schemas.get(name)
else:
logger.warning(f"Tool {name} not listed by server, cannot validate any structured content")
if output_schema is not None:
from jsonschema import SchemaError, ValidationError, validate
if result.structuredContent is None:
raise RuntimeError(
f"Tool {name} has an output schema but did not return structured content"
) # pragma: no cover
try:
validate(result.structuredContent, output_schema)
except ValidationError as e:
raise RuntimeError(f"Invalid structured content returned by tool {name}: {e}") # pragma: no cover
except SchemaError as e: # pragma: no cover
raise RuntimeError(f"Invalid schema for tool {name}: {e}") # pragma: no cover
@overload
@deprecated("Use list_prompts(params=PaginatedRequestParams(...)) instead")
async def list_prompts(self, cursor: str | None) -> types.ListPromptsResult: ...
@overload
async def list_prompts(self, *, params: types.PaginatedRequestParams | None) -> types.ListPromptsResult: ...
@overload
async def list_prompts(self) -> types.ListPromptsResult: ...
async def list_prompts(
self,
cursor: str | None = None,
*,
params: types.PaginatedRequestParams | None = None,
) -> types.ListPromptsResult:
"""Send a prompts/list request.
Args:
cursor: Simple cursor string for pagination (deprecated, use params instead)
params: Full pagination parameters including cursor and any future fields
"""
if params is not None and cursor is not None:
raise ValueError("Cannot specify both cursor and params")
if params is not None:
request_params = params
elif cursor is not None:
request_params = types.PaginatedRequestParams(cursor=cursor)
else:
request_params = None
return await self.send_request(
types.ClientRequest(types.ListPromptsRequest(params=request_params)),
types.ListPromptsResult,
)
async def get_prompt(self, name: str, arguments: dict[str, str] | None = None) -> types.GetPromptResult:
"""Send a prompts/get request."""
return await self.send_request(
types.ClientRequest(
types.GetPromptRequest(
params=types.GetPromptRequestParams(name=name, arguments=arguments),
)
),
types.GetPromptResult,
)
async def complete(
self,
ref: types.ResourceTemplateReference | types.PromptReference,
argument: dict[str, str],
context_arguments: dict[str, str] | None = None,
) -> types.CompleteResult:
"""Send a completion/complete request."""
context = None
if context_arguments is not None:
context = types.CompletionContext(arguments=context_arguments)
return await self.send_request(
types.ClientRequest(
types.CompleteRequest(
params=types.CompleteRequestParams(
ref=ref,
argument=types.CompletionArgument(**argument),
context=context,
),
)
),
types.CompleteResult,
)
@overload
@deprecated("Use list_tools(params=PaginatedRequestParams(...)) instead")
async def list_tools(self, cursor: str | None) -> types.ListToolsResult: ...
@overload
async def list_tools(self, *, params: types.PaginatedRequestParams | None) -> types.ListToolsResult: ...
@overload
async def list_tools(self) -> types.ListToolsResult: ...
async def list_tools(
self,
cursor: str | None = None,
*,
params: types.PaginatedRequestParams | None = None,
) -> types.ListToolsResult:
"""Send a tools/list request.
Args:
cursor: Simple cursor string for pagination (deprecated, use params instead)
params: Full pagination parameters including cursor and any future fields
"""
if params is not None and cursor is not None:
raise ValueError("Cannot specify both cursor and params")
if params is not None:
request_params = params
elif cursor is not None:
request_params = types.PaginatedRequestParams(cursor=cursor)
else:
request_params = None
result = await self.send_request(
types.ClientRequest(types.ListToolsRequest(params=request_params)),
types.ListToolsResult,
)
# Cache tool output schemas for future validation
# Note: don't clear the cache, as we may be using a cursor
for tool in result.tools:
self._tool_output_schemas[tool.name] = tool.outputSchema
return result
async def send_roots_list_changed(self) -> None: # pragma: no cover
"""Send a roots/list_changed notification."""
await self.send_notification(types.ClientNotification(types.RootsListChangedNotification()))
async def _received_request(self, responder: RequestResponder[types.ServerRequest, types.ClientResult]) -> None:
ctx = RequestContext[ClientSession, Any](
request_id=responder.request_id,
meta=responder.request_meta,
session=self,
lifespan_context=None,
)
# Delegate to experimental task handler if applicable
if self._task_handlers.handles_request(responder.request):
with responder:
await self._task_handlers.handle_request(ctx, responder)
return None
# Core request handling
match responder.request.root:
case types.CreateMessageRequest(params=params):
with responder:
# Check if this is a task-augmented request
if params.task is not None:
response = await self._task_handlers.augmented_sampling(ctx, params, params.task)
else:
response = await self._sampling_callback(ctx, params)
client_response = ClientResponse.validate_python(response)
await responder.respond(client_response)
case types.ElicitRequest(params=params):
with responder:
# Check if this is a task-augmented request
if params.task is not None:
response = await self._task_handlers.augmented_elicitation(ctx, params, params.task)
else:
response = await self._elicitation_callback(ctx, params)
client_response = ClientResponse.validate_python(response)
await responder.respond(client_response)
case types.ListRootsRequest():
with responder:
response = await self._list_roots_callback(ctx)
client_response = ClientResponse.validate_python(response)
await responder.respond(client_response)
case types.PingRequest(): # pragma: no cover
with responder:
return await responder.respond(types.ClientResult(root=types.EmptyResult()))
case _: # pragma: no cover
pass # Task requests handled above by _task_handlers
return None
async def _handle_incoming(
self,
req: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception,
) -> None:
"""Handle incoming messages by forwarding to the message handler."""
await self._message_handler(req)
async def _received_notification(self, notification: types.ServerNotification) -> None:
"""Handle notifications from the server."""
# Process specific notification types
match notification.root:
case types.LoggingMessageNotification(params=params):
await self._logging_callback(params)
case types.ElicitCompleteNotification(params=params):
# Handle elicitation completion notification
# Clients MAY use this to retry requests or update UI
# The notification contains the elicitationId of the completed elicitation
pass
case _:
pass
|
Return the server capabilities received during initialization.
Returns None if the session has not been initialized yet.
Source code in src/mcp/client/session.py199
200
201
202
203
204 | def get_server_capabilities(self) -> types.ServerCapabilities | None:
"""Return the server capabilities received during initialization.
Returns None if the session has not been initialized yet.
"""
return self._server_capabilities
|
Experimental APIs for tasks and other features.
WARNING: These APIs are experimental and may change without notice.
Examplestatus = await session.experimental.get_task(task_id) result = await session.experimental.get_task_result(task_id, CallToolResult)
Send a ping request.
Source code in src/mcp/client/session.py220
221
222
223
224
225 | async def send_ping(self) -> types.EmptyResult:
"""Send a ping request."""
return await self.send_request(
types.ClientRequest(types.PingRequest()),
types.EmptyResult,
)
|
Send a progress notification.
Source code in src/mcp/client/session.py227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246 | async def send_progress_notification(
self,
progress_token: str | int,
progress: float,
total: float | None = None,
message: str | None = None,
) -> None:
"""Send a progress notification."""
await self.send_notification(
types.ClientNotification(
types.ProgressNotification(
params=types.ProgressNotificationParams(
progressToken=progress_token,
progress=progress,
total=total,
message=message,
),
),
)
)
|
Send a logging/setLevel request.
Source code in src/mcp/client/session.py248
249
250
251
252
253
254
255
256
257 | async def set_logging_level(self, level: types.LoggingLevel) -> types.EmptyResult:
"""Send a logging/setLevel request."""
return await self.send_request( # pragma: no cover
types.ClientRequest(
types.SetLevelRequest(
params=types.SetLevelRequestParams(level=level),
)
),
types.EmptyResult,
)
|
Send a resources/list request.
Parameters:
| cursor | str | None |
Simple cursor string for pagination (deprecated, use params instead) |
None |
| params | PaginatedRequestParams | None |
Full pagination parameters including cursor and any future fields |
None |
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294 | async def list_resources(
self,
cursor: str | None = None,
*,
params: types.PaginatedRequestParams | None = None,
) -> types.ListResourcesResult:
"""Send a resources/list request.
Args:
cursor: Simple cursor string for pagination (deprecated, use params instead)
params: Full pagination parameters including cursor and any future fields
"""
if params is not None and cursor is not None:
raise ValueError("Cannot specify both cursor and params")
if params is not None:
request_params = params
elif cursor is not None:
request_params = types.PaginatedRequestParams(cursor=cursor)
else:
request_params = None
return await self.send_request(
types.ClientRequest(types.ListResourcesRequest(params=request_params)),
types.ListResourcesResult,
)
|
Send a resources/templates/list request.
Parameters:
| cursor | str | None |
Simple cursor string for pagination (deprecated, use params instead) |
None |
| params | PaginatedRequestParams | None |
Full pagination parameters including cursor and any future fields |
None |
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333 | async def list_resource_templates(
self,
cursor: str | None = None,
*,
params: types.PaginatedRequestParams | None = None,
) -> types.ListResourceTemplatesResult:
"""Send a resources/templates/list request.
Args:
cursor: Simple cursor string for pagination (deprecated, use params instead)
params: Full pagination parameters including cursor and any future fields
"""
if params is not None and cursor is not None:
raise ValueError("Cannot specify both cursor and params")
if params is not None:
request_params = params
elif cursor is not None:
request_params = types.PaginatedRequestParams(cursor=cursor)
else:
request_params = None
return await self.send_request(
types.ClientRequest(types.ListResourceTemplatesRequest(params=request_params)),
types.ListResourceTemplatesResult,
)
|
Send a resources/read request.
Source code in src/mcp/client/session.py335
336
337
338
339
340
341
342
343
344 | async def read_resource(self, uri: AnyUrl) -> types.ReadResourceResult:
"""Send a resources/read request."""
return await self.send_request(
types.ClientRequest(
types.ReadResourceRequest(
params=types.ReadResourceRequestParams(uri=uri),
)
),
types.ReadResourceResult,
)
|
Send a resources/subscribe request.
Source code in src/mcp/client/session.py346
347
348
349
350
351
352
353
354
355 | async def subscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:
"""Send a resources/subscribe request."""
return await self.send_request( # pragma: no cover
types.ClientRequest(
types.SubscribeRequest(
params=types.SubscribeRequestParams(uri=uri),
)
),
types.EmptyResult,
)
|
Send a resources/unsubscribe request.
Source code in src/mcp/client/session.py357
358
359
360
361
362
363
364
365
366 | async def unsubscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:
"""Send a resources/unsubscribe request."""
return await self.send_request( # pragma: no cover
types.ClientRequest(
types.UnsubscribeRequest(
params=types.UnsubscribeRequestParams(uri=uri),
)
),
types.EmptyResult,
)
|
Send a tools/call request with optional progress callback support.
Source code in src/mcp/client/session.py368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397 | async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
read_timeout_seconds: timedelta | None = None,
progress_callback: ProgressFnT | None = None,
*,
meta: dict[str, Any] | None = None,
) -> types.CallToolResult:
"""Send a tools/call request with optional progress callback support."""
_meta: types.RequestParams.Meta | None = None
if meta is not None:
_meta = types.RequestParams.Meta(**meta)
result = await self.send_request(
types.ClientRequest(
types.CallToolRequest(
params=types.CallToolRequestParams(name=name, arguments=arguments, _meta=_meta),
)
),
types.CallToolResult,
request_read_timeout_seconds=read_timeout_seconds,
progress_callback=progress_callback,
)
if not result.isError:
await self._validate_tool_result(name, result)
return result
|
Send a prompts/list request.
Parameters:
| cursor | str | None |
Simple cursor string for pagination (deprecated, use params instead) |
None |
| params | PaginatedRequestParams | None |
Full pagination parameters including cursor and any future fields |
None |
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460 | async def list_prompts(
self,
cursor: str | None = None,
*,
params: types.PaginatedRequestParams | None = None,
) -> types.ListPromptsResult:
"""Send a prompts/list request.
Args:
cursor: Simple cursor string for pagination (deprecated, use params instead)
params: Full pagination parameters including cursor and any future fields
"""
if params is not None and cursor is not None:
raise ValueError("Cannot specify both cursor and params")
if params is not None:
request_params = params
elif cursor is not None:
request_params = types.PaginatedRequestParams(cursor=cursor)
else:
request_params = None
return await self.send_request(
types.ClientRequest(types.ListPromptsRequest(params=request_params)),
types.ListPromptsResult,
)
|
Send a prompts/get request.
Source code in src/mcp/client/session.py462
463
464
465
466
467
468
469
470
471 | async def get_prompt(self, name: str, arguments: dict[str, str] | None = None) -> types.GetPromptResult:
"""Send a prompts/get request."""
return await self.send_request(
types.ClientRequest(
types.GetPromptRequest(
params=types.GetPromptRequestParams(name=name, arguments=arguments),
)
),
types.GetPromptResult,
)
|
Send a completion/complete request.
Source code in src/mcp/client/session.py473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495 | async def complete(
self,
ref: types.ResourceTemplateReference | types.PromptReference,
argument: dict[str, str],
context_arguments: dict[str, str] | None = None,
) -> types.CompleteResult:
"""Send a completion/complete request."""
context = None
if context_arguments is not None:
context = types.CompletionContext(arguments=context_arguments)
return await self.send_request(
types.ClientRequest(
types.CompleteRequest(
params=types.CompleteRequestParams(
ref=ref,
argument=types.CompletionArgument(**argument),
context=context,
),
)
),
types.CompleteResult,
)
|
Send a tools/list request.
Parameters:
| cursor | str | None |
Simple cursor string for pagination (deprecated, use params instead) |
None |
| params | PaginatedRequestParams | None |
Full pagination parameters including cursor and any future fields |
None |
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539 | async def list_tools(
self,
cursor: str | None = None,
*,
params: types.PaginatedRequestParams | None = None,
) -> types.ListToolsResult:
"""Send a tools/list request.
Args:
cursor: Simple cursor string for pagination (deprecated, use params instead)
params: Full pagination parameters including cursor and any future fields
"""
if params is not None and cursor is not None:
raise ValueError("Cannot specify both cursor and params")
if params is not None:
request_params = params
elif cursor is not None:
request_params = types.PaginatedRequestParams(cursor=cursor)
else:
request_params = None
result = await self.send_request(
types.ClientRequest(types.ListToolsRequest(params=request_params)),
types.ListToolsResult,
)
# Cache tool output schemas for future validation
# Note: don't clear the cache, as we may be using a cursor
for tool in result.tools:
self._tool_output_schemas[tool.name] = tool.outputSchema
return result
|
Send a roots/list_changed notification.
Source code in src/mcp/client/session.py541
542
543 | async def send_roots_list_changed(self) -> None: # pragma: no cover
"""Send a roots/list_changed notification."""
await self.send_notification(types.ClientNotification(types.RootsListChangedNotification()))
|
Client for managing connections to multiple MCP servers.
This class is responsible for encapsulating management of server connections. It aggregates tools, resources, and prompts from all connected servers.
For auxiliary handlers, such as resource subscription, this is delegated to the client and can be accessed via the session.
Example Usagename_fn = lambda name, server_info: f"{(server_info.name)}_{name}" async with ClientSessionGroup(component_name_hook=name_fn) as group: for server_param in server_params: await group.connect_to_server(server_param) ...
Source code in src/mcp/client/session_group.py 88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447 | class ClientSessionGroup:
"""Client for managing connections to multiple MCP servers.
This class is responsible for encapsulating management of server connections.
It aggregates tools, resources, and prompts from all connected servers.
For auxiliary handlers, such as resource subscription, this is delegated to
the client and can be accessed via the session.
Example Usage:
name_fn = lambda name, server_info: f"{(server_info.name)}_{name}"
async with ClientSessionGroup(component_name_hook=name_fn) as group:
for server_param in server_params:
await group.connect_to_server(server_param)
...
"""
class _ComponentNames(BaseModel):
"""Used for reverse index to find components."""
prompts: set[str] = set()
resources: set[str] = set()
tools: set[str] = set()
# Standard MCP components.
_prompts: dict[str, types.Prompt]
_resources: dict[str, types.Resource]
_tools: dict[str, types.Tool]
# Client-server connection management.
_sessions: dict[mcp.ClientSession, _ComponentNames]
_tool_to_session: dict[str, mcp.ClientSession]
_exit_stack: contextlib.AsyncExitStack
_session_exit_stacks: dict[mcp.ClientSession, contextlib.AsyncExitStack]
# Optional fn consuming (component_name, serverInfo) for custom names.
# This is provide a means to mitigate naming conflicts across servers.
# Example: (tool_name, serverInfo) => "{result.serverInfo.name}.{tool_name}"
_ComponentNameHook: TypeAlias = Callable[[str, types.Implementation], str]
_component_name_hook: _ComponentNameHook | None
def __init__(
self,
exit_stack: contextlib.AsyncExitStack | None = None,
component_name_hook: _ComponentNameHook | None = None,
) -> None:
"""Initializes the MCP client."""
self._tools = {}
self._resources = {}
self._prompts = {}
self._sessions = {}
self._tool_to_session = {}
if exit_stack is None:
self._exit_stack = contextlib.AsyncExitStack()
self._owns_exit_stack = True
else:
self._exit_stack = exit_stack
self._owns_exit_stack = False
self._session_exit_stacks = {}
self._component_name_hook = component_name_hook
async def __aenter__(self) -> Self: # pragma: no cover
# Enter the exit stack only if we created it ourselves
if self._owns_exit_stack:
await self._exit_stack.__aenter__()
return self
async def __aexit__(
self,
_exc_type: type[BaseException] | None,
_exc_val: BaseException | None,
_exc_tb: TracebackType | None,
) -> bool | None: # pragma: no cover
"""Closes session exit stacks and main exit stack upon completion."""
# Only close the main exit stack if we created it
if self._owns_exit_stack:
await self._exit_stack.aclose()
# Concurrently close session stacks.
async with anyio.create_task_group() as tg:
for exit_stack in self._session_exit_stacks.values():
tg.start_soon(exit_stack.aclose)
@property
def sessions(self) -> list[mcp.ClientSession]:
"""Returns the list of sessions being managed."""
return list(self._sessions.keys()) # pragma: no cover
@property
def prompts(self) -> dict[str, types.Prompt]:
"""Returns the prompts as a dictionary of names to prompts."""
return self._prompts
@property
def resources(self) -> dict[str, types.Resource]:
"""Returns the resources as a dictionary of names to resources."""
return self._resources
@property
def tools(self) -> dict[str, types.Tool]:
"""Returns the tools as a dictionary of names to tools."""
return self._tools
@overload
async def call_tool(
self,
name: str,
arguments: dict[str, Any],
read_timeout_seconds: timedelta | None = None,
progress_callback: ProgressFnT | None = None,
*,
meta: dict[str, Any] | None = None,
) -> types.CallToolResult: ...
@overload
@deprecated("The 'args' parameter is deprecated. Use 'arguments' instead.")
async def call_tool(
self,
name: str,
*,
args: dict[str, Any],
read_timeout_seconds: timedelta | None = None,
progress_callback: ProgressFnT | None = None,
meta: dict[str, Any] | None = None,
) -> types.CallToolResult: ...
async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
read_timeout_seconds: timedelta | None = None,
progress_callback: ProgressFnT | None = None,
*,
meta: dict[str, Any] | None = None,
args: dict[str, Any] | None = None,
) -> types.CallToolResult:
"""Executes a tool given its name and arguments."""
session = self._tool_to_session[name]
session_tool_name = self.tools[name].name
return await session.call_tool(
session_tool_name,
arguments if args is None else args,
read_timeout_seconds=read_timeout_seconds,
progress_callback=progress_callback,
meta=meta,
)
async def disconnect_from_server(self, session: mcp.ClientSession) -> None:
"""Disconnects from a single MCP server."""
session_known_for_components = session in self._sessions
session_known_for_stack = session in self._session_exit_stacks
if not session_known_for_components and not session_known_for_stack:
raise McpError(
types.ErrorData(
code=types.INVALID_PARAMS,
message="Provided session is not managed or already disconnected.",
)
)
if session_known_for_components: # pragma: no cover
component_names = self._sessions.pop(session) # Pop from _sessions tracking
# Remove prompts associated with the session.
for name in component_names.prompts:
if name in self._prompts:
del self._prompts[name]
# Remove resources associated with the session.
for name in component_names.resources:
if name in self._resources:
del self._resources[name]
# Remove tools associated with the session.
for name in component_names.tools:
if name in self._tools:
del self._tools[name]
if name in self._tool_to_session:
del self._tool_to_session[name]
# Clean up the session's resources via its dedicated exit stack
if session_known_for_stack:
session_stack_to_close = self._session_exit_stacks.pop(session) # pragma: no cover
await session_stack_to_close.aclose() # pragma: no cover
async def connect_with_session(
self, server_info: types.Implementation, session: mcp.ClientSession
) -> mcp.ClientSession:
"""Connects to a single MCP server."""
await self._aggregate_components(server_info, session)
return session
async def connect_to_server(
self,
server_params: ServerParameters,
session_params: ClientSessionParameters | None = None,
) -> mcp.ClientSession:
"""Connects to a single MCP server."""
server_info, session = await self._establish_session(server_params, session_params or ClientSessionParameters())
return await self.connect_with_session(server_info, session)
async def _establish_session(
self,
server_params: ServerParameters,
session_params: ClientSessionParameters,
) -> tuple[types.Implementation, mcp.ClientSession]:
"""Establish a client session to an MCP server."""
session_stack = contextlib.AsyncExitStack()
try:
# Create read and write streams that facilitate io with the server.
if isinstance(server_params, StdioServerParameters):
client = mcp.stdio_client(server_params)
read, write = await session_stack.enter_async_context(client)
elif isinstance(server_params, SseServerParameters):
client = sse_client(
url=server_params.url,
headers=server_params.headers,
timeout=server_params.timeout,
sse_read_timeout=server_params.sse_read_timeout,
)
read, write = await session_stack.enter_async_context(client)
else:
httpx_client = create_mcp_http_client(
headers=server_params.headers,
timeout=httpx.Timeout(
server_params.timeout.total_seconds(),
read=server_params.sse_read_timeout.total_seconds(),
),
)
await session_stack.enter_async_context(httpx_client)
client = streamable_http_client(
url=server_params.url,
http_client=httpx_client,
terminate_on_close=server_params.terminate_on_close,
)
read, write, _ = await session_stack.enter_async_context(client)
session = await session_stack.enter_async_context(
mcp.ClientSession(
read,
write,
read_timeout_seconds=session_params.read_timeout_seconds,
sampling_callback=session_params.sampling_callback,
elicitation_callback=session_params.elicitation_callback,
list_roots_callback=session_params.list_roots_callback,
logging_callback=session_params.logging_callback,
message_handler=session_params.message_handler,
client_info=session_params.client_info,
)
)
result = await session.initialize()
# Session successfully initialized.
# Store its stack and register the stack with the main group stack.
self._session_exit_stacks[session] = session_stack
# session_stack itself becomes a resource managed by the
# main _exit_stack.
await self._exit_stack.enter_async_context(session_stack)
return result.serverInfo, session
except Exception: # pragma: no cover
# If anything during this setup fails, ensure the session-specific
# stack is closed.
await session_stack.aclose()
raise
async def _aggregate_components(self, server_info: types.Implementation, session: mcp.ClientSession) -> None:
"""Aggregates prompts, resources, and tools from a given session."""
# Create a reverse index so we can find all prompts, resources, and
# tools belonging to this session. Used for removing components from
# the session group via self.disconnect_from_server.
component_names = self._ComponentNames()
# Temporary components dicts. We do not want to modify the aggregate
# lists in case of an intermediate failure.
prompts_temp: dict[str, types.Prompt] = {}
resources_temp: dict[str, types.Resource] = {}
tools_temp: dict[str, types.Tool] = {}
tool_to_session_temp: dict[str, mcp.ClientSession] = {}
# Query the server for its prompts and aggregate to list.
try:
prompts = (await session.list_prompts()).prompts
for prompt in prompts:
name = self._component_name(prompt.name, server_info)
prompts_temp[name] = prompt
component_names.prompts.add(name)
except McpError as err: # pragma: no cover
logging.warning(f"Could not fetch prompts: {err}")
# Query the server for its resources and aggregate to list.
try:
resources = (await session.list_resources()).resources
for resource in resources:
name = self._component_name(resource.name, server_info)
resources_temp[name] = resource
component_names.resources.add(name)
except McpError as err: # pragma: no cover
logging.warning(f"Could not fetch resources: {err}")
# Query the server for its tools and aggregate to list.
try:
tools = (await session.list_tools()).tools
for tool in tools:
name = self._component_name(tool.name, server_info)
tools_temp[name] = tool
tool_to_session_temp[name] = session
component_names.tools.add(name)
except McpError as err: # pragma: no cover
logging.warning(f"Could not fetch tools: {err}")
# Clean up exit stack for session if we couldn't retrieve anything
# from the server.
if not any((prompts_temp, resources_temp, tools_temp)):
del self._session_exit_stacks[session] # pragma: no cover
# Check for duplicates.
matching_prompts = prompts_temp.keys() & self._prompts.keys()
if matching_prompts:
raise McpError( # pragma: no cover
types.ErrorData(
code=types.INVALID_PARAMS,
message=f"{matching_prompts} already exist in group prompts.",
)
)
matching_resources = resources_temp.keys() & self._resources.keys()
if matching_resources:
raise McpError( # pragma: no cover
types.ErrorData(
code=types.INVALID_PARAMS,
message=f"{matching_resources} already exist in group resources.",
)
)
matching_tools = tools_temp.keys() & self._tools.keys()
if matching_tools:
raise McpError(
types.ErrorData(
code=types.INVALID_PARAMS,
message=f"{matching_tools} already exist in group tools.",
)
)
# Aggregate components.
self._sessions[session] = component_names
self._prompts.update(prompts_temp)
self._resources.update(resources_temp)
self._tools.update(tools_temp)
self._tool_to_session.update(tool_to_session_temp)
def _component_name(self, name: str, server_info: types.Implementation) -> str:
if self._component_name_hook:
return self._component_name_hook(name, server_info)
return name
|
Initializes the MCP client.
Source code in src/mcp/client/session_group.py130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150 | def __init__(
self,
exit_stack: contextlib.AsyncExitStack | None = None,
component_name_hook: _ComponentNameHook | None = None,
) -> None:
"""Initializes the MCP client."""
self._tools = {}
self._resources = {}
self._prompts = {}
self._sessions = {}
self._tool_to_session = {}
if exit_stack is None:
self._exit_stack = contextlib.AsyncExitStack()
self._owns_exit_stack = True
else:
self._exit_stack = exit_stack
self._owns_exit_stack = False
self._session_exit_stacks = {}
self._component_name_hook = component_name_hook
|
Closes session exit stacks and main exit stack upon completion.
Source code in src/mcp/client/session_group.py158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173 | async def __aexit__(
self,
_exc_type: type[BaseException] | None,
_exc_val: BaseException | None,
_exc_tb: TracebackType | None,
) -> bool | None: # pragma: no cover
"""Closes session exit stacks and main exit stack upon completion."""
# Only close the main exit stack if we created it
if self._owns_exit_stack:
await self._exit_stack.aclose()
# Concurrently close session stacks.
async with anyio.create_task_group() as tg:
for exit_stack in self._session_exit_stacks.values():
tg.start_soon(exit_stack.aclose)
|
Returns the prompts as a dictionary of names to prompts.
Returns the resources as a dictionary of names to resources.
Executes a tool given its name and arguments.
Source code in src/mcp/client/session_group.py218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237 | async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
read_timeout_seconds: timedelta | None = None,
progress_callback: ProgressFnT | None = None,
*,
meta: dict[str, Any] | None = None,
args: dict[str, Any] | None = None,
) -> types.CallToolResult:
"""Executes a tool given its name and arguments."""
session = self._tool_to_session[name]
session_tool_name = self.tools[name].name
return await session.call_tool(
session_tool_name,
arguments if args is None else args,
read_timeout_seconds=read_timeout_seconds,
progress_callback=progress_callback,
meta=meta,
)
|
Disconnects from a single MCP server.
Source code in src/mcp/client/session_group.py239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274 | async def disconnect_from_server(self, session: mcp.ClientSession) -> None:
"""Disconnects from a single MCP server."""
session_known_for_components = session in self._sessions
session_known_for_stack = session in self._session_exit_stacks
if not session_known_for_components and not session_known_for_stack:
raise McpError(
types.ErrorData(
code=types.INVALID_PARAMS,
message="Provided session is not managed or already disconnected.",
)
)
if session_known_for_components: # pragma: no cover
component_names = self._sessions.pop(session) # Pop from _sessions tracking
# Remove prompts associated with the session.
for name in component_names.prompts:
if name in self._prompts:
del self._prompts[name]
# Remove resources associated with the session.
for name in component_names.resources:
if name in self._resources:
del self._resources[name]
# Remove tools associated with the session.
for name in component_names.tools:
if name in self._tools:
del self._tools[name]
if name in self._tool_to_session:
del self._tool_to_session[name]
# Clean up the session's resources via its dedicated exit stack
if session_known_for_stack:
session_stack_to_close = self._session_exit_stacks.pop(session) # pragma: no cover
await session_stack_to_close.aclose() # pragma: no cover
|
Connects to a single MCP server.
Source code in src/mcp/client/session_group.py276
277
278
279
280
281 | async def connect_with_session(
self, server_info: types.Implementation, session: mcp.ClientSession
) -> mcp.ClientSession:
"""Connects to a single MCP server."""
await self._aggregate_components(server_info, session)
return session
|
Connects to a single MCP server.
Source code in src/mcp/client/session_group.py283
284
285
286
287
288
289
290 | async def connect_to_server(
self,
server_params: ServerParameters,
session_params: ClientSessionParameters | None = None,
) -> mcp.ClientSession:
"""Connects to a single MCP server."""
server_info, session = await self._establish_session(server_params, session_params or ClientSessionParameters())
return await self.connect_with_session(server_info, session)
|
Bases: BaseModel
Source code in src/mcp/client/stdio/__init__.py 72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 | class StdioServerParameters(BaseModel):
command: str
"""The executable to run to start the server."""
args: list[str] = Field(default_factory=list)
"""Command line arguments to pass to the executable."""
env: dict[str, str] | None = None
"""
The environment to use when spawning the process.
If not specified, the result of get_default_environment() will be used.
"""
cwd: str | Path | None = None
"""The working directory to use when spawning the process."""
encoding: str = "utf-8"
"""
The text encoding used when sending/receiving messages to the server
defaults to utf-8
"""
encoding_error_handler: Literal["strict", "ignore", "replace"] = "strict"
"""
The text encoding error handler.
See https://docs.python.org/3/library/codecs.html#codec-base-classes for
explanations of possible values
"""
|
Command line arguments to pass to the executable.
The environment to use when spawning the process.
If not specified, the result of get_default_environment() will be used.
The working directory to use when spawning the process.
The text encoding used when sending/receiving messages to the server
defaults to utf-8
The text encoding error handler.
See https://docs.python.org/3/library/codecs.html#codec-base-classes for explanations of possible values
Client transport for stdio: this will connect to a server by spawning a process and communicating with it over stdin/stdout.
Source code in src/mcp/client/stdio/__init__.py105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216 | @asynccontextmanager
async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stderr):
"""
Client transport for stdio: this will connect to a server by spawning a
process and communicating with it over stdin/stdout.
"""
read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]
read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception]
write_stream: MemoryObjectSendStream[SessionMessage]
write_stream_reader: MemoryObjectReceiveStream[SessionMessage]
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
try:
command = _get_executable_command(server.command)
# Open process with stderr piped for capture
process = await _create_platform_compatible_process(
command=command,
args=server.args,
env=({**get_default_environment(), **server.env} if server.env is not None else get_default_environment()),
errlog=errlog,
cwd=server.cwd,
)
except OSError:
# Clean up streams if process creation fails
await read_stream.aclose()
await write_stream.aclose()
await read_stream_writer.aclose()
await write_stream_reader.aclose()
raise
async def stdout_reader():
assert process.stdout, "Opened process is missing stdout"
try:
async with read_stream_writer:
buffer = ""
async for chunk in TextReceiveStream(
process.stdout,
encoding=server.encoding,
errors=server.encoding_error_handler,
):
lines = (buffer + chunk).split("\n")
buffer = lines.pop()
for line in lines:
try:
message = types.JSONRPCMessage.model_validate_json(line)
except Exception as exc: # pragma: no cover
logger.exception("Failed to parse JSONRPC message from server")
await read_stream_writer.send(exc)
continue
session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()
async def stdin_writer():
assert process.stdin, "Opened process is missing stdin"
try:
async with write_stream_reader:
async for session_message in write_stream_reader:
json = session_message.message.model_dump_json(by_alias=True, exclude_none=True)
await process.stdin.send(
(json + "\n").encode(
encoding=server.encoding,
errors=server.encoding_error_handler,
)
)
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()
async with (
anyio.create_task_group() as tg,
process,
):
tg.start_soon(stdout_reader)
tg.start_soon(stdin_writer)
try:
yield read_stream, write_stream
finally:
# MCP spec: stdio shutdown sequence
# 1. Close input stream to server
# 2. Wait for server to exit, or send SIGTERM if it doesn't exit in time
# 3. Send SIGKILL if still not exited
if process.stdin: # pragma: no branch
try:
await process.stdin.aclose()
except Exception: # pragma: no cover
# stdin might already be closed, which is fine
pass
try:
# Give the process time to exit gracefully after stdin closes
with anyio.fail_after(PROCESS_TERMINATION_TIMEOUT):
await process.wait()
except TimeoutError:
# Process didn't exit from stdin closure, use platform-specific termination
# which handles SIGTERM -> SIGKILL escalation
await _terminate_process_tree(process)
except ProcessLookupError: # pragma: no cover
# Process already exited, which is fine
pass
await read_stream.aclose()
await write_stream.aclose()
await read_stream_writer.aclose()
await write_stream_reader.aclose()
|
Bases: BaseSession[ServerRequest, ServerNotification, ServerResult, ClientRequest, ClientNotification]
Source code in src/mcp/server/session.py 75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689 | class ServerSession(
BaseSession[
types.ServerRequest,
types.ServerNotification,
types.ServerResult,
types.ClientRequest,
types.ClientNotification,
]
):
_initialized: InitializationState = InitializationState.NotInitialized
_client_params: types.InitializeRequestParams | None = None
_experimental_features: ExperimentalServerSessionFeatures | None = None
def __init__(
self,
read_stream: MemoryObjectReceiveStream[SessionMessage | Exception],
write_stream: MemoryObjectSendStream[SessionMessage],
init_options: InitializationOptions,
stateless: bool = False,
) -> None:
super().__init__(read_stream, write_stream, types.ClientRequest, types.ClientNotification)
self._initialization_state = (
InitializationState.Initialized if stateless else InitializationState.NotInitialized
)
self._init_options = init_options
self._incoming_message_stream_writer, self._incoming_message_stream_reader = anyio.create_memory_object_stream[
ServerRequestResponder
](0)
self._exit_stack.push_async_callback(lambda: self._incoming_message_stream_reader.aclose())
@property
def client_params(self) -> types.InitializeRequestParams | None:
return self._client_params # pragma: no cover
@property
def experimental(self) -> ExperimentalServerSessionFeatures:
"""Experimental APIs for server→client task operations.
WARNING: These APIs are experimental and may change without notice.
"""
if self._experimental_features is None:
self._experimental_features = ExperimentalServerSessionFeatures(self)
return self._experimental_features
def check_client_capability(self, capability: types.ClientCapabilities) -> bool: # pragma: no cover
"""Check if the client supports a specific capability."""
if self._client_params is None:
return False
client_caps = self._client_params.capabilities
if capability.roots is not None:
if client_caps.roots is None:
return False
if capability.roots.listChanged and not client_caps.roots.listChanged:
return False
if capability.sampling is not None:
if client_caps.sampling is None:
return False
if capability.sampling.context is not None and client_caps.sampling.context is None:
return False
if capability.sampling.tools is not None and client_caps.sampling.tools is None:
return False
if capability.elicitation is not None and client_caps.elicitation is None:
return False
if capability.experimental is not None:
if client_caps.experimental is None:
return False
for exp_key, exp_value in capability.experimental.items():
if exp_key not in client_caps.experimental or client_caps.experimental[exp_key] != exp_value:
return False
if capability.tasks is not None:
if client_caps.tasks is None:
return False
if not check_tasks_capability(capability.tasks, client_caps.tasks):
return False
return True
async def _receive_loop(self) -> None:
async with self._incoming_message_stream_writer:
await super()._receive_loop()
async def _received_request(self, responder: RequestResponder[types.ClientRequest, types.ServerResult]):
match responder.request.root:
case types.InitializeRequest(params=params):
requested_version = params.protocolVersion
self._initialization_state = InitializationState.Initializing
self._client_params = params
with responder:
await responder.respond(
types.ServerResult(
types.InitializeResult(
protocolVersion=requested_version
if requested_version in SUPPORTED_PROTOCOL_VERSIONS
else types.LATEST_PROTOCOL_VERSION,
capabilities=self._init_options.capabilities,
serverInfo=types.Implementation(
name=self._init_options.server_name,
version=self._init_options.server_version,
websiteUrl=self._init_options.website_url,
icons=self._init_options.icons,
),
instructions=self._init_options.instructions,
)
)
)
self._initialization_state = InitializationState.Initialized
case types.PingRequest():
# Ping requests are allowed at any time
pass
case _:
if self._initialization_state != InitializationState.Initialized:
raise RuntimeError("Received request before initialization was complete")
async def _received_notification(self, notification: types.ClientNotification) -> None:
# Need this to avoid ASYNC910
await anyio.lowlevel.checkpoint()
match notification.root:
case types.InitializedNotification():
self._initialization_state = InitializationState.Initialized
case _:
if self._initialization_state != InitializationState.Initialized: # pragma: no cover
raise RuntimeError("Received notification before initialization was complete")
async def send_log_message(
self,
level: types.LoggingLevel,
data: Any,
logger: str | None = None,
related_request_id: types.RequestId | None = None,
) -> None:
"""Send a log message notification."""
await self.send_notification(
types.ServerNotification(
types.LoggingMessageNotification(
params=types.LoggingMessageNotificationParams(
level=level,
data=data,
logger=logger,
),
)
),
related_request_id,
)
async def send_resource_updated(self, uri: AnyUrl) -> None: # pragma: no cover
"""Send a resource updated notification."""
await self.send_notification(
types.ServerNotification(
types.ResourceUpdatedNotification(
params=types.ResourceUpdatedNotificationParams(uri=uri),
)
)
)
@overload
async def create_message(
self,
messages: list[types.SamplingMessage],
*,
max_tokens: int,
system_prompt: str | None = None,
include_context: types.IncludeContext | None = None,
temperature: float | None = None,
stop_sequences: list[str] | None = None,
metadata: dict[str, Any] | None = None,
model_preferences: types.ModelPreferences | None = None,
tools: None = None,
tool_choice: types.ToolChoice | None = None,
related_request_id: types.RequestId | None = None,
) -> types.CreateMessageResult:
"""Overload: Without tools, returns single content."""
...
@overload
async def create_message(
self,
messages: list[types.SamplingMessage],
*,
max_tokens: int,
system_prompt: str | None = None,
include_context: types.IncludeContext | None = None,
temperature: float | None = None,
stop_sequences: list[str] | None = None,
metadata: dict[str, Any] | None = None,
model_preferences: types.ModelPreferences | None = None,
tools: list[types.Tool],
tool_choice: types.ToolChoice | None = None,
related_request_id: types.RequestId | None = None,
) -> types.CreateMessageResultWithTools:
"""Overload: With tools, returns array-capable content."""
...
async def create_message(
self,
messages: list[types.SamplingMessage],
*,
max_tokens: int,
system_prompt: str | None = None,
include_context: types.IncludeContext | None = None,
temperature: float | None = None,
stop_sequences: list[str] | None = None,
metadata: dict[str, Any] | None = None,
model_preferences: types.ModelPreferences | None = None,
tools: list[types.Tool] | None = None,
tool_choice: types.ToolChoice | None = None,
related_request_id: types.RequestId | None = None,
) -> types.CreateMessageResult | types.CreateMessageResultWithTools:
"""Send a sampling/create_message request.
Args:
messages: The conversation messages to send.
max_tokens: Maximum number of tokens to generate.
system_prompt: Optional system prompt.
include_context: Optional context inclusion setting.
Should only be set to "thisServer" or "allServers"
if the client has sampling.context capability.
temperature: Optional sampling temperature.
stop_sequences: Optional stop sequences.
metadata: Optional metadata to pass through to the LLM provider.
model_preferences: Optional model selection preferences.
tools: Optional list of tools the LLM can use during sampling.
Requires client to have sampling.tools capability.
tool_choice: Optional control over tool usage behavior.
Requires client to have sampling.tools capability.
related_request_id: Optional ID of a related request.
Returns:
The sampling result from the client.
Raises:
McpError: If tools are provided but client doesn't support them.
ValueError: If tool_use or tool_result message structure is invalid.
"""
client_caps = self._client_params.capabilities if self._client_params else None
validate_sampling_tools(client_caps, tools, tool_choice)
validate_tool_use_result_messages(messages)
request = types.ServerRequest(
types.CreateMessageRequest(
params=types.CreateMessageRequestParams(
messages=messages,
systemPrompt=system_prompt,
includeContext=include_context,
temperature=temperature,
maxTokens=max_tokens,
stopSequences=stop_sequences,
metadata=metadata,
modelPreferences=model_preferences,
tools=tools,
toolChoice=tool_choice,
),
)
)
metadata_obj = ServerMessageMetadata(related_request_id=related_request_id)
# Use different result types based on whether tools are provided
if tools is not None:
return await self.send_request(
request=request,
result_type=types.CreateMessageResultWithTools,
metadata=metadata_obj,
)
return await self.send_request(
request=request,
result_type=types.CreateMessageResult,
metadata=metadata_obj,
)
async def list_roots(self) -> types.ListRootsResult:
"""Send a roots/list request."""
return await self.send_request(
types.ServerRequest(types.ListRootsRequest()),
types.ListRootsResult,
)
async def elicit(
self,
message: str,
requestedSchema: types.ElicitRequestedSchema,
related_request_id: types.RequestId | None = None,
) -> types.ElicitResult:
"""Send a form mode elicitation/create request.
Args:
message: The message to present to the user
requestedSchema: Schema defining the expected response structure
related_request_id: Optional ID of the request that triggered this elicitation
Returns:
The client's response
Note:
This method is deprecated in favor of elicit_form(). It remains for
backward compatibility but new code should use elicit_form().
"""
return await self.elicit_form(message, requestedSchema, related_request_id)
async def elicit_form(
self,
message: str,
requestedSchema: types.ElicitRequestedSchema,
related_request_id: types.RequestId | None = None,
) -> types.ElicitResult:
"""Send a form mode elicitation/create request.
Args:
message: The message to present to the user
requestedSchema: Schema defining the expected response structure
related_request_id: Optional ID of the request that triggered this elicitation
Returns:
The client's response with form data
"""
return await self.send_request(
types.ServerRequest(
types.ElicitRequest(
params=types.ElicitRequestFormParams(
message=message,
requestedSchema=requestedSchema,
),
)
),
types.ElicitResult,
metadata=ServerMessageMetadata(related_request_id=related_request_id),
)
async def elicit_url(
self,
message: str,
url: str,
elicitation_id: str,
related_request_id: types.RequestId | None = None,
) -> types.ElicitResult:
"""Send a URL mode elicitation/create request.
This directs the user to an external URL for out-of-band interactions
like OAuth flows, credential collection, or payment processing.
Args:
message: Human-readable explanation of why the interaction is needed
url: The URL the user should navigate to
elicitation_id: Unique identifier for tracking this elicitation
related_request_id: Optional ID of the request that triggered this elicitation
Returns:
The client's response indicating acceptance, decline, or cancellation
"""
return await self.send_request(
types.ServerRequest(
types.ElicitRequest(
params=types.ElicitRequestURLParams(
message=message,
url=url,
elicitationId=elicitation_id,
),
)
),
types.ElicitResult,
metadata=ServerMessageMetadata(related_request_id=related_request_id),
)
async def send_ping(self) -> types.EmptyResult: # pragma: no cover
"""Send a ping request."""
return await self.send_request(
types.ServerRequest(types.PingRequest()),
types.EmptyResult,
)
async def send_progress_notification(
self,
progress_token: str | int,
progress: float,
total: float | None = None,
message: str | None = None,
related_request_id: str | None = None,
) -> None:
"""Send a progress notification."""
await self.send_notification(
types.ServerNotification(
types.ProgressNotification(
params=types.ProgressNotificationParams(
progressToken=progress_token,
progress=progress,
total=total,
message=message,
),
)
),
related_request_id,
)
async def send_resource_list_changed(self) -> None: # pragma: no cover
"""Send a resource list changed notification."""
await self.send_notification(types.ServerNotification(types.ResourceListChangedNotification()))
async def send_tool_list_changed(self) -> None: # pragma: no cover
"""Send a tool list changed notification."""
await self.send_notification(types.ServerNotification(types.ToolListChangedNotification()))
async def send_prompt_list_changed(self) -> None: # pragma: no cover
"""Send a prompt list changed notification."""
await self.send_notification(types.ServerNotification(types.PromptListChangedNotification()))
async def send_elicit_complete(
self,
elicitation_id: str,
related_request_id: types.RequestId | None = None,
) -> None:
"""Send an elicitation completion notification.
This should be sent when a URL mode elicitation has been completed
out-of-band to inform the client that it may retry any requests
that were waiting for this elicitation.
Args:
elicitation_id: The unique identifier of the completed elicitation
related_request_id: Optional ID of the request that triggered this
"""
await self.send_notification(
types.ServerNotification(
types.ElicitCompleteNotification(
params=types.ElicitCompleteNotificationParams(elicitationId=elicitation_id)
)
),
related_request_id,
)
def _build_elicit_form_request(
self,
message: str,
requestedSchema: types.ElicitRequestedSchema,
related_task_id: str | None = None,
task: types.TaskMetadata | None = None,
) -> types.JSONRPCRequest:
"""Build a form mode elicitation request without sending it.
Args:
message: The message to present to the user
requestedSchema: Schema defining the expected response structure
related_task_id: If provided, adds io.modelcontextprotocol/related-task metadata
task: If provided, makes this a task-augmented request
Returns:
A JSONRPCRequest ready to be sent or queued
"""
params = types.ElicitRequestFormParams(
message=message,
requestedSchema=requestedSchema,
task=task,
)
params_data = params.model_dump(by_alias=True, mode="json", exclude_none=True)
# Add related-task metadata if associated with a parent task
if related_task_id is not None:
# Defensive: model_dump() never includes _meta, but guard against future changes
if "_meta" not in params_data: # pragma: no cover
params_data["_meta"] = {}
params_data["_meta"][RELATED_TASK_METADATA_KEY] = types.RelatedTaskMetadata(
taskId=related_task_id
).model_dump(by_alias=True)
request_id = f"task-{related_task_id}-{id(params)}" if related_task_id else self._request_id
if related_task_id is None:
self._request_id += 1
return types.JSONRPCRequest(
jsonrpc="2.0",
id=request_id,
method="elicitation/create",
params=params_data,
)
def _build_elicit_url_request(
self,
message: str,
url: str,
elicitation_id: str,
related_task_id: str | None = None,
) -> types.JSONRPCRequest:
"""Build a URL mode elicitation request without sending it.
Args:
message: Human-readable explanation of why the interaction is needed
url: The URL the user should navigate to
elicitation_id: Unique identifier for tracking this elicitation
related_task_id: If provided, adds io.modelcontextprotocol/related-task metadata
Returns:
A JSONRPCRequest ready to be sent or queued
"""
params = types.ElicitRequestURLParams(
message=message,
url=url,
elicitationId=elicitation_id,
)
params_data = params.model_dump(by_alias=True, mode="json", exclude_none=True)
# Add related-task metadata if associated with a parent task
if related_task_id is not None:
# Defensive: model_dump() never includes _meta, but guard against future changes
if "_meta" not in params_data: # pragma: no cover
params_data["_meta"] = {}
params_data["_meta"][RELATED_TASK_METADATA_KEY] = types.RelatedTaskMetadata(
taskId=related_task_id
).model_dump(by_alias=True)
request_id = f"task-{related_task_id}-{id(params)}" if related_task_id else self._request_id
if related_task_id is None:
self._request_id += 1
return types.JSONRPCRequest(
jsonrpc="2.0",
id=request_id,
method="elicitation/create",
params=params_data,
)
def _build_create_message_request(
self,
messages: list[types.SamplingMessage],
*,
max_tokens: int,
system_prompt: str | None = None,
include_context: types.IncludeContext | None = None,
temperature: float | None = None,
stop_sequences: list[str] | None = None,
metadata: dict[str, Any] | None = None,
model_preferences: types.ModelPreferences | None = None,
tools: list[types.Tool] | None = None,
tool_choice: types.ToolChoice | None = None,
related_task_id: str | None = None,
task: types.TaskMetadata | None = None,
) -> types.JSONRPCRequest:
"""Build a sampling/createMessage request without sending it.
Args:
messages: The conversation messages to send
max_tokens: Maximum number of tokens to generate
system_prompt: Optional system prompt
include_context: Optional context inclusion setting
temperature: Optional sampling temperature
stop_sequences: Optional stop sequences
metadata: Optional metadata to pass through to the LLM provider
model_preferences: Optional model selection preferences
tools: Optional list of tools the LLM can use during sampling
tool_choice: Optional control over tool usage behavior
related_task_id: If provided, adds io.modelcontextprotocol/related-task metadata
task: If provided, makes this a task-augmented request
Returns:
A JSONRPCRequest ready to be sent or queued
"""
params = types.CreateMessageRequestParams(
messages=messages,
systemPrompt=system_prompt,
includeContext=include_context,
temperature=temperature,
maxTokens=max_tokens,
stopSequences=stop_sequences,
metadata=metadata,
modelPreferences=model_preferences,
tools=tools,
toolChoice=tool_choice,
task=task,
)
params_data = params.model_dump(by_alias=True, mode="json", exclude_none=True)
# Add related-task metadata if associated with a parent task
if related_task_id is not None:
# Defensive: model_dump() never includes _meta, but guard against future changes
if "_meta" not in params_data: # pragma: no cover
params_data["_meta"] = {}
params_data["_meta"][RELATED_TASK_METADATA_KEY] = types.RelatedTaskMetadata(
taskId=related_task_id
).model_dump(by_alias=True)
request_id = f"task-{related_task_id}-{id(params)}" if related_task_id else self._request_id
if related_task_id is None:
self._request_id += 1
return types.JSONRPCRequest(
jsonrpc="2.0",
id=request_id,
method="sampling/createMessage",
params=params_data,
)
async def send_message(self, message: SessionMessage) -> None:
"""Send a raw session message.
This is primarily used by TaskResultHandler to deliver queued messages
(elicitation/sampling requests) to the client during task execution.
WARNING: This is a low-level experimental method that may change without
notice. Prefer using higher-level methods like send_notification() or
send_request() for normal operations.
Args:
message: The session message to send
"""
await self._write_stream.send(message)
async def _handle_incoming(self, req: ServerRequestResponder) -> None:
await self._incoming_message_stream_writer.send(req)
@property
def incoming_messages(self) -> MemoryObjectReceiveStream[ServerRequestResponder]:
return self._incoming_message_stream_reader
|
Experimental APIs for server→client task operations.
WARNING: These APIs are experimental and may change without notice.
Check if the client supports a specific capability.
Source code in src/mcp/server/session.py120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 | def check_client_capability(self, capability: types.ClientCapabilities) -> bool: # pragma: no cover
"""Check if the client supports a specific capability."""
if self._client_params is None:
return False
client_caps = self._client_params.capabilities
if capability.roots is not None:
if client_caps.roots is None:
return False
if capability.roots.listChanged and not client_caps.roots.listChanged:
return False
if capability.sampling is not None:
if client_caps.sampling is None:
return False
if capability.sampling.context is not None and client_caps.sampling.context is None:
return False
if capability.sampling.tools is not None and client_caps.sampling.tools is None:
return False
if capability.elicitation is not None and client_caps.elicitation is None:
return False
if capability.experimental is not None:
if client_caps.experimental is None:
return False
for exp_key, exp_value in capability.experimental.items():
if exp_key not in client_caps.experimental or client_caps.experimental[exp_key] != exp_value:
return False
if capability.tasks is not None:
if client_caps.tasks is None:
return False
if not check_tasks_capability(capability.tasks, client_caps.tasks):
return False
return True
|
Send a log message notification.
Source code in src/mcp/server/session.py205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224 | async def send_log_message(
self,
level: types.LoggingLevel,
data: Any,
logger: str | None = None,
related_request_id: types.RequestId | None = None,
) -> None:
"""Send a log message notification."""
await self.send_notification(
types.ServerNotification(
types.LoggingMessageNotification(
params=types.LoggingMessageNotificationParams(
level=level,
data=data,
logger=logger,
),
)
),
related_request_id,
)
|
Send a resource updated notification.
Source code in src/mcp/server/session.py226
227
228
229
230
231
232
233
234 | async def send_resource_updated(self, uri: AnyUrl) -> None: # pragma: no cover
"""Send a resource updated notification."""
await self.send_notification(
types.ServerNotification(
types.ResourceUpdatedNotification(
params=types.ResourceUpdatedNotificationParams(uri=uri),
)
)
)
|
Send a sampling/create_message request.
Parameters:
| messages | list[SamplingMessage] |
The conversation messages to send. |
required |
| max_tokens | int |
Maximum number of tokens to generate. |
required |
| system_prompt | str | None |
Optional system prompt. |
None |
| include_context | IncludeContext | None |
Optional context inclusion setting. Should only be set to "thisServer" or "allServers" if the client has sampling.context capability. |
None |
| temperature | float | None |
Optional sampling temperature. |
None |
| stop_sequences | list[str] | None |
Optional stop sequences. |
None |
| metadata | dict[str, Any] | None |
Optional metadata to pass through to the LLM provider. |
None |
| model_preferences | ModelPreferences | None |
Optional model selection preferences. |
None |
| tools | list[Tool] | None |
Optional list of tools the LLM can use during sampling. Requires client to have sampling.tools capability. |
None |
| tool_choice | ToolChoice | None |
Optional control over tool usage behavior. Requires client to have sampling.tools capability. |
None |
| related_request_id | RequestId | None |
Optional ID of a related request. |
None |
Returns:
| CreateMessageResult | CreateMessageResultWithTools |
The sampling result from the client. |
Raises:
| McpError |
If tools are provided but client doesn't support them. |
| ValueError |
If tool_use or tool_result message structure is invalid. |
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348 | async def create_message(
self,
messages: list[types.SamplingMessage],
*,
max_tokens: int,
system_prompt: str | None = None,
include_context: types.IncludeContext | None = None,
temperature: float | None = None,
stop_sequences: list[str] | None = None,
metadata: dict[str, Any] | None = None,
model_preferences: types.ModelPreferences | None = None,
tools: list[types.Tool] | None = None,
tool_choice: types.ToolChoice | None = None,
related_request_id: types.RequestId | None = None,
) -> types.CreateMessageResult | types.CreateMessageResultWithTools:
"""Send a sampling/create_message request.
Args:
messages: The conversation messages to send.
max_tokens: Maximum number of tokens to generate.
system_prompt: Optional system prompt.
include_context: Optional context inclusion setting.
Should only be set to "thisServer" or "allServers"
if the client has sampling.context capability.
temperature: Optional sampling temperature.
stop_sequences: Optional stop sequences.
metadata: Optional metadata to pass through to the LLM provider.
model_preferences: Optional model selection preferences.
tools: Optional list of tools the LLM can use during sampling.
Requires client to have sampling.tools capability.
tool_choice: Optional control over tool usage behavior.
Requires client to have sampling.tools capability.
related_request_id: Optional ID of a related request.
Returns:
The sampling result from the client.
Raises:
McpError: If tools are provided but client doesn't support them.
ValueError: If tool_use or tool_result message structure is invalid.
"""
client_caps = self._client_params.capabilities if self._client_params else None
validate_sampling_tools(client_caps, tools, tool_choice)
validate_tool_use_result_messages(messages)
request = types.ServerRequest(
types.CreateMessageRequest(
params=types.CreateMessageRequestParams(
messages=messages,
systemPrompt=system_prompt,
includeContext=include_context,
temperature=temperature,
maxTokens=max_tokens,
stopSequences=stop_sequences,
metadata=metadata,
modelPreferences=model_preferences,
tools=tools,
toolChoice=tool_choice,
),
)
)
metadata_obj = ServerMessageMetadata(related_request_id=related_request_id)
# Use different result types based on whether tools are provided
if tools is not None:
return await self.send_request(
request=request,
result_type=types.CreateMessageResultWithTools,
metadata=metadata_obj,
)
return await self.send_request(
request=request,
result_type=types.CreateMessageResult,
metadata=metadata_obj,
)
|
Send a roots/list request.
Source code in src/mcp/server/session.py350
351
352
353
354
355 | async def list_roots(self) -> types.ListRootsResult:
"""Send a roots/list request."""
return await self.send_request(
types.ServerRequest(types.ListRootsRequest()),
types.ListRootsResult,
)
|
Send a form mode elicitation/create request.
Parameters:
| message | str |
The message to present to the user |
required |
| requestedSchema | ElicitRequestedSchema |
Schema defining the expected response structure |
required |
| related_request_id | RequestId | None |
Optional ID of the request that triggered this elicitation |
None |
Returns:
| ElicitResult |
The client's response |
This method is deprecated in favor of elicit_form(). It remains for backward compatibility but new code should use elicit_form().
Source code in src/mcp/server/session.py357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377 | async def elicit(
self,
message: str,
requestedSchema: types.ElicitRequestedSchema,
related_request_id: types.RequestId | None = None,
) -> types.ElicitResult:
"""Send a form mode elicitation/create request.
Args:
message: The message to present to the user
requestedSchema: Schema defining the expected response structure
related_request_id: Optional ID of the request that triggered this elicitation
Returns:
The client's response
Note:
This method is deprecated in favor of elicit_form(). It remains for
backward compatibility but new code should use elicit_form().
"""
return await self.elicit_form(message, requestedSchema, related_request_id)
|
Send a form mode elicitation/create request.
Parameters:
| message | str |
The message to present to the user |
required |
| requestedSchema | ElicitRequestedSchema |
Schema defining the expected response structure |
required |
| related_request_id | RequestId | None |
Optional ID of the request that triggered this elicitation |
None |
Returns:
| ElicitResult |
The client's response with form data |
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406 | async def elicit_form(
self,
message: str,
requestedSchema: types.ElicitRequestedSchema,
related_request_id: types.RequestId | None = None,
) -> types.ElicitResult:
"""Send a form mode elicitation/create request.
Args:
message: The message to present to the user
requestedSchema: Schema defining the expected response structure
related_request_id: Optional ID of the request that triggered this elicitation
Returns:
The client's response with form data
"""
return await self.send_request(
types.ServerRequest(
types.ElicitRequest(
params=types.ElicitRequestFormParams(
message=message,
requestedSchema=requestedSchema,
),
)
),
types.ElicitResult,
metadata=ServerMessageMetadata(related_request_id=related_request_id),
)
|
Send a URL mode elicitation/create request.
This directs the user to an external URL for out-of-band interactions like OAuth flows, credential collection, or payment processing.
Parameters:
| message | str |
Human-readable explanation of why the interaction is needed |
required |
| url | str |
The URL the user should navigate to |
required |
| elicitation_id | str |
Unique identifier for tracking this elicitation |
required |
| related_request_id | RequestId | None |
Optional ID of the request that triggered this elicitation |
None |
Returns:
| ElicitResult |
The client's response indicating acceptance, decline, or cancellation |
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441 | async def elicit_url(
self,
message: str,
url: str,
elicitation_id: str,
related_request_id: types.RequestId | None = None,
) -> types.ElicitResult:
"""Send a URL mode elicitation/create request.
This directs the user to an external URL for out-of-band interactions
like OAuth flows, credential collection, or payment processing.
Args:
message: Human-readable explanation of why the interaction is needed
url: The URL the user should navigate to
elicitation_id: Unique identifier for tracking this elicitation
related_request_id: Optional ID of the request that triggered this elicitation
Returns:
The client's response indicating acceptance, decline, or cancellation
"""
return await self.send_request(
types.ServerRequest(
types.ElicitRequest(
params=types.ElicitRequestURLParams(
message=message,
url=url,
elicitationId=elicitation_id,
),
)
),
types.ElicitResult,
metadata=ServerMessageMetadata(related_request_id=related_request_id),
)
|
Send a ping request.
Source code in src/mcp/server/session.py443
444
445
446
447
448 | async def send_ping(self) -> types.EmptyResult: # pragma: no cover
"""Send a ping request."""
return await self.send_request(
types.ServerRequest(types.PingRequest()),
types.EmptyResult,
)
|
Send a progress notification.
Source code in src/mcp/server/session.py450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471 | async def send_progress_notification(
self,
progress_token: str | int,
progress: float,
total: float | None = None,
message: str | None = None,
related_request_id: str | None = None,
) -> None:
"""Send a progress notification."""
await self.send_notification(
types.ServerNotification(
types.ProgressNotification(
params=types.ProgressNotificationParams(
progressToken=progress_token,
progress=progress,
total=total,
message=message,
),
)
),
related_request_id,
)
|
Send a resource list changed notification.
Source code in src/mcp/server/session.py473
474
475 | async def send_resource_list_changed(self) -> None: # pragma: no cover
"""Send a resource list changed notification."""
await self.send_notification(types.ServerNotification(types.ResourceListChangedNotification()))
|
Send a tool list changed notification.
Source code in src/mcp/server/session.py477
478
479 | async def send_tool_list_changed(self) -> None: # pragma: no cover
"""Send a tool list changed notification."""
await self.send_notification(types.ServerNotification(types.ToolListChangedNotification()))
|
Send a prompt list changed notification.
Source code in src/mcp/server/session.py481
482
483 | async def send_prompt_list_changed(self) -> None: # pragma: no cover
"""Send a prompt list changed notification."""
await self.send_notification(types.ServerNotification(types.PromptListChangedNotification()))
|
Send an elicitation completion notification.
This should be sent when a URL mode elicitation has been completed out-of-band to inform the client that it may retry any requests that were waiting for this elicitation.
Parameters:
| elicitation_id | str |
The unique identifier of the completed elicitation |
required |
| related_request_id | RequestId | None |
Optional ID of the request that triggered this |
None |
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507 | async def send_elicit_complete(
self,
elicitation_id: str,
related_request_id: types.RequestId | None = None,
) -> None:
"""Send an elicitation completion notification.
This should be sent when a URL mode elicitation has been completed
out-of-band to inform the client that it may retry any requests
that were waiting for this elicitation.
Args:
elicitation_id: The unique identifier of the completed elicitation
related_request_id: Optional ID of the request that triggered this
"""
await self.send_notification(
types.ServerNotification(
types.ElicitCompleteNotification(
params=types.ElicitCompleteNotificationParams(elicitationId=elicitation_id)
)
),
related_request_id,
)
|
Send a raw session message.
This is primarily used by TaskResultHandler to deliver queued messages (elicitation/sampling requests) to the client during task execution.
WARNING: This is a low-level experimental method that may change without notice. Prefer using higher-level methods like send_notification() or send_request() for normal operations.
Parameters:
| message | SessionMessage |
The session message to send |
required |
669
670
671
672
673
674
675
676
677
678
679
680
681
682 | async def send_message(self, message: SessionMessage) -> None:
"""Send a raw session message.
This is primarily used by TaskResultHandler to deliver queued messages
(elicitation/sampling requests) to the client during task execution.
WARNING: This is a low-level experimental method that may change without
notice. Prefer using higher-level methods like send_notification() or
send_request() for normal operations.
Args:
message: The session message to send
"""
await self._write_stream.send(message)
|
Server transport for stdio: this communicates with an MCP client by reading from the current process' stdin and writing to stdout.
Source code in src/mcp/server/stdio.py33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 | @asynccontextmanager
async def stdio_server(
stdin: anyio.AsyncFile[str] | None = None,
stdout: anyio.AsyncFile[str] | None = None,
):
"""
Server transport for stdio: this communicates with an MCP client by reading
from the current process' stdin and writing to stdout.
"""
# Purposely not using context managers for these, as we don't want to close
# standard process handles. Encoding of stdin/stdout as text streams on
# python is platform-dependent (Windows is particularly problematic), so we
# re-wrap the underlying binary stream to ensure UTF-8.
if not stdin:
stdin = anyio.wrap_file(TextIOWrapper(sys.stdin.buffer, encoding="utf-8", errors="replace"))
if not stdout:
stdout = anyio.wrap_file(TextIOWrapper(sys.stdout.buffer, encoding="utf-8"))
read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]
read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception]
write_stream: MemoryObjectSendStream[SessionMessage]
write_stream_reader: MemoryObjectReceiveStream[SessionMessage]
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
async def stdin_reader():
try:
async with read_stream_writer:
async for line in stdin:
try:
message = types.JSONRPCMessage.model_validate_json(line)
except Exception as exc:
await read_stream_writer.send(exc)
continue
session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()
async def stdout_writer():
try:
async with write_stream_reader:
async for session_message in write_stream_reader:
json = session_message.message.model_dump_json(by_alias=True, exclude_none=True)
await stdout.write(json + "\n")
await stdout.flush()
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()
async with anyio.create_task_group() as tg:
tg.start_soon(stdin_reader)
tg.start_soon(stdout_writer)
yield read_stream, write_stream
|
Bases: Exception
Exception type raised when an error arrives over an MCP connection.
Source code in src/mcp/shared/exceptions.py 8
9
10
11
12
13
14
15
16
17
18 | class McpError(Exception):
"""
Exception type raised when an error arrives over an MCP connection.
"""
error: ErrorData
def __init__(self, error: ErrorData):
"""Initialize McpError."""
super().__init__(error.message)
self.error = error
|
Initialize McpError.
Source code in src/mcp/shared/exceptions.py15
16
17
18 | def __init__(self, error: ErrorData):
"""Initialize McpError."""
super().__init__(error.message)
self.error = error
|
Bases: McpError
Specialized error for when a tool requires URL mode elicitation(s) before proceeding.
Servers can raise this error from tool handlers to indicate that the client must complete one or more URL elicitations before the request can be processed.
Exampleraise UrlElicitationRequiredError([ ElicitRequestURLParams( mode="url", message="Authorization required for your files", url="https://example.com/oauth/authorize", elicitationId="auth-001" ) ])
Source code in src/mcp/shared/exceptions.py21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 | class UrlElicitationRequiredError(McpError):
"""
Specialized error for when a tool requires URL mode elicitation(s) before proceeding.
Servers can raise this error from tool handlers to indicate that the client
must complete one or more URL elicitations before the request can be processed.
Example:
raise UrlElicitationRequiredError([
ElicitRequestURLParams(
mode="url",
message="Authorization required for your files",
url="https://example.com/oauth/authorize",
elicitationId="auth-001"
)
])
"""
def __init__(
self,
elicitations: list[ElicitRequestURLParams],
message: str | None = None,
):
"""Initialize UrlElicitationRequiredError."""
if message is None:
message = f"URL elicitation{'s' if len(elicitations) > 1 else ''} required"
self._elicitations = elicitations
error = ErrorData(
code=URL_ELICITATION_REQUIRED,
message=message,
data={"elicitations": [e.model_dump(by_alias=True, exclude_none=True) for e in elicitations]},
)
super().__init__(error)
@property
def elicitations(self) -> list[ElicitRequestURLParams]:
"""The list of URL elicitations required before the request can proceed."""
return self._elicitations
@classmethod
def from_error(cls, error: ErrorData) -> UrlElicitationRequiredError:
"""Reconstruct from an ErrorData received over the wire."""
if error.code != URL_ELICITATION_REQUIRED:
raise ValueError(f"Expected error code {URL_ELICITATION_REQUIRED}, got {error.code}")
data = cast(dict[str, Any], error.data or {})
raw_elicitations = cast(list[dict[str, Any]], data.get("elicitations", []))
elicitations = [ElicitRequestURLParams.model_validate(e) for e in raw_elicitations]
return cls(elicitations, error.message)
|
Initialize UrlElicitationRequiredError.
Source code in src/mcp/shared/exceptions.py39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 | def __init__(
self,
elicitations: list[ElicitRequestURLParams],
message: str | None = None,
):
"""Initialize UrlElicitationRequiredError."""
if message is None:
message = f"URL elicitation{'s' if len(elicitations) > 1 else ''} required"
self._elicitations = elicitations
error = ErrorData(
code=URL_ELICITATION_REQUIRED,
message=message,
data={"elicitations": [e.model_dump(by_alias=True, exclude_none=True) for e in elicitations]},
)
super().__init__(error)
|
The list of URL elicitations required before the request can proceed.
Reconstruct from an ErrorData received over the wire.
Source code in src/mcp/shared/exceptions.py62
63
64
65
66
67
68
69
70
71 | @classmethod
def from_error(cls, error: ErrorData) -> UrlElicitationRequiredError:
"""Reconstruct from an ErrorData received over the wire."""
if error.code != URL_ELICITATION_REQUIRED:
raise ValueError(f"Expected error code {URL_ELICITATION_REQUIRED}, got {error.code}")
data = cast(dict[str, Any], error.data or {})
raw_elicitations = cast(list[dict[str, Any]], data.get("elicitations", []))
elicitations = [ElicitRequestURLParams.model_validate(e) for e in raw_elicitations]
return cls(elicitations, error.message)
|
Bases: Request[CallToolRequestParams, Literal['tools/call']]
Used by the client to invoke a tool provided by the server.
Source code in src/mcp/types.py1356
1357
1358
1359
1360 | class CallToolRequest(Request[CallToolRequestParams, Literal["tools/call"]]):
"""Used by the client to invoke a tool provided by the server."""
method: Literal["tools/call"] = "tools/call"
params: CallToolRequestParams
|
Bases: BaseModel
Capabilities a client may support.
Source code in src/mcp/types.py417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434 | class ClientCapabilities(BaseModel):
"""Capabilities a client may support."""
experimental: dict[str, dict[str, Any]] | None = None
"""Experimental, non-standard capabilities that the client supports."""
sampling: SamplingCapability | None = None
"""
Present if the client supports sampling from an LLM.
Can contain fine-grained capabilities like context and tools support.
"""
elicitation: ElicitationCapability | None = None
"""Present if the client supports elicitation from the user."""
roots: RootsCapability | None = None
"""Present if the client supports listing roots."""
tasks: ClientTasksCapability | None = None
"""Present if the client supports task-augmented requests."""
model_config = ConfigDict(extra="allow")
|
Experimental, non-standard capabilities that the client supports.
Present if the client supports sampling from an LLM. Can contain fine-grained capabilities like context and tools support.
Present if the client supports elicitation from the user.
Present if the client supports listing roots.
Present if the client supports task-augmented requests.
Bases: Request[CompleteRequestParams, Literal['completion/complete']]
A request from the client to the server, to ask for completion options.
Source code in src/mcp/types.py1645
1646
1647
1648
1649 | class CompleteRequest(Request[CompleteRequestParams, Literal["completion/complete"]]):
"""A request from the client to the server, to ask for completion options."""
method: Literal["completion/complete"] = "completion/complete"
params: CompleteRequestParams
|
Bases: Request[CreateMessageRequestParams, Literal['sampling/createMessage']]
A request from the server to sample an LLM via the client.
Source code in src/mcp/types.py1539
1540
1541
1542
1543 | class CreateMessageRequest(Request[CreateMessageRequestParams, Literal["sampling/createMessage"]]):
"""A request from the server to sample an LLM via the client."""
method: Literal["sampling/createMessage"] = "sampling/createMessage"
params: CreateMessageRequestParams
|
Bases: Result
The client's response to a sampling/create_message request from the server.
This is the backwards-compatible version that returns single content (no arrays). Used when the request does not include tools.
Source code in src/mcp/types.py1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563 | class CreateMessageResult(Result):
"""The client's response to a sampling/create_message request from the server.
This is the backwards-compatible version that returns single content (no arrays).
Used when the request does not include tools.
"""
role: Role
"""The role of the message sender (typically 'assistant' for LLM responses)."""
content: SamplingContent
"""Response content. Single content block (text, image, or audio)."""
model: str
"""The name of the model that generated the message."""
stopReason: StopReason | None = None
"""The reason why sampling stopped, if known."""
|
The role of the message sender (typically 'assistant' for LLM responses).
Response content. Single content block (text, image, or audio).
The reason why sampling stopped, if known.
Bases: Result
The client's response to a sampling/create_message request when tools were provided.
This version supports array content for tool use flows.
Source code in src/mcp/types.py1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591 | class CreateMessageResultWithTools(Result):
"""The client's response to a sampling/create_message request when tools were provided.
This version supports array content for tool use flows.
"""
role: Role
"""The role of the message sender (typically 'assistant' for LLM responses)."""
content: SamplingMessageContentBlock | list[SamplingMessageContentBlock]
"""
Response content. May be a single content block or an array.
May include ToolUseContent if stopReason is 'toolUse'.
"""
model: str
"""The name of the model that generated the message."""
stopReason: StopReason | None = None
"""
The reason why sampling stopped, if known.
'toolUse' indicates the model wants to use a tool.
"""
@property
def content_as_list(self) -> list[SamplingMessageContentBlock]:
"""Returns the content as a list of content blocks, regardless of whether
it was originally a single block or a list."""
return self.content if isinstance(self.content, list) else [self.content]
|
The role of the message sender (typically 'assistant' for LLM responses).
Response content. May be a single content block or an array. May include ToolUseContent if stopReason is 'toolUse'.
The reason why sampling stopped, if known. 'toolUse' indicates the model wants to use a tool.
Returns the content as a list of content blocks, regardless of whether it was originally a single block or a list.
Bases: BaseModel
Error information for JSON-RPC error responses.
Source code in src/mcp/types.py193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211 | class ErrorData(BaseModel):
"""Error information for JSON-RPC error responses."""
code: int
"""The error type that occurred."""
message: str
"""
A short description of the error. The message SHOULD be limited to a concise single
sentence.
"""
data: Any | None = None
"""
Additional information about the error. The value of this member is defined by the
sender (e.g. detailed error information, nested errors etc.).
"""
model_config = ConfigDict(extra="allow")
|
A short description of the error. The message SHOULD be limited to a concise single sentence.
Additional information about the error. The value of this member is defined by the sender (e.g. detailed error information, nested errors etc.).
Bases: Request[GetPromptRequestParams, Literal['prompts/get']]
Used by the client to get a prompt provided by the server.
Source code in src/mcp/types.py1019
1020
1021
1022
1023 | class GetPromptRequest(Request[GetPromptRequestParams, Literal["prompts/get"]]):
"""Used by the client to get a prompt provided by the server."""
method: Literal["prompts/get"] = "prompts/get"
params: GetPromptRequestParams
|
Bases: Result
The server's response to a prompts/get request from the client.
Source code in src/mcp/types.py1221
1222
1223
1224
1225
1226 | class GetPromptResult(Result):
"""The server's response to a prompts/get request from the client."""
description: str | None = None
"""An optional description for the prompt."""
messages: list[PromptMessage]
|
An optional description for the prompt.
Bases: BaseMetadata
Describes the name and version of an MCP implementation.
Source code in src/mcp/types.py263
264
265
266
267
268
269
270
271
272
273
274 | class Implementation(BaseMetadata):
"""Describes the name and version of an MCP implementation."""
version: str
websiteUrl: str | None = None
"""An optional URL of the website for this implementation."""
icons: list[Icon] | None = None
"""An optional list of icons for this implementation."""
model_config = ConfigDict(extra="allow")
|
Bases: Notification[NotificationParams | None, Literal['notifications/initialized']]
This notification is sent from the client to the server after initialization has finished.
Source code in src/mcp/types.py702
703
704
705
706
707
708
709 | class InitializedNotification(Notification[NotificationParams | None, Literal["notifications/initialized"]]):
"""
This notification is sent from the client to the server after initialization has
finished.
"""
method: Literal["notifications/initialized"] = "notifications/initialized"
params: NotificationParams | None = None
|
Bases: Request[InitializeRequestParams, Literal['initialize']]
This request is sent from the client to the server when it first connects, asking it to begin initialization.
Source code in src/mcp/types.py681
682
683
684
685
686
687
688 | class InitializeRequest(Request[InitializeRequestParams, Literal["initialize"]]):
"""
This request is sent from the client to the server when it first connects, asking it
to begin initialization.
"""
method: Literal["initialize"] = "initialize"
params: InitializeRequestParams
|
Bases: Result
After receiving an initialize request from the client, the server sends this.
Source code in src/mcp/types.py691
692
693
694
695
696
697
698
699 | class InitializeResult(Result):
"""After receiving an initialize request from the client, the server sends this."""
protocolVersion: str | int
"""The version of the Model Context Protocol that the server wants to use."""
capabilities: ServerCapabilities
serverInfo: Implementation
instructions: str | None = None
"""Instructions describing how to use the server and its features."""
|
Bases: BaseModel
A response to a request that indicates an error occurred.
Source code in src/mcp/types.py214
215
216
217
218
219
220 | class JSONRPCError(BaseModel):
"""A response to a request that indicates an error occurred."""
jsonrpc: Literal["2.0"]
id: str | int
error: ErrorData
model_config = ConfigDict(extra="allow")
|
Bases: Request[dict[str, Any] | None, str]
A request that expects a response.
Source code in src/mcp/types.py152
153
154
155
156
157
158 | class JSONRPCRequest(Request[dict[str, Any] | None, str]):
"""A request that expects a response."""
jsonrpc: Literal["2.0"]
id: RequestId
method: str
params: dict[str, Any] | None = None
|
Bases: BaseModel
A successful (non-error) response to a request.
Source code in src/mcp/types.py168
169
170
171
172
173
174 | class JSONRPCResponse(BaseModel):
"""A successful (non-error) response to a request."""
jsonrpc: Literal["2.0"]
id: RequestId
result: dict[str, Any]
model_config = ConfigDict(extra="allow")
|
Bases: PaginatedRequest[Literal['prompts/list']]
Sent from the client to request a list of prompts and prompt templates.
Source code in src/mcp/types.py968
969
970
971 | class ListPromptsRequest(PaginatedRequest[Literal["prompts/list"]]):
"""Sent from the client to request a list of prompts and prompt templates."""
method: Literal["prompts/list"] = "prompts/list"
|
Bases: PaginatedResult
The server's response to a prompts/list request from the client.
Source code in src/mcp/types.py1003
1004
1005
1006 | class ListPromptsResult(PaginatedResult):
"""The server's response to a prompts/list request from the client."""
prompts: list[Prompt]
|
Bases: PaginatedRequest[Literal['resources/list']]
Sent from the client to request a list of resources the server has.
Source code in src/mcp/types.py755
756
757
758 | class ListResourcesRequest(PaginatedRequest[Literal["resources/list"]]):
"""Sent from the client to request a list of resources the server has."""
method: Literal["resources/list"] = "resources/list"
|
Bases: PaginatedResult
The server's response to a resources/list request from the client.
Source code in src/mcp/types.py820
821
822
823 | class ListResourcesResult(PaginatedResult):
"""The server's response to a resources/list request from the client."""
resources: list[Resource]
|
Bases: PaginatedResult
The server's response to a tools/list request from the client.
Source code in src/mcp/types.py1342
1343
1344
1345 | class ListToolsResult(PaginatedResult):
"""The server's response to a tools/list request from the client."""
tools: list[Tool]
|
Bases: Notification[LoggingMessageNotificationParams, Literal['notifications/message']]
Notification of a log message passed from server to client.
Source code in src/mcp/types.py1415
1416
1417
1418
1419 | class LoggingMessageNotification(Notification[LoggingMessageNotificationParams, Literal["notifications/message"]]):
"""Notification of a log message passed from server to client."""
method: Literal["notifications/message"] = "notifications/message"
params: LoggingMessageNotificationParams
|
Bases: BaseModel, Generic[NotificationParamsT, MethodT]
Base class for JSON-RPC notifications.
Source code in src/mcp/types.py125
126
127
128
129
130 | class Notification(BaseModel, Generic[NotificationParamsT, MethodT]):
"""Base class for JSON-RPC notifications."""
method: MethodT
params: NotificationParamsT
model_config = ConfigDict(extra="allow")
|
Bases: Request[RequestParams | None, Literal['ping']]
A ping, issued by either the server or the client, to check that the other party is still alive.
Source code in src/mcp/types.py712
713
714
715
716
717
718
719 | class PingRequest(Request[RequestParams | None, Literal["ping"]]):
"""
A ping, issued by either the server or the client, to check that the other party is
still alive.
"""
method: Literal["ping"] = "ping"
params: RequestParams | None = None
|
Bases: Notification[ProgressNotificationParams, Literal['notifications/progress']]
An out-of-band notification used to inform the receiver of a progress update for a long-running request.
Source code in src/mcp/types.py745
746
747
748
749
750
751
752 | class ProgressNotification(Notification[ProgressNotificationParams, Literal["notifications/progress"]]):
"""
An out-of-band notification used to inform the receiver of a progress update for a
long-running request.
"""
method: Literal["notifications/progress"] = "notifications/progress"
params: ProgressNotificationParams
|
Bases: BaseModel
Capability for prompts operations.
Source code in src/mcp/types.py437
438
439
440
441
442 | class PromptsCapability(BaseModel):
"""Capability for prompts operations."""
listChanged: bool | None = None
"""Whether this server supports notifications for changes to the prompt list."""
model_config = ConfigDict(extra="allow")
|
Whether this server supports notifications for changes to the prompt list.
Bases: Request[ReadResourceRequestParams, Literal['resources/read']]
Sent from the client to the server, to read a specific resource URI.
Source code in src/mcp/types.py849
850
851
852
853 | class ReadResourceRequest(Request[ReadResourceRequestParams, Literal["resources/read"]]):
"""Sent from the client to the server, to read a specific resource URI."""
method: Literal["resources/read"] = "resources/read"
params: ReadResourceRequestParams
|
Bases: Result
The server's response to a resources/read request from the client.
Source code in src/mcp/types.py888
889
890
891 | class ReadResourceResult(Result):
"""The server's response to a resources/read request from the client."""
contents: list[TextResourceContents | BlobResourceContents]
|
Bases: BaseMetadata
A known resource that the server is capable of reading.
Source code in src/mcp/types.py767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791 | class Resource(BaseMetadata):
"""A known resource that the server is capable of reading."""
uri: Annotated[AnyUrl, UrlConstraints(host_required=False)]
"""The URI of this resource."""
description: str | None = None
"""A description of what this resource represents."""
mimeType: str | None = None
"""The MIME type of this resource, if known."""
size: int | None = None
"""
The size of the raw resource content, in bytes (i.e., before base64 encoding
or any tokenization), if known.
This can be used by Hosts to display file sizes and estimate context window usage.
"""
icons: list[Icon] | None = None
"""An optional list of icons for this resource."""
annotations: Annotations | None = None
meta: dict[str, Any] | None = Field(alias="_meta", default=None)
"""
See [MCP specification](https://github.com/modelcontextprotocol/modelcontextprotocol/blob/47339c03c143bb4ec01a26e721a1b8fe66634ebe/docs/specification/draft/basic/index.mdx#general-fields)
for notes on _meta usage.
"""
model_config = ConfigDict(extra="allow")
|
The URI of this resource.
A description of what this resource represents.
The MIME type of this resource, if known.
The size of the raw resource content, in bytes (i.e., before base64 encoding or any tokenization), if known.
This can be used by Hosts to display file sizes and estimate context window usage.
An optional list of icons for this resource.
See MCP specification for notes on _meta usage.
Bases: BaseModel
Capability for resources operations.
Source code in src/mcp/types.py445
446
447
448
449
450
451
452 | class ResourcesCapability(BaseModel):
"""Capability for resources operations."""
subscribe: bool | None = None
"""Whether this server supports subscribing to resource updates."""
listChanged: bool | None = None
"""Whether this server supports notifications for changes to the resource list."""
model_config = ConfigDict(extra="allow")
|
Bases: Notification[ResourceUpdatedNotificationParams, Literal['notifications/resources/updated']]
A notification from the server to the client, informing it that a resource has changed and may need to be read again.
Source code in src/mcp/types.py956
957
958
959
960
961
962
963
964
965 | class ResourceUpdatedNotification(
Notification[ResourceUpdatedNotificationParams, Literal["notifications/resources/updated"]]
):
"""
A notification from the server to the client, informing it that a resource has
changed and may need to be read again.
"""
method: Literal["notifications/resources/updated"] = "notifications/resources/updated"
params: ResourceUpdatedNotificationParams
|
Bases: BaseModel
Capability for root operations.
Source code in src/mcp/types.py277
278
279
280
281
282 | class RootsCapability(BaseModel):
"""Capability for root operations."""
listChanged: bool | None = None
"""Whether the client supports notifications for changes to the roots list."""
model_config = ConfigDict(extra="allow")
|
Whether the client supports notifications for changes to the roots list.
Bases: BaseModel
Sampling capability structure, allowing fine-grained capability advertisement.
Source code in src/mcp/types.py334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349 | class SamplingCapability(BaseModel):
"""
Sampling capability structure, allowing fine-grained capability advertisement.
"""
context: SamplingContextCapability | None = None
"""
Present if the client supports non-'none' values for includeContext parameter.
SOFT-DEPRECATED: New implementations should use tools parameter instead.
"""
tools: SamplingToolsCapability | None = None
"""
Present if the client supports tools and toolChoice parameters in sampling requests.
Presence indicates full tool calling support during sampling.
"""
model_config = ConfigDict(extra="allow")
|
Present if the client supports non-'none' values for includeContext parameter. SOFT-DEPRECATED: New implementations should use tools parameter instead.
Present if the client supports tools and toolChoice parameters in sampling requests. Presence indicates full tool calling support during sampling.
Basic content types for sampling responses (without tool use). Used for backwards-compatible CreateMessageResult when tools are not used.
Bases: BaseModel
Capability for context inclusion during sampling.
Indicates support for non-'none' values in the includeContext parameter. SOFT-DEPRECATED: New implementations should use tools parameter instead.
Source code in src/mcp/types.py285
286
287
288
289
290
291
292
293 | class SamplingContextCapability(BaseModel):
"""
Capability for context inclusion during sampling.
Indicates support for non-'none' values in the includeContext parameter.
SOFT-DEPRECATED: New implementations should use tools parameter instead.
"""
model_config = ConfigDict(extra="allow")
|
Bases: BaseModel
Describes a message issued to or received from an LLM API.
Source code in src/mcp/types.py1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174 | class SamplingMessage(BaseModel):
"""Describes a message issued to or received from an LLM API."""
role: Role
content: SamplingMessageContentBlock | list[SamplingMessageContentBlock]
"""
Message content. Can be a single content block or an array of content blocks
for multi-modal messages and tool interactions.
"""
meta: dict[str, Any] | None = Field(alias="_meta", default=None)
"""
See [MCP specification](https://github.com/modelcontextprotocol/modelcontextprotocol/blob/47339c03c143bb4ec01a26e721a1b8fe66634ebe/docs/specification/draft/basic/index.mdx#general-fields)
for notes on _meta usage.
"""
model_config = ConfigDict(extra="allow")
@property
def content_as_list(self) -> list[SamplingMessageContentBlock]:
"""Returns the content as a list of content blocks, regardless of whether
it was originally a single block or a list."""
return self.content if isinstance(self.content, list) else [self.content]
|
Message content. Can be a single content block or an array of content blocks for multi-modal messages and tool interactions.
See MCP specification for notes on _meta usage.
Returns the content as a list of content blocks, regardless of whether it was originally a single block or a list.
Content block types allowed in sampling messages.
Bases: BaseModel
Capability indicating support for tool calling during sampling.
When present in ClientCapabilities.sampling, indicates that the client supports the tools and toolChoice parameters in sampling requests.
Source code in src/mcp/types.py296
297
298
299
300
301
302
303
304 | class SamplingToolsCapability(BaseModel):
"""
Capability indicating support for tool calling during sampling.
When present in ClientCapabilities.sampling, indicates that the client
supports the tools and toolChoice parameters in sampling requests.
"""
model_config = ConfigDict(extra="allow")
|
Bases: BaseModel
Capabilities that a server may support.
Source code in src/mcp/types.py506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523 | class ServerCapabilities(BaseModel):
"""Capabilities that a server may support."""
experimental: dict[str, dict[str, Any]] | None = None
"""Experimental, non-standard capabilities that the server supports."""
logging: LoggingCapability | None = None
"""Present if the server supports sending log messages to the client."""
prompts: PromptsCapability | None = None
"""Present if the server offers any prompt templates."""
resources: ResourcesCapability | None = None
"""Present if the server offers any resources to read."""
tools: ToolsCapability | None = None
"""Present if the server offers any tools to call."""
completions: CompletionsCapability | None = None
"""Present if the server offers autocompletion suggestions for prompts and resources."""
tasks: ServerTasksCapability | None = None
"""Present if the server supports task-augmented requests."""
model_config = ConfigDict(extra="allow")
|
Experimental, non-standard capabilities that the server supports.
Present if the server supports sending log messages to the client.
Present if the server offers any prompt templates.
Present if the server offers any resources to read.
Present if the server offers any tools to call.
Present if the server offers autocompletion suggestions for prompts and resources.
Present if the server supports task-augmented requests.
Bases: Request[SetLevelRequestParams, Literal['logging/setLevel']]
A request from the client to the server, to enable or adjust logging.
Source code in src/mcp/types.py1393
1394
1395
1396
1397 | class SetLevelRequest(Request[SetLevelRequestParams, Literal["logging/setLevel"]]):
"""A request from the client to the server, to enable or adjust logging."""
method: Literal["logging/setLevel"] = "logging/setLevel"
params: SetLevelRequestParams
|
Bases: Request[SubscribeRequestParams, Literal['resources/subscribe']]
Sent from the client to request resources/updated notifications from the server whenever a particular resource changes.
Source code in src/mcp/types.py917
918
919
920
921
922
923
924 | class SubscribeRequest(Request[SubscribeRequestParams, Literal["resources/subscribe"]]):
"""
Sent from the client to request resources/updated notifications from the server
whenever a particular resource changes.
"""
method: Literal["resources/subscribe"] = "resources/subscribe"
params: SubscribeRequestParams
|
Bases: BaseMetadata
Definition for a tool the client can call.
Source code in src/mcp/types.py1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339 | class Tool(BaseMetadata):
"""Definition for a tool the client can call."""
description: str | None = None
"""A human-readable description of the tool."""
inputSchema: dict[str, Any]
"""A JSON Schema object defining the expected parameters for the tool."""
outputSchema: dict[str, Any] | None = None
"""
An optional JSON Schema object defining the structure of the tool's output
returned in the structuredContent field of a CallToolResult.
"""
icons: list[Icon] | None = None
"""An optional list of icons for this tool."""
annotations: ToolAnnotations | None = None
"""Optional additional tool information."""
meta: dict[str, Any] | None = Field(alias="_meta", default=None)
"""
See [MCP specification](https://github.com/modelcontextprotocol/modelcontextprotocol/blob/47339c03c143bb4ec01a26e721a1b8fe66634ebe/docs/specification/draft/basic/index.mdx#general-fields)
for notes on _meta usage.
"""
execution: ToolExecution | None = None
model_config = ConfigDict(extra="allow")
|
A human-readable description of the tool.
A JSON Schema object defining the expected parameters for the tool.
An optional JSON Schema object defining the structure of the tool's output returned in the structuredContent field of a CallToolResult.
An optional list of icons for this tool.
Optional additional tool information.
See MCP specification for notes on _meta usage.
Bases: BaseModel
Controls tool usage behavior during sampling.
Allows the server to specify whether and how the LLM should use tools in its response.
Source code in src/mcp/types.py1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501 | class ToolChoice(BaseModel):
"""
Controls tool usage behavior during sampling.
Allows the server to specify whether and how the LLM should use tools
in its response.
"""
mode: Literal["auto", "required", "none"] | None = None
"""
Controls when tools are used:
- "auto": Model decides whether to use tools (default)
- "required": Model MUST use at least one tool before completing
- "none": Model should not use tools
"""
model_config = ConfigDict(extra="allow")
|
Controls when tools are used: - "auto": Model decides whether to use tools (default) - "required": Model MUST use at least one tool before completing - "none": Model should not use tools
Bases: BaseModel
Content representing the result of a tool execution.
This content type appears in user messages as a response to a ToolUseContent from the assistant. It contains the output of executing the requested tool.
Source code in src/mcp/types.py1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143 | class ToolResultContent(BaseModel):
"""
Content representing the result of a tool execution.
This content type appears in user messages as a response to a ToolUseContent
from the assistant. It contains the output of executing the requested tool.
"""
type: Literal["tool_result"]
"""Discriminator for tool result content."""
toolUseId: str
"""The unique identifier that corresponds to the tool call's id field."""
content: list["ContentBlock"] = []
"""
A list of content objects representing the tool result.
Defaults to empty list if not provided.
"""
structuredContent: dict[str, Any] | None = None
"""
Optional structured tool output that matches the tool's outputSchema (if defined).
"""
isError: bool | None = None
"""Whether the tool execution resulted in an error."""
meta: dict[str, Any] | None = Field(alias="_meta", default=None)
"""
See [MCP specification](https://github.com/modelcontextprotocol/modelcontextprotocol/blob/47339c03c143bb4ec01a26e721a1b8fe66634ebe/docs/specification/draft/basic/index.mdx#general-fields)
for notes on _meta usage.
"""
model_config = ConfigDict(extra="allow")
|
The unique identifier that corresponds to the tool call's id field.
A list of content objects representing the tool result. Defaults to empty list if not provided.
Optional structured tool output that matches the tool's outputSchema (if defined).
Whether the tool execution resulted in an error.
See MCP specification for notes on _meta usage.
Bases: BaseModel
Capability for tools operations.
Source code in src/mcp/types.py455
456
457
458
459
460 | class ToolsCapability(BaseModel):
"""Capability for tools operations."""
listChanged: bool | None = None
"""Whether this server supports notifications for changes to the tool list."""
model_config = ConfigDict(extra="allow")
|
Whether this server supports notifications for changes to the tool list.
Bases: BaseModel
Content representing an assistant's request to invoke a tool.
This content type appears in assistant messages when the LLM wants to call a tool during sampling. The server should execute the tool and return a ToolResultContent in the next user message.
Source code in src/mcp/types.py1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107 | class ToolUseContent(BaseModel):
"""
Content representing an assistant's request to invoke a tool.
This content type appears in assistant messages when the LLM wants to call a tool
during sampling. The server should execute the tool and return a ToolResultContent
in the next user message.
"""
type: Literal["tool_use"]
"""Discriminator for tool use content."""
name: str
"""The name of the tool to invoke. Must match a tool name from the request's tools array."""
id: str
"""Unique identifier for this tool call, used to correlate with ToolResultContent."""
input: dict[str, Any]
"""Arguments to pass to the tool. Must conform to the tool's inputSchema."""
meta: dict[str, Any] | None = Field(alias="_meta", default=None)
"""
See [MCP specification](https://github.com/modelcontextprotocol/modelcontextprotocol/blob/47339c03c143bb4ec01a26e721a1b8fe66634ebe/docs/specification/draft/basic/index.mdx#general-fields)
for notes on _meta usage.
"""
model_config = ConfigDict(extra="allow")
|
The name of the tool to invoke. Must match a tool name from the request's tools array.
Unique identifier for this tool call, used to correlate with ToolResultContent.
Arguments to pass to the tool. Must conform to the tool's inputSchema.
See MCP specification for notes on _meta usage.
Bases: Request[UnsubscribeRequestParams, Literal['resources/unsubscribe']]
Sent from the client to request cancellation of resources/updated notifications from the server.
Source code in src/mcp/types.py935
936
937
938
939
940
941
942 | class UnsubscribeRequest(Request[UnsubscribeRequestParams, Literal["resources/unsubscribe"]]):
"""
Sent from the client to request cancellation of resources/updated notifications from
the server.
"""
method: Literal["resources/unsubscribe"] = "resources/unsubscribe"
params: UnsubscribeRequestParams
|