arrow_flight/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A native Rust implementation of [Apache Arrow Flight](https://arrow.apache.org/docs/format/Flight.html)
19//! for exchanging [Arrow](https://arrow.apache.org) data between processes.
20//!
21//! Please see the [arrow-flight crates.io](https://crates.io/crates/arrow-flight)
22//! page for feature flags and more information.
23//!
24//! # Overview
25//!
26//! This crate contains:
27//!
28//! 1. Low level [prost] generated structs
29//!    for Flight gRPC protobuf messages, such as [`FlightData`], [`FlightInfo`],
30//!    [`Location`] and [`Ticket`].
31//!
32//! 2. Low level [tonic] generated [`flight_service_client`] and
33//!    [`flight_service_server`].
34//!
35//! 3. Experimental support for [Flight SQL] in [`sql`]. Requires the
36//!    `flight-sql-experimental` feature of this crate to be activated.
37//!
38//! [Flight SQL]: https://arrow.apache.org/docs/format/FlightSql.html
39#![allow(rustdoc::invalid_html_tags)]
40#![warn(missing_docs)]
41// The unused_crate_dependencies lint does not work well for crates defining additional examples/bin targets
42#![allow(unused_crate_dependencies)]
43
44use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
45use arrow_schema::{ArrowError, Schema};
46
47use arrow_ipc::convert::try_schema_from_ipc_buffer;
48use base64::prelude::BASE64_STANDARD;
49use base64::Engine;
50use bytes::Bytes;
51use prost_types::Timestamp;
52use std::{fmt, ops::Deref};
53
54type ArrowResult<T> = std::result::Result<T, ArrowError>;
55
56#[allow(clippy::all)]
57mod gen {
58    // Since this file is auto-generated, we suppress all warnings
59    #![allow(missing_docs)]
60    include!("arrow.flight.protocol.rs");
61}
62
63/// Defines a `Flight` for generation or retrieval.
64pub mod flight_descriptor {
65    use super::gen;
66    pub use gen::flight_descriptor::DescriptorType;
67}
68
69/// Low Level [tonic] [`FlightServiceClient`](gen::flight_service_client::FlightServiceClient).
70pub mod flight_service_client {
71    use super::gen;
72    pub use gen::flight_service_client::FlightServiceClient;
73}
74
75/// Low Level [tonic] [`FlightServiceServer`](gen::flight_service_server::FlightServiceServer)
76/// and [`FlightService`](gen::flight_service_server::FlightService).
77pub mod flight_service_server {
78    use super::gen;
79    pub use gen::flight_service_server::FlightService;
80    pub use gen::flight_service_server::FlightServiceServer;
81}
82
83/// Mid Level [`FlightClient`]
84pub mod client;
85pub use client::FlightClient;
86
87/// Decoder to create [`RecordBatch`](arrow_array::RecordBatch) streams from [`FlightData`] streams.
88/// See [`FlightRecordBatchStream`](decode::FlightRecordBatchStream).
89pub mod decode;
90
91/// Encoder to create [`FlightData`] streams from [`RecordBatch`](arrow_array::RecordBatch) streams.
92/// See [`FlightDataEncoderBuilder`](encode::FlightDataEncoderBuilder).
93pub mod encode;
94
95/// Common error types
96pub mod error;
97
98pub use gen::Action;
99pub use gen::ActionType;
100pub use gen::BasicAuth;
101pub use gen::CancelFlightInfoRequest;
102pub use gen::CancelFlightInfoResult;
103pub use gen::CancelStatus;
104pub use gen::Criteria;
105pub use gen::Empty;
106pub use gen::FlightData;
107pub use gen::FlightDescriptor;
108pub use gen::FlightEndpoint;
109pub use gen::FlightInfo;
110pub use gen::HandshakeRequest;
111pub use gen::HandshakeResponse;
112pub use gen::Location;
113pub use gen::PollInfo;
114pub use gen::PutResult;
115pub use gen::RenewFlightEndpointRequest;
116pub use gen::Result;
117pub use gen::SchemaResult;
118pub use gen::Ticket;
119
120/// Helper to extract HTTP/gRPC trailers from a tonic stream.
121mod trailers;
122
123pub mod utils;
124
125#[cfg(feature = "flight-sql-experimental")]
126pub mod sql;
127mod streams;
128
129use flight_descriptor::DescriptorType;
130
131/// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions
132pub struct SchemaAsIpc<'a> {
133    /// Data type representing a schema and its IPC write options
134    pub pair: (&'a Schema, &'a IpcWriteOptions),
135}
136
137/// IpcMessage represents a `Schema` in the format expected in
138/// `FlightInfo.schema`
139#[derive(Debug)]
140pub struct IpcMessage(pub Bytes);
141
142// Useful conversion functions
143
144fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
145    let data_gen = writer::IpcDataGenerator::default();
146    #[allow(deprecated)]
147    let mut dict_tracker =
148        writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
149    data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
150}
151
152fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
153    let encoded_data = flight_schema_as_encoded_data(schema, options);
154    IpcMessage(encoded_data.ipc_message.into())
155}
156
157// Implement a bunch of useful traits for various conversions, displays,
158// etc...
159
160// Deref
161
162impl Deref for IpcMessage {
163    type Target = [u8];
164
165    fn deref(&self) -> &Self::Target {
166        &self.0
167    }
168}
169
170impl<'a> Deref for SchemaAsIpc<'a> {
171    type Target = (&'a Schema, &'a IpcWriteOptions);
172
173    fn deref(&self) -> &Self::Target {
174        &self.pair
175    }
176}
177
178// Display...
179
/// Writes the `Debug` rendering of `value` to `f`, showing at most the first
/// `limit` bytes.
///
/// Bytes beyond `limit` are dropped without any truncation marker.
fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
    let shown = &value[..value.len().min(limit)];
    write!(f, "{shown:?}")
}
188
189impl fmt::Display for FlightData {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        write!(f, "FlightData {{")?;
192        write!(f, " descriptor: ")?;
193        match &self.flight_descriptor {
194            Some(d) => write!(f, "{d}")?,
195            None => write!(f, "None")?,
196        };
197        write!(f, ", header: ")?;
198        limited_fmt(f, &self.data_header, 8)?;
199        write!(f, ", metadata: ")?;
200        limited_fmt(f, &self.app_metadata, 8)?;
201        write!(f, ", body: ")?;
202        limited_fmt(f, &self.data_body, 8)?;
203        write!(f, " }}")
204    }
205}
206
207impl fmt::Display for FlightDescriptor {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        write!(f, "FlightDescriptor {{")?;
210        write!(f, " type: ")?;
211        match self.r#type() {
212            DescriptorType::Cmd => {
213                write!(f, "cmd, value: ")?;
214                limited_fmt(f, &self.cmd, 8)?;
215            }
216            DescriptorType::Path => {
217                write!(f, "path: [")?;
218                let mut sep = "";
219                for element in &self.path {
220                    write!(f, "{sep}{element}")?;
221                    sep = ", ";
222                }
223                write!(f, "]")?;
224            }
225            DescriptorType::Unknown => {
226                write!(f, "unknown")?;
227            }
228        }
229        write!(f, " }}")
230    }
231}
232
233impl fmt::Display for FlightEndpoint {
234    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235        write!(f, "FlightEndpoint {{")?;
236        write!(f, " ticket: ")?;
237        match &self.ticket {
238            Some(value) => write!(f, "{value}"),
239            None => write!(f, " None"),
240        }?;
241        write!(f, ", location: [")?;
242        let mut sep = "";
243        for location in &self.location {
244            write!(f, "{sep}{location}")?;
245            sep = ", ";
246        }
247        write!(f, "]")?;
248        write!(f, ", expiration_time:")?;
249        match &self.expiration_time {
250            Some(value) => write!(f, " {value}"),
251            None => write!(f, " None"),
252        }?;
253        write!(f, ", app_metadata: ")?;
254        limited_fmt(f, &self.app_metadata, 8)?;
255        write!(f, " }}")
256    }
257}
258
259impl fmt::Display for FlightInfo {
260    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261        let ipc_message = IpcMessage(self.schema.clone());
262        let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
263        write!(f, "FlightInfo {{")?;
264        write!(f, " schema: {schema}")?;
265        write!(f, ", descriptor:")?;
266        match &self.flight_descriptor {
267            Some(d) => write!(f, " {d}"),
268            None => write!(f, " None"),
269        }?;
270        write!(f, ", endpoint: [")?;
271        let mut sep = "";
272        for endpoint in &self.endpoint {
273            write!(f, "{sep}{endpoint}")?;
274            sep = ", ";
275        }
276        write!(f, "], total_records: {}", self.total_records)?;
277        write!(f, ", total_bytes: {}", self.total_bytes)?;
278        write!(f, ", ordered: {}", self.ordered)?;
279        write!(f, ", app_metadata: ")?;
280        limited_fmt(f, &self.app_metadata, 8)?;
281        write!(f, " }}")
282    }
283}
284
285impl fmt::Display for PollInfo {
286    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
287        write!(f, "PollInfo {{")?;
288        write!(f, " info:")?;
289        match &self.info {
290            Some(value) => write!(f, " {value}"),
291            None => write!(f, " None"),
292        }?;
293        write!(f, ", descriptor:")?;
294        match &self.flight_descriptor {
295            Some(d) => write!(f, " {d}"),
296            None => write!(f, " None"),
297        }?;
298        write!(f, ", progress:")?;
299        match &self.progress {
300            Some(value) => write!(f, " {value}"),
301            None => write!(f, " None"),
302        }?;
303        write!(f, ", expiration_time:")?;
304        match &self.expiration_time {
305            Some(value) => write!(f, " {value}"),
306            None => write!(f, " None"),
307        }?;
308        write!(f, " }}")
309    }
310}
311
312impl fmt::Display for CancelFlightInfoRequest {
313    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
314        write!(f, "CancelFlightInfoRequest {{")?;
315        write!(f, " info: ")?;
316        match &self.info {
317            Some(value) => write!(f, "{value}")?,
318            None => write!(f, "None")?,
319        };
320        write!(f, " }}")
321    }
322}
323
324impl fmt::Display for CancelFlightInfoResult {
325    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326        write!(f, "CancelFlightInfoResult {{")?;
327        write!(f, " status: {}", self.status().as_str_name())?;
328        write!(f, " }}")
329    }
330}
331
332impl fmt::Display for RenewFlightEndpointRequest {
333    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334        write!(f, "RenewFlightEndpointRequest {{")?;
335        write!(f, " endpoint: ")?;
336        match &self.endpoint {
337            Some(value) => write!(f, "{value}")?,
338            None => write!(f, "None")?,
339        };
340        write!(f, " }}")
341    }
342}
343
344impl fmt::Display for Location {
345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346        write!(f, "Location {{")?;
347        write!(f, " uri: ")?;
348        write!(f, "{}", self.uri)
349    }
350}
351
352impl fmt::Display for Ticket {
353    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354        write!(f, "Ticket {{")?;
355        write!(f, " ticket: ")?;
356        write!(f, "{}", BASE64_STANDARD.encode(&self.ticket))
357    }
358}
359
360// From...
361
362impl From<EncodedData> for FlightData {
363    fn from(data: EncodedData) -> Self {
364        FlightData {
365            data_header: data.ipc_message.into(),
366            data_body: data.arrow_data.into(),
367            ..Default::default()
368        }
369    }
370}
371
372impl From<SchemaAsIpc<'_>> for FlightData {
373    fn from(schema_ipc: SchemaAsIpc) -> Self {
374        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
375        FlightData {
376            data_header: vals,
377            ..Default::default()
378        }
379    }
380}
381
382impl TryFrom<SchemaAsIpc<'_>> for SchemaResult {
383    type Error = ArrowError;
384
385    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
386        // According to the definition from `Flight.proto`
387        // The schema of the dataset in its IPC form:
388        //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
389        //   4 bytes - the byte length of the payload
390        //   a flatbuffer Message whose header is the Schema
391        let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?;
392        Ok(SchemaResult { schema: vals })
393    }
394}
395
396impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
397    type Error = ArrowError;
398
399    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
400        schema_to_ipc_format(schema_ipc)
401    }
402}
403
404fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
405    let pair = *schema_ipc;
406    let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);
407
408    let mut schema = vec![];
409    writer::write_message(&mut schema, encoded_data, pair.1)?;
410    Ok(IpcMessage(schema.into()))
411}
412
413impl TryFrom<&FlightData> for Schema {
414    type Error = ArrowError;
415    fn try_from(data: &FlightData) -> ArrowResult<Self> {
416        convert::try_schema_from_flatbuffer_bytes(&data.data_header[..]).map_err(|err| {
417            ArrowError::ParseError(format!(
418                "Unable to convert flight data to Arrow schema: {err}"
419            ))
420        })
421    }
422}
423
424impl TryFrom<FlightInfo> for Schema {
425    type Error = ArrowError;
426
427    fn try_from(value: FlightInfo) -> ArrowResult<Self> {
428        value.try_decode_schema()
429    }
430}
431
432impl TryFrom<IpcMessage> for Schema {
433    type Error = ArrowError;
434
435    fn try_from(value: IpcMessage) -> ArrowResult<Self> {
436        try_schema_from_ipc_buffer(&value)
437    }
438}
439
440impl TryFrom<&SchemaResult> for Schema {
441    type Error = ArrowError;
442    fn try_from(data: &SchemaResult) -> ArrowResult<Self> {
443        try_schema_from_ipc_buffer(&data.schema)
444    }
445}
446
447impl TryFrom<SchemaResult> for Schema {
448    type Error = ArrowError;
449    fn try_from(data: SchemaResult) -> ArrowResult<Self> {
450        (&data).try_into()
451    }
452}
453
454// FlightData, FlightDescriptor, etc..
455
456impl FlightData {
457    /// Create a new [`FlightData`].
458    ///
459    /// # See Also
460    ///
461    /// See [`FlightDataEncoderBuilder`] for a higher level API to
462    /// convert a stream of [`RecordBatch`]es to [`FlightData`]s
463    ///
464    /// # Example:
465    ///
466    /// ```
467    /// # use bytes::Bytes;
468    /// # use arrow_flight::{FlightData, FlightDescriptor};
469    /// # fn encode_data() -> Bytes { Bytes::new() } // dummy data
470    /// // Get encoded Arrow IPC data:
471    /// let data_body: Bytes = encode_data();
472    /// // Create the FlightData message
473    /// let flight_data = FlightData::new()
474    ///   .with_descriptor(FlightDescriptor::new_cmd("the command"))
475    ///   .with_app_metadata("My apps metadata")
476    ///   .with_data_body(data_body);
477    /// ```
478    ///
479    /// [`FlightDataEncoderBuilder`]: crate::encode::FlightDataEncoderBuilder
480    /// [`RecordBatch`]: arrow_array::RecordBatch
481    pub fn new() -> Self {
482        Default::default()
483    }
484
485    /// Add a [`FlightDescriptor`] describing the data
486    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
487        self.flight_descriptor = Some(flight_descriptor);
488        self
489    }
490
491    /// Add a data header
492    pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
493        self.data_header = data_header.into();
494        self
495    }
496
497    /// Add a data body. See [`IpcDataGenerator`] to create this data.
498    ///
499    /// [`IpcDataGenerator`]: arrow_ipc::writer::IpcDataGenerator
500    pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
501        self.data_body = data_body.into();
502        self
503    }
504
505    /// Add optional application specific metadata to the message
506    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
507        self.app_metadata = app_metadata.into();
508        self
509    }
510}
511
512impl FlightDescriptor {
513    /// Create a new opaque command [`CMD`] `FlightDescriptor` to generate a dataset.
514    ///
515    /// [`CMD`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L224-L227
516    pub fn new_cmd(cmd: impl Into<Bytes>) -> Self {
517        FlightDescriptor {
518            r#type: DescriptorType::Cmd.into(),
519            cmd: cmd.into(),
520            ..Default::default()
521        }
522    }
523
524    /// Create a new named path [`PATH`] `FlightDescriptor` that identifies a dataset
525    ///
526    /// [`PATH`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L217-L222
527    pub fn new_path(path: Vec<String>) -> Self {
528        FlightDescriptor {
529            r#type: DescriptorType::Path.into(),
530            path,
531            ..Default::default()
532        }
533    }
534}
535
536impl FlightInfo {
537    /// Create a new, empty `FlightInfo`, describing where to fetch flight data
538    ///
539    ///
540    /// # Example:
541    /// ```
542    /// # use arrow_flight::{FlightInfo, Ticket, FlightDescriptor, FlightEndpoint};
543    /// # use arrow_schema::{Schema, Field, DataType};
544    /// # fn get_schema() -> Schema {
545    /// #   Schema::new(vec![
546    /// #     Field::new("a", DataType::Utf8, false),
547    /// #   ])
548    /// # }
549    /// #
550    /// // Create a new FlightInfo
551    /// let flight_info = FlightInfo::new()
552    ///   // Encode the Arrow schema
553    ///   .try_with_schema(&get_schema())
554    ///   .expect("encoding failed")
555    ///   .with_endpoint(
556    ///      FlightEndpoint::new()
557    ///        .with_ticket(Ticket::new("ticket contents")
558    ///      )
559    ///    )
560    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"));
561    /// ```
562    pub fn new() -> FlightInfo {
563        FlightInfo {
564            schema: Bytes::new(),
565            flight_descriptor: None,
566            endpoint: vec![],
567            ordered: false,
568            // Flight says "Set these to -1 if unknown."
569            //
570            // https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L287-L289
571            total_records: -1,
572            total_bytes: -1,
573            app_metadata: Bytes::new(),
574        }
575    }
576
577    /// Try and convert the data in this  `FlightInfo` into a [`Schema`]
578    pub fn try_decode_schema(self) -> ArrowResult<Schema> {
579        let msg = IpcMessage(self.schema);
580        msg.try_into()
581    }
582
583    /// Specify the schema for the response.
584    ///
585    /// Note this takes the arrow [`Schema`] (not the IPC schema) and
586    /// encodes it using the default IPC options.
587    ///
588    /// Returns an error if `schema` can not be encoded into IPC form.
589    pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
590        let options = IpcWriteOptions::default();
591        let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
592        self.schema = schema;
593        Ok(self)
594    }
595
596    /// Add specific a endpoint for fetching the data
597    pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
598        self.endpoint.push(endpoint);
599        self
600    }
601
602    /// Add a [`FlightDescriptor`] describing what this data is
603    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
604        self.flight_descriptor = Some(flight_descriptor);
605        self
606    }
607
608    /// Set the number of records in the result, if known
609    pub fn with_total_records(mut self, total_records: i64) -> Self {
610        self.total_records = total_records;
611        self
612    }
613
614    /// Set the number of bytes in the result, if known
615    pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
616        self.total_bytes = total_bytes;
617        self
618    }
619
620    /// Specify if the response is [ordered] across endpoints
621    ///
622    /// [ordered]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L269-L275
623    pub fn with_ordered(mut self, ordered: bool) -> Self {
624        self.ordered = ordered;
625        self
626    }
627
628    /// Add optional application specific metadata to the message
629    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
630        self.app_metadata = app_metadata.into();
631        self
632    }
633}
634
635impl PollInfo {
636    /// Create a new, empty [`PollInfo`], providing information for a long-running query
637    ///
638    /// # Example:
639    /// ```
640    /// # use arrow_flight::{FlightInfo, PollInfo, FlightDescriptor};
641    /// # use prost_types::Timestamp;
642    /// // Create a new PollInfo
643    /// let poll_info = PollInfo::new()
644    ///   .with_info(FlightInfo::new())
645    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"))
646    ///   .try_with_progress(0.5)
647    ///   .expect("progress should've been valid")
648    ///   .with_expiration_time(
649    ///     "1970-01-01".parse().expect("invalid timestamp")
650    ///   );
651    /// ```
652    pub fn new() -> Self {
653        Self {
654            info: None,
655            flight_descriptor: None,
656            progress: None,
657            expiration_time: None,
658        }
659    }
660
661    /// Add the current available results for the poll call as a [`FlightInfo`]
662    pub fn with_info(mut self, info: FlightInfo) -> Self {
663        self.info = Some(info);
664        self
665    }
666
667    /// Add a [`FlightDescriptor`] that the client should use for the next poll call,
668    /// if the query is not yet complete
669    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
670        self.flight_descriptor = Some(flight_descriptor);
671        self
672    }
673
674    /// Set the query progress if known. Must be in the range [0.0, 1.0] else this will
675    /// return an error
676    pub fn try_with_progress(mut self, progress: f64) -> ArrowResult<Self> {
677        if !(0.0..=1.0).contains(&progress) {
678            return Err(ArrowError::InvalidArgumentError(format!(
679                "PollInfo progress must be in the range [0.0, 1.0], got {progress}"
680            )));
681        }
682        self.progress = Some(progress);
683        Ok(self)
684    }
685
686    /// Specify expiration time for this request
687    pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
688        self.expiration_time = Some(expiration_time);
689        self
690    }
691}
692
693impl<'a> SchemaAsIpc<'a> {
694    /// Create a new `SchemaAsIpc` from a `Schema` and `IpcWriteOptions`
695    pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
696        SchemaAsIpc {
697            pair: (schema, options),
698        }
699    }
700}
701
702impl CancelFlightInfoRequest {
703    /// Create a new [`CancelFlightInfoRequest`], providing the [`FlightInfo`]
704    /// of the query to cancel.
705    pub fn new(info: FlightInfo) -> Self {
706        Self { info: Some(info) }
707    }
708}
709
710impl CancelFlightInfoResult {
711    /// Create a new [`CancelFlightInfoResult`] from the provided [`CancelStatus`].
712    pub fn new(status: CancelStatus) -> Self {
713        Self {
714            status: status as i32,
715        }
716    }
717}
718
719impl RenewFlightEndpointRequest {
720    /// Create a new [`RenewFlightEndpointRequest`], providing the [`FlightEndpoint`]
721    /// for which is being requested an extension of its expiration.
722    pub fn new(endpoint: FlightEndpoint) -> Self {
723        Self {
724            endpoint: Some(endpoint),
725        }
726    }
727}
728
729impl Action {
730    /// Create a new Action with type and body
731    pub fn new(action_type: impl Into<String>, body: impl Into<Bytes>) -> Self {
732        Self {
733            r#type: action_type.into(),
734            body: body.into(),
735        }
736    }
737}
738
739impl Result {
740    /// Create a new Result with the specified body
741    pub fn new(body: impl Into<Bytes>) -> Self {
742        Self { body: body.into() }
743    }
744}
745
746impl Ticket {
747    /// Create a new `Ticket`
748    ///
749    /// # Example
750    ///
751    /// ```
752    /// # use arrow_flight::Ticket;
753    /// let ticket = Ticket::new("SELECT * from FOO");
754    /// ```
755    pub fn new(ticket: impl Into<Bytes>) -> Self {
756        Self {
757            ticket: ticket.into(),
758        }
759    }
760}
761
762impl FlightEndpoint {
763    /// Create a new, empty `FlightEndpoint` that represents a location
764    /// to retrieve Flight results.
765    ///
766    /// # Example
767    /// ```
768    /// # use arrow_flight::{FlightEndpoint, Ticket};
769    /// #
770    /// // Specify the client should fetch results from this server
771    /// let endpoint = FlightEndpoint::new()
772    ///   .with_ticket(Ticket::new("the ticket"));
773    ///
774    /// // Specify the client should fetch results from either
775    /// // `http://example.com` or `https://example.com`
776    /// let endpoint = FlightEndpoint::new()
777    ///   .with_ticket(Ticket::new("the ticket"))
778    ///   .with_location("http://example.com")
779    ///   .with_location("https://example.com");
780    /// ```
781    pub fn new() -> FlightEndpoint {
782        Default::default()
783    }
784
785    /// Set the [`Ticket`] used to retrieve data from the endpoint
786    pub fn with_ticket(mut self, ticket: Ticket) -> Self {
787        self.ticket = Some(ticket);
788        self
789    }
790
791    /// Add a location `uri` to this endpoint. Note each endpoint can
792    /// have multiple locations.
793    ///
794    /// If no `uri` is specified, the [Flight Spec] says:
795    ///
796    /// ```text
797    /// * If the list is empty, the expectation is that the ticket can only
798    /// * be redeemed on the current service where the ticket was
799    /// * generated.
800    /// ```
801    /// [Flight Spec]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L307C2-L312
802    pub fn with_location(mut self, uri: impl Into<String>) -> Self {
803        self.location.push(Location { uri: uri.into() });
804        self
805    }
806
807    /// Specify expiration time for this stream
808    pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
809        self.expiration_time = Some(expiration_time);
810        self
811    }
812
813    /// Add optional application specific metadata to the message
814    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
815        self.app_metadata = app_metadata.into();
816        self
817    }
818}
819
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_ipc::MetadataVersion;
    use arrow_schema::{DataType, Field, TimeUnit};

    /// Bytes plus a display limit, rendered through `limited_fmt`.
    struct TestVector(Vec<u8>, usize);

    impl fmt::Display for TestVector {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let Self(bytes, limit) = self;
            limited_fmt(f, bytes, *limit)
        }
    }

    #[test]
    fn it_creates_flight_descriptor_command() {
        let expected_cmd = b"my_command".to_vec();
        let descriptor = FlightDescriptor::new_cmd(expected_cmd.clone());
        assert_eq!(descriptor.r#type(), DescriptorType::Cmd);
        assert_eq!(descriptor.cmd, expected_cmd);
    }

    #[test]
    fn it_accepts_equal_output() {
        // Limit equal to the length: nothing is cut off.
        let rendered = TestVector(vec![91; 10], 10).to_string();
        assert_eq!(rendered, format!("{:?}", vec![91; 10]));
    }

    #[test]
    fn it_accepts_short_output() {
        // Limit larger than the length: nothing is cut off.
        let rendered = TestVector(vec![91; 6], 10).to_string();
        assert_eq!(rendered, format!("{:?}", vec![91; 6]));
    }

    #[test]
    fn it_accepts_long_output() {
        // Limit smaller than the length: only the first `limit` bytes remain.
        let rendered = TestVector(vec![91; 10], 9).to_string();
        assert_eq!(rendered, format!("{:?}", vec![91; 9]));
    }

    #[test]
    fn ser_deser_schema_result() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::UInt32, false),
            Field::new("c4", DataType::Boolean, true),
            Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("c6", DataType::Time32(TimeUnit::Second), false),
        ]);

        // Encode the schema into a `SchemaResult` and decode it again.
        let round_trip = |options: &IpcWriteOptions| -> Schema {
            let encoded = SchemaResult::try_from(SchemaAsIpc::new(&schema, options)).unwrap();
            Schema::try_from(&encoded).unwrap()
        };

        // V5 with write_legacy_ipc_format = false
        // this will write the continuation marker
        let current = IpcWriteOptions::default();
        assert_eq!(schema, round_trip(&current));

        // V4 with write_legacy_ipc_format = true
        // this will not write the continuation marker
        let legacy = IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap();
        assert_eq!(schema, round_trip(&legacy));
    }
}
895}