001/* 002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tap.partition; 023 024import java.util.regex.Pattern; 025 026import cascading.tuple.Fields; 027import cascading.tuple.TupleEntry; 028import cascading.util.Util; 029 030/** 031 * DelimitedPartition is an implementation of the {@link Partition} interface that allows for simple 032 * text delimited paths as partitions. 033 * <p> 034 * For example, given the delimiter {@code -} (dash), a partition path will have dashes. 035 * <p> 036 * Note the delimiter must not be naturally present in any of the values making up the partition. 037 * <p> 038 * The postfix value will be appended to any partition when created, and removed when the partition is parsed. Use 039 * this value to add static filenames to the output path. It is safe to include the delimiter in the postfix value 040 * (e.g '/somepath/filename.csv' where the delimiter is the default '/'). 041 * <p> 042 * Note some platforms do not allow for referencing files directly on write, only allowing for partitioning into 043 * directories where the actual filename is generated. In this case, if the postfix is intended to be a filename, it 044 * will be interpreted as a directory. 045 */ 046public class DelimitedPartition implements Partition 047 { 048 public static final String PATH_DELIM = "/"; 049 050 final Fields partitionFields; 051 final String delimiter; 052 final String postfix; 053 054 int numSplits; 055 056 transient Pattern pattern; 057 058 public DelimitedPartition( Fields partitionFields ) 059 { 060 this( partitionFields, null, null ); 061 } 062 063 public DelimitedPartition( Fields partitionFields, String delimiter ) 064 { 065 this( partitionFields, delimiter, null ); 066 } 067 068 public DelimitedPartition( Fields partitionFields, String delimiter, String postfix ) 069 { 070 if( partitionFields == null ) 071 throw new IllegalArgumentException( "partitionFields must not be null" ); 072 073 if( !partitionFields.isDefined() ) 074 throw new IllegalArgumentException( "partitionFields must be defined, got: " + partitionFields.printVerbose() ); 075 076 this.partitionFields = partitionFields; 077 this.delimiter = delimiter == null ? PATH_DELIM : delimiter; 078 079 postfix = Util.isEmpty( postfix ) ? null : postfix.startsWith( this.delimiter ) ? postfix.substring( this.delimiter.length() ) : postfix; 080 081 this.numSplits = partitionFields.size() + ( postfix != null ? postfix.split( this.delimiter ).length : 0 ); 082 this.postfix = postfix == null ? null : delimiter + postfix; // prefix the postfix w/ the delimiter 083 } 084 085 @Override 086 public int getPathDepth() 087 { 088 return numSplits; 089 } 090 091 @Override 092 public Fields getPartitionFields() 093 { 094 return partitionFields; 095 } 096 097 protected Pattern getPattern() 098 { 099 if( pattern == null ) 100 pattern = Pattern.compile( delimiter ); 101 102 return pattern; 103 } 104 105 public String getDelimiter() 106 { 107 return delimiter; 108 } 109 110 public String getPostfix() 111 { 112 return postfix; 113 } 114 115 @Override 116 public void toTuple( String partition, TupleEntry tupleEntry ) 117 { 118 if( partition.startsWith( delimiter ) ) 119 partition = partition.substring( 1 ); 120 121 String[] split = getPattern().split( partition, numSplits ); 122 123 tupleEntry.setCanonicalValues( split, 0, partitionFields.size() ); 124 } 125 126 @Override 127 public String toPartition( TupleEntry tupleEntry ) 128 { 129 String partition = Util.join( tupleEntry.asIterableOf( String.class ), delimiter, true ); 130 131 if( postfix != null ) 132 partition = partition + postfix; // delimiter prefixed in ctor 133 134 return partition; 135 } 136 }