Add `thread_subscriptions` as a supported web-facing stream writer in MDAD and route its unstable client endpoints via the same explicit writer-or-main model used for the other web-facing stream-backed APIs. This is not just another generic worker route. Current Synapse gives thread subscriptions their own `writers.thread_subscriptions` configuration, backs them with a multi-writer stream, and asserts on store writes that the current instance is an allowed thread-subscriptions writer. Explicit early routing is also required here because the subscription endpoint is room-scoped. In MDAD's specialized-worker model, the existing room-worker regex would otherwise match `/_matrix/client/unstable/io.element.msc4306/rooms/...` and steal the request before it reached the correct writer-or-main fallback. Unlike `device_lists`, support is added without enabling a thread-subscriptions worker by default in the standard presets. The underlying MSC4306/4308 feature remains unstable and disabled by default upstream, so the conservative default is to keep the worker count at `0` and let the new explicit routes fall back to `main` unless an operator opts in. Refs: -masterb99a58719b/synapse/config/workers.py (L175-L182)-b99a58719b/synapse/rest/client/thread_subscriptions.py (L38-L247)-b99a58719b/synapse/storage/databases/main/thread_subscriptions.py (L66-L83)-b99a58719b/synapse/storage/databases/main/thread_subscriptions.py (L192-L322)
| @@ -936,6 +936,9 @@ matrix_synapse_workers_presets: | |||||
| stream_writer_presence_stream_workers_count: 0 | stream_writer_presence_stream_workers_count: 0 | ||||
| stream_writer_push_rules_stream_workers_count: 0 | stream_writer_push_rules_stream_workers_count: 0 | ||||
| stream_writer_device_lists_stream_workers_count: 0 | stream_writer_device_lists_stream_workers_count: 0 | ||||
| # Keep disabled by default: MSC4306/4308 thread subscriptions are unstable | |||||
| # and disabled in upstream Synapse unless explicitly opted in. | |||||
| stream_writer_thread_subscriptions_stream_workers_count: 0 | |||||
| one-of-each: | one-of-each: | ||||
| room_workers_count: 0 | room_workers_count: 0 | ||||
| sync_workers_count: 0 | sync_workers_count: 0 | ||||
| @@ -956,6 +959,9 @@ matrix_synapse_workers_presets: | |||||
| stream_writer_presence_stream_workers_count: 1 | stream_writer_presence_stream_workers_count: 1 | ||||
| stream_writer_push_rules_stream_workers_count: 1 | stream_writer_push_rules_stream_workers_count: 1 | ||||
| stream_writer_device_lists_stream_workers_count: 1 | stream_writer_device_lists_stream_workers_count: 1 | ||||
| # Keep disabled by default: MSC4306/4308 thread subscriptions are unstable | |||||
| # and disabled in upstream Synapse unless explicitly opted in. | |||||
| stream_writer_thread_subscriptions_stream_workers_count: 0 | |||||
| specialized-workers: | specialized-workers: | ||||
| room_workers_count: 1 | room_workers_count: 1 | ||||
| sync_workers_count: 1 | sync_workers_count: 1 | ||||
| @@ -976,6 +982,9 @@ matrix_synapse_workers_presets: | |||||
| stream_writer_presence_stream_workers_count: 1 | stream_writer_presence_stream_workers_count: 1 | ||||
| stream_writer_push_rules_stream_workers_count: 1 | stream_writer_push_rules_stream_workers_count: 1 | ||||
| stream_writer_device_lists_stream_workers_count: 1 | stream_writer_device_lists_stream_workers_count: 1 | ||||
| # Keep disabled by default: MSC4306/4308 thread subscriptions are unstable | |||||
| # and disabled in upstream Synapse unless explicitly opted in. | |||||
| stream_writer_thread_subscriptions_stream_workers_count: 0 | |||||
| # Controls whether the matrix-synapse container exposes the various worker ports | # Controls whether the matrix-synapse container exposes the various worker ports | ||||
| # (see `port` and `metrics_port` in `matrix_synapse_workers_enabled_list`) outside of the container. | # (see `port` and `metrics_port` in `matrix_synapse_workers_enabled_list`) outside of the container. | ||||
| @@ -1078,6 +1087,10 @@ matrix_synapse_workers_stream_writer_push_rules_stream_workers_count: "{{ matrix | |||||
| # More than 1 worker is also supported of this type. | # More than 1 worker is also supported of this type. | ||||
| matrix_synapse_workers_stream_writer_device_lists_stream_workers_count: "{{ matrix_synapse_workers_presets[matrix_synapse_workers_preset]['stream_writer_device_lists_stream_workers_count'] }}" | matrix_synapse_workers_stream_writer_device_lists_stream_workers_count: "{{ matrix_synapse_workers_presets[matrix_synapse_workers_preset]['stream_writer_device_lists_stream_workers_count'] }}" | ||||
| # matrix_synapse_workers_stream_writer_thread_subscriptions_stream_workers_count controls how many stream writers that handle the `thread_subscriptions` stream to spawn. | |||||
| # More than 1 worker is also supported of this type. | |||||
| matrix_synapse_workers_stream_writer_thread_subscriptions_stream_workers_count: "{{ matrix_synapse_workers_presets[matrix_synapse_workers_preset]['stream_writer_thread_subscriptions_stream_workers_count'] }}" | |||||
| # A list of stream writer workers to enable. This list is built automatically based on other variables. | # A list of stream writer workers to enable. This list is built automatically based on other variables. | ||||
| # You're encouraged to enable/disable stream writer workers by setting `matrix_synapse_workers_stream_writer_*_stream_workers_count` variables, instead of adjusting this list manually. | # You're encouraged to enable/disable stream writer workers by setting `matrix_synapse_workers_stream_writer_*_stream_workers_count` variables, instead of adjusting this list manually. | ||||
| matrix_synapse_workers_stream_writers: | | matrix_synapse_workers_stream_writers: | | ||||
| @@ -1099,6 +1112,8 @@ matrix_synapse_workers_stream_writers: | | |||||
| ([{'stream': 'push_rules'}] * matrix_synapse_workers_stream_writer_push_rules_stream_workers_count | int) | ([{'stream': 'push_rules'}] * matrix_synapse_workers_stream_writer_push_rules_stream_workers_count | int) | ||||
| + | + | ||||
| ([{'stream': 'device_lists'}] * matrix_synapse_workers_stream_writer_device_lists_stream_workers_count | int) | ([{'stream': 'device_lists'}] * matrix_synapse_workers_stream_writer_device_lists_stream_workers_count | int) | ||||
| + | |||||
| ([{'stream': 'thread_subscriptions'}] * matrix_synapse_workers_stream_writer_thread_subscriptions_stream_workers_count | int) | |||||
| }} | }} | ||||
| matrix_synapse_workers_stream_writers_container_arguments: [] | matrix_synapse_workers_stream_writers_container_arguments: [] | ||||
| @@ -2038,6 +2053,7 @@ matrix_synapse_reverse_proxy_companion_synapse_stream_writer_receipts_stream_wor | |||||
| matrix_synapse_reverse_proxy_companion_synapse_stream_writer_presence_stream_worker_client_server_locations: "{{ matrix_synapse_workers_stream_writer_presence_stream_worker_client_server_endpoints }}" | matrix_synapse_reverse_proxy_companion_synapse_stream_writer_presence_stream_worker_client_server_locations: "{{ matrix_synapse_workers_stream_writer_presence_stream_worker_client_server_endpoints }}" | ||||
| matrix_synapse_reverse_proxy_companion_synapse_stream_writer_push_rules_stream_worker_client_server_locations: "{{ matrix_synapse_workers_stream_writer_push_rules_stream_worker_client_server_endpoints }}" | matrix_synapse_reverse_proxy_companion_synapse_stream_writer_push_rules_stream_worker_client_server_locations: "{{ matrix_synapse_workers_stream_writer_push_rules_stream_worker_client_server_endpoints }}" | ||||
| matrix_synapse_reverse_proxy_companion_synapse_stream_writer_device_lists_stream_worker_client_server_locations: "{{ matrix_synapse_workers_stream_writer_device_lists_stream_worker_client_server_endpoints }}" | matrix_synapse_reverse_proxy_companion_synapse_stream_writer_device_lists_stream_worker_client_server_locations: "{{ matrix_synapse_workers_stream_writer_device_lists_stream_worker_client_server_endpoints }}" | ||||
| matrix_synapse_reverse_proxy_companion_synapse_stream_writer_thread_subscriptions_stream_worker_client_server_locations: "{{ matrix_synapse_workers_stream_writer_thread_subscriptions_stream_worker_client_server_endpoints }}" | |||||
| matrix_synapse_reverse_proxy_companion_synapse_media_repository_locations: "{{ matrix_synapse_workers_media_repository_endpoints | default([]) }}" | matrix_synapse_reverse_proxy_companion_synapse_media_repository_locations: "{{ matrix_synapse_workers_media_repository_endpoints | default([]) }}" | ||||
| matrix_synapse_reverse_proxy_companion_synapse_user_dir_locations: "{{ matrix_synapse_workers_user_dir_worker_client_server_endpoints | default([]) }}" | matrix_synapse_reverse_proxy_companion_synapse_user_dir_locations: "{{ matrix_synapse_workers_user_dir_worker_client_server_endpoints | default([]) }}" | ||||
| matrix_synapse_reverse_proxy_companion_client_server_main_override_locations_regex: ^/_matrix/client/(api/v1|r0|v3|unstable)/(account/3pid/|directory/list/room/|rooms/[^/]+/(forget|upgrade|report)|register) | matrix_synapse_reverse_proxy_companion_client_server_main_override_locations_regex: ^/_matrix/client/(api/v1|r0|v3|unstable)/(account/3pid/|directory/list/room/|rooms/[^/]+/(forget|upgrade|report)|register) | ||||
| @@ -12,6 +12,7 @@ | |||||
| {% set stream_writer_presence_stream_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'stream_writer') | selectattr('stream_writer_stream', 'equalto', 'presence') | list %} | {% set stream_writer_presence_stream_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'stream_writer') | selectattr('stream_writer_stream', 'equalto', 'presence') | list %} | ||||
| {% set stream_writer_push_rules_stream_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'stream_writer') | selectattr('stream_writer_stream', 'equalto', 'push_rules') | list %} | {% set stream_writer_push_rules_stream_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'stream_writer') | selectattr('stream_writer_stream', 'equalto', 'push_rules') | list %} | ||||
| {% set stream_writer_device_lists_stream_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'stream_writer') | selectattr('stream_writer_stream', 'equalto', 'device_lists') | list %} | {% set stream_writer_device_lists_stream_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'stream_writer') | selectattr('stream_writer_stream', 'equalto', 'device_lists') | list %} | ||||
| {% set stream_writer_thread_subscriptions_stream_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'stream_writer') | selectattr('stream_writer_stream', 'equalto', 'thread_subscriptions') | list %} | |||||
| {% set media_repository_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'media_repository') | list %} | {% set media_repository_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'media_repository') | list %} | ||||
| {% set user_dir_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'user_dir') | list %} | {% set user_dir_workers = matrix_synapse_reverse_proxy_companion_synapse_workers_list | selectattr('type', 'equalto', 'user_dir') | list %} | ||||
| {% set stream_writer_client_server_routes = [ | {% set stream_writer_client_server_routes = [ | ||||
| @@ -57,6 +58,12 @@ | |||||
| 'locations': matrix_synapse_reverse_proxy_companion_synapse_stream_writer_device_lists_stream_worker_client_server_locations, | 'locations': matrix_synapse_reverse_proxy_companion_synapse_stream_writer_device_lists_stream_worker_client_server_locations, | ||||
| 'upstream': 'stream_writer_device_lists_stream_workers_upstream', | 'upstream': 'stream_writer_device_lists_stream_workers_upstream', | ||||
| }, | }, | ||||
| { | |||||
| 'doc_url': 'https://github.com/element-hq/synapse/blob/b99a58719b274fcbb327fd8d7649185792bfd12c/synapse/rest/client/thread_subscriptions.py#L38-L247', | |||||
| 'workers': stream_writer_thread_subscriptions_stream_workers, | |||||
| 'locations': matrix_synapse_reverse_proxy_companion_synapse_stream_writer_thread_subscriptions_stream_worker_client_server_locations, | |||||
| 'upstream': 'stream_writer_thread_subscriptions_stream_workers_upstream', | |||||
| }, | |||||
| ] %} | ] %} | ||||
| {% macro render_worker_upstream(name, workers, load_balance) %} | {% macro render_worker_upstream(name, workers, load_balance) %} | ||||
| @@ -134,6 +134,12 @@ matrix_synapse_workers_stream_writer_device_lists_stream_worker_client_server_en | |||||
| - ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$ | - ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$ | ||||
| - ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$ | - ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$ | ||||
| # matrix_synapse_workers_stream_writer_thread_subscriptions_stream_worker_client_server_endpoints contains the endpoints serviced by the `thread_subscriptions` stream writer. | |||||
| # Ref: https://github.com/element-hq/synapse/blob/b99a58719b274fcbb327fd8d7649185792bfd12c/synapse/rest/client/thread_subscriptions.py#L38-L247 | |||||
| matrix_synapse_workers_stream_writer_thread_subscriptions_stream_worker_client_server_endpoints: | |||||
| - ^/_matrix/client/unstable/io.element.msc4306/rooms/.*/thread/.*/subscription$ | |||||
| - ^/_matrix/client/unstable/io.element.msc4308/thread_subscriptions$ | |||||
| # matrix_synapse_workers_user_dir_worker_client_server_endpoints contains the endpoints serviced by the `type = user_dir` (`app = generic_worker`) worker. | # matrix_synapse_workers_user_dir_worker_client_server_endpoints contains the endpoints serviced by the `type = user_dir` (`app = generic_worker`) worker. | ||||
| # See: https://matrix-org.github.io/synapse/latest/workers.html#updating-the-user-directory | # See: https://matrix-org.github.io/synapse/latest/workers.html#updating-the-user-directory | ||||
| matrix_synapse_workers_user_dir_worker_client_server_endpoints: | matrix_synapse_workers_user_dir_worker_client_server_endpoints: | ||||
| @@ -142,11 +148,11 @@ matrix_synapse_workers_user_dir_worker_client_server_endpoints: | |||||
| # matrix_synapse_workers_known_stream_writer_stream_types contains the list of stream writer stream types that the playbook recognizes. | # matrix_synapse_workers_known_stream_writer_stream_types contains the list of stream writer stream types that the playbook recognizes. | ||||
| # This is used for validation purposes. If adding support for a new type, besides adding it to this list, | # This is used for validation purposes. If adding support for a new type, besides adding it to this list, | ||||
| # don't forget to actually configure it where appropriate (see worker.yaml.j2`, the nginx proxy configuration, etc). | # don't forget to actually configure it where appropriate (see worker.yaml.j2`, the nginx proxy configuration, etc). | ||||
| matrix_synapse_workers_known_stream_writer_stream_types: ['events', 'typing', 'to_device', 'account_data', 'receipts', 'presence', 'push_rules', 'device_lists'] | |||||
| matrix_synapse_workers_known_stream_writer_stream_types: ['events', 'typing', 'to_device', 'account_data', 'receipts', 'presence', 'push_rules', 'device_lists', 'thread_subscriptions'] | |||||
| # matrix_synapse_workers_webserving_stream_writer_types contains a list of stream writer types that serve web (client) requests. | # matrix_synapse_workers_webserving_stream_writer_types contains a list of stream writer types that serve web (client) requests. | ||||
| # Not all stream writers serve web requests. Some just perform background tasks. | # Not all stream writers serve web requests. Some just perform background tasks. | ||||
| matrix_synapse_workers_webserving_stream_writer_types: ['typing', 'to_device', 'account_data', 'receipts', 'presence', 'push_rules', 'device_lists'] | |||||
| matrix_synapse_workers_webserving_stream_writer_types: ['typing', 'to_device', 'account_data', 'receipts', 'presence', 'push_rules', 'device_lists', 'thread_subscriptions'] | |||||
| # matrix_synapse_workers_systemd_services_list contains a list of systemd services (one for each worker systemd service which serves web requests). | # matrix_synapse_workers_systemd_services_list contains a list of systemd services (one for each worker systemd service which serves web requests). | ||||
| # This list is built during runtime. | # This list is built during runtime. | ||||
| @@ -310,6 +316,10 @@ matrix_synapse_workers_generic_worker_endpoints: | |||||
| # - ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$ | # - ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$ | ||||
| # - ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$ | # - ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$ | ||||
| # | # | ||||
| # # Thread subscriptions requests | |||||
| # - ^/_matrix/client/unstable/io.element.msc4306/rooms/.*/thread/.*/subscription$ | |||||
| # - ^/_matrix/client/unstable/io.element.msc4308/thread_subscriptions$ | |||||
| # | |||||
| # # User directory search requests | # # User directory search requests | ||||
| # - ^/_matrix/client/(r0|v3|unstable)/user_directory/search$ | # - ^/_matrix/client/(r0|v3|unstable)/user_directory/search$ | ||||
| # End of intentionally-ignored-endpoints | # End of intentionally-ignored-endpoints | ||||