1#![allow(rustdoc::invalid_html_tags)]
40#![warn(missing_docs)]
41#![allow(unused_crate_dependencies)]
43
44use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
45use arrow_schema::{ArrowError, Schema};
46
47use arrow_ipc::convert::try_schema_from_ipc_buffer;
48use base64::prelude::BASE64_STANDARD;
49use base64::Engine;
50use bytes::Bytes;
51use prost_types::Timestamp;
52use std::{fmt, ops::Deref};
53
54type ArrowResult<T> = std::result::Result<T, ArrowError>;
55
56#[allow(clippy::all)]
57mod gen {
58 #![allow(missing_docs)]
60 include!("arrow.flight.protocol.rs");
61}
62
63pub mod flight_descriptor {
65 use super::gen;
66 pub use gen::flight_descriptor::DescriptorType;
67}
68
69pub mod flight_service_client {
71 use super::gen;
72 pub use gen::flight_service_client::FlightServiceClient;
73}
74
75pub mod flight_service_server {
78 use super::gen;
79 pub use gen::flight_service_server::FlightService;
80 pub use gen::flight_service_server::FlightServiceServer;
81}
82
83pub mod client;
85pub use client::FlightClient;
86
87pub mod decode;
90
91pub mod encode;
94
95pub mod error;
97
98pub use gen::Action;
99pub use gen::ActionType;
100pub use gen::BasicAuth;
101pub use gen::CancelFlightInfoRequest;
102pub use gen::CancelFlightInfoResult;
103pub use gen::CancelStatus;
104pub use gen::Criteria;
105pub use gen::Empty;
106pub use gen::FlightData;
107pub use gen::FlightDescriptor;
108pub use gen::FlightEndpoint;
109pub use gen::FlightInfo;
110pub use gen::HandshakeRequest;
111pub use gen::HandshakeResponse;
112pub use gen::Location;
113pub use gen::PollInfo;
114pub use gen::PutResult;
115pub use gen::RenewFlightEndpointRequest;
116pub use gen::Result;
117pub use gen::SchemaResult;
118pub use gen::Ticket;
119
120mod trailers;
122
123pub mod utils;
124
125#[cfg(feature = "flight-sql-experimental")]
126pub mod sql;
127mod streams;
128
129use flight_descriptor::DescriptorType;
130
131pub struct SchemaAsIpc<'a> {
133 pub pair: (&'a Schema, &'a IpcWriteOptions),
135}
136
137#[derive(Debug)]
140pub struct IpcMessage(pub Bytes);
141
142fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
145 let data_gen = writer::IpcDataGenerator::default();
146 #[allow(deprecated)]
147 let mut dict_tracker =
148 writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
149 data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
150}
151
152fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
153 let encoded_data = flight_schema_as_encoded_data(schema, options);
154 IpcMessage(encoded_data.ipc_message.into())
155}
156
157impl Deref for IpcMessage {
163 type Target = [u8];
164
165 fn deref(&self) -> &Self::Target {
166 &self.0
167 }
168}
169
170impl<'a> Deref for SchemaAsIpc<'a> {
171 type Target = (&'a Schema, &'a IpcWriteOptions);
172
173 fn deref(&self) -> &Self::Target {
174 &self.pair
175 }
176}
177
/// Writes the `Debug` rendering of at most the first `limit` bytes of `value`.
///
/// Slices no longer than `limit` are written in full. Nothing in the output
/// marks that truncation happened.
fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
    let shown = value.len().min(limit);
    write!(f, "{:?}", &value[..shown])
}
188
189impl fmt::Display for FlightData {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 write!(f, "FlightData {{")?;
192 write!(f, " descriptor: ")?;
193 match &self.flight_descriptor {
194 Some(d) => write!(f, "{d}")?,
195 None => write!(f, "None")?,
196 };
197 write!(f, ", header: ")?;
198 limited_fmt(f, &self.data_header, 8)?;
199 write!(f, ", metadata: ")?;
200 limited_fmt(f, &self.app_metadata, 8)?;
201 write!(f, ", body: ")?;
202 limited_fmt(f, &self.data_body, 8)?;
203 write!(f, " }}")
204 }
205}
206
207impl fmt::Display for FlightDescriptor {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 write!(f, "FlightDescriptor {{")?;
210 write!(f, " type: ")?;
211 match self.r#type() {
212 DescriptorType::Cmd => {
213 write!(f, "cmd, value: ")?;
214 limited_fmt(f, &self.cmd, 8)?;
215 }
216 DescriptorType::Path => {
217 write!(f, "path: [")?;
218 let mut sep = "";
219 for element in &self.path {
220 write!(f, "{sep}{element}")?;
221 sep = ", ";
222 }
223 write!(f, "]")?;
224 }
225 DescriptorType::Unknown => {
226 write!(f, "unknown")?;
227 }
228 }
229 write!(f, " }}")
230 }
231}
232
233impl fmt::Display for FlightEndpoint {
234 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235 write!(f, "FlightEndpoint {{")?;
236 write!(f, " ticket: ")?;
237 match &self.ticket {
238 Some(value) => write!(f, "{value}"),
239 None => write!(f, " None"),
240 }?;
241 write!(f, ", location: [")?;
242 let mut sep = "";
243 for location in &self.location {
244 write!(f, "{sep}{location}")?;
245 sep = ", ";
246 }
247 write!(f, "]")?;
248 write!(f, ", expiration_time:")?;
249 match &self.expiration_time {
250 Some(value) => write!(f, " {value}"),
251 None => write!(f, " None"),
252 }?;
253 write!(f, ", app_metadata: ")?;
254 limited_fmt(f, &self.app_metadata, 8)?;
255 write!(f, " }}")
256 }
257}
258
259impl fmt::Display for FlightInfo {
260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261 let ipc_message = IpcMessage(self.schema.clone());
262 let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
263 write!(f, "FlightInfo {{")?;
264 write!(f, " schema: {schema}")?;
265 write!(f, ", descriptor:")?;
266 match &self.flight_descriptor {
267 Some(d) => write!(f, " {d}"),
268 None => write!(f, " None"),
269 }?;
270 write!(f, ", endpoint: [")?;
271 let mut sep = "";
272 for endpoint in &self.endpoint {
273 write!(f, "{sep}{endpoint}")?;
274 sep = ", ";
275 }
276 write!(f, "], total_records: {}", self.total_records)?;
277 write!(f, ", total_bytes: {}", self.total_bytes)?;
278 write!(f, ", ordered: {}", self.ordered)?;
279 write!(f, ", app_metadata: ")?;
280 limited_fmt(f, &self.app_metadata, 8)?;
281 write!(f, " }}")
282 }
283}
284
285impl fmt::Display for PollInfo {
286 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
287 write!(f, "PollInfo {{")?;
288 write!(f, " info:")?;
289 match &self.info {
290 Some(value) => write!(f, " {value}"),
291 None => write!(f, " None"),
292 }?;
293 write!(f, ", descriptor:")?;
294 match &self.flight_descriptor {
295 Some(d) => write!(f, " {d}"),
296 None => write!(f, " None"),
297 }?;
298 write!(f, ", progress:")?;
299 match &self.progress {
300 Some(value) => write!(f, " {value}"),
301 None => write!(f, " None"),
302 }?;
303 write!(f, ", expiration_time:")?;
304 match &self.expiration_time {
305 Some(value) => write!(f, " {value}"),
306 None => write!(f, " None"),
307 }?;
308 write!(f, " }}")
309 }
310}
311
312impl fmt::Display for CancelFlightInfoRequest {
313 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
314 write!(f, "CancelFlightInfoRequest {{")?;
315 write!(f, " info: ")?;
316 match &self.info {
317 Some(value) => write!(f, "{value}")?,
318 None => write!(f, "None")?,
319 };
320 write!(f, " }}")
321 }
322}
323
324impl fmt::Display for CancelFlightInfoResult {
325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326 write!(f, "CancelFlightInfoResult {{")?;
327 write!(f, " status: {}", self.status().as_str_name())?;
328 write!(f, " }}")
329 }
330}
331
332impl fmt::Display for RenewFlightEndpointRequest {
333 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334 write!(f, "RenewFlightEndpointRequest {{")?;
335 write!(f, " endpoint: ")?;
336 match &self.endpoint {
337 Some(value) => write!(f, "{value}")?,
338 None => write!(f, "None")?,
339 };
340 write!(f, " }}")
341 }
342}
343
344impl fmt::Display for Location {
345 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346 write!(f, "Location {{")?;
347 write!(f, " uri: ")?;
348 write!(f, "{}", self.uri)
349 }
350}
351
352impl fmt::Display for Ticket {
353 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354 write!(f, "Ticket {{")?;
355 write!(f, " ticket: ")?;
356 write!(f, "{}", BASE64_STANDARD.encode(&self.ticket))
357 }
358}
359
360impl From<EncodedData> for FlightData {
363 fn from(data: EncodedData) -> Self {
364 FlightData {
365 data_header: data.ipc_message.into(),
366 data_body: data.arrow_data.into(),
367 ..Default::default()
368 }
369 }
370}
371
372impl From<SchemaAsIpc<'_>> for FlightData {
373 fn from(schema_ipc: SchemaAsIpc) -> Self {
374 let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
375 FlightData {
376 data_header: vals,
377 ..Default::default()
378 }
379 }
380}
381
382impl TryFrom<SchemaAsIpc<'_>> for SchemaResult {
383 type Error = ArrowError;
384
385 fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
386 let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?;
392 Ok(SchemaResult { schema: vals })
393 }
394}
395
396impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
397 type Error = ArrowError;
398
399 fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
400 schema_to_ipc_format(schema_ipc)
401 }
402}
403
404fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
405 let pair = *schema_ipc;
406 let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);
407
408 let mut schema = vec![];
409 writer::write_message(&mut schema, encoded_data, pair.1)?;
410 Ok(IpcMessage(schema.into()))
411}
412
413impl TryFrom<&FlightData> for Schema {
414 type Error = ArrowError;
415 fn try_from(data: &FlightData) -> ArrowResult<Self> {
416 convert::try_schema_from_flatbuffer_bytes(&data.data_header[..]).map_err(|err| {
417 ArrowError::ParseError(format!(
418 "Unable to convert flight data to Arrow schema: {err}"
419 ))
420 })
421 }
422}
423
424impl TryFrom<FlightInfo> for Schema {
425 type Error = ArrowError;
426
427 fn try_from(value: FlightInfo) -> ArrowResult<Self> {
428 value.try_decode_schema()
429 }
430}
431
432impl TryFrom<IpcMessage> for Schema {
433 type Error = ArrowError;
434
435 fn try_from(value: IpcMessage) -> ArrowResult<Self> {
436 try_schema_from_ipc_buffer(&value)
437 }
438}
439
440impl TryFrom<&SchemaResult> for Schema {
441 type Error = ArrowError;
442 fn try_from(data: &SchemaResult) -> ArrowResult<Self> {
443 try_schema_from_ipc_buffer(&data.schema)
444 }
445}
446
447impl TryFrom<SchemaResult> for Schema {
448 type Error = ArrowError;
449 fn try_from(data: SchemaResult) -> ArrowResult<Self> {
450 (&data).try_into()
451 }
452}
453
454impl FlightData {
457 pub fn new() -> Self {
482 Default::default()
483 }
484
485 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
487 self.flight_descriptor = Some(flight_descriptor);
488 self
489 }
490
491 pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
493 self.data_header = data_header.into();
494 self
495 }
496
497 pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
501 self.data_body = data_body.into();
502 self
503 }
504
505 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
507 self.app_metadata = app_metadata.into();
508 self
509 }
510}
511
512impl FlightDescriptor {
513 pub fn new_cmd(cmd: impl Into<Bytes>) -> Self {
517 FlightDescriptor {
518 r#type: DescriptorType::Cmd.into(),
519 cmd: cmd.into(),
520 ..Default::default()
521 }
522 }
523
524 pub fn new_path(path: Vec<String>) -> Self {
528 FlightDescriptor {
529 r#type: DescriptorType::Path.into(),
530 path,
531 ..Default::default()
532 }
533 }
534}
535
536impl FlightInfo {
537 pub fn new() -> FlightInfo {
563 FlightInfo {
564 schema: Bytes::new(),
565 flight_descriptor: None,
566 endpoint: vec![],
567 ordered: false,
568 total_records: -1,
572 total_bytes: -1,
573 app_metadata: Bytes::new(),
574 }
575 }
576
577 pub fn try_decode_schema(self) -> ArrowResult<Schema> {
579 let msg = IpcMessage(self.schema);
580 msg.try_into()
581 }
582
583 pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
590 let options = IpcWriteOptions::default();
591 let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
592 self.schema = schema;
593 Ok(self)
594 }
595
596 pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
598 self.endpoint.push(endpoint);
599 self
600 }
601
602 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
604 self.flight_descriptor = Some(flight_descriptor);
605 self
606 }
607
608 pub fn with_total_records(mut self, total_records: i64) -> Self {
610 self.total_records = total_records;
611 self
612 }
613
614 pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
616 self.total_bytes = total_bytes;
617 self
618 }
619
620 pub fn with_ordered(mut self, ordered: bool) -> Self {
624 self.ordered = ordered;
625 self
626 }
627
628 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
630 self.app_metadata = app_metadata.into();
631 self
632 }
633}
634
635impl PollInfo {
636 pub fn new() -> Self {
653 Self {
654 info: None,
655 flight_descriptor: None,
656 progress: None,
657 expiration_time: None,
658 }
659 }
660
661 pub fn with_info(mut self, info: FlightInfo) -> Self {
663 self.info = Some(info);
664 self
665 }
666
667 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
670 self.flight_descriptor = Some(flight_descriptor);
671 self
672 }
673
674 pub fn try_with_progress(mut self, progress: f64) -> ArrowResult<Self> {
677 if !(0.0..=1.0).contains(&progress) {
678 return Err(ArrowError::InvalidArgumentError(format!(
679 "PollInfo progress must be in the range [0.0, 1.0], got {progress}"
680 )));
681 }
682 self.progress = Some(progress);
683 Ok(self)
684 }
685
686 pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
688 self.expiration_time = Some(expiration_time);
689 self
690 }
691}
692
693impl<'a> SchemaAsIpc<'a> {
694 pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
696 SchemaAsIpc {
697 pair: (schema, options),
698 }
699 }
700}
701
702impl CancelFlightInfoRequest {
703 pub fn new(info: FlightInfo) -> Self {
706 Self { info: Some(info) }
707 }
708}
709
710impl CancelFlightInfoResult {
711 pub fn new(status: CancelStatus) -> Self {
713 Self {
714 status: status as i32,
715 }
716 }
717}
718
719impl RenewFlightEndpointRequest {
720 pub fn new(endpoint: FlightEndpoint) -> Self {
723 Self {
724 endpoint: Some(endpoint),
725 }
726 }
727}
728
729impl Action {
730 pub fn new(action_type: impl Into<String>, body: impl Into<Bytes>) -> Self {
732 Self {
733 r#type: action_type.into(),
734 body: body.into(),
735 }
736 }
737}
738
739impl Result {
740 pub fn new(body: impl Into<Bytes>) -> Self {
742 Self { body: body.into() }
743 }
744}
745
746impl Ticket {
747 pub fn new(ticket: impl Into<Bytes>) -> Self {
756 Self {
757 ticket: ticket.into(),
758 }
759 }
760}
761
762impl FlightEndpoint {
763 pub fn new() -> FlightEndpoint {
782 Default::default()
783 }
784
785 pub fn with_ticket(mut self, ticket: Ticket) -> Self {
787 self.ticket = Some(ticket);
788 self
789 }
790
791 pub fn with_location(mut self, uri: impl Into<String>) -> Self {
803 self.location.push(Location { uri: uri.into() });
804 self
805 }
806
807 pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
809 self.expiration_time = Some(expiration_time);
810 self
811 }
812
813 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
815 self.app_metadata = app_metadata.into();
816 self
817 }
818}
819
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_ipc::MetadataVersion;
    use arrow_schema::{DataType, Field, TimeUnit};

    /// Bytes plus the limit to pass to `limited_fmt`, so the helper can be
    /// exercised through `format!`.
    struct TestVector(Vec<u8>, usize);

    impl fmt::Display for TestVector {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let Self(bytes, limit) = self;
            limited_fmt(f, bytes, *limit)
        }
    }

    #[test]
    fn it_creates_flight_descriptor_command() {
        let expected_cmd = b"my_command".to_vec();
        let descriptor = FlightDescriptor::new_cmd(expected_cmd.clone());
        assert_eq!(descriptor.r#type(), DescriptorType::Cmd);
        assert_eq!(descriptor.cmd, expected_cmd);
    }

    /// Input exactly as long as the limit is printed in full.
    #[test]
    fn it_accepts_equal_output() {
        let rendered = TestVector(vec![91; 10], 10).to_string();
        assert_eq!(rendered, format!("{:?}", vec![91; 10]));
    }

    /// Input shorter than the limit is printed in full.
    #[test]
    fn it_accepts_short_output() {
        let rendered = TestVector(vec![91; 6], 10).to_string();
        assert_eq!(rendered, format!("{:?}", vec![91; 6]));
    }

    /// Input longer than the limit is cut down to the limit.
    #[test]
    fn it_accepts_long_output() {
        let rendered = TestVector(vec![91; 10], 9).to_string();
        assert_eq!(rendered, format!("{:?}", vec![91; 9]));
    }

    /// A schema must survive the round trip through `SchemaResult`, both with
    /// default write options and with explicitly constructed V4 options.
    #[test]
    fn ser_deser_schema_result() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::UInt32, false),
            Field::new("c4", DataType::Boolean, true),
            Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("c6", DataType::Time32(TimeUnit::Second), false),
        ]);

        let default_options = IpcWriteOptions::default();
        let v4_options = IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap();

        for options in [&default_options, &v4_options] {
            let encoded = SchemaResult::try_from(SchemaAsIpc::new(&schema, options)).unwrap();
            let decoded = Schema::try_from(&encoded).unwrap();
            assert_eq!(schema, decoded);
        }
    }
}